36 #include <boost/algorithm/string.hpp>
50 #include <sys/types.h>
52 #include <sys/fcntl.h>
65 m_verbose = cfg->get_bool(
"verbose");
66 m_aggregate_writes = cfg->get_bool(
"DfsBroker.Mapr.aggregate.writes",
true);
67 m_readbuffering = cfg->get_bool(
"DfsBroker.Mapr.readbuffering",
true);
68 m_namenode_host = cfg->get_str(
"DfsBroker.Hdfs.NameNode.Host");
69 m_namenode_port = cfg->get_i16(
"DfsBroker.Hdfs.NameNode.Port");
71 m_metrics_handler = std::make_shared<MetricsHandler>(cfg,
"mapr");
72 m_metrics_handler->start_collecting();
74 m_filesystem = hdfsConnectNewInstance(m_namenode_host.c_str(), m_namenode_port);
81 m_metrics_handler->stop_collecting();
82 hdfsDisconnect(m_filesystem);
88 uint32_t flags, uint32_t bufsz) {
92 if (bufsz == (uint32_t)-1)
95 HT_DEBUGF(
"open file='%s' flags=%u bufsz=%d", fname, flags, bufsz);
99 int oflags = O_RDONLY;
101 if ((file = hdfsOpenFile(m_filesystem, fname, oflags, bufsz, 0, 0)) == 0) {
103 HT_ERRORF(
"open failed: file='%s' - %s", fname, strerror(errno));
107 HT_INFOF(
"open(%s) = %d", fname, (
int)fd);
110 struct sockaddr_in addr;
115 m_open_file_map.create(fd, addr, fdata);
124 int32_t bufsz, int16_t replication, int64_t blksz) {
127 int oflags = O_WRONLY;
131 if (replication == -1)
136 HT_DEBUGF(
"create file='%s' flags=%u bufsz=%d replication=%d blksz=%lld",
137 fname, flags, bufsz, (
int)replication, (
Lld)blksz);
144 if ((file = hdfsOpenFile(m_filesystem, fname, oflags, bufsz, replication, blksz)) == 0) {
146 HT_ERRORF(
"create failed: file='%s' - %s", fname, strerror(errno));
152 HT_INFOF(
"create(%s) = %d", fname, (
int)fd);
155 struct sockaddr_in addr;
160 m_open_file_map.create(fd, addr, fdata);
170 m_open_file_map.remove(fd);
183 #if defined(__linux__)
186 readbuf = (uint8_t *)vptr;
188 readbuf =
new uint8_t [amount];
193 HT_DEBUGF(
"read fd=%d amount=%d", fd, amount);
195 if (!m_open_file_map.get(fd, fdata)) {
197 sprintf(errbuf,
"%d", fd);
200 m_metrics_handler->increment_error_count();
204 if ((offset = hdfsTell(m_filesystem, fdata->
file)) == -1) {
206 HT_ERRORF(
"lseek failed: fd=%d offset=0 SEEK_CUR - %s", fd,
211 if ((nread = hdfsRead(m_filesystem, fdata->
file, buf.
base, (tSize)amount)) == -1) {
213 HT_ERRORF(
"read failed: fd=%d offset=%llu amount=%d - %s",
214 fd, (
Llu)offset, amount, strerror(errno));
220 m_metrics_handler->add_bytes_read(nread);
223 HT_ERRORF(
"Problem sending response for read(%u, %u) - %s",
236 HT_DEBUG_OUT <<
"append fd=" << fd <<
" amount=" << amount <<
" data='"
238 <<
static_cast<int>(flags) <<
HT_END;
240 if (!m_open_file_map.get(fd, fdata)) {
242 sprintf(errbuf,
"%d", fd);
244 m_metrics_handler->increment_error_count();
248 if ((offset = hdfsTell(m_filesystem, fdata->
file)) == -1) {
250 HT_ERRORF(
"lseek failed: fd=%d offset=0 SEEK_CUR - %s", fd,
255 if ((nwritten = hdfsWrite(m_filesystem, fdata->
file, data, (tSize)amount)) == -1) {
257 HT_ERRORF(
"write failed: fd=%d offset=%llu amount=%d data=%p- %s",
258 fd, (
Llu)offset, amount, data, strerror(errno));
264 if (hdfsFlush(m_filesystem, fdata->
file)) {
266 HT_ERRORF(
"flush failed: fd=%d - %s", fd, strerror(errno));
269 m_metrics_handler->add_sync(
get_ts64() - start_time);
272 m_metrics_handler->add_bytes_written(nwritten);
275 HT_ERRORF(
"Problem sending response for append(%u, %u) - %s",
287 if (!m_open_file_map.get(fd, fdata)) {
289 sprintf(errbuf,
"%d", fd);
291 m_metrics_handler->increment_error_count();
295 if ((offset = hdfsSeek(m_filesystem, fdata->
file, (tOffset)offset)) == (uint64_t)-1) {
297 HT_ERRORF(
"lseek failed: fd=%d offset=%llu - %s", fd, (
Llu)offset,
303 HT_ERRORF(
"Problem sending response for seek(%u, %llu) - %s",
314 if (hdfsDelete(m_filesystem, fname) == -1) {
316 HT_ERRORF(
"unlink failed: file='%s' - %s", fname,
322 HT_ERRORF(
"Problem sending response for remove(%s) - %s",
329 hdfsFileInfo *fileInfo;
334 if ((fileInfo = hdfsGetPathInfo(m_filesystem, fname)) == 0) {
336 HT_ERRORF(
"length (stat) failed: file='%s' - %s", fname,
341 HT_DEBUGF(
"length('%s') = %lu", fname, (
unsigned long)fileInfo->mSize);
344 HT_ERRORF(
"Problem sending response (%llu) for length(%s) - %s",
347 hdfsFreeFileInfo(fileInfo, 1);
353 uint32_t amount,
bool) {
359 HT_DEBUGF(
"pread fd=%d offset=%llu amount=%d", fd, (
Llu)offset, amount);
361 #if defined(__linux__)
364 readbuf = (uint8_t *)vptr;
366 readbuf =
new uint8_t [amount];
371 if (!m_open_file_map.get(fd, fdata)) {
373 sprintf(errbuf,
"%d", fd);
375 m_metrics_handler->increment_error_count();
379 if ((nread = hdfsPread(m_filesystem, fdata->
file, (tOffset)offset, buf.
base, (tSize)amount)) == -1) {
381 HT_ERRORF(
"pread failed: fd=%d amount=%d offset=%llu - %s", fd,
382 amount, (
Llu)offset, strerror(errno));
388 m_metrics_handler->add_bytes_read(nread);
391 HT_ERRORF(
"Problem sending response for pread(%u, %llu, %u) - %s",
404 boost::trim_right_if(make_dir, boost::is_any_of(
"/"));
406 if (hdfsCreateDirectory(m_filesystem, make_dir.c_str()) == -1) {
408 HT_ERRORF(
"mkdirs failed: dname='%s' - %s", dname,
414 HT_ERRORF(
"Problem sending response for mkdirs(%s) - %s",
420 void free_file_info(hdfsFileInfo *fileInfo,
int numEntries) {
422 hdfsFreeFileInfo(fileInfo, numEntries);
426 void rmdir_recursive(hdfsFS fs,
const String &dname) {
427 hdfsFileInfo *fileInfo = 0;
432 if ((fileInfo = hdfsListDirectory(fs, dname.c_str(), &numEntries)) == 0) {
436 dname.c_str(), strerror(errno));
439 for (
int i=0; i<numEntries; i++) {
440 String child = fileInfo[i].mName;
441 if (fileInfo[i].mKind == kObjectKindDirectory)
442 rmdir_recursive(fs, child);
443 else if (fileInfo[i].mKind == kObjectKindFile) {
444 if (hdfsDelete(fs, child.c_str()) == -1) {
447 child.c_str(), strerror(errno));
453 if (hdfsDelete(fs, dname.c_str()) == -1) {
455 dname.c_str(), strerror(errno));
465 boost::trim_right_if(removal_dir, boost::is_any_of(
"/"));
471 rmdir_recursive(m_filesystem, removal_dir);
475 HT_ERRORF(
"Problem sending error response for rmdir(%s) - %s",
481 HT_ERRORF(
"Problem sending response for mkdirs(%s) - %s",
487 std::vector<Filesystem::Dirent> listing;
488 hdfsFileInfo *fileInfo;
493 if ((fileInfo = hdfsListDirectory(m_filesystem, dname, &numEntries)) == 0) {
495 HT_ERRORF(
"readdir('%s') failed - %s", dname, strerror(errno));
500 for (
int i=0; i<numEntries; i++) {
502 if (fileInfo[i].mName[0] !=
'.' && fileInfo[i].mName[0] != 0) {
503 if ((ptr = strrchr(fileInfo[i].mName,
'/')))
507 entry.
length = fileInfo[i].mSize;
509 entry.
is_dir = fileInfo[i].mKind == kObjectKindDirectory;
510 listing.push_back(entry);
513 hdfsFreeFileInfo(fileInfo, numEntries);
515 HT_DEBUGF(
"Sending back %d listings", (
int)listing.size());
530 if (!m_open_file_map.get(fd, fdata)) {
532 sprintf(errbuf,
"%d", fd);
534 m_metrics_handler->increment_error_count();
541 if (hdfsFlush(m_filesystem, fdata->
file) == -1) {
543 HT_ERRORF(
"sync failed: fd=%d - %s", fd, strerror(errno));
546 m_metrics_handler->add_sync(
get_ts64() - start_time);
558 m_open_file_map.remove_all();
560 this_thread::sleep_for(chrono::milliseconds(2000));
569 if (hdfsExists(m_filesystem, fname) == -1) {
583 if (hdfsRename(m_filesystem, src, dst) == -1) {
593 HT_ERRORF(
"debug command %d not implemented.", command);
604 m_metrics_handler->increment_error_count();
606 strerror_r(errno, errbuf, 128);
608 if (errno == ENOTDIR || errno == ENAMETOOLONG || errno == ENOENT)
610 else if (errno == EACCES || errno == EPERM)
612 else if (errno == EBADF)
614 else if (errno == EINVAL)
A memory buffer of static size.
Retrieves system information (hardware, installation directory, etc)
int response(bool exists)
Sends response parameters back to client.
virtual void remove(ResponseCallback *cb, const char *fname)
Remove a file or directory.
virtual void append(Response::Callback::Append *cb, uint32_t fd, uint32_t amount, const void *data, Filesystem::Flags flags)
Append data to open file.
virtual void flush(ResponseCallback *cb, uint32_t fd)
Flush data that has been written.
void get_address(struct sockaddr_in &addr)
Gets the remote address of the requesting client.
Application handler for append function.
Abstract base class for a filesystem.
std::string String
A String is simply a typedef to std::string.
Compatibility class for boost::filesystem::path.
virtual void rmdir(ResponseCallback *cb, const char *dname)
Remove a directory.
String format(const char *fmt,...)
Returns a String using printf-like format facilities. Vanilla snprintf is about 1.5x faster than this...
virtual int response_ok()
Sends a simple success response back to the client which is just the 4-byte error code Error::OK...
int response(int32_t fd)
Sends response parameters back to client.
Flags
Enumeration type for append flags.
long long unsigned int Llu
Shortcut for printf formats.
Application handler for exists function.
virtual void status(Response::Callback::Status *cb)
Check status of FSBroker.
File system broker definitions.
virtual void mkdirs(ResponseCallback *cb, const char *dname)
Make a directory hierarchy. If the parent dirs are not present, they are also created.
time_t last_modification_time
Last modification time.
#define HT_ON_SCOPE_EXIT(...)
virtual void readdir(Response::Callback::Readdir *cb, const char *dname)
Read a directory's contents.
virtual void open(Response::Callback::Open *cb, const char *fname, uint32_t flags, uint32_t bufsz)
Open a file and pass the fd to the callback on success.
String format_bytes(size_t n, const void *buf, size_t len, const char *trailer)
Return first n bytes of buffer with an optional trailer if the size of the buffer exceeds n...
virtual void create(Response::Callback::Open *cb, const char *fname, uint32_t flags, int32_t bufsz, int16_t replication, int64_t blksz)
Open a file, and create it if it doesn't exist, optionally overwriting the contents.
virtual void shutdown(ResponseCallback *cb)
Gracefully shut down broker, closing open files.
virtual void sync(ResponseCallback *cb, uint32_t fd)
Sync out data that has been written.
uint64_t length
Length of file.
bool is_dir
Flag indicating if entry is a directory.
Application handler for length function.
File system utility functions.
virtual void rename(ResponseCallback *cb, const char *src, const char *dst)
Rename a file from src to dst.
const char * get_text(int error)
Returns a descriptive error message.
virtual void length(Response::Callback::Length *cb, const char *fname, bool accurate=true)
Get length of file.
std::shared_ptr< Properties > PropertiesPtr
Compatibility Macros for C/C++.
virtual void report_error(ResponseCallback *cb)
int response(uint64_t offset, StaticBuffer &buffer)
Sends response parameters back to client.
virtual void pread(Response::Callback::Read *cb, uint32_t fd, uint64_t offset, uint32_t amount, bool verify_checksum)
Read from file at position.
virtual void close(ResponseCallback *cb, uint32_t fd)
Close open file.
virtual void seek(ResponseCallback *cb, uint32_t fd, uint64_t offset)
Seek open file.
This class is used to generate and deliver standard responses back to a client.
int response(std::vector< Filesystem::Dirent > &listing)
Sends response parameters back to client.
Application handler for readdir function.
#define HT_DEBUGF(msg,...)
long long int Lld
Shortcut for printf formats.
Application handler for open function.
Application handler for read function.
virtual int error(int error, const String &msg)
Sends a standard error response back to the client.
virtual void debug(ResponseCallback *, int32_t command, StaticBuffer &serialized_parameters)
Debug command.
#define HT_INFOF(msg,...)
String name
File or directory name.
#define HT_THROWF(_code_, _fmt_,...)
MaprBroker(PropertiesPtr &props)
Declarations for ReactorFactory.
This is a generic exception class for Hypertable.
Application handler for open function.
int response(Hypertable::Status &status)
Sends response parameters back to client.
A String class based on std::string.
long unsigned int Lu
Shortcut for printf formats.
#define HT_ERRORF(msg,...)
int response(uint64_t offset, uint32_t amount)
Sends response parameters back to client.
virtual void read(Response::Callback::Read *cb, uint32_t fd, uint32_t amount)
Read data from an open file.
static atomic< int > ms_next_fd
System information and statistics based on libsigar.
virtual void exists(Response::Callback::Exists *cb, const char *fname)
Check for the existence of a file.
int response(uint64_t length)
Sends response parameters back to client.
int64_t get_ts64()
Returns the current time in nanoseconds as a 64bit number.
#define HT_DIRECT_IO_ALIGNMENT
int code() const
Returns the error code.
Executes user-defined functions when leaving the current scope.