50 : m_table(table), m_range_locator(range_locator),
51 m_loc_cache(range_locator->location_cache()),
52 m_scan_limit_state(scan_spec), m_range_server(comm, timeout_ms), m_eos(false),
53 m_fetch_outstanding(false), m_create_outstanding(false),
54 m_end_inclusive(false), m_timeout_ms(timeout_ms),
55 m_current(current), m_bytes_scanned(0),
56 m_create_handler(app_queue, scanner, id, true),
57 m_fetch_handler(app_queue, scanner, id, false),
58 m_create_timer(timeout_ms), m_fetch_timer(timeout_ms),
59 m_cur_scanner_finished(false), m_cur_scanner_id(0), m_state(0),
60 m_create_event_saved(false), m_invalid_scanner_id_ok(false) {
70 const char *start_row, *end_row;
71 bool start_row_inclusive=
true;
75 "ROW predicates and CELL predicates can't be combined");
94 cp.column_qualifier, cp.operation, cp.value, cp.value_len);
98 for (
size_t i=0; i<scan_spec.
columns.size(); i++) {
99 colon = strchr(scan_spec.
columns[i],
':');
102 if (
m_schema->get_column_family(family.c_str()) == 0)
113 start_row_inclusive = scan_spec.
row_intervals[0].start_inclusive;
119 int cmpval = strcmp(start_row, end_row);
122 if (cmpval == 0 && !scan_spec.
row_intervals[0].start_inclusive
138 rowset.insert(ri.start);
141 for (
auto r : rowset) {
159 "Bad cell interval (start_column == NULL)");
167 "Bad cell interval (end_column == NULL)");
168 int cmpval = strcmp(start_row, end_row);
209 if (!start_row_inclusive)
237 while (row_intervals.size() && strcmp(row_intervals.front().start, row_key) < 0)
238 row_intervals.erase(row_intervals.begin());
250 this_thread::sleep_for(chrono::milliseconds(1000));
269 string msg =
format(
"Problem creating scanner at %s on %s[%s..%s] - %s",
282 HT_ERRORF(
"Scanner creation request will time out. Initial timer "
283 "duration %d (last error = %s - %s)", (
int)duration,
286 "complete request within %d ms", (
int)duration));
289 this_thread::sleep_for(chrono::milliseconds(1000));
339 bool *move_to_next,
int last_error) {
340 uint32_t wait_time = 1000;
351 if (!is_create && refresh) {
352 HT_ERROR_OUT <<
"Table schema can't be refreshed when schema changes after scanner creation"
366 HT_ERRORF(
"Scanner creation request will time out. Initial timer "
367 "duration %d", (
int)duration);
385 *move_to_next = (
m_state==ABORTED) &&
436 *show_results =
false;
440 *show_results =
true;
441 cells = make_shared<ScanCells>();
464 *show_results =
false;
479 cells = make_shared<ScanCells>();
502 cells = make_shared<ScanCells>();
511 int skipped_rows = 0;
512 int skipped_cells = 0;
514 skipped_rows = cells->get_skipped_rows();
515 skipped_cells = cells->get_skipped_cells();
530 HT_ASSERT(skipped_rows == 0 && skipped_cells == 0);
591 *show_results =
false;
600 *show_results =
false;
604 *show_results =
true;
645 HT_THROW2F(e.
code(), e,
"Problem calling RangeServer::fetch_scanblock(%s, sid=%d)",
bool m_invalid_scanner_id_ok
#define HT_THROW2F(_code_, _ex_, _fmt_,...)
void restart_scan(bool refresh=false)
void set_row_offset(int32_t n)
Sets the number of rows to be skipped at the beginning of the query.
bool retry_or_abort(bool refresh, bool hard, bool is_create, bool *move_to_next, int last_error)
void create_scanner(const CommAddress &addr, const TableIdentifier &table, const RangeSpec &range, const ScanSpec &scan_spec, DispatchHandler *handler)
Issues a "create scanner" request asynchronously.
ScanSpec & get()
Returns the built ScanSpec object.
void clear()
Clears the state.
void load_result(ScanCellsPtr &cells)
void reserve_rows(size_t s)
std::string String
A String is simply a typedef to std::string.
ColumnPredicates column_predicates
ScanLimitState m_scan_limit_state
int32_t subscanners
Number of RangeServer::create_scanner() calls.
String format(const char *fmt,...)
Returns a String using printf-like format facilities. Vanilla snprintf is about 1.5x faster than this...
std::shared_ptr< RangeLocator > RangeLocatorPtr
Smart pointer to RangeLocator.
Asynchronous table scanner.
bool m_create_event_saved
RangeLocationInfo m_next_range_info
ProfileDataScanner m_profile_data
Accumulated profile data.
pair< int64_t, int64_t > time_interval
bool abort(bool is_create)
const char * value_regexp
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
TableIdentifierManaged m_table_identifier
void set_range_spec(DynamicBuffer &dbuf, RangeSpec &range)
void get(TableIdentifierManaged &table_identifier, SchemaPtr &schema)
Get a copy of table identifier and schema atomically.
uint32_t remaining()
Returns the remaining time till expiry.
void set_result(EventPtr &event, ScanCellsPtr &cells, bool is_create=false)
bool expired()
Returns true if the timer is expired.
void set_value_regexp(const char *regexp)
Sets the regexp to filter cell values by.
A dynamic, resizable and reference counted memory buffer.
bool m_create_outstanding
DynamicBuffer m_last_key_buf
void reset(bool start_timer=false)
Resets the timer.
bool m_cur_scanner_finished
vector< RowInterval, RowIntervalAlloc > RowIntervals
Scan predicate and control specification.
void reset_outstanding_status(bool is_create, bool reset_timer)
Represents an open table.
void set_cell_limit(int32_t n)
Sets the maximum number of cells to return.
std::shared_ptr< ScanCells > ScanCellsPtr
Smart pointer to ScanCells.
std::set< const char *, LtCstr > CstrSet
STL Set managing c-style strings.
virtual ~IntervalScannerAsync()
bool set_current(bool *show_results, ScanCellsPtr &cells, bool abort)
int32_t cell_limit_per_family
void fetch_scanblock(const CommAddress &addr, int32_t scanner_id, DispatchHandler *handler)
Issues a "fetch scanblock" request asynchronously.
Lib::RangeServer::Client m_range_server
std::set< std::string > servers
Set of server proxy names participating in scan.
const char * get_text(int error)
Returns a descriptive error message.
void set_row_regexp(const char *regexp)
Sets the regexp to filter rows by.
void set_keys_only(bool val)
Returns only keys (no values).
uint8_t * add(const void *data, size_t len)
Adds more data WITH boundary checks; if required the buffer is resized and existing data is preserved...
TableParts rebuild_indices
uint32_t duration()
Returns the duration of the timer.
void init(const ScanSpec &)
bool has_outstanding_requests()
Compatibility Macros for C/C++.
void set_return_deletes(bool val)
Internal use only.
bool load(const SerializedKey &key)
Parses the opaque key and loads the components into the member variables.
std::shared_ptr< ApplicationQueueInterface > ApplicationQueueInterfacePtr
Smart pointer to ApplicationQueueInterface.
void destroy_scanner(const CommAddress &addr, int32_t scanner_id, DispatchHandler *handler)
Issues a "destroy scanner" request asynchronously.
void set_cell_limit_per_family(int32_t n)
Sets the maximum number of cells to return per column family.
void set_do_not_cache(bool val)
Don't cache.
void set_default_timeout(int32_t timeout_ms)
Sets the default client connection timeout.
String to_str() const
Returns string representation of address.
RangeLocationInfo m_range_info
const uint8_t * ptr
The pointer to the serialized data.
bool is_destroyed_scanner(bool is_create)
void set_time_interval(int64_t start, int64_t end)
Sets the time interval of the scan.
void add_column(const string &str)
Adds a column family to be returned by the scan.
void add_row_interval(const string &start, bool start_inclusive, const string &end, bool end_inclusive)
Adds a row interval to be returned in the scan.
Entry point to AsyncComm service.
void set_rebuild_indices(TableParts parts)
Rebuild indices.
void clear()
Clears the buffer.
void start()
Starts the timer.
IntervalScannerAsync(Comm *comm, ApplicationQueueInterfacePtr &app_queue, Table *table, RangeLocatorPtr &range_locator, const ScanSpec &scan_spec, uint32_t timeout_ms, bool current, TableScannerAsync *scanner, int id)
Constructs an IntervalScannerAsync object.
Provides access to internal components of opaque key.
uint8_t * base
Pointer to the allocated memory buffer.
RowIntervals row_intervals
RangeLocatorPtr m_range_locator
TableScannerDispatchHandler m_fetch_handler
This is a generic exception class for Hypertable.
A String class based on std::string.
TableScannerDispatchHandler m_create_handler
bool scan_and_filter_rows
#define HT_ERRORF(msg,...)
const std::string & get_name()
void add_cell_interval(const string &start_row, const string &start_column, bool start_inclusive, const string &end_row, const string &end_column, bool end_inclusive)
Adds a cell interval to be returned in the scan.
void find_range_and_start_scan(const char *row_key, bool hard=false)
void add_column_predicate(const string &column_family, const char *column_qualifier, uint32_t operation, const char *value, uint32_t value_len=0)
Adds a column predicate to the scan.
CellIntervals cell_intervals
void set_row_limit(int32_t n)
Sets the maximum number of rows to return in the scan.
bool is_proxy() const
Returns true if address is of type CommAddress::PROXY.
void set_max_versions(uint32_t n)
Sets the maximum number of revisions of each cell to return in the scan.
bool handle_result(bool *show_results, ScanCellsPtr &cells, EventPtr &event, bool is_create)
Error codes, Exception handling, error logging.
#define HT_THROW(_code_, _msg_)
static const char * END_ROW_MARKER
void set_scan_and_filter_rows(bool val)
Scan and filter rows.
void ensure(size_t len)
Ensures space for additional data. Will grow the space to 1.5 times the needed space with existing data un...
uint8_t * add_unchecked(const void *data, size_t len)
Adds additional data without boundary checks.
ScanSpecBuilder m_scan_spec_builder
void set_cell_offset(int32_t n)
Sets the number of cells to be skipped at the beginning of the query.
void refresh()
Refresh schema etc.
int code() const
Returns the error code.
#define HT_THROW2(_code_, _ex_, _msg_)
std::string m_create_scanner_row