44 : m_comm(comm), m_session(session) {
46 HT_TRY(
"getting config values",
47 m_verbose = cfg->get_bool(
"Hypertable.Verbose");
52 m_reconnect = cfg->get_bool(
"Hyperspace.Session.Reconnect"));
54 auto now = chrono::steady_clock::now();
58 for (
const auto &replica : cfg->get_strs(
"Hyperspace.Replica.Host")) {
98 lock_guard<recursive_mutex> lock(
m_mutex);
116 const uint8_t *decode_ptr =
event->payload;
117 size_t decode_remain =
event->payload_len;
124 (
Llu)event->header.command);
126 switch (event->header.command) {
127 case Protocol::COMMAND_REDIRECT:
134 if (strlen(host) != 0) {
135 HT_DEBUG_OUT <<
"Received COMMAND_REDIRECT looking for master at "
156 case Protocol::COMMAND_KEEPALIVE:
159 uint32_t notifications;
160 uint64_t
handle, event_id;
163 const uint8_t *post_notification_buf;
164 size_t post_notification_size;
173 session_id =
decode_i64(&decode_ptr, &decode_remain);
174 error =
decode_i32(&decode_ptr, &decode_remain);
191 notifications =
decode_i32(&decode_ptr, &decode_remain);
194 post_notification_buf = decode_ptr;
195 post_notification_size = decode_remain;
197 std::set<uint64_t> delivered_events;
199 for (uint32_t i=0; i<notifications; i++) {
200 handle =
decode_i64(&decode_ptr, &decode_remain);
201 event_id =
decode_i64(&decode_ptr, &decode_remain);
202 event_mask =
decode_i32(&decode_ptr, &decode_remain);
205 delivered_events.insert(event_id);
210 auto now = chrono::steady_clock::now();
213 <<
", handle=" << handle <<
", event_id=" << event_id
214 <<
", event_mask=" << event_mask <<
HT_END;
227 uint64_t time_diff = chrono::duration_cast<chrono::milliseconds>(now - (*uiter).second).count();
229 HT_ERROR_OUT <<
"[Issue 313] Still receiving bad notification after grace "
232 <<
", handle=" << handle <<
", event_id=" << event_id
233 <<
", event_mask=" << event_mask <<
HT_END;
243 HT_ERROR_OUT <<
"[Issue 313] Previously bad notification cleared within grace "
246 <<
", handle=" << handle <<
", event_id=" << event_id
247 <<
", event_mask=" << event_mask <<
HT_END;
267 decode_ptr = post_notification_buf;
268 decode_remain = post_notification_size;
271 for (uint32_t i=0; i<notifications; i++) {
272 handle =
decode_i64(&decode_ptr, &decode_remain);
273 event_id =
decode_i64(&decode_ptr, &decode_remain);
274 event_mask =
decode_i32(&decode_ptr, &decode_remain);
278 HT_ERROR_OUT <<
"[Issue 313] this should never happen bad notification session="
279 <<
m_session_id <<
", handle=" << handle <<
", event_id=" << event_id
280 <<
", event_mask=" << event_mask <<
HT_END;
295 if (handle_state->callback) {
297 handle_state->callback->attr_set(name);
299 handle_state->callback->attr_del(name);
301 handle_state->callback->child_node_added(name);
303 handle_state->callback->child_node_removed(name);
307 uint32_t mode =
decode_i32(&decode_ptr, &decode_remain);
310 if (handle_state->callback)
311 handle_state->callback->lock_acquired(mode);
316 if (handle_state->callback)
317 handle_state->callback->lock_released();
320 uint32_t mode =
decode_i32(&decode_ptr, &decode_remain);
321 handle_state->lock_generation =
decode_i64(&decode_ptr,
326 handle_state->sequencer->generation =
327 handle_state->lock_generation;
328 handle_state->sequencer->mode = mode;
329 handle_state->cond.notify_all();
342 if (notifications > 0) {
343 CommBufPtr cbp(Protocol::create_client_keepalive_request(
360 (
Llu)event->header.command);
382 CommBufPtr cbp(Hyperspace::Protocol::create_client_keepalive_request(
400 HT_INFOF(
"%s", event->to_str().c_str());
410 this_thread::sleep_for(chrono::milliseconds(2000));
417 auto now = chrono::steady_clock::now();
425 CommBufPtr cbp(Hyperspace::Protocol::create_client_keepalive_request(
448 lock_guard<recursive_mutex> lock(
m_mutex);
456 CommBufPtr cbp(Hyperspace::Protocol::create_client_keepalive_request(
468 unique_lock<recursive_mutex> lock(
m_mutex);
473 if (
m_cond_destroyed.wait_for(lock, chrono::seconds(2)) == cv_status::timeout)
char * decode_vstr(const uint8_t **bufp, size_t *remainp)
Decode a vstr (vint64, data, null).
Lock successfully granted.
void wait_for_destroy_session()
int get_state()
Returns current state (internal method)
uint32_t m_keep_alive_interval
BadNotificationHandleMap m_bad_handle_map
Declarations for Protocol.
static const uint64_t ms_bad_notification_grace_period
uint16_t m_hyperspace_port
void advance_expire_time(std::chrono::steady_clock::time_point now)
long long unsigned int Llu
Shortcut for printf formats.
attempting to reconnect session
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
std::vector< String > m_hyperspace_replicas
int send_datagram(const CommAddress &addr, const CommAddress &send_addr, CommBufPtr &cbuf)
Sends a datagram to a remote address.
uint32_t decode_i32(const uint8_t **bufp, size_t *remainp)
Decode a 32-bit integer in little-endian order.
virtual void handle(Hypertable::EventPtr &event)
Callback method.
#define HT_EXPECT(_e_, _code_)
uint32_t m_lease_interval
uint64_t decode_i64(const uint8_t **bufp, size_t *remainp)
Decode a 64-bit integer in little-endian order.
const char * get_text(int error)
Returns a descriptive error message.
Encapsulate an internet address.
void close_socket(const CommAddress &addr)
Closes the socket specified by the addr argument.
std::shared_ptr< CommBuf > CommBufPtr
Smart pointer to CommBuf.
std::shared_ptr< Properties > PropertiesPtr
ClientConnectionHandlerPtr m_conn_handler
Compatibility Macros for C/C++.
Time related declarations.
std::chrono::steady_clock::time_point m_jeopardy_time
bool expired()
Checks for session expiration (internal method)
static bool initialize(sockaddr_in *addr, const char *host, uint16_t port)
Initialize a sockaddr_in structure from host:port.
Entry point to AsyncComm service.
std::set< uint64_t > m_delivered_events
int state_transition(int state)
Transitions state (internal method)
std::chrono::steady_clock::time_point m_last_keep_alive_send_time
#define HT_INFOF(msg,...)
void update_master_addr(const String &host)
#define HT_THROWF(_code_, _fmt_,...)
std::recursive_mutex m_mutex
std::condition_variable_any m_cond_destroyed
Internet address wrapper classes and utility functions.
Request/response message event.
void create_datagram_receive_socket(CommAddress &addr, int tos, const DispatchHandlerPtr &handler)
Creates a socket for receiving datagrams and attaches handler as the default dispatch handler...
This is a generic exception class for Hypertable.
#define HT_ERRORF(msg,...)
std::shared_ptr< ClientHandleState > ClientHandleStatePtr
int set_timer(uint32_t duration_millis, const DispatchHandlerPtr &handler)
Sets a timer for duration_millis milliseconds in the future.
sockaddr_in m_master_addr
#define HT_TRY(_s_, _code_)
String extensions and helpers: sets, maps, append operators etc.
Error codes, Exception handling, error logging.
uint16_t m_datagram_send_port