/*
 * Spn.cpp
 *
 * Part of mutable — a database system for research and fast prototyping.
 * Implements learning, evaluation, and updating of Sum-Product Networks (SPNs).
 */
1#include "Spn.hpp"
2
3#include <iomanip>
5#include <mutable/util/fn.hpp>
6#include "util/Kmeans.hpp"
7#include "util/RDC.hpp"
8
9
10using namespace m;
11using namespace Eigen;
12
13
namespace {

/* Minimum number of rows a slice must exceed before further vertical (RDC) splits are attempted.
 * Set per learned SPN in Spn::learn_spn() to max(0.1 * num_rows, 1). */
std::size_t MIN_INSTANCE_SLICE = 0;
/* Maximum number of clusters k tried by k-means when building a sum node. */
const int MAX_K = 7;
/* Columns whose RDC value is >= this threshold are considered dependent. */
const float RDC_THRESHOLD = 0.3f;
19
20MatrixXf normalize_minmax(const MatrixXf &data)
21{
22 const RowVectorXf mins = data.colwise().minCoeff();
23 const RowVectorXf maxs = data.colwise().maxCoeff() - mins;
24
25 MatrixXf normalized(data.rows(), data.cols());
26 for (unsigned i = 0; i != data.cols(); ++i) {
27 if (maxs.array()[i] == 0) // min == max => empty range [min, max)
28 normalized.col(i) = VectorXf::Zero(data.rows());
29 else
30 normalized.col(i) = (data.col(i).array() - mins.array()[i]) / maxs.array()[i];
31 }
32 return normalized;
33}
34
/** Split the columns of \p data into groups of (transitively) dependent columns using the RDC
 *  (Randomized Dependence Coefficient).
 *
 *  Two columns are considered dependent iff their pairwise RDC value is >= RDC_THRESHOLD.
 *  Dependence is propagated transitively by taking the connected subgraphs of the resulting
 *  dependency graph.
 *
 *  @param data       the data slice to split (rows = instances, columns = attributes)
 *  @param variables  the global variable ids of the columns of \p data, in ascending order
 *  @return a pair of vectors; the i-th entries describe the same group, once as a set of column
 *          indices into \p data and once as a set of global variable ids
 */
std::pair<std::vector<SmallBitset>, std::vector<SmallBitset>> rdc_split(const MatrixXf &data, SmallBitset variables)
{
    const auto num_cols = data.cols();
    AdjacencyMatrix adjacency_matrix(num_cols);
    std::vector<MatrixXf> CDF_matrices(num_cols);

    /* precompute CDF matrices */
    for (int i = 0; i < num_cols; i++) { CDF_matrices[i] = create_CDF_matrix(data.col(i)); }

    /* build a graph with edges between correlated columns (attributes) */
    for (unsigned i = 0; i < num_cols - 1; i++) {
        for (unsigned j = i+1; j < num_cols; j++) {
            const float rdc_value = rdc_precomputed_CDF(CDF_matrices[i], CDF_matrices[j]);
            /* if the rdc value is greater or equal to the threshold, consider columns dependent */
            if (rdc_value >= RDC_THRESHOLD) {
                adjacency_matrix(i,j) = true;
                adjacency_matrix(j,i) = true;
            }
        }
    }

    SmallBitset remaining = SmallBitset::All(num_cols);
    std::vector<SmallBitset> connected_subgraphs;

    /* build the connected subgraphs (dependent subsets of attributes) */
    while (remaining) {
        auto next = remaining.begin();
        const SmallBitset CSG = adjacency_matrix.reachable(next.as_set());
        connected_subgraphs.emplace_back(CSG);
        remaining -= CSG;
    }

    std::vector<SmallBitset> variable_candidates(connected_subgraphs.size());

    /* collect the global variable ids so that local column index i maps to temp_variables[i] */
    std::vector<std::size_t> temp_variables;
    temp_variables.reserve(num_cols);
    for (auto it = variables.begin(); it != variables.end(); ++it) { temp_variables.emplace_back(*it); }

    /* translate SmallBitset of local column indices to sets of global variable ids */
    for (std::size_t i = 0; i < connected_subgraphs.size(); ++i) {
        for (auto it = connected_subgraphs[i].begin(); it != connected_subgraphs[i].end(); ++it) {
            variable_candidates[i][temp_variables[*it]] = true;
        }
    }

    return std::make_pair(connected_subgraphs, variable_candidates);
}
88
89}
90
91
/* Dump a textual representation of this node and its subtree to `std::cerr`. */
void Spn::Node::dump() const { dump(std::cerr); }
/* Dump a textual representation of this node and its subtree to \p out. */
void Spn::Node::dump(std::ostream &out) const { print(out, 0); }
94
95
96/*======================================================================================================================
97 * Sum Node
98 *====================================================================================================================*/
99
100std::pair<float, float> Spn::Sum::evaluate(const Filter &filter, unsigned leaf_id, EvalType eval_type) const
101{
102 float expectation_result = 0.f;
103 float likelihood_result = 0.f;
104 for (auto &child : children) {
105 auto [expectation, likelihood] = child->child->evaluate(filter, leaf_id, eval_type);
106 expectation_result += child->weight * expectation;
107 likelihood_result += child->weight * likelihood;
108 }
109
110 return { expectation_result, likelihood_result };
111}
112
/** Route the update of \p row to the child of this sum node whose cluster centroid is nearest
 *  (in squared Euclidean distance) and re-normalize the children's weights.
 *
 *  NOTE(review): the row counters are incremented for both INSERT and DELETE update types; for
 *  DELETE one would expect a decrement — confirm the intended semantics against the callers.
 */
