52 #include <concurrency/ThreadManager.h> 
   53 #include <protocol/TBinaryProtocol.h> 
   54 #include <server/TThreadedServer.h> 
   55 #include <transport/TBufferTransports.h> 
   56 #include <transport/TServerSocket.h> 
   57 #include <transport/TSocket.h> 
   58 #include <transport/TTransportUtils.h> 
   60 #include <boost/shared_ptr.hpp> 
   68 #include <unordered_map> 
   72 #include <sys/types.h> 
   85   bool g_log_slow_queries {};
 
   87   int32_t g_slow_query_latency_threshold {};
 
   92 #define THROW_TE(_code_, _str_) do { ThriftGen::ClientException te; \ 
   94   te.message.append(Error::get_text(_code_)); \ 
   95   te.message.append(" - "); \ 
   96   te.message.append(_str_); \ 
   97   te.__isset.code = te.__isset.message = true; \ 
  101 #define RETHROW(_expr_) catch (Hypertable::Exception &e) { \ 
  102   std::ostringstream oss;  oss << HT_FUNC << " " << _expr_ << ": "<< e; \ 
  103   HT_ERROR_OUT << oss.str() << HT_END; \ 
  106   g_metrics_handler->error_increment(); \ 
  107   THROW_TE(e.code(), oss.str()); \ 
  110 #define LOG_API_START(_expr_) \ 
  111   std::chrono::fast_clock::time_point start_time, end_time;     \ 
  112   std::ostringstream logging_stream;\ 
  113   ScannerInfoPtr scanner_info;\ 
  114   g_metrics_handler->request_increment(); \ 
  115   if (m_context.log_api || g_log_slow_queries) {\ 
  116     start_time = std::chrono::fast_clock::now(); \ 
  117     if (m_context.log_api)\ 
  118       logging_stream << "API " << __func__ << ": " << _expr_;\ 
  121 #define LOG_API_FINISH \ 
  122   if (m_context.log_api || (g_log_slow_queries && scanner_info)) { \ 
  123     end_time = std::chrono::fast_clock::now(); \ 
  124     if (m_context.log_api) \ 
  125 std::cout << std::chrono::duration_cast<std::chrono::seconds>(start_time.time_since_epoch()).count() <<'.'<< std::setw(9) << std::setfill('0') << (std::chrono::duration_cast<std::chrono::nanoseconds>(start_time.time_since_epoch()).count() % 1000000000LL) <<" API "<< __func__ <<": "<< logging_stream.str() << " latency=" << std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count() << std::endl; \ 
  127       scanner_info->latency += std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count();\ 
  130 #define LOG_API_FINISH_E(_expr_) \ 
  131   if (m_context.log_api || (g_log_slow_queries && scanner_info)) { \ 
  132     end_time = std::chrono::fast_clock::now(); \ 
  133     if (m_context.log_api) \ 
  134       std::cout << std::chrono::duration_cast<std::chrono::seconds>(start_time.time_since_epoch()).count() <<'.'<< std::setw(9) << std::setfill('0') << (std::chrono::duration_cast<std::chrono::nanoseconds>(start_time.time_since_epoch()).count() % 1000000000LL) <<" API "<< __func__ <<": "<< logging_stream.str() << _expr_ << " latency=" << std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count() << std::endl; \ 
  136       scanner_info->latency += std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count();\ 
  140 #define LOG_API(_expr_) do { \ 
  141   if (m_context.log_api) \ 
  142     std::cout << hires_ts <<" API "<< __func__ <<": "<< _expr_ << std::endl; \ 
  145 #define LOG_HQL_RESULT(_res_) do { \ 
  146   if (m_context.log_api) \ 
  147     cout << hires_ts <<" API "<< __func__ <<": result: "; \ 
  148   if (Logger::logger->isDebugEnabled()) \ 
  150   else if (m_context.log_api) { \ 
  151     if (_res_.__isset.results) \ 
  152       cout <<"results.size=" << _res_.results.size(); \ 
  153     if (_res_.__isset.cells) \ 
  154       cout <<"cells.size=" << _res_.cells.size(); \ 
  155     if (_res_.__isset.scanner) \ 
  156       cout <<"scanner="<< _res_.scanner; \ 
  157     if (_res_.__isset.mutator) \ 
  158       cout <<"mutator="<< _res_.mutator; \ 
  163 #define LOG_SLOW_QUERY(_pd_, _ns_, _hql_) do {   \ 
  164   if (g_log_slow_queries) { \ 
  165     end_time = std::chrono::fast_clock::now(); \ 
  166     int64_t latency_ms = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count(); \ 
  167     if (latency_ms >= g_slow_query_latency_threshold) \ 
  168       log_slow_query(__func__, start_time, end_time, latency_ms, _pd_, _ns_, _hql_); \ 
  172 #define LOG_SLOW_QUERY_SCANNER(_scanner_, _ns_, _table_, _ss_) do {      \ 
  173   if (g_log_slow_queries) { \ 
  174     ProfileDataScanner pd; \ 
  175     _scanner_->get_profile_data(pd); \ 
  176     end_time = std::chrono::fast_clock::now(); \ 
  177     int64_t latency_ms = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count(); \ 
  178     if (latency_ms >= g_slow_query_latency_threshold) \ 
  179       log_slow_query_scanspec(__func__, start_time, end_time, latency_ms, pd, _ns_, _table_, _ss_); \ 
  192 using namespace ThriftGen;
 
  193 using namespace boost;
 
  200       const ThriftGen::MutateSpec &mutate_spec)
 
  201     : m_namespace(ns), m_tablename(tablename), m_mutate_spec(mutate_spec) {}
 
  206     cmp = (int64_t)m_namespace - (int64_t)skey.
m_namespace;
 
  212     cmp = m_mutate_spec.appname.compare(skey.
m_mutate_spec.appname);
 
  215     cmp = m_mutate_spec.flush_interval - skey.
m_mutate_spec.flush_interval;
 
  229   return skey1.
compare(skey2) < 0;
 
  232 typedef Meta::list<ThriftBrokerPolicy, DefaultCommPolicy> 
Policies;
 
  235 typedef std::unordered_map< ::int64_t, ClientObjectPtr> 
ObjectMap;
 
  243     log_api = Config::get_bool(
"ThriftBroker.API.Logging");
 
  244     next_threshold = Config::get_i32(
"ThriftBroker.NextThreshold");
 
  245     future_capacity = Config::get_i32(
"ThriftBroker.Future.Capacity");
 
  257         int64_t min_num = INT64_MIN, int64_t max_num = INT64_MAX) {
 
  260   int64_t value = strtoll(from.data(), &endp, 0);
 
  262   if (endp - from.data() != (int)from.size()
 
  263       || value < min_num || value > max_num)
 
  271   if (tss.__isset.row_limit)
 
  274   if (tss.__isset.cell_limit)
 
  277   if (tss.__isset.cell_limit_per_family)
 
  280   if (tss.__isset.versions)
 
  283   if (tss.__isset.start_time)
 
  286   if (tss.__isset.end_time)
 
  289   if (tss.__isset.return_deletes)
 
  292   if (tss.__isset.keys_only)
 
  295   if (tss.__isset.row_regexp)
 
  298   if (tss.__isset.value_regexp)
 
  301   if (tss.__isset.scan_and_filter_rows)
 
  304   if (tss.__isset.do_not_cache)
 
  307   if (tss.__isset.row_offset)
 
  310   if (tss.__isset.cell_offset)
 
  313   if (tss.__isset.and_column_predicates)
 
  317   const char *start_row;
 
  319   for (
const auto &ri : tss.row_intervals) {
 
  320     start_row = ri.__isset.start_row ?
 
  321                 ri.start_row.c_str() :
 
  322                 ri.__isset.start_row_binary ?
 
  323                 ri.start_row_binary.c_str() :
 
  325     end_row = ri.__isset.end_row ?
 
  327               ri.__isset.end_row_binary ?
 
  328               ri.end_row_binary.c_str() :
 
  331       start_row, ri.__isset.start_inclusive && ri.start_inclusive,
 
  332       end_row, ri.__isset.end_inclusive && ri.end_inclusive));
 
  335   for (
const auto &ci : tss.cell_intervals)
 
  337         ci.__isset.start_row ? ci.start_row.c_str() : 
"",
 
  338         ci.start_column.c_str(),
 
  339         ci.__isset.start_inclusive && ci.start_inclusive,
 
  341         ci.end_column.c_str(),
 
  342         ci.__isset.end_inclusive && ci.end_inclusive));
 
  344   for (
const auto &col : tss.columns)
 
  345     hss.
columns.push_back(col.c_str());
 
  347   for (
const auto &cp : tss.column_predicates) {
 
  348     HT_INFOF(
"%s:%s %s", cp.column_family.c_str(), cp.column_qualifier.c_str(),
 
  349              cp.__isset.value ? cp.value.c_str() : 
"");
 
  352         cp.__isset.column_family ? cp.column_family.c_str() : 0,
 
  353         cp.__isset.column_qualifier ? cp.column_qualifier.c_str() : 0,
 
  355         cp.__isset.value ? cp.value.c_str() : 0,
 
  356         cp.__isset.value ? cp.value.size() : 0));
 
  363   if (tss.__isset.row_limit)
 
  366   if (tss.__isset.cell_limit)
 
  369   if (tss.__isset.cell_limit_per_family)
 
  372   if (tss.__isset.versions)
 
  375   if (tss.__isset.start_time)
 
  378   if (tss.__isset.end_time)
 
  381   if (tss.__isset.return_deletes)
 
  384   if (tss.__isset.keys_only)
 
  387   if (tss.__isset.row_regexp)
 
  390   if (tss.__isset.value_regexp)
 
  393   if (tss.__isset.scan_and_filter_rows)
 
  396   if (tss.__isset.do_not_cache)
 
  399   if (tss.__isset.row_offset)
 
  402   if (tss.__isset.cell_offset)
 
  405   if (tss.__isset.and_column_predicates)
 
  410   for (
auto & col : tss.columns)
 
  414   const char *start_row;
 
  417   for (
auto & ri : tss.row_intervals) {
 
  418     start_row = ri.__isset.start_row ?
 
  419                 ri.start_row.c_str() :
 
  420                 ri.__isset.start_row_binary ?
 
  421                 ri.start_row_binary.c_str() : 
"";
 
  422     end_row = ri.__isset.end_row ?
 
  424               ri.__isset.end_row_binary ?
 
  427       start_row, ri.__isset.start_inclusive && ri.start_inclusive,
 
  428       end_row, ri.__isset.end_inclusive && ri.end_inclusive);
 
  433   for (
auto & ci : tss.cell_intervals)
 
  435         ci.__isset.start_row ? ci.start_row.c_str() : 
"",
 
  436         ci.start_column.c_str(),
 
  437         ci.__isset.start_inclusive && ci.start_inclusive,
 
  439         ci.end_column.c_str(),
 
  440         ci.__isset.end_inclusive && ci.end_inclusive);
 
  444   for (
auto & cp : tss.column_predicates)
 
  446         cp.__isset.column_family ? cp.column_family.c_str() : 0,
 
  447         cp.__isset.column_qualifier ? cp.column_qualifier.c_str() : 0,
 
  449         cp.__isset.value ? cp.value.c_str() : 0,
 
  450         cp.__isset.value ? cp.value.size() : 0);
 
  456   if (tcell.key.__isset.row)
 
  457     hcell.
row_key = tcell.key.row.c_str();
 
  459   if (tcell.key.__isset.column_family)
 
  462   if (tcell.key.__isset.column_qualifier)
 
  465   if (tcell.key.__isset.timestamp)
 
  468   if (tcell.key.__isset.revision)
 
  469     hcell.
