52 #include <concurrency/ThreadManager.h>
53 #include <protocol/TBinaryProtocol.h>
54 #include <server/TThreadedServer.h>
55 #include <transport/TBufferTransports.h>
56 #include <transport/TServerSocket.h>
57 #include <transport/TSocket.h>
58 #include <transport/TTransportUtils.h>
60 #include <boost/shared_ptr.hpp>
68 #include <unordered_map>
72 #include <sys/types.h>
// Global switch read by the LOG_* macros in this file: when true, requests are
// timed and those that reach the latency threshold are reported as slow queries.
85 bool g_log_slow_queries {};
// Threshold compared against latencies measured in milliseconds; a query whose
// latency is greater than or equal to this value is logged as slow.
87 int32_t g_slow_query_latency_threshold {};
// THROW_TE(_code_, _str_): builds a ThriftGen::ClientException whose message is
// "<Error::get_text(_code_)> - <_str_>" and flags both the code and message
// fields as set.
// NOTE(review): the line that stores _code_ and the closing throw are not
// visible in this excerpt -- confirm against the complete macro.
92 #define THROW_TE(_code_, _str_) do { ThriftGen::ClientException te; \
94 te.message.append(Error::get_text(_code_)); \
95 te.message.append(" - "); \
96 te.message.append(_str_); \
97 te.__isset.code = te.__isset.message = true; \
// RETHROW(_expr_): expands to a catch clause for Hypertable::Exception. It
// formats "<HT_FUNC> <_expr_>: <exception>", writes that text through
// HT_ERROR_OUT, increments the metrics error counter, and re-raises the failure
// via THROW_TE using the exception's code and the formatted text.
// Must directly follow the closing brace of a try block.
101 #define RETHROW(_expr_) catch (Hypertable::Exception &e) { \
102 std::ostringstream oss; oss << HT_FUNC << " " << _expr_ << ": "<< e; \
103 HT_ERROR_OUT << oss.str() << HT_END; \
106 g_metrics_handler->error_increment(); \
107 THROW_TE(e.code(), oss.str()); \
// LOG_API_START(_expr_): declares the locals used by the other logging macros
// (start_time, end_time, logging_stream, scanner_info) in the calling scope and
// increments the metrics request counter. The start timestamp is taken only
// when API logging or slow-query logging is enabled; "API <func>: <_expr_>" is
// streamed into logging_stream only when m_context.log_api is set.
// Because it declares variables, it can appear at most once per scope.
110 #define LOG_API_START(_expr_) \
111 std::chrono::fast_clock::time_point start_time, end_time; \
112 std::ostringstream logging_stream;\
113 ScannerInfoPtr scanner_info;\
114 g_metrics_handler->request_increment(); \
115 if (m_context.log_api || g_log_slow_queries) {\
116 start_time = std::chrono::fast_clock::now(); \
117 if (m_context.log_api)\
118 logging_stream << "API " << __func__ << ": " << _expr_;\
// LOG_API_FINISH: counterpart of LOG_API_START; uses the start_time, end_time,
// logging_stream and scanner_info locals that macro declares. Active when API
// logging is on, or when slow-query logging is on and scanner_info is non-null.
// It takes the end timestamp and, with API logging, prints
// "<sec>.<9-digit nsec> API <func>: <logging_stream> latency=<ms>" to std::cout,
// where the timestamp is start_time. The elapsed milliseconds are added to
// scanner_info->latency.
// NOTE(review): logging_stream already begins with "API <func>: " (written by
// LOG_API_START), so the printed line appears to repeat that prefix -- confirm.
// NOTE(review): the guard in front of the latency update is not visible in this
// excerpt.
121 #define LOG_API_FINISH \
122 if (m_context.log_api || (g_log_slow_queries && scanner_info)) { \
123 end_time = std::chrono::fast_clock::now(); \
124 if (m_context.log_api) \
125 std::cout << std::chrono::duration_cast<std::chrono::seconds>(start_time.time_since_epoch()).count() <<'.'<< std::setw(9) << std::setfill('0') << (std::chrono::duration_cast<std::chrono::nanoseconds>(start_time.time_since_epoch()).count() % 1000000000LL) <<" API "<< __func__ <<": "<< logging_stream.str() << " latency=" << std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count() << std::endl; \
127 scanner_info->latency += std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count();\
// LOG_API_FINISH_E(_expr_): same as LOG_API_FINISH, except that _expr_ is
// streamed right after the text captured in logging_stream and before the
// " latency=<ms>" suffix. Uses the locals declared by LOG_API_START.
// NOTE(review): the guard in front of the latency update is not visible in this
// excerpt.
130 #define LOG_API_FINISH_E(_expr_) \
131 if (m_context.log_api || (g_log_slow_queries && scanner_info)) { \
132 end_time = std::chrono::fast_clock::now(); \
133 if (m_context.log_api) \
134 std::cout << std::chrono::duration_cast<std::chrono::seconds>(start_time.time_since_epoch()).count() <<'.'<< std::setw(9) << std::setfill('0') << (std::chrono::duration_cast<std::chrono::nanoseconds>(start_time.time_since_epoch()).count() % 1000000000LL) <<" API "<< __func__ <<": "<< logging_stream.str() << _expr_ << " latency=" << std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count() << std::endl; \
136 scanner_info->latency += std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count();\
// LOG_API(_expr_): one-shot API log line. When m_context.log_api is set, writes
// "<hires_ts> API <func>: <_expr_>" to std::cout and flushes via std::endl.
// hires_ts is presumably a high-resolution timestamp stream manipulator
// defined elsewhere -- TODO confirm.
140 #define LOG_API(_expr_) do { \
141 if (m_context.log_api) \
142 std::cout << hires_ts <<" API "<< __func__ <<": "<< _expr_ << std::endl; \
// LOG_HQL_RESULT(_res_): logs a summary of an HQL result object. With API
// logging enabled it first prints "<hires_ts> API <func>: result: ". When the
// logger is not in debug mode and API logging is on, it then prints only the
// sizes / ids of whichever result members are flagged in _res_.__isset
// (results, cells, scanner, mutator).
// NOTE(review): the statement executed in the debug-enabled branch and the end
// of the macro are not visible in this excerpt.
// NOTE(review): the per-field outputs carry no separator, so two fields that
// are both set would be printed back to back.
145 #define LOG_HQL_RESULT(_res_) do { \
146 if (m_context.log_api) \
147 cout << hires_ts <<" API "<< __func__ <<": result: "; \
148 if (Logger::logger->isDebugEnabled()) \
150 else if (m_context.log_api) { \
151 if (_res_.__isset.results) \
152 cout <<"results.size=" << _res_.results.size(); \
153 if (_res_.__isset.cells) \
154 cout <<"cells.size=" << _res_.cells.size(); \
155 if (_res_.__isset.scanner) \
156 cout <<"scanner="<< _res_.scanner; \
157 if (_res_.__isset.mutator) \
158 cout <<"mutator="<< _res_.mutator; \
// LOG_SLOW_QUERY(_pd_, _ns_, _hql_): when slow-query logging is enabled, takes
// the end timestamp, computes the elapsed milliseconds since start_time, and
// calls log_slow_query() if that latency is at or above
// g_slow_query_latency_threshold. Uses the start_time / end_time locals that
// LOG_API_START declares in the calling scope.
163 #define LOG_SLOW_QUERY(_pd_, _ns_, _hql_) do { \
164 if (g_log_slow_queries) { \
165 end_time = std::chrono::fast_clock::now(); \
166 int64_t latency_ms = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count(); \
167 if (latency_ms >= g_slow_query_latency_threshold) \
168 log_slow_query(__func__, start_time, end_time, latency_ms, _pd_, _ns_, _hql_); \
// LOG_SLOW_QUERY_SCANNER(_scanner_, _ns_, _table_, _ss_): scanner flavour of
// LOG_SLOW_QUERY. When slow-query logging is enabled it fetches the scanner's
// profile data, takes the end timestamp, computes the elapsed milliseconds
// since start_time, and calls log_slow_query_scanspec() if the latency is at or
// above g_slow_query_latency_threshold. Uses the start_time / end_time locals
// that LOG_API_START declares in the calling scope.
172 #define LOG_SLOW_QUERY_SCANNER(_scanner_, _ns_, _table_, _ss_) do { \
173 if (g_log_slow_queries) { \
174 ProfileDataScanner pd; \
175 _scanner_->get_profile_data(pd); \
176 end_time = std::chrono::fast_clock::now(); \
177 int64_t latency_ms = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count(); \
178 if (latency_ms >= g_slow_query_latency_threshold) \
179 log_slow_query_scanspec(__func__, start_time, end_time, latency_ms, pd, _ns_, _table_, _ss_); \
192 using namespace ThriftGen;
193 using namespace boost;
200 const ThriftGen::MutateSpec &mutate_spec)
201 : m_namespace(ns), m_tablename(tablename), m_mutate_spec(mutate_spec) {}
206 cmp = (int64_t)m_namespace - (int64_t)skey.
m_namespace;
212 cmp = m_mutate_spec.appname.compare(skey.
m_mutate_spec.appname);
215 cmp = m_mutate_spec.flush_interval - skey.
m_mutate_spec.flush_interval;
229 return skey1.
compare(skey2) < 0;
232 typedef Meta::list<ThriftBrokerPolicy, DefaultCommPolicy>
Policies;
235 typedef std::unordered_map< ::int64_t, ClientObjectPtr>
ObjectMap;
243 log_api = Config::get_bool(
"ThriftBroker.API.Logging");
244 next_threshold = Config::get_i32(
"ThriftBroker.NextThreshold");
245 future_capacity = Config::get_i32(
"ThriftBroker.Future.Capacity");
257 int64_t min_num = INT64_MIN, int64_t max_num = INT64_MAX) {
260 int64_t value = strtoll(from.data(), &endp, 0);
262 if (endp - from.data() != (int)from.size()
263 || value < min_num || value > max_num)
271 if (tss.__isset.row_limit)
274 if (tss.__isset.cell_limit)
277 if (tss.__isset.cell_limit_per_family)
280 if (tss.__isset.versions)
283 if (tss.__isset.start_time)
286 if (tss.__isset.end_time)
289 if (tss.__isset.return_deletes)
292 if (tss.__isset.keys_only)
295 if (tss.__isset.row_regexp)
298 if (tss.__isset.value_regexp)
301 if (tss.__isset.scan_and_filter_rows)
304 if (tss.__isset.do_not_cache)
307 if (tss.__isset.row_offset)
310 if (tss.__isset.cell_offset)
313 if (tss.__isset.and_column_predicates)
317 const char *start_row;
319 for (
const auto &ri : tss.row_intervals) {
320 start_row = ri.__isset.start_row ?
321 ri.start_row.c_str() :
322 ri.__isset.start_row_binary ?
323 ri.start_row_binary.c_str() :
325 end_row = ri.__isset.end_row ?
327 ri.__isset.end_row_binary ?
328 ri.end_row_binary.c_str() :
331 start_row, ri.__isset.start_inclusive && ri.start_inclusive,
332 end_row, ri.__isset.end_inclusive && ri.end_inclusive));
335 for (
const auto &ci : tss.cell_intervals)
337 ci.__isset.start_row ? ci.start_row.c_str() :
"",
338 ci.start_column.c_str(),
339 ci.__isset.start_inclusive && ci.start_inclusive,
341 ci.end_column.c_str(),
342 ci.__isset.end_inclusive && ci.end_inclusive));
344 for (
const auto &col : tss.columns)
345 hss.
columns.push_back(col.c_str());
347 for (
const auto &cp : tss.column_predicates) {
348 HT_INFOF(
"%s:%s %s", cp.column_family.c_str(), cp.column_qualifier.c_str(),
349 cp.__isset.value ? cp.value.c_str() :
"");
352 cp.__isset.column_family ? cp.column_family.c_str() : 0,
353 cp.__isset.column_qualifier ? cp.column_qualifier.c_str() : 0,
355 cp.__isset.value ? cp.value.c_str() : 0,
356 cp.__isset.value ? cp.value.size() : 0));
363 if (tss.__isset.row_limit)
366 if (tss.__isset.cell_limit)
369 if (tss.__isset.cell_limit_per_family)
372 if (tss.__isset.versions)
375 if (tss.__isset.start_time)
378 if (tss.__isset.end_time)
381 if (tss.__isset.return_deletes)
384 if (tss.__isset.keys_only)
387 if (tss.__isset.row_regexp)
390 if (tss.__isset.value_regexp)
393 if (tss.__isset.scan_and_filter_rows)
396 if (tss.__isset.do_not_cache)
399 if (tss.__isset.row_offset)
402 if (tss.__isset.cell_offset)
405 if (tss.__isset.and_column_predicates)
410 for (
auto & col : tss.columns)
414 const char *start_row;
417 for (
auto & ri : tss.row_intervals) {
418 start_row = ri.__isset.start_row ?
419 ri.start_row.c_str() :
420 ri.__isset.start_row_binary ?
421 ri.start_row_binary.c_str() :
"";
422 end_row = ri.__isset.end_row ?
424 ri.__isset.end_row_binary ?
427 start_row, ri.__isset.start_inclusive && ri.start_inclusive,
428 end_row, ri.__isset.end_inclusive && ri.end_inclusive);
433 for (
auto & ci : tss.cell_intervals)
435 ci.__isset.start_row ? ci.start_row.c_str() :
"",
436 ci.start_column.c_str(),
437 ci.__isset.start_inclusive && ci.start_inclusive,
439 ci.end_column.c_str(),
440 ci.__isset.end_inclusive && ci.end_inclusive);
444 for (
auto & cp : tss.column_predicates)
446 cp.__isset.column_family ? cp.column_family.c_str() : 0,
447 cp.__isset.column_qualifier ? cp.column_qualifier.c_str() : 0,
449 cp.__isset.value ? cp.value.c_str() : 0,
450 cp.__isset.value ? cp.value.size() : 0);
456 if (tcell.key.__isset.row)
457 hcell.
row_key = tcell.key.row.c_str();
459 if (tcell.key.__isset.column_family)
462 if (tcell.key.__isset.column_qualifier)
465 if (tcell.key.__isset.timestamp)
468 if (tcell.key.__isset.revision)
469 hcell.
revision = tcell.key.revision;
471 if (tcell.__isset.value) {
472 hcell.
value = (::uint8_t *)tcell.value.c_str();
476 if (tcell.key.__isset.flag)
477 hcell.
flag = tcell.key.flag;
483 if (tkey.__isset.row) {
484 hkey.
row = tkey.row.c_str();
485 hkey.
row_len = tkey.row.size();
488 if (tkey.__isset.column_family)
491 if (tkey.__isset.column_qualifier)
494 if (tkey.__isset.timestamp)
497 if (tkey.__isset.revision)
503 int32_t amount =
sizeof(ThriftGen::Cell);
506 amount += tcell.key.row.length();
508 amount += tcell.key.column_family.length();
512 tcell.key.__isset.column_qualifier =
true;
513 amount += tcell.key.column_qualifier.length();
516 tcell.key.column_qualifier =
"";
517 tcell.key.__isset.column_qualifier =
false;
521 tcell.key.revision = hcell.
revision;
525 tcell.__isset.value =
true;
530 tcell.__isset.value =
false;
533 tcell.key.flag = (KeyFlag::type)hcell.
flag;
534 tcell.key.__isset.row = tcell.key.__isset.column_family
535 = tcell.key.__isset.timestamp = tcell.key.__isset.revision
536 = tcell.key.__isset.flag =
true;
541 int len = tcell.size();
547 case 4: hcell.
value = (::uint8_t *)tcell[3].c_str();
551 case 1: hcell.
row_key = tcell[0].c_str();
559 int32_t amount = 5*
sizeof(std::string);
562 amount += tcell[0].length();
564 amount += tcell[1].length();
566 amount += tcell[2].length();
568 amount += tcell[3].length();
570 amount += tcell[4].length();
577 tcells.resize(hcells.size());
578 for(
size_t ii=0; ii<hcells.size(); ++ii)
586 for (
const auto &tcell : tcells) {
589 hcells.push_back(hcell);
595 int32_t amount =
sizeof(CellAsArray);
596 tcells.resize(hcells.size());
597 for(
size_t ii=0; ii<hcells.size(); ++ii) {
606 for(
size_t ii=0; ii<hcells.size(); ++ii) {
607 amount += strlen(hcells[ii].row_key) + strlen(hcells[ii].column_family)
608 + strlen(hcells[ii].column_qualifier) + 8 + 8 + 4 + 1
609 + hcells[ii].value_len + 4;
612 for (
size_t ii = 0; ii < hcells.size(); ++ii) {
613 writer.
add(hcells[ii]);
617 amount = tcells.size();
624 for (
const auto &tcell : tcells) {
627 hcells.push_back(hcell);
632 ThriftGen::TableSplit &tsplit) {
636 tsplit.__isset.start_row =
true;
639 tsplit.start_row =
"";
640 tsplit.__isset.start_row =
false;
645 !(hsplit.
end_row[0] == (
char)0xff && hsplit.
end_row[1] == (
char)0xff)) {
646 tsplit.end_row = hsplit.
end_row;
647 tsplit.__isset.end_row =
true;
651 tsplit.__isset.end_row =
false;
657 tsplit.__isset.location =
true;
660 tsplit.location =
"";
661 tsplit.__isset.location =
false;
667 tsplit.__isset.ip_address =
true;
670 tsplit.ip_address =
"";
671 tsplit.__isset.ip_address =
false;
677 tsplit.__isset.hostname =
true;
680 tsplit.hostname =
"";
681 tsplit.__isset.hostname =
false;
687 ThriftGen::ColumnFamilyOptions &toptions) {
694 toptions.__set_ttl(hoptions.
get_ttl());
710 if (toptions.__isset.max_versions)
712 if (toptions.__isset.ttl)
713 hoptions.
set_ttl(toptions.ttl);
714 if (toptions.__isset.time_order_desc)
716 if (toptions.__isset.counter)
721 ThriftGen::AccessGroupOptions &toptions) {
748 if (toptions.__isset.in_memory)
750 if (toptions.__isset.replication)
752 if (toptions.__isset.blocksize)
754 if (toptions.__isset.compressor)
756 if (toptions.__isset.bloom_filter)
762 ThriftGen::Schema &tschema) {
764 if (hschema->get_generation())
765 tschema.__set_generation(hschema->get_generation());
767 tschema.__set_version(hschema->get_version());
769 if (hschema->get_group_commit_interval())
770 tschema.__set_group_commit_interval(hschema->get_group_commit_interval());
772 for (
auto ag_spec : hschema->get_access_groups()) {
773 ThriftGen::AccessGroupSpec tag;
774 tag.name = ag_spec->get_name();
775 tag.__set_generation(ag_spec->get_generation());
777 tag.__isset.options =
true;
779 tag.__isset.defaults =
true;
780 tschema.access_groups[ag_spec->get_name()] = tag;
781 tschema.__isset.access_groups =
true;
784 for (
auto cf_spec : hschema->get_column_families()) {
785 ThriftGen::ColumnFamilySpec tcf;
786 tcf.name = cf_spec->get_name();
787 tcf.access_group = cf_spec->get_access_group();
788 tcf.deleted = cf_spec->get_deleted();
789 if (cf_spec->get_generation())
790 tcf.__set_generation(cf_spec->get_generation());
791 if (cf_spec->get_id())
792 tcf.__set_id(cf_spec->get_id());
793 tcf.value_index = cf_spec->get_value_index();
794 tcf.qualifier_index = cf_spec->get_qualifier_index();
796 tcf.__isset.options =
true;
797 tschema.column_families[cf_spec->get_name()] = tcf;
798 tschema.__isset.column_families =
true;
802 tschema.access_group_defaults))
803 tschema.__isset.access_group_defaults =
true;
806 tschema.column_family_defaults))
807 tschema.__isset.column_family_defaults =
true;
814 if (tschema.__isset.generation)
815 hschema->set_generation(tschema.generation);
817 hschema->set_version(tschema.version);
819 hschema->set_group_commit_interval(tschema.group_commit_interval);
822 hschema->access_group_defaults());
825 hschema->column_family_defaults());
827 bool need_default =
true;
828 unordered_map<string, Hypertable::AccessGroupSpec *> ag_map;
829 for (
auto & entry : tschema.access_groups) {
830 if (entry.second.name ==
"default")
831 need_default =
false;
833 if (entry.second.__isset.generation)
835 ag_map[entry.second.name] = ag;
846 for (
auto & entry : tschema.column_families) {
849 if (entry.second.access_group.empty())
854 if (entry.second.__isset.generation)
856 if (entry.second.__isset.id)
857 cf->
set_id(entry.second.id);
865 if (iter == ag_map.end())
867 "Undefined access group '%s' referenced by column '%s'",
869 iter->second->add_column(cf);
873 for (
auto & entry : ag_map)
874 hschema->add_access_group(entry.second);
882 ns(ns), table(t), scan_spec_builder(128) { }
893 template <
class ResultT,
class CellT>
906 const String &hql,
bool flush,
bool buffered)
907 : result(r), handler(*handler), ns(ns), hql(hql), flush(flush),
908 buffered(buffered) { }
910 void on_return(
const std::string &)
override;
920 : scanners(0), async_scanners(0), mutators(0), async_mutators(0),
921 shared_mutators(0), namespaces(0), futures(0) {
956 : m_remote_peer(remote_peer), m_context(c) {
960 std::lock_guard<std::mutex> lock(m_mutex);
961 if (!m_object_map.empty())
962 HT_WARNF(
"Destroying ServerHandler for remote peer %s with %d objects in map",
963 m_remote_peer.c_str(),
964 (int)m_object_map.size());
967 for (
auto entry : m_reference_map) {
969 entry.second =
nullptr;
976 m_reference_map.clear();
982 for (
auto entry : m_object_map) {
984 entry.second =
nullptr;
991 m_object_map.clear();
997 for (
auto entry : m_cached_object_map) {
999 entry.second =
nullptr;
1006 m_cached_object_map.clear();
1013 return m_remote_peer;
1025 servers.reserve(profile_data.
servers.size()*6);
1026 for (
auto & server : profile_data.
servers) {
1030 servers.append(
",");
1031 servers.append(server);
1037 else if (ns_str[0] !=
'/')
1038 ns_str = string(
"/") + ns_str;
1041 const char *hql_ptr = hql.c_str();
1043 if (hql.find_first_of(
'\n') != string::npos) {
1044 hql_cleaned.reserve(hql.length());
1045 const char *base = hql.c_str();
1046 const char *ptr = base;
1048 while (*ptr && *ptr !=
'\n')
1050 hql_cleaned.append(base, ptr-base);
1052 hql_cleaned.append(
" ", 1);
1056 hql_ptr = hql_cleaned.c_str();
1059 string line =
format(
"%lld %s %s %lld %d %d %lld %lld %lld %s %s %s",
1061 func_name, m_remote_peer.c_str(),
1067 ns_str.c_str(), hql_ptr);
1081 servers.reserve(profile_data.
servers.size()*6);
1082 for (
auto & server : profile_data.
servers) {
1086 servers.append(
",");
1087 servers.append(server);
1093 else if (ns_str[0] !=
'/')
1094 ns_str = string(
"/") + ns_str;
1096 string line =
format(
"%lld %s %s %lld %d %d %lld %lld %lld %s %s %s",
1098 func_name, m_remote_peer.c_str(),
1104 ns_str.c_str(), ss.
render_hql(table).c_str());
1110 hql_exec(HqlResult& result,
const ThriftGen::Namespace ns,
1111 const String &hql,
bool noflush,
bool unbuffered)
override {
1112 LOG_API_START(
"namespace=" << ns <<
" hql=" << hql <<
" noflush=" << noflush
1113 <<
" unbuffered="<< unbuffered);
1116 cb(result,
this, ns, hql, !noflush, !unbuffered);
1119 run_hql_interp(ns, hql, cb);
1121 }
RETHROW(
"namespace=" << ns <<
" hql="<< hql <<
" noflush="<< noflush
1122 <<
" unbuffered="<< unbuffered)
1124 if (!unbuffered && cb.
is_scan)
1136 const String &hql)
override {
1137 hql_exec(result, ns, hql,
false,
false);
1141 hql_exec2(HqlResult2 &result,
const ThriftGen::Namespace ns,
1142 const String &hql,
bool noflush,
bool unbuffered)
override {
1143 LOG_API_START(
"namespace=" << ns <<
" hql="<< hql <<
" noflush="<< noflush <<
1144 " unbuffered="<< unbuffered);
1147 cb(result,
this, ns, hql, !noflush, !unbuffered);
1150 run_hql_interp(ns, hql, cb);
1152 }
RETHROW(
"namespace=" << ns <<
" hql="<< hql <<
" noflush="<< noflush <<
1153 " unbuffered="<< unbuffered)
1155 if (!unbuffered && cb.
is_scan)
1163 const String &hql,
bool noflush,
bool unbuffered)
override {
1164 LOG_API_START(
"namespace=" << ns <<
" hql="<< hql <<
" noflush="<< noflush <<
1165 " unbuffered="<< unbuffered);
1168 cb(result,
this, ns, hql, !noflush, !unbuffered);
1171 run_hql_interp(ns, hql, cb);
1173 }
RETHROW(
"namespace=" << ns <<
" hql="<< hql <<
" noflush="<< noflush <<
1174 " unbuffered="<< unbuffered)
1176 if (!unbuffered && cb.
is_scan)
1184 const String &hql)
override {
1185 hql_exec2(result, ns, hql,
false,
false);
1190 const String &hql)
override {
1191 hql_exec_as_arrays(result, ns, hql,
false,
false);
1197 m_context.client->create_namespace(ns, NULL);
1203 namespace_create(ns);
1207 const ThriftGen::Schema &schema)
override {
1215 }
RETHROW(
"namespace=" << ns <<
" table="<< table)
1221 const ThriftGen::Schema &schema)
override {
1228 namespace_ptr->
alter_table(table, hschema,
false);
1229 }
RETHROW(
"namespace=" << ns <<
" table="<< table)
1235 const String &table,
const ThriftGen::ScanSpec &ss)
override {
1237 LOG_API_START(
"namespace=" << ns <<
" table="<< table <<
" scan_spec="<< ss);
1239 ScannerInfoPtr si = make_shared<ScannerInfo>(ns, table);
1241 id = get_scanner_id(_open_scanner(ns, table, si->scan_spec_builder.get()), si);
1242 }
RETHROW(
"namespace=" << ns <<
" table="<< table <<
" scan_spec="<< ss)
1248 const String &table,
const ThriftGen::ScanSpec &ss)
override {
1249 return scanner_open(ns, table, ss);
1253 const String &table,
const ThriftGen::Future ff,
1254 const ThriftGen::ScanSpec &ss)
override {
1256 LOG_API_START(
"namespace=" << ns <<
" table=" << table <<
" future="
1257 << ff <<
" scan_spec=" << ss);
1259 id = get_object_id(_open_scanner_async(ns, table, ff, ss));
1260 add_reference(
id, ff);
1261 }
RETHROW(
"namespace=" << ns <<
" table=" << table <<
" future="
1262 << ff <<
" scan_spec="<< ss)
1269 const String &table,
const ThriftGen::Future ff,
1270 const ThriftGen::ScanSpec &ss)
override {
1271 return async_scanner_open(ns, table, ff, ss);
1277 remove_namespace(ns);
1283 namespace_close(ns);
1287 const String &table_name)
override {
1288 LOG_API_START(
"namespace=" << ns <<
" table=" << table_name);
1292 }
RETHROW(
"namespace=" << ns <<
" table=" << table_name);
1300 remove_scanner(
id, cobj, scanner_info);
1302 if (g_log_slow_queries) {
1306 if (scanner_info->latency >= g_slow_query_latency_threshold) {
1307 if (scanner_info->hql.empty())
1308 log_slow_query_scanspec(__func__, start_time, end_time,
1309 scanner_info->latency, pd,
1310 get_namespace(scanner_info->ns),
1311 scanner_info->table,
1312 scanner_info->scan_spec_builder.get());
1314 log_slow_query(__func__, start_time, end_time,
1315 scanner_info->latency, pd,
1316 get_namespace(scanner_info->ns),
1326 scanner_close(scanner);
1333 get_scanner_async(scanner)->cancel();
1334 }
RETHROW(
"scanner=" << scanner)
1340 async_scanner_cancel(scanner);
1346 remove_scanner(scanner_async);
1347 remove_references(scanner_async);
1348 }
RETHROW(
"scanner_async="<< scanner_async)
1353 async_scanner_close(scanner_async);
1357 const Scanner scanner_id)
override {
1360 TableScanner *scanner = get_scanner(scanner_id, scanner_info);
1361 _next(result, scanner, m_context.next_threshold);
1362 }
RETHROW(
"scanner="<< scanner_id)
1366 void next_cells(ThriftCells &result,
const Scanner scanner_id)
override {
1367 scanner_get_cells(result, scanner_id);
1371 const Scanner scanner_id)
override {
1374 TableScanner *scanner = get_scanner(scanner_id, scanner_info);
1375 _next(result, scanner, m_context.next_threshold);
1376 }
RETHROW(
"scanner="<< scanner_id <<
" result.size="<< result.size())
1381 const Scanner scanner_id)
override {
1382 scanner_get_cells_as_arrays(result, scanner_id);
1386 const Scanner scanner_id)
override {
1393 TableScanner *scanner = get_scanner(scanner_id, scanner_info);
1396 if (scanner->
next(cell)) {
1397 if (!writer.
add(cell)) {
1399 scanner->
unget(cell);
1410 }
RETHROW(
"scanner="<< scanner_id);
1415 const Scanner scanner_id)
override {
1416 scanner_get_cells_serialized(result, scanner_id);
1420 LOG_API_START(
"scanner="<< scanner_id <<
" result.size="<< result.size());
1422 TableScanner *scanner = get_scanner(scanner_id, scanner_info);
1423 _next_row(result, scanner);
1424 }
RETHROW(
"scanner=" << scanner_id)
1429 void next_row(ThriftCells &result,
const Scanner scanner_id)
override {
1430 scanner_get_row(result, scanner_id);
1434 const Scanner scanner_id)
override {
1437 TableScanner *scanner = get_scanner(scanner_id, scanner_info);
1438 _next_row(result, scanner);
1439 }
// Error context: identify the scanner (as the other scanner_* handlers do) and
// keep the result size that was reported before.
RETHROW("scanner=" << scanner_id << " result.size=" << result.size())
1444 const Scanner scanner_id)
override {
1445 scanner_get_row_as_arrays(result, scanner_id);
1449 const Scanner scanner_id)
override {
1455 std::string prev_row;
1457 TableScanner *scanner = get_scanner(scanner_id, scanner_info);
1460 if (scanner->
next(cell)) {
1462 if (prev_row.empty() || prev_row == cell.
row_key) {
1465 if (prev_row.empty())
1471 scanner->
unget(cell);
1483 }
RETHROW(
"scanner="<< scanner_id)
1488 const Scanner scanner_id)
override {
1489 scanner_get_row_serialized(result, scanner_id);
1492 void get_row(ThriftCells &result,
const ThriftGen::Namespace ns,
1494 LOG_API_START(
"namespace=" << ns <<
" table="<< table <<
" row="<< row);
1500 row.c_str(),
true));
1503 _next(result, scanner.get(), INT32_MAX);
1505 }
RETHROW(
"namespace=" << ns <<
" table="<< table <<
" row="<< row)
1510 const ThriftGen::Namespace ns,
const String &table,
1511 const String &row)
override {
1512 LOG_API_START(
"namespace=" << ns <<
" table="<< table <<
" row="<< row);
1519 row.c_str(),
true));
1522 _next(result, scanner.get(), INT32_MAX);
1524 }
RETHROW(
"namespace=" << ns <<
" table="<< table <<
" row="<< row)
1529 const ThriftGen::Namespace ns,
const std::string& table,
1530 const std::string& row)
override {
// Log the call arguments; every field is written as " key=value" so the line
// matches the format used by the other get_row* handlers.
1531 LOG_API_START("namespace=" << ns << " table=" << table << " row=" << row);
1539 row.c_str(),
true));
1544 while (scanner->next(cell))
1550 }
// Error context carries the same fields as the start-of-call log line.
RETHROW("namespace=" << ns << " table=" << table << " row=" << row)
1554 void get_cell(Value &result,
const ThriftGen::Namespace ns,
1556 LOG_API_START(
"namespace=" << ns <<
" table=" << table <<
" row="
1557 << row <<
" column=" << column);
1568 column.c_str(),
true, row.c_str(), column.c_str(),
true));
1574 if (scanner->next(cell))
1579 }
RETHROW(
"namespace=" << ns <<
" table=" << table <<
" row="
1580 << row <<
" column=" << column)
1585 void get_cells(ThriftCells &result,
const ThriftGen::Namespace ns,
1586 const String &table,
const ThriftGen::ScanSpec &ss)
override {
1587 LOG_API_START(
"namespace=" << ns <<
" table=" << table <<
" scan_spec="
1588 << ss <<
" result.size=" << result.size());
1594 _next(result, scanner.get(), INT32_MAX);
1596 }
RETHROW(
"namespace=" << ns <<
" table="<< table <<
" scan_spec="<< ss)
1601 const ThriftGen::Namespace ns,
const String &table,
1602 const ThriftGen::ScanSpec &ss)
override {
1603 LOG_API_START(
"namespace=" << ns <<
" table="<< table <<
" scan_spec="<< ss);
1609 _next(result, scanner.get(), INT32_MAX);
1611 }
RETHROW(
"namespace=" << ns <<
" table="<< table <<
" scan_spec="<< ss)
1616 const ThriftGen::Namespace ns,
const String& table,
1617 const ThriftGen::ScanSpec& ss)
override {
1618 LOG_API_START(
"namespace=" << ns <<
" table="<< table <<
" scan_spec="<< ss);
1627 while (scanner->next(cell))
1633 }
RETHROW(
"namespace=" << ns <<
" table="<< table <<
" scan_spec="<< ss)
1638 const String &table,
const ThriftGen::MutateSpec &mutate_spec,
1639 const ThriftCells &cells)
override {
1641 " mutate_spec.appname=" << mutate_spec.appname);
1644 _offer_cells(ns, table, mutate_spec, cells);
1645 }
RETHROW(
"namespace=" << ns <<
" table=" << table
1646 <<
" mutate_spec.appname="<< mutate_spec.appname)
1651 const ThriftGen::MutateSpec &mutate_spec,
const ThriftCells &cells)
override {
1652 shared_mutator_set_cells(ns, table, mutate_spec, cells);
1656 const String &table,
const ThriftGen::MutateSpec &mutate_spec,
1657 const ThriftGen::Cell &cell)
override {
1659 <<
" mutate_spec.appname="<< mutate_spec.appname);
1662 _offer_cell(ns, table, mutate_spec, cell);
1663 }
RETHROW(
"namespace=" << ns <<
" table=" << table
1664 <<
" mutate_spec.appname="<< mutate_spec.appname)
1669 const ThriftGen::MutateSpec &mutate_spec,
1670 const ThriftGen::Cell &cell)
override {
1671 shared_mutator_set_cell(ns, table, mutate_spec, cell);
1675 const String &table,
const ThriftGen::MutateSpec &mutate_spec,
1676 const ThriftCellsAsArrays &cells)
override {
1678 <<
" mutate_spec.appname=" << mutate_spec.appname);
1680 _offer_cells(ns, table, mutate_spec, cells);
1681 LOG_API(
"mutate_spec.appname=" << mutate_spec.appname <<
" done");
1682 }
RETHROW(
"namespace=" << ns <<
" table=" << table
1683 <<
" mutate_spec.appname=" << mutate_spec.appname)
1688 const String &table,
const ThriftGen::MutateSpec &mutate_spec,
1689 const ThriftCellsAsArrays &cells)
override {
1690 shared_mutator_set_cells_as_arrays(ns, table, mutate_spec, cells);
1694 const String &table,
const ThriftGen::MutateSpec &mutate_spec,
1695 const CellAsArray &cell)
override {
1698 <<
" mutate_spec.appname=" << mutate_spec.appname);
1701 _offer_cell(ns, table, mutate_spec, cell);
1702 LOG_API(
"mutate_spec.appname=" << mutate_spec.appname <<
" done");
1703 }
RETHROW(
"namespace=" << ns <<
" table=" << table
1704 <<
" mutate_spec.appname=" << mutate_spec.appname)
1709 const String &table,
const ThriftGen::MutateSpec &mutate_spec,
1710 const CellAsArray &cell)
override {
1711 shared_mutator_set_cell_as_array(ns, table, mutate_spec, cell);
1715 ThriftGen::Future id;
1718 capacity = (capacity <= 0) ? m_context.future_capacity : capacity;
1720 }
RETHROW(
"capacity=" << capacity)
1726 return future_open(capacity);
1730 ThriftGen::Future ff,
int timeout_millis)
override {
1736 bool timed_out =
false;
1737 bool done = !(future->
get(hresult, (uint32_t)timeout_millis,
1742 tresult.is_empty =
true;
1747 tresult.is_empty =
false;
1748 _convert_result(hresult, tresult);
1750 << tresult.id <<
" is_scan=" << tresult.is_scan
1751 <<
" is_error=" << tresult.is_error);
1757 ThriftGen::Future ff,
int timeout_millis)
override {
1758 future_get_result(tresult, ff, timeout_millis);
1762 ThriftGen::Future ff,
int timeout_millis)
override {
1767 bool timed_out =
false;
1768 bool done = !(future->
get(hresult, (uint32_t)timeout_millis,
1773 tresult.is_empty =
true;
1778 tresult.is_empty =
false;
1779 _convert_result_as_arrays(hresult, tresult);
// Each field needs a leading space so it does not run into the previous value.
1781 << " is_scan=" << tresult.is_scan << " is_error="
1782 << tresult.is_error);
1788 ThriftGen::Future ff,
int timeout_millis)
override {
1789 future_get_result_as_arrays(tresult, ff, timeout_millis);
1793 ThriftGen::Future ff,
int timeout_millis)
override {
1799 bool timed_out =
false;
1800 bool done = !(future->
get(hresult, (uint32_t)timeout_millis,
1805 tresult.is_empty =
true;
1810 tresult.is_empty =
false;
1811 _convert_result_serialized(hresult, tresult);
// Each field needs a leading space so it does not run into the previous value.
1813 << " is_scan=" << tresult.is_scan << " is_error="
1814 << tresult.is_error);
1820 ThriftGen::Future ff,
int timeout_millis)
override {
1821 future_get_result_serialized(tresult, ff, timeout_millis);
1872 bool has_outstanding;
1879 return has_outstanding;
1895 ThriftGen::Namespace id;
1898 id = get_cached_object_id( dynamic_pointer_cast<ClientObject>(m_context.client->open_namespace(ns)) );
1905 return namespace_open(ns);
1909 const String &table,
const ThriftGen::Future ff, ::int32_t flags)
override {
1910 LOG_API_START(
"namespace=" << ns <<
" table=" << table <<
" future="
1911 << ff <<
" flags=" << flags);
1914 id = get_object_id(_open_mutator_async(ns, table, ff, flags));
1915 add_reference(
id, ff);
1916 }
RETHROW(
"namespace=" << ns <<
" table=" << table <<
" future="
1917 << ff <<
" flags=" << flags)
1923 const String &table,
const ThriftGen::Future ff, ::int32_t flags)
override {
1924 return async_mutator_open(ns, table, ff, flags);
1928 const String &table, int32_t flags, int32_t flush_interval)
override {
// Log the call arguments as space-separated "key=value" fields.
1929 LOG_API_START("namespace=" << ns << " table=" << table << " flags="
1930 << flags << " flush_interval=" << flush_interval);
1935 id = get_object_id(t->create_mutator(0, flags, flush_interval));
1936 }
// Error context uses the same field layout as the start-of-call log line.
RETHROW("namespace=" << ns << " table=" << table << " flags="
1937 << flags << " flush_interval=" << flush_interval)
1943 const String &table, int32_t flags, int32_t flush_interval)
override {
1944 return mutator_open(ns, table, flags, flush_interval);
1950 get_mutator(mutator)->flush();
1951 }
RETHROW(
"mutator=" << mutator)
1956 mutator_flush(mutator);
1962 get_mutator_async(mutator)->flush();
1963 }
RETHROW(
"mutator=" << mutator)
1968 async_mutator_flush(mutator);
1974 flush_mutator(mutator);
1975 remove_mutator(mutator);
1976 }
RETHROW(
"mutator=" << mutator)
1981 mutator_close(mutator);
1988 get_mutator_async(mutator)->cancel();
1989 }
RETHROW(
"mutator="<< mutator)
1995 async_mutator_cancel(mutator);
2001 flush_mutator_async(mutator);
2002 remove_mutator(mutator);
2003 remove_references(mutator);
2004 }
// Error context: "mutator=<id>", the same label the other mutator handlers use.
RETHROW("mutator=" << mutator)
2009 async_mutator_close(mutator);
2013 const ThriftCells &cells)
override {
2014 LOG_API_START(
"mutator=" << mutator <<
" cell.size=" << cells.size());
2016 _set_cells(mutator, cells);
2017 }
RETHROW(
"mutator=" << mutator <<
" cell.size=" << cells.size())
2022 const ThriftGen::Cell &cell)
override {
2025 _set_cell(mutator, cell);
2026 }
RETHROW(
"mutator=" << mutator <<
" cell=" << cell)
2031 const ThriftCellsAsArrays &cells)
override {
2032 LOG_API_START(
"mutator=" << mutator <<
" cell.size=" << cells.size());
2034 _set_cells(mutator, cells);
2035 }
RETHROW(
"mutator=" << mutator <<
" cell.size=" << cells.size())
2040 const CellAsArray &cell)
override {
2042 LOG_API_START(
"mutator=" << mutator <<
" cell_as_array.size="
2045 _set_cell(mutator, cell);
2046 }
RETHROW(
"mutator="<< mutator <<
" cell_as_array.size="<< cell.size());
2051 const CellsSerialized &cells,
const bool flush)
override {
2052 LOG_API_START(
"mutator=" << mutator <<
" cell.size=" << cells.size());
2057 (uint32_t)cells.length());
2058 while (reader.next()) {
2060 cb.
add(hcell,
false);
2062 get_mutator(mutator)->set_cells(cb.
get());
2063 if (flush || reader.flush())
2064 get_mutator(mutator)->flush();
2065 }
RETHROW(
"mutator="<< mutator <<
" cell.size="<< cells.size())
2071 const ThriftGen::Cell &cell)
override {
2072 LOG_API_START(
"ns=" << ns <<
" table=" << table <<
" cell=" << cell);
2078 cb.
add(hcell,
false);
2079 mutator->set_cells(cb.
get());
2081 }
RETHROW(
"ns=" << ns <<
" table=" << table <<
" cell=" << cell);
2086 const ThriftCells &cells)
override {
2087 LOG_API_START(
"ns=" << ns <<
" table=" << table <<
" cell.size="
2093 mutator->set_cells(hcells);
2095 }
RETHROW(
"ns=" << ns <<
" table=" << table <<
" cell.size="
2101 const String& table,
const ThriftCellsAsArrays &cells)
override {
2102 LOG_API_START(
"ns=" << ns <<
" table=" << table <<
" cell.size="
2109 mutator->set_cells(hcells);
2111 }
RETHROW(
"ns="<< ns <<
" table=" << table<<
" cell.size="<< cells.size());
2116 const String& table,
const CellAsArray &cell)
override {
2119 LOG_API_START(
"ns=" << ns <<
" table=" << table <<
" cell_as_array.size="
2126 cb.
add(hcell,
false);
2127 mutator->set_cells(cb.
get());
2129 }
RETHROW(
"ns=" << ns <<
" table=" << table <<
" cell_as_array.size="
2135 const String& table,
const CellsSerialized &cells)
override {
2137 " cell_serialized.size=" << cells.size() <<
" flush=" << flush);
2143 (uint32_t)cells.length());
2144 while (reader.next()) {
2146 cb.
add(hcell,
false);
2148 mutator->set_cells(cb.
get());
2150 }
RETHROW(
"ns=" << ns <<
" table=" << table <<
" cell_serialized.size="
2151 << cells.size() <<
" flush=" << flush);
2157 const ThriftCells &cells)
override {
2158 LOG_API_START(
"mutator=" << mutator <<
" cells.size=" << cells.size());
2160 _set_cells_async(mutator, cells);
2161 }
RETHROW(
"mutator=" << mutator <<
" cells.size=" << cells.size())
2166 const ThriftCells &cells)
override {
2167 async_mutator_set_cells(mutator, cells);
2171 const ThriftGen::Cell &cell)
override {
2174 _set_cell_async(mutator, cell);
2175 }
RETHROW(
"mutator=" << mutator <<
" cell=" << cell);
2180 const ThriftGen::Cell &cell)
override {
2181 async_mutator_set_cell(mutator, cell);
2185 const ThriftCellsAsArrays &cells)
override {
2186 LOG_API_START(
"mutator=" << mutator <<
" cells.size=" << cells.size());
2188 _set_cells_async(mutator, cells);
2189 }
RETHROW(
"mutator=" << mutator <<
" cells.size=" << cells.size())
2194 const ThriftCellsAsArrays &cells)
override {
2195 async_mutator_set_cells_as_arrays(mutator, cells);
2199 const CellAsArray &cell)
override {
2201 LOG_API_START(
"mutator=" << mutator <<
" cell_as_array.size="
2204 _set_cell_async(mutator, cell);
2205 }
RETHROW(
"mutator=" << mutator <<
" cell_as_array.size=" << cell.size())
2210 const CellAsArray &cell)
override {
2211 async_mutator_set_cell_as_array(mutator, cell);
2215 const CellsSerialized &cells,
2216 const bool flush)
override {
2217 LOG_API_START(
"mutator=" << mutator <<
" cells.size=" << cells.size());
2222 (uint32_t)cells.length());
2223 while (reader.next()) {
2225 cb.
add(hcell,
false);
2229 if (flush || reader.flush() || mutator_ptr->
needs_flush())
2230 mutator_ptr->
flush();
2232 }
RETHROW(
"mutator=" << mutator <<
" cells.size=" << cells.size());
2237 const CellsSerialized &cells,
2238 const bool flush)
override {
2239 async_mutator_set_cells_serialized(mutator, cells, flush);
2246 exists = m_context.client->exists_namespace(ns);
2253 return namespace_exists(ns);
2257 const String &table)
override {
2263 }
RETHROW(
"namespace=" << ns <<
" table="<< table)
2269 const String &table)
override {
2274 const String &table)
override {
2279 }
RETHROW(
"namespace=" << ns <<
" table="<< table)
2284 const String &table)
override {
2285 table_get_id(result, ns, table);
2289 const ThriftGen::Namespace ns,
const String &table)
override {
2294 }
RETHROW(
"namespace=" << ns <<
" table=" << table)
2299 const String &table)
override {
2300 table_get_schema_str(result, ns, table);
2304 const ThriftGen::Namespace ns,
const String &table)
override {
2309 }
RETHROW(
"namespace=" << ns <<
" table=" << table)
2314 const ThriftGen::Namespace ns,
const String &table)
override {
2315 table_get_schema_str_with_ids(result, ns, table);
2319 const ThriftGen::Namespace ns,
const String &table)
override {
2326 }
RETHROW(
"namespace=" << ns <<
" table="<< table)
2331 const ThriftGen::Namespace ns,
const String &table)
override {
2332 table_get_schema(result, ns, table);
2336 const ThriftGen::Namespace ns)
override {
2340 std::vector<Hypertable::NamespaceListing> listing;
2343 for(
size_t ii=0; ii < listing.size(); ++ii)
2344 if (!listing[ii].is_namespace)
2345 tables.push_back(listing[ii].name);
2353 const ThriftGen::Namespace ns)
override {
2357 std::vector<Hypertable::NamespaceListing> listing;
2359 ThriftGen::NamespaceListing entry;
2361 for(
size_t ii=0; ii < listing.size(); ++ii) {
2362 entry.name = listing[ii].name;
2363 entry.is_namespace = listing[ii].is_namespace;
2364 _return.push_back(entry);
2373 const ThriftGen::Namespace ns)
override {
2374 namespace_get_listing(_return, ns);
2378 const ThriftGen::Namespace ns,
const String &table)
override {
2381 <<
" splits.size=" << _return.size());
2383 ThriftGen::TableSplit tsplit;
2386 for (TableSplitsContainer::iterator iter = splits.begin();
2387 iter != splits.end(); ++iter) {
2389 _return.push_back(tsplit);
2392 RETHROW(
"namespace=" << ns <<
" table=" << table)
2398 const ThriftGen::Namespace ns,
const String &table)
override {
2399 table_get_splits(_return, ns, table);
2403 LOG_API_START(
"namespace=" << ns <<
" if_exists=" << if_exists);
2405 m_context.client->drop_namespace(ns, NULL, if_exists);
2407 RETHROW(
"namespace=" << ns <<
" if_exists=" << if_exists)
2412 namespace_drop(ns, if_exists);
2416 const String &new_table_name)
override {
2418 <<
" new_table_name=" << new_table_name <<
" done");
2423 RETHROW(
"namespace=" << ns <<
" table=" << table <<
" new_table_name="
2424 << new_table_name <<
" done")
2429 const String &new_table_name)
override {
2430 table_rename(ns, table, new_table_name);
2434 const bool if_exists)
override {
2435 LOG_API_START(
"namespace=" << ns <<
" table=" << table <<
" if_exists="
2441 RETHROW(
"namespace=" << ns <<
" table=" << table <<
" if_exists="
2447 const bool if_exists)
override {
2448 table_drop(ns, table, if_exists);
2461 const ThriftGen::Namespace ns,
const std::string& table_name,
2462 const ThriftGen::Key& tkey,
const std::string& value)
override {
2464 << tkey <<
" value=" << value);
2472 value.empty() ? guid : (std::string &)value);
2474 RETHROW(
"namespace=" << ns <<
" table=" << table_name
2475 << tkey <<
" value=" << value);
2477 _return = value.empty() ? guid : value;
2480 void status(ThriftGen::Status& _return)
override {
2482 _return.__set_code(0);
2483 _return.__set_text(
"");
2489 kill(getpid(), SIGKILL);
2500 ThriftGen::Result &tresult) {
2503 if (hresult->is_scan()) {
2504 tresult.is_scan =
true;
2505 tresult.id = try_get_object_id(hresult->get_scanner());
2506 if (hresult->is_error()) {
2507 tresult.is_error =
true;
2508 hresult->get_error(tresult.error, tresult.error_msg);
2509 tresult.__isset.error =
true;
2510 tresult.__isset.error_msg =
true;
2513 tresult.is_error =
false;
2514 tresult.__isset.cells =
true;
2515 hresult->get_cells(hcells);
2520 tresult.is_scan =
false;
2521 tresult.id = try_get_object_id(hresult->get_mutator());
2522 if (hresult->is_error()) {
2523 tresult.is_error =
true;
2524 hresult->get_error(tresult.error, tresult.error_msg);
2525 hresult->get_failed_cells(hcells);
2527 tresult.__isset.error =
true;
2528 tresult.__isset.error_msg =
true;
2534 ThriftGen::ResultAsArrays &tresult) {
2537 if (hresult->is_scan()) {
2538 tresult.is_scan =
true;
2539 tresult.id = try_get_object_id(hresult->get_scanner());
2540 if (hresult->is_error()) {
2541 tresult.is_error =
true;
2542 hresult->get_error(tresult.error, tresult.error_msg);
2543 tresult.__isset.error =
true;
2544 tresult.__isset.error_msg =
true;
2547 tresult.is_error =
false;
2548 tresult.__isset.cells =
true;
2549 hresult->get_cells(hcells);
2555 "not yet implemented");
2560 ThriftGen::ResultSerialized &tresult) {
2563 if (hresult->is_scan()) {
2564 tresult.is_scan =
true;
2565 tresult.id = try_get_object_id(hresult->get_scanner());
2566 if (hresult->is_error()) {
2567 tresult.is_error =
true;
2568 hresult->get_error(tresult.error, tresult.error_msg);
2569 tresult.__isset.error =
true;
2570 tresult.__isset.error_msg =
true;
2573 tresult.is_error =
false;
2574 tresult.__isset.cells =
true;
2575 hresult->get_cells(hcells);
2581 "not yet implemented");
2586 const String &table,
const ThriftGen::Future ff, ::int32_t flags) {
2591 return t->create_mutator_async(future, 0, flags);
2595 const String &table,
const ThriftGen::Future ff,
2596 const ThriftGen::ScanSpec &ss) {
2603 return t->create_scanner_async(future, hss, 0);
2610 return t->create_scanner(ss, 0);
2617 return t->create_mutator();
2620 template <
class CellT>
2623 int32_t amount_read = 0;
2625 while (amount_read < limit) {
2626 if (scanner->
next(cell)) {
2629 result.push_back(tcell);
2636 template <
class CellT>
2639 std::string prev_row;
2641 while (scanner->
next(cell)) {
2642 if (prev_row.empty() || prev_row == cell.
row_key) {
2645 result.push_back(tcell);
2646 if (prev_row.empty())
2650 scanner->
unget(cell);
2660 interp->set_namespace(namespace_ptr->
get_name());
2661 interp->execute(hql, cb);
2664 template <
class CellT>
2666 const ThriftGen::MutateSpec &mutate_spec,
2667 const vector<CellT> &cells) {
2670 get_shared_mutator(ns, table, mutate_spec)->set_cells(hcells);
2673 template <
class CellT>
2675 const ThriftGen::MutateSpec &mutate_spec,
const CellT &cell) {
2679 cb.
add(hcell,
false);
2680 get_shared_mutator(ns, table, mutate_spec)->set_cells(cb.
get());
2683 template <
class CellT>
2684 void _set_cells(
const Mutator mutator,
const vector<CellT> &cells) {
2687 get_mutator(mutator)->set_cells(hcells);
2690 template <
class CellT>
2695 cb.
add(hcell,
false);
2696 get_mutator(mutator)->set_cells(cb.
get());
2699 template <
class CellT>
2706 mutator_ptr->
flush();
2709 template <
class CellT>
2714 cb.
add(hcell,
false);
2718 mutator_ptr->
flush();
2722 std::lock_guard<std::mutex> lock(m_mutex);
2723 ObjectMap::iterator it = m_object_map.find(
id);
2724 return (it != m_object_map.end()) ? it->second.get() : 0;
2728 std::lock_guard<std::mutex> lock(m_mutex);
2729 ObjectMap::iterator it = m_cached_object_map.find(
id);
2730 return (it != m_cached_object_map.end()) ? it->second.get() : 0;
2738 format(
"Invalid future id: %lld", (
Lld)
id));
2749 format(
"Invalid namespace id: %lld", (
Lld)
id));
2756 std::lock_guard<std::mutex> lock(m_mutex);
2757 while (!m_cached_object_map.insert(make_pair(
id =
Random::number32(), co)).second ||
id == 0);
2762 std::lock_guard<std::mutex> lock(m_mutex);
2763 int64_t
id =
reinterpret_cast<int64_t
>(co);
2769 std::lock_guard<std::mutex> lock(m_mutex);
2770 int64_t
id =
reinterpret_cast<int64_t
>(mutator.get());
2771 m_object_map.insert(make_pair(
id, static_pointer_cast<ClientObject>(mutator)));
2776 std::lock_guard<std::mutex> lock(m_mutex);
2777 int64_t
id =
reinterpret_cast<int64_t
>(co);
2778 return m_object_map.find(
id) != m_object_map.end() ?
id : 0;
2782 std::lock_guard<std::mutex> lock(m_mutex);
2783 int64_t
id =
reinterpret_cast<int64_t
>(scanner);
2785 m_scanner_info_map.insert(make_pair(
id, info));
2790 std::lock_guard<std::mutex> lock(m_mutex);
2791 int64_t
id =
reinterpret_cast<int64_t
>(scanner.get());
2792 m_object_map.insert(make_pair(
id, static_pointer_cast<ClientObject>(scanner)));
2793 m_scanner_info_map.insert(make_pair(
id, info));
2798 std::lock_guard<std::mutex> lock(m_mutex);
2799 ObjectMap::iterator it = m_object_map.find(to);
2800 if (it != m_object_map.end())
2801 m_reference_map.insert(make_pair(from, it->second));
2805 std::lock_guard<std::mutex> lock(m_mutex);
2806 m_reference_map.erase(
id);
2815 format(
"Invalid scanner id: %lld", (
Lld)
id));
2821 std::lock_guard<std::mutex> lock(m_mutex);
2823 auto it = m_object_map.find(
id);
2824 if (it == m_object_map.end() ||
2825 (scanner =
dynamic_cast<TableScanner *
>(it->second.get())) ==
nullptr) {
2828 format(
"Invalid scanner id: %lld", (
Lld)
id));
2830 auto sit = m_scanner_info_map.find(
id);
2831 HT_ASSERT(sit != m_scanner_info_map.end());
2838 bool removed =
false;
2841 std::lock_guard<std::mutex> lock(m_mutex);
2842 ObjectMap::iterator it = m_object_map.find(
id);
2843 if (it != m_object_map.end()) {
2844 item = (*it).second;
2845 m_object_map.erase(it);
2854 bool removed =
false;
2857 std::lock_guard<std::mutex> lock(m_mutex);
2858 ObjectMap::iterator it = m_cached_object_map.find(
id);
2859 if (it != m_cached_object_map.end()) {
2860 item = (*it).second;
2861 m_cached_object_map.erase(it);
2872 std::lock_guard<std::mutex> lock(m_mutex);
2873 m_scanner_info_map.erase(
id);
2874 ObjectMap::iterator it = m_object_map.find(
id);
2875 if (it != m_object_map.end()) {
2876 item = (*it).second;
2877 m_object_map.erase(it);
2882 format(
"Invalid scanner id: %lld", (
Lld)
id));
2888 std::lock_guard<std::mutex> lock(m_mutex);
2889 ObjectMap::iterator it = m_object_map.find(
id);
2890 if (it != m_object_map.end()) {
2891 scanner = (*it).second;
2892 m_object_map.erase(it);
2897 format(
"Invalid scanner id: %lld", (
Lld)
id));
2899 info = m_scanner_info_map[id];
2900 m_scanner_info_map.erase(
id);
2904 const String &table,
const ThriftGen::MutateSpec &mutate_spec)
override {
2905 std::lock_guard<std::mutex> lock(m_context.shared_mutator_mutex);
2908 SharedMutatorMap::iterator it = m_context.shared_mutator_map.find(skey);
2911 if (it != m_context.shared_mutator_map.end()) {
2912 LOG_API(
"deleting shared mutator on namespace=" << ns <<
" table="
2913 << table <<
" with appname=" << mutate_spec.appname);
2914 m_context.shared_mutator_map.erase(it);
2919 LOG_API(
"creating shared mutator on namespace=" << ns <<
" table=" << table
2920 <<
" with appname=" << mutate_spec.appname);
2923 TableMutator *mutator = t->create_mutator(0, mutate_spec.flags,
2924 mutate_spec.flush_interval);
2925 m_context.shared_mutator_map[skey] = mutator;
2930 const String &table,
const ThriftGen::MutateSpec &mutate_spec)
override {
2931 shared_mutator_refresh(ns, table, mutate_spec);
2935 const String &table,
const ThriftGen::MutateSpec &mutate_spec) {
2936 std::lock_guard<std::mutex> lock(m_context.shared_mutator_mutex);
2939 SharedMutatorMap::iterator it = m_context.shared_mutator_map.find(skey);
2942 if (it != m_context.shared_mutator_map.end())
2946 LOG_API(
"creating shared mutator on namespace=" << ns <<
" table="
2947 << table <<
" with appname=" << mutate_spec.appname);
2950 TableMutator *mutator = t->create_mutator(0, mutate_spec.flags,
2951 mutate_spec.flush_interval);
2952 m_context.shared_mutator_map[skey] = mutator;
2962 format(
"Invalid mutator id: %lld", (
Lld)
id));
2973 format(
"Invalid mutator id: %lld", (
Lld)
id));
2979 if (!remove_object(
id)) {
2982 format(
"Invalid future id: %lld", (
Lld)
id));
2987 if (!remove_cached_object(
id)) {
2990 format(
"Invalid namespace id: %lld", (
Lld)
id));
2995 if (!remove_object(
id)) {
2998 format(
"Invalid mutator id: %lld", (
Lld)
id));
3012 template <
class ResultT,
class CellT>
3014 result.results.push_back(ret);
3015 result.__isset.results =
true;
3018 template <
class ResultT,
class CellT>
3024 while (s->next(hcell)) {
3026 result.cells.push_back(tcell);
3028 result.__isset.cells =
true;
3030 if (g_log_slow_queries)
3031 s->get_profile_data(profile_data);
3037 result.scanner = handler.get_scanner_id(s, si);
3038 result.__isset.scanner =
true;
3043 template <
class ResultT,
class CellT>
3046 Parent::on_finish(m);
3049 result.mutator = handler.get_object_id(m);
3050 result.__isset.mutator =
true;
3062 return instance.get_handler(remotePeer);
3067 instance.release_handler(serverHandler);
3079 std::lock_guard<std::mutex> lock(m_mutex);
3080 ServerHandlerMap::iterator it = m_server_handler_map.find(remotePeer);
3081 if (it != m_server_handler_map.end()) {
3083 return it->second.second;
3087 m_server_handler_map.insert(
3088 std::make_pair(remotePeer,
3089 std::make_pair(1, serverHandler)));
3091 return serverHandler;
3096 std::lock_guard<std::mutex> lock(m_mutex);
3097 ServerHandlerMap::iterator it =
3098 m_server_handler_map.find(serverHandler->
remote_peer());
3099 if (it != m_server_handler_map.end()) {
3100 if (--it->second.first > 0) {
3104 m_server_handler_map.erase(it);
3106 delete serverHandler;
3121 HqlServiceIf*
getHandler(const ::apache::thrift::TConnectionInfo& connInfo)
override {
3122 typedef ::apache::thrift::transport::TSocket TTransport;
3124 dynamic_cast<TTransport*
>(connInfo.transport.get())->getPeerAddress();
3125 g_metrics_handler->connection_increment();
3131 g_metrics_handler->connection_decrement();
3142 using namespace ThriftBroker;
3146 init_with_policies<Policies>(argc, argv);
3148 if (get_bool(
"ThriftBroker.Hyperspace.Session.Reconnect"))
3149 properties->set(
"Hyperspace.Session.Reconnect",
true);
3151 if (get_bool(
"ThriftBroker.SlowQueryLog.Enable")) {
3152 g_log_slow_queries =
true;
3153 g_slow_query_latency_threshold = get_i32(
"ThriftBroker.SlowQueryLog.LatencyThreshold");
3154 g_slow_query_log =
new Cronolog(
"SlowQuery.log",
3159 g_metrics_handler = std::make_shared<MetricsHandler>(
properties, g_slow_query_log);
3160 g_metrics_handler->start_collecting();
3164 g_context = context.get();
3166 ::uint16_t port = get_i16(
"port");
3167 boost::shared_ptr<TProtocolFactory> protocolFactory(
new TBinaryProtocolFactory());
3168 boost::shared_ptr<HqlServiceIfFactory> hql_service_factory(
new ThriftBrokerIfFactory());
3169 boost::shared_ptr<TProcessorFactory> hql_service_processor_factory(
new HqlServiceProcessorFactory(hql_service_factory));
3171 boost::shared_ptr<TServerTransport> serverTransport;
3173 if (
has(
"thrift-timeout")) {
3174 int timeout_ms = get_i32(
"thrift-timeout");
3175 serverTransport.reset(
new TServerSocket(port, timeout_ms, timeout_ms) );
3178 serverTransport.reset(
new TServerSocket(port) );
3180 boost::shared_ptr<TTransportFactory> transportFactory(
new TFramedTransportFactory());
3182 TThreadedServer server(hql_service_processor_factory, serverTransport,
3183 transportFactory, protocolFactory);
3185 HT_INFO(
"Starting the server...");
3189 g_metrics_handler->start_collecting();
3190 g_metrics_handler.reset();
void set_cells(const ThriftGen::Namespace ns, const String &table, const ThriftCells &cells) override
bool get_counter() const
Gets the counter option.
void set_id(int32_t id)
Sets column ID.
bool is_set_max_versions() const
Checks if max versions option is set.
Retrieves system information (hardware, installation directory, etc.)
bool get_time_order_desc() const
Gets time order desc option.
Meta::list< ThriftBrokerPolicy, DefaultCommPolicy > Policies
void shared_mutator_set_cell(const ThriftGen::Namespace ns, const String &table, const ThriftGen::MutateSpec &mutate_spec, const ThriftGen::Cell &cell) override
void next_row(ThriftCells &result, const Scanner scanner_id) override
void _convert_result_serialized(Hypertable::ResultPtr &hresult, ThriftGen::ResultSerialized &tresult)
void async_mutator_flush(const MutatorAsync mutator) override
void get_schema_str(String &result, const ThriftGen::Namespace ns, const String &table) override
bool is_set_ttl() const
Checks if ttl option is set.
void table_create(const ThriftGen::Namespace ns, const String &table, const ThriftGen::Schema &schema) override
int32_t get_blocksize() const
Gets blocksize option.
void get_table_splits(const std::string &name, TableSplitsContainer &splits)
Retrieves the splits of the named table into the supplied container.
void set_row_offset(int32_t n)
Sets the number of rows to be skipped at the beginning of the query.
void async_mutator_cancel(const MutatorAsync mutator) override
bool is_set_bloom_filter() const
Checks if bloom filter option is set.
std::shared_ptr< MetricsHandler > MetricsHandlerPtr
Smart pointer to MetricsHandler.
std::vector< Cell, CellAlloc > Cells
void create_namespace(const String &ns) override
void remove_mutator(int64_t id)
TablePtr open_table(const std::string &name, int32_t flags=0)
Opens a table.
#define HT_WARNF(msg,...)
TableMutator * get_shared_mutator(const ThriftGen::Namespace ns, const String &table, const ThriftGen::MutateSpec &mutate_spec)
void set_generation(int64_t generation)
Sets generation.
TableScannerAsync * _open_scanner_async(const ThriftGen::Namespace ns, const String &table, const ThriftGen::Future ff, const ThriftGen::ScanSpec &ss)
int64_t get_scanner_id(TableScannerPtr &scanner, ScannerInfoPtr &info)
int32_t scanblocks
Number of scan blocks returned from RangeServers.
void convert_schema(const Hypertable::SchemaPtr &hschema, ThriftGen::Schema &tschema)
void offer_cells(const ThriftGen::Namespace ns, const String &table, const ThriftGen::MutateSpec &mutate_spec, const ThriftCells &cells) override
PropertiesPtr properties
This singleton map stores all options.
void reserve_rows(size_t s)
ServerHandler(const String &remote_peer, Context &c)
void set_bloom_filter(const std::string &bloomfilter)
Sets bloom filter option.
std::string String
A String is simply a typedef to std::string.
void get_table_id(String &result, const ThriftGen::Namespace ns, const String &table) override
bool set_max_versions(int32_t max_versions)
Sets max versions option.
ColumnPredicates column_predicates
void hql_exec_as_arrays(HqlResultAsArrays &result, const ThriftGen::Namespace ns, const String &hql, bool noflush, bool unbuffered) override
void _set_cell_async(const MutatorAsync mutator, const CellT &cell)
int32_t subscanners
Number of RangeServer::create_scanner() calls.
void set_deleted(bool value)
Sets deleted flag.
void cancel_mutator_async(const MutatorAsync mutator) override
void error_get_text(std::string &_return, int error_code) override
void mutator_set_cells_serialized(const Mutator mutator, const CellsSerialized &cells, const bool flush) override
int64_t bytes_scanned
Number of bytes scanned while executing scan.
bool is_cancelled()
Checks whether the Future object has been cancelled.
String format(const char *fmt,...)
Returns a String using printf-like format facilities. Vanilla snprintf is about 1.5x faster than this...
void scanner_get_row_as_arrays(ThriftCellsAsArrays &result, const Scanner scanner_id) override
void mutator_close(const Mutator mutator) override
chrono::time_point< fast_clock > time_point
void scanner_get_row(ThriftCells &result, const Scanner scanner_id) override
void _convert_result_as_arrays(const Hypertable::ResultPtr &hresult, ThriftGen::ResultAsArrays &tresult)
Asynchronous table scanner.
void convert_table_split(const Hypertable::TableSplit &hsplit, ThriftGen::TableSplit &tsplit)
Represents a table split.
void shared_mutator_set_cells(const ThriftGen::Namespace ns, const String &table, const ThriftGen::MutateSpec &mutate_spec, const ThriftCells &cells) override
const String & remote_peer() const
void _next_row(vector< CellT > &result, TableScanner *scanner)
ThriftGen::Future future_open(int capacity) override
void convert_scan_spec(const ThriftGen::ScanSpec &tss, Hypertable::ScanSpec &hss)
void async_mutator_set_cells_as_arrays(const MutatorAsync mutator, const ThriftCellsAsArrays &cells) override
void log_slow_query_scanspec(const char *func_name, std::chrono::fast_clock::time_point start_time, std::chrono::fast_clock::time_point end_time, int64_t latency_ms, ProfileDataScanner &profile_data, Hypertable::Namespace *ns, const string &table, Hypertable::ScanSpec &ss)
long long unsigned int Llu
Shortcut for printf formats.
pair< int64_t, int64_t > time_interval
const char * value_regexp
const char * column_qualifier
void hql_query_as_arrays(HqlResultAsArrays &result, const ThriftGen::Namespace ns, const String &hql) override
void set_replication(int16_t replication)
Sets replication option.
#define THROW_TE(_code_, _str_)
int compare(const SharedMutatorMapKey &skey) const
Declarations for fast_clock.
Column family specification.
void convert_key(const ThriftGen::Key &tkey, Hypertable::KeySpec &hkey)
void flush_mutator_async(const MutatorAsync mutator) override
void scanner_get_cells_serialized(CellsSerialized &result, const Scanner scanner_id) override
void set_qualifier_index(bool value)
Sets qualifier index flag.
void set_name(const std::string &name)
Sets column family name.
const char * column_qualifier
ScanSpecBuilder scan_spec_builder
bool set_ttl(time_t ttl)
Sets ttl option.
bool is_set_in_memory() const
Checks if in memory option is set.
void _set_cells_async(const MutatorAsync mutator, const vector< CellT > &cells)
void _convert_result(const Hypertable::ResultPtr &hresult, ThriftGen::Result &tresult)
Specification for column family options.
void mutator_set_cells_as_arrays(const Mutator mutator, const ThriftCellsAsArrays &cells) override
void namespace_create(const String &ns) override
Hypertable::Future * get_future(int64_t id)
std::shared_ptr< Result > ResultPtr
Smart pointer to Result.
void set_start_time(int64_t start)
ScannerAsync open_scanner_async(const ThriftGen::Namespace ns, const String &table, const ThriftGen::Future ff, const ThriftGen::ScanSpec &ss) override
void get_cells(ThriftCells &result, const ThriftGen::Namespace ns, const String &table, const ThriftGen::ScanSpec &ss) override
std::string get_table_id(const std::string &name)
Returns the table identifier for a table.
void get_row_as_arrays(ThriftCellsAsArrays &result, const ThriftGen::Namespace ns, const String &table, const String &row) override
void flush(bool sync=true)
Flushes the current buffer accumulated mutations to their respective range servers.
void async_mutator_close(const MutatorAsync mutator) override
void set_generation(int64_t generation)
Sets generation.
void on_scan(TableScannerPtr &) override
Called when interpreter is ready to scan.
Declarations for MetricsHandler.
void mutator_flush(const Mutator mutator) override
static uint32_t number32(uint32_t maximum=0)
Returns a random 32-bit unsigned integer.
std::shared_ptr< ClientObject > ClientObjectPtr
Smart pointer to ClientObject.
ClientObject * get_cached_object(int64_t id)
void namespace_get_listing(std::vector< ThriftGen::NamespaceListing > &_return, const ThriftGen::Namespace ns) override
bool future_is_full(ThriftGen::Future ff) override
#define LOG_API_START(_expr_)
void next_cells_as_arrays(ThriftCellsAsArrays &result, const Scanner scanner_id) override
static ServerHandler * getHandler(const String &remotePeer)
std::shared_ptr< TableScanner > TableScannerPtr
Smart pointer to TableScanner.
Specification for access group options.
bool convert_column_family_options(const Hypertable::ColumnFamilyOptions &hoptions, ThriftGen::ColumnFamilyOptions &toptions)
void set_value_regexp(const char *regexp)
Sets the regexp to filter cell values by.
bool is_set_replication() const
Checks if replication option is set.
bool operator==(const Statistics &other)
void unget(const Cell &cell)
Ungets one cell.
void _offer_cell(const ThriftGen::Namespace ns, const String &table, const ThriftGen::MutateSpec &mutate_spec, const CellT &cell)
void get_row_serialized(CellsSerialized &result, const ThriftGen::Namespace ns, const std::string &table, const std::string &row) override
String generate_guid()
Generates a new GUID.
void next_cells_serialized(CellsSerialized &result, const Scanner scanner_id) override
void run_hql_interp(const ThriftGen::Namespace ns, const String &hql, HqlInterpreter::Callback &cb)
void set_cells_serialized_async(const MutatorAsync mutator, const CellsSerialized &cells, const bool flush) override
bool has(const String &name)
Check existence of a configuration value.
Represents a row interval.
const string render_hql(const string &table) const
Renders scan spec as an HQL SELECT statement.
bool is_set_counter() const
Checks if counter option is set.
#define LOG_API_FINISH_E(_expr_)
static void releaseHandler(ServerHandler *serverHandler)
Declarations for Cronolog.
void scanner_get_cells_as_arrays(ThriftCellsAsArrays &result, const Scanner scanner_id) override
void async_mutator_set_cells_serialized(const MutatorAsync mutator, const CellsSerialized &cells, const bool flush) override
void create_cell_unique(std::string &_return, const ThriftGen::Namespace ns, const std::string &table_name, const ThriftGen::Key &tkey, const std::string &value) override
Provides the ability to mutate a table in the form of adding and deleting rows and cells...
void set_cell_as_array_async(const MutatorAsync mutator, const CellAsArray &cell) override
bool future_is_empty(ThriftGen::Future ff) override
void set_value_index(bool value)
Sets value index flag.
void cancel_scanner_async(const ScannerAsync scanner) override
void close_namespace(const ThriftGen::Namespace ns) override
Scan predicate and control specification.
void get_future_result(ThriftGen::Result &tresult, ThriftGen::Future ff, int timeout_millis) override
void table_rename(const ThriftGen::Namespace ns, const String &table, const String &new_table_name) override
void mutator_set_cell(const Mutator mutator, const ThriftGen::Cell &cell) override
MutatorAsync async_mutator_open(const ThriftGen::Namespace ns, const String &table, const ThriftGen::Future ff,::int32_t flags) override
std::shared_ptr< TableMutator > TableMutatorPtr
Smart pointer to TableMutator.
Provides the ability to mutate a table in the form of adding and deleting rows and cells...
void set_cell_limit(int32_t n)
Sets the maximum number of cells to return.
multimap<::int64_t, ClientObjectPtr > m_reference_map
void create_cell_unique(const TablePtr &table, const KeySpec &key, String &guid)
Inserts a unique value into a table.
ServerHandlerMap m_server_handler_map
bool future_has_outstanding(ThriftGen::Future ff) override
Mutator open_mutator(const ThriftGen::Namespace ns, const String &table, int32_t flags, int32_t flush_interval) override
void set_end_time(int64_t end)
TableMutatorAsync * _open_mutator_async(const ThriftGen::Namespace ns, const String &table, const ThriftGen::Future ff,::int32_t flags)
Declarations for ProfileDataScanner.
std::vector< ThriftGen::Cell > ThriftCells
void close_mutator_async(const MutatorAsync mutator) override
void operator=(const Statistics &other)
int32_t cell_limit_per_family
void offer_cell(const ThriftGen::Namespace ns, const String &table, const ThriftGen::MutateSpec &mutate_spec, const ThriftGen::Cell &cell) override
void set_cells(const Cells &cells)
Insert a bunch of cells into the table (atomically if cells are in the same range/row) ...
void async_scanner_close(const ScannerAsync scanner_async) override
void cancel_future(ThriftGen::Future ff) override
static time_point now() noexcept
void merge_options(const ColumnFamilyOptions &other)
Merges options from another ColumnFamilyOptions object.
void cancel()
Cancels outstanding scanners/mutators.
void _set_cell(const Mutator mutator, const CellT &cell)
void mutator_set_cells(const Mutator mutator, const ThriftCells &cells) override
ThriftGen::MutateSpec m_mutate_spec
void get_listing(bool include_sub_entries, std::vector< NamespaceListing > &listing)
Returns a list of existing tables & namespaces.
std::set< std::string > servers
Set of server proxy names participating in scan.
const char * get_text(int error)
Returns a descriptive error message.
void remove_scanner(int64_t id)
void set_row_regexp(const char *regexp)
Sets the regexp to filter rows by.
TableMutator * get_mutator(int64_t id)
void convert_cell(const ThriftGen::Cell &tcell, Hypertable::Cell &hcell)
void create_table(const std::string &name, const std::string &schema_str)
Creates a table.
void set_keys_only(bool val)
Returns only keys (no values).
int64_t cell_str_to_num(const std::string &from, const char *label, int64_t min_num=INT64_MIN, int64_t max_num=INT64_MAX)
void async_mutator_set_cell(const MutatorAsync mutator, const ThriftGen::Cell &cell) override
SchemaPtr get_schema(const std::string &name)
Returns a smart ptr to a schema object for a table.
int64_t get_cached_object_id(ClientObjectPtr co)
Scanner open_scanner(const ThriftGen::Namespace ns, const String &table, const ThriftGen::ScanSpec &ss) override
bool get_in_memory() const
Gets in memory option.
SharedMutatorMapKey(Hypertable::Namespace *ns, const String &tablename, const ThriftGen::MutateSpec &mutate_spec)
int64_t disk_read
Number of bytes read from disk while executing scan.
void table_get_schema(ThriftGen::Schema &result, const ThriftGen::Namespace ns, const String &table) override
bool and_column_predicates
void get_cell(Value &result, const ThriftGen::Namespace ns, const String &table, const String &row, const String &column) override
void reserve_column_predicates(size_t s)
Logging routines and macros.
void table_get_id(String &result, const ThriftGen::Namespace ns, const String &table) override
Hypertable::Namespace * m_namespace
void merge_options(const AccessGroupOptions &options)
Merges options with those from another AccessGroupOptions object.
std::mutex shared_mutator_mutex
void table_get_splits(std::vector< ThriftGen::TableSplit > &_return, const ThriftGen::Namespace ns, const String &table) override
void future_get_result_serialized(ThriftGen::ResultSerialized &tresult, ThriftGen::Future ff, int timeout_millis) override
void release_handler(ServerHandler *serverHandler)
bool next(Cell &cell)
Gets the next cell.
void next_cells(ThriftCells &result, const Scanner scanner_id) override
MutatorAsync open_mutator_async(const ThriftGen::Namespace ns, const String &table, const ThriftGen::Future ff,::int32_t flags) override
Compatibility Macros for C/C++.
Mutator mutator_open(const ThriftGen::Namespace ns, const String &table, int32_t flags, int32_t flush_interval) override
void future_get_result(ThriftGen::Result &tresult, ThriftGen::Future ff, int timeout_millis) override
void hql_exec(HqlResult &result, const ThriftGen::Namespace ns, const String &hql, bool noflush, bool unbuffered) override
int64_t try_get_object_id(ClientObject *co)
std::unordered_map< ::int64_t, ClientObjectPtr > ObjectMap
bool get(ResultPtr &result)
This call blocks till there is a result available unless async ops have completed.
void get_cells_as_arrays(ThriftCellsAsArrays &result, const ThriftGen::Namespace ns, const String &table, const ThriftGen::ScanSpec &ss) override
void get_schema(ThriftGen::Schema &result, const ThriftGen::Namespace ns, const String &table) override
TableMutator * _open_mutator(const ThriftGen::Namespace ns, const String &table)
const std::string & get_access_group() const
Gets access group name.
bool operator<(const SharedMutatorMapKey &skey1, const SharedMutatorMapKey &skey2)
void set_return_deletes(bool val)
Internal use only.
void shared_mutator_set_cell_as_array(const ThriftGen::Namespace ns, const String &table, const ThriftGen::MutateSpec &mutate_spec, const CellAsArray &cell) override
bool future_is_cancelled(ThriftGen::Future ff) override
::uint32_t next_threshold
Initialization helper for applications.
void get_table_splits(std::vector< ThriftGen::TableSplit > &_return, const ThriftGen::Namespace ns, const String &table) override
Helper class for building a ScanSpec.
bool set_time_order_desc(bool value)
Sets time order desc option.
bool is_empty()
Checks whether the Future result queue is empty.
void set_cell_limit_per_family(int32_t n)
Sets the maximum number of cells to return per column family.
void set_do_not_cache(bool val)
Don't cache.
TableScanner * _open_scanner(const ThriftGen::Namespace ns, const String &table, const Hypertable::ScanSpec &ss)
#define LOG_SLOW_QUERY(_pd_, _ns_, _hql_)
Represents a column predicate (e.g.
HqlInterpreter::Callback Parent
int32_t get_max_versions() const
Gets max versions option.
int32_t get_buffer_length()
Synchronous table scanner.
void future_cancel(ThriftGen::Future ff) override
bool exists_namespace(const String &ns) override
bool set_counter(bool value)
Sets counter option.
void refresh_shared_mutator(const ThriftGen::Namespace ns, const String &table, const ThriftGen::MutateSpec &mutate_spec) override
ObjectMap m_cached_object_map
Time related declarations.
bool is_set_compressor() const
Checks if compressor option is set.
void status(ThriftGen::Status &_return) override
void async_scanner_cancel(const ScannerAsync scanner) override
void set_cells_async(const MutatorAsync mutator, const ThriftCells &cells) override
bool is_set_time_order_desc() const
Checks if time_order_desc option is set.
Base class for Hypertable client objects.
time_t get_ttl() const
Gets ttl option.
ScannerAsync async_scanner_open(const ThriftGen::Namespace ns, const String &table, const ThriftGen::Future ff, const ThriftGen::ScanSpec &ss) override
Access group specification.
bool remove_object(int64_t id)
bool table_exists(const ThriftGen::Namespace ns, const String &table) override
int32_t convert_cells(const Hypertable::Cells &hcells, ThriftCells &tcells)
int16_t get_replication() const
Gets replication option.
void remove_scanner(int64_t id, ClientObjectPtr &scanner, ScannerInfoPtr &info)
ScannerInfo(int64_t ns, const string &t)
void mutator_set_cell_as_array(const Mutator mutator, const CellAsArray &cell) override
void hql_query2(HqlResult2 &result, const ThriftGen::Namespace ns, const String &hql) override
void async_mutator_set_cell_as_array(const MutatorAsync mutator, const CellAsArray &cell) override
std::shared_ptr< HqlInterpreter > HqlInterpreterPtr
Smart pointer to HqlInterpreter.
void refresh_table(const ThriftGen::Namespace ns, const String &table_name) override
const std::string & get_name() const
Gets column family name.
long long int Lld
Shortcut for printf formats.
bool table_exists(ContextPtr &context, const String &name, String &id)
Checks if table exists and returns table ID.
void merge_defaults(const ColumnFamilyOptions &options)
Merges column family defaults with those from a ColumnFamilyOptions object.
Hypertable::Namespace * get_namespace(int64_t id)
Callback interface/base class for execute.
TableScannerAsync * get_scanner_async(int64_t id)
void add_column(const string &str)
Adds a column family to be returned by the scan.
void add_row_interval(const string &start, bool start_inclusive, const string &end, bool end_inclusive)
Adds a row interval to be returned in the scan.
bool convert_access_group_options(const Hypertable::AccessGroupOptions &hoptions, ThriftGen::AccessGroupOptions &toptions)
HqlServiceIf * getHandler(const ::apache::thrift::TConnectionInfo &connInfo) override
TableScanner * get_scanner(int64_t id, ScannerInfoPtr &info)
ProfileDataScanner profile_data
int64_t get_object_id(ClientObject *co)
ThriftGen::Namespace open_namespace(const String &ns) override
void on_return(const std::string &) override
Called when the interpreter returns a string result. May be called multiple times for a list of string results.
void drop_namespace(const String &ns, const bool if_exists) override
void shared_mutator_set_cells_as_arrays(const ThriftGen::Namespace ns, const String &table, const ThriftGen::MutateSpec &mutate_spec, const ThriftCellsAsArrays &cells) override
static ServerHandlerFactory instance
void alter_table(const std::string &table_name, SchemaPtr &schema, bool force)
Alter table schema.
bool exists_table(const std::string &name)
Checks if the table exists.
void log_slow_query(const char *func_name, std::chrono::fast_clock::time_point start_time, std::chrono::fast_clock::time_point end_time, int64_t latency_ms, ProfileDataScanner &profile_data, Hypertable::Namespace *ns, const string &hql)
void drop_table(const ThriftGen::Namespace ns, const String &table, const bool if_exists) override
void set_access_group(const std::string &ag)
Sets access group.
const char * column_family
#define HT_INFOF(msg,...)
void set_blocksize(int32_t blocksize)
Sets blocksize option.
#define HT_THROWF(_code_, _fmt_,...)
void close_scanner(const Scanner scanner) override
void table_alter(const ThriftGen::Namespace ns, const String &table, const ThriftGen::Schema &schema) override
Random number generator for int32, int64, double and ascii arrays.
void close_mutator(const Mutator mutator) override
void get_profile_data(ProfileDataScanner &profile_data)
Gets profile data.
void offer_cell_as_array(const ThriftGen::Namespace ns, const String &table, const ThriftGen::MutateSpec &mutate_spec, const CellAsArray &cell) override
void table_get_schema_str_with_ids(String &result, const ThriftGen::Namespace ns, const String &table) override
::uint32_t future_capacity
bool exists_table(const ThriftGen::Namespace ns, const String &table) override
#define LOG_SLOW_QUERY_SCANNER(_scanner_, _ns_, _table_, _ss_)
RowIntervals row_intervals
bool namespace_exists(const String &ns) override
std::string get_schema_str(const std::string &name, bool with_ids=false)
Returns the schema for a table.
bool has_outstanding()
Checks whether there are any outstanding operations.
std::map< SharedMutatorMapKey, TableMutator * > SharedMutatorMap
void table_drop(const ThriftGen::Namespace ns, const String &table, const bool if_exists) override
void set_cell_as_array(const ThriftGen::Namespace ns, const String &table, const CellAsArray &cell) override
static String install_dir
The installation directory.
This is a generic exception class for Hypertable.
void _set_cells(const Mutator mutator, const vector< CellT > &cells)
void remove_references(int64_t id)
void scanner_get_row_serialized(CellsSerialized &result, const Scanner scanner_id) override
int main(int argc, char **argv)
void rename_table(const std::string &old_name, const std::string &new_name)
Renames a table.
void flush_mutator(const Mutator mutator) override
void set_cells_serialized(const ThriftGen::Namespace ns, const String &table, const CellsSerialized &cells) override
void add_reference(int64_t from, int64_t to)
int64_t get_object_id(TableMutatorPtr &mutator)
void set_in_memory(bool value)
Sets in memory option.
ServerHandler * get_handler(const String &remotePeer)
void future_close(const ThriftGen::Future ff) override
void get_tables(std::vector< String > &tables, const ThriftGen::Namespace ns) override
bool is_set_blocksize() const
Checks if blocksize option is set.
void set_cells_as_arrays_async(const MutatorAsync mutator, const ThriftCellsAsArrays &cells) override
void get_listing(std::vector< ThriftGen::NamespaceListing > &_return, const ThriftGen::Namespace ns) override
bool scan_and_filter_rows
TableMutatorAsync * get_mutator_async(int64_t id)
#define HT_ERRORF(msg,...)
void hql_query(HqlResult &result, const ThriftGen::Namespace ns, const String &hql) override
void generate_guid(std::string &_return) override
void get_future_result_serialized(ThriftGen::ResultSerialized &tresult, ThriftGen::Future ff, int timeout_millis) override
const std::string & get_name() const
void namespace_drop(const String &ns, const bool if_exists) override
void set_and_column_predicates(bool val)
AND together the column predicates.
std::shared_ptr< Schema > SchemaPtr
Smart pointer to Schema.
void remove_future(int64_t id)
void releaseHandler(::Hypertable::ThriftGen::ClientServiceIf *service) override
Represents a cell interval.
void add_cell_interval(const string &start_row, const string &start_column, bool start_inclusive, const string &end_row, const string &end_column, bool end_inclusive)
Adds a cell interval to be returned in the scan.
void scanner_get_cells(ThriftCells &result, const Scanner scanner_id) override
void scanner_close(const Scanner id) override
Scanner scanner_open(const ThriftGen::Namespace ns, const String &table, const ThriftGen::ScanSpec &ss) override
void set_cell_async(const MutatorAsync mutator, const ThriftGen::Cell &cell) override
void add_column_predicate(const string &column_family, const char *column_qualifier, uint32_t operation, const char *value, uint32_t value_len=0)
Adds a column predicate to the scan.
ThriftGen::Future open_future(int capacity) override
void reserve_cells(size_t s)
void _offer_cells(const ThriftGen::Namespace ns, const String &table, const ThriftGen::MutateSpec &mutate_spec, const vector< CellT > &cells)
void set_compressor(const std::string &compressor)
Sets compressor option.
CellIntervals cell_intervals
Hypertable::Client * client
void get_schema_str_with_ids(String &result, const ThriftGen::Namespace ns, const String &table) override
void async_mutator_set_cells(const MutatorAsync mutator, const ThriftCells &cells) override
void add(const Cell &cell, bool own=true)
HqlCallback(ResultT &r, ServerHandler *handler, const ThriftGen::Namespace ns, const String &hql, bool flush, bool buffered)
std::vector< CellAsArray > ThriftCellsAsArrays
void set_row_limit(int32_t n)
Sets the maximum number of rows to return in the scan.
void hql_exec2(HqlResult2 &result, const ThriftGen::Namespace ns, const String &hql, bool noflush, bool unbuffered) override
Encapsulates decomposed key and value.
void set_max_versions(uint32_t n)
Sets the maximum number of revisions of each cell to return in the scan.
void reserve_columns(size_t s)
static time_t to_time_t(const time_point &__t) noexcept
void shared_mutator_refresh(const ThriftGen::Namespace ns, const String &table, const ThriftGen::MutateSpec &mutate_spec) override
void finalize(uint8_t flag)
ThriftGen::Namespace namespace_open(const String &ns) override
void drop_table(const std::string &name, bool if_exists)
Removes a table.
std::shared_ptr< ScannerInfo > ScannerInfoPtr
const char * column_family
#define HT_THROW(_code_, _msg_)
static const char * END_ROW_MARKER
void future_get_result_as_arrays(ThriftGen::ResultAsArrays &tresult, ThriftGen::Future ff, int timeout_millis) override
void set_scan_and_filter_rows(bool val)
Scan and filter rows.
bool is_full()
Checks whether the Future result queue is full.
std::unordered_map< ::int64_t, ScannerInfoPtr > m_scanner_info_map
void close_future(const ThriftGen::Future ff) override
void next_row_serialized(CellsSerialized &result, const Scanner scanner_id) override
void close_scanner_async(const ScannerAsync scanner_async) override
void table_get_schema_str(String &result, const ThriftGen::Namespace ns, const String &table) override
void get_row(ThriftCells &result, const ThriftGen::Namespace ns, const String &table, const String &row) override
const char * error_get_text(int error_code)
Retrieves a descriptive error string of an error code.
static void seed(unsigned int s)
Sets the seed of the random number generator.
std::map< String, std::pair< int, ServerHandler * > > ServerHandlerMap
int64_t bytes_returned
Number of bytes returned while executing scan.
void rename_table(const ThriftGen::Namespace ns, const String &table, const String &new_table_name) override
const std::string & get_compressor() const
Gets compressor option.
void set_cell_offset(int32_t n)
Sets the number of cells to be skipped at the beginning of the query.
void get_cells_serialized(CellsSerialized &result, const ThriftGen::Namespace ns, const String &table, const ThriftGen::ScanSpec &ss) override
std::shared_ptr< Table > TablePtr
void next_row_as_arrays(ThriftCellsAsArrays &result, const Scanner scanner_id) override
void set_cell(const ThriftGen::Namespace ns, const String &table, const ThriftGen::Cell &cell) override
void offer_cells_as_arrays(const ThriftGen::Namespace ns, const String &table, const ThriftGen::MutateSpec &mutate_spec, const ThriftCellsAsArrays &cells) override
int64_t get_scanner_id(TableScanner *scanner, ScannerInfoPtr &info)
void get_future_result_as_arrays(ThriftGen::ResultAsArrays &tresult, ThriftGen::Future ff, int timeout_millis) override
ClientObject * get_object(int64_t id)
bool remove_cached_object(int64_t id)
SharedMutatorMap shared_mutator_map
virtual ~ThriftBrokerIfFactory()
int code() const
Returns the error code.
void remove_namespace(int64_t id)
void _next(vector< CellT > &result, TableScanner *scanner, int limit)
void refresh_table(const std::string &name)
Refreshes the cached table entry.
void namespace_close(const ThriftGen::Namespace ns) override
const std::string & get_bloom_filter() const
Gets bloom filter option.
void set_cells_as_arrays(const ThriftGen::Namespace ns, const String &table, const ThriftCellsAsArrays &cells) override