34 std::vector<RangeServerStatistics> &statistics)
35 : m_context(context) {
39 for (
auto &rs : statistics)
45 std::vector<RangeServerConnectionPtr> &balanced) {
46 vector<ServerMetrics> server_metrics;
53 int num_loaded_servers=0;
54 double mean_loadavg=0;
55 double mean_loadavg_per_loadestimate=0;
57 for (
const auto &sm : server_metrics) {
61 (!
m_context->rsc_manager->find_server_by_location(sm.get_id(), rsc)
62 || !rsc->connected() || rsc->get_removed() || rsc->is_recovering())) {
63 HT_INFOF(
"RangeServer %s not connected, skipping", sm.get_id().c_str());
68 servers_desc_load.insert(ss);
75 num_servers = servers_desc_load.size();
77 if (num_servers < 2 || num_loaded_servers < 1) {
78 HT_INFOF(
"No balancing required, num_servers=%d, num_loaded_servers=%d",
79 num_servers, num_loaded_servers);
83 mean_loadavg /= num_servers;
84 mean_loadavg_per_loadestimate /= num_loaded_servers;
86 HT_INFOF(
"meand_loadavg=%f, num_servers=%u, mean_loadavg_per_loadestimate=%f"
87 ", num_loaded_servers=%u, loadavg_deviation_threshold=%f",
88 mean_loadavg, (
unsigned)num_servers, mean_loadavg_per_loadestimate,
92 if (servers_desc_load.size() < 2)
102 HT_DEBUG_OUT <<
"heaviest_server=" << heaviest <<
", lightest_server="
108 HT_DEBUG_OUT <<
"Heaviest loaded server now has estimated loadavg of "
109 << heaviest.
loadavg <<
" which is within the acceptable threshold ("
121 RangeSetDescLoad::iterator ranges_desc_load_it = ranges_desc_load.begin();
124 ranges_desc_load_it != ranges_desc_load.end()) {
125 if (
check_move(heaviest, lightest, ranges_desc_load_it->loadestimate,
129 lightest.
server_id, ranges_desc_load_it->table_id,
130 ranges_desc_load_it->start_row, ranges_desc_load_it->end_row);
131 HT_DEBUG_OUT <<
"Added move to plan: " << *(move.get()) << HT_END;
132 plan->moves.push_back(move);
142 ServerSetDescLoad::iterator it = servers_desc_load.end();
144 servers_desc_load.erase(it);
145 servers_desc_load.insert(lightest);
146 lightest = *(servers_desc_load.rbegin());
154 <<
" is not viable." <<
HT_END;
156 ranges_desc_load_it++;
161 servers_desc_load.erase(servers_desc_load.begin());
169 double loadestimate = 0;
173 if (measurements.size() > 0) {
174 for (
const auto &measurement : measurements) {
175 summary.
loadavg += measurement.loadavg;
176 loadestimate += measurement.bytes_written_rate
177 + measurement.bytes_scanned_rate;
179 summary.
loadavg /= measurements.size();
181 / (loadestimate / measurements.size());
199 if (measurements.size() > 0) {
200 for (
const auto &measurement : measurements)
201 summary.
loadestimate += measurement.byte_read_rate + measurement.byte_write_rate;
209 ranges_desc_load.clear();
210 for (
const auto &vv : range_metrics) {
212 if (!vv.second.is_moveable())
216 ranges_desc_load.insert(summary);
222 double mean_loadavg) {
224 double loadavg_destination = destination.
loadavg;
225 double delta_destination;
228 loadavg_destination += delta_destination;
235 out <<
"{ServerMetricSummary: server_id=" << summary.
server_id <<
", loadavg="
244 out <<
"{RangeMetricSummary: table_id=" << summary.
table_id <<
", start_row="
bool check_move(const ServerMetricSummary &source, const ServerMetricSummary &destination, double range_loadestimate, double mean_loadavg)
void calculate_range_summary(const Lib::RS_METRICS::RangeMetrics &metrics, RangeMetricSummary &summary)
std::shared_ptr< RangeServerConnection > RangeServerConnectionPtr
const std::vector< ServerMeasurement > & get_measurements() const
const std::vector< RangeMeasurement > & get_measurements() const
std::shared_ptr< BalancePlan > BalancePlanPtr
std::map< String, RangeMetrics > RangeMetricsMap
Facilities for reading and writing sys/RS_METRICS table.
virtual void compute_plan(BalancePlanPtr &plan, std::vector< RangeServerConnectionPtr > &balanced)
const String & get_start_row(bool *isset) const
std::shared_ptr< Context > ContextPtr
Smart pointer to Context.
std::multiset< RangeMetricSummary, GtRangeMetricSummary > RangeSetDescLoad
double loadavg_per_loadestimate
void calculate_server_summary(const Lib::RS_METRICS::ServerMetrics &metrics, ServerMetricSummary &summary)
std::multiset< ServerMetricSummary, GtServerMetricSummary > ServerSetDescLoad
Declarations for ReaderTable.
Compatibility Macros for C/C++.
std::ostream & operator<<(std::ostream &os, const crontab_entry &entry)
Helper function to write crontab_entry to an ostream.
Aggregates metrics for an individual range.
double m_loadavg_deviation_threshold
BalanceAlgorithmLoad(ContextPtr &context, std::vector< RangeServerStatistics > &statistics)
#define HT_INFOF(msg,...)
Reads metrics from the sys/RS_METRICS table.
const String & get_id() const
std::shared_ptr< RangeMoveSpec > RangeMoveSpecPtr
Aggregates metrics for an individual RangeServer.
virtual void get_range_metrics(const char *server_id, RangeMetricsMap &range_metrics)
const String & get_table_id() const
const String & get_end_row() const
virtual void get_server_metrics(vector< ServerMetrics > &server_metrics)
void populate_range_load_set(const Lib::RS_METRICS::RangeMetricsMap &range_metrics, RangeSetDescLoad &ranges_desc_load)