revision = tcell.key.revision;
 
  471   if (tcell.__isset.value) {
 
  472     hcell.
value = (::uint8_t *)tcell.value.c_str();
 
  476   if (tcell.key.__isset.flag)
 
  477     hcell.
flag = tcell.key.flag;
 
  483   if (tkey.__isset.row) {
 
  484     hkey.
row = tkey.row.c_str();
 
  485     hkey.
row_len = tkey.row.size();
 
  488   if (tkey.__isset.column_family)
 
  491   if (tkey.__isset.column_qualifier)
 
  494   if (tkey.__isset.timestamp)
 
  497   if (tkey.__isset.revision)
 
  503   int32_t amount = 
sizeof(ThriftGen::Cell);
 
  506   amount += tcell.key.row.length();
 
  508   amount += tcell.key.column_family.length();
 
  512     tcell.key.__isset.column_qualifier = 
true;
 
  513     amount += tcell.key.column_qualifier.length();
 
  516     tcell.key.column_qualifier = 
"";
 
  517     tcell.key.__isset.column_qualifier = 
false;
 
  521   tcell.key.revision = hcell.
revision;
 
  525     tcell.__isset.value = 
true;
 
  530     tcell.__isset.value = 
false;
 
  533   tcell.key.flag = (KeyFlag::type)hcell.
flag;
 
  534   tcell.key.__isset.row = tcell.key.__isset.column_family
 
  535       = tcell.key.__isset.timestamp = tcell.key.__isset.revision
 
  536       = tcell.key.__isset.flag = 
true;
 
  541   int len = tcell.size();
 
  547   case 4: hcell.
value = (::uint8_t *)tcell[3].c_str();
 
  551   case 1: hcell.
row_key = tcell[0].c_str();
 
  559   int32_t amount = 5*
sizeof(std::string);
 
  562   amount += tcell[0].length();
 
  564   amount += tcell[1].length();
 
  566   amount += tcell[2].length();
 
  568   amount += tcell[3].length();
 
  570   amount += tcell[4].length();
 
  577   tcells.resize(hcells.size());
 
  578   for(
size_t ii=0; ii<hcells.size(); ++ii)
 
  586   for (
const auto &tcell : tcells) {
 
  589     hcells.push_back(hcell);
 
  595   int32_t amount = 
sizeof(CellAsArray);
 
  596   tcells.resize(hcells.size());
 
  597   for(
size_t ii=0; ii<hcells.size(); ++ii) {
 
  606   for(
size_t ii=0; ii<hcells.size(); ++ii) {
 
  607     amount += strlen(hcells[ii].row_key) + strlen(hcells[ii].column_family)
 
  608         + strlen(hcells[ii].column_qualifier) + 8 + 8 + 4 + 1
 
  609         + hcells[ii].value_len + 4;
 
  612   for (
size_t ii = 0; ii < hcells.size(); ++ii) {
 
  613     writer.
add(hcells[ii]);
 
  617   amount = tcells.size();
 
  624   for (
const auto &tcell : tcells) {
 
  627     hcells.push_back(hcell);
 
  632         ThriftGen::TableSplit &tsplit) {
 
  636     tsplit.__isset.start_row = 
true;
 
  639     tsplit.start_row = 
"";
 
  640     tsplit.__isset.start_row = 
false;
 
  645       !(hsplit.
end_row[0] == (
char)0xff && hsplit.
end_row[1] == (
char)0xff)) {
 
  646     tsplit.end_row = hsplit.
end_row;
 
  647     tsplit.__isset.end_row = 
true;
 
  651     tsplit.__isset.end_row = 
false;
 
  657     tsplit.__isset.location = 
true;
 
  660     tsplit.location = 
"";
 
  661     tsplit.__isset.location = 
false;
 
  667     tsplit.__isset.ip_address = 
true;
 
  670     tsplit.ip_address = 
"";
 
  671     tsplit.__isset.ip_address = 
false;
 
  677     tsplit.__isset.hostname = 
true;
 
  680     tsplit.hostname = 
"";
 
  681     tsplit.__isset.hostname = 
false;
 
  687                                    ThriftGen::ColumnFamilyOptions &toptions) {
 
  694     toptions.__set_ttl(hoptions.
get_ttl());
 
  710   if (toptions.__isset.max_versions)
 
  712   if (toptions.__isset.ttl)
 
  713     hoptions.
set_ttl(toptions.ttl);
 
  714   if (toptions.__isset.time_order_desc)
 
  716   if (toptions.__isset.counter)
 
  721                                   ThriftGen::AccessGroupOptions &toptions) {
 
  748   if (toptions.__isset.in_memory)
 
  750   if (toptions.__isset.replication)
 
  752   if (toptions.__isset.blocksize)
 
  754   if (toptions.__isset.compressor)
 
  756   if (toptions.__isset.bloom_filter)
 
  762                     ThriftGen::Schema &tschema) {
 
  764   if (hschema->get_generation())
 
  765     tschema.__set_generation(hschema->get_generation());
 
  767   tschema.__set_version(hschema->get_version());
 
  769   if (hschema->get_group_commit_interval())
 
  770     tschema.__set_group_commit_interval(hschema->get_group_commit_interval());
 
  772   for (
auto ag_spec : hschema->get_access_groups()) {
 
  773     ThriftGen::AccessGroupSpec tag;
 
  774     tag.name = ag_spec->get_name();
 
  775     tag.__set_generation(ag_spec->get_generation());
 
  777       tag.__isset.options = 
true;      
 
  779       tag.__isset.defaults = 
true;
 
  780     tschema.access_groups[ag_spec->get_name()] = tag;
 
  781     tschema.__isset.access_groups = 
true;
 
  784   for (
auto cf_spec : hschema->get_column_families()) {
 
  785     ThriftGen::ColumnFamilySpec tcf;
 
  786     tcf.name = cf_spec->get_name();
 
  787     tcf.access_group = cf_spec->get_access_group();
 
  788     tcf.deleted = cf_spec->get_deleted();
 
  789     if (cf_spec->get_generation())
 
  790       tcf.__set_generation(cf_spec->get_generation());
 
  791     if (cf_spec->get_id())
 
  792       tcf.__set_id(cf_spec->get_id());
 
  793     tcf.value_index = cf_spec->get_value_index();
 
  794     tcf.qualifier_index = cf_spec->get_qualifier_index();
 
  796       tcf.__isset.options = 
true;
 
  797     tschema.column_families[cf_spec->get_name()] = tcf;
 
  798     tschema.__isset.column_families = 
true;
 
  802                                    tschema.access_group_defaults))
 
  803       tschema.__isset.access_group_defaults = 
true;
 
  806                                     tschema.column_family_defaults))
 
  807       tschema.__isset.column_family_defaults = 
true;
 
  814   if (tschema.__isset.generation)
 
  815     hschema->set_generation(tschema.generation);
 
  817   hschema->set_version(tschema.version);
 
  819   hschema->set_group_commit_interval(tschema.group_commit_interval);
 
  822                                hschema->access_group_defaults());
 
  825                                 hschema->column_family_defaults());
 
  827   bool need_default = 
true;
 
  828   unordered_map<string, Hypertable::AccessGroupSpec *> ag_map;
 
  829   for (
auto & entry : tschema.access_groups) {
 
  830     if (entry.second.name == 
"default")
 
  831       need_default = 
false;
 
  833     if (entry.second.__isset.generation)
 
  835     ag_map[entry.second.name] = ag;
 
  846   for (
auto & entry : tschema.column_families) {
 
  849     if (entry.second.access_group.empty())
 
  854     if (entry.second.__isset.generation)
 
  856     if (entry.second.__isset.id)
 
  857       cf->
set_id(entry.second.id);
 
  865     if (iter == ag_map.end())
 
  867                 "Undefined access group '%s' referenced by column '%s'",
 
  869     iter->second->add_column(cf);
 
  873   for (
auto & entry : ag_map)
 
  874     hschema->add_access_group(entry.second);
 
  882     ns(ns), table(t), scan_spec_builder(128) { }
 
  893 template <
class ResultT, 
class CellT>
 
  906               const String &hql, 
bool flush, 
bool buffered)
 
  907     : result(r), handler(*handler), ns(ns), hql(hql), flush(flush),
 
  908       buffered(buffered) { }
 
  910   void on_return(
const std::string &) 
override;
 
  920       : scanners(0), async_scanners(0), mutators(0), async_mutators(0),
 
  921         shared_mutators(0), namespaces(0), futures(0) {
 
  956     : m_remote_peer(remote_peer), m_context(c) {
 
  960     std::lock_guard<std::mutex> lock(m_mutex);
 
  961     if (!m_object_map.empty())
 
  962       HT_WARNF(
"Destroying ServerHandler for remote peer %s with %d objects in map",
 
  963                m_remote_peer.c_str(),
 
  964                (int)m_object_map.size());
 
  967     for (
auto entry : m_reference_map) {
 
  969         entry.second = 
nullptr;
 
  976       m_reference_map.clear();
 
  982     for (
auto entry : m_object_map) {
 
  984         entry.second = 
nullptr;
 
  991       m_object_map.clear();
 
  997     for (
auto entry : m_cached_object_map) {
 
  999         entry.second = 
nullptr;
 
 1006       m_cached_object_map.clear();
 
 1013     return m_remote_peer;
 
 1025     servers.reserve(profile_data.
servers.size()*6);
 
 1026     for (
auto & server : profile_data.
servers) {
 
 1030         servers.append(
",");
 
 1031       servers.append(server);
 
 1037     else if (ns_str[0] != 
'/')
 
 1038       ns_str = string(
"/") + ns_str;
 
 1041     const char *hql_ptr = hql.c_str();
 
 1043     if (hql.find_first_of(
'\n') != string::npos) {
 
 1044       hql_cleaned.reserve(hql.length());
 
 1045       const char *base = hql.c_str();
 
 1046       const char *ptr = base;
 
 1048         while (*ptr && *ptr != 
'\n')
 
 1050         hql_cleaned.append(base, ptr-base);
 
 1052           hql_cleaned.append(
" ", 1);
 
 1056       hql_ptr = hql_cleaned.c_str();
 
 1059     string line = 
format(
"%lld %s %s %lld %d %d %lld %lld %lld %s %s %s",
 
 1061                          func_name, m_remote_peer.c_str(),
 
 1067                          ns_str.c_str(), hql_ptr);
 
 1081     servers.reserve(profile_data.
servers.size()*6);
 
 1082     for (
auto & server : profile_data.
servers) {
 
 1086         servers.append(
",");
 
 1087       servers.append(server);
 
 1093     else if (ns_str[0] != 
'/')
 
 1094       ns_str = string(
"/") + ns_str;
 
 1096     string line = 
format(
"%lld %s %s %lld %d %d %lld %lld %lld %s %s %s",
 
 1098                          func_name, m_remote_peer.c_str(),
 
 1104                          ns_str.c_str(), ss.
render_hql(table).c_str());
 
 1110   hql_exec(HqlResult& result, 
const ThriftGen::Namespace ns,
 
 1111           const String &hql, 
bool noflush, 
bool unbuffered)
 override {
 
 1112     LOG_API_START(
"namespace=" << ns << 
" hql=" << hql << 
" noflush=" << noflush
 
 1113             << 
" unbuffered="<< unbuffered);
 
 1116       cb(result, 
this, ns, hql, !noflush, !unbuffered);
 
 1119       run_hql_interp(ns, hql, cb);
 
 1121     } 
RETHROW(
"namespace=" << ns << 
" hql="<< hql <<
" noflush="<< noflush
 
 1122               << 
" unbuffered="<< unbuffered)
 
 1124     if (!unbuffered && cb.
is_scan)
 
 1136           const String &hql)
 override {
 
 1137     hql_exec(result, ns, hql, 
false, 
false);
 
 1141   hql_exec2(HqlResult2 &result, 
const ThriftGen::Namespace ns,
 
 1142           const String &hql, 
bool noflush, 
bool unbuffered)
 override {
 
 1143     LOG_API_START(
"namespace=" << ns << 
" hql="<< hql <<
" noflush="<< noflush <<
 
 1144                    " unbuffered="<< unbuffered);
 
 1147       cb(result, 
this, ns, hql, !noflush, !unbuffered);
 
 1150       run_hql_interp(ns, hql, cb);
 
 1152     } 
RETHROW(
"namespace=" << ns << 
" hql="<< hql <<
" noflush="<< noflush <<
 
 1153               " unbuffered="<< unbuffered)
 
 1155     if (!unbuffered && cb.
is_scan)
 
 1163           const String &hql, 
bool noflush, 
bool unbuffered)
 override {
 
 1164     LOG_API_START(
"namespace=" << ns << 
" hql="<< hql <<
" noflush="<< noflush <<
 
 1165                    " unbuffered="<< unbuffered);
 
 1168       cb(result, 
this, ns, hql, !noflush, !unbuffered);
 
 1171       run_hql_interp(ns, hql, cb);
 
 1173     } 
RETHROW(
"namespace=" << ns << 
" hql="<< hql <<
" noflush="<< noflush <<
 
 1174               " unbuffered="<< unbuffered)
 
 1176     if (!unbuffered && cb.
is_scan)
 
 1184           const String &hql)
 override {
 
 1185     hql_exec2(result, ns, hql, 
false, 
false);
 
 1190           const String &hql)
 override {
 
 1191     hql_exec_as_arrays(result, ns, hql, 
false, 
false);
 
 1197       m_context.client->create_namespace(ns, NULL);
 
 1203     namespace_create(ns);
 
 1207                             const ThriftGen::Schema &schema)
 override {
 
 1215     } 
RETHROW(
"namespace=" << ns << 
" table="<< table)
 
 1221                            const ThriftGen::Schema &schema)
 override {
 
 1228       namespace_ptr->
alter_table(table, hschema, 
false);
 
 1229     } 
RETHROW(
"namespace=" << ns << 
" table="<< table)
 
 1235           const String &table, 
const ThriftGen::ScanSpec &ss)
 override {
 
 1237     LOG_API_START(
"namespace=" << ns << 
" table="<< table <<
" scan_spec="<< ss);
 
 1239       ScannerInfoPtr si = make_shared<ScannerInfo>(ns, table);
 
 1241       id = get_scanner_id(_open_scanner(ns, table, si->scan_spec_builder.get()), si);
 
 1242     } 
RETHROW(
"namespace=" << ns << 
" table="<< table <<
" scan_spec="<< ss)
 
 1248           const String &table, 
const ThriftGen::ScanSpec &ss)
 override {
 
 1249     return scanner_open(ns, table, ss);
 
 1253           const String &table, 
const ThriftGen::Future ff,
 
 1254           const ThriftGen::ScanSpec &ss)
 override {
 
 1256     LOG_API_START(
"namespace=" << ns << 
" table=" << table << 
" future=" 
 1257             << ff << 
" scan_spec=" << ss);
 
 1259       id = get_object_id(_open_scanner_async(ns, table, ff, ss));
 
 1260       add_reference(
id, ff);
 
 1261     } 
RETHROW(
"namespace=" << ns << 
" table=" << table << 
" future=" 
 1262             << ff << 
" scan_spec="<< ss)
 
 1269           const String &table, 
const ThriftGen::Future ff,
 
 1270           const ThriftGen::ScanSpec &ss)
 override {
 
 1271     return async_scanner_open(ns, table, ff, ss);
 
 1277       remove_namespace(ns);
 
 1283     namespace_close(ns);
 
 1287           const String &table_name)
 override {
 
 1288     LOG_API_START(
"namespace=" << ns << 
" table=" << table_name);
 
 1292     } 
RETHROW(
"namespace=" << ns << 
" table=" << table_name);
 
 1300       remove_scanner(
id, cobj, scanner_info);
 
 1302       if (g_log_slow_queries) {
 
 1306         if (scanner_info->latency >= g_slow_query_latency_threshold) {
 
 1307           if (scanner_info->hql.empty())
 
 1308             log_slow_query_scanspec(__func__, start_time, end_time,
 
 1309                                     scanner_info->latency, pd,
 
 1310                                     get_namespace(scanner_info->ns),
 
 1311                                     scanner_info->table,
 
 1312                                     scanner_info->scan_spec_builder.get());
 
 1314             log_slow_query(__func__, start_time, end_time,
 
 1315                            scanner_info->latency, pd,
 
 1316                            get_namespace(scanner_info->ns),
 
 1326     scanner_close(scanner);
 
 1333       get_scanner_async(scanner)->cancel();
 
 1334     } 
RETHROW(
"scanner=" << scanner)
 
 1340     async_scanner_cancel(scanner);
 
 1346       remove_scanner(scanner_async);
 
 1347       remove_references(scanner_async);
 
 1348     } 
RETHROW(
"scanner_async="<< scanner_async)
 
 1353     async_scanner_close(scanner_async);
 
 1357           const Scanner scanner_id)
 override {
 
 1360       TableScanner *scanner = get_scanner(scanner_id, scanner_info);
 
 1361       _next(result, scanner, m_context.next_threshold);
 
 1362     } 
RETHROW(
"scanner="<< scanner_id)
 
 1366   void next_cells(ThriftCells &result, 
const Scanner scanner_id)
 override {
 
 1367     scanner_get_cells(result, scanner_id);
 
 1371           const Scanner scanner_id)
 override {
 
 1374       TableScanner *scanner = get_scanner(scanner_id, scanner_info);
 
 1375       _next(result, scanner, m_context.next_threshold);
 
 1376     } 
RETHROW(
"scanner="<< scanner_id <<
" result.size="<< result.size())
 
 1381           const Scanner scanner_id)
 override {
 
 1382     scanner_get_cells_as_arrays(result, scanner_id);
 
 1386           const Scanner scanner_id)
 override {
 
 1393       TableScanner *scanner = get_scanner(scanner_id, scanner_info);
 
 1396         if (scanner->
next(cell)) {
 
 1397           if (!writer.
add(cell)) {
 
 1399             scanner->
unget(cell);
 
 1410     } 
RETHROW(
"scanner="<< scanner_id);
 
 1415           const Scanner scanner_id)
 override {
 
 1416     scanner_get_cells_serialized(result, scanner_id);
 
 1420     LOG_API_START(
"scanner="<< scanner_id <<
" result.size="<< result.size());
 
 1422       TableScanner *scanner = get_scanner(scanner_id, scanner_info);
 
 1423       _next_row(result, scanner);
 
 1424     } 
RETHROW(
"scanner=" << scanner_id)
 
 1429   void next_row(ThriftCells &result, 
const Scanner scanner_id)
 override {
 
 1430     scanner_get_row(result, scanner_id);
 
 1434           const Scanner scanner_id)
 override {
 
 1437       TableScanner *scanner = get_scanner(scanner_id, scanner_info);
 
 1438       _next_row(result, scanner);
 
 1439     } 
RETHROW(
"result.size=" << result.size())
 
 1444           const Scanner scanner_id)
 override {
 
 1445     scanner_get_row_as_arrays(result, scanner_id);
 
 1449           const Scanner scanner_id)
 override {
 
 1455       std::string prev_row;
 
 1457       TableScanner *scanner = get_scanner(scanner_id, scanner_info);
 
 1460         if (scanner->
next(cell)) {
 
 1462           if (prev_row.empty() || prev_row == cell.
row_key) {
 
 1465             if (prev_row.empty())
 
 1471             scanner->
unget(cell);
 
 1483     } 
RETHROW(
"scanner="<< scanner_id)
 
 1488           const Scanner scanner_id)
 override {
 
 1489     scanner_get_row_serialized(result, scanner_id);
 
 1492   void get_row(ThriftCells &result, 
const ThriftGen::Namespace ns,
 
 1494     LOG_API_START(
"namespace=" << ns << 
" table="<< table <<
" row="<< row);
 
 1500                                                          row.c_str(), 
true));
 
 1503       _next(result, scanner.get(), INT32_MAX);
 
 1505     } 
RETHROW(
"namespace=" << ns << 
" table="<< table <<
" row="<< row)
 
 1510           const ThriftGen::Namespace ns, 
const String &table,
 
 1511           const String &row)
 override {
 
 1512     LOG_API_START(
"namespace=" << ns << 
" table="<< table <<
" row="<< row);
 
 1519                                                          row.c_str(), 
true));
 
 1522       _next(result, scanner.get(), INT32_MAX);
 
 1524     } 
RETHROW(
"namespace=" << ns << 
" table="<< table <<
" row="<< row)
 
 1529           const ThriftGen::Namespace ns, 
const std::string& table,
 
 1530           const std::string& row)
 override {
 
 1531     LOG_API_START(
"namespace=" << ns << 
" table="<< table <<
" row"<< row);
 
 1539                                                          row.c_str(), 
true));
 
 1544       while (scanner->next(cell))
 
 1550     } 
