#include <boost/algorithm/string.hpp>

#if defined(TCMALLOC_MINIMAL)
#include <gperftools/tcmalloc.h>
#include <gperftools/malloc_extension.h>
#elif defined(TCMALLOC)
#include <gperftools/heap-checker.h>
#include <gperftools/heap-profiler.h>
#include <gperftools/malloc_extension.h>
#include <gperftools/tcmalloc.h>
#endif

#include <unordered_map>

#include <sys/resource.h>
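// The gperftools headers above are only compiled in when the server is built
// against tcmalloc (TCMALLOC or TCMALLOC_MINIMAL); the full TCMALLOC build
// additionally pulls in the heap checker/profiler used by the allocator
// statistics dump handler further down in this file.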
  : m_props(props), m_conn_manager(conn_mgr),
    m_app_queue(app_queue), m_hyperspace(hyperspace) {

  m_ganglia_collector =
      std::make_shared<MetricsCollectorGanglia>("rangeserver", props);

  m_context->props = props;
  m_context->comm = conn_mgr->get_comm();
  m_context->server_state = std::make_shared<ServerState>();
  m_context->live_map = make_shared<TableInfoMap>();
    = cfg.get_bool("Range.IgnoreCellsWithClockSkew");

  cfg.get_i32("AccessGroup.GarbageThreshold.Percentage");

  port = cfg.get_i16("Port");

  vector<String> specs = cfg.get_strs("LowActivityPeriod");
  if (specs.empty())
    specs.push_back("* 2-4 * * *");
  else if (find(specs.begin(), specs.end(), "none") != specs.end())
  uint32_t maintenance_threads;
  maintenance_threads = std::max(((disk_count*3)/2), (int32_t)m_cores);
  if (maintenance_threads < 2)
    maintenance_threads = 2;
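  // Default sizing heuristic: 1.5 maintenance threads per disk, but at least
  // one per core and never fewer than 2, with a config override below.
  // Illustrative numbers (not from the listing): 12 disks and 8 cores give
  // max((12*3)/2, 8) = 18 threads.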
  maintenance_threads = cfg.get_i32("MaintenanceThreads", maintenance_threads);
  HT_INFOF("drive count = %d, maintenance threads = %d",
           disk_count, maintenance_threads);

  int64_t interval = (int64_t)cfg.get_i32("Maintenance.Interval");
  cfg.get_i32("AccessGroup.CellCache.ScannerCacheSize");

  if (m_scanner_ttl < (time_t)10000) {
    HT_WARNF("Value %u for Hypertable.RangeServer.Scanner.ttl is too small, "
             "setting to 10000", (unsigned int)m_scanner_ttl);
    m_scanner_ttl = (time_t)10000;
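  // Scanner TTL is clamped from below: values under 10000 are treated as
  // misconfigurations and raised to the minimum with a warning.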
  if (cfg.has("MemoryLimit"))

  double pct = std::max(1.0, std::min((double)cfg.get_i32("MemoryLimit.Percentage"),
                                      99.0)) / 100.0;

  if (cfg.has("MemoryLimit.EnsureUnused"))
  else if (cfg.has("MemoryLimit.EnsureUnused.Percentage")) {
    double pct = std::max(1.0, std::min((double)cfg.get_i32("MemoryLimit.EnsureUnused.Percentage"),
                                        99.0)) / 100.0;

  HT_NOTICEF("Start up in low memory condition (free memory %.2fMB)",
             mem_stat.free);
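  // Percentage-based limits are clamped to [1, 99] before being converted to
  // a ratio, so a misconfigured 0% or 150% can neither disable nor
  // oversubscribe the memory limit.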
  int64_t block_cache_min = cfg.get_i64("BlockCache.MinMemory");
  int64_t block_cache_max = cfg.get_i64("BlockCache.MaxMemory");
  if (block_cache_max == -1) {
    block_cache_max = (int64_t)physical_ram;

  if (block_cache_min > block_cache_max)
    block_cache_min = block_cache_max;

  if (block_cache_max > 0)
    cfg.get_bool("BlockCache.Compressed"));
  int64_t query_cache_memory = cfg.get_i64("QueryCache.MaxMemory");
  if (query_cache_memory > 0) {
    query_cache_memory = (int64_t)((double)Global::memory_limit * 0.2);
    props->set("Hypertable.RangeServer.QueryCache.MaxMemory", query_cache_memory);
    HT_INFOF("Maximum size of query cache has been reduced to %.2fMB",
             (double)query_cache_memory / Property::MiB);

    m_query_cache = std::make_shared<QueryCache>(query_cache_memory);
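  // The query cache is capped at 20% of the overall memory limit; when the
  // cap applies, the reduced value is written back into the properties so
  // later readers of QueryCache.MaxMemory see the effective size.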
  if (props->has("FsBroker.Timeout"))
    dfs_timeout = props->get_i32("FsBroker.Timeout");
  else
    dfs_timeout = props->get_i32("Hypertable.Request.Timeout");

  if (!dfsclient->wait_for_connection(dfs_timeout))

  if (cfg.has("CommitLog.DfsBroker.Host")) {
    String loghost = cfg.get_str("CommitLog.DfsBroker.Host");
    uint16_t logport = cfg.get_i16("CommitLog.DfsBroker.Port");
    dfsclient = std::make_shared<FsBroker::Lib::Client>(conn_mgr, addr, dfs_timeout);
    if (!dfsclient->wait_for_connection(30000))
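  // When CommitLog.DfsBroker.Host is configured, commit log I/O goes through
  // a dedicated broker client (with its own 30-second connect timeout)
  // instead of the default broker connection.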
  InetAddr listen_addr(INADDR_ANY, port);

  m_context->comm->listen(listen_addr, chfp);

  HT_ERRORF("Unable to listen on port %u - %s - %s",

  quick_exit(EXIT_SUCCESS);

  << " has been marked removed in hyperspace" << HT_END;
  quick_exit(EXIT_FAILURE);

  int timeout = props->get_i32("Hypertable.Request.Timeout");

    = make_shared<ConnectionHandler>(m_context->comm, m_app_queue, this);
  uint32_t max_memory_percentage =
    cfg.get_i32("CommitLog.PruneThreshold.Max.MemoryPercentage");

  HT_ASSERT(max_memory_percentage >= 0 && max_memory_percentage <= 100);

  double max_memory_ratio = (double)max_memory_percentage / 100.0;

  int64_t threshold_max = (int64_t)(mem_stat.ram *
                                    max_memory_ratio * (double)MiB);
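  // Prune ceiling in bytes: threshold_max = ram * (percentage/100) * MiB;
  // the multiplication by MiB implies mem_stat.ram is expressed in megabytes.
  // Illustrative numbers: 16384 MB of RAM at 50% yields an 8 GiB ceiling.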
  convert(props->get_str("Hypertable.LogFlushMethod.Meta"));

  convert(props->get_str("Hypertable.LogFlushMethod.User"));

  m_context->live_map);

  m_timer_handler->start();
  Timer timer(cb->event()->header.timeout_ms, true);

  status.get(&code, text);

  status.set(code, format("[fsbroker] %s", text.c_str()));
  m_timer_handler->shutdown();

  auto deadline = chrono::steady_clock::now() + chrono::seconds(30);
  m_app_queue->wait_for_idle(deadline, 1);

  lock_guard<mutex> lock(m_stats_mutex);

  if (m_group_commit_timer_handler)
    m_group_commit_timer_handler->shutdown();

  m_update_pipeline_user->shutdown();
  if (m_update_pipeline_system)
    m_update_pipeline_system->shutdown();
  if (m_update_pipeline_metadata)
    m_update_pipeline_metadata->shutdown();

  m_app_queue->shutdown();

  quick_exit(EXIT_FAILURE);
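  // Shutdown sequencing: stop the maintenance timer, drain the application
  // queue against a 30-second deadline, shut down the group commit handler
  // and the user/system/metadata update pipelines, and only then shut down
  // the queue itself.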
  m_existence_file_handle = m_hyperspace->open(top_dir, oflags);

  &lock_status, &m_existence_file_sequencer);

  HT_INFOF("Waiting for exclusive lock on hyperspace:/%s ...",

  this_thread::sleep_for(chrono::milliseconds(5000));

  HT_THROW2F(e.code(), e, "Problem creating commit log directory '%s': %s",
             path.c_str(), e.what());
struct ByFragmentNumber {
  bool operator()(const Filesystem::Dirent &x, const Filesystem::Dirent &y) const {
    int num_x = atoi(x.name.c_str());
    int num_y = atoi(y.name.c_str());
    return num_x < num_y;
  }
};
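// ByFragmentNumber orders commit log fragment files numerically (fragment
// file names are numbers), so a lexicographic sort cannot misplace "10"
// before "9".  add_mark_file_to_commit_logs() below uses that ordering to
// find the highest-numbered surviving fragment and drop a "<N>.mark" file
// beside it, presumably so later passes can tell pre-recovery fragments apart.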
void add_mark_file_to_commit_logs(const String &logname) {
  vector<Filesystem::Dirent> listing;
  vector<Filesystem::Dirent> listing2;

    HT_FATALF("Unable to read log directory '%s'", logdir.c_str());

  if (listing.size() == 0)

  sort(listing.begin(), listing.end(), ByFragmentNumber());

  for (auto &entry : listing) {
    String fragment_file = logdir + "/" + entry.name;
      HT_INFOF("Removing log fragment '%s' because it has zero length",
               fragment_file.c_str());
      listing2.push_back(entry);
      HT_FATALF("Unable to check fragment file '%s'", fragment_file.c_str());

  if (listing2.size() == 0)

  char *endptr;
  long num = strtol(listing2.back().name.c_str(), &endptr, 10);
  String mark_filename = logdir + "/" + (int64_t)num + ".mark";

    HT_FATALF("Unable to create file '%s'", mark_filename.c_str());
  std::vector<MetaLog::EntityPtr> entities, stripped_entities;

  make_shared<MetaLog::Reader>(Global::log_dfs, rsml_definition, rsml_dir);

  HT_FATALF("Problem reading RSML %s: %s - %s", rsml_dir.c_str(),

  std::vector<MaintenanceTask*> maintenance_tasks;
  auto now = chrono::steady_clock::now();

  rsml_reader->get_entities(entities);

  if (!entities.empty()) {
    << rsml_definition->name() << ", start recovering" << HT_END;

    add_mark_file_to_commit_logs("root");
    add_mark_file_to_commit_logs("metadata");
    add_mark_file_to_commit_logs("system");
    add_mark_file_to_commit_logs("user");

    for (auto &entity : entities) {
      if (dynamic_pointer_cast<MetaLog::EntityTask>(entity))
      else if (dynamic_cast<MetaLogEntityRange *>(entity.get())) {
        if (strstr(transfer_log.c_str(), "phantom") != 0) {
            HT_WARNF("Problem removing phantom log %s - %s", transfer_log.c_str(),
      else if (dynamic_cast<MetaLogEntityRemoveOkLogs *>(entity.get())) {
        if (remove_ok_logs->decode_version() > 1)
      stripped_entities.push_back(entity);

    entities.swap(stripped_entities);
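    // RSML entities are filtered before replay: range entities referencing a
    // "phantom" transfer log have that log removed, RemoveOkLogs entries
    // with a decode version above 1 get special handling, and whatever
    // survives is swapped back into `entities` for the per-log replay
    // phases below.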
    for (auto &entity : entities) {
      String end_row = range_entity->get_end_row();
      range_entity->get_table_identifier(table);
      replay_load_range(replay_map, range_entity);

    if (!replay_map.empty()) {
      replay_log(replay_map, root_log_reader);

      root_log_reader->get_linked_logs(transfer_logs);

      ranges.array.clear();
      for (auto &rd : ranges.array) {
        rd.range->recovery_finalize();

      m_context->live_map->merge(&replay_map);

      + "/root", m_props, root_log_reader.get());

    m_log_replay_barrier->set_root_complete();

    if (!maintenance_tasks.empty()) {
      for (size_t i=0; i<maintenance_tasks.size(); i++)
      maintenance_tasks.clear();
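    // The same recovery pattern now repeats for the METADATA, system, and
    // user logs: replay-load the ranges recorded in the RSML, replay the
    // corresponding commit log, finalize recovery on each range, merge the
    // replay map into the live map, and release the matching stage of
    // m_log_replay_barrier.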
    for (auto &entity : entities) {
      String end_row = range_entity->get_end_row();
      range_entity->get_table_identifier(table);
      replay_load_range(replay_map, range_entity);

    if (!replay_map.empty()) {
      metadata_log_reader =
        make_shared<CommitLogReader>(Global::log_dfs, Global::log_dir + "/metadata");

      replay_log(replay_map, metadata_log_reader);

      metadata_log_reader->get_linked_logs(transfer_logs);

      ranges.array.clear();
      for (auto &rd : ranges.array) {
        rd.range->recovery_finalize();

      m_context->live_map->merge(&replay_map);

    if (root_log_reader || metadata_log_reader) {
      Global::log_dir + "/metadata",
      m_props, metadata_log_reader.get());
      m_update_pipeline_metadata =
        make_shared<UpdatePipeline>(m_context, m_query_cache, m_timer_handler,

    m_log_replay_barrier->set_metadata_complete();

    if (!maintenance_tasks.empty()) {
      for (size_t i=0; i<maintenance_tasks.size(); i++)
      maintenance_tasks.clear();
    for (auto &entity : entities) {
      range_entity->get_table_identifier(table);
      replay_load_range(replay_map, range_entity);

    if (!replay_map.empty()) {
      make_shared<CommitLogReader>(Global::log_dfs, Global::log_dir + "/system");

      replay_log(replay_map, system_log_reader);

      system_log_reader->get_linked_logs(transfer_logs);

      ranges.array.clear();
      for (auto &rd : ranges.array) {
        rd.range->recovery_finalize();

      m_context->live_map->merge(&replay_map);

    if (system_log_reader) {
      Global::log_dir + "/system", m_props,
      system_log_reader.get());
      m_update_pipeline_system =
        make_shared<UpdatePipeline>(m_context, m_query_cache, m_timer_handler,

    m_log_replay_barrier->set_system_complete();

    if (!maintenance_tasks.empty()) {
      for (size_t i=0; i<maintenance_tasks.size(); i++)
      maintenance_tasks.clear();
    if (m_props->get_bool("Hypertable.RangeServer.LoadSystemTablesOnly"))

    for (auto &entity : entities) {
      range_entity->get_table_identifier(table);
      replay_load_range(replay_map, range_entity);

    if (!replay_map.empty()) {
      Global::log_dir + "/user");

      replay_log(replay_map, user_log_reader);

      user_log_reader->get_linked_logs(transfer_logs);

      ranges.array.clear();
      for (auto &rd : ranges.array) {
        rd.range->recovery_finalize();

      m_context->live_map->merge(&replay_map);

    + "/user", m_props, user_log_reader.get(), false);

    m_update_pipeline_user =
      make_shared<UpdatePipeline>(m_context, m_query_cache, m_timer_handler,

    m_log_replay_barrier->set_user_complete();

    if (!maintenance_tasks.empty()) {
      for (size_t i=0; i<maintenance_tasks.size(); i++)
      maintenance_tasks.clear();
    lock_guard<mutex> lock(m_mutex);

    + "/root", m_props, root_log_reader.get());

    if (root_log_reader || metadata_log_reader) {
      + "/metadata", m_props, metadata_log_reader.get());
      m_update_pipeline_metadata =
        make_shared<UpdatePipeline>(m_context, m_query_cache, m_timer_handler,

    if (system_log_reader) {
      + "/system", m_props, system_log_reader.get());
      m_update_pipeline_system =
        make_shared<UpdatePipeline>(m_context, m_query_cache, m_timer_handler,

    + "/user", m_props, user_log_reader.get(), false);
    m_update_pipeline_user =
      make_shared<UpdatePipeline>(m_context, m_query_cache, m_timer_handler,

    Global::log_dir + "/" + rsml_definition->name(),

    m_log_replay_barrier->set_root_complete();
    m_log_replay_barrier->set_metadata_complete();
    m_log_replay_barrier->set_system_complete();
    m_log_replay_barrier->set_user_complete();
  const uint8_t *buf_saved = *bufp;
  size_t remain_saved = *remainp;

  tid->decode(bufp, remainp);

  *remainp = remain_saved;
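// buf_saved/remain_saved are taken before decoding and restored afterwards,
// so the table identifier is effectively peeked without consuming it from
// the caller's buffer.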
  range_entity->get_table_identifier(table);
  range_entity->get_range_spec(range_spec);

  replay_map.get(table.id, table_info);

  if (table_info->maintenance_disabled())
    m_maintenance_scheduler->exclude(table);

  m_context->live_map->get(table.id, live_table_info);

  HT_ASSERT(!live_table_info->get_range(range_spec, range));

  int64_t generation = live_table_info->get_schema()->get_generation();
  range_entity->set_table_generation(generation);

  uint32_t timeout_ms = m_props->get_i32("Hypertable.Request.Timeout");

  schema = table_info->get_schema();

  range = make_shared<Range>(m_master_client, schema, range_entity,
                             live_table_info.get());

  range->recovery_initialize();

  table_info->add_range(range);

  HT_INFOF("Successfully replay loaded range %s", range->get_name().c_str());

  HT_WARNF("Skipping recovery of %s[%s..%s] - %s",
  String start_row, end_row;
  unsigned long block_count = 0;

  while (log_reader->next((const uint8_t **)&base, &len, &header)) {
    const uint8_t *ptr = base;
    const uint8_t *end = base + len;

    decode_table_id(&ptr, &len, &table_id);

    if (!replay_map.lookup(table_id.id, table_info))

    bool pair_loaded = false;

    while (pair_loaded) {
      if (!table_info->find_containing_range(key.row, range, start_row, end_row)) {
        pair_loaded = false;

        lock_guard<Range> lock(*range);

          range->add(key, value);

          pair_loaded = false;

      } while (start_row.compare(key.row) < 0 && end_row.compare(key.row) >= 0);

  HT_INFOF("Replayed %lu blocks of updates from '%s'", block_count,
           log_reader->get_log_dir().c_str());
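// Replay routes each key to the range whose [start_row..end_row] interval
// contains it and applies the insert under the range lock; the do/while
// condition re-checks the boundary rows so a run of keys stays with the same
// range until one falls outside it.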
  const char *row, int32_t flags) {

  String start_row, end_row;
  size_t range_count = 0;

  HT_INFOF("compacting table ID=%s ROW=%s FLAGS=%s", table.id, row,
  HT_INFOF("compacting ranges FLAGS=%s",

  if (!m_log_replay_barrier->wait_for_user(cb->event()->deadline()))

  HT_INFOF("compaction type = 0x%x", compaction_type);

  if (!m_context->live_map->lookup(table.id, table_info)) {

  if (!table_info->find_containing_range(row, range, start_row, end_row)) {
    format("Unable to find range for row '%s'", row));

  range->set_compaction_type_needed(compaction_type);

  ranges.array.clear();
  table_info->get_ranges(ranges);
  for (auto &rd : ranges.array)
    rd.range->set_compaction_type_needed(compaction_type);
  range_count = ranges.array.size();

  std::vector<TableInfoPtr> tables;

  m_context->live_map->get_all(tables);

  for (size_t i=0; i<tables.size(); i++) {

    if (tables[i]->identifier().is_metadata()) {

      ranges.array.clear();
      tables[i]->get_ranges(ranges);
      for (auto &rd : ranges.array)
        rd.range->set_compaction_type_needed(compaction_type);
      range_count += ranges.array.size();

      Lib::RangeServer::Protocol::COMPACT_FLAG_ROOT) {
      ranges.array.clear();
      tables[i]->get_ranges(ranges);
      for (auto &rd : ranges.array) {
        if (rd.range->is_root()) {
          rd.range->set_compaction_type_needed(compaction_type);

    else if (tables[i]->identifier().is_system()) {
      ranges.array.clear();
      tables[i]->get_ranges(ranges);
      for (auto &rd : ranges.array)
        rd.range->set_compaction_type_needed(compaction_type);
      range_count += ranges.array.size();

      ranges.array.clear();
      tables[i]->get_ranges(ranges);
      for (auto &rd : ranges.array)
        rd.range->set_compaction_type_needed(compaction_type);
      range_count += ranges.array.size();

  HT_INFOF("Compaction scheduled for %d ranges", (int)range_count);
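// Compaction requests fan out by scope: a row targets its single containing
// range, a table ID targets every range of that table, and the flag bits
// (ROOT/METADATA/SYSTEM/USER) select whole classes of tables.  In all cases
// the range is only marked with the needed compaction type; the maintenance
// scheduler performs the actual work later.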
  const char *table_id, bool do_start_row, bool do_location) {

  String start_row, end_row;

  rd.range->get_boundary_rows(start_row, end_row);

  metadata_key_str = String(table_id) + ":" + end_row;
  key.row = metadata_key_str.c_str();
  key.row_len = metadata_key_str.length();

  mutator->set(key, start_row);

  const char *table_id, bool do_start_row, bool do_location) {
  for (auto &rd : ranges.array)
    do_metadata_sync(rd, mutator, table_id, do_start_row, do_location);
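// METADATA rows are keyed "<table_id>:<end_row>", so syncing a range means
// rewriting the StartRow cell (and, when requested, the Location cell) for
// that key through the mutator.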
  uint32_t flags, std::vector<const char *> columns) {

  size_t range_count = 0;

  bool do_start_row = true;
  bool do_location = true;

  if (!columns.empty()) {
    columns_str = String("COLUMNS=") + columns[0];
    for (size_t i=1; i<columns.size(); i++)
      columns_str += String(",") + columns[i];

  HT_INFOF("metadata sync table ID=%s %s", table_id, columns_str.c_str());
  HT_INFOF("metadata sync ranges FLAGS=%s %s",
           columns_str.c_str());

  if (!m_log_replay_barrier->wait_for_user(cb->event()->deadline()))

  uint32_t timeout_ms = m_props->get_i32("Hypertable.Request.Timeout");

  if (!columns.empty()) {
    do_start_row = do_location = false;
    for (size_t i=0; i<columns.size(); i++) {
      if (!strcmp(columns[i], "StartRow"))
        do_start_row = true;
      else if (!strcmp(columns[i], "Location"))
        HT_WARNF("Unsupported METADATA column: %s", columns[i]);

  if (!m_context->live_map->lookup(table_id, table_info)) {

  ranges.array.clear();
  table_info->get_ranges(ranges);

  do_metadata_sync(ranges, mutator, table_id, do_start_row, do_location);
  range_count = ranges.array.size();

  std::vector<TableInfoPtr> tables;

  m_context->live_map->get_all(tables);

  for (size_t i=0; i<tables.size(); i++) {

    if (tables[i]->identifier().is_metadata()) {

      ranges.array.clear();
      tables[i]->get_ranges(ranges);

      if (!ranges.array.empty()) {
        if (ranges.array[0].range->is_root() &&
          do_metadata_sync(ranges.array[0], mutator, table_id, do_start_row, do_location);

        Lib::RangeServer::Protocol::COMPACT_FLAG_METADATA) {
        do_metadata_sync(ranges, mutator, table_id, do_start_row, do_location);
        range_count += ranges.array.size();

    else if (tables[i]->identifier().is_system()) {
      Lib::RangeServer::Protocol::COMPACT_FLAG_SYSTEM) {
      ranges.array.clear();
      tables[i]->get_ranges(ranges);
      do_metadata_sync(ranges, mutator, table_id, do_start_row, do_location);
      range_count += ranges.array.size();

    else if (tables[i]->identifier().is_user()) {
      Lib::RangeServer::Protocol::COMPACT_FLAG_USER) {
      ranges.array.clear();
      tables[i]->get_ranges(ranges);
      do_metadata_sync(ranges, mutator, table_id, do_start_row, do_location);
      range_count += ranges.array.size();

  HT_INFOF("METADATA sync'd for %d ranges", (int)range_count);
  bool decrement_needed = false;

  if (!m_log_replay_barrier->wait(cb->event()->deadline(), table, range_spec))

    "can only scan one row interval");
    "both row and cell intervals defined");
    "can only scan one cell interval");

  if (!m_context->live_map->lookup(table.id, table_info))

  if (!table_info->get_range(range_spec, range))

  schema = table_info->get_schema();

  if (schema->get_generation() != table.generation) {
    "RangeServer Schema generation for table '%s'"
    " is %lld but supplied is %lld",
    table.id, (Lld)schema->get_generation(),

  range->deferred_initialization(cb->event()->header.timeout_ms);

  if (!range->increment_scan_counter())
    "Range %s[%s..%s] dropped or relinquished",

  decrement_needed = true;

  String start_row, end_row;
  range->get_boundary_rows(start_row, end_row);

  if (strcmp(start_row.c_str(), range_spec.start_row) ||
      strcmp(end_row.c_str(), range_spec.end_row))

  if (cache_key && m_query_cache && !table.is_metadata()) {
    boost::shared_array<uint8_t> ext_buffer;
    uint32_t cell_count;
    if (m_query_cache->lookup(cache_key, ext_buffer, &ext_len, &cell_count)) {
      if ((error = cb->response(id, 0, 0, false, profile_data, ext_buffer, ext_len))
      range->decrement_scan_counter();

  std::set<uint8_t> columns;
  scan_ctx = make_shared<ScanContext>(range->get_scan_revision(cb->event()->header.timeout_ms),
                                      &scan_spec, &range_spec, schema, &columns);
  scan_ctx->timeout_ms = cb->event()->header.timeout_ms;

  range->create_scanner(scan_ctx, scanner);

  range->decrement_scan_counter();
  decrement_needed = false;

  uint32_t cell_count {};

  more = FillScanBlock(scanner, rbuf, &cell_count, m_scanner_buffer_size);

  profile_data.disk_read = scanner->get_disk_read();

  int64_t output_cells = scanner->get_output_cells();

  int skipped_rows = scanner->get_skipped_rows();
  int skipped_cells = scanner->get_skipped_cells();

  scan_ctx->deep_copy_specs();
  id = m_scanner_map.put(scanner, range, table, profile_data);

  HT_INFOF("Successfully created scanner (id=%u) on table '%s', returning "
           "%lld k/v pairs, more=%lld", id, table.id,
           (Lld)output_cells, (Lld)more);

  if (cache_key && m_query_cache && !table.is_metadata() && !more) {
    const char *cache_row_key = scan_spec.cache_key();
    char *row_key_ptr, *tablename_ptr;
    uint8_t *buffer =
      new uint8_t [rbuf.fill() + strlen(cache_row_key) + strlen(table.id) + 2];
    memcpy(buffer, rbuf.base, rbuf.fill());
    row_key_ptr = (char *)buffer + rbuf.fill();
    strcpy(row_key_ptr, cache_row_key);
    tablename_ptr = row_key_ptr + strlen(row_key_ptr) + 1;
    strcpy(tablename_ptr, table.id);
    boost::shared_array<uint8_t> ext_buffer(buffer);
    m_query_cache->insert(cache_key, tablename_ptr, row_key_ptr,
                          columns, cell_count, ext_buffer, rbuf.fill());
    if ((error = cb->response(id, skipped_rows, skipped_cells, false,

  if ((error = cb->response(id, skipped_rows, skipped_cells, more,

  if (decrement_needed)
    range->decrement_scan_counter();
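// A query cache entry packs the scan result, the cache row key, and the
// table ID into one allocation: result bytes first (rbuf.fill()), then the
// two NUL-terminated strings, which is why the buffer is sized with "+ 2"
// for the terminators.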
  HT_DEBUGF("destroying scanner id=%u", scanner_id);
  m_scanner_map.remove(scanner_id);

  int32_t scanner_id) {

  if (!m_scanner_map.get(scanner_id, scanner, range, scanner_table, &profile_data_before))
    format("scanner ID %d", scanner_id));

  if (!m_context->live_map->lookup(scanner_table.id, table_info))

  schema = table_info->get_schema();

  if (schema->get_generation() != scanner_table.generation) {
    m_scanner_map.remove(scanner_id);
    "RangeServer Schema generation for table '%s' is %lld but "
    "scanner has generation %lld", scanner_table.id,

  uint32_t cell_count {};

  more = FillScanBlock(scanner, rbuf, &cell_count, m_scanner_buffer_size);

  profile_data.disk_read = scanner->get_disk_read();

  int64_t output_cells = scanner->get_output_cells();

  m_scanner_map.remove(scanner_id);

  m_scanner_map.update_profile_data(scanner_id, profile_data);

  profile_data -= profile_data_before;

  error = cb->response(scanner_id, 0, 0, more, profile_data, ext);

  HT_DEBUGF("Successfully fetched %u bytes (%lld k/v pairs) of scan data",
            ext.size-4, (Lld)output_cells);
  bool needs_compaction) {

  char md5DigestStr[33];
  bool is_staged = false;

  if (!m_log_replay_barrier->wait(cb->event()->deadline(), table, range_spec))

  std::stringstream sout;
  sout << "Loading range: " << table << " " << range_spec << " " << range_state
       << " needs_compaction=" << boolalpha << needs_compaction;
  HT_INFOF("%s", sout.str().c_str());

  m_context->live_map->get(table.id, table_info);

  if (table_info->maintenance_disabled())
    m_maintenance_scheduler->exclude(table);

  uint32_t generation = table_info->get_schema()->get_generation();
    HT_WARNF("Table generation mismatch in load range request (%d < %d),"
             " automatically upgrading", (int)table.generation, (int)generation);

  table_info->stage_range(range_spec, cb->event()->deadline());

  uint32_t timeout_ms = m_props->get_i32("Hypertable.Request.Timeout");

  lock_guard<mutex> lock(m_pending_metrics_mutex);

  if (m_pending_metrics_updates == 0)

  m_pending_metrics_updates->add(cell);

  schema = table_info->get_schema();

  assert(*range_spec.end_row != 0);
  md5DigestStr[16] = 0;

  for (auto ag_spec : schema->get_access_groups()) {
    range_dfsdir = table_dfsdir + "/" + ag_spec->get_name() + "/" + md5DigestStr;

  range = make_shared<Range>(m_master_client, table, schema, range_spec,
                             table_info.get(), range_state, needs_compaction);

  if (!table.is_user()) {
    if (table.is_metadata()) {
      + "/root", m_props);
      m_update_pipeline_metadata =
        make_shared<UpdatePipeline>(m_context, m_query_cache, m_timer_handler,
      m_update_pipeline_system =
        make_shared<UpdatePipeline>(m_context, m_query_cache, m_timer_handler,

  metadata_key_str = format("%s:%s", table.id, range_spec.end_row);

  key.row = metadata_key_str.c_str();
  key.row_len = strlen(metadata_key_str.c_str());

  mutator->set(key, location.c_str(), location.length());

  HT_INFO("Loading root METADATA range");

  "Location", location.c_str(), location.length());

  HT_ERROR_OUT << "Problem setting attribute 'location' on Hyperspace "

  int64_t revision = range->get_scan_revision(cb->event()->header.timeout_ms);
  if (revision > now) {
    int64_t diff = (revision - now) / 1000000;
    HT_WARNF("Clock skew detected when loading range; waiting for %lld "
             "millisec", (long long int)diff);
    this_thread::sleep_for(chrono::milliseconds(diff));

  m_context->live_map->promote_staged_range(table, range, range_state.transfer_log);

  HT_INFOF("Successfully loaded range %s[%s..%s]", table.id,
           range_spec.start_row, range_spec.end_row);

  table_info->unstage_range(range_spec);
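// Clock-skew guard: get_scan_revision() is a nanosecond timestamp, so when a
// newly loaded range carries a revision from the future the server sleeps
// (revision - now) / 1000000 milliseconds to let the local clock catch up
// before exposing the range.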
  const vector<QualifiedRangeSpec> &specs) {

  map<QualifiedRangeSpec, int> error_map;

  for (const auto &rr : specs) {

    if (!m_log_replay_barrier->wait(cb->event()->deadline(),
                                    rr.table, rr.range))

    HT_INFOF("Acknowledging range: %s[%s..%s]", rr.table.id,
             rr.range.start_row, rr.range.end_row);

    if (!m_context->live_map->lookup(rr.table.id, table_info)) {

    if (!table_info->get_range(rr.range, range)) {

    if (range->load_acknowledged()) {

    range->acknowledge_load(cb->event()->header.timeout_ms);

    error_map[rr] = e.code();

    std::stringstream sout;
    sout << "Range: " << rr << " acknowledged";
    HT_INFOF("%s", sout.str().c_str());
  HT_INFOF("Updating schema for: %s schema = %s", table.id, schema_str);

  if (m_context->live_map->lookup(table.id, table_info))
    table_info->update_schema(schema);

  HT_INFOF("Successfully updated schema for: %s", table.id);
  uint64_t cluster_id,

  std::vector<UpdateRecTable *> table_update_vector;

  if (!m_log_replay_barrier->wait_for_user(cb->event()->deadline()))

  if (!m_context->live_map->lookup(table.id, table_update->table_info)) {

  if (schema->get_group_commit_interval() > 0) {
    group_commit_add(cb->event(), cluster_id, schema, table, 0, buffer, 0);

  table_update->id = table;
  table_update->flags = 0;
  request->buffer = buffer;
  table_update->requests.push_back(request);

  table_update_vector.push_back(table_update);

  m_update_pipeline_user->add(uc);
  m_update_pipeline_metadata->add(uc);
  m_update_pipeline_system->add(uc);

  if (!m_log_replay_barrier->wait(cb->event()->deadline(), table))

  if (!m_context->live_map->lookup(table.id, table_update->table_info))

  schema = table_update->table_info->get_schema();

  if (schema->get_group_commit_interval() > 0) {
    group_commit_add(cb->event(), cluster_id, schema, table, count, buffer, flags);
    delete table_update;

  std::vector<UpdateRecTable *> table_update_vector;

  table_update->id = table;
  table_update->flags = flags;
  request->buffer = buffer;
  request->count = count;
  table_update->requests.push_back(request);

  table_update_vector.push_back(table_update);

  m_update_pipeline_user->add(uc);
  m_update_pipeline_metadata->add(uc);
  m_update_pipeline_system->add(uc);

  m_update_pipeline_user->add(uc);
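// Updates are routed to one of three UpdatePipeline instances (user,
// metadata, system) according to the table's class; tables whose schema sets
// a group commit interval bypass the pipelines here and are queued through
// group_commit_add() to be flushed by the group commit timer.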
  if (!m_log_replay_barrier->wait_for_user(cb->event()->deadline()))

  if (!m_context->live_map->remove(table.id, table_info)) {
    HT_WARNF("drop_table '%s' - table not found", table.id);

  ranges.array.clear();
  table_info->get_ranges(ranges);
  for (auto &rd : ranges.array)

  for (auto &rd : ranges.array) {
    rd.range->disable_maintenance();

  SchemaPtr schema = table_info->get_schema();

  metadata_prefix = String("") + table.id + ":";
  for (auto &rd : ranges.array) {
    metadata_key = metadata_prefix + rd.range->end_row();
    key.row = metadata_key.c_str();
    key.row_len = metadata_key.length();
    mutator->set(key, "!", 1);
    for (size_t j=0; j<ag_specs.size(); j++) {
      mutator->set(key, (uint8_t *)"!", 1);

  cb->error(e.code(), "Problem clearing 'Location' columns of METADATA");

  HT_INFOF("Successfully dropped table '%s'", table.id);
  std::ofstream out(outfile);

  m_context->live_map->get_ranges(ranges);
  time_t now = time(0);
  for (auto &rd : ranges.array) {
    out << "RANGE " << rd.range->get_name() << "\n";
    out << *rd.data << "\n";
    for (ag_data = rd.data->agdata; ag_data; ag_data = ag_data->next)
      out << *ag_data << "\n";

  for (auto &rd : ranges.array)
    for (ag_data = rd.data->agdata; ag_data; ag_data = ag_data->next)

  m_query_cache->dump_keys(out);

  out << "\nGarbage tracker statistics:\n";
  for (ag_data = rd.data->agdata; ag_data; ag_data = ag_data->next)

  out << "\nCommit Log Info\n";

  cb->error(e.code(), "Problem executing dump() command");

  catch (std::exception &e) {
  const char *pseudo_table, const char *outfile) {

  HT_INFOF("dump_pseudo_table ID=%s pseudo-table=%s outfile=%s",
           table.id, pseudo_table, outfile);

  if (!m_log_replay_barrier->wait_for_user(cb->event()->deadline()))

  std::ofstream out(outfile);

  if (!m_context->live_map->lookup(table.id, table_info)) {

  scan_ctx->timeout_ms = cb->event()->header.timeout_ms;

  table_info->get_ranges(ranges);
  for (auto &rd : ranges.array) {
    scanner = rd.range->create_scanner_pseudo_table(scan_ctx, pseudo_table);
    while (scanner->get(key, value)) {
        HT_ERRORF("Column family code %d not found in %s pseudo table schema",
                  key.column_family_code, pseudo_table);

      out << key.row << "\t" << cf_spec->get_name();
      if (key.column_qualifier)
        out << ":" << key.column_qualifier;

      out.write((const char *)ptr, len);

  catch (std::exception &e) {
#if defined(TCMALLOC) || defined(TCMALLOC_MINIMAL)
  if (outfile && *outfile) {
    std::ofstream out(outfile);
    MallocExtension::instance()->GetStats(buf, 4096);
    out << buf << std::endl;

#if defined(TCMALLOC)
  HeapLeakChecker::NoGlobalLeaks();
  if (IsHeapProfilerRunning())
    HeapProfilerDump("heapcheck");

  HT_WARN("heapcheck not defined for current allocator");
  const std::vector<SystemVariable::Spec> &specs,
  int64_t generation) {

  m_context->server_state->set(generation, specs);
  HT_INFOF("table_maintenance_enable(\"%s\")", table.id);

  if (!m_log_replay_barrier->wait(cb->event()->deadline(), table))

  if (m_context->live_map->lookup(table.id, table_info)) {
    table_info->set_maintenance_disabled(false);
    table_info->get_ranges(ranges);
      rd.range->enable_maintenance();

  m_maintenance_scheduler->include(table);

  HT_INFOF("table_maintenance_disable(\"%s\")", table.id);

  if (!m_log_replay_barrier->wait(cb->event()->deadline(), table))

  if (!m_context->live_map->lookup(table.id, table_info)) {

  table_info->set_maintenance_disabled(true);

  m_maintenance_scheduler->exclude(table);

  table_info->get_ranges(ranges);
    rd.range->disable_maintenance();
  const std::vector<SystemVariable::Spec> &specs,
  uint64_t generation) {

  if (test_and_set_get_statistics_outstanding(true))

  lock_guard<mutex> lock(m_stats_mutex);

  time_t now = (time_t)(timestamp/1000000000LL);

  HT_INFO("Entering get_statistics()");

  m_context->server_state->set(generation, specs);

  m_stats->system.refresh();

  float period_seconds = (float)load_stats.period_millis / 1000.0;

  uint64_t disk_total = 0;
  uint64_t disk_avail = 0;
  for (auto &fss : m_stats->system.fs_stat) {
    disk_total += fss.total;
    disk_avail += fss.avail;

  m_loadavg_accum += m_stats->system.loadavg_stat.loadavg[0];
  m_page_in_accum += m_stats->system.swap_stat.page_in;
  m_page_out_accum += m_stats->system.swap_stat.page_out;

  m_load_factors.bytes_written += load_stats.update_bytes;

  m_stats->timestamp = timestamp;

  m_stats->cpu_user = m_stats->system.cpu_stat.user;
  m_stats->cpu_sys = m_stats->system.cpu_stat.sys;
  m_stats->live = m_log_replay_barrier->user_complete();

  uint64_t previous_query_cache_accesses = m_stats->query_cache_accesses;
  uint64_t previous_query_cache_hits = m_stats->query_cache_hits;
  uint64_t previous_block_cache_accesses = m_stats->block_cache_accesses;
  uint64_t previous_block_cache_hits = m_stats->block_cache_hits;
  int32_t query_cache_waiters {};

  m_query_cache->get_stats(&m_stats->query_cache_max_memory,
                           &m_stats->query_cache_available_memory,
                           &m_stats->query_cache_accesses,
                           &m_stats->query_cache_hits,
                           &query_cache_waiters);

  &m_stats->block_cache_available_memory,
  &m_stats->block_cache_accesses,
  &m_stats->block_cache_hits);

  if (now > m_next_metrics_update) {
    uint32_t timeout_ms = m_props->get_i32("Hypertable.Request.Timeout");
    m_namemap, "sys/RS_METRICS", 0, timeout_ms);
    HT_ERRORF("Unable to open 'sys/RS_METRICS' - %s (%s)",
  lock_guard<mutex> lock(m_pending_metrics_mutex);
  pending_metrics_updates = m_pending_metrics_updates;
  m_pending_metrics_updates = 0;

  if (pending_metrics_updates) {
    Cells &cells = pending_metrics_updates->get();
    for (size_t i=0; i<cells.size(); i++) {
      key.row = cells[i].row_key;
      key.row_len = strlen(cells[i].row_key);
      mutator->set(key, cells[i].value, cells[i].value_len);
    delete pending_metrics_updates;

  StatsTableMap::iterator iter;

  m_stats->tables.clear();

  if (mutator || !ranges) {
    ranges = make_shared<Ranges>();
    m_context->live_map->get_ranges(*ranges);

  for (auto &rd : ranges->array) {
    rd.data = rd.range->get_maintenance_data(ranges->arena, now, 0, mutator.get());

  m_stats->tables.push_back(table_stat);
  table_scanner_count_map[table_stat.table_id.c_str()] = 0;

  m_stats->range_count = ranges->array.size();

  m_stats->tables.push_back(table_stat);
  table_scanner_count_map[table_stat.table_id.c_str()] = 0;

  m_stats->file_count = 0;
  m_scanner_map.get_counts(&m_stats->scanner_count, table_scanner_count_map);
  for (size_t i=0; i<m_stats->tables.size(); i++) {
    m_stats->tables[i].scanner_count =
      table_scanner_count_map[m_stats->tables[i].table_id.c_str()];
    m_stats->file_count += m_stats->tables[i].file_count;
  if (m_last_metrics_update != 0) {
    double time_interval = (double)now - (double)m_last_metrics_update;
    String value =
      format("3:%ld,%.6f,%.6f,%.6f,%.6f,%.6f,%.6f,%.6f,%.6f,%.6f,%.6f:%lld:%lld",
             m_loadavg_accum / (double)(m_metric_samples * m_cores),
             (double)m_load_factors.disk_bytes_read / time_interval,
             (double)m_load_factors.bytes_written / time_interval,
             (double)m_load_factors.bytes_scanned / time_interval,
             (double)m_load_factors.updates / time_interval,
             (double)m_load_factors.scans / time_interval,
             (double)m_load_factors.cells_written / time_interval,
             (double)m_load_factors.cells_scanned / time_interval,
             (double)m_page_in_accum / (double)m_metric_samples,
             (double)m_page_out_accum / (double)m_metric_samples,
             (Lld)disk_total, (Lld)disk_avail);

    key.row = location.c_str();
    key.row_len = location.length();

    mutator->set(key, (uint8_t *)value.c_str(), value.length());

  m_last_metrics_update = now;
  m_loadavg_accum = 0.0;
  m_page_in_accum = 0;
  m_page_out_accum = 0;
  m_load_factors.reset();
  m_metric_samples = 0;
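  // Metrics are per-interval, not cumulative: the load factor counters are
  // divided by the seconds elapsed since the previous sys/RS_METRICS write
  // (keyed by this server's location) and then zeroed, and the cache hit
  // rates published below subtract the previous access/hit totals before
  // computing percentages.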
  m_metrics_process.collect(timestamp, m_ganglia_collector.get());

  m_ganglia_collector->update("scans",
                              (float)load_stats.scan_count / period_seconds);
  m_ganglia_collector->update("updates",
  m_ganglia_collector->update("cellsScanned",
  m_ganglia_collector->update("cellsScanYield",
  m_ganglia_collector->update("cellsReturned",
  m_ganglia_collector->update("cellsWritten",
  m_ganglia_collector->update("bytesScanned",
  m_ganglia_collector->update("bytesScanYield",
  m_ganglia_collector->update("bytesReturned",
  m_ganglia_collector->update("bytesWritten",

  m_ganglia_collector->update("compactions.gc", load_stats.compactions_gc);

  m_ganglia_collector->update("scanners", m_stats->scanner_count);
  m_ganglia_collector->update("cellstores", (int32_t)m_stats->file_count);
  m_ganglia_collector->update("ranges", m_stats->range_count);
  m_ganglia_collector->update("memory.tracked",
                              (float)m_stats->tracked_memory / 1000000000.0);

  HT_ASSERT(previous_block_cache_accesses <= m_stats->block_cache_accesses &&
            previous_block_cache_hits <= m_stats->block_cache_hits);
  uint64_t block_cache_accesses = m_stats->block_cache_accesses - previous_block_cache_accesses;
  uint64_t block_cache_hits = m_stats->block_cache_hits - previous_block_cache_hits;

  if (block_cache_accesses)
    m_ganglia_collector->update("blockCache.hitRate",
                                (int32_t)((block_cache_hits*100)
                                          / block_cache_accesses));
  else
    m_ganglia_collector->update("blockCache.hitRate", (int32_t)0);
  m_ganglia_collector->update("blockCache.memory",
                              (float)m_stats->block_cache_max_memory / 1000000000.0);
  uint64_t block_cache_fill = m_stats->block_cache_max_memory -
                              m_stats->block_cache_available_memory;
  m_ganglia_collector->update("blockCache.fill",
                              (float)block_cache_fill / 1000000000.0);

  HT_ASSERT(previous_query_cache_accesses <= m_stats->query_cache_accesses &&
            previous_query_cache_hits <= m_stats->query_cache_hits);
  uint64_t query_cache_accesses = m_stats->query_cache_accesses - previous_query_cache_accesses;
  uint64_t query_cache_hits = m_stats->query_cache_hits - previous_query_cache_hits;

  if (query_cache_accesses)
    m_ganglia_collector->update("queryCache.hitRate",
                                (int32_t)((query_cache_hits*100) /
                                          query_cache_accesses));
  else
    m_ganglia_collector->update("queryCache.hitRate", (int32_t)0);
  m_ganglia_collector->update("queryCache.memory",
                              (float)m_stats->query_cache_max_memory / 1000000000.0);
  uint64_t query_cache_fill = m_stats->query_cache_max_memory -
                              m_stats->query_cache_available_memory;
  m_ganglia_collector->update("queryCache.fill",
                              (float)query_cache_fill / 1000000000.0);
  m_ganglia_collector->update("queryCache.waiters", query_cache_waiters);

  m_ganglia_collector->update("requestBacklog", (int32_t)m_app_queue->backlog());

  m_ganglia_collector->publish();

  HT_INFOF("Problem publishing Ganglia metrics - %s", e.what());

  m_stats_last_timestamp = timestamp;

  HT_INFO("Exiting get_statistics()");
  std::stringstream sout;

  sout << "drop_range\n" << table << range_spec;
  HT_INFOF("%s", sout.str().c_str());

  if (!m_log_replay_barrier->wait(cb->event()->deadline(), table, range_spec))

  if (!m_context->live_map->lookup(table.id, table_info))

  if (!table_info->remove_range(range_spec, range))
    format("%s[%s..%s]", table.id, range_spec.start_row, range_spec.end_row));

  std::stringstream sout;

  sout << "relinquish_range\n" << table << range_spec;
  HT_INFOF("%s", sout.str().c_str());

  if (!m_log_replay_barrier->wait(cb->event()->deadline(), table, range_spec))

  if (!m_context->live_map->lookup(table.id, table_info)) {

  if (!table_info->get_range(range_spec, range))
    format("%s[%s..%s]", table.id, range_spec.start_row,
           range_spec.end_row));

  range->schedule_relinquish();

  m_timer_handler->schedule_immediate_maintenance();
  const String &location, int32_t plan_generation,
  int32_t type, const vector<int32_t> &fragments,
  int32_t replay_timeout) {

  HT_INFOF("replay_fragments location=%s, plan_generation=%d, num_fragments=%d",
           location.c_str(), plan_generation, (int)fragments.size());

  if (!m_log_replay_barrier->wait_for_user(cb->event()->deadline()))

  HT_INFOF("replay_fragments(id=%lld, %s, plan_generation=%d, num_fragments=%d)",
           (Lld)op_id, location.c_str(), plan_generation, (int)fragments.size());

  log_reader = make_shared<CommitLogReader>(Global::log_dfs, log_dir, fragments);

  uint32_t timeout_ms = m_props->get_i32("Hypertable.Request.Timeout");
  Timer timer(replay_timeout, true);

  for (const auto &receiver : receivers) {
    m_conn_manager->add(addr, timeout_ms, "RangeServer");
    if (!m_conn_manager->wait_for_connection(addr, timer.remaining())) {

  const uint8_t *ptr, *end;

  uint32_t block_count = 0;
  uint32_t fragment_id;
  uint32_t last_fragment_id = 0;
  bool started = false;
  ReplayBuffer replay_buffer(m_props, m_context->comm, receiver_plan, location, plan_generation);
  size_t num_kv_pairs = 0;

  while (log_reader->next((const uint8_t **)&base, &len, &header)) {
    fragment_id = log_reader->last_fragment_id();
      last_fragment_id = fragment_id;
    else if (fragment_id != last_fragment_id) {
      replay_buffer.flush();
      last_fragment_id = fragment_id;

    decode_table_id(&ptr, &len, &table_id);

    replay_buffer.add(table_id, key, value);

    HT_INFOF("Replayed %d key/value pairs from fragment %s",
             (int)num_kv_pairs, log_reader->last_fragment_fname().c_str());

    m_master_client->replay_status(op_id, location, plan_generation);

    HT_THROWF(e.code(), "%s: %s", log_reader->last_fragment_fname().c_str(), e.what());

  replay_buffer.flush();

  HT_INFOF("Finished playing %d fragments from %s",
           (int)fragments.size(), log_dir.c_str());

  m_master_client->replay_complete(op_id, location, plan_generation,
                                   e.code(), e.what());

  m_master_client->replay_complete(op_id, location, plan_generation,

  HT_ERROR_OUT << "Unable to call player_complete on master for op_id="
               << op_id << ", type=" << type << ", location=" << location
               << ", plan_generation=" << plan_generation << ", num_fragments="
               << fragments.size() << " - " << e << HT_END;
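// Fragment replay streams key/value pairs into a ReplayBuffer that is
// flushed at every fragment boundary; progress and the final outcome are
// reported to the master through replay_status() and replay_complete().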
  int32_t plan_generation,
  const vector<int32_t> &fragments,
  const vector<QualifiedRangeSpec> &specs,
  const vector<RangeState> &states) {

  FailoverPhantomRangeMap::iterator failover_map_it;

  HT_INFOF("phantom_load location=%s, plan_generation=%d, num_fragments=%d,"
           " num_ranges=%d", location.c_str(), plan_generation,
           (int)fragments.size(), (int)specs.size());

  if (!m_log_replay_barrier->wait_for_user(cb->event()->deadline()))

  lock_guard<mutex> lock(m_failover_mutex);
  failover_map_it = m_failover_map.find(location);
  if (failover_map_it == m_failover_map.end()) {
    phantom_range_map = make_shared<PhantomRangeMap>(plan_generation);
    m_failover_map[location] = phantom_range_map;
  }
  else
    phantom_range_map = failover_map_it->second;

  lock_guard<PhantomRangeMap> lock(*phantom_range_map);

  if (plan_generation < phantom_range_map->get_plan_generation())

  if (plan_generation > phantom_range_map->get_plan_generation())
    phantom_range_map->reset(plan_generation);
  else if (phantom_range_map->loaded()) {

  phantom_tableinfo_map = phantom_range_map->get_tableinfo_map();

  for (size_t i=0; i<specs.size(); i++) {

    phantom_tableinfo_map->get(spec.table.id, table_info);

    uint32_t generation = table_info->get_schema()->get_generation();
      HT_WARNF("Table generation mismatch in phantom load request (%d < %d),"
               " automatically upgrading", (int)spec.table.generation, (int)generation);

    phantom_range_map->insert(spec, state, table_info->get_schema(), fragments);

  if ((error = cb->error(e.code(), e.what())))

  phantom_range_map->set_loaded();
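// Phantom ranges advance through per-location states (loaded, replayed,
// prepared, committed) guarded by the plan generation: a stale generation is
// ignored, a newer one resets the map, and an already-loaded map makes the
// request a no-op, keeping the phantom_* RPCs safe to retry.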
  int32_t plan_generation,
  int32_t fragment, EventPtr &event) {

  std::stringstream sout;

  sout << "phantom_update location=" << location << ", fragment="
       << fragment << ", range=" << range;
  HT_INFOF("%s", sout.str().c_str());

  FailoverPhantomRangeMap::iterator failover_map_it;

  HT_MAYBE_FAIL_X("phantom-update-metadata", range.table.is_metadata());

  lock_guard<mutex> lock(m_failover_mutex);
  failover_map_it = m_failover_map.find(location);
  if (failover_map_it == m_failover_map.end()) {
    "no phantom range map found for recovery of " + location);
  phantom_range_map = failover_map_it->second;

  lock_guard<PhantomRangeMap> lock(*phantom_range_map);

  if (plan_generation != phantom_range_map->get_plan_generation())
    "supplied = %d, installed == %d", plan_generation,
    phantom_range_map->get_plan_generation());

  if (phantom_range_map->replayed()) {

  phantom_range_map->get(range, phantom_range);
  if (phantom_range && !phantom_range->replayed() && !phantom_range->add(fragment, event)) {
    String msg = format("fragment %d completely received for range "
                        "%s[%s..%s]", fragment, range.table.id, range.range.start_row,
                        range.range.end_row);
  const String &location, int32_t plan_generation,
  const vector<QualifiedRangeSpec> &specs) {

  FailoverPhantomRangeMap::iterator failover_map_it;
  vector<MetaLog::EntityPtr> metalog_entities;

  HT_INFOF("phantom_prepare_ranges op_id=%lld, location=%s, plan_generation=%d,"
           " num_ranges=%d", (Lld)op_id, location.c_str(), plan_generation,

  if (!m_log_replay_barrier->wait_for_user(cb->event()->deadline()))

  lock_guard<mutex> lock(m_failover_mutex);
  failover_map_it = m_failover_map.find(location);
  if (failover_map_it == m_failover_map.end()) {
    String msg = format("No phantom map found for %s", location.c_str());
    m_master_client->phantom_prepare_complete(op_id, location, plan_generation,
  phantom_range_map = failover_map_it->second;

  lock_guard<PhantomRangeMap> lock(*phantom_range_map);

  if (phantom_range_map->prepared()) {
    m_master_client->phantom_prepare_complete(op_id, location, plan_generation,
                                              Error::OK, "");

  phantom_map = phantom_range_map->get_tableinfo_map();

  for (const auto &rr : specs) {
    phantom_table_info = 0;
    HT_ASSERT(phantom_map->lookup(rr.table.id, phantom_table_info));

    m_context->live_map->get(rr.table.id, table_info);

    if (table_info->maintenance_disabled())
      m_maintenance_scheduler->exclude(rr.table);

    if (rr.table.generation != table_info->get_schema()->get_generation())
      HT_WARNF("Table (id=%s) generation mismatch %lld != %lld", rr.table.id,
               (Lld)rr.table.generation,
               (Lld)table_info->get_schema()->get_generation());

    phantom_range_map->get(rr, phantom_range);

    if (!phantom_range || phantom_range->prepared())

    uint32_t timeout_ms = m_props->get_i32("Hypertable.Request.Timeout");

    if (rr.table.is_metadata()) {

    phantom_range->create_range(m_master_client, table_info,

  bool is_empty = true;

  phantom_range_map->get(rr, phantom_range);

  if (!phantom_range || phantom_range->prepared())

  phantom_range->populate_range_and_log(Global::log_dfs, op_id, &is_empty);

  RangePtr range = phantom_range->get_range();
  if (!rr.table.is_user()) {
    if (rr.table.is_metadata()) {
      m_update_pipeline_metadata =
        make_shared<UpdatePipeline>(m_context, m_query_cache, m_timer_handler,
    else if (rr.table.is_system()) {
      m_update_pipeline_system =
        make_shared<UpdatePipeline>(m_context, m_query_cache, m_timer_handler,

  String msg = format("Problem linking phantom log '%s' for range %s[%s..%s]",
                      phantom_range->get_phantom_logname().c_str(),
                      rr.table.id, rr.range.start_row, rr.range.end_row);

  m_master_client->phantom_prepare_complete(op_id, location, plan_generation, error, msg);

  metalog_entities.push_back(range->metalog_entity());

  HT_ASSERT(phantom_map->lookup(rr.table.id, phantom_table_info));

  HT_INFO("phantom adding range");

  phantom_table_info->add_range(range, true);

  phantom_range->set_prepared();

  HT_MAYBE_FAIL_X("phantom-prepare-ranges-user-1", specs.back().table.is_user());
  HT_MAYBE_FAIL_X("phantom-prepare-ranges-root-2", specs.back().is_root());
  HT_MAYBE_FAIL_X("phantom-prepare-ranges-user-2", specs.back().table.is_user());

  phantom_range_map->set_prepared();

  m_master_client->phantom_prepare_complete(op_id, location, plan_generation, e.code(), e.what());

  m_master_client->phantom_prepare_complete(op_id, location, plan_generation,
                                            Error::OK, "");
  const String &location, int32_t plan_generation,
  const vector<QualifiedRangeSpec> &specs) {

  FailoverPhantomRangeMap::iterator failover_map_it;
  vector<MetaLog::EntityPtr> entities;
  map<QualifiedRangeSpec, TableInfoPtr> phantom_table_info_map;
  map<QualifiedRangeSpec, int> error_map;
  vector<RangePtr> range_vec;

  HT_INFOF("phantom_commit_ranges op_id=%lld, location=%s, plan_generation=%d,"
           " num_ranges=%d", (Lld)op_id, location.c_str(), plan_generation,

  if (!m_log_replay_barrier->wait_for_system(cb->event()->deadline()))

  lock_guard<mutex> lock(m_failover_mutex);
  m_failover_map.erase(location);

  m_master_client->phantom_commit_complete(op_id, location, plan_generation,
                                           Error::OK, "");

  String msg = format("Error during phantom_commit op_id=%lld, "
                      "plan_generation=%d, location=%s, num ranges=%u", (Lld)op_id,
                      plan_generation, location.c_str(), (unsigned)specs.size());

  lock_guard<mutex> lock(m_failover_mutex);
  failover_map_it = m_failover_map.find(location);
  if (failover_map_it == m_failover_map.end()) {
    String msg = format("No phantom map found for %s plan_generation=%d",
                        location.c_str(), plan_generation);
    m_master_client->phantom_commit_complete(op_id, location, plan_generation,

  phantom_range_map = failover_map_it->second;

  lock_guard<PhantomRangeMap> lock(*phantom_range_map);

  HT_ASSERT(phantom_range_map->prepared());
  HT_ASSERT(!phantom_range_map->committed());

  phantom_map = phantom_range_map->get_tableinfo_map();

  for (const auto &rr : specs) {
    rr.range.start_row, rr.range.end_row);

    phantom_range_map->get(rr, phantom_range);

    if (!phantom_range || phantom_range->committed())

    range = phantom_range->get_range();

    bool is_root = range->is_root();

    entity->set_needs_compaction(true);
    entity->set_load_acknowledged(false);

    entities.push_back(entity);
    phantom_logs.insert(phantom_range->get_phantom_logname());

    String range_str = format("%s[%s..%s]", rr.table.id, rr.range.start_row, rr.range.end_row);
    HT_INFOF("Taking ownership of range %s", range_str.c_str());

    String metadata_key_str = format("%s:%s", rr.table.id, rr.range.end_row);
    key.row = metadata_key_str.c_str();
    key.row_len = strlen(metadata_key_str.c_str());

    HT_DEBUG_OUT << "Update metadata location for " << key << " to "
                 << our_location << HT_END;
    mutator->set(key, our_location.c_str(), our_location.length());

    HT_INFO("Failing over root METADATA range");

    handle = m_hyperspace->open(root_filename, oflags);
    our_location.length());
    HT_DEBUG_OUT << "Updated attr Location of " << root_filename << " to "
                 << our_location << HT_END;

    phantom_range->set_committed();

  HT_MAYBE_FAIL_X("phantom-commit-user-2", specs.back().table.is_user());

  m_context->live_map->merge(phantom_map.get(), entities, phantom_logs);

  HT_MAYBE_FAIL_X("phantom-commit-user-3", specs.back().table.is_user());

  HT_INFOF("Merging phantom map into live map for recovery of %s (ID=%lld)",
           location.c_str(), (Lld)op_id);

  lock_guard<mutex> lock(m_failover_mutex);
  m_failover_map.erase(location);

  phantom_range_map->set_committed();

  m_master_client->phantom_commit_complete(op_id, location, plan_generation, e.code(), e.what());

  m_master_client->phantom_commit_complete(op_id, location, plan_generation,
                                           Error::OK, "");

  HT_MAYBE_FAIL_X("phantom-commit-user-4", specs.back().table.is_user());

  HT_DEBUG_OUT << "phantom_commit_complete sent to master for num_ranges="
               << specs.size() << HT_END;

  m_timer_handler->schedule_immediate_maintenance();

  String msg = format("Error during phantom_commit op_id=%lld, "
                      "plan_generation=%d, location=%s, num ranges=%u", (Lld)op_id,
                      plan_generation, location.c_str(), (unsigned)specs.size());
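// Commit is the point of no return: the phantom ranges' metalog entities are
// persisted, METADATA Location cells (or the Hyperspace "Location" attribute
// for the root range) are switched to this server, the phantom map is merged
// into the live map, and only then is phantom_commit_complete() reported to
// the master.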
  size_t live_count = 0;
  for (const auto &qrs : ranges) {
    if (m_context->live_map->lookup(qrs.table.id, table_info)) {
      if (table_info->has_range(qrs.range))

  return live_count == ranges.size();

  if (m_context->live_map->lookup(spec.table.id, table_info)) {
    if (table_info->has_range(spec.range))

  HT_INFO("wait_for_maintenance");
  SchemaPtr schema = table_info->get_schema();

  if (!schema || schema->get_generation() < generation) {
    TableSchemaMap::const_iterator it;
    if (table_schemas &&
        (it = table_schemas->find(table_info->identifier().id))
          != table_schemas->end())
      schema = it->second;

    + table_info->identifier().id;
    m_hyperspace->attr_get(tablefile, "schema", valbuf);

    table_info->update_schema(schema);

    if (schema->get_generation() < generation)
      "Fetched Schema generation for table '%s' is %lld"
      " but supplied is %lld", table_info->identifier().id,
      (Lld)schema->get_generation(), (Lld)generation);
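// Schema refresh order: prefer a schema supplied with the request, fall back
// to the "schema" attribute stored in Hyperspace, and fail if even the
// fetched generation is older than the generation the client claims.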
  m_group_commit->trigger();

  m_scanner_map.purge_expired(m_scanner_ttl);

  bool low_memory_mode = m_timer_handler->low_memory_mode();
  m_maintenance_scheduler->set_low_memory_mode(low_memory_mode);

  m_maintenance_scheduler->schedule();

  auto now = chrono::steady_clock::now();
  if (now - m_last_control_file_check >= chrono::milliseconds(m_control_file_check_interval)) {
    if (!m_profile_query) {
      lock_guard<mutex> lock(m_profile_mutex);
      m_profile_query_out.open(output_fname.c_str(), ios_base::out|ios_base::app);
      m_profile_query = true;

    if (m_profile_query) {
      lock_guard<mutex> lock(m_profile_mutex);
      m_profile_query_out.close();
      m_profile_query = false;

    m_last_control_file_check = now;

  m_timer_handler->maintenance_scheduled_notify();

  lock_guard<mutex> lock(m_mutex);
  if (!m_group_commit) {
    m_group_commit = std::make_shared<GroupCommit>(this);
    HT_ASSERT(!m_group_commit_timer_handler);
    m_group_commit_timer_handler =
      make_shared<GroupCommitTimerHandler>(m_context->comm, this, m_app_queue);
    m_group_commit_timer_handler->start();

  m_group_commit->add(event, cluster_id, schema, table, count, buffer, flags);
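// Group commit is lazily constructed: the GroupCommit object and its timer
// handler are created by the first update that requests it, and subsequent
// updates accumulate until the timer fires and trigger() flushes the batch.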
Resets the timer.
Tracks range server memory used.
void destroy_scanner(ResponseCallback *cb, int32_t scanner_id)
Relinquish - log installed.
void replay_log(TableInfoMap &replay_map, CommitLogReaderPtr &log_reader)
std::shared_ptr< CommitLogReader > CommitLogReaderPtr
Smart pointer to CommitLogReader.
void md5_trunc_modified_base64(const char *input, char output[17])
Get the modified base64 encoded string of the first 12 Bytes of the 16 Byte MD5 code of a null termin...
uint32_t update_cells
Cells updated.
A class managing one or more serializable ByteStrings.
Wrapper for TableIdentifier providing member storage.
void set_state(ResponseCallback *cb, const std::vector< SystemVariable::Spec > &specs, int64_t generation)
void get_ranges(Ranges &ranges, StringSet *remove_ok_logs=0)
Gets set of live RangeData objects and corresponding transfer logs that can be safely removed...
int response(Hypertable::Status &status)
Sends response parameters back to client.
Scan predicate and control specification.
Declarations for RangeServerProtocol.
Response callback for status function.
void commit_log_sync(ResponseCallback *cb, uint64_t cluster_id, const TableIdentifier &table)
std::shared_ptr< TableMutator > TableMutatorPtr
Smart pointer to TableMutator.
static int64_t memory_limit_ensure_unused
StaticBuffer buffer
Update buffer containing serialized key/value pairs.
static int64_t log_prune_threshold_max
File system utility functions.
bool m_startup
Flag indicating if server is starting up.
static Hypertable::MemoryTracker * memory_tracker
PropertiesPtr m_props
Configuration properties.
std::shared_ptr< ConnectionHandlerFactory > ConnectionHandlerFactoryPtr
Smart pointer to ConnectionHandlerFactory.
static Hyperspace::SessionPtr hyperspace
StatsRangeServerPtr m_stats
std::shared_ptr< Session > SessionPtr
EventPtr event
Event object of originating update requst.
static std::string toplevel_dir
uint64_t cached_bytes_returned
Cached bytes returned.
const char * get_text(int error)
Returns a descriptive error message.
bool status(ContextPtr &context, Timer &timer, Status &status)
Runs a status check on the master.
uint64_t update_bytes
Bytes updated.
Encapsulate an internet address.
std::shared_ptr< MergeScannerRange > MergeScannerRangePtr
Smart pointer to MergeScannerRange.
Declarations for ScanContext.
Declarations for MaintenanceScheduler.
ByteArena arena
Memory arena.
TimerHandlerPtr m_timer_handler
Smart pointer to timer handler.
std::shared_ptr< Client > ClientPtr
Smart pointer to Client.
std::vector< RangeData > array
Vector of RangeData objects.
static Hypertable::Lib::Master::ClientPtr master_client
int64_t disk_read
Number of bytes read from disk while executing scan.
std::shared_ptr< Properties > PropertiesPtr
static CommitLogPtr root_log
void batch_update(std::vector< UpdateRecTable * > &updates, ClockT::time_point expire_time)
void set_proxy(const String &str)
Sets address type to CommAddress::PROXY and proxy name to p.
void drop_range(ResponseCallback *, const TableIdentifier &, const RangeSpec &)
static TablePtr rs_metrics_table
static uint64_t get()
Gets the cluster ID.
void acknowledge_load(Response::Callback::AcknowledgeLoad *cb, const vector< QualifiedRangeSpec > &specs)
uint32_t bloom_filter_maybes
static MetaLog::WriterPtr rsml_writer
void close_handle_ptr(SessionPtr hyperspace, uint64_t *handlep)
void dump_pseudo_table(ResponseCallback *cb, const TableIdentifier &table, const char *pseudo_table, const char *outfile)
Compatibility Macros for C/C++.
int response(const StatsRangeServer &stats)
const char * transfer_log
Full pathname of transfer log.
uint32_t cells_returned
Cells returned.
static PseudoTables * instance()
Creates and/or returns singleton instance of the PseudoTables class.
int64_t period_millis
Time period over which stats are computed.
ConnectionManagerPtr m_conn_manager
static int64_t log_prune_threshold_min
std::shared_ptr< ConnectionHandler > m_master_connection_handler
Declarations for HyperspaceTableCache.
bool load(const SerializedKey &key)
Parses the opaque key and loads the components into the member variables.
virtual int response_ok()
Sends a a simple success response back to the client which is just the 4-byte error code Error::OK...
std::shared_ptr< TableInfoMap > TableInfoMapPtr
Shared smart pointer to TableInfoMap.
std::shared_ptr< ApplicationQueueInterface > ApplicationQueueInterfacePtr
Smart pointer to ApplicationQueueInterface.
void dump_garbage_tracker_statistics(std::ofstream &out)
Prints human-readable representation of garbage tracker state to an output stream.
static Hypertable::FilesystemPtr dfs
void load_range(ResponseCallback *, const TableIdentifier &, const RangeSpec &, const RangeState &, bool needs_compaction)
size_t length() const
Retrieves the length of the serialized string.
void add(const TableIdentifier &table, SerializedKey &key, ByteString &value)
void verify_schema(TableInfoPtr &, uint32_t generation, const TableSchemaMap *table_schemas=0)
static int32_t get_drive_count()
Returns the number of drives.
TableInfoPtr table_info
TableInfo object for destination table.
void phantom_prepare_ranges(ResponseCallback *, int64_t op_id, const String &location, int32_t plan_generation, const vector< QualifiedRangeSpec > &ranges)
Declarations for IndexUpdater.
static Hypertable::RangeLocatorPtr range_locator
void decode_table_id(const uint8_t **bufp, size_t *remainp, TableIdentifier *tid)
static int64_t cellstore_target_size_min
uint64_t shadow_cache_memory
virtual void decode(const uint8_t **bufp, size_t *remainp)
Reads serialized representation of object from a buffer.
NameIdMapperPtr m_namemap
Table name-to-ID mapper
QueryCachePtr m_query_cache
static int64_t range_split_size
Lib::Master::ClientPtr m_master_client
int32_t compactions_major
Context record for update request passed into UpdatePipeline.
bool has(const String &name) const
Check whether a sub-property exists.
A structure to retrieve memory statistics (RAM size, used size, free size etc)
This class is used to generate and deliver standard responses back to a client.
const uint8_t * ptr
The pointer to the serialized data.
uint32_t sync_count
Sync count.
static Schema * new_instance(const std::string &buf)
Creates schema object from XML schema string.
TableIdentifier id
Table identifier for destination table.
#define HT_FATALF(msg,...)
#define HT_DEBUGF(msg,...)
const std::string & get_name() const
Gets column family name.
static Hypertable::MaintenanceQueuePtr maintenance_queue
std::shared_ptr< Reader > ReaderPtr
Smart pointer to Reader.
void replay_load_range(TableInfoMap &replay_map, MetaLogEntityRangePtr &range_entity)
long long int Lld
Shortcut for printf formats.
static bool ignore_clock_skew_errors
static std::string log_dir
static void get(Status &status)
Gets persistent status.
void legacy_decode(const uint8_t **bufp, size_t *remainp, BalancePlan *plan)
void compact(ResponseCallback *, const TableIdentifier &, const char *row, int32_t flags)
ColumnFamilySpec * get_column_family(const std::string &name)
Gets a column family specification given its name.
Helper class to access parts of the properties.
std::shared_ptr< MetaLogEntityRange > MetaLogEntityRangePtr
Smart pointer to MetaLogEntityRange.
bool empty()
Determines if map is empty.
static CommitLogPtr system_log
void drop_table(ResponseCallback *cb, const TableIdentifier &table)
virtual int error(int error, const String &msg)
Sends a standard error response back to the client.
virtual int error(int error, const String &msg)
Sends a standard error response back to the client.
static int32_t metrics_interval
void get(const String &table_id, TableInfoPtr &info)
Gets the TableInfo object for a table, creating one if not found.
std::shared_ptr< TableInfo > TableInfoPtr
Smart pointer to TableInfo.
ApplicationQueuePtr m_app_queue
uint32_t cached_cells_returned
Cached cells returned.
Declarations for MaintenanceQueue This file contains the type declarations for the MaintenanceQueue...
MetricsCollectorGangliaPtr m_ganglia_collector
Ganglia metrics collector.
const char * column_family
uint64_t memory_allocated
#define HT_INFOF(msg,...)
static int32_t failover_timeout
String name
File or directory name.
Schema * cellstore_index
Schema of cellstore.index pseudo table.
uint32_t cells_scanned
Cells scanned.
#define HT_THROWF(_code_, _fmt_,...)
Provides access to internal components of opaque key.
time_t m_next_metrics_update
std::shared_ptr< Range > RangePtr
Smart pointer to Range.
AccessGroup::MaintenanceData * agdata
Random number generator for int32, int64, double and ascii arrays.
static int64_t range_metadata_split_size
uint8_t * base
Pointer to the allocated memory buffer.
void get(Code *code, std::string &text) const
Gets status code and text.
size_t fill() const
Returns the size of the used portion.
#define HT_FAILURE_SIGNALLED(_label_)
std::shared_ptr< CommitLog > CommitLogPtr
Smart pointer to CommitLog.
bool FillScanBlock(MergeScannerRangePtr &scanner, DynamicBuffer &dbuf, uint32_t *cell_count, int64_t buffer_size)
Fills a block of scan results to be sent back to client.
uint64_t shadow_cache_memory
static int64_t cellstore_target_size_max
bool lookup(const String &table_id, TableInfoPtr &info)
Returns the TableInfo object for a given table.
static int32_t access_group_max_mem
std::shared_ptr< PhantomRange > PhantomRangePtr
Shared smart pointer to PhantomRange.
void replay_fragments(ResponseCallback *, int64_t op_id, const String &location, int32_t plan_generation, int32_t type, const vector< int32_t > &fragments, const RangeServerRecovery::ReceiverPlan &receiver_plan, int32_t replay_timeout)
RowIntervals row_intervals
A timer class to keep timeout states across AsyncComm related calls.
static String install_dir
The installation directory.
This is a generic exception class for Hypertable.
Holds client update request and error state.
uint64_t bloom_filter_memory
std::shared_ptr< MetaLogEntityRemoveOkLogs > MetaLogEntityRemoveOkLogsPtr
Smart pointer to MetaLogEntityRemoveOkLogs.
void get_statistics(Response::Callback::GetStatistics *cb, const std::vector< SystemVariable::Spec > &specs, uint64_t generation)
void get_stats(uint64_t *max_memoryp, uint64_t *available_memoryp, uint64_t *accessesp, uint64_t *hitsp)
static int32_t cell_cache_scanner_cache_size
void enable_window(bool enable)
Enables or disables this time window.
void phantom_update(Response::Callback::PhantomUpdate *, const String &location, int32_t plan_generation, const QualifiedRangeSpec &range, int32_t fragment, EventPtr &event)
static bool range_initialization_complete
static const char * END_ROOT_ROW
static CommitLogPtr user_log
MaintenanceSchedulerPtr m_maintenance_scheduler
Smart pointer to maintenance scheduler.
void clear()
Clears the map.
virtual bool get(Key &key, ByteString &value)=0
#define HT_MAYBE_FAIL_X(_label_, _exp_)
static LoadStatisticsPtr load_statistics
Declarations for StatusPersister.
static Hypertable::FilesystemPtr log_dfs
Qualified (with table identifier) range specification.
std::shared_ptr< RangeServer > RangeServerPtr
Shared smart pointer to RangeServer.
bool scan_and_filter_rows
#define HT_ERRORF(msg,...)
void status(Response::Callback::Status *cb)
Declarations for CommitLog.
void dump(ResponseCallback *, const char *, bool)
static Hypertable::PseudoTables * pseudo_tables
Create file if it does not exist.
void phantom_commit_ranges(ResponseCallback *, int64_t op_id, const String &location, int32_t plan_generation, const vector< QualifiedRangeSpec > &ranges)
static std::string type_str(int type)
#define HT_ON_OBJ_SCOPE_EXIT(...)
std::shared_ptr< Schema > SchemaPtr
Smart pointer to Schema.
static bool ignore_cells_with_clock_skew
void update(Response::Callback::Update *cb, uint64_t cluster_id, const TableIdentifier &table, uint32_t count, StaticBuffer &buffer, uint32_t flags)
Inserts data into a table.
uint64_t bytes_returned
Bytes returned.
uint64_t bloom_filter_maybes
static TimeWindow low_activity_time
void get_locations(StringSet &locations) const
int64_t cells_scanned
Number of cell scanned while executing scan.
uint64_t bloom_filter_accesses
CellIntervals cell_intervals
int64_t cells_returned
Number of cell returned while executing scan.
Declarations for LocationInitializer.
#define HT_MAYBE_FAIL(_label_)
std::shared_ptr< ConnectionManager > ConnectionManagerPtr
Smart pointer to ConnectionManager.
void dump_keys(std::ofstream &out)
void relinquish_range(ResponseCallback *, const TableIdentifier &, const RangeSpec &)
Encapsulates decomposed key and value.
std::shared_ptr< ApplicationQueue > ApplicationQueuePtr
Shared smart pointer to ApplicationQueue object.
static Hypertable::ApplicationQueuePtr app_queue
System information and statistics based on libsigar.
String extensions and helpers: sets, maps, append operators etc.
Holds pointers to a Range and associated Range::MaintenanceData.
static const MemStat & mem_stat()
Retrieves updated Memory statistics (see SystemInfo.h)
const char * column_family
#define HT_THROW(_code_, _msg_)
const char * version_string()
static RangesPtr get_ranges()
EventPtr & event()
Get smart pointer to event object that triggered the request.
Manages live range map and set of log names that can be safely removed.
Wrapper for RangeSpec providing member storage.
void create_scanner(Response::Callback::CreateScanner *, const TableIdentifier &, const RangeSpec &, const ScanSpec &, QueryCache::Key *)
uint64_t total_buffer_size
Range::MaintenanceData * data
Pointer to maintenance data for range.
static constexpr const char * SERVER_IS_COMING_UP
uint32_t decode_vi32(const uint8_t **bufp, size_t *remainp)
Decode a variable length encoded integer up to 32-bit.
Declarations for ClusterId.
static Hypertable::FileBlockCache * block_cache
static bool row_size_unlimited
int64_t balance()
Return total range server memory used.
Declaration for FillScanBlock.
int64_t bytes_returned
Number of bytes returned while executing scan.
Holds vector of RangeData objects and memory arena.
Address abstraction to hold either proxy name or IPv4:port address.
int response(const std::map< QualifiedRangeSpec, int32_t > &error_map)
void fetch_scanblock(Response::Callback::CreateScanner *, int32_t scanner_id)
Declarations for MergeScannerRange.
std::shared_ptr< Definition > DefinitionPtr
Smart pointer to Definition.
static int64_t range_maximum_size
int64_t get_ts64()
Returns the current time in nanoseconds as a 64bit number.
int32_t compactions_minor
static CommitLogPtr metadata_log
int code() const
Returns the error code.
std::shared_ptr< ScanContext > ScanContextPtr
RangePtr range
Pointer to Range.
LogReplayBarrierPtr m_log_replay_barrier
void heapcheck(ResponseCallback *, const char *)
Executes user-defined functions when leaving the current scope.
std::map< const char *, int32_t, LtCstr > CstrToInt32Map
STL map from c-style string to int32_t.
std::shared_ptr< Ranges > RangesPtr
Smart pointer to Ranges.
RangeServer recovery receiver plan.
void update_schema(ResponseCallback *, const TableIdentifier &, const char *)