0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
Monitoring.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; version 3 of the
9  * License.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
22 #include <Common/Compat.h>
23 
24 #include "Context.h"
25 #include "Monitoring.h"
26 #include "RangeServerConnection.h"
27 
28 #include <Common/Error.h>
29 #include <Common/FileUtils.h>
30 #include <Common/Logger.h>
31 #include <Common/Path.h>
32 #include <Common/md5.h>
33 
34 #include <boost/algorithm/string.hpp>
35 
36 #include <algorithm>
37 #include <cctype>
38 #include <cstdlib>
39 
40 extern "C" {
41 #include <unistd.h>
42 }
43 
44 using namespace Hypertable;
45 using namespace std;
46 
48  : m_context(context), m_last_server_count(0), m_disable_rrdtool(false) {
49  PropertiesPtr &props = m_context->props;
50 
52  m_disable_rrdtool = props->get_bool("Hypertable.Monitoring.Disable");
53  m_monitoring_interval = props->get_i32("Hypertable.Monitoring.Interval");
54  Path data_dir = props->get_str("Hypertable.DataDirectory");
55  m_monitoring_dir = (data_dir /= "/run/monitoring").string();
57  m_monitoring_rs_dir = m_monitoring_dir + "/rangeservers";
58 
62  m_allowable_skew = props->get_i32("Hypertable.RangeServer.ClockSkew.Max");
64 
65  memset(m_last_server_set_digest, 0, 16);
66 }
67 
68 void Monitoring::create_dir(const String &dir) {
69  if (!FileUtils::exists(dir)) {
70  if (!FileUtils::mkdirs(dir)) {
71  HT_THROW(Error::LOCAL_IO_ERROR, "Unable to create monitoring dir "+dir);
72  }
73  HT_INFOF("Created monitoring dir %s",dir.c_str());
74  }
75  else
76  HT_INFOF("rangeservers monitoring stats dir %s exists ",dir.c_str());
77 }
78 
79 void Monitoring::add_server(const String &location, const StatsSystem &system_info) {
80  lock_guard<mutex> lock(m_mutex);
81 
82  RangeServerMap::iterator iter = m_server_map.find(location);
83 
84  if (iter != m_server_map.end()) {
85  (*iter).second->system_info = make_shared<StatsSystem>(system_info);
86  return;
87  }
88 
89  m_server_map[location] = new RangeServerStatistics();
90  m_server_map[location]->location = location;
91  m_server_map[location]->system_info = make_shared<StatsSystem>(system_info);
92 }
93 
94 
95 void Monitoring::drop_server(const String &location) {
96  lock_guard<mutex> lock(m_mutex);
97 
98  RangeServerMap::iterator iter = m_server_map.find(location);
99  if (iter != m_server_map.end())
100  m_server_map.erase(iter);
101 }
102 
103 namespace {
105  struct LtRangeServerStatistics {
106  bool operator()(const RangeServerStatistics &s1, const RangeServerStatistics &s2) const {
107  if (boost::algorithm::starts_with(s1.location, "rs") &&
108  boost::algorithm::starts_with(s2.location, "rs")) {
109  int id1 = atoi(s1.location.c_str()+2);
110  int id2 = atoi(s2.location.c_str()+2);
111  return id1 < id2;
112  }
113  return s1.location < s2.location;
114  }
115  };
116 }
117 
118 void Monitoring::add(std::vector<RangeServerStatistics> &stats) {
119  lock_guard<mutex> lock(m_mutex);
120  struct rangeserver_rrd_data rrd_data;
121  RangeServerMap::iterator iter;
122  double numerator, denominator;
123  int32_t server_count = 0;
124  CstrSet server_set;
125 
126  // to keep track max timestamp across rangeserver
127  // this value is used to update table rrds
129  // copy to previous hashmap to calculate read rates
131  m_table_stat_map.clear(); // clear the previous contents
132 
133  for (size_t i = 0; i < stats.size(); i++) {
134  memset(&rrd_data, 0, sizeof(rrd_data));
135 
136  iter = m_server_map.find(stats[i].location);
137  if (iter == m_server_map.end()) {
138  HT_ERRORF("Statistics received for '%s' but not registered for "
139  "Monitoring", stats[i].location.c_str());
140  continue;
141  }
142 
143  if (stats[i].fetch_error != Error::OK) {
144  (*iter).second->fetch_timestamp = stats[i].fetch_timestamp;
145  (*iter).second->fetch_error = stats[i].fetch_error;
146  // if server is getting recovered: overwrite the error
148  if (stats[i].fetch_error == Error::NO_RESPONSE
149  && m_context->rsc_manager->find_server_by_location(stats[i].location, rsc)) {
150  if (rsc->is_recovering())
151  (*iter).second->fetch_error_msg = "Recovering...";
152  else
153  (*iter).second->fetch_error_msg = stats[i].fetch_error_msg;
154  continue;
155  }
156  }
157  else {
158  server_count++;
159  server_set.insert(stats[i].location.c_str());
160  }
161 
162  if ((*iter).second->stats) {
163 
164  if (stats[i].stats->query_cache_accesses > (*iter).second->stats->query_cache_accesses) {
165  numerator = (double)stats[i].stats->query_cache_hits -
166  (double)(*iter).second->stats->query_cache_hits;
167  denominator = (double)stats[i].stats->query_cache_accesses -
168  (double)(*iter).second->stats->query_cache_accesses;
169  rrd_data.qcache_hit_pct = (numerator/denominator)*100.0;
170  }
171 
172  if (stats[i].stats->block_cache_accesses > (*iter).second->stats->block_cache_accesses) {
173  numerator = (double)stats[i].stats->block_cache_hits -
174  (double)(*iter).second->stats->block_cache_hits;
175  denominator = (double)stats[i].stats->block_cache_accesses -
176  (double)(*iter).second->stats->block_cache_accesses;
177  rrd_data.bcache_hit_pct = (numerator/denominator)*100.0;
178  }
179 
180  double elapsed_time = (double)(stats[i].fetch_timestamp - (*iter).second->fetch_timestamp)/1000000000.0;
181 
182  rrd_data.scan_rate = stats[i].stats->scan_count/elapsed_time;
183  rrd_data.update_rate = stats[i].stats->update_count /elapsed_time;
184  rrd_data.sync_rate = stats[i].stats->sync_count/elapsed_time;
185  rrd_data.cell_read_rate = stats[i].stats->scanned_cells /elapsed_time;
186  rrd_data.cell_write_rate = stats[i].stats->updated_cells /elapsed_time;
187  rrd_data.byte_read_rate = stats[i].stats->scanned_bytes /elapsed_time;
188  rrd_data.byte_write_rate = stats[i].stats->updated_bytes /elapsed_time;
189  }
190 
191  rrd_data.timestamp = stats[i].stats_timestamp / 1000000000LL;
192  rrd_data.range_count = stats[i].stats->range_count;
193  rrd_data.scanner_count = stats[i].stats->scanner_count;
194  rrd_data.file_count = stats[i].stats->file_count;
195  rrd_data.qcache_max_mem = stats[i].stats->query_cache_max_memory;
196  rrd_data.qcache_fill = stats[i].stats->query_cache_max_memory -
197  stats[i].stats->query_cache_available_memory;
198  rrd_data.bcache_max_mem = stats[i].stats->block_cache_max_memory;
199  rrd_data.bcache_fill = stats[i].stats->block_cache_max_memory -
200  stats[i].stats->block_cache_available_memory;
201 
202  numerator = denominator = 0.0;
203  for (size_t j=0; j<stats[i].stats->system.fs_stat.size(); j++) {
204  numerator += stats[i].stats->system.fs_stat[j].total
205  - stats[i].stats->system.fs_stat[j].avail;
206  denominator += stats[i].stats->system.fs_stat[j].total;
207  }
208  if (denominator != 0.0)
209  rrd_data.disk_used_pct = (numerator/denominator)*100.0;
210 
211  for (size_t j=0; j<stats[i].stats->system.disk_stat.size(); j++) {
212  rrd_data.disk_read_bytes += (int64_t)stats[i].stats->system.disk_stat[j].read_rate;
213  rrd_data.disk_write_bytes += (int64_t)stats[i].stats->system.disk_stat[j].write_rate;
214  rrd_data.disk_read_iops += (int64_t)stats[i].stats->system.disk_stat[j].reads_rate;
215  rrd_data.disk_write_iops += (int64_t)stats[i].stats->system.disk_stat[j].writes_rate;
216  }
217 
218  rrd_data.vm_size = (int64_t)stats[i].stats->system.proc_stat.vm_size * 1024*1024;
219  rrd_data.vm_resident = (int64_t)stats[i].stats->system.proc_stat.vm_resident * 1024*1024;
220  rrd_data.page_in = (int64_t)stats[i].stats->system.swap_stat.page_in;
221  rrd_data.page_out = (int64_t)stats[i].stats->system.swap_stat.page_out;
222  rrd_data.heap_size = (int64_t)stats[i].stats->system.proc_stat.heap_size;
223  rrd_data.heap_slack = (int64_t)stats[i].stats->system.proc_stat.heap_slack;
224  rrd_data.tracked_memory = (int64_t)stats[i].stats->tracked_memory;
225  rrd_data.net_rx_rate = (int64_t)stats[i].stats->system.net_stat.rx_rate;
226  rrd_data.net_tx_rate = (int64_t)stats[i].stats->system.net_stat.tx_rate;
227  rrd_data.load_average = stats[i].stats->system.loadavg_stat.loadavg[0];
228  rrd_data.cpu_user = stats[i].stats->cpu_user;
229  rrd_data.cpu_sys = stats[i].stats->cpu_sys;
230 
231  compute_clock_skew(stats[i].stats->timestamp, &stats[i]);
232 
233  String rrd_file = m_monitoring_rs_dir + "/" + stats[i].location + "_stats_v0.rrd";
234 
235  if (!FileUtils::exists(rrd_file))
236  create_rangeserver_rrd(rrd_file);
237 
238  if (rrd_data.timestamp > table_stats_timestamp) {
239  table_stats_timestamp = rrd_data.timestamp;
240  }
241  update_rangeserver_rrd(rrd_file, rrd_data);
242  add_table_stats(stats[i].stats->tables,stats[i].fetch_timestamp);
243 
244  (*iter).second->stats = stats[i].stats;
245  (*iter).second->fetch_error = stats[i].fetch_error;
246  (*iter).second->fetch_error_msg = stats[i].fetch_error_msg;
247  (*iter).second->fetch_timestamp = stats[i].fetch_timestamp;
248 
249  }
250 
251  // Calculate "server set" MD5 digest
252  md5_context md5_ctx;
253  md5_starts(&md5_ctx);
254  for (auto server : server_set)
255  md5_update(&md5_ctx, (const unsigned char *)server, strlen(server));
256  unsigned char server_set_digest[16];
257  md5_finish(&md5_ctx, server_set_digest);
258 
259  // calcualte read rates from previous table stats map
260 
261  // finish compression ratio aggregation
262  for (TableStatMap::iterator iter = m_table_stat_map.begin();
263  iter != m_table_stat_map.end(); ++iter) {
264  if (iter->second.disk_used != 0)
265  iter->second.compression_ratio =
266  (double)iter->second.disk_used / iter->second.compression_ratio;
267  else
268  iter->second.compression_ratio = 1.0;
269  if (iter->second.cell_count != 0) {
270  iter->second.average_key_size /= iter->second.cell_count;
271  iter->second.average_value_size /= iter->second.cell_count;
272  }
273  else {
274  iter->second.average_key_size = 0.0;
275  iter->second.average_value_size = 0.0;
276  }
277  }
278 
279  // Dump RangeServer summary data
280  std::vector<RangeServerStatistics> stats_vec;
281  struct LtRangeServerStatistics comp;
282  stats_vec.reserve(m_server_map.size());
283  for (iter = m_server_map.begin(); iter != m_server_map.end(); ++iter)
284  stats_vec.push_back(*(*iter).second);
285  sort(stats_vec.begin(), stats_vec.end(), comp);
287 
288  // create Table rrd data
289  TableStatMap::iterator ts_iter;
290  TableStatMap::iterator prev_iter;
291  for(ts_iter = m_table_stat_map.begin();ts_iter != m_table_stat_map.end(); ++ts_iter) {
292 
293  // calculate read rates and write rates
294  prev_iter = m_prev_table_stat_map.find(ts_iter->first);
295  if (prev_iter != m_prev_table_stat_map.end()) {
296  if (server_count != m_last_server_count ||
297  memcmp(m_last_server_set_digest, server_set_digest, 16)) {
298  HT_INFOF("Statistics server set mismatch, using previous "
299  "statistics. last_server_count=%d, server_count=%d",
300  m_last_server_count, server_count);
301  ts_iter->second.scan_rate = prev_iter->second.scan_rate;
302  ts_iter->second.update_rate = prev_iter->second.update_rate;
303  ts_iter->second.cell_read_rate = prev_iter->second.cell_read_rate;
304  ts_iter->second.cell_write_rate = prev_iter->second.cell_write_rate;
305  ts_iter->second.byte_read_rate = prev_iter->second.byte_read_rate;
306  ts_iter->second.byte_write_rate = prev_iter->second.byte_write_rate;
307  ts_iter->second.disk_read_rate = prev_iter->second.disk_read_rate;
308  memcpy(m_last_server_set_digest, server_set_digest, 16);
309  m_last_server_count = server_count;
310  }
311  else {
312  double elapsed_time = (double)(ts_iter->second.fetch_timestamp - prev_iter->second.fetch_timestamp) / 1000000000.0;
313  ts_iter->second.scan_rate = (ts_iter->second.scans - prev_iter->second.scans) / elapsed_time;
314  ts_iter->second.update_rate = (ts_iter->second.updates - prev_iter->second.updates) / elapsed_time;
315  ts_iter->second.cell_read_rate = (ts_iter->second.cells_read - prev_iter->second.cells_read) / elapsed_time;
316  ts_iter->second.cell_write_rate = (ts_iter->second.cells_written - prev_iter->second.cells_written) / elapsed_time;
317  ts_iter->second.byte_read_rate = (ts_iter->second.bytes_read - prev_iter->second.bytes_read) / elapsed_time;
318  ts_iter->second.byte_write_rate = (ts_iter->second.bytes_written - prev_iter->second.bytes_written) / elapsed_time;
319  ts_iter->second.disk_read_rate = (ts_iter->second.disk_bytes_read - prev_iter->second.disk_bytes_read) / elapsed_time;
320  }
321  }
322 
323  String table_file_name = ts_iter->first;
324  String rrd_file = m_monitoring_table_dir + "/" + table_file_name + "_table_stats_v0.rrd";
325  if (!FileUtils::exists(rrd_file)) {
326  String dir;
327  size_t slash_pos;
328  slash_pos = table_file_name.rfind("/");
329  if (slash_pos != string::npos) {
330  dir = table_file_name.substr(0,slash_pos+1);
331  }
332  String table_dir = m_monitoring_table_dir + "/"+dir;
333  if (!FileUtils::exists(table_dir)) {
334  if (!FileUtils::mkdirs(table_dir)) {
335  HT_THROW(Error::LOCAL_IO_ERROR, "Unable to create table dir");
336  }
337  }
338  create_table_rrd(rrd_file);
339  }
340  update_table_rrd(rrd_file,ts_iter->second);
341  }
343 
345 
346  m_last_server_count = server_count;
347 
348 }
349 
350 void Monitoring::add_table_stats(std::vector<StatsTable> &table_stats,int64_t fetch_timestamp) {
351 
352  TableStatMap::iterator iter;
353 
354  for (size_t i=0; i<table_stats.size(); i++) {
355  iter = m_table_stat_map.find(table_stats[i].table_id);
356  struct table_rrd_data table_data;
357  if (iter != m_table_stat_map.end()) {
358  table_data = iter->second;
359  } else {
360  memset(&table_data, 0, sizeof(table_data));
361  }
362  table_data.fetch_timestamp = fetch_timestamp;
363  table_data.range_count += table_stats[i].range_count;
364  table_data.scanner_count += table_stats[i].scanner_count;
365  table_data.cell_count += table_stats[i].cell_count;
366  table_data.file_count += table_stats[i].file_count;
367  table_data.scans += table_stats[i].scans;
368  table_data.cells_read += table_stats[i].cells_scanned;
369  table_data.bytes_read += table_stats[i].bytes_scanned;
370  table_data.disk_bytes_read += table_stats[i].disk_bytes_read;
371  table_data.updates += table_stats[i].updates;
372  table_data.cells_written += table_stats[i].cells_written;
373  table_data.bytes_written += table_stats[i].bytes_written;
374  table_data.disk_used += table_stats[i].disk_used;
375  table_data.average_key_size += table_stats[i].key_bytes;
376  table_data.average_value_size += table_stats[i].value_bytes;
377  table_data.compression_ratio += (double)table_stats[i].disk_used / table_stats[i].compression_ratio;
378  table_data.memory_used += table_stats[i].memory_used;
379  table_data.memory_allocated += table_stats[i].memory_allocated;
380  table_data.shadow_cache_memory += table_stats[i].shadow_cache_memory;
381  table_data.block_index_memory += table_stats[i].block_index_memory;
382  table_data.bloom_filter_memory += table_stats[i].bloom_filter_memory;
383  table_data.bloom_filter_accesses += table_stats[i].bloom_filter_accesses;
384  table_data.bloom_filter_maybes += table_stats[i].bloom_filter_maybes;
385  m_table_stat_map[table_stats[i].table_id] = table_data;
386  }
387 }
388 
389 void Monitoring::compute_clock_skew(int64_t server_timestamp, RangeServerStatistics *stats) {
390  int64_t skew;
391  int32_t multiplier = 1;
392 
393  if (server_timestamp < stats->fetch_timestamp) {
394  skew = stats->fetch_timestamp - server_timestamp;
395  multiplier = -1;
396  }
397  else
398  skew = server_timestamp - stats->fetch_timestamp;
399 
400  // if the reading difference is less than it took to make the request
401  // then leave the old skew value in place
402  if (skew < stats->fetch_duration)
403  return;
404 
405  // discount fetch duration and convert to microseconds
406  skew -= stats->fetch_duration;
407  skew /= 1000;
408 
409  if (skew < m_allowable_skew)
410  stats->clock_skew = 0;
411  else
412  stats->clock_skew = (skew / 1000000L) * multiplier;
413 }
414 
416  char buf[64];
417  String step;
418 
420 
421  sprintf(buf, "-s %u", (unsigned)(m_monitoring_interval/1000));
422  step = String(buf);
423 
429  HT_DEBUGF("Creating rrd file %s", filename.c_str());
430 
431  std::vector<String> args;
432  args.push_back((String)"create");
433  args.push_back(filename);
434  args.push_back(step);
435  args.push_back((String)"DS:range_count:GAUGE:600:0:U"); // num_ranges is not a rate, 600s heartbeat
436  args.push_back((String)"DS:scanner_count:GAUGE:600:0:U");
437  args.push_back((String)"DS:file_count:GAUGE:600:0:U");
438  args.push_back((String)"DS:scan_rate:GAUGE:600:0:U"); // scans is a rate, 600s heartbeat
439  args.push_back((String)"DS:update_rate:GAUGE:600:0:U");
440  args.push_back((String)"DS:sync_rate:GAUGE:600:0:U");
441  args.push_back((String)"DS:cell_read_rate:GAUGE:600:0:U");
442  args.push_back((String)"DS:cell_write_rate:GAUGE:600:0:U");
443  args.push_back((String)"DS:byte_read_rate:GAUGE:600:0:U");
444  args.push_back((String)"DS:byte_write_rate:GAUGE:600:0:U");
445  args.push_back((String)"DS:qcache_hit_pct:GAUGE:600:0:100");
446  args.push_back((String)"DS:qcache_max_mem:GAUGE:600:0:U");
447  args.push_back((String)"DS:qcache_fill:GAUGE:600:0:U");
448  args.push_back((String)"DS:bcache_hit_pct:GAUGE:600:0:100");
449  args.push_back((String)"DS:bcache_max_mem:GAUGE:600:0:U");
450  args.push_back((String)"DS:bcache_fill:GAUGE:600:0:U");
451  args.push_back((String)"DS:disk_used_pct:GAUGE:600:0:100");
452  args.push_back((String)"DS:disk_read_bytes:GAUGE:600:0:U");
453  args.push_back((String)"DS:disk_write_bytes:GAUGE:600:0:U");
454  args.push_back((String)"DS:disk_read_iops:GAUGE:600:0:U");
455  args.push_back((String)"DS:disk_write_iops:GAUGE:600:0:U");
456  args.push_back((String)"DS:vm_size:GAUGE:600:0:U");
457  args.push_back((String)"DS:vm_resident:GAUGE:600:0:U");
458  args.push_back((String)"DS:page_in:GAUGE:600:0:U");
459  args.push_back((String)"DS:page_out:GAUGE:600:0:U");
460  args.push_back((String)"DS:heap_size:GAUGE:600:0:U");
461  args.push_back((String)"DS:heap_slack:GAUGE:600:0:U");
462  args.push_back((String)"DS:tracked_memory:GAUGE:600:0:U");
463  args.push_back((String)"DS:net_rx_rate:GAUGE:600:0:U");
464  args.push_back((String)"DS:net_tx_rate:GAUGE:600:0:U");
465  args.push_back((String)"DS:loadavg:GAUGE:600:0:U");
466  args.push_back((String)"DS:cpu_user:GAUGE:600:0:U");
467  args.push_back((String)"DS:cpu_sys:GAUGE:600:0:U");
468 
469  args.push_back((String)"RRA:AVERAGE:.5:1:2880"); // higherst res (30s) has 2880 samples(1 day)
470  args.push_back((String)"RRA:AVERAGE:.5:10:2880"); // 5min res for 10 days
471  args.push_back((String)"RRA:AVERAGE:.5:60:1448"); // 30min res for 31 days
472  args.push_back((String)"RRA:AVERAGE:.5:720:2190");// 6hr res for last 1.5 yrs
473  args.push_back((String)"RRA:MAX:.5:10:2880"); // 5min res spikes for last 10 days
474  args.push_back((String)"RRA:MAX:.5:720:2190");// 6hr res spikes for last 1.5 yrs
475 
476  run_rrdtool(args);
477 }
478 
480  char buf[64];
481  String step;
482 
484 
485  sprintf(buf, "-s %u", (unsigned)(m_monitoring_interval/1000));
486  step = String(buf);
487 
493  HT_DEBUGF("Creating rrd file %s", filename.c_str());
494 
495  std::vector<String> args;
496  args.push_back((String)"create");
497  args.push_back(filename);
498  args.push_back(step);
499  args.push_back((String)"DS:range_count:GAUGE:600:0:U"); // num_ranges is not a rate, 600s heartbeat
500  args.push_back((String)"DS:scanner_count:GAUGE:600:0:U");
501  args.push_back((String)"DS:scan_rate:GAUGE:600:0:U"); // scans is a rate, 600s heartbeat
502  args.push_back((String)"DS:update_rate:GAUGE:600:0:U");
503  args.push_back((String)"DS:cell_read_rate:GAUGE:600:0:U");
504  args.push_back((String)"DS:cell_write_rate:GAUGE:600:0:U");
505  args.push_back((String)"DS:byte_read_rate:GAUGE:600:0:U");
506  args.push_back((String)"DS:byte_write_rate:GAUGE:600:0:U");
507  args.push_back((String)"DS:disk_read_rate:GAUGE:600:0:U");
508  args.push_back((String)"DS:disk_used:GAUGE:600:0:U");
509  args.push_back((String)"DS:compression_ratio:GAUGE:600:0:U");
510  args.push_back((String)"DS:memory_used:GAUGE:600:0:U");
511  args.push_back((String)"DS:memory_allocated:GAUGE:600:0:U");
512  args.push_back((String)"DS:shadow_cache_memory:GAUGE:600:0:U");
513  args.push_back((String)"DS:block_index_memory:GAUGE:600:0:U");
514  args.push_back((String)"DS:bloom_filter_memory:GAUGE:600:0:U");
515  args.push_back((String)"DS:bloom_filter_access:GAUGE:600:0:U");
516  args.push_back((String)"DS:bloom_filter_maybes:GAUGE:600:0:U");
517 
518  args.push_back((String)"RRA:AVERAGE:.5:1:2880"); // higherst res (30s) has 2880 samples(1 day)
519  args.push_back((String)"RRA:AVERAGE:.5:10:2880"); // 5min res for 10 days
520  args.push_back((String)"RRA:AVERAGE:.5:60:1448"); // 30min res for 31 days
521  args.push_back((String)"RRA:AVERAGE:.5:720:2190");// 6hr res for last 1.5 yrs
522  args.push_back((String)"RRA:MAX:.5:10:2880"); // 5min res spikes for last 10 days
523  args.push_back((String)"RRA:MAX:.5:720:2190");// 6hr res spikes for last 1.5 yrs
524 
525  run_rrdtool(args);
526 }
527 
529  std::vector<String> args;
530  String update;
531 
532  args.push_back((String)"update");
533  args.push_back(filename);
534 
535  update = format("%llu:%d:%d:%.2f:%.2f:%.2f:%.2f:%.2f:%.2f:%.2f:%lld:%.2f:%lld:%lld:%lld:%lld:%lld:%lld:%lld",
537  rrd_data.range_count,
538  rrd_data.scanner_count,
539  rrd_data.scan_rate,
540  rrd_data.update_rate,
541  rrd_data.cell_read_rate,
542  rrd_data.cell_write_rate,
543  rrd_data.byte_read_rate,
544  rrd_data.byte_write_rate,
545  rrd_data.disk_read_rate,
546  (Lld)rrd_data.disk_used,
547  rrd_data.compression_ratio,
548  (Lld)rrd_data.memory_used,
549  (Lld)rrd_data.memory_allocated,
550  (Lld)rrd_data.shadow_cache_memory,
551  (Lld)rrd_data.block_index_memory,
552  (Lld)rrd_data.bloom_filter_memory,
553  (Lld)rrd_data.bloom_filter_accesses,
554  (Lld)rrd_data.bloom_filter_maybes);
555 
556  HT_DEBUGF("table update=\"%s\"", update.c_str());
557 
558  args.push_back(update);
559 
560  run_rrdtool(args);
561 }
562 
564  std::vector<String> args;
565  String update;
566 
567  args.push_back((String)"update");
568  args.push_back(filename);
569 
570  update = format("%llu:%d:%d:%lld:%.2f:%.2f:%.2f:%.2f:%.2f:%.2f:%.2f:%.2f:%lld:%lld:%.2f:%lld:%lld:%.2f:%lld:%lld:%lld:%lld:%lld:%lld:%lld:%lld:%lld:%lld:%lld:%.2f:%.2f:%.2f:%.2f:%.2f",
571  (Llu)rrd_data.timestamp,
572  rrd_data.range_count,
573  rrd_data.scanner_count,
574  (Llu)rrd_data.file_count,
575  rrd_data.scan_rate,
576  rrd_data.update_rate,
577  rrd_data.sync_rate,
578  rrd_data.cell_read_rate,
579  rrd_data.cell_write_rate,
580  rrd_data.byte_read_rate,
581  rrd_data.byte_write_rate,
582  rrd_data.qcache_hit_pct,
583  (Lld)rrd_data.qcache_max_mem,
584  (Lld)rrd_data.qcache_fill,
585  rrd_data.bcache_hit_pct,
586  (Lld)rrd_data.bcache_max_mem,
587  (Lld)rrd_data.bcache_fill,
588  rrd_data.disk_used_pct,
589  (Lld)rrd_data.disk_read_bytes,
590  (Lld)rrd_data.disk_write_bytes,
591  (Lld)rrd_data.disk_read_iops,
592  (Lld)rrd_data.disk_write_iops,
593  (Lld)rrd_data.vm_size,
594  (Lld)rrd_data.vm_resident,
595  (Lld)rrd_data.page_in,
596  (Lld)rrd_data.page_out,
597  (Lld)rrd_data.heap_size,
598  (Lld)rrd_data.heap_slack,
599  (Lld)rrd_data.tracked_memory,
600  rrd_data.net_rx_rate,
601  rrd_data.net_tx_rate,
602  rrd_data.load_average,
603  rrd_data.cpu_user,
604  rrd_data.cpu_sys);
605 
606  HT_DEBUGF("update=\"%s\"", update.c_str());
607 
608  args.push_back(update);
609 
610  run_rrdtool(args);
611 }
612 
namespace {

