55 {
'C',
'O',
'M',
'M',
'I',
'T',
'D',
'A',
'T',
'A' };
57 {
'C',
'O',
'M',
'M',
'I',
'T',
'L',
'I',
'N',
'K' };
80 m_replication = props->get_i32(
"Hypertable.Metadata.Replication");
82 m_replication = props->get_i32(
"Hypertable.RangeServer.Data.DefaultReplication");
86 HT_TRY(
"getting commit log properites",
88 compressor = cfg.get_str(
"Compressor"));
92 boost::trim_right_if(
m_log_dir, boost::is_any_of(
"/"));
107 std::vector<Filesystem::Dirent> listing;
109 for (
size_t i=0; i<listing.size(); i++) {
110 num = atoi(listing[i].name.c_str());
131 HT_ERRORF(
"Problem initializing commit log '%s' - %s (%s)",
144 lock_guard<mutex> lock(
m_mutex);
153 HT_ERRORF(
"Problem flushing commit log: %s: %s",
162 lock_guard<mutex> lock(
m_mutex);
171 HT_ERRORF(
"Problem syncing commit log: %s: %s",
187 lock_guard<mutex> lock(
m_mutex);
202 lock_guard<mutex> lock(
m_mutex);
212 lock_guard<mutex> lock(
m_mutex);
221 HT_WARNF(
"Skipping log %s because it is already linked in", log_dir.c_str());
230 HT_INFOF(
"clgc Linking log %s into fragment %d; link_rev=%lld latest_rev=%lld",
247 input.
add(log_dir.c_str(), log_dir.length() + 1);
250 size_t amount = input.
fill();
267 if (fi->parent == 0) {
268 fi->parent = file_info;
280 HT_ERRORF(
"Problem linking external log into commit log - %s", e.what());
291 lock_guard<mutex> lock(
m_mutex);
300 HT_ERRORF(
"Problem closing commit log file '%s' - %s (%s)",
311 StringSet &removed_logs,
string *trace) {
312 lock_guard<mutex> lock(
m_mutex);
320 *trace += fi->to_str(remove_ok_logs) +
"\n";
325 std::set<CommitLogFileInfo *>::iterator rm_iter, iter =
m_reap_set.begin();
327 if ((*iter)->references == 0 &&
352 string msg =
format(
"purge(%s,rev=%llu) breaking on %s",
354 fi->
to_str(remove_ok_logs).c_str());
357 *trace += msg +
"\n";
374 HT_INFOF(
"Removing linked log directory '%s' because all fragments have been removed", logdir.c_str());
375 removed_logs.insert(logdir);
379 HT_ERRORF(
"Problem removing log directory '%s' (%s - %s)",
387 HT_INFOF(
"Removing log fragment '%s' revision=%lld, parent=%lld",
394 HT_ERRORF(
"Problem removing log fragment '%s' (%s - %s)",
425 HT_ERRORF(
"Problem closing commit log fragment: %s: %s",
465 HT_ERRORF(
"Problem rolling commit log: %s: %s",
479 lock_guard<mutex> lock(
m_mutex);
491 size_t amount = zblock.
fill();
494 m_fs->append(
m_fd, send_buf, flags);
495 assert(revision != 0);
501 HT_ERRORF(
"Problem writing commit log: %s: %s",
511 lock_guard<mutex> lock(
m_mutex);
512 int64_t cumulative_total = 0;
513 uint32_t distance = 0;
519 memset(&frag_data, 0,
sizeof(frag_data));
527 for (std::deque<CommitLogFileInfo *>::reverse_iterator iter
529 frag_data.
size = (*iter)->size;
530 frag_data.
fragno = (*iter)->num;
531 cumulative_size_map[(*iter)->revision] = frag_data;
534 for (CumulativeSizeMap::reverse_iterator riter = cumulative_size_map.rbegin();
535 riter != cumulative_size_map.rend(); riter++) {
536 (*riter).second.
distance = distance++;
537 cumulative_total += (*riter).second.size;
538 (*riter).second.cumulative_size = cumulative_total;
545 lock_guard<mutex> lock(
m_mutex);
552 result += prefix +
String(
"-log-fragment[") + frag->num +
"]\tsize\t" + frag->size +
"\n";
553 result += prefix +
String(
"-log-fragment[") + frag->num +
"]\trevision\t" + frag->revision +
"\n";
554 result += prefix +
String(
"-log-fragment[") + frag->num +
"]\tdir\t" + frag->log_dir +
"\n";
std::set< String > StringSet
STL Set managing Strings.
void stitch_in(CommitLogBase *other)
This method assumes that the other commit log is not being concurrently used which is why it doesn't ...
A memory buffer of static size.
#define HT_WARNF(msg,...)
static const char MAGIC_DATA[10]
int64_t get_latest_revision()
std::map< int64_t, CumulativeFragmentData > CumulativeSizeMap
int64_t md5_hash(const char *input)
Returns a 64-bit hash checksum of a null terminated input buffer.
int write(uint64_t cluster_id, DynamicBuffer &buffer, int64_t revision, Filesystem::Flags flags)
Writes a block of updates to the commit log.
void initialize(const std::string &log_dir, PropertiesPtr &, CommitLogBase *init_log, bool is_meta)
PropertiesPtr properties
This singleton map stores all options.
std::string String
A String is simply a typedef to std::string.
String format(const char *fmt,...)
Returns a String using printf like format facilities Vanilla snprintf is about 1.5x faster than this...
Declarations for CommitLogReader.
int64_t m_latest_revision
Flags
Enumeration type for append flags.
long long unsigned int Llu
Shortcut for printf formats.
std::string & get_log_dir()
static void write_header(FilesystemPtr &fs, int32_t fd)
Writes commit log file header.
bool m_range_reference_required
Declarations for BlockCompressionCodec.
uint8_t * ptr
Pointer to the end of the used part of the buffer.
A dynamic, resizable and reference counted memory buffer.
static const int64_t TIMESTAMP_MIN
void load_cumulative_size_map(CumulativeSizeMap &cumulative_size_map)
Fills up a map of cumulative fragment size data.
void get_stats(const std::string &prefix, std::string &result)
Returns the stats on all commit log fragments.
uint32_t fletcher32(const void *data8, size_t len8)
Compute fletcher32 checksum for arbitary data.
File system utility functions.
LogFragmentQueue & fragment_queue()
A dynamic, resizable memory buffer.
int sync()
Sync previous updates written to commit log.
int link_log(uint64_t cluster_id, CommitLogBase *log_base)
Links an external log into this log.
static const char MAGIC_LINK[10]
const char * get_text(int error)
Returns a descriptive error message.
uint8_t * add(const void *data, size_t len)
Adds more data WITH boundary checks; if required the buffer is resized and existing data is preserved...
std::shared_ptr< Properties > PropertiesPtr
Logging routines and macros.
Compatibility Macros for C/C++.
int close()
Closes the log.
int64_t m_max_fragment_size
static BlockCompressionCodec * create_block_codec(BlockCompressionCodec::Type, const BlockCompressionCodec::Args &args=BlockCompressionCodec::Args())
Time related declarations.
void remove_file_info(CommitLogFileInfo *fi, StringSet &removed_logs)
int64_t m_cur_fragment_length
bool range_reference_required()
long long int Lld
Shortcut for printf formats.
Implementation of checksum routines.
std::string to_str(StringSet &remove_ok_logs)
Helper class to access parts of the properties.
std::shared_ptr< Filesystem > FilesystemPtr
Smart pointer to Filesystem.
bool remove_ok(StringSet &remove_ok_logs)
CommitLogFileInfo * parent
Declarations for Protocol.
#define HT_INFOF(msg,...)
#define HT_THROWF(_code_, _fmt_,...)
static uint64_t header_size()
Size of header.
size_t fill() const
Returns the size of the used portion.
std::string m_cur_fragment_fname
int flush()
Flushes previous updates written to commit log.
This is a generic exception class for Hypertable.
LogFragmentQueue m_fragment_queue
std::set< CommitLogFileInfo * > m_reap_set
#define HT_ERRORF(msg,...)
Declarations for CommitLog.
uint32_t m_cur_fragment_num
int64_t get_timestamp()
Atomically obtains a timestamp.
std::set< int64_t > m_linked_log_hashes
int roll(CommitLogFileInfo **clfip=0)
#define HT_TRY(_s_, _code_)
std::unique_ptr< BlockCompressionCodec > m_compressor
String extensions and helpers: sets, maps, append operators etc.
Error codes, Exception handling, error logging.
#define HT_THROW(_code_, _msg_)
void ensure(size_t len)
Ensure space for additional data Will grow the space to 1.5 of the needed space with existing data un...
int purge(int64_t revision, StringSet &remove_ok_logs, StringSet &removed_logs, std::string *trace)
Purges the log.
int compress_and_write(DynamicBuffer &input, BlockHeader *header, int64_t revision, Filesystem::Flags flags)
int64_t get_ts64()
Returns the current time in nanoseconds as a 64bit number.
int code() const
Returns the error code.
CommitLog(FilesystemPtr &fs, const std::string &log_dir, PropertiesPtr &props, CommitLogBase *init_log=0, bool is_meta=true)
Constructs a CommitLog object using supplied properties.