void Spn::Sum::update(VectorXf &row, SmallBitset variables, Spn::UpdateType update_type)
{
    /* compute nearest cluster */
    unsigned nearest_centroid = 0;
    std::size_t num_clusters = children.size();
    float delta = (children[0]->centroid - row).squaredNorm();
    for (std::size_t i = 1; i < num_clusters; i++) {
        float next_delta = (children[i]->centroid - row).squaredNorm();
        if (next_delta < delta) {
            delta = next_delta;
            nearest_centroid = i;
        }
    }

    /* adjust weights of the sum nodes */
    children[nearest_centroid]->child->num_rows++;
    num_rows++;

    /* each child's weight is its share of the total row count */
    for (std::size_t i = 0; i < num_clusters; i++) {
        children[i]->weight = children[i]->child->num_rows / float(num_rows);
    }

    /* propagate the update into the chosen subtree */
    children[nearest_centroid]->child->update(row, variables, update_type);
}
137
138std::size_t Spn::Sum::estimate_number_distinct_values(unsigned id) const
139{
140 std::size_t result = 0;
141 for (auto &child : children) {
142 result += child->child->estimate_number_distinct_values(id);
143 }
144
145 return std::min(result, num_rows);
146}
147
148void Spn::Sum::print(std::ostream &out, std::size_t num_tabs) const
149{
150 for (std::size_t i = 0; i < children.size(); i++) {
151 for (std::size_t n = 0; n < num_tabs; n++) { out << "\t"; }
152 if (i != 0) { out << "+ "; }
153 out << children[i]->weight << "\n";
154 children[i]->child->print(out, num_tabs+1);
155 }
156}
157
158
159/*======================================================================================================================
160 * Product Node
161 *====================================================================================================================*/
162
163std::pair<float, float> Spn::Product::evaluate(const Filter &filter, unsigned leaf_id, EvalType eval_type) const
164{
165 float expectation_result = 1.f;
166 float likelihood_result = 1.f;
167 for (auto &child : children) {
168 for (const auto & it : filter) {
169 if (child->variables[it.first]) {
170 auto [expectation, likelihood] = child->child->evaluate(filter, it.first, eval_type);
171 expectation_result *= expectation;
172 likelihood_result *= likelihood;
173 break;
174 }
175 }
176 }
177 return {expectation_result, likelihood_result };
178}
179
180void Spn::Product::update(VectorXf &row, SmallBitset variables, UpdateType update_type)
181{
182 std::unordered_map<unsigned, unsigned> variable_to_index;
183 unsigned index = 0;
184 for (auto it = variables.begin(); it != variables.end(); ++it) { variable_to_index.emplace(*it, index++); }
185
186 /* update each child of the product node with the according subset of attributes of the row */
187 for (auto &child : children) {
188 std::size_t num_cols = child->variables.size();
189 VectorXf proj_row(num_cols);
190 auto it = child->variables.begin();
191 for (std::size_t i = 0; i < num_cols; ++i) {
192 proj_row(i) = row(variable_to_index[*it]);
193 ++it;
194 }
195 child->child->update(proj_row, child->variables, update_type);
196 }
197}
198
200{
201 for (auto &child : children) {
202 if (child->variables[id]) {
203 return child->child->estimate_number_distinct_values(id);
204 }
205 }
206
207 return 0;
208}
209
210void Spn::Product::print(std::ostream &out, std::size_t num_tabs) const
211{
212 for (std::size_t i = 0; i < children.size(); i++) {
213 for (std::size_t n = 0; n < num_tabs; n++) { out << "\t"; }
214 if (i != 0) { out << "* "; }
215 out << "variable scope=(";
216 for (auto it = children[i]->variables.begin(); it != children[i]->variables.end(); ++it) { out << *it; }
217 out << "):" << "\n";
218 children[i]->child->print(out, num_tabs+1);
219 }
220}
221
222
223/*======================================================================================================================
224 * DiscreteLeaf Node
225 *====================================================================================================================*/
226
/** Evaluate this discrete (per-value histogram) leaf for the predicate on `leaf_id` in \p filter.
 *
 *  Returns a pair of (expectation, likelihood). For comparison operators both components are the
 *  probability that the predicate holds; for EXPECTATION the first component is the mean of the
 *  bin values weighted by their probabilities and the likelihood is 1.
 *
 *  `eval_type` is not used here: discrete bins store exact values, so no interpolation between
 *  bin boundaries is needed.
 */
std::pair<float, float> Spn::DiscreteLeaf::evaluate(const Filter &filter, unsigned leaf_id, EvalType eval_type) const
{
    auto [spn_operator, value] = filter.at(leaf_id);

    if (spn_operator == IS_NULL) { return { null_probability, null_probability }; }

    if (bins.empty()) { return { 0.f, 0.f }; }

    if (spn_operator == EXPECTATION) {
        /* mean = sum over bins of (bin probability * bin value); a bin's probability is the
         * delta of consecutive cumulative probabilities */
        float expectation = bins[0].cumulative_probability * bins[0].value;
        float prev_probability = bins[0].cumulative_probability;
        for (std::size_t bin_id = 1; bin_id < bins.size(); bin_id++) {
            float cumulative_probability = bins[bin_id].cumulative_probability;
            float probability = cumulative_probability - prev_probability;
            expectation += probability * bins[bin_id].value;
            prev_probability = cumulative_probability;
        }
        return {expectation, 1.f };
    }
    /* probability of last bin, because the cumulative probability of the last bin is not always 1 because of NULL */
    float last_prob = std::prev(bins.end())->cumulative_probability;
    float probability = 0.f;

    if (spn_operator == SpnOperator::EQUAL) {
        /* value outside the stored range => probability 0 */
        if (bins.begin()->value > value or (std::prev(bins.end()))->value < value) { return { 0.f, 0.f }; }
        auto lower_bound = std::lower_bound(bins.begin(), bins.end(), value);
        if (lower_bound->value == value) {
            /* P(X == value) = CDF at this bin minus CDF of the preceding bin */
            probability = lower_bound->cumulative_probability;
            if (lower_bound != bins.begin()) { probability -= (--lower_bound)->cumulative_probability; }
        }
        return { probability, probability };
    }

    if (spn_operator == SpnOperator::LESS) {
        if (bins.begin()->value >= value) { return { 0.f, 0.f }; }
        if ((std::prev(bins.end()))->value < value) { return { last_prob, last_prob }; }
        auto lower_bound = std::lower_bound(bins.begin(), bins.end(), value);
        /* CDF of the last bin strictly below `value` */
        probability = (--lower_bound)->cumulative_probability;
        return { probability, probability };
    }

    if (spn_operator == SpnOperator::LESS_EQUAL) {
        if (bins.begin()->value > value) { return { 0.f, 0.f }; }
        if ((std::prev(bins.end()))->value <= value) { return { last_prob, last_prob }; }
        auto upper_bound = std::upper_bound(bins.begin(), bins.end(), value,
            [](float bin_value, DiscreteLeaf::Bin bin) { return bin_value < bin.value; }
        );
        /* CDF of the last bin with value <= `value` */
        probability = (--upper_bound)->cumulative_probability;
        return { probability, probability };
    }

    if (spn_operator == SpnOperator::GREATER) {
        if (bins.begin()->value > value) { return { last_prob, last_prob }; }
        if ((std::prev(bins.end()))->value <= value) { return { 0.f, 0.f }; }
        auto upper_bound = std::upper_bound(bins.begin(), bins.end(), value,
            [](float bin_value, DiscreteLeaf::Bin bin) { return bin_value < bin.value; }
        );
        /* complement: total mass minus CDF of the last bin with value <= `value` */
        probability = last_prob - (--upper_bound)->cumulative_probability;
        return { probability, probability };
    }

    if (spn_operator == SpnOperator::GREATER_EQUAL) {
        if (bins.begin()->value >= value) { return { last_prob, last_prob }; }
        if ((std::prev(bins.end()))->value < value) { return { 0.f, 0.f }; }
        auto lower_bound = std::lower_bound(bins.begin(), bins.end(), value);
        /* complement: total mass minus CDF of the last bin strictly below `value` */
        probability = last_prob - (--lower_bound)->cumulative_probability;
        return { probability, probability };
    }

    return { 0.f, 0.f }; // unknown operator
}
298
/** Apply a single-row INSERT or DELETE to this discrete leaf.
 *
 *  The stored cumulative probabilities are converted back into per-bin counts, the count of the
 *  affected bin is adjusted (a new bin is created on INSERT of an unseen value), and the counts
 *  are re-normalized into cumulative probabilities.
 *
 *  NOTE(review): a DELETE that empties a bin leaves a zero-count bin in place, and the final
 *  re-normalization divides by `num_rows` — assumes `num_rows` stays > 0 after a DELETE.
 */
