static void add_wasm_operator_args()
{
    ...
    "--scan-implementations",
    "a comma-separated list of physical scan implementations to consider (`Scan` or `IndexScan`)",
    [](std::vector<std::string_view> impls){
        for (const auto &elem : impls) {
            if (strneq(elem.data(), "Scan", elem.size()))
                options::scan_implementations |= option_configs::ScanImplementation::SCAN;
            else if (strneq(elem.data(), "IndexScan", elem.size()))
                options::scan_implementations |= option_configs::ScanImplementation::INDEX_SCAN;
            else
                std::cerr << "warning: ignoring invalid physical scan implementation " << elem << std::endl;
        }
    }
50 "--grouping-implementations",
51 "a comma seperated list of physical grouping implementations to consider (`HashBased` or "
53 [](std::vector<std::string_view> impls){
55 for (
const auto &elem : impls) {
56 if (
strneq(elem.data(),
"HashBased", elem.size()))
57 options::grouping_implementations |= option_configs::GroupingImplementation::HASH_BASED;
58 else if (
strneq(elem.data(),
"Ordered", elem.size()))
59 options::grouping_implementations |= option_configs::GroupingImplementation::ORDERED;
61 std::cerr <<
"warning: ignore invalid physical grouping implementation " << elem << std::endl;
68 "--sorting-implementations",
69 "a comma seperated list of physical sorting implementations to consider (`Quicksort` or "
71 [](std::vector<std::string_view> impls){
73 for (
const auto &elem : impls) {
74 if (
strneq(elem.data(),
"Quicksort", elem.size()))
75 options::sorting_implementations |= option_configs::SortingImplementation::QUICKSORT;
76 else if (
strneq(elem.data(),
"NoOp", elem.size()))
77 options::sorting_implementations |= option_configs::SortingImplementation::NOOP;
79 std::cerr <<
"warning: ignore invalid physical sorting implementation " << elem << std::endl;
86 "--join-implementations",
87 "a comma seperated list of physical join implementations to consider (`NestedLoops`, "
88 "`SimpleHash`, or `SortMerge`)",
89 [](std::vector<std::string_view> impls){
91 for (
const auto &elem : impls) {
92 if (
strneq(elem.data(),
"NestedLoops", elem.size()))
93 options::join_implementations |= option_configs::JoinImplementation::NESTED_LOOPS;
94 else if (
strneq(elem.data(),
"SimpleHash", elem.size()))
95 options::join_implementations |= option_configs::JoinImplementation::SIMPLE_HASH;
96 else if (
strneq(elem.data(),
"SortMerge", elem.size()))
97 options::join_implementations |= option_configs::JoinImplementation::SORT_MERGE;
99 std::cerr <<
"warning: ignore invalid physical join implementation " << elem << std::endl;
106 "--index-implementations",
107 "a comma separated list of index implementations to consider for index scans (`Array`, or"
109 [](std::vector<std::string_view> impls){
111 for (
const auto &elem : impls) {
112 if (
strneq(elem.data(),
"Array", elem.size()))
113 options::index_implementations |= option_configs::IndexImplementation::ARRAY;
114 else if (
strneq(elem.data(),
"Rmi", elem.size()))
115 options::index_implementations |= option_configs::IndexImplementation::RMI;
117 std::cerr <<
"warning: ignore invalid index implementation " << elem << std::endl;
124 "--index-scan-strategy",
125 "specify the index scan strategy (`Compilation`, `Interpretation`, or `Hybrid`)",
126 [](
const char *strategy){
127 if (
streq(strategy,
"Compilation"))
128 options::index_scan_strategy = option_configs::IndexScanStrategy::COMPILATION;
129 else if (
streq(strategy,
"Interpretation"))
130 options::index_scan_strategy = option_configs::IndexScanStrategy::INTERPRETATION;
131 else if (
streq(strategy,
"Hybrid"))
132 options::index_scan_strategy = option_configs::IndexScanStrategy::HYBRID;
134 std::cerr <<
"warning: ignore invalid index scan strategy " << strategy << std::endl;
140 "--index-scan-compilation-strategy",
141 "specify the materialization strategy for index scans (`Callback` or `ExposedMemory`)",
142 [](
const char *strategy){
143 if (
streq(strategy,
"Callback"))
144 options::index_scan_compilation_strategy = option_configs::IndexScanCompilationStrategy::CALLBACK;
145 else if (
streq(strategy,
"ExposedMemory"))
146 options::index_scan_compilation_strategy = option_configs::IndexScanCompilationStrategy::EXPOSED_MEMORY;
148 std::cerr <<
"warning: ignore invalid index scan strategy " << strategy << std::endl;
154 "--index-scan-materialization-strategy",
155 "specify the materialization strategy for index scans (`Inline` or `Memory`)",
156 [](
const char *strategy){
157 if (
streq(strategy,
"Inline"))
158 options::index_scan_materialization_strategy = option_configs::IndexScanMaterializationStrategy::INLINE;
159 else if (
streq(strategy,
"Memory"))
160 options::index_scan_materialization_strategy = option_configs::IndexScanMaterializationStrategy::MEMORY;
162 std::cerr <<
"warning: ignore invalid index scan strategy " << strategy << std::endl;
168 "--filter-selection-strategy",
169 "specify the selection strategy for filters (`Branching` or `Predicated`)",
170 [](
const char *strategy){
171 if (
streq(strategy,
"Branching"))
172 options::filter_selection_strategy = option_configs::SelectionStrategy::BRANCHING;
173 else if (
streq(strategy,
"Predicated"))
174 options::filter_selection_strategy = option_configs::SelectionStrategy::PREDICATED;
176 std::cerr <<
"warning: ignore invalid filter selection strategy " << strategy << std::endl;
182 "--quicksort-cmp-selection-strategy",
183 "specify the selection strategy for comparisons in quicksort (`Branching` or `Predicated`)",
184 [](
const char *strategy){
185 if (
streq(strategy,
"Branching"))
186 options::quicksort_cmp_selection_strategy = option_configs::SelectionStrategy::BRANCHING;
187 else if (
streq(strategy,
"Predicated"))
188 options::quicksort_cmp_selection_strategy = option_configs::SelectionStrategy::PREDICATED;
190 std::cerr <<
"warning: ignore invalid quicksort comparison selection strategy " << strategy << std::endl;
196 "--nested-loops-join-selection-strategy",
197 "specify the selection strategy for nested-loops joins (`Branching` or `Predicated`)",
198 [](
const char *strategy){
199 if (
streq(strategy,
"Branching"))
200 options::nested_loops_join_selection_strategy = option_configs::SelectionStrategy::BRANCHING;
201 else if (
streq(strategy,
"Predicated"))
202 options::nested_loops_join_selection_strategy = option_configs::SelectionStrategy::PREDICATED;
204 std::cerr <<
"warning: ignore invalid nested-loops join selection strategy " << strategy << std::endl;
210 "--simple-hash-join-selection-strategy",
211 "specify the selection strategy for simple hash joins (`Branching` or `Predicated`)",
212 [](
const char *strategy){
213 if (
streq(strategy,
"Branching"))
214 options::simple_hash_join_selection_strategy = option_configs::SelectionStrategy::BRANCHING;
215 else if (
streq(strategy,
"Predicated"))
216 options::simple_hash_join_selection_strategy = option_configs::SelectionStrategy::PREDICATED;
218 std::cerr <<
"warning: ignore invalid simple hash join selection strategy " << strategy << std::endl;
224 "--simple-hash-join-ordering-strategy",
225 "specify the ordering strategy for simple hash joins (`BuildOnLeft` or `BuildOnRight`)",
226 [](
const char *strategy){
227 if (
streq(strategy,
"BuildOnLeft"))
228 options::simple_hash_join_ordering_strategy = option_configs::OrderingStrategy::BUILD_ON_LEFT;
229 else if (
streq(strategy,
"BuildOnRight"))
230 options::simple_hash_join_ordering_strategy = option_configs::OrderingStrategy::BUILD_ON_RIGHT;
232 std::cerr <<
"warning: ignore invalid simple hash join ordering strategy " << strategy << std::endl;
238 "--sort-merge-join-selection-strategy",
239 "specify the selection strategy for sort merge joins (`Branching` or `Predicated`)",
240 [](
const char *strategy){
241 if (
streq(strategy,
"Branching"))
242 options::sort_merge_join_selection_strategy = option_configs::SelectionStrategy::BRANCHING;
243 else if (
streq(strategy,
"Predicated"))
244 options::sort_merge_join_selection_strategy = option_configs::SelectionStrategy::PREDICATED;
246 std::cerr <<
"warning: ignore invalid sort merge join selection strategy " << strategy << std::endl;
252 "--sort-merge-join-cmp-selection-strategy",
253 "specify the selection strategy for comparisons while sorting in sort merge joins "
254 "(`Branching` or `Predicated`)",
255 [](
const char *strategy){
256 if (
streq(strategy,
"Branching"))
257 options::sort_merge_join_cmp_selection_strategy = option_configs::SelectionStrategy::BRANCHING;
258 else if (
streq(strategy,
"Predicated"))
259 options::sort_merge_join_cmp_selection_strategy = option_configs::SelectionStrategy::PREDICATED;
261 std::cerr <<
"warning: ignore invalid sort merge join comparison selection strategy " << strategy
268 "--hash-table-implementation",
269 "specify the hash table implementation (`OpenAddressing` or `Chained`)",
270 [](
const char *impl){
271 if (
streq(impl,
"OpenAddressing"))
272 options::hash_table_implementation = option_configs::HashTableImplementation::OPEN_ADDRESSING;
273 else if (
streq(impl,
"Chained"))
274 options::hash_table_implementation = option_configs::HashTableImplementation::CHAINED;
276 std::cerr <<
"warning: ignore invalid hash table implementation " << impl << std::endl;
282 "--hash-table-probing-strategy",
283 "specify the probing strategy for hash tables (`Linear` or `Quadratic`)",
284 [](
const char *strategy){
285 if (
streq(strategy,
"Linear"))
286 options::hash_table_probing_strategy = option_configs::ProbingStrategy::LINEAR;
287 else if (
streq(strategy,
"Quadratic"))
288 options::hash_table_probing_strategy = option_configs::ProbingStrategy::QUADRATIC;
290 std::cerr <<
"warning: ignore invalid hash table probing strategy " << strategy << std::endl;
296 "--hash-table-storing-strategy",
297 "specify the storing strategy for hash tables (`InPlace` or `OutOfPlace`)",
298 [](
const char *strategy){
299 if (
streq(strategy,
"InPlace"))
300 options::hash_table_storing_strategy = option_configs::StoringStrategy::IN_PLACE;
301 else if (
streq(strategy,
"OutOfPlace"))
302 options::hash_table_storing_strategy = option_configs::StoringStrategy::OUT_OF_PLACE;
304 std::cerr <<
"warning: ignore invalid hash table storing strategy " << strategy << std::endl;
310 "--hash-table-max-load-factor",
311 "specify the maximal load factor for hash tables, i.e. the load factor at which rehashing "
312 "should occur (must be in [1,∞) for chained and in [0.5,1) for open-addressing hash tables)",
313 [](
double load_factor){
314 options::load_factor_open_addressing = load_factor;
315 options::load_factor_chained = load_factor;
321 "--hash-table-initial-capacity",
322 "specify the initial capacity for hash tables",
323 [](uint32_t initial_capacity){
324 options::hash_table_initial_capacity = initial_capacity;
330 "--no-hash-based-group-join",
331 "disable potential use of hash-based group-join",
332 [](
bool){ options::hash_based_group_join =
false; }
337 "--hard-pipeline-breaker-layout",
338 "specify the layout for hard pipeline breakers (`Row`, `PAX4K`, `PAX64K`, `PAX512K`, "
339 "`PAX4M`, or `PAX64M`)",
340 [](
const char *layout){
341 if (
streq(layout,
"Row")) {
342 options::hard_pipeline_breaker_layout = std::make_unique<RowLayoutFactory>();
346 if (
streq(layout,
"PAX4K")) {
348 block_size = 1UL << 12;
349 }
else if (
streq(layout,
"PAX64K")) {
351 block_size = 1UL << 16;
352 }
else if (
streq(layout,
"PAX512K")) {
354 block_size = 1UL << 19;
355 }
else if (
streq(layout,
"PAX4M")) {
357 block_size = 1UL << 22;
358 }
else if (
streq(layout,
"PAX64M")) {
360 block_size = 1UL << 26;
361 }
else if (
streq(layout,
"PAX16Tup")) {
364 }
else if (
streq(layout,
"PAX128Tup")) {
367 }
else if (
streq(layout,
"PAX1024Tup")) {
371 std::cerr <<
"warning: ignore invalid layout for hard pipeline breakers " << layout << std::endl;
373 options::hard_pipeline_breaker_layout = std::make_unique<PAXLayoutFactory>(size_type, block_size);
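    /* The PAX block sizes above are powers of two, given in bytes:
     *   1UL << 12 =      4096 B =   4 KiB  (PAX4K)
     *   1UL << 16 =     65536 B =  64 KiB  (PAX64K)
     *   1UL << 19 =    524288 B = 512 KiB  (PAX512K)
     *   1UL << 22 =   4194304 B =   4 MiB  (PAX4M)
     *   1UL << 26 =  67108864 B =  64 MiB  (PAX64M)
     * The `PAX…Tup` variants size a block by tuple count instead of bytes. */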
380 "--soft-pipeline-breaker",
381 "a comma seperated list where to insert soft pipeline breakers (`AfterAll`, `AfterScan`, "
382 "`AfterFilter`, `AfterProjection`, `AfterNestedLoopsJoin`, or `AfterSimpleHashJoin`)",
383 [](std::vector<std::string_view> location){
385 for (
const auto &elem : location) {
386 if (
strneq(elem.data(),
"AfterAll", elem.size()))
387 options::soft_pipeline_breaker |= option_configs::SoftPipelineBreakerStrategy::AFTER_ALL;
388 else if (
strneq(elem.data(),
"AfterScan", elem.size()))
389 options::soft_pipeline_breaker |= option_configs::SoftPipelineBreakerStrategy::AFTER_SCAN;
390 else if (
strneq(elem.data(),
"AfterFilter", elem.size()))
391 options::soft_pipeline_breaker |= option_configs::SoftPipelineBreakerStrategy::AFTER_FILTER;
392 else if (
strneq(elem.data(),
"AfterIndexScan", elem.size()))
393 options::soft_pipeline_breaker |= option_configs::SoftPipelineBreakerStrategy::AFTER_INDEX_SCAN;
394 else if (
strneq(elem.data(),
"AfterProjection", elem.size()))
395 options::soft_pipeline_breaker |= option_configs::SoftPipelineBreakerStrategy::AFTER_PROJECTION;
396 else if (
strneq(elem.data(),
"AfterNestedLoopsJoin", elem.size()))
397 options::soft_pipeline_breaker |= option_configs::SoftPipelineBreakerStrategy::AFTER_NESTED_LOOPS_JOIN;
398 else if (
strneq(elem.data(),
"AfterSimpleHashJoin", elem.size()))
399 options::soft_pipeline_breaker |= option_configs::SoftPipelineBreakerStrategy::AFTER_SIMPLE_HASH_JOIN;
401 std::cerr <<
"warning: ignore invalid location for soft pipeline breakers " << elem << std::endl;
408 "--soft-pipeline-breaker-layout",
409 "specify the layout for soft pipeline breakers (`Row`, `PAX4K`, `PAX64K`, `PAX512K`, "
410 "`PAX4M`, or `PAX64M`)",
411 [](
const char *layout){
412 if (
streq(layout,
"Row")) {
413 options::soft_pipeline_breaker_layout = std::make_unique<RowLayoutFactory>();
417 if (
streq(layout,
"PAX4K")) {
419 block_size = 1UL << 12;
420 }
else if (
streq(layout,
"PAX64K")) {
422 block_size = 1UL << 16;
423 }
else if (
streq(layout,
"PAX512K")) {
425 block_size = 1UL << 19;
426 }
else if (
streq(layout,
"PAX4M")) {
428 block_size = 1UL << 22;
429 }
else if (
streq(layout,
"PAX64M")) {
431 block_size = 1UL << 26;
432 }
else if (
streq(layout,
"PAX16Tup")) {
435 }
else if (
streq(layout,
"PAX128Tup")) {
438 }
else if (
streq(layout,
"PAX1024Tup")) {
442 std::cerr <<
"warning: ignore invalid layout for soft pipeline breakers " << layout << std::endl;
444 options::soft_pipeline_breaker_layout = std::make_unique<PAXLayoutFactory>(size_type, block_size);
451 "--soft-pipeline-breaker-num-tuples",
452 "set the size in tuples for soft pipeline breakers (0 means infinite)",
453 [](std::size_t num_tuples){ options::soft_pipeline_breaker_num_tuples = num_tuples; }
458 "--index-sequential-scan-batch-size",
459 "set the number of tuples ids communicated between host and V8 per batch during index "
461 "(0 means infinite), ignored in case of --isam-compile-qualifying",
462 [](std::size_t size){ options::index_sequential_scan_batch_size = size; }
467 "--result-set-window-size",
468 "set the window size in tuples for the result set (0 means infinite)",
469 [](std::size_t size){ options::result_set_window_size = size; }
474 "--no-exploit-unique-build",
475 "disable potential exploitation of uniqueness of build key in hash joins",
476 [](
bool){ options::exploit_unique_build =
false; }
482 "disable potential use of SIMDfication",
483 [](
bool){ options::simd =
false; }
488 "--no-double-pumping",
489 "disable use of double pumping (has only an effect if SIMDfication is enabled)",
490 [](
bool){ options::double_pumping =
false; }
496 "set the number of SIMD lanes to prefer",
497 [](std::size_t lanes){ options::simd_lanes = lanes; }
502 "--xxx-asc-sorted-attributes",
503 "a comma seperated list of attributes, i.e. of the format `T.x` where `T` is either the "
504 "table name or the alias and `x` is the attribute name, which are assumed to be sorted "
506 [&C](std::vector<std::string_view> attrs){
507 for (
const auto &elem : attrs) {
508 auto idx = elem.find(
'.');
509 if (idx == std::string_view::npos)
510 std::cerr <<
"warning: ignore invalid attribute " << elem << std::endl;
512 options::sorted_attributes.emplace_back(std::move(attr),
true);
519 "--xxx-desc-sorted-attributes",
520 "a comma seperated list of attributes, i.e. of the format `T.x` where `T` is either the "
521 "table name or the alias and `x` is the attribute name, which are assumed to be sorted "
523 [&C](std::vector<std::string_view> attrs){
524 for (
const auto &elem : attrs) {
525 auto idx = elem.find(
'.');
526 if (idx == std::string_view::npos)
527 std::cerr <<
"warning: ignore invalid attribute " << elem << std::endl;
529 options::sorted_attributes.emplace_back(std::move(attr),
false);
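    /* Illustrative usage, assuming a table `T` with attributes `x` and `y` (names hypothetical):
     *
     *     --xxx-asc-sorted-attributes T.x,T.y
     *
     * marks both attributes as sorted ascending; the boolean passed to `emplace_back`
     * distinguishes ascending (`true`) from descending (`false`). */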
}

...
    if (bool(options::scan_implementations bitand option_configs::ScanImplementation::SCAN)) {
        ...
    }
    if (bool(options::scan_implementations bitand option_configs::ScanImplementation::INDEX_SCAN)) {
        if (bool(options::index_implementations bitand option_configs::IndexImplementation::ARRAY))
            ...
        if (bool(options::index_implementations bitand option_configs::IndexImplementation::RMI))
            ...
    }
    if (bool(options::filter_selection_strategy bitand option_configs::SelectionStrategy::BRANCHING))
        ...
    if (bool(options::filter_selection_strategy bitand option_configs::SelectionStrategy::PREDICATED))
        ...
    if (bool(options::grouping_implementations bitand option_configs::GroupingImplementation::HASH_BASED))
        ...
    if (bool(options::grouping_implementations bitand option_configs::GroupingImplementation::ORDERED))
        ...
    if (bool(options::sorting_implementations bitand option_configs::SortingImplementation::QUICKSORT)) {
        if (bool(options::quicksort_cmp_selection_strategy bitand option_configs::SelectionStrategy::BRANCHING))
            ...
        if (bool(options::quicksort_cmp_selection_strategy bitand option_configs::SelectionStrategy::PREDICATED))
            ...
    }
    if (bool(options::sorting_implementations bitand option_configs::SortingImplementation::NOOP))
        ...
    if (bool(options::join_implementations bitand option_configs::JoinImplementation::NESTED_LOOPS)) {
        if (bool(options::nested_loops_join_selection_strategy bitand option_configs::SelectionStrategy::BRANCHING))
            ...
        if (bool(options::nested_loops_join_selection_strategy bitand option_configs::SelectionStrategy::PREDICATED))
            ...
    }
    if (bool(options::join_implementations bitand option_configs::JoinImplementation::SIMPLE_HASH)) {
        if (bool(options::simple_hash_join_selection_strategy bitand option_configs::SelectionStrategy::BRANCHING)) {
            ...
            if (options::exploit_unique_build)
                ...
        }
        if (bool(options::simple_hash_join_selection_strategy bitand option_configs::SelectionStrategy::PREDICATED)) {
            ...
            if (options::exploit_unique_build)
                ...
        }
    }
    if (bool(options::join_implementations bitand option_configs::JoinImplementation::SORT_MERGE)) {
        if (bool(options::sort_merge_join_selection_strategy bitand option_configs::SelectionStrategy::BRANCHING)) {
            if (bool(options::sort_merge_join_cmp_selection_strategy bitand option_configs::SelectionStrategy::BRANCHING)) {
                ...
            }
            if (bool(options::sort_merge_join_cmp_selection_strategy bitand option_configs::SelectionStrategy::PREDICATED)) {
                ...
            }
        }
        if (bool(options::sort_merge_join_selection_strategy bitand option_configs::SelectionStrategy::PREDICATED)) {
            if (bool(options::sort_merge_join_cmp_selection_strategy bitand option_configs::SelectionStrategy::BRANCHING)) {
                ...
            }
            if (bool(options::sort_merge_join_cmp_selection_strategy bitand option_configs::SelectionStrategy::PREDICATED)) {
                ...
            }
        }
    }
    if (options::hash_based_group_join)
        ...

...
    context.result_set_factory = factory.clone();
    if (window_size != 0) {
        ...
        std::optional<Var<U32x1>> counter;
        ...
        FUNCTION(child_pipeline, void(void))
        {
            ...
            M_insist(/* ... */, "SIMDfication with predication not supported");
            *counter += env.extract_predicate<_Boolx1>().is_true_and_not_null().to<uint32_t>();
            ...
            IF (*counter == window_size) {
                ...
                counter_backup = *counter;
                ...
            };
            ...
        }
    }
    ...
    FUNCTION(child_pipeline, void(void))
    {
        ...
        std::optional<Var<U32x1>> num_tuples;
        ...
        M_insist(/* ... */, "SIMDfication with predication not supported");
        *num_tuples += env.extract_predicate<_Boolx1>().is_true_and_not_null().to<uint32_t>();
        ...
    }
    if (window_size != 0) {
        ...
        GlobalBuffer result_set(schema, factory, false, window_size);
        ...
        FUNCTION(child_pipeline, void(void))
        {
            ...
            IF (single_slot_free and result_set.size() == 0U) {
                ...
            };
            ...
        }
    }
    ...
    FUNCTION(child_pipeline, void(void))
    {
        ...
        [&](){ result_set.consume(); },
        ...
    }
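    /* Sketch of the windowing logic, as far as visible in this listing: with a non-zero
     * `window_size`, emitted tuples are counted and compared against the window size, and results
     * are materialized into a `GlobalBuffer` of `window_size` slots that is `consume()`d window by
     * window, rather than handing the entire result set to the host at once; with
     * `window_size == 0`, tuples are only counted. */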
    const std::vector<std::unique_ptr<ast::Expr>> &args;
std::pair<std::vector<aggregate_info_t>, std::unordered_map<Schema::Identifier, avg_aggregate_info_t>>
...(/* ... */ const Schema &schema, std::size_t aggregates_offset = 0)
{
    std::vector<aggregate_info_t> aggregates_info;
    std::unordered_map<Schema::Identifier, avg_aggregate_info_t> avg_aggregates_info;

    for (std::size_t i = aggregates_offset; i < schema.num_entries(); ++i) {
        ...
        auto pred = [&e](const auto &info){ return info.entry.id == e.id; };
        if (auto it = std::find_if(aggregates_info.cbegin(), aggregates_info.cend(), pred); it != aggregates_info.cend())
            ...
        auto &fn_expr = aggregates[i - aggregates_offset].get();
        auto &fn = fn_expr.get_function();
        M_insist(fn.kind == m::Function::FN_Aggregate, "not an aggregation function");

        if (fn.fnid == m::Function::FN_AVG) {
            M_insist(fn_expr.args.size() == 1, "AVG aggregate function expects exactly one argument");
            /* Search for a matching running COUNT aggregate. */
            auto pred = [&fn_expr](const auto &_fn_expr){
                M_insist(_fn_expr.get().get_function().fnid != m::Function::FN_COUNT or _fn_expr.get().args.size() <= 1,
                         "COUNT aggregate function expects at most one argument");
                return _fn_expr.get().get_function().fnid == m::Function::FN_COUNT
                   and not _fn_expr.get().args.empty()
                   and *_fn_expr.get().args[0] == *fn_expr.args[0];
            };
            std::optional<Schema::Identifier> running_count;
            if (auto it = std::find_if(aggregates.cbegin(), aggregates.cend(), pred);
                it != aggregates.cend())
            {
                const auto idx_agg = std::distance(aggregates.cbegin(), it);
                running_count = schema[aggregates_offset + idx_agg].id;
            } else {
                std::ostringstream oss;
                oss << "$running_count_" << fn_expr;
                ...
                .fnid = m::Function::FN_COUNT,
                ...
            }

            std::optional<Schema::Identifier> sum;
            bool compute_running_avg;
            if (fn_expr.args[0]->type()->size() <= 32) {
                ...
                compute_running_avg = false;
                auto pred = [&fn_expr](const auto &_fn_expr){
                    M_insist(_fn_expr.get().get_function().fnid != m::Function::FN_SUM or
                             _fn_expr.get().args.size() == 1,
                             "SUM aggregate function expects exactly one argument");
                    return _fn_expr.get().get_function().fnid == m::Function::FN_SUM
                       and *_fn_expr.get().args[0] == *fn_expr.args[0];
                };
                if (auto it = std::find_if(aggregates.cbegin(), aggregates.cend(), pred);
                    it != aggregates.cend())
                {
                    const auto idx_agg = std::distance(aggregates.cbegin(), it);
                    sum = schema[aggregates_offset + idx_agg].id;
                } else {
                    std::ostringstream oss;
                    oss << "$sum_" << fn_expr;
                    ...
                    switch (as<const Numeric>(*fn_expr.args[0]->type()).kind) {
                        ...
                        case Numeric::N_Decimal:
                            ...
                        case Numeric::N_Float:
                            ...
                    }
                    ...
                    .entry = { *sum, type, e.constraints },
                    .fnid = m::Function::FN_SUM,
                    ...
                }
            } else {
                ...
                compute_running_avg = true;
                ...
                .fnid = m::Function::FN_AVG,
                ...
            }
            ...
            .running_count = std::move(*running_count),
            .sum = std::move(*sum),
            .compute_running_avg = compute_running_avg
            ...
    }
    ...
    return { std::move(aggregates_info), std::move(avg_aggregates_info) };
}

std::pair<std::vector<Schema::Identifier>, std::vector<Schema::Identifier>>
...(const cnf::CNF &cnf, const Schema &schema_left)
{
    std::vector<Schema::Identifier> ids_left, ids_right;
    for (auto &clause : cnf) {
        M_insist(clause.size() == 1, "invalid equi-predicate");
        auto &literal = clause[0];
        auto &binary = as<const BinaryExpr>(literal.expr());
        M_insist(/* ... */ or
                 (literal.negative() and binary.tok == TK_BANG_EQUAL), "invalid equi-predicate");
        M_insist(is<const Designator>(binary.lhs), "invalid equi-predicate");
        M_insist(is<const Designator>(binary.rhs), "invalid equi-predicate");
        ...
        const auto &[id_left, id_right] = schema_left.has(id_first) ? std::make_pair(id_first, id_second)
                                                                    : std::make_pair(id_second, id_first);
        ids_left.push_back(std::move(id_left));
        ids_right.push_back(std::move(id_right));
    }
    M_insist(ids_left.size() == ids_right.size(), "number of found IDs differ");
    M_insist(not ids_left.empty(), "must find at least one ID");
    return { std::move(ids_left), std::move(ids_right) };
}
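/* Example: for the equi-join predicate `R.x = S.y` with `schema_left` covering `R`, the loop
 * above yields `ids_left = [R.x]` and `ids_right = [S.y]`.  A negated `!=` literal is accepted
 * as well, since `NOT (a != b)` is equivalent to `a = b`. */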
    static std::ostringstream oss;
    ...
    oss << table_name << "_num_rows";
    ...
    static std::ostringstream oss;
    ...
    oss << table_name << "_mem";
    uint64_t initial_capacity;
    if (options::hash_table_initial_capacity) {
        initial_capacity = *options::hash_table_initial_capacity;
    } else {
        if (/* ... cardinality estimate available */)
            initial_capacity = static_cast<uint64_t>(std::ceil(op.info().estimated_cardinality / load_factor));
        else if (auto scan = cast<const ScanOperator>(&op))
            initial_capacity = static_cast<uint64_t>(std::ceil(scan->store().num_rows() / load_factor));
        else
            initial_capacity = 1024;
    }
    return std::in_range<uint32_t>(initial_capacity) ? initial_capacity : std::numeric_limits<uint32_t>::max();
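/* Worked example: with `load_factor = 0.7` and an estimated cardinality of 700 tuples, the
 * initial capacity becomes ceil(700 / 0.7) = 1000.  Without any cardinality information the
 * fallback is 1024, and the result is clamped to the `uint32_t` range. */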
    std::optional<std::reference_wrapper<const ast::Expr>> lo, hi;
    ...
    if (is<const Constant>(expr))
        ...
    if (auto u = cast<const UnaryExpr>(&expr)) {
        if ((u->op().type == TK_MINUS or u->op().type == TK_PLUS) and is<const Constant>(*u->expr))
            ...
    }
    ...
    if (auto c = cast<const Constant>(&expr))
        return { *c, false };
    auto &u = as<const UnaryExpr>(expr);
    return { as<const Constant>(*u.expr), u.op().type == TK_MINUS };
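/* Example: the literal `42` is a plain `Constant`, while `-42` is parsed as
 * `UnaryExpr(TK_MINUS, Constant(42))`; the helper above therefore returns the constant together
 * with a negation flag, i.e. `{42, true}` for `-42` and `{42, false}` for `42` or `+42`. */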
    M_insist(not cnf.empty(), "filter condition must not be empty");
    ...
    M_insist(designators.num_entries() == 1, "filter condition must contain exactly one designator");
    ...
    for (auto &clause : cnf) {
        M_insist(clause.size() == 1, "invalid predicate");
        auto &literal = clause[0];
        auto &binary = as<const BinaryExpr>(literal.expr());
        ...
        bool has_attribute_left = is<const Designator>(binary.lhs);
        auto &bound = has_attribute_left ? *binary.rhs : *binary.lhs;
        ...
        switch (binary.tok.type) {
            ...
            case TK_EQUAL:
                M_insist(not bool(bounds.lo) and not bool(bounds.hi), "bound already set");
                bounds.lo = bounds.hi = std::cref(bound);
                break;
            case TK_GREATER:
                if (has_attribute_left) {
                    M_insist(not bool(bounds.lo), "lo bound already set");
                    bounds.lo = std::cref(bound);
                } else {
                    M_insist(not bool(bounds.hi), "hi bound already set");
                    bounds.hi = std::cref(bound);
                }
                break;
            case TK_GREATER_EQUAL:
                if (has_attribute_left) {
                    M_insist(not bool(bounds.lo), "lo bound already set");
                    bounds.lo = std::cref(bound);
                } else {
                    M_insist(not bool(bounds.hi), "hi bound already set");
                    bounds.hi = std::cref(bound);
                }
                break;
            case TK_LESS:
                if (has_attribute_left) {
                    M_insist(not bool(bounds.hi), "hi bound already set");
                    bounds.hi = std::cref(bound);
                } else {
                    M_insist(not bool(bounds.lo), "lo bound already set");
                    bounds.lo = std::cref(bound);
                }
                break;
            case TK_LESS_EQUAL:
                if (has_attribute_left) {
                    M_insist(not bool(bounds.hi), "hi bound already set");
                    bounds.hi = std::cref(bound);
                } else {
                    M_insist(not bool(bounds.lo), "lo bound already set");
                    bounds.lo = std::cref(bound);
                }
                break;
        }
    }
    M_insist(bool(bounds.lo) or bool(bounds.hi), "either bound must be set");
    std::optional<Var<U32x1>> num_tuples;
    ...
    *num_tuples += env.extract_predicate<_Boolx1>().is_true_and_not_null().to<uint32_t>();
    ...
    auto pred = env.extract_predicate<_Boolx16>().is_true_and_not_null();
    *num_tuples += pred.bitmask().popcnt();
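    /* Counting differs per vectorization mode: the scalar path adds a 0/1 per tuple, while the
     * 16-lane SIMD path reduces the per-lane predicate to a bitmask and adds its population
     * count, i.e. the number of qualifying lanes. */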
template<bool SIMDfied>
...
{
    ...
    if constexpr (SIMDfied) {
        ...
    }
    ...
}

template<bool SIMDfied>
...
{
    M_insist(bool(M.result_set_factory), "`wasm::Callback` must have a factory for the result set");
    ...
    auto result_set_schema = M.callback.schema().drop_constants().deduplicate();
    write_result_set(result_set_schema, *M.result_set_factory, M.result_set_window_size, *M.child);
    ...
}

template<bool SIMDfied>
...
{
    ...
    if constexpr (SIMDfied) {
        ...
    }
    ...
}

template<bool SIMDfied>
...
{
    M_insist(bool(M.result_set_factory), "`wasm::Print` must have a factory for the result set");
    ...
    auto result_set_schema = M.print_op.schema().drop_constants().deduplicate();
    write_result_set(result_set_schema, *M.result_set_factory, M.result_set_window_size, *M.child);
    ...
}
template<bool SIMDfied>
...(const std::tuple<const ScanOperator*> &partial_inner_nodes)
{
    ...
    if constexpr (SIMDfied) {
        auto &scan = *std::get<0>(partial_inner_nodes);
        auto &table = scan.store().table();
        ...
        if (not supports_simd(table.layout(), table.schema(scan.alias()), scan.schema()))
            ...
        if (scan.store().num_rows() %
            get_num_simd_lanes(table.layout(), table.schema(scan.alias()), scan.schema()) != 0)
            ...
    }
    ...
}

template<bool SIMDfied>
...
{
    if constexpr (SIMDfied) {
        ...
        auto &table = M.scan.store().table();
        ...
        for (auto &e : M.scan.schema()) {
            auto pred = [&e](const auto &p){ return e.id == p.first; };
            ...
        }
        if (not orders.empty())
            ...
    }
    ...
}

template<bool SIMDfied>
...
{
    auto &schema = M.scan.schema();
    auto &table = M.scan.store().table();
    M_insist(schema == schema.drop_constants().deduplicate(),
             "schema of `ScanOperator` must not contain NULL or duplicates");
    M_insist(not table.layout().is_finite(), "layout for `wasm::Scan` must be infinite");
    ...
    const auto num_simd_lanes_preferred = ...;
    ...
    if (schema.num_entries() == 0) {
        ...
        WHILE (tuple_id < num_rows) {
            ...
        }
    }
    ...
    static Schema empty_schema;
    ...
    inits.attach_to_current();
    WHILE (tuple_id < num_rows) {
        loads.attach_to_current();
        ...
        jumps.attach_to_current();
    }
template<idx::IndexMethod IndexMethod>
...
{
    ...
    auto &filter = *std::get<0>(partial_inner_nodes);
    auto &cnf = filter.filter();
    M_insist(not cnf.empty(), "Filter condition must not be empty");
    auto &scan = *std::get<1>(partial_inner_nodes);
    auto &table = scan.store().table();
    ...
    std::vector<Schema::Identifier> ids;
    for (auto &entry : cnf.get_required()) {
        ...
        M_insist(table.name() == id.prefix, "Table name should match designator table name");
        ...
        if (DB.has_index(table.name(), id.name, IndexMethod))
            ...
    }
    ...
    bool has_lo_bound = false;
    bool has_hi_bound = false;
    for (auto &clause : cnf) {
        if (clause.size() != 1)
            ...
        auto &predicate = clause[0];
        if (predicate.negative())
            ...
        auto expr = cast<const BinaryExpr>(&predicate.expr());
        ...
        bool has_attribute_left = is<const Designator>(expr->lhs);
        auto &attribute = has_attribute_left ? *expr->lhs : *expr->rhs;
        auto &constant = has_attribute_left ? *expr->rhs : *expr->lhs;
        if (not is<const Designator>(attribute))
            ...
        switch (expr->tok.type) {
            ...
            case TK_EQUAL:
                if (not has_lo_bound and not has_hi_bound) {
                    has_lo_bound = has_hi_bound = true;
                    ...
                }
                break;
            case TK_GREATER:
            case TK_GREATER_EQUAL:
                if (has_attribute_left and not has_lo_bound) {
                    has_lo_bound = true;
                    ...
                } else if (not has_attribute_left and not has_hi_bound) {
                    has_hi_bound = true;
                    ...
                }
                break;
            case TK_LESS:
            case TK_LESS_EQUAL:
                if (has_attribute_left and not has_hi_bound) {
                    has_hi_bound = true;
                    ...
                } else if (not has_attribute_left and not has_lo_bound) {
                    has_lo_bound = true;
                    ...
                }
                break;
        }
    }
    ...
}

template<idx::IndexMethod IndexMethod>
...
{
    ...
    auto &cnf = M.filter.filter();
    Schema designators = cnf.get_required();
    M_insist(designators.num_entries() == 1, "filter condition must contain exactly one designator");
    ...
}
template<idx::IndexMethod IndexMethod>
...

template<idx::IndexMethod IndexMethod, typename Index, sql_type SqlT>
...
{
    using key_type = Index::key_type;
    using sql_type = SqlT;
    ...
    const char *scan_fn, *lower_bound_fn, *upper_bound_fn;
#define SET_CALLBACK_FNS(INDEX, KEY) \
    scan_fn        = M_STR(idx_scan_##INDEX##_##KEY); \
    lower_bound_fn = M_STR(idx_lower_bound_##INDEX##_##KEY); \
    upper_bound_fn = M_STR(idx_upper_bound_##INDEX##_##KEY)

#define RESOLVE_KEYTYPE(INDEX) \
    if constexpr(std::same_as<SqlT, _Boolx1>) { \
        SET_CALLBACK_FNS(INDEX, b); \
    } else if constexpr(std::same_as<sql_type, _I8x1>) { \
        SET_CALLBACK_FNS(INDEX, i1); \
    } else if constexpr(std::same_as<sql_type, _I16x1>) { \
        SET_CALLBACK_FNS(INDEX, i2); \
    } else if constexpr(std::same_as<sql_type, _I32x1>) { \
        SET_CALLBACK_FNS(INDEX, i4); \
    } else if constexpr(std::same_as<sql_type, _I64x1>) { \
        SET_CALLBACK_FNS(INDEX, i8); \
    } else if constexpr(std::same_as<sql_type, _Floatx1>) { \
        SET_CALLBACK_FNS(INDEX, f); \
    } else if constexpr(std::same_as<sql_type, _Doublex1>) { \
        SET_CALLBACK_FNS(INDEX, d); \
    } else if constexpr(std::same_as<sql_type, NChar>) { \
        SET_CALLBACK_FNS(INDEX, p); \
    } else { \
        M_unreachable("incompatible SQL type"); \
    }
    if constexpr(is_specialization<Index, idx::ArrayIndex>) {
        ...
    } else if constexpr(is_specialization<Index, idx::RecursiveModelIndex>) {
        ...
    }
#undef RESOLVE_KEYTYPE
#undef SET_CALLBACK_FNS
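    /* Example of the macro expansion (the concrete INDEX token passed to RESOLVE_KEYTYPE is
     * elided in this listing): for a hypothetical `RESOLVE_KEYTYPE(array)` with 32-bit integer
     * keys, `scan_fn` becomes the host-function name "idx_scan_array_i4", and analogously for the
     * lower/upper bound callbacks. */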
    U64x1 index_id(context.add_index(index));
    ...
    auto compile_bound_lookup = [&](const ast::Expr &bound, bool is_lower_bound) {
        ...
        auto c = Interpreter::eval(constant);
        ...
        if constexpr(std::same_as<bool, key_type>) {
            ...
            M_insist(not is_negative, "boolean cannot be negative");
            ...
        } else if constexpr(std::integral<key_type>) {
            auto i64 = int64_t(c);
            M_insist(std::in_range<key_type>(i64), "integral constant must be in range");
            _key = key_type(i64);
            _key = is_negative ? -_key : _key;
        } else if constexpr(std::same_as<float, key_type>) {
            ...
            M_insist(_key == d, "downcasting should not impact precision");
            _key = is_negative ? -_key : _key;
        } else if constexpr(std::same_as<double, key_type>) {
            ...
            _key = is_negative ? -_key : _key;
        } else if constexpr(std::same_as<const char*, key_type>) {
            _key = reinterpret_cast<const char*>(c.as_p());
            M_insist(not is_negative, "string cannot be negative");
        }

        std::optional<typename sql_type::primitive_type> key;
        ...
        if constexpr (std::same_as<sql_type, NChar>) {
            ...
        }
        ...
        if constexpr (std::same_as<sql_type, NChar>) {
            ...
            key.emplace(U32x1(*key_ptr).to<char*>(), false, as<const CharacterSequence>(bound.type()));
        } else {
            ...
            *key_address = _key;
            ...
            key.emplace(*key_ptr);
        }
        ...
        M_insist(bool(key), "key must be set");
        ...
        is_lower_bound ? lower_bound_fn : upper_bound_fn,
        ...
    };
    ...
                     : U32x1(index.num_entries()));
    ...
    M_insist(std::in_range<uint32_t>(M.batch_size), "should fit in uint32_t");
    ...
    U32x1 num_results = hi - lo;
    U32x1 num_results_cpy = num_results.clone();
    U32x1 batch_size = M.batch_size == 0 ? num_results.clone() : U32x1(M.batch_size);
    U32x1 batch_size_cpy = batch_size.clone();
    return Select(batch_size < num_results, batch_size_cpy, num_results_cpy);
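    /* `Select(cond, a, b)` computes min(batch size, number of results) branch-free.  The
     * `.clone()` calls are needed because values in this code-generation DSL are single-use
     * expression trees: each use consumes the value, so a clone must be made for every
     * additional use. */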
        num_tuples_in_batch = Select(hi - lo > alloc_size, alloc_size, hi - lo);
        ...
            buffer_address.clone(),
            num_tuples_in_batch.val()
        ...
        lo += num_tuples_in_batch;
        ptr = buffer_address.clone();
        WHILE (num_tuples_in_batch > 0U) {
            static Schema empty_schema;
            ...
                M.scan.store().table().layout(),
                M.scan.store().table().schema(M.scan.alias()),
            ...
            num_tuples_in_batch -= 1U;
            ...
        }
    ...
    IF (alloc_size > U32x1(0)) {
        ...
    };
template<idx::IndexMethod IndexMethod, typename Index>
...
{
    using key_type = Index::key_type;
    ...
    static Schema empty_schema;
    ...
    auto interpret_and_lookup_bound = [&](const ast::Expr &bound, bool is_lower_bound) -> std::size_t {
        ...
        auto c = Interpreter::eval(constant);
        ...
        if constexpr(std::same_as<bool, key_type>) {
            ...
            M_insist(not is_negative, "boolean cannot be negative");
            ...
        } else if constexpr(std::integral<key_type>) {
            auto i64 = int64_t(c);
            M_insist(std::in_range<key_type>(i64), "integral constant must be in range");
            key = key_type(i64);
            key = is_negative ? -key : key;
        } else if constexpr(std::same_as<float, key_type>) {
            ...
            M_insist(key == d, "downcasting should not impact precision");
            key = is_negative ? -key : key;
        } else if constexpr(std::same_as<double, key_type>) {
            ...
            key = is_negative ? -key : key;
        } else if constexpr(std::same_as<const char*, key_type>) {
            key = reinterpret_cast<const char*>(c.as_p());
            M_insist(not is_negative, "string cannot be negative");
        }
        return std::distance(index.begin(), is_lower_bound ? index.lower_bound(key)
                                                           : index.upper_bound(key));
    };
    ...
        : index.num_entries();
    M_insist(lo <= hi, "bounds need to be valid");
    uint32_t num_results = hi - lo;
    uint32_t *buffer_address = Module::Allocator().raw_malloc<uint32_t>(num_results + 1);
    ...
    uint32_t *buffer_ptr = buffer_address;
    *buffer_ptr = num_results;
    ...
    for (auto it = index.begin() + lo; it != index.begin() + hi; ++it) {
        M_insist(std::in_range<uint32_t>(it->second), "tuple id must fit in uint32_t");
        *buffer_ptr = it->second;
        ...
    }
    ...
        M.scan.store().table().layout(),
        M.scan.store().table().schema(M.scan.alias()),
    ...
    FUNCTION(index_scan_parent_pipeline, void(uint32_t))
    {
        ...
            M.scan.store().table().layout(),
            M.scan.store().table().schema(M.scan.alias()),
        ...
    }
    ...
    for (auto it = index.begin() + lo; it != index.begin() + hi; ++it) {
        M_insist(std::in_range<uint32_t>(it->second), "tuple id must fit in uint32_t");
        index_scan_parent_pipeline(uint32_t(it->second));
    }
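/* In this interpretation strategy the index lookup happens entirely on the host: the bounds are
 * evaluated with `Interpreter::eval`, the qualifying tuple IDs are read from the index while
 * generating code, and the compiled pipeline is merely invoked once per tuple ID (or fed a
 * pre-filled ID buffer). */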
template<idx::IndexMethod IndexMethod, typename Index, sql_type SqlT>
...
{
    using key_type = Index::key_type;
    using sql_type = SqlT;
    ...
    const char *scan_fn;
#define SET_CALLBACK_FNS(INDEX, KEY) \
    scan_fn = M_STR(idx_scan_##INDEX##_##KEY)

#define RESOLVE_KEYTYPE(INDEX) \
    if constexpr(std::same_as<SqlT, _Boolx1>) { \
        SET_CALLBACK_FNS(INDEX, b); \
    } else if constexpr(std::same_as<sql_type, _I8x1>) { \
        SET_CALLBACK_FNS(INDEX, i1); \
    } else if constexpr(std::same_as<sql_type, _I16x1>) { \
        SET_CALLBACK_FNS(INDEX, i2); \
    } else if constexpr(std::same_as<sql_type, _I32x1>) { \
        SET_CALLBACK_FNS(INDEX, i4); \
    } else if constexpr(std::same_as<sql_type, _I64x1>) { \
        SET_CALLBACK_FNS(INDEX, i8); \
    } else if constexpr(std::same_as<sql_type, _Floatx1>) { \
        SET_CALLBACK_FNS(INDEX, f); \
    } else if constexpr(std::same_as<sql_type, _Doublex1>) { \
        SET_CALLBACK_FNS(INDEX, d); \
    } else if constexpr(std::same_as<sql_type, NChar>) { \
        SET_CALLBACK_FNS(INDEX, p); \
    } else { \
        M_unreachable("incompatible SQL type"); \
    }
    if constexpr(is_specialization<Index, idx::ArrayIndex>) {
        ...
    } else if constexpr(is_specialization<Index, idx::RecursiveModelIndex>) {
        ...
    }
#undef RESOLVE_KEYTYPE
#undef SET_CALLBACK_FNS
    U64x1 index_id(context.add_index(index));
    ...
    auto interpret_and_lookup_bound = [&](const ast::Expr &bound, bool is_lower_bound) -> std::size_t {
        ...
        auto c = Interpreter::eval(constant);
        ...
        if constexpr(std::same_as<bool, key_type>) {
            ...
            M_insist(not is_negative, "boolean cannot be negative");
            ...
        } else if constexpr(std::integral<key_type>) {
            auto i64 = int64_t(c);
            M_insist(std::in_range<key_type>(i64), "integral constant must be in range");
            key = key_type(i64);
            key = is_negative ? -key : key;
        } else if constexpr(std::same_as<float, key_type>) {
            ...
            M_insist(key == d, "downcasting should not impact precision");
            key = is_negative ? -key : key;
        } else if constexpr(std::same_as<double, key_type>) {
            ...
            key = is_negative ? -key : key;
        } else if constexpr(std::same_as<const char*, key_type>) {
            key = reinterpret_cast<const char*>(c.as_p());
            M_insist(not is_negative, "string cannot be negative");
        }
        return std::distance(index.begin(), is_lower_bound ? index.lower_bound(key)
                                                           : index.upper_bound(key));
    };
    ...
        : index.num_entries();
    M_insist(lo <= hi, "bounds need to be valid");
    M_insist(std::in_range<uint32_t>(lo), "should fit in uint32_t");
    M_insist(std::in_range<uint32_t>(hi), "should fit in uint32_t");
    ...
    std::optional<U32x1> end;
    ...
    offset_address[0] = uint32_t(lo);
    offset_address[1] = uint32_t(hi);
    ...
    begin = *offset_ptr.clone();
    end.emplace(*(offset_ptr + 1));
    ...
    M_insist(bool(end), "end must be set");
    ...
    M_insist(std::in_range<uint32_t>(M.batch_size), "should fit in uint32_t");
    ...
    U32x1 num_results = end->clone() - begin;
    U32x1 num_results_cpy = num_results.clone();
    U32x1 batch_size = M.batch_size == 0 ? num_results.clone() : U32x1(M.batch_size);
    U32x1 batch_size_cpy = batch_size.clone();
    return Select(batch_size < num_results, batch_size_cpy, num_results_cpy);
    ...
        auto end_cpy = end->clone();
        num_tuples_in_batch = Select(*end - begin > alloc_size, alloc_size, end_cpy - begin);
        ...
            buffer_address.clone(),
            num_tuples_in_batch.val()
        ...
        begin += num_tuples_in_batch;
        ptr = buffer_address.clone();
        WHILE (num_tuples_in_batch > 0U) {
            static Schema empty_schema;
            ...
                M.scan.store().table().layout(),
                M.scan.store().table().schema(M.scan.alias()),
            ...
            num_tuples_in_batch -= 1U;
            ...
        }
    ...
    IF (alloc_size > U32x1(0)) {
        ...
    };
template<idx::IndexMethod IndexMethod, typename Index, sql_type SqlT>
...
        index_scan_codegen_compilation<IndexMethod, Index, SqlT>(index, bounds, M, std::move(setup), std::move(pipeline), std::move(teardown));
    ...
        index_scan_codegen_interpretation<IndexMethod, Index>(index, bounds, M, std::move(setup), std::move(pipeline), std::move(teardown));
    ...
        index_scan_codegen_hybrid<IndexMethod, Index, SqlT>(index, bounds, M, std::move(setup), std::move(pipeline), std::move(teardown));
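/* Dispatch over the three strategies selectable via `--index-scan-strategy`, as suggested by the
 * code above: `Compilation` emits Wasm code that calls back into the host for the bound lookups
 * at run time, `Interpretation` resolves the index entirely while generating code, and `Hybrid`
 * interprets the bounds up front but performs the batched scan in compiled code. */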
template<idx::IndexMethod IndexMethod, typename AttrT, sql_type SqlT>
...
{
    ...
    auto &table_name = M.scan.store().table().name();
    ...
    auto &index_base = DB.get_index(table_name, attribute_name, IndexMethod);
    ...
        auto &index = as<const idx::ArrayIndex<AttrT>>(index_base);
        index_scan_resolve_strategy<IndexMethod, const idx::ArrayIndex<AttrT>, SqlT>(
            index, bounds, M, std::move(setup), std::move(pipeline), std::move(teardown)
        );
    ...
        auto &index = as<const idx::RecursiveModelIndex<AttrT>>(index_base);
        index_scan_resolve_strategy<IndexMethod, const idx::RecursiveModelIndex<AttrT>, SqlT>(
            index, bounds, M, std::move(setup), std::move(pipeline), std::move(teardown)
        );
    ...
}

template<idx::IndexMethod IndexMethod>
...
{
    ...
    auto &cnf = M.filter.filter();
    ...
#define RESOLVE_INDEX_METHOD(ATTRTYPE, SQLTYPE) \
    index_scan_resolve_index_method<IndexMethod, ATTRTYPE, SQLTYPE>(bounds, M, std::move(setup), std::move(pipeline), std::move(teardown))
    ...
        case Numeric::N_Int:
        case Numeric::N_Decimal:
            ...
        case Numeric::N_Float:
            ...
    }, *bounds.attribute.type);
#undef RESOLVE_INDEX_METHOD
}

template<idx::IndexMethod IndexMethod>
...
{
    auto &schema = M.scan.schema();
    M_insist(schema == schema.drop_constants().deduplicate(),
             "Schema of `ScanOperator` must neither contain NULL nor duplicates");
    auto &table = M.scan.store().table();
    M_insist(not table.layout().is_finite(), "layout for `wasm::IndexScan` must be infinite");
    ...
template<bool Predicated>
...

template<bool Predicated>
...

template<bool Predicated>
...
{
    const cnf::CNF &cond = M.filter.filter();
    const unsigned cost = std::accumulate(cond.cbegin(), cond.cend(), 0U,
                                          [](unsigned cost, const cnf::Clause &clause) {
        return cost + clause.size();
    });
    ...
}

template<bool Predicated>
...
{
    ...
    [&, pipeline=std::move(pipeline)](){
        ...
    }
    ...
}

...
    const cnf::CNF &cond = M.filter.filter();
    M_insist(cond.size() == 1, "disjunctive filter condition must be a single clause");
    return cond[0].size() / 2.0;
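/* Cost rationale: with short-circuit evaluation of a single disjunctive clause, predicates are
 * evaluated only until one is satisfied, so on average about half of them run; e.g. a clause
 * with four predicates is estimated at 4 / 2.0 = 2 evaluations. */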
    [&, pipeline=std::move(pipeline)](){
        ...
        BLOCK(lazy_disjunctive_filter)
        {
            BLOCK(lazy_disjunctive_filter_then)
            {
                ...
                if (pred.negative())
                    GOTO(cond.is_false_and_not_null(), lazy_disjunctive_filter_then);
                else
                    GOTO(cond.is_true_and_not_null(), lazy_disjunctive_filter_then);
                ...
                GOTO(lazy_disjunctive_filter);
            }
            ...
        }
    }
    std::size_t child_idx,
    const std::tuple<const ProjectionOperator*> &partial_inner_nodes)
{
    ...
    auto &projection = *std::get<0>(partial_inner_nodes);
    ...
    if (not projection.children().empty()) {
        ...
        auto is_simd_computable = [](const ast::Expr &e){
            bool simd_computable = true;
            ...
            if (b.lhs->type()->is_character_sequence() or b.rhs->type()->is_character_sequence()) {
                simd_computable = false;
                ...
            }
            if (b.common_operand_type->is_integral() and b.op().type == TK_SLASH) {
                simd_computable = false;
                ...
            }
            if (b.op().type == TK_PERCENT) {
                simd_computable = false;
                ...
            }
            ...
            return simd_computable;
        };
        auto pred = [&](auto &p){ return not is_simd_computable(p.first); };
        if (std::any_of(projection.projections().cbegin(), projection.projections().cend(), pred))
            ...
    }
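    /* The checks above reject expressions that cannot be SIMDfied: operations on character
     * sequences, integer division, and modulo.  WebAssembly SIMD offers no packed integer
     * division/remainder instructions, and variable-length strings do not fit fixed-width
     * lanes. */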
    ...
    M_insist(M.projection.projections().size() == M.projection.schema().num_entries(),
             "projections must match the operator's schema");
    std::vector<std::pair<Schema::Identifier, Schema::Identifier>> old2new;
    auto p = M.projection.projections().begin();
    for (auto &e : M.projection.schema()) {
        auto pred = [&e](const auto &p){ return p.second == e.id; };
        if (std::find_if(old2new.cbegin(), old2new.cend(), pred) == old2new.cend()) {
            M_insist(p != M.projection.projections().end());
            ...
        }
        ...
    }
    auto execute_projection = [&, pipeline=std::move(pipeline)](){
        ...
        if (old_env.predicated())
            ...
        M_insist(M.projection.projections().size() == M.projection.schema().num_entries(),
                 "projections must match the operator's schema");
        auto p = M.projection.projections().begin();
        for (auto &e : M.projection.schema()) {
            if (not new_env.has(e.id) and not e.id.is_constant()) {
                if (old_env.has(e.id)) {
                    ...
                    new_env.add(e.id, old_env.get(e.id));
                    ...
                } else {
                    M_insist(p != M.projection.projections().end());
                    ...
                    if (value.can_be_null()) {
                        ...
                        new_env.add(e.id, var);
                        ...
                    }
                    ...
                        value.guarantees_terminating_nul()));
                    ...
                        [](std::monostate) -> void { M_unreachable("invalid expression"); },
                    }, old_env.compile(p->first));
                }
            }
            ...
        }
        ...
    };
    ...
    uint64_t min_size_in_bytes = 16;
    for (auto &p : M.projection.projections()) {
        ...
        if (fn.get_function().is_aggregate())
            ...
        min_size_in_bytes = std::min(min_size_in_bytes, (fn.type()->size() + 7) / 8);
        if (min_size_in_bytes == 1)
            ...
        [&min_size_in_bytes](auto &e) -> void {
            min_size_in_bytes = std::min(min_size_in_bytes, (e.type()->size() + 7) / 8);
            if (min_size_in_bytes == 1)
                ...
        },
        ...
    }
    ...
    M.child->get()->execute(std::move(setup), std::move(execute_projection), std::move(teardown));
    ...
    execute_projection();
    ...
    return 1.5 * M.child->get_matched_root().info().estimated_cardinality;
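/* `(size_in_bits + 7) / 8` rounds a bit width up to whole bytes, e.g. a 1-bit boolean maps to
 * 1 byte and a 32-bit integer to 4 bytes.  The running minimum over all projections bounds how
 * small a single attribute can get, and the loop exits early once the minimum of 1 byte is
 * reached. */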
    const uint64_t AGGREGATES_SIZE_THRESHOLD_IN_BITS =
        M.use_in_place_values ? std::numeric_limits<uint64_t>::max() : 0;
    const auto num_keys = M.grouping.group_by().size();
    ...
    for (std::size_t i = 0; i < num_keys; ++i) {
        auto &e = M.grouping.schema()[i];
        ht_schema.add(e.id, e.type, e.constraints);
    }
    ...
    const auto &aggregates = p.first;
    const auto &avg_aggregates = p.second;
    uint64_t aggregates_size_in_bits = 0;
    for (auto &info : aggregates) {
        ht_schema.add(info.entry);
        aggregates_size_in_bits += info.entry.type->size();
    }
    ...
    std::unique_ptr<HashTable> ht;
    std::vector<HashTable::index_t> key_indices(num_keys);
    std::iota(key_indices.begin(), key_indices.end(), 0);
    if (M.use_open_addressing_hashing) {
        if (aggregates_size_in_bits < AGGREGATES_SIZE_THRESHOLD_IN_BITS)
            ht = std::make_unique<GlobalOpenAddressingInPlaceHashTable>(ht_schema, std::move(key_indices),
                                                                        ...);
        else
            ht = std::make_unique<GlobalOpenAddressingOutOfPlaceHashTable>(ht_schema, std::move(key_indices),
                                                                           ...);
        if (M.use_quadratic_probing)
            as<OpenAddressingHashTableBase>(*ht).set_probing_strategy<QuadraticProbing>();
        else
            as<OpenAddressingHashTableBase>(*ht).set_probing_strategy<LinearProbing>();
    } else {
        ht = std::make_unique<GlobalChainedHashTable>(ht_schema, std::move(key_indices), initial_capacity);
    }
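    /* The hash-table schema is laid out as the grouping keys followed by one slot per aggregate.
     * Which concrete table is built follows the CLI options shown earlier, e.g. (illustrative):
     *
     *     --hash-table-implementation OpenAddressing --hash-table-probing-strategy Quadratic
     *
     * selects `GlobalOpenAddressingInPlaceHashTable` (or the out-of-place variant, if the
     * aggregate payload exceeds the in-place threshold) with `QuadraticProbing`. */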
    FUNCTION(hash_based_grouping_child_pipeline, void(void))
    {
        ...
        std::optional<HashTable::entry_t> dummy;
        ...
        ht->set_high_watermark(M.load_factor);
        dummy.emplace(ht->dummy_entry());
        ...
        std::vector<SQL_t> key;
        for (auto &p : M.grouping.group_by())
            key.emplace_back(env.compile(p.first.get()));
        auto [entry, inserted] = ht->try_emplace(std::move(key));
        ...
        Block init_aggs("hash_based_grouping.init_aggs", false),
              update_aggs("hash_based_grouping.update_aggs", false),
              update_avg_aggs("hash_based_grouping.update_avg_aggs", false);
        for (auto &info : aggregates) {
            bool is_min = false;
            switch (info.fnid) {
2438 case m::Function::FN_MIN:
2440 case m::Function::FN_MAX: {
2442 "MIN and MAX aggregate functions expect exactly one argument");
2443 const auto &arg = *info.args[0];
2446 requires (not (std::same_as<_T, _Boolx1> or std::same_as<_T, NChar>)) {
2447 using type =
typename _T::type;
2450 auto _arg = env.compile(arg);
2451 _T _new_val = convert<_T>(_arg);
2454 auto [val_,
is_null] = _new_val.clone().split();
2457 auto neutral = is_min ?
T(std::numeric_limits<type>::max())
2458 :
T(std::numeric_limits<type>::lowest());
2459 r.clone().set_value(neutral);
2460 if (info.entry.nullable())
2461 r.clone().set_null();
2463 r.clone().set_value(val);
2464 if (info.entry.nullable())
2465 r.clone().set_not_null();
2469 if (_new_val.can_be_null()) {
2471 auto [new_val_, new_val_is_null_] = _new_val.split();
2472 auto [old_min_max_, old_min_max_is_null] = _T(r.clone()).split();
2473 const Var<Boolx1> new_val_is_null(new_val_is_null_);
2475 auto chosen_r =
Select(new_val_is_null, dummy->extract<_T>(info.entry.id),
2477 if constexpr (std::floating_point<type>) {
2479 is_min ?
min(old_min_max_, new_val_)
2480 :
max(old_min_max_, new_val_)
2483 const Var<T> new_val(new_val_),
2484 old_min_max(old_min_max_);
2485 auto cmp = is_min ? new_val < old_min_max : new_val > old_min_max;
2494 r.set_value(new_val);
2499 old_min_max_is_null
and new_val_is_null
2502 auto new_val_ = _new_val.insist_not_null();
2503 auto old_min_max_ = _T(r.clone()).insist_not_null();
2504 if constexpr (std::floating_point<type>) {
2506 is_min ?
min(old_min_max_, new_val_)
2507 :
max(old_min_max_, new_val_)
2510 const Var<T> new_val(new_val_),
2511 old_min_max(old_min_max_);
2512 auto cmp = is_min ? new_val < old_min_max : new_val > old_min_max;
2521 r.set_value(new_val);
2530 requires std::same_as<_T,_Boolx1> or std::same_as<_T, NChar> {
2533 [](std::monostate) ->
void {
M_unreachable(
"invalid reference"); },
2534 }, entry.extract(info.entry.id));
                case m::Function::FN_AVG: {
                    auto it = avg_aggregates.find(info.entry.id);
                    M_insist(it != avg_aggregates.end());
                    const auto &avg_info = it->second;
                    M_insist(avg_info.compute_running_avg,
                             "AVG aggregate may only occur for running average computations");
                    M_insist(info.args.size() == 1, "AVG aggregate function expects exactly one argument");
                    const auto &arg = *info.args[0];
                    ...
                    auto r = entry.extract<_Doublex1>(info.entry.id);
                    auto _arg = env.compile(arg);
                    _Doublex1 _new_val = convert<_Doublex1>(_arg);
                    ...
                    auto [val_, is_null] = _new_val.clone().split();
                    ...
                    r.clone().set_value(Doublex1(0.0));
                    if (info.entry.nullable())
                        r.clone().set_null();
                    ...
                    r.clone().set_value(val);
                    if (info.entry.nullable())
                        r.clone().set_not_null();
                    ...
                    if (_new_val.can_be_null()) {
                        ...
                        auto [new_val, new_val_is_null_] = _new_val.split();
                        auto [old_avg_, old_avg_is_null] = _Doublex1(r.clone()).split();
                        const Var<Boolx1> new_val_is_null(new_val_is_null_);
                        ...
                        auto delta_absolute = new_val - old_avg;
                        auto running_count = _I64x1(entry.get<_I64x1>(avg_info.running_count)).insist_not_null();
                        auto delta_relative = delta_absolute / running_count.to<double>();
                        ...
                        auto chosen_r = Select(new_val_is_null, dummy->extract<_Doublex1>(info.entry.id),
                                               ...);
                        ...
                        old_avg + delta_relative
                        ...
                        old_avg_is_null and new_val_is_null
                        ...
                    } else {
                        auto new_val = _new_val.insist_not_null();
                        auto old_avg_ = _Doublex1(r.clone()).insist_not_null();
                        ...
                        auto delta_absolute = new_val - old_avg;
                        auto running_count = _I64x1(entry.get<_I64x1>(avg_info.running_count)).insist_not_null();
                        auto delta_relative = delta_absolute / running_count.to<double>();
                        ...
                        old_avg + delta_relative
                        ...
                    }
                    ...
                }
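                /* The update above is the standard incremental (Welford-style) mean: with running
                 * count n, avg_new = avg_old + (x - avg_old) / n.  This avoids accumulating a
                 * potentially overflowing sum for wide argument types. */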
                case m::Function::FN_SUM: {
                    M_insist(info.args.size() == 1, "SUM aggregate function expects exactly one argument");
                    const auto &arg = *info.args[0];
                    ...
                    std::visit(overloaded {
                        ...
                        requires (not (std::same_as<_T, _Boolx1> or std::same_as<_T, NChar>)) {
                            using type = typename _T::type;
                            ...
                            auto _arg = env.compile(arg);
                            _T _new_val = convert<_T>(_arg);
                            ...
                            auto [val_, is_null] = _new_val.clone().split();
                            ...
                            r.clone().set_value(T(type(0)));
                            if (info.entry.nullable())
                                r.clone().set_null();
                            ...
                            r.clone().set_value(val);
                            if (info.entry.nullable())
                                r.clone().set_not_null();
                            ...
                            if (_new_val.can_be_null()) {
                                ...
                                auto [new_val, new_val_is_null_] = _new_val.split();
                                auto [old_sum, old_sum_is_null] = _T(r.clone()).split();
                                const Var<Boolx1> new_val_is_null(new_val_is_null_);
                                ...
                                auto chosen_r = Select(new_val_is_null, dummy->extract<_T>(info.entry.id),
                                                       ...);
                                ...
                                old_sum_is_null and new_val_is_null
                                ...
                            } else {
                                auto new_val = _new_val.insist_not_null();
                                auto old_sum = _T(r.clone()).insist_not_null();
                                ...
                            }
                            ...
                        },
                        ...
                        requires std::same_as<_T, _Boolx1> or std::same_as<_T, NChar> {
                            ...
                        },
                        [](std::monostate) -> void { M_unreachable("invalid reference"); },
                    }, entry.extract(info.entry.id));
                    ...
                }
                case m::Function::FN_COUNT: {
                    M_insist(info.args.size() <= 1, "COUNT aggregate function expects at most one argument");
                    ...
                    auto r = entry.get<_I64x1>(info.entry.id);
                    ...
                    if (info.args.empty()) {
                        ...
                        r.clone() = _I64x1(1);
                        ...
                        auto old_count = _I64x1(r.clone()).insist_not_null();
                        ...
                        old_count + int64_t(1)
                        ...
                    } else {
                        const auto &arg = *info.args[0];
                        ...
                        auto _arg = env.compile(arg);
                        I64x1 new_val_not_null = not_null(_arg).to<int64_t>();
                        ...
                        r.clone() = _I64x1(new_val_not_null.clone());
                        ...
                        auto old_count = _I64x1(r.clone()).insist_not_null();
                        ...
                        old_count + new_val_not_null
                        ...
                    }
                    ...
                }
            }
        }
        ...
        init_aggs.attach_to_current();
        ...
        update_aggs.attach_to_current();
        ...
    }
    ...
    hash_based_grouping_child_pipeline();
    setup_t(std::move(setup), [&](){ ht->setup(); })();
    ...
    for (std::size_t i = 0; i < num_keys; ++i) {
        auto &e = M.grouping.schema()[i];
        key_schema.add(e.id, e.type, e.constraints);
    }
    ...
    for (auto &e : M.grouping.schema().deduplicate()) {
        ...
        key_schema.find(e.id);
        ...
        if (auto it = avg_aggregates.find(e.id);
            it != avg_aggregates.end() and not it->second.compute_running_avg)
        {
            /* AVG aggregates computed as SUM divided by COUNT. */
            auto &avg_info = it->second;
            auto sum = std::visit(overloaded {
                [&]<sql_type T>(HashTable::const_reference_t<T> &&r) -> _Doublex1
                requires (std::same_as<T, _I64x1> or std::same_as<T, _Doublex1>) {
                    return T(r).template to<double>();
                },
                [](auto&&) -> _Doublex1 { M_unreachable("invalid type"); },
                [](std::monostate&&) -> _Doublex1 { M_unreachable("invalid reference"); },
            }, entry.get(avg_info.sum));
            auto count = _I64x1(entry.get<_I64x1>(avg_info.running_count)).insist_not_null().to<double>();
            auto avg = sum / count;
            if (avg.can_be_null()) {
                ...
                env.add(e.id, _Doublex1(var));
                ...
            }
            ...
        } else {
            ...
            if (value.can_be_null()) {
                ...
            }
            ...
                value.guarantees_terminating_nul()));
            ...
                [](std::monostate&&) -> void { M_unreachable("invalid reference"); },
            }, entry.get(e.id));
            ...
        }
    }
    ...
    teardown_t(std::move(teardown), [&](){ ht->teardown(); })();
    std::size_t child_idx,
    const std::tuple<const GroupingOperator*> &partial_inner_nodes)
{
    ...
    for (auto &p : std::get<0>(partial_inner_nodes)->group_by()) {
        ...
        if (orders.find(id) == orders.cend())
            ...
    }
    ...
}

...
    return 1.0 * M.child->get_matched_root().info().estimated_cardinality;
...
    for (auto &[expr, alias] : M.grouping.group_by()) {
        ...
        M_insist(it != sortedness_child.orders().cend());
        ...
        if (orders.find(id) == orders.cend())
            orders.add(std::move(id), it->second);
    }
    const auto num_keys = M.grouping.group_by().size();
    ...
    for (std::size_t i = 0; i < num_keys; ++i) {
        auto &e = M.grouping.schema()[i];
        key_schema.add(e.id, e.type, e.constraints);
    }
    ...
    const auto &aggregates = p.first;
    const auto &avg_aggregates = p.second;
    ...
    FunctionProxy<void(void)> emit_group_and_resume_pipeline("emit_group_and_resume_pipeline");
    ...
    std::optional<Var<Boolx1>> first_iteration;
    ...
    agg_t agg_values[aggregates.size()];
    agg_backup_t agg_value_backups[aggregates.size()];
    ...
    key_t key_values[num_keys];
    key_backup_t key_value_backups[num_keys];
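    /* Pattern used throughout ordered grouping: the per-group running aggregates and key values
     * live in cheap Wasm locals (`Var`), while matching `Global` backups survive the boundary to
     * the separate `emit_group_and_resume_pipeline` function; `store_locals_to_globals` below
     * spills the locals into their backups before that function is invoked. */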
    auto store_locals_to_globals = [&](){
        ...
        for (std::size_t idx = 0; idx < aggregates.size(); ++idx) {
            auto &info = aggregates[idx];
            ...
            switch (info.fnid) {
                ...
                case m::Function::FN_MIN:
                case m::Function::FN_MAX: {
                    auto min_max = [&]<typename T>() {
                        ...
                        auto &[min_max_backup, is_null_backup] = *M_notnull((
                            ...
                        ));
                        ...
                        min_max_backup = min_max;
                        ...
                    };
                    auto &n = as<const Numeric>(*info.entry.type);
                    switch (n.kind) {
                        case Numeric::N_Int:
                        case Numeric::N_Decimal:
                            switch (n.size()) {
                                ...
                                case 8:  min_max.template operator()<int8_t >(); break;
                                case 16: min_max.template operator()<int16_t>(); break;
                                case 32: min_max.template operator()<int32_t>(); break;
                                case 64: min_max.template operator()<int64_t>(); break;
                            }
                            break;
                        case Numeric::N_Float:
                            if (n.size() <= 32)
                                min_max.template operator()<float>();
                            else
                                min_max.template operator()<double>();
                            break;
                    }
                    break;
                }
                case m::Function::FN_AVG: {
                    ...
                    auto &[avg_backup, is_null_backup] = *M_notnull((
                        ...
                    ));
                    ...
                    break;
                }
                case m::Function::FN_SUM: {
                    M_insist(info.args.size() == 1, "SUM aggregate function expects exactly one argument");
                    const auto &arg = *info.args[0];
                    ...
                    auto sum = [&]<typename T>() {
                        ...
                        auto &[sum_backup, is_null_backup] = *M_notnull((
                            ...
                        ));
                        ...
                    };
                    auto &n = as<const Numeric>(*info.entry.type);
                    switch (n.kind) {
                        case Numeric::N_Int:
                        case Numeric::N_Decimal:
                            switch (n.size()) {
                                ...
                                case 8:  sum.template operator()<int8_t >(); break;
                                case 16: sum.template operator()<int16_t>(); break;
                                case 32: sum.template operator()<int32_t>(); break;
                                case 64: sum.template operator()<int64_t>(); break;
                            }
                            break;
                        case Numeric::N_Float:
                            if (n.size() <= 32)
                                sum.template operator()<float>();
                            else
                                sum.template operator()<double>();
                            break;
                    }
                    break;
                }
                case m::Function::FN_COUNT: {
                    ...
                    count_backup = count;
                    ...
                    break;
                }
            }
        }
2979 auto store = [&]<
typename T>(std::size_t idx) {
2983 auto &[key_backup, is_null_backup] = *
M_notnull((
2992 for (std::size_t idx = 0; idx < num_keys; ++idx) {
2994 [&](
const Boolean&) { store.template operator()<
bool>(idx); },
2997 case Numeric::N_Int:
2998 case Numeric::N_Decimal:
3001 case 8: store.template operator()<int8_t >(idx);
break;
3002 case 16: store.template operator()<int16_t>(idx);
break;
3003 case 32: store.template operator()<int32_t>(idx);
break;
3004 case 64: store.template operator()<int64_t>(idx);
break;
3007 case Numeric::N_Float:
3009 store.template
operator()<
float>(idx);
3011 store.template operator()<
double>(idx);
3020 [&](
const Date&) { store.template operator()<int32_t>(idx); },
3021 [&](
const DateTime&) { store.template operator()<int64_t>(idx); },
3023 }, *M.grouping.schema()[idx].type);
3029 first_iteration.emplace(first_iteration_backup);
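The `switch` over the numeric kind and bit width that ends in calls like `min_max.template operator()<int32_t>()` is a recurring pattern throughout this file: a runtime type tag is bridged to a compile-time template argument by dispatching to a templated lambda. A reduced, self-contained sketch of the same idiom (the `Kind` enum and `dispatch_numeric` helper are illustrative, not the engine's API):

    #include <cstdint>
    #include <iostream>

    enum class Kind { Int, Float };

    // Bridge a runtime (kind, bit-width) pair to a compile-time type parameter.
    template<typename Fn>
    void dispatch_numeric(Kind kind, unsigned bits, Fn &&fn)
    {
        switch (kind) {
            case Kind::Int:
                switch (bits) {
                    case 8:  fn.template operator()<int8_t >(); break;
                    case 16: fn.template operator()<int16_t>(); break;
                    case 32: fn.template operator()<int32_t>(); break;
                    case 64: fn.template operator()<int64_t>(); break;
                }
                break;
            case Kind::Float:
                if (bits == 32) fn.template operator()<float>();
                else            fn.template operator()<double>();
                break;
        }
    }

    int main()
    {
        dispatch_numeric(Kind::Int, 32, []<typename T>() {
            std::cout << "operating on a " << sizeof(T) << "-byte integer\n";
        });
    }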
for (std::size_t idx = 0; idx < aggregates.size(); ++idx) {
    auto &info = aggregates[idx];
    const bool nullable = info.entry.nullable();
    bool is_min = false;
    switch (info.fnid) {
        default: M_unreachable("unsupported aggregate function");
        case m::Function::FN_MIN:
            is_min = true;
        case m::Function::FN_MAX: {
            auto min_max = [&]<typename T>() {
                auto neutral = is_min ? std::numeric_limits<T>::max()
                                      : std::numeric_limits<T>::lowest();
                // ...
                Var<PrimitiveExpr<T>> min_max;
                Global<PrimitiveExpr<T>> min_max_backup(neutral);
                std::optional<Var<Boolx1>> is_null;
                std::optional<Global<Boolx1>> is_null_backup;
                // ...
                min_max = min_max_backup;
                // ...
                is_null_backup.emplace(true);
                is_null.emplace(*is_null_backup);
                // ...
                results.add(info.entry.id, Select(*is_null_backup, Expr<T>::Null(), min_max_backup));
                // ...
                results.add(info.entry.id, min_max_backup.val());
                // ...
                new (&agg_values[idx]) agg_t(std::make_pair(std::move(min_max), std::move(is_null)));
                new (&agg_value_backups[idx]) agg_backup_t(std::make_pair(
                    std::move(min_max_backup), std::move(is_null_backup)
                ));
            };
            auto &n = as<const Numeric>(*info.entry.type);
            switch (n.kind) {
                case Numeric::N_Int:
                case Numeric::N_Decimal:
                    switch (n.size()) {
                        default: M_unreachable("invalid size");
                        case 8:  min_max.template operator()<int8_t >(); break;
                        case 16: min_max.template operator()<int16_t>(); break;
                        case 32: min_max.template operator()<int32_t>(); break;
                        case 64: min_max.template operator()<int64_t>(); break;
                    }
                    break;
                case Numeric::N_Float:
                    if (n.size() == 32)
                        min_max.template operator()<float>();
                    else
                        min_max.template operator()<double>();
            }
            break;
        }
        case m::Function::FN_AVG: {
            // ...
            Global<Doublex1> avg_backup(0.0);
            std::optional<Var<Boolx1>> is_null;
            std::optional<Global<Boolx1>> is_null_backup;
            // ...
            is_null_backup.emplace(true);
            is_null.emplace(*is_null_backup);
            // ...
            results.add(info.entry.id, Select(*is_null_backup, _Doublex1::Null(), avg_backup));
            // ...
            results.add(info.entry.id, avg_backup.val());
            // ...
            new (&agg_values[idx]) agg_t(std::make_pair(std::move(avg), std::move(is_null)));
            new (&agg_value_backups[idx]) agg_backup_t(std::make_pair(
                std::move(avg_backup), std::move(is_null_backup)
            ));
            break;
        }
        case m::Function::FN_SUM: {
            auto sum = [&]<typename T>() {
                // ...
                std::optional<Var<Boolx1>> is_null;
                std::optional<Global<Boolx1>> is_null_backup;
                // ...
                is_null_backup.emplace(true);
                is_null.emplace(*is_null_backup);
                // ...
                results.add(info.entry.id, sum_backup.val());
                // ...
                new (&agg_values[idx]) agg_t(std::make_pair(std::move(sum), std::move(is_null)));
                new (&agg_value_backups[idx]) agg_backup_t(std::make_pair(
                    std::move(sum_backup), std::move(is_null_backup)
                ));
            };
            auto &n = as<const Numeric>(*info.entry.type);
            switch (n.kind) {
                case Numeric::N_Int:
                case Numeric::N_Decimal:
                    switch (n.size()) {
                        // ...
                        case 8:  sum.template operator()<int8_t >(); break;
                        case 16: sum.template operator()<int16_t>(); break;
                        case 32: sum.template operator()<int32_t>(); break;
                        case 64: sum.template operator()<int64_t>(); break;
                    }
                    break;
                case Numeric::N_Float:
                    if (n.size() == 32)
                        sum.template operator()<float>();
                    else
                        sum.template operator()<double>();
            }
            break;
        }
        case m::Function::FN_COUNT: {
            // ...
            count = count_backup;
            // ...
            results.add(info.entry.id, count_backup.val());
            // ...
            new (&agg_values[idx]) agg_t(std::move(count));
            new (&agg_value_backups[idx]) agg_backup_t(std::move(count_backup));
            break;
        }
    }
}
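MIN and MAX accumulators are initialized with a neutral element — `std::numeric_limits<T>::max()` for MIN and `std::numeric_limits<T>::lowest()` for MAX — so the first real value always replaces it. A plain-C++ sketch of that fold (illustrative only, not the engine's code):

    #include <algorithm>
    #include <iostream>
    #include <limits>
    #include <vector>

    // Fold MIN or MAX starting from the neutral element: any real value
    // "wins" against the initial accumulator.
    template<typename T>
    T fold_min_max(const std::vector<T> &values, bool is_min)
    {
        T acc = is_min ? std::numeric_limits<T>::max()
                       : std::numeric_limits<T>::lowest();
        for (T v : values)
            acc = is_min ? std::min(acc, v) : std::max(acc, v);
        return acc;
    }

    int main()
    {
        std::vector<int> xs{ 3, -1, 7 };
        std::cout << fold_min_max(xs, /* is_min= */ true) << '\n';  // -1
    }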
auto init = [&]<typename T>(std::size_t idx) {
    const bool nullable = M.grouping.schema()[idx].nullable();
    // ...
    std::optional<Var<Boolx1>> is_null;
    std::optional<Global<Boolx1>> is_null_backup;
    // ...
    is_null_backup.emplace();
    is_null.emplace(*is_null_backup);
    // ...
    auto id = M.grouping.schema()[idx].id;
    key_schema.find(id);
    // ...
    results.add(id, key_backup.val());
    // ...
    new (&key_values[idx]) key_t(std::make_pair(std::move(key), std::move(is_null)));
    new (&key_value_backups[idx]) key_backup_t(std::make_pair(
        std::move(key_backup), std::move(is_null_backup)
    ));
};
for (std::size_t idx = 0; idx < num_keys; ++idx) {
    visit(overloaded {
        [&](const Boolean&) { init.template operator()<bool>(idx); },
        [&](const Numeric &n) {
            switch (n.kind) {
                case Numeric::N_Int:
                case Numeric::N_Decimal:
                    switch (n.size()) {
                        // ...
                        case 8:  init.template operator()<int8_t >(idx); break;
                        case 16: init.template operator()<int16_t>(idx); break;
                        case 32: init.template operator()<int32_t>(idx); break;
                        case 64: init.template operator()<int64_t>(idx); break;
                    }
                    break;
                case Numeric::N_Float:
                    if (n.size() == 32)
                        init.template operator()<float>(idx);
                    else
                        init.template operator()<double>(idx);
            }
        },
        [&](const CharacterSequence &cs) {
            // ...
            auto id = M.grouping.schema()[idx].id;
            key_schema.find(id);
            // ...
            NChar str(key_backup.val(), M.grouping.schema()[idx].nullable(), cs.length, cs.is_varying);
            results.add(id, std::move(str));
            // ...
            new (&key_values[idx]) key_t(std::move(key));
            new (&key_value_backups[idx]) key_backup_t(std::move(key_backup));
        },
        [&](const Date&)     { init.template operator()<int32_t>(idx); },
        [&](const DateTime&) { init.template operator()<int64_t>(idx); },
        // ...
    }, *M.grouping.schema()[idx].type);
}
// ...
std::optional<Var<Boolx1>> pred;
if (env.predicated()) {
    // ...
    pred = env.extract_predicate<_Boolx1>().is_true_and_not_null();
}
// ...
Block reset_aggs("ordered_grouping.reset_aggs", false),
      update_aggs("ordered_grouping.update_aggs", false),
      update_avg_aggs("ordered_grouping.update_avg_aggs", false);
for (std::size_t idx = 0; idx < aggregates.size(); ++idx) {
    auto &info = aggregates[idx];
    bool is_min = false;
    switch (info.fnid) {
        // ...
        case m::Function::FN_MIN:
            is_min = true;
        case m::Function::FN_MAX: {
            M_insist(info.args.size() == 1, "MIN and MAX aggregate functions expect exactly one argument");
            const auto &arg = *info.args[0];
            auto min_max = [&]<typename T>() {
                auto neutral = is_min ? std::numeric_limits<T>::max()
                                      : std::numeric_limits<T>::lowest();
                // ...
                auto _arg = env.compile(arg);
                Expr<T> _new_val = convert<Expr<T>>(_arg);
                // ...
                auto [new_val_, new_val_is_null_] = _new_val_pred.split();
                const Var<Boolx1> new_val_is_null(new_val_is_null_);
                // ...
                if constexpr (std::floating_point<T>) {
                    min_max = Select(new_val_is_null,
                                     // ...
                                     is_min ? min(min_max, new_val_)
                                            : max(min_max, new_val_));
                } else {
                    // ...
                    auto cmp = is_min ? new_val < min_max : new_val > min_max;
                    // ...
                    min_max = Select(new_val_is_null,
                                     // ...
                    IF (not new_val_is_null and cmp) {
                    // ...
                }
                // ...
                auto _new_val_pred = pred ? Select(*pred, _new_val, neutral) : _new_val;
                auto new_val_ = _new_val_pred.insist_not_null();
                if constexpr (std::floating_point<T>) {
                    min_max = is_min ? min(min_max, new_val_)
                                     : max(min_max, new_val_);
                } else {
                    // ...
                    auto cmp = is_min ? new_val < min_max : new_val > min_max;
                    // ...
                }
            };
            auto &n = as<const Numeric>(*info.entry.type);
            switch (n.kind) {
                case Numeric::N_Int:
                case Numeric::N_Decimal:
                    switch (n.size()) {
                        // ...
                        case 8:  min_max.template operator()<int8_t >(); break;
                        case 16: min_max.template operator()<int16_t>(); break;
                        case 32: min_max.template operator()<int32_t>(); break;
                        case 64: min_max.template operator()<int64_t>(); break;
                    }
                    break;
                case Numeric::N_Float:
                    if (n.size() == 32)
                        min_max.template operator()<float>();
                    else
                        min_max.template operator()<double>();
            }
            break;
        }
        case m::Function::FN_AVG:
            // ...
        case m::Function::FN_SUM: {
            M_insist(info.args.size() == 1, "SUM aggregate function expects exactly one argument");
            const auto &arg = *info.args[0];
            auto sum = [&]<typename T>() {
                // ...
                auto _arg = env.compile(arg);
                Expr<T> _new_val = convert<Expr<T>>(_arg);
                // ...
                auto [new_val, new_val_is_null_] = _new_val_pred.split();
                const Var<Boolx1> new_val_is_null(new_val_is_null_);
                // ...
                sum += Select(new_val_is_null,
                              // ...
                auto _new_val_pred = pred ? Select(*pred, _new_val, T(0)) : _new_val;
                sum += _new_val_pred.insist_not_null();
                // ...
            };
            auto &n = as<const Numeric>(*info.entry.type);
            switch (n.kind) {
                case Numeric::N_Int:
                case Numeric::N_Decimal:
                    switch (n.size()) {
                        // ...
                        case 8:  sum.template operator()<int8_t >(); break;
                        case 16: sum.template operator()<int16_t>(); break;
                        case 32: sum.template operator()<int32_t>(); break;
                        case 64: sum.template operator()<int64_t>(); break;
                    }
                    break;
                case Numeric::N_Float:
                    if (n.size() == 32)
                        sum.template operator()<float>();
                    else
                        sum.template operator()<double>();
            }
            break;
        }
        case m::Function::FN_COUNT: {
            M_insist(info.args.size() <= 1, "COUNT aggregate function expects at most one argument");
            M_insist(info.entry.type->is_integral() and info.entry.type->size() == 64);
            // ...
            if (info.args.empty()) {
                count += pred ? pred->to<int64_t>() : I64x1(1);
            } else {
                auto _new_val = env.compile(*info.args[0]);
                // ...
                I64x1 inc = pred ? (not_null(_new_val) and *pred).to<int64_t>()
                                 // ...
                I64x1 inc = pred ? pred->to<int64_t>() : I64x1(1);
                // ...
            }
            break;
        }
    }
}
for (std::size_t idx = 0; idx < aggregates.size(); ++idx) {
    auto &info = aggregates[idx];
    if (info.fnid == m::Function::FN_AVG) {
        M_insist(info.args.size() == 1, "AVG aggregate function expects exactly one argument");
        const auto &arg = *info.args[0];
        M_insist(info.entry.type->is_double());
        // ...
        auto it = avg_aggregates.find(info.entry.id);
        M_insist(it != avg_aggregates.end());
        const auto &avg_info = it->second;
        M_insist(avg_info.compute_running_avg,
                 "AVG aggregate may only occur for running average computations");
        // ...
        auto running_count_idx = std::distance(
            aggregates.cbegin(),
            std::find_if(aggregates.cbegin(), aggregates.cend(), [&avg_info](const auto &info){
                return info.entry.id == avg_info.running_count;
            })
        );
        M_insist(0 <= running_count_idx and running_count_idx < aggregates.size());
        auto &running_count = *M_notnull(std::get_if<Var<I64x1>>(&agg_values[running_count_idx]));
        // ...
        auto _arg = env.compile(arg);
        _Doublex1 _new_val = convert<_Doublex1>(_arg);
        if (_new_val.can_be_null()) {
            // ...
            auto _new_val_pred = pred ? Select(*pred, _new_val, _Doublex1::Null()) : _new_val;
            auto [new_val, new_val_is_null_] = _new_val_pred.split();
            const Var<Boolx1> new_val_is_null(new_val_is_null_);
            // ...
            auto delta_absolute = new_val - avg;
            auto delta_relative = delta_absolute / running_count.to<double>();
            // ...
            avg += Select(new_val_is_null,
            // ...
        } else {
            auto _new_val_pred = pred ? Select(*pred, _new_val, avg) : _new_val;
            auto delta_absolute = _new_val_pred.insist_not_null() - avg;
            auto delta_relative = delta_absolute / running_count.to<double>();
            // ...
            avg += delta_relative;
        }
    }
}
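The running-average update above uses the numerically stable incremental form `avg += (new_val - avg) / count` instead of summing everything and dividing at the end. A standalone sketch of the same recurrence:

    #include <cstdint>
    #include <iostream>

    // Incremental (running) average: numerically stabler than a naive
    // sum/count for long streams; conceptually the update used above.
    struct RunningAvg {
        double avg = 0.0;
        int64_t count = 0;

        void update(double new_val) {
            ++count;
            const double delta_absolute = new_val - avg;
            const double delta_relative = delta_absolute / double(count);
            avg += delta_relative;
        }
    };

    int main()
    {
        RunningAvg a;
        for (double v : { 1.0, 2.0, 6.0 })
            a.update(v);
        std::cout << a.avg << '\n';  // 3
    }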
std::optional<Boolx1> group_differs;
Block update_keys("ordered_grouping.update_grouping_keys", false);
for (std::size_t idx = 0; idx < num_keys; ++idx) {
    // ...
    auto &[key_val, key_is_null] = *M_notnull((
        // ...
    if (value.can_be_null()) {
        // ...
        auto null_differs = is_null != *key_is_null;
        Boolx1 key_differs = null_differs or (not *key_is_null and val != key_val);
        // ...
        group_differs.emplace(key_differs or *group_differs);
        // ...
        group_differs.emplace(key_differs);
        // ...
        std::tie(key_val, key_is_null) = value.split();
    } else {
        // ...
        Boolx1 key_differs = key_val != value.clone().insist_not_null();
        // ...
        group_differs.emplace(key_differs or *group_differs);
        // ...
        group_differs.emplace(key_differs);
        // ...
        key_val = value.insist_not_null();
    }
    // ...
    auto [key_addr, key_is_nullptr] = key.val().split();
    auto [addr, is_nullptr] = value.val().clone().split();
    // ...
    value.guarantees_terminating_nul()),
    // ...
    value.guarantees_terminating_nul()),
    U32x1(value.length()),
    // ...
    auto [addr_differs_value, addr_differs_is_null] = addr_differs.split();
    addr_differs_is_null.discard();
    auto nullptr_differs = is_nullptr != key_is_nullptr.clone();
    Boolx1 key_differs = nullptr_differs or (not key_is_nullptr and addr_differs_value);
    // ...
    group_differs.emplace(key_differs or *group_differs);
    // ...
    group_differs.emplace(key_differs);
    // ...
    [](auto) -> void { M_unreachable("SIMDfication currently not supported"); },
    [](std::monostate) -> void { M_unreachable("invalid expression"); },
    }, env.compile(M.grouping.group_by()[idx].first.get()));
}
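Ordered grouping never hashes: because input arrives sorted on the grouping keys, a group ends exactly when any key column differs from the previous row, with two NULLs comparing as equal. A NULL-aware sketch of that comparison in plain C++ (types are illustrative):

    #include <cstddef>
    #include <optional>
    #include <vector>

    // NULL-aware key comparison: two NULLs are equal; a NULL and a value
    // differ; otherwise compare the values themselves.
    using Key = std::vector<std::optional<int>>;

    bool group_differs(const Key &prev, const Key &cur)
    {
        bool differs = false;
        for (std::size_t i = 0; i < cur.size(); ++i) {
            const bool null_differs = prev[i].has_value() != cur[i].has_value();
            const bool key_differs =
                null_differs or (prev[i].has_value() and *prev[i] != *cur[i]);
            differs = differs or key_differs;
        }
        return differs;
    }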
Boolx1 cond = *first_iteration or *group_differs;
IF (pred ? Select(*pred, cond, false) : cond) {
    IF (not *first_iteration) {
        store_locals_to_globals();
        emit_group_and_resume_pipeline();
        reset_aggs.attach_to_current();
        // ...
    }
    update_keys.attach_to_current();
    *first_iteration = false;
}
// ...
update_aggs.attach_to_current();
update_avg_aggs.attach_to_current();
// ...
store_locals_to_globals();
// ...
for (std::size_t idx = 0; idx < aggregates.size(); ++idx) {
    agg_values[idx].~agg_t();
    agg_value_backups[idx].~agg_backup_t();
}
// ...
first_iteration_backup = *first_iteration;
first_iteration.reset();
// ...
IF (not first_iteration_backup) {
    emit_group_and_resume_pipeline();
}
// ...
auto fn = emit_group_and_resume_pipeline.make_function();
for (auto &e : M.grouping.schema().deduplicate()) {
    key_schema.find(e.id);
    // ...
    if (auto it = avg_aggregates.find(e.id);
        it != avg_aggregates.end() and not it->second.compute_running_avg)
    {
        auto &avg_info = it->second;
        auto sum = results.get(avg_info.sum);
        auto count = results.get<_I64x1>(avg_info.running_count).insist_not_null().to<double>();
        auto avg = convert<_Doublex1>(sum) / count;
        if (avg.can_be_null()) {
            // ...
            env.add(e.id, _Doublex1(var));
        }
        // ...
    } else {
        // ...
        if (value.can_be_null()) {
            // ...
        }
        // ... value.guarantees_terminating_nul()));
        // ...
        [](auto) -> void { M_unreachable("SIMDfication currently not supported"); },
        [](std::monostate) -> void { M_unreachable("invalid reference"); },
        }, results.get(e.id));
    }
}
for (auto &e : M.aggregation.schema().deduplicate())
// ...
std::vector<std::function<void(void)>> finalize_aggregates;
// ...
const auto &aggregates = p.first;
const auto &avg_aggregates = p.second;
// ...
uint64_t min_size_in_bytes = 16;
for (auto &fn : M.aggregation.aggregates()) {
    for (auto &e : fn.get().args) {
        // ...
        M_insist(not fn.get_function().is_aggregate(), "aggregate arguments must not be aggregates");
        min_size_in_bytes = std::min(min_size_in_bytes, (fn.type()->size() + 7) / 8);
        if (min_size_in_bytes == 1)
        // ...
        [&min_size_in_bytes](auto &e) -> void {
            min_size_in_bytes = std::min(min_size_in_bytes, (e.type()->size() + 7) / 8);
            if (min_size_in_bytes == 1)
            // ...
        }
    }
}
// ...
if (std::any_of(avg_aggregates.begin(), avg_aggregates.end(),
                [](auto &i){ return i.second.compute_running_avg; }))
FUNCTION(aggregation_child_pipeline, void(void))
// ...
void *_agg_value_backups;
// ...
auto execute_setup = [&]<std::size_t L>() {
    // ...
    auto agg_values = new agg_t[aggregates.size()];
    auto agg_value_backups = new agg_backup_t[aggregates.size()];
    // ...
    _agg_values = static_cast<void*>(agg_values);
    _agg_value_backups = static_cast<void*>(agg_value_backups);
    for (std::size_t idx = 0; idx < aggregates.size(); ++idx) {
        auto &info = aggregates[idx];
        bool is_min = false;
        switch (info.fnid) {
            // ...
            case m::Function::FN_MIN:
                is_min = true;
            case m::Function::FN_MAX: {
                auto min_max = [&]<typename T>() {
                    auto neutral = is_min ? std::numeric_limits<T>::max()
                                          : std::numeric_limits<T>::lowest();
                    // ...
                    min_max = min_max_backup;
                    // ...
                    if constexpr (L == 1) {
                        // ...
                        Boolx1 is_null = is_null_backup;
                        // ...
                    } else {
                        // ...
                        auto simd_is_null = new Bool<L>(is_null_backup.val());
                        finalize_aggregates.emplace_back([&, is_min, simd_min_max, simd_is_null]() {
                            // ...
                            auto update = [&]<std::size_t I>(){
                                // ...
                                res = is_min ? min(res, simd_min_max->clone().template extract<I>())
                                             : max(res, simd_min_max->clone().template extract<I>());
                                // ...
                                simd_min_max->clone().template extract<I>()
                                // ...
                                auto cmp = is_min ? extracted < res : extracted > res;
                                // ...
                            };
                            // ...
                            (update.template operator()<Is + 1>(), ...);
                            // ...
                            }(std::make_index_sequence<L - 1>{});
                            simd_min_max->discard();
                            Boolx1 is_null = simd_is_null->all_true();
                            // ...
                            delete simd_min_max;
                            delete simd_is_null;
                        });
                    }
                    // ...
                    new (&agg_values[idx]) agg_t(std::make_pair(
                        std::move(min_max), std::move(is_null)
                    ));
                    new (&agg_value_backups[idx]) agg_backup_t(std::make_pair(
                        std::move(min_max_backup), std::move(is_null_backup)
                    ));
                };
                auto &n = as<const Numeric>(*info.entry.type);
                switch (n.kind) {
                    case Numeric::N_Int:
                    case Numeric::N_Decimal:
                        switch (n.size()) {
                            // ...
                            case 8:  min_max.template operator()<int8_t >(); break;
                            case 16: min_max.template operator()<int16_t>(); break;
                            case 32: min_max.template operator()<int32_t>(); break;
                            case 64: min_max.template operator()<int64_t>(); break;
                        }
                        break;
                    case Numeric::N_Float:
                        if (n.size() == 32)
                            min_max.template operator()<float>();
                        else
                            min_max.template operator()<double>();
                }
                break;
            }
            case m::Function::FN_AVG:
                // ...
            case m::Function::FN_SUM: {
                auto sum = [&]<typename T>() {
                    // ...
                    if constexpr (L == 1) {
                        // ...
                        Boolx1 is_null = is_null_backup;
                        // ...
                    } else {
                        // ...
                        return (sum_backup.template extract<Is>() + ...);
                        }(std::make_index_sequence<L>{});
                        Boolx1 is_null = is_null_backup.all_true();
                        // ...
                    }
                    new (&agg_values[idx]) agg_t(std::make_pair(std::move(sum), std::move(is_null)));
                    new (&agg_value_backups[idx]) agg_backup_t(std::make_pair(
                        std::move(sum_backup), std::move(is_null_backup)
                    ));
                };
                auto &n = as<const Numeric>(*info.entry.type);
                switch (n.kind) {
                    case Numeric::N_Int:
                    case Numeric::N_Decimal:
                        switch (n.size()) {
                            // ...
                            case 8:  sum.template operator()<int8_t >(); break;
                            case 16: sum.template operator()<int16_t>(); break;
                            case 32: sum.template operator()<int32_t>(); break;
                            case 64: sum.template operator()<int64_t>(); break;
                        }
                        break;
                    case Numeric::N_Float:
                        if (n.size() == 32)
                            sum.template operator()<float>();
                        else
                            sum.template operator()<double>();
                }
                break;
            }
            case m::Function::FN_COUNT: {
                // ...
                count = count_backup;
                // ...
                if constexpr (L == 1) {
                    I64x1 value = count_backup;
                    // ...
                } else {
                    I64x1 value = [&]<std::size_t... Is>(std::index_sequence<Is...>) {
                        return (count_backup.template extract<Is>() + ...);
                    }(std::make_index_sequence<L>{});
                    // ...
                }
                new (&agg_values[idx]) agg_t(std::move(count));
                new (&agg_value_backups[idx]) agg_backup_t(std::move(count_backup));
                break;
            }
        }
    }
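When more than one SIMD lane is active, per-lane partial counts and sums are combined into a scalar by a fold over an `std::index_sequence`, mirroring the `(count_backup.template extract<Is>() + ...)` expression above. A host-side sketch with a plain array standing in for the SIMD vector:

    #include <array>
    #include <cstddef>
    #include <iostream>
    #include <utility>

    // Horizontal reduction: sum all lanes of a fixed-width vector whose
    // width L is known at compile time, via a fold over an index sequence.
    template<std::size_t L>
    long horizontal_sum(const std::array<long, L> &lanes)
    {
        return [&]<std::size_t... Is>(std::index_sequence<Is...>) {
            return (lanes[Is] + ...);
        }(std::make_index_sequence<L>{});
    }

    int main()
    {
        std::array<long, 4> partial_counts{ 3, 1, 4, 1 };
        std::cout << horizontal_sum(partial_counts) << '\n';  // 9
    }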
    for (std::size_t idx = 0; idx < aggregates.size(); ++idx) {
        auto &info = aggregates[idx];
        if (info.fnid == m::Function::FN_AVG) {
            // ...
            if constexpr (L == 1) {
                Doublex1 value = avg_backup;
                Boolx1 is_null = is_null_backup;
                // ...
            } else {
                // ...
                auto simd_avg = new Double<L>(avg_backup.val());
                auto simd_is_null = new Bool<L>(is_null_backup.val());
                auto simd_running_count = new I64<L>([&](){
                    auto it = avg_aggregates.find(info.entry.id);
                    M_insist(it != avg_aggregates.end());
                    const auto &avg_info = it->second;
                    M_insist(avg_info.compute_running_avg,
                             "AVG aggregate may only occur for running average computations");
                    // ...
                    auto running_count_idx = std::distance(
                        aggregates.cbegin(),
                        std::find_if(aggregates.cbegin(), aggregates.cend(), [&avg_info](const auto &info){
                            return info.entry.id == avg_info.running_count;
                        })
                    );
                    M_insist(0 <= running_count_idx and running_count_idx < aggregates.size());
                    auto &running_count =
                        *M_notnull(std::get_if<Global<I64<L>>>(&agg_value_backups[running_count_idx]));
                    return running_count.val();
                }());
                finalize_aggregates.emplace_back([&, simd_avg, simd_is_null, simd_running_count]() {
                    Doublex1 value = [&]<std::size_t... Is>(std::index_sequence<Is...>) {
                        I64x1 count = (simd_running_count->clone().template extract<Is>() + ...);
                        // ...
                        if constexpr (L != 2) {
                            return *simd_avg * simd_running_count->template to<double>();
                        } else {
                            M_unreachable("conversion from `I64<2>` to `Double<2>` not supported");
                            return Double<L>(0.0);
                        }
                        // ...
                        return (simd_sum.template extract<Is>() + ...) / count.to<double>();
                    }(std::make_index_sequence<L>{});
                    Boolx1 is_null = simd_is_null->all_true();
                    // ...
                    delete simd_is_null;
                    delete simd_running_count;
                });
            }
            // ...
            new (&agg_values[idx]) agg_t(std::make_pair(std::move(avg), std::move(is_null)));
            new (&agg_value_backups[idx]) agg_backup_t(std::make_pair(
                std::move(avg_backup), std::move(is_null_backup)
            ));
        }
    }
// ...
case 1:  execute_setup.operator()<1>();  break;
case 2:  execute_setup.operator()<2>();  break;
case 4:  execute_setup.operator()<4>();  break;
case 8:  execute_setup.operator()<8>();  break;
case 16: execute_setup.operator()<16>(); break;
case 32: execute_setup.operator()<32>(); break;
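The `switch` mapping the runtime lane count onto `execute_setup.operator()<1>()` through `<32>()` is the same runtime-to-compile-time bridge as the numeric-type dispatch earlier, restricted to the supported power-of-two widths. A self-contained sketch (the `for_lane_count` helper is illustrative, not the engine's API):

    #include <cstddef>
    #include <stdexcept>

    // Bridge a runtime SIMD lane count to a compile-time template parameter.
    // Only the widths handled by the switch above are accepted.
    template<typename Fn>
    void for_lane_count(std::size_t lanes, Fn &&fn)
    {
        switch (lanes) {
            case  1: fn.template operator()< 1>(); break;
            case  2: fn.template operator()< 2>(); break;
            case  4: fn.template operator()< 4>(); break;
            case  8: fn.template operator()< 8>(); break;
            case 16: fn.template operator()<16>(); break;
            case 32: fn.template operator()<32>(); break;
            default: throw std::invalid_argument("unsupported lane count");
        }
    }

Usage would look like `for_lane_count(8, []<std::size_t L>(){ /* code specialized for L lanes */ });`, so each supported width gets its own fully specialized instantiation.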
auto execute_pipeline = [&]<std::size_t L>(){
    // ...
    "number of SIMD lanes in pipeline callback must match the one in setup callback");
    // ...
    auto agg_values = static_cast<agg_t*>(_agg_values);
    auto agg_value_backups = static_cast<agg_backup_t*>(_agg_value_backups);
    // ...
    std::optional<Var<Bool<L>>> pred;
    if (env.predicated()) {
        // ...
    }
    for (std::size_t idx = 0; idx < aggregates.size(); ++idx) {
        auto &info = aggregates[idx];
        bool is_min = false;
        switch (info.fnid) {
            // ...
            case m::Function::FN_MIN:
                is_min = true;
            case m::Function::FN_MAX: {
                M_insist(info.args.size() == 1,
                         "MIN and MAX aggregate functions expect exactly one argument");
                const auto &arg = *info.args[0];
                // ...
                auto _arg = env.compile(arg);
                Expr<T, L> _new_val = convert<Expr<T, L>>(_arg);
                if (_new_val.can_be_null()) {
                    // ...
                    auto _new_val_pred = // ...
                    auto [new_val_, new_val_is_null_] = _new_val_pred.split();
                    const Var<Bool<L>> new_val_is_null(new_val_is_null_);
                    // ...
                    min_max = Select(new_val_is_null,
                                     // ...
                                     is_min ? min(min_max, new_val_)
                                            : max(min_max, new_val_));
                    // ...
                    auto cmp = is_min ? new_val < min_max : new_val > min_max;
                    // ...
                    min_max = Select(new_val_is_null,
                                     // ...
                    IF (not new_val_is_null and cmp) {
                    // ...
                } else {
                    auto neutral = is_min ? std::numeric_limits<T>::max()
                                          : std::numeric_limits<T>::lowest();
                    auto _new_val_pred = // ...
                    auto new_val_ = _new_val_pred.insist_not_null();
                    // ...
                    min_max = is_min ? min(min_max, new_val_)
                                     : max(min_max, new_val_);
                    // ...
                    auto cmp = is_min ? new_val < min_max : new_val > min_max;
                    // ...
                }
                // ...
                []<typename>() { M_unreachable("invalid type for given number of SIMD lanes"); }
                // ...
                auto &n = as<const Numeric>(*info.entry.type);
                switch (n.kind) {
                    case Numeric::N_Int:
                    case Numeric::N_Decimal:
                        switch (n.size()) {
                            // ...
                            case 8:  min_max.template operator()<int8_t >(); break;
                            case 16: min_max.template operator()<int16_t>(); break;
                            case 32: min_max.template operator()<int32_t>(); break;
                            case 64: min_max.template operator()<int64_t>(); break;
                        }
                        break;
                    case Numeric::N_Float:
                        if (n.size() == 32)
                            min_max.template operator()<float>();
                        else
                            min_max.template operator()<double>();
                }
                break;
            }
            case m::Function::FN_AVG:
                // ...
            case m::Function::FN_SUM: {
                M_insist(info.args.size() == 1, "SUM aggregate function expects exactly one argument");
                const auto &arg = *info.args[0];
                // ...
                auto _arg = env.compile(arg);
                Expr<T, L> _new_val = convert<Expr<T, L>>(_arg);
                if (_new_val.can_be_null()) {
                    // ...
                    auto _new_val_pred = // ...
                    auto [new_val, new_val_is_null_] = _new_val_pred.split();
                    const Var<Bool<L>> new_val_is_null(new_val_is_null_);
                    // ...
                    sum += Select(new_val_is_null,
                    // ...
                } else {
                    auto _new_val_pred = // ...
                    sum += _new_val_pred.insist_not_null();
                }
                // ...
                []<typename>() { M_unreachable("invalid type for given number of SIMD lanes"); }
                // ...
                auto &n = as<const Numeric>(*info.entry.type);
                switch (n.kind) {
                    case Numeric::N_Int:
                    case Numeric::N_Decimal:
                        switch (n.size()) {
                            // ...
                            case 8:  sum.template operator()<int8_t >(); break;
                            case 16: sum.template operator()<int16_t>(); break;
                            case 32: sum.template operator()<int32_t>(); break;
                            case 64: sum.template operator()<int64_t>(); break;
                        }
                        break;
                    case Numeric::N_Float:
                        if (n.size() == 32)
                            sum.template operator()<float>();
                        else
                            sum.template operator()<double>();
                }
                break;
            }
            case m::Function::FN_COUNT: {
                M_insist(info.args.size() <= 1, "COUNT aggregate function expects at most one argument");
                M_insist(info.entry.type->is_integral() and info.entry.type->size() == 64);
                // ...
                auto &count = *M_notnull(std::get_if<Var<I64<L>>>(&agg_values[idx]));
                // ...
                if (info.args.empty()) {
                    count += pred ? pred->template to<int64_t>() : I64<L>(1);
                } else {
                    auto _new_val = env.compile(*info.args[0]);
                    // ...
                    I64<L> inc = pred ? (not_null<L>(_new_val) and *pred).template to<int64_t>()
                                      : not_null<L>(_new_val).template to<int64_t>();
                    // ...
                    I64<L> inc = pred ? pred->template to<int64_t>() : I64<L>(1);
                    // ...
                }
                break;
            }
        }
    }
    for (std::size_t idx = 0; idx < aggregates.size(); ++idx) {
        auto &info = aggregates[idx];
        if (info.fnid == m::Function::FN_AVG) {
            M_insist(info.args.size() == 1, "AVG aggregate function expects exactly one argument");
            const auto &arg = *info.args[0];
            M_insist(info.entry.type->is_double());
            // ...
            auto it = avg_aggregates.find(info.entry.id);
            M_insist(it != avg_aggregates.end());
            const auto &avg_info = it->second;
            M_insist(avg_info.compute_running_avg,
                     "AVG aggregate may only occur for running average computations");
            // ...
            std::get_if<std::pair<Var<Double<L>>, Var<Bool<L>>>>(&agg_values[idx])
            // ...
            auto running_count_idx = std::distance(
                aggregates.cbegin(),
                std::find_if(aggregates.cbegin(), aggregates.cend(), [&avg_info](const auto &info){
                    return info.entry.id == avg_info.running_count;
                })
            );
            M_insist(0 <= running_count_idx and running_count_idx < aggregates.size());
            Double<L> running_count = [&](){
                auto &running_count =
                    *M_notnull(std::get_if<Var<I64<L>>>(&agg_values[running_count_idx]));
                if constexpr (L != 2) {
                    return running_count.template to<double>();
                } else {
                    M_unreachable("conversion from `I64<2>` to `Double<2>` not supported");
                    return Double<L>(0.0);
                }
            }();
            // ...
            auto _arg = env.compile(arg);
            _Double<L> _new_val = convert<_Double<L>>(_arg);
            if (_new_val.can_be_null()) {
                // ...
                auto _new_val_pred = pred ? Select(*pred, _new_val, _Double<L>::Null()) : _new_val;
                auto [new_val, new_val_is_null_] = _new_val_pred.split();
                const Var<Bool<L>> new_val_is_null(new_val_is_null_);
                // ...
                auto delta_absolute = new_val - avg;
                auto delta_relative = delta_absolute / running_count;
                // ...
                avg += Select(new_val_is_null,
                // ...
            } else {
                auto _new_val_pred = pred ? Select(*pred, _new_val, avg) : _new_val;
                auto delta_absolute = _new_val_pred.insist_not_null() - avg;
                auto delta_relative = delta_absolute / running_count;
                // ...
                avg += delta_relative;
            }
        }
    }
// ...
case 1:  execute_pipeline.operator()<1>();  break;
case 2:  execute_pipeline.operator()<2>();  break;
case 4:  execute_pipeline.operator()<4>();  break;
case 8:  execute_pipeline.operator()<8>();  break;
case 16: execute_pipeline.operator()<16>(); break;
case 32: execute_pipeline.operator()<32>(); break;
auto execute_teardown = [&]<std::size_t L>(){
    // ...
    "number of SIMD lanes in teardown callback must match the one in setup callback");
    // ...
    auto agg_values = static_cast<agg_t*>(_agg_values);
    auto agg_value_backups = static_cast<agg_backup_t*>(_agg_value_backups);
    // ...
    for (std::size_t idx = 0; idx < aggregates.size(); ++idx) {
        auto &info = aggregates[idx];
        bool is_min = false;
        switch (info.fnid) {
            // ...
            case m::Function::FN_MIN:
                is_min = true;
            case m::Function::FN_MAX: {
                auto min_max = [&]<typename T>() {
                    auto &[min_max_backup, is_null_backup] = *M_notnull((
                        // ...
                        >(&agg_value_backups[idx])
                    ));
                    std::tie(min_max_backup, is_null_backup) = *M_notnull((
                        // ...
                };
                auto &n = as<const Numeric>(*info.entry.type);
                switch (n.kind) {
                    case Numeric::N_Int:
                    case Numeric::N_Decimal:
                        switch (n.size()) {
                            // ...
                            case 8:  min_max.template operator()<int8_t >(); break;
                            case 16: min_max.template operator()<int16_t>(); break;
                            case 32: min_max.template operator()<int32_t>(); break;
                            case 64: min_max.template operator()<int64_t>(); break;
                        }
                        break;
                    case Numeric::N_Float:
                        if (n.size() == 32)
                            min_max.template operator()<float>();
                        else
                            min_max.template operator()<double>();
                }
                break;
            }
            case m::Function::FN_AVG: {
                auto &[avg_backup, is_null_backup] = *M_notnull((
                    std::get_if<std::pair<Global<Double<L>>, Global<Bool<L>>>>(&agg_value_backups[idx])
                ));
                std::tie(avg_backup, is_null_backup) = *M_notnull((
                    std::get_if<std::pair<Var<Double<L>>, Var<Bool<L>>>>(&agg_values[idx])
                ));
                break;
            }
            case m::Function::FN_SUM: {
                M_insist(info.args.size() == 1, "SUM aggregate function expects exactly one argument");
                const auto &arg = *info.args[0];
                auto sum = [&]<typename T>() {
                    auto &[sum_backup, is_null_backup] = *M_notnull((
                        // ...
                        >(&agg_value_backups[idx])
                    ));
                    std::tie(sum_backup, is_null_backup) = *M_notnull((
                        // ...
                };
                auto &n = as<const Numeric>(*info.entry.type);
                switch (n.kind) {
                    case Numeric::N_Int:
                    case Numeric::N_Decimal:
                        switch (n.size()) {
                            // ...
                            case 8:  sum.template operator()<int8_t >(); break;
                            case 16: sum.template operator()<int16_t>(); break;
                            case 32: sum.template operator()<int32_t>(); break;
                            case 64: sum.template operator()<int64_t>(); break;
                        }
                        break;
                    case Numeric::N_Float:
                        if (n.size() == 32)
                            sum.template operator()<float>();
                        else
                            sum.template operator()<double>();
                }
                break;
            }
            case m::Function::FN_COUNT: {
                auto &count_backup = *M_notnull(std::get_if<Global<I64<L>>>(&agg_value_backups[idx]));
                count_backup = *M_notnull(std::get_if<Var<I64<L>>>(&agg_values[idx]));
                break;
            }
        }
    }
    // ...
    for (std::size_t idx = 0; idx < aggregates.size(); ++idx) {
        agg_values[idx].~agg_t();
        agg_value_backups[idx].~agg_backup_t();
    }
    // ...
    delete[] agg_values;
    delete[] agg_value_backups;
};
// ...
case 1:  execute_teardown.operator()<1>();  break;
case 2:  execute_teardown.operator()<2>();  break;
case 4:  execute_teardown.operator()<4>();  break;
case 8:  execute_teardown.operator()<8>();  break;
case 16: execute_teardown.operator()<16>(); break;
case 32: execute_teardown.operator()<32>(); break;
aggregation_child_pipeline();
// ...
for (auto &fn : finalize_aggregates)
// ...
for (auto &e : M.aggregation.schema().deduplicate()) {
    if (auto it = avg_aggregates.find(e.id);
        it != avg_aggregates.end() and not it->second.compute_running_avg)
    {
        auto &avg_info = it->second;
        auto sum = results.get(avg_info.sum);
        auto count = results.get<_I64x1>(avg_info.running_count).insist_not_null().to<double>();
        auto avg = convert<_Doublex1>(sum) / count;
        // ...
    } else {
        // ...
        if (value.can_be_null()) {
            // ...
        }
        // ...
        [](auto) -> void { M_unreachable("only scalar and non-string values must occur"); },
        [](std::monostate) -> void { M_unreachable("invalid reference"); },
        }, results.get(e.id));
    }
}
template<bool CmpPredicated>
// ...
template<bool CmpPredicated>
// ...
for (auto &o : M.sorting.order_by()) {
    // ...
    if (orders.find(id) == orders.cend())
    // ...
}
// ...
template<bool CmpPredicated>
// ...
M_insist(bool(M.materializing_factory),
         "`wasm::Quicksort` must have a factory for the materialized child");
const auto buffer_schema = M.child->get_matched_root().schema().drop_constants().deduplicate();
const auto sorting_schema = M.sorting.schema().drop_constants().deduplicate();
// ...
buffer_schema, *M.materializing_factory, false, 0, std::move(setup), std::move(pipeline), std::move(teardown)
// ...
FUNCTION(sorting_child_pipeline, void(void))
// ...
sorting_child_pipeline();
// ...
quicksort<CmpPredicated>(buffer, M.sorting.order_by());
// ...
const std::tuple<const SortingOperator*> &partial_inner_nodes)
// ...
for (auto &o : std::get<0>(partial_inner_nodes)->order_by()) {
    // ...
    if (orders.find(id) == orders.cend())
    // ...
}
// ...
M.child->execute(std::move(setup), std::move(pipeline), std::move(teardown));
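`Quicksort` first materializes the child into a buffer via its own pipeline, then sorts the buffer with the compiled comparator and resumes the parent pipeline over it; `NoOpSorting` simply forwards the child because it is only matched when the input already satisfies the requested order. A loose host-side analogue of materialize-then-sort (all names here are illustrative, not the engine's):

    #include <algorithm>
    #include <string>
    #include <tuple>
    #include <vector>

    struct Row { std::string grp; int key; };

    // Materialize-then-sort: run the child pipeline once to fill a buffer,
    // sort the buffer, then feed the sorted rows to the parent pipeline.
    template<typename Produce, typename Consume>
    void sort_operator(Produce &&produce_child, Consume &&consume_sorted)
    {
        std::vector<Row> buffer;
        produce_child([&](Row r) { buffer.push_back(std::move(r)); });
        std::sort(buffer.begin(), buffer.end(), [](const Row &a, const Row &b) {
            return std::tie(a.grp, a.key) < std::tie(b.grp, b.key);
        });
        for (const Row &r : buffer)
            consume_sorted(r);
    }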
template<bool Predicated>
// ...
template<bool Predicated>
// ...
std::vector<std::reference_wrapper<const ConditionSet>> &&post_cond_children)
// ...
M_insist(post_cond_children.size() >= 2);
// ...
ConditionSet post_cond(post_cond_children.back().get());
// ...
template<bool Predicated>
// ...
for (auto &child : M.children)
    cost *= child->get_matched_root().info().estimated_cardinality;
// ...
template<bool Predicated>
// ...
const auto num_left_children = M.children.size() - 1;
// ...
std::vector<Schema> schemas;
schemas.reserve(num_left_children);
std::vector<GlobalBuffer> buffers;
buffers.reserve(num_left_children);
// ...
for (std::size_t i = 0; i < num_left_children; ++i) {
    FUNCTION(nested_loop_join_child_pipeline, void(void))
    // ...
    M_insist(bool(M.materializing_factories_[i]),
             "`wasm::NestedLoopsJoin` must have a factory for each materialized child");
    const auto &schema = schemas.emplace_back(
        M.children[i]->get_matched_root().schema().drop_constants().deduplicate()
    );
    // ...
    buffers.emplace_back(
        // ...
        *M.materializing_factories_[i],
        // ...
        [&, pipeline=std::move(pipeline)](){
            if constexpr (Predicated) {
                CodeGenContext::Get().env().add_predicate(M.join.predicate());
                // ...
            } else {
                M_insist(CodeGenContext::Get().num_simd_lanes() == 1, "invalid number of SIMD lanes");
                IF (CodeGenContext::Get().env().compile<_Boolx1>(M.join.predicate()).is_true_and_not_null()) {
                // ...
            }
        },
    // ...
    buffers.emplace_back(
        // ...
        *M.materializing_factories_[i],
        // ...
        [&](){ buffers.back().resume_pipeline_inline(); },
    // ...
    M.children[i]->execute(
        // ...
        [&](){ buffers.back().consume(); },
    // ...
    nested_loop_join_child_pipeline();
}
// ...
M.children.back()->execute(
    // ...
    [&](){ buffers.back().resume_pipeline_inline(); },
template<bool UniqueBuild, bool Predicated>
// ...
const std::tuple<const JoinOperator*, const Wildcard*, const Wildcard*> &partial_inner_nodes)
// ...
auto &join = *std::get<0>(partial_inner_nodes);
if (not join.predicate().is_equi())
// ...
if constexpr (UniqueBuild) {
    // ...
    auto &build = *std::get<1>(partial_inner_nodes);
    for (auto &clause : join.predicate()) {
        M_insist(clause.size() == 1, "invalid equi-predicate");
        auto &literal = clause[0];
        auto &binary = as<const BinaryExpr>(literal.expr());
        // ...
        (literal.negative() and binary.tok == TK_BANG_EQUAL), "invalid equi-predicate");
        M_insist(is<const Designator>(binary.lhs), "invalid equi-predicate");
        M_insist(is<const Designator>(binary.rhs), "invalid equi-predicate");
        // ...
        const auto &entry_build = build.schema().has(id_first) ? build.schema()[id_first].second
                                                               : build.schema()[id_second].second;
        // ...
        if (not entry_build.unique())
        // ...
    }
}
// ...
template<bool UniqueBuild, bool Predicated>
// ...
std::vector<std::reference_wrapper<const ConditionSet>> &&post_cond_children)
// ...
M_insist(post_cond_children.size() == 2);
// ...
template<bool UniqueBuild, bool Predicated>
// ...
return (M.build.id() == M.children[0]->get_matched_root().id() ? 1.0 : 2.0) + (UniqueBuild ? 0.0 : 0.1);
// ...
return M.build.id() == M.children[1]->get_matched_root().id() ? 1.0 : 2.0 + (UniqueBuild ? 0.0 : 0.1);
// ...
return 1.5 * M.build.info().estimated_cardinality +
       (UniqueBuild ? 1.0 : 1.1) * M.probe.info().estimated_cardinality;
template<bool UniqueBuild, bool Predicated>
// ...
const uint64_t PAYLOAD_SIZE_THRESHOLD_IN_BITS =
    M.use_in_place_values ? std::numeric_limits<uint64_t>::max() : 0;
// ...
M_insist(((M.join.schema() | M.join.predicate().get_required()) & M.build.schema()) == M.build.schema());
M_insist(M.build.schema().drop_constants() == M.build.schema());
const auto ht_schema = M.build.schema().deduplicate();
// ...
std::vector<Schema::Identifier> payload_ids;
uint64_t payload_size_in_bits = 0;
for (auto &e : ht_schema) {
    if (not contains(build_keys, e.id)) {
        payload_ids.push_back(e.id);
        payload_size_in_bits += e.type->size();
    }
}
// ...
std::unique_ptr<HashTable> ht;
std::vector<HashTable::index_t> build_key_indices;
for (auto &build_key : build_keys)
    build_key_indices.push_back(ht_schema[build_key].first);
if (M.use_open_addressing_hashing) {
    if (payload_size_in_bits < PAYLOAD_SIZE_THRESHOLD_IN_BITS)
        ht = std::make_unique<GlobalOpenAddressingInPlaceHashTable>(ht_schema, std::move(build_key_indices),
        // ...
    else
        ht = std::make_unique<GlobalOpenAddressingOutOfPlaceHashTable>(ht_schema, std::move(build_key_indices),
        // ...
    if (M.use_quadratic_probing)
        as<OpenAddressingHashTableBase>(*ht).set_probing_strategy<QuadraticProbing>();
    else
        as<OpenAddressingHashTableBase>(*ht).set_probing_strategy<LinearProbing>();
} else {
    ht = std::make_unique<GlobalChainedHashTable>(ht_schema, std::move(build_key_indices), initial_capacity);
}
// ...
FUNCTION(simple_hash_join_child_pipeline, void(void))
// ...
M.children[0]->execute(
    // ...
    ht->set_high_watermark(M.load_factor);
    // ...
    std::optional<Boolx1> build_key_not_null;
    for (auto &build_key : build_keys) {
        auto val = env.get(build_key);
        if (build_key_not_null)
            build_key_not_null.emplace(*build_key_not_null and not_null(val));
        else
            build_key_not_null.emplace(not_null(val));
    }
    M_insist(bool(build_key_not_null));
    IF (*build_key_not_null) {
        // ...
        std::vector<SQL_t> key;
        for (auto &build_key : build_keys)
            key.emplace_back(env.get(build_key));
        auto entry = ht->emplace(std::move(key));
        // ...
        for (auto &id : payload_ids) {
            // ...
            [](std::monostate) -> void { M_unreachable("invalid reference"); },
            }, entry.extract(id));
        }
    };
// ...
simple_hash_join_child_pipeline();
M.children[1]->execute(
    setup_t(std::move(setup), [&](){ ht->setup(); }),
    [&, pipeline=std::move(pipeline)](){
        // ...
        for (auto &e : ht_schema) {
            if (not entry.has(e.id)) {
                // ...
                M_insist(env.has(e.id), "build key must already be contained in the current environment");
                // ...
            }
            // ...
            if (value.can_be_null()) {
                // ...
            }
            // ... value.guarantees_terminating_nul()));
            // ...
            [](std::monostate) -> void { M_unreachable("invalid reference"); },
            }, entry.extract(e.id));
        }
        // ...
        std::vector<SQL_t> key;
        for (auto &probe_key : probe_keys)
            key.emplace_back(env.get(probe_key));
        if constexpr (UniqueBuild) {
            // ...
            for (auto build_it = build_keys.cbegin(), probe_it = probe_keys.cbegin(); build_it != build_keys.cend();
                 ++build_it, ++probe_it)
            {
                M_insist(probe_it != probe_keys.cend());
                if (not env.has(*build_it))
                    env.add(*build_it, env.get(*probe_it));
            }
            // ...
            auto p = ht->find(std::move(key));
            auto &entry = p.first;
            auto &found = p.second;
            // ...
            env.add_predicate(found);
            emit_tuple_and_resume_pipeline(std::move(entry));
            // ...
            emit_tuple_and_resume_pipeline(std::move(entry));
            // ...
        } else {
            ht->for_each_in_equal_range(std::move(key), std::move(emit_tuple_and_resume_pipeline), Predicated);
        }
    },
    teardown_t(std::move(teardown), [&](){ ht->teardown(); })
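SimpleHashJoin builds a hash table over the deduplicated build schema keyed on the equi-join keys, skipping build rows whose key contains a NULL (they can never match), then probes the table with each probe row. The same build-then-probe flow in ordinary C++, with `std::unordered_multimap` standing in for the engine's hash tables (all row types are illustrative):

    #include <functional>
    #include <optional>
    #include <string>
    #include <unordered_map>
    #include <vector>

    struct BuildRow { std::optional<int> key; std::string payload; };
    struct ProbeRow { std::optional<int> key; int value; };

    // Build-then-probe equi-join. Rows with a NULL key are skipped on both
    // sides, mirroring the `IF (*build_key_not_null)` guard above.
    void hash_join(const std::vector<BuildRow> &build,
                   const std::vector<ProbeRow> &probe,
                   const std::function<void(const BuildRow&, const ProbeRow&)> &emit)
    {
        std::unordered_multimap<int, const BuildRow*> ht;
        for (const auto &b : build)
            if (b.key)
                ht.emplace(*b.key, &b);

        for (const auto &p : probe) {
            if (not p.key)
                continue;
            auto [lo, hi] = ht.equal_range(*p.key);
            for (auto it = lo; it != hi; ++it)
                emit(*it->second, p);  // one output tuple per match
        }
    }

The `UniqueBuild` specialization corresponds to replacing the multimap's `equal_range` scan with a single `find`, which is exactly why the matcher verifies uniqueness of the build keys beforehand.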
template<bool SortLeft, bool SortRight, bool Predicated, bool CmpPredicated>
// ...
std::size_t child_idx,
const std::tuple<const JoinOperator*, const Wildcard*, const Wildcard*> &partial_inner_nodes)
// ...
auto &join = *std::get<0>(partial_inner_nodes);
if (not join.predicate().is_equi())
// ...
auto parent = std::get<1>(partial_inner_nodes);
auto child = std::get<2>(partial_inner_nodes);
// ...
std::vector<Schema::Identifier> keys_parent, keys_child;
for (auto &clause : join.predicate()) {
    M_insist(clause.size() == 1, "invalid equi-predicate");
    auto &literal = clause[0];
    auto &binary = as<const BinaryExpr>(literal.expr());
    // ...
    (literal.negative() and binary.tok == TK_BANG_EQUAL), "invalid equi-predicate");
    M_insist(is<const Designator>(binary.lhs), "invalid equi-predicate");
    M_insist(is<const Designator>(binary.rhs), "invalid equi-predicate");
    // ...
    const auto &[entry_parent, entry_child] = parent->schema().has(id_first)
        ? std::make_pair(parent->schema()[id_first].second, child_idx == 1 ? child->schema()[id_second].second : std::move(dummy))
        : std::make_pair(parent->schema()[id_second].second, child_idx == 1 ? child->schema()[id_first].second : std::move(dummy));
    keys_parent.push_back(entry_parent.id);
    keys_child.push_back(entry_child.id);
    // ...
    if (not entry_parent.unique())
    // ...
}
M_insist(keys_parent.size() == keys_child.size(), "number of found IDs differ");
M_insist(not keys_parent.empty(), "must find at least one ID");
// ...
if constexpr (not SortLeft or not SortRight) {
    // ...
    if (not SortLeft and child_idx == 0) {
        for (auto &key_parent : keys_parent) {
            if (orders.find(key_parent) == orders.cend())
            // ...
        }
    } else if (not SortRight and child_idx == 1) {
        for (auto &key_child : keys_child) {
            if (orders.find(key_child) == orders.cend())
            // ...
        }
    }
}
template<bool SortLeft, bool SortRight, bool Predicated, bool CmpPredicated>
// ...
std::vector<std::reference_wrapper<const ConditionSet>> &&post_cond_children)
// ...
M_insist(post_cond_children.size() == 2);
// ...
if constexpr (not SortLeft) {
    Sortedness sorting_left(post_cond_children[0].get().get_condition<Sortedness>());
    // ...
}
if constexpr (not SortRight) {
    Sortedness sorting_right(post_cond_children[1].get().get_condition<Sortedness>());
    // ...
}
if constexpr (SortLeft or SortRight) {
    // ...
    if constexpr (SortLeft) {
        for (auto &key_parent : keys_parent) {
            if (orders.find(key_parent) == orders.cend())
            // ...
        }
    }
    if constexpr (SortRight) {
        for (auto &key_child : keys_child) {
            if (orders.find(key_child) == orders.cend())
            // ...
        }
    }
}
// ...
template<bool SortLeft, bool SortRight, bool Predicated, bool CmpPredicated>
// ...
const double card_left = M.parent.info().estimated_cardinality;
const double card_right = M.child.info().estimated_cardinality;
// ...
double cost = card_left + card_right;
if constexpr (SortLeft)
    cost += std::log2(card_left) * card_left;
if constexpr (SortRight)
    cost += std::log2(card_right) * card_right;
template<bool SortLeft, bool SortRight, bool Predicated, bool CmpPredicated>
// ...
const bool needs_buffer_parent = not is<const ScanOperator>(M.parent) or SortLeft;
const bool needs_buffer_child  = not is<const ScanOperator>(M.child)  or SortRight;
// ...
M_insist(bool(M.left_materializing_factory),
         "`wasm::SortMergeJoin` must have a factory for the materialized left child");
M_insist(bool(M.right_materializing_factory),
         "`wasm::SortMergeJoin` must have a factory for the materialized right child");
const auto schema_parent = M.parent.schema().drop_constants().deduplicate();
const auto schema_child  = M.child.schema().drop_constants().deduplicate();
std::optional<GlobalBuffer> buffer_parent, buffer_child;
if (needs_buffer_parent)
    buffer_parent.emplace(schema_parent, *M.left_materializing_factory);
if (needs_buffer_child)
    buffer_child.emplace(schema_child, *M.right_materializing_factory);
// ...
if (needs_buffer_parent) {
    FUNCTION(sort_merge_join_parent_pipeline, void(void))
    // ...
    M.children[0]->execute(
        // ...
        [&](){ buffer_parent->consume(); },
        // ...
    sort_merge_join_parent_pipeline();
}
if (needs_buffer_child) {
    FUNCTION(sort_merge_join_child_pipeline, void(void))
    // ...
    M.children[1]->execute(
        // ...
        [&](){ buffer_child->consume(); },
        // ...
    sort_merge_join_child_pipeline();
}
// ...
std::vector<SortingOperator::order_type> order_parent, order_child;
for (auto &clause : M.join.predicate()) {
    M_insist(clause.size() == 1, "invalid equi-predicate");
    auto &literal = clause[0];
    auto &binary = as<const BinaryExpr>(literal.expr());
    // ...
    (literal.negative() and binary.tok == TK_BANG_EQUAL), "invalid equi-predicate");
    M_insist(is<const Designator>(binary.lhs), "invalid equi-predicate");
    M_insist(is<const Designator>(binary.rhs), "invalid equi-predicate");
    // ...
    order_parent.emplace_back(*expr_parent, true);
    order_child.emplace_back(*expr_child, true);
}
M_insist(order_parent.size() == order_child.size(), "number of found IDs differ");
M_insist(not order_parent.empty(), "must find at least one ID");
// ...
if constexpr (SortLeft)
    quicksort<CmpPredicated>(*buffer_parent, order_parent);
if constexpr (SortRight)
    quicksort<CmpPredicated>(*buffer_child, order_child);
auto child_smaller_equal = [&]() -> Boolx1 {
    std::optional<Boolx1> child_smaller_equal_;
    for (std::size_t i = 0; i < order_child.size(); ++i) {
        auto &des_parent = as<const Designator>(order_parent[i].first);
        auto &des_child  = as<const Designator>(order_child[i].first);
        // ...
        auto cpy_parent = std::make_unique<Designator>(des_parent.tok, des_parent.table_name, des_parent.attr_name,
                                                       des_parent.type(), des_parent.target());
        auto cpy_child  = std::make_unique<Designator>(des_child.tok, des_child.table_name, des_child.attr_name,
                                                       des_child.type(), des_child.target());
        BinaryExpr expr(std::move(leq), std::move(cpy_child), std::move(cpy_parent));
        // ...
        Boolx1 cmp = env.compile<_Boolx1>(expr).is_true_and_not_null();
        if (child_smaller_equal_)
            child_smaller_equal_.emplace(*child_smaller_equal_ and (is_null(child) or cmp));
        else
            child_smaller_equal_.emplace(is_null(child) or cmp);
    }
    M_insist(bool(child_smaller_equal_));
    return *child_smaller_equal_;
};
// ...
static Schema empty_schema;
// ...
auto [inits_parent, loads_parent, _jumps_parent] = [&](){
    if (needs_buffer_parent) {
        // ...
        buffer_parent->layout(), 1, buffer_parent->schema(), tuple_id_parent);
    } else {
        auto &scan = as<const ScanOperator>(M.parent);
        // ...
        scan.store().table().layout(), 1, scan.store().table().schema(scan.alias()),
        // ...
    }
}();
auto [inits_child, loads_child, _jumps_child] = [&](){
    if (needs_buffer_child) {
        // ...
        buffer_child->layout(), 1, buffer_child->schema(), tuple_id_child);
    } else {
        auto &scan = as<const ScanOperator>(M.child);
        // ...
        scan.store().table().layout(), 1, scan.store().table().schema(scan.alias()),
        // ...
    }
}();
Block jumps_parent(std::move(_jumps_parent)), jumps_child(std::move(_jumps_child));
// ...
inits_parent.attach_to_current();
inits_child.attach_to_current();
U32x1 size_parent = needs_buffer_parent ? buffer_parent->size()
                                        : get_num_rows(as<const ScanOperator>(M.parent).store().table().name());
U32x1 size_child  = needs_buffer_child  ? buffer_child->size()
                                        : get_num_rows(as<const ScanOperator>(M.child).store().table().name());
WHILE (tuple_id_parent < size_parent and tuple_id_child < size_child) {
    loads_parent.attach_to_current();
    loads_child.attach_to_current();
    // ...
    env.add_predicate(M.join.predicate());
    // ...
    IF (env.compile<_Boolx1>(M.join.predicate()).is_true_and_not_null()) {
        // ...
    }
    IF (child_smaller_equal()) {
        // ...
    }
    jumps_parent.attach_to_current();
}
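Once both sides are sorted on the join keys, the merge loop advances whichever cursor holds the smaller key and emits a tuple when the predicate holds; `child_smaller_equal()` above compiles exactly that less-or-equal comparison, with NULLs treated as smaller than any value. A scalar sketch over two sorted vectors, assuming unique keys on the left side (the property the matcher verifies):

    #include <cstddef>
    #include <functional>
    #include <vector>

    struct LeftRow  { int key; };
    struct RightRow { int key; };

    // Merge step of a sort-merge join over inputs sorted ascending on `key`.
    void merge_join(const std::vector<LeftRow> &left, const std::vector<RightRow> &right,
                    const std::function<void(const LeftRow&, const RightRow&)> &emit)
    {
        std::size_t l = 0, r = 0;
        while (l < left.size() and r < right.size()) {
            if (left[l].key == right[r].key)
                emit(left[l], right[r]);
            if (right[r].key <= left[l].key)
                ++r;    // right key is smaller or equal: advance right cursor
            else
                ++l;    // left key is strictly smaller: advance left cursor
        }
    }

Because the left keys are unique, advancing the right cursor on equality suffices to emit every duplicate match on the right before the left cursor moves on.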
std::optional<Block> teardown_block;
std::optional<BlockUser> use_teardown;
// ...
std::optional<Var<U32x1>> counter;
// ...
setup_t(std::move(setup), [&](){
    counter.emplace(counter_backup);
    teardown_block.emplace("limit.teardown", true);
    use_teardown.emplace(*teardown_block);
}),
[&, pipeline=std::move(pipeline)](){
    // ...
    const uint32_t limit = M.limit.offset() + M.limit.limit();
    // ...
    IF (*counter >= limit) {
        GOTO(*teardown_block);
    };
    // ...
    if (M.limit.offset()) {
        IF (*counter >= uint32_t(M.limit.offset())) {
            Wasm_insist(*counter < limit, "counter must not exceed limit");
            // ...
        };
    } else {
        Wasm_insist(*counter < limit, "counter must not exceed limit");
        // ...
    }
    // ...
},
// ...
use_teardown.reset();
teardown_block.reset();
// ...
counter_backup = *counter;
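The Limit operator keeps a tuple counter, suppresses the first `offset()` tuples, forwards the next `limit()` ones, and jumps to a teardown block as soon as the bound is reached so the producing pipeline stops early. The same control flow in plain C++:

    #include <cstdint>
    #include <iostream>
    #include <vector>

    // LIMIT <limit> OFFSET <offset>: forward tuples with index in
    // [offset, offset + limit); stop consuming input once the bound is hit.
    template<typename T, typename Consume>
    void limit_offset(const std::vector<T> &in, uint32_t offset, uint32_t limit,
                      Consume &&consume)
    {
        uint32_t counter = 0;
        const uint32_t bound = offset + limit;
        for (const T &t : in) {
            if (counter >= bound)
                break;              // analogous to GOTO(*teardown_block)
            if (counter >= offset)
                consume(t);
            ++counter;
        }
    }

    int main()
    {
        std::vector<int> v{ 10, 11, 12, 13, 14 };
        limit_offset(v, 1, 2, [](int x){ std::cout << x << ' '; });  // 11 12
    }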
// ...
std::size_t child_idx,
const std::tuple<const GroupingOperator*, const JoinOperator*, const Wildcard*, const Wildcard*>
    &partial_inner_nodes)
// ...
auto &grouping = *std::get<0>(partial_inner_nodes);
for (auto &fn_expr : grouping.aggregates()) {
    M_insist(fn_expr.get().args.size() <= 1);
    if (fn_expr.get().args.size() == 1 and not is<const Designator>(fn_expr.get().args[0]))
    // ...
}
// ...
auto &join = *std::get<1>(partial_inner_nodes);
if (not join.predicate().is_equi())
// ...
if (child_idx == 0) {
    // ...
    auto &build = *std::get<2>(partial_inner_nodes);
    // ...
    const auto num_grouping_keys = grouping.group_by().size();
    if (num_grouping_keys != build_keys.size())
    // ...
    for (std::size_t i = 0; i < num_grouping_keys; ++i) {
        // ...
        if (not contains(build_keys, grouping_key))
        // ...
    }
}
// ...
return 1.5 * M.build.info().estimated_cardinality + 1.0 * M.probe.info().estimated_cardinality +
       1.0 * M.join.info().estimated_cardinality;
const uint64_t AGGREGATES_SIZE_THRESHOLD_IN_BITS =
    M.use_in_place_values ? std::numeric_limits<uint64_t>::max() : 0;
// ...
const auto num_keys = M.grouping.group_by().size();
// ...
for (std::size_t i = 0; i < num_keys; ++i) {
    auto &e = M.grouping.schema()[i];
    ht_schema.add(e.id, e.type, e.constraints);
}
// ...
const auto &aggregates = aggregates_info.first;
const auto &avg_aggregates = aggregates_info.second;
bool needs_build_counter = false;
uint64_t aggregates_size_in_bits = 0;
for (auto &info : aggregates) {
    ht_schema.add(info.entry);
    aggregates_size_in_bits += info.entry.type->size();
    // ...
    if (info.fnid == m::Function::FN_COUNT or info.fnid == m::Function::FN_SUM) {
        if (not info.args.empty()) {
            M_insist(info.args.size() == 1, "aggregate functions expect at most one argument");
            auto &des = as<const Designator>(*info.args[0]);
            // ...
            if (M.probe.schema().has(arg))
                needs_build_counter = true;
        }
    }
}
if (needs_build_counter) {
    // ...
    aggregates_size_in_bits += 64;
}
// ...
aggregates_size_in_bits += 64;
// ...
M_insist(build_keys.size() == num_keys);
// ...
std::unique_ptr<HashTable> ht;
std::vector<HashTable::index_t> key_indices(num_keys);
std::iota(key_indices.begin(), key_indices.end(), 0);
if (M.use_open_addressing_hashing) {
    if (aggregates_size_in_bits < AGGREGATES_SIZE_THRESHOLD_IN_BITS)
        ht = std::make_unique<GlobalOpenAddressingInPlaceHashTable>(ht_schema, std::move(key_indices),
        // ...
    else
        ht = std::make_unique<GlobalOpenAddressingOutOfPlaceHashTable>(ht_schema, std::move(key_indices),
        // ...
    if (M.use_quadratic_probing)
        as<OpenAddressingHashTableBase>(*ht).set_probing_strategy<QuadraticProbing>();
    else
        as<OpenAddressingHashTableBase>(*ht).set_probing_strategy<LinearProbing>();
} else {
    ht = std::make_unique<GlobalChainedHashTable>(ht_schema, std::move(key_indices), initial_capacity);
}
// ...
std::optional<HashTable::entry_t> dummy;
bool build_phase) -> std::tuple<Block, Block, Block>
{
    Block init_aggs("hash_based_group_join.init_aggs", false),
          update_aggs("hash_based_group_join.update_aggs", false),
          update_avg_aggs("hash_based_group_join.update_avg_aggs", false);
    for (auto &info : aggregates) {
        bool is_min = false;
        switch (info.fnid) {
            // ...
            case m::Function::FN_MIN:
                is_min = true;
            case m::Function::FN_MAX: {
                M_insist(info.args.size() == 1, "MIN and MAX aggregate functions expect exactly one argument");
                auto &arg = as<const Designator>(*info.args[0]);
                // ...
                arg.attr_name.text.assert_not_none()));
                // ...
                requires (not (std::same_as<_T, _Boolx1> or std::same_as<_T, NChar>)) {
                    using type = typename _T::type;
                    // ...
                    auto neutral = is_min ? T(std::numeric_limits<type>::max())
                                          : T(std::numeric_limits<type>::lowest());
                    // ...
                    auto _arg = env.compile(arg);
                    auto [val_, is_null] = convert<_T>(_arg).split();
                    // ...
                    r.clone().set_value(neutral);
                    if (info.entry.nullable())
                        r.clone().set_null_bit(Boolx1(true));
                    // ...
                    r.clone().set_value(val);
                    if (info.entry.nullable())
                        r.clone().set_null_bit(Boolx1(false));
                    // ...
                    r.clone().set_value(neutral);
                    if (info.entry.nullable())
                        r.clone().set_null_bit(Boolx1(true));
                    // ...
                    auto _arg = env.compile(arg);
                    _T _new_val = convert<_T>(_arg);
                    if (_new_val.can_be_null()) {
                        auto [new_val_, new_val_is_null_] = _new_val.split();
                        auto [old_min_max_, old_min_max_is_null] = _T(r.clone()).split();
                        const Var<Boolx1> new_val_is_null(new_val_is_null_);
                        // ...
                        auto chosen_r = Select(new_val_is_null, dummy->extract<_T>(info.entry.id), r.clone());
                        if constexpr (std::floating_point<type>) {
                            // ...
                            is_min ? min(old_min_max_, new_val_)
                                   : max(old_min_max_, new_val_)
                            // ...
                        } else {
                            const Var<T> new_val(new_val_),
                                         old_min_max(old_min_max_);
                            auto cmp = is_min ? new_val < old_min_max : new_val > old_min_max;
                            // ...
                        }
                        // ...
                        old_min_max_is_null and new_val_is_null
                        // ...
                    } else {
                        auto new_val_ = _new_val.insist_not_null();
                        auto old_min_max_ = _T(r.clone()).insist_not_null();
                        if constexpr (std::floating_point<type>) {
                            // ...
                            is_min ? min(old_min_max_, new_val_)
                                   : max(old_min_max_, new_val_)
                            // ...
                        } else {
                            const Var<T> new_val(new_val_),
                                         old_min_max(old_min_max_);
                            auto cmp = is_min ? new_val < old_min_max : new_val > old_min_max;
                            // ...
                        }
                    }
                    // ...
                }
                // ...
                requires std::same_as<_T, _Boolx1> or std::same_as<_T, NChar> {
                // ...
                [](std::monostate) -> void { M_unreachable("invalid reference"); },
                }, entry.extract(info.entry.id));
                break;
            }
            case m::Function::FN_AVG: {
                auto it = avg_aggregates.find(info.entry.id);
                M_insist(it != avg_aggregates.end());
                const auto &avg_info = it->second;
                M_insist(avg_info.compute_running_avg,
                         "AVG aggregate may only occur for running average computations");
                M_insist(info.args.size() == 1, "AVG aggregate function expects exactly one argument");
                auto &arg = as<const Designator>(*info.args[0]);
                // ...
                arg.attr_name.text.assert_not_none()));
                // ...
                auto r = entry.extract<_Doublex1>(info.entry.id);
                // ...
                auto _arg = env.compile(arg);
                auto [val_, is_null] = convert<_Doublex1>(_arg).split();
                // ...
                r.clone().set_value(Doublex1(0.0));
                if (info.entry.nullable())
                    r.clone().set_null_bit(Boolx1(true));
                // ...
                r.clone().set_value(val);
                if (info.entry.nullable())
                    r.clone().set_null_bit(Boolx1(false));
                // ...
                r.clone().set_value(Doublex1(0.0));
                if (info.entry.nullable())
                    r.clone().set_null_bit(Boolx1(true));
                // ...
                auto _arg = env.compile(arg);
                _Doublex1 _new_val = convert<_Doublex1>(_arg);
                if (_new_val.can_be_null()) {
                    auto [new_val, new_val_is_null_] = _new_val.split();
                    auto [old_avg_, old_avg_is_null] = _Doublex1(r.clone()).split();
                    const Var<Boolx1> new_val_is_null(new_val_is_null_);
                    // ...
                    auto delta_absolute = new_val - old_avg;
                    auto running_count = _I64x1(entry.get<_I64x1>(avg_info.running_count)).insist_not_null();
                    auto delta_relative = delta_absolute / running_count.to<double>();
                    // ...
                    auto chosen_r = Select(new_val_is_null, dummy->extract<_Doublex1>(info.entry.id), r.clone());
                    // ...
                    old_avg + delta_relative
                    // ...
                    old_avg_is_null and new_val_is_null
                    // ...
                } else {
                    auto new_val = _new_val.insist_not_null();
                    auto old_avg_ = _Doublex1(r.clone()).insist_not_null();
                    // ...
                    auto delta_absolute = new_val - old_avg;
                    auto running_count = _I64x1(entry.get<_I64x1>(avg_info.running_count)).insist_not_null();
                    auto delta_relative = delta_absolute / running_count.to<double>();
                    // ...
                    old_avg + delta_relative
                    // ...
                }
                break;
            }
5680 case m::Function::FN_SUM: {
5681 M_insist(info.args.size() == 1,
"SUM aggregate function expects exactly one argument");
5682 auto &arg = as<const Designator>(*info.args[0]);
5684 arg.attr_name.text.assert_not_none()));
5688 requires (not (std::same_as<_T, _Boolx1> or std::same_as<_T, NChar>)) {
5689 using type =
typename _T::type;
5695 auto _arg = env.compile(arg);
5696 auto [val_,
is_null] = convert<_T>(_arg).split();
5699 r.clone().set_value(
T(type(0)));
5700 if (info.entry.nullable())
5701 r.clone().set_null_bit(Boolx1(
true));
5703 r.clone().set_value(val);
5704 if (info.entry.nullable())
5705 r.clone().set_null_bit(Boolx1(
false));
5708 r.clone().set_value(
T(type(0)));
5709 if (info.entry.nullable())
5710 r.clone().set_null_bit(Boolx1(
true));
5719 auto _arg = env.compile(arg);
5720 _T _new_val = convert<_T>(_arg);
5721 if (_new_val.can_be_null()) {
5722 auto [new_val, new_val_is_null_] = _new_val.split();
5723 auto [old_sum, old_sum_is_null] = _T(r.clone()).split();
5724 const Var<Boolx1> new_val_is_null(new_val_is_null_);
5726 auto chosen_r =
Select(new_val_is_null, dummy->extract<_T>(info.entry.id), r.clone());
5731 old_sum_is_null
and new_val_is_null
5734 auto new_val = _new_val.insist_not_null();
5735 auto old_sum = _T(r.clone()).insist_not_null();
5744 requires std::same_as<_T,_Boolx1> or std::same_as<_T, NChar> {
5747 [](std::monostate) ->
void {
M_unreachable(
"invalid reference"); },
5748 }, entry.
extract(info.entry.id));
case m::Function::FN_COUNT: {
    M_insist(info.args.size() <= 1, "COUNT aggregate function expects at most one argument");

    auto r = entry.get<_I64x1>(info.entry.id); // use `get` (not `extract`) s.t. the entry remains accessible

    if (info.args.empty()) { /* COUNT(*) */
        if (not build_phase) {
            break; // nothing to do: COUNT(*) is finalized from the counters at emission
        }
        BLOCK_OPEN(init_aggs) {
            r.clone() = _I64x1(1); // initialize with 1 for the first build tuple of this group
        }
        BLOCK_OPEN(update_aggs) {
            auto old_count = _I64x1(r.clone()).insist_not_null();
            r.clone() = _I64x1(old_count + int64_t(1)); // count further build tuples
        }
    } else { /* COUNT(argument) */
        auto &arg = as<const Designator>(*info.args[0]);
        Schema::Identifier arg_id(arg.table_name.text, arg.attr_name.text.assert_not_none());

        if (build_phase) {
            BLOCK_OPEN(init_aggs) {
                if (schema.has(arg_id)) {
                    auto _arg = env.compile(arg);
                    I64x1 new_val_not_null = not_null(_arg).to<int64_t>(); // 1 iff the argument is not NULL
                    r.clone() = _I64x1(new_val_not_null);
                } else {
                    r.clone() = _I64x1(0); // initialize with neutral element 0
                }
            }
        }
        if (schema.has(arg_id)) {
            BLOCK_OPEN(update_aggs) {
                auto _arg = env.compile(arg);
                I64x1 new_val_not_null = not_null(_arg).to<int64_t>(); // 1 iff the argument is not NULL
                auto old_count = _I64x1(r.clone()).insist_not_null();
                r.clone() = _I64x1(old_count + new_val_not_null); // count non-NULL values
            }
        }
    }
    break;
}
} // switch (info.fnid)
} // for each aggregate
return { std::move(init_aggs), std::move(update_aggs), std::move(update_avg_aggs) };
};
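/* Note on the Select(...) pattern used in the AVG and SUM cases above (exposition, not part of
 * this source): rather than branching on NULL, the generated code chooses *where* to write --
 * the real hash-table slot or the dummy entry -- so a NULL input updates the dummy and leaves
 * the real aggregate untouched.  Scalar sketch with hypothetical names: */
struct illustrative_slot { double value; bool is_null; };

static void illustrate_sum_update(illustrative_slot &real, illustrative_slot &dummy,
                                  double x, bool x_is_null)
{
    illustrative_slot &chosen = x_is_null ? dummy : real; // mirrors Select(new_val_is_null, dummy->…, r)
    chosen.value = real.value + x;                        // a NULL input lands in the dummy slot
    real.is_null = real.is_null and x_is_null;            // the aggregate is NULL iff all inputs are NULL
}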
/* Build phase: materialize the build child into the hash table and initialize the aggregates. */
FUNCTION(hash_based_group_join_build_child_pipeline, void(void))
{
    auto S = CodeGenContext::Get().scoped_environment(); // create scoped environment for this function

    M.children[0]->execute(
        /* setup=    */ setup_t::Make_Without_Parent([&](){
            ht->setup();
            ht->set_high_watermark(M.load_factor);
            dummy.emplace(ht->dummy_entry()); // dummy entry used to ignore NULL inputs
        }),
        /* pipeline= */ [&](){
            const auto &env = CodeGenContext::Get().env();

            /* Insert into the hash table only if the entire build key is not NULL. */
            std::optional<Boolx1> build_key_not_null;
            for (auto &build_key : build_keys) {
                auto val = env.get(build_key);
                if (build_key_not_null)
                    build_key_not_null.emplace(*build_key_not_null and not_null(val));
                else
                    build_key_not_null.emplace(not_null(val));
            }
            M_insist(bool(build_key_not_null));
            IF (*build_key_not_null) {
                /* Insert the key if it is not yet present. */
                std::vector<SQL_t> key;
                for (auto &build_key : build_keys)
                    key.emplace_back(env.get(build_key));
                auto [entry, inserted] = ht->try_emplace(std::move(key));

                /* Compile the aggregates for the build phase. */
                auto t = compile_aggregates(entry, env, M.build.schema(), /* build_phase= */ true);
                auto &init_aggs = std::get<0>(t);
                auto &update_aggs = std::get<1>(t);
                auto &update_avg_aggs = std::get<2>(t);

                /* Maintain the build counter, if required. */
                if (needs_build_counter) {
                    auto r = entry.extract<_I64x1>(C.pool("$build_counter"));
                    BLOCK_OPEN(init_aggs) {
                        r.clone() = _I64x1(1); // initialize the build counter with 1
                    }
                    BLOCK_OPEN(update_aggs) {
                        auto old_count = _I64x1(r.clone()).insist_not_null();
                        r.clone() = _I64x1(old_count + int64_t(1)); // increment the build counter
                    }
                }

                /* Initialize the probe counter with 0. */
                auto r = entry.extract<_I64x1>(C.pool("$probe_counter"));
                BLOCK_OPEN(init_aggs) {
                    r.clone() = _I64x1(0);
                }

                /* Initialize the aggregates on insertion; update them otherwise. */
                IF (inserted) {
                    init_aggs.attach_to_current();
                } ELSE {
                    update_aggs.attach_to_current();
                    update_avg_aggs.attach_to_current(); // attached last s.t. the running count is updated first
                };
            };
        },
        /* teardown= */ teardown_t::Make_Without_Parent([&](){ ht->teardown(); })
    );
}
hash_based_group_join_build_child_pipeline(); // emit the call to the build child pipeline
/* Probe phase: probe the hash table with the probe child and update the aggregates. */
FUNCTION(hash_based_group_join_probe_child_pipeline, void(void))
{
    auto S = CodeGenContext::Get().scoped_environment(); // create scoped environment for this function

    M.children[1]->execute(
        /* setup=    */ setup_t::Make_Without_Parent([&](){
            ht->setup();
            dummy.emplace(ht->dummy_entry()); // dummy entry used to ignore NULL inputs
        }),
        /* pipeline= */ [&](){
            const auto &env = CodeGenContext::Get().env();

            /* Probe the hash table with the probe key. */
            std::vector<SQL_t> key;
            for (auto &probe_key : probe_keys)
                key.emplace_back(env.get(probe_key));
            auto [entry, found] = ht->find(std::move(key));

            /* Compile the aggregates for the probe phase. */
            auto t = compile_aggregates(entry, env, M.probe.schema(), /* build_phase= */ false);
            auto &init_aggs = std::get<0>(t);
            auto &update_aggs = std::get<1>(t);
            auto &update_avg_aggs = std::get<2>(t);

            /* Increment the probe counter. */
            auto r = entry.extract<_I64x1>(C.pool("$probe_counter"));
            BLOCK_OPEN(update_aggs) {
                auto old_count = _I64x1(r.clone()).insist_not_null();
                r.clone() = _I64x1(old_count + int64_t(1));
            }

            /* Update the aggregates iff the key was found; new groups are never created here. */
            M_insist(init_aggs.empty(), "aggregates must be initialized in build phase");
            IF (found) {
                update_aggs.attach_to_current();
                update_avg_aggs.attach_to_current();
            };
        },
        /* teardown= */ teardown_t::Make_Without_Parent([&](){ ht->teardown(); })
    );
}
hash_based_group_join_probe_child_pipeline(); // emit the call to the probe child pipeline
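/* Taken together, the two child pipelines realize the two-phase group join (illustrative
 * pseudo-code, not the DSL):
 *
 *     for each build tuple t:                         -- build child pipeline
 *         if key(t) contains no NULL:
 *             (entry, inserted) = ht.try_emplace(key(t))
 *             if inserted: init_aggs(entry, t)  else: update_aggs(entry, t)
 *     for each probe tuple u:                         -- probe child pipeline
 *         (entry, found) = ht.find(key(u))
 *         if found: update_aggs(entry, u) and increment entry.$probe_counter
 *
 * Groups exist only for build keys; probe tuples never create new entries, which is why the
 * probe pipeline insists that `init_aggs` is empty. */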
/* Main pipeline: iterate over all groups in the hash table and emit the non-empty ones. */
setup_t(std::move(setup), [&](){ ht->setup(); })();
/* … for each hash table entry `entry`: … */
{
    /* Skip groups without any probe partner, i.e. groups whose probe counter is 0. */
    I64x1 probe_counter = _I64x1(entry.get<_I64x1>(C.pool("$probe_counter"))).insist_not_null();
    IF (probe_counter != int64_t(0)) {
        /* Compute the schema of the grouping keys. */
        Schema key_schema;
        for (std::size_t i = 0; i < num_keys; ++i) {
            auto &e = M.grouping.schema()[i];
            key_schema.add(e.id, e.type, e.constraints);
        }

        /* Add the computed group tuple to the current environment. */
        for (auto &e : M.grouping.schema().deduplicate()) {
            /* … (elided: use key_schema.find(e.id) to handle entries that are part of the key) … */

            if (auto it = avg_aggregates.find(e.id);
                it != avg_aggregates.end() and not it->second.compute_running_avg)
            { /* AVG aggregate that is computed only once at the end: AVG = SUM / COUNT */
                auto &avg_info = it->second;
                auto sum = std::visit(overloaded {
                    [&]<typename T>(HashTable::const_reference_t<T> &&r) -> _Doublex1
                        requires (std::same_as<T, _I64x1> or std::same_as<T, _Doublex1>) {
                        return T(r).template to<double>();
                    },
                    /* … */
                    [](std::monostate&&) -> _Doublex1 { M_unreachable("invalid reference"); },
                }, entry.get(avg_info.sum));
                auto count = _I64x1(entry.get<_I64x1>(avg_info.running_count)).insist_not_null().to<double>();
                auto avg = sum / count;
                if (avg.can_be_null()) {
                    _Var<Doublex1> var(avg); // introduce variable s.t. uses only load from it
                    env.add(e.id, _Doublex1(var));
                } else {
                    /* … analogous, using a variable that cannot be NULL … */
                }
            } else { /* grouping key or already computed aggregate */
                std::visit(overloaded {
                    [&]<typename T>(HashTable::const_reference_t<Expr<T>> &&r) {
                        Expr<T> value = r;

                        auto pred = [&e](const auto &info) -> bool { return info.entry.id == e.id; };
                        if (auto it = std::find_if(aggregates.cbegin(), aggregates.cend(), pred);
                            it != aggregates.cend())
                        { /* aggregate: scale COUNT and SUM by the counter of the opposite child */
                            if (it->args.empty()) {
                                M_insist(it->fnid == m::Function::FN_COUNT,
                                         "only COUNT aggregate function may have no argument");
                                I64x1 probe_counter =
                                    _I64x1(entry.get<_I64x1>(C.pool("$probe_counter"))).insist_not_null();
                                /* … COUNT(*): multiply the stored build count by the probe counter … */
                            } else {
                                M_insist(it->args.size() == 1, "aggregate functions expect at most one argument");
                                auto &des = as<const Designator>(*it->args[0]);
                                Schema::Identifier arg(des.table_name.text, des.attr_name.text.assert_not_none());

                                if (it->fnid == m::Function::FN_COUNT or it->fnid == m::Function::FN_SUM) {
                                    if (M.probe.schema().has(arg)) {
                                        /* Argument from the probe side: scale by the build counter. */
                                        I64x1 build_counter =
                                            _I64x1(entry.get<_I64x1>(C.pool("$build_counter"))).insist_not_null();
                                        auto agg = value * build_counter.to<T>();
                                        if (agg.can_be_null()) {
                                            /* … introduce a variable and add it to the environment … */
                                        }
                                    } else {
                                        /* Argument from the build side: scale by the probe counter. */
                                        M_insist(M.build.schema().has(arg),
                                                 "argument ID must occur in either child schema");
                                        I64x1 probe_counter =
                                            _I64x1(entry.get<_I64x1>(C.pool("$probe_counter"))).insist_not_null();
                                        auto agg = value * probe_counter.to<T>();
                                        if (agg.can_be_null()) {
                                            /* … introduce a variable and add it to the environment … */
                                        }
                                    }
                                } else {
                                    /* … MIN, MAX, and running AVG need no scaling … */
                                }
                            }
                        } else { /* part of the grouping key */
                            if (value.can_be_null()) {
                                /* … introduce a variable and add it to the environment … */
                            }
                        }
                    },
                    [&](HashTable::const_reference_t<_Boolx1> &&r) {
                        auto pred = [&e](const auto &info) -> bool { return info.entry.id == e.id; };
                        M_insist(std::find_if(aggregates.cbegin(), aggregates.cend(), pred) == aggregates.cend(),
                                 "booleans must not be the result of aggregate functions");
                        _Boolx1 value = r;
                        if (value.can_be_null()) {
                            _Var<Boolx1> var(value); // introduce variable s.t. uses only load from it
                            env.add(e.id, _Boolx1(var));
                        } else {
                            /* … analogous, using a variable that cannot be NULL … */
                        }
                    },
                    [&](HashTable::const_reference_t<NChar> &&r) {
                        auto pred = [&e](const auto &info) -> bool { return info.entry.id == e.id; };
                        M_insist(std::find_if(aggregates.cbegin(), aggregates.cend(), pred) == aggregates.cend(),
                                 "strings must not be the result of aggregate functions");
                        /* … add the string value to the environment, propagating
                         * value.guarantees_terminating_nul() … */
                    },
                    [](std::monostate&&) -> void { M_unreachable("invalid reference"); },
                }, entry.get(e.id));
            }
        }

        /* … resume the parent pipeline with the reconstructed group tuple … */
    };
}
teardown_t(std::move(teardown), [&](){ ht->teardown(); })();
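/* Why scaling by the opposite counter is correct (exposition, not part of this source): consider
 * one group whose build side contributes the values {10, 20} and that matches 3 probe tuples.
 * The join result for this group has 2 * 3 = 6 tuples and each build value occurs 3 times: */
static_assert((10 + 20) * 3 == 90, "SUM over the join = stored partial sum times $probe_counter");
static_assert(2 * 3 == 6, "COUNT(*) over the join = $build_counter times $probe_counter");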
void Match<wasm::NoOp>::print(std::ostream &out, unsigned level) const
{
    indent(out, level) << "wasm::NoOp" << print_info(this->noop) << " (cumulative cost " << cost() << ')';
    this->child->print(out, level + 1);
}

template<bool SIMDfied>
void Match<wasm::Callback<SIMDfied>>::print(std::ostream &out, unsigned level) const
{
    indent(out, level) << (SIMDfied ? "wasm::SIMDCallback " : "wasm::Callback ")
                       << this->callback.schema() << print_info(this->callback)
                       << " (cumulative cost " << cost() << ')';
    this->child->print(out, level + 1);
}

template<bool SIMDfied>
void Match<wasm::Print<SIMDfied>>::print(std::ostream &out, unsigned level) const
{
    indent(out, level) << (SIMDfied ? "wasm::SIMDPrint " : "wasm::Print ")
                       << this->print_op.schema() << print_info(this->print_op)
                       << " (cumulative cost " << cost() << ')';
    this->child->print(out, level + 1);
}
template<bool SIMDfied>
void Match<wasm::Scan<SIMDfied>>::print(std::ostream &out, unsigned level) const
{
    indent(out, level) << (SIMDfied ? "wasm::SIMDScan(" : "wasm::Scan(") << this->scan.alias() << ") ";
    if (this->buffer_factory_ and this->scan.schema().drop_constants().deduplicate().num_entries())
        out << "with " << this->buffer_num_tuples_ << " tuples output buffer ";
    out << this->scan.schema() << print_info(this->scan) << " (cumulative cost " << cost() << ')';
}
template<idx::IndexMethod IndexMethod>
void Match<wasm::IndexScan<IndexMethod>>::print(std::ostream &out, unsigned level) const
{
    if constexpr (IndexMethod == idx::IndexMethod::Array)
        indent(out, level) << "wasm::ArrayIndexScan(";
    else if constexpr (IndexMethod == idx::IndexMethod::Rmi)
        indent(out, level) << "wasm::RecursiveModelIndexScan(";
    /* … print the configured scan strategy, e.g. out << "Compilation[" or out << "Interpretation[",
     * followed by the materialization strategy, e.g. out << "ExposedMemory" … */
    out << "], " << this->scan.alias() << ", " << this->filter.filter() << ") ";
    if (this->buffer_factory_ and this->scan.schema().drop_constants().deduplicate().num_entries())
        out << "with " << this->buffer_num_tuples_ << " tuples output buffer ";
    out << this->scan.schema() << print_info(this->scan) << " (cumulative cost " << cost() << ')';
}
template<bool Predicated>
void Match<wasm::Filter<Predicated>>::print(std::ostream &out, unsigned level) const
{
    indent(out, level) << "wasm::" << (Predicated ? "Predicated" : "Branching") << "Filter ";
    if (this->buffer_factory_ and this->filter.schema().drop_constants().deduplicate().num_entries())
        out << "with " << this->buffer_num_tuples_ << " tuples output buffer ";
    out << this->filter.schema() << print_info(this->filter) << " (cumulative cost " << cost() << ')';
    this->child->print(out, level + 1);
}
void Match<wasm::LazyDisjunctiveFilter>::print(std::ostream &out, unsigned level) const
{
    indent(out, level) << "wasm::LazyDisjunctiveFilter ";
    if (this->buffer_factory_ and this->filter.schema().drop_constants().deduplicate().num_entries())
        out << "with " << this->buffer_num_tuples_ << " tuples output buffer ";
    const cnf::Clause &clause = this->filter.filter()[0];
    for (auto it = clause.cbegin(); it != clause.cend(); ++it) {
        if (it != clause.cbegin()) out << " → ";
        out << *it; // print the predicates in evaluation order
    }
    out << ' ' << this->filter.schema() << print_info(this->filter) << " (cumulative cost " << cost() << ')';
    this->child->print(out, level + 1);
}
void Match<wasm::Projection>::print(std::ostream &out, unsigned level) const
{
    indent(out, level) << "wasm::Projection ";
    if (this->buffer_factory_ and this->projection.schema().drop_constants().deduplicate().num_entries())
        out << "with " << this->buffer_num_tuples_ << " tuples output buffer ";
    out << this->projection.schema() << print_info(this->projection) << " (cumulative cost " << cost() << ')';
    if (this->child) // projections need not have a child
        this->child->get()->print(out, level + 1);
}
void Match<wasm::HashBasedGrouping>::print(std::ostream &out, unsigned level) const
{
    indent(out, level) << "wasm::HashBasedGrouping " << this->grouping.schema() << print_info(this->grouping)
                       << " (cumulative cost " << cost() << ')';
    this->child->print(out, level + 1);
}

void Match<wasm::OrderedGrouping>::print(std::ostream &out, unsigned level) const
{
    indent(out, level) << "wasm::OrderedGrouping " << this->grouping.schema() << print_info(this->grouping)
                       << " (cumulative cost " << cost() << ')';
    this->child->print(out, level + 1);
}

void Match<wasm::Aggregation>::print(std::ostream &out, unsigned level) const
{
    indent(out, level) << "wasm::Aggregation " << this->aggregation.schema() << print_info(this->aggregation)
                       << " (cumulative cost " << cost() << ')';
    this->child->print(out, level + 1);
}
template<bool CmpPredicated>
void Match<wasm::Quicksort<CmpPredicated>>::print(std::ostream &out, unsigned level) const
{
    indent(out, level) << "wasm::" << (CmpPredicated ? "Predicated" : "") << "Quicksort " << this->sorting.schema()
                       << print_info(this->sorting) << " (cumulative cost " << cost() << ')';
    this->child->print(out, level + 1);
}

void Match<wasm::NoOpSorting>::print(std::ostream &out, unsigned level) const
{
    indent(out, level) << "wasm::NoOpSorting" << print_info(this->sorting) << " (cumulative cost " << cost() << ')';
    this->child->print(out, level + 1);
}
template<bool Predicated>
void Match<wasm::NestedLoopsJoin<Predicated>>::print(std::ostream &out, unsigned level) const
{
    indent(out, level) << "wasm::" << (Predicated ? "Predicated" : "") << "NestedLoopsJoin ";
    if (this->buffer_factory_ and this->join.schema().drop_constants().deduplicate().num_entries())
        out << "with " << this->buffer_num_tuples_ << " tuples output buffer ";
    out << this->join.schema() << print_info(this->join) << " (cumulative cost " << cost() << ')';

    /* Print the children in reverse order, labeled with their 1-based index. */
    std::size_t i = this->children.size();
    while (i) {
        auto &child = *this->children[i - 1];
        indent(out, level) << i << ". input";
        child.print(out, level + 1);
        --i;
    }
}
template<bool Unique, bool Predicated>
void Match<wasm::SimpleHashJoin<Unique, Predicated>>::print(std::ostream &out, unsigned level) const
{
    indent(out, level) << "wasm::" << (Predicated ? "Predicated" : "") << "SimpleHashJoin";
    if (Unique) out << " on UNIQUE key ";
    if (this->buffer_factory_ and this->join.schema().drop_constants().deduplicate().num_entries())
        out << "with " << this->buffer_num_tuples_ << " tuples output buffer ";
    out << this->join.schema() << print_info(this->join) << " (cumulative cost " << cost() << ')';

    auto &build = *this->children[0];
    auto &probe = *this->children[1];
    indent(out, level) << "probe input";
    probe.print(out, level + 1);
    indent(out, level) << "build input";
    build.print(out, level + 1);
}
template<bool SortLeft, bool SortRight, bool Predicated, bool CmpPredicated>
void Match<wasm::SortMergeJoin<SortLeft, SortRight, Predicated, CmpPredicated>>::print(std::ostream &out,
                                                                                       unsigned level) const
{
    indent(out, level) << "wasm::" << (Predicated ? "Predicated" : "") << "SortMergeJoin ";
    switch ((unsigned(SortLeft) << 1) | unsigned(SortRight)) {
        case 0: out << "pre-sorted "; break;
        case 1: out << "sorting right input " << (CmpPredicated ? "predicated " : ""); break;
        case 2: out << "sorting left input "  << (CmpPredicated ? "predicated " : ""); break;
        case 3: out << "sorting both inputs " << (CmpPredicated ? "predicated " : ""); break;
    }
    const bool needs_buffer_parent = not is<const ScanOperator>(this->parent) or SortLeft;
    const bool needs_buffer_child  = not is<const ScanOperator>(this->child) or SortRight;
    if (needs_buffer_parent and needs_buffer_child)
        out << "and materializing both inputs ";
    else if (needs_buffer_parent)
        out << "and materializing left input ";
    else if (needs_buffer_child)
        out << "and materializing right input ";
    out << this->join.schema() << print_info(this->join) << " (cumulative cost " << cost() << ')';

    auto &left  = *this->children[0];
    auto &right = *this->children[1];
    indent(out, level) << "right input";
    right.print(out, level + 1);
    indent(out, level) << "left input";
    left.print(out, level + 1);
}
void Match<wasm::Limit>::print(std::ostream &out, unsigned level) const
{
    indent(out, level) << "wasm::Limit " << this->limit.schema() << print_info(this->limit)
                       << " (cumulative cost " << cost() << ')';
    this->child->print(out, level + 1);
}

void Match<wasm::HashBasedGroupJoin>::print(std::ostream &out, unsigned level) const
{
    indent(out, level) << "wasm::HashBasedGroupJoin ";
    if (this->buffer_factory_ and this->grouping.schema().drop_constants().deduplicate().num_entries())
        out << "with " << this->buffer_num_tuples_ << " tuples output buffer ";
    out << this->grouping.schema() << print_info(this->grouping) << " (cumulative cost " << cost() << ')';

    auto &build = *this->children[0];
    auto &probe = *this->children[1];
    indent(out, level) << "probe input";
    probe.print(out, level + 1);
    indent(out, level) << "build input";
    build.print(out, level + 1);
}
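/* Taken together, these print() overloads render a physical plan as an indented tree.  A rough,
 * illustrative rendering (schemas and costs made up) of a hash-based group join over two scans:
 *
 *     wasm::HashBasedGroupJoin [R.id, SUM(S.x)] (cumulative cost 42)
 *       probe input
 *         wasm::Scan(S) [S.id, S.x] (cumulative cost 12)
 *       build input
 *         wasm::Scan(R) [R.id] (cumulative cost 10)
 */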
/* A recursive visitor over physical operator matches.  `C` selects const-ness, `PreOrder`
 * selects pre- vs post-order invocation of the callback. */
template<bool C, bool PreOrder>
struct recursive_matchbase_visitor : std::conditional_t<C, ConstMatchBaseVisitor, MatchBaseVisitor>
{
    using super = std::conditional_t<C, ConstMatchBaseVisitor, MatchBaseVisitor>;
    template<typename T> using Const = typename super::template Const<T>;
    using callback_t = std::conditional_t<C, ConstMatchBaseVisitor, MatchBaseVisitor>;

    private:
    callback_t &callback_; ///< the visitor to invoke for each visited match

    public:
    recursive_matchbase_visitor(callback_t &callback) : callback_(callback) { }

    using super::operator();
#define DECLARE(CLASS) \
    void operator()(Const<CLASS> &M) override { \
        if constexpr (PreOrder) try { callback_(M); } catch (visit_skip_subtree) { return; } \
        super::operator()(M); \
        if constexpr (not PreOrder) callback_(M); \
    }
    M_WASM_MATCH_LIST(DECLARE)
#undef DECLARE
};

/* The pre-order visitor's operator()(Const<MatchBase>&) delegates to the recursive visitor … */
recursive_matchbase_visitor<C, /* PreOrder= */ true>{*this}(M);

/* … and the post-order visitor's operator()(Const<MatchBase>&) delegates analogously. */
recursive_matchbase_visitor<C, /* PreOrder= */ false>{*this}(M);
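/* For illustration (CLASS chosen hypothetically; the real instantiations come from
 * M_WASM_MATCH_LIST), DECLARE expands for CLASS = Match<wasm::Projection> roughly to:
 *
 *     void operator()(Const<Match<wasm::Projection>> &M) override {
 *         if constexpr (PreOrder)
 *             try { callback_(M); } catch (visit_skip_subtree) { return; } // visit first; allow pruning
 *         super::operator()(M);                                           // recurse into children
 *         if constexpr (not PreOrder) callback_(M);                       // visit after the children
 *     }
 */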
/* Explicitly instantiate the templated physical Wasm operators and their matches. */
#define INSTANTIATE(CLASS) \
    template struct m::wasm::CLASS; \
    template struct m::Match<m::wasm::CLASS>;
M_WASM_OPERATOR_LIST_TEMPLATED(INSTANTIATE)
#undef INSTANTIATE