35 #include <sys/types.h>
47 m_verbose = cfg->get_bool(
"Hypertable.Verbose");
52 argv[argc++] =
"cephBroker";
54 argv[argc++] = (cfg->get_str(
"CephBroker.MonAddr").c_str());
64 HT_INFO(
"Calling ceph_initialize");
65 ceph_initialize(argc, argv);
68 HT_INFO(
"Returning from constructor");
76 uint32_t flags, uint32_t bufsz) {
79 HT_DEBUGF(
"open file='%s' bufsz=%d", fname, bufsz);
81 make_abs_path(fname, abspath);
85 if ((ceph_fd = ceph_open(abspath.c_str(), O_RDONLY)) < 0) {
86 report_error(cb, -ceph_fd);
89 HT_INFOF(
"open (%s) fd=%d ceph_fd=%d", fname, fd, ceph_fd);
92 struct sockaddr_in addr;
97 m_open_file_map.create(fd, addr, fdata);
104 int32_t bufsz, int16_t replication, int64_t blksz){
109 make_abs_path(fname, abspath);
110 HT_DEBUGF(
"create file='%s' flags=%u bufsz=%d replication=%d blksz=%lld",
111 fname, flags, bufsz, (
int)replication, (
Lld)blksz);
116 oflags = O_WRONLY | O_CREAT | O_TRUNC;
118 oflags = O_WRONLY | O_CREAT | O_APPEND;
123 HT_INFOF(
"Calling mkdirs on %s", directory.c_str());
124 if((r=ceph_mkdirs(directory.c_str(), 0644)) < 0 && r!=-EEXIST) {
125 HT_ERRORF(
"create failed on mkdirs: dname='%s' - %d", directory.c_str(), -r);
126 report_error(cb, -r);
131 if ((ceph_fd = ceph_open(abspath.c_str(), oflags, 0644)) < 0) {
132 HT_ERRORF(
"open failed: file=%s - %s", abspath.c_str(), strerror(-ceph_fd));
133 report_error(cb, ceph_fd);
137 HT_INFOF(
"create %s = %d", fname, ceph_fd);
140 struct sockaddr_in addr;
145 m_open_file_map.create(fd, addr, fdata);
156 m_open_file_map.get(fd, fdata);
157 m_open_file_map.remove(fd);
167 HT_DEBUGF(
"read fd=%d amount = %d", fd, amount);
169 if (!m_open_file_map.get(fd, fdata)) {
171 sprintf(errbuf,
"%d", fd);
177 if ((offset = ceph_lseek(fdata->
fd, 0, SEEK_CUR)) < 0) {
178 HT_ERRORF(
"lseek failed: fd=%d ceph_fd=%d offset=0 SEEK_CUR - %s", fd, fdata->
fd, strerror(-offset));
179 report_error(cb, offset);
183 if ((nread = ceph_read(fdata->
fd, (
char *)buf.
base, amount)) < 0 ) {
184 HT_ERRORF(
"read failed: fd=%d ceph_fd=%d amount=%d", fd, fdata->
fd, amount);
185 report_error(cb, -nread);
200 HT_DEBUG_OUT <<
"append fd="<< fd <<
" amount="<< amount <<
" data='"
202 <<
static_cast<uint8_t
>(flags) <<
HT_END;
204 if (!m_open_file_map.get(fd, fdata)) {
206 sprintf(errbuf,
"%d", fd);
211 if ((offset = (uint64_t)ceph_lseek(fdata->
fd, 0, SEEK_CUR)) < 0) {
212 HT_ERRORF(
"lseek failed: fd=%d ceph_fd=%d offset=0 SEEK_CUR - %s", fd, fdata->
fd,
214 report_error(cb, offset);
218 if ((nwritten = ceph_write(fdata->
fd, (
const char *)data, amount)) < 0) {
219 HT_ERRORF(
"write failed: fd=%d ceph_fd=%d amount=%d - %s", fd, fdata->
fd, amount,
220 strerror(-nwritten));
221 report_error(cb, -nwritten);
227 ((r = ceph_fsync(fdata->
fd,
true)) != 0)) {
228 HT_ERRORF(
"flush failed: fd=%d ceph_fd=%d - %s", fd, fdata->
fd, strerror(errno));
241 if (!m_open_file_map.get(fd, fdata)) {
243 sprintf(errbuf,
"%d", fd);
248 if ((r = (uint64_t)ceph_lseek(fdata->
fd, offset, SEEK_SET)) < 0) {
249 HT_ERRORF(
"lseek failed: fd=%d ceph_fd=%d offset=%llu - %s", fd, fdata->
fd,
250 (
Llu)offset, strerror(-r));
251 report_error(cb, offset);
263 make_abs_path(fname, abspath);
266 if ((r = ceph_unlink(abspath.c_str())) < 0) {
267 HT_ERRORF(
"unlink failed: file='%s' - %s", abspath.c_str(), strerror(-r));
283 if ((r = ceph_lstat(fname, &statbuf)) < 0) {
285 make_abs_path(fname, abspath);
286 HT_ERRORF(
"length (stat) failed: file='%s' - %s", abspath.c_str(), strerror(-r));
287 report_error(cb,- r);
294 uint32_t amount,
bool) {
299 HT_DEBUGF(
"pread fd=%d offset=%llu amount=%d", fd, (
Llu)offset, amount);
301 if (!m_open_file_map.get(fd, fdata)) {
303 sprintf(errbuf,
"%d", fd);
308 if ((nread = ceph_read(fdata->
fd, (
char *)buf.
base, amount, offset)) < 0) {
309 HT_ERRORF(
"pread failed: fd=%d ceph_fd=%d amount=%d offset=%llu - %s", fd, fdata->
fd,
310 amount, (
Llu)offset, strerror(-nread));
311 report_error(cb, nread);
325 make_abs_path(dname, absdir);
327 if((r=ceph_mkdirs(absdir.c_str(), 0644)) < 0 && r!=-EEXIST) {
328 HT_ERRORF(
"mkdirs failed: dname='%s' - %d", absdir.c_str(), -r);
329 report_error(cb, -r);
339 make_abs_path(dname, absdir);
340 if((r = rmdir_recursive(absdir.c_str())) < 0) {
341 HT_ERRORF(
"failed to remove dir %s, got error %d", absdir.c_str(), r);
342 report_error(cb, -r);
353 if ((r = ceph_opendir(directory, &dirp) < 0))
355 while ((r = ceph_readdirplus_r(dirp, &de, &st, 0)) > 0) {
356 String new_dir = de.d_name;
357 if(!(new_dir.compare(
".")==0 || new_dir.compare(
"..")==0)) {
360 new_dir += de.d_name;
361 if (S_ISDIR(st.st_mode)) {
362 if((r=rmdir_recursive(new_dir.c_str())) < 0)
return r;
364 if((r=ceph_unlink(new_dir.c_str())) < 0)
return r;
369 if ((r = ceph_closedir(dirp)) < 0)
return r;
370 return ceph_rmdir(directory);
382 if (!m_open_file_map.get(fd, fdata)) {
384 sprintf(errbuf,
"%d", fd);
390 if ((r = ceph_fsync(fdata->
fd,
true)) != 0) {
391 HT_ERRORF(
"sync failed: fd=%d ceph_fd=%d - %s", fd, fdata->
fd, strerror(-r));
392 report_error(cb, -r);
404 m_open_file_map.remove_all();
406 this_thread::sleep_for(chrono::milliseconds(2000));
410 std::vector<Filesystem::Dirent> listing;
416 make_abs_path(dname, absdir);
419 ceph_opendir(absdir.c_str(), &dirp);
422 char *buf =
new char[buflen];
425 r = ceph_getdnames(dirp, buf, buflen);
429 buf =
new char[buflen];
439 if (entry.
name.compare(
".") && entry.
name.compare(
".."))
440 listing.push_back(entry);
441 bufpos+=entry.
name.size()+1;
447 if (r < 0) report_error(cb, -r);
458 make_abs_path(fname, abspath);
459 cb->
response(ceph_lstat(abspath.c_str(), &statbuf) == 0);
467 make_abs_path(src, src_abs);
468 make_abs_path(dst, dest_abs);
469 if ((r = ceph_rename(src_abs.c_str(), dest_abs.c_str())) <0 ) {
478 HT_ERROR(
"debug commands not implemented!");
486 strerror_r(error, errbuf, 128);
A memory buffer of static size.
Retrieves system information (hardware, installation directory, etc)
int response(bool exists)
Sends response parameters back to client.
virtual void create(Response::Callback::Open *cb, const char *fname, uint32_t flags, int32_t bufsz, int16_t replication, int64_t blksz)
Open a file, and create it if it doesn't exist, optionally overwriting the contents.
void get_address(struct sockaddr_in &addr)
Gets the remote address of the requesting client.
Application handler for append function.
Abstract base class for a filesystem.
std::string String
A String is simply a typedef to std::string.
String format(const char *fmt,...)
Returns a String using printf like format facilities Vanilla snprintf is about 1.5x faster than this...
virtual int response_ok()
Sends a a simple success response back to the client which is just the 4-byte error code Error::OK...
int response(int32_t fd)
Sends response parameters back to client.
Flags
Enumeration type for append flags.
long long unsigned int Llu
Shortcut for printf formats.
Application handler for exists function.
File system broker definitions.
static atomic< int > ms_next_fd
virtual void length(Response::Callback::Length *cb, const char *fname, bool accurate=true)
Get length of file.
virtual void flush(ResponseCallback *cb, uint32_t fd)
Flush data that has been written.
int rmdir_recursive(const char *directory)
String format_bytes(size_t n, const void *buf, size_t len, const char *trailer)
Return first n bytes of buffer with an optional trailer if the size of the buffer exceeds n...
virtual void report_error(ResponseCallback *cb, int error)
virtual void sync(ResponseCallback *cb, uint32_t fd)
Sync out data that has been written.
Application handler for length function.
Directory container class.
File system utility functions.
virtual void pread(Response::Callback::Read *cb, uint32_t fd, uint64_t offset, uint32_t amount, bool verify_checksum)
Read from file at position.
virtual void readdir(Response::Callback::Readdir *cb, const char *dname)
Read a directory's contents.
virtual void append(Response::Callback::Append *cb, uint32_t fd, uint32_t amount, const void *data, Filesystem::Flags flags)
Append data to open file.
virtual void open(Response::Callback::Open *cb, const char *fname, uint32_t flags, uint32_t bufsz)
Open a file and pass the fd to the callback on success.
virtual void status(Response::Callback::Status *cb)
Check status of FSBroker.
std::shared_ptr< Properties > PropertiesPtr
Compatibility Macros for C/C++.
int response(uint64_t offset, StaticBuffer &buffer)
Sends response parameters back to client.
virtual void debug(ResponseCallback *, int32_t command, StaticBuffer &serialized_parameters)
Debug command.
CephBroker(PropertiesPtr &cfg)
virtual void exists(Response::Callback::Exists *cb, const char *fname)
Check for the existence of a file.
This class is used to generate and deliver standard responses back to a client.
int response(std::vector< Filesystem::Dirent > &listing)
Sends response parameters back to client.
Application handler for readdir function.
virtual void mkdirs(ResponseCallback *cb, const char *dname)
Make a directory hierarcy, If the parent dirs are not, present, they are also created.
#define HT_DEBUGF(msg,...)
long long int Lld
Shortcut for printf formats.
Application handler for open function.
virtual void seek(ResponseCallback *cb, uint32_t fd, uint64_t offset)
Seek open file.
virtual void rename(ResponseCallback *cb, const char *src, const char *dst)
Rename a file from src to dst.
Application handler for read function.
virtual int error(int error, const String &msg)
Sends a standard error response back to the client.
virtual void read(Response::Callback::Read *cb, uint32_t fd, uint32_t amount)
Read data from an open file.
#define HT_INFOF(msg,...)
String name
File or directory name.
virtual void rmdir(ResponseCallback *cb, const char *dname)
Remove a directory.
virtual void close(ResponseCallback *cb, uint32_t fd)
Close open file.
Application handler for open function.
int response(Hypertable::Status &status)
Sends response parameters back to client.
long unsigned int Lu
Shortcut for printf formats.
#define HT_ERRORF(msg,...)
int response(uint64_t offset, uint32_t amount)
Sends response parameters back to client.
virtual void shutdown(ResponseCallback *cb)
Gracefully shutdown broker, closeing open files.
int response(uint64_t length)
Sends response parameters back to client.
virtual void remove(ResponseCallback *cb, const char *fname)
Remove a file or directory.