52 #include <boost/algorithm/string.hpp>
53 #include <boost/any.hpp>
66 static void init_options() {
68 "Dumps the contents of the CellStore contained in the FS <filename>."
69 "\n\nOptions").add_options()
70 (
"repair",
"Repair any corruption that is found")
76 if (!
has(
"filename")) {
84 typedef Meta::list<AppPolicy, FsClientPolicy, DefaultCommPolicy>
Policies;
88 BlockEntry() : sequence(0), offset(0), rowkey(0), matched(false), key_mismatch(false) { }
98 State() : block_index_is_bad(false), bloom_filter_is_bad(false) { }
103 vector<BlockEntry> index_block_info;
104 vector<BlockEntry> reconstructed_block_info;
108 bool block_index_is_bad;
109 bool bloom_filter_is_bad;
110 uint16_t block_header_format;
115 const char INDEX_FIXED_BLOCK_MAGIC[10] =
116 {
'I',
'd',
'x',
'F',
'i',
'x',
'-',
'-',
'-',
'-' };
117 const char INDEX_VARIABLE_BLOCK_MAGIC[10] =
118 {
'I',
'd',
'x',
'V',
'a',
'r',
'-',
'-',
'-',
'-' };
120 void load_file(
const String &fname, State &state) {
121 int64_t length =
Global::dfs->length(fname.c_str());
122 uint8_t *base =
new uint8_t[length];
123 const uint8_t *ptr = base;
124 uint8_t *end = base + length;
126 int64_t nleft = length;
127 size_t nread, toread;
131 fd =
Global::dfs->open_buffered(fname.c_str(), 0, 1024*1024, 5);
133 toread = (nleft > 1024*1024) ? 1024*1024 : nleft;
134 nread =
Global::dfs->read(fd, (uint8_t *)ptr, toread);
146 state.block_header_format = 0;
149 cout <<
"unsupported CellStore version (" << version <<
")" << endl;
150 quick_exit(EXIT_FAILURE);
153 state.trailer->deserialize(end - state.trailer->size());
157 uint16_t compression_type = boost::any_cast<uint16_t>(state.trailer->get(
"compression_type"));
163 void read_block_index(State &state) {
164 uint32_t flags = boost::any_cast<uint32_t>(state.trailer->get(
"flags"));
165 int64_t fix_index_offset = boost::any_cast<int64_t>(state.trailer->get(
"fix_index_offset"));
166 int64_t var_index_offset = boost::any_cast<int64_t>(state.trailer->get(
"var_index_offset"));
167 int64_t filter_offset = boost::any_cast<int64_t>(state.trailer->get(
"filter_offset"));
176 input_buf.base = state.base + fix_index_offset;
177 input_buf.ptr = input_buf.base + (var_index_offset - fix_index_offset);
179 state.compressor->inflate(input_buf, output_buf, header);
181 if (!header.check_magic(INDEX_FIXED_BLOCK_MAGIC)) {
182 cout <<
"corrupt fixed index" << endl;
183 quick_exit(EXIT_FAILURE);
186 int64_t index_entries = output_buf.fill() / (bits64 ? 8 : 4);
188 state.index_block_info.reserve(index_entries);
191 const uint8_t *ptr = (
const uint8_t *)output_buf.base;
192 size_t remaining = output_buf.fill();
193 for (int64_t i=0; i<index_entries; i++) {
198 be.sequence = (size_t)i;
199 state.index_block_info.push_back(be);
203 input_buf.base = state.base + var_index_offset;
204 input_buf.ptr = input_buf.base + (filter_offset - var_index_offset);
207 state.compressor->inflate(input_buf, output_buf, header);
209 if (!header.check_magic(INDEX_VARIABLE_BLOCK_MAGIC)) {
210 cout <<
"corrupt variable index" << endl;
211 quick_exit(EXIT_FAILURE);
216 ptr = output_buf.base;
218 for (
size_t i=0; i<state.index_block_info.size(); i++) {
221 state.index_block_info[i].rowkey = (
char *)key.
row();
226 void read_bloom_filter(State &state) {
227 int64_t filter_offset = boost::any_cast<int64_t>(state.trailer->get(
"filter_offset"));
228 int64_t filter_length = boost::any_cast<int64_t>(state.trailer->get(
"filter_length"));
229 int64_t filter_items_actual = boost::any_cast<int64_t>(state.trailer->get(
"filter_items_actual"));
230 uint8_t bloom_filter_hash_count = boost::any_cast<uint8_t>(state.trailer->get(
"bloom_filter_hash_count"));
231 uint8_t bloom_filter_mode = boost::any_cast<uint8_t>(state.trailer->get(
"bloom_filter_mode"));
234 state.bloom_filter = 0;
239 cout <<
"Unsupported bloom filter type (BLOOM_FILTER_ROWS_COLS)" << endl;
240 quick_exit(EXIT_FAILURE);
246 filter_length, bloom_filter_hash_count);
247 memcpy(state.bloom_filter->base(), state.base+filter_offset, state.bloom_filter->total_size());
249 state.bloom_filter->validate(state.fname);
252 state.bloom_filter_is_bad =
true;
253 state.bloom_filter = 0;
258 template <
class Operator >
259 bool process_blocks (State &state, Operator op) {
262 int64_t end_offset = boost::any_cast<int64_t>(state.trailer->get(
"fix_index_offset"));
263 uint32_t alignment = boost::any_cast<uint32_t>(state.trailer->get(
"alignment"));
264 const uint8_t *ptr = state.base;
265 const uint8_t *end = state.base + end_offset;
273 if ((end-ptr) < (ptrdiff_t)header.encoded_length())
276 offset = ptr - state.base;
279 remaining = end - ptr;
280 input_buf.base = input_buf.ptr = (uint8_t *)ptr;
281 input_buf.size = remaining;
282 header.decode((
const uint8_t **)&input_buf.ptr, &remaining);
286 if ((header.encoded_length()+header.get_data_zlength())%alignment)
287 extra = alignment - ((header.encoded_length()+header.get_data_zlength())%alignment);
291 if (input_buf.ptr + header.get_data_zlength() + extra > end)
295 input_buf.ptr += header.get_data_zlength() + extra;
296 state.compressor->inflate(input_buf, expand_buf, header);
299 if (!op(sequence, offset, expand_buf))
302 ptr += input_buf.fill();
309 void check_bloom_filter(State &state,
const char *row) {
310 if (state.bloom_filter && !state.bloom_filter_is_bad) {
311 if (!state.bloom_filter->may_contain(row, strlen(row)))
312 state.bloom_filter_is_bad =
true;
316 struct reconstruct_block_info {
317 reconstruct_block_info(State &s) : state(s) { }
318 bool operator()(
size_t sequence, int64_t offset,
DynamicBuffer &buf) {
323 const uint8_t *end = buf.
base + buf.
fill();
325 be.sequence = sequence;
328 state.key_decompressor->reset();
329 value.
ptr = state.key_decompressor->add(buf.
base);
331 state.key_decompressor->load(key);
332 check_bloom_filter(state, key.
row);
335 value.
ptr = state.key_decompressor->add(ptr);
336 state.key_decompressor->load(key);
337 check_bloom_filter(state, key.
row);
344 state.key_decompressor->load(key);
345 be.rowkey =
new char [ strlen(key.
row)+1 ];
346 strcpy(be.rowkey, key.
row);
347 state.reconstructed_block_info.push_back(be);
355 bool operator()(
const BlockEntry* b1,
const BlockEntry* b2)
const {
356 return b1->offset < b2->offset;
360 void reconcile_block_index(State &state) {
361 std::set<BlockEntry *, ltbe> bset;
362 std::set<BlockEntry *, ltbe>::iterator iter;
363 int64_t last_offset = 0;
365 for (
size_t i=0; i<state.reconstructed_block_info.size(); i++)
366 bset.insert(&state.reconstructed_block_info[i]);
368 for (
size_t i=0; i<state.index_block_info.size(); i++) {
369 if (i > 0 && state.index_block_info[i].offset <= last_offset)
370 state.block_index_is_bad =
true;
371 iter = bset.find(&state.index_block_info[i]);
372 if (iter != bset.end()) {
373 (*iter)->matched =
true;
374 state.index_block_info[i].matched =
true;
375 if (strcmp((*iter)->rowkey, state.reconstructed_block_info[i].rowkey)) {
376 (*iter)->key_mismatch = state.index_block_info[i].key_mismatch =
true;
377 state.block_index_is_bad =
true;
380 last_offset = state.index_block_info[i].offset;
383 for (
size_t i=0; i<state.reconstructed_block_info.size(); i++) {
384 if (!state.reconstructed_block_info[i].matched)
385 state.block_index_is_bad =
true;
386 else if (state.reconstructed_block_info[i].key_mismatch)
387 state.block_index_is_bad =
true;
390 for (
size_t i=0; i<state.index_block_info.size(); i++) {
391 if (!state.index_block_info[i].matched)
392 state.block_index_is_bad =
true;
396 void describe_block_index_corruption(State &state) {
397 size_t key_mismatches = 0;
398 int64_t last_offset = 0;
400 for (
size_t i=0; i<state.reconstructed_block_info.size(); i++) {
401 if (!state.reconstructed_block_info[i].matched) {
402 cout <<
"Missing block index entry (offset=" << state.reconstructed_block_info[i].offset;
403 cout <<
", row=" << state.reconstructed_block_info[i].rowkey <<
")" << endl;
405 else if (state.reconstructed_block_info[i].key_mismatch) {
410 for (
size_t i=0; i<state.index_block_info.size(); i++) {
411 if (i > 0 && state.index_block_info[i].offset <= last_offset) {
412 cout <<
"Out-of-order block index entry (offset=" << state.index_block_info[i].offset;
413 cout <<
", row=" << state.index_block_info[i].rowkey <<
")" << endl;
415 if (!state.index_block_info[i].matched) {
416 cout <<
"Bogus block index entry (offset=" << state.index_block_info[i].offset;
417 cout <<
", row=" << state.index_block_info[i].rowkey <<
")" << endl;
419 last_offset = state.index_block_info[i].offset;
427 int main(
int argc,
char **argv) {
431 init_with_policies<Policies>(argc, argv);
433 int timeout = get_i32(
"timeout");
434 state.fname = get_str(
"filename");
436 cout <<
"Checking " << state.fname <<
" ... " << flush;
442 if (!dfs->wait_for_connection(timeout)) {
443 cout <<
"timed out waiting for FS broker" << endl;
444 quick_exit(EXIT_FAILURE);
450 load_file(state.fname, state);
451 read_block_index(state);
452 read_bloom_filter(state);
453 reconstruct_block_info rbi(state);
454 process_blocks(state, rbi);
455 reconcile_block_index(state);
457 if (state.block_index_is_bad || state.bloom_filter_is_bad) {
458 if (state.block_index_is_bad) {
459 cout <<
"block index is bad" << endl;
460 describe_block_index_corruption(state);
462 if (state.bloom_filter_is_bad)
463 cout <<
"bloom filter is bad" << endl;
466 cout <<
"valid" << endl;
Retrieves system information (hardware, installation directory, etc.)
Abstract base class for cell store trailer.
Interface and base of config policy.
Declarations for CellStoreFactory.
PropertiesPtr properties
This singleton map stores all options.
std::string String
A String is simply a typedef to std::string.
Helper class for printing usage banners on the command line.
A space-efficient probabilistic set for membership tests; false positives are possible, but false negatives are not.
void init(int argc, char *argv[], const Desc *desc=NULL)
Initialize with default policy.
Po::typed_value< String > * str(String *v=0)
Type
Enumeration for compression type.
A dynamic, resizable and reference counted memory buffer.
Tracks range server memory used.
uint32_t decode_i32(const uint8_t **bufp, size_t *remainp)
Decode a 32-bit integer in little-endian order.
Desc & cmdline_desc(const char *usage)
A macro which defines global functions like get_bool(), get_str(), get_i16(), etc.
bool has(const String &name)
Check existence of a configuration value.
A class managing one or more serializable ByteStrings.
Declarations for CellStore.
uint64_t decode_i64(const uint8_t **bufp, size_t *remainp)
Decode a 64-bit integer in little-endian order.
static Hypertable::MemoryTracker * memory_tracker
uint16_t decode_i16(const uint8_t **bufp, size_t *remainp)
Decode a 16-bit integer in little-endian order.
std::shared_ptr< Client > ClientPtr
Smart pointer to Client.
Logging routines and macros.
BloomFilterMode
Enumeration for bloom filter modes.
Compatibility Macros for C/C++.
Initialization helper for applications.
Functions to serialize/deserialize primitives to/from a memory buffer.
static Hypertable::FilesystemPtr dfs
size_t length() const
Retrieves the length of the serialized string.
static BlockCompressionCodec * create_block_codec(BlockCompressionCodec::Type, const BlockCompressionCodec::Args &args=BlockCompressionCodec::Args())
const uint8_t * ptr
The pointer to the serialized data.
Implementation of checksum routines.
Declarations for ConnectionManager.
Represents the trailer for CellStore version 7.
int main(int argc, char **argv)
Provides access to internal components of opaque key.
uint8_t * base
Pointer to the allocated memory buffer.
size_t fill() const
Returns the size of the used portion.
Internet address wrapper classes and utility functions.
Meta::list< MyPolicy, DefaultPolicy > Policies
Declarations for ReactorFactory.
This is a generic exception class for Hypertable.
A serializable ByteString.
Declarations for CellStoreTrailerV7.
A Bloom Filter with Checksums.
BasicBloomFilterWithChecksum BloomFilterWithChecksum
std::shared_ptr< ConnectionManager > ConnectionManagerPtr
Smart pointer to ConnectionManager.
Desc & cmdline_hidden_desc()
Get the command line hidden options description (for positional options)
PositionalDesc & cmdline_positional_desc()
Get the command line positional options description.
Abstract base class for block compression codecs.