49 #include <boost/algorithm/string.hpp>
50 #include <boost/iostreams/device/file_descriptor.hpp>
51 #include <boost/iostreams/filtering_stream.hpp>
52 #include <boost/iostreams/filter/gzip.hpp>
53 #include <boost/iostreams/device/null.hpp>
67 void close_file(
int fd) {
88 if (!ns || state.
ns.find(
'/') == 0)
100 if (ns && immutable_namespace)
102 ns->get_name() +
" to " + state.
ns);
105 if (!ns || state.
ns.find(
'/') == 0)
121 if (!ns || state.
ns.find(
'/') == 0 )
141 string exists = (
String)
"true";
158 string schema_str = ns->get_schema_str(state.
table_name,
true);
159 SchemaPtr schema( Schema::new_instance(schema_str) );
160 string out_str = schema->render_hql(state.
table_name);
175 if (!FileUtils::read(state.
input_file, schema_str))
196 if (!FileUtils::read(state.
input_file, schema_str))
198 state.
alter_schema.reset(Schema::new_instance(schema_str));
236 boost::iostreams::filtering_ostream fout;
239 string localfs =
"file://";
249 if (boost::algorithm::ends_with(state.
scan.
outfile,
".gz"))
250 fout.push(boost::iostreams::gzip_compressor());
252 if (boost::algorithm::starts_with(state.
scan.
outfile,
"dfs://") ||
253 boost::algorithm::starts_with(state.
scan.
outfile,
"fs://")) {
256 fs_client = std::make_shared<FsBroker::Lib::Client>(conn_manager,
Config::properties);
257 if (boost::algorithm::starts_with(state.
scan.
outfile,
"dfs://"))
262 else if (boost::algorithm::starts_with(state.
scan.
outfile, localfs))
263 fout.push(boost::iostreams::file_descriptor_sink(state.
scan.
outfile.substr(localfs.size())));
265 fout.push(boost::iostreams::file_descriptor_sink(state.
scan.
outfile));
269 fout <<
"revision" << fs;
271 fout <<
"timestamp" << fs;
275 fout <<
"row" << fs <<
"column" << fs <<
"value\n";
282 out_fd = dup(fileno(outf));
283 fout.push(boost::iostreams::file_descriptor_sink(out_fd));
293 const char *unescaped_buf, *row_unescaped_buf;
294 size_t unescaped_len, row_unescaped_len;
301 while (scanner->next(cell)) {
316 nsec = cell.
revision % 1000000000LL;
317 unix_time = cell.
revision / 1000000000LL;
318 localtime_r(&unix_time, &tms);
320 tms.tm_year + 1900, tms.tm_mon + 1, tms.tm_mday,
321 tms.tm_hour, tms.tm_min, tms.tm_sec, nsec, fs);
329 unix_time = cell.
timestamp / 1000000000LL;
330 localtime_r(&unix_time, &tms);
332 tms.tm_year + 1900, tms.tm_mon + 1, tms.tm_mday,
333 tms.tm_hour, tms.tm_min, tms.tm_sec, nsec, fs);
338 &row_unescaped_buf, &row_unescaped_len);
340 row_unescaped_buf = cell.
row_key;
347 &unescaped_buf, &unescaped_len);
350 fout <<
":" << unescaped_buf;
355 fout << row_unescaped_buf;
359 &unescaped_buf, &unescaped_len);
361 unescaped_buf = (
const char *)cell.
value;
368 fout.write(unescaped_buf, unescaped_len);
372 fout << fs <<
"DELETE ROW\n";
375 fout.write(unescaped_buf, unescaped_len);
376 fout << fs <<
"DELETE COLUMN FAMILY\n";
379 fout.write(unescaped_buf, unescaped_len);
380 fout << fs <<
"DELETE CELL\n";
383 fout.write(unescaped_buf, unescaped_len);
384 fout << fs <<
"DELETE CELL VERSION\n";
387 fout << fs <<
"BAD KEY FLAG\n";
391 fout << row_unescaped_buf <<
"\n";
408 boost::iostreams::filtering_ostream fout;
411 string localfs =
"file://";
422 if (boost::algorithm::ends_with(state.
scan.
outfile,
".gz"))
423 fout.push(boost::iostreams::gzip_compressor());
425 if (boost::algorithm::starts_with(state.
scan.
outfile,
"dfs://") ||
426 boost::algorithm::starts_with(state.
scan.
outfile,
"fs://")) {
429 fs_client = std::make_shared<FsBroker::Lib::Client>(conn_manager,
Config::properties);
430 if (boost::algorithm::starts_with(state.
scan.
outfile,
"dfs://"))
435 else if (boost::algorithm::starts_with(state.
scan.
outfile, localfs))
436 fout.push(boost::iostreams::file_descriptor_sink(state.
scan.
outfile.substr(localfs.size())));
438 fout.push(boost::iostreams::file_descriptor_sink(state.
scan.
outfile));
441 fout <<
"#timestamp" << fs <<
"row" << fs <<
"column" << fs <<
"value\n";
443 fout <<
"#row" << fs <<
"column" << fs <<
"value\n";
450 out_fd = dup(fileno(outf));
451 fout.push(boost::iostreams::file_descriptor_sink(out_fd));
458 const char *unescaped_buf, *row_unescaped_buf;
459 size_t unescaped_len, row_unescaped_len;
466 while (dumper->next(cell)) {
483 &row_unescaped_buf, &row_unescaped_len);
485 row_unescaped_buf = cell.
row_key;
492 &unescaped_buf, &unescaped_len);
495 fout <<
":" << unescaped_buf;
499 fout << row_unescaped_buf;
503 &unescaped_buf, &unescaped_len);
505 unescaped_buf = (
const char *)cell.
value;
512 fout.write(unescaped_buf, unescaped_len);
523 cmd_load_data(
NamespacePtr &ns, ::uint32_t mutator_flags,
531 bool into_table =
true;
532 bool display_timestamps =
false;
533 boost::iostreams::filtering_ostream fout;
536 bool largefile_mode =
false;
537 ::uint64_t running_total = 0;
538 ::uint64_t consume_threshold = 0;
539 bool ignore_unknown_columns =
false;
543 ignore_unknown_columns =
true;
547 mutator_flags |= Table::MUTATOR_FLAG_NO_LOG;
549 mutator_flags |= Table::MUTATOR_FLAG_NO_LOG_SYNC;
554 "LOAD DATA INFILE ... INTO FILE - bad filename");
555 fout.push(boost::iostreams::file_descriptor_sink(state.
output_file));
560 out_fd = dup(fileno(outf));
561 fout.push(boost::iostreams::file_descriptor_sink(out_fd));
564 fout.push(boost::iostreams::null_sink());
566 mutator.reset(table->create_mutator(0, mutator_flags));
576 fs_client = std::make_shared<FsBroker::Lib::Client>(conn_manager,
Config::properties);
584 if (cb.
file_size > std::numeric_limits<unsigned long>::max()) {
585 largefile_mode =
true;
586 unsigned long adjusted_size = (
unsigned long)(cb.
file_size / 1048576LL);
587 consume_threshold = 1048576LL;
594 display_timestamps = lds->has_timestamps();
595 if (display_timestamps)
596 fout <<
"timestamp" << fs <<
"row" << fs <<
"column" << fs <<
"value\n";
598 fout <<
"row" << fs <<
"column" << fs <<
"value\n";
603 ::uint32_t value_len;
604 ::uint32_t consumed = 0;
608 const char *escaped_buf;
619 while (lds->next(&key, &value, &value_len, &is_delete, &consumed)) {
627 &escaped_buf, &escaped_len);
628 key.
row = escaped_buf;
634 value_escaper.
unescape((
const char *)value,
635 (
size_t)value_len, &escaped_buf, &escaped_len);
638 escaped_buf = (
const char *)value;
639 escaped_len = (size_t)value_len;
645 if (ignore_unknown_columns) {
652 mutator->set_delete(key);
654 mutator->set(key, escaped_buf, escaped_len);
659 mutator->show_failed(e);
660 }
while (!mutator->retry());
664 if (display_timestamps)
669 << escaped_buf <<
"\n";
673 if (largefile_mode ==
true) {
674 running_total += consumed;
675 if (running_total >= consume_threshold) {
676 consumed = 1 + (
unsigned long)((running_total - consume_threshold)
678 consume_threshold += (::uint64_t)consumed * 1048576LL;
708 mutator->set_cells(cells);
712 mutator->show_failed(e);
713 }
while (!mutator->retry());
715 if (mutator->get_last_error())
722 for (
const auto &cell : cells) {
733 if (mutator->get_last_error())
745 char *column_qualifier;
767 mutator->set_delete(key);
770 mutator->show_failed(e);
779 if ((column_qualifier = (
char*)strchr(col.c_str(),
':')) != 0) {
780 *column_qualifier++ = 0;
793 mutator->set_delete(key);
796 mutator->show_failed(e);
811 std::vector<NamespaceListing> listing;
812 ns->get_listing(
false, listing);
813 for (
const auto &entry : listing) {
815 cb.
on_return(entry.name +
"\t(namespace)");
816 else if (!entry.is_namespace)
860 Timer timer(10000,
true);
881 status.
get(&code, text);
884 return static_cast<int>(code);
888 master->system_status(status, &timer);
893 status.
get(&code, text);
896 return static_cast<int>(code);
914 size_t lastslash = state.
table_name.find_last_of(
'/');
915 if (lastslash != string::npos) {
916 table_basename = state.
table_name.substr(lastslash+1);
929 int8_t parts = state.
flags ?
static_cast<int8_t
>(state.
flags) : TableParts::ALL;
932 working_ns->rebuild_indices(table_basename, table_parts);
955 bool immutable_namespace) : m_client(client), m_mutator_flags(0),
956 m_conn_manager(conn_manager), m_fs_client(0), m_immutable_namespace(immutable_namespace) {
971 string stripped_line = line;
973 boost::trim(stripped_line);
976 parse_info<> info = parse(stripped_line.c_str(), parser, space_p);
983 return cmd_show_create_table(
m_namespace, state, cb);
985 return cmd_help(state, cb);
1018 return cmd_shutdown_master(
m_client, cb);
1027 return cmd_balance(
m_client, state, cb);
1029 return cmd_stop(
m_client, state, cb);
1031 return cmd_set(
m_client, state, cb);
1033 return cmd_status(
m_client, state, cb);
#define HT_THROW2F(_code_, _ex_, _fmt_,...)
void drop_namespace(const std::string &name, Namespace *base=NULL, bool if_exists=false)
Removes a namespace.
virtual void on_finish(TableMutatorPtr &mutator)
Called when interpreter is finished Note: mutator pointer maybe NULL in case of things like LOAD DATA...
uint64_t total_values_size
std::vector< Cell, CellAlloc > Cells
ScanSpec & get()
Returns the built ScanSpec object.
NamespacePtr open_namespace(const std::string &name, Namespace *base=NULL)
Opens a Namespace.
Holds Nagios-style program status information.
PropertiesPtr properties
This singleton map stores all options.
static const uint32_t FLAG_DELETE_ROW
std::string String
A String is simply a typedef to std::string.
ConnectionManagerPtr m_conn_manager
The Stopwatch measures elapsed time.
void set_field_separator(char fs)
String format(const char *fmt,...)
Returns a String using printf like format facilities Vanilla snprintf is about 1.5x faster than this...
static const uint32_t FLAG_INSERT
std::string timestamp_column
bool ignore_unknown_cfs(int flags)
const char * column_qualifier
const char * column_qualifier
static const uint32_t FLAG_DELETE_CELL
bool escape(const char *in_buf, size_t in_len, const char **out_bufp, size_t *out_lenp)
bool m_immutable_namespace
#define HT_ON_SCOPE_EXIT(...)
size_t column_qualifier_len
std::shared_ptr< TableScanner > TableScannerPtr
Smart pointer to TableScanner.
Represents a set of table parts (sub-tables).
Code
Enumeration for status codes.
static const uint32_t FLAG_DELETE_COLUMN_FAMILY
::int32_t row_uniquify_chars
virtual void on_update(size_t total)
Called when interpreter is ready to update.
virtual void on_parsed(Hql::ParserState &)
Called when the hql string is parsed successfully.
std::shared_ptr< Namespace > NamespacePtr
Shared smart pointer to Namespace.
std::shared_ptr< TableMutator > TableMutatorPtr
Smart pointer to TableMutator.
virtual void on_progress(size_t amount)
Called when interpreter updates progress for long running queries.
File system utility functions.
const char * get_text(int error)
Returns a descriptive error message.
bool status(ContextPtr &context, Timer &timer, Status &status)
Runs a status check on the master.
std::vector< String > columns
std::shared_ptr< Client > ClientPtr
Smart pointer to Client.
Compatibility Macros for C/C++.
T get(const String &name)
Retrieves a configuration value.
virtual void on_scan(TableScannerPtr &)
Called when interpreter is ready to scan.
virtual void on_dump(TableDumper &)
Called when interpreter is ready to dump.
::int64_t delete_version_time
std::string new_table_name
void set_namespace(const std::string &ns)
long long int Lld
Shortcut for printf formats.
Callback interface/base class for execute.
Hyperspace::SessionPtr & get_hyperspace_session()
std::shared_ptr< Client > ClientPtr
Smart pointer to Client.
virtual void on_return(const std::string &)
Called when interpreter returns a string result Maybe called multiple times for a list of string resu...
const char * column_family
void create_namespace(const std::string &name, Namespace *base=NULL, bool create_intermediate=false, bool if_not_exists=false)
Creates a namespace.
void get(Code *code, std::string &text) const
Gets status code and text.
A timer class to keep timeout states across AsyncComm related calls.
This is a generic exception class for Hypertable.
A String class based on std::string.
std::vector< String > delete_columns
std::shared_ptr< Schema > SchemaPtr
Smart pointer to Schema.
static const int64_t AUTO_ASSIGN
std::shared_ptr< TableDumper > TableDumperPtr
Smart pointer to TableDumper.
Declarations for HqlHelpText.
std::shared_ptr< ConnectionManager > ConnectionManagerPtr
Smart pointer to ConnectionManager.
FsBroker::Lib::ClientPtr m_fs_client
Encapsulates decomposed key and value.
bool unescape(const char *in_buf, size_t in_len, const char **out_bufp, size_t *out_lenp)
const char * column_family
Error codes, Exception handling, error logging.
#define HT_THROW(_code_, _msg_)
static const uint32_t FLAG_DELETE_CELL_VERSION
int execute(const std::string &str, Callback &)
The main interface for the interpreter.
std::shared_ptr< LoadDataSource > LoadDataSourcePtr
Smart pointer to LoadDataSource.
std::shared_ptr< Table > TablePtr
Lib::Master::ClientPtr get_master_client()
const char * code_to_string(int var_code)
Converts variable code to variable string.
int code() const
Returns the error code.
std::vector< SystemVariable::Spec > variable_specs
Executes user-defined functions when leaving the current scope.