mutable
A Database System for Research and Fast Prototyping
Loading...
Searching...
No Matches
SerialScheduler.cpp
Go to the documentation of this file.
2#include "parse/Sema.hpp"
3#include <mutable/mutable.hpp>
4
5
6using namespace m;
7
8
9std::optional<m::Scheduler::queued_command> SerialScheduler::CommandQueue::pop()
10{
11 std::unique_lock<std::mutex> lock(mutex_);
12 has_element_.wait(lock, []{
13 // always wake up if the queue is closed
14 if (query_queue_.closed_) [[unlikely]]
15 return true;
16 // keep waiting if the queue is empty but not closed
17 else if (query_queue_.command_list_.empty())
18 return false;
19 // wake up if there is no running_transaction_ or if the next command is from the running_transaction_
21 return true;
22 // otherwise keep waiting
23 else
24 return false;
25 });
26 // if the queue is still empty here the queue should be closed.
27 if (command_list_.empty()) [[unlikely]] return std::nullopt;
28
29 queued_command res = std::move(command_list_.front());
30 command_list_.pop_front();
31
32 // if there is currently no running transaction, set the next transaction in the queue as the running transaction
34 running_transaction_ = &std::get<0>(res);
35
36 return {std::move(res)};
37}
38
39void SerialScheduler::CommandQueue::push(Transaction &t, std::unique_ptr<ast::Command> command, Diagnostic &diag, std::promise<bool> promise)
40{
41 std::unique_lock<std::mutex> lock(mutex_);
42 if (closed_) {
43 /* Since the command queue is closed, no more command will be executed
44 * => set the promise of this newly pushed command to false right away */
45 promise.set_value(false);
46 return;
47 }
48
49 /* Check if transaction `t` already has elements in the queue, if not then check if the transaction is the currently
50 * running transaction. If yes, then emplace the command in the front, otherwise emplace at the end. */
51 auto it = std::find_if(command_list_.rend(), command_list_.rbegin(),
52 [&t](queued_command &x){ return (t == std::get<0>(x)); }
53 );
54 if (running_transaction_ and *running_transaction_ == t and it == command_list_.rend())
55 command_list_.emplace_front(t, std::move(command), diag, std::move(promise));
56 else if (it == command_list_.rbegin())
57 command_list_.emplace_back(t, std::move(command), diag, std::move(promise));
58 else
59 command_list_.emplace(++it.base(), t, std::move(command), diag, std::move(promise));
60
61 lock.unlock();
62 has_element_.notify_one();
63}
64
66{
67 std::unique_lock<std::mutex> lock(mutex_);
68 closed_ = true;
69 while (not command_list_.empty()) {
70 std::get<3>(command_list_.front()).set_value(false);
71 command_list_.pop_front();
72 }
73 lock.unlock();
74 has_element_.notify_all();
75}
76
78{
79 std::lock_guard<std::mutex> lock(mutex_);
80 return query_queue_.closed_;
81}
82
84 std::unique_lock<std::mutex> lock(mutex_);
85 M_insist(&t == running_transaction_);
86 running_transaction_ = nullptr;
87 lock.unlock();
88 has_element_.notify_one();
89}
90
92std::atomic<int64_t> SerialScheduler::next_start_time = 0;
93
95{
96 if (schedule_thread_.joinable()) {
98 schedule_thread_.join();
99 }
100}
101
102std::future<bool> SerialScheduler::schedule_command(Transaction &t, std::unique_ptr<ast::Command> command, Diagnostic &diag)
103{
104 std::promise<bool> execution_completed;
105 auto execution_completed_future = execution_completed.get_future();
106 query_queue_.push(t, std::move(command), diag, std::move(execution_completed));
107
108 if (not schedule_thread_.joinable()) [[unlikely]]
109 // Creating the worker thread not here but in the constructor of `SerialScheduler` causes deadlocks.
110 schedule_thread_ = std::thread(schedule_thread);
111 return execution_completed_future;
112}
113
114std::unique_ptr<SerialScheduler::Transaction> SerialScheduler::begin_transaction() {
115 return std::make_unique<SerialScheduler::Transaction>();
116}
117
118bool SerialScheduler::commit(std::unique_ptr<SerialScheduler::Transaction> t) {
119 /* TODO: When autocommit is not used as the default anymore, the transaction must check for conflicts with
120 * other transactions that were introduced in the time between when this transaction executed statements and now. */
122 return true;
123}
124
125bool SerialScheduler::abort(std::unique_ptr<SerialScheduler::Transaction> t) {
126 /* TODO: Undo changes of transaction */
128 return true;
129}
130
132{
133 Catalog &C = Catalog::Get();
134 while (not query_queue_.is_closed()) {
135 auto ret = query_queue_.pop();
136 // pop() should only return no value if the queue is closed
137 if (not ret.has_value()) continue;
138
139 auto [t, ast, diag, promise] = std::move(ret.value());
140
141 // check if transaction has a start_time, set one if not. -1 represents an undefined value.
142 if (t.start_time() == -1) t.start_time(next_start_time++);
143 /* TODO: Implement overflow handling: if the transaction timestamps overflow, then outdated versions of tuples
144 * can become visible and other weird behaviour can occur. For this to happen, (2^63)-1 Transactions need to
145 * run without every restarting mutable. */
146 if (next_start_time < 0) [[unlikely]] M_unreachable("Transaction timestamp overflow");
147
148 ast::Sema sema(diag);
149 bool err = diag.num_errors() > 0; // parser errors
150
151 diag.clear();
152 auto cmd = sema.analyze(std::move(ast));
153 err |= diag.num_errors() > 0; // sema errors
154
155 M_insist(not err == bool(cmd), "when there are no errors, Sema must have returned a command");
156 if (not err and cmd) {
157 cmd->transaction(&t);
158 cmd->execute(diag);
159 promise.set_value(true);
160 continue;
161 }
162 promise.set_value(false);
163 }
164}
165
166__attribute__((constructor(202)))
167static void register_scheduler()
168{
169 Catalog &C = Catalog::Get();
171 C.pool("SerialScheduler"),
172 std::make_unique<SerialScheduler>(),
173 "executes all incoming queries serially"
174 );
175}
__attribute__((constructor(202))) static void register_interpreter()
#define M_unreachable(MSG)
Definition: macro.hpp:146
#define M_insist(...)
Definition: macro.hpp:129
‍mutable namespace
Definition: Backend.hpp:10
and
Definition: enum_ops.hpp:12
The catalog contains all Databases and keeps track of all meta information of the database system.
Definition: Catalog.hpp:215
void register_scheduler(ThreadSafePooledString name, std::unique_ptr< Scheduler > scheduler, const char *description=nullptr)
Registers a new Scheduler with the given name.
Definition: Catalog.hpp:543
ThreadSafePooledString pool(const char *str) const
Creates an internalized copy of the string str by adding it to the internal StringPool.
Definition: Catalog.hpp:274
static Catalog & Get()
Return a reference to the single Catalog instance.
std::tuple< Transaction &, std::unique_ptr< ast::Command >, Diagnostic &, std::promise< bool > > queued_command
Definition: Scheduler.hpp:38
A thread-safe query plan queue.
std::condition_variable has_element_
std::optional< queued_command > pop()
‍returns the next queued ast::Command. Returns std::nullopt if the queue is closed.
void stop_transaction(Transaction &t)
Marks t as no longer running.
bool is_closed()
returns whether the queue has been closed, i.e. whether no more elements will be pushed
void close()
empties and closes the queue without executing the remaining ast::Commands.
std::list< queued_command > command_list_
void push(Transaction &t, std::unique_ptr< ast::Command > command, Diagnostic &diag, std::promise< bool > promise)
Inserts the command into the queue.
Transaction * running_transaction_
the currently running transaction. Only commands by this transaction are returned.
bool commit(std::unique_ptr< Transaction > t) override
Closes the given Scheduler::Transaction and commits its changes.
std::future< bool > schedule_command(Transaction &t, std::unique_ptr< ast::Command > command, Diagnostic &diag) override
Schedule a ast::Command for execution within the given Scheduler::Transaction.
std::unique_ptr< Transaction > begin_transaction() override
Returns a new Scheduler::Transaction object that is passed along when scheduling commands.
static std::atomic< int64_t > next_start_time
stores the next transaction start time
static void schedule_thread()
The method run by the worker thread schedule_thread_.
bool abort(std::unique_ptr< Transaction > t) override
Closes the given Scheduler::Transaction and discards its changes.
std::thread schedule_thread_
the worker thread that executes all incoming queries.
static CommandQueue query_queue_
instance of our thread-safe query queue that stores all incoming plans.
std::unique_ptr< DatabaseCommand > analyze(std::unique_ptr< ast::Command > ast)
Perform semantic analysis of an ast::Command.
Definition: Sema.cpp:15