mutable
A Database System for Research and Fast Prototyping
Interpreter.cpp
2
4#include <algorithm>
5#include <cerrno>
6#include <cstdlib>
7#include <iterator>
9#include <mutable/Options.hpp>
10#include <mutable/parse/AST.hpp>
11#include <mutable/util/fn.hpp>
12#include <numeric>
13#include <type_traits>
14
15
16using namespace m;
17using namespace m::storage;
18
19
20/*======================================================================================================================
21 * Helper function
22 *====================================================================================================================*/
23
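/*
 * Compile a `StackMachine` that loads (`IsStore == false`) or stores (`IsStore == true`) a single tuple of
 * Schema `tuple_schema` at the given memory `address`, according to the given `DataLayout` and its
 * `layout_schema`.  `row_id` selects the row at which the compiled machine starts and `tuple_id` selects which
 * of the machine's tuple arguments is read from resp. written to.  The emitted code also advances all pointers
 * and bit masks, so invoking the compiled machine repeatedly processes consecutive rows.
 */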
34template<bool IsStore>
35static StackMachine compile_data_layout(const Schema &tuple_schema, void *address, const DataLayout &layout,
36 const Schema &layout_schema, std::size_t row_id, std::size_t tuple_id)
37{
38 StackMachine SM; // the `StackMachine` to compile
39
40 struct stride_info_t
41 {
42 std::size_t counter_id;
43 uint64_t num_tuples;
44 uint64_t stride_in_bits;
45 };
46 std::vector<stride_info_t> stride_info_stack; // used to track all strides from root to leaf
47
48 struct {
49 std::size_t id = -1UL;
50 std::size_t offset_id = -1UL;
51 uintptr_t bit_offset;
52 uint64_t bit_stride;
53 uint64_t num_tuples;
54 std::size_t row_id;
55
56 operator bool() { return id != -1UL; }
57 bool adjustable_offset() { return offset_id != -1UL; }
58 } null_bitmap_info;
59
60 std::unordered_map<std::size_t, std::size_t> leaf2id;
61 std::unordered_map<std::size_t, std::size_t> leaf2mask;
62
63 /*----- Check whether any of the entries in `tuple_schema` can be NULL, so that we need the NULL bitmap. -----*/
64 const bool needs_null_bitmap = [&]() {
65 for (auto &tuple_entry : tuple_schema) {
66 if (layout_schema[tuple_entry.id].second.nullable())
67 return true; // found an entry in `tuple_schema` that can be NULL according to `layout_schema`
68 }
69 return false; // no attribute in `tuple_schema` can be NULL according to `layout_schema`
70 }();
71
72 /* Compute location of NULL bitmap. */
73 const auto null_bitmap_idx = layout_schema.num_entries();
74 auto find_null_bitmap = [&](const DataLayout &layout, std::size_t row_id) -> void {
75 auto find_null_bitmap_impl = [&](const DataLayout::INode &node, uintptr_t offset, std::size_t row_id,
76 auto &find_null_bitmap_ref) -> void
77 {
78 for (auto &child : node) {
79 if (auto child_leaf = cast<const DataLayout::Leaf>(child.ptr.get())) {
80 if (child_leaf->index() == null_bitmap_idx) {
81 M_insist(not null_bitmap_info, "there must be at most one null bitmap in the linearization");
82 const uint64_t additional_offset_in_bits = child.offset_in_bits + row_id * child.stride_in_bits;
83 /* add NULL bitmap address to context */
84 null_bitmap_info.id = SM.add(reinterpret_cast<void*>(offset + additional_offset_in_bits / 8));
85 null_bitmap_info.bit_offset = (additional_offset_in_bits) % 8;
86 null_bitmap_info.bit_stride = child.stride_in_bits;
87 null_bitmap_info.num_tuples = node.num_tuples();
88 null_bitmap_info.row_id = row_id;
89 }
90 } else {
91 auto child_inode = as<const DataLayout::INode>(child.ptr.get());
92 const std::size_t lin_id = row_id / child_inode->num_tuples();
93 const std::size_t inner_row_id = row_id % child_inode->num_tuples();
94 const uint64_t additional_offset = child.offset_in_bits / 8 + lin_id * child.stride_in_bits / 8;
95 find_null_bitmap_ref(*child_inode, offset + additional_offset, inner_row_id, find_null_bitmap_ref);
96 }
97 }
98 };
99 find_null_bitmap_impl(static_cast<const DataLayout::INode&>(layout), uintptr_t(address), row_id,
100 find_null_bitmap_impl);
101 };
102 if (needs_null_bitmap)
103 find_null_bitmap(layout, row_id);
104 if (null_bitmap_info and null_bitmap_info.bit_stride) {
105 null_bitmap_info.offset_id = SM.add(null_bitmap_info.bit_offset); // add initial NULL bitmap offset to context
106 }
107
108 /* Emit code for attribute access and pointer increment. */
109 auto compile_accesses = [&](const DataLayout &layout, std::size_t row_id) -> void {
110 auto compile_accesses_impl = [&](const DataLayout::INode &node, uintptr_t offset, std::size_t row_id,
111 auto &compile_accesses_ref) -> void
112 {
113 for (auto &child : node) {
114 if (auto child_leaf = cast<const DataLayout::Leaf>(child.ptr.get())) {
115 if (child_leaf->index() != null_bitmap_idx) {
116 const bool attr_can_be_null = null_bitmap_info and layout_schema[child_leaf->index()].nullable();
117 auto &id = layout_schema[child_leaf->index()].id;
118
119 /* Locate the attribute in the operator schema. */
120 if (auto it = tuple_schema.find(id); it != tuple_schema.end()) {
121 uint64_t idx = std::distance(tuple_schema.begin(), it); // get attribute index in schema
122 const uint64_t additional_offset_in_bits = child.offset_in_bits + row_id * child.stride_in_bits;
123 const std::size_t byte_offset = additional_offset_in_bits / 8;
124 const std::size_t bit_offset = additional_offset_in_bits % 8;
125 M_insist(not bit_offset or child_leaf->type()->is_boolean() or child_leaf->type()->is_bitmap(),
126 "only booleans and bitmaps may not be byte aligned");
127
128 const std::size_t byte_stride = child.stride_in_bits / 8;
129 const std::size_t bit_stride = child.stride_in_bits % 8;
130 M_insist(not bit_stride or child_leaf->type()->is_boolean() or child_leaf->type()->is_bitmap(),
131 "only booleans and bitmaps may not be byte aligned");
132 M_insist(bit_stride == 0 or byte_stride == 0,
133 "the stride must be a whole multiple of a byte or less than a byte");
134
135 /* Access NULL bit. */
136 if (attr_can_be_null) {
137 if (not null_bitmap_info.bit_stride) {
138 /* No bit stride means the NULL bitmap only advances with parent sequence. */
139 const std::size_t bit_offset = null_bitmap_info.bit_offset + child_leaf->index();
140 if (bit_offset < 8) {
141 SM.emit_Ld_Ctx(null_bitmap_info.id);
142 if constexpr (IsStore) {
143 SM.emit_Ld_Tup(tuple_id, idx);
144 SM.emit_Is_Null();
145 SM.emit_St_b(bit_offset);
146 } else {
147 SM.emit_Ld_b(0x1UL << bit_offset);
148 }
149 } else {
150 /* Advance to respective byte. */
151 SM.add_and_emit_load(uint64_t(bit_offset / 8));
152 SM.emit_Ld_Ctx(null_bitmap_info.id);
153 SM.emit_Add_p();
154 if constexpr (IsStore) {
155 SM.emit_Ld_Tup(tuple_id, idx);
156 SM.emit_Is_Null();
157 SM.emit_St_b(bit_offset % 8);
158 } else {
159 SM.emit_Ld_b(0x1UL << (bit_offset % 8));
160 }
161 }
162 } else {
163 /* With bit stride. Use adjustable offset instead of fixed offset. */
164 M_insist(null_bitmap_info.adjustable_offset());
165
166 /* Create variables for address and mask in context. Only used for storing.*/
167 std::size_t address_id, mask_id;
168 if constexpr (IsStore) {
169 address_id = SM.add(reinterpret_cast<void*>(0));
170 mask_id = SM.add(uint64_t(0));
171 }
172
173 /* Compute address of entire byte containing the NULL bit. */
174 SM.emit_Ld_Ctx(null_bitmap_info.offset_id);
175 SM.add_and_emit_load(child_leaf->index());
176 SM.emit_Add_i();
177 SM.emit_SARi_i(3); // (adj_offset + attr.id) / 8
178 SM.emit_Ld_Ctx(null_bitmap_info.id);
179 SM.emit_Add_p();
180 if constexpr (IsStore)
181 SM.emit_Upd_Ctx(address_id); // store address in context
182 else
183 SM.emit_Ld_i8(); // load byte from address
184
185 if constexpr (IsStore) {
186 /* Test whether value equals NULL. */
187 SM.emit_Ld_Tup(tuple_id, idx);
188 SM.emit_Is_Null();
189 }
190
191 /* Initialize mask. */
192 SM.add_and_emit_load(uint64_t(0x1UL));
193
194 /* Compute offset of NULL bit. */
195 SM.emit_Ld_Ctx(null_bitmap_info.offset_id);
196 SM.add_and_emit_load(child_leaf->index());
197 SM.emit_Add_i();
198 SM.add_and_emit_load(uint64_t(0b111));
199 SM.emit_And_i(); // (adj_offset + attr.id) % 8
200
201 /* Shift mask by offset. */
202 SM.emit_ShL_i();
203 if constexpr (IsStore) {
204 SM.emit_Upd_Ctx(mask_id); // store mask in context
205
206 /* Load byte and set NULL bit to 1. */
207 SM.emit_Ld_Ctx(address_id);
208 SM.emit_Ld_i8();
209 SM.emit_Or_i(); // in case of NULL
210
211 /* Load byte and set NULL bit to 0. */
212 SM.emit_Ld_Ctx(mask_id);
213 SM.emit_Neg_i();
214 SM.emit_Ld_Ctx(address_id);
215 SM.emit_Ld_i8();
216 SM.emit_And_i(); // in case of not NULL
217
218 SM.emit_Sel(); // select the respective modified byte
219
220 /* Write entire byte back to the store. */
221 SM.emit_St_i8();
222 } else {
223 /* Apply mask and cast to boolean. */
224 SM.emit_And_i();
225 SM.emit_NEZ_i();
226 }
227 }
228 if constexpr (not IsStore)
229 SM.emit_Push_Null(); // to select it later if NULL
230 }
231
232 /* Introduce leaf pointer. */
233 const std::size_t offset_id = SM.add_and_emit_load(reinterpret_cast<void*>(offset + byte_offset));
234 leaf2id[child_leaf->index()] = offset_id;
235
236 if (bit_stride) {
237 M_insist(child_leaf->type()->is_boolean(), "only booleans are supported yet");
238
239 if constexpr (IsStore) {
240 /* Load value to stack. */
241 SM.emit_Ld_Tup(tuple_id, idx); // boolean
242 } else {
243 /* Load byte with the respective value. */
244 SM.emit_Ld_i8();
245 }
246
247 /* Introduce mask. */
248 const std::size_t mask_id = SM.add(uint64_t(0x1UL << bit_offset));
249 leaf2mask[child_leaf->index()] = mask_id;
250
251 if constexpr (IsStore) {
252 /* Load byte and set bit to 1. */
253 SM.emit_Ld_Ctx(offset_id);
254 SM.emit_Ld_i8();
255 SM.emit_Ld_Ctx(mask_id);
256 SM.emit_Or_i(); // in case of TRUE
257
258 /* Load byte and set bit to 0. */
259 SM.emit_Ld_Ctx(offset_id);
260 SM.emit_Ld_i8();
261 SM.emit_Ld_Ctx(mask_id);
262 SM.emit_Neg_i(); // negate mask
263 SM.emit_And_i(); // in case of FALSE
264
265 SM.emit_Sel(); // select the respective modified byte
266
267 /* Write entire byte back to the store. */
268 SM.emit_St_i8();
269 } else {
270 /* Load and apply mask and convert to bool. */
271 SM.emit_Ld_Ctx(mask_id);
272 SM.emit_And_i();
273 SM.emit_NEZ_i();
274
275 if (attr_can_be_null)
276 SM.emit_Sel();
277
278 /* Store value in output tuple. */
279 SM.emit_St_Tup(tuple_id, idx, child_leaf->type());
280 SM.emit_Pop();
281 }
282
283 /* Update the mask. */
284 SM.emit_Ld_Ctx(mask_id);
285 SM.emit_ShLi_i(1);
286 SM.emit_Upd_Ctx(mask_id);
287
288 /* Check whether we are in the 8th iteration and reset mask. */
289 SM.add_and_emit_load(uint64_t(0x1UL) << 8);
290 SM.emit_Eq_i();
291 SM.emit_Dup(); // duplicate outcome for later use
292 SM.add_and_emit_load(uint64_t(0x1UL));
293 SM.emit_Ld_Ctx(mask_id);
294 SM.emit_Sel();
295 SM.emit_Upd_Ctx(mask_id); // mask <- mask == 256 ? 1 : mask
296 SM.emit_Pop();
297
298 /* If the mask was reset, advance to the next byte. */
299 SM.emit_Cast_i_b(); // convert outcome of previous check to int
300 SM.emit_Ld_Ctx(offset_id);
301 SM.emit_Add_p();
302 SM.emit_Upd_Ctx(offset_id);
303 SM.emit_Pop();
304 } else {
305 if constexpr (IsStore) {
306 /* Load value to stack. */
307 SM.emit_Ld_Tup(tuple_id, idx);
308
309 /* Store value. */
310 if (child_leaf->type()->is_boolean())
311 SM.emit_St_b(bit_offset);
312 else
313 SM.emit_St(child_leaf->type());
314 } else {
315 /* Load value. */
316 if (child_leaf->type()->is_boolean())
317 SM.emit_Ld_b(0x1UL << bit_offset); // convert the fixed bit offset to a fixed mask
318 else
319 SM.emit_Ld(child_leaf->type());
320
321 if (attr_can_be_null)
322 SM.emit_Sel();
323
324 /* Store value in output tuple. */
325 SM.emit_St_Tup(tuple_id, idx, child_leaf->type());
326 SM.emit_Pop();
327 }
328
329 /* If the attribute has a stride, advance the pointer accordingly. */
330 M_insist(not bit_stride);
331 if (byte_stride) {
332 /* Advance the attribute pointer by the attribute's stride. */
333 SM.add_and_emit_load(int64_t(byte_stride));
334 SM.emit_Ld_Ctx(offset_id);
335 SM.emit_Add_p();
336 SM.emit_Upd_Ctx(offset_id);
337 SM.emit_Pop();
338 }
339 }
340
341 }
342 }
343 } else {
344 auto child_inode = as<const DataLayout::INode>(child.ptr.get());
345 const std::size_t lin_id = row_id / child_inode->num_tuples();
346 const std::size_t inner_row_id = row_id % child_inode->num_tuples();
347 const uint64_t additional_offset = child.offset_in_bits / 8 + lin_id * child.stride_in_bits / 8;
348 compile_accesses_ref(*child_inode, offset + additional_offset, inner_row_id, compile_accesses_ref);
349 }
350 }
351 };
352 compile_accesses_impl(static_cast<const DataLayout::INode&>(layout), uintptr_t(address), row_id,
353 compile_accesses_impl);
354 };
355 compile_accesses(layout, row_id);
356
357 /* If the NULL bitmap has a stride, advance the adjustable offset accordingly. */
358 if (null_bitmap_info and null_bitmap_info.bit_stride) {
359 M_insist(null_bitmap_info.adjustable_offset());
360 M_insist(null_bitmap_info.num_tuples > 1);
361
362 /* Update adjustable offset. */
363 SM.emit_Ld_Ctx(null_bitmap_info.offset_id);
364 SM.add_and_emit_load(null_bitmap_info.bit_stride);
365 SM.emit_Add_i();
366 SM.emit_Upd_Ctx(null_bitmap_info.offset_id);
367 SM.emit_Pop();
368
369 /* Check whether we are in the last iteration and advance to correct byte. */
370 const auto counter_id = SM.add_and_emit_load(uint64_t(null_bitmap_info.row_id));
371 SM.emit_Inc();
372 SM.emit_Upd_Ctx(counter_id);
373 SM.add_and_emit_load(null_bitmap_info.num_tuples);
374 SM.emit_NE_i();
375 SM.emit_Dup(); SM.emit_Dup(); // triple outcome for later use
376 SM.emit_Not_b(); // negate outcome of check
377 SM.emit_Cast_i_b(); // convert to int
378 SM.emit_Ld_Ctx(null_bitmap_info.offset_id);
379 SM.emit_SARi_i(3); // corresponds div 8
380 SM.emit_Mul_i();
381 SM.emit_Ld_Ctx(null_bitmap_info.id);
382 SM.emit_Add_p();
383 SM.emit_Upd_Ctx(null_bitmap_info.id); // id <- counter != num_tuples ? id : id + adj_offset / 8
384 SM.emit_Pop();
385
386 /* If we were in the last iteration, reset adjustable offset. */
387 SM.emit_Cast_i_b(); // convert outcome of previous check to int
388 SM.emit_Ld_Ctx(null_bitmap_info.offset_id);
389 SM.emit_Mul_i();
390 SM.emit_Upd_Ctx(null_bitmap_info.offset_id);
391 SM.emit_Pop();
392
393 /* If we were in the last iteration, reset the counter. */
394 SM.emit_Cast_i_b(); // convert outcome of previous check to int
395 SM.emit_Ld_Ctx(counter_id);
396 SM.emit_Mul_i();
397 SM.emit_Upd_Ctx(counter_id);
398 SM.emit_Pop();
399 }
400
 401 /* Emit code to perform stride jumps. */
402 auto compile_strides = [&](const DataLayout &layout, std::size_t row_id) -> void {
403 auto compile_strides_impl = [&](const DataLayout::INode &node, std::size_t row_id,
404 auto &compile_strides_ref) -> void {
405 for (auto &child : node) {
406 if (auto child_leaf = cast<const DataLayout::Leaf>(child.ptr.get())) {
407 std::size_t offset_id;
408 std::size_t mask_id = -1UL;
409 if (null_bitmap_info and child_leaf->index() == null_bitmap_idx) {
410 offset_id = null_bitmap_info.id;
411 mask_id = null_bitmap_info.offset_id;
412 } else if (auto it = leaf2id.find(child_leaf->index()); it != leaf2id.end()) {
413 offset_id = it->second;
414 if (auto it = leaf2mask.find(child_leaf->index()); it != leaf2mask.end())
415 mask_id = it->second;
416 } else {
417 continue; // nothing to be done
418 }
419
420 /* Emit code for stride jumps. */
421 std::size_t prev_num_tuples = 1;
422 std::size_t prev_stride_in_bits = child.stride_in_bits;
423 for (auto it = stride_info_stack.rbegin(), end = stride_info_stack.rend(); it != end; ++it) {
424 auto &info = *it;
425
426 /* Compute the remaining stride in bits. */
427 const std::size_t stride_remaining_in_bits =
428 info.stride_in_bits - (info.num_tuples / prev_num_tuples) * prev_stride_in_bits;
429
430 /* Perform stride jump, if necessary. */
431 if (stride_remaining_in_bits) {
432 std::size_t byte_stride = stride_remaining_in_bits / 8;
433 const std::size_t bit_stride = stride_remaining_in_bits % 8;
434
435 if (bit_stride) {
436 M_insist(child_leaf->index() == null_bitmap_idx or child_leaf->type()->is_boolean(),
437 "only the null bitmap or booleans may cause not byte aligned stride jumps, "
438 "bitmaps are not supported yet");
439 M_insist(child_leaf->index() != null_bitmap_idx or null_bitmap_info.adjustable_offset(),
440 "only null bitmaps with adjustable offset may cause not byte aligned stride jumps");
441 M_insist(mask_id != -1UL);
442
443 /* Reset mask. */
444 if (child_leaf->index() == null_bitmap_idx) {
445 /* Reset adjustable bit offset to 0. */
446 if (info.num_tuples != 1) {
447 /* Check whether counter equals num_tuples. */
448 SM.emit_Ld_Ctx(info.counter_id);
449 SM.add_and_emit_load(int64_t(info.num_tuples));
450 SM.emit_NE_i();
451 SM.emit_Cast_i_b();
452 } else {
453 SM.add_and_emit_load(uint64_t(0));
454 }
455 SM.emit_Ld_Ctx(mask_id);
456 SM.emit_Mul_i();
457 SM.emit_Upd_Ctx(mask_id);
458 SM.emit_Pop();
459 } else {
460 /* Reset mask to 0x1UL to access first bit again. */
461 if (info.num_tuples != 1) {
462 /* Check whether counter equals num_tuples. */
463 SM.emit_Ld_Ctx(info.counter_id);
464 SM.add_and_emit_load(int64_t(info.num_tuples));
465 SM.emit_Eq_i();
466 SM.add_and_emit_load(uint64_t(0x1UL));
467 SM.emit_Ld_Ctx(mask_id);
468 SM.emit_Sel();
469 } else {
470 SM.add_and_emit_load(uint64_t(0x1UL));
471 }
472 SM.emit_Upd_Ctx(mask_id);
473 SM.emit_Pop();
474 }
475
476 /* Ceil to next entire byte. */
477 ++byte_stride;
478 }
479
480 /* Advance pointer. */
481 if (info.num_tuples != 1) {
482 /* Check whether counter equals num_tuples. */
483 SM.emit_Ld_Ctx(info.counter_id);
484 SM.add_and_emit_load(int64_t(info.num_tuples));
485 SM.emit_Eq_i();
486 SM.emit_Cast_i_b();
487
488 SM.add_and_emit_load(byte_stride);
489 SM.emit_Mul_i();
490 } else {
491 SM.add_and_emit_load(byte_stride);
492 }
493 SM.emit_Ld_Ctx(offset_id);
494 SM.emit_Add_p();
495 SM.emit_Upd_Ctx(offset_id);
496 SM.emit_Pop();
497 }
498
499 /* Update variables for next iteration. */
500 prev_num_tuples = info.num_tuples;
501 prev_stride_in_bits = info.stride_in_bits;
502 }
503 } else {
504 auto child_inode = as<const DataLayout::INode>(child.ptr.get());
505
506 /* Initialize counter and emit increment. */
507 const std::size_t inner_row_id = row_id % child_inode->num_tuples();
508 const auto counter_id = SM.add_and_emit_load(inner_row_id); // introduce counter to track iteration count
509 SM.emit_Inc();
510 SM.emit_Upd_Ctx(counter_id);
511 SM.emit_Pop(); // XXX: not needed if recursion cleans up stack properly
512
 513 /* Put context on stack and perform recursive descent. */
514 stride_info_stack.push_back(stride_info_t{
515 .counter_id = counter_id,
516 .num_tuples = child_inode->num_tuples(),
517 .stride_in_bits = child.stride_in_bits
518 });
519 compile_strides_ref(*child_inode, inner_row_id, compile_strides_ref);
520 stride_info_stack.pop_back();
521
522 /* Reset counter if iteration is whole multiple of num_tuples. */
523 if (child_inode->num_tuples() != 1) {
524 SM.emit_Ld_Ctx(counter_id); // XXX: not needed if recursion cleans up stack properly
525 SM.add_and_emit_load(child_inode->num_tuples());
526 SM.emit_NE_i();
527 SM.emit_Cast_i_b();
528 SM.emit_Ld_Ctx(counter_id);
529 SM.emit_Mul_i();
530 } else {
531 SM.add_and_emit_load(int64_t(0));
532 }
533 SM.emit_Upd_Ctx(counter_id);
534 }
535 }
536 };
537 compile_strides_impl(static_cast<const DataLayout::INode&>(layout), row_id, compile_strides_impl);
538 };
539 compile_strides(layout, row_id);
540
541 return SM;
542}
543
544StackMachine Interpreter::compile_load(const Schema &tuple_schema, void *address, const DataLayout &layout,
 545 const Schema &layout_schema, std::size_t row_id, std::size_t tuple_id)
546{
547 return compile_data_layout<false>(tuple_schema, address, layout, layout_schema, row_id, tuple_id);
548}
549
550StackMachine Interpreter::compile_store(const Schema &tuple_schema, void *address, const DataLayout &layout,
 551 const Schema &layout_schema, std::size_t row_id, std::size_t tuple_id)
552{
553 return compile_data_layout<true>(tuple_schema, address, layout, layout_schema, row_id, tuple_id);
554}
555
556/*======================================================================================================================
557 * Declaration of operator data.
558 *====================================================================================================================*/
559
560namespace {
561
562struct PrintData : OperatorData
563{
564 uint32_t num_rows = 0;
565 StackMachine printer;
566 PrintData(const PrintOperator &op)
567 : printer(op.schema())
568 {
569 auto &S = op.schema();
570 auto ostream_index = printer.add(&op.out);
571 for (std::size_t i = 0; i != S.num_entries(); ++i) {
572 if (i != 0)
573 printer.emit_Putc(ostream_index, ',');
574 printer.emit_Ld_Tup(0, i);
575 printer.emit_Print(ostream_index, S[i].type);
576 }
577 }
578};
579
580struct NoOpData : OperatorData
581{
582 uint32_t num_rows = 0;
583};
584
585struct ProjectionData : OperatorData
586{
587 Pipeline pipeline;
588 std::optional<StackMachine> projections;
589 Tuple res;
590
591 ProjectionData(const ProjectionOperator &op)
592 : pipeline(op.schema())
593 , res(op.schema())
594 { }
595
596 void emit_projections(const Schema &pipeline_schema, const ProjectionOperator &op) {
597 projections.emplace(pipeline_schema);
598 std::size_t out_idx = 0;
599 for (auto &p : op.projections()) {
600 projections->emit(p.first.get(), 1);
601 projections->emit_St_Tup(0, out_idx++, p.first.get().type());
602 }
603 }
604};
605
606struct JoinData : OperatorData
607{
608 Pipeline pipeline;
609 std::vector<StackMachine> load_attrs;
610
611 JoinData(const JoinOperator &op) : pipeline(op.schema()) { }
612
613 void emit_load_attrs(const Schema &in_schema) {
614 auto &SM = load_attrs.emplace_back();
615 for (std::size_t schema_idx = 0; schema_idx != in_schema.num_entries(); ++schema_idx) {
616 auto &e = in_schema[schema_idx];
617 auto it = pipeline.schema().find(e.id);
618 if (it != pipeline.schema().end()) { // attribute is needed
619 SM.emit_Ld_Tup(1, schema_idx);
620 SM.emit_St_Tup(0, std::distance(pipeline.schema().begin(), it), e.type);
621 }
622 }
623 }
624};
625
626struct NestedLoopsJoinData : JoinData
627{
628 using buffer_type = std::vector<Tuple>;
629
630 StackMachine predicate;
631 std::vector<Schema> buffer_schemas;
632 buffer_type *buffers;
633 std::size_t active_child;
634 Tuple res;
635
636 NestedLoopsJoinData(const JoinOperator &op)
637 : JoinData(op)
638 , buffers(new buffer_type[op.children().size() - 1])
639 , res({ Type::Get_Boolean(Type::TY_Vector) })
640 { }
641
642 ~NestedLoopsJoinData() { delete[] buffers; }
643};
644
645struct SimpleHashJoinData : JoinData
646{
647 Catalog &C = Catalog::Get();
648 bool is_probe_phase = false;
649 std::vector<std::pair<const ast::Expr*, const ast::Expr*>> exprs;
650 StackMachine build_key;
651 StackMachine probe_key;
653
654 Schema key_schema;
655 Tuple key;
656
657 SimpleHashJoinData(const JoinOperator &op)
658 : JoinData(op)
659 , ht(1024)
660 {
661 auto &schema_lhs = op.child(0)->schema();
662#ifndef NDEBUG
663 auto &schema_rhs = op.child(1)->schema();
664#endif
665
666 /* Decompose each join predicate of the form `A.x = B.y` into parts `A.x` and `B.y` and build the schema of the
667 * join key. */
668 auto &pred = op.predicate();
669 for (auto &clause : pred) {
670 M_insist(clause.size() == 1, "invalid predicate for simple hash join");
671 auto &literal = clause[0];
672 M_insist(not literal.negative(), "invalid predicate for simple hash join");
673 auto &expr = literal.expr();
674 auto binary = as<const ast::BinaryExpr>(&expr);
675 M_insist(binary->tok == TK_EQUAL);
676 auto first = binary->lhs.get();
677 auto second = binary->rhs.get();
678 M_insist(is_comparable(first->type(), second->type()), "the two sides of a comparison should be comparable");
679 M_insist(first->type() == second->type(), "operand types must be equal");
680
681 /* Add type to general key schema. */
682 key_schema.add(C.pool("key"), first->type());
683
684 /*----- Decide which side of the join the predicate belongs to. -----*/
685 auto required_by_first = first->get_required();
686#ifndef NDEBUG
687 auto required_by_second = second->get_required();
688#endif
689 if ((required_by_first & schema_lhs).num_entries() != 0) {
690#ifndef NDEBUG
691 M_insist((required_by_second & schema_rhs).num_entries() != 0, "second must belong to RHS");
692#endif
693 exprs.emplace_back(first, second);
694 } else {
695#ifndef NDEBUG
696 M_insist((required_by_first & schema_rhs).num_entries() != 0, "first must belong to RHS");
697 M_insist((required_by_second & schema_lhs).num_entries() != 0, "second must belong to LHS");
698#endif
699 exprs.emplace_back(second, first);
700 }
701 }
702
703 /* Create the tuple holding a key. */
704 key = Tuple(key_schema);
705 }
706
707 void load_build_key(const Schema &pipeline_schema) {
708 for (std::size_t i = 0; i != exprs.size(); ++i) {
709 const ast::Expr *expr = exprs[i].first;
710 build_key.emit(*expr, pipeline_schema, 1); // compile expr
711 build_key.emit_St_Tup(0, i, expr->type()); // write result to index i
712 }
713 }
714
715 void load_probe_key(const Schema &pipeline_schema) {
716 for (std::size_t i = 0; i != exprs.size(); ++i) {
717 const ast::Expr *expr = exprs[i].second;
718 probe_key.emit(*expr, pipeline_schema, 1); // compile expr
719 probe_key.emit_St_Tup(0, i, expr->type()); // write result to index i
720 }
721 }
722};
723
724struct LimitData : OperatorData
725{
726 std::size_t num_tuples = 0;
727};
728
729struct GroupingData : OperatorData
730{
731 Pipeline pipeline;
732 StackMachine compute_key;
733 std::vector<StackMachine> compute_aggregate_arguments;
734 std::vector<Tuple> args;
735
736 GroupingData(const GroupingOperator &op)
737 : pipeline(op.schema())
738 , compute_key(op.child(0)->schema())
739 {
740 std::ostringstream oss;
741
742 /* Compile the stack machine to compute the key and compute the key schema. */
743 {
744 std::size_t key_idx = 0;
745 for (auto [grp, alias] : op.group_by()) {
746 compute_key.emit(grp.get(), 1);
747 compute_key.emit_St_Tup(0, key_idx++, grp.get().type());
748 }
749 }
750
751 /* Compile a StackMachine to compute the arguments of each aggregation function. For example, for the
752 * aggregation `AVG(price * tax)`, the compiled StackMachine computes `price * tax`. */
753 for (auto agg : op.aggregates()) {
754 auto &fe = as<const ast::FnApplicationExpr>(agg.get());
755 std::size_t arg_idx = 0;
756 StackMachine sm(op.child(0)->schema());
757 std::vector<const Type*> arg_types;
758 for (auto &arg : fe.args) {
759 sm.emit(*arg, 1);
760 sm.emit_Cast(agg.get().type(), arg->type()); // cast argument type to aggregate type, e.g. f32 to f64 for SUM
761 sm.emit_St_Tup(0, arg_idx++, arg->type());
762 arg_types.push_back(arg->type());
763 }
764 args.emplace_back(Tuple(arg_types));
765 compute_aggregate_arguments.emplace_back(std::move(sm));
766 }
767 }
768};
769
770struct AggregationData : OperatorData
771{
772 Pipeline pipeline;
773 Tuple aggregates;
774 std::vector<StackMachine> compute_aggregate_arguments;
775 std::vector<Tuple> args;
776
777 AggregationData(const AggregationOperator &op)
778 : pipeline(op.schema())
779 {
780 std::vector<const Type*> types;
781 for (auto &e : op.schema())
782 types.push_back(e.type);
783 types.push_back(Type::Get_Integer(Type::TY_Scalar, 8)); // add nth_tuple counter
784 aggregates = Tuple(std::move(types));
785 aggregates.set(op.schema().num_entries(), 0L); // initialize running count
786
787 for (auto agg : op.aggregates()) {
788 auto &fe = as<const ast::FnApplicationExpr>(agg.get());
789 std::size_t arg_idx = 0;
790 StackMachine sm(op.child(0)->schema());
791 std::vector<const Type*> arg_types;
792 for (auto &arg : fe.args) {
793 sm.emit(*arg, 1);
794 sm.emit_Cast(agg.get().type(), arg->type()); // cast argument type to aggregate type, e.g. f32 to f64 for SUM
795 sm.emit_St_Tup(0, arg_idx++, agg.get().type()); // store casted argument of aggregate type to tuple
796 arg_types.push_back(agg.get().type());
797 }
798 args.emplace_back(Tuple(arg_types));
799 compute_aggregate_arguments.emplace_back(std::move(sm));
800 }
801 }
802};
803
804struct HashBasedGroupingData : GroupingData
805{
807 struct hasher
808 {
809 std::size_t key_size;
810
811 hasher(std::size_t key_size) : key_size(key_size) { }
812
813 uint64_t operator()(const Tuple &tup) const {
814 std::hash<Value> h;
815 uint64_t hash = 0xcbf29ce484222325;
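 // 0xcbf29ce484222325 and 1099511628211 are the 64-bit FNV-1a offset basis and prime; NULL values hash to 0.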
816 for (std::size_t i = 0; i != key_size; ++i) {
817 hash ^= tup.is_null(i) ? 0 : h(tup[i]);
818 hash *= 1099511628211;
819 }
820 return hash;
821 }
822 };
823
825 struct equals
826 {
827 std::size_t key_size;
828
829 equals(std::size_t key_size) : key_size(key_size) { }
830
831 uint64_t operator()(const Tuple &first, const Tuple &second) const {
832 for (std::size_t i = 0; i != key_size; ++i) {
833 if (first.is_null(i) != second.is_null(i)) return false;
834 if (not first.is_null(i))
835 if (first.get(i) != second.get(i)) return false;
836 }
837 return true;
838 }
839 };
840
843 std::unordered_map<Tuple, unsigned, hasher, equals> groups;
844
845 HashBasedGroupingData(const GroupingOperator &op)
846 : GroupingData(op)
847 , groups(1024, hasher(op.group_by().size()), equals(op.group_by().size()))
848 { }
849};
850
851struct SortingData : OperatorData
852{
853 Pipeline pipeline;
854 std::vector<Tuple> buffer;
855
856 SortingData(Schema buffer_schema) : pipeline(std::move(buffer_schema)) { }
857};
858
859struct FilterData : OperatorData
860{
861 StackMachine filter;
862 Tuple res;
863
864 FilterData(const FilterOperator &op, const Schema &pipeline_schema)
865 : filter(pipeline_schema)
866 , res({ Type::Get_Boolean(Type::TY_Vector) })
867 {
868 filter.emit(op.filter(), 1);
869 filter.emit_St_Tup_b(0, 0);
870 }
871};
872
873struct DisjunctiveFilterData : OperatorData
874{
875 std::vector<StackMachine> predicates;
876 Tuple res;
877
878 DisjunctiveFilterData(const DisjunctiveFilterOperator &op, const Schema &pipeline_schema)
879 : res({ Type::Get_Boolean(Type::TY_Vector) })
880 {
881 auto clause = op.filter()[0];
882 for (cnf::Predicate &pred : clause) {
883 cnf::Clause clause({ pred });
884 cnf::CNF cnf({ clause });
885 StackMachine &SM = predicates.emplace_back(pipeline_schema);
886 SM.emit(cnf, 1); // compile single predicate
887 SM.emit_St_Tup_b(0, 0);
888 }
889 }
890};
891
892}
893
894
895/*======================================================================================================================
896 * Pipeline
897 *====================================================================================================================*/
898
899void Pipeline::operator()(const ScanOperator &op)
900{
901 auto &store = op.store();
902 auto &table = store.table();
903 const auto num_rows = store.num_rows();
904
905 /* Compile StackMachine to load tuples from store. */
906 auto loader = Interpreter::compile_load(op.schema(), store.memory().addr(), table.layout(), table.schema());
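 // The compiled loader keeps its current offsets in its own context and advances them after every invocation,
 // so calling it once per output tuple walks the store row by row.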
907
908 const auto remainder = num_rows % block_.capacity();
909 std::size_t i = 0;
910 /* Fill entire vector. */
911 for (auto end = num_rows - remainder; i != end; i += block_.capacity()) {
912 block_.clear();
913 block_.fill();
914 for (std::size_t j = 0; j != block_.capacity(); ++j) {
915 Tuple *args[] = { &block_[j] };
916 loader(args);
917 }
918 op.parent()->accept(*this);
919 }
920 if (i != num_rows) {
921 /* Fill last vector with remaining tuples. */
922 block_.clear();
923 block_.mask((1UL << remainder) - 1);
924 for (std::size_t j = 0; i != op.store().num_rows(); ++i, ++j) {
925 M_insist(j < block_.capacity());
926 Tuple *args[] = { &block_[j] };
927 loader(args);
928 }
929 op.parent()->accept(*this);
930 }
931}
932
933void Pipeline::operator()(const CallbackOperator &op)
934{
935 for (auto &t : block_)
936 op.callback()(op.schema(), t);
937}
938
939void Pipeline::operator()(const PrintOperator &op)
940{
941 auto data = as<PrintData>(op.data());
942 data->num_rows += block_.size();
943 for (auto &t : block_) {
944 Tuple *args[] = { &t };
945 data->printer(args);
946 op.out << '\n';
947 }
948}
949
950void Pipeline::operator()(const NoOpOperator &op)
951{
952 as<NoOpData>(op.data())->num_rows += block_.size();
953}
954
955void Pipeline::operator()(const FilterOperator &op)
956{
957 if (not op.data())
958 op.data(new FilterData(op, this->schema()));
959
960 auto data = as<FilterData>(op.data());
961 for (auto it = block_.begin(); it != block_.end(); ++it) {
962 Tuple *args[] = { &data->res, &*it };
963 data->filter(args);
964 if (data->res.is_null(0) or not data->res[0].as_b()) block_.erase(it);
965 }
966 if (not block_.empty())
967 op.parent()->accept(*this);
968}
969
970void Pipeline::operator()(const DisjunctiveFilterOperator &op)
971{
972 if (not op.data())
973 op.data(new DisjunctiveFilterData(op, this->schema()));
974
975 auto data = as<DisjunctiveFilterData>(op.data());
976 for (auto it = block_.begin(); it != block_.end(); ++it) {
977 data->res.set(0, false); // reset
978 Tuple *args[] = { &data->res, &*it };
979
980 for (auto &pred : data->predicates) {
981 pred(args);
982 if (not data->res.is_null(0) and data->res[0].as_b())
983 goto satisfied; // one predicate is satisfied ⇒ entire clause is satisfied
984 }
985 block_.erase(it); // no predicate was satisfied ⇒ drop tuple
986satisfied:;
987 }
988 if (not block_.empty())
989 op.parent()->accept(*this);
990}
991
992void Pipeline::operator()(const JoinOperator &op)
993{
994 if (is<SimpleHashJoinData>(op.data())) {
995 /* Perform simple hash join. */
996 auto data = as<SimpleHashJoinData>(op.data());
997 Tuple *args[2] = { &data->key, nullptr };
998 if (data->is_probe_phase) {
999 if (data->load_attrs.size() != 2) {
1000 data->load_probe_key(this->schema());
1001 data->emit_load_attrs(this->schema());
1002 }
1003 auto &pipeline = data->pipeline;
1004 std::size_t i = 0;
1005 for (auto &t : block_) {
1006 args[1] = &t;
1007 data->probe_key(args);
1008 pipeline.block_.fill();
1009 data->ht.for_all(*args[0], [&](std::pair<const Tuple, Tuple> &v) {
1010 if (i == pipeline.block_.capacity()) {
1011 pipeline.push(*op.parent());
1012 i = 0;
1013 }
1014
1015 {
1016 Tuple *load_args[2] = { &pipeline.block_[i], &v.second };
1017 data->load_attrs[0](load_args); // load build attrs
1018 }
1019 {
1020 Tuple *load_args[2] = { &pipeline.block_[i], &t };
1021 data->load_attrs[1](load_args); // load probe attrs
1022 }
1023 ++i;
1024 });
1025 }
1026
1027 if (i != 0) {
1028 M_insist(i <= pipeline.block_.capacity());
1029 pipeline.block_.mask(i == pipeline.block_.capacity() ? -1UL : (1UL << i) - 1);
1030 pipeline.push(*op.parent());
1031 }
1032 } else {
1033 if (data->load_attrs.size() != 1) {
1034 data->load_build_key(this->schema());
1035 data->emit_load_attrs(this->schema());
1036 }
1037 const auto &tuple_schema = op.child(0)->schema();
1038 for (auto &t : block_) {
1039 args[1] = &t;
1040 data->build_key(args);
1041 data->ht.insert_with_duplicates(args[0]->clone(data->key_schema), t.clone(tuple_schema));
1042 }
1043 }
1044 } else {
1045 /* Perform nested-loops join. */
1046 auto data = as<NestedLoopsJoinData>(op.data());
1047 auto size = op.children().size();
1048 std::vector<Tuple*> predicate_args(size + 1, nullptr);
1049 predicate_args[0] = &data->res;
1050
1051 if (data->active_child == size - 1) {
1052 /* This is the right-most child. Combine its produced tuple with all combinations of the buffered
1053 * tuples. */
1054 std::vector<std::size_t> positions(size - 1, std::size_t(-1L)); // positions within each buffer
1055 std::size_t child_id = 0; // cursor to the child that provides the next part of the joined tuple
1056 auto &pipeline = data->pipeline;
1057
1058 /* Compile loading data from current child. */
1059 if (data->buffer_schemas.size() != size) {
1060 M_insist(data->buffer_schemas.size() == size - 1);
1061 M_insist(data->load_attrs.size() == size - 1);
1062 data->emit_load_attrs(this->schema());
1063 data->buffer_schemas.emplace_back(this->schema()); // save the schema of the current pipeline
1064 if (op.predicate().size()) {
1065 std::vector<std::size_t> tuple_ids(size);
1066 std::iota(tuple_ids.begin(), tuple_ids.end(), 1); // start at index 1
1067 data->predicate.emit(op.predicate(), data->buffer_schemas, tuple_ids);
1068 data->predicate.emit_St_Tup_b(0, 0);
1069 }
1070 }
1071
1072 M_insist(data->buffer_schemas.size() == size);
1073 M_insist(data->load_attrs.size() == size);
1074
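 /* Enumerate all combinations of buffered tuples: `positions` acts like an odometer over the first n-1
 * buffers, while `child_id` moves right to descend into the next buffer and left to backtrack once a
 * buffer is exhausted. */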
1075 for (;;) {
1076 if (child_id == size - 1) { // right-most child, which produced the RHS `block_`
1077 /* Combine the tuples. One tuple from each buffer. */
1078 pipeline.clear();
1079 pipeline.block_.mask(block_.mask());
1080
1081 if (op.predicate().size()) {
1082 for (std::size_t cid = 0; cid != child_id; ++cid)
1083 predicate_args[cid + 1] = &data->buffers[cid][positions[cid]];
1084 }
1085
1086 /* Concatenate tuples from the first n-1 children. */
1087 for (auto output_it = pipeline.block_.begin(); output_it != pipeline.block_.end(); ++output_it) {
1088 auto &rhs = block_[output_it.index()];
1089 if (op.predicate().size()) { // do we have a predicate?
1090 predicate_args[size] = &rhs;
1091 data->predicate(predicate_args.data()); // evaluate predicate
1092 if (data->res.is_null(0) or not data->res[0].as_b()) {
1093 pipeline.block_.erase(output_it);
1094 continue;
1095 }
1096 }
1097
1098 for (std::size_t i = 0; i != child_id; ++i) {
1099 auto &buffer = data->buffers[i]; // get buffer of i-th child
1100 Tuple *load_args[2] = { &*output_it, &buffer[positions[i]] }; // load child's current tuple
1101 data->load_attrs[i](load_args);
1102 }
1103
1104 {
1105 Tuple *load_args[2] = { &*output_it, &rhs }; // load last child's attributes
1106 data->load_attrs[child_id](load_args);
1107 }
1108 }
1109
1110 if (not pipeline.block_.empty())
1111 pipeline.push(*op.parent());
1112 --child_id;
1113 } else { // child whose tuples have been materialized in a buffer
1114 ++positions[child_id];
1115 auto &buffer = data->buffers[child_id];
1116 if (positions[child_id] == buffer.size()) { // reached the end of this buffer; backtrack
1117 if (child_id == 0)
1118 break;
1119 positions[child_id] = std::size_t(-1L);
1120 --child_id;
1121 } else {
1122 M_insist(positions[child_id] < buffer.size(), "position out of bounds");
1123 ++child_id;
1124 }
1125 }
1126 }
1127 } else {
1128 /* This is not the right-most child. Collect its produced tuples in a buffer. */
1129 const auto &tuple_schema = op.child(data->active_child)->schema();
1130 if (data->buffer_schemas.size() <= data->active_child) {
1131 data->buffer_schemas.emplace_back(this->schema()); // save the schema of the current pipeline
1132 data->emit_load_attrs(this->schema());
1133 M_insist(data->buffer_schemas.size() == data->load_attrs.size());
1134 }
1135 for (auto &t : block_)
1136 data->buffers[data->active_child].emplace_back(t.clone(tuple_schema));
1137 }
1138 }
1139}
1140
1141void Pipeline::operator()(const ProjectionOperator &op)
1142{
1143 auto data = as<ProjectionData>(op.data());
1144 auto &pipeline = data->pipeline;
1145 if (not data->projections)
1146 data->emit_projections(this->schema(), op);
1147
1148 pipeline.clear();
1149 pipeline.block_.mask(block_.mask());
1150
1151 for (auto it = block_.begin(); it != block_.end(); ++it) {
1152 auto &out = pipeline.block_[it.index()];
1153 Tuple *args[] = { &out, &*it };
1154 (*data->projections)(args);
1155 }
1156
1157 pipeline.push(*op.parent());
1158}
1159
1160void Pipeline::operator()(const LimitOperator &op)
1161{
1162 auto data = as<LimitData>(op.data());
1163
1164 for (auto it = block_.begin(); it != block_.end(); ++it) {
1165 if (data->num_tuples < op.offset() or data->num_tuples >= op.offset() + op.limit())
1166 block_.erase(it); /* discard this tuple */
1167 ++data->num_tuples;
1168 }
1169
1170 if (not block_.empty())
1171 op.parent()->accept(*this);
1172
1173 if (data->num_tuples >= op.offset() + op.limit())
1174 throw LimitOperator::stack_unwind(); // all tuples produced, now unwind the stack
1175}
1176
1177void Pipeline::operator()(const GroupingOperator &op)
1178{
1179 auto perform_aggregation = [&](decltype(HashBasedGroupingData::groups)::value_type &entry, Tuple &tuple,
1180 GroupingData &data)
1181 {
1182 const std::size_t key_size = op.group_by().size();
1183
1184 Tuple &group = const_cast<Tuple&>(entry.first);
1185 const unsigned nth_tuple = ++entry.second;
1186
1187 /* Add this tuple to its group by computing the aggregates. */
1188 for (std::size_t i = 0, end = op.aggregates().size(); i != end; ++i) {
1189 auto &aggregate_arguments = data.args[i];
1190 Tuple *args[] = { &aggregate_arguments, &tuple };
1191 data.compute_aggregate_arguments[i](args);
1192
1193 bool is_null = group.is_null(key_size + i);
1194 auto &val = group[key_size + i];
1195
1196 auto &fe = as<const ast::FnApplicationExpr>(op.aggregates()[i].get());
1197 auto ty = fe.type();
1198 auto &fn = fe.get_function();
1199
1200 switch (fn.fnid) {
1201 default:
1202 M_unreachable("function kind not implemented");
1203
1204 case Function::FN_UDF:
1205 M_unreachable("UDFs not yet supported");
1206
1207 case Function::FN_COUNT:
1208 if (is_null)
1209 group.set(key_size + i, 0); // initialize
1210 if (fe.args.size() == 0) { // COUNT(*)
1211 val.as_i() += 1;
1212 } else { // COUNT(x) aka. count not NULL
1213 val.as_i() += not aggregate_arguments.is_null(0);
1214 }
1215 break;
1216
1217 case Function::FN_SUM: {
1218 auto n = as<const Numeric>(ty);
1219 if (is_null) {
1220 if (n->is_floating_point())
1221 group.set(key_size + i, 0.); // double precision
1222 else
1223 group.set(key_size + i, 0); // int
1224 }
1225 if (aggregate_arguments.is_null(0)) continue; // skip NULL
1226 if (n->is_floating_point())
1227 val.as_d() += aggregate_arguments[0].as_d();
1228 else
1229 val.as_i() += aggregate_arguments[0].as_i();
1230 break;
1231 }
1232
1233 case Function::FN_AVG: {
1234 if (is_null) {
1235 if (ty->is_floating_point())
1236 group.set(key_size + i, 0.); // double precision
1237 else
1238 group.set(key_size + i, 0); // int
1239 }
1240 if (aggregate_arguments.is_null(0)) continue; // skip NULL
1241 /* Compute AVG as iterative mean as described in Knuth, The Art of Computer Programming Vol 2,
1242 * section 4.2.2. */
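 /* I.e. avg_n = avg_{n-1} + (x_n - avg_{n-1}) / n, which avoids accumulating a large intermediate sum. */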
1243 val.as_d() += (aggregate_arguments[0].as_d() - val.as_d()) / nth_tuple;
1244 break;
1245 }
1246
1247 case Function::FN_MIN: {
1248 using std::min;
1249 if (aggregate_arguments.is_null(0)) continue; // skip NULL
1250 if (is_null) {
1251 group.set(key_size + i, aggregate_arguments[0]);
1252 continue;
1253 }
1254
1255 auto n = as<const Numeric>(ty);
1256 if (n->is_float())
1257 val.as_f() = min(val.as_f(), aggregate_arguments[0].as_f());
1258 else if (n->is_double())
1259 val.as_d() = min(val.as_d(), aggregate_arguments[0].as_d());
1260 else
1261 val.as_i() = min(val.as_i(), aggregate_arguments[0].as_i());
1262 break;
1263 }
1264
1265 case Function::FN_MAX: {
1266 using std::max;
1267 if (aggregate_arguments.is_null(0)) continue; // skip NULL
1268 if (is_null) {
1269 group.set(key_size + i, aggregate_arguments[0]);
1270 continue;
1271 }
1272
1273 auto n = as<const Numeric>(ty);
1274 if (n->is_float())
1275 val.as_f() = max(val.as_f(), aggregate_arguments[0].as_f());
1276 else if (n->is_double())
1277 val.as_d() = max(val.as_d(), aggregate_arguments[0].as_d());
1278 else
1279 val.as_i() = max(val.as_i(), aggregate_arguments[0].as_i());
1280 break;
1281 }
1282 }
1283 }
1284 };
1285
1286 /* Find the group. */
1287 auto data = as<HashBasedGroupingData>(op.data());
1288 auto &groups = data->groups;
1289
1290 Tuple key(op.schema());
1291 for (auto &tuple : block_) {
1292 Tuple *args[] = { &key, &tuple };
1293 data->compute_key(args);
1294 auto it = groups.find(key);
1295 if (it == groups.end()) {
1296 /* Initialize the group's aggregate to NULL. This will be overwritten by the neutral element w.r.t.
1297 * the aggregation function. */
1298 it = groups.emplace_hint(it, std::move(key), 0);
1299 key = Tuple(op.schema());
1300 }
1301 perform_aggregation(*it, tuple, *data);
1302 }
1303}
1304
1305void Pipeline::operator()(const AggregationOperator &op)
1306{
1307 auto data = as<AggregationData>(op.data());
1308 auto &nth_tuple = data->aggregates[op.schema().num_entries()].as_i();
1309
1310 for (auto &tuple : block_) {
1311 nth_tuple += 1UL;
1312 for (std::size_t i = 0, end = op.aggregates().size(); i != end; ++i) {
1313 auto &aggregate_arguments = data->args[i];
1314 Tuple *args[] = { &aggregate_arguments, &tuple };
1315 data->compute_aggregate_arguments[i](args);
1316
1317 auto &fe = as<const ast::FnApplicationExpr>(op.aggregates()[i].get());
1318 auto ty = fe.type();
1319 auto &fn = fe.get_function();
1320
1321 bool agg_is_null = data->aggregates.is_null(i);
1322 auto &val = data->aggregates[i];
1323
1324 switch (fn.fnid) {
1325 default:
1326 M_unreachable("function kind not implemented");
1327
1328 case Function::FN_UDF:
1329 M_unreachable("UDFs not yet supported");
1330
1331 case Function::FN_COUNT:
1332 if (fe.args.size() == 0) { // COUNT(*)
1333 val.as_i() += 1;
1334 } else { // COUNT(x) aka. count not NULL
1335 val.as_i() += not aggregate_arguments.is_null(0);
1336 }
1337 break;
1338
1339 case Function::FN_SUM: {
1340 auto n = as<const Numeric>(ty);
1341 if (aggregate_arguments.is_null(0)) continue; // skip NULL
1342 if (n->is_floating_point())
1343 val.as_d() += aggregate_arguments[0].as_d();
1344 else
1345 val.as_i() += aggregate_arguments[0].as_i();
1346 break;
1347 }
1348
1349 case Function::FN_AVG: {
1350 if (aggregate_arguments.is_null(0)) continue; // skip NULL
1351 /* Compute AVG as iterative mean as described in Knuth, The Art of Computer Programming Vol 2,
1352 * section 4.2.2. */
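 /* I.e. avg_n = avg_{n-1} + (x_n - avg_{n-1}) / n. */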
1353 val.as_d() += (aggregate_arguments[0].as_d() - val.as_d()) / nth_tuple;
1354 break;
1355 }
1356
1357 case Function::FN_MIN: {
1358 using std::min;
1359 if (aggregate_arguments.is_null(0)) continue; // skip NULL
1360 if (agg_is_null) {
1361 data->aggregates.set(i, aggregate_arguments[0]);
1362 continue;
1363 }
1364
1365 auto n = as<const Numeric>(ty);
1366 if (n->is_float())
1367 val.as_f() = min(val.as_f(), aggregate_arguments[0].as_f());
1368 else if (n->is_double())
1369 val.as_d() = min(val.as_d(), aggregate_arguments[0].as_d());
1370 else
1371 val.as_i() = min(val.as_i(), aggregate_arguments[0].as_i());
1372 break;
1373 }
1374
1375 case Function::FN_MAX: {
1376 using std::max;
1377 if (aggregate_arguments.is_null(0)) continue; // skip NULL
1378 if (agg_is_null) {
1379 data->aggregates.set(i, aggregate_arguments[0]);
1380 continue;
1381 }
1382
1383 auto n = as<const Numeric>(ty);
1384 if (n->is_float())
1385 val.as_f() = max(val.as_f(), aggregate_arguments[0].as_f());
1386 else if (n->is_double())
1387 val.as_d() = max(val.as_d(), aggregate_arguments[0].as_d());
1388 else
1389 val.as_i() = max(val.as_i(), aggregate_arguments[0].as_i());
1390 break;
1391 }
1392 }
1393 }
1394 }
1395}
1396
1397void Pipeline::operator()(const SortingOperator &op)
1398{
1399 if (not op.data())
1400 op.data(new SortingData(this->schema()));
1401
1402 /* cache all tuples for sorting */
1403 auto data = as<SortingData>(op.data());
1404 for (auto &t : block_)
1405 data->buffer.emplace_back(t.clone(this->schema()));
1406}
1407
1408/*======================================================================================================================
1409 * Interpreter - Recursive descent
1410 *====================================================================================================================*/
1411
1412void Interpreter::operator()(const CallbackOperator &op)
1413{
1414 op.child(0)->accept(*this);
1415}
1416
1417void Interpreter::operator()(const PrintOperator &op)
1418{
1419 op.data(new PrintData(op));
1420 op.child(0)->accept(*this);
1421 if (not Options::Get().quiet)
1422 op.out << as<PrintData>(op.data())->num_rows << " rows\n";
1423}
1424
1425void Interpreter::operator()(const NoOpOperator &op)
1426{
1427 op.data(new NoOpData());
1428 op.child(0)->accept(*this);
1429 op.out << as<NoOpData>(op.data())->num_rows << " rows\n";
1430}
1431
1432void Interpreter::operator()(const ScanOperator &op)
1433{
1434 Pipeline pipeline(op.schema());
1435 pipeline.push(op);
1436}
1437
1438void Interpreter::operator()(const FilterOperator &op)
1439{
1440 op.child(0)->accept(*this);
1441}
1442
1443void Interpreter::operator()(const DisjunctiveFilterOperator &op)
1444{
1445 op.child(0)->accept(*this);
1446}
1447
1448void Interpreter::operator()(const JoinOperator &op)
1449{
1450 if (op.predicate().is_equi()) {
1451 /* Perform simple hash join. */
1452 auto data = new SimpleHashJoinData(op);
1453 op.data(data);
1454 if (op.has_info())
1455 data->ht.resize(op.info().estimated_cardinality);
1456 op.child(0)->accept(*this); // build HT on LHS
1457 if (data->ht.size() == 0) // no tuples produced
1458 return;
1459 data->is_probe_phase = true;
1460 op.child(1)->accept(*this); // probe HT with RHS
1461 } else {
1462 /* Perform nested-loops join. */
1463 auto data = new NestedLoopsJoinData(op);
1464 op.data(data);
1465 for (std::size_t i = 0, end = op.children().size(); i != end; ++i) {
1466 data->active_child = i;
1467 auto c = op.child(i);
1468 c->accept(*this);
1469 if (i != op.children().size() - 1 and data->buffers[i].empty()) // no tuples produced
1470 return;
1471 }
1472 }
1473}
1474
1475void Interpreter::operator()(const ProjectionOperator &op)
1476{
1477 bool has_child = op.children().size();
1478 auto data = new ProjectionData(op);
1479 op.data(data);
1480
1481 /* Evaluate the projection. */
1482 if (has_child)
1483 op.child(0)->accept(*this);
1484 else {
1485 Pipeline pipeline;
1486 pipeline.block_.mask(1); // evaluate the projection EXACTLY ONCE on an empty tuple
1487 pipeline.push(op);
1488 }
1489}
1490
1491void Interpreter::operator()(const LimitOperator &op)
1492{
1493 try {
1494 op.data(new LimitData());
1495 op.child(0)->accept(*this);
1496 } catch (LimitOperator::stack_unwind) {
 1497 /* OK, we produced all tuples and unwound the stack */
1498 }
1499}
1500
1501void Interpreter::operator()(const GroupingOperator &op)
1502{
1503 auto &parent = *op.parent();
1504 auto data = new HashBasedGroupingData(op);
1505 op.data(data);
1506
1507 op.child(0)->accept(*this);
1508
1509 const auto num_groups = data->groups.size();
1510 const auto remainder = num_groups % data->pipeline.block_.capacity();
1511 auto it = data->groups.begin();
1512 for (std::size_t i = 0; i != num_groups - remainder; i += data->pipeline.block_.capacity()) {
1513 data->pipeline.block_.clear();
1514 data->pipeline.block_.fill();
1515 for (std::size_t j = 0; j != data->pipeline.block_.capacity(); ++j) {
1516 auto node = data->groups.extract(it++);
1517 swap(data->pipeline.block_[j], node.key());
1518 }
1519 data->pipeline.push(parent);
1520 }
1521 data->pipeline.block_.clear();
1522 data->pipeline.block_.mask((1UL << remainder) - 1UL);
1523 for (std::size_t i = 0; i != remainder; ++i) {
1524 auto node = data->groups.extract(it++);
1525 swap(data->pipeline.block_[i], node.key());
1526 }
1527 data->pipeline.push(parent);
1528}
1529
1530void Interpreter::operator()(const AggregationOperator &op)
1531{
1532 op.data(new AggregationData(op));
1533 auto data = as<AggregationData>(op.data());
1534
1535 /* Initialize aggregates. */
1536 for (std::size_t i = 0, end = op.aggregates().size(); i != end; ++i) {
1537 auto &fe = as<const ast::FnApplicationExpr>(op.aggregates()[i].get());
1538 auto ty = fe.type();
1539 auto &fn = fe.get_function();
1540
1541 switch (fn.fnid) {
1542 default:
1543 M_unreachable("function kind not implemented");
1544
1545 case Function::FN_UDF:
1546 M_unreachable("UDFs not yet supported");
1547
1548 case Function::FN_COUNT:
1549 data->aggregates.set(i, 0); // initialize
1550 break;
1551
1552 case Function::FN_SUM: {
1553 auto n = as<const Numeric>(ty);
1554 if (n->is_floating_point())
1555 data->aggregates.set(i, 0.); // double precision
1556 else
1557 data->aggregates.set(i, 0L); // int64
1558 break;
1559 }
1560
1561 case Function::FN_AVG: {
1562 if (ty->is_floating_point())
1563 data->aggregates.set(i, 0.); // double precision
1564 else
1565 data->aggregates.set(i, 0L); // int64
1566 break;
1567 }
1568
1569 case Function::FN_MIN:
1570 case Function::FN_MAX: {
1571 data->aggregates.null(i); // initialize to NULL
1572 break;
1573 }
1574 }
1575 }
1576 op.child(0)->accept(*this);
1577
1578 using std::swap;
1579 data->pipeline.block_.clear();
1580 data->pipeline.block_.mask(1UL);
1581 swap(data->pipeline.block_[0], data->aggregates);
1582 data->pipeline.push(*op.parent());
1583}
1584
1585void Interpreter::operator()(const SortingOperator &op)
1586{
1587 op.child(0)->accept(*this);
1588
1589 auto data = as<SortingData>(op.data());
1590 if (not data) // no tuples produced
1591 return;
1592
1593 const auto &orderings = op.order_by();
1594
1595 StackMachine comparator(data->pipeline.schema());
1596 for (auto o : orderings) {
1597 comparator.emit(o.first.get(), 1); // LHS
1598 comparator.emit(o.first.get(), 2); // RHS
1599
1600 /* Emit comparison. */
 1601 auto ty = o.first.get().type();
 1602 visit(overloaded {
1603 [&comparator](const Boolean&) { comparator.emit_Cmp_b(); },
1604 [&comparator](const CharacterSequence&) { comparator.emit_Cmp_s(); },
1605 [&comparator](const Numeric &n) {
1606 switch (n.kind) {
1607 case Numeric::N_Int:
1608 case Numeric::N_Decimal:
1609 comparator.emit_Cmp_i();
1610 break;
1611
1612 case Numeric::N_Float:
1613 if (n.size() <= 32)
1614 comparator.emit_Cmp_f();
1615 else
1616 comparator.emit_Cmp_d();
1617 break;
1618 }
1619 },
1620 [&comparator](const Date&) { comparator.emit_Cmp_i(); },
1621 [&comparator](const DateTime&) { comparator.emit_Cmp_i(); },
 1622 [](auto&&) { M_unreachable("invalid type"); }
1623 }, *ty);
1624
1625 if (not o.second)
1626 comparator.emit_Minus_i(); // sort descending
1627 comparator.emit_St_Tup_i(0, 0);
1628 comparator.emit_Stop_NZ();
1629 }
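 /* The comparator evaluates the orderings lexicographically: each comparison result is stored to the output
 * tuple and execution stops at the first non-zero result (Stop_NZ); a result of 0 falls through to the next
 * ordering. */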
1630
1631 Tuple res({ Type::Get_Integer(Type::TY_Vector, 4) });
1632 std::sort(data->buffer.begin(), data->buffer.end(), [&](Tuple &first, Tuple &second) {
1633 Tuple *args[] = { &res, &first, &second };
1634 comparator(args);
1635 M_insist(not res.is_null(0));
1636 return res[0].as_i() < 0;
1637 });
1638
1639 auto &parent = *op.parent();
1640 const auto num_tuples = data->buffer.size();
1641 const auto remainder = num_tuples % data->pipeline.block_.capacity();
1642 auto it = data->buffer.begin();
1643 for (std::size_t i = 0; i != num_tuples - remainder; i += data->pipeline.block_.capacity()) {
1644 data->pipeline.block_.clear();
1645 data->pipeline.block_.fill();
1646 for (std::size_t j = 0; j != data->pipeline.block_.capacity(); ++j)
1647 data->pipeline.block_[j] = std::move(*it++);
1648 data->pipeline.push(parent);
1649 }
1650 data->pipeline.block_.clear();
1651 data->pipeline.block_.mask((1UL << remainder) - 1UL);
1652 for (std::size_t i = 0; i != remainder; ++i)
1653 data->pipeline.block_[i] = std::move(*it++);
1654 data->pipeline.push(parent);
1655}
1656
1657__attribute__((constructor(202)))
1658static void register_interpreter()
1659{
1660 Catalog &C = Catalog::Get();
1661 C.register_backend<Interpreter>(C.pool("Interpreter"), "tuple-at-a-time Interpreter built with virtual stack machines");
1662}