mutable
A Database System for Research and Fast Prototyping
Loading...
Searching...
No Matches
DSVReader.cpp
Go to the documentation of this file.
2
5#include <cctype>
6#include <cerrno>
7#include <exception>
8#include <iterator>
9#include <limits>
10#include <map>
11#include <memory>
16#include <string>
17
18
19using namespace m;
20using namespace m::storage;
21
22
23DSVReader::DSVReader(const Table &table, Config cfg, Diagnostic &diag, Scheduler::Transaction *transaction)
24 : Reader(table, diag, transaction)
25 , cfg_(cfg)
26 , pos(nullptr)
27{
28 if (config().delimiter == config().quote)
29 throw invalid_argument("delimiter and quote must not be the same character");
30}
31
/* Reads delimiter-separated rows from `in` and appends them to the store of `table`.
 *
 * `name` is used to construct the `Position` that is attached to diagnostics.  For every input row a row is appended
 * to the store, the cells are parsed into the intermediate tuple `tup` by dispatching on the column type, and the
 * tuple is then written to the store by a compiled `StackMachine`.
 *
 * NOTE(review): this listing skips its own line 65.  The `} else {` further down has no visible matching `if`, so
 * the statement opening the header-handling branch is not shown here; its condition cannot be determined from this
 * excerpt. */
32void DSVReader::operator()(std::istream &in, const char *name)
33{
34 auto &C = Catalog::Get();
35 auto &store = table.store();
36
37 /* Compute table schema. */
38 Schema S;
39 for (auto it = table.begin_all(); it != table.end_all(); ++it) S.add({table.name(), it->name}, it->type);
40
41 /* Declare reference to the `StackMachine` for the current `Linearization`. */
42 std::unique_ptr<StackMachine> W;
43 const DataLayout *layout = nullptr;
44
45 /* Allocate intermediate tuple. */
/* The tuple is allocated once here and reused for every row read below. */
46 tup = Tuple(S);
47
/* `columns[i]` is the attribute that receives the i-th cell of a row; a `nullptr` entry means the cell is skipped. */
48 std::vector<const Attribute*> columns;
49 this->in = &in;
50 c = '\n';
51 pos = Position(name);
52 step(); // initialize the variable `c` by reading the first character from the input stream
53
/* Reads the raw characters up to the next delimiter, newline, or EOF and returns them as a pooled string.  Quotes are
 * not interpreted here. */
54 auto read_cell = [&]() -> ThreadSafePooledString {
55 buf.clear();
56 while (c != EOF and c != '\n' and c != config().delimiter) {
57 buf.push_back(c);
58 step();
59 }
60 buf.push_back(0);
61 return C.pool(&buf[0]);
62 };
63
64 /*----- Handle header information. -------------------------------------------------------------------------------*/
/* First branch: every cell of the header line is looked up as an attribute name of the table.  Names that the table
 * does not know leave `attr` at `nullptr`, so that column is discarded while reading data. */
66 while (c != EOF and c != '\n') {
67 auto name = read_cell();
68 const Attribute *attr = nullptr;
69 try {
70 attr = &table.at(name);
/* NOTE(review): the exception is caught by value; catching by `const &` is the usual form. */
71 } catch (std::out_of_range) { /* nothing to do */ }
72 columns.push_back(attr);
73 if (c == config().delimiter)
74 step(); // discard delimiter
75 }
76 M_insist(c == EOF or c == '\n');
77 step();
/* Second branch: the columns are the attributes obtained by iterating `table`, in iteration order.  If `skip_header`
 * is set, the first input line is thrown away unparsed. */
78 } else {
79 for (auto &attr : table)
80 columns.push_back(&attr);
81 if (config().skip_header) {
82 in.ignore(std::numeric_limits<std::streamsize>::max(), '\n'); // skip entire line
83 c = '\n';
84 step(); // skip newline
85 }
86 }
87
88 /* Find timestamp attributes */
/* Both lookups search the hidden attributes only; an iterator equal to `table.end_hidden()` means "not found". */
89 auto ts_begin = std::find_if(table.cbegin_hidden(), table.end_hidden(),
90 [&](const Attribute & attr) {
91 return attr.name == C.pool("$ts_begin");
92 });
93 auto ts_end = std::find_if(table.cbegin_hidden(), table.end_hidden(),
94 [&](const Attribute & attr) {
95 return attr.name == C.pool("$ts_end");
96 });
97
98 /*----- Read data. -----------------------------------------------------------------------------------------------*/
/* `idx` counts the rows read so far; reading stops once `config().num_rows` rows were read or the stream is no
 * longer good. */
