44 const vector<int32_t> &fragments)
45 : m_range_spec(spec), m_range_state(state), m_schema(schema),
46 m_outstanding(fragments.size()), m_state(LOADED) {
47 for (int32_t fragment : fragments) {
49 m_fragments[fragment] = make_shared<FragmentData>();
54 lock_guard<mutex> lock(
m_mutex);
59 lock_guard<mutex> lock(
m_mutex);
60 FragmentMap::iterator it =
m_fragments.find(fragment);
65 it->second->add(event);
71 lock_guard<mutex> lock(
m_mutex);
79 lock_guard<mutex> lock(
m_mutex);
84 m_range->deferred_initialization();
92 lock_guard<mutex> lock(
m_mutex);
106 lock_guard<Range> range_lock(*
m_range);
112 phantom_log->close();
116 std::stringstream sout;
119 metalog_entity->get_range_state(range_state);
120 sout <<
"Created phantom log " << m_phantom_logname
130 while (m_phantom_log->next(&base, &len, &header))
132 *is_empty = m_phantom_log->get_latest_revision() ==
TIMESTAMP_MIN;
137 lock_guard<mutex> lock(
m_mutex);
142 lock_guard<mutex> lock(
m_mutex);
148 lock_guard<mutex> lock(
m_mutex);
154 lock_guard<mutex> lock(
m_mutex);
159 lock_guard<mutex> lock(
m_mutex);
165 lock_guard<mutex> lock(
m_mutex);
170 lock_guard<mutex> lock(
m_mutex);
176 lock_guard<mutex> lock(
m_mutex);
184 String start_row, end_row;
185 char md5DigestStr[33];
188 range_entity->get_table_identifier(table);
189 range_entity->get_boundary_rows(start_row, end_row);
192 md5DigestStr[16] = 0;
194 logname =
format(
"%s/tables/%s/_xfer/%s_phantom_%lld",
196 table.
id, md5DigestStr, (
Lld)recovery_id);
199 log_dfs->rmdir(logname);
201 log_dfs->mkdirs(logname);
bool add(int32_t fragment, EventPtr &event)
QualifiedRangeSpec m_range_spec
std::string String
A String is simply a typedef to std::string.
void purge_incomplete_fragments()
String format(const char *fmt,...)
Returns a String using printf-like format facilities. Vanilla snprintf is about 1.5x faster than this...
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
static const int64_t TIMESTAMP_MIN
std::shared_ptr< CommitLogReader > CommitLogReaderPtr
Smart pointer to CommitLogReader.
void md5_trunc_modified_base64(const char *input, char output[17])
Get the modified base64-encoded string of the first 12 bytes of the 16-byte MD5 code of a null-termin...
std::shared_ptr< Client > ClientPtr
Declarations for PhantomRange.
static std::string toplevel_dir
Compatibility Macros for C/C++.
CommitLogReaderPtr m_phantom_log
String create_log(FilesystemPtr &log_dfs, int64_t recovery_id, MetaLogEntityRangePtr &range_entity)
uint8_t state
Range state value (see StateType)
CommitLogReaderPtr get_phantom_log()
void create_range(Lib::Master::ClientPtr &master_client, TableInfoPtr &table_info, FilesystemPtr &log_dfs)
const String & get_phantom_logname()
long long int Lld
Shortcut for printf formats.
std::shared_ptr< MetaLogEntityRange > MetaLogEntityRangePtr
Smart pointer to MetaLogEntityRange.
std::shared_ptr< Filesystem > FilesystemPtr
Smart pointer to Filesystem.
void populate_range_and_log(FilesystemPtr &log_dfs, int64_t recovery_id, bool *is_empty)
std::shared_ptr< TableInfo > TableInfoPtr
Smart pointer to TableInfo.
#define HT_INFOF(msg,...)
std::shared_ptr< CommitLog > CommitLogPtr
Smart pointer to CommitLog.
Qualified (with table identifier) range specification.
PhantomRange(const QualifiedRangeSpec &spec, const RangeState &state, SchemaPtr &schema, const vector< int32_t > &fragments)
std::shared_ptr< Schema > SchemaPtr
Smart pointer to Schema.
Range state with memory management.