void Spn::DiscreteLeaf::update(VectorXf &row, SmallBitset variables, Spn::UpdateType update_type)
{
    const float value = row(0);

    if (bins.empty()) {
        /* leaf currently holds only NULLs; an INSERT creates the first bin */
        if (update_type == INSERT) {
            bins.emplace_back(value, 1.f/(num_rows + 1));
            null_probability = (null_probability * num_rows) / (num_rows + 1);
            num_rows++;
        }
        return;
    }

    /* copy bins with the actual number of values in a bin */
    std::vector<Bin> updated_bins;
    updated_bins.reserve(bins.size());
    updated_bins.emplace_back(bins[0].value, bins[0].cumulative_probability * num_rows);
    for (std::size_t i = 1; i < bins.size(); i++) {
        updated_bins.emplace_back(
            bins[i].value,
            (bins[i].cumulative_probability - bins[i-1].cumulative_probability) * num_rows
        );
    }

    /* insert the update value into the correct bin or create a new one if the bin does not exist */
    if (update_type == INSERT) {
        auto lower_bound = std::lower_bound(updated_bins.begin(), updated_bins.end(), value);
        if (lower_bound == updated_bins.end()) { updated_bins.emplace_back(value, 1.f); }
        else if (lower_bound->value != value) { updated_bins.emplace(lower_bound, value, 1.f); }
        else { lower_bound->cumulative_probability += 1; }
        num_rows++;
    }
    /* delete the update value from the correct bin */
    else {
        auto lower_bound = std::lower_bound(updated_bins.begin(), updated_bins.end(), value);
        if (lower_bound->value != value) { return; } // value not present; nothing to delete
        lower_bound->cumulative_probability -= 1;
        num_rows--;
    }

    /* calculate the cumulative probability */
    updated_bins[0].cumulative_probability /= float(num_rows);
    for (std::size_t i = 1; i < updated_bins.size(); i++) {
        updated_bins[i].cumulative_probability /= float(num_rows);
        updated_bins[i].cumulative_probability += updated_bins[i-1].cumulative_probability;
    }

    bins = std::move(updated_bins);
}
348
350{
351 return bins.size();
352}
353
354void Spn::DiscreteLeaf::print(std::ostream &out, std::size_t num_tabs) const
355{
356 for (std::size_t n = 0; n < num_tabs; n++) { out << "\t"; }
357 out << "[";
358 if (not bins.empty()) {
359 out << bins[0].value << ":" << bins[0].cumulative_probability;
360 for (std::size_t i = 1; i < bins.size(); i++) {
361 out << ", " << bins[i].value << ":" << bins[i].cumulative_probability - bins[i-1].cumulative_probability;
362 }
363 out << " ";
364 }
365 out << "null_prob:" << null_probability;
366 out << "]";
367 out << "\n";
368}
369
370
371/*======================================================================================================================
372 * ContinuousLeaf Node
373 *====================================================================================================================*/
374
375std::pair<float, float> Spn::ContinuousLeaf::evaluate(const Filter &filter, unsigned leaf_id, EvalType eval_type) const
376{
377 auto [spn_operator, value] = filter.at(leaf_id);
378 float probability = 0.f;
379 if (spn_operator == IS_NULL) { return { null_probability, null_probability }; }
380 if (bins.empty()) { return { 0.f, 0.f }; }
381
382 if (spn_operator == SpnOperator::EXPECTATION) {
383 float expectation = lower_bound * lower_bound_probability;
384 expectation += bins[0].cumulative_probability * ((bins[0].upper_bound + lower_bound) / 2.f);
385 float prev_probability = bins[0].cumulative_probability;
386 float prev_upper_bound = bins[0].upper_bound;
387 for (std::size_t bin_id = 1; bin_id < bins.size(); bin_id++) {
388 float upper_bound = bins[bin_id].upper_bound;
389 float cumulative_probability = bins[bin_id].cumulative_probability;
390 float current_probability = cumulative_probability - prev_probability;
391 expectation += current_probability * ((prev_upper_bound + upper_bound) / 2);
392 prev_probability = cumulative_probability;
393 prev_upper_bound = upper_bound;
394 }
395 return std::make_pair(expectation, 1.f);
396 }
397
398 if (spn_operator == SpnOperator::EQUAL) {
399 if (lower_bound > value or (std::prev(bins.end()))->upper_bound < value) { return { 0.f, 0.f }; }
400 if (lower_bound == value) { return { lower_bound_probability, lower_bound_probability }; }
401 if (value <= bins[0].upper_bound) {
402 probability = bins[0].cumulative_probability - lower_bound_probability;
403 return { probability, probability };
404 }
405 auto std_lower_bound = std::lower_bound(bins.begin(), bins.end(), value);
406 probability = std_lower_bound->cumulative_probability - (--std_lower_bound)->cumulative_probability;
407 return { probability, probability };
408 }
409
410 if (spn_operator == LESS or spn_operator == LESS_EQUAL) {
411 if (lower_bound >= value) {
412 if (lower_bound == value and spn_operator == LESS_EQUAL) {
413 return { lower_bound_probability, lower_bound_probability };
414 }
415 return { 0.f, 0.f };
416 }
417 if ((std::prev(bins.end()))->upper_bound <= value) {
418 probability = std::prev(bins.end())->cumulative_probability;
419 return { probability, probability };
420 }
421 if (value <= bins[0].upper_bound) {
422 float prob = bins[0].cumulative_probability - lower_bound_probability;
423 probability = lower_bound_probability +
424 (prob * ((value - lower_bound) / (bins[0].upper_bound - lower_bound)));
425 return { probability, probability };
426 }
427 auto std_lower_bound = std::lower_bound(bins.begin(), bins.end(), value);
428 auto &bin = *std_lower_bound;
429 auto &prev_bin = *(--std_lower_bound);
430 float bin_probability = bin.cumulative_probability - prev_bin.cumulative_probability;
431 float percentile = (value - prev_bin.upper_bound) / (bin.upper_bound - prev_bin.upper_bound);
432 switch (eval_type) {
433 case APPROXIMATE:
434 probability = prev_bin.cumulative_probability + (bin_probability * percentile);
435 case UPPER_BOUND:
436 probability = bin.cumulative_probability;
437 case LOWER_BOUND:
438 probability = prev_bin.cumulative_probability;
439 }
440 return { probability, probability };
441 }
442
443 if (spn_operator == GREATER_EQUAL or spn_operator == GREATER) {
444 float last_prob = std::prev(bins.end())->cumulative_probability;
445 if (lower_bound >= value) {
446 if (lower_bound == value and spn_operator == GREATER) {
447 probability = last_prob - lower_bound_probability;
448 return { probability, probability };
449 }
450 return { last_prob, last_prob };
451 }
452 if ((std::prev(bins.end()))->upper_bound <= value) { return { 0.f, 0.f }; }
453 if (value <= bins[0].upper_bound) {
454 float prob = bins[0].cumulative_probability - lower_bound_probability;
455 float split_bin_prob = (prob * (1.f - ((value - lower_bound) / (bins[0].upper_bound - lower_bound))));
456 probability = last_prob - bins[0].cumulative_probability + split_bin_prob;
457 return { probability, probability };
458 }
459 auto std_lower_bound = std::lower_bound(bins.begin(), bins.end(), value);
460 auto &bin = *std_lower_bound;
461 auto &prev_bin = *(--std_lower_bound);
462 float bin_probability = bin.cumulative_probability - prev_bin.cumulative_probability;
463 float percentile = 1.f - ((value - prev_bin.upper_bound) / (bin.upper_bound - prev_bin.upper_bound));
464 switch (eval_type) {
465 case APPROXIMATE:
466 probability = last_prob - bin.cumulative_probability + (bin_probability * percentile);
467 case UPPER_BOUND:
468 probability = last_prob - prev_bin.cumulative_probability;
469 case LOWER_BOUND:
470 probability = last_prob - bin.cumulative_probability;
471 }
472 return { probability, probability };
473 }
474
475 return { 0.f, 0.f };
476}
477
478void Spn::ContinuousLeaf::update(VectorXf &row, SmallBitset variables, Spn::UpdateType update_type)
479{
480 const float value = row(0);
481
482 if (bins.empty()) {
483 if (update_type == INSERT) {
484 bins.emplace_back(value, 1.f/(num_rows + 1));
485 null_probability = (null_probability * num_rows) / (num_rows + 1);
486 num_rows++;
487 }
488 return;
489 }
490
491 /* copy bins with the actual number of values in a bin */
492 std::vector<Bin> updated_bins;
493 float updated_lower_bound_prob = lower_bound_probability * num_rows;
494 updated_bins.reserve(bins.size());
495 updated_bins.emplace_back(
496 bins[0].upper_bound,
497 (bins[0].cumulative_probability - lower_bound_probability) * num_rows
498 );
499 for (std::size_t i = 1; i < bins.size(); i++) {
500 updated_bins.emplace_back(
501 bins[i].upper_bound,
502 (bins[i].cumulative_probability - bins[i-1].cumulative_probability) * num_rows
503 );
504 }
505
506 /* insert the update value into the correct bin or create a new one if the bin does not exist */
507 if (update_type == INSERT) {
508 if (value == lower_bound) {
509 updated_lower_bound_prob += 1;
510 } else if (value < lower_bound) {
511 updated_bins[0].cumulative_probability += updated_lower_bound_prob;
512 lower_bound = value;
513 updated_lower_bound_prob = 1.f;
514 } else if (value > updated_bins[updated_bins.size() - 1].upper_bound) {
515 updated_bins.emplace_back(value, 1.f);
516 } else {
517 auto std_lower_bound = std::lower_bound(updated_bins.begin(), updated_bins.end(), value);
518 std_lower_bound->cumulative_probability += 1;
519 }
520 num_rows++;
521 }
522 /* delete the update value from the correct bin */
523 else {
524 if (value < lower_bound or value > updated_bins[updated_bins.size() - 1].upper_bound) { return; }
525 if (value == lower_bound) {
526 updated_lower_bound_prob += 1;
527 } else {
528 auto std_lower_bound = std::lower_bound(updated_bins.begin(), updated_bins.end(), value);
529 if (std_lower_bound->cumulative_probability == 0) { return; }
530 std_lower_bound->cumulative_probability -= 1;
531 }
532 num_rows--;
533 }
534
535 /* calculate the cumulative probability */
536 updated_lower_bound_prob /= float(num_rows);
537 updated_bins[0].cumulative_probability /= float(num_rows);
538 updated_bins[0].cumulative_probability += updated_lower_bound_prob;
539 for (std::size_t i = 1; i < updated_bins.size(); i++) {
540 updated_bins[i].cumulative_probability /= float(num_rows);
541 updated_bins[i].cumulative_probability += updated_bins[i-1].cumulative_probability;
542 }
543 lower_bound_probability = updated_lower_bound_prob;
544 bins = std::move(updated_bins);
545}
546
548{
549 return num_rows;
550}
551
552void Spn::ContinuousLeaf::print(std::ostream &out, std::size_t num_tabs) const
553{
554 for (std::size_t n = 0; n < num_tabs; n++) { out << "\t"; }
555 out << "[";
556 if (not bins.empty()) {
557 out << lower_bound_probability << ": ";
558 out << lower_bound;
559 out << " :" << bins[0].cumulative_probability - lower_bound_probability << ": " << bins[0].upper_bound;
560 for (std::size_t i = 1; i < bins.size(); i++) {
561 out << " :" << bins[i].cumulative_probability - bins[i-1].cumulative_probability;
562 out << ": " << bins[i].upper_bound;
563 }
564 out << " ";
565 }
566 out << "null_prob:" << null_probability;
567 out << "]";
568 out << "\n";
569}
570
571/*======================================================================================================================
572 * Spn
573 *====================================================================================================================*/
574
575/*----- Learning helper methods --------------------------------------------------------------------------------------*/
576
/** Create a product node that treats every attribute as independent: each column of the learning
 *  data becomes its own child. Used when the instance slice is too small (<= MIN_INSTANCE_SLICE)
 *  for reliable dependence testing. */
