// Maps a range server location to a number of ranges; used as
// TableSummary::range_dist to count one table's ranges per server.
37 typedef std::map<String, uint32_t> RangeDistributionMap;
// Per-table bookkeeping. A new summary starts with no ranges counted and
// with the per-server target still unset (0).
41 TableSummary() : total_ranges(0), ranges_per_server(0) { }
// Sum of this table's range counts over all servers that were examined.
42 uint32_t total_ranges;
// Per-server target for this table. 0 means "not computed yet"; it is
// filled in the first time a move for the table is evaluated.
43 uint32_t ranges_per_server;
// Number of this table's ranges attributed to each server location.
44 RangeDistributionMap range_dist;
// Shared-ownership handle to a TableSummary.
47 typedef std::shared_ptr<TableSummary> TableSummaryPtr;
// Table id -> summary of how that table's ranges are spread over servers.
50 typedef std::map<const String, TableSummaryPtr> TableSummaryMap;
// Locations of the candidate destination servers (one entry is appended
// per valid connection when the plan is computed).
53 vector<String> servers;
// Summary for every table that appears in the server statistics.
54 TableSummaryMap table_summaries;
// Counter incremented while walking the server statistics; presumably the
// number of servers reporting zero ranges (the guarding branch is not shown
// in this listing) -- TODO confirm.
56 uint32_t num_empty_servers;
// Decides whether the range [start_row..end_row] of `table`, currently on
// `src_server`, should be moved, and if so appends a RangeMoveSpec to
// plan->moves and updates the per-server counts in `state`.
//
// The remaining parameters (start_row, end_row, state, plan) and every
// return statement fall on listing lines that are not shown here; the caller
// treats a true result as "a move was added".
59 bool maybe_add_to_plan(
const String &table,
const String &src_server,
// Tables for which an earlier call found no destination are skipped outright.
65 if (saturated_tables.find(table) != saturated_tables.end()) {
66 HT_DEBUG_OUT <<
"Moves for table " << table <<
" saturated"
67 <<
" Dont move " << src_server <<
":"<< table <<
"[" << start_row
68 <<
".." << end_row <<
"] " <<
HT_END;
// Without summary information for the table no decision can be made.
73 TableSummaryMap::iterator table_it = state.table_summaries.find(table);
74 if (table_it == state.table_summaries.end()) {
75 HT_DEBUG_OUT <<
"Summary info for table " << table <<
" not found."
76 <<
" Dont move " << src_server <<
":"<< table <<
"[" << start_row
77 <<
".." << end_row <<
"] " <<
HT_END;
// Compute the per-server target lazily: integer average plus one.
// NOTE(review): this divides by state.servers.size(); no guard against an
// empty server list is visible in this function -- confirm the caller
// guarantees at least one entry.
82 if (table_it->second->ranges_per_server == 0)
83 table_it->second->ranges_per_server = table_it->second->total_ranges / state.servers.size() + 1;
85 uint32_t ranges_per_server = table_it->second->ranges_per_server;
// Look up how many of this table's ranges the source server holds; a server
// with no entry is not moved from (the start of the log statement below is
// not shown in this listing).
86 RangeDistributionMap::iterator src_range_dist_it = table_it->second->range_dist.find(src_server);
87 if (src_range_dist_it == table_it->second->range_dist.end()) {
89 <<
" Dont move " << src_server <<
":"<< table <<
"[" << start_row
90 <<
".." << end_row <<
"] " <<
HT_END;
// A source that is already at or below the target keeps its ranges.
94 if (src_range_dist_it->second <= ranges_per_server) {
95 HT_DEBUG_OUT <<
"src server has " << src_range_dist_it->second
96 <<
" ranges from this table which is below average "
97 << ranges_per_server <<
" Dont move " << src_server <<
":" << table
98 <<
"[" << start_row <<
".." << end_row <<
"] " <<
HT_END;
103 RangeDistributionMap::iterator dst_range_dist_it;
// Walk the candidate servers looking for one that is below the target.
106 bool found_dst_server =
false;
107 for (
const auto &target_server : state.servers) {
// compare() returns 0 on equality, so this condition is true when the
// candidate is the source server itself (the statement it guards is not shown
// in this listing; presumably it skips the candidate).
108 if (!target_server.compare(src_server))
// A candidate with no entry for this table gets one with a count of zero.
110 dst_range_dist_it = table_it->second->range_dist.find(target_server);
111 if (dst_range_dist_it == table_it->second->range_dist.end()) {
112 pair<RangeDistributionMap::iterator, bool> ret =
113 table_it->second->range_dist.insert(make_pair(target_server, 0));
114 dst_range_dist_it = ret.first;
// A candidate holding fewer ranges than the target is selected.
116 if (dst_range_dist_it->second < ranges_per_server) {
117 found_dst_server =
true;
118 dst_server = target_server;
// No server below the target: remember the table so later calls skip it.
124 if (!found_dst_server) {
125 saturated_tables.insert(table_it->first);
126 HT_DEBUG_OUT <<
"Can't find destination for ranges from table " << table
127 <<
" (saturated)." <<
" Dont move " << src_server <<
":" << table
128 <<
"[" << start_row <<
".." << end_row <<
"] " <<
HT_END;
// Account for the move in the distribution counts, then record it in the
// plan (the declaration of `move` starts on a listing line not shown here).
133 --(src_range_dist_it->second);
134 ++(dst_range_dist_it->second);
136 make_shared<RangeMoveSpec>(src_server.c_str(), dst_server.c_str(),
137 table.c_str(), start_row.c_str(), end_row.c_str());
138 plan->moves.push_back(move);
// Reorder the candidate list at random; presumably so the next call does not
// try the same server first.
// NOTE(review): std::random_shuffle was deprecated in C++14 and removed in
// C++17; std::shuffle with an explicit random engine is the replacement.
142 random_shuffle(state.servers.begin(), state.servers.end());
151 std::vector<RangeServerStatistics> &statistics)
// Initializes m_context and m_statistics from the two constructor arguments.
152 : m_context(context), m_statistics(statistics) {
158 std::vector<RangeServerConnectionPtr> &balanced) {
// Builds a balance plan in three visible stages: (1) summarize the per-server
// statistics, (2) collect the valid connections as candidate destinations,
// (3) scan rows and offer each range to maybe_add_to_plan().
// Many lines of this function (loop headers, else/return/continue statements,
// closing braces) are not shown in this listing.
159 balance_state_t state;
// -1 marks "no server examined yet" for the min/max range counts.
161 int32_t min_ranges = -1;
162 int32_t max_ranges = -1;
164 state.num_ranges = 0;
165 state.num_empty_servers = 0;
// A server that is not yet live aborts the whole balance operation.
// NOTE(review): rs.stats is dereferenced here but is only tested for null
// in the next condition below; if stats can be null the null test must come
// first -- confirm and reorder.
169 if (!rs.stats->live) {
170 HT_INFOF(
"Aborting balance because %s is not yet live", rs.location.c_str());
// Servers without statistics, or that the context says cannot accept
// ranges, are filtered here (the guarded statement is not shown).
175 if (!rs.stats || !
m_context->can_accept_ranges(rs))
178 locations.insert(rs.location);
// Track the smallest and largest range_count among the retained servers.
180 if (min_ranges == -1)
181 min_ranges = max_ranges = rs.stats->range_count;
183 if (min_ranges > rs.stats->range_count)
184 min_ranges = rs.stats->range_count;
185 if (max_ranges < rs.stats->range_count)
186 max_ranges = rs.stats->range_count;
// For a server that holds ranges, fold each of its tables into the
// per-table summaries, creating a summary the first time a table id is seen.
189 if (rs.stats->range_count > 0) {
191 state.num_ranges += rs.stats->range_count;
192 for (
auto &table : rs.stats->tables) {
193 TableSummaryMap::iterator it = state.table_summaries.find(table.table_id.c_str());
194 if (it == state.table_summaries.end()) {
195 ts = make_shared<TableSummary>();
196 state.table_summaries[table.table_id.c_str()] = ts;
// Record this server's share of the table and add it to the table total.
200 ts->range_dist.insert(make_pair(rs.location, table.range_count));
201 ts->total_ranges += table.range_count;
// Presumably the else-branch for servers with no ranges (the `else` itself
// is not shown in this listing).
205 state.num_empty_servers++;
// With at most one usable location there is nothing to balance between; the
// statement that uses the format string below starts on a line not shown.
208 if (locations.size() <= 1)
210 "Not enough available servers (%u)", (
unsigned)locations.size());
// Resolve the retained locations to connections. Connections not yet marked
// balanced are reported back through `balanced`; every connection's location
// becomes a candidate destination.
216 std::vector<RangeServerConnectionPtr> connections;
217 m_context->rsc_manager->get_valid_connections(locations, connections);
218 for (
auto &rsc : connections) {
219 if (!rsc->get_balanced())
220 balanced.push_back(rsc);
221 state.servers.push_back(rsc->location());
// Skip balancing when the largest and smallest per-server range counts
// differ by fewer than 3.
225 if ((max_ranges - min_ranges) < 3) {
226 HT_INFO(
"Aborting balance because max variance < 3");
// Scan setup: only the "Location" and "StartRow" columns are requested.
236 String last_key, last_location, last_start_row;
237 bool read_start_row =
false;
238 String location(
"Location"), start_row(
"StartRow");
240 scan_spec.
columns.push_back(location.c_str());
241 scan_spec.
columns.push_back(start_row.c_str());
// Cells are consumed one at a time. last_key / last_location /
// last_start_row hold values gathered for the current row key, and
// read_start_row presumably records whether the start row has been seen.
// Most branches of this loop are not shown in this listing.
245 while (scanner->next(cell)) {
252 if (last_key == cell.
row_key) {
256 read_start_row =
true;
263 last_start_row.clear();
264 read_start_row =
false;
268 last_location.clear();
269 read_start_row =
true;
274 HT_DEBUG_OUT <<
"last_key=" << last_key <<
", last_location="
275 << last_location <<
", last_start_row=" << last_start_row <<
HT_END;
// Once both a location and a start row are known, split the row key at the
// first ':' into the table part and the end row, and offer the range to
// maybe_add_to_plan().
// NOTE(review): no check of `pos` against npos is visible before it is used
// (a listing line between the find and the first use is not shown) -- confirm
// one exists.
277 if (last_location.size() > 0 && read_start_row) {
278 size_t pos = last_key.find(
':');
280 String table(last_key, 0, pos);
281 String end_row(last_key, pos+1);
282 if (maybe_add_to_plan(table, last_location, last_start_row, end_row, state, plan)) {
283 HT_DEBUG_OUT <<
"Added move for range " << table <<
":" << end_row
284 <<
", start_row=" << last_start_row <<
", location="
285 << last_location <<
HT_END;
std::set< String > StringSet
STL Set managing Strings.
std::vector< RangeServerStatistics > m_statistics
std::string String
A String is simply a typedef to std::string.
std::shared_ptr< BalancePlan > BalancePlanPtr
BalanceAlgorithmEvenRanges(ContextPtr &context, std::vector< RangeServerStatistics > &statistics)
std::shared_ptr< TableScanner > TableScannerPtr
Smart pointer to TableScanner.
std::shared_ptr< Context > ContextPtr
Smart pointer to Context.
Scan predicate and control specification.
Compatibility Macros for C/C++.
const char * column_family
#define HT_INFOF(msg,...)
#define HT_THROWF(_code_, _fmt_,...)
static const char * END_ROOT_ROW
std::shared_ptr< RangeMoveSpec > RangeMoveSpecPtr
Encapsulates decomposed key and value.
virtual void compute_plan(BalancePlanPtr &plan, std::vector< RangeServerConnectionPtr > &balanced)