48 m_context(context), m_mml_writer(mml_writer), m_generation(0)
55 : MetaLog::Entity(header), m_context(context), m_mml_writer(mml_writer),
65 RecoveryPlanMap::iterator it =
m_map.begin();
66 while (it !=
m_map.end()) {
67 os << it->first <<
": ";
68 for (
int i = 0; i < 4; i++)
69 if (it->second.plans[i])
70 os << *(it->second.plans[i]) <<
" ";
73 os <<
"current_set={";
75 os << *move_spec <<
" ";
80 uint16_t definition_version) {
82 if (definition_version < 4) {
86 Entity::decode(bufp, remainp);
120 RecoveryPlanMap::iterator it =
m_map.find(location);
121 if (it ==
m_map.end())
130 const vector<QualifiedRangeSpec> &specs) {
140 for (
const auto &spec : specs)
141 plan->receiver_plan.remove(spec);
148 vector<QualifiedRangeSpec> specs;
149 bool changed =
false;
151 for (std::map<String, RecoveryPlans>::iterator iter =
m_map.begin();
152 iter !=
m_map.end(); ++iter) {
153 for (
size_t i=0; i<4; i++) {
154 if (iter->second.plans[i]) {
156 auto &receiver_plan = iter->second.plans[i]->receiver_plan;
157 receiver_plan.get_range_specs(specs);
158 for (
auto &spec : specs) {
159 if (!strcmp(table_id.c_str(), spec.table.id)) {
160 receiver_plan.remove(spec);
172 const String &old_destination,
173 const String &new_destination) {
175 vector<QualifiedRangeSpec> specs;
176 vector<RangeState> states;
179 bool changed =
false;
186 for (std::map<String, RecoveryPlans>::iterator iter =
m_map.begin();
187 iter !=
m_map.end(); ++iter) {
188 if (location !=
"*" && iter->first != location)
190 for (
size_t i=start; i<end; i++) {
191 if (iter->second.plans[i]) {
194 auto &receiver_plan = iter->second.plans[i]->receiver_plan;
195 receiver_plan.get_range_specs_and_states(old_destination, specs, states);
196 if (!specs.empty()) {
197 HT_ASSERT(specs.size() == states.size());
198 for (
size_t j=0; j<specs.size(); j++) {
199 receiver_plan.remove(specs[j]);
200 receiver_plan.insert(new_destination, specs[j], states[j]);
219 plan->receiver_plan.get_locations(locations);
228 if (!plan || plan->receiver_plan.empty())
236 return (
m_map.empty());
245 RecoveryPlanMap::const_iterator it =
m_map.begin();
246 while (it !=
m_map.end()) {
248 for (
int i = 0; i < 4; i++) {
250 if (it->second.plans[i])
251 len += it->second.plans[i]->encoded_length();
257 len += move_spec->encoded_length();
264 RecoveryPlanMap::const_iterator it =
m_map.begin();
265 while (it !=
m_map.end()) {
267 for (
int i = 0; i < 4; i++) {
268 if (it->second.plans[i]) {
270 it->second.plans[i]->encode(bufp);
279 move_spec->encode(bufp);
286 for (
int i = 0; i < num_servers; i++) {
289 for (
int j = 0; j < 4; j++) {
295 plans.
plans[j] = make_shared<RangeServerRecovery::Plan>();
296 plans.
plans[j]->decode(bufp, remainp);
303 for (
size_t i=0; i<count; i++) {
304 move_spec = make_shared<RangeMoveSpec>();
305 move_spec->decode(bufp, remainp);
316 for (
int i = 0; i < num_servers; i++) {
319 for (
int j = 0; j < 4; j++) {
325 plans.
plans[j] = make_shared<RangeServerRecovery::Plan>();
333 for (
size_t i=0; i<count; i++) {
334 move_spec = make_shared<RangeMoveSpec>();
343 const vector<QualifiedRangeSpec> &root_specs,
344 const vector<RangeState> &root_states,
345 const vector<QualifiedRangeSpec> &metadata_specs,
346 const vector<RangeState> &metadata_states,
347 const vector<QualifiedRangeSpec> &system_specs,
348 const vector<RangeState> &system_states,
349 const vector<QualifiedRangeSpec> &user_specs,
350 const vector<RangeState> &user_states)
354 HT_INFOF(
"Creating recovery plan for %s", location.c_str());
370 RecoveryPlanMap::iterator it;
371 for (it =
m_map.begin(); it !=
m_map.end(); ++it) {
399 if (location == (*iter)->dest_location) {
402 (*iter)->dest_location = new_location;
404 (*iter)->dest_location = new_location;
406 (*iter)->dest_location = new_location;
408 (*iter)->dest_location = new_location;
410 std::stringstream sout;
411 sout <<
"Found " << (*iter)->table <<
" " << (*iter)->range
412 <<
" in current set assigned to location "
413 << location <<
", but not in recovery plan";
426 m_map[location] = plans;
436 vector<MetaLog::EntityPtr> entities;
438 if (!m_context->rsc_manager->remove_server(location, rsc))
439 HT_FATALF(
"Unable to location RangeServerConnection object for %s", location.c_str());
440 m_context->recovered_servers->add(location);
441 entities.push_back(shared_from_this());
442 entities.push_back(m_context->recovered_servers);
443 entities.push_back(rsc);
444 m_mml_writer->record_state(entities);
447 std::stringstream sout;
448 sout <<
"Global recovery plan was modified: " << *
this;
453 BalancePlanAuthority::create_range_plan(
const String &location,
int type,
454 const vector<QualifiedRangeSpec> &specs,
455 const vector<RangeState> &states)
460 const char *type_strings[] = {
"root",
"metadata",
"system",
"user",
"UNKNOWN" };
462 vector<uint32_t> fragments;
468 +
"/log/" + type_strings[type];
472 HT_ASSERT(specs.size() == states.size());
475 for (
size_t i=0; i<specs.size(); i++) {
478 plan->receiver_plan.insert(*
m_active_iter, specs[i], states[i]);
484 for (
auto fragment : fragments) {
491 HT_INFOF(
"Added recovery plan with %d fragments for %d %s ranges",
492 (
int)fragments.size(), (int)specs.size(), type_strings[type]);
501 const vector<QualifiedRangeSpec> &new_specs)
506 vector<int32_t> fragments;
507 plan->replay_plan.get_fragments(location.c_str(), fragments);
509 for (
auto fragment : fragments) {
516 std::set<QualifiedRangeSpec> purge_ranges;
517 for (
const auto spec : new_specs)
518 purge_ranges.insert(spec);
521 vector<QualifiedRangeSpec> specs;
522 vector<RangeState> states;
523 plan->receiver_plan.get_range_specs_and_states(location, specs, states);
525 for (
size_t i=0; i<specs.size(); i++) {
526 if (purge_ranges.count(specs[i]) > 0)
527 plan->receiver_plan.remove(specs[i]);
531 plan->receiver_plan.insert(*
m_active_iter, specs[i], states[i]);
536 if (plan->receiver_plan.empty())
542 std::vector<MetaLog::EntityPtr> &entities) {
550 for (
auto &move : plan->moves)
554 entities.push_back(shared_from_this());
557 std::stringstream sout;
558 sout <<
"Balance plan registered move " << plan->moves.size()
559 <<
" ranges" <<
", BalancePlan = " << *plan;
568 bool modified =
false;
574 move_spec->table = table;
575 move_spec->range = range;
577 MoveSetT::iterator iter;
580 location = (*iter)->dest_location;
584 move_spec->dest_location = location;
591 m_mml_writer->record_state(shared_from_this());
601 std::stringstream sout;
603 sout <<
"balance_move_complete for " << table <<
" " << range;
606 move_spec->table = table;
607 move_spec->range = range;
609 MoveSetT::iterator iter;
613 (*iter)->complete =
true;
bool get_balance_destination(const TableIdentifier &table, const RangeSpec &range, String &location)
Returns the balance plan destination for a given range.
std::set< String > StringSet
STL Set managing Strings.
char * decode_vstr(const uint8_t **bufp, size_t *remainp)
Decode a vstr (vint64, data, null).
bool next_available_server(ContextPtr &context, String &location, bool urgent)
Gets name of next available server.
void change_receiver_plan_location(const String &location, int type, const String &old_destination, const String &new_destination)
Modifies all failover plans by changing moves to a given destination to a new one.
BalancePlanAuthority(ContextPtr context, MetaLog::WriterPtr &mml_writer)
Constructor.
void remove_from_receiver_plan(const String &location, int type, const vector< QualifiedRangeSpec > &ranges)
Removes ranges from a failover plan.
RangeServerRecovery::PlanPtr plans[4]
std::shared_ptr< RangeServerConnection > RangeServerConnectionPtr
std::string String
A String is simply a typedef to std::string.
std::shared_ptr< BalancePlan > BalancePlanPtr
void display(std::ostream &os) override
Writes a human-readable representation of object to an output stream.
Declarations for CommitLogReader.
uint8_t encoding_version() const override
Returns encoding version.
void copy_recovery_plan(const String &location, int type, RangeServerRecovery::Plan &out, int &plan_generation)
Copies part of a recovery plan for a failed range server.
void update_range_plan(RangeServerRecovery::PlanPtr &plan, const String &location, const vector< QualifiedRangeSpec > &new_specs)
Modifies recovery plan, replacing moves to location with a new destination.
size_t encoded_length_vstr(size_t len)
Computes the encoded length of vstr (vint64, data, null)
void decode(const uint8_t **bufp, size_t *remainp, uint16_t definition_version) override
Reads serialized encoding of object.
size_t encoded_length_internal() const override
Returns internal serialized length.
MetaLog::WriterPtr m_mml_writer
Pointer to MML writer.
uint32_t decode_i32(const uint8_t **bufp, size_t *remainp)
Decode a 32-bit integer in little-endian order.
RangeServer recovery plan.
std::shared_ptr< Context > ContextPtr
Smart pointer to Context.
ReplayPlan replay_plan
Replay plan.
void remove_recovery_plan(const String &location)
Removes a recovery plan for a failed range server.
void remove_table_from_receiver_plan(const String &table_id)
Removes range move specifications for a table.
bool register_balance_plan(BalancePlanPtr &plan, int generation, std::vector< MetaLog::EntityPtr > &entities)
Registers a new balance plan for load balancing purposes.
bool decode_bool(const uint8_t **bufp, size_t *remainp)
Decodes a boolean value from the given buffer.
void get_init_fragment_ids(std::vector< uint32_t > &ids)
uint16_t decode_i16(const uint8_t **bufp, size_t *remainp)
Decode a 16-bit integer in little-endian order.
void encode_i32(uint8_t **bufp, uint32_t val)
Encode a 32-bit integer in little-endian order.
Compatibility Macros for C/C++.
RangeServerRecovery::PlanPtr create_range_plan(const String &location, int type, const vector< QualifiedRangeSpec > &specs, const vector< RangeState > &states)
Creates a recovery plan for a failed server.
Functions to serialize/deserialize primitives to/from a memory buffer.
void set_mml_writer(MetaLog::WriterPtr &mml_writer)
Sets the MML writer. Sets m_mml_writer to mml_writer.
void decode_old(const uint8_t **bufp, size_t *remainp)
MoveSetT m_current_set
Current set of move specifications for move operations.
ContextPtr m_context
Pointer to master context.
Provides sequential access to blocks in a commit log.
void encode_internal(uint8_t **bufp) const override
Writes serialized representation of object to a buffer.
void encode_vstr(uint8_t **bufp, const void *buf, size_t len)
Encode a buffer as variable length string (vint64, data, null)
#define HT_FATALF(msg,...)
Declarations for general-purpose utility functions.
void create_recovery_plan(const String &location, const vector< QualifiedRangeSpec > &root_specs, const vector< RangeState > &root_states, const vector< QualifiedRangeSpec > &metadata_specs, const vector< RangeState > &metadata_states, const vector< QualifiedRangeSpec > &system_specs, const vector< RangeState > &system_states, const vector< QualifiedRangeSpec > &user_specs, const vector< RangeState > &user_states)
Creates a recovery plan for a failed range server.
void legacy_decode(const uint8_t **bufp, size_t *remainp, BalancePlan *plan)
void encode_bool(uint8_t **bufp, bool bval)
Encodes a boolean into the given buffer.
std::shared_ptr< Writer > WriterPtr
Smart pointer to Writer.
std::shared_ptr< Plan > PlanPtr
Smart pointer to Plan.
Holds plans for each range type.
#define HT_INFOF(msg,...)
bool is_empty()
Determines if there are any failover balance plans.
Qualified (with table identifier) range specification.
std::shared_ptr< RangeMoveSpec > RangeMoveSpecPtr
int m_generation
Generation number (incremented with each new failover plan)
void get_receiver_plan_locations(const String &location, int type, StringSet &locations)
Returns the list of receiver locations for a recovery plan.
bool recovery_complete(const String &location, int type)
Checks if recovery plan of given type has been removed.
ReceiverPlan receiver_plan
Receiver plan.
Declarations for BalancePlanAuthority.
StringSet::iterator m_active_iter
Iterator pointing into m_active.
Declarations for Context.
void decode_internal(uint8_t version, const uint8_t **bufp, size_t *remainp) override
Reads serialized representation of object from a buffer.
RecoveryPlanMap m_map
Mapping from failed range server to recovery plan.
StringSet m_active
Cache of active (available) servers.