std::unique_ptr<Spn::Product> Spn::create_product_min_slice(LearningData &ld)
{
    std::vector<std::unique_ptr<Product::ChildWithVariables>> children;
    auto variable_it = ld.variables.begin();
    for (auto i = 0; i < ld.data.cols(); i++) {
        /* single-column views of the learning data for child i */
        const MatrixXf &data = ld.data.col(i);
        const MatrixXf &normalized = ld.normalized.col(i);
        const MatrixXi &null_matrix = ld.null_matrix.col(i);
        SmallBitset variables(variable_it.as_set()); // the singleton scope of this child
        ++variable_it;
        std::vector<LeafType> split_leaf_types{ld.leaf_types[i]};
        LearningData split_data(
            data,
            normalized,
            null_matrix,
            variables,
            split_leaf_types
        );
        auto child = std::make_unique<Product::ChildWithVariables>(learn_node(split_data), variables);
        children.push_back(std::move(child));
    }
    return std::make_unique<Product>(std::move(children), ld.data.rows());
}
600
601
/** Create a product node from the groups of dependent attributes computed by `rdc_split`:
 *  each group becomes one child learned on the corresponding column subset.
 *
 *  @param ld                  the learning data of this slice
 *  @param column_candidates   per group, the set of column indices into `ld.data`
 *  @param variable_candidates per group, the set of global variable ids (same order)
 */