RETHROW(
"namespace=" << ns << 
" table="<< table <<
" row"<< row)
 
 1554   void get_cell(Value &result, 
const ThriftGen::Namespace ns,
 
 1556     LOG_API_START(
"namespace=" << ns << 
" table=" << table << 
" row=" 
 1557             << row << 
" column=" << column);
 
 1568           column.c_str(), 
true, row.c_str(), column.c_str(), 
true));
 
 1574       if (scanner->next(cell))
 
 1579     } 
RETHROW(
"namespace=" << ns << 
" table=" << table << 
" row=" 
 1580             << row << 
" column=" << column)
 
 1585   void get_cells(ThriftCells &result, 
const ThriftGen::Namespace ns,
 
 1586           const String &table, 
const ThriftGen::ScanSpec &ss)
 override {
 
 1587     LOG_API_START(
"namespace=" << ns << 
" table=" << table << 
" scan_spec=" 
 1588             << ss << 
" result.size=" << result.size());
 
 1594       _next(result, scanner.get(), INT32_MAX);
 
 1596     } 
RETHROW(
"namespace=" << ns << 
" table="<< table <<
" scan_spec="<< ss)
 
 1601           const ThriftGen::Namespace ns, 
const String &table,
 
 1602           const ThriftGen::ScanSpec &ss)
 override {
 
 1603     LOG_API_START(
"namespace=" << ns << 
" table="<< table <<
" scan_spec="<< ss);
 
 1609       _next(result, scanner.get(), INT32_MAX);
 
 1611     } 
RETHROW(
"namespace=" << ns << 
" table="<< table <<
" scan_spec="<< ss)
 
 1616           const ThriftGen::Namespace ns, 
const String& table,
 
 1617           const ThriftGen::ScanSpec& ss)
 override {
 
 1618     LOG_API_START(
"namespace=" << ns << 
" table="<< table <<
" scan_spec="<< ss);
 
 1627       while (scanner->next(cell))
 
 1633     } 
RETHROW(
"namespace=" << ns << 
" table="<< table <<
" scan_spec="<< ss)
 
 1638           const String &table, 
const ThriftGen::MutateSpec &mutate_spec,
 
 1639           const ThriftCells &cells)
 override {
 
 1641             " mutate_spec.appname=" << mutate_spec.appname);
 
 1644       _offer_cells(ns, table, mutate_spec, cells);
 
 1645     } 
RETHROW(
"namespace=" << ns << 
" table=" << table
 
 1646             << 
" mutate_spec.appname="<< mutate_spec.appname)
 
 1651           const ThriftGen::MutateSpec &mutate_spec, 
const ThriftCells &cells)
 override {
 
 1652     shared_mutator_set_cells(ns, table, mutate_spec, cells);
 
 1656           const String &table, 
const ThriftGen::MutateSpec &mutate_spec,
 
 1657           const ThriftGen::Cell &cell)
 override {
 
 1659             << 
" mutate_spec.appname="<< mutate_spec.appname);
 
 1662       _offer_cell(ns, table, mutate_spec, cell);
 
 1663     } 
RETHROW(
"namespace=" << ns << 
" table=" << table
 
 1664             << 
" mutate_spec.appname="<< mutate_spec.appname)
 
 1669           const ThriftGen::MutateSpec &mutate_spec,
 
 1670           const ThriftGen::Cell &cell)
 override {
 
 1671     shared_mutator_set_cell(ns, table, mutate_spec, cell);
 
 1675          const String &table, 
const ThriftGen::MutateSpec &mutate_spec,
 
 1676          const ThriftCellsAsArrays &cells)
 override {
 
 1678             << 
" mutate_spec.appname=" << mutate_spec.appname);
 
 1680       _offer_cells(ns, table, mutate_spec, cells);
 
 1681       LOG_API(
"mutate_spec.appname=" << mutate_spec.appname << 
" done");
 
 1682     } 
RETHROW(
"namespace=" << ns << 
" table=" << table
 
 1683             << 
" mutate_spec.appname=" << mutate_spec.appname)
 
 1688           const String &table, 
const ThriftGen::MutateSpec &mutate_spec,
 
 1689           const ThriftCellsAsArrays &cells)
 override {
 
 1690     shared_mutator_set_cells_as_arrays(ns, table, mutate_spec, cells);
 
 1694           const String &table, 
const ThriftGen::MutateSpec &mutate_spec,
 
 1695           const CellAsArray &cell)
 override {
 
 1698             << 
" mutate_spec.appname=" << mutate_spec.appname);
 
 1701       _offer_cell(ns, table, mutate_spec, cell);
 
 1702       LOG_API(
"mutate_spec.appname=" << mutate_spec.appname << 
" done");
 
 1703     } 
RETHROW(
"namespace=" << ns << 
" table=" << table
 
 1704             << 
" mutate_spec.appname=" << mutate_spec.appname)
 
 1709           const String &table, 
