39 #include <boost/algorithm/string.hpp>
40 #include <boost/tokenizer.hpp>
51 #include <sys/types.h>
53 #if defined(__FreeBSD__)
54 #include <sys/extattr.h>
56 #include <sys/xattr.h>
66 #define HT_BDBTXN_BEGIN(parent_txn) \
69 std::stringstream txn_str;\
70 HT_ASSERT(is_master());\
71 m_bdb_fs->start_transaction(txn); \
74 #define HT_BDBTXN_END_CB(_cb_) \
75 catch (Exception &e) { \
76 if (e.code() == Error::HYPERSPACE_BERKELEYDB_DEADLOCK) { \
78 HT_INFOF("Berkeley DB deadlock encountered in txn %s", txn_str.str().c_str()); \
80 this_thread::sleep_for(Random::duration_millis(3000)); \
83 else if (e.code() == Error::HYPERSPACE_BERKELEYDB_REP_HANDLE_DEAD) { \
85 HT_INFOF("Berkeley DB rep handle dead deadlock encountered in txn %s", txn_str.str().c_str()); \
89 else if (e.code() == Error::HYPERSPACE_BERKELEYDB_ERROR) \
90 HT_ERROR_OUT << e << HT_END; \
92 HT_ERRORF("%s - %s", Error::get_text(e.code()), e.what()); \
94 _cb_->error(e.code(), e.what()); \
97 HT_DEBUG_OUT << "end txn " << txn << HT_END; \
101 #define HT_BDBTXN_END(...) \
102 catch (Exception &e) { \
103 if (e.code() == Error::HYPERSPACE_BERKELEYDB_DEADLOCK) {\
105 HT_INFOF("Berkeley DB deadlock encountered in txn %s", txn_str.str().c_str()); \
107 this_thread::sleep_for(Random::duration_millis(3000)); \
110 else if (e.code() == Error::HYPERSPACE_BERKELEYDB_REP_HANDLE_DEAD) { \
112 HT_INFOF("Berkeley DB rep handle dead deadlock encountered in txn %s", txn_str.str().c_str()); \
116 else if (e.code() == Error::HYPERSPACE_BERKELEYDB_ERROR) \
117 HT_ERROR_OUT << e << HT_END; \
119 HT_ERRORF("%s - %s", Error::get_text(e.code()), e.what()); \
121 return __VA_ARGS__; \
123 HT_DEBUG_OUT << "end txn " << txn << HT_END; \
136 : m_verbose(false), m_next_handle_number(1), m_next_session_id(1),
137 m_maintenance_outstanding(false),
138 m_shutdown(false), m_bdb_fs(0) {
145 Path base_dir(props->get_str(
"Hyperspace.Replica.Dir"));
147 if (!base_dir.is_complete()) {
148 Path data_dir = props->get_str(
"Hypertable.DataDirectory");
149 base_dir = data_dir / base_dir;
159 HT_INFOF(
"Base directory '%s' does not exist, creating...",
162 HT_ERRORF(
"Unable to create base directory %s - %s",
169 HT_INFOF(
"Lock file '%s' does not exist, creating...",
170 m_lock_file.c_str());
171 if ((
m_lock_fd = ::
open(m_lock_file.c_str(), O_RDWR|O_CREAT|O_TRUNC, 0644)) < 0) {
172 HT_ERRORF(
"Unable to create lock file '%s' - %s", m_lock_file.c_str(), strerror(errno));
178 HT_ERRORF(
"Unable to open lock file '%s' - %s", m_lock_file.c_str(), strerror(errno));
189 memset(&fl, 0,
sizeof fl);
192 fl.l_whence = SEEK_SET;
197 if (fcntl(
m_lock_fd, F_SETLKW, &fl) == -1) {
198 if (errno == EWOULDBLOCK) {
199 HT_ERRORF(
"Lock file '%s' is locked by another process.",
200 m_lock_file.c_str());
203 HT_ERRORF(
"Unable to lock file '%s' - %s",
204 m_lock_file.c_str(), strerror(errno));
209 if (flock(
m_lock_fd, LOCK_EX | LOCK_NB) != 0) {
210 if (errno == EWOULDBLOCK) {
211 HT_ERRORF(
"Lock file '%s' is locked by another process.",
212 m_lock_file.c_str());
215 HT_ERRORF(
"Unable to lock file '%s' - %s",
216 m_lock_file.c_str(), strerror(errno));
222 app_queue_ptr = make_shared<ApplicationQueue>(get_i32(
"workers"),
false);
223 vector<Thread::id> thread_ids = app_queue_ptr->get_thread_ids();
224 thread_ids.push_back(ThisThread::get_id());
236 uint16_t port = props->get_i16(
"Hyperspace.Replica.Port");
267 uint64_t session_id = 0;
269 HT_INFOF(
"Create session for %s", addr_str.c_str());
276 session_data = make_shared<SessionData>(addr,
m_lease_interval, session_id);
296 session_data = (*iter).second;
311 session_data = (*iter).second;
313 session_data->expire();
315 session_data->set_expire_time_now();
316 HT_INFOF(
"destroyed session %llu(%s)",
317 (
Llu)session_id, session_data->get_name());
329 HT_ERRORF(
"Unable to initialize session %llu (%s)", (
Llu)session_id, name.c_str());
332 session_data = (*iter).second;
339 session_data->set_name(name);
343 HT_INFOF(
"Initialized session %llu (%s)", (
Llu)session_id, name.c_str());
356 bool renewed =
false;
357 bool commited =
false;
364 session_data = iter->second;
365 renewed = session_data->renew_lease();
378 session_data->expire();
398 std::chrono::steady_clock::time_point now) {
405 if (session_data->is_expired(now) ||
m_shutdown) {
427 std::vector<uint64_t> handles;
428 std::vector<uint64_t> expired_sessions;
430 auto now = std::chrono::steady_clock::now();
434 bool commited =
false;
436 HT_INFOF(
"Expiring session %llu name=%s", (
Llu)session_data->get_id(),
437 session_data->get_name());
445 expired_sessions.push_back(session_data->get_id());
450 session_data->expire();
454 for (
auto handle : handles) {
458 HT_INFOF(
"Problem destroying handle - %s (%s)",
463 if (expired_sessions.size() > 0) {
465 for (
auto expired_session : expired_sessions) {
485 bool commited =
false;
492 if (init_attrs.size() && !ctx.
aborted)
525 std::vector<EventContext> evts;
526 bool commited =
false;
533 exists(ctx, name, file_exists);
534 if (!ctx.
aborted && !file_exists) {
535 typedef boost::tokenizer<boost::char_separator<char> > tokenizer;
536 boost::char_separator<char> sep(
"/");
537 std::vector<String> name_components;
539 tokenizer tokens(path, sep);
540 for (tokenizer::iterator tok_iter = tokens.begin();
541 tok_iter != tokens.end(); ++tok_iter)
542 name_components.push_back(*tok_iter);
545 for (
size_t i=0; i<name_components.size(); i++) {
546 path +=
String(
"/") + name_components[i];
547 mkdir(ctx, path.c_str());
550 if (init_attrs.size() && !ctx.
aborted &&
551 i == name_components.size() - 1)
601 bool commited =
false;
639 uint32_t flags, uint32_t event_mask, std::vector<Attribute> &init_attrs) {
641 bool commited =
false;
643 bool created =
false;
644 uint64_t lock_generation = 0;
653 open(ctx, name, flags, event_mask, init_attrs, handle, created, lock_generation);
675 HT_INFOF(
"exitting open(session_id=%llu, session_name = %s, fname=%s, flags=0x%x, event_mask=0x%x)",
736 format(
"Session %llu (%s) does not exist",
759 const char *name, uint32_t oflags,
const std::vector<Attribute> &attrs) {
761 bool commited =
false;
762 uint64_t opened_handle = 0;
777 uint64_t lock_generation;
778 std::vector<Attribute> none;
779 open(ctx, name, oflags, 0, none, opened_handle, created, lock_generation);
781 attr_set(ctx, opened_handle, 0, attrs);
782 close(ctx, opened_handle);
832 uint64_t handle,
const char *name,
833 const std::vector<String> &attrs) {
835 std::vector<DynamicBufferPtr> dbufs;
836 dbufs.reserve(attrs.size());
842 if (attrs.size() == 1) {
843 dbufs.push_back(make_shared<DynamicBuffer>());
844 attr_get(ctx, handle, name, attrs.front().c_str(), *dbufs.back());
847 attr_get(ctx, handle, name, attrs, dbufs);
884 uint64_t handle,
const char *name,
const char* attr) {
891 attr_incr(ctx, handle, name, attr, attr_val);
924 bool commited =
false;
957 const char *name,
const char *attr)
985 std::vector<String> attributes;
1020 bool file_exists =
false;
1025 exists(ctx, name, file_exists);
1056 std::vector<DirEntry> listing;
1061 readdir(ctx, handle, listing);
1091 uint64_t handle,
const char *name,
const char *attr,
bool include_sub_entries) {
1092 std::vector<DirEntryAttr> listing;
1097 readdir_attr(ctx, handle, name, attr, include_sub_entries, listing);
1130 uint64_t handle,
const char *name,
const char *attr) {
1131 std::vector<DirEntryAttr> listing;
1162 HT_INFOF(
"shutdown(session=%llu", (
Llu)session_id);
1174 session_data = (*iter).second;
1176 session_data->expire();
1178 session_data->set_expire_time_now();
1179 HT_INFOF(
"destroyed dangling session %llu(%s)",
1180 (
Llu)session_data->get_id(), session_data->get_name());
1204 uint32_t mode,
bool try_lock) {
1207 uint32_t open_flags, cur_lock_mode;
1209 uint64_t lock_generation = 0;
1213 bool persisted_notifications =
false;
1214 bool aborted=
false, commited=
false;
1215 int lock_status = 0;
1225 HT_INFOF(
"lock(session=%llu(%s), handle=%llu, mode=0x%x, try_lock=%d)",
1226 (
Llu)session_id, session_data->get_name(), (
Llu)handle, mode, try_lock);
1231 aborted =
false; commited =
false; persisted_notifications =
false;
1237 error_msg =
format(
"session: %lld", (
Lld)session_id);
1250 error_msg =
"handle not open for locking";
1257 error_msg =
"handle not open for writing";
1319 lock_acquired_event = make_shared<EventLockAcquired>(event_id, mode);
1321 lock_acquired_notifications)) {
1323 persisted_notifications =
true;
1331 std::stringstream sout;
1332 sout <<
"lock txn=" << txn <<
" aborted " <<
" handle=" << handle <<
" node="
1333 << node <<
" mode=" << mode <<
" status=" << lock_status
1334 <<
" lock_generation=" << lock_generation;
1335 HT_INFOF(
"%s", sout.str().c_str());
1340 std::stringstream sout;
1341 sout <<
"lock txn=" << txn <<
" commited " <<
" handle=" << handle <<
" node="
1342 << node <<
" mode=" << mode <<
" status=" << lock_status
1343 <<
" lock_generation=" << lock_generation <<
" notification_count="
1344 << lock_acquired_notifications.size();
1345 HT_INFOF(
"%s", sout.str().c_str());
1352 cb->
error(error, error_msg);
1357 switch (lock_status) {
1359 cb->
response(lock_status, lock_generation);
1366 if (commited && persisted_notifications) {
1421 NotificationMap lock_release_notifications, lock_granted_notifications,
1422 lock_acquired_notifications;
1424 bool aborted =
false, commited =
false;
1432 HT_INFOF(
"release(session=%llu(%s), handle=%llu)",
1433 (
Llu)session_id, session_data->get_name(), (
Llu)handle);
1442 error_msg =
format(
"session: %lld", (
Lld)session_id);
1449 error_msg =
format(
"handle=%lld", (
Lld)handle);
1456 release_lock(txn, handle, node, lock_release_event, lock_release_notifications);
1471 cb->
error(error, error_msg);
1482 lock_acquired_event, lock_acquired_notifications);
1511 vector<uint64_t> next_lock_handles;
1512 uint64_t exclusive_lock_handle=0;
1516 if (exclusive_lock_handle != 0) {
1517 HT_ASSERT(handle == exclusive_lock_handle);
1530 HT_INFO(
"Persisting lock released notifications");
1532 release_event = make_shared<EventLockReleased>(event_id);
1534 release_event->get_mask());
1536 release_notifications)) {
1541 HT_INFO(
"Finished persisting lock released notifications");
1557 vector<uint64_t> next_lock_handles;
1562 next_mode = front_lock_req.
mode;
1566 next_lock_handles.push_back(front_lock_req.
handle);
1576 next_lock_handles.push_back(lockreq.
handle);
1581 if (!next_lock_handles.empty()) {
1588 next_mode, lock_generation);
1592 lock_granted_event = make_shared<EventLockGranted>(event_id, next_mode, lock_generation);
1594 for (
auto handle : next_lock_handles) {
1597 lock_granted_notifications[handle] = session;
1606 lock_acquired_event = make_shared<EventLockAcquired>(event_id, next_mode);
1609 lock_acquired_notifications))
1624 if (handles_to_sessions.size() > 0) {
1625 vector<uint64_t> handles;
1626 for (NotificationMap::iterator iter = handles_to_sessions.begin();
1627 iter != handles_to_sessions.end(); iter++) {
1628 handles.push_back(iter->first);
1642 vector<uint64_t> handles;
1643 handles.push_back(handle);
1655 uint64_t session_id;
1657 bool has_notifications =
false;
1658 vector<uint64_t> sessions;
1660 for (NotificationMap::iterator iter = handles_to_sessions.begin();
1661 iter != handles_to_sessions.end(); iter++) {
1662 handle_id = iter->first;
1663 session_id = iter->second;
1665 session_data->add_notification(
new Notification(handle_id, event_ptr ) );
1666 sessions.push_back(session_id);
1667 has_notifications =
true;
1671 if (has_notifications) {
1674 for (
auto session_id : sessions) {
1676 sessions_str +=
String(
" ") + session_id;
1679 if (wait_for_notify)
1680 event_ptr->wait_for_notifications();
1683 HT_INFOF(
"exitting deliver_event_notifications for event_id=%llu mask=0x%x sessions=(%s )",
1684 (
Llu)event_ptr->get_id(), (int)(
Llu)event_ptr->get_mask(), sessions_str.c_str());
1696 size_t last_slash = normal_name.rfind(
"/", normal_name.length());
1700 if (last_slash > 0) {
1701 parent_name = normal_name.substr(0, last_slash);
1702 child_name.append(normal_name, last_slash + 1, normal_name.length() - last_slash - 1);
1705 else if (last_slash == 0) {
1707 child_name.append(normal_name, 1, normal_name.length() - 1);
1735 bool wait_for_notify) {
1736 bool has_refs =
false;
1737 NotificationMap lock_release_notifications, lock_granted_notifications,
1738 lock_acquired_notifications, node_removed_notifications;
1740 node_removed_event ;
1741 bool node_removed =
false;
1743 bool aborted =
false;
1754 errmsg =
format(
"Handle %lld already deleted or being deleted", (
Lld)handle);
1760 release_lock(txn, handle, node, lock_release_event, lock_release_notifications);
1777 lock_acquired_event, lock_acquired_notifications);
1791 String parent_node, child_node;
1801 EVENT_MASK_CHILD_NODE_REMOVED, node_removed_notifications)) {
1807 node_removed =
true;
1837 auto now = std::chrono::steady_clock::now();
1839 std::chrono::milliseconds lease_credit =
1840 std::chrono::duration_cast<std::chrono::milliseconds>(now -
m_sleep_time) +
1847 HT_INFOF(
"Resume detected, extending all session leases "
1848 "by %lu milliseconds.", (
Lu)lease_credit.count());
1851 (*iter).second->extend_lease(lease_credit);
1879 String parent_node, child_name;
1885 if (name[0] !=
'/' || name[strlen(name)-1] ==
'/') {
1925 uint32_t flags, uint32_t event_mask,
1926 std::vector<Attribute> &init_attrs, uint64_t& handle,
1927 bool& created, uint64_t& lock_generation) {
1931 lock_generation = 0;
1933 String child_name, node = name, parent_node;
1934 bool lock_notify =
false;
1935 uint32_t lock_mode = 0;
1947 HT_INFOF(
"open(session_id=%llu, session_name = %s, fname=%s, flags=0x%x, event_mask=0x%x)",
1948 (
Llu)ctx.
session_id,ctx. session_data->get_name(), name, flags, event_mask);
2020 lock_generation = 1;
2029 for (
size_t i=0; i<init_attrs.size(); i++)
2031 init_attrs[i].value, init_attrs[i].value_len);
2045 if (lock_mode != 0) {
2061 std::vector<EventContext>::iterator it = ctx.
evts.insert(ctx.
evts.end(),
2062 EventContext(make_shared<EventLockAcquired>(lock_acquired_event_id, lock_mode)));
2065 it->notifications)) {
2068 it->persisted_notifications =
true;
2075 HT_INFOF(
"handle %llu created ('%s', session=%llu(%s), flags=0x%x, mask=0x%x)",
2085 if (!strcmp(name,
"/")) {
2087 "Cannot remove '/' directory");
2099 String child_name, parent_node;
2127 HT_ASSERT(name[0] ==
'/' && name[strlen(name)-1] !=
'/');
2140 const char *name,
const std::vector<Attribute> &attrs) {
2144 std::string attr_names;
2145 size_t total_value_len = 0;
2147 for (
const auto &attr : attrs) {
2148 attr_names += attr.name;
2150 total_value_len += attr.value_len;
2152 boost::trim_right_if(attr_names, boost::is_any_of(
","));
2156 if (name && *name) {
2164 for (
const auto &attr : attrs) {
2171 HT_INFOF(
"exitting attrset(session=%llu(%s), handle=%llu, name=%s, value_len=%d)",
2182 if (name && *name) {
2197 const char *name,
const std::vector<String> &attrs,
2198 std::vector<DynamicBufferPtr> &dbufs) {
2204 if (name && *name) {
2212 dbufs.reserve(attrs.size());
2213 for (
const auto &attr : attrs) {
2214 dbufs.push_back(make_shared<DynamicBuffer>());
2221 const char *name,
const char* attr, uint64_t& attr_val) {
2226 if (name && *name) {
2254 const char *name,
const char *attr,
bool& exists) {
2261 if (name && *name) {
2285 format(
"handle=%lld node=%s", (
Lld)handle, node.c_str()));
2291 file_exists =
false;
2299 HT_ASSERT(name[0] ==
'/' && (name[1] ==
'\0' || name[strlen(name)-1] !=
'/'));
2320 bool include_sub_entries, std::vector<DirEntryAttr>& listing) {
2327 if (name && *name) {
2339 std::vector<DirEntryAttr>& listing) {
2347 if (name && *name) {
2363 while (pos != string::npos) {
2364 pos = node.find(
'/', pos);
2367 if (pos == string::npos)
2368 entry.
is_dir = node_is_dir;
2372 path_component = node.substr(0, pos);
2373 entry.
name = path_component;
2377 entry.
attr = attr_buf;
2384 listing.push_back(entry);
2451 String child_name, parent_node;
2456 boost::trim_right_if(node, boost::is_any_of(
"/"));
2481 std::vector<EventContext>::iterator it = ctx.
evts.insert(ctx.
evts.end(),
2482 EventContext(make_shared<EventNamed>(event_id, event_mask, name)));
2485 it->notifications)) {
2487 it->persisted_notifications =
true;
2490 it->persisted_notifications =
false;
2496 for (
auto &evt : ctx.
evts)
2497 if (evt.persisted_notifications)
2535 uint64_t lock_generation;
2537 lock_generation = 1;
uint32_t get_handle_del_state(BDbTxn &txn, uint64_t id)
#define HT_BDBTXN_END(...)
bool validate_and_create_node_data(BDbTxn &txn, const String &node)
bool next_expired_session(SessionDataPtr &, std::chrono::steady_clock::time_point now)
int response(bool exists)
void close(ResponseCallback *cb, uint64_t session_id, uint64_t handle)
uint64_t handle
Node handle ID.
Lock successfully granted.
std::chrono::steady_clock::time_point m_last_tick
void set_session_name(BDbTxn &txn, uint64_t id, const String &name)
void attr_set(ResponseCallback *cb, uint64_t session_id, uint64_t handle, const char *name, uint32_t oflags, const std::vector< Attribute > &attrs)
void add_node_pending_lock_request(BDbTxn &txn, const String &name, LockRequest &request)
Adds a lock request.
bool node_has_shared_lock_handles(BDbTxn &txn, const String &name)
std::string String
A String is simply a typedef to std::string.
Compatibility class for boost::filesystem::path.
void delete_node_shared_lock_handle(BDbTxn &txn, const String &name, uint64_t handle_id)
bool get_xattr_i32(BDbTxn &txn, const String &fname, const String &aname, uint32_t *valuep)
void release_lock(BDbTxn &txn, uint64_t handle, const String &node, HyperspaceEventPtr &release_event, NotificationMap &release_notifications)
String format(const char *fmt,...)
Returns a String using printf-like format facilities. Vanilla snprintf is about 1.5x faster than this...
void lock_handle(BDbTxn &txn, uint64_t handle, uint32_t mode, String &node)
void status(ResponseCallbackStatus *cb)
virtual int response_ok()
Sends a simple success response back to the client, which is just the 4-byte error code Error::OK...
void get_handle_node(BDbTxn &txn, uint64_t id, String &node_name)
void add_node_handle(BDbTxn &txn, const String &name, uint64_t handle)
Error if create and file exists.
Manages transaction state.
bool incr_attr(BDbTxn &txn, const String &fname, const String &aname, uint64_t *valuep)
void unlink(ResponseCallback *cb, uint64_t session_id, const char *name)
Exclusive lock attempt failed because another has it locked.
struct sockaddr_in m_local_addr
void create_handle(BDbTxn &txn, uint64_t id, String node_name, uint32_t open_flags, uint32_t event_mask, uint64_t session_id, bool locked, uint32_t del_state)
void create_node(BDbTxn &txn, const String &name, bool ephemeral=false, uint64_t lock_generation=0, uint32_t cur_lock_mode=0, uint64_t exclusive_handle=0)
long long unsigned int Llu
Shortcut for printf formats.
bool m_maintenance_outstanding
void open(ResponseCallbackOpen *cb, uint64_t session_id, const char *name, uint32_t flags, uint32_t event_mask, std::vector< Attribute > &init_attrs)
void create(BDbTxn &txn, const String &fname, bool temp)
bool get_session(uint64_t session_id, SessionDataPtr &session_data)
std::shared_ptr< SessionData > SessionDataPtr
int response(uint32_t status, uint64_t lock_generation=0)
static bool exists(const String &fname)
Checks if a file or directory exists.
SessionDataVec m_session_heap
void delete_session(BDbTxn &txn, uint64_t id)
static bool mkdirs(const String &dirname)
Creates a directory (with all parent directories, if required)
void initialize_session(uint64_t session_id, const String &name)
bool node_is_ephemeral(BDbTxn &txn, const String &name)
std::shared_ptr< Event > HyperspaceEventPtr
void set_node_cur_lock_mode(BDbTxn &txn, const String &name, uint32_t lock_mode)
std::vector< EventContext > evts
int response(std::vector< DirEntry > &listing)
void attr_incr(ResponseCallbackAttrIncr *cb, uint64_t session_id, uint64_t handle, const char *name, const char *attr)
bool node_has_pending_lock_request(BDbTxn &txn, const String &name)
Atomically open and lock file shared, fail if can't.
void add_node_shared_lock_handle(BDbTxn &txn, const String &name, uint64_t handle)
A dynamic, resizable and reference counted memory buffer.
void attr_del(ResponseCallback *cb, uint64_t session_id, uint64_t handle, const char *name)
void remove_expired_sessions()
std::mutex m_session_map_mutex
int response(std::vector< DirEntryAttr > &listing)
void get_session_handles(BDbTxn &txn, uint64_t id, std::vector< uint64_t > &handles)
int response(const vector< string > &attributes)
void destroy_session(uint64_t session_id)
void grant_pending_lock_reqs(BDbTxn &txn, const String &node, HyperspaceEventPtr &lock_granted_event, NotificationMap &lock_granted_notifications, HyperspaceEventPtr &lock_acquired_event, NotificationMap &lock_acquired_notifications)
void readdir(ResponseCallbackReaddir *cb, uint64_t session_id, uint64_t handle)
bool exists(BDbTxn &txn, String fname, bool *is_dir_p=0)
Compatibility class for boost::filesystem::path.
void shutdown(ResponseCallback *cb, uint64_t session_id)
File system utility functions.
void handle_wakeup()
Handle wakeup event (e.g.
std::unordered_map< uint64_t, uint64_t > NotificationMap
Hash map from Node handle ID to Session ID.
Master(ConnectionManagerPtr &, PropertiesPtr &, ServerKeepaliveHandlerPtr &, ApplicationQueuePtr &app_queue_ptr)
bool node_has_open_handles(BDbTxn &txn, const String &name)
bool has_attr
Boolean value indicating whether or not this entry is a directory.
const char * get_text(int error)
Returns a descriptive error message.
void do_checkpoint()
Checkpoints the BerkeleyDB database.
bool handle_is_locked(BDbTxn &txn, uint64_t id)
bool delete_session_handle(BDbTxn &txn, uint64_t id, uint64_t handle_id)
int response(const Hypertable::Status &status)
int response(uint64_t val)
bool delete_node(BDbTxn &txn, const String &name)
NotificationMap notifications
Sends back result of an attr_exists request.
std::shared_ptr< Properties > PropertiesPtr
bool get_node_event_notification_map(BDbTxn &txn, const String &name, uint32_t event_mask, NotificationMap &handles_to_sessions)
bool get_node_pending_lock_request(BDbTxn &txn, const String &name, LockRequest &front_req)
Check if a node has any pending lock requests from non-expired handles.
void readpath_attr(ResponseCallbackReadpathAttr *cb, uint64_t session_id, uint64_t handle, const char *name, const char *attr)
uint64_t get_node_exclusive_lock_handle(BDbTxn &txn, const String &name)
void set_handle_locked(BDbTxn &txn, uint64_t id, bool locked)
uint32_t m_maintenance_interval
Compatibility Macros for C/C++.
bool get_xattr(BDbTxn &txn, const String &fname, const String &aname, Hypertable::DynamicBuffer &vbuf)
void delete_node_handle(BDbTxn &txn, const String &name, uint64_t handle)
Status m_status
Program status tracker.
std::shared_ptr< ServerKeepaliveHandler > ServerKeepaliveHandlerPtr
#define HT_BDBTXN_BEGIN(parent_txn)
void set_handle_del_state(BDbTxn &txn, uint64_t id, uint32_t del_state)
Sends back result of an attr_get request.
Importing boost::thread and boost::thread_group into the Hypertable namespace.
void free()
Clears the data; if this object is owner of the data then the allocated buffer is delete[]d...
bool exists_xattr(BDbTxn &txn, const String &fname, const String &aname)
BerkeleyDbFilesystem * m_bdb_fs
String format(int sep= ':') const
Returns a string with a dotted notation ("127.0.0.1:8080") including the port.
void create_event(BDbTxn &txn, uint32_t type, uint64_t id, uint32_t mask)
void mkdir(BDbTxn &txn, const String &name)
Hyperspace filesystem implementation on top of BerkeleyDB.
This class is used to generate and deliver standard responses back to a client.
void get_generation_number()
void add_session_handle(BDbTxn &txn, uint64_t id, uint64_t handle_id)
uint32_t get_node_cur_lock_mode(BDbTxn &txn, const String &name)
std::chrono::steady_clock::time_point m_sleep_time
Suspension time recorded by handle_sleep()
void set_xattr(BDbTxn &txn, const String &fname, const String &aname, const void *value, size_t value_len)
long long int Lld
Shortcut for printf formats.
void expire_session(BDbTxn &txn, uint64_t id)
static bool initialize(sockaddr_in *addr, const char *host, uint16_t port)
Initialize a sockaddr_in structure from host:port.
bool handle_exists(BDbTxn &txn, uint64_t id)
void attr_exists(ResponseCallbackAttrExists *cb, uint64_t session_id, uint64_t handle, const char *name, const char *attr)
void deliver_event_notifications(CommandContext &ctx, bool wait_for_notify=true)
void mkdir(ResponseCallback *cb, uint64_t session_id, const char *name, const std::vector< Attribute > &init_attrs)
void attr_list(ResponseCallbackAttrList *cb, uint64_t session_id, uint64_t handle)
void attr_get(ResponseCallbackAttrGet *cb, uint64_t session_id, uint64_t handle, const char *name, const std::vector< String > &attrs)
virtual int error(int error, const String &msg)
Sends a standard error response back to the client.
void delete_node_pending_lock_request(BDbTxn &txn, const String &name, uint64_t handle)
void lock(ResponseCallbackLock *cb, uint64_t session_id, uint64_t handle, uint32_t mode, bool try_lock)
std::mutex m_maintenance_mutex
bool find_parent_node(const std::string &normal_name, std::string &parent_name, std::string &child_name)
Lock attempt pending (internal use only)
#define HT_INFOF(msg,...)
uint64_t create_session(struct sockaddr_in &addr)
void readdir_attr(ResponseCallbackReaddirAttr *cb, uint64_t session_id, uint64_t handle, const char *name, const char *attr, bool include_sub_entries)
int renew_session_lease(uint64_t session_id)
Atomically open and lock file exclusive, fail if can't.
int response(uint64_t handle, bool created, uint64_t lock_generation)
Random number generator for int32, int64, double and ascii arrays.
uint64_t get_handle_session(BDbTxn &txn, uint64_t id)
void set_xattr_i32(BDbTxn &txn, const String &fname, const String &aname, uint32_t value)
bool get_named_node(CommandContext &ctx, const char *name, const char *attr, String &node, bool *is_dir=0)
uint64_t get_next_id_i64(BDbTxn &txn, IdentifierType id_type, bool increment=false)
Encapsulates a lock request for a file node.
uint32_t m_lease_interval
uint32_t m_keep_alive_interval
bool list_xattr(BDbTxn &txn, const String &fname, std::vector< String > &anames)
bool session_exists(BDbTxn &txn, uint64_t id)
Used in conjunction with CREATE to create an ephemeral file.
void create_event(CommandContext &ctx, const String &node, uint32_t event_mask, const String &name)
long unsigned int Lu
Shortcut for printf formats.
void set_event_notification_handles(BDbTxn &txn, uint64_t id, const std::vector< uint64_t > &handles)
#define HT_ERRORF(msg,...)
void create_session(BDbTxn &txn, uint64_t id, const String &addr)
#define HT_BDBTXN_END_CB(_cb_)
Create file if it does not exist.
int response(bool exists)
Sends back result of an attr_exists request.
void set_error(int _error, const char *_error_msg, bool abort=true)
bool get_xattr_i64(BDbTxn &txn, const String &fname, const String &aname, uint64_t *valuep)
void release(ResponseCallback *cb, uint64_t session_id, uint64_t handle)
void unlink(BDbTxn &txn, const String &name)
ServerKeepaliveHandlerPtr m_keepalive_handler_ptr
void delete_handle(BDbTxn &txn, uint64_t id)
int response(std::vector< DirEntryAttr > &listing)
std::shared_ptr< ConnectionManager > ConnectionManagerPtr
Smart pointer to ConnectionManager.
void del_xattr(BDbTxn &txn, const String &fname, const String &aname)
bool node_exists(BDbTxn &txn, const String &name)
std::shared_ptr< ApplicationQueue > ApplicationQueuePtr
Shared smart pointer to ApplicationQueue object.
MetricsHandlerPtr m_metrics_handler
System information and statistics based on libsigar.
String extensions and helpers: sets, maps, append operators etc.
Error codes, Exception handling, error logging.
int response(const std::vector< Hypertable::DynamicBufferPtr > &buffers)
Sends back result of an attr_get request.
void get_directory_listing(BDbTxn &txn, String fname, std::vector< DirEntry > &listing)
void exists(ResponseCallbackExists *cb, uint64_t session_id, const char *name)
uint32_t get_handle_open_flags(BDbTxn &txn, uint64_t id)
SessionDataPtr session_data
bool get_handle_node(CommandContext &ctx, uint64_t handle, const char *attr, String &node)
void handle_sleep()
Handle sleep event (e.g.
bool destroy_handle(uint64_t handle, int &error, String &errmsg, bool wait_for_notify=true)
void mkdirs(ResponseCallback *cb, uint64_t session_id, const char *name, const std::vector< Attribute > &init_attrs)
uint64_t incr_node_lock_generation(BDbTxn &txn, const String &name)
void get_directory_attr_listing(BDbTxn &txn, String fname, const String &aname, bool include_sub_entries, std::vector< DirEntryAttr > &listing)
void set_xattr_i64(BDbTxn &txn, const String &fname, const String &aname, uint64_t value)
const char * friendly_name
void persist_event_notifications(BDbTxn &txn, uint64_t event_id, NotificationMap &handles_to_sessions)
void set_node_exclusive_lock_handle(BDbTxn &txn, const String &name, uint64_t exclusive_lock_handle)