40 #include <unordered_map>
56 namespace OperationState {
62 : MetaLog::Entity(type), m_context(context) {
63 int32_t timeout =
m_context->props->get_i32(
"Hypertable.Request.Timeout");
69 : MetaLog::Entity(type), m_context(context), m_event(event) {
75 : MetaLog::Entity(header_), m_context(context) {
93 os <<
" exclusivities=";
96 os <<
"\"" <<
str <<
"\"";
100 os <<
",\"" <<
str <<
"\"";
104 os <<
" dependencies=";
107 os <<
"\"" <<
str <<
"\"";
111 os <<
",\"" <<
str <<
"\"";
115 os <<
" obstructions=";
118 os <<
"\"" <<
str <<
"\"";
122 os <<
",\"" <<
str <<
"\"";
126 os <<
" permanent_obstructions=";
129 os <<
"\"" <<
str <<
"\"";
133 os <<
",\"" <<
str <<
"\"";
174 auto nanos = chrono::duration_cast<chrono::nanoseconds>(
m_expiration_time.time_since_epoch()).count();
225 const uint8_t *end = *bufp + encoding_length;
226 size_t tmp_remain = encoding_length;
229 *remainp -= encoding_length;
239 for (
size_t i=0; i<length; i++) {
246 for (
size_t i=0; i<length; i++) {
253 for (
size_t i=0; i<length; i++) {
261 for (
size_t i=0; i<length; i++) {
268 for (
size_t i=0; i<length; i++)
276 uint16_t definition_version) {
277 if (definition_version < 4)
278 decode_old(bufp, remainp, definition_version);
280 Entity::decode(bufp, remainp);
284 uint16_t definition_version) {
289 if (definition_version >= 2)
308 for (
size_t i=0; i<length; i++) {
315 for (
size_t i=0; i<length; i++) {
322 for (
size_t i=0; i<length; i++) {
327 if (definition_version >= 3) {
331 for (
size_t i=0; i<length; i++) {
338 for (
size_t i=0; i<length; i++)
368 std::vector<MetaLog::EntityPtr> entities;
369 entities.reserve(1 + additional.size() +
m_sub_ops.size());
373 entities.push_back(shared_from_this());
375 for (
auto & entity : additional) {
377 if (op && op->removal_approved())
379 entities.push_back(entity);
382 std::vector<int64_t> new_sub_ops;
385 if (op->removal_approved())
386 op->mark_for_removal();
388 new_sub_ops.push_back(op->id());
389 entities.push_back(op);
391 m_context->mml_writer->record_state(entities);
392 for (
auto & entity : entities) {
394 if (op && op->marked_for_removal())
395 m_context->reference_manager->remove(op);
397 m_sub_ops.swap(new_sub_ops);
401 std::vector<MetaLog::EntityPtr> &additional) {
412 op->remove_approval_add(op->get_remove_approval_mask());
416 std::stringstream sout;
417 sout <<
"Operation failed (" << *
this <<
") " <<
Error::get_text(error) <<
" - " << msg;
429 std::vector<MetaLog::EntityPtr> entities;
431 entities.push_back(additional);
453 std::vector<MetaLog::EntityPtr> entities;
455 entities.push_back(additional);
478 sub_ops.push_back(
m_context->reference_manager->get(
id));
509 return blocked_on_entry;
513 vector<MetaLog::EntityPtr> entities;
517 if (op->get_error()) {
521 op->remove_approval_add(op->get_remove_approval_mask());
522 string dependency_string =
523 format(
"%s subop %s %lld",
name().c_str(), op->name().c_str(),
524 (
Lld)op->hash_code());
527 entities.push_back(op);
537 string dependency_string =
538 format(
"%s subop %s %lld",
name().c_str(), operation->name().c_str(),
539 (
Lld)operation->hash_code());
540 operation->add_obstruction_permanent(dependency_string);
541 operation->set_remove_approval_mask(0x01);
542 m_context->reference_manager->add(operation);
558 StateInfo state_info[] = {
594 typedef std::unordered_map<int32_t, const char *> TextMap;
596 TextMap &build_text_map() {
597 TextMap *map =
new TextMap();
598 for (
int i=0; state_info[i].text != 0; i++)
599 (*map)[state_info[i].code] = state_info[i].text;
603 TextMap &text_map = build_text_map();
608 namespace OperationState {
610 const char *text = text_map[state];
612 return "STATE NOT REGISTERED";
virtual void encode_result(uint8_t **bufp) const
Encode operation result.
char * decode_vstr(const uint8_t **bufp, size_t *remainp)
Decode a vstr (vint64, data, null).
virtual void display_state(std::ostream &os)=0
Write human readable operation state to output stream.
ContextPtr m_context
Pointer to Master context.
uint16_t m_remove_approvals
Remove approvals received.
String m_error_msg
Result error message.
virtual size_t encoded_length_state() const =0
Encoded length of operation state.
std::string String
A String is simply a typedef to std::string.
std::shared_ptr< Entity > EntityPtr
Smart pointer to Entity.
String format(const char *fmt,...)
Returns a String using printf like format facilities Vanilla snprintf is about 1.5x faster than this...
chrono::time_point< fast_clock > time_point
bool validate_subops()
Handles the results of sub operations.
virtual void decode_result(const uint8_t **bufp, size_t *remainp)
Decode operation result.
Po::typed_value< String > * str(String *v=0)
void decode(const uint8_t **bufp, size_t *remainp, uint16_t definition_version) override
Decode operation.
Declarations for Operation.
virtual uint8_t encoding_version_state() const =0
Returns version of encoding format of state.
virtual void obstructions(DependencySet &obstructions)
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
uint16_t m_remove_approval_mask
Remove approval mask.
int32_t m_original_type
Original entity type read from MML (prior to conversion)
size_t encoded_length_vstr(size_t len)
Computes the encoded length of vstr (vint64, data, null)
const char * get_text(int32_t state)
const char * RECOVER_SERVER
uint32_t decode_i32(const uint8_t **bufp, size_t *remainp)
Decode a 32-bit integer in little-endian order.
Declarations for ReferenceManager.
int64_t m_hash_code
Hash code uniqely identifying operation.
EventPtr m_event
Pointer to client event (if any) that originated the operation.
std::shared_ptr< Context > ContextPtr
Smart pointer to Context.
uint8_t encoding_version() const override
Returns encoding version.
virtual void dependencies(DependencySet &dependencies)
uint8_t decode_i8(const uint8_t **bufp, size_t *remainp)
Decode a 8-bit integer (a byte/character)
uint64_t decode_i64(const uint8_t **bufp, size_t *remainp)
Decode a 64-bit integer in little-endian order.
int encoded_length_vi32(uint32_t val)
Length of a variable length encoded 32-bit integer (up to 5 bytes)
void encode_vi64(uint8_t **bufp, uint64_t val)
Encode a integer (up to 64-bit) in variable length encoding.
static time_point now() noexcept
const char * get_text(int error)
Returns a descriptive error message.
size_t encoded_length_internal() const override
Returns internal serialized length.
uint16_t decode_i16(const uint8_t **bufp, size_t *remainp)
Decode a 16-bit integer in little-endian order.
int32_t m_error
Result error code.
void add_dependency(const String &dependency)
void encode_i32(uint8_t **bufp, uint32_t val)
Encode a 32-bit integer in little-endian order.
virtual void decode_state_old(uint8_t version, const uint8_t **bufp, size_t *remainp)=0
Compatibility Macros for C/C++.
DependencySet m_obstructions_permanent
Set of permanent obstructions.
void stage_subop(std::shared_ptr< Operation > operation)
Stages a sub operation for execution.
bool removal_approved()
Checks if all remove approvals have been received.
void encode_i16(uint8_t **bufp, uint16_t val)
Encode a 16-bit integer in little-endian order.
void encode_internal(uint8_t **bufp) const override
Writes serialized representation of object to a buffer.
void encode_i64(uint8_t **bufp, uint64_t val)
Encode a 64-bit integer in little-endian order.
Functions to serialize/deserialize primitives to/from a memory buffer.
void record_state()
Records operation state to the MML.
const String name() override=0
Name of operation used for exclusivity.
bool m_blocked
Flag indicating if operation is blocked.
bool m_ephemeral
Indicates if operation is ephemeral and does not get persisted to MML.
void decode_internal(uint8_t version, const uint8_t **bufp, size_t *remainp) override
Reads serialized representation of object from a buffer.
const char * RECOVERY_BLOCKER
void encode_vstr(uint8_t **bufp, const void *buf, size_t len)
Encode a buffer as variable length string (vint64, data, null)
uint64_t decode_vi64(const uint8_t **bufp, size_t *remainp)
Decode a variable length encoded integer up to 64-bit.
long long int Lld
Shortcut for printf formats.
void encode_vi32(uint8_t **bufp, uint32_t val)
Encode a integer (up to 32-bit) in variable length encoding.
DependencySet m_obstructions
Set of obstructions.
virtual void exclusivities(DependencySet &exclusivities)
DependencySet m_dependencies
Set of dependencies.
void fetch_sub_operations(std::vector< std::shared_ptr< Operation > > &sub_ops)
void decode_old(const uint8_t **bufp, size_t *remainp, uint16_t definition_version)
#define HT_INFOF(msg,...)
#define HT_THROWF(_code_, _fmt_,...)
Abstract base class for master operations.
Operation(ContextPtr &context, int32_t type)
Constructor with operation type specifier.
void display(std::ostream &os) override
Write human readable string represenation of operation to output stream.
std::vector< int64_t > m_sub_ops
Vector of sub operations IDs.
virtual void decode_state(uint8_t version, const uint8_t **bufp, size_t *remainp)=0
Decode operation state.
bool m_unblock_on_exit
Flag to signal operation to be unblocked on exit (post_run())
virtual void encode_state(uint8_t **bufp) const =0
Encode operation state.
DependencySet m_exclusivities
Set of exclusivities.
std::set< String > DependencySet
Set of dependency string.
void complete_ok(std::vector< MetaLog::EntityPtr > &additional)
std::shared_ptr< Operation > OperationPtr
Smart pointer to Operation.
void complete_error(int error, const String &msg, std::vector< MetaLog::EntityPtr > &additional)
Completes operation with error.
uint32_t decode_vi32(const uint8_t **bufp, size_t *remainp)
Decode a variable length encoded integer up to 32-bit.
virtual size_t encoded_result_length() const
Length of encoded operation result.
void encode_i8(uint8_t **bufp, uint8_t val)
Encodes a byte into the given buffer.
int encoded_length_vi64(uint64_t val)
Length of a variable length encoded 64-bit integer (up to 9 bytes)
ClockT::time_point m_expiration_time
Expiration time (used by ResponseManager)