22 #ifndef Hypertable_Lib_IndexScannerCallback_h
23 #define Hypertable_Lib_IndexScannerCallback_h
39 #include <condition_variable>
52 struct QualifierFilterMatch :
53 std::unary_function<std::pair<String, String>, bool> {
54 QualifierFilterMatch(
const char *row) : row(row) { }
55 bool operator()(
const std::pair<String, String> &filter)
const {
56 if (!strncmp(filter.first.c_str(), row, filter.first.length())) {
57 if (filter.second.empty() ||
58 strstr(row+filter.first.length(), filter.second.c_str()))
71 "<AccessGroup name=\"default\">"
79 "<Counter>false</Counter>"
80 "<deleted>false</deleted>"
87 #if defined (TEST_SSB_QUEUE)
90 static const size_t SSB_QUEUE_LIMIT = 40;
95 #if defined (TEST_SSB_QUEUE)
98 static const size_t TMP_CUTOFF = 16*1024*1024;
105 std::vector<CellPredicate> &cell_predicates,
108 bool row_intervals_applied)
140 for (
auto cf : primary_table->
schema()->get_column_families()) {
141 if (!cf->get_value_index() && !cf->get_qualifier_index())
149 std::lock_guard<std::mutex> lock2(
m_mutex);
161 std::lock_guard<std::mutex> lock(
m_mutex);
189 bool is_eos = scancells->get_eos();
192 std::unique_lock<std::mutex> lock(
m_mutex);
195 if (scancells->get_eos() ==
false && scancells->empty())
223 scancells->set_eos(
false);
252 const std::string &error_msg,
bool eos) {
290 const char *unescaped_row;
291 const char *unescaped_qualifier;
292 const char *unescaped_value;
293 size_t unescaped_row_len;
294 size_t unescaped_qualifier_len;
295 size_t unescaped_value_len;
299 scancells->get(cells);
300 for (
auto &cell : cells) {
301 char *qv = (
char *)cell.row_key;
305 if ((row = strrchr(qv,
'\t')) == 0) {
306 HT_WARNF(
"Invalid index entry '%s' in index table '^%s'",
311 escaper_row.
unescape(row, strlen(row), &unescaped_row, &unescaped_row_len);
324 while (*qv !=
',' && (qv -
id <= 4))
327 HT_WARNF(
"Invalid index entry '%s' in index table '^%s'",
332 uint32_t cfid = (uint32_t)atoi(
id);
334 HT_WARNF(
"Invalid index entry '%s' in index table '^%s'",
346 uint32_t matching = 0;
349 escaper_qualifier.
unescape(qv, strlen(qv),
350 &unescaped_qualifier, &unescaped_qualifier_len);
353 std::bitset<32> bits;
355 unescaped_qualifier_len,
357 if ((matching = (uint32_t)bits.to_ulong()) == 0L)
362 unescaped_qualifier_len,
"", 0))
367 if ((qv = strchr(value,
'\t')) == 0) {
368 HT_WARNF(
"Invalid index entry '%s' in index table '^%s'",
372 size_t value_len = qv-value;
374 escaper_qualifier.
unescape(qv, strlen(qv),
375 &unescaped_qualifier, &unescaped_qualifier_len);
376 escaper_value.
unescape(value, value_len,
377 &unescaped_value, &unescaped_value_len);
379 std::bitset<32> bits;
381 unescaped_qualifier_len,
385 if ((matching = (uint32_t)bits.to_ulong()) == 0L)
390 unescaped_qualifier_len,
392 unescaped_value_len))
401 key.
row_len = unescaped_row_len;
409 m_mutator->set(key, &matching,
sizeof(matching));
413 m_tmp_keys.insert(CkeyMap::value_type(key, matching));
415 it->second |= matching;
421 if (scancells->get_eos()) {
435 for (CkeyMap::iterator it =
m_tmp_keys.begin();
437 m_mutator->set(it->first, &it->second,
sizeof(it->second));
470 for (
auto col : primary_spec.
columns)
475 const char *last_row =
"";
480 for (CkeyMap::iterator it =
m_tmp_keys.begin();
482 if (strcmp((
const char *)it->first.row, last_row)) {
485 last_row = (
const char *)it->first.row;
496 for (CkeyMap::iterator it =
m_tmp_keys.begin();
498 if (strcmp((
const char *)it->first.row, last_row)) {
501 last_row = (
const char *)it->first.row;
540 inner +=
format(tmp_schema_inner, cf->get_name().c_str());
546 nstmp->create_table(guid,
format(tmp_schema_outer, inner.c_str()));
556 if ((scancells->get_eos() && scancells->empty() &&
566 scancells->get(cells);
575 #if defined (TEST_SSB_QUEUE)
576 for (
auto &cell : cells) {
577 if (!strcmp(last, (
const char *)cell.row_key))
579 last = (
const char *)cell.row_key;
582 for (
const auto &s : primary_spec.
columns)
587 ssb->add_column_predicate(cp.column_family, cp.operation,
588 cp.value, cp.value_len);
592 ssb->add_row(cell.row_key);
614 for (
auto col : primary_spec.
columns)
628 for (
auto &cell : cells) {
630 HT_ASSERT(cell.value_len ==
sizeof(matching));
631 memcpy(&matching, cell.value,
sizeof(matching));
633 if (!strcmp(last, (
const char *)cell.row_key)) {
642 last = (
const char *)cell.row_key;
653 for (
auto &cell : cells) {
654 if (!strcmp(last, (
const char *)cell.row_key))
658 last = (
const char *)cell.row_key;
660 if (scancells->get_eos()) {
730 scancells->get(cells);
734 bool skip_row =
false;
735 for (
auto &cell : cells) {
736 bool new_row =
false;
737 if (strcmp(last, cell.row_key)) {
774 scp->add(cell,
true);
791 for (
const auto &ri : rivec) {
792 if (ri.start && ri.start[0]) {
793 if (ri.start_inclusive) {
794 if (strcmp(row, ri.start)<0)
798 if (strcmp(row, ri.start)<=0)
802 if (ri.end && ri.end[0]) {
803 if (ri.end_inclusive) {
804 if (strcmp(row, ri.end)>0)
808 if (strcmp(row, ri.end)>=0)
818 const char *column) {
819 for (
const auto &ci : civec) {
820 if (ci.start_row && ci.start_row[0]) {
821 int s=strcmp(row, ci.start_row);
827 if (ci.start_column && ci.start_column[0]) {
828 if (ci.start_inclusive) {
829 if (strcmp(column, ci.start_column)<0)
833 if (strcmp(column, ci.start_column)<=0)
837 if (ci.end_row && ci.end_row[0]) {
838 int s=strcmp(row, ci.end_row);
844 if (ci.end_column && ci.end_column[0]) {
845 if (ci.end_inclusive) {
846 if (strcmp(column, ci.end_column)>0)
850 if (strcmp(column, ci.end_column)>=0)
964 size_t len1 = strlen((
const char *)lhs.
row);
965 size_t len2 = strlen((
const char *)rhs.
row);
966 int cmp = memcmp(lhs.
row, rhs.
row, std::min(len1, len2));
977 #endif // Hypertable_Lib_IndexScannerCallback_h
std::unique_ptr< TableMutatorAsync > m_mutator
TableScannerAsync * create_scanner_async(ResultCallback *cb, const ScanSpec &scan_spec, uint32_t timeout_ms=0, int32_t flags=0)
Creates an asynchronous scanner on this table.
FlyweightString m_strings
int m_cell_limit_per_family
std::vector< Cell, CellAlloc > Cells
ScanSpec & get()
Returns the built ScanSpec object.
#define HT_WARNF(msg,...)
NamespacePtr open_namespace(const std::string &name, Namespace *base=NULL)
Opens a Namespace.
Namespace * get_namespace()
std::string get_table_name() const
Returns the name of the table as it was when the scanner was created.
Abstract base class for a filesystem.
ColumnPredicates column_predicates
String format(const char *fmt,...)
Returns a String using printf-like format facilities. Vanilla snprintf is about 1.5x faster than this...
static const size_t TMP_CUTOFF
If more than TMP_CUTOFF bytes are received from the index, then store all results in a temporary table...
Asynchronous table scanner.
void increment_outstanding()
bool row_intervals_match(const RowIntervals &rivec, const char *row)
pair< int64_t, int64_t > time_interval
const char * value_regexp
const char * column_qualifier
void add_row(const string &str)
Adds a row to be returned in the scan.
std::string m_last_rowkey_tracking
bool cell_intervals_match(const CellIntervals &civec, const char *row, const char *column)
The Flyweight string set stores duplicate strings efficiently.
virtual void update_ok(TableMutatorAsync *mutator)
Callback method for successful update.
size_t column_qualifier_len
void set_value_regexp(const char *regexp)
Sets the regexp to filter cell values by.
virtual void update_error(TableMutatorAsync *mutator, int error, FailedMutations &failures)
Callback method for update errors.
std::map< uint32_t, String > m_column_map
String generate_guid()
Generates a new GUID.
std::atomic< int > m_outstanding_scanners
void final_decrement(bool is_eos)
virtual void scan_error(TableScannerAsync *scanner, int error, const std::string &error_msg, bool eos)=0
Callback method for scan errors.
std::shared_ptr< Namespace > NamespacePtr
Shared smart pointer to Namespace.
IndexScannerCallback(TableScannerAsync *primary_scanner, Table *primary_table, const ScanSpec &primary_spec, std::vector< CellPredicate > &cell_predicates, ResultCallback *original_cb, uint32_t timeout_ms, bool qualifier_scan, bool row_intervals_applied)
Provides the ability to mutate a table in the form of adding and deleting rows and cells...
static const char * tmp_schema_outer
vector< RowInterval, RowIntervalAlloc > RowIntervals
Scan predicate and control specification.
Represents an open table.
vector< CellInterval, CellIntervalAlloc > CellIntervals
std::shared_ptr< ScanCells > ScanCellsPtr
Smart pointer to ScanCells.
int32_t cell_limit_per_family
std::string m_last_rowkey_verify
void set_row_regexp(const char *regexp)
Sets the regexp to filter rows by.
virtual void update_error(TableMutatorAsync *mutator, int error, FailedMutations &failures)=0
Callback method for update errors.
void set_keys_only(bool val)
Return only keys (no values)
bool and_column_predicates
ResultCallback * m_original_cb
std::deque< ScanSpecBuilder * > m_sspecs
void set_return_deletes(bool val)
Internal use only.
TableScannerAsync * m_primary_scanner
virtual void scan_ok(TableScannerAsync *scanner, ScanCellsPtr &cells)=0
Callback method for successful scan.
Helper class for building a ScanSpec.
static const char * tmp_schema_inner
Client * get_client()
Returns a pointer to the client object which created this Namespace.
std::condition_variable m_sspecs_cond
virtual ~IndexScannerCallback()
std::map< String, String > CstrMap
static String basename(String name, char separator= '/')
A POSIX-compliant basename() which strips directory names from a filename.
void track_predicates(ScanCellsPtr &scancells)
std::map< KeySpec, uint32_t > CkeyMap
bool m_row_intervals_applied
bool operator<(const directory_entry< _Key, _Tp > &lhs, const directory_entry< _Key, _Tp > &rhs)
std::vector< TableScannerAsync * > m_scanners
virtual void register_scanner(TableScannerAsync *scanner)
Hook for derived classes which want to keep track of scanners/mutators.
void set_time_interval(int64_t start, int64_t end)
Sets the time interval of the scan.
void add_column(const string &str)
Adds a column family to be returned by the scan.
const char * get(const char *str)
Returns a copy of the string; this string is valid till the FlyweightString set is destructed...
static const size_t SSB_QUEUE_LIMIT
virtual void scan_error(TableScannerAsync *scanner, int error, const std::string &error_msg, bool eos)
Callback method for scan errors.
void clear()
Clears and deallocates the set of strings.
std::mutex m_scanner_mutex
RowIntervals row_intervals
ResultCallback for secondary indices; used by TableScannerAsync.
void verify_results(std::unique_lock< std::mutex > &lock, TableScannerAsync *scanner, ScanCellsPtr &scancells)
std::vector< CellPredicate > m_cell_predicates
ScanSpecBuilder m_primary_spec
bool scan_and_filter_rows
const std::string & get_name()
CellIntervals cell_intervals
void set_max_versions(uint32_t n)
Sets the maximum number of revisions of each cell to return in the scan.
void decrement_outstanding()
Represents an open table.
bool unescape(const char *in_buf, size_t in_len, const char **out_bufp, size_t *out_lenp)
const char * column_family
void set_scan_and_filter_rows(bool val)
Scan and filter rows.
std::shared_ptr< Table > TablePtr
void collect_indices(TableScannerAsync *scanner, ScanCellsPtr &scancells)
void wait_for_completion()
Blocks till outstanding == 0.
std::vector< FailedMutation > FailedMutations
virtual void scan_ok(TableScannerAsync *scanner, ScanCellsPtr &scancells)
Callback method for successful scan.