71 static void init_options() {
72 alias(
"port",
"Hypertable.Master.Port");
73 alias(
"workers",
"Hypertable.Master.Workers");
74 alias(
"reactors",
"Hypertable.Master.Reactors");
84 m_handler = make_shared<ConnectionHandler>(m_context);
102 return strcmp(rsc1->location().c_str(), rsc2->location().c_str()) < 0;
123 int main(
int argc,
char **argv) {
134 init_with_policies<Policies>(argc, argv);
135 uint16_t port = get_i16(
"port");
136 listen_addr =
InetAddr(INADDR_ANY, port);
139 context = make_shared<Context>(
properties, hyperspace);
140 context->monitoring = make_shared<Monitoring>(context.get());
141 context->op = make_unique<OperationProcessor>(context, get_i32(
"workers"));
144 context->comm->listen(listen_addr, connection_handler_factory);
146 auto func = [&context](
bool status){context->set_startup_status(
status);};
147 if (!context->master_file->obtain_master_lock(context->toplevel_dir, func)){
148 context->start_shutdown();
157 context->mml_definition =
158 make_shared<MetaLog::DefinitionMaster>(context,
format(
"%s_%u",
"master", port).c_str());
160 if (
has(
"induce-failure")) {
169 std::vector<MetaLog::EntityPtr> entities;
170 std::vector<OperationPtr> operations;
171 std::map<String, OperationPtr> recovery_ops;
175 String log_dir = context->toplevel_dir +
"/servers/master/log/"
176 + context->mml_definition->name();
179 mml_reader = make_shared<MetaLog::Reader>(context->dfs, context->mml_definition,
181 mml_reader->get_entities(entities);
185 std::vector<MetaLog::EntityPtr> entities2;
186 std::set<RangeServerConnectionPtr, ltrsc> rsc_set;
188 entities2.reserve(entities.size());
189 for (
auto &entity : entities) {
190 if (dynamic_cast<RangeServerConnection *>(entity.get())) {
192 if (rsc_set.count(rsc) > 0)
197 if (dynamic_cast<BalancePlanAuthority *>(entity.get())) {
199 context->set_balance_plan_authority(entity);
201 else if (dynamic_cast<SystemState *>(entity.get()))
202 context->system_state = dynamic_pointer_cast<
SystemState>(entity);
203 else if (dynamic_cast<RecoveredServers *>(entity.get()))
204 context->recovered_servers = dynamic_pointer_cast<
RecoveredServers>(entity);
205 entities2.push_back(entity);
209 for (
const auto &rsc : rsc_set)
210 entities2.push_back(rsc);
211 entities.swap(entities2);
214 if (!context->system_state)
215 context->system_state = make_shared<SystemState>();
217 if (!context->recovered_servers)
218 context->recovered_servers = make_shared<RecoveredServers>();
220 context->mml_writer =
221 make_shared<MetaLog::Writer>(context->dfs, context->mml_definition,
225 bpa->set_mml_writer(context->mml_writer);
226 std::stringstream sout;
227 sout <<
"Loading BalancePlanAuthority: " << *bpa;
231 context->response_manager->set_mml_writer(context->mml_writer);
234 operation = make_shared<OperationSystemUpgrade>(context);
235 context->op->add_operation(operation);
236 context->op->wait_for_empty();
239 for (
auto &entity : entities) {
242 operation = dynamic_pointer_cast<
Operation>(entity);
244 context->reference_manager->add(operation);
246 if (dynamic_cast<OperationMoveRange *>(op))
247 context->add_move_operation(operation);
250 if (dynamic_cast<OperationRecover *>(op)) {
251 HT_INFO(
"Recovery was interrupted; continuing");
253 if (!recover_op->
location().empty()) {
254 operations.push_back(operation);
255 recovery_ops[recover_op->
location()] = operation;
259 operations.push_back(operation);
261 else if (dynamic_cast<RangeServerConnection *>(entity.get())) {
263 context->rsc_manager->add_server(rsc);
270 for (
auto &entity : entities) {
271 if (dynamic_cast<RangeServerConnection *>(entity.get())) {
273 if (recovery_ops.find(rsc->location()) == recovery_ops.end())
277 recovery_ops.clear();
279 if (operations.empty()) {
280 OperationPtr init_op = make_shared<OperationInitialize>(context);
281 if (context->namemap->exists_mapping(
"/sys/METADATA", 0))
283 operations.push_back(init_op);
286 if (!context->metadata_table)
289 if (!context->rs_metrics_table)
290 context->rs_metrics_table = context->new_table(
"sys/RS_METRICS");
294 operation = make_shared<OperationWaitForServers>(context);
295 operations.push_back(operation);
296 context->recovery_barrier_op =
298 operation = make_shared<OperationRecoveryBlocker>(context);
299 operations.push_back(operation);
301 context->op->add_operations(operations);
303 context->master_file->write_master_address();
306 static_cast<HandlerFactory*
>(connection_handler_factory.get())->start_timer();
308 if (!context->set_startup_status(
false))
309 context->start_shutdown();
320 context->comm->close_socket(listen_addr);
static Comm * instance()
Creates/returns singleton instance of the Comm class.
Retrieves system information (hardware, installation directory, etc)
Interface and base of config policy.
Cons< DefaultPolicy, CommPolicy > DefaultCommPolicy
Default comm layer config policy.
Meta::list< AppPolicy, FsBrokerPolicy, DefaultCommPolicy > Policies
The FailureInducer simulates errors.
std::shared_ptr< RangeServerConnection > RangeServerConnectionPtr
PropertiesPtr properties
This singleton map stores all options.
std::string String
A String is simply a typedef to std::string.
static bool unlink(const String &fname)
Unlinks (deletes) a file or directory.
String format(const char *fmt,...)
Returns a String using printf-like format facilities. Vanilla snprintf is about 1.5x faster than this...
Declarations for OperationProcessor.
Abstract class for creating default application dispatch handlers.
static const char * METADATA_NAME
long long unsigned int Llu
Shortcut for printf formats.
static FailureInducer * instance
This is a singleton class.
Declarations for ConnectionHandler.
Carries out recovery operation for a range server.
static void destroy()
Destroys singleton instance of the Comm class.
uint16_t get_remove_approval_mask()
Gets the remove approvals bit mask.
int main(int argc, char **argv)
Declarations for ReferenceManager.
Provides access to the cluster ID.
bool has(const String &name)
Check existence of a configuration value.
std::shared_ptr< Context > ContextPtr
Smart pointer to Context.
Declarations for SystemState.
std::shared_ptr< ConnectionHandlerFactory > ConnectionHandlerFactoryPtr
Smart pointer to ConnectionHandlerFactory.
std::shared_ptr< Session > SessionPtr
bool status(ContextPtr &context, Timer &timer, Status &status)
Runs a status check on the master.
Encapsulate an internet address.
Holds persistent global system state.
static uint64_t get()
Gets the cluster ID.
Compatibility Macros for C/C++.
bool removal_approved()
Checks if all remove approvals have been received.
void parse_option(String spec)
Parses a spec string (as explained above) and stores it in an internal structure. ...
Set of recovered servers.
Config policy for generic Comm layer server.
Initialization helper for applications.
const char * RECOVERY_BLOCKER
std::shared_ptr< Reader > ReaderPtr
Smart pointer to Reader.
Central authority for balance plans.
#define HT_INFOF(msg,...)
const String & location() const
Abstract base class for master operations.
This is a generic exception class for Hypertable.
std::shared_ptr< DispatchHandler > DispatchHandlerPtr
Smart pointer to DispatchHandler.
Handles incoming Master requests.
Passed to constructor to tell it to generate cluster ID if not found.
Declarations for BalancePlanAuthority.
void alias(const String &cmdline_opt, const String &file_opt, bool overwrite)
Setup command line option alias for config file option.
std::shared_ptr< Operation > OperationPtr
Smart pointer to Operation.
Declarations for Context.
Declarations for ClusterId.
Declarations for ResponseManager.
static bool proxy_master
Set to true if this process is acting as "Proxy Master".