const ThriftGen::MutateSpec &mutate_spec,
 
 1710           const CellAsArray &cell)
 override {
 
 1711     shared_mutator_set_cell_as_array(ns, table, mutate_spec, cell);
 
 1715     ThriftGen::Future id;
 
 1718       capacity = (capacity <= 0) ? m_context.future_capacity : capacity;
 
 1720     } 
RETHROW(
"capacity=" << capacity)
 
 1726     return future_open(capacity);
 
 1730           ThriftGen::Future ff, 
int timeout_millis)
 override {
 
 1736       bool timed_out = 
false;
 
 1737       bool done = !(future->
get(hresult, (uint32_t)timeout_millis,
 
 1742         tresult.is_empty = 
true;
 
 1747         tresult.is_empty = 
false;
 
 1748         _convert_result(hresult, tresult);
 
 1750                 << tresult.id << 
" is_scan=" << tresult.is_scan
 
 1751                 << 
" is_error=" << tresult.is_error);
 
 1757           ThriftGen::Future ff, 
int timeout_millis)
 override {
 
 1758     future_get_result(tresult, ff, timeout_millis);
 
 1762           ThriftGen::Future ff, 
int timeout_millis)
 override {
 
 1767       bool timed_out = 
false;
 
 1768       bool done = !(future->
get(hresult, (uint32_t)timeout_millis,
 
 1773         tresult.is_empty = 
true;
 
 1778         tresult.is_empty = 
false;
 
 1779         _convert_result_as_arrays(hresult, tresult);
 
 1781                 << 
" is_scan=" << tresult.is_scan << 
"is_error=" 
 1782                 << tresult.is_error);
 
 1788           ThriftGen::Future ff, 
int timeout_millis)
 override {
 
 1789     future_get_result_as_arrays(tresult, ff, timeout_millis);
 
 1793           ThriftGen::Future ff, 
int timeout_millis)
 override {
 
 1799       bool timed_out = 
false;
 
 1800       bool done = !(future->
get(hresult, (uint32_t)timeout_millis,
 
 1805         tresult.is_empty = 
true;
 
 1810         tresult.is_empty = 
false;
 
 1811         _convert_result_serialized(hresult, tresult);
 
 1813                 << 
" is_scan=" << tresult.is_scan << 
"is_error=" 
 1814                 << tresult.is_error);
 
 1820           ThriftGen::Future ff, 
int timeout_millis)
 override {
 
 1821     future_get_result_serialized(tresult, ff, timeout_millis);
 
 1872     bool has_outstanding;
 
 1879     return has_outstanding;
 
 1895     ThriftGen::Namespace id;
 
 1898       id = get_cached_object_id( dynamic_pointer_cast<ClientObject>(m_context.client->open_namespace(ns)) );
 
 1905     return namespace_open(ns);
 
 1909           const String &table, 
const ThriftGen::Future ff, ::int32_t flags)
 override {
 
 1910     LOG_API_START(
"namespace=" << ns << 
" table=" << table << 
" future=" 
 1911             << ff << 
" flags=" << flags);
 
 1914       id = get_object_id(_open_mutator_async(ns, table, ff, flags));
 
 1915       add_reference(
id, ff);
 
 1916     } 
RETHROW(
"namespace=" << ns << 
" table=" << table << 
" future=" 
 1917             << ff << 
" flags=" << flags)
 
 1923           const String &table, 
const ThriftGen::Future ff, ::int32_t flags)
 override {
 
 1924     return async_mutator_open(ns, table, ff, flags);
 
 1928           const String &table, int32_t flags, int32_t flush_interval)
 override {
 
 1929     LOG_API_START(
"namespace=" << ns << 
"table=" << table << 
" flags=" 
 1930             << flags << 
" flush_interval=" << flush_interval);
 
 1935       id =  get_object_id(t->create_mutator(0, flags, flush_interval));
 
 1936     } 
RETHROW(
"namespace=" << ns << 
"table=" << table << 
" flags=" 
 1937             << flags << 
" flush_interval=" << flush_interval)
 
 1943           const String &table, int32_t flags, int32_t flush_interval)
 override {
 
 1944     return mutator_open(ns, table, flags, flush_interval);
 
 1950       get_mutator(mutator)->flush();
 
 1951     } 
RETHROW(
"mutator=" << mutator)
 
 1956     mutator_flush(mutator);
 
 1962       get_mutator_async(mutator)->flush();
 
 1963     } 
RETHROW(
"mutator=" << mutator)
 
 1968     async_mutator_flush(mutator);
 
 1974       flush_mutator(mutator);
 
 1975       remove_mutator(mutator);
 
 1976     } 
RETHROW(
"mutator=" << mutator)
 
 1981     mutator_close(mutator);
 
 1988       get_mutator_async(mutator)->cancel();
 
 1989     } 
RETHROW(
"mutator="<< mutator)
 
 1995     async_mutator_cancel(mutator);
 
 2001       flush_mutator_async(mutator);
 
 2002       remove_mutator(mutator);
 
 2003       remove_references(mutator);
 
 2004     } 
RETHROW(
"mutator" << mutator)
 
 2009     async_mutator_close(mutator);
 
 2013           const ThriftCells &cells)
 override {
 
 2014     LOG_API_START(
"mutator=" << mutator << 
" cell.size=" << cells.size());
 
 2016       _set_cells(mutator, cells);
 
 2017     } 
RETHROW(
"mutator=" << mutator << 
" cell.size=" << cells.size())
 
 2022           const ThriftGen::Cell &cell)
 override {
 
 2025       _set_cell(mutator, cell);
 
 2026     } 
RETHROW(
"mutator=" << mutator << 
" cell=" << cell)
 
 2031           const ThriftCellsAsArrays &cells)
 override {
 
 2032     LOG_API_START(
"mutator=" << mutator << 
" cell.size=" << cells.size());
 
 2034       _set_cells(mutator, cells);
 
 2035     } 
RETHROW(
"mutator=" << mutator << 
" cell.size=" << cells.size())
 
 2040           const CellAsArray &cell)
 override {
 
 2042     LOG_API_START(
"mutator=" << mutator << 
" cell_as_array.size=" 
 2045       _set_cell(mutator, cell);
 
 2046     } 
RETHROW(
"mutator="<< mutator <<
" cell_as_array.size="<< cell.size());
 
 2051           const CellsSerialized &cells, 
const bool flush)
 override {
 
 2052     LOG_API_START(
"mutator=" << mutator << 
" cell.size=" << cells.size());
 
 2057               (uint32_t)cells.length());
 
 2058       while (reader.next()) {
 
 2060         cb.
add(hcell, 
false);
 
 2062       get_mutator(mutator)->set_cells(cb.
get());
 
 2063       if (flush || reader.flush())
 
 2064         get_mutator(mutator)->flush();
 
 2065     } 
RETHROW(
"mutator="<< mutator <<
" cell.size="<< cells.size())
 
 2071           const ThriftGen::Cell &cell)
 override {
 
 2072     LOG_API_START(
"ns=" << ns << 
" table=" << table << 
" cell=" << cell);
 
 2078       cb.
add(hcell, 
false);
 
 2079       mutator->set_cells(cb.
get());
 
 2081     } 
RETHROW(
"ns=" << ns << 
" table=" << table << 
" cell=" << cell);
 
 2086           const ThriftCells &cells)
 override {
 
 2087     LOG_API_START(
"ns=" << ns << 
" table=" << table << 
" cell.size=" 
 2093       mutator->set_cells(hcells);
 
 2095     } 
RETHROW(
"ns=" << ns << 
" table=" << table <<
" cell.size=" 
 2101           const String& table, 
const ThriftCellsAsArrays &cells)
 override {
 
 2102     LOG_API_START(
"ns=" << ns << 
" table=" << table << 
" cell.size=" 
 2109       mutator->set_cells(hcells);
 
 2111     } 
RETHROW(
"ns="<< ns <<
" table=" << table<<
" cell.size="<< cells.size());
 
 2116           const String& table, 
const CellAsArray &cell)
 override {
 
 2119     LOG_API_START(
"ns=" << ns << 
" table=" << table << 
" cell_as_array.size=" 
 2126       cb.
add(hcell, 
false);
 
 2127       mutator->set_cells(cb.
get());
 
 2129     } 
RETHROW(
"ns=" << ns << 
" table=" << table << 
" cell_as_array.size=" 
 2135           const String& table, 
const CellsSerialized &cells)
 override {
 
 2137             " cell_serialized.size=" << cells.size() << 
" flush=" << flush);
 
 2143               (uint32_t)cells.length());
 
 2144       while (reader.next()) {
 
 2146         cb.
add(hcell, 
false);
 
 2148       mutator->set_cells(cb.
get());
 
 2150     } 
RETHROW(
"ns=" << ns << 
" table=" << table << 
" cell_serialized.size=" 
 2151             << cells.size() << 
" flush=" << flush);
 
 2157           const ThriftCells &cells)
 override {
 
 2158     LOG_API_START(
"mutator=" << mutator << 
" cells.size=" << cells.size());
 
 2160       _set_cells_async(mutator, cells);
 
 2161     } 
RETHROW(
"mutator=" << mutator << 
" cells.size=" << cells.size())
 
 2166           const ThriftCells &cells)
 override {
 
 2167     async_mutator_set_cells(mutator, cells);
 
 2171           const ThriftGen::Cell &cell)
 override {
 
 2174       _set_cell_async(mutator, cell);
 
 2175     } 
RETHROW(
"mutator=" << mutator << 
" cell=" << cell);
 
 2180           const ThriftGen::Cell &cell)
 override {
 
 2181     async_mutator_set_cell(mutator, cell);
 
 2185           const ThriftCellsAsArrays &cells)
 override {
 
 2186     LOG_API_START(
"mutator=" << mutator << 
" cells.size=" << cells.size());
 
 2188       _set_cells_async(mutator, cells);
 
 2189     } 
RETHROW(
"mutator=" << mutator << 
" cells.size=" << cells.size())
 
 2194           const ThriftCellsAsArrays &cells)
 override {
 
 2195     async_mutator_set_cells_as_arrays(mutator, cells);
 
 2199           const CellAsArray &cell)
 override {
 
 2201     LOG_API_START(
"mutator=" << mutator << 
" cell_as_array.size=" 
 2204       _set_cell_async(mutator, cell);
 
 2205     } 
RETHROW(
"mutator=" << mutator << 
" cell_as_array.size=" << cell.size())
 
 2210           const CellAsArray &cell)
 override {
 
 2211     async_mutator_set_cell_as_array(mutator, cell);
 
 2215           const CellsSerialized &cells,
 
 2216       const bool flush)
 override {
 
 2217    LOG_API_START(
"mutator=" << mutator << 
" cells.size=" << cells.size());
 
 2222               (uint32_t)cells.length());
 
 2223       while (reader.next()) {
 
 2225         cb.
add(hcell, 
false);
 
 2229       if (flush || reader.flush() || mutator_ptr->
needs_flush())
 
 2230         mutator_ptr->
flush();
 
 2232     } 
RETHROW(
"mutator=" << mutator << 
" cells.size=" << cells.size());
 
 2237           const CellsSerialized &cells,
 
 2238       const bool flush)
 override {
 
 2239     async_mutator_set_cells_serialized(mutator, cells, flush);
 
 2246       exists = m_context.client->exists_namespace(ns);
 
 2253     return namespace_exists(ns);
 
 2257           const String &table)
 override {
 
 2263     } 
RETHROW(
"namespace=" << ns << 
" table="<< table)
 
 2269           const String &table)
 override {
 
 2274           const String &table)
 override {
 
 2279     } 
RETHROW(
"namespace=" << ns << 
" table="<< table)
 
 2284           const String &table)
 override {
 
 2285     table_get_id(result, ns, table);
 
 2289           const ThriftGen::Namespace ns, 
const String &table)
 override {
 
 2294     } 
RETHROW(
"namespace=" << ns << 
" table=" << table)
 
 2299           const String &table)
 override {
 
 2300     table_get_schema_str(result, ns, table);
 
 2304           const ThriftGen::Namespace ns, 
const String &table)
 override {
 
 2309     } 
RETHROW(
"namespace=" << ns << 
" table=" << table)
 
 2314           const ThriftGen::Namespace ns, 
const String &table)
 override {
 
 2315     table_get_schema_str_with_ids(result, ns, table);
 
 2319           const ThriftGen::Namespace ns, 
const String &table)
 override {
 
 2326     } 
RETHROW(
"namespace=" << ns << 
" table="<< table)
 
 2331           const ThriftGen::Namespace ns, 
