60 : m_identifier(*identifier), m_schema(schema), m_name(ag_spec->get_name()),
61 m_cell_cache_manager {make_shared<CellCacheManager>()},
73 for (
auto cf_spec : ag_spec->
columns())
101 set<uint8_t>::iterator iter;
104 schema->get_generation() >
m_schema->get_generation()) {
121 Config::get_str(
"Hypertable.RangeServer.CellStore.DefaultBloomFilter"),
125 for (
auto cf_spec : ag_spec->
columns()) {
129 if (!cf_spec->get_deleted())
134 if (cf_spec->get_deleted())
170 HT_ERRORF(
"Revision (clock) skew detected! Key '%s' revision=%lld, latest_stored=%lld",
203 uint64_t initial_bytes_read;
208 bool bloom_filter_disabled;
210 for (
size_t i=0; i<
m_stores.size(); ++i) {
218 initial_bytes_read =
m_stores[i].cs->bytes_read();
222 if (bloom_filter_disabled ||
234 m_stores[i].bloom_filter_accesses++;
235 if (
m_stores[i].cs->may_contain(scan_ctx)) {
247 if (
m_stores[i].cs->bytes_read() > initial_bytes_read)
258 HT_THROW2F(e.
code(), e,
"Problem creating scanner on access group %s",
289 csinfo.cs->split_row_estimate_data(split_row_data);
296 csinfo.cs->populate_index_pseudo_table_scanner(scanner);
322 uint64_t memory_purged = 0;
327 for (
size_t i=0; i<
m_stores.size(); i++) {
331 memory_purged +=
m_stores[i].shadow_cache->memory_allocated();
336 memory_purged +=
m_stores[i].cs->purge_indexes();
340 return memory_purged;
380 for (
size_t i=0; i<
m_stores.size(); i++) {
388 (*tailp)->next->cs =
m_stores[i].cs.get();
389 tailp = &(*tailp)->
next;
391 m_stores[i].cs->get_index_memory_stats( &(*tailp)->index_stats );
405 (*tailp)->shadow_cache_size =
m_stores[i].shadow_cache->memory_allocated();
406 (*tailp)->shadow_cache_ecr =
m_stores[i].shadow_cache_ecr;
407 (*tailp)->shadow_cache_hits =
m_stores[i].shadow_cache_hits;
410 (*tailp)->shadow_cache_size = 0;
412 (*tailp)->shadow_cache_hits = 0;
414 (*tailp)->maintenance_flags = 0;
439 int64_t revision = boost::any_cast<int64_t>
440 (cellstore->get_trailer()->get(
"revision"));
453 int64_t total_index_entries = 0;
461 = make_shared<MergeScannerAccessGroup>(
m_table_name, scan_ctx.get());
468 if (immutable_scanner)
469 mscanner->add_scanner(immutable_scanner);
472 for (
size_t i=0; i<
m_stores.size(); i++) {
474 mscanner->add_scanner(
m_stores[i].cs->create_scanner(scan_ctx.get()));
478 while (mscanner->get(key, value))
481 *total = (double)mscanner->get_input_bytes();
482 *garbage = *total - (double)mscanner->get_output_bytes();
494 bool abort_loop =
true;
496 bool merging =
false;
499 bool cellstore_created =
false;
500 size_t merge_offset=0, merge_length=0;
532 HT_INFOF(
"Starting Merging Compaction of %s (end_merge=%s)",
535 if (merge_length ==
m_stores.size())
575 time_t now = time(0);
576 int64_t max_num_entries {};
583 scan_ctx = make_shared<ScanContext>(
m_schema);
585 cs_file =
format(
"%s/tables/%s/%s/%s/cs%d",
597 double total, garbage;
601 if (minor || merging)
602 HT_INFOF(
"Switching to major compaction to collect %.2f%% garbage",
603 (garbage/total)*100.00);
609 HT_INFOF(
"Aborting GC compaction because measured garbage of %.2f%% "
610 "is below threshold", (garbage/total)*100.00);
623 mscanner = make_shared<MergeScannerAccessGroup>(
m_table_name, scan_ctx.get(),
627 filtered_cache = make_shared<CellCache>();
630 mscanner = make_shared<MergeScannerAccessGroup>(
m_table_name, scan_ctx.get(),
640 for (
size_t i=merge_offset; i<merge_offset+merge_length; i++) {
642 mscanner->add_scanner(
m_stores[i].cs->create_scanner(scan_ctx.get()));
644 max_num_entries += (boost::any_cast<int64_t>
645 (
m_stores[i].cs->get_trailer()->get(
"total_entries")))/divisor;
649 mscanner = make_shared<MergeScannerAccessGroup>(
m_table_name, scan_ctx.get(),
653 for (
size_t i=0; i<
m_stores.size(); i++) {
655 mscanner->add_scanner(
m_stores[i].cs->create_scanner(scan_ctx.get()));
657 max_num_entries += (boost::any_cast<int64_t>
658 (
m_stores[i].cs->get_trailer()->get(
"total_entries")))/divisor;
667 cellstore->create(cs_file.c_str(), max_num_entries, cellstore_props, &
m_identifier);
670 while (mscanner->get(key, value)) {
671 cellstore->add(key, value);
673 filtered_cache->add(key, value);
679 while (scanner->get(key, value)) {
680 cellstore->add(key, value);
682 filtered_cache->add(key, value);
710 cellstore_created =
true;
715 vector<String> removed_files;
716 int64_t total_index_entries = 0;
721 vector<CellStoreInfo> new_stores;
722 new_stores.reserve(
m_stores.size() - (merge_length-1));
723 for (
size_t i=0; i<merge_offset; i++)
725 for (
size_t i=merge_offset; i<merge_offset+merge_length; i++)
726 removed_files.push_back(
m_stores[i].cs->get_filename());
727 if (cellstore->get_total_entries() > 0) {
728 new_stores.push_back(cellstore);
729 added_file = cellstore->get_filename();
731 for (
size_t i=merge_offset+merge_length; i<
m_stores.size(); i++)
745 for (
size_t i=0; i<
m_stores.size(); i++)
746 removed_files.push_back(
m_stores[i].cs->get_filename());
759 for (
size_t i=0; i<
m_stores.size(); i++)
760 removed_files.push_back(
m_stores[i].cs->get_filename());
768 if (cellstore->get_total_entries() > 0) {
773 added_file = cellstore->get_filename();
782 (cellstore->get_trailer()->get(
"revision"));
784 HT_ERROR(
"Revision (clock) skew detected! May result in data loss.");
794 if (cellstore->get_total_entries() == 0) {
795 String fname = cellstore->get_filename();
801 HT_WARN_OUT <<
"Problem removing empty CellStore '" << fname <<
"' " << e <<
HT_END;
825 m_name.c_str(), added_file.c_str());
830 if (!cellstore_created) {
831 if (!cs_file.empty()) {
841 HT_FATALF(
"Problem compacting access group %s: %s - %s",
857 for (
size_t i=0; i<
m_stores.size(); i++) {
860 str +=
m_stores[i].cs->get_filename();
884 while (old_scanner->get(key, value)) {
887 old_scanner->forward();
941 while (old_scanner->get(key_comps, value)) {
943 cmp = strcmp(key_comps.
row, split_row.c_str());
945 if ((cmp > 0 && !drop_high) || (cmp <= 0 && drop_high)) {
958 add(key_comps, value);
960 old_scanner->forward();
964 bool cellstores_shrunk =
false;
969 for (
size_t i=0; i<
m_stores.size(); i++)
971 cellstores_shrunk =
true;
975 if (!cellstores_shrunk) {
976 vector<CellStoreInfo> new_stores;
977 for (
size_t i=0; i<
m_stores.size(); i++) {
981 new_stores.push_back( new_cell_store );
1064 memset(hash_str,
'0', 16);
1082 vector<Filesystem::Dirent> listing;
1084 for (
size_t i=0; i<listing.size(); i++) {
1085 const char *fname = listing[i].name.c_str();
1086 if (!strncmp(fname,
"cs", 2)) {
1087 id = atoi(&fname[2]);
1088 if (
id >= m_next_cs_id)
1089 m_next_cs_id =
id+1;
1109 if (total_index_entriesp)
1110 *total_index_entriesp = 0;
1111 for (
size_t i=0; i<
m_stores.size(); i++) {
1113 if (total_index_entriesp)
1114 *total_index_entriesp += (int64_t)
m_stores[i].cs->block_count();
1130 int64_t running_total = 0;
1139 bool run_found =
false;
1151 if (running_total >= target) {
1152 count = (i - index) + 1;
1153 if (count >= (
size_t)2) {
1182 count = (i - index) + 1;
1209 struct LtCellStoreInfoTimestamp {
1218 LtCellStoreInfoTimestamp order;
1233 for (KeySet::iterator iter = keys.begin();
1234 iter != keys.end(); ++iter) {
1235 if ((cf_spec =
m_schema->get_column_family((*iter).column_family_code,
true)))
1236 family = cf_spec->
get_name().c_str();
1239 out << (*iter).row <<
" " << family;
1240 if (*(*iter).column_qualifier)
1241 out <<
":" << (*iter).column_qualifier;
1242 out <<
" 0x" << hex << (int)(*iter).flag << dec
1243 <<
" ts=" << (*iter).timestamp
1244 <<
" rev=" << (*iter).revision <<
"\n";
1257 os <<
"mem_used=" << mdata.
mem_used <<
"\n";
1259 os <<
"cell_count=" << mdata.
cell_count <<
"\n";
1260 os <<
"disk_used=" << mdata.
disk_used <<
"\n";
1263 os <<
"key_bytes=" << mdata.
key_bytes <<
"\n";
1264 os <<
"value_bytes=" << mdata.
value_bytes <<
"\n";
1265 os <<
"file_count=" << mdata.
file_count <<
"\n";
1266 os <<
"deletes=" << mdata.
deletes <<
"\n";
1276 os <<
"in_memory=" << (mdata.
in_memory ?
"true" :
"false") <<
"\n";
1277 os <<
"gc_needed=" << (mdata.
gc_needed ?
"true" :
"false") <<
"\n";
1278 os <<
"needs_merging=" << (mdata.
needs_merging ?
"true" :
"false") <<
"\n";
static bool enable_shadow_cache
MaintenanceData * get_maintenance_data(ByteArena &arena, time_t now, int flags)
#define HT_THROW2F(_code_, _ex_, _fmt_,...)
void update_cellstore_info(std::vector< CellStoreInfo > &stores, time_t t=0, bool collection_performed=true)
Updates stored data statistics from current set of CellStores.
std::set< uint8_t > m_column_families
float m_compression_ratio
void add_references(const std::vector< String > &filev)
Adds a set of files to the referenced file set.
Declarations for AccessGroup.
int64_t earliest_cached_revision
std::vector< String > & get_file_vector()
Cell list scanner over a buffer of cells.
Declarations for CellStoreFactory.
static int32_t merge_cellstore_run_length_threshold
The FailureInducer simulates errors.
Declarations for MergeScannerAccessGroup.
PropertiesPtr properties
This singleton map stores all options.
Declarations for CellStoreV7.
void add_file(const String &filename)
std::string String
A String is simply a typedef to std::string.
bool purge_cellstore(int flags)
Tests the PURGE_CELLSTORE bit of flags
void measure_garbage(double *total, double *garbage)
String format(const char *fmt,...)
Returns a String using printf like format facilities Vanilla snprintf is about 1.5x faster than this...
void update_live(const String &add, std::vector< String > &deletes, uint32_t nextcsid, int64_t total_blocks)
Updates the live file set.
std::mutex m_outstanding_scanner_mutex
int32_t m_outstanding_scanner_count
uint64_t shadow_cache_memory
int64_t latest_stored_revision
void install_release_callback(CellStoreReleaseCallback &cb)
void update_schema(SchemaPtr &schema, AccessGroupSpec *ag_spec)
Currently supports only adding and deleting column families from AccessGroup.
Po::typed_value< String > * str(String *v=0)
Column family specification.
static void parse_bloom_filter(const std::string &spec, PropertiesPtr &props)
Parsers a bloom filter specification and sets properties.
CellStoreMaintenanceData * csdata
bool purge_shadow_cache(int flags)
Tests the PURGE_SHADOW_CACHE bit of flags
Maps object pointers to bit fields.
int16_t maintenance_flags
bool recompute_merge_run(int flags)
Tests the RECOMPUTE_MERGE_RUN bit of flags
void load_hints(Hints *hints)
int32_t outstanding_scanners
static FailureInducer * instance
This is a singleton class.
Scan context information.
std::mutex m_schema_mutex
bool include_in_scan(ScanContext *scan_ctx)
void sort_cellstores_by_timestamp()
int16_t get_option_replication() const
Gets replication option.
int64_t m_latest_stored_revision_hint
static bool enabled()
Returns true if the FailureInducer is enabled (= if an instance was allocated)
int64_t latest_stored_revision
void split_row_estimate_data_stored(CellList::SplitRowDataMapT &split_row_data)
CharT * alloc(size_t sz)
Allocate sz bytes.
void md5_trunc_modified_base64(const char *input, char output[17])
Get the modified base64 encoded string of the first 12 Bytes of the 16 Byte MD5 code of a null termin...
void run_compaction(int maintenance_flags, Hints *hints)
A class managing one or more serializable ByteStrings.
CellStoreMaintenanceData * next
int flags(const void *key)
Returns bit field for a give pointer.
void get_merge_info(bool &needs_merging, bool &end_merge)
Gets merging compaction information.
std::set< Key, key_revision_lt > KeySet
int64_t m_earliest_cached_revision
int64_t m_latest_stored_revision
void merge_caches()
Assumes mutex is locked.
std::map< const char *, int64_t, LtCstr, SplitRowDataAlloc > SplitRowDataMapT
A dynamic, resizable memory buffer.
std::condition_variable m_outstanding_scanner_cond
void maybe_fail(const String &label)
Tests and executes the induced failures.
static std::string toplevel_dir
bool get_option_in_memory() const
Gets in memory option.
const char * get_text(int error)
Returns a descriptive error message.
void split_row_estimate_data_cached(CellList::SplitRowDataMapT &split_row_data)
const std::string & get_name() const
Gets access group name.
std::shared_ptr< MergeScannerAccessGroup > MergeScannerAccessGroupPtr
Shared pointer to MergeScannerAccessGroup.
void remove_references(const std::vector< String > &filev)
Decrements the reference count of each file in the given vector.
std::shared_ptr< Properties > PropertiesPtr
bool check_needed(time_t now)
Signals if garbage collection is likely needed.
void shrink(String &split_row, bool drop_high, Hints *hints)
Compatibility Macros for C/C++.
void populate_cellstore_index_pseudo_table_scanner(CellListScannerBuffer *scanner)
Populates scanner with data for .cellstore.index pseudo table.
std::ostream & operator<<(std::ostream &os, const crontab_entry &entry)
Helper function to write crontab_entry to an ostream.
void update_schema(AccessGroupSpec *ag_spec)
Updates control variables from access group schema definition.
uint32_t bloom_filter_accesses
void dump_garbage_tracker_statistics(std::ofstream &out)
Prints human-readable representation of garbage tracker state to an output stream.
static Hypertable::FilesystemPtr dfs
void space_usage(int64_t *memp, int64_t *diskp)
uint32_t bloom_filter_fps
pair< int64_t, int64_t > time_interval
static int64_t cellstore_target_size_min
void add(const Key &key, const ByteString value)
Adds a key/value pair.
void purge_stored_cells_from_cache()
Access group specification.
void add_scanner(CellListScannerPtr scanner)
int32_t get_option_blocksize() const
Gets blocksize option.
bool move_compaction(int flags)
Tests the COMPACT_MOVE bit of flags
std::shared_ptr< CellStore > CellStorePtr
Smart pointer to CellStore.
#define HT_FATALF(msg,...)
const std::string & get_name() const
Gets column family name.
long long int Lld
Shortcut for printf formats.
static bool ignore_clock_skew_errors
CellCacheManagerPtr m_cell_cache_manager
static const int64_t TIMESTAMP_MAX
LiveFileTracker m_file_tracker
void update_files_column()
Updates the 'Files' METADATA column if it needs updating.
AccessGroupGarbageTracker m_garbage_tracker
const char * get_full_name()
ColumnFamilySpecs & columns()
Returns reference to column specifications.
Represents the trailer for CellStore version 7.
#define HT_INFOF(msg,...)
bool m_cellcache_needs_compaction
Provides access to internal components of opaque key.
uint64_t bloom_filter_memory
uint32_t bloom_filter_maybes
std::vector< CellStoreInfo > m_stores
static int64_t cellstore_target_size_max
void get_file_list(String &file_list)
Populates string with live files separated by ';'.
bool split(int flags)
Tests the SPLIT bit of flags
PropertiesPtr m_cellstore_props
This is a generic exception class for Hypertable.
void unstage_compaction()
const std::string & get_option_bloom_filter() const
Gets bloom filter option.
void recompute_compression_ratio(int64_t *total_index_entriesp=0)
static const char * END_ROOT_ROW
static LoadStatisticsPtr load_statistics
void add_disk_read(int64_t amount)
TableIdentifierManaged m_identifier
void range_dir_initialize()
void output_state(std::ofstream &out, const std::string &label)
Prints a human-readable representation of internal state to an output stream.
#define HT_ERRORF(msg,...)
void change_range(const String &start_row, const String &end_row)
std::shared_ptr< Schema > SchemaPtr
Smart pointer to Schema.
uint8_t column_family_code
std::shared_ptr< CellListScanner > CellListScannerPtr
static bool ignore_cells_with_clock_skew
AccessGroup(const TableIdentifier *identifier, SchemaPtr &schema, AccessGroupSpec *ag_spec, const RangeSpec *range, const Hints *hints=nullptr)
static TimeWindow low_activity_time
bool find_merge_run(size_t *indexp=0, size_t *lenp=0)
bool relinquish(int flags)
Tests the RELINQUISH bit of flags
MergeScannerAccessGroup * create_scanner(ScanContext *scan_ctx)
Declarations for MaintenanceFlag This file contains declarations that are part of the MaintenanceFlag...
bool major_compaction(int flags)
Tests the COMPACT_MAJOR bit of flags
static CellStorePtr open(const String &name, const char *start_row, const char *end_row)
Creates a CellStore object from a given cell store file.
void dump_keys(std::ofstream &out)
void release_files(const std::vector< String > &files)
uint64_t block_index_memory
Error codes, Exception handling, error logging.
const std::string get_option_compressor() const
Gets compressor option.
int64_t m_earliest_cached_revision_saved
void add_live_noupdate(const String &fname, int64_t total_blocks)
Adds a file to the live file set without seting the 'need_update' bit.
uint64_t purge_memory(MaintenanceFlag::Map &subtask_map)
void load_cellstore(CellStorePtr &cellstore)
void adjust_targets(time_t now, double total, double garbage)
Adjusts targets based on measured garbage.
bool gc_compaction(int flags)
Tests the COMPACT_GC bit of flags
bool collection_needed(double total, double garbage)
Determines if garbage collection is actually needed.
bool merging_compaction(int flags)
Tests the COMPACT_MERGING bit of flags
std::shared_ptr< CellCache > CellCachePtr
Shared smart pointer to CellCache.
int code() const
Returns the error code.
std::shared_ptr< ScanContext > ScanContextPtr
Merge scanner for access groups.