59 if (
props->has(
"Hypertable.Cluster.Name")) {
76 rsc_manager = make_shared<RangeServerConnectionManager>();
78 metrics_handler->start_collecting();
80 int worker_count =
props->get_i32(
"Hypertable.Client.Workers");
81 app_queue = make_shared<ApplicationQueue>(worker_count);
89 if (
props->get_bool(
"Hypertable.Master.Locations.IncludeMasterHash")) {
91 uint16_t port =
props->get_i16(
"port");
103 time_t now = time(0);
118 lock_guard<std::mutex> lock(
mutex);
132 lock_guard<std::mutex> lock(
mutex);
143 op->wait_for_idle(chrono::seconds(15));
161 cmd =
format(
"%s/conf/notification-hook.sh '[Hypertable %s] %s' '%s'",
163 subject.c_str(), message.c_str());
165 cmd =
format(
"%s/conf/notification-hook.sh '[Hypertable] %s' '%s'",
169 int ret = ::system(cmd.c_str());
170 HT_INFOF(
"notification-hook returned: %d", ret);
172 HT_WARNF(
"shell script conf/notification-hook.sh ('%s') returned "
173 "error %d", cmd.c_str(), ret);
178 lock_guard<std::mutex> lock(
mutex);
179 HT_ASSERT(dynamic_cast<BalancePlanAuthority *>(bpa.get()));
184 lock_guard<std::mutex> lock(
mutex);
191 lock_guard<std::mutex> lock(
mutex);
207 const uint8_t *decode_ptr =
event->payload;
208 size_t decode_remain =
event->payload_len;
210 params.
decode(&decode_ptr, &decode_remain);
213 if (event->proxy == 0) {
215 if (!
rsc_manager->find_server_by_local_addr(event->addr, rsc)) {
216 HT_WARNF(
"Unable to determine proxy for replay_status(id=%lld, %s, "
217 "plan_generation=%d) from %s", (
Lld)params.
op_id(),
219 event->addr.format().c_str());
222 proxy = rsc->location();
225 proxy =
event->proxy;
227 HT_INFOF(
"replay_status(id=%lld, %s, plan_generation=%d) from %s",
242 const uint8_t *decode_ptr =
event->payload;
243 size_t decode_remain =
event->payload_len;
245 params.
decode(&decode_ptr, &decode_remain);
248 if (event->proxy == 0) {
250 if (!
rsc_manager->find_server_by_local_addr(event->addr, rsc)) {
251 HT_WARNF(
"Unable to determine proxy for replay_complete(id=%lld, %s, "
252 "plan_generation=%d) from %s", (
Lld)params.
op_id(),
254 event->addr.format().c_str());
257 proxy = rsc->location();
260 proxy =
event->proxy;
262 HT_INFOF(
"from %s replay_complete(id=%lld, %s, plan_generation=%d) = %s",
277 HT_WARN_OUT <<
"No Recovery replay step future found for operation="
284 const uint8_t *decode_ptr =
event->payload;
285 size_t decode_remain =
event->payload_len;
287 params.
decode(&decode_ptr, &decode_remain);
289 HT_INFOF(
"prepare_complete(id=%lld, %s, plan_generation=%d) = %s",
297 if (event->proxy == 0) {
299 if (!
rsc_manager->find_server_by_local_addr(event->addr, rsc)) {
300 HT_WARNF(
"Unable to determine proxy for prepare_complete(id=%lld, %s, "
301 "plan_generation=%d) from %s", (
Lld)params.
op_id(),
303 event->addr.format().c_str());
306 proxy = rsc->location();
309 proxy =
event->proxy;
319 HT_WARN_OUT <<
"No Recovery prepare step future found for operation="
325 const uint8_t *decode_ptr =
event->payload;
326 size_t decode_remain =
event->payload_len;
328 params.
decode(&decode_ptr, &decode_remain);
330 HT_INFOF(
"commit_complete(id=%lld, %s, plan_generation=%d) = %s",
338 if (event->proxy == 0) {
340 if (!
rsc_manager->find_server_by_local_addr(event->addr, rsc)) {
341 HT_WARNF(
"Unable to determine proxy for commit_complete(id=%lld, %s, "
342 "plan_generation=%d) from %s", (
Lld)params.
op_id(),
344 event->addr.format().c_str());
347 proxy = rsc->location();
350 proxy =
event->proxy;
360 HT_WARN_OUT <<
"No Recovery commit step future found for operation="
393 lock_guard<std::mutex> lock(
mutex);
398 lock_guard<std::mutex> lock(
mutex);
403 lock_guard<std::mutex> lock(
mutex);
408 lock_guard<std::mutex> lock(
mutex);
418 HT_WARNF(
"RangeServer %s: no disk usage statistics available",
424 double numerator=0.0, denominator=0.0;
425 for (
const auto &fs : stats.
stats->system.fs_stat) {
426 numerator += fs.total - fs.avail;
427 denominator += fs.total;
430 if (denominator == 0.0 ||
434 HT_WARNF(
"RangeServer %s: disk use %d%% exceeds threshold; will not assign "
436 (int32_t)((numerator/denominator)*100.00));
445 lock_guard<std::mutex> lock(
m_mutex);
451 lock_guard<std::mutex> lock(m_mutex);
453 if (m_replay_map.find(
id) != m_replay_map.end())
454 future = m_replay_map[
id];
460 lock_guard<std::mutex> lock(m_mutex);
461 m_replay_map.erase(
id);
468 lock_guard<std::mutex> lock(m_mutex);
469 m_prepare_map[id] = future;
474 lock_guard<std::mutex> lock(m_mutex);
476 if (m_prepare_map.find(
id) != m_prepare_map.end())
477 future = m_prepare_map[
id];
483 lock_guard<std::mutex> lock(m_mutex);
484 m_prepare_map.erase(
id);
491 lock_guard<std::mutex> lock(m_mutex);
492 m_commit_map[id] = future;
497 lock_guard<std::mutex> lock(m_mutex);
499 if (m_commit_map.find(
id) != m_commit_map.end())
500 future = m_commit_map[
id];
506 lock_guard<std::mutex> lock(m_mutex);
507 m_commit_map.erase(
id);
static Comm * instance()
Creates/returns singleton instance of the Comm class.
std::set< String > StringSet
STL Set managing Strings.
String cluster_name
Name of cluster.
StringSet available_servers
ApplicationQueuePtr app_queue
int32_t plan_generation()
Gets recovery plan generation.
int64_t op_id()
Gets recovery operation ID.
void install_prepare_future(int64_t id, RecoveryStepFuturePtr &future)
#define HT_WARNF(msg,...)
The FailureInducer simulates errors.
bool add_move_operation(std::shared_ptr< Operation > operation)
Adds operation to active move range operation map.
std::shared_ptr< RangeServerConnection > RangeServerConnectionPtr
std::string String
A String is simply a typedef to std::string.
std::shared_ptr< Entity > EntityPtr
Smart pointer to Entity.
int32_t plan_generation()
Gets recovery plan generation.
const string & location() const
Gets proxy name of RangeServer whose log is being replayed.
RecoveryStepFuturePtr get_commit_future(int64_t id)
String format(const char *fmt,...)
Returns a String using printf like format facilities Vanilla snprintf is about 1.5x faster than this...
std::mutex m_outstanding_move_ops_mutex
Mutex for serializing access to m_outstanding_move_ops
Declarations for OperationProcessor.
Declarations for Operation.
ConnectionManagerPtr conn_manager
int32_t max_allowable_skew
void remove_move_operation(std::shared_ptr< Operation > operation)
Removes operation from active move range operation map.
bool set_startup_status(bool status)
Set startup flag to false.
TablePtr new_table(const std::string &name)
int32_t error()
Gets error code.
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
std::unique_ptr< ResponseManager > response_manager
RecoveryState m_recovery_state
std::unique_ptr< Thread > response_manager_thread
void start_shutdown()
Start shutdown sequence.
void erase_prepare_future(int64_t id)
bool can_accept_ranges(const RangeServerStatistics &stats)
const string & message() const
Gets error message.
Declarations for ReferenceManager.
int32_t error()
Gets error code.
Request parameters for replay status operation.
Declarations for ReplayStatus request parameters.
int32_t disk_threshold
Disk use threshold percentage.
Request parameters for replay complete operation.
PropertiesPtr props
Configuration properties.
uint32_t monitoring_interval
StatsRangeServerPtr stats
std::shared_ptr< RecoveryStepFuture > RecoveryStepFuturePtr
RangeLocatorPtr range_locator
void erase_replay_future(int64_t id)
std::shared_ptr< Session > SessionPtr
int32_t plan_generation()
Gets recovery plan generation.
std::unique_ptr< OperationProcessor > op
const char * get_text(int error)
Returns a descriptive error message.
const string & location() const
Gets proxy name of RangeServer whose log is being recovered.
bool status(ContextPtr &context, Timer &timer, Status &status)
Runs a status check on the master.
Hyperspace::SessionPtr hyperspace
void prepare_complete(EventPtr &event)
const string & message() const
Gets error message.
std::shared_ptr< Properties > PropertiesPtr
Declarations for PhantomPrepareComplete request parameters.
Compatibility Macros for C/C++.
Declarations for PhantomCommitComplete request parameters.
void remove_available_server(const String &location)
std::shared_ptr< ApplicationQueueInterface > ApplicationQueueInterfacePtr
Smart pointer to ApplicationQueueInterface.
void replay_status(EventPtr &event)
void md5_string(const char *input, char output[33])
Calculates the hex string of MD5 of null terminated input.
void install_replay_future(int64_t id, RecoveryStepFuturePtr &future)
std::unique_ptr< HyperspaceMasterFile > master_file
Hyperspace master file handle
virtual void decode(const uint8_t **bufp, size_t *remainp)
Reads serialized representation of object from a buffer.
bool shutdown_in_progress()
Gets flag indicating if server is shutting down.
Context(PropertiesPtr &p, Hyperspace::SessionPtr hs)
Context.
RecoveryStepFuturePtr get_prepare_future(int64_t id)
std::shared_ptr< OperationTimedBarrier > recovery_barrier_op
std::unordered_map< int64_t, int64_t > m_outstanding_move_ops
Map of outstanding move range operations.
long long int Lld
Shortcut for printf formats.
Central authority for balance plans.
MetaLog::WriterPtr mml_writer
Request parameters for phantom prepare complete operation.
void set_balance_plan_authority(MetaLog::EntityPtr bpa)
Sets the BalancePlanAuthority.
#define HT_INFOF(msg,...)
void install_commit_future(int64_t id, RecoveryStepFuturePtr &future)
static String install_dir
The installation directory.
Declarations for ReplayComplete request parameters.
MetaLog::EntityPtr m_balance_plan_authority
BalancePlanAuthority entity.
bool m_startup
Flag indicating that server is starting up.
bool startup_in_progress()
Gets flag indicating if server is starting up.
std::unique_ptr< ReferenceManager > reference_manager
void erase_commit_future(int64_t id)
const string & location() const
Gets proxy name of RangeServer whose log is being replayed.
RecoveryStepFuturePtr get_replay_future(int64_t id)
void commit_complete(EventPtr &event)
Declarations for BalancePlanAuthority.
std::shared_ptr< Operation > OperationPtr
Smart pointer to Operation.
int64_t op_id()
Gets recovery operation ID.
time_t next_monitoring_time
System information and statistics based on libsigar.
size_t available_server_count()
void get_available_servers(StringSet &servers)
void replay_complete(EventPtr &event)
std::shared_ptr< Operation > get_move_operation(int64_t hash_code)
Gets operation from active move range operation map.
BalancePlanAuthority * get_balance_plan_authority()
void notification_hook(const String &subject, const String &message)
Invoke notification hook.
Declarations for Context.
static const NetInfo & net_info()
Retrieves updated Network information (see SystemInfo.h)
int64_t op_id()
Gets recovery operation ID.
MetricsHandlerPtr metrics_handler
void add_available_server(const String &location)
std::shared_ptr< Table > TablePtr
RangeServerConnectionManagerPtr rsc_manager
bool m_shutdown
Flag indicating that server is shutting down.