  // --- rangeserver_summary.json ------------------------------------------
  const char *const rs_json_header = "{\"RangeServerSummary\": {\n \"servers\": [\n";
  const char *const rs_json_footer = "\n ]\n}}\n";
  // One object per server; arguments must follow this field order.
  const char *const rs_entry_format =
    "{\"order\": \"%d\","
    " \"location\": \"%s\","
    " \"version\": \"%s\","
    " \"hostname\": \"%s\","
    " \"ip\": \"%s\","
    " \"arch\": \"%s\","
    " \"cores\": \"%d\","
    " \"skew\": \"%d\","
    " \"os\": \"%s\","
    " \"osVersion\": \"%s\","
    " \"vendor\": \"%s\","
    " \"vendorVersion\": \"%s\","
    " \"ram\": \"%.2f\","
    " \"disk\": \"%.2f\","
    " \"diskUsePct\": \"%u\","
    " \"rangeCount\": \"%llu\","
    " \"lastContact\": \"%s\","
    " \"lastError\": \"%s\"}";

  // --- master_summary.json -----------------------------------------------
  const char *const master_json_header =
    "{\"MasterSummary\": {\"version\": \"%s\", \"name\": \"%s\", \"state\": [\n";
  const char *const master_json_footer = "\n]}}\n";
  const char *const state_variable_format = "{\"name\": \"%s\", \"value\": \"%s\"}";

