49         const String &location, 
int type)
 
   51     m_location(location), m_type(type) {
 
   59   : 
Operation(context, header_), m_plan_generation(0), m_last_notification(0) {
 
   65   HT_INFOF(
"Entering RecoverServerRanges (%p) %s type=%d plan_generation=%d state=%s",
 
   69   if (!
m_context->recovery_state().get_replay_future(
id())) {
 
   85       HT_INFOF(
"Plan for location %s, type %s is empty, nothing to do",
 
   94     m_context->mml_writer->record_state(shared_from_this());
 
  106       m_context->mml_writer->record_state(shared_from_this());
 
  124     m_context->mml_writer->record_state(shared_from_this());
 
  150     m_context->mml_writer->record_state(shared_from_this());
 
  178     m_context->mml_writer->record_state(shared_from_this());
 
  194     m_context->mml_writer->record_state(shared_from_this());
 
  205       this_thread::sleep_for(chrono::milliseconds(5000));
 
  224     HT_FATALF(
"Unrecognized state %d", state);
 
  228   HT_INFOF(
"Leaving RecoverServerRanges %s plan_generation=%d type=%d state=%s",
 
  241   return "OperationRecoverRanges";
 
  245   return format(
"RecoverServerRanges %s type=%s",
 
  268     if (
m_context->props->get_bool(
"Hypertable.Failover.RecoverInSeries"))
 
  277   vector<QualifiedRangeSpec> specs;
 
  279   for (
auto &spec : specs)
 
  281                                  spec.table.id, spec.range.start_row,
 
  282                                  spec.range.end_row));
 
  336   vector<int32_t> fragments;
 
  339   for (
const auto &location : locations) {
 
  341     vector<QualifiedRangeSpec> specs;
 
  342     vector<RangeState> states;
 
  345       HT_INFOF(
"Calling phantom_load(plan_generation=%d, location=%s) for %d %s ranges",
 
  366     m_context->get_balance_plan_authority()->get_generation();
 
  373           == 
m_context->get_balance_plan_authority()->get_generation());
 
  377   size_t available_servers = 
m_context->available_server_count();
 
  378   size_t total_servers = 
m_context->rsc_manager->server_count();
 
  379   size_t failover_pct =
 
  380     m_context->props->get_i32(
"Hypertable.Failover.Quorum.Percentage");
 
  381   size_t quorum = ((total_servers * failover_pct) + 99) / 100;
 
  383   if (available_servers < quorum || available_servers == 0) {
 
  388     message = 
format(
"Recovery of range server %s has been suspended because\\n" 
  389                      "only %d out of %d servers are available.  Required\\n" 
  391                      (int)available_servers, (
int)total_servers,
 
  393     m_context->notification_hook(subject, message);
 
  406   m_context->recovery_state().install_replay_future(
id(), future);
 
  410   m_context->recovery_state().install_prepare_future(
id(), future);
 
  414   m_context->recovery_state().install_commit_future(
id(), future);
 
  424     HT_INFOF(
"Retrieved new balance plan for %s (type=%s, generation=%d) range count %d",
 
  429     m_context->mml_writer->record_state(shared_from_this());
 
  458   vector<int32_t> fragments;
 
  461     m_context->recovery_state().get_replay_future(
id());
 
  467   future->register_locations(locations);
 
  469   for (
const auto &location : locations) {
 
  474       HT_INFOF(
"Issue replay_fragments for %d fragments to %s (%s)",
 
  475                (
int)fragments.size(), location.c_str(), 
m_type_str.c_str());
 
  487   if (!future->wait_for_completion(
m_timeout)) {
 
  488     bool range_map_not_found = 
false;
 
  491       format(
"Failure encountered during REPLAY FRAGMENTS step of recovery\\n" 
  492              "of range server %s\\n\\n", 
m_location.c_str());
 
  494     future->get_error_map(error_map);
 
  495     for (RecoveryStepFuture::ErrorMapT::iterator iter=error_map.begin();
 
  496          iter != error_map.end(); ++iter) {
 
  498         + 
" - " + iter->second.second;
 
  499       message += str + 
"\\n";
 
  502         range_map_not_found = 
true;
 
  504     if (range_map_not_found)
 
  507       time_t now = time(0);
 
  509         String subject = 
format(
"ERROR: Replay failure during recovery of %s",
 
  511         m_context->notification_hook(subject, message);
 
  527     m_context->recovery_state().get_prepare_future(
id());
 
  533   future->register_locations(locations);
 
  535   for (
const auto &location : locations) {
 
  537     vector<QualifiedRangeSpec> specs;
 
  540     HT_INFOF(
"Issue phantom_prepare_ranges for %d ranges to %s (%s)",
 
  541              (
int)specs.size(), location.c_str(), 
m_type_str.c_str());
 
  553   if (!future->wait_for_completion(
m_timeout)) {
 
  554     bool range_map_not_found = 
false;
 
  556     future->get_error_map(error_map);
 
  557     for (RecoveryStepFuture::ErrorMapT::iterator iter=error_map.begin();
 
  558          iter != error_map.end(); ++iter) {
 
  559       HT_ERRORF(
"phantom commit at %s failed - %s (%s)", iter->first.c_str(),
 
  562         range_map_not_found = 
true;
 
  564     if (range_map_not_found)
 
  579     m_context->recovery_state().get_commit_future(
id());
 
  584     StringSet existing_locations, new_locations;
 
  587     std::set_intersection(existing_locations.begin(), existing_locations.end(),
 
  588                           new_locations.begin(), new_locations.end(),
 
  589                           std::inserter(locations, locations.end()));
 
  596     locations.erase(location);
 
  598   future->register_locations(locations);
 
  600   for (
const auto &location : locations) {
 
  602     vector<QualifiedRangeSpec> specs;
 
  606      HT_INFOF(
"Issue phantom_commit_ranges for %d ranges to %s",
 
  607               (
int)specs.size(), location.c_str());
 
  618   if (!future->wait_for_completion(
m_timeout)) {
 
  621     future->get_error_map(error_map);
 
  622     for (RecoveryStepFuture::ErrorMapT::iterator iter=error_map.begin();
 
  623          iter != error_map.end(); ++iter) {
 
  625         m_redo_set.insert(iter->first);
 
  641   vector<QualifiedRangeSpec> acknowledged;
 
  646     StringSet existing_locations, new_locations;
 
  649     std::set_intersection(existing_locations.begin(), existing_locations.end(),
 
  650                           new_locations.begin(), new_locations.end(),
 
  651                           std::inserter(locations, locations.end()));
 
  658     locations.erase(location);
 
  660   for (
const auto &location : locations) {
 
  662     vector<QualifiedRangeSpec> specs;
 
  663     vector<QualifiedRangeSpec *> range_ptrs;
 
  664     map<QualifiedRangeSpec, int> response_map;
 
  665     map<QualifiedRangeSpec, int>::iterator response_map_it;
 
  668     for (
auto &range : specs)
 
  669       range_ptrs.push_back(&range);
 
  671       HT_INFOF(
"Issue acknowledge_load for %d ranges to %s",
 
  672                (
int)range_ptrs.size(), location.c_str());
 
  677       response_map_it = response_map.begin();
 
  678       while (response_map_it != response_map.end()) {
 
  679         if (response_map_it->second != 
Error::OK)
 
  680           HT_WARNF(
"Problem acknowledging load for %s[%s..%s] - %s",
 
  681                   response_map_it->first.table.id,
 
  682                   response_map_it->first.range.start_row,
 
  683                   response_map_it->first.range.end_row,
 
  688       HT_INFOF(
"acknowledge_load complete for %d ranges to %s",
 
  689                (
int)range_ptrs.size(), location.c_str());
 
  698   if (!acknowledged.empty()) {
 
  700     for (
const auto &spec : acknowledged)
 
std::set< String > StringSet
STL Set managing Strings. 
void encode_state(uint8_t **bufp) const override
Encode operation state. 
char * decode_vstr(const uint8_t **bufp, size_t *remainp)
Decode a vstr (vint64, data, null). 
void phantom_prepare_ranges(const CommAddress &addr, int64_t op_id, const String &location, int32_t plan_generation, const vector< QualifiedRangeSpec > &ranges, int32_t timeout)
Issues a "phantom_prepare_ranges" synchronous request. 
#define HT_WARNF(msg,...)
void remove_from_receiver_plan(const String &location, int type, const vector< QualifiedRangeSpec > &ranges)
Removes ranges from a failover plan. 
The FailureInducer simulates errors. 
void execute() override
Executes (carries out) the operation. 
ContextPtr m_context
Pointer to Master context. 
time_t m_last_notification
std::string String
A String is simply a typedef to std::string. 
String format(const char *fmt,...)
Returns a String using printf like format facilities Vanilla snprintf is about 1.5x faster than this...
Declarations for CommitLogReader. 
virtual size_t encoded_length() const 
Returns serialized object length. 
Declarations for OperationProcessor. 
Po::typed_value< String > * str(String *v=0)
void get_fragments(vector< int32_t > &fragments) const 
Fills a vector with all of the fragment numbers that are part of this replay plan. 
bool get_new_recovery_plan()
void acknowledge_load(const CommAddress &addr, const vector< QualifiedRangeSpec * > &ranges, std::map< QualifiedRangeSpec, int > &response_map)
Issues a synchronous "acknowledge load" request for multiple ranges. 
bool validate_recovery_plan()
PageArena memory allocator for STL classes. 
void get_range_specs(vector< QualifiedRangeSpec > &specs) const 
RangeServerRecovery::Plan m_plan
size_t encoded_length_vstr(size_t len)
Computes the encoded length of vstr (vint64, data, null) 
std::map< String, std::pair< int32_t, String > > ErrorMapT
int32_t m_plan_generation
const char * get_text(int32_t state)
void display_state(std::ostream &os) override
Write human readable operation state to output stream. 
void decode_state(uint8_t version, const uint8_t **bufp, size_t *remainp) override
Decode operation state. 
uint32_t decode_i32(const uint8_t **bufp, size_t *remainp)
Decode a 32-bit integer in little-endian order. 
const String label() override
Human readable label for operation. 
std::shared_ptr< Context > ContextPtr
Smart pointer to Context. 
void get_range_specs_and_states(vector< QualifiedRangeSpec > &specs, vector< RangeState > &states) const 
OperationRecoverRanges(ContextPtr &context, const String &location, int type)
ReplayPlan replay_plan
Replay plan. 
const String name() override
Name of operation used for exclusivity. 
void replay_fragments(const CommAddress &addr, int64_t op_id, const String &recover_location, int32_t plan_generation, int32_t type, const vector< int32_t > &fragments, const Lib::RangeServerRecovery::ReceiverPlan &plan, int32_t replay_timeout)
Issues a synchronous "replay_fragments" request. 
std::shared_ptr< RecoveryStepFuture > RecoveryStepFuturePtr
static time_point now() noexcept
void decode_state_old(uint8_t version, const uint8_t **bufp, size_t *remainp) override
virtual void encode(uint8_t **bufp) const 
Writes serialized representation of object to a buffer. 
void set_state(int32_t state)
const char * get_text(int error)
Returns a descriptive error message. 
uint16_t decode_i16(const uint8_t **bufp, size_t *remainp)
Decode a 16-bit integer in little-endian order. 
void set_proxy(const String &str)
Sets address type to CommAddress::PROXY and proxy name to p. 
void encode_i32(uint8_t **bufp, uint32_t val)
Encode a 32-bit integer in little-endian order. 
Compatibility Macros for C/C++. 
DependencySet m_obstructions_permanent
Set of permanent obstructions. 
void phantom_commit_ranges(const CommAddress &addr, int64_t op_id, const String &location, int32_t plan_generation, const vector< QualifiedRangeSpec > &ranges, int32_t timeout)
Issues a "phantom_commit_ranges" synchronous request. 
The PageArena allocator is simple and fast, avoiding individual mallocs/frees. 
bool recovery_plan_has_changed()
void initialize_obstructions_dependencies()
virtual void decode(const uint8_t **bufp, size_t *remainp)
Reads serialized representation of object from a buffer. 
void get_locations(StringSet &locations) const 
Fills a set of location strings that represent all of the locations (range servers) that are part of ...
const char * RECOVERY_BLOCKER
void encode_vstr(uint8_t **bufp, const void *buf, size_t len)
Encode a buffer as variable length string (vint64, data, null) 
void remove(const QualifiedRangeSpec &qrs)
#define HT_FATALF(msg,...)
DependencySet m_obstructions
Set of obstructions. 
Declarations for general-purpose utility functions. 
void legacy_decode(const uint8_t **bufp, size_t *remainp, BalancePlan *plan)
Central authority for balance plans. 
DependencySet m_dependencies
Set of dependencies. 
#define HT_INFOF(msg,...)
void phantom_load(const CommAddress &addr, const String &location, int32_t plan_generation, const vector< int32_t > &fragments, const vector< QualifiedRangeSpec > &range_specs, const vector< RangeState > &range_states)
Issues a "phantom_load" synchronous request. 
void decode_request(const uint8_t **bufp, size_t *remainp)
Client interface to RangeServer. 
Abstract base class for master operations. 
This is a generic exception class for Hypertable. 
uint8_t encoding_version_state() const override
Returns version of encoding format of state. 
Qualified (with table identifier) range specification. 
#define HT_ERRORF(msg,...)
DependencySet m_exclusivities
Set of exclusivities. 
void get_receiver_plan_locations(const String &location, int type, StringSet &locations)
Returns the list of receiver location for a recovery plan. 
void get_locations(StringSet &locations) const 
#define HT_MAYBE_FAIL(_label_)
ReceiverPlan receiver_plan
Receiver plan. 
void complete_ok(std::vector< MetaLog::EntityPtr > &additional)
Declarations for BalancePlanAuthority. 
size_t encoded_length_state() const override
Encoded length of operation state. 
Error codes, Exception handling, error logging. 
#define HT_THROW(_code_, _msg_)
Address abstraction to hold either proxy name or IPv4:port address. 
bool phantom_load_ranges()
int code() const 
Returns the error code. 
ClockT::time_point m_expiration_time
Expiration time (used by ResponseManager)