29 #include <kfs/KfsClient.h>
38 #include <sys/types.h>
50 HT_INFOF(
"close(%d) file: %s", fd, fname.c_str());
55 : m_host(cfg->get_str(
"host")),
56 m_port(cfg->get_i16(
"port")),
57 m_client(
KFS::Connect(m_host, m_port)) {
72 HT_WARNF(
"Checksum verification of %s failed - %s", fname,
73 KFS::ErrorCodeToStr(status).c_str());
76 int qfs_fd =
m_client->Open(fname, O_RDONLY);
78 HT_ERRORF(
"open(%s) failure (%d) - %s", fname, -qfs_fd, KFS::ErrorCodeToStr(qfs_fd).c_str());
83 HT_INFOF(
"open(%s) -> fd=%d, qfs_fd=%d", fname, fd, qfs_fd);
84 struct sockaddr_in addr;
89 m_client->SetIoBufferSize(qfs_fd, bufsz);
106 qfs_fd =
m_client->Open(fname, O_CREAT | O_TRUNC | O_RDWR);
109 qfs_fd =
m_client->Open(fname, O_CREAT|O_WRONLY);
112 HT_ERRORF(
"create(%s) failure (%d) - %s", fname, -qfs_fd, KFS::ErrorCodeToStr(qfs_fd).c_str());
117 HT_INFOF(
"open(%s) -> fd=%d, qfs_fd=%d", fname, fd, qfs_fd);
118 struct sockaddr_in addr;
123 m_client->SetIoBufferSize(qfs_fd, bufsz);
138 if(status == (chunkOff_t)-1) {
139 HT_ERRORF(
"seek(fd=%d,qfsFd=%d,%lld) failure (%d) - %s", (
int)fd,
140 (
int)fdata->
fd, (
Lld)offset, (
int)-status,
141 KFS::ErrorCodeToStr(status).c_str());
160 int len =
m_client->Read(fdata->
fd, reinterpret_cast<char*>(buf.
base), amount);
163 KFS::ErrorCodeToStr(len));
164 HT_ERRORF(
"read(fd=%d,qfsFd=%d,%lld) failure (%d) - %s", (
int)fd,
165 (
int)fdata->
fd, (
Lld)amount, -len,
166 KFS::ErrorCodeToStr(len).c_str());
170 buf.
size = (uint32_t)len;
186 ssize_t written =
m_client->Write(fdata->
fd, reinterpret_cast<const char*>(data), amount);
189 KFS::ErrorCodeToStr(written));
190 HT_ERRORF(
"append(fd=%d,qfsFd=%d,%lld,flags=%d) failure (%d) - %s", (
int)fd,
191 (
int)fdata->
fd, (
Lld)amount, static_cast<int>(flags),
192 (
int)-written, KFS::ErrorCodeToStr(written).c_str());
201 KFS::ErrorCodeToStr(error));
202 HT_ERRORF(
"append(fd=%d,qfsFd=%d,%lld,flags=%d) failure (%d) - %s", (
int)fd,
203 (
int)fdata->
fd, (
Lld)amount, static_cast<int>(flags),
204 (
int)-written, KFS::ErrorCodeToStr(written).c_str());
219 HT_ERRORF(
"remove(%s) failure (%d) - %s", fname, -status, KFS::ErrorCodeToStr(status).c_str());
227 int err =
m_client->Stat(fname, result);
231 HT_ERRORF(
"length(%s) failure (%d) - %s", fname, -err, KFS::ErrorCodeToStr(err).c_str());
237 uint32_t amount,
bool verify_checksum) {
246 if (verify_checksum) {
249 HT_WARNF(
"Checksum verification of fd=%d (qfsFd=%d) failed - %s", fd,
250 fdata->
fd, KFS::ErrorCodeToStr(status).c_str());
255 reinterpret_cast<char*>(buf.
base), amount);
258 KFS::ErrorCodeToStr(status));
259 HT_ERRORF(
"pread(fd=%d,qfsFd=%d,%lld,%lld) failure (%d) - %s", (
int)fd,
260 (
int)fdata->
fd, (
Lld)offset, (
Lld)amount, (
int)-status,
261 KFS::ErrorCodeToStr(status).c_str());
265 buf.
size = (uint32_t)status;
274 HT_ERRORF(
"mkdirs(%s) failure (%d) - %s", dname, -status, KFS::ErrorCodeToStr(status).c_str());
283 if(status < 0 && status != -ENOENT) {
284 HT_ERRORF(
"rmdir(%s) failure (%d) - %s", dname, -status, KFS::ErrorCodeToStr(status).c_str());
309 KFS::ErrorCodeToStr(status));
310 HT_ERRORF(
"sync(fd=%d,qfsFd=%d) failure (%d) - %s", (
int)fd,
311 (
int)fdata->
fd, -status,
312 KFS::ErrorCodeToStr(status).c_str());
320 std::vector<KfsFileAttr> result;
321 std::vector<Filesystem::Dirent> listing;
322 int err =
m_client->ReaddirPlus(dname, result);
325 for (std::vector<KfsFileAttr>::iterator it = result.begin();
326 it != result.end(); ++it) {
327 if(it->filename !=
"." && it->filename !=
"..") {
329 entry.
name.append(it->filename);
330 entry.
is_dir = it->isDirectory;
331 entry.
length = (uint64_t)it->fileSize;
333 listing.push_back(entry);
340 HT_ERRORF(
"readdir(%s) failure (%d) - %s", dname, -err, KFS::ErrorCodeToStr(err).c_str());
350 int err =
m_client->Rename(src, dst);
354 HT_ERRORF(
"rename(%s,%s) failure (%d) - %s", src, dst, -err, KFS::ErrorCodeToStr(err).c_str());
375 string errors = KFS::ErrorCodeToStr(error);
402 std::clog <<
"ERROR " << errors << std::endl;
A memory buffer of static size.
Retrieves system information (hardware, installation directory, etc)
int response(bool exists)
Sends response parameters back to client.
#define HT_WARNF(msg,...)
void get_address(struct sockaddr_in &addr)
Gets the remote address of the requesting client.
virtual void open(Response::Callback::Open *cb, const char *fname, uint32_t flags, uint32_t bufsz)
Open a file and pass the fd to the callback on success.
virtual void sync(ResponseCallback *cb, uint32_t fd)
Sync out data that has been written.
void report_error(ResponseCallback *cb, int error)
Application handler for append function.
Abstract base class for a filesystem.
bool get(int fd, OpenFileDataPtr &fdata)
String format(const char *fmt,...)
Returns a String using printf-like format facilities. Vanilla snprintf is about 1.5x faster than this...
virtual void exists(Response::Callback::Exists *cb, const char *fname)
Check for the existence of a file.
virtual int response_ok()
Sends a simple success response back to the client which is just the 4-byte error code Error::OK...
int response(int32_t fd)
Sends response parameters back to client.
Flags
Enumeration type for append flags.
Application handler for exists function.
std::vector< String > errors
File system broker definitions.
virtual void shutdown(ResponseCallback *cb)
Gracefully shut down broker, closing open files.
virtual void debug(ResponseCallback *cb, int32_t command, StaticBuffer &serialized_parameters)
Debug command.
time_t last_modification_time
Last modification time.
virtual void status(Response::Callback::Status *cb)
Check status of FSBroker.
virtual void read(Response::Callback::Read *cb, uint32_t fd, uint32_t amount)
Read data from an open file.
void remove_all(struct sockaddr_in &addr)
uint64_t length
Length of file.
Status & get()
Gets status information.
bool is_dir
Flag indicating if entry is a directory.
Application handler for length function.
void set_write_status(Status::Code code, const std::string &text)
Sets write status.
MetricsHandlerPtr m_metrics_handler
Metrics collection handler.
virtual ~OpenFileDataQfs()
virtual void rename(ResponseCallback *cb, const char *src, const char *dst)
Rename a file from src to dst.
virtual void pread(Response::Callback::Read *cb, uint32_t fd, uint64_t offset, uint32_t amount, bool verify_checksum)
Read from file at position.
const char * get_text(int error)
Returns a descriptive error message.
virtual void rmdir(ResponseCallback *cb, const char *dname)
Remove a directory.
bool remove(int fd, OpenFileDataPtr &fdata)
virtual void seek(ResponseCallback *cb, uint32_t fd, uint64_t offset)
Seek open file.
void set_read_status(Status::Code code, const std::string &text)
Sets read status.
std::shared_ptr< Properties > PropertiesPtr
Compatibility Macros for C/C++.
virtual void append(Response::Callback::Append *, uint32_t fd, uint32_t amount, const void *data, Filesystem::Flags flags)
Append data to open file.
int response(uint64_t offset, StaticBuffer &buffer)
Sends response parameters back to client.
static std::atomic< int > ms_next_fd
Atomic counter for file descriptor assignment.
virtual void mkdirs(ResponseCallback *cb, const char *dname)
Make a directory hierarchy. If the parent dirs are not present, they are also created.
OpenFileMap m_open_file_map
A map of open files.
This class is used to generate and deliver standard responses back to a client.
int response(std::vector< Filesystem::Dirent > &listing)
Sends response parameters back to client.
Application handler for readdir function.
virtual void readdir(Response::Callback::Readdir *cb, const char *dname)
Read a directory's contents.
#define HT_DEBUGF(msg,...)
long long int Lld
Shortcut for printf formats.
Application handler for open function.
Application handler for read function.
virtual int error(int error, const String &msg)
Sends a standard error response back to the client.
virtual void remove(ResponseCallback *cb, const char *fname)
Remove a file or directory.
#define HT_INFOF(msg,...)
String name
File or directory name.
Application handler for open function.
int response(Hypertable::Status &status)
Sends response parameters back to client.
virtual void length(Response::Callback::Length *cb, const char *fname, bool accurate=true)
Get length of file.
virtual void create(Response::Callback::Open *cb, const char *fname, uint32_t flags, int32_t bufsz, int16_t replication, int64_t blksz)
Open a file, and create it if it doesn't exist, optionally overwriting the contents.
#define HT_ERRORF(msg,...)
int response(uint64_t offset, uint32_t amount)
Sends response parameters back to client.
void create(int fd, struct sockaddr_in &addr, OpenFileDataPtr &fdata)
virtual void close(ResponseCallback *cb, uint32_t fd)
Close open file.
KFS::KfsClient *const m_client
int response(uint64_t length)
Sends response parameters back to client.
virtual void flush(ResponseCallback *cb, uint32_t fd)
Flush data that has been written.
StatusManager m_status_manager
Server status manager.
int64_t get_ts64()
Returns the current time in nanoseconds as a 64bit number.
#define HT_DIRECT_IO_ALIGNMENT
QfsBroker(PropertiesPtr &cfg)