  // --- table_summary.json ------------------------------------------------
  const char *const table_json_header = "{\"TableSummary\": {\n \"tables\": [\n";
  const char *const table_json_footer = "\n ]\n}}\n";
  // One object per table; note that id/name/rangecount are not separated by
  // a space after the comma.
  const char *const table_entry_format =
    "{\"id\": \"%s\","
    "\"name\": \"%s\","
    "\"rangecount\": \"%u\","
    " \"cellcount\": \"%llu\","
    " \"filecount\": \"%llu\","
    " \"disk\": \"%llu\","
    " \"memory\": \"%llu\","
    " \"average_key_size\": \"%.1f\","
    " \"average_value_size\": \"%.1f\","
    " \"compression_ratio\": \"%.3f\"}";

}
633 
635  String contents = format(master_json_header, version_string(),
636  m_context->cluster_name.c_str());
637  String entry;
638  String tmp_filename = m_monitoring_dir + "/master_summary.tmp";
639  String json_filename = m_monitoring_dir + "/master_summary.json";
640  std::vector<SystemVariable::Spec> specs;
641 
642  m_context->system_state->get_non_default(specs);
643  for (size_t i = 0; i<specs.size(); i++) {
644  entry = format(state_variable_format,
645  SystemVariable::code_to_string(specs[i].code),
646  specs[i].value ? "true" : "false");
647  if (i == 0)
648  contents += String(" ") + entry;
649  else
650  contents += String(",\n ") + entry;
651  }
652  contents += master_json_footer;
653 
654  if (FileUtils::write(tmp_filename, contents) == -1)
655  return;
656  FileUtils::rename(tmp_filename, json_filename);
657 }
658 
659 void Monitoring::dump_rangeserver_summary_json(std::vector<RangeServerStatistics> &stats) {
660  String str = String(rs_json_header);
661  String entry;
662  double ram;
663  double disk;
664  unsigned disk_use_pct;
665  String error_str;
666  String contact_time;
667  uint64_t range_count;
668  const char *version_string = "";
669 
670  for (size_t i=0; i<stats.size(); i++) {
671  if (stats[i].stats) {
672  double numerator=0.0, denominator=0.0;
673  ram = stats[i].stats->system.mem_stat.ram / 1000.0;
674  disk = 0.0;
675  disk_use_pct = 0;
676  for (size_t j=0; j<stats[i].stats->system.fs_stat.size(); j++) {
677  numerator += stats[i].stats->system.fs_stat[j].total -
678  stats[i].stats->system.fs_stat[j].avail;
679  denominator += stats[i].stats->system.fs_stat[j].total;
680  disk += stats[i].stats->system.fs_stat[j].total;
681  }
682  disk /= 1000000000.0;
683  disk_use_pct = (unsigned)((numerator/denominator)*100.0);
684  time_t contact = (time_t)(stats[i].fetch_timestamp / 1000000000LL);
685  char buf[64];
686  ctime_r(&contact, buf);
687  contact_time = buf;
688  boost::trim(contact_time);
689  range_count = stats[i].stats->range_count;
690  version_string = stats[i].stats->version.c_str();
691  }
692  else {
693  ram = 0.0;
694  disk = 0.0;
695  disk_use_pct = 0;
696  contact_time = String("N/A");
697  range_count = 0;
698  }
699 
700  if (stats[i].fetch_error == 0)
701  error_str = "ok";
702  else {
703  error_str = stats[i].fetch_error_msg;
704  if (error_str.empty())
705  error_str = Error::get_text(stats[i].fetch_error);
706  }
707 
708  entry = format(rs_entry_format,
709  i,
710  stats[i].location.c_str(),
712  stats[i].system_info->net_info.host_name.c_str(),
713  stats[i].system_info->net_info.primary_addr.c_str(),
714  stats[i].system_info->os_info.arch.c_str(),
715  stats[i].system_info->cpu_info.total_cores,
716  stats[i].clock_skew,
717  stats[i].system_info->os_info.name.c_str(),
718  stats[i].system_info->os_info.version.c_str(),
719  stats[i].system_info->os_info.vendor.c_str(),
720  stats[i].system_info->os_info.vendor_version.c_str(),
721  ram,
722  disk,
723  disk_use_pct,
724  (Llu)range_count,
725  contact_time.c_str(),
726  error_str.c_str());
727 
728  if (i != 0)
729  str += String(",\n ") + entry;
730  else
731  str += String(" ") + entry;
732  }
733 
734  str += rs_json_footer;
735 
736  String tmp_filename = m_monitoring_dir + "/rangeserver_summary.tmp";
737  String json_filename = m_monitoring_dir + "/rangeserver_summary.json";
738 
739  if (FileUtils::write(tmp_filename, str) == -1)
740  return;
741 
742  FileUtils::rename(tmp_filename, json_filename);
743 
744 }
745 
747  String str = String(table_json_header);
748  String entry;
749  struct table_rrd_data table_data;
750  String table_id;
751  String table_name;
752  TableStatMap::iterator ts_iter;
753  int i = 0;
754  for (ts_iter = m_table_stat_map.begin();
755  ts_iter != m_table_stat_map.end(); ++ts_iter) {
756  table_id = ts_iter->first;
757  table_data = ts_iter->second;
758  TableNameMap::iterator tn_iter = m_table_name_map.find(table_id);
759  if (tn_iter != m_table_name_map.end()) {
760  table_name = tn_iter->second;
761  }
762  else {
763  m_namemap_ptr->id_to_name(table_id,table_name);
764  m_table_name_map[table_id] = table_name;
765  }
766  entry = format(table_entry_format,
767  table_id.c_str(),
768  table_name.c_str(),
769  (unsigned)table_data.range_count,
770  (Llu)table_data.cell_count,
771  (Llu)table_data.file_count,
772  (Llu)table_data.disk_used,
773  (Llu)table_data.memory_used,
774  table_data.average_key_size,
775  table_data.average_value_size,
776  table_data.compression_ratio);
777  if (i != 0)
778  str += String(",\n ") + entry;
779  else
780  str += String(" ") + entry;
781  i++;
782  }
783 
784  str += table_json_footer;
785 
786  String tmp_filename = m_monitoring_dir + "/table_summary.tmp";
787  String json_filename = m_monitoring_dir + "/table_summary.json";
788 
789  if (FileUtils::write(tmp_filename, str) == -1)
790  return;
791 
792  FileUtils::rename(tmp_filename, json_filename);
793 }
794 
795 void Monitoring::change_id_mapping(const String &table_id, const String &table_name) {
796  String s_table_id(table_id);
797  String s_table_name(table_name);
798  boost::trim_if(s_table_name, boost::is_any_of("/"));
799  m_table_name_map[s_table_id] = s_table_name;
800 }
801 
803  m_table_name_map.erase(table_id);
804 }
805 
806 void Monitoring::run_rrdtool(std::vector<String> &command) {
807  if (m_disable_rrdtool)
808  return;
809 
810  String cmd = "env LD_LIBRARY_PATH= DYLD_LIBRARY_PATH= rrdtool";
811 
812  for (auto &s : command) {
813  cmd += " \"";
814  cmd += s;
815  cmd += "\" ";
816  }
817 
818  HT_DEBUGF("run_rrdtool: %s", cmd.c_str());
819 
820  int ret = ::system(cmd.c_str());
821  if (ret != 0) {
822  HT_WARNF("Monitor: failed to invoke `rrdtool`; make sure it's properly "
823  "installed and in your $PATH (command returned status %d)", ret);
824  }
825 }
826 
String cluster_name
Name of cluster.
Definition: Context.h:176
void invalidate_id_mapping(const String &table_id)
Definition: Monitoring.cc:802
void create_rangeserver_rrd(const String &filename)
Definition: Monitoring.cc:415
#define HT_WARNF(msg,...)
Definition: Logger.h:290
static String filename
Definition: Config.cc:48
void add_server(const String &location, const StatsSystem &system_info)
Definition: Monitoring.cc:79
TableNameMap m_table_name_map
Definition: Monitoring.h:162
void dump_rangeserver_summary_json(std::vector< RangeServerStatistics > &stats)
Definition: Monitoring.cc:659
std::shared_ptr< RangeServerConnection > RangeServerConnectionPtr
std::string String
A String is simply a typedef to std::string.
Definition: String.h:44
Compatibility class for boost::filesystem::path.
RangeServerMap m_server_map
Definition: Monitoring.h:159
String format(const char *fmt,...)
Returns a String using printf-like formatting facilities. Vanilla snprintf is about 1.5x faster than this...
Definition: String.cc:37
void change_id_mapping(const String &table_id, const String &table_name)
Definition: Monitoring.cc:795
long long unsigned int Llu
Shortcut for printf formats.
Definition: String.h:50
Po::typed_value< String > * str(String *v=0)
Definition: Properties.h:166
TableStatMap m_table_stat_map
Definition: Monitoring.h:160
static ssize_t write(const String &fname, const std::string &contents)
Writes a String buffer to a file; the file is overwritten if it already exists.
Definition: FileUtils.cc:124
static bool exists(const String &fname)
Checks if a file or directory exists.
Definition: FileUtils.cc:420
STL namespace.
SystemStatePtr system_state
System state entity.
Definition: Context.h:144
static bool mkdirs(const String &dirname)
Creates a directory (with all parent directories, if required).
Definition: FileUtils.cc:366
TableStatMap m_prev_table_stat_map
Definition: Monitoring.h:161
NameIdMapperPtr m_namemap_ptr
Definition: Monitoring.h:171
#define HT_ASSERT(_e_)
Definition: Logger.h:396
void md5_update(md5_context *ctx, const unsigned char *input, int ilen)
Adds data to the MD5 process buffer.
Definition: md5.cc:208
void md5_starts(md5_context *ctx)
Initialize and setup a MD5 context structure.
Definition: md5.cc:71
Compatibility class for boost::filesystem::path.
Definition: Path.h:45
std::set< const char *, LtCstr > CstrSet
STL Set managing c-style strings.
Definition: StringExt.h:52
PropertiesPtr props
Configuration properties.
Definition: Context.h:147
File system utility functions.
const char * get_text(int error)
Returns a descriptive error message.
Definition: Error.cc:330
void add(std::vector< RangeServerStatistics > &stats)
Definition: Monitoring.cc:118
std::shared_ptr< Properties > PropertiesPtr
Definition: Properties.h:447
void update_table_rrd(const String &filename, struct table_rrd_data &rrd_data)
Definition: Monitoring.cc:528
Logging routines and macros.
Compatibility Macros for C/C++.
Monitoring(Context *context)
Constructor.
Definition: Monitoring.cc:47
void compute_clock_skew(int64_t server_timestamp, RangeServerStatistics *stats)
Definition: Monitoring.cc:389
void update_rangeserver_rrd(const String &filename, struct rangeserver_rrd_data &rrd_data)
Definition: Monitoring.cc:563
MD5 context structure; this structure is used to store the internal state of the md5 algorithm...
Definition: md5.h:44
Collects, serializes and deserializes system-wide statistics.
Definition: StatsSystem.h:43
Hypertable definitions
#define HT_DEBUGF(msg,...)
Definition: Logger.h:260
void create_table_rrd(const String &filename)
Definition: Monitoring.cc:479
uint64_t table_stats_timestamp
Definition: Monitoring.h:170
long long int Lld
Shortcut for printf formats.
Definition: String.h:53
void drop_server(const String &location)
Definition: Monitoring.cc:95
#define HT_INFOF(msg,...)
Definition: Logger.h:272
void md5_finish(md5_context *ctx, unsigned char output[16])
Retrieve the final MD5 digest.
Definition: md5.cc:260
unsigned char m_last_server_set_digest[16]
Definition: Monitoring.h:169
Execution context for the Master.
Definition: Context.h:85
NameIdMapperPtr namemap
Definition: Context.h:154
#define HT_ERRORF(msg,...)
Definition: Logger.h:300
void run_rrdtool(std::vector< String > &command)
Definition: Monitoring.cc:806
int32_t m_monitoring_interval
Definition: Monitoring.h:166
static bool rename(const String &oldpath, const String &newpath)
Renames a file or directory.
Definition: FileUtils.cc:438
void create_dir(const String &dir)
Definition: Monitoring.cc:68
void add_table_stats(std::vector< StatsTable > &table_stats, int64_t fetch_timestamp)
Definition: Monitoring.cc:350
Error codes, Exception handling, error logging.
#define HT_THROW(_code_, _msg_)
Definition: Error.h:478
const char * version_string()
Definition: Version.cc:37
md5 digest routines.
Declarations for Context.
RangeServerConnectionManagerPtr rsc_manager
Definition: Context.h:145
const char * code_to_string(int var_code)
Converts variable code to variable string.