55 m_context(context), m_query_cache(query_cache),
56 m_timer_handler(timer_handler), m_log(log), m_flags(flags) {
88 const uint8_t *mod, *mod_end;
93 int64_t latest_range_revision;
95 bool transfer_pending;
98 uint32_t go_buf_reset_offset;
99 uint32_t root_buf_reset_offset;
110 unique_lock<std::mutex> lock(mutex);
111 cond.wait(lock, [
this, &queue](){
return !queue.empty() ||
m_shutdown; });
120 go_buf_reset_offset = 0;
121 root_buf_reset_offset = 0;
160 if (table_update->
table_info->get_schema()->get_generation() !=
164 format(
"Update schema generation mismatch for table %s (received %lld != %lld)",
166 (
Lld)table_update->
table_info->get_schema()->get_generation());
185 go_buf_reset_offset = table_update->
go_buf.
fill();
190 while (mod < mod_end) {
196 if (!strcmp(row,
"1,+/JzamFvB6rqPqP5yNgI5nreCtZHkT\t\t01501")) {
224 if (!table_update->
table_info->find_containing_range(row, range,
225 start_row, end_row) ||
226 range->get_relinquish()) {
244 if ((rulist = table_update->
range_map[range.get()]) == 0) {
246 rulist->
range = range;
247 table_update->
range_map[range.get()] = rulist;
279 if (!rulist->
range->increment_update_counter()) {
291 String range_start_row, range_end_row;
292 rulist->
range->get_boundary_rows(range_start_row, range_end_row);
295 if (range_start_row != start_row || range_end_row != end_row) {
296 rulist->
range->decrement_update_counter();
304 bool wait_for_maintenance;
305 transfer_pending = rulist->
range->get_transfer_info(transfer_info, transfer_log,
306 &latest_range_revision, wait_for_maintenance);
314 bool in_transferring_region =
false;
320 int64_t difference, tmp_timestamp;
330 difference = (int32_t)((latest_range_revision - uc->
auto_revision)
334 HT_ERRORF(
"Clock skew of %lld microseconds exceeds maximum "
335 "(%lld) range=%s", (
Lld)difference,
337 rulist->
range->get_name().c_str());
347 if (transfer_pending) {
349 if (transfer_bufp->
empty()) {
361 if (rulist->
range->is_root()) {
371 cur_bufp = &table_update->
go_buf;
375 range_update.
bufp = cur_bufp;
378 while (mod < mod_end &&
379 (end_row ==
"" || (strcmp(row, end_row.c_str()) <= 0))) {
381 if (transfer_pending) {
384 if (!in_transferring_region) {
387 cur_bufp = transfer_bufp;
388 range_update.
bufp = cur_bufp;
390 in_transferring_region =
true;
395 if (in_transferring_region) {
398 cur_bufp = &table_update->
go_buf;
399 range_update.
bufp = cur_bufp;
401 in_transferring_region =
false;
408 uint8_t family=*(key.
ptr+1+strlen((
const char *)key.
ptr+1)+1);
416 "Auto revision (%lld) is less than latest range "
417 "revision (%lld) for range %s",
419 rulist->
range->get_name().c_str());
433 "Supplied revision (%lld) is less than most recently "
434 "seen revision (%lld) for range %s",
436 rulist->
range->get_name().c_str());
450 cur_bufp->
add(mod, key.
ptr-mod);
476 for (
auto iter = table_update->
range_map.begin();
477 iter != table_update->
range_map.end(); ++iter)
478 (*iter).second->reset_updates(request);
480 if (root_buf_reset_offset)
485 range_update.
bufp = 0;
497 HT_DEBUGF(
"Added %d (%d transferring) updates to '%s'",
499 table_update->
id.
id);
519 std::list<UpdateContext *> coalesce_queue;
520 uint64_t coalesce_amount = 0;
522 uint32_t committed_transfer_data;
523 bool log_needs_syncing {};
539 committed_transfer_data = 0;
540 log_needs_syncing =
false;
545 HT_FATALF(
"Problem writing %d bytes to ROOT commit log - %s",
555 for (
auto iter = table_update->
range_map.begin(); iter != table_update->
range_map.end(); ++iter) {
556 if ((*iter).second->transfer_buf.ptr > (*iter).second->transfer_buf.mark) {
557 committed_transfer_data += (*iter).second->transfer_buf.ptr - (*iter).second->transfer_buf.mark;
558 if ((error = (*iter).second->transfer_log->write(
ClusterId::get(), (*iter).second->transfer_buf,
559 (*iter).second->latest_transfer_revision,
561 table_update->
error = error;
562 table_update->
error_msg =
format(
"Problem writing %d bytes to transfer log",
563 (
int)(*iter).second->transfer_buf.fill());
573 constexpr uint32_t NO_LOG_SYNC_FLAGS =
577 if ((table_update->
flags & NO_LOG_SYNC_FLAGS) == 0)
578 log_needs_syncing =
true;
585 table_update->
error_msg =
format(
"Problem writing %d bytes to commit log (%s) - %s",
587 m_log->get_log_dir().c_str(),
590 table_update->
error = error;
597 bool do_sync =
false;
598 if (log_needs_syncing) {
600 coalesce_queue.push_back(uc);
605 else if (!coalesce_queue.empty())
610 size_t retry_count {};
616 error =
m_log->flush();
618 error =
m_log->sync();
623 HT_ERRORF(
"Problem %sing log fragment (%s) - %s",
625 m_log->get_current_fragment_file().c_str(),
627 if (++retry_count == 6)
629 this_thread::sleep_for(chrono::milliseconds(10000));
639 coalesce_queue.push_back(uc);
640 while (!coalesce_queue.empty()) {
641 uc = coalesce_queue.front();
642 coalesce_queue.pop_front();
675 for (
auto iter = table_update->
range_map.begin(); iter != table_update->
range_map.end(); ++iter) {
680 Range *rangep = (*iter).first;
681 lock_guard<Range> lock(*rangep);
683 uint8_t *end = ptr + update.
len;
689 std::set<uint8_t> columns;
691 const char *current_row {};
696 if (current_row ==
nullptr)
697 current_row = key_comps.
row;
703 HT_ERRORF(
"Skipping bad key - column family not specified in "
704 "non-delete row update on %s row=%s",
705 table_update->
id.
id, key_comps.
row);
708 rangep->
add(key_comps, value);
711 if (strcmp(current_row, key_comps.
row)) {
717 current_row = key_comps.
row;
739 for (
auto iter = table_update->
range_map.begin(); iter != table_update->
range_map.end(); ++iter) {
740 if ((*iter).second->range_blocked)
741 (*iter).first->decrement_update_counter();
748 bool maintenance_needed =
false;
755 for (
auto iter = table_update->
range_map.begin(); iter != table_update->
range_map.end(); ++iter) {
756 if ((*iter).first->need_maintenance() &&
758 maintenance_needed =
true;
759 HT_MAYBE_FAIL_X(
"metadata-update-and-respond", (*iter).first->is_metadata());
782 uint8_t *ptr = ext.
base;
828 int64_t auto_revision, int64_t *revisionp,
829 bool timeorder_desc) {
839 if (timeorder_desc) {
843 uint8_t *p=(uint8_t *)ptr+len-8;
845 p=(uint8_t *)ptr+len-8;
850 dest_bufp->
ensure((ptr-bskey.
ptr) + len + 9);
852 memcpy(dest_bufp->
ptr, ptr, len);
864 dest_bufp->
ptr += len;
866 timeorder_desc ?
false :
true);
867 *revisionp = auto_revision;
868 bskey.
ptr = ptr + len;
std::list< UpdateContext * > m_commit_queue
Stage 2 input queue.
A memory buffer of static size.
bool empty() const
Returns true if the buffer is empty.
static int64_t decode_ts64(const uint8_t **bufp, bool ascending=true)
std::vector< SendBackRec > send_back_vector
Vector of SendBackRec objects describing rejected key/value pairs.
std::mutex m_commit_queue_mutex
Mutex protecting stage 2 input queue
void shutdown()
Shuts down the pipeline. Sets m_shutdown to true, signals the three pipeline condition variables...
std::vector< UpdateRecTable * > updates
The FailureInducer simulates errors.
void set_mark()
Sets the mark; the mark can be used by the caller just like a bookmark.
std::shared_ptr< Context > m_context
Range server context
static const uint32_t FLAG_DELETE_ROW
std::string String
A String is simply a typedef to std::string.
uint32_t transfer_buf_reset_offset
uint32_t count
Count of serialized key/value pairs in buffer.
std::vector< UpdateRequest * > requests
Vector of corresponding client requests.
UpdatePipeline(ContextPtr &context, QueryCachePtr &query_cache, TimerHandlerPtr &timer_handler, CommitLogPtr &log, Filesystem::Flags flags)
Constructor.
String format(const char *fmt,...)
Returns a String using printf-like format facilities. Vanilla snprintf is about 1.5x faster than this...
bool get_option_time_order_desc() const
Gets time order desc option.
virtual int response_ok()
Sends a simple success response back to the client, which is just the 4-byte error code Error::OK...
uint32_t offset
Starting byte offset within update buffer of rejected key/value pairs.
virtual size_t encoded_length() const
Returns serialized object length.
void commit()
Thread function for stage 2 of update pipeline.
static const uint8_t HAVE_REVISION
Holds updates destined for a specific table.
Flags
Enumeration type for append flags.
int64_t m_last_revision
Last (largest) assigned revision number.
Column family specification.
void transform_key(ByteString &bskey, DynamicBuffer *dest_bufp, int64_t revision, int64_t *revisionp, bool timeorder_desc)
Filesystem::Flags m_flags
Commit log flush flag (NONE, FLUSH, or SYNC)
void add_cells_written(uint64_t n)
std::unordered_map< Range *, UpdateRecRangeList * > range_map
bool m_shutdown
Flag indicating if pipeline is being shut down.
int32_t m_maintenance_pause_interval
Millisecond pause time at the end of the pipeline (TESTING)
uint8_t * ptr
Pointer to the end of the used part of the buffer.
A dynamic, resizable and reference counted memory buffer.
static const int64_t TIMESTAMP_MIN
std::mutex m_response_queue_mutex
Mutex protecting stage 3 input queue
DynamicBuffer transfer_buf
std::vector< std::thread > m_threads
Update pipeline threads.
Represents a table row range.
std::shared_ptr< Context > ContextPtr
Smart pointer to Context.
A class managing one or more serializable ByteStrings.
Declarations for RangeServerProtocol.
std::mutex m_qualify_queue_mutex
Mutex protecting stage 1 input queue
std::condition_variable m_response_queue_cond
Condition variable signaling addition to stage 3 input queue.
std::list< UpdateContext * > m_qualify_queue
Stage 1 input queue.
StaticBuffer buffer
Update buffer containing serialized key/value pairs.
void add_and_respond()
Thread function for stage 3 of update pipeline.
void add(const Key &key, const ByteString value)
This method must not fail.
void add_bytes_written(uint64_t n)
uint32_t error
Error code that applies to entire buffer.
A dynamic, resizable memory buffer.
uint64_t m_update_coalesce_limit
Commit log coalesce limit.
virtual void encode(uint8_t **bufp) const
Writes serialized representation of object to a buffer.
TimerHandlerPtr m_timer_handler
Pointer to timer handler.
EventPtr event
Event object of originating update request.
DynamicBuffer * bufp
Pointer to buffer holding updates (serialized key/value pairs).
const char * get_text(int error)
Returns a descriptive error message.
uint8_t * add(const void *data, size_t len)
Adds more data WITH boundary checks; if required the buffer is resized and existing data is preserved...
CommitLogPtr transfer_log
static CommitLogPtr root_log
Logging routines and macros.
std::list< UpdateContext * > m_response_queue
Stage 3 input queue.
void encode_i32(uint8_t **bufp, uint32_t val)
Encode a 32-bit integer in little-endian order.
static uint64_t get()
Gets the cluster ID.
Compatibility Macros for C/C++.
int32_t m_commit_queue_count
Count of objects in stage 2 input queue.
static const uint8_t HAVE_TIMESTAMP
bool load(const SerializedKey &key)
Parses the opaque key and loads the components into the member variables.
std::shared_ptr< TimerHandler > TimerHandlerPtr
Smart pointer to TimerHandler.
Functions to serialize/deserialize primitives to/from a memory buffer.
size_t length() const
Retrieves the length of the serialized string.
int response(StaticBuffer &ext)
TableInfoPtr table_info
TableInfo object for destination table.
static const uint8_t TS_CHRONOLOGICAL
std::shared_ptr< QueryCache > QueryCachePtr
Smart pointer to QueryCache.
int32_t m_max_clock_skew
Maximum allowable clock skew.
Context record for update request passed into UpdatePipeline.
uint32_t len
Length (in bytes) from offset covering key/value pairs rejected.
const uint8_t * ptr
The pointer to the serialized data.
UpdateRequest * last_request
TableIdentifier id
Table identifier for destination table.
#define HT_FATALF(msg,...)
#define HT_DEBUGF(msg,...)
static Hypertable::MaintenanceQueuePtr maintenance_queue
long long int Lld
Shortcut for printf formats.
void encode_vi32(uint8_t **bufp, uint32_t val)
Encode an integer (up to 32-bit) in variable-length encoding.
static bool ignore_clock_skew_errors
static const uint8_t AUTO_TIMESTAMP
uint64_t total_bytes_added
std::condition_variable m_commit_queue_cond
Condition variable signaling addition to stage 2 input queue.
virtual int error(int error, const String &msg)
Sends a standard error response back to the client.
uint32_t m_update_delay
Update delay at start of pipeline (TESTING)
Declarations for UpdatePipeline.
uint32_t count
Number of key/value pairs to which error applies.
std::condition_variable m_qualify_queue_cond
Condition variable signaling addition to stage 1 input queue.
size_t decode_length(const uint8_t **dptr) const
Retrieves the decoded length and returns a pointer to the string.
#define HT_THROWF(_code_, _fmt_,...)
void qualify_and_transform()
Thread function for stage 1 of update pipeline.
Provides access to internal components of opaque key.
std::shared_ptr< Range > RangePtr
Smart pointer to Range.
uint8_t * base
Pointer to the allocated memory buffer.
int64_t latest_transfer_revision
size_t fill() const
Returns the size of the used portion.
#define HT_FAILURE_SIGNALLED(_label_)
std::shared_ptr< CommitLog > CommitLogPtr
Smart pointer to CommitLog.
static void encode_ts64(uint8_t **bufp, int64_t val, bool ascending=true)
uint64_t offset
Offset of beginning of update range within bufp.
void add_update(UpdateRequest *request, UpdateRecRange &update)
This is a generic exception class for Hypertable.
Holds client update request and error state.
#define HT_MAYBE_FAIL_X(_label_, _exp_)
static LoadStatisticsPtr load_statistics
#define HT_ERRORF(msg,...)
Declarations for UpdateRecRange.
void add(UpdateContext *uc)
Adds updates to the pipeline. Adds uc to m_qualify_queue and signals m_qualify_queue_cond.
std::shared_ptr< Schema > SchemaPtr
Smart pointer to Schema.
uint8_t column_family_code
uint8_t * mark
A "bookmark", can be set by the caller.
bool transferring(const char *row)
CommitLogPtr m_log
Pointer to commit log.
QueryCachePtr m_query_cache
Pointer to query cache.
Holds updates destined for a specific range.
Declarations for UpdateRecTable.
uint64_t total_buffer_size
void ensure(size_t len)
Ensure space for additional data. Will grow the space to 1.5 of the needed space with existing data un...
Declarations for ClusterId.
Specifies a range of updates (key/value pairs) within a buffer.
int64_t get_ts64()
Returns the current time in nanoseconds as a 64-bit number.
uint8_t * next()
Retrieves the next serialized String in the buffer.
int code() const
Returns the error code.
void reserve(size_t len, bool nocopy=false)
Reserve space for additional data. Will grow the space to exactly what's needed.
uint64_t len
Length of update range within bufp starting at offset.
static const uint8_t REV_IS_TS