59 struct RangeDataAscending {
69 : m_queue(queue), m_live_map(live_map), m_start_offset(0),
70 m_initialized(false), m_low_memory_mode(false) {
81 m_merging_delay = get_i32(
"Hypertable.RangeServer.Maintenance.MergingCompaction.Delay");
83 std::numeric_limits<int32_t>::max());
86 std::numeric_limits<int32_t>::max());
94 lock_guard<mutex> lock(
m_mutex);
99 function<bool(Range *)> drop_predicate =
100 [table](
Range *r) ->
bool {
return r->get_table_id().compare(table.
id)==0;};
105 lock_guard<mutex> lock(
m_mutex);
113 Ranges ranges_prioritized;
120 int32_t priority = 1;
122 bool do_scheduling =
true;
124 function<bool(RangeData &)> in_blacklist =
127 auto now = chrono::steady_clock::now();
147 do_scheduling =
false;
153 trace_str +=
String(
"low_memory\t") + (low_memory ?
"true" :
"false") +
"\n";
158 trace_str +=
format(
"excess\t%lld\n", (
Lld)excess);
161 trace_str +=
format(
"memory_state.balance\t%lld\n", (
Lld)memory_state.
balance);
162 trace_str +=
format(
"memory_state.limit\t%lld\n", (
Lld)memory_state.
limit);
163 trace_str +=
format(
"memory_state.needed\t%lld\n", (
Lld)memory_state.
needed);
167 uint64_t max_memory = 0;
168 uint64_t available_memory = 0;
169 uint64_t accesses = 0;
174 trace_str +=
format(
"FileBlockCache-max_memory\t%llu\n", (
Llu)max_memory);
175 trace_str +=
format(
"FileBlockCache-available_memory\t%llu\n", (
Llu)available_memory);
176 trace_str +=
format(
"FileBlockCache-accesses\t%llu\n", (
Llu)accesses);
177 trace_str +=
format(
"FileBlockCache-hits\t%llu\n", (
Llu)hits);
188 m_live_map->get_ranges(ranges, &remove_ok_logs);
189 time_t current_time = time(0);
196 ?
"Entering" :
"Exiting");
200 format(
"Within low activity window = %s\n",
204 for (
auto &rd : ranges.
array)
205 rd.data = rd.range->get_maintenance_data(ranges.
arena, current_time, flags);
207 if (ranges.
array.empty()) {
215 RangesPtr ranges_copy = make_shared<Ranges>();
216 ranges_copy->array = ranges.
array;
217 for (
size_t i=0; i<ranges.
array.size(); i++) {
218 ranges_copy->array[i].data =
221 ranges_copy->array[i].data->agdata = 0;
231 std::vector<RangeData> rotated;
232 rotated.reserve(ranges.
array.size());
234 rotated.insert(rotated.end(), iter, ranges.
array.end());
235 rotated.insert(rotated.end(), ranges.
array.begin(), iter);
236 ranges.
array.swap(rotated);
243 lock_guard<mutex> lock(
m_mutex);
250 int64_t block_index_memory = 0;
251 int64_t bloom_filter_memory = 0;
252 int64_t cell_cache_memory = 0;
253 int64_t shadow_cache_memory = 0;
254 int64_t not_acknowledged = 0;
267 trace_str +=
format(
"before revision_root\t%llu\n", (
Llu)revision_root);
268 trace_str +=
format(
"before revision_metadata\t%llu\n", (
Llu)revision_metadata);
269 trace_str +=
format(
"before revision_system\t%llu\n", (
Llu)revision_system);
270 trace_str +=
format(
"before revision_user\t%llu\n", (
Llu)revision_user);
273 for (
auto &rd : ranges.
array) {
276 rd.data->priority = priority++;
280 if (!rd.data->load_acknowledged)
283 for (ag_data = rd.data->agdata; ag_data; ag_data = ag_data->
next) {
287 for (cs_data = ag_data->
csdata; cs_data; cs_data = cs_data->
next) {
294 if (rd.range->is_root()) {
298 else if (rd.data->is_metadata) {
302 else if (rd.data->is_system) {
315 trace_str +=
format(
"after revision_root\t%llu\n", (
Llu)revision_root);
316 trace_str +=
format(
"after revision_metadata\t%llu\n", (
Llu)revision_metadata);
317 trace_str +=
format(
"after revision_system\t%llu\n", (
Llu)revision_system);
318 trace_str +=
format(
"after revision_user\t%llu\n", (
Llu)revision_user);
322 Global::root_log->purge(revision_root, remove_ok_logs, removed_logs, &trace_str);
331 Global::user_log->purge(revision_user, remove_ok_logs, removed_logs, &trace_str);
334 if (!removed_logs.empty()) {
342 int64_t total_memory = block_cache_memory + block_index_memory + bloom_filter_memory + cell_cache_memory + shadow_cache_memory +
m_query_cache_memory;
343 double block_cache_pct = ((double)block_cache_memory / (
double)total_memory) * 100.0;
344 double block_index_pct = ((double)block_index_memory / (
double)total_memory) * 100.0;
345 double bloom_filter_pct = ((double)bloom_filter_memory / (
double)total_memory) * 100.0;
346 double cell_cache_pct = ((double)cell_cache_memory / (
double)total_memory) * 100.0;
347 double shadow_cache_pct = ((double)shadow_cache_memory / (
double)total_memory) * 100.0;
348 double query_cache_pct = ((double)m_query_cache_memory / (
double)total_memory) * 100.0;
350 HT_INFOF(
"Memory Statistics (MB): VM=%.2f, RSS=%.2f, tracked=%.2f, computed=%.2f limit=%.2f",
354 HT_INFOF(
"Memory Allocation: BlockCache=%.2f%% BlockIndex=%.2f%% "
355 "BloomFilter=%.2f%% CellCache=%.2f%% ShadowCache=%.2f%% "
357 block_cache_pct, block_index_pct, bloom_filter_pct,
358 cell_cache_pct, shadow_cache_pct, query_cache_pct);
362 trace_str +=
"\nScheduling Decisions:\n";
365 debug ? &trace_str : 0);
370 auto schedule_time = chrono::steady_clock::now();
372 if (not_acknowledged) {
373 HT_INFOF(
"Found load_acknowledged=false in %d ranges", (
int)not_acknowledged);
376 bool uninitialized_range_seen {};
381 uint32_t level = 0, priority = 0;
382 for (
auto &rd : ranges.
array) {
383 if (!rd.data->initialized)
384 uninitialized_range_seen =
true;
399 lock_guard<mutex> lock(
m_mutex);
406 ranges_prioritized.
array.reserve( ranges.
array.size() );
407 for (
auto &rd : ranges.
array) {
408 if (rd.data->priority > 0)
409 ranges_prioritized.
array.push_back(rd);
411 struct RangeDataAscending ordering;
412 sort(ranges_prioritized.
array.begin(), ranges_prioritized.
array.end(), ordering);
414 int32_t merges_created = 0;
415 int32_t initialization_created = 0;
418 for (
auto &rd : ranges_prioritized.
array) {
419 if (!rd.data->initialized) {
420 uninitialized_range_seen =
true;
424 level, rd.data->priority,
425 schedule_time, rd.range));
426 ++initialization_created;
432 schedule_time, rd.range));
437 schedule_time, rd.range));
443 schedule_time, rd.range);
444 if (!rd.data->needs_major_compaction) {
464 schedule_time, rd.range);
470 task->
add_subtask(cs_data->cs, cs_data->maintenance_flags);
495 if (rd.
range->is_root())
506 if (now -
m_last_check >= chrono::milliseconds(60000)) {
516 const String &header_str) {
520 out.open(output_fname.c_str());
521 out << header_str <<
"\n";
522 for (
auto &rd : ranges.
array) {
523 out << *rd.data <<
"\n";
524 for (ag_data = rd.data->agdata; ag_data; ag_data = ag_data->
next)
525 out << *ag_data <<
"\n";
529 out <<
"RemoveOkLogs:\n";
530 for (
const auto &log : logs)
void exclude(const TableIdentifier &table)
Excludes a table from maintenance scheduling.
std::set< String > StringSet
STL Set managing Strings.
void remove_if(Func pred)
Template function for removing ranges that satisfy a predicate.
int64_t earliest_cached_revision
static void set_ranges(RangesPtr &r)
Recompute CellStore merge run to test if merging compaction needed.
std::set< std::string > m_table_blacklist
Set of table IDs to exclude from maintenance scheduling.
void add_subtask(const void *obj, int flags)
std::string String
A String is simply a typedef to std::string.
static bool unlink(const String &fname)
Unlinks (deletes) a file or directory.
String format(const char *fmt,...)
Returns a String using printf-like format facilities. Vanilla snprintf is about 1.5x faster than this...
Declarations for TimeWindow.
int32_t m_maintenance_queue_worker_count
long long unsigned int Llu
Shortcut for printf formats.
int32_t m_merges_per_interval
static const ProcStat & proc_stat()
Retrieves updated Process statistics (see SystemInfo.h)
static int64_t memory_limit_ensure_unused_current
MaintenancePrioritizerLogCleanup m_prioritizer_log_cleanup
static int64_t memory_limit
static bool exists(const String &fname)
Checks if a file or directory exists.
bool low_memory_mode()
Checks if low memory maintenance prioritization is enabled.
MaintenancePrioritizer * m_prioritizer
CellStoreMaintenanceData * csdata
int32_t m_maintenance_interval
int16_t maintenance_flags
int64_t m_query_cache_memory
std::chrono::steady_clock::time_point m_last_check
static MetaLogEntityRemoveOkLogsPtr remove_ok_logs
Relinquish - log installed.
Represents a table row range.
CellStoreMaintenanceData * next
int32_t m_low_memory_limit_percentage
int32_t m_move_compactions_per_interval
std::shared_ptr< MaintenanceQueue > MaintenanceQueuePtr
Smart pointer to MaintenanceQueue.
static Hypertable::MemoryTracker * memory_tracker
static std::vector< MetaLog::EntityTaskPtr > work_queue
bool debug_signal_file_exists(std::chrono::steady_clock::time_point now)
Checks to see if scheduler debug signal file exists.
Declarations for MaintenanceScheduler.
ByteArena arena
Memory arena.
std::vector< RangeData > array
Vector of RangeData objects.
static CommitLogPtr root_log
static MetaLog::WriterPtr rsml_writer
Compatibility Macros for C/C++.
CellStore::IndexMemoryStats index_stats
int32_t m_initialization_per_interval
void include(const TableIdentifier &table)
Includes a table for maintenance scheduling.
int64_t block_index_memory
void schedule()
Schedules maintenance.
void write_debug_output(std::chrono::steady_clock::time_point now, Ranges &ranges, const String &header_str)
Writes debugging output and removes signal file.
std::shared_ptr< TableInfoMap > TableInfoMapPtr
Shared smart pointer to TableInfoMap.
static Hypertable::MaintenanceQueuePtr maintenance_queue
virtual void prioritize(std::vector< RangeData > &range_data, MemoryState &memory_state, int32_t priority, String *trace)=0
long long int Lld
Shortcut for printf formats.
static const int64_t TIMESTAMP_MAX
std::chrono::steady_clock::time_point m_last_low_memory
static CommitLogPtr system_log
uint64_t shadow_cache_size
#define HT_INFOF(msg,...)
static String install_dir
The installation directory.
void get_stats(uint64_t *max_memoryp, uint64_t *available_memoryp, uint64_t *accessesp, uint64_t *hitsp)
TableInfoMapPtr m_live_map
static bool range_initialization_complete
static CommitLogPtr user_log
static LoadStatisticsPtr load_statistics
MaintenanceScheduler(MaintenanceQueuePtr &queue, TableInfoMapPtr &live_map)
Constructor.
int get_level(RangeData &rd)
static TimeWindow low_activity_time
Declarations for MaintenanceFlag. This file contains declarations that are part of the MaintenanceFlag...
bool major_compaction(int flags)
Tests the COMPACT_MAJOR bit of flags
int64_t bloom_filter_memory
System information and statistics based on libsigar.
Holds pointers to a Range and associated Range::MaintenanceData.
static const MemStat & mem_stat()
Retrieves updated Memory statistics (see SystemInfo.h)
Range::MaintenanceData * data
Pointer to maintenance data for range.
std::mutex m_mutex
Mutex to serialize concurrent access
static Hypertable::FileBlockCache * block_cache
int64_t balance()
Return total range server memory used.
Holds vector of RangeData objects and memory arena.
bool gc_compaction(int flags)
Tests the COMPACT_GC bit of flags
bool merging_compaction(int flags)
Tests the COMPACT_MERGING bit of flags
bool minor_compaction(int flags)
Tests the COMPACT_MINOR bit of flags
static CommitLogPtr metadata_log
RangePtr range
Pointer to Range.
bool m_low_memory_prioritization
std::shared_ptr< Ranges > RangesPtr
Smart pointer to Ranges.