48 #include <sys/fcntl.h>
51 #include <sys/types.h>
63 m_verbose = cfg->get_bool(
"verbose");
64 m_no_removal = cfg->get_bool(
"FsBroker.DisableFileRemoval");
65 if (cfg->has(
"DfsBroker.Local.DirectIO"))
66 m_directio = cfg->get_bool(
"DfsBroker.Local.DirectIO");
68 m_directio = cfg->get_bool(
"FsBroker.Local.DirectIO");
70 m_metrics_handler = std::make_shared<MetricsHandler>(cfg,
"local");
71 m_metrics_handler->start_collecting();
73 #if defined(__linux__)
86 if (cfg->has(
"DfsBroker.Local.Root"))
87 root =
Path(cfg->get_str(
"DfsBroker.Local.Root"));
89 root =
Path(cfg->get_str(
"root",
""));
91 if (!root.is_complete()) {
92 Path data_dir = cfg->get_str(
"Hypertable.DataDirectory");
93 root = data_dir / root;
96 m_rootdir = root.string();
106 m_metrics_handler->stop_collecting();
112 uint32_t flags, uint32_t bufsz) {
116 HT_DEBUGF(
"open file='%s' flags=%u bufsz=%d", fname, flags, bufsz);
119 abspath = m_rootdir + fname;
121 abspath = m_rootdir +
"/" + fname;
125 int oflags = O_RDONLY;
136 if ((local_fd = ::open(abspath.c_str(), oflags)) == -1) {
138 HT_ERRORF(
"open failed: file='%s' - %s", abspath.c_str(), strerror(errno));
142 #if defined(__APPLE__)
146 #elif defined(__sun__)
148 directio(local_fd, DIRECTIO_ON);
151 HT_INFOF(
"open( %s ) = %d (local=%d)", fname, (
int)fd, local_fd);
154 struct sockaddr_in addr;
159 m_open_file_map.create(fd, addr, fdata);
168 int32_t bufsz, int16_t replication, int64_t blksz) {
170 int oflags = O_WRONLY | O_CREAT;
173 HT_DEBUGF(
"create file='%s' flags=%u bufsz=%d replication=%d blksz=%lld",
174 fname, flags, bufsz, (
int)replication, (
Lld)blksz);
177 abspath = m_rootdir + fname;
179 abspath = m_rootdir +
"/" + fname;
197 if ((local_fd = ::open(abspath.c_str(), oflags, 0644)) == -1) {
199 HT_ERRORF(
"open failed: file='%s' - %s", abspath.c_str(), strerror(errno));
203 #if defined(__APPLE__)
205 fcntl(local_fd, F_NOCACHE, 1);
207 #elif defined(__sun__)
209 directio(local_fd, DIRECTIO_ON);
214 HT_INFOF(
"create( %s ) = %d (local=%d)", fname, (
int)fd, local_fd);
217 struct sockaddr_in addr;
222 m_open_file_map.create(fd, addr, fdata);
232 m_open_file_map.remove(fd);
246 HT_DEBUGF(
"read fd=%d amount=%d", fd, amount);
248 if (!m_open_file_map.get(fd, fdata)) {
250 sprintf(errbuf,
"%d", fd);
253 m_metrics_handler->increment_error_count();
257 if ((offset = (uint64_t)lseek(fdata->
fd, 0, SEEK_CUR)) == (uint64_t)-1) {
261 m_status_manager.set_read_error(error);
262 HT_ERRORF(
"lseek failed: fd=%d offset=0 SEEK_CUR - %s", fdata->
fd,
270 m_status_manager.set_read_error(error);
271 HT_ERRORF(
"read failed: fd=%d offset=%llu amount=%d - %s",
272 fdata->
fd, (
Llu)offset, amount, strerror(errno));
277 m_metrics_handler->add_bytes_read(buf.
size);
279 m_status_manager.clear_status();
282 HT_ERRORF(
"Problem sending response for read(%u, %u) - %s",
295 HT_DEBUG_OUT <<
"append fd=" << fd <<
" amount=" << amount <<
" data='"
297 <<
static_cast<uint8_t
>(flags) <<
HT_END;
299 if (!m_open_file_map.get(fd, fdata)) {
301 sprintf(errbuf,
"%d", fd);
303 m_metrics_handler->increment_error_count();
307 if ((offset = (uint64_t)lseek(fdata->
fd, 0, SEEK_CUR)) == (uint64_t)-1) {
311 m_status_manager.set_write_error(error);
312 HT_ERRORF(
"lseek failed: fd=%d offset=0 SEEK_CUR - %s", fdata->
fd,
320 m_status_manager.set_write_error(error);
321 HT_ERRORF(
"write failed: fd=%d offset=%llu amount=%d data=%p- %s",
322 fdata->
fd, (
Llu)offset, amount, data, strerror(errno));
328 if (fsync(fdata->
fd) != 0) {
331 m_status_manager.set_write_error(error);
332 HT_ERRORF(
"flush failed: fd=%d - %s", fdata->
fd, strerror(errno));
335 m_metrics_handler->add_sync(
get_ts64() - start_time);
338 m_metrics_handler->add_bytes_written(nwritten);
339 m_status_manager.clear_status();
342 HT_ERRORF(
"Problem sending response for append(%u, localfd=%u, %u) - %s",
354 if (!m_open_file_map.get(fd, fdata)) {
356 sprintf(errbuf,
"%d", fd);
358 m_metrics_handler->increment_error_count();
362 if ((offset = (uint64_t)lseek(fdata->
fd, offset, SEEK_SET)) == (uint64_t)-1) {
364 HT_ERRORF(
"lseek failed: fd=%d offset=%llu - %s", fdata->
fd, (
Llu)offset,
370 HT_ERRORF(
"Problem sending response for seek(%u, %llu) - %s",
380 HT_INFOF(
"remove file='%s'", fname);
383 abspath = m_rootdir + fname;
385 abspath = m_rootdir +
"/" + fname;
388 String deleted_file = abspath +
".deleted";
395 if (unlink(abspath.c_str()) == -1) {
397 HT_ERRORF(
"unlink failed: file='%s' - %s", abspath.c_str(),
404 HT_ERRORF(
"Problem sending response for remove(%s) - %s",
415 HT_DEBUGF(
"length file='%s' (accurate=%s)", fname,
416 accurate ?
"true" :
"false");
419 abspath = m_rootdir + fname;
421 abspath = m_rootdir +
"/" + fname;
425 HT_ERRORF(
"length (stat) failed: file='%s' - %s", abspath.c_str(),
431 HT_ERRORF(
"Problem sending response (%llu) for length(%s) - %s",
438 uint32_t amount,
bool) {
443 HT_DEBUGF(
"pread fd=%d offset=%llu amount=%d", fd, (
Llu)offset, amount);
447 if (!m_open_file_map.get(fd, fdata)) {
449 sprintf(errbuf,
"%d", fd);
451 m_metrics_handler->increment_error_count();
459 m_status_manager.set_read_error(error);
460 HT_ERRORF(
"pread failed: fd=%d amount=%d aligned_size=%d offset=%llu - %s",
466 m_metrics_handler->add_bytes_read(nread);
467 m_status_manager.clear_status();
470 HT_ERRORF(
"Problem sending response for pread(%u, %llu, %u) - %s",
482 absdir = m_rootdir + dname;
484 absdir = m_rootdir +
"/" + dname;
488 HT_ERRORF(
"mkdirs failed: dname='%s' - %s", absdir.c_str(),
494 HT_ERRORF(
"Problem sending response for mkdirs(%s) - %s",
508 absdir = m_rootdir + dname;
510 absdir = m_rootdir +
"/" + dname;
514 String deleted_file = absdir +
".deleted";
521 cmd_str = (
String)
"/bin/rm -rf " + absdir;
522 if (system(cmd_str.c_str()) != 0) {
523 HT_ERRORF(
"%s failed.", cmd_str.c_str());
524 m_metrics_handler->increment_error_count();
532 if (rmdir(absdir.c_str()) != 0) {
534 HT_ERRORF(
"rmdir failed: dname='%s' - %s", absdir.c_str(), strerror(errno));
540 HT_ERRORF(
"Problem sending response for mkdirs(%s) - %s",
546 std::vector<Filesystem::Dirent> listing;
553 absdir = m_rootdir + dname;
555 absdir = m_rootdir +
"/" + dname;
557 DIR *dirp = opendir(absdir.c_str());
560 HT_ERRORF(
"opendir('%s') failed - %s", absdir.c_str(), strerror(errno));
564 struct dirent *dp = (
struct dirent *)
new uint8_t [
sizeof(
struct dirent)+1025];
565 struct dirent *result;
567 if (readdir_r(dirp, dp, &result) != 0) {
569 HT_ERRORF(
"readdir('%s') failed - %s", absdir.c_str(), strerror(errno));
570 (void)closedir(dirp);
571 delete [] (uint8_t *)dp;
577 while (result != 0) {
579 if (result->d_name[0] !=
'.' && result->d_name[0] != 0) {
581 size_t len = strlen(result->d_name);
582 if (len <= 8 || strcmp(&result->d_name[len-8],
".deleted")) {
584 entry.
name.append(result->d_name);
585 entry.
is_dir = result->d_type == DT_DIR;
586 full_entry_path.clear();
587 full_entry_path.append(absdir);
588 full_entry_path.append(
"/");
589 full_entry_path.append(entry.
name);
590 if (stat(full_entry_path.c_str(), &statbuf) == -1) {
591 if (errno != ENOENT) {
593 HT_ERRORF(
"readdir('%s') failed - %s", absdir.c_str(), strerror(errno));
594 delete [] (uint8_t *)dp;
599 entry.
length = (uint64_t)statbuf.st_size;
601 listing.push_back(entry);
607 entry.
name.append(result->d_name);
608 entry.
is_dir = result->d_type == DT_DIR;
609 full_entry_path.clear();
610 full_entry_path.append(absdir);
611 full_entry_path.append(
"/");
612 full_entry_path.append(entry.
name);
613 if (stat(full_entry_path.c_str(), &statbuf) == -1) {
615 HT_ERRORF(
"readdir('%s') failed - %s", absdir.c_str(), strerror(errno));
616 delete [] (uint8_t *)dp;
619 entry.
length = (uint64_t)statbuf.st_size;
621 listing.push_back(entry);
626 if (readdir_r(dirp, dp, &result) != 0) {
628 HT_ERRORF(
"readdir('%s') failed - %s", absdir.c_str(), strerror(errno));
629 delete [] (uint8_t *)dp;
633 (void)closedir(dirp);
635 delete [] (uint8_t *)dp;
637 HT_DEBUGF(
"Sending back %d listings", (
int)listing.size());
651 if (!m_open_file_map.get(fd, fdata)) {
653 sprintf(errbuf,
"%d", fd);
659 if (fsync(fdata->
fd) != 0) {
662 m_status_manager.set_write_error(error);
663 HT_ERRORF(
"sync failed: fd=%d - %s", fdata->
fd, strerror(errno));
667 m_metrics_handler->add_sync(
get_ts64() - start_time);
668 m_status_manager.clear_status();
675 cb->
response(m_status_manager.get());
680 m_open_file_map.remove_all();
682 this_thread::sleep_for(chrono::milliseconds(2000));
692 abspath = m_rootdir + fname;
694 abspath = m_rootdir +
"/" + fname;
702 HT_INFOF(
"rename %s -> %s", src, dst);
705 format(
"%s%s%s", m_rootdir.c_str(), *src ==
'/' ?
"" :
"/", src);
707 format(
"%s%s%s", m_rootdir.c_str(), *dst ==
'/' ?
"" :
"/", dst);
709 if (std::rename(asrc.c_str(), adst.c_str()) != 0) {
719 HT_ERRORF(
"debug command %d not implemented.", command);
730 m_metrics_handler->increment_error_count();
732 strerror_r(errno, errbuf, 128);
734 if (errno == ENOTDIR || errno == ENAMETOOLONG || errno == ENOENT)
736 else if (errno == EACCES || errno == EPERM)
738 else if (errno == EBADF)
740 else if (errno == EINVAL)
A memory buffer of static size.
Retrieves system information (hardware, installation directory, etc)
int response(bool exists)
Sends response parameters back to client.
virtual void shutdown(ResponseCallback *cb)
Gracefully shutdown broker, closeing open files.
void get_address(struct sockaddr_in &addr)
Gets the remote address of the requesting client.
static atomic< int > ms_next_fd
Application handler for append function.
virtual void read(Response::Callback::Read *cb, uint32_t fd, uint32_t amount)
Read data from an open file.
Abstract base class for a filesystem.
static bool read(const String &fname, String &contents)
Reads a whole file into a String.
std::string String
A String is simply a typedef to std::string.
Compatibility class for boost::filesystem::path.
virtual void create(Response::Callback::Open *cb, const char *fname, uint32_t flags, int32_t bufsz, int16_t replication, int64_t blksz)
Open a file, and create it if it doesn't exist, optionally overwriting the contents.
String format(const char *fmt,...)
Returns a String using printf like format facilities Vanilla snprintf is about 1.5x faster than this...
virtual void mkdirs(ResponseCallback *cb, const char *dname)
Make a directory hierarcy, If the parent dirs are not, present, they are also created.
static off_t length(const String &fname)
Returns the size of a file (-1 on error)
virtual int response_ok()
Sends a a simple success response back to the client which is just the 4-byte error code Error::OK...
int response(int32_t fd)
Sends response parameters back to client.
Flags
Enumeration type for append flags.
virtual void remove(ResponseCallback *cb, const char *fname)
Remove a file or directory.
long long unsigned int Llu
Shortcut for printf formats.
Application handler for exists function.
static ssize_t write(const String &fname, const std::string &contents)
Writes a String buffer to a file; the file is overwritten if it already exists.
File system broker definitions.
static bool exists(const String &fname)
Checks if a file or directory exists.
virtual void readdir(Response::Callback::Readdir *cb, const char *dname)
Read a directory's contents.
static bool mkdirs(const String &dirname)
Creates a directory (with all parent directories, if required)
time_t last_modification_time
Last modification time.
virtual void pread(Response::Callback::Read *cb, uint32_t fd, uint64_t offset, uint32_t amount, bool verify_checksum)
Read from file at position.
String format_bytes(size_t n, const void *buf, size_t len, const char *trailer)
Return first n bytes of buffer with an optional trailer if the size of the buffer exceeds n...
static ssize_t pread(int fd, void *vptr, size_t n, off_t offset)
Reads positional data from a file descriptor into a buffer.
virtual void status(Response::Callback::Status *cb)
Check status of FSBroker.
virtual void rmdir(ResponseCallback *cb, const char *dname)
Remove a directory.
virtual void flush(ResponseCallback *cb, uint32_t fd)
Flush data that has been written.
uint64_t length
Length of file.
virtual void rename(ResponseCallback *cb, const char *src, const char *dst)
Rename a file from src to dst.
bool is_dir
Flag indicating if entry id a directory.
Application handler for length function.
Compatibility class for boost::filesystem::path.
virtual void debug(ResponseCallback *, int32_t command, StaticBuffer &serialized_parameters)
Debug command.
File system utility functions.
const char * get_text(int error)
Returns a descriptive error message.
std::shared_ptr< Properties > PropertiesPtr
static const OsInfo & os_info()
Retrieves updated Operating system information (see SystemInfo.h)
Compatibility Macros for C/C++.
LocalBroker(PropertiesPtr &props)
int response(uint64_t offset, StaticBuffer &buffer)
Sends response parameters back to client.
virtual void close(ResponseCallback *cb, uint32_t fd)
Close open file.
This class is used to generate and deliver standard responses back to a client.
virtual void open(Response::Callback::Open *cb, const char *fname, uint32_t flags, uint32_t bufsz)
Open a file and pass the fd to the callback on success.
int response(std::vector< Filesystem::Dirent > &listing)
Sends response parameters back to client.
Application handler for readdir function.
#define HT_DEBUGF(msg,...)
long long int Lld
Shortcut for printf formats.
Application handler for open function.
Application handler for read function.
virtual int error(int error, const String &msg)
Sends a standard error response back to the client.
#define HT_INFOF(msg,...)
String name
File or directory name.
virtual void sync(ResponseCallback *cb, uint32_t fd)
Sync out data that has been written.
Declarations for ReactorFactory.
Application handler for open function.
int response(Hypertable::Status &status)
Sends response parameters back to client.
A String class based on std::string.
virtual void report_error(ResponseCallback *cb)
long unsigned int Lu
Shortcut for printf formats.
#define HT_ERRORF(msg,...)
int response(uint64_t offset, uint32_t amount)
Sends response parameters back to client.
static bool rename(const String &oldpath, const String &newpath)
Renames a file or directory.
System information and statistics based on libsigar.
virtual void append(Response::Callback::Append *cb, uint32_t fd, uint32_t amount, const void *data, Filesystem::Flags flags)
Append data to open file.
int response(uint64_t length)
Sends response parameters back to client.
virtual void seek(ResponseCallback *cb, uint32_t fd, uint64_t offset)
Seek open file.
int64_t get_ts64()
Returns the current time in nanoseconds as a 64bit number.
#define HT_DIRECT_IO_ALIGNMENT
virtual void length(Response::Callback::Length *cb, const char *fname, bool accurate=true)
Get length of file.
virtual void exists(Response::Callback::Exists *cb, const char *fname)
Check for the existence of a file.