52 catch (std::bad_alloc&) {
53 HT_ERRORF(
"caught bad_alloc here, %s", info.c_str());
55 catch (std::exception &e) {
56 HT_ERRORF(
"caught std::exception: %s, %s", e.what(), info.c_str());
59 HT_ERRORF(
"caught unknown exception here, %s", info.c_str());
67 : m_comm(comm), m_app_queue(app_queue), m_table(table),
68 m_range_locator(range_locator),
69 m_timeout_ms(timeout_ms), m_cb(cb), m_flags(flags), m_mutex(m_buffer_mutex),
70 m_cond(m_buffer_cond), m_explicit_block_only(explicit_block_only) {
80 : m_comm(comm), m_app_queue(app_queue), m_table(table),
81 m_range_locator(range_locator),
82 m_timeout_ms(timeout_ms), m_cb(cb), m_flags(flags), m_mutex(mutex),
83 m_cond(cond), m_mutator(mutator), m_explicit_block_only(explicit_block_only) {
91 m_max_memory = props->get_i64(
"Hypertable.Mutator.ScatterBuffer.FlushLimit.Aggregate");
156 unique_lock<mutex> lock(
m_mutex);
162 const void *value, uint32_t value_len) {
172 m_imc->buffer_key(key, value, value_len);
188 value_index_mutator, qualifier_index_mutator);
193 uint32_t value_len) {
207 else if (full_key.
row)
221 format(
"row=%s, cf=%s, cq=%s, value_len=%d (%s:%d)",
222 (
const char*)key.
row,
278 const void *value,
size_t value_len)
295 Cells::const_iterator end) {
301 for (; it != end; ++it) {
303 const Cell &cell = *it;
309 (
String)
"Column family not specified in non-delete row set "
335 format(
"row=%s, cf=%s, cq=%s, value_len=%d (%s:%d)",
338 it->column_qualifier ? it->column_qualifier :
"-",
361 full_key.
row = (
const char *)key.
row;
373 else if (full_key.
row)
386 format(
"row=%s, cf=%s, cq=%s (%s:%d)",
387 (
const char*)key.
row,
402 const void *column_qualifier, int64_t timestamp, int64_t revision,
410 cf =
m_schema->get_column_family(column_family);
416 cf =
m_schema->get_column_family(column_family);
432 full_key.
row = (
const char *)row;
433 if (column_qualifier) {
443 full_key.
flag = flag;
464 return m_imc->needs_flush();
493 m_imc->propagate_failures();
496 m_imc->consume_keybuffer(
this);
513 lock_guard<mutex> lock(
m_mutex);
540 unsynced.push_back(comm_addr);
553 uint32_t retry_count = 0;
564 sync_handler.
add(addr);
568 std::vector<TableMutatorSyncDispatchHandler::ErrorResult>
errors;
570 bool do_refresh =
false;
573 for (
size_t i=0; i<errors.size(); i++) {
579 HT_ERRORF(
"commit log sync error - %s - %s", errors[i].msg.c_str(),
587 sync_handler.
retry();
597 format(
"commit log sync error '%s' '%s' max retry limit=%d hit.",
600 HT_THROW(errors[0].error, error_str);
611 format(
"retry_count=%d (%s:%d)",
620 lock_guard<mutex> lock(
m_mutex);
629 for (
const auto &comm_addr : unsynced)
644 bool cancelled =
false;
646 uint32_t next_id = 0;
649 ScatterBufferAsyncMap::iterator it;
652 lock_guard<mutex> lock(
m_mutex);
681 buffer->set_retries_to_fail(error);
701 redo = buffer->create_redo_buffer(next_id);
709 lock_guard<mutex> lock(
m_mutex);
725 redo->send(buffer->get_send_flags());
void add(const CommAddress &addr)
Adds.
void wait_for_flush_completion(TableMutatorAsync *mutator)
bool wait_for_completion()
void initialize(PropertiesPtr &props)
bool has_qualifier_index_table()
returns true if this table has a qualifier index
void initialize_indices(PropertiesPtr &props)
static const uint32_t FLAG_DELETE_ROW
std::string String
A String is simply a typedef to std::string.
String format(const char *fmt,...)
Returns a String using printf like format facilities Vanilla snprintf is about 1.5x faster than this...
std::shared_ptr< RangeLocator > RangeLocatorPtr
Smart pointer to RangeLocator.
static const uint32_t FLAG_INSERT
void increment_outstanding()
ScatterBufferAsyncMap m_outstanding_buffers
std::mutex m_member_mutex
TableIdentifierManaged m_table_identifier
const char * column_qualifier
void set(const KeySpec &key, const void *value, uint32_t value_len)
Inserts a cell into the table.
bool get_value_index() const
Gets value index flag.
std::vector< String > errors
std::shared_ptr< TableMutatorAsyncScatterBuffer > TableMutatorAsyncScatterBufferPtr
Smart pointer to TableMutatorAsyncScatterBuffer.
virtual void register_mutator(TableMutatorAsync *mutator)
Hook for derived classes which want to keep track of scanners/mutators.
Column family specification.
TableMutatorAsyncPtr m_qualifier_index_mutator
const char * column_qualifier
void to_full_key(const void *row, const char *cf, const void *cq, int64_t ts, int64_t rev, uint8_t flag, Key &full_key, ColumnFamilySpec **pcf=0)
void set_delete(const KeySpec &key)
Deletes an entire row, a column family in a particular row, or a specific cell within a row...
void flush_with_tablequeue(TableMutator *mutator, bool sync=true)
FailedMutations m_failed_mutations
void get(TableIdentifierManaged &table_identifier, SchemaPtr &schema)
Get a copy of table identifier and schema atomically.
void flush(bool sync=true)
Flushes the current buffer accumulated mutations to their respective range servers.
void handle_send_exceptions(const String &info)
void add(const Key &key, uint8_t flag, const void *value, uint32_t value_len, TableMutatorAsync *value_index_mutator, TableMutatorAsync *qualifier_index_mutator)
void update_with_index(Key &key, const ColumnFamilySpec *cf, const void *value, uint32_t value_len)
Provides the ability to mutate a table in the form of adding and deleting rows and cells...
Wrapper for TableIdentifier providing member storage.
Declarations for TableMutatorSyncDispatchHandler.
Represents an open table.
Provides the ability to mutate a table in the form of adding and deleting rows and cells...
IndexMutatorCallbackPtr m_imc
void set_cells(const Cells &cells)
Insert a bunch of cells into the table (atomically if cells are in the same range/row) ...
const char * get_text(int error)
Returns a descriptive error message.
virtual void update_error(TableMutatorAsync *mutator, int error, FailedMutations &failures)=0
Callback method for update errors.
virtual ~TableMutatorAsync()
Destructor for TableMutatorAsync object Make sure buffers are flushed and unsynced rangeservers get s...
virtual void update_ok(TableMutatorAsync *mutator)=0
Callback method for successful update.
bool get_qualifier_index() const
Gets qualifier index flag.
void buffer_finish(uint32_t id, int error, bool retry)
This is where buffers call back into when their outstanding operations are complete.
std::shared_ptr< Properties > PropertiesPtr
void update_unsynced_rangeservers(const CommAddressSet &unsynced)
TableMutatorAsyncPtr m_index_mutator
Compatibility Macros for C/C++.
void update_without_index(const Cell &cell)
CommAddressSet m_unsynced_rangeservers
static const uint32_t ms_max_sync_retries
std::shared_ptr< ApplicationQueueInterface > ApplicationQueueInterfacePtr
Smart pointer to ApplicationQueueInterface.
TableMutatorAsyncScatterBufferPtr m_current_buffer
TableMutatorAsync(PropertiesPtr &props, Comm *comm, ApplicationQueueInterfacePtr &app_queue, Table *table, RangeLocatorPtr &range_locator, uint32_t timeout_ms, ResultCallback *cb, uint32_t flags=0, bool explicit_block_only=false)
Constructs the TableMutatorAsync object.
int32_t get_id() const
Gets column ID.
virtual void deregister_mutator(TableMutatorAsync *mutator)
Hook for derived classes which want to keep track of scanners/mutators.
void do_sync()
Calls sync on any unsynced rangeservers and waits for completion.
bool has_index_table()
returns true if this table has an index
void get_errors(vector< ErrorResult > &errors)
bool m_explicit_block_only
std::set< CommAddress > CommAddressSet
Set of CommAddress objects.
void sanity_check() const
This class is a DispatchHandler class that is used for collecting asynchronous commit log sync reques...
RangeLocatorPtr m_range_locator
Entry point to AsyncComm service.
TableMutatorAsyncScatterBufferPtr get_outstanding_buffer(size_t id)
void get_unsynced_rangeservers(std::vector< CommAddress > &unsynced)
uint32_t m_next_buffer_id
void update_outstanding(TableMutatorAsyncScatterBufferPtr &buffer)
const char * column_family
std::condition_variable & m_cond
TablePtr get_index_table()
#define HT_THROWF(_code_, _fmt_,...)
Provides access to internal components of opaque key.
uint32_t column_qualifier_len
void wait_for_completion()
void sanity_check() const
This is a generic exception class for Hypertable.
#define HT_ERRORF(msg,...)
uint8_t column_family_code
static const int64_t AUTO_ASSIGN
ApplicationQueueInterfacePtr m_app_queue
Encapsulates decomposed key and value.
void decrement_outstanding()
const char * column_qualifier
Represents an open table.
String extensions and helpers: sets, maps, append operators etc.
const char * column_family
#define HT_THROW(_code_, _msg_)
void refresh()
Refresh schema etc.
int64_t get_ts64()
Returns the current time in nanoseconds as a 64bit number.
int code() const
Returns the error code.
TablePtr get_qualifier_index_table()