36 #include <boost/algorithm/string.hpp>
51 #define SAVE_ERR(_code_, _msg_) \
53 lock_guard<mutex> lock(m_mutex); \
54 m_last_errors.push_back(HT_EXCEPTION(_code_, _msg_)); \
55 while (m_last_errors.size() > m_max_error_queue_length) \
56 m_last_errors.pop_front(); \
59 #define SAVE_ERR2(_code_, _ex_, _msg_) \
61 lock_guard<mutex> lock(m_mutex); \
62 m_last_errors.push_back(HT_EXCEPTION2(_code_, _ex_, _msg_)); \
63 while (m_last_errors.size() > m_max_error_queue_length) \
64 m_last_errors.pop_front(); \
73 class MetaKeyBuilder {
75 MetaKeyBuilder() : start(buf_start), end(buf_end) { }
77 build_keys(
const char *
format,
const char *table_name,
const char *row_key) {
78 int len_end = strlen(format) + strlen(table_name) + 3;
79 int len_start = len_end;
81 len_start += strlen(row_key);
82 if( len_start > size ) start =
new char [len_start];
83 sprintf(start, format, table_name);
84 strcat(start, row_key);
87 if( len_start > size ) start =
new char [len_start];
88 sprintf(start, format, table_name);
90 if( len_end > size ) end =
new char [len_end];
91 sprintf(end, format, table_name);
92 char *ptr = end + strlen(end);
98 if (start != buf_start)
delete [] start;
99 if (end != buf_end)
delete [] end;
106 char buf_start[size];
114 : m_conn_manager(conn_mgr), m_hyperspace(hyperspace),
115 m_root_stale(true), m_range_server(conn_mgr->get_comm(), timeout_ms),
116 m_hyperspace_init(false), m_hyperspace_connected(true),
117 m_timeout_ms(timeout_ms) {
120 = cfg->get_i32(
"Hypertable.RangeLocator.MetadataReadaheadCount");
122 = cfg->get_i32(
"Hypertable.RangeLocator.MaxErrorQueueLength");
124 = cfg->get_i32(
"Hypertable.RangeLocator.MetadataRetryInterval");
126 = cfg->get_i32(
"Hypertable.RangeLocator.RootMetadataRetryInterval");
128 int cache_size = cfg->get_i64(
"Hypertable.LocationCache.MaxEntries");
134 m_cache = make_shared<LocationCache>(cache_size);
183 this_thread::sleep_for(chrono::milliseconds(3000));
199 this_thread::sleep_for(chrono::milliseconds(3000));
210 if ((cf_spec = schema->get_column_family(
"StartRow")) == 0) {
211 HT_ERROR(
"Unable to find column family 'StartRow' in METADATA schema");
216 if ((cf_spec = schema->get_column_family(
"Location")) == 0) {
217 HT_ERROR(
"Unable to find column family 'Location' in METADATA schema");
235 uint32_t wait_time = 1000;
236 uint32_t total_wait_time = 0;
238 error =
find(table, row_key, rane_loc_infop, timer, hard);
242 HT_THROWF(error,
"Table '%s' is (being) dropped", table->
id);
254 this_thread::sleep_for(chrono::milliseconds(wait_time));
255 total_wait_time += wait_time;
256 wait_time = (wait_time * 3) / 2;
259 if ((error =
find(table, row_key, rane_loc_infop, timer,
true))
262 HT_THROWF(error,
"Table '%s' is (being) dropped", table->
id);
275 vector<ScanBlock> scan_blocks(1);
283 bool inclusive = (row_key == 0 || *row_key == 0) ?
true :
false;
296 lock_guard<mutex> lock(
m_mutex);
300 if (!hard &&
m_cache->lookup(table->
id, row_key, rane_loc_infop))
309 rane_loc_infop->
addr = addr;
318 MetaKeyBuilder meta_keys;
326 meta_keys.build_keys(format_str, table->
id, row_key);
334 rane_loc_infop, inclusive)) {
338 meta_scan_spec.
columns.push_back(
"StartRow");
339 meta_scan_spec.
columns.push_back(
"Location");
341 ri.
start = meta_keys.start;
357 meta_scan_spec, scan_blocks.back(), timer);
358 while (!scan_blocks.back().eos()) {
359 int scanner_id = scan_blocks.back().get_scanner_id();
360 scan_blocks.resize(scan_blocks.size()+1);
373 "'%s' on METADATA[..??]", meta_keys.start));
376 catch (std::exception &e) {
377 HT_INFOF(
"std::exception - %s", e.what());
388 rane_loc_infop, inclusive)) {
389 string err_msg =
format(
"Unable to find metadata for row '%s' row_key=%s",
390 meta_keys.start, row_key);
407 addr = rane_loc_infop->
addr;
409 meta_scan_spec.
clear();
413 meta_scan_spec.
columns.push_back(
"StartRow");
414 meta_scan_spec.
columns.push_back(
"Location");
426 scan_blocks.resize(1);
428 meta_scan_spec, scan_blocks.back(), timer);
430 while (!scan_blocks.back().eos()) {
431 int scanner_id = scan_blocks.back().get_scanner_id();
432 scan_blocks.resize(scan_blocks.size()+1);
445 "METADATA (start row = %s)", ri.
start));
448 catch (std::exception &e) {
449 HT_INFOF(
"std::exception - %s", e.what());
460 if (!
m_cache->lookup(table->
id, row_key, rane_loc_infop, inclusive)) {
462 "metadata for table '" + table->
id +
"' row '" + row_key +
"'");
475 const char *stripped_key;
483 bool got_start_row =
false;
484 bool got_end_row =
false;
485 bool got_location =
false;
487 for (
auto & scan_block : scan_blocks) {
489 while (scan_block.next(serkey, value)) {
491 if (!key.
load(serkey)) {
492 string err_msg =
format(
"METADATA lookup for '%s' returned bad key",
499 if ((stripped_key = strchr(key.
row,
':')) == 0) {
500 string err_msg =
format(
"Bad row key found in METADATA - '%s'", key.
row);
510 string tmp_str =
String((
const char *)str, len);
512 << stripped_key <<
", value=" << tmp_str <<
" got start_row="
513 << got_start_row <<
", got_end_row=" << got_end_row
514 <<
", got_location=" << got_location <<
HT_END;
518 if (strcmp(stripped_key, range_loc_info.
end_row.c_str())) {
519 if (got_start_row && got_location) {
522 if (connected.count(range_loc_info.
addr) == 0) {
524 connected.insert(range_loc_info.
addr);
527 m_cache->insert(table_name.c_str(), range_loc_info);
540 "found under row key '%s' (got_location=%s)", range_loc_info
541 .end_row.c_str(), got_location ?
"true" :
"false"));
546 got_start_row =
false;
548 got_location =
false;
552 const char *colon = strchr(key.
row,
':');
555 table_name.append(key.
row, colon-key.
row);
556 range_loc_info.
end_row = stripped_key;
565 got_start_row =
true;
570 if (str[0] ==
'!' && len == 1)
576 HT_ERRORF(
"METADATA lookup on row '%s' returned incorrect column (id=%d)",
583 if (got_start_row && got_end_row && got_location) {
586 if (connected.count(range_loc_info.
addr) == 0) {
588 connected.insert(range_loc_info.
addr);
591 m_cache->insert(table_name.c_str(), range_loc_info);
598 else if (got_end_row) {
604 "under row key '%s' (got_location=%s)", range_loc_info
605 .end_row.c_str(), got_location ?
"true" :
"false"));
631 lock_guard<mutex> lock(
m_mutex);
648 if (old_addr.
is_set() && old_addr != addr) {
658 HT_ERRORF(
"Timeout waiting for root RangeServer connection - %s",
#define SAVE_ERR2(_code_, _ex_, _msg_)
RangeLocationInfo m_root_range_info
int process_metadata_scanblocks(std::vector< ScanBlock > &scan_blocks, Timer &timer)
void create_scanner(const CommAddress &addr, const TableIdentifier &table, const RangeSpec &range, const ScanSpec &scan_spec, DispatchHandler *handler)
Issues a "create scanner" request asynchronously.
static const char * METADATA_ID
uint32_t m_metadata_retry_interval
std::string String
A String is simply a typedef to std::string.
String format(const char *fmt,...)
Returns a String using printf-like format facilities. Vanilla snprintf is about 1.5x faster than this...
Po::typed_value< String > * str(String *v=0)
int connect(CommAddress &addr, Timer &timer)
Column family specification.
Holds range start and end row plus location.
uint32_t m_max_error_queue_length
void dump_error_history()
Displays the error history.
uint32_t remaining()
Returns the remaining time till expiry.
#define HT_ON_SCOPE_EXIT(...)
bool expired()
Returns true if the timer is expired.
RangeLocator * m_rangelocator
A dynamic, resizable and reference counted memory buffer.
Represents a row interval.
Lib::RangeServer::Client m_range_server
A class managing one or more serializable ByteStrings.
TableIdentifier m_metadata_table
RangeLocator(PropertiesPtr &cfg, ConnectionManagerPtr &conn_mgr, Hyperspace::SessionPtr &hyperspace, uint32_t timeout_ms)
Constructor.
Scan predicate and control specification.
const char * str() const
Returns a pointer to the String's deserialized data.
void hyperspace_disconnected()
std::shared_ptr< Session > SessionPtr
void fetch_scanblock(const CommAddress &addr, int32_t scanner_id, DispatchHandler *handler)
Issues a "fetch scanblock" request asynchronously.
uint64_t m_root_file_handle
uint32_t m_metadata_readahead_count
void invalidate_host(const std::string &hostname)
#define SAVE_ERR(_code_, _msg_)
std::shared_ptr< Properties > PropertiesPtr
void set_proxy(const String &str)
Sets address type to CommAddress::PROXY and proxy name to str.
Compatibility Macros for C/C++.
bool load(const SerializedKey &key)
Parses the opaque key and loads the components into the member variables.
int32_t get_id() const
Gets column ID.
Hyperspace::HandleCallbackPtr m_root_handler
#define HT_THROW_(_code_)
String to_str() const
Returns string representation of address.
void close_handle(SessionPtr hyperspace, uint64_t handle)
std::string m_toplevel_dir
bool is_set() const
Returns true if address has been initialized.
std::set< CommAddress > CommAddressSet
Set of CommAddress objects.
static Schema * new_instance(const std::string &buf)
Creates schema object from XML schema string.
Hyperspace::SessionPtr m_hyperspace
bool m_hyperspace_connected
void initialize()
Assumes access is serialized via m_hyperspace_mutex.
static const int METADATA_ID_LENGTH
#define HT_INFOF(msg,...)
size_t decode_length(const uint8_t **dptr) const
Retrieves the decoded length and returns a pointer to the string.
int read_root_location(Timer &timer)
#define HT_THROWF(_code_, _fmt_,...)
Provides access to internal components of opaque key.
uint8_t * base
Pointer to the allocated memory buffer.
uint32_t m_root_metadata_retry_interval
RowIntervals row_intervals
ConnectionManagerPtr m_conn_manager
int find(const TableIdentifier *table, const char *row_key, RangeLocationInfo *range_loc_infop, Timer &timer, bool hard)
Locates the range that contains the given row key.
A timer class to keep timeout states across AsyncComm related calls.
This is a generic exception class for Hypertable.
~RangeLocator()
Destructor.
Declarations for ScanBlock.
static const char * END_ROOT_ROW
#define HT_ERRORF(msg,...)
std::mutex m_hyperspace_mutex
std::shared_ptr< Schema > SchemaPtr
Smart pointer to Schema.
uint8_t column_family_code
std::shared_ptr< ConnectionManager > ConnectionManagerPtr
Smart pointer to ConnectionManager.
void find_loop(const TableIdentifier *table, const char *row_key, RangeLocationInfo *range_loc_infop, Timer &timer, bool hard)
Locates the range that contains the given row key.
Error codes, Exception handling, error logging.
#define HT_THROW(_code_, _msg_)
void hyperspace_reconnected()
void clear_error_history()
Clears the error history.
Address abstraction to hold either proxy name or IPv4:port address.
void clear()
Clears address to uninitialized state.
RangeLocatorHyperspaceSessionCallback m_hyperspace_session_callback
int code() const
Returns the error code.
#define HT_THROW2(_code_, _ex_, _msg_)
Executes user-defined functions when leaving the current scope.