34 #include <boost/algorithm/string.hpp>
35 #include <boost/algorithm/string/predicate.hpp>
36 #include <boost/iostreams/filter/gzip.hpp>
37 #include <boost/shared_array.hpp>
38 #include <boost/spirit/include/classic_core.hpp>
39 #include <boost/spirit/include/classic_assign_actor.hpp>
51 #include <sys/types.h>
61 LoadDataSource::LoadDataSource(
const string &header_fname,
62 int row_uniquify_chars,
64 : m_type_mask(0), m_cur_line(0), m_line_buffer(0),
65 m_row_key_buffer(0), m_hyperformat(false), m_leading_timestamps(false),
66 m_timestamp_index(-1), m_timestamp(
AUTO_ASSIGN), m_offset(0),
67 m_zipped(false), m_rsgen(0), m_header_fname(header_fname),
68 m_row_uniquify_chars(row_uniquify_chars),
69 m_load_flags(load_flags), m_source_size(0), m_first_line_cached(false),
70 m_field_separator(
'\t')
72 if (row_uniquify_chars)
86 string three_column_header =
format(
"#row%ccolumn%cvalue",
88 string four_column_header =
format(
"#timestamp%crow%ccolumn%cvalue",
99 for (
const char *ptr =
m_first_line.c_str(); *ptr; ptr++) {
106 header = three_column_header;
108 else if (tabs == 3) {
111 header = four_column_header;
115 "Untable to autodetect format, expected 2 or 3 field separators, got %d", (
int)tabs);
124 for (
const char *ptr =
m_first_line.c_str(); *ptr; ptr++) {
129 header =
"#row\tcolumn\tvalue";
131 header =
"#timestamp\trow\tcolumn\tvalue";
134 "Untable to autodetect format, expected 2 or 3 tabs, "
135 "got %d", (
int)tabs);
145 const string &timestamp_column,
146 char field_separator) {
156 const std::vector<String> &key_columns,
157 const string &timestamp_column)
159 string line, column_name;
160 char *base, *ptr, *colon_ptr;
169 base = (
char *)header.c_str();
172 while (isspace(*base))
183 colon_ptr = strchr(base,
':');
195 if (timestamp_column !=
"" && timestamp_column == cinfo.
family) {
215 for (
size_t i=0; i<key_columns.size(); i++) {
221 if (boost::algorithm::starts_with(key_columns[i],
"\\%"))
222 column_name = key_columns[i].substr(1);
223 else if (boost::algorithm::starts_with(key_columns[i],
"%0")) {
225 column_name = key_columns[i].substr(2);
226 ptr = column_name.c_str();
227 key_comps.
width = atoi(ptr);
228 while (isdigit(*ptr))
230 column_name =
String(ptr);
232 else if (boost::algorithm::starts_with(key_columns[i],
"%-")) {
234 column_name = key_columns[i].substr(2);
235 ptr = column_name.c_str();
236 key_comps.
width = atoi(ptr);
237 while (isdigit(*ptr))
239 column_name =
String(ptr);
241 else if (boost::algorithm::starts_with(key_columns[i],
"%")) {
242 column_name = key_columns[i].substr(1);
243 ptr = column_name.c_str();
244 key_comps.
width = atoi(ptr);
245 while (isdigit(*ptr))
247 column_name =
String(ptr);
250 column_name = key_columns[i];
261 cout <<
"ERROR: key column '" << column_name
262 <<
"' not found in input file" << endl;
292 "No columns specified in load file");
302 bool *is_deletep, uint32_t *consumedp)
306 char *base, *ptr, *colon;
320 *consumedp += line.length() + 1;
333 cerr <<
"warning: too few fields on line " <<
m_cur_line << endl;
340 cerr <<
"warn: invalid timestamp format on line " <<
m_cur_line
341 <<
", skipping..." << endl;
355 cerr <<
"warning: too few fields on line " <<
m_cur_line << endl;
372 cerr <<
"warning: zero-lengthed row key on line "
384 cerr <<
"warning: too few fields on line " <<
m_cur_line << endl;
389 if ((colon = strchr(base,
':')) != 0) {
411 *valuep = (uint8_t *)base;
413 *value_lenp = strlen((
char *)*valuep);
416 *value_lenp = ptr-base;
418 if (!strcmp(ptr,
"DELETE")) {
429 else if (!strcmp(ptr,
"DELETE_ROW")) {
431 cerr <<
"warning: column family specified with DELETE_ROW on line "
436 else if (!strcmp(ptr,
"DELETE_COLUMN_FAMILY")) {
440 else if (!strcmp(ptr,
"DELETE_CELL")) {
444 else if (!strcmp(ptr,
"DELETE_CELL_VERSION")) {
449 cerr <<
"warning: too many fields on line " <<
m_cur_line << endl;
512 *consumedp += line.length() + 1;
514 boost::trim_right_if(line, boost::is_any_of(
"\n"));
515 if (line.length() == 0)
527 if (strlen(base) == 0 || !strcmp(base,
"NULL") ||
528 !strcmp(base,
"\\N")) {
530 cout <<
"WARNING: Required key or timestamp field not found "
531 "on line " <<
m_cur_line <<
", skipping ..." << endl << flush;
542 if (strlen(base) == 0 || !strcmp(base,
"NULL") || !strcmp(base,
"\\N"))
548 cerr <<
"warn: field count on line " <<
m_cur_line <<
" does not match header, skipping..." << endl;
576 cerr <<
"warn: timestamp field not found on line " <<
m_cur_line
577 <<
", skipping..." << endl;
582 cerr <<
"warn: invalid timestamp format on line " <<
m_cur_line
583 <<
", skipping..." << endl;
645 size_t value_len = 0;
648 cout <<
"WARNING: Required key field not found on line " <<
m_cur_line
649 <<
", skipping ..." << endl << flush;
653 value_len = strlen(value);
655 if ((
size_t)m_key_comps[index].index >=
m_values.size() || value == 0) {
656 cout <<
"WARNING: Required key field not found on line " <<
m_cur_line
657 <<
", skipping ..." << endl << flush;
661 if ((
size_t)m_key_comps[index].width > value_len) {
662 size_t padding = m_key_comps[index].width - value_len;
664 if (m_key_comps[index].left_justify) {
685 const char *ptr =
str;
692 ns = (int64_t)strtoll(ptr, &end_ptr, 10);
697 else if ((end_ptr - ptr) != 4)
700 tm.tm_year = ns - 1900;
707 if ((ival = strtol(ptr, &end_ptr, 10)) == 0 || (end_ptr - ptr) != 2 ||
711 tm.tm_mon = ival - 1;
717 if ((ival = strtol(ptr, &end_ptr, 10)) == 0 || (end_ptr - ptr) != 2 ||
718 (*end_ptr !=
' ' && *end_ptr !=
'T'))
727 ival = strtol(ptr, &end_ptr, 10);
728 if ((end_ptr - ptr) != 2 || *end_ptr !=
':')
737 ival = strtol(ptr, &end_ptr, 10);
738 if ((end_ptr - ptr) != 2 || *end_ptr !=
':')
753 #if !defined(__sun__)
758 if ((tt = mktime(&tm)) == (time_t)-1)
763 if (*end_ptr ==
':') {
764 ns = strtoul(ptr, &end_ptr, 10);
767 timestamp = ((int64_t)tt * 1000000000LL) + sec + ns;
774 uint_parser<unsigned int, 10, 2, 2> uint2_p;
775 int64_t int_seconds=0;
776 double decimal_seconds=0;
777 *end_ptr = (
char*) str;
778 parse_info<> info = parse(str,
779 (uint2_p[assign_a(int_seconds)] >> !real_p[assign_a(decimal_seconds)]));
780 ns = (int64_t)int_seconds * 1000000000LL + (int64_t)(decimal_seconds *
781 ((double) 1000000000LL));
783 *end_ptr += info.length - 1 ;
DynamicBuffer m_row_key_buffer
Retrieves system information (hardware, installation directory, etc)
bool m_leading_timestamps
bool should_skip(int idx, const uint32_t *masks)
virtual void parse_header(const String &header, const std::vector< String > &key_columns, const std::string &timestamp_column)
static const uint32_t FLAG_DELETE_ROW
std::string String
A String is simply a typedef to std::string.
static String tm_zone
Timezone abbreviation.
String format(const char *fmt,...)
Returns a String using printf-like format facilities. Vanilla snprintf is about 1.5x faster than this...
static const uint32_t FLAG_INSERT
virtual void init_src()=0
Po::typed_value< String > * str(String *v=0)
const char * column_qualifier
static bool exists(const String &fname)
Checks if a file or directory exists.
static const uint32_t FLAG_DELETE_CELL
DynamicBuffer m_line_buffer
size_t column_qualifier_len
virtual void init(const std::vector< String > &key_columns, const std::string &timestamp_column, char field_separator)
uint8_t * ptr
Pointer to the end of the used part of the buffer.
std::vector< KeyComponentInfo > m_key_comps
static const uint32_t FLAG_DELETE_COLUMN_FAMILY
static long tm_gmtoff
Seconds east of UTC.
#define HT_EXPECT(_e_, _code_)
File system utility functions.
std::string m_header_fname
A dynamic, resizable memory buffer.
std::vector< ColumnInfo > m_column_info
boost::iostreams::filtering_istream m_fin
uint8_t * add(const void *data, size_t len)
Adds more data WITH boundary checks; if required the buffer is resized and existing data is preserved...
Logging routines and macros.
bool get_next_line(String &line)
std::vector< const char * > m_values
Compatibility Macros for C/C++.
Time related declarations.
bool parse_date_format(const char *str, int64_t &timestamp)
bool add_row_component(int index)
void clear()
Clears the buffer.
virtual uint64_t incr_consumed()=0
FixedRandomStringGenerator * m_rsgen
#define HT_THROWF(_code_, _fmt_,...)
uint8_t * base
Pointer to the allocated memory buffer.
size_t fill() const
Returns the size of the used portion.
bool single_cell_format(int flags)
static const int64_t AUTO_ASSIGN
virtual bool next(KeySpec *keyp, uint8_t **valuep, uint32_t *value_lenp, bool *is_deletep, uint32_t *consumedp)
const char * column_family
Error codes, Exception handling, error logging.
#define HT_THROW(_code_, _msg_)
static const uint32_t FLAG_DELETE_CELL_VERSION
void ensure(size_t len)
Ensure space for additional data. Will grow the space to 1.5 of the needed space with existing data un...
uint8_t * add_unchecked(const void *data, size_t len)
Adds additional data without boundary checks.
bool parse_sec(const char *str, char **end_ptr, int64_t &ns)