43 #include <boost/algorithm/string/predicate.hpp>
54 #include <sys/types.h>
64 struct ByFragmentNumber {
66 int num_x = atoi(x.
name.c_str());
67 int num_y = atoi(y.
name.c_str());
76 if (get_bool(
"Hypertable.CommitLog.SkipErrors"))
83 const std::vector<int32_t> &fragment_filter)
86 m_fragment_filter(fragment_filter.begin(), fragment_filter.end()),
87 m_last_fragment_id(-1) {
88 if (get_bool(
"Hypertable.CommitLog.SkipErrors"))
97 LogFragmentQueue::iterator fragment_queue_iter;
104 if ((*fragment_queue_iter)->block_stream == 0) {
105 (*fragment_queue_iter)->block_stream =
107 format(
"%u", (*fragment_queue_iter)->num));
112 if (!(*fragment_queue_iter)->block_stream->next(infop, header)) {
119 HT_INFOF(
"Skipping log fragment '%s/%u' because unable to read any "
120 " valid blocks", info->
log_dir.c_str(), info->
num);
134 boost::trim_right_if(log_dir, boost::is_any_of(
"/"));
146 HT_INFOF(
"Replaying commit log fragment %s/%u", (*fragment_queue_iter)->log_dir.c_str(),
147 (*fragment_queue_iter)->num);
154 ids.push_back((uint32_t)
id);
178 HT_ERRORF(
"Inflate error in CommitLog fragment %s starting at "
179 "postion %lld (block len = %lld) - %s",
180 (*iter)->block_stream->get_fname().c_str(),
198 HT_WARNF(
"Corruption detected in CommitLog fragment %s starting at "
199 "postion %lld for %lld bytes - %s",
200 (*iter)->block_stream->get_fname().c_str(),
213 vector<Filesystem::Dirent> listing;
219 <<
" which is part of CommitLog " <<
m_log_dir
220 <<
" mark_for_deletion=" << mark_for_deletion
224 m_fs->readdir(log_dir, listing);
229 HT_INFOF(
"Skipping directory '%s' because it does not exist",
236 if (listing.size() == 0)
239 sort(listing.begin(), listing.end(), ByFragmentNumber());
241 for (
size_t i = 0; i < listing.size(); i++) {
242 if (boost::ends_with(listing[i].name,
".tmp"))
245 if (boost::ends_with(listing[i].name,
".mark")) {
246 mark = atoi(listing[i].name.c_str());
251 int32_t num = (int32_t)strtol(listing[i].name.c_str(), &endptr, 10);
255 HT_INFOF(
"Dropping log fragment %s/%d because it is filtered",
256 log_dir.c_str(), num);
266 HT_WARNF(
"Invalid file '%s' found in commit log directory '%s'",
267 listing[i].name.c_str(), log_dir.c_str());
271 fi->
num = (uint32_t)num;
274 fi->
size =
m_fs->length(log_dir +
"/" + listing[i].name);
290 string mark_filename;
292 mark_filename = log_dir +
"/" + mark +
".mark";
293 m_fs->remove(mark_filename);
296 HT_FATALF(
"Problem removing mark file '%s' - %s",
297 mark_filename.c_str(), e.what());
323 "Invalid compression type '%d'", (
int)ztype);
CompressorMap m_compressor_map
CommitLogReader(FilesystemPtr &fs, const std::string &log_dir)
DynamicBuffer m_block_buffer
#define HT_WARNF(msg,...)
static bool ms_assert_on_error
int64_t md5_hash(const char *input)
Returns a 64-bit hash checksum of a null-terminated input buffer.
Holds information about an individual block.
std::string String
A String is simply a typedef to std::string.
String format(const char *fmt,...)
Returns a String using printf-like format facilities. Vanilla snprintf is about 1.5x faster than this...
Declarations for CommitLogReader.
int64_t m_latest_revision
bool next_raw_block(CommitLogBlockInfo *, BlockHeaderCommitLog *)
std::vector< int32_t > m_init_fragments
bool m_range_reference_required
Abstraction for reading a stream of blocks from a commit log file.
Declarations for BlockCompressionCodec.
Type
Enumeration for compression type.
uint8_t * ptr
Pointer to the end of the used part of the buffer.
A dynamic, resizable and reference counted memory buffer.
static const int64_t TIMESTAMP_MIN
Declarations for CommitLogBlockStream.
File system utility functions.
void load_fragments(String log_dir, CommitLogFileInfo *parent)
std::string m_last_fragment_fname
static const char MAGIC_LINK[10]
void get_init_fragment_ids(std::vector< uint32_t > &ids)
const char * get_text(int error)
Returns a descriptive error message.
Limit of compression types.
int error
Error (if any) encountered while reading block
Logging routines and macros.
Compatibility Macros for C/C++.
int32_t m_last_fragment_id
CommitLogBlockStream * block_stream
static BlockCompressionCodec * create_block_codec(BlockCompressionCodec::Type, const BlockCompressionCodec::Args &args=BlockCompressionCodec::Args())
uint16_t m_compressor_type
uint8_t * block_ptr
Pointer to beginning of compressed block.
BlockCompressionCodecPtr m_compressor
#define HT_FATALF(msg,...)
long long int Lld
Shortcut for printf formats.
void clear()
Clears the buffer.
std::shared_ptr< Filesystem > FilesystemPtr
Smart pointer to Filesystem.
CommitLogFileInfo * parent
#define HT_INFOF(msg,...)
String name
File or directory name.
#define HT_THROWF(_code_, _fmt_,...)
uint8_t * base
Pointer to the allocated memory buffer.
static uint64_t header_size()
Size of header.
size_t fill() const
Returns the size of the used portion.
This is a generic exception class for Hypertable.
std::set< uint32_t > m_fragment_filter
bool next(const uint8_t **blockp, size_t *lenp, BlockHeaderCommitLog *)
uint64_t start_offset
Starting offset of block within fragment file.
LogFragmentQueue m_fragment_queue
#define HT_ERRORF(msg,...)
Declarations for CommitLog.
uint64_t end_offset
Ending offset of block within fragment file.
size_t block_len
Length of block.
uint64_t m_fragment_queue_offset
std::set< int64_t > m_linked_log_hashes
uint32_t toplevel_fragment_id(CommitLogFileInfo *finfo)
void load_compressor(uint16_t ztype)
String extensions and helpers: sets, maps, append operators etc.
Error codes, Exception handling, error logging.
int code() const
Returns the error code.
#define HT_THROW2(_code_, _ex_, _msg_)