56 #include "Response/Parameters/Status.h"
76 : m_conn_mgr(conn_mgr), m_addr(addr), m_timeout_ms(timeout_ms) {
77 m_comm = conn_mgr->get_comm();
83 : m_conn_mgr(conn_mgr) {
84 m_comm = conn_mgr->get_comm();
85 uint16_t port = cfg->get_i16(
"FsBroker.Port");
86 String host = cfg->get_str(
"FsBroker.Host");
87 if (cfg->has(
"FsBroker.Timeout"))
90 m_timeout_ms = cfg->get_i32(
"Hypertable.Request.Timeout");
93 if (cfg->has(
"DfsBroker.Host"))
94 host = cfg->get_str(
"DfsBroker.Host");
95 if (cfg->has(
"DfsBroker.Port"))
96 port = cfg->get_i16(
"DfsBroker.Port");
104 : m_comm(comm), m_conn_mgr(0), m_addr(addr), m_timeout_ms(timeout_ms) {
108 : m_timeout_ms(timeout_ms) {
115 "Timed out waiting for connection to FS Broker");
132 params.
encode(cbuf->get_data_ptr_address());
138 HT_THROW2F(e.
code(), e,
"Error opening FS file: %s", name.c_str());
150 params.
encode(cbuf->get_data_ptr_address());
164 HT_THROW2F(e.
code(), e,
"Error opening FS file: %s", name.c_str());
171 uint32_t outstanding, uint64_t start_offset,
172 uint64_t end_offset) {
180 lock_guard<mutex> lock(
m_mutex);
184 start_offset, end_offset);
189 HT_THROW2F(e.
code(), e,
"Error opening buffered FS file=%s buf_size=%u "
190 "outstanding=%u start_offset=%llu end_offset=%llu", name.c_str(),
191 buf_size, outstanding, (
Llu)start_offset, (
Llu)end_offset);
200 const uint8_t *ptr =
event->payload + 4;
201 size_t remain =
event->payload_len - 4;
204 params.
decode(&ptr, &remain);
211 int32_t replication, int64_t blksz,
216 params.
encode(cbuf->get_data_ptr_address());
222 HT_THROW2F(e.
code(), e,
"Error creating FS file: %s:", name.c_str());
229 int32_t replication, int64_t blksz) {
235 params.
encode(cbuf->get_data_ptr_address());
249 HT_THROW2F(e.
code(), e,
"Error creating FS file: %s", name.c_str());
266 params.
encode(cbuf->get_data_ptr_address());
269 lock_guard<mutex> lock(
m_mutex);
272 reader_handler = (*iter).second;
276 delete reader_handler;
296 params.
encode(cbuf->get_data_ptr_address());
298 lock_guard<mutex> lock(
m_mutex);
301 reader_handler = (*iter).second;
305 delete reader_handler;
326 params.
encode(cbuf->get_data_ptr_address());
332 HT_THROW2F(e.
code(), e,
"Error sending read request for %u bytes "
333 "from FS fd: %d", (unsigned)len, (
int)fd);
342 lock_guard<mutex> lock(
m_mutex);
345 reader_handler = (*iter).second;
349 return reader_handler->
read(dst, len);
357 params.
encode(cbuf->get_data_ptr_address());
369 memcpy(dst, data, length);
374 (unsigned)len, (
int)fd);
379 uint64_t *offset, uint32_t *length) {
384 const uint8_t *ptr =
event->payload + 4;
385 size_t remain =
event->payload_len - 4;
388 params.
decode(&ptr, &remain);
392 if (*length == (uint32_t)-1) {
397 if (remain < (
size_t)*length)
425 (unsigned)buffer.
size, (
int)fd);
459 if (buffer.
size != amount)
461 "%u", (
unsigned)buffer.
size, (
unsigned)amount);
462 return (
size_t)amount;
466 (unsigned)buffer.
size, (
int)fd);
477 const uint8_t *ptr =
event->payload + 4;
478 size_t remain =
event->payload_len - 4;
481 params.
decode(&ptr, &remain);
493 params.
encode(cbuf->get_data_ptr_address());
498 (
Llu)offset, (
int)fd);
511 params.
encode(cbuf->get_data_ptr_address());
522 (
Llu)offset, (
int)fd);
532 params.
encode(cbuf->get_data_ptr_address());
536 HT_THROW2F(e.
code(), e,
"Error removing FS file: %s", name.c_str());
548 params.
encode(cbuf->get_data_ptr_address());
560 HT_THROW2F(e.
code(), e,
"Error removing FS file: %s", name.c_str());
570 params.
encode(cbuf->get_data_ptr_address());
574 HT_THROW2F(e.
code(), e,
"sending FS shutdown (flags=%d)", (int)flags);
605 const uint8_t *ptr =
event->payload + 4;
606 size_t remain =
event->payload_len - 4;
609 params.
decode(&ptr, &remain);
620 params.
encode(cbuf->get_data_ptr_address());
624 HT_THROW2F(e.
code(), e,
"Error sending length request for FS file: %s",
636 params.
encode(cbuf->get_data_ptr_address());
658 const uint8_t *ptr =
event->payload + 4;
659 size_t remain =
event->payload_len - 4;
662 params.
decode(&ptr, &remain);
674 params.
encode(cbuf->get_data_ptr_address());
678 HT_THROW2F(e.
code(), e,
"Error sending pread request at byte %llu "
679 "on FS fd %d", (
Llu)offset, (
int)fd);
685 Client::pread(int32_t fd,
void *dst,
size_t len, uint64_t offset,
bool verify_checksum) {
692 params.
encode(cbuf->get_data_ptr_address());
706 memcpy(dst, data, length);
710 HT_THROW2F(e.
code(), e,
"Error preading at byte %llu on FS fd %d",
711 (
Llu)offset, (
int)fd);
716 uint64_t *offset, uint32_t *length) {
724 params.
encode(cbuf->get_data_ptr_address());
729 "directory: %s", name.c_str());
741 params.
encode(cbuf->get_data_ptr_address());
751 HT_THROW2F(e.
code(), e,
"Error mkdirs FS directory %s", name.c_str());
762 params.
encode(cbuf->get_data_ptr_address());
779 params.
encode(cbuf->get_data_ptr_address());
802 params.
encode(cbuf->get_data_ptr_address());
821 params.
encode(cbuf->get_data_ptr_address());
825 HT_THROW2F(e.
code(), e,
"Error sending rmdir request for FS directory: "
838 params.
encode(cbuf->get_data_ptr_address());
851 HT_THROW2F(e.
code(), e,
"Error removing FS directory: %s", name.c_str());
860 params.
encode(cbuf->get_data_ptr_address());
864 HT_THROW2F(e.
code(), e,
"Error sending readdir request for FS directory"
865 ": %s", name.c_str());
876 params.
encode(cbuf->get_data_ptr_address());
889 "directory: %s", name.c_str());
895 std::vector<Dirent> &listing) {
900 const uint8_t *ptr =
event->payload + 4;
901 size_t remain =
event->payload_len - 4;
904 params.
decode(&ptr, &remain);
913 params.
encode(cbuf->get_data_ptr_address());
917 HT_THROW2F(e.
code(), e,
"sending 'exists' request for FS path: %s",
929 params.
encode(cbuf->get_data_ptr_address());
941 HT_THROW2F(e.
code(), e,
"Error checking existence of FS path: %s",
952 const uint8_t *ptr =
event->payload + 4;
953 size_t remain =
event->payload_len - 4;
956 params.
decode(&ptr, &remain);
967 params.
encode(cbuf->get_data_ptr_address());
972 "path: %s -> %s", src.c_str(), dst.c_str());
984 params.
encode(cbuf->get_data_ptr_address());
995 src.c_str(), dst.c_str());
1006 serialized_parameters));
1007 params.
encode(cbuf->get_data_ptr_address());
1011 HT_THROW2F(e.
code(), e,
"Error sending debug command %d request", command);
1023 serialized_parameters));
1024 params.
encode(cbuf->get_data_ptr_address());
1034 HT_THROW2F(e.
code(), e,
"Error sending debug command %d request", command);
void decode_response_open(EventPtr &event, int32_t *fd) override
Decodes the response from an open request.
static Comm * instance()
Creates/returns singleton instance of the Comm class.
#define HT_THROW2F(_code_, _ex_, _fmt_,...)
A memory buffer of static size.
Request parameters for append requests.
Response parameters for append requests.
Response parameters for read requests.
uint64_t get_offset()
Gets read data offset.
Request parameters for debug requests.
void sync(int32_t fd) override
Response parameters for open requests.
int64_t decode_response_length(EventPtr &event) override
Decodes the response from a length request.
Holds Nagios-style program status information.
int open_buffered(const String &name, uint32_t flags, uint32_t buf_size, uint32_t outstanding, uint64_t start_offset=0, uint64_t end_offset=0) override
Opens a file in buffered (readahead) mode.
void remove(const String &name, DispatchHandler *handler) override
Removes a file asynchronously.
static int32_t response_code(const Event *event)
Returns the response code from an event generated in response to a request message.
Abstract base class for a filesystem.
Declarations for Append response parameters.
std::string String
A String is simply a typedef to std::string.
Request parameters for rmdir requests.
#define HT_IO_ALIGNED(size)
std::unordered_map< uint32_t, ClientBufferedReaderHandler * > m_buffered_reader_map
Request parameters for read requests.
static String string_format_message(const Event *event)
Returns the error message decoded from a standard error MESSAGE generated in response to a request message...
Declarations for Rename request parameters.
String format(const char *fmt,...)
Returns a String using printf-like format facilities. Vanilla snprintf is about 1.5x faster than this...
void mkdirs(const String &name, DispatchHandler *handler) override
Creates a directory asynchronously.
Abstract base class for application dispatch handlers registered with AsyncComm.
virtual size_t encoded_length() const
Returns serialized object length.
uint64_t get_length()
Gets file length.
bool decode_response_exists(EventPtr &event) override
Decodes the response from an exists request.
Flags
Enumeration type for append flags.
long long unsigned int Llu
Shortcut for printf formats.
uint64_t get_offset()
Gets append offset.
Request parameters for shutdown requests.
File system broker definitions.
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
void flush(int32_t fd, DispatchHandler *handler) override
uint8_t ** get_data_ptr_address()
Returns address of the primary buffer internal data pointer.
Request parameters for open requests.
void append(int32_t fd, StaticBuffer &buffer, Flags flags, DispatchHandler *handler) override
Request parameters for rename requests.
Declarations for Read response parameters.
uint32_t remaining()
Returns the remaining time till expiry.
void decode_response_pread(EventPtr &event, const void **buffer, uint64_t *offset, uint32_t *length) override
Decodes the response from a pread request.
ConnectionManagerPtr m_conn_mgr
Client(const std::string &install_dir, const std::string &config_file, uint32_t default_timeout_ms=0)
Constructs the object using the specified config file.
int32_t get_fd()
Gets file descriptor.
void status(Status &status, Timer *timer=0) override
Check status of filesystem.
bool wait_for_reply(EventPtr &event)
This method is used by a client to synchronize.
void set(Code code, const std::string &text)
Sets status code and text.
void send_message(CommBufPtr &cbuf, DispatchHandler *handler, Timer *timer=0)
Sends a message to the FS broker.
Declarations for Remove request parameters.
Response parameters for exists requests.
void length(const String &name, bool accurate, DispatchHandler *handler) override
Gets the length of a file asynchronously.
size_t read(void *buf, size_t len)
void decode_response_status(EventPtr &event, Status &status) override
Decodes the response from a status request.
Request parameters for remove requests.
void create(const String &name, uint32_t flags, int32_t bufsz, int32_t replication, int64_t blksz, DispatchHandler *handler) override
Creates a file asynchronously.
void readdir(const String &name, DispatchHandler *handler) override
Obtains a listing of all files in a directory asynchronously.
Request parameters for flush requests.
Request parameters for sync requests.
virtual void encode(uint8_t **bufp) const
Writes serialized representation of object to a buffer.
void close(int32_t fd, DispatchHandler *handler) override
Declarations for Exists response parameters.
const char * get_text(int error)
Returns a descriptive error message.
Declarations for Rmdir request parameters.
bool status(ContextPtr &context, Timer &timer, Status &status)
Runs a status check on the master.
Declarations for Length response parameters.
void decode_response_create(EventPtr &event, int32_t *fd) override
Decodes the response from a create request.
std::shared_ptr< CommBuf > CommBufPtr
Smart pointer to CommBuf.
void pread(int32_t fd, size_t len, uint64_t offset, bool verify_checksum, DispatchHandler *handler) override
std::shared_ptr< Properties > PropertiesPtr
Logging routines and macros.
bool get_exists()
Gets exists flag.
Response parameters for readdir requests.
Compatibility Macros for C/C++.
Declarations for Create request parameters.
Declarations for Pread request parameters.
Declarations for Open response parameters.
uint32_t get_amount()
Gets amount of data read.
void * advance_data_ptr(size_t len)
Advance the primary buffer internal data pointer by len bytes.
void decode_response_readdir(EventPtr &event, std::vector< Dirent > &listing) override
Decodes the response from a readdir request.
Functions to serialize/deserialize primitives to/from a memory buffer.
Declarations for Flush request parameters.
Response parameters for length requests.
void debug(int32_t command, StaticBuffer &serialized_parameters) override
Invokes debug request asynchronously.
virtual void decode(const uint8_t **bufp, size_t *remainp)
Reads serialized representation of object from a buffer.
void read(int32_t fd, size_t amount, DispatchHandler *handler) override
Declarations for Length request parameters.
String format(int sep= ':') const
Returns a string with a dotted notation ("127.0.0.1:8080") including the port.
void rename(const String &src, const String &dst, DispatchHandler *handler) override
Rename a path asynchronously.
void shutdown(uint16_t flags, DispatchHandler *handler)
Shuts down the FS broker.
Declarations for Exists request parameters.
DispatchHandler class used to synchronize with response messages.
Request parameters for close requests.
static bool initialize(sockaddr_in *addr, const char *host, uint16_t port)
Initialize a sockaddr_in structure from host:port.
Entry point to AsyncComm service.
Client(ConnectionManagerPtr &conn_manager_ptr, const sockaddr_in &addr, uint32_t timeout_ms)
Constructor with explicit values.
Declarations for CommBuf.
uint32_t get_amount()
Gets amount of data appended.
void open(const String &name, uint32_t flags, DispatchHandler *handler) override
Opens a file asynchronously.
Declarations for Protocol.
#define HT_THROWF(_code_, _fmt_,...)
Request parameters for exists requests.
Declarations for Close request parameters.
void rmdir(const String &name, DispatchHandler *handler) override
Recursively removes a directory asynchronously.
A timer class to keep timeout states across AsyncComm related calls.
Response parameters for open requests.
This is a generic exception class for Hypertable.
Declarations for Readdir request parameters.
Message buffer for holding data to be transmitted over a network.
Declarations for Sync request parameters.
void seek(int32_t fd, uint64_t offset, DispatchHandler *handler) override
long unsigned int Lu
Shortcut for printf formats.
Request parameters for seek requests.
File system broker framework and client library.
const Hypertable::Status & status() const
Gets status information.
std::shared_ptr< ConnectionManager > ConnectionManagerPtr
Smart pointer to ConnectionManager.
Declarations for Mkdirs request parameters.
Request parameters for mkdirs requests.
void * get_data_ptr()
Returns the primary buffer internal data pointer.
Request parameters for pread requests.
Request parameters for length requests.
Declarations for Debug request parameters.
Error codes, Exception handling, error logging.
#define HT_THROW(_code_, _msg_)
int send_request(const CommAddress &addr, uint32_t timeout_ms, CommBufPtr &cbuf, DispatchHandler *response_handler)
Sends a request message over a connection, expecting a response.
Declarations for FsBroker request handler Factory.
Declarations for Open request parameters.
Request parameters for readdir requests.
void decode_response_append(EventPtr &event, uint64_t *offset, uint32_t *length) override
Decodes the response from an append request.
Request parameters for create requests.
Declarations for Seek request parameters.
void decode_response_read(EventPtr &event, const void **buffer, uint64_t *offset, uint32_t *length) override
Decodes the response from a read request.
void exists(const String &name, DispatchHandler *handler) override
Determines if a file exists asynchronously.
Declarations for Readdir response parameters.
void get_listing(std::vector< Filesystem::Dirent > &listing)
Gets directory listing.
Declarations for Shutdown request parameters.
#define HT_DIRECT_IO_ALIGNMENT
Declarations for Append request parameters.
int code() const
Returns the error code.
Declarations for Read request parameters.