46 : m_bytes_scanned(0), m_current_scanner(0), m_outstanding(0),
47 m_error(
Error::
OK), m_cancelled(false), m_use_index(false)
49 unique_lock<mutex> lock(
m_mutex);
53 bool use_qualifier =
false;
54 bool row_intervals_applied =
false;
55 std::vector<CellPredicate> cell_predicates;
61 &&
use_index(table, scan_spec, index_spec,
64 &row_intervals_applied)) {
66 first_pass_spec = &index_spec.
get();
73 cb, timeout_ms, use_qualifier, row_intervals_applied);
87 first_pass_spec = &primary_spec.
get();
93 init(comm, app_queue, table, range_locator, *first_pass_spec, timeout_ms, cb);
99 std::vector<CellPredicate> &cell_predicates,
101 bool *row_intervals_applied)
111 cell_predicates.resize(256);
118 string index_row_prefix;
121 bool qualifier_match_only =
false;
122 bool value_match =
false;
123 size_t qualifier_index_count = 0;
124 size_t value_index_count = 0;
125 size_t row_intervals_applied_count = 0;
133 if ((cf_spec = table->
schema()->get_column_family(cp.column_family)) == 0)
139 qualifier_index_count++;
145 if (qualifier_match_only)
149 qualifier_match_only =
true;
158 &prefix, &prefix_len, regex_prefix_buf))
160 const char *escaped_prefix;
161 size_t escaped_prefix_len;
162 lde.
escape(prefix, prefix_len,
163 &escaped_prefix, &escaped_prefix_len);
164 if (index_row_buf.
size < escaped_prefix_len+5)
165 index_row_buf.
grow(escaped_prefix_len+5);
166 index_row_buf.
ptr = (uint8_t*)cfid.append_to((
char*)index_row_buf.
base);
167 *index_row_buf.
ptr++ =
',';
168 index_row_buf.
add_unchecked(escaped_prefix, escaped_prefix_len);
169 *index_row_buf.
ptr = 0;
172 cell_predicates[cf_spec->
get_id()].add_column_predicate(cp,
id++);
180 lde.
escape(cp.value, cp.value_len, &value, &value_len);
182 const char *escaped_qualifier;
183 size_t escaped_qualifier_len = 0;
188 lde.
escape(cp.column_qualifier, cp.column_qualifier_len,
189 &escaped_qualifier, &escaped_qualifier_len);
197 if (index_row_buf.
size < value_len+escaped_qualifier_len+6)
198 index_row_buf.
grow(value_len+escaped_qualifier_len+6);
200 index_row_buf.
ptr = (uint8_t*)cfid.append_to((
char*)index_row_buf.
base);
201 *index_row_buf.
ptr++ =
',';
203 bool has_row_interval =
false;
206 *index_row_buf.
ptr++ =
'\t';
211 index_row_buf.
add_unchecked(escaped_qualifier, escaped_qualifier_len);
213 *index_row_buf.
ptr++ =
'\t';
214 *index_row_buf.
ptr = 0;
217 has_row_interval =
true;
218 ++row_intervals_applied_count;
219 index_row_prefix = (
const char *)index_row_buf.
base;
222 index_row_prefix + primary_ri.start, primary_ri.start_inclusive,
223 index_row_prefix + primary_ri.end, primary_ri.end_inclusive);
228 has_row_interval =
true;
229 index_row_prefix = (
const char *)index_row_buf.
base;
232 index_row_prefix + primary_ci.start_row,
true,
233 index_row_prefix + primary_ci.end_row,
true);
238 *index_row_buf.
ptr = 0;
242 &prefix, &prefix_len, regex_prefix_buf)) {
243 const char *escaped_prefix;
244 size_t escaped_prefix_len;
245 lde.
escape(prefix, prefix_len,
246 &escaped_prefix, &escaped_prefix_len);
247 index_row_buf.
add(escaped_prefix, escaped_prefix_len);
249 *index_row_buf.
ptr = 0;
252 *index_row_buf.
ptr = 0;
254 if (!has_row_interval) {
255 *index_row_buf.
ptr = 0;
259 cell_predicates[cf_spec->
get_id()].add_column_predicate(cp,
id++);
263 &prefix, &prefix_len, regex_prefix_buf))
265 const char *escaped_prefix;
266 size_t escaped_prefix_len;
267 lde.
escape(prefix, prefix_len,
268 &escaped_prefix, &escaped_prefix_len);
269 index_row_prefix.clear();
270 index_row_prefix.reserve(5+escaped_prefix_len);
271 index_row_prefix = cfid.c_str();
272 index_row_prefix.append(
",", 1);
273 index_row_prefix.append(escaped_prefix, escaped_prefix_len);
275 cell_predicates[cf_spec->
get_id()].add_column_predicate(cp,
id++);
280 const char *escaped_qualifier;
281 size_t escaped_qualifier_len;
282 lde.
escape(cp.column_qualifier, cp.column_qualifier_len,
283 &escaped_qualifier, &escaped_qualifier_len);
284 index_row_prefix.clear();
285 index_row_prefix.reserve(6+escaped_qualifier_len);
286 index_row_prefix = cfid.c_str();
287 index_row_prefix.append(
",", 1);
288 index_row_prefix.append(escaped_qualifier, escaped_qualifier_len);
290 index_row_prefix.append(
"\t", 1);
293 ++row_intervals_applied_count;
296 index_row_prefix + primary_ri.start, primary_ri.start_inclusive,
297 index_row_prefix + primary_ri.end, primary_ri.end_inclusive);
304 index_row_prefix + primary_ci.start_row,
true,
305 index_row_prefix + primary_ci.end_row,
true);
313 cell_predicates[cf_spec->
get_id()].add_column_predicate(cp,
id++);
319 if (row_intervals_applied)
320 *row_intervals_applied = row_intervals_applied_count == primary_spec.
column_predicates.size();
322 if (qualifier_match_only) {
325 *use_qualifier = qualifier_match_only;
346 predicate_columns.insert(predicate.column_family);
350 for (
auto column : predicate_columns)
358 for (
auto column : primary_spec.
get().
columns) {
360 if ((colon = strchr(column,
':')) != 0)
361 family.append(column, colon-column);
363 family.append(column);
364 if (predicate_columns.count(family.c_str()) == 0)
366 "Selected column %s must be referenced in Column predicate",
368 selected_columns.insert(family);
373 if (selected_columns.count(predicate.column_family) == 0)
374 primary_spec.
add_column(predicate.column_family);
387 const char *
str = row;
388 const char *end = str + strlen(row);
390 for (ptr = end - 1; ptr >
str; --ptr) {
391 if (::uint8_t(*ptr) < 0xffu) {
392 tmp =
String(str, ptr - str);
393 tmp.append(1, (*ptr)+1);
394 ri.
end = tmp.c_str();
401 tmp.append(4, (
char)0xff);
402 ri.
end = tmp.c_str();
416 Timer timer(timeout_ms);
417 bool current_set =
false;
426 make_shared<IntervalScannerAsync>(comm, app_queue, table, range_locator,
427 scan_spec, timeout_ms, !current_set,
441 make_shared<IntervalScannerAsync>(comm, app_queue, table, range_locator,
442 interval_scan_spec, timeout_ms,
443 !current_set,
this, scanner_id++);
455 if (ri.start != ri.end && strcmp(ri.start, ri.end) != 0) {
460 make_shared<IntervalScannerAsync>(comm, app_queue, table, range_locator,
461 interval_scan_spec, timeout_ms,
462 !current_set,
this, scanner_id++);
472 make_shared<IntervalScannerAsync>(comm, app_queue, table, range_locator,
473 rowset_scan_spec, timeout_ms,
474 !current_set,
this, scanner_id++);
486 make_shared<IntervalScannerAsync>(comm, app_queue, table, range_locator,
487 interval_scan_spec, timeout_ms,
488 !current_set,
this, scanner_id++);
498 if (ri_scanner && ri_scanner->has_outstanding_requests()) {
535 unique_lock<mutex> lock(
m_mutex);
549 is_create, &next, error));
558 is_create, &next, error));
572 is_create, &next, error));
606 unique_lock<mutex> lock(
m_mutex);
617 <<
" - " << error_msg <<
HT_END;
628 unique_lock<mutex> lock(
m_mutex);
635 bool do_callback =
false;
636 int current_scanner = scanner_id;
647 cells = make_shared<ScanCells>();
658 next =
m_interval_scanners[scanner_id]->handle_result(&do_callback, cells, event, is_create);
729 unique_lock<mutex> lock(
m_mutex);
755 HT_ASSERT(do_callback || !next || abort);
764 cells = make_shared<ScanCells>();
776 cells = make_shared<ScanCells>();
std::set< String > StringSet
STL Set managing Strings.
ScanSpec & get()
Returns the built ScanSpec object.
bool has_qualifier_index_table()
returns true if this table has a qualifier index
std::string get_table_name() const
Returns the name of the table as it was when the scanner was created.
std::string String
A String is simply a typedef to std::string.
void handle_error(int scanner_id, int error, const std::string &error_msg, bool is_create)
Deal with errors.
ColumnPredicates column_predicates
std::shared_ptr< RangeLocator > RangeLocatorPtr
Smart pointer to RangeLocator.
void init(Comm *comm, ApplicationQueueInterfacePtr &app_queue, Table *table, RangeLocatorPtr &range_locator, const ScanSpec &scan_spec, uint32_t timeout_ms, ResultCallback *cb)
void increment_outstanding()
void move_to_next_interval_scanner(int current_scanner)
Po::typed_value< String > * str(String *v=0)
pair< int64_t, int64_t > time_interval
void handle_timeout(int scanner_id, const std::string &error_msg, bool is_create)
Deal with timeouts.
bool get_value_index() const
Gets value index flag.
Column family specification.
std::condition_variable m_cond
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
std::shared_ptr< IntervalScannerAsync > IntervalScannerAsyncPtr
Smart pointer to IntervalScannerAsync.
void set_start_time(int64_t start)
bool escape(const char *in_buf, size_t in_len, const char **out_bufp, size_t *out_lenp)
uint8_t * ptr
Pointer to the end of the used part of the buffer.
A dynamic, resizable and reference counted memory buffer.
Represents a row interval.
virtual void scan_error(TableScannerAsync *scanner, int error, const std::string &error_msg, bool eos)=0
Callback method for scan errors.
Scan predicate and control specification.
Represents an open table.
void wait_for_completion()
std::shared_ptr< ScanCells > ScanCellsPtr
Smart pointer to ScanCells.
std::set< const char *, LtCstr > CstrSet
STL Set managing c-style strings.
void set_end_time(int64_t end)
void set_keys_only(bool val)
Return only keys (no values)
void grow(size_t new_size, bool nocopy=false)
Grows the buffer and copies the data unless nocopy is true.
bool get_qualifier_index() const
Gets qualifier index flag.
void base_copy(ScanSpec &other) const
Initialize another ScanSpec object with this copy sans the intervals.
uint8_t * add(const void *data, size_t len)
Adds more data WITH boundary checks; if required the buffer is resized and existing data is preserved...
uint32_t size
The size of the allocated memory buffer (base)
Compatibility Macros for C/C++.
bool use_index(Table *table, const ScanSpec &primary_spec, ScanSpecBuilder &index_spec, std::vector< CellPredicate > &cell_predicates, bool *use_qualifier, bool *row_intervals_applied)
void maybe_callback_error(int scanner_id, bool next)
virtual void register_scanner(TableScannerAsync *scanner)
Hook for derived classes which want to keep track of scanners/mutators.
virtual void scan_ok(TableScannerAsync *scanner, ScanCellsPtr &cells)=0
Callback method for successful scan.
std::shared_ptr< ApplicationQueueInterface > ApplicationQueueInterfacePtr
Smart pointer to ApplicationQueueInterface.
Helper class for building a ScanSpec.
int32_t get_id() const
Gets column ID.
bool has_index_table()
returns true if this table has an index
void add_column(const string &str)
Adds a column family to be returned by the scan.
void add_row_interval(const string &start, bool start_inclusive, const string &end, bool end_inclusive)
Adds a row interval to be returned in the scan.
Entry point to AsyncComm service.
TablePtr get_index_table()
TableScannerAsync(Comm *comm, ApplicationQueueInterfacePtr &app_queue, Table *table, RangeLocatorPtr &range_locator, const ScanSpec &scan_spec, uint32_t timeout_ms, ResultCallback *cb, int flags=0)
Constructs a TableScannerAsync object.
#define HT_THROWF(_code_, _fmt_,...)
uint8_t * base
Pointer to the allocated memory buffer.
size_t fill() const
Returns the size of the used portion.
void cancel()
Cancels the scanner.
void handle_result(int scanner_id, EventPtr &event, bool is_create)
Deal with results of a scanner.
RowIntervals row_intervals
std::mutex m_cancel_mutex
A timer class to keep timeout states across AsyncComm related calls.
This is a generic exception class for Hypertable.
void maybe_callback_ok(int scanner_id, bool next, bool do_callback, ScanCellsPtr &cells)
A String class based on std::string.
ResultCallback for secondary indices; used by TableScannerAsync.
virtual ~TableScannerAsync()
bool scan_and_filter_rows
const std::string & get_name()
static bool extract_prefix(const char *regex, size_t regex_len, const char **output, size_t *output_len, DynamicBuffer &buf)
Extracts a fixed prefix from regular expression.
CellIntervals cell_intervals
std::vector< IntervalScannerAsyncPtr > m_interval_scanners
ProfileDataScanner m_profile_data
void decrement_outstanding()
Represents an open table.
Error codes, Exception handling, error logging.
virtual void deregister_scanner(TableScannerAsync *scanner)
Hook for derived classes which want to keep track of scanners/mutators.
uint8_t * add_unchecked(const void *data, size_t len)
Adds additional data without boundary checks.
void transform_primary_scan_spec(ScanSpecBuilder &primary_spec)
int code() const
Returns the error code.
void add_index_row(ScanSpecBuilder &ssb, const char *row)
TablePtr get_qualifier_index_table()
friend class IndexScannerCallback