std::unique_ptr<Spn::Product> Spn::create_product_rdc(
    LearningData &ld,
    std::vector<SmallBitset> &column_candidates,
    std::vector<SmallBitset> &variable_candidates
)
{
    std::vector<std::unique_ptr<Product::ChildWithVariables>> children;
    for (std::size_t current_split = 0; current_split < column_candidates.size(); current_split++) {
        std::size_t split_size = column_candidates[current_split].size();
        /* gather the leaf types and column indices of this group */
        std::vector<LeafType> split_leaf_types;
        split_leaf_types.reserve(split_size);
        std::vector<unsigned> column_index;
        column_index.reserve(split_size);
        for (auto it = column_candidates[current_split].begin(); it != column_candidates[current_split].end(); ++it) {
            split_leaf_types.push_back(ld.leaf_types[*it]);
            column_index.push_back(*it);
        }

        /* project the learning data onto the group's columns */
        const MatrixXf &data = ld.data(all, column_index);
        const MatrixXf &normalized = ld.normalized(all, column_index);
        const MatrixXi &null_matrix = ld.null_matrix(all, column_index);
        LearningData split_data(data, normalized, null_matrix, variable_candidates[current_split], split_leaf_types);
        auto child = std::make_unique<Product::ChildWithVariables>(
            learn_node(split_data),
            variable_candidates[current_split]
        );
        children.push_back(std::move(child));
    }
    return std::make_unique<Product>(std::move(children), ld.data.rows());
}
632
/** Create a sum node by clustering the rows of the learning data with k-means.
 *
 *  k grows from 2 upwards. For each k the rows are clustered and every cluster is checked for a
 *  vertical (RDC) split. The clustering of the *previous* iteration (k-1 clusters) is accepted —
 *  and its children built — once the number of clusters containing a split stops increasing (or
 *  every cluster splits), or once k reaches MAX_K.
 */
