36 unique_lock<mutex> lock(m_outstanding_mutex);
42 m_outstanding_cond.wait(lock, [
this](){
43 return !_is_empty() || _is_done() || _is_cancelled(); });
47 if (_is_empty() && _is_done())
49 result = m_queue.front();
50 mem_result = result->memory_used();
52 m_memory_used -= mem_result;
55 m_outstanding_cond.notify_one();
57 if (result->is_error())
59 else if (result->is_scan()) {
65 else if (result->is_update()) {
68 if (!mutator || m_mutator_map.find((uint64_t)mutator) == m_mutator_map.end() ||
82 unique_lock<mutex> lock(m_outstanding_mutex);
88 auto wait_time = chrono::system_clock::now() + chrono::milliseconds(timeout_ms);
92 while(_is_empty() && !_is_done() && !_is_cancelled()) {
93 timed_out = m_outstanding_cond.wait_until(lock, wait_time) == std::cv_status::timeout;
99 if (_is_empty() && _is_done())
101 result = m_queue.front();
102 mem_result = result->memory_used();
104 m_memory_used -= mem_result;
107 m_outstanding_cond.notify_one();
109 if (result->is_error())
111 if (result->is_scan()) {
117 else if (result->is_update()) {
120 if (!mutator || m_mutator_map.find((uint64_t)mutator) == m_mutator_map.end() ||
126 m_outstanding_cond.notify_one();
132 ResultPtr result = make_shared<Result>(scanner, cells);
137 unique_lock<mutex> lock(m_outstanding_mutex);
138 size_t mem_result = result->memory_used();
140 m_outstanding_cond.wait(lock, [
this](){
141 return has_remaining_capacity() || _is_cancelled(); });
143 if (!_is_cancelled()) {
144 m_queue.push_back(result);
145 m_memory_used += mem_result;
147 m_outstanding_cond.notify_one();
152 ResultPtr result = make_shared<Result>(scanner, error, error_msg);
157 ResultPtr result = make_shared<Result>(mutator);
162 ResultPtr result = make_shared<Result>(mutator, error, failures);
167 lock_guard<mutex> lock(m_outstanding_mutex);
169 ScannerMap::iterator s_it = m_scanner_map.begin();
170 while (s_it != m_scanner_map.end()) {
171 s_it->second->cancel();
174 MutatorMap::iterator m_it = m_mutator_map.begin();
175 while (m_it != m_mutator_map.end()) {
176 m_it->second->cancel();
182 m_outstanding_cond.notify_all();
186 lock_guard<mutex> lock(m_outstanding_mutex);
187 uint64_t addr = (uint64_t) mutator;
188 MutatorMap::iterator it = m_mutator_map.find(addr);
191 "Attempt to register mutator with cancelled future %lld",
192 (
Lld)reinterpret_cast<int64_t>(
this));
194 m_mutator_map[addr] = mutator;
198 lock_guard<mutex> lock(m_outstanding_mutex);
199 uint64_t addr = (uint64_t) mutator;
200 MutatorMap::iterator it = m_mutator_map.find(addr);
202 m_mutator_map.erase(it);
205 lock_guard<mutex> lock(m_outstanding_mutex);
206 uint64_t addr = (uint64_t) scanner;
207 ScannerMap::iterator it = m_scanner_map.find(addr);
210 "Attempt to register scanner with cancelled future %lld",
211 (
Lld)reinterpret_cast<int64_t>(
this));
213 m_scanner_map[addr] = scanner;
217 lock_guard<mutex> lock(m_outstanding_mutex);
218 uint64_t addr = (uint64_t) scanner;
219 ScannerMap::iterator it = m_scanner_map.find(addr);
221 m_scanner_map.erase(it);
void update_ok(TableMutatorAsync *mutator)
Callback method for successful update.
void update_error(TableMutatorAsync *mutator, int error, FailedMutations &failures)
Callback method for update errors.
Asynchronous table scanner.
std::shared_ptr< Result > ResultPtr
Smart pointer to Result.
void deregister_mutator(TableMutatorAsync *scanner)
Hook for derived classes which want to keep track of scanners/mutators.
Provides the ability to mutate a table in the form of adding and deleting rows and cells...
std::shared_ptr< ScanCells > ScanCellsPtr
Smart pointer to ScanCells.
void cancel()
Cancels outstanding scanners/mutators.
void scan_error(TableScannerAsync *scanner, int error, const std::string &error_msg, bool eos)
Callback method for scan errors.
Compatibility Macros for C/C++.
void deregister_scanner(TableScannerAsync *scanner)
Hook for derived classes which want to keep track of scanners/mutators.
bool get(ResultPtr &result)
This call blocks until a result is available, unless the async ops have completed.
void enqueue(ResultPtr &result)
Time-related declarations.
long long int Lld
Shortcut for printf formats.
void register_mutator(TableMutatorAsync *scanner)
Hook for derived classes which want to keep track of scanners/mutators.
#define HT_THROWF(_code_, _fmt_,...)
void scan_ok(TableScannerAsync *scanner, ScanCellsPtr &cells)
Callback method for successful scan.
void register_scanner(TableScannerAsync *scanner)
Hook for derived classes which want to keep track of scanners/mutators.
std::vector< FailedMutation > FailedMutations