42 #include <boost/algorithm/string.hpp>
43 #include <boost/tokenizer.hpp>
55 : m_comm(comm), m_cfg(cfg), m_verbose(false), m_silent(false),
56 m_state(STATE_JEOPARDY), m_last_callback_id(0) {
58 HT_TRY(
"getting config values",
59 m_verbose = cfg->get_bool(
"Hypertable.Verbose");
60 m_silent = cfg->get_bool(
"Hypertable.Silent");
64 m_reconnect = cfg->get_bool(
"Hyperspace.Session.Reconnect"));
67 HT_INFO(
"Hyperspace session setup to reconnect");
69 for (
const auto &replica : cfg->get_strs(
"Hyperspace.Replica.Host")) {
81 function<void()> sleep_callback = [
this]() ->
void {this->
handle_sleep();};
82 function<void()> wakeup_callback = [
this]() ->
void {this->
handle_wakeup();};
120 CommBufPtr cbuf_ptr(Protocol::create_shutdown_request());
126 int error =
send_message(cbuf_ptr, &sync_handler, timer);
130 "Hyperspace 'shutdown' error");
162 handle_state->sequencer = 0;
163 handle_state->lock_status = 0;
164 uint32_t open_flags = handle_state->open_flags;
171 handle_state->lock_mode = 0;
177 int error =
send_message(cbuf_ptr, &sync_handler, timer);
181 HT_THROWF(error,
"Hyperspace 'open' error, name=%s flags=0x%x events=0x"
182 "%x", handle_state->normal_name.c_str(), open_flags,
183 handle_state->event_mask);
186 const uint8_t *decode_ptr = event_ptr->payload + 4;
187 size_t decode_remain = event_ptr->payload_len - 4;
188 handle_state->handle =
decode_i64(&decode_ptr, &decode_remain);
190 handle_state->lock_generation =
decode_i64(&decode_ptr, &decode_remain);
195 <<
", name=" << handle_state->normal_name
196 <<
", handle=" << handle_state->handle <<
", flags=" << open_flags
197 <<
", event_mask=" << handle_state->event_mask <<
HT_END;
199 return handle_state->handle;
213 std::vector<Attribute> empty_attrs;
215 handle_state->open_flags = flags;
216 handle_state->event_mask = (callback) ? callback->get_event_mask() : 0;
217 handle_state->callback = callback;
221 CommBufPtr cbuf_ptr(Protocol::create_open_request(handle_state->normal_name,
222 flags, callback, empty_attrs));
224 return open(handle_state, cbuf_ptr, timer);
230 return open(name, flags, null_handle_callback, timer);
237 const std::vector<Attribute> &init_attrs,
Timer *timer) {
241 handle_state->event_mask = (callback) ? callback->get_event_mask() : 0;
242 handle_state->callback = callback;
245 CommBufPtr cbuf_ptr(Protocol::create_open_request(handle_state->normal_name,
246 handle_state->open_flags, callback, init_attrs));
248 return open(handle_state, cbuf_ptr, timer);
258 CommBufPtr cbuf_ptr(Protocol::create_close_request(handle));
264 int error =
send_message(cbuf_ptr, &sync_handler, timer);
268 "Hyperspace 'close' error");
283 CommBufPtr cbuf_ptr(Protocol::create_close_request(handle));
292 mkdir(name,
false, &init_attrs, timer);
296 mkdir(name,
false, 0, timer);
300 mkdir(name,
true, 0, timer);
304 mkdir(name,
true, &init_attrs, timer);
315 CommBufPtr cbuf_ptr(Protocol::create_delete_request(normal_name));
321 int error =
send_message(cbuf_ptr, &sync_handler, timer);
325 "Hyperspace 'unlink' error, name=%s", normal_name.c_str());
342 CommBufPtr cbuf_ptr(Protocol::create_exists_request(normal_name));
348 int error =
send_message(cbuf_ptr, &sync_handler, timer);
352 "Hyperspace 'exists' error, name=%s", normal_name.c_str());
355 const uint8_t *decode_ptr = event_ptr->payload + 4;
356 size_t decode_remain = event_ptr->payload_len - 4;
357 uint8_t bval =
decode_byte(&decode_ptr, &decode_remain);
358 return (bval == 0) ?
false :
true;
370 const void *value,
size_t value_len,
Timer *timer) {
373 CommBufPtr cbuf_ptr(Protocol::create_attr_set_request(handle, 0, 0, attr, value,
380 int error =
send_message(cbuf_ptr, &sync_handler, timer);
386 fname = handle_state->normal_name.c_str();
388 "Problem setting attribute '%s' of hyperspace file '%s'",
389 attr.c_str(), fname.c_str());
404 CommBufPtr cbuf_ptr(Protocol::create_attr_set_request(handle, 0, 0, attrs));
410 int error =
send_message(cbuf_ptr, &sync_handler, timer);
416 fname = handle_state->normal_name.c_str();
418 "Problem setting attributes of hyperspace file '%s'", fname.c_str());
430 const void *value,
size_t value_len,
Timer *timer) {
431 attr_set(name, 0, attr, value, value_len, timer);
435 const void *value,
size_t value_len,
Timer *timer) {
438 CommBufPtr cbuf_ptr(Protocol::create_attr_set_request(0, &name, oflags, attr, value,
445 int error =
send_message(cbuf_ptr, &sync_handler, timer);
449 "Problem setting attribute '%s' of hyperspace file '%s'",
450 attr.c_str(), name.c_str());
460 const std::vector<Attribute> &attrs,
Timer *timer) {
464 CommBufPtr cbuf_ptr(Protocol::create_attr_set_request(0, &name, oflags, attrs));
470 int error =
send_message(cbuf_ptr, &sync_handler, timer);
474 "Problem setting attributes of hyperspace file '%s'", name.c_str());
488 CommBufPtr cbuf_ptr(Protocol::create_attr_incr_request(handle, 0, attr));
494 int error =
send_message(cbuf_ptr, &sync_handler, timer);
500 fname = handle_state->normal_name.c_str();
502 "Problem incrementing attribute '%s' of hyperspace file '%s'",
503 attr.c_str(), fname.c_str());
506 const uint8_t *decode_ptr = event_ptr->payload + 4;
507 size_t decode_remain = event_ptr->payload_len - 4;
508 uint64_t attr_val =
decode_i64(&decode_ptr, &decode_remain);
525 CommBufPtr cbuf_ptr(Protocol::create_attr_incr_request(0, &name, attr));
531 int error =
send_message(cbuf_ptr, &sync_handler, timer);
535 "Problem incrementing attribute '%s' of hyperspace file '%s'",
536 attr.c_str(), name.c_str());
539 const uint8_t *decode_ptr = event_ptr->payload + 4;
540 size_t decode_remain = event_ptr->payload_len - 4;
541 uint64_t attr_val =
decode_i64(&decode_ptr, &decode_remain);
558 CommBufPtr cbuf_ptr(Protocol::create_attr_get_request(handle, 0, attr));
564 int error =
send_message(cbuf_ptr, &sync_handler, timer);
570 fname = handle_state->normal_name.c_str();
572 "Problem getting attribute '%s' of hyperspace file '%s'",
573 attr.c_str(), fname.c_str());
589 CommBufPtr cbuf_ptr(Protocol::create_attr_get_request(0, &name, attr));
595 int error =
send_message(cbuf_ptr, &sync_handler, timer);
599 "Problem getting attribute '%s' of hyperspace file '%s'",
600 attr.c_str(), name.c_str());
628 std::vector<DynamicBufferPtr> &values,
Timer *timer) {
631 CommBufPtr cbuf_ptr(Protocol::create_attrs_get_request(handle, 0, attrs));
637 int error =
send_message(cbuf_ptr, &sync_handler, timer);
643 fname = handle_state->normal_name.c_str();
645 "Problem getting attributes of hyperspace file '%s'",
659 std::vector<DynamicBufferPtr> &values,
Timer *timer) {
662 CommBufPtr cbuf_ptr(Protocol::create_attrs_get_request(0, &name, attrs));
668 int error =
send_message(cbuf_ptr, &sync_handler, timer);
672 "Problem getting attributes of hyperspace file '%s'",
690 CommBufPtr cbuf_ptr(Protocol::create_attr_exists_request(handle, attr));
696 int error =
send_message(cbuf_ptr, &sync_handler, timer);
700 "Hyperspace 'attr_exists' error, name=%s", attr.c_str());
703 const uint8_t *decode_ptr = event_ptr->payload + 4;
704 size_t decode_remain = event_ptr->payload_len - 4;
705 uint8_t bval =
decode_byte(&decode_ptr, &decode_remain);
706 return (bval == 0) ?
false :
true;
720 CommBufPtr cbuf_ptr(Protocol::create_attr_exists_request(name, attr));
726 int error =
send_message(cbuf_ptr, &sync_handler, timer);
730 "Hyperspace 'attr_exists' error, name=%s", attr.c_str());
733 const uint8_t *decode_ptr = event_ptr->payload + 4;
734 size_t decode_remain = event_ptr->payload_len - 4;
735 uint8_t bval =
decode_byte(&decode_ptr, &decode_remain);
736 return (bval == 0) ?
false :
true;
750 CommBufPtr cbuf_ptr(Protocol::create_attr_del_request(handle, name));
756 int error =
send_message(cbuf_ptr, &sync_handler, timer);
762 fname = handle_state->normal_name.c_str();
764 "Problem deleting attribute '%s' of hyperspace file '%s'",
765 name.c_str(), fname.c_str());
778 CommBufPtr cbuf_ptr(Protocol::create_attr_list_request(handle));
784 int error =
send_message(cbuf_ptr, &sync_handler, timer);
790 fname = handle_state->normal_name.c_str();
792 "Problem getting list of attributes of hyperspace file '%s'",
796 const uint8_t *decode_ptr = event_ptr->payload + 4;
797 size_t decode_remain = event_ptr->payload_len - 4;
799 uint32_t num_attributes =
decode_i32(&decode_ptr, &decode_remain);
801 for (uint32_t k = 0; k < num_attributes; k++) {
803 attrname =
decode_vstr(&decode_ptr, &decode_remain);
804 anames.push_back(attrname);
821 CommBufPtr cbuf_ptr(Protocol::create_readdir_request(handle));
827 int error =
send_message(cbuf_ptr, &sync_handler, timer);
831 "Hyperspace 'readdir' error");
834 const uint8_t *decode_ptr = event_ptr->payload + 4;
835 size_t decode_remain = event_ptr->payload_len - 4;
839 entry_cnt =
decode_i32(&decode_ptr, &decode_remain);
845 for (uint32_t i=0; i<entry_cnt; i++) {
851 "Problem decoding entry %d of READDIR return packet", i);
853 listing.push_back(dentry);
865 std::vector<DirEntryAttr> &listing,
Timer *timer) {
868 CommBufPtr cbuf_ptr(Protocol::create_readdir_attr_request(handle, 0, attr, include_sub_entries));
874 int error =
send_message(cbuf_ptr, &sync_handler, timer);
878 "Hyperspace 'readdir_attr' error");
891 std::vector<DirEntryAttr> &listing,
Timer *timer) {
894 CommBufPtr cbuf_ptr(Protocol::create_readdir_attr_request(0, &name, attr, include_sub_entries));
900 int error =
send_message(cbuf_ptr, &sync_handler, timer);
904 "Hyperspace 'readdir_attr' error");
917 std::vector<DirEntryAttr> &listing,
Timer *timer) {
920 CommBufPtr cbuf_ptr(Protocol::create_readpath_attr_request(handle, 0, attr));
926 int error =
send_message(cbuf_ptr, &sync_handler, timer);
930 "Hyperspace 'readpath_attr' error");
943 std::vector<DirEntryAttr> &listing,
Timer *timer) {
946 CommBufPtr cbuf_ptr(Protocol::create_readpath_attr_request(0, &name, attr));
952 int error =
send_message(cbuf_ptr, &sync_handler, timer);
956 "Hyperspace 'readpath_attr' error");
972 CommBufPtr cbuf_ptr(Protocol::create_lock_request(handle, mode,
false));
979 if (handle_state->lock_status != 0)
987 lock_guard<mutex>
lock(handle_state->mutex);
988 sequencerp->
mode = mode;
989 sequencerp->
name = handle_state->normal_name;
990 handle_state->sequencer = sequencerp;
993 int error =
send_message(cbuf_ptr, &sync_handler, timer);
997 "Hyperspace 'lock' error, name='%s'",
998 handle_state->normal_name.c_str());
1000 unique_lock<mutex>
lock(handle_state->mutex);
1001 const uint8_t *decode_ptr = event_ptr->payload + 4;
1002 size_t decode_remain = event_ptr->payload_len - 4;
1003 handle_state->lock_mode = mode;
1005 status =
decode_i32(&decode_ptr, &decode_remain);
1009 handle_state->lock_generation = sequencerp->
generation;
1015 handle_state->cond.wait(lock, [&handle_state](){
return handle_state->lock_status !=
LOCK_STATUS_PENDING; });
1039 CommBufPtr cbuf_ptr(Protocol::create_lock_request(handle, mode,
true));
1045 if (handle_state->lock_status != 0)
1052 int error =
send_message(cbuf_ptr, &sync_handler, timer);
1056 "Hyperspace 'try_lock' error, name='%s'",
1057 handle_state->normal_name.c_str());
1059 lock_guard<mutex>
lock(handle_state->mutex);
1060 const uint8_t *decode_ptr = event_ptr->payload + 4;
1061 size_t decode_remain = event_ptr->payload_len - 4;
1067 sequencerp->
mode = mode;
1068 sequencerp->
name = handle_state->normal_name;
1069 handle_state->lock_mode = mode;
1071 handle_state->lock_generation = sequencerp->
generation;
1072 handle_state->sequencer = 0;
1091 CommBufPtr cbuf_ptr(Protocol::create_release_request(handle));
1101 int error =
send_message(cbuf_ptr, &sync_handler, timer);
1103 lock_guard<mutex>
lock(handle_state->mutex);
1106 "Hyperspace 'release' error");
1107 handle_state->lock_status = 0;
1108 handle_state->cond.notify_all();
1127 if (handle_state->lock_generation == 0)
1130 sequencerp->
name = handle_state->normal_name;
1131 sequencerp->
mode = handle_state->lock_mode;
1132 sequencerp->
generation = handle_state->lock_generation;
1140 HT_WARN(
"CheckSequencer not implemented.");
1154 location += replica +
"\n";
1165 CommBufPtr cbuf_ptr(Protocol::create_status_request());
1166 int error =
send_message(cbuf_ptr, &sync_handler, timer);
1171 const uint8_t *ptr = event_ptr->payload + 4;
1172 size_t remain = event_ptr->payload_len - 4;
1173 status.
decode(&ptr, &remain);
1189 (it->second)->safe();
1193 (it->second)->reconnected();
1198 (it->second)->jeopardy();
1207 (it->second)->disconnected();
1215 (it->second)->expired();
1237 auto drop_time = chrono::steady_clock::now() + chrono::milliseconds(max_wait_ms);
1238 return m_cond.wait_until(lock, drop_time,
1245 auto drop_time = chrono::steady_clock::now() + chrono::milliseconds(timer.
remaining());
1246 return m_cond.wait_until(lock, drop_time,
1251 void Session::mkdir(
const std::string &name,
bool create_intermediate,
const std::vector<Attribute> *init_attrs,
Timer *timer) {
1258 CommBufPtr cbuf_ptr(Protocol::create_mkdir_request(normal_name, create_intermediate, init_attrs));
1264 int error =
send_message(cbuf_ptr, &sync_handler, timer);
1268 "Hyperspace 'mkdir' error, name=%s", normal_name.c_str());
1277 uint32_t attr_val_len = 0;
1278 const uint8_t *decode_ptr = event_ptr->payload + 8;
1279 size_t decode_remain = event_ptr->payload_len - 8;
1284 value.
ensure(attr_val_len+1);
1296 uint32_t attr_val_len = 0;
1297 const uint8_t *decode_ptr = event_ptr->payload + 4;
1298 size_t decode_remain = event_ptr->payload_len - 4;
1300 uint32_t attr_val_cnt =
decode_i32(&decode_ptr, &decode_remain);
1301 values.reserve(attr_val_cnt);
1302 while (attr_val_cnt-- > 0) {
1307 value = make_shared<DynamicBuffer>(attr_val_len+1);
1308 value->add_unchecked(attr_val, attr_val_len);
1312 values.push_back(value);
1321 const uint8_t *decode_ptr = event_ptr->payload + 4;
1322 size_t decode_remain = event_ptr->payload_len - 4;
1326 entry_cnt =
decode_i32(&decode_ptr, &decode_remain);
1332 listing.reserve(entry_cnt);
1333 for (uint32_t ii=0; ii<entry_cnt; ii++) {
1339 "Problem decoding entry %d of READDIR_ATTR return packet", ii);
1341 listing.push_back(dentry);
1366 HT_WARNF(
"Comm::send_request to htHyperspace at %s failed - %s",
1384 if (name.find(
'/', name.length()-1) == string::npos)
1387 normal += name.substr(0, name.length()-1);
1391 return make_shared<HsCommandInterpreter>(
this);
1397 hyperspace->close(handle);
1402 hyperspace->close(*handlep);
#define HT_THROW2F(_code_, _ex_, _fmt_,...)
uint16_t m_hyperspace_port
char * decode_vstr(const uint8_t **bufp, size_t *remainp)
Decode a vstr (vint64, data, null).
Lock successfully granted.
std::shared_ptr< HsCommandInterpreter > HsCommandInterpreterPtr
void attr_get(uint64_t handle, const std::string &attr, DynamicBuffer &value, Timer *timer=0)
Gets an extended attribute of a file.
void readdir(uint64_t handle, std::vector< DirEntry > &listing, Timer *timer=0)
Gets a directory listing.
#define HT_WARNF(msg,...)
uint64_t open(const std::string &name, uint32_t flags, HandleCallbackPtr &callback, Timer *timer=0)
Opens a file.
int get_state()
Returns current state (internal method)
void attr_list(uint64_t handle, vector< String > &anames, Timer *timer=0)
Lists all extended attributes of a file.
A callback object derived from this class gets passed into the constructor of Hyperspace.
void shutdown(Timer *timer=0)
Attempts to shutdown the Hyperspace server and destroys this session.
Holds Nagios-style program status information.
Delivers sleep and wakeup notifications.
static int32_t response_code(const Event *event)
Returns the response code from an event generated in response to a request message.
void close(uint64_t handle, Timer *timer=0)
Closes a file handle.
std::string String
A String is simply a typedef to std::string.
void lock(uint64_t handle, LockMode mode, LockSequencer *sequencerp, Timer *timer=0)
Locks a file.
virtual void handle(EventPtr &event)
Event Dispatch method.
DirEntry & decode_dir_entry(const uint8_t **bufp, size_t *remainp, DirEntry &dir_entry)
Decodes (unserializes) a directory entry from a buffer.
Declarations for Protocol.
Abstract base class for application dispatch handlers registered with AsyncComm.
Error if create and file exists.
HsCommandInterpreterPtr create_hs_interpreter()
Creates a new Hyperspace command interpreter.
Po::typed_value< String > * str(String *v=0)
Program options handling.
void readpath_attr(uint64_t handle, const std::string &attr, std::vector< DirEntryAttr > &listing, Timer *timer=0)
Gets a listing of the value of a specified attribute for each path component of the file/dir name...
SleepWakeNotifier * m_sleep_wake_notifier
Delivers suspend/resume notifications (e.g. laptop close/open).
attempting to reconnect session
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
std::shared_ptr< DynamicBuffer > DynamicBufferPtr
uint32_t remaining()
Returns the remaining time till expiry.
Atomically open and lock file shared, fail if can't.
int status(Status &status, Timer *timer=0)
Check the status of the Hyperspace master server.
uint8_t * ptr
Pointer to the end of the used part of the buffer.
bool wait_for_reply(EventPtr &event)
This method is used by a client to synchronize.
void close_nowait(uint64_t handle)
Attempts to close a file handle, but doesn't block.
A dynamic, resizable and reference counted memory buffer.
void handle_wakeup()
Handle wakeup event (e.g.
uint32_t decode_i32(const uint8_t **bufp, size_t *remainp)
Decode a 32-bit integer in little-endian order.
uint64_t m_last_callback_id
void normalize_name(const std::string &name, std::string &normal)
void decode_values(Hypertable::EventPtr &event_ptr, std::vector< DynamicBufferPtr > &values)
void handle_sleep()
Handle sleep event (e.g.
#define HT_EXPECT(_e_, _code_)
uint32_t m_lease_interval
uint64_t create(const std::string &name, uint32_t flags, HandleCallbackPtr &callback, const std::vector< Attribute > &init_attrs, Timer *timer=0)
Creates a file.
void attrs_get(uint64_t handle, const std::vector< std::string > &attrs, std::vector< DynamicBufferPtr > &values, Timer *timer=0)
Gets extended attributes of a file.
vector< String > m_hyperspace_replicas
uint64_t decode_i64(const uint8_t **bufp, size_t *remainp)
Decode a 64-bit integer in little-endian order.
void add_callback(SessionCallback *cb)
Register a new session callback.
void readdir_attr(uint64_t handle, const std::string &attr, bool include_sub_entries, std::vector< DirEntryAttr > &listing, Timer *timer=0)
Gets a listing of all entries in a directory which have a certain attribute.
Lock attempt was cancelled.
std::shared_ptr< Session > SessionPtr
const char * get_text(int error)
Returns a descriptive error message.
bool status(ContextPtr &context, Timer &timer, Status &status)
Runs a status check on the master.
bool exists(const std::string &name, Timer *timer=0)
Checks for the existence of a file.
std::shared_ptr< CommBuf > CommBufPtr
Smart pointer to CommBuf.
void release(uint64_t handle, Timer *timer=0)
Releases any file handle locks.
void attr_set(uint64_t handle, const std::string &attr, const void *value, size_t value_len, Timer *timer=0)
Sets an extended attribute of a file.
std::shared_ptr< Properties > PropertiesPtr
String locate(int type)
Returns location of Hyperspace Master/Replicas.
Logging routines and macros.
bool attr_exists(uint64_t handle, const std::string &attr, Timer *timer=0)
void close_handle_ptr(SessionPtr hyperspace, uint64_t *handlep)
bool wait_for_connection(uint32_t max_wait_ms)
Waits for session state to change to STATE_SAFE.
Compatibility Macros for C/C++.
std::chrono::steady_clock::time_point m_expire_time
void attr_del(uint64_t handle, const std::string &name, Timer *timer=0)
Deletes an extended attribute of a file.
void get_sequencer(uint64_t handle, LockSequencer *sequencerp, Timer *timer=0)
Gets the lock sequencer of a locked file or directory handle.
uint32_t mode
lock mode (LOCK_MODE_SHARED or LOCK_MODE_EXCLUSIVE)
Functions to serialize/deserialize primitives to/from a memory buffer.
uint64_t attr_incr(uint64_t handle, const std::string &attr, Timer *timer=0)
Atomically increments the attribute and returns pre-incremented value Attribute is assumed to be a ui...
void close_handle(SessionPtr hyperspace, uint64_t handle)
Time related declarations.
virtual void decode(const uint8_t **bufp, size_t *remainp)
Reads serialized representation of object from a buffer.
ClientKeepaliveHandlerPtr m_keepalive_handler_ptr
String format(int sep= ':') const
Returns a string with a dotted notation ("127.0.0.1:8080") including the port.
std::shared_ptr< HandleCallback > HandleCallbackPtr
std::mutex m_callback_mutex
void try_lock(uint64_t handle, LockMode mode, LockStatus *statusp, LockSequencer *sequencerp, Timer *timer=0)
Attempts to lock a file.
void unlink(const std::string &name, Timer *timer=0)
Removes a file or directory.
bool expired()
Checks for session expiration (internal method)
DispatchHandler class used to synchronize with response messages.
static bool initialize(sockaddr_in *addr, const char *host, uint16_t port)
Initialize a sockaddr_in structure from host:port.
Entry point to AsyncComm service.
String m_hyperspace_master
void clear()
Clears the buffer.
Declarations for ConnectionManager.
std::string name
Pathname of file that is locked.
int state_transition(int state)
Transitions state (internal method)
Lock attempt pending (internal use only)
void update_master_addr(const String &host)
Atomically open and lock file exclusive, fail if can't.
#define HT_THROWF(_code_, _fmt_,...)
DirEntryAttr & decode_dir_entry_attr(const uint8_t **bufp, size_t *remainp, DirEntryAttr &entry)
Decodes (unserializes) a directory entry from a buffer.
Declaration for SleepWakeNotifier.
Internet address wrapper classes and utility functions.
uint8_t * decode_bytes32(const uint8_t **bufp, size_t *remainp, uint32_t *lenp)
Decodes a variable sized byte array from the given buffer.
A timer class to keep timeout states across AsyncComm related calls.
This is a generic exception class for Hypertable.
void mkdir(const std::string &name, Timer *timer=0)
Creates a directory.
void decode_listing(Hypertable::EventPtr &event_ptr, std::vector< DirEntryAttr > &listing)
bool remove_callback(SessionCallback *cb)
De-register session callback.
std::shared_ptr< ClientHandleState > ClientHandleStatePtr
Create file if it does not exist.
int send_message(CommBufPtr &, DispatchHandler *, Timer *timer)
void decode_value(Hypertable::EventPtr &event_ptr, DynamicBuffer &value)
#define HT_TRY(_s_, _code_)
uint8_t decode_byte(const uint8_t **bufp, size_t *remainp)
Decodes a single byte from the given buffer.
void mkdirs(const std::string &name, Timer *timer=0)
Creates a directory, including all intermediate paths.
Error codes, Exception handling, error logging.
#define HT_THROW(_code_, _msg_)
int send_request(const CommAddress &addr, uint32_t timeout_ms, CommBufPtr &cbuf, DispatchHandler *response_handler)
Sends a request message over a connection, expecting a response.
void ensure(size_t len)
Ensure space for additional data Will grow the space to 1.5 of the needed space with existing data un...
uint8_t * add_unchecked(const void *data, size_t len)
Adds additional data without boundary checks.
std::condition_variable m_cond
Declarations for DispatchHandlerSynchronizer.
int code() const
Returns the error code.
#define HT_THROW2(_code_, _ex_, _msg_)
void check_sequencer(LockSequencer &sequencer, Timer *timer=0)
Checks to see if a lock sequencer is valid.