52 RangeLocatorPtr &range_locator,
bool auto_refresh, uint32_t timeout_ms, uint32_t
id)
53 : m_comm(comm), m_app_queue(app_queue), m_mutator(mutator), m_schema(schema),
54 m_range_locator(range_locator),
55 m_location_cache(range_locator->location_cache()),
56 m_range_server(comm, timeout_ms), m_table_identifier(*table_identifier),
57 m_auto_refresh(auto_refresh), m_timeout_ms(timeout_ms),
58 m_counter_value(9), m_timer(timeout_ms), m_id(id),
59 m_wait_time(ms_init_redo_wait_time) {
64 "Hypertable.Mutator.ScatterBuffer.FlushLimit.PerServer");
74 uint32_t value_len,
size_t incr_mem) {
76 TableMutatorAsyncSendBufferMap::const_iterator iter;
77 bool counter_reset =
false;
84 range_info = range_loc_info;
88 lock_guard<mutex> lock(
m_mutex);
90 bool is_counter =
false;
100 const char *ascii_value = (
const char *)value;
104 if (value_len > 0 && (*ascii_value ==
'=' || *ascii_value ==
'+')) {
105 counter_reset = (*ascii_value ==
'=');
124 (*iter).second->addr = range_info.
addr;
127 (*iter).second->key_offsets.push_back((*iter).second->accum.fill());
150 lock_guard<mutex> lock(
m_mutex);
153 TableMutatorAsyncSendBufferMap::const_iterator iter;
163 range_info = range_loc_info;
170 (*iter).second->addr = range_info.
addr;
173 (*iter).second->key_offsets.push_back((*iter).second->accum.fill());
195 lock_guard<mutex> lock(
m_mutex);
198 TableMutatorAsyncSendBufferMap::const_iterator iter;
199 const uint8_t *ptr = key.
ptr;
207 &range_loc_info, timer,
false);
208 range_info = range_loc_info;
216 (*iter).second->addr = range_info.
addr;
219 (*iter).second->key_offsets.push_back((*iter).second->accum.fill());
220 (*iter).second->accum.add(key.
ptr, (ptr-key.
ptr)+len);
221 (*iter).second->accum.add(value.
ptr, value.
length());
236 inline bool operator<(
const SendRec &sr1,
const SendRec &sr2) {
237 return strcmp(sr1.row, sr2.row) < 0;
243 lock_guard<mutex> lock(
m_mutex);
244 bool outstanding=
false;
248 std::vector<SendRec> send_vec;
253 string range_location;
258 for (TableMutatorAsyncSendBufferMap::const_iterator iter =
m_buffer_map.begin();
260 send_buffer = (*iter).second;
262 if ((len = send_buffer->accum.fill()) == 0) {
267 send_buffer->pending_updates.set(
new uint8_t [len], len);
269 if (send_buffer->resend()) {
270 memcpy(send_buffer->pending_updates.base,
271 send_buffer->accum.base, len);
272 send_buffer->send_count = send_buffer->retry_count;
276 send_vec.reserve(send_buffer->key_offsets.size());
277 for (
auto it = send_buffer->key_offsets.begin(); it != send_buffer->key_offsets.end(); ++it) {
278 send_rec.key.ptr = send_buffer->accum.base + *it;
279 send_rec.row = send_rec.key.row();
280 send_vec.push_back(send_rec);
282 std::stable_sort(send_vec.begin(), send_vec.end());
284 ptr = send_buffer->pending_updates.base;
286 for (
auto it = send_vec.begin(); it != send_vec.end(); ++it) {
290 memcpy(ptr, it->key.ptr, key.
ptr - it->key.ptr);
291 ptr += key.
ptr - it->key.ptr;
293 HT_ASSERT((
size_t)(ptr-send_buffer->pending_updates.base)==len);
294 send_buffer->dispatch_handler =
296 m_id, send_buffer.get(),
298 send_buffer->send_count = send_buffer->key_offsets.size();
302 send_buffer->accum.clear();
303 send_buffer->key_offsets.clear();
310 send_buffer->pending_updates.own =
false;
313 send_buffer->pending_updates, flags,
314 send_buffer->dispatch_handler.get());
327 send_buffer->add_retries(send_buffer->send_count, 0,
328 send_buffer->pending_updates.size);
338 HT_FATALF(
"Problem sending updates to %s - %s (%s)",
343 send_buffer->pending_updates.own =
true;
354 unique_lock<mutex> lock(
m_mutex);
371 format(
"Timer remaining=%lld wait_time=%lld",
376 this_thread::sleep_for(chrono::milliseconds(
m_wait_time));
380 redo_buffer->m_timer =
m_timer;
383 for (TableMutatorAsyncSendBufferMap::const_iterator iter =
m_buffer_map.begin();
385 send_buffer = (*iter).second;
387 if (send_buffer->accum.fill()) {
388 const uint8_t *endptr;
390 bs.
ptr = send_buffer->accum.base;
391 endptr = bs.
ptr + send_buffer->accum.fill();
394 while (bs.
ptr < endptr) {
398 redo_buffer->set(key, value, incr_mem);
418 for (TableMutatorAsyncSendBufferMap::const_iterator iter =
m_buffer_map.begin();
420 send_buffer = (*iter).second;
421 if (send_buffer->accum.fill()) {
422 const uint8_t *endptr;
424 bs.
ptr = send_buffer->accum.base;
425 endptr = bs.
ptr + send_buffer->accum.fill();
427 while (bs.
ptr < endptr) {
452 std::vector<FailedRegionAsync> failed_regions;
455 for (TableMutatorAsyncSendBufferMap::const_iterator it =
m_buffer_map.begin();
457 (*it).second->get_failed_regions(failed_regions);
458 (*it).second->failed_regions.clear();
461 if (!failed_regions.empty()) {
465 const uint8_t *endptr;
467 for (
size_t i=0; i<failed_regions.size(); i++) {
468 bs.
ptr = failed_regions[i].base;
469 endptr = bs.
ptr + failed_regions[i].len;
470 while (bs.
ptr < endptr) {
488 failed_regions[i].error));
493 error = failed_regions[0].error;
500 bool has_retries=
false;
516 lock_guard<mutex> lock(
m_mutex);
void set_retries_to_fail(int error)
TableMutatorAsyncScatterBuffer(Comm *comm, ApplicationQueueInterfacePtr &app_queue, TableMutatorAsync *mutator, const TableIdentifier *, SchemaPtr &, RangeLocatorPtr &, bool auto_refresh, uint32_t timeout_ms, uint32_t id)
RangeLocatorPtr m_range_locator
int set_failed_mutations()
PropertiesPtr properties
This singleton map stores all options.
static const uint32_t FLAG_DELETE_ROW
bool get_option_counter() const
Gets the counter option.
void set(const Key &, const ColumnFamilySpec *cf, const void *value, uint32_t value_len, size_t incr_mem)
TableMutatorAsync * m_mutator
String format(const char *fmt,...)
Returns a String using printf like format facilities Vanilla snprintf is about 1.5x faster than this...
void stop()
Stops the timer.
std::shared_ptr< RangeLocator > RangeLocatorPtr
Smart pointer to RangeLocator.
static const uint32_t FLAG_INSERT
void update(const CommAddress &addr, uint64_t cluster_id, const TableIdentifier &table, int32_t count, StaticBuffer &buffer, int32_t flags, DispatchHandler *handler)
Issues an "update" request asynchronously.
void send(uint32_t flags)
std::shared_ptr< TableMutatorAsyncScatterBuffer > TableMutatorAsyncScatterBufferPtr
Smart pointer to TableMutatorAsyncScatterBuffer.
Column family specification.
Holds range start and end row plus location.
const char * column_qualifier
static const uint32_t FLAG_DELETE_CELL
TableMutatorAsyncSendBufferMap m_buffer_map
FlyweightString m_constant_strings
uint32_t remaining()
Returns the remaining time till expiry.
uint8_t * ptr
Pointer to the end of the used part of the buffer.
static const uint32_t FLAG_DELETE_COLUMN_FAMILY
void append_as_byte_string(DynamicBuffer &dst_buf, const void *value, uint32_t value_len)
Serializes and appends a byte array to a DynamicBuffer object.
std::shared_ptr< TableMutatorAsyncSendBuffer > TableMutatorAsyncSendBufferPtr
Smart pointer to TableMutatorAsyncSendBuffer.
A class managing one or more serializable ByteStrings.
CommAddressSet m_unsynced_rangeservers
Provides the ability to mutate a table in the form of adding and deleting rows and cells...
uint32_t m_server_flush_limit
const char * get_text(int error)
Returns a descriptive error message.
void buffer_finish(uint32_t id, int error, bool retry)
This is where buffers call back into when their outstanding operations are complete.
A timer class to keep timeout states across AsyncComm related calls.
static uint64_t get()
Gets the cluster ID.
ApplicationQueueInterfacePtr m_app_queue
Compatibility Macros for C/C++.
FailedMutations m_failed_mutations
bool load(const SerializedKey &key)
Parses the opaque key and loads the components into the member variables.
void encode_i64(uint8_t **bufp, uint64_t val)
Encode a 64-bit integer in little-endian order.
std::shared_ptr< ApplicationQueueInterface > ApplicationQueueInterfacePtr
Smart pointer to ApplicationQueueInterface.
size_t length() const
Retrieves the length of the serialized string.
std::condition_variable m_cond
TableMutatorAsyncScatterBufferPtr create_redo_buffer(uint32_t id)
DynamicBuffer m_counter_value
const uint8_t * ptr
The pointer to the serialized data.
bool operator<(const directory_entry< _Key, _Tp > &lhs, const directory_entry< _Key, _Tp > &rhs)
#define HT_FATALF(msg,...)
const std::string & get_name() const
Gets column family name.
long long int Lld
Shortcut for printf formats.
void create_key_and_append(DynamicBuffer &dst_buf, const Key &key, bool time_order_asc)
Entry point to AsyncComm service.
void clear()
Clears the buffer.
const char * get(const char *str)
Returns a copy of the string; this string is valid till the FlyweightString set is destructed...
void start()
Starts the timer.
const char * column_family
size_t decode_length(const uint8_t **dptr) const
Retrieves the decoded length and returns a pointer to the string.
LocationCachePtr m_location_cache
TableMutatorAsyncCompletionCounter m_completion_counter
#define HT_THROWF(_code_, _fmt_,...)
Provides access to internal components of opaque key.
Random number generator for int32, int64, double and ascii arrays.
uint8_t * base
Pointer to the allocated memory buffer.
A timer class to keep timeout states across AsyncComm related calls.
This is a generic exception class for Hypertable.
void wait_for_completion()
TableIdentifierManaged m_table_identifier
Declarations for TableMutatorAsyncScatterBuffer.
std::shared_ptr< Schema > SchemaPtr
Smart pointer to Schema.
uint8_t column_family_code
void set_delete(const Key &key, size_t incr_mem)
Lib::RangeServer::Client m_range_server
static const int64_t AUTO_ASSIGN
virtual ~TableMutatorAsyncScatterBuffer()
Encapsulates decomposed key and value.
const char * column_qualifier
#define HT_THROW(_code_, _msg_)
static const uint32_t FLAG_DELETE_CELL_VERSION
static std::chrono::milliseconds duration_millis(uint32_t maximum)
Returns a random millisecond duration.
void ensure(size_t len)
Ensure space for additional data Will grow the space to 1.5 of the needed space with existing data un...
uint32_t decode_vi32(const uint8_t **bufp, size_t *remainp)
Decode a variable length encoded integer up to 32-bit.
uint8_t * add_unchecked(const void *data, size_t len)
Adds additional data without boundary checks.
Declarations for ClusterId.
This class is a DispatchHandler.
uint8_t * next()
Retrieves the next serialized String in the buffer.
int code() const
Returns the error code.