44 #include <unordered_map>
48 #include <sys/types.h>
49 #include <sys/socket.h>
50 #include <netinet/in.h>
51 #include <arpa/inet.h>
62 add_internal(addr, null_addr, timeout_ms, service_name, handler, null_initializer);
66 uint32_t timeout_ms,
const char *service_name,
69 add_internal(addr, null_addr, timeout_ms, service_name, handler, initializer);
75 const char *service_name) {
77 add(addr, timeout_ms, service_name, null_disp_handler);
83 uint32_t timeout_ms,
const char *service_name,
86 add_internal(addr, local_addr, timeout_ms, service_name, handler, null_initializer);
92 uint32_t timeout_ms,
const char *service_name) {
94 add(addr, local_addr, timeout_ms, service_name, null_disp_handler);
102 lock_guard<mutex> lock(m_impl->mutex);
106 if (!m_impl->thread.joinable())
107 m_impl->thread = std::thread([=](){ this->connect_retry_loop(); });
112 auto iter = m_impl->conn_map_proxy.find(addr.
proxy);
113 if (iter != m_impl->conn_map_proxy.end() && iter->second->state != State::DECOMMISSIONED)
118 m_impl->conn_map.find(addr.
inet);
119 if (iter != m_impl->conn_map.end() && iter->second->state != State::DECOMMISSIONED)
123 conn_state = make_shared<ConnectionState>();
124 conn_state->addr = addr;
125 conn_state->local_addr = local_addr;
126 conn_state->timeout_ms = timeout_ms;
127 conn_state->handler = handler;
128 conn_state->initializer = initializer;
129 conn_state->service_name = (service_name) ? service_name :
"";
130 conn_state->next_retry = std::chrono::steady_clock::now();
133 m_impl->conn_map_proxy[addr.
proxy] = conn_state;
135 m_impl->conn_map[addr.
inet] = conn_state;
138 lock_guard<mutex> conn_lock(conn_state->mutex);
139 send_connect_request(conn_state);
146 uint32_t max_wait_ms) {
147 Timer timer(max_wait_ms,
true);
158 lock_guard<mutex> lock(
m_impl->mutex);
162 if (iter ==
m_impl->conn_map.end())
164 conn_state_ptr = (*iter).second;
167 auto iter =
m_impl->conn_map_proxy.find(addr.
proxy);
168 if (iter ==
m_impl->conn_map_proxy.end())
170 conn_state_ptr = (*iter).second;
184 unique_lock<mutex> conn_lock(conn_state->mutex);
186 auto duration = std::chrono::milliseconds(timer.
remaining());
188 if (!conn_state->cond.wait_for(conn_lock, duration, [&conn_state](){ return conn_state->state == State::READY; }))
213 HT_FATALF(
"Attempt to connect decommissioned connection to service='%s'",
214 conn_state->service_name.c_str());
216 if (!conn_state->local_addr.is_set())
217 error =
m_impl->comm->connect(conn_state->addr, shared_from_this());
219 error =
m_impl->comm->connect(conn_state->addr, conn_state->local_addr,
227 conn_state->cond.notify_all();
230 m_impl->conn_map.erase(conn_state->inet_addr);
231 m_impl->conn_map_proxy.erase(conn_state->addr.proxy);
233 conn_state->cond.notify_all();
236 if (!
m_impl->quiet_mode) {
237 if (conn_state->service_name !=
"")
238 HT_INFOF(
"Connection attempt to %s at %s failed - %s. Will retry "
239 "again in %d milliseconds...", conn_state->service_name.c_str(),
241 (int)conn_state->timeout_ms);
243 HT_INFOF(
"Connection attempt to service at %s failed - %s. Will retry "
244 "again in %d milliseconds...", conn_state->addr.to_str().c_str(),
249 conn_state->next_retry = std::chrono::steady_clock::now() +
250 std::chrono::milliseconds(conn_state->timeout_ms);
258 m_impl->retry_queue.push(conn_state);
259 m_impl->retry_cond.notify_one();
266 bool check_inet_addr =
false;
268 bool do_close =
false;
274 lock_guard<mutex> lock(
m_impl->mutex);
277 auto iter =
m_impl->conn_map_proxy.find(addr.
proxy);
278 if (iter !=
m_impl->conn_map_proxy.end()) {
280 lock_guard<mutex> conn_lock((*iter).second->mutex);
281 check_inet_addr =
true;
282 inet_addr = (*iter).second->inet_addr;
287 (*iter).second->cond.notify_all();
289 m_impl->conn_map_proxy.erase(iter);
293 check_inet_addr =
true;
294 inet_addr = addr.
inet;
297 if (check_inet_addr) {
299 m_impl->conn_map.find(inet_addr);
300 if (iter !=
m_impl->conn_map.end()) {
302 lock_guard<mutex> conn_lock((*iter).second->mutex);
307 (*iter).second->cond.notify_all();
309 m_impl->conn_map.erase(iter);
316 m_impl->comm->close_socket(addr);
335 lock_guard<mutex> lock(
m_impl->mutex);
339 auto iter =
m_impl->conn_map.find(event->addr);
340 if (iter !=
m_impl->conn_map.end())
341 conn_state = (*iter).second;
344 if (!conn_state && event->proxy) {
345 auto iter =
m_impl->conn_map_proxy.find(event->proxy);
346 if (iter !=
m_impl->conn_map_proxy.end()) {
347 conn_state = (*iter).second;
349 m_impl->conn_map[
event->addr] = conn_state;
354 lock_guard<mutex> conn_lock(conn_state->mutex);
357 conn_state->inet_addr =
event->addr;
358 if (conn_state->initializer) {
365 conn_state->cond.notify_all();
370 if (event->proxy && !
m_impl->comm->translate_proxy(event->proxy, 0)) {
371 m_impl->conn_map.erase(conn_state->inet_addr);
372 m_impl->conn_map_proxy.erase(conn_state->addr.proxy);
374 conn_state->cond.notify_all();
378 HT_INFOF(
"Received event %s", event->to_str().c_str());
391 else if (event->header.command != conn_state->initializer->initialization_command()) {
392 String err_msg =
"Connection initialization not yet complete";
397 cbuf->append_str16(err_msg);
398 m_impl->comm->send_response(event->addr, cbuf);
401 if (!conn_state->initializer->process_initialization_response(event.get()))
402 HT_FATALF(
"Unable to initialize connection to %s, exiting ...",
403 conn_state->service_name.c_str());
405 conn_state->cond.notify_all();
411 if (conn_state->handler)
412 conn_state->handler->handle(event);
415 HT_WARNF(
"Unable to find connection for %s in map.",
421 CommBufPtr cbuf(conn_state->initializer->create_initialization_request());
422 int error =
m_impl->comm->send_request(conn_state->inet_addr, 60000, cbuf,
this);
427 HT_INFOF(
"Received error %d", error);
432 HT_FATALF(
"Problem initializing connection to %s - %s",
438 const string &message) {
440 HT_INFOF(
"%s: Problem connecting to %s, will retry in %d "
441 "milliseconds...", message.c_str(),
442 conn_state->service_name.c_str(), (int)conn_state->timeout_ms);
448 conn_state->next_retry = std::chrono::steady_clock::now() +
449 std::chrono::milliseconds(conn_state->timeout_ms);
452 m_impl->retry_queue.push(conn_state);
453 m_impl->retry_cond.notify_one();
461 unique_lock<mutex> lock(
m_impl->mutex);
464 while (!
m_impl->shutdown) {
466 while (
m_impl->retry_queue.empty()) {
467 m_impl->retry_cond.wait(lock);
475 conn_state =
m_impl->retry_queue.top();
478 lock_guard<mutex> conn_lock(conn_state->mutex);
480 if (conn_state->next_retry <= std::chrono::steady_clock::now()) {
481 m_impl->retry_queue.pop();
486 else if (conn_state->state ==
State::CONNECTED && conn_state->initializer) {
487 if (conn_state->next_retry <= std::chrono::steady_clock::now()) {
488 m_impl->retry_queue.pop();
494 m_impl->retry_queue.pop();
498 m_impl->retry_cond.wait_until(lock, conn_state->next_retry);
Retrieves system information (hardware, installation directory, etc.)
void add(const CommAddress &addr, uint32_t timeout_ms, const char *service_name)
Adds a connection.
SharedImplPtr m_impl
Smart pointer to connection manager state.
int remove(const CommAddress &addr)
Removes a connection from the connection manager.
#define HT_WARNF(msg,...)
void add_internal(const CommAddress &addr, const CommAddress &local_addr, uint32_t timeout_ms, const char *service_name, DispatchHandlerPtr &handler, ConnectionInitializerPtr &initializer)
Called by the add methods to add a connection.
static int32_t response_code(const Event *event)
Returns the response code from an event generated in response to a request message.
void add_with_initializer(const CommAddress &addr, uint32_t timeout_ms, const char *service_name, DispatchHandlerPtr &handler, ConnectionInitializerPtr &initializer)
Adds a connection with a dispatch handler and connection initializer.
std::string String
A String is simply a typedef to std::string.
std::shared_ptr< ConnectionState > ConnectionStatePtr
Smart pointer to ConnectionState.
void schedule_retry(ConnectionStatePtr &conn_state, const std::string &message)
Schedules a connection retry attempt.
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
uint32_t remaining()
Returns the remaining time till expiry.
void connect_retry_loop()
Connect retry loop.
static uint32_t number32(uint32_t maximum=0)
Returns a random 32-bit unsigned integer.
Connection established event.
void add(const Key &key, uint8_t flag, const void *value, uint32_t value_len, TableMutatorAsync *value_index_mutator, TableMutatorAsync *qualifier_index_mutator)
virtual void handle(EventPtr &event)
Primary dispatch handler method.
Unordered map specialization for InetAddr keys.
size_t encoded_length_str16(const char *str)
Computes the encoded length of a string16 encoding.
const char * get_text(int error)
Returns a descriptive error message.
Encapsulate an internet address.
std::shared_ptr< CommBuf > CommBufPtr
Smart pointer to CommBuf.
Logging routines and macros.
Compatibility Macros for C/C++.
Functions to serialize/deserialize primitives to/from a memory buffer.
Connection disconnected event.
Time related declarations.
String format(int sep= ':') const
Returns a string with a dotted notation ("127.0.0.1:8080") including the port.
bool is_set() const
Returns true if address has been initialized.
bool is_inet() const
Returns true if address is of type CommAddress::INET.
bool wait_for_connection(const CommAddress &addr, uint32_t max_wait_ms)
Blocks until the connection to the given address is established.
#define HT_FATALF(msg,...)
Declarations for ConnectionManager.
void start()
Starts the timer.
Declarations for Protocol.
#define HT_INFOF(msg,...)
Random number generator for int32, int64, double and ascii arrays.
Request/response message event.
A timer class to keep timeout states across AsyncComm related calls.
std::shared_ptr< DispatchHandler > DispatchHandlerPtr
Smart pointer to DispatchHandler.
Message buffer for holding data to be transmitted over a network.
void send_initialization_request(ConnectionStatePtr &conn_state)
Sends an initialization request.
void send_connect_request(ConnectionStatePtr &conn_state)
Calls Comm::connect to establish a connection.
InetAddr inet
IPv4:port address.
bool is_proxy() const
Returns true if address is of type CommAddress::PROXY.
std::shared_ptr< ConnectionInitializer > ConnectionInitializerPtr
Smart pointer to ConnectionInitializer.
Error codes, Exception handling, error logging.
static std::chrono::milliseconds duration_millis(uint32_t maximum)
Returns a random millisecond duration.
Address abstraction to hold either proxy name or IPv4:port address.