const String &table)
 override {
 
 2332     table_get_schema(result, ns, table);
 
 2336           const ThriftGen::Namespace ns)
 override {
 
 2340       std::vector<Hypertable::NamespaceListing> listing;
 
 2343       for(
size_t ii=0; ii < listing.size(); ++ii)
 
 2344         if (!listing[ii].is_namespace)
 
 2345           tables.push_back(listing[ii].name);
 
 2353           const ThriftGen::Namespace ns)
 override {
 
 2357       std::vector<Hypertable::NamespaceListing> listing;
 
 2359       ThriftGen::NamespaceListing entry;
 
 2361       for(
size_t ii=0; ii < listing.size(); ++ii) {
 
 2362         entry.name = listing[ii].name;
 
 2363         entry.is_namespace = listing[ii].is_namespace;
 
 2364         _return.push_back(entry);
 
 2373           const ThriftGen::Namespace ns)
 override {
 
 2374     namespace_get_listing(_return, ns);
 
 2378           const ThriftGen::Namespace ns,  
const String &table)
 override {
 
 2381             << 
" splits.size=" << _return.size());
 
 2383       ThriftGen::TableSplit tsplit;
 
 2386       for (TableSplitsContainer::iterator iter = splits.begin();
 
 2387               iter != splits.end(); ++iter) {
 
 2389         _return.push_back(tsplit);
 
 2392     RETHROW(
"namespace=" << ns << 
" table=" << table)
 
 2398           const ThriftGen::Namespace ns,  
const String &table)
 override {
 
 2399     table_get_splits(_return, ns, table);
 
 2403     LOG_API_START(
"namespace=" << ns << 
" if_exists=" << if_exists);
 
 2405       m_context.client->drop_namespace(ns, NULL, if_exists);
 
 2407     RETHROW(
"namespace=" << ns << 
" if_exists=" << if_exists)
 
 2412     namespace_drop(ns, if_exists);
 
 2416           const String &new_table_name)
 override {
 
 2418             << 
" new_table_name=" << new_table_name << 
" done");
 
 2423     RETHROW(
"namespace=" << ns << 
" table=" << table << 
" new_table_name=" 
 2424             << new_table_name << 
" done")
 
 2429           const String &new_table_name)
 override {
 
 2430     table_rename(ns, table, new_table_name);
 
 2434           const bool if_exists)
 override {
 
 2435     LOG_API_START(
"namespace=" << ns << 
" table=" << table << 
" if_exists=" 
 2441     RETHROW(
"namespace=" << ns << 
" table=" << table << 
" if_exists=" 
 2447           const bool if_exists)
 override {
 
 2448     table_drop(ns, table, if_exists);
 
 2461           const ThriftGen::Namespace ns, 
const std::string& table_name, 
 
 2462           const ThriftGen::Key& tkey, 
const std::string& value)
 override {
 
 2464             << tkey << 
" value=" << value);
 
 2472               value.empty() ? guid : (std::string &)value);
 
 2474     RETHROW(
"namespace=" << ns << 
" table=" << table_name 
 
 2475             << tkey << 
" value=" << value);
 
 2477     _return = value.empty() ? guid : value;
 
 2480   void status(ThriftGen::Status& _return)
 override {
 
 2482       _return.__set_code(0);
 
 2483       _return.__set_text(
"");
 
 2489     kill(getpid(), SIGKILL);
 
 2500           ThriftGen::Result &tresult) {
 
 2503     if (hresult->is_scan()) {
 
 2504       tresult.is_scan = 
true;
 
 2505       tresult.id = try_get_object_id(hresult->get_scanner());
 
 2506       if (hresult->is_error()) {
 
 2507         tresult.is_error = 
true;
 
 2508         hresult->get_error(tresult.error, tresult.error_msg);
 
 2509         tresult.__isset.error = 
true;
 
 2510         tresult.__isset.error_msg = 
true;
 
 2513         tresult.is_error = 
false;
 
 2514         tresult.__isset.cells = 
true;
 
 2515         hresult->get_cells(hcells);
 
 2520       tresult.is_scan = 
false;
 
 2521       tresult.id = try_get_object_id(hresult->get_mutator());
 
 2522       if (hresult->is_error()) {
 
 2523         tresult.is_error = 
true;
 
 2524         hresult->get_error(tresult.error, tresult.error_msg);
 
 2525         hresult->get_failed_cells(hcells);
 
 2527         tresult.__isset.error = 
true;
 
 2528         tresult.__isset.error_msg = 
true;
 
 2534       ThriftGen::ResultAsArrays &tresult) {
 
 2537     if (hresult->is_scan()) {
 
 2538       tresult.is_scan = 
true;
 
 2539       tresult.id = try_get_object_id(hresult->get_scanner());
 
 2540       if (hresult->is_error()) {
 
 2541         tresult.is_error = 
true;
 
 2542         hresult->get_error(tresult.error, tresult.error_msg);
 
 2543         tresult.__isset.error = 
true;
 
 2544         tresult.__isset.error_msg = 
true;
 
 2547         tresult.is_error = 
false;
 
 2548         tresult.__isset.cells = 
true;
 
 2549         hresult->get_cells(hcells);
 
 2555               "not yet implemented");
 
 2560           ThriftGen::ResultSerialized &tresult) {
 
 2563     if (hresult->is_scan()) {
 
 2564       tresult.is_scan = 
true;
 
 2565       tresult.id = try_get_object_id(hresult->get_scanner());
 
 2566       if (hresult->is_error()) {
 
 2567         tresult.is_error = 
true;
 
 2568         hresult->get_error(tresult.error, tresult.error_msg);
 
 2569         tresult.__isset.error = 
true;
 
 2570         tresult.__isset.error_msg = 
true;
 
 2573         tresult.is_error = 
false;
 
 2574         tresult.__isset.cells = 
true;
 
 2575         hresult->get_cells(hcells);
 
 2581               "not yet implemented");
 
 2586           const String &table, 
const ThriftGen::Future ff, ::int32_t flags) {
 
 2591     return t->create_mutator_async(future, 0, flags);
 
 2595           const String &table, 
const ThriftGen::Future ff,
 
 2596           const ThriftGen::ScanSpec &ss) {
 
 2603     return t->create_scanner_async(future, hss, 0);
 
 2610     return t->create_scanner(ss, 0);
 
 2617     return t->create_mutator();
 
 2620   template <
class CellT>
 
 2623     int32_t amount_read = 0;
 
 2625     while (amount_read < limit) {
 
 2626       if (scanner->
next(cell)) {
 
 2629         result.push_back(tcell);
 
 2636   template <
class CellT>
 
 2639     std::string prev_row;
 
 2641     while (scanner->
next(cell)) {
 
 2642       if (prev_row.empty() || prev_row == cell.
row_key) {
 
 2645         result.push_back(tcell);
 
 2646         if (prev_row.empty())
 
 2650         scanner->
unget(cell);
 
 2660     interp->set_namespace(namespace_ptr->
get_name());
 
 2661     interp->execute(hql, cb);
 
 2664   template <
class CellT>
 
 2666           const ThriftGen::MutateSpec &mutate_spec,
 
 2667           const vector<CellT> &cells) {
 
 2670     get_shared_mutator(ns, table, mutate_spec)->set_cells(hcells);
 
 2673   template <
class CellT>
 
 2675           const ThriftGen::MutateSpec &mutate_spec, 
const CellT &cell) {
 
 2679     cb.
add(hcell, 
false);
 
 2680     get_shared_mutator(ns, table, mutate_spec)->set_cells(cb.
get());
 
 2683   template <
class CellT>
 
 2684   void _set_cells(
const Mutator mutator, 
const vector<CellT> &cells) {
 
 2687     get_mutator(mutator)->set_cells(hcells);
 
 2690   template <
class CellT>
 
 2695     cb.
add(hcell, 
false);
 
 2696     get_mutator(mutator)->set_cells(cb.
get());
 
 2699   template <
class CellT>
 
 2706       mutator_ptr->
flush();
 
 2709   template <
class CellT>
 
 2714     cb.
add(hcell, 
false);
 
 2718       mutator_ptr->
flush();
 
 2722     std::lock_guard<std::mutex> lock(m_mutex);
 
 2723     ObjectMap::iterator it = m_object_map.find(
id);
 
 2724     return (it != m_object_map.end()) ? it->second.get() : 0;
 
 2728     std::lock_guard<std::mutex> lock(m_mutex);
 
 2729     ObjectMap::iterator it = m_cached_object_map.find(
id);
 
 2730     return (it != m_cached_object_map.end()) ? it->second.get() : 0;
 
 2738                format(
"Invalid future id: %lld", (
Lld)
id));
 
 2749                format(
"Invalid namespace id: %lld", (
Lld)
id));
 
 2756     std::lock_guard<std::mutex> lock(m_mutex);
 
 2757     while (!m_cached_object_map.insert(make_pair(
id = 
Random::number32(), co)).second || 
id == 0); 
 
 2762     std::lock_guard<std::mutex> lock(m_mutex);
 
 2763     int64_t 
id = 
reinterpret_cast<int64_t
>(co);
 
 2769     std::lock_guard<std::mutex> lock(m_mutex);
 
 2770     int64_t 
id = 
reinterpret_cast<int64_t
>(mutator.get());
 
 2771     m_object_map.insert(make_pair(
id, static_pointer_cast<ClientObject>(mutator))); 
 
 2776     std::lock_guard<std::mutex> lock(m_mutex);
 
 2777     int64_t 
id = 
reinterpret_cast<int64_t
>(co);
 
 2778     return m_object_map.find(
id) != m_object_map.end() ? 
id : 0;
 
 2782     std::lock_guard<std::mutex> lock(m_mutex);
 
 2783     int64_t 
id = 
reinterpret_cast<int64_t
>(scanner);
 
 2785     m_scanner_info_map.insert(make_pair(
id, info));
 
 2790     std::lock_guard<std::mutex> lock(m_mutex);
 
 2791     int64_t 
id = 
reinterpret_cast<int64_t
>(scanner.get());
 
 2792     m_object_map.insert(make_pair(
id, static_pointer_cast<ClientObject>(scanner)));
 
 2793     m_scanner_info_map.insert(make_pair(
id, info));
 
 2798     std::lock_guard<std::mutex> lock(m_mutex);
 
 2799     ObjectMap::iterator it = m_object_map.find(to);
 
 2800     if (it != m_object_map.end())
 
 2801       m_reference_map.insert(make_pair(from, it->second));
 
 2805     std::lock_guard<std::mutex> lock(m_mutex);
 
 2806     m_reference_map.erase(
id);
 
 2815                format(
"Invalid scanner id: %lld", (
Lld)
id));
 
 2821     std::lock_guard<std::mutex> lock(m_mutex);
 
 2823     auto it = m_object_map.find(
id);
 
 2824     if (it == m_object_map.end() ||
 
 2825         (scanner = 
dynamic_cast<TableScanner *
>(it->second.get())) == 
nullptr) {
 
 2828                format(
"Invalid scanner id: %lld", (
Lld)
id));
 
 2830     auto sit = m_scanner_info_map.find(
id);
 
 2831     HT_ASSERT(sit != m_scanner_info_map.end());
 
 2838     bool removed = 
false;
 
 2841       std::lock_guard<std::mutex> lock(m_mutex);
 
 2842       ObjectMap::iterator it = m_object_map.find(
id);
 
 2843       if (it != m_object_map.end()) {
 
 2844         item = (*it).second;
 
 2845         m_object_map.erase(it);
 
 2854     bool removed = 
false;
 
 2857       std::lock_guard<std::mutex> lock(m_mutex);
 
 2858       ObjectMap::iterator it = m_cached_object_map.find(
id);
 
 2859       if (it != m_cached_object_map.end()) {
 
 2860         item = (*it).second;
 
 2861         m_cached_object_map.erase(it);
 
 2872       std::lock_guard<std::mutex> lock(m_mutex);
 
 2873       m_scanner_info_map.erase(
id);
 
 2874       ObjectMap::iterator it = m_object_map.find(
id);
 
 2875       if (it != m_object_map.end()) {
 
 2876         item = (*it).second;
 
 2877         m_object_map.erase(it);
 
 2882                  format(
"Invalid scanner id: %lld", (
Lld)
id));
 
 2888     std::lock_guard<std::mutex> lock(m_mutex);
 
 2889     ObjectMap::iterator it = m_object_map.find(
id);
 
 2890     if (it != m_object_map.end()) {
 
 2891       scanner = (*it).second;
 
 2892       m_object_map.erase(it);
 
 2897                format(
"Invalid scanner id: %lld", (
Lld)
id));
 
 2899     info = m_scanner_info_map[id];
 
 2900     m_scanner_info_map.erase(
id);
 
 2904           const String &table, 
const ThriftGen::MutateSpec &mutate_spec)
 override {
 
 2905     std::lock_guard<std::mutex> lock(m_context.shared_mutator_mutex);
 
 2908     SharedMutatorMap::iterator it = m_context.shared_mutator_map.find(skey);
 
 2911     if (it != m_context.shared_mutator_map.end()) {
 
 2912       LOG_API(
"deleting shared mutator on namespace=" << ns << 
" table=" 
 2913               << table << 
" with appname=" << mutate_spec.appname);
 
 2914       m_context.shared_mutator_map.erase(it);
 
 2919     LOG_API(
"creating shared mutator on namespace=" << ns << 
" table=" << table
 
 2920             <<
" with appname=" << mutate_spec.appname);
 
 2923     TableMutator *mutator = t->create_mutator(0, mutate_spec.flags,
 
 2924             mutate_spec.flush_interval);
 
 2925     m_context.shared_mutator_map[skey] = mutator;
 
 2930           const String &table, 
const ThriftGen::MutateSpec &mutate_spec)
 override {
 
 2931     shared_mutator_refresh(ns, table, mutate_spec);
 
 2935           const String &table, 
const ThriftGen::MutateSpec &mutate_spec) {
 
 2936     std::lock_guard<std::mutex> lock(m_context.shared_mutator_mutex);
 
 2939     SharedMutatorMap::iterator it = m_context.shared_mutator_map.find(skey);
 
 2942     if (it != m_context.shared_mutator_map.end())
 
 2946       LOG_API(
"creating shared mutator on namespace=" << ns << 
" table=" 
 2947               << table << 
" with appname=" << mutate_spec.appname);
 
 2950       TableMutator *mutator = t->create_mutator(0, mutate_spec.flags,
 
 2951               mutate_spec.flush_interval);
 
 2952       m_context.shared_mutator_map[skey] = mutator;
 
 2962                format(
"Invalid mutator id: %lld", (
Lld)
id));
 
 2973                format(
"Invalid mutator id: %lld", (
Lld)
id));
 
 2979     if (!remove_object(
id)) {
 
 2982                format(
"Invalid future id: %lld", (
Lld)
id));
 
 2987     if (!remove_cached_object(
id)) {
 
 2990                format(
"Invalid namespace id: %lld", (
Lld)
id));
 
 2995     if (!remove_object(
id)) {
 
 2998                format(
"Invalid mutator id: %lld", (
Lld)
id));
 
 3012 template <
class ResultT, 
class CellT>
 
 3014   result.results.push_back(ret);
 
 3015   result.__isset.results = 
true;
 
 3018 template <
class ResultT, 
class CellT>
 
 3024     while (s->next(hcell)) {
 
 3026       result.cells.push_back(tcell);
 
 3028     result.__isset.cells = 
true;
 
 3030     if (g_log_slow_queries)
 
 3031       s->get_profile_data(profile_data);
 
 3037     result.scanner = handler.get_scanner_id(s, si);
 
 3038     result.__isset.scanner = 
true;
 
 3043 template <
class ResultT, 
class CellT>
 
 3046     Parent::on_finish(m);
 
 3049     result.mutator = handler.get_object_id(m);
 
 3050     result.__isset.mutator = 
true;
 
 3062     return instance.get_handler(remotePeer);
 
 3067       instance.release_handler(serverHandler);
 
 3079     std::lock_guard<std::mutex> lock(m_mutex);
 
 3080     ServerHandlerMap::iterator it = m_server_handler_map.find(remotePeer);
 
 3081     if (it != m_server_handler_map.end()) {
 
 3083       return it->second.second;
 
 3087     m_server_handler_map.insert(
 
 3088       std::make_pair(remotePeer,
 
 3089       std::make_pair(1, serverHandler)));
 
 3091     return serverHandler;
 
 3096       std::lock_guard<std::mutex> lock(m_mutex);
 
 3097       ServerHandlerMap::iterator it =
 
 3098         m_server_handler_map.find(serverHandler->
remote_peer());
 
 3099       if (it != m_server_handler_map.end()) {
 
 3100         if (--it->second.first > 0) {
 
 3104       m_server_handler_map.erase(it);
 
 3106     delete serverHandler;
 
 3121   HqlServiceIf* 
getHandler(const ::apache::thrift::TConnectionInfo& connInfo)
 override {
 
 3122     typedef ::apache::thrift::transport::TSocket TTransport;
 
 3124       dynamic_cast<TTransport*
>(connInfo.transport.get())->getPeerAddress();
 
 3125     g_metrics_handler->connection_increment();
 
 3131     g_metrics_handler->connection_decrement();
 
 3142   using namespace ThriftBroker;
 
 3146     init_with_policies<Policies>(argc, argv);
 
 3148     if (get_bool(
"ThriftBroker.Hyperspace.Session.Reconnect"))
 
 3149       properties->set(
"Hyperspace.Session.Reconnect", 
true);
 
 3151     if (get_bool(
"ThriftBroker.SlowQueryLog.Enable")) {
 
 3152       g_log_slow_queries = 
true;
 
 3153       g_slow_query_latency_threshold = get_i32(
"ThriftBroker.SlowQueryLog.LatencyThreshold");
 
 3154       g_slow_query_log = 
new Cronolog(
"SlowQuery.log",
 
 3159     g_metrics_handler = std::make_shared<MetricsHandler>(
properties, g_slow_query_log);
 
 3160     g_metrics_handler->start_collecting();
 
 3164     g_context = context.get();
 
 3166     ::uint16_t port = get_i16(
"port");
 
 3167     boost::shared_ptr<TProtocolFactory> protocolFactory(
new TBinaryProtocolFactory());
 
 3168     boost::shared_ptr<HqlServiceIfFactory> hql_service_factory(
new ThriftBrokerIfFactory());
 
 3169     boost::shared_ptr<TProcessorFactory> hql_service_processor_factory(
new HqlServiceProcessorFactory(hql_service_factory));
 
 3171     boost::shared_ptr<TServerTransport> serverTransport;
 
 3173     if (
has(
"thrift-timeout")) {
 
 3174       int timeout_ms = get_i32(
"thrift-timeout");
 
 3175       serverTransport.reset( 
new TServerSocket(port, timeout_ms, timeout_ms) );
 
 3178       serverTransport.reset( 
new TServerSocket(port) );
 
 3180     boost::shared_ptr<TTransportFactory> transportFactory(
new TFramedTransportFactory());
 
 3182     TThreadedServer server(hql_service_processor_factory, serverTransport,
 
 3183                            transportFactory, protocolFactory);
 
 3185     HT_INFO(
"Starting the server...");
 
 3189     g_metrics_handler->start_collecting();
 
 3190     g_metrics_handler.reset();
 
void set_cells(const ThriftGen::Namespace ns, const String &table, const ThriftCells &cells) override
bool get_counter() const 
Gets the counter option. 
void set_id(int32_t id)
Sets column ID. 
bool is_set_max_versions() const 
Checks if max versions option is set. 
Retrieves system information (hardware, installation directory, etc) 
bool get_time_order_desc() const 
Gets time order desc option. 
Meta::list< ThriftBrokerPolicy, DefaultCommPolicy > Policies
void shared_mutator_set_cell(const ThriftGen::Namespace ns, const String &table, const ThriftGen::MutateSpec &mutate_spec, const ThriftGen::Cell &cell) override
void next_row(ThriftCells &result, const Scanner scanner_id) override
void _convert_result_serialized(Hypertable::ResultPtr &hresult, ThriftGen::ResultSerialized &tresult)
void async_mutator_flush(const MutatorAsync mutator) override
void get_schema_str(String &result, const ThriftGen::Namespace ns, const String &table) override
bool is_set_ttl() const 
Checks if ttl option is set. 
void table_create(const ThriftGen::Namespace ns, const String &table, const ThriftGen::Schema &schema) override
int32_t get_blocksize() const 
Gets blocksize option. 
void get_table_splits(const std::string &name, TableSplitsContainer &splits)
Returns a list of existing table names. 
void set_row_offset(int32_t n)
Sets the number of rows to be skipped at the beginning of the query. 
void async_mutator_cancel(const MutatorAsync mutator) override
bool is_set_bloom_filter() const 
Checks if bloom filter option is set. 
std::shared_ptr< MetricsHandler > MetricsHandlerPtr
Smart pointer to MetricsHandler. 
std::vector< Cell, CellAlloc > Cells
void create_namespace(const String &ns) override
void remove_mutator(int64_t id)
TablePtr open_table(const std::string &name, int32_t flags=0)
Opens a table. 
#define HT_WARNF(msg,...)
TableMutator * get_shared_mutator(const ThriftGen::Namespace ns, const String &table, const ThriftGen::MutateSpec &mutate_spec)
void set_generation(int64_t generation)
Sets generation. 
TableScannerAsync * _open_scanner_async(const ThriftGen::Namespace ns, const String &table, const ThriftGen::Future ff, const ThriftGen::ScanSpec &ss)
int64_t get_scanner_id(TableScannerPtr &scanner, ScannerInfoPtr &info)
int32_t scanblocks
Number of scan blocks returned from RangeServers. 
void convert_schema(const Hypertable::SchemaPtr &hschema, ThriftGen::Schema &tschema)
void offer_cells(const ThriftGen::Namespace ns, const String &table, const ThriftGen::MutateSpec &mutate_spec, const ThriftCells &cells) override
PropertiesPtr properties
This singleton map stores all options. 
void reserve_rows(size_t s)
ServerHandler(const String &remote_peer, Context &c)
void set_bloom_filter(const std::string &bloomfilter)
Sets bloom filter option. 
std::string String
A String is simply a typedef to std::string. 
void get_table_id(String &result, const ThriftGen::Namespace ns, const String &table) override
bool set_max_versions(int32_t max_versions)
Sets max versions option. 
ColumnPredicates column_predicates
void hql_exec_as_arrays(HqlResultAsArrays &result, const ThriftGen::Namespace ns, const String &hql, bool noflush, bool unbuffered) override
void _set_cell_async(const MutatorAsync mutator, const CellT &cell)
int32_t subscanners
Number of RangeServer::create_scanner() calls. 
void set_deleted(bool value)
Sets deleted flag. 
void cancel_mutator_async(const MutatorAsync mutator) override
void error_get_text(std::string &_return, int error_code) override
void mutator_set_cells_serialized(const Mutator mutator, const CellsSerialized &cells, const bool flush) override
int64_t bytes_scanned
Number of bytes scanned while executing scan. 
bool is_cancelled()
Checks whether the Future object has been cancelled. 
String format(const char *fmt,...)
Returns a String using printf like format facilities Vanilla snprintf is about 1.5x faster than this...
void scanner_get_row_as_arrays(ThriftCellsAsArrays &result, const Scanner scanner_id) override
void mutator_close(const Mutator mutator) override
chrono::time_point< fast_clock > time_point
void scanner_get_row(ThriftCells &result, const Scanner scanner_id) override
void _convert_result_as_arrays(const Hypertable::ResultPtr &hresult, ThriftGen::ResultAsArrays &tresult)
Asynchronous table scanner. 
void convert_table_split(const Hypertable::TableSplit &hsplit, ThriftGen::TableSplit &tsplit)
Represents a table split. 
void shared_mutator_set_cells(const ThriftGen::Namespace ns, const String &table, const ThriftGen::MutateSpec &mutate_spec, const ThriftCells &cells) override
const String & remote_peer() const 
void _next_row(vector< CellT > &result, TableScanner *scanner)
ThriftGen::Future future_open(int capacity) override
void convert_scan_spec(const ThriftGen::ScanSpec &tss, Hypertable::ScanSpec &hss)
void async_mutator_set_cells_as_arrays(const MutatorAsync mutator, const ThriftCellsAsArrays &cells) override
void log_slow_query_scanspec(const char *func_name, std::chrono::fast_clock::time_point start_time, std::chrono::fast_clock::time_point end_time, int64_t latency_ms, ProfileDataScanner &profile_data, Hypertable::Namespace *ns, const string &table, Hypertable::ScanSpec &ss)
long long unsigned int Llu
Shortcut for printf formats. 
pair< int64_t, int64_t > time_interval
const char * value_regexp
const char * column_qualifier
void hql_query_as_arrays(HqlResultAsArrays &result, const ThriftGen::Namespace ns, const String &hql) override
void set_replication(int16_t replication)
Sets replication option. 
#define THROW_TE(_code_, _str_)
int compare(const SharedMutatorMapKey &skey) const 
Declarations for fast_clock. 
Column family specification. 
void convert_key(const ThriftGen::Key &tkey, Hypertable::KeySpec &hkey)
void flush_mutator_async(const MutatorAsync mutator) override
void scanner_get_cells_serialized(CellsSerialized &result, const Scanner scanner_id) override
void set_qualifier_index(bool value)
Sets qualifier index flag. 
void set_name(const std::string &name)
Sets column family name. 
const char * column_qualifier
ScanSpecBuilder scan_spec_builder
bool set_ttl(time_t ttl)
Sets ttl option. 
bool is_set_in_memory() const 
Checks if in memory option is set. 
void _set_cells_async(const MutatorAsync mutator, const vector< CellT > &cells)
void _convert_result(const Hypertable::ResultPtr &hresult, ThriftGen::Result &tresult)
Specification for column family options. 
void mutator_set_cells_as_arrays(const Mutator mutator, const ThriftCellsAsArrays &cells) override
void namespace_create(const String &ns) override
Hypertable::Future * get_future(int64_t id)
std::shared_ptr< Result > ResultPtr
Smart pointer to Result. 
void set_start_time(int64_t start)
ScannerAsync open_scanner_async(const ThriftGen::Namespace ns, const String &table, const ThriftGen::Future ff, const ThriftGen::ScanSpec &ss) override
void get_cells(ThriftCells &result, const ThriftGen::Namespace ns, const String &table, const ThriftGen::ScanSpec &ss) override
std::string get_table_id(const std::string &name)
Returns the table identifier for a table. 
void get_row_as_arrays(ThriftCellsAsArrays &result, const ThriftGen::Namespace ns, const String &table, const String &row) override
void flush(bool sync=true)
Flushes the current buffer accumulated mutations to their respective range servers. 
void async_mutator_close(const MutatorAsync mutator) override
void set_generation(int64_t generation)
Sets generation. 
void on_scan(TableScannerPtr &) override
Called when interpreter is ready to scan. 
Declarations for MetricsHandler. 
void mutator_flush(const Mutator mutator) override
static uint32_t number32(uint32_t maximum=0)
Returns a random 32-bit unsigned integer. 
std::shared_ptr< ClientObject > ClientObjectPtr
Smart pointer to ClientObject. 
ClientObject * get_cached_object(int64_t id)
void namespace_get_listing(std::vector< ThriftGen::NamespaceListing > &_return, const ThriftGen::Namespace ns) override
bool future_is_full(ThriftGen::Future ff) override
#define LOG_API_START(_expr_)
void next_cells_as_arrays(ThriftCellsAsArrays &result, const Scanner scanner_id) override
static ServerHandler * getHandler(const String &remotePeer)
std::shared_ptr< TableScanner > TableScannerPtr
Smart pointer to TableScanner. 
Specification for access group options. 
bool convert_column_family_options(const Hypertable::ColumnFamilyOptions &hoptions, ThriftGen::ColumnFamilyOptions &toptions)
void set_value_regexp(const char *regexp)
Sets the regexp to filter cell values by. 
bool is_set_replication() const 
Checks if replication option is set. 
bool operator==(const Statistics &other)
void unget(const Cell &cell)
Ungets one cell. 
void _offer_cell(const ThriftGen::Namespace ns, const String &table, const ThriftGen::MutateSpec &mutate_spec, const CellT &cell)
void get_row_serialized(CellsSerialized &result, const ThriftGen::Namespace ns, const std::string &table, const std::string &row) override
String generate_guid()
Generates a new GUID. 
void next_cells_serialized(CellsSerialized &result, const Scanner scanner_id) override
void run_hql_interp(const ThriftGen::Namespace ns, const String &hql, HqlInterpreter::Callback &cb)
void set_cells_serialized_async(const MutatorAsync mutator, const CellsSerialized &cells, const bool flush) override
bool has(const String &name)
Check existence of a configuration value. 
Represents a row interval. 
const string render_hql(const string &table) const 
Renders scan spec as an HQL SELECT statement. 
bool is_set_counter() const 
Checks if counter option is set. 
#define LOG_API_FINISH_E(_expr_)
static void releaseHandler(ServerHandler *serverHandler)
Declarations for Cronolog. 
void scanner_get_cells_as_arrays(ThriftCellsAsArrays &result, const Scanner scanner_id) override
void async_mutator_set_cells_serialized(const MutatorAsync mutator, const CellsSerialized &cells, const bool flush) override
void create_cell_unique(std::string &_return, const ThriftGen::Namespace ns, const std::string &table_name, const ThriftGen::Key &tkey, const std::string &value) override
Provides the ability to mutate a table in the form of adding and deleting rows and cells...
void set_cell_as_array_async(const MutatorAsync mutator, const CellAsArray &cell) override
bool future_is_empty(ThriftGen::Future ff) override
void set_value_index(bool value)
Sets value index flag. 
void cancel_scanner_async(const ScannerAsync scanner) override
void close_namespace(const ThriftGen::Namespace ns) override
Scan predicate and control specification. 
void get_future_result(ThriftGen::Result &tresult, ThriftGen::Future ff, int timeout_millis) override
void table_rename(const ThriftGen::Namespace ns, const String &table, const String &new_table_name) override
void mutator_set_cell(const Mutator mutator, const ThriftGen::Cell &cell) override
MutatorAsync async_mutator_open(const ThriftGen::Namespace ns, const String &table, const ThriftGen::Future ff,::int32_t flags) override
std::shared_ptr< TableMutator > TableMutatorPtr
Smart pointer to TableMutator. 
Provides the ability to mutate a table in the form of adding and deleting rows and cells...
void set_cell_limit(int32_t n)
Sets the maximum number of cells to return. 
multimap<::int64_t, ClientObjectPtr > m_reference_map
void create_cell_unique(const TablePtr &table, const KeySpec &key, String &guid)
Inserts a unique value into a table. 
ServerHandlerMap m_server_handler_map
bool future_has_outstanding(ThriftGen::Future ff) override
Mutator open_mutator(const ThriftGen::Namespace ns, const String &table, int32_t flags, int32_t flush_interval) override
void set_end_time(int64_t end)
TableMutatorAsync * _open_mutator_async(const ThriftGen::Namespace ns, const String &table, const ThriftGen::Future ff,::int32_t flags)
Declarations for ProfileDataScanner. 
std::vector< ThriftGen::Cell > ThriftCells
void close_mutator_async(const MutatorAsync mutator) override
void operator=(const Statistics &other)
int32_t cell_limit_per_family
void offer_cell(const ThriftGen::Namespace ns, const String &table, const ThriftGen::MutateSpec &mutate_spec, const ThriftGen::Cell &cell) override
void set_cells(const Cells &cells)
Insert a bunch of cells into the table (atomically if cells are in the same range/row) ...
void async_scanner_close(const ScannerAsync scanner_async) override
void cancel_future(ThriftGen::Future ff) override
static time_point now() noexcept
void merge_options(const ColumnFamilyOptions &other)
Merges options from another ColumnFamilyOptions object. 
void cancel()
Cancels outstanding scanners/mutators. 
void _set_cell(const Mutator mutator, const CellT &cell)
void mutator_set_cells(const Mutator mutator, const ThriftCells &cells) override
ThriftGen::MutateSpec m_mutate_spec
void get_listing(bool include_sub_entries, std::vector< NamespaceListing > &listing)
Returns a list of existing tables & namesspaces. 
std::set< std::string > servers
Set of server proxy names participating in scan. 
const char * get_text(int error)
Returns a descriptive error message. 
void remove_scanner(int64_t id)
void set_row_regexp(const char *regexp)
Sets the regexp to filter rows by. 
TableMutator * get_mutator(int64_t id)
void convert_cell(const ThriftGen::Cell &tcell, Hypertable::Cell &hcell)
void create_table(const std::string &name, const std::string &schema_str)
Creates a table. 
void set_keys_only(bool val)
Return only keys (no values) 
int64_t cell_str_to_num(const std::string &from, const char *label, int64_t min_num=INT64_MIN, int64_t max_num=INT64_MAX)
void async_mutator_set_cell(const MutatorAsync mutator, const ThriftGen::Cell &cell) override
SchemaPtr get_schema(const std::string &name)
Returns a smart ptr to a schema object for a table. 
int64_t get_cached_object_id(ClientObjectPtr co)
Scanner open_scanner(const ThriftGen::Namespace ns, const String &table, const ThriftGen::ScanSpec &ss) override
bool get_in_memory() const 
Gets in memory option. 
SharedMutatorMapKey(Hypertable::Namespace *ns, const String &tablename, const ThriftGen::MutateSpec &mutate_spec)
int64_t disk_read
Number of bytes read from disk while executing scan. 
void table_get_schema(ThriftGen::Schema &result, const ThriftGen::Namespace ns, const String &table) override
bool and_column_predicates
void get_cell(Value &result, const ThriftGen::Namespace ns, const String &table, const String &row, const String &column) override
void reserve_column_predicates(size_t s)
Logging routines and macros. 
void table_get_id(String &result, const ThriftGen::Namespace ns, const String &table) override
Hypertable::Namespace * m_namespace
void merge_options(const AccessGroupOptions &options)
Merges options with those from another AccessGroupOptions object. 
std::mutex shared_mutator_mutex
void table_get_splits(std::vector< ThriftGen::TableSplit > &_return, const ThriftGen::Namespace ns, const String &table) override
void future_get_result_serialized(ThriftGen::ResultSerialized &tresult, ThriftGen::Future ff, int timeout_millis) override
void release_handler(ServerHandler *serverHandler)
bool next(Cell &cell)
Gets the next cell. 
void next_cells(ThriftCells &result, const Scanner scanner_id) override
MutatorAsync open_mutator_async(const ThriftGen::Namespace ns, const String &table, const ThriftGen::Future ff,::int32_t flags) override
Compatibility Macros for C/C++. 
Mutator mutator_open(const ThriftGen::Namespace ns, const String &table, int32_t flags, int32_t flush_interval) override
void future_get_result(ThriftGen::Result &tresult, ThriftGen::Future ff, int timeout_millis) override
void hql_exec(HqlResult &result, const ThriftGen::Namespace ns, const String &hql, bool noflush, bool unbuffered) override
int64_t try_get_object_id(ClientObject *co)
std::unordered_map< ::int64_t, ClientObjectPtr > ObjectMap
bool get(ResultPtr &result)
This call blocks till there is a result available unless async ops have completed. 
void get_cells_as_arrays(ThriftCellsAsArrays &result, const ThriftGen::Namespace ns, const String &table, const ThriftGen::ScanSpec &ss) override
void get_schema(ThriftGen::Schema &result, const ThriftGen::Namespace ns, const String &table) override
TableMutator * _open_mutator(const ThriftGen::Namespace ns, const String &table)
const std::string & get_access_group() const 
Gets access group name. 
bool operator<(const SharedMutatorMapKey &skey1, const SharedMutatorMapKey &skey2)
void set_return_deletes(bool val)
Internal use only. 
void shared_mutator_set_cell_as_array(const ThriftGen::Namespace ns, const String &table, const ThriftGen::MutateSpec &mutate_spec, const CellAsArray &cell) override
bool future_is_cancelled(ThriftGen::Future ff) override
::uint32_t next_threshold
Initialization helper for applications. 
void get_table_splits(std::vector< ThriftGen::TableSplit > &_return, const ThriftGen::Namespace ns, const String &table) override
Helper class for building a ScanSpec. 
bool set_time_order_desc(bool value)
Sets time order desc option. 
bool is_empty()
Checks whether the Future result queue is empty. 
void set_cell_limit_per_family(int32_t n)
Sets the maximum number of cells to return per column family. 
void set_do_not_cache(bool val)
Don't cache. 
TableScanner * _open_scanner(const ThriftGen::Namespace ns, const String &table, const Hypertable::ScanSpec &ss)
#define LOG_SLOW_QUERY(_pd_, _ns_, _hql_)
Represents a column predicate (e.g. 
HqlInterpreter::Callback Parent
int32_t get_max_versions() const 
Gets max versions option. 
int32_t get_buffer_length()
Synchronous table scanner. 
void future_cancel(ThriftGen::Future ff) override
bool exists_namespace(const String &ns) override
bool set_counter(bool value)
Sets counter option. 
void refresh_shared_mutator(const ThriftGen::Namespace ns, const String &table, const ThriftGen::MutateSpec &mutate_spec) override
ObjectMap m_cached_object_map
Time related declarations. 
bool is_set_compressor() const 
Checks if compressor option is set. 
void status(ThriftGen::Status &_return) override
void async_scanner_cancel(const ScannerAsync scanner) override
void set_cells_async(const MutatorAsync mutator, const ThriftCells &cells) override
bool is_set_time_order_desc() const 
Checks if time_order_desc option is set. 
Base class for Hypertable client objects. 
time_t get_ttl() const 
Gets ttl option. 
ScannerAsync async_scanner_open(const ThriftGen::Namespace ns, const String &table, const ThriftGen::Future ff, const ThriftGen::ScanSpec &ss) override
Access group specification. 
bool remove_object(int64_t id)
bool table_exists(const ThriftGen::Namespace ns, const String &table) override
int32_t convert_cells(const Hypertable::Cells &hcells, ThriftCells &tcells)
int16_t get_replication() const 
Gets replication option. 
void remove_scanner(int64_t id, ClientObjectPtr &scanner, ScannerInfoPtr &info)
ScannerInfo(int64_t ns, const string &t)
void mutator_set_cell_as_array(const Mutator mutator, const CellAsArray &cell) override
void hql_query2(HqlResult2 &result, const ThriftGen::Namespace ns, const String &hql) override
void async_mutator_set_cell_as_array(const MutatorAsync mutator, const CellAsArray &cell) override
std::shared_ptr< HqlInterpreter > HqlInterpreterPtr
Smart pointer to HqlInterpreter. 
void refresh_table(const ThriftGen::Namespace ns, const String &table_name) override
const std::string & get_name() const 
Gets column family name. 
long long int Lld
Shortcut for printf formats. 
bool table_exists(ContextPtr &context, const String &name, String &id)
Checks if table exists and returns table ID. 
void merge_defaults(const ColumnFamilyOptions &options)
Merges column family defaults with those from another AccessGroupOptions object. 
Hypertable::Namespace * get_namespace(int64_t id)
Callback interface/base class for execute. 
TableScannerAsync * get_scanner_async(int64_t id)
void add_column(const string &str)
Adds a column family to be returned by the scan. 
void add_row_interval(const string &start, bool start_inclusive, const string &end, bool end_inclusive)
Adds a row interval to be returned in the scan. 
bool convert_access_group_options(const Hypertable::AccessGroupOptions &hoptions, ThriftGen::AccessGroupOptions &toptions)
HqlServiceIf * getHandler(const ::apache::thrift::TConnectionInfo &connInfo) override
TableScanner * get_scanner(int64_t id, ScannerInfoPtr &info)
ProfileDataScanner profile_data
int64_t get_object_id(ClientObject *co)
ThriftGen::Namespace open_namespace(const String &ns) override
void on_return(const std::string &) override
Called when interpreter returns a string result Maybe called multiple times for a list of string resu...
void drop_namespace(const String &ns, const bool if_exists) override
void shared_mutator_set_cells_as_arrays(const ThriftGen::Namespace ns, const String &table, const ThriftGen::MutateSpec &mutate_spec, const ThriftCellsAsArrays &cells) override
static ServerHandlerFactory instance
void alter_table(const std::string &table_name, SchemaPtr &schema, bool force)
Alter table schema. 
bool exists_table(const std::string &name)
Checks if the table exists. 
void log_slow_query(const char *func_name, std::chrono::fast_clock::time_point start_time, std::chrono::fast_clock::time_point end_time, int64_t latency_ms, ProfileDataScanner &profile_data, Hypertable::Namespace *ns, const string &hql)
void drop_table(const ThriftGen::Namespace ns, const String &table, const bool if_exists) override
void set_access_group(const std::string &ag)
Sets access group. 
const char * column_family
#define HT_INFOF(msg,...)
void set_blocksize(int32_t blocksize)
Sets blocksize option. 
#define HT_THROWF(_code_, _fmt_,...)
void close_scanner(const Scanner scanner) override
void table_alter(const ThriftGen::Namespace ns, const String &table, const ThriftGen::Schema &schema) override
Random number generator for int32, int64, double and ascii arrays. 
void close_mutator(const Mutator mutator) override
void get_profile_data(ProfileDataScanner &profile_data)
Gets profile data. 
void offer_cell_as_array(const ThriftGen::Namespace ns, const String &table, const ThriftGen::MutateSpec &mutate_spec, const CellAsArray &cell) override
void table_get_schema_str_with_ids(String &result, const ThriftGen::Namespace ns, const String &table) override
::uint32_t future_capacity
bool exists_table(const ThriftGen::Namespace ns, const String &table) override
#define LOG_SLOW_QUERY_SCANNER(_scanner_, _ns_, _table_, _ss_)
RowIntervals row_intervals
bool namespace_exists(const String &ns) override
std::string get_schema_str(const std::string &name, bool with_ids=false)
Returns the schema for a table. 
bool has_outstanding()
Checks whether there are any outstanding operations. 
std::map< SharedMutatorMapKey, TableMutator * > SharedMutatorMap
void table_drop(const ThriftGen::Namespace ns, const String &table, const bool if_exists) override
void set_cell_as_array(const ThriftGen::Namespace ns, const String &table, const CellAsArray &cell) override
static String install_dir
The installation directory. 
This is a generic exception class for Hypertable. 
void _set_cells(const Mutator mutator, const vector< CellT > &cells)
void remove_references(int64_t id)
void scanner_get_row_serialized(CellsSerialized &result, const Scanner scanner_id) override
int main(int argc, char **argv)
void rename_table(const std::string &old_name, const std::string &new_name)
Renames a table. 
void flush_mutator(const Mutator mutator) override
void set_cells_serialized(const ThriftGen::Namespace ns, const String &table, const CellsSerialized &cells) override
void add_reference(int64_t from, int64_t to)
int64_t get_object_id(TableMutatorPtr &mutator)
void set_in_memory(bool value)
Sets in memory option. 
ServerHandler * get_handler(const String &remotePeer)
void future_close(const ThriftGen::Future ff) override
void get_tables(std::vector< String > &tables, const ThriftGen::Namespace ns) override
bool is_set_blocksize() const 
Checks if blocksize option is set. 
void set_cells_as_arrays_async(const MutatorAsync mutator, const ThriftCellsAsArrays &cells) override
void get_listing(std::vector< ThriftGen::NamespaceListing > &_return, const ThriftGen::Namespace ns) override
bool scan_and_filter_rows
TableMutatorAsync * get_mutator_async(int64_t id)
#define HT_ERRORF(msg,...)
void hql_query(HqlResult &result, const ThriftGen::Namespace ns, const String &hql) override
void generate_guid(std::string &_return) override
void get_future_result_serialized(ThriftGen::ResultSerialized &tresult, ThriftGen::Future ff, int timeout_millis) override
const std::string & get_name() const 
void namespace_drop(const String &ns, const bool if_exists) override
void set_and_column_predicates(bool val)
AND together the column predicates. 
std::shared_ptr< Schema > SchemaPtr
Smart pointer to Schema. 
void remove_future(int64_t id)
void releaseHandler(::Hypertable::ThriftGen::ClientServiceIf *service) override
Represents a cell interval. 
void add_cell_interval(const string &start_row, const string &start_column, bool start_inclusive, const string &end_row, const string &end_column, bool end_inclusive)
Adds a cell interval to be returned in the scan. 
void scanner_get_cells(ThriftCells &result, const Scanner scanner_id) override
void scanner_close(const Scanner id) override
Scanner scanner_open(const ThriftGen::Namespace ns, const String &table, const ThriftGen::ScanSpec &ss) override
void set_cell_async(const MutatorAsync mutator, const ThriftGen::Cell &cell) override
void add_column_predicate(const string &column_family, const char *column_qualifier, uint32_t operation, const char *value, uint32_t value_len=0)
Adds a column predicate to the scan. 
ThriftGen::Future open_future(int capacity) override
void reserve_cells(size_t s)
void _offer_cells(const ThriftGen::Namespace ns, const String &table, const ThriftGen::MutateSpec &mutate_spec, const vector< CellT > &cells)
void set_compressor(const std::string &compressor)
Sets compressor option. 
CellIntervals cell_intervals
Hypertable::Client * client
void get_schema_str_with_ids(String &result, const ThriftGen::Namespace ns, const String &table) override
void async_mutator_set_cells(const MutatorAsync mutator, const ThriftCells &cells) override
void add(const Cell &cell, bool own=true)
HqlCallback(ResultT &r, ServerHandler *handler, const ThriftGen::Namespace ns, const String &hql, bool flush, bool buffered)
std::vector< CellAsArray > ThriftCellsAsArrays
void set_row_limit(int32_t n)
Sets the maximum number of rows to return in the scan. 
void hql_exec2(HqlResult2 &result, const ThriftGen::Namespace ns, const String &hql, bool noflush, bool unbuffered) override
Encapsulates decomposed key and value. 
void set_max_versions(uint32_t n)
Sets the maximum number of revisions of each cell to return in the scan. 
void reserve_columns(size_t s)
static time_t to_time_t(const time_point &__t) noexcept
void shared_mutator_refresh(const ThriftGen::Namespace ns, const String &table, const ThriftGen::MutateSpec &mutate_spec) override
void finalize(uint8_t flag)
ThriftGen::Namespace namespace_open(const String &ns) override
void drop_table(const std::string &name, bool if_exists)
Removes a table. 
std::shared_ptr< ScannerInfo > ScannerInfoPtr
const char * column_family
#define HT_THROW(_code_, _msg_)
static const char * END_ROW_MARKER
void future_get_result_as_arrays(ThriftGen::ResultAsArrays &tresult, ThriftGen::Future ff, int timeout_millis) override
void set_scan_and_filter_rows(bool val)
Scan and filter rows. 
bool is_full()
Checks whether the Future result queue is full. 
std::unordered_map< ::int64_t, ScannerInfoPtr > m_scanner_info_map
void close_future(const ThriftGen::Future ff) override
void next_row_serialized(CellsSerialized &result, const Scanner scanner_id) override
void close_scanner_async(const ScannerAsync scanner_async) override
void table_get_schema_str(String &result, const ThriftGen::Namespace ns, const String &table) override
void get_row(ThriftCells &result, const ThriftGen::Namespace ns, const String &table, const String &row) override
const char * error_get_text(int error_code)
Retrieves a descriptive error string of an error code. 
static void seed(unsigned int s)
Sets the seed of the random number generator. 
std::map< String, std::pair< int, ServerHandler * > > ServerHandlerMap
int64_t bytes_returned
Number of bytes returned while executing scan. 
void rename_table(const ThriftGen::Namespace ns, const String &table, const String &new_table_name) override
const std::string & get_compressor() const 
Gets compressor option. 
void set_cell_offset(int32_t n)
Sets the number of cells to be skipped at the beginning of the query. 
void get_cells_serialized(CellsSerialized &result, const ThriftGen::Namespace ns, const String &table, const ThriftGen::ScanSpec &ss) override
std::shared_ptr< Table > TablePtr
void next_row_as_arrays(ThriftCellsAsArrays &result, const Scanner scanner_id) override
void set_cell(const ThriftGen::Namespace ns, const String &table, const ThriftGen::Cell &cell) override
void offer_cells_as_arrays(const ThriftGen::Namespace ns, const String &table, const ThriftGen::MutateSpec &mutate_spec, const ThriftCellsAsArrays &cells) override
int64_t get_scanner_id(TableScanner *scanner, ScannerInfoPtr &info)
void get_future_result_as_arrays(ThriftGen::ResultAsArrays &tresult, ThriftGen::Future ff, int timeout_millis) override
ClientObject * get_object(int64_t id)
bool remove_cached_object(int64_t id)
SharedMutatorMap shared_mutator_map
virtual ~ThriftBrokerIfFactory()
int code() const 
Returns the error code. 
void remove_namespace(int64_t id)
void _next(vector< CellT > &result, TableScanner *scanner, int limit)
void refresh_table(const std::string &name)
Refreshes the cached table entry. 
void namespace_close(const ThriftGen::Namespace ns) override
const std::string & get_bloom_filter() const 
Gets bloom filter option. 
void set_cells_as_arrays(const ThriftGen::Namespace ns, const String &table, const ThriftCellsAsArrays &cells) override