MaintenancePrioritizer.cc
/*
 * Copyright (C) 2007-2015 Hypertable, Inc.
 *
 * This file is part of Hypertable.
 *
 * Hypertable is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; version 3 of the
 * License, or any later version.
 *
 * Hypertable is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */
#include "Common/Compat.h"
#include "Common/Config.h"
#include "Common/ScopeGuard.h"
#include "Common/StringExt.h"

#include <cassert>
#include <iostream>

#include "Global.h"
#include "MaintenanceFlag.h"
#include "MaintenancePrioritizer.h"

using namespace Hypertable;
using namespace Hypertable::Config;
using namespace std;

namespace {

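  // Pairs an access group's maintenance stats with the stats of its
  // enclosing range; StatsRecOrderingDescending orders such records by
  // descending cell cache memory usage.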
  class StatsRec {
  public:
    StatsRec(AccessGroup::MaintenanceData *agdata_,
             Range::MaintenanceData *rangedata_) :
      agdata(agdata_), rangedata(rangedata_) { }
    AccessGroup::MaintenanceData *agdata;
    Range::MaintenanceData *rangedata;
  };

  struct StatsRecOrderingDescending {
    bool operator()(const StatsRec &x, const StatsRec &y) const {
      return x.agdata->mem_used > y.agdata->mem_used;
    }
  };

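  // Sort ordering for shadow cache purging: caches with the fewest hits
  // sort first and, when neither cache has any hits, the cache with the
  // oldest earliest-cached-revision sorts first, so the least useful
  // shadow caches are purged before the rest.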
  struct ShadowCacheSortOrdering {
    bool operator()(const AccessGroup::CellStoreMaintenanceData *x,
                    const AccessGroup::CellStoreMaintenanceData *y) const {
      if (x->shadow_cache_size > 0 && y->shadow_cache_size > 0) {
        if (x->shadow_cache_hits > 0 || y->shadow_cache_hits > 0)
          return x->shadow_cache_hits < y->shadow_cache_hits;
        return x->shadow_cache_ecr < y->shadow_cache_ecr;
      }
      return x->shadow_cache_size > y->shadow_cache_size;
    }
  };

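  // Sort ordering for cellstore index purging: stores holding no index
  // memory sort last and, among stores with loaded indexes, the least
  // recently accessed sort first.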
  struct CellStoreIndexSortOrdering {
    bool operator()(const AccessGroup::CellStoreMaintenanceData *x,
                    const AccessGroup::CellStoreMaintenanceData *y) const {
      int64_t x_mem = x->index_stats.block_index_memory
        + x->index_stats.bloom_filter_memory;
      int64_t y_mem = y->index_stats.block_index_memory
        + y->index_stats.bloom_filter_memory;

      if (y_mem == 0 || x_mem == 0)
        return x_mem > y_mem;

      uint64_t x_atime = std::max(x->index_stats.bloom_filter_access_counter,
                                  x->index_stats.block_index_access_counter);
      uint64_t y_atime = std::max(y->index_stats.bloom_filter_access_counter,
                                  y->index_stats.block_index_access_counter);

      return x_atime < y_atime;
    }
  };

}

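// Assigns the next available priorities to any ranges that have not yet
// been initialized, skipping ranges that are busy or already
// prioritized, and notes whether uninitialized ranges remain.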
void
MaintenancePrioritizer::schedule_initialization_operations(
    std::vector<RangeData> &range_data, int32_t &priority) {
  if (m_initialization_complete)
    return;

  for (size_t i=0; i<range_data.size(); i++) {
    if (!range_data[i].data->initialized)
      m_uninitialized_ranges_seen = true;
    if (range_data[i].data->busy || range_data[i].data->priority)
      continue;
    if (!range_data[i].data->initialized)
      range_data[i].data->priority = priority++;
  }
}

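// Gives top priority to ranges that are mid-relinquish or mid-split
// (their transfer logs are already installed), since those operations
// must be driven to completion.  Memory that the operation will free is
// credited against the reclamation target; returns true if more memory
// still needs to be reclaimed.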
bool
MaintenancePrioritizer::schedule_inprogress_operations(
    std::vector<RangeData> &range_data, MemoryState &memory_state,
    int32_t &priority, String *trace) {
  AccessGroup::MaintenanceData *ag_data;
  AccessGroup::CellStoreMaintenanceData *cs_data;
  bool in_progress;

  for (size_t i=0; i<range_data.size(); i++) {

    if (range_data[i].data->busy) {
      if (trace)
        *trace += format("%d busy %s\n", __LINE__,
                         range_data[i].range->get_name().c_str());
      continue;
    }

    in_progress = false;
    if (range_data[i].data->state == RangeState::RELINQUISH_LOG_INSTALLED) {
      if (trace)
        *trace += format("%d mid-relinquish %s (state=%d, priority=%d"
                         ", mem_needed=%lld)\n", __LINE__,
                         range_data[i].range->get_name().c_str(),
                         range_data[i].data->state, priority,
                         (Lld)memory_state.needed);
      HT_INFOF("Adding maintenance for range %s because mid-relinquish(%d)",
               range_data[i].range->get_name().c_str(), range_data[i].data->state);
      range_data[i].data->maintenance_flags |= MaintenanceFlag::RELINQUISH;
      in_progress = true;
    }
    else if (range_data[i].data->state == RangeState::SPLIT_LOG_INSTALLED ||
             range_data[i].data->state == RangeState::SPLIT_SHRUNK) {
      if (trace)
        *trace += format("%d mid-split %s (state=%d, priority=%d"
                         ", mem_needed=%lld)\n", __LINE__,
                         range_data[i].range->get_name().c_str(),
                         range_data[i].data->state, priority,
                         (Lld)memory_state.needed);
      HT_INFOF("Adding maintenance for range %s because mid-split(%d)",
               range_data[i].range->get_name().c_str(),
               range_data[i].data->state);
      range_data[i].data->maintenance_flags |= MaintenanceFlag::SPLIT;
      in_progress = true;
    }

    if (in_progress) {
      range_data[i].data->priority = priority++;
      if (range_data[i].data->state == RangeState::RELINQUISH_LOG_INSTALLED ||
          range_data[i].data->state == RangeState::SPLIT_LOG_INSTALLED) {
        for (ag_data = range_data[i].data->agdata; ag_data; ag_data = ag_data->next) {
          memory_state.decrement_needed(ag_data->mem_allocated);
          for (cs_data=ag_data->csdata; cs_data; cs_data=cs_data->next)
            memory_state.decrement_needed(cs_data->index_stats.bloom_filter_memory +
                                          cs_data->index_stats.block_index_memory +
                                          cs_data->shadow_cache_size);
        }
      }
    }
  }
  return memory_state.need_more();
}

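// Prioritizes ranges marked for relinquish and ranges that have grown
// past the split threshold (the root range is never split here),
// crediting the memory each operation would release.  Returns true if
// more memory still needs to be reclaimed.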
bool
MaintenancePrioritizer::schedule_splits_and_relinquishes(
    std::vector<RangeData> &range_data, MemoryState &memory_state,
    int32_t &priority, String *trace) {
  AccessGroup::MaintenanceData *ag_data;
  AccessGroup::CellStoreMaintenanceData *cs_data;
  int64_t disk_total, mem_total;

  for (size_t i=0; i<range_data.size(); i++) {

    if (range_data[i].data->busy || range_data[i].data->priority) {
      if (range_data[i].data->busy && trace)
        *trace += format("%d busy %s\n", __LINE__,
                         range_data[i].range->get_name().c_str());
      continue;
    }

    mem_total = 0;
    disk_total = 0;

    // compute disk and memory totals
    for (ag_data = range_data[i].data->agdata; ag_data; ag_data = ag_data->next) {
      disk_total += ag_data->disk_estimate;
      mem_total += ag_data->mem_allocated;
      for (cs_data=ag_data->csdata; cs_data; cs_data=cs_data->next)
        mem_total += cs_data->index_stats.bloom_filter_memory +
          cs_data->index_stats.block_index_memory +
          cs_data->shadow_cache_size;
    }

    if (range_data[i].range->get_error() != Error::RANGESERVER_ROW_OVERFLOW) {
      if (range_data[i].data->relinquish) {
        if (trace)
          *trace += format("%d relinquish %s (priority=%d, mem_needed=%lld)\n",
                           __LINE__, range_data[i].range->get_name().c_str(),
                           priority, (Lld)memory_state.needed);
        HT_INFOF("Adding maintenance for range %s because marked for relinquish(%d)",
                 range_data[i].range->get_name().c_str(), range_data[i].data->state);
        memory_state.decrement_needed(mem_total);
        range_data[i].data->priority = priority++;
        range_data[i].data->maintenance_flags |= MaintenanceFlag::RELINQUISH;
      }
      else if (range_data[i].data->needs_split && !range_data[i].range->is_root()) {
        if (trace)
          *trace += format("%d disk_total %lld exceeds threshold %s"
                           " (priority=%d, mem_needed=%lld)\n",
                           __LINE__, (Lld)disk_total,
                           range_data[i].range->get_name().c_str(),
                           priority, (Lld)memory_state.needed);
        HT_INFOF("Adding maintenance for range %s because disk_total %d exceeds split threshold",
                 range_data[i].range->get_name().c_str(), (int)disk_total);
        memory_state.decrement_needed(mem_total);
        range_data[i].data->priority = priority++;
        range_data[i].data->maintenance_flags |= MaintenanceFlag::SPLIT;
      }
    }
  }
  return memory_state.need_more();
}

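// Schedules compactions in decreasing order of urgency: first commit
// log "prune" compactions for access groups whose earliest cached
// revision pins more than prune_threshold bytes of commit log, then
// manually requested compactions, garbage collection compactions,
// compactions of oversized cell caches, and finally merging
// compactions.  Returns true if more memory still needs to be
// reclaimed.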
bool
MaintenancePrioritizer::schedule_necessary_compactions(
    std::vector<RangeData> &range_data, CommitLogPtr &log,
    int64_t prune_threshold, MemoryState &memory_state,
    int32_t &priority, String *trace) {
  CommitLog::CumulativeSizeMap cumulative_size_map;
  CommitLog::CumulativeSizeMap::iterator iter;
  AccessGroup::MaintenanceData *ag_data;

  // First do log cleanup compactions

  log->load_cumulative_size_map(cumulative_size_map);

  for (size_t i=0; i<range_data.size(); i++) {

    if (range_data[i].data->busy) {
      if (trace)
        *trace += format("%d busy %s\n", __LINE__,
                         range_data[i].range->get_name().c_str());
      continue;
    }

    for (ag_data = range_data[i].data->agdata; ag_data; ag_data = ag_data->next) {

      if (ag_data->earliest_cached_revision != TIMESTAMP_MAX && !cumulative_size_map.empty()) {

        iter = cumulative_size_map.lower_bound(ag_data->earliest_cached_revision);

        if (iter == cumulative_size_map.end()) {
          String errstr;
          for (iter = cumulative_size_map.begin(); iter != cumulative_size_map.end(); iter++) {
            errstr += format("PERROR frag-%d\trevision\t%lld\n",
                             (int)(*iter).second.fragno, (Lld)(*iter).first);
            errstr += format("PERROR frag-%d\tdistance\t%lld\n",
                             (int)(*iter).second.fragno, (Lld)(*iter).second.distance);
            errstr += format("PERROR frag-%d\tsize\t%lld\n",
                             (int)(*iter).second.fragno, (Lld)(*iter).second.cumulative_size);
          }
          errstr += format("PERROR revision %lld not found in map\n",
                           (Lld)ag_data->earliest_cached_revision);
          cout << flush << errstr << flush;
          if (trace)
            *trace += format("%d THIS SHOULD NEVER HAPPEN, ecr=%lld\n",
                             __LINE__, (Lld)ag_data->earliest_cached_revision);
          continue;
        }

        if ((*iter).second.cumulative_size > prune_threshold) {
          if (ag_data->mem_used > 0) {
            if (trace)
              *trace += format("%d prune compact %s (cumulative_size=%lld, prune_"
                               "threshold=%lld, priority=%d, mem_needed=%lld)\n",
                               __LINE__, ag_data->ag->get_full_name(),
                               (Lld)(*iter).second.cumulative_size,
                               (Lld)prune_threshold,
                               range_data[i].data->priority ? range_data[i].data->priority : priority,
                               (Lld)memory_state.needed);
            if (range_data[i].data->priority == 0)
              range_data[i].data->priority = priority++;
            if (memory_state.need_more()) {
              range_data[i].data->maintenance_flags |= MaintenanceFlag::COMPACT|MaintenanceFlag::MEMORY_PURGE;
              ag_data->maintenance_flags |= MaintenanceFlag::COMPACT_MINOR|MaintenanceFlag::MEMORY_PURGE_SHADOW_CACHE;
              memory_state.decrement_needed(ag_data->mem_allocated);
            }
            else {
              range_data[i].data->maintenance_flags |= MaintenanceFlag::COMPACT;
              ag_data->maintenance_flags |= MaintenanceFlag::COMPACT_MINOR;
            }
          }
        }
      }
    }
  }

  // Other compactions

  for (size_t i=0; i<range_data.size(); i++) {

    if (range_data[i].data->busy)
      continue;

    for (ag_data = range_data[i].data->agdata; ag_data; ag_data = ag_data->next) {

      // Maintenance already scheduled for this AG
      if (ag_data->maintenance_flags != 0)
        continue;

      // Check for manually scheduled compactions
      if (range_data[i].data->compaction_type_needed) {
        range_data[i].data->maintenance_flags |= MaintenanceFlag::COMPACT;
        ag_data->maintenance_flags |= range_data[i].data->compaction_type_needed;
        if (range_data[i].data->priority == 0)
          range_data[i].data->priority = priority++;
        if (memory_state.need_more())
          memory_state.decrement_needed(ag_data->mem_allocated);
      }
      // Schedule compaction for AGs that need garbage collection
      else if (ag_data->gc_needed) {
        range_data[i].data->maintenance_flags |= MaintenanceFlag::COMPACT;
        ag_data->maintenance_flags |= MaintenanceFlag::COMPACT_GC;
        if (range_data[i].data->priority == 0)
          range_data[i].data->priority = priority++;
        if (trace)
          *trace += format("%d GC needed %s (priority=%d, mem_needed=%lld)\n",
                           __LINE__, ag_data->ag->get_full_name(),
                           range_data[i].data->priority,
                           (Lld)memory_state.needed);
      }
      // Compact LARGE CellCaches
      else if (!ag_data->in_memory && ag_data->mem_used > Global::access_group_max_mem) {
        if (memory_state.need_more()) {
          range_data[i].data->maintenance_flags |= MaintenanceFlag::COMPACT|MaintenanceFlag::MEMORY_PURGE;
          ag_data->maintenance_flags |= MaintenanceFlag::COMPACT_MINOR|MaintenanceFlag::MEMORY_PURGE_SHADOW_CACHE;
          memory_state.decrement_needed(ag_data->mem_allocated);
        }
        else {
          range_data[i].data->maintenance_flags |= MaintenanceFlag::COMPACT;
          ag_data->maintenance_flags |= MaintenanceFlag::COMPACT_MINOR;
        }
        if (range_data[i].data->priority == 0)
          range_data[i].data->priority = priority++;
        if (trace)
          *trace += format("%d large CellCache %s (mem_used=%lld, "
                           "priority=%d, mem_needed=%lld)\n", __LINE__,
                           ag_data->ag->get_full_name(),
                           (Lld)ag_data->mem_used, range_data[i].data->priority,
                           (Lld)memory_state.needed);
      }
      // Merging compactions
      else if (ag_data->needs_merging) {
        if (range_data[i].data->priority == 0)
          range_data[i].data->priority = priority++;
        range_data[i].data->maintenance_flags |= MaintenanceFlag::COMPACT;
        ag_data->maintenance_flags |= MaintenanceFlag::COMPACT_MERGING;
        // If it's an "end merge" then the cell cache will be included, so
        // decrement the memory occupied by the cell cache
        if (ag_data->end_merge && memory_state.need_more())
          memory_state.decrement_needed(ag_data->mem_allocated);
        if (trace)
          *trace += format("%d needs merging %s (priority=%d, "
                           "mem_needed=%lld)\n", __LINE__,
                           ag_data->ag->get_full_name(),
                           range_data[i].data->priority,
                           (Lld)memory_state.needed);
      }
    }
  }

  return memory_state.need_more();
}

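// Purges cellstore shadow caches, least useful first (see
// ShadowCacheSortOrdering above), until the memory reclamation target
// is met.  Returns false as soon as enough memory has been scheduled
// for release, true otherwise.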
bool
MaintenancePrioritizer::purge_shadow_caches(std::vector<RangeData> &range_data,
    MemoryState &memory_state, int32_t &priority, String *trace) {
  Range::MaintenanceData *range_maintenance_data;
  AccessGroup::MaintenanceData *ag_data;
  AccessGroup::CellStoreMaintenanceData *cs_data;
  std::vector<AccessGroup::CellStoreMaintenanceData *> csmd;

  csmd.clear();
  for (size_t i=0; i<range_data.size(); i++) {
    if (range_data[i].data->busy || range_data[i].data->priority)
      continue;
    for (ag_data = range_data[i].data->agdata; ag_data; ag_data = ag_data->next) {
      ag_data->user_data = (void *)range_data[i].data;
      for (cs_data=ag_data->csdata; cs_data; cs_data=cs_data->next) {
        if (cs_data->shadow_cache_size > 0) {
          cs_data->user_data = (void *)ag_data;
          csmd.push_back(cs_data);
        }
      }
    }
  }

  {
    struct ShadowCacheSortOrdering ordering;
    sort(csmd.begin(), csmd.end(), ordering);
  }

  for (size_t i=0; i<csmd.size(); i++) {
    ag_data = (AccessGroup::MaintenanceData *)(csmd[i]->user_data);
    range_maintenance_data = (Range::MaintenanceData *)(ag_data->user_data);
    range_maintenance_data->maintenance_flags |= MaintenanceFlag::MEMORY_PURGE;
    if (range_maintenance_data->priority == 0)
      range_maintenance_data->priority = priority++;
    if (trace)
      *trace += format("%d shadow cache purge %s (priority=%d, "
                       "mem_needed=%lld)\n", __LINE__,
                       ag_data->ag->get_full_name(),
                       range_maintenance_data->priority,
                       (Lld)memory_state.needed);
    csmd[i]->maintenance_flags |= MaintenanceFlag::MEMORY_PURGE_SHADOW_CACHE;
    memory_state.decrement_needed(csmd[i]->shadow_cache_size);
    if (!memory_state.need_more())
      return false;
  }
  return true;
}

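// Purges in-memory cellstore block indexes and bloom filters, least
// recently accessed first (see CellStoreIndexSortOrdering above),
// skipping ranges about to split.  Returns false as soon as enough
// memory has been scheduled for release, true otherwise.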
bool
MaintenancePrioritizer::purge_cellstore_indexes(std::vector<RangeData> &range_data,
    MemoryState &memory_state, int32_t &priority, String *trace) {
  Range::MaintenanceData *range_maintenance_data;
  AccessGroup::MaintenanceData *ag_data;
  AccessGroup::CellStoreMaintenanceData *cs_data;
  std::vector<AccessGroup::CellStoreMaintenanceData *> csmd;

  for (size_t i=0; i<range_data.size(); i++) {
    if (range_data[i].data->busy ||
        range_data[i].data->maintenance_flags & MaintenanceFlag::SPLIT)
      continue;
    for (ag_data = range_data[i].data->agdata; ag_data; ag_data = ag_data->next) {
      ag_data->user_data = (void *)range_data[i].data;
      for (cs_data=ag_data->csdata; cs_data; cs_data=cs_data->next) {
        if (cs_data->index_stats.bloom_filter_memory > 0 ||
            cs_data->index_stats.block_index_memory > 0) {
          cs_data->user_data = (void *)ag_data;
          csmd.push_back(cs_data);
        }
      }
    }
  }

  {
    CellStoreIndexSortOrdering ordering;
    sort(csmd.begin(), csmd.end(), ordering);
  }

  int64_t memory_used = 0;
  for (size_t i=0; i<csmd.size(); i++) {
    ag_data = (AccessGroup::MaintenanceData *)(csmd[i]->user_data);
    range_maintenance_data = (Range::MaintenanceData *)(ag_data->user_data);
    range_maintenance_data->maintenance_flags |= MaintenanceFlag::MEMORY_PURGE;
    memory_used = csmd[i]->index_stats.block_index_memory +
      csmd[i]->index_stats.bloom_filter_memory;
    if (range_maintenance_data->priority == 0)
      range_maintenance_data->priority = priority++;
    if (trace)
      *trace += format("%d cellstore index purge %s (priority=%d, "
                       "mem_needed=%lld)\n", __LINE__,
                       ag_data->ag->get_full_name(),
                       range_maintenance_data->priority,
                       (Lld)memory_state.needed);
    csmd[i]->maintenance_flags |= MaintenanceFlag::MEMORY_PURGE_CELLSTORE;
    memory_state.decrement_needed(memory_used);
    if (!memory_state.need_more())
      return false;
  }
  return true;
}

namespace {

  struct CellCacheCompactionSortOrdering {
    bool operator()(const AccessGroup::MaintenanceData *x,
                    const AccessGroup::MaintenanceData *y) const {
      return x->mem_used > y->mem_used;
    }
  };
}

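// Schedules minor compactions of the largest cell caches first until
// the memory reclamation target is met, skipping in-memory access
// groups, access groups already slated for a major compaction, and
// ranges about to split.  Returns false as soon as enough memory has
// been scheduled for release, true otherwise.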
bool
MaintenancePrioritizer::compact_cellcaches(std::vector<RangeData> &range_data,
    MemoryState &memory_state, int32_t &priority, String *trace) {
  AccessGroup::MaintenanceData *ag_data;
  std::vector<AccessGroup::MaintenanceData *> md;

  for (size_t i=0; i<range_data.size(); i++) {

    if (range_data[i].data->busy ||
        range_data[i].data->maintenance_flags & MaintenanceFlag::SPLIT)
      continue;

    for (ag_data = range_data[i].data->agdata; ag_data; ag_data = ag_data->next) {
      if (!MaintenanceFlag::major_compaction(ag_data->maintenance_flags) &&
          !ag_data->in_memory && ag_data->mem_used > 0) {
        ag_data->user_data = range_data[i].data;
        md.push_back(ag_data);
      }
    }
  }

  {
    struct CellCacheCompactionSortOrdering ordering;
    sort(md.begin(), md.end(), ordering);
  }

  for (size_t i=0; i<md.size(); i++) {
    if (((Range::MaintenanceData *)md[i]->user_data)->priority == 0)
      ((Range::MaintenanceData *)md[i]->user_data)->priority = priority++;
    if (trace)
      *trace += format("%d minor compaction %s (priority=%d, "
                       "mem_needed=%lld)\n", __LINE__,
                       md[i]->ag->get_full_name(),
                       ((Range::MaintenanceData *)md[i]->user_data)->priority,
                       (Lld)memory_state.needed);
    md[i]->maintenance_flags |= MaintenanceFlag::COMPACT_MINOR|MaintenanceFlag::MEMORY_PURGE_SHADOW_CACHE;
    ((Range::MaintenanceData *)md[i]->user_data)->maintenance_flags |= MaintenanceFlag::COMPACT|MaintenanceFlag::MEMORY_PURGE;
    memory_state.decrement_needed(md[i]->mem_allocated);
    if (!memory_state.need_more())
      return false;
  }
  return true;
}