11 std::unique_lock<std::mutex> lock(
mutex_);
36 return {std::move(res)};
41 std::unique_lock<std::mutex> lock(mutex_);
45 promise.set_value(
false);
51 auto it = std::find_if(command_list_.rend(), command_list_.rbegin(),
54 if (running_transaction_
and *running_transaction_ == t
and it == command_list_.rend())
55 command_list_.emplace_front(t, std::move(command), diag, std::move(promise));
56 else if (it == command_list_.rbegin())
57 command_list_.emplace_back(t, std::move(command), diag, std::move(promise));
59 command_list_.emplace(++it.base(), t, std::move(command), diag, std::move(promise));
62 has_element_.notify_one();
67 std::unique_lock<std::mutex> lock(mutex_);
69 while (not command_list_.empty()) {
70 std::get<3>(command_list_.front()).set_value(
false);
71 command_list_.pop_front();
74 has_element_.notify_all();
79 std::lock_guard<std::mutex> lock(mutex_);
84 std::unique_lock<std::mutex> lock(mutex_);
85 M_insist(&t == running_transaction_);
86 running_transaction_ =
nullptr;
88 has_element_.notify_one();
104 std::promise<bool> execution_completed;
105 auto execution_completed_future = execution_completed.get_future();
106 query_queue_.
push(t, std::move(command), diag, std::move(execution_completed));
111 return execution_completed_future;
115 return std::make_unique<SerialScheduler::Transaction>();
137 if (not ret.has_value())
continue;
139 auto [t, ast, diag, promise] = std::move(ret.value());
149 bool err = diag.num_errors() > 0;
152 auto cmd = sema.
analyze(std::move(ast));
153 err |= diag.num_errors() > 0;
155 M_insist(not err ==
bool(cmd),
"when there are no errors, Sema must have returned a command");
156 if (not err
and cmd) {
157 cmd->transaction(&t);
159 promise.set_value(
true);
162 promise.set_value(
false);
167static
void register_scheduler()
171 C.
pool(
"SerialScheduler"),
172 std::make_unique<SerialScheduler>(),
173 "executes all incoming queries serially"
__attribute__((constructor(202))) static void register_interpreter()
#define M_unreachable(MSG)
The catalog contains all Databases and keeps track of all meta information of the database system.
void register_scheduler(ThreadSafePooledString name, std::unique_ptr< Scheduler > scheduler, const char *description=nullptr)
Registers a new Scheduler with the given name.
ThreadSafePooledString pool(const char *str) const
Creates an internalized copy of the string str by adding it to the internal StringPool.
static Catalog & Get()
Return a reference to the single Catalog instance.
std::tuple< Transaction &, std::unique_ptr< ast::Command >, Diagnostic &, std::promise< bool > > queued_command
A thread-safe query plan queue.
std::condition_variable has_element_
std::optional< queued_command > pop()
returns the next queued ast::Command. Returns std::nullopt if the queue is closed.
void stop_transaction(Transaction &t)
Marks t as no longer running.
bool is_closed()
signals waiting threads that no more elements will be pushed
void close()
empties and closes the queue without executing the remaining ast::Commands.
std::list< queued_command > command_list_
void push(Transaction &t, std::unique_ptr< ast::Command > command, Diagnostic &diag, std::promise< bool > promise)
Inserts the command into the queue.
Transaction * running_transaction_
the currently running transaction. Only commands by this transaction are returned.
bool commit(std::unique_ptr< Transaction > t) override
Closes the given Scheduler::Transaction and commits its changes.
std::future< bool > schedule_command(Transaction &t, std::unique_ptr< ast::Command > command, Diagnostic &diag) override
Schedule a ast::Command for execution within the given Scheduler::Transaction.
std::unique_ptr< Transaction > begin_transaction() override
Returns a new Scheduler::Transaction object that is passed along when scheduling commands.
static std::atomic< int64_t > next_start_time
stores the next transaction start time
static void schedule_thread()
The method run by the worker thread schedule_thread_.
bool abort(std::unique_ptr< Transaction > t) override
Closes the given Scheduler::Transaction and discards its changes.
std::thread schedule_thread_
the worker thread that executes all incoming queries.
static CommandQueue query_queue_
instance of our thread-safe query queue that stores all incoming plans.
std::unique_ptr< DatabaseCommand > analyze(std::unique_ptr< ast::Command > ast)
Perform semantic analysis of an ast::Command.