99 std::size_t idx = 0;
100 while (in.good() and idx < config().num_rows) {
101 ++idx;
102 store.append();
103 for (std::size_t i = 0; i != columns.size(); ++i) {
104 auto col = columns[i];
/* Every cell but the first must be preceded by a delimiter.  If it is missing, the row is abandoned: the rest of the
 * line is discarded, the row counter is rolled back, and the row appended above is dropped again. */
105 if (i != 0 and not accept(config().delimiter)) {
106 diag.e(pos) << "Expected a delimiter (" << config().delimiter << ").\n";
107 discard_row();
108 --idx;
109 store.drop(); // drop the unfinished row
110 goto end_of_row;
111 }
112
113 if (col) { // current cell should be read
/* An empty cell, i.e. one immediately followed by the delimiter or, for the last column, by the newline, is NULL. */
114 if ((i == columns.size() - 1 and c == '\n') or (i < columns.size() - 1 and c == config().delimiter)) { // NULL
115 tup.null(col->id);
116 continue; // keep delimiter (expected at beginning of each loop)
117 }
/* `col_idx` tells the type-specific `operator()` overloads which slot of `tup` to write. */
118 col_idx = col->id;
119 (*this)(*col->type); // dynamic dispatch based on column type
120 discard_cell(); // discard remainder of the cell
121 } else {
122 discard_cell();
123 }
124 }
/* NOTE(review): unlike the missing-delimiter path above, this error path neither calls `store.drop()` nor decrements
 * `idx`, although `store.append()` already ran for this row and the tuple is not written -- confirm this is
 * intended. */
125 if (c != EOF and c != '\n') {
126 diag.e(pos) << "Expected end of row.\n";
127 discard_row();
128 } else {
129 if (layout != &table.layout()) {
130 /* The data layout was updated, recompile stack machine. */
131 layout = &table.layout();
132 W = std::make_unique<StackMachine>(Interpreter::compile_store(S, store.memory().addr(), *layout,
133 S, store.num_rows() - 1));
134 }
135 /*----- set timestamps if available. -----*/
136 if (this->transaction and ts_begin != table.end_hidden()) {
137 tup.set(ts_begin->id, Value(transaction->start_time()));
138 /* Set $ts_end to -1. It is a special value representing infinity. */
139 M_insist(ts_end != table.end_hidden());
140 tup.set(ts_end->id, Value(-1));
141 }
142
143 Tuple *args[] = { &tup };
144 (*W)(args); // write tuple to store
145 }
146end_of_row:
147 M_insist(c == EOF or c == '\n');
148 step();
149 }
150
/* Detach from the input stream again. */
151 this->in = nullptr;
152}
153
154
155void DSVReader::operator()(Const<Boolean>&)
156{
157 buf.clear();
158 while (c != EOF and c != '\n' and c != config().delimiter) { push(); }
159 buf.push_back(0);
160 if (streq("TRUE", &buf[0]))
161 tup.set(col_idx, true);
162 else if (streq("FALSE", &buf[0]))
163 tup.set(col_idx, false);
164 else
165 diag.e(pos) << "Expected TRUE or FALSE.\n";
166}
167
168void DSVReader::operator()(Const<CharacterSequence>&)
169{
170 /* This implementation is compliant with RFC 4180. In quoted strings, quotes have to be escaped with an additional
171 * quote. In unquoted strings, delimiter and quotes are prohibited. In both cases, escape sequences are not
172 * supported. Note that EOF implicitly closes quoted strings.
173 * Source: https://tools.ietf.org/html/rfc4180#section-2 */
174 buf.clear();
175 if (accept(config().quote)) {
176 if (config().escape == config().quote) { // RFC 4180
177 while (c != EOF) {
178 if (c != config().quote) {
179 push();
180 } else {
181 step();
182 if (c == config().quote)
183 push();
184 else
185 break;
186 }
187 }
188 } else {
189 while (c != EOF) {
190 if (c == config().quote) {
191 step();
192 break;
193 } else if (c == config().escape) {
194 step();
195 push();
196 } else {
197 push();
198 }
199 }
200 }
201 } else {
202 while (c != EOF and c != '\n' and c != config().delimiter) {
203 if (c == config().quote) {
204 diag.e(pos) << "WARNING: Illegal character " << config().quote << " found in unquoted string.\n";
205 /* Entire cell is discarded. */
207 while (c != EOF and c != '\n' and c != config().delimiter) step();
208 return;
209 } else
210 push();
211 }
212 }
213 buf.push_back(0);
214
215 Catalog &C = Catalog::Get();
216 tup.set(col_idx, C.pool(&buf[0]));
217}
218
219void DSVReader::operator()(Const<Date>&)
220{
221 Catalog &C = Catalog::Get();
222 buf.clear();
223 buf.push_back('d');
224 buf.push_back('\'');
225
226 const bool has_quote = accept(config().quote);
227#define DIGITS(num) for (auto i = 0; i < num; ++i) if (is_dec(c)) push(); else goto invalid;
228 if ('-' == c) push();
229 DIGITS(4);
230 if ('-' == c) push(); else goto invalid;
231 DIGITS(2);
232 if ('-' == c) push(); else goto invalid;
233 DIGITS(2);
234#undef DIGITS
235 if (has_quote and not accept(config().quote))
236 goto invalid;
237
238 buf.push_back('\'');
239 buf.push_back(0);
240
241 tup.set(col_idx, Interpreter::eval(ast::Constant(ast::Token(pos, C.pool(buf.data()), TK_DATE))));
242 return;
243
244invalid:
245 diag.e(pos) << "WARNING: Invalid date.\n";
247}
248
249void DSVReader::operator()(Const<DateTime>&)
250{
251 Catalog &C = Catalog::Get();
252 buf.clear();
253 buf.push_back('d');
254 buf.push_back('\'');
255
256 const bool has_quote = accept(config().quote);
257#define DIGITS(num) for (auto i = 0; i < num; ++i) if (is_dec(c)) push(); else goto invalid;
258 if ('-' == c) push();
259 DIGITS(4);
260 if ('-' == c) push(); else goto invalid;
261 DIGITS(2);
262 if ('-' == c) push(); else goto invalid;
263 DIGITS(2);
264 if (' ' == c) push(); else goto invalid;
265 DIGITS(2);
266 if (':' == c) push(); else goto invalid;
267 DIGITS(2);
268 if (':' == c) push(); else goto invalid;
269 DIGITS(2);
270#undef DIGITS
271 if (has_quote and not accept(config().quote))
272 goto invalid;
273
274 buf.push_back('\'');
275 buf.push_back(0);
276 tup.set(col_idx, Interpreter::eval(ast::Constant(ast::Token(pos, C.pool(buf.data()), TK_DATE_TIME))));
277 return;
278
279invalid:
280 diag.e(pos) << "WARNING: Invalid datetime.\n";
282}
283
284void DSVReader::operator()(Const<Numeric> &ty)
285{
286 switch (ty.kind) {
287 case Numeric::N_Int: {
288 bool is_neg = false;
289 if (accept('-'))
290 is_neg = true;
291 else
292 accept('+');
293 int64_t i = read_unsigned_int();
294 if (c != EOF and c != '\n' and c != config().delimiter) {
295 diag.e(pos) << "WARNING: Unexpected characters encountered in an integer.\n";
297 while (c != EOF and c != '\n' and c != config().delimiter) step();
298 return;
299 }
300 if (is_neg) i = -i;
301 tup.set(col_idx, i);
302 break;
303 }
304
305 case Numeric::N_Decimal: {
306 auto scale = ty.scale;
307 /* Read pre dot digits. */
308 bool is_neg = false;
309 if (accept('-'))
310 is_neg = true;
311 else
312 accept('+');
313 int64_t d = read_unsigned_int();
314 // std::cerr << "Read: " << d;
315 d = d * powi(10, scale);
316 if (accept('.')) {
317 /* Read post dot digits. */
318 int64_t post_dot = 0;
319 auto n = scale;
320 while (n > 0 and is_dec(c)) {
321 post_dot = 10 * post_dot + c - '0';
322 step();
323 n--;
324 }
325 post_dot *= powi(10, n);
326 /* Discard further digits */
327 while (is_dec(c)) { step(); }
328 d += d >= 0 ? post_dot : -post_dot;
329 }
330 if (c != EOF and c != '\n' and c != config().delimiter) {
331 diag.e(pos) << "WARNING: Unexpected characters encountered in a decimal.\n";
333 while (c != EOF and c != '\n' and c != config().delimiter) step();
334 return;
335 }
336 if (is_neg) d = -d;
337 tup.set(col_idx, d);
338 break;
339 }
340
341 case Numeric::N_Float: {
342 std::string float_str;
343 while(c != EOF and c != '\n' and c != config().delimiter) {
344 float_str += c;
345 step();
346 }
347 char* end;
348 errno = 0;
349 double d = std::strtod(float_str.c_str(), &end);
350 if (*end != '\0') {
351 diag.e(pos) << "WARNING: Unexpected characters encountered in a floating-point number.\n";
353 return;
354 }
355 if ( errno == ERANGE and ( d == HUGE_VAL or d == HUGE_VALF or d == HUGE_VALL ) ) {
356 diag.w(pos) << "WARNING: A floating-point number is larger than the maximum value.\n";
357 d = std::numeric_limits<double>::max();
358 } else if ( errno == ERANGE and (d == -HUGE_VAL or d == -HUGE_VALF or d == -HUGE_VALL ) ) {
359 diag.w(pos) << "WARNING: A floating-point number is smaller than the minimum value.\n";
360 d = std::numeric_limits<double>::min();
361 }
362 if (ty.is_float())
363 tup.set(col_idx, float(d));
364 else
365 tup.set(col_idx, d);
366 break;
367 }
368 }
369}
370
371void DSVReader::operator()(Const<ErrorType>&) { M_unreachable("invalid type"); }
372void DSVReader::operator()(Const<NoneType>&) { M_unreachable("invalid type"); }
373void DSVReader::operator()(Const<Bitmap>&) { M_unreachable("invalid type"); }
374void DSVReader::operator()(Const<FnType>&) { M_unreachable("invalid type"); }
375
377{
378 int64_t i = 0;
379 while (is_dec(c)) {
380 i = 10 * i + c - '0';
381 step();
382 }
383 return i;
384}
#define DIGITS(num)
struct @5 args
#define M_unreachable(MSG)
Definition: macro.hpp:146
#define M_insist(...)
Definition: macro.hpp:129
mutable namespace
Definition: Backend.hpp:10
T M_EXPORT powi(const T base, const U exp)
Power function for integral types.
Definition: fn.hpp:428
bool streq(const char *first, const char *second)
Definition: fn.hpp:29
bool is_dec(int c)
Definition: fn.hpp:589
and
Definition: enum_ops.hpp:12
An attribute of a table.
Definition: Schema.hpp:289
The catalog contains all Databases and keeps track of all meta information of the database system.
Definition: Catalog.hpp:215
ThreadSafePooledString pool(const char *str) const
Creates an internalized copy of the string str by adding it to the internal StringPool.
Definition: Catalog.hpp:274
static Catalog & Get()
Return a reference to the single Catalog instance.
Configuration parameters for importing a DSV file.
Definition: Reader.hpp:45
char delimiter
the delimiter separating cells
Definition: Reader.hpp:47
char quote
the quotation mark for strings
Definition: Reader.hpp:49
std::size_t col_idx
Definition: Reader.hpp:78
size_t skip_header() const
Definition: Reader.hpp:91
void discard_cell()
Definition: Reader.hpp:123
size_t delimiter() const
Definition: Reader.hpp:87
bool accept(char chr)
Definition: Reader.hpp:121
std::istream * in
Definition: Reader.hpp:75
size_t num_rows() const
Definition: Reader.hpp:86
Tuple tup
intermediate tuple to store values of a row
Definition: Reader.hpp:77
size_t escape() const
Definition: Reader.hpp:88
size_t quote() const
Definition: Reader.hpp:89
const Config & config() const
Definition: Reader.hpp:85
int step()
Definition: Reader.hpp:105
size_t has_header() const
Definition: Reader.hpp:90
void discard_row()
Definition: Reader.hpp:131
DSVReader(const Table &table, Config cfg, Diagnostic &diag, Scheduler::Transaction *transaction=nullptr)
Definition: DSVReader.cpp:23
void operator()(std::istream &in, const char *name) override
Definition: DSVReader.cpp:32
int64_t read_unsigned_int()
Definition: DSVReader.cpp:376
void push()
Definition: Reader.hpp:119
Position pos
Definition: Reader.hpp:73
std::vector< char > buf
Definition: Reader.hpp:76
std::ostream & e(const Position pos)
Definition: Diagnostic.hpp:41
std::ostream & w(const Position pos)
Definition: Diagnostic.hpp:36
static StackMachine compile_store(const Schema &tuple_schema, void *address, const storage::DataLayout &layout, const Schema &layout_schema, std::size_t row_id=0, std::size_t tuple_id=0)
Compile a StackMachine to store a tuple of Schema tuple_schema using a given memory address and a giv...
A data type representing a pooled (or internalized) object.
Definition: Pool.hpp:168
An interface for all readers.
Definition: Reader.hpp:16
const Table & table
the table to insert the data into
Definition: Reader.hpp:17
Scheduler::Transaction * transaction
Definition: Reader.hpp:19
Diagnostic & diag
Definition: Reader.hpp:18
void start_time(int64_t time)
Sets the start time of the Transaction. Should only be set once and only to a positive number.
Definition: Scheduler.hpp:30
A Schema represents a sequence of identifiers, optionally with a prefix, and their associated types.
Definition: Schema.hpp:39
void add(entry_type e)
Adds the entry e to this Schema.
Definition: Schema.hpp:181
A table is a sorted set of attributes.
Definition: Schema.hpp:388
virtual const storage::DataLayout & layout() const =0
Returns a reference to the physical data layout.
virtual all_iterator begin_all() const =0
virtual const ThreadSafePooledString & name() const =0
Returns the name of the Table.
virtual hidden_iterator cbegin_hidden() const =0
virtual hidden_iterator end_hidden() const =0
virtual all_iterator end_all() const =0
virtual Store & store() const =0
Returns a reference to the backing store.
virtual Attribute & at(std::size_t id)=0
Returns the attribute with the given id.
void null(std::size_t idx)
Sets the Value at index idx to NULL.
Definition: Tuple.hpp:225
void set(std::size_t idx, Value val)
Assigns the Value val to this Tuple at index idx and clears the respective NULL bit.
Definition: Tuple.hpp:240
This class holds a SQL attribute value.
Definition: Tuple.hpp:19
A constant: a string literal or a numeric constant.
Definition: AST.hpp:213
Signals that an argument to a function or method was invalid.
Definition: exception.hpp:37
Models how data is laid out in a linear address space.
Definition: DataLayout.hpp:29