49 const String &location,
int type)
51 m_location(location), m_type(type) {
59 :
Operation(context, header_), m_plan_generation(0), m_last_notification(0) {
65 HT_INFOF(
"Entering RecoverServerRanges (%p) %s type=%d plan_generation=%d state=%s",
69 if (!
m_context->recovery_state().get_replay_future(
id())) {
85 HT_INFOF(
"Plan for location %s, type %s is empty, nothing to do",
94 m_context->mml_writer->record_state(shared_from_this());
106 m_context->mml_writer->record_state(shared_from_this());
124 m_context->mml_writer->record_state(shared_from_this());
150 m_context->mml_writer->record_state(shared_from_this());
178 m_context->mml_writer->record_state(shared_from_this());
194 m_context->mml_writer->record_state(shared_from_this());
205 this_thread::sleep_for(chrono::milliseconds(5000));
224 HT_FATALF(
"Unrecognized state %d", state);
228 HT_INFOF(
"Leaving RecoverServerRanges %s plan_generation=%d type=%d state=%s",
241 return "OperationRecoverRanges";
245 return format(
"RecoverServerRanges %s type=%s",
268 if (
m_context->props->get_bool(
"Hypertable.Failover.RecoverInSeries"))
277 vector<QualifiedRangeSpec> specs;
279 for (
auto &spec : specs)
281 spec.table.id, spec.range.start_row,
282 spec.range.end_row));
336 vector<int32_t> fragments;
339 for (
const auto &location : locations) {
341 vector<QualifiedRangeSpec> specs;
342 vector<RangeState> states;
345 HT_INFOF(
"Calling phantom_load(plan_generation=%d, location=%s) for %d %s ranges",
366 m_context->get_balance_plan_authority()->get_generation();
373 ==
m_context->get_balance_plan_authority()->get_generation());
377 size_t available_servers =
m_context->available_server_count();
378 size_t total_servers =
m_context->rsc_manager->server_count();
379 size_t failover_pct =
380 m_context->props->get_i32(
"Hypertable.Failover.Quorum.Percentage");
381 size_t quorum = ((total_servers * failover_pct) + 99) / 100;
383 if (available_servers < quorum || available_servers == 0) {
388 message =
format(
"Recovery of range server %s has been suspended because\\n"
389 "only %d out of %d servers are available. Required\\n"
391 (int)available_servers, (
int)total_servers,
393 m_context->notification_hook(subject, message);
406 m_context->recovery_state().install_replay_future(
id(), future);
410 m_context->recovery_state().install_prepare_future(
id(), future);
414 m_context->recovery_state().install_commit_future(
id(), future);
424 HT_INFOF(
"Retrieved new balance plan for %s (type=%s, generation=%d) range count %d",
429 m_context->mml_writer->record_state(shared_from_this());
458 vector<int32_t> fragments;
461 m_context->recovery_state().get_replay_future(
id());
467 future->register_locations(locations);
469 for (
const auto &location : locations) {
474 HT_INFOF(
"Issue replay_fragments for %d fragments to %s (%s)",
475 (
int)fragments.size(), location.c_str(),
m_type_str.c_str());
487 if (!future->wait_for_completion(
m_timeout)) {
488 bool range_map_not_found =
false;
491 format(
"Failure encountered during REPLAY FRAGMENTS step of recovery\\n"
492 "of range server %s\\n\\n",
m_location.c_str());
494 future->get_error_map(error_map);
495 for (RecoveryStepFuture::ErrorMapT::iterator iter=error_map.begin();
496 iter != error_map.end(); ++iter) {
498 +
" - " + iter->second.second;
499 message += str +
"\\n";
502 range_map_not_found =
true;
504 if (range_map_not_found)
507 time_t now = time(0);
509 String subject =
format(
"ERROR: Replay failure during recovery of %s",
511 m_context->notification_hook(subject, message);
527 m_context->recovery_state().get_prepare_future(
id());
533 future->register_locations(locations);
535 for (
const auto &location : locations) {
537 vector<QualifiedRangeSpec> specs;
540 HT_INFOF(
"Issue phantom_prepare_ranges for %d ranges to %s (%s)",
541 (
int)specs.size(), location.c_str(),
m_type_str.c_str());
553 if (!future->wait_for_completion(
m_timeout)) {
554 bool range_map_not_found =
false;
556 future->get_error_map(error_map);
557 for (RecoveryStepFuture::ErrorMapT::iterator iter=error_map.begin();
558 iter != error_map.end(); ++iter) {
559 HT_ERRORF(
"phantom commit at %s failed - %s (%s)", iter->first.c_str(),
562 range_map_not_found =
true;
564 if (range_map_not_found)
579 m_context->recovery_state().get_commit_future(
id());
584 StringSet existing_locations, new_locations;
587 std::set_intersection(existing_locations.begin(), existing_locations.end(),
588 new_locations.begin(), new_locations.end(),
589 std::inserter(locations, locations.end()));
596 locations.erase(location);
598 future->register_locations(locations);
600 for (
const auto &location : locations) {
602 vector<QualifiedRangeSpec> specs;
606 HT_INFOF(
"Issue phantom_commit_ranges for %d ranges to %s",
607 (
int)specs.size(), location.c_str());
618 if (!future->wait_for_completion(
m_timeout)) {
621 future->get_error_map(error_map);
622 for (RecoveryStepFuture::ErrorMapT::iterator iter=error_map.begin();
623 iter != error_map.end(); ++iter) {
625 m_redo_set.insert(iter->first);
641 vector<QualifiedRangeSpec> acknowledged;
646 StringSet existing_locations, new_locations;
649 std::set_intersection(existing_locations.begin(), existing_locations.end(),
650 new_locations.begin(), new_locations.end(),
651 std::inserter(locations, locations.end()));
658 locations.erase(location);
660 for (
const auto &location : locations) {
662 vector<QualifiedRangeSpec> specs;
663 vector<QualifiedRangeSpec *> range_ptrs;
664 map<QualifiedRangeSpec, int> response_map;
665 map<QualifiedRangeSpec, int>::iterator response_map_it;
668 for (
auto &range : specs)
669 range_ptrs.push_back(&range);
671 HT_INFOF(
"Issue acknowledge_load for %d ranges to %s",
672 (
int)range_ptrs.size(), location.c_str());
677 response_map_it = response_map.begin();
678 while (response_map_it != response_map.end()) {
679 if (response_map_it->second !=
Error::OK)
680 HT_WARNF(
"Problem acknowledging load for %s[%s..%s] - %s",
681 response_map_it->first.table.id,
682 response_map_it->first.range.start_row,
683 response_map_it->first.range.end_row,
688 HT_INFOF(
"acknowledge_load complete for %d ranges to %s",
689 (
int)range_ptrs.size(), location.c_str());
698 if (!acknowledged.empty()) {
700 for (
const auto &spec : acknowledged)
std::set< String > StringSet
STL Set managing Strings.
void encode_state(uint8_t **bufp) const override
Encode operation state.
char * decode_vstr(const uint8_t **bufp, size_t *remainp)
Decode a vstr (vint64, data, null).
void phantom_prepare_ranges(const CommAddress &addr, int64_t op_id, const String &location, int32_t plan_generation, const vector< QualifiedRangeSpec > &ranges, int32_t timeout)
Issues a "phantom_prepare_ranges" synchronous request.
#define HT_WARNF(msg,...)
void remove_from_receiver_plan(const String &location, int type, const vector< QualifiedRangeSpec > &ranges)
Removes ranges from a failover plan.
The FailureInducer simulates errors.
void execute() override
Executes (carries out) the operation.
ContextPtr m_context
Pointer to Master context.
time_t m_last_notification
std::string String
A String is simply a typedef to std::string.
String format(const char *fmt,...)
Returns a String using printf-like format facilities. Vanilla snprintf is about 1.5x faster than this...
Declarations for CommitLogReader.
virtual size_t encoded_length() const
Returns serialized object length.
Declarations for OperationProcessor.
Po::typed_value< String > * str(String *v=0)
void get_fragments(vector< int32_t > &fragments) const
Fills a vector with all of the fragment numbers that are part of this replay plan.
bool get_new_recovery_plan()
void acknowledge_load(const CommAddress &addr, const vector< QualifiedRangeSpec * > &ranges, std::map< QualifiedRangeSpec, int > &response_map)
Issues a synchronous "acknowledge load" request for multiple ranges.
bool validate_recovery_plan()
PageArena memory allocator for STL classes.
void get_range_specs(vector< QualifiedRangeSpec > &specs) const
RangeServerRecovery::Plan m_plan
size_t encoded_length_vstr(size_t len)
Computes the encoded length of vstr (vint64, data, null)
std::map< String, std::pair< int32_t, String > > ErrorMapT
int32_t m_plan_generation
const char * get_text(int32_t state)
void display_state(std::ostream &os) override
Write human readable operation state to output stream.
void decode_state(uint8_t version, const uint8_t **bufp, size_t *remainp) override
Decode operation state.
uint32_t decode_i32(const uint8_t **bufp, size_t *remainp)
Decode a 32-bit integer in little-endian order.
const String label() override
Human readable label for operation.
std::shared_ptr< Context > ContextPtr
Smart pointer to Context.
void get_range_specs_and_states(vector< QualifiedRangeSpec > &specs, vector< RangeState > &states) const
OperationRecoverRanges(ContextPtr &context, const String &location, int type)
ReplayPlan replay_plan
Replay plan.
const String name() override
Name of operation used for exclusivity.
void replay_fragments(const CommAddress &addr, int64_t op_id, const String &recover_location, int32_t plan_generation, int32_t type, const vector< int32_t > &fragments, const Lib::RangeServerRecovery::ReceiverPlan &plan, int32_t replay_timeout)
Issues a synchronous "replay_fragments" request.
std::shared_ptr< RecoveryStepFuture > RecoveryStepFuturePtr
static time_point now() noexcept
void decode_state_old(uint8_t version, const uint8_t **bufp, size_t *remainp) override
virtual void encode(uint8_t **bufp) const
Writes serialized representation of object to a buffer.
void set_state(int32_t state)
const char * get_text(int error)
Returns a descriptive error message.
uint16_t decode_i16(const uint8_t **bufp, size_t *remainp)
Decode a 16-bit integer in little-endian order.
void set_proxy(const String &str)
Sets address type to CommAddress::PROXY and proxy name to str.
void encode_i32(uint8_t **bufp, uint32_t val)
Encode a 32-bit integer in little-endian order.
Compatibility Macros for C/C++.
DependencySet m_obstructions_permanent
Set of permanent obstructions.
void phantom_commit_ranges(const CommAddress &addr, int64_t op_id, const String &location, int32_t plan_generation, const vector< QualifiedRangeSpec > &ranges, int32_t timeout)
Issues a "phantom_commit_ranges" synchronous request.
The PageArena allocator is simple and fast, avoiding individual mallocs/frees.
bool recovery_plan_has_changed()
void initialize_obstructions_dependencies()
virtual void decode(const uint8_t **bufp, size_t *remainp)
Reads serialized representation of object from a buffer.
void get_locations(StringSet &locations) const
Fills a set of location strings that represent all of the locations (range servers) that are part of ...
const char * RECOVERY_BLOCKER
void encode_vstr(uint8_t **bufp, const void *buf, size_t len)
Encode a buffer as variable length string (vint64, data, null)
void remove(const QualifiedRangeSpec &qrs)
#define HT_FATALF(msg,...)
DependencySet m_obstructions
Set of obstructions.
Declarations for general-purpose utility functions.
void legacy_decode(const uint8_t **bufp, size_t *remainp, BalancePlan *plan)
Central authority for balance plans.
DependencySet m_dependencies
Set of dependencies.
#define HT_INFOF(msg,...)
void phantom_load(const CommAddress &addr, const String &location, int32_t plan_generation, const vector< int32_t > &fragments, const vector< QualifiedRangeSpec > &range_specs, const vector< RangeState > &range_states)
Issues a "phantom_load" synchronous request.
void decode_request(const uint8_t **bufp, size_t *remainp)
Client interface to RangeServer.
Abstract base class for master operations.
This is a generic exception class for Hypertable.
uint8_t encoding_version_state() const override
Returns version of encoding format of state.
Qualified (with table identifier) range specification.
#define HT_ERRORF(msg,...)
DependencySet m_exclusivities
Set of exclusivities.
void get_receiver_plan_locations(const String &location, int type, StringSet &locations)
Returns the list of receiver locations for a recovery plan.
void get_locations(StringSet &locations) const
#define HT_MAYBE_FAIL(_label_)
ReceiverPlan receiver_plan
Receiver plan.
void complete_ok(std::vector< MetaLog::EntityPtr > &additional)
Declarations for BalancePlanAuthority.
size_t encoded_length_state() const override
Encoded length of operation state.
Error codes, Exception handling, error logging.
#define HT_THROW(_code_, _msg_)
Address abstraction to hold either proxy name or IPv4:port address.
bool phantom_load_ranges()
int code() const
Returns the error code.
ClockT::time_point m_expiration_time
Expiration time (used by ResponseManager)