37 #include "Request/Parameters/Compact.h"
41 #include "Request/Parameters/DropTable.h"
51 #include "Request/Parameters/SetState.h"
53 #include "Response/Parameters/Status.h"
80 const String &toplevel_dir, uint32_t timeout_ms,
83 : m_conn_manager(conn_mgr), m_hyperspace(hyperspace), m_app_queue(app_queue),
84 m_dispatcher_handler(dhp), m_connection_initializer(init),
85 m_timeout_ms(timeout_ms), m_toplevel_dir(toplevel_dir) {
109 : m_conn_manager(conn_mgr), m_master_addr(addr), m_timeout_ms(timeout_ms) {
120 : m_comm(comm), m_master_addr(addr), m_timeout_ms(timeout_ms) {
128 m_hyperspace->remove_callback(&m_hyperspace_session_callback);
129 if (m_master_file_handle != 0)
130 m_hyperspace->close_nowait(m_master_file_handle);
135 void Master::Client::hyperspace_disconnected()
137 lock_guard<mutex> lock(m_hyperspace_mutex);
139 m_hyperspace_init =
false;
140 m_hyperspace_connected =
false;
143 void Master::Client::hyperspace_reconnected()
146 lock_guard<mutex> lock(m_hyperspace_mutex);
149 m_hyperspace_connected =
true;
159 void Master::Client::initialize_hyperspace() {
161 if (m_hyperspace_init)
165 Timer timer(m_timeout_ms,
true);
168 m_master_file_handle =
169 m_hyperspace->open(m_toplevel_dir +
"/master",
171 m_master_file_callback, &timer);
177 this_thread::sleep_for(chrono::milliseconds(3000));
180 if (m_master_file_handle == 0)
182 m_hyperspace_init =
true;
194 Timer tmp_timer(m_timeout_ms);
198 String label =
format(
"create_namespace('%s', flags=%s)", name.c_str(),
209 params.
encode(cbuf->get_data_ptr_address());
210 if (!send_message(cbuf, timer, event, label))
214 const uint8_t *ptr =
event->payload + 4;
215 size_t remain =
event->payload_len - 4;
218 fetch_result(
id, timer, event, label);
223 lock_guard<mutex> lock(m_mutex);
225 "Client operation %s to master %s failed", label.c_str(),
226 m_master_addr.format().c_str());
234 Timer tmp_timer(m_timeout_ms);
248 params.
encode(cbuf->get_data_ptr_address());
249 if (!send_message(cbuf, timer, event, label))
253 const uint8_t *ptr =
event->payload + 4;
254 size_t remain =
event->payload_len - 4;
256 fetch_result(
id, timer, event, label);
261 lock_guard<mutex> lock(m_mutex);
263 "Client operation %s to master %s failed", label.c_str(),
264 m_master_addr.format().c_str());
270 void Master::Client::compact(
const String &tablename,
const String &row,
271 int32_t range_types,
Timer *timer) {
272 Timer tmp_timer(m_timeout_ms);
274 String label =
format(
"compact('%s')", tablename.c_str());
284 params.
encode(cbuf->get_data_ptr_address());
285 if (!send_message(cbuf, timer, event, label))
289 const uint8_t *ptr =
event->payload + 4;
290 size_t remain =
event->payload_len - 4;
292 fetch_result(
id, timer, event, label);
297 lock_guard<mutex> lock(m_mutex);
299 "Client operation %s to master %s failed", label.c_str(),
300 m_master_addr.format().c_str());
308 Timer tmp_timer(m_timeout_ms);
311 String label =
format(
"create_table('%s')", name.c_str());
321 params.
encode(cbuf->get_data_ptr_address());
322 if (!send_message(cbuf, timer, event, label))
326 const uint8_t *ptr =
event->payload + 4;
327 size_t remain =
event->payload_len - 4;
329 fetch_result(
id, timer, event, label);
334 lock_guard<mutex> lock(m_mutex);
336 "Client operation %s to master %s failed", label.c_str(),
337 m_master_addr.format().c_str());
344 bool force,
Timer *timer) {
345 Timer tmp_timer(m_timeout_ms);
348 String label =
format(
"alter_table('%s')", name.c_str());
358 params.
encode(cbuf->get_data_ptr_address());
359 if (!send_message(cbuf, timer, event, label))
362 const uint8_t *ptr =
event->payload + 4;
363 size_t remain =
event->payload_len - 4;
365 fetch_result(
id, timer, event, label);
370 lock_guard<mutex> lock(m_mutex);
372 "Client operation %s to master %s failed", label.c_str(),
373 m_master_addr.format().c_str());
378 Timer tmp_timer(m_timeout_ms);
387 if (!send_message(cbuf, timer, event,
"status"))
390 const uint8_t *ptr =
event->payload + 4;
391 size_t remain =
event->payload_len - 4;
393 params.
decode(&ptr, &remain);
398 lock_guard<mutex> lock(m_mutex);
400 "Client operation 'status' to master %s failed",
401 m_master_addr.format().c_str());
406 Master::Client::move_range(
const String &source, int64_t range_id,
409 uint64_t soft_limit,
bool split,
Timer *timer) {
410 Timer tmp_timer(m_timeout_ms);
413 format(
"move_range(%s[%s..%s] (%lld), transfer_log='%s', soft_limit=%llu)",
415 transfer_log.c_str(), (
Llu)soft_limit);
423 CommHeader header(Protocol::COMMAND_MOVE_RANGE);
425 transfer_log, soft_limit, split);
427 params.
encode(cbuf->get_data_ptr_address());
428 if (!send_message(cbuf, timer, event, label))
434 lock_guard<mutex> lock(m_mutex);
436 "Client operation %s to master %s failed", label.c_str(),
437 m_master_addr.format().c_str());
449 Master::Client::relinquish_acknowledge(
const String &source, int64_t range_id,
452 Timer tmp_timer(m_timeout_ms);
455 String label =
format(
"relinquish_acknowledge(%s[%s..%s] id=%lld)",
463 CommHeader header(Protocol::COMMAND_RELINQUISH_ACKNOWLEDGE);
466 params.
encode(cbuf->get_data_ptr_address());
467 if (!send_message(cbuf, timer, event, label))
471 const uint8_t *ptr =
event->payload + 4;
472 size_t remain =
event->payload_len - 4;
474 fetch_result(
id, timer, event, label);
479 lock_guard<mutex> lock(m_mutex);
481 "Client operation %s to master %s failed", label.c_str(),
482 m_master_addr.format().c_str());
491 Timer tmp_timer(m_timeout_ms);
494 String label =
format(
"rename_table(old='%s', new='%s')", from.c_str(), to.c_str());
504 params.
encode(cbuf->get_data_ptr_address());
505 if (!send_message(cbuf, timer, event, label))
509 const uint8_t *ptr =
event->payload + 4;
510 size_t remain =
event->payload_len - 4;
512 fetch_result(
id, timer, event, label);
517 lock_guard<mutex> lock(m_mutex);
519 "Client operation %s to master %s failed", label.c_str(),
520 m_master_addr.format().c_str());
526 Master::Client::drop_table(
const String &name,
bool if_exists,
Timer *timer) {
527 Timer tmp_timer(m_timeout_ms);
530 name.c_str(), if_exists ?
"true" :
"false");
540 params.
encode(cbuf->get_data_ptr_address());
541 if (!send_message(cbuf, timer, event, label))
544 const uint8_t *ptr =
event->payload + 4;
545 size_t remain =
event->payload_len - 4;
547 fetch_result(
id, timer, event, label);
552 lock_guard<mutex> lock(m_mutex);
554 "Client operation %s to master %s failed", label.c_str(),
555 m_master_addr.format().c_str());
560 void Master::Client::recreate_index_tables(
const std::string &name,
562 Timer tmp_timer(m_timeout_ms);
565 String label =
format(
"recreate_index_tables('%s', part=%s)",
566 name.c_str(), parts.
to_string().c_str());
573 CommHeader header(Protocol::COMMAND_RECREATE_INDEX_TABLES);
576 params.
encode(cbuf->get_data_ptr_address());
577 if (!send_message(cbuf, timer, event, label))
581 const uint8_t *ptr =
event->payload + 4;
582 size_t remain =
event->payload_len - 4;
584 fetch_result(
id, timer, event, label);
589 lock_guard<mutex> lock(m_mutex);
591 "Client operation %s to master %s failed", label.c_str(),
592 m_master_addr.format().c_str());
599 Timer tmp_timer(m_timeout_ms);
607 send_message_async(cbuf, &sync_handler, timer,
"shutdown");
612 HT_THROW(error,
"Master 'shutdown' error");
618 Timer tmp_timer(m_timeout_ms);
633 params.
encode(cbuf->get_data_ptr_address());
634 if (!send_message(cbuf, timer, event, label))
637 const uint8_t *ptr =
event->payload + 4;
638 size_t remain =
event->payload_len - 4;
644 lock_guard<mutex> lock(m_mutex);
646 "Client operation %s to master %s failed", label.c_str(),
647 m_master_addr.format().c_str());
656 fetch_result(
id, timer, event, label);
661 void Master::Client::set_state(
const std::vector<SystemVariable::Spec> &specs,
663 Timer tmp_timer(m_timeout_ms);
672 CommHeader header(Protocol::COMMAND_SET_STATE);
675 params.
encode(cbuf->get_data_ptr_address());
676 if (!send_message(cbuf, timer, event, label))
679 const uint8_t *ptr =
event->payload + 4;
680 size_t remain =
event->payload_len - 4;
682 fetch_result(
id, timer, event, label);
687 lock_guard<mutex> lock(m_mutex);
689 "Client operation %s to master %s failed", label.c_str(),
690 m_master_addr.format().c_str());
696 Timer tmp_timer(m_timeout_ms);
708 params.
encode(cbuf->get_data_ptr_address());
709 if (!send_message(cbuf, timer, event, label))
711 const uint8_t *ptr =
event->payload + 4;
712 size_t remain =
event->payload_len - 4;
718 lock_guard<mutex> lock(m_mutex);
720 "Client operation %s to master %s failed", label.c_str(),
721 m_master_addr.format().c_str());
729 fetch_result(
id, timer, event, label);
734 Timer tmp_timer(m_timeout_ms);
741 CommHeader header(Protocol::COMMAND_SYSTEM_STATUS);
743 if (!send_message(cbuf, timer, event,
"system status"))
746 const uint8_t *ptr =
event->payload + 4;
747 size_t remain =
event->payload_len - 4;
749 params.
decode(&ptr, &remain);
754 lock_guard<mutex> lock(m_mutex);
756 "Client operation 'system status' to master %s failed",
757 m_master_addr.format().c_str());
764 unique_lock<mutex> lock(m_mutex);
769 while ((error = m_comm->send_request(m_master_addr, timer->
remaining(), cbp,
782 auto expire_time = chrono::system_clock::now() +
783 chrono::milliseconds(std::min(timer->
remaining(),
786 if (m_cond.wait_until(lock, expire_time) == cv_status::timeout) {
789 "Client operation %s to master %s failed", label.c_str(),
790 m_master_addr.format().c_str());
799 send_message_async(cbp, &sync_handler, timer, label);
810 HT_THROWF(error,
"Client operation %s failed", label.c_str());
819 CommHeader header(Protocol::COMMAND_FETCH_RESULT);
822 params.
encode(cbuf->get_data_ptr_address());
823 if (!send_message(cbuf, timer, event, label))
829 lock_guard<mutex> lock(m_mutex);
831 "Failed to fetch ID %lld for Client operation %s to master %s",
832 (
Lld)
id, label.c_str(), m_master_addr.format().c_str());
837 Master::Client::replay_status(int64_t op_id,
const String &location,
838 int32_t plan_generation) {
839 Timer timer(m_timeout_ms,
true);
841 String label =
format(
"replay_status op_id=%llu location=%s "
842 "plan_generation=%d", (
Llu)op_id,
843 location.c_str(), plan_generation);
848 CommHeader header(Protocol::COMMAND_REPLAY_STATUS);
851 params.
encode(cbuf->get_data_ptr_address());
852 if (!send_message(cbuf, &timer, event, label)) {
861 lock_guard<mutex> lock(m_mutex);
863 "Client operation %s to master %s failed", label.c_str(),
864 m_master_addr.format().c_str());
870 Master::Client::replay_complete(int64_t op_id,
const String &location,
871 int32_t plan_generation, int32_t error,
873 Timer timer(m_timeout_ms,
true);
875 String label =
format(
"replay_complete op_id=%llu location=%s "
876 "plan_generation=%d error=%s", (
Llu)op_id,
877 location.c_str(), plan_generation,
883 CommHeader header(Protocol::COMMAND_REPLAY_COMPLETE);
888 params.
encode(cbuf->get_data_ptr_address());
889 if (!send_message(cbuf, &timer, event, label)) {
898 lock_guard<mutex> lock(m_mutex);
900 "Client operation %s to master %s failed", label.c_str(),
901 m_master_addr.format().c_str());
906 Master::Client::phantom_prepare_complete(int64_t op_id,
const String &location,
907 int plan_generation, int32_t error,
909 Timer timer(m_timeout_ms,
true);
911 String label =
format(
"phantom_prepare_complete op_id=%llu location=%s "
912 "plan_generation=%d error=%s", (
Llu)op_id,
913 location.c_str(), plan_generation,
917 CommHeader header(Protocol::COMMAND_PHANTOM_PREPARE_COMPLETE);
919 plan_generation, error,
922 params.
encode(cbuf->get_data_ptr_address());
923 if (!send_message(cbuf, &timer, event, label)) {
931 lock_guard<mutex> lock(m_mutex);
933 "Client operation %s to master %s failed", label.c_str(),
934 m_master_addr.format().c_str());
939 Master::Client::phantom_commit_complete(int64_t op_id,
const String &location,
940 int plan_generation, int32_t error,
942 Timer timer(m_timeout_ms,
true);
944 String label =
format(
"phantom_commit_complete op_id=%llu location=%s "
948 CommHeader header(Protocol::COMMAND_PHANTOM_COMMIT_COMPLETE);
950 plan_generation, error,
953 params.
encode(cbuf->get_data_ptr_address());
954 if (!send_message(cbuf, &timer, event, label)) {
962 lock_guard<mutex> lock(m_mutex);
964 "Client operation %s to master %s failed", label.c_str(),
965 m_master_addr.format().c_str());
969 void Master::Client::reload_master() {
973 unique_lock<mutex> lock(m_mutex);
979 lock_guard<mutex> lock(m_hyperspace_mutex);
980 if (m_hyperspace_init) {
982 m_hyperspace->attr_get(m_master_file_handle,
"address", value);
985 HT_WARN(
"Unable to determine master address from Hyperspace");
989 else if (m_hyperspace_connected) {
990 initialize_hyperspace();
992 m_hyperspace->attr_get(m_master_file_handle,
"address", value);
995 HT_WARN(
"Unable to determine master address from Hyperspace");
1002 addr_str = (
const char *)value.
base;
1004 if (addr_str != m_master_addr_string) {
1006 if (m_master_addr.sin_port != 0) {
1007 if ((error = m_conn_manager->remove(m_master_addr)) !=
Error::OK) {
1009 HT_WARNF(
"Problem removing connection to Master - %s",
1013 HT_INFOF(
"Connecting to new Master (old=%s, new=%s)",
1014 m_master_addr_string.c_str(), addr_str.c_str());
1017 m_master_addr_string = addr_str;
1025 m_conn_manager->add_with_initializer(m_master_addr, m_retry_interval / 2,
1026 "Master", m_dispatcher_handler, m_connection_initializer);
1028 master_addr = m_master_addr;
1032 Timer timer(m_retry_interval,
true);
1033 if (m_conn_manager->wait_for_connection(master_addr, timer))
1034 m_conn_manager->get_comm()->wait_for_proxy_load(timer);
1035 m_cond.notify_all();
1041 bool Master::Client::wait_for_connection(uint32_t max_wait_ms) {
1042 Timer timer(max_wait_ms,
true);
1043 m_conn_manager->wait_for_connection(m_master_addr, timer);
1044 return m_conn_manager->get_comm()->wait_for_proxy_load(timer);
1047 bool Master::Client::wait_for_connection(
Timer &timer) {
1048 m_conn_manager->wait_for_connection(m_master_addr, timer);
1049 return m_conn_manager->get_comm()->wait_for_proxy_load(timer);
1053 m_client->hyperspace_disconnected();
1057 m_client->hyperspace_reconnected();
Declarations for FetchResult request parameters.
Hyperspace::HandleCallbackPtr m_master_file_callback
Declarations for SystemStatus response parameters.
void initialize(const String &name)
Public initialization function - creates a singleton instance of LogWriter.
void drop_namespace(const std::string &name, Namespace *base=NULL, bool if_exists=false)
Removes a namespace.
#define HT_WARNF(msg,...)
DispatchHandlerPtr m_dispatcher_handler
Request parameters for fetch result request.
Request parameters for set state operation.
Holds Nagios-style program status information.
uint32_t m_retry_interval
static int32_t response_code(const Event *event)
Returns the response code from an event event generated in response to a request message.
PropertiesPtr properties
This singleton map stores all options.
const Hypertable::Status & status() const
Gets status object.
Request parameters for alter table operation.
std::string String
A String is simply a typedef to std::string.
Declarations for DropNamespace request parameters.
String format(const char *fmt,...)
Returns a String using printf like format facilities Vanilla snprintf is about 1.5x faster than this...
Request parameters for create table operation.
static std::string to_str(int flags)
Converts flags to human readable string.
Abstract base class for application dispatch handlers registered with AsyncComm.
virtual size_t encoded_length() const
Returns serialized object length.
void init(int argc, char *argv[], const Desc *desc=NULL)
Initialize with default policy.
long long unsigned int Llu
Shortcut for printf formats.
Declarations for AlterTable request parameters.
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
bool m_hyperspace_connected
Request parameters for compact operation.
uint32_t remaining()
Returns the remaining time till expiry.
static uint32_t number32(uint32_t maximum=0)
Returns a random 32-bit unsigned integer.
bool expired()
Returns true if the timer is expired.
ApplicationQueueInterfacePtr m_app_queue
Client(const std::string &install_dir, const std::string &config_file, uint32_t default_timeout_ms=0)
Constructs the object using the specified config file.
Request parameters for stop function.
bool wait_for_reply(EventPtr &event)
This method is used by a client to synchronize.
A dynamic, resizable and reference counted memory buffer.
Represents a set of table parts (sub-tables).
Request parameters for create namespace operation.
Request parameters for move range operation.
Response parameters for status operation.
Declarations for Stop request parameters.
void create_table(String &ns, String &tablename, String &rs_metrics_file)
Request parameters for drop table operation.
Request parameters for replay status operation.
Declarations for ReplayStatus request parameters.
Hyperspace::SessionPtr m_hyperspace
uint64_t decode_i64(const uint8_t **bufp, size_t *remainp)
Decode a 64-bit integer in little-endian order.
Request parameters for replay complete operation.
Request parameters for recreate index tables operation.
Declarations for CreateTable request parameters.
virtual void encode(uint8_t **bufp) const
Writes serialized representation of object to a buffer.
std::shared_ptr< Session > SessionPtr
const char * get_text(int error)
Returns a descriptive error message.
bool status(ContextPtr &context, Timer &timer, Status &status)
Runs a status check on the master.
Encapsulate an internet address.
Request parameters for phantom commit complete operation.
std::shared_ptr< CommBuf > CommBufPtr
Smart pointer to CommBuf.
A timer class to keep timeout states across AsyncComm related calls.
Declarations for PhantomPrepareComplete request parameters.
Compatibility Macros for C/C++.
Declarations for NamespaceFlag.
Declarations for PhantomCommitComplete request parameters.
ConnectionInitializerPtr m_connection_initializer
std::shared_ptr< ApplicationQueueInterface > ApplicationQueueInterfacePtr
Smart pointer to ApplicationQueueInterface.
Functions to serialize/deserialize primitives to/from a memory buffer.
Request parameters for rename table operation.
Response parameters for status operation.
Declarations for CreateNamespace request parameters.
Time related declarations.
virtual void decode(const uint8_t **bufp, size_t *remainp)
Reads serialized representation of object from a buffer.
void initialize_hyperspace()
Assumes access is serialized via m_hyperspace_mutex.
Declarations for Balance request parameters.
Declarations for RecreateIndexTables request parameters.
Declarations for MoveRange request parameters.
long long int Lld
Shortcut for printf formats.
DispatchHandler class used to synchronize with response messages.
static bool initialize(sockaddr_in *addr, const char *host, uint16_t port)
Initialize a sockaddr_in structure from host:port.
Request parameters for drop namespace operation.
Entry point to AsyncComm service.
void start()
Starts the timer.
Request parameters for phantom prepare complete operation.
Declarations for RenameTable request parameters.
Declarations for Protocol.
#define HT_INFOF(msg,...)
#define HT_THROWF(_code_, _fmt_,...)
void create_namespace(const std::string &name, Namespace *base=NULL, bool create_intermediate=false, bool if_not_exists=false)
Creates a namespace.
Random number generator for int32, int64, double and ascii arrays.
uint8_t * base
Pointer to the allocated memory buffer.
Internet address wrapper classes and utility functions.
Request parameters for relinquish acknowledge operation.
bool split(int flags)
Tests the SPLIT bit of flags
A timer class to keep timeout states across AsyncComm related calls.
This is a generic exception class for Hypertable.
std::shared_ptr< DispatchHandler > DispatchHandlerPtr
Smart pointer to DispatchHandler.
Declarations for ReplayComplete request parameters.
Declarations for MasterClient This file contains declarations for MasterClient, a client interface cl...
Message buffer for holding data to be transmitted over a network.
const std::string to_string() const
Returns human readable string describing table parts.
Create file if it does not exist.
Declarations for ApplicationQueueInterface.
ClientHyperspaceSessionCallback m_hyperspace_session_callback
std::shared_ptr< ConnectionManager > ConnectionManagerPtr
Smart pointer to ConnectionManager.
Request parameters for balance operation.
std::shared_ptr< ConnectionInitializer > ConnectionInitializerPtr
Smart pointer to ConnectionInitializer.
Error codes, Exception handling, error logging.
#define HT_THROW(_code_, _msg_)
const Hypertable::Status & status() const
Gets status object.
Declarations for RelinquishAcknowledge request parameters.
ConnectionManagerPtr m_conn_manager
Declarations for DispatchHandlerSynchronizer.
int code() const
Returns the error code.
#define HT_THROW2(_code_, _ex_, _msg_)