std::unique_ptr<Spn::Sum> Spn::create_sum(Spn::LearningData &ld)
{
    const auto num_rows = ld.data.rows();

    int k = 2;

    /* state of the previous iteration; the accepted sum node is built from these */
    unsigned prev_num_split_nodes = 0;
    std::vector<std::vector<SmallBitset>> prev_cluster_column_candidates;
    std::vector<std::vector<SmallBitset>> prev_cluster_variable_candidates;
    std::vector<std::vector<unsigned>> prev_cluster_row_ids;
    MatrixRXf prev_centroids;

    /* increment k of Kmeans until we get a good clustering according to RDC splits in the clusters */
    while (true) {
        unsigned num_split_nodes = 0;

        auto [labels, centroids] = kmeans_with_centroids(ld.normalized, k);

        std::vector<std::vector<SmallBitset>> cluster_column_candidates(k);
        std::vector<std::vector<SmallBitset>> cluster_variable_candidates(k);
        std::vector<std::vector<unsigned>> cluster_row_ids(k);

        /* group the row ids by their assigned cluster label */
        for (unsigned current_row = 0; current_row < num_rows; current_row++) {
            cluster_row_ids[labels[current_row]].push_back(current_row);
        }

        /* if only one cluster, split this cluster into k clusters */
        bool only_one_cluster = not cluster_row_ids[0].empty();
        for (int i = 1; i < k; i++)
            only_one_cluster = only_one_cluster and cluster_row_ids[i].empty();

        if (only_one_cluster) {
            /* k-means degenerated: split the rows into k equally sized chunks instead */
            const std::size_t subvector_size = cluster_row_ids[0].size() / k;
            std::vector<std::vector<unsigned>> new_cluster_row_ids(k);
            for (std::size_t cluster_id = 0; cluster_id < k - 1; cluster_id++) {
                std::vector<unsigned> subvector(
                    cluster_row_ids[0].begin() + (cluster_id * subvector_size),
                    cluster_row_ids[0].begin() + ((cluster_id + 1) * subvector_size)
                );
                new_cluster_row_ids[cluster_id] = std::move(subvector);
            }
            /* the last chunk takes the remainder */
            std::vector<unsigned> last_subvector(
                cluster_row_ids[0].begin() + ((k - 1) * subvector_size),
                cluster_row_ids[0].end()
            );
            new_cluster_row_ids[k - 1] = std::move(last_subvector);

            cluster_row_ids = std::move(new_cluster_row_ids);
        }

        /* check the splitting of attributes in each cluster */
        for (unsigned label_id = 0; label_id < k; label_id++) {
            std::size_t cluster_size = cluster_row_ids[label_id].size();
            if (cluster_size == 0) { continue; }

            const MatrixXf &data = ld.data(cluster_row_ids[label_id], all);

            if (cluster_size <= MIN_INSTANCE_SLICE) {
                /* too small for dependence testing; counts as a split with no candidates */
                num_split_nodes++;
                cluster_column_candidates[label_id] = std::vector<SmallBitset>();
                cluster_variable_candidates[label_id] = std::vector<SmallBitset>();
            } else {
                auto [current_column_candidates, current_variable_candidates] = rdc_split(data, ld.variables);
                if (current_column_candidates.size() > 1) { num_split_nodes++; }
                cluster_column_candidates[label_id] = std::move(current_column_candidates);
                cluster_variable_candidates[label_id] = std::move(current_variable_candidates);
            }
        }

        /* if the number of split attributes does not increase or if there is a split in each cluster, build sum node */
        if (
            ((num_split_nodes <= prev_num_split_nodes or prev_num_split_nodes == prev_cluster_row_ids.size())
            and prev_num_split_nodes != 0) or k >= MAX_K
        ) {
            /* build the children from the PREVIOUS clustering, which used k-1 clusters */
            std::vector<std::unique_ptr<Sum::ChildWithWeight>> children;
            for (std::size_t cluster_id = 0; cluster_id < k - 1; cluster_id++) {
                const MatrixXf &data = ld.data(prev_cluster_row_ids[cluster_id], all);
                const MatrixXf &normalized = ld.normalized(prev_cluster_row_ids[cluster_id], all);
                const MatrixXi &null_matrix = ld.null_matrix(prev_cluster_row_ids[cluster_id], all);
                LearningData cluster_data(data, normalized, null_matrix, ld.variables, ld.leaf_types);
                const float weight = float(data.rows())/float(num_rows);
                std::size_t cluster_vertical_partitions = prev_cluster_column_candidates[cluster_id].size();

                /* since we already determined the node type of each cluster (child of the sum node), we can
                 * directly build the children of the sum node */
                std::unique_ptr<Node> child_node;
                if (cluster_vertical_partitions == 0) {
                    child_node = create_product_min_slice(cluster_data);
                } else if (cluster_vertical_partitions == 1) {
                    child_node = create_sum(cluster_data);
                } else {
                    child_node = create_product_rdc(
                        cluster_data,
                        prev_cluster_column_candidates[cluster_id],
                        prev_cluster_variable_candidates[cluster_id]
                    );
                }
                std::unique_ptr<Sum::ChildWithWeight> child = std::make_unique<Sum::ChildWithWeight>(
                    std::move(child_node),
                    weight,
                    prev_centroids.row(cluster_id)
                );

                children.push_back(std::move(child));
            }

            return std::make_unique<Sum>(std::move(children), num_rows);
        }
        /* remember this clustering and try k+1 */
        prev_num_split_nodes = num_split_nodes;
        prev_cluster_column_candidates = std::move(cluster_column_candidates);
        prev_cluster_variable_candidates = std::move(cluster_variable_candidates);
        prev_cluster_row_ids = std::move(cluster_row_ids);
        prev_centroids = centroids;
        k++;
    }
}
749
750
751std::unique_ptr<Spn::Node> Spn::learn_node(LearningData &ld)
752{
753 const auto num_rows = ld.data.rows();
754 const auto num_cols = ld.data.cols();
755
756 /* build leaf */
757 if (num_cols == 1) {
758 if (ld.leaf_types[0] == DISCRETE) {
759 std::vector<DiscreteLeaf::Bin> bins;
760 /* If every value is NULL */
761 if (ld.null_matrix.minCoeff() == 1) {
762 return std::make_unique<DiscreteLeaf>(std::move(bins), 1.f, num_rows);
763 }
764 unsigned null_counter = 0;
765 for (auto i = 0; i < num_rows; i++) {
766 if (ld.null_matrix(i, 0) == 1) {
767 null_counter++;
768 continue;
769 }
770 const auto current_value = ld.data(i, 0);
771 /* insert value into sorted vector of bins */
772 auto lower_bound = std::lower_bound(bins.begin(), bins.end(), current_value);
773 if (lower_bound == bins.end()) { bins.emplace_back(current_value, 1.f); }
774 /* if bin already exists, increment counter */
775 else if (lower_bound->value == current_value) { lower_bound->cumulative_probability++; }
776 else { bins.emplace(lower_bound, current_value, 1.f); }
777 }
778
779 bins[0].cumulative_probability /= float(num_rows);
780 for (std::size_t i = 1; i < bins.size(); i++) {
781 bins[i].cumulative_probability /= float(num_rows);
782 bins[i].cumulative_probability += bins[i-1].cumulative_probability;
783 }
784 return std::make_unique<DiscreteLeaf>(std::move(bins), null_counter / float(num_rows), num_rows);
785 } else {
786 std::vector<ContinuousLeaf::Bin> bins;
787 /* If every value is NULL */
788 if (ld.null_matrix.minCoeff() == 1) {
789 return std::make_unique<ContinuousLeaf>(std::move(bins), 0.f, 0.f, 1.f, num_rows);
790 }
791 const float max = ld.data.maxCoeff();
792 const float min = ld.data.minCoeff();
793 const auto num_bins = 1 + log2_ceil(std::size_t(num_rows));
794 const float bin_width = (max - min) / float(num_bins);
795 const float lower_bound = min;
796 unsigned lower_bound_counter = 0.f;
797 unsigned null_counter = 0.f;
798
799 /* initialise bins */
800 bins.reserve(num_bins);
801 for(std::size_t bin_id = 0; bin_id < num_bins; bin_id++) {
802 bins.emplace_back(min + (float(bin_id + 1) * bin_width), 0.f);
803 }
804
805 /* fill the values into the existing bins */
806 for (unsigned i = 0; i < num_rows; i++) {
807 if (ld.null_matrix(i, 0) == 1) {
808 null_counter++;
809 continue;
810 }
811 const auto current_value = ld.data(i, 0);
812 if (current_value == lower_bound) {
813 lower_bound_counter++;
814 continue;
815 }
816 auto std_lower_bound = std::lower_bound(bins.begin(), bins.end(), current_value);
817 std_lower_bound->cumulative_probability++;
818 }
819
820 float lower_bound_probability = lower_bound_counter / float(num_rows);
821 bins[0].cumulative_probability /= float(num_rows);
822 bins[0].cumulative_probability += lower_bound_probability;
823
824 for (std::size_t i = 1; i < bins.size(); i++) {
825 bins[i].cumulative_probability /= float(num_rows);
826 bins[i].cumulative_probability += bins[i-1].cumulative_probability;
827 }
828 return std::make_unique<ContinuousLeaf>(
829 std::move(bins),
831 lower_bound_probability,
832 null_counter / float(num_rows),
834 );
835 }
836 }
837
838 /* build product node with the minimum instance slice */
839 if (num_rows <= MIN_INSTANCE_SLICE) { return create_product_min_slice(ld); }
840
841 /* build product node with the RDC algorithm */
842 auto [column_candidates, variable_candidates] = rdc_split(ld.data, ld.variables);
843 if (column_candidates.size() != 1) { return create_product_rdc(ld, column_candidates, variable_candidates); }
844
845 /* build sum node */
846 else { return create_sum(ld); }
847}
848
849/*----- Learning -----------------------------------------------------------------------------------------------------*/
850
851Spn Spn::learn_spn(Eigen::MatrixXf &data, Eigen::MatrixXi &null_matrix, std::vector<LeafType> &leaf_types)
852{
853 std::size_t num_rows = data.rows();
854 MIN_INSTANCE_SLICE = std::max<std::size_t>((0.1 * num_rows), 1);
855
856 if (num_rows == 0) {
857 std::vector<DiscreteLeaf::Bin> bins;
858 return Spn(0, std::make_unique<DiscreteLeaf>(std::move(bins), 0, 0));
859 }
860
861 /* replace NULL in the data matrix with the mean of the attribute */
862 for (std::size_t col_id = 0; col_id < data.cols(); col_id++) {
863 if (null_matrix.col(col_id).maxCoeff() == 0) { continue; } // there is no NULL
864 if (null_matrix.col(col_id).minCoeff() == 1) { continue; } // there is only NULL
865 float mean = 0;
866 int num_not_null = 0;
867 for (std::size_t row_id = 0; row_id < data.rows(); row_id++) {
868 if (null_matrix(row_id, col_id) == 1) { continue; }
869 mean += (data(row_id, col_id) - mean) / ++num_not_null; // iterative mean
870 }
871 for (std::size_t row_id = 0; row_id < data.rows(); row_id++) {
872 if (null_matrix(row_id, col_id) == 1) { data(row_id, col_id) = mean; }
873 }
874 }
875
876 SmallBitset variables = SmallBitset::All(data.cols());
877
878 auto normalized = normalize_minmax(data);
879 LearningData ld(
880 data,
881 normalized,
882 null_matrix,
883 variables,
884 std::move(leaf_types)
885 );
886
887 return Spn(num_rows, learn_node(ld));
888}
889
890/*----- Inference ----------------------------------------------------------------------------------------------------*/
891
892void Spn::update(VectorXf &row, UpdateType update_type)
893{
894 SmallBitset variables((1 << row.size()) - 1);
895 root_->update(row, variables, update_type);
896}
897
898float Spn::likelihood(const Filter &filter) const
899{
900 return root_->evaluate(filter, filter.begin()->first, APPROXIMATE).second;
901}
902
903float Spn::upper_bound(const Filter &filter) const
904{
905 return root_->evaluate(filter, filter.begin()->first, UPPER_BOUND).second;
906}
907
908float Spn::lower_bound(const Filter &filter) const
909{
910 return root_->evaluate(filter, filter.begin()->first, LOWER_BOUND).second;
911}
912
913float Spn::expectation(unsigned attribute_id, const Filter &filter) const
914{
915 auto filter_copy = filter;
916 filter_copy.emplace(attribute_id, std::make_pair(EXPECTATION, 0.f));
917
918 auto [cond_expectation, likelihood] = root_->evaluate(filter_copy, filter_copy.begin()->first, APPROXIMATE);
919 float llh = 1.f;
920 if (!filter.empty()) {
921 if (likelihood == 0) { return 0; }
922 llh = likelihood;
923 }
924
925 return cond_expectation / llh;
926}
927
928void Spn::update_row(VectorXf &old_row, VectorXf &updated_row)
929{
930 delete_row(old_row);
931 insert_row(updated_row);
932}
933
934void Spn::insert_row(VectorXf &row)
935{
936 update(row, INSERT);
937 num_rows_++;
938}
939
940void Spn::delete_row(VectorXf &row)
941{
942 update(row, DELETE);
943 num_rows_--;
944}
945
946std::size_t Spn::estimate_number_distinct_values(unsigned attribute_id) const
947{
948 return root_->estimate_number_distinct_values(attribute_id);
949}
950
951void Spn::dump() const { dump(std::cerr); }
952
953void Spn::dump(std::ostream &out) const
954{
955 out << "\n";
956 root_->dump(out);
957}
‍mutable namespace
Definition: Backend.hpp:10
Eigen::MatrixXf create_CDF_matrix(const Eigen::MatrixXf &data)
Creates a new matrix, mapping every cell of data to its position according to the cell's column's CDF...
std::pair< std::vector< unsigned >, MatrixRXf > M_EXPORT kmeans_with_centroids(const Eigen::MatrixXf &data, unsigned k)
Clusters the given data according to the k-means algorithm.
float rdc_precomputed_CDF(Eigen::MatrixXf &CDFs_of_X, Eigen::MatrixXf &CDFs_of_Y, unsigned k=5, float stddev=1.f/6.f, URBG &&g=URBG())
Compute the RDC value without having to compute CDF matrices to avoid sorting the same data multiple ...
and
Definition: enum_ops.hpp:12
Eigen::Matrix< float, Eigen::Dynamic, Eigen::Dynamic, Eigen::RowMajor > MatrixRXf
Definition: Kmeans.hpp:10
An adjacency matrix for a given query graph.
Implements a small and efficient set over integers in the range of 0 to 63 (including).
Definition: ADT.hpp:26
static SmallBitset All(std::size_t n)
Factory method for creating a SmallBitset with first n bits set.
Definition: ADT.hpp:117
auto end() const
Definition: ADT.hpp:175
auto begin() const
Definition: ADT.hpp:173
std::size_t estimate_number_distinct_values(unsigned id) const override
Definition: Spn.cpp:547
void print(std::ostream &out, std::size_t num_tabs) const override
Definition: Spn.cpp:552
std::pair< float, float > evaluate(const Filter &filter, unsigned leaf_id, EvalType eval_type) const override
Evaluate the SPN bottom up with a filter condition.
Definition: Spn.cpp:375
void update(Eigen::VectorXf &row, SmallBitset variables, UpdateType update_type) override
Definition: Spn.cpp:478
void update(Eigen::VectorXf &row, SmallBitset variables, UpdateType update_type) override
Definition: Spn.cpp:299
void print(std::ostream &out, std::size_t num_tabs) const override
Definition: Spn.cpp:354
std::pair< float, float > evaluate(const Filter &bin_value, unsigned leaf_id, EvalType eval_type) const override
Evaluate the SPN bottom up with a filter condition.
Definition: Spn.cpp:227
std::size_t estimate_number_distinct_values(unsigned id) const override
Definition: Spn.cpp:349
const Eigen::MatrixXf & data
Definition: Spn.hpp:53
const Eigen::MatrixXf & normalized
Definition: Spn.hpp:54
std::vector< LeafType > leaf_types
Definition: Spn.hpp:57
SmallBitset variables
Definition: Spn.hpp:56
const Eigen::MatrixXi & null_matrix
Definition: Spn.hpp:55
void dump() const
Definition: Spn.cpp:92
std::pair< float, float > evaluate(const Filter &filter, unsigned leaf_id, EvalType eval_type) const override
Evaluate the SPN bottom up with a filter condition.
Definition: Spn.cpp:163
void print(std::ostream &out, std::size_t num_tabs) const override
Definition: Spn.cpp:210
void update(Eigen::VectorXf &row, SmallBitset variables, UpdateType update_type) override
Definition: Spn.cpp:180
std::size_t estimate_number_distinct_values(unsigned id) const override
Definition: Spn.cpp:199
void update(Eigen::VectorXf &row, SmallBitset variables, UpdateType update_type) override
Definition: Spn.cpp:113
std::size_t estimate_number_distinct_values(unsigned id) const override
Definition: Spn.cpp:138
void print(std::ostream &out, std::size_t num_tabs) const override
Definition: Spn.cpp:148
std::pair< float, float > evaluate(const Filter &filter, unsigned leaf_id, EvalType eval_type) const override
Evaluate the SPN bottom up with a filter condition.
Definition: Spn.cpp:100
Tree structure for Sum Product Networks.
Definition: Spn.hpp:20
static Spn learn_spn(Eigen::MatrixXf &data, Eigen::MatrixXi &null_matrix, std::vector< LeafType > &leaf_types)
Learn an SPN over the given data.
Definition: Spn.cpp:851
void dump() const
Definition: Spn.cpp:951
float lower_bound(const Filter &filter) const
Compute the lower bound probability for continuous domains.
Definition: Spn.cpp:908
EvalType
Definition: Spn.hpp:37
@ APPROXIMATE
Definition: Spn.hpp:38
@ LOWER_BOUND
Definition: Spn.hpp:40
@ UPPER_BOUND
Definition: Spn.hpp:39
void update_row(Eigen::VectorXf &old_row, Eigen::VectorXf &updated_row)
Update the SPN with the given row.
Definition: Spn.cpp:928
float likelihood(const Filter &filter) const
Compute the likelihood of the given filter predicates given by a map from attribute to the respective...
Definition: Spn.cpp:898
void delete_row(Eigen::VectorXf &row)
Delete the given row from the SPN.
Definition: Spn.cpp:940
float upper_bound(const Filter &filter) const
Compute the upper bound probability for continuous domains.
Definition: Spn.cpp:903
std::size_t estimate_number_distinct_values(unsigned attribute_id) const
Estimate the number of distinct values of the given attribute.
Definition: Spn.cpp:946
float expectation(unsigned attribute_id, const Filter &filter) const
Compute the expectation of the given attribute.
Definition: Spn.cpp:913
@ GREATER
Definition: Spn.hpp:32
@ EQUAL
Definition: Spn.hpp:29
@ LESS
Definition: Spn.hpp:30
@ GREATER_EQUAL
Definition: Spn.hpp:33
@ LESS_EQUAL
Definition: Spn.hpp:31
@ IS_NULL
Definition: Spn.hpp:34
@ EXPECTATION
Definition: Spn.hpp:35
UpdateType
Definition: Spn.hpp:42
@ DELETE
Definition: Spn.hpp:44
@ INSERT
Definition: Spn.hpp:43
static std::unique_ptr< Spn::Product > create_product_min_slice(LearningData &learning_data)
Create a product node by splitting all columns.
Definition: Spn.cpp:577
std::size_t num_rows() const
returns the number of rows in the SPN.
Definition: Spn.hpp:314
void insert_row(Eigen::VectorXf &row)
Insert the given row into the SPN.
Definition: Spn.cpp:934
static std::unique_ptr< Node > learn_node(LearningData &learning_data)
Recursively learns the nodes of an SPN.
Definition: Spn.cpp:751
std::unique_ptr< Node > root_
Definition: Spn.hpp:307
std::unordered_map< unsigned, std::pair< SpnOperator, float > > Filter
Definition: Spn.hpp:47
static std::unique_ptr< Spn::Sum > create_sum(LearningData &learning_data)
Create a sum node by clustering the rows.
Definition: Spn.cpp:633
std::size_t num_rows_
Definition: Spn.hpp:306
void update(Eigen::VectorXf &row, UpdateType update_type)
Update the SPN from the top down and adjust weights of sum nodes and the distributions on leaves.
Definition: Spn.cpp:892
@ DISCRETE
Definition: Spn.hpp:25
static std::unique_ptr< Product > create_product_rdc(LearningData &learning_data, std::vector< SmallBitset > &column_candidates, std::vector< SmallBitset > &variable_candidates)
Create a product node with the given candidates (vertical clustering)
Definition: Spn.cpp:602