39 ResponseManagerContext::OperationExpirationTimeIndex::iterator op_iter;
41 ResponseManagerContext::DeliveryExpirationTimeIndex::iterator delivery_iter;
44 bool shutdown =
false;
45 std::vector<OperationPtr> operations;
46 std::vector<MetaLog::EntityPtr> entities;
53 unique_lock<mutex> lock(m_context->mutex);
56 if (!op_expiration_time_index.empty()) {
57 expire_time = op_expiration_time_index.begin()->expiration_time();
61 if (!delivery_expiration_time_index.empty()) {
62 if (!timed_wait || delivery_expiration_time_index.begin()->expiration_time < expire_time)
63 expire_time = delivery_expiration_time_index.begin()->expiration_time;
68 m_context->cond.wait_until(lock, expire_time);
70 m_context->cond.wait(lock);
72 shutdown = m_context->shutdown;
76 op_iter = op_expiration_time_index.begin();
77 while (op_iter != op_expiration_time_index.end()) {
78 if (op_iter->expiration_time() < now) {
79 m_context->removal_queue.push_back(op_iter->op);
80 op_iter = op_expiration_time_index.erase(op_iter);
86 delivery_iter = delivery_expiration_time_index.begin();
87 while (delivery_iter != delivery_expiration_time_index.end()) {
88 if (delivery_iter->expiration_time < now)
89 delivery_iter = delivery_expiration_time_index.erase(delivery_iter);
96 if (!m_context->removal_queue.empty()) {
97 for (
auto & operation : m_context->removal_queue) {
98 if (!operation->ephemeral()) {
100 operations.push_back(operation);
101 entities.push_back(operation);
104 m_context->removal_queue.clear();
108 if (!entities.empty())
109 m_context->mml_writer->record_removal(entities);
121 lock_guard<mutex> lock(m_context->mutex);
123 ResponseManagerContext::OperationIdentifierIndex::iterator iter;
126 delivery_rec.
id = operation_id;
127 if ((iter = operation_identifier_index.find(delivery_rec.
id)) == operation_identifier_index.end()) {
128 delivery_rec.
event = event;
130 chrono::milliseconds(event->header.timeout_ms);
131 m_context->delivery_list.push_back(delivery_rec);
138 iter->op->encode_result( cbp->get_data_ptr_address() );
139 error = m_context->comm->send_response(event->addr, cbp);
141 HT_ERRORF(
"Problem sending ID response back for %s operation (id=%lld) - %s",
143 m_context->removal_queue.push_back(iter->op);
144 operation_identifier_index.erase(iter);
145 m_context->cond.notify_all();
151 lock_guard<mutex> lock(m_context->mutex);
153 ResponseManagerContext::DeliveryIdentifierIndex::iterator iter;
155 HT_ASSERT(operation->get_remove_approval_mask() == 0);
157 if ((iter = delivery_identifier_index.find(operation->id())) == delivery_identifier_index.end())
164 operation->encode_result( cbp->get_data_ptr_address() );
165 error = m_context->comm->send_response(iter->event->addr, cbp);
167 HT_ERRORF(
"Problem sending ID response back for %s operation (id=%lld) - %s",
169 m_context->removal_queue.push_back(operation);
170 delivery_identifier_index.erase(iter);
171 m_context->cond.notify_all();
177 lock_guard<mutex> lock(m_context->mutex);
178 m_context->shutdown =
true;
179 m_context->cond.notify_all();
void shutdown()
Initiates shutdown sequence.
int64_t id
Corresponding operation identifier.
chrono::time_point< fast_clock > time_point
DeliveryList::nth_index< 1 >::type DeliveryExpirationTimeIndex
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
void operator()()
Worker thread run method.
ClockT::time_point expiration_time
Delivery expiration time specified by client.
Record holding completed operation information.
static time_point now() noexcept
const char * get_text(int error)
Returns a descriptive error message.
std::shared_ptr< CommBuf > CommBufPtr
Smart pointer to CommBuf.
void add_operation(OperationPtr &operation)
Queues a completed operation and/or delivers response.
Compatibility Macros for C/C++.
OperationList::nth_index< 2 >::type OperationIdentifierIndex
long long int Lld
Shortcut for printf formats.
DeliveryList::nth_index< 2 >::type DeliveryIdentifierIndex
Record holding response delivery information for an operation.
This is a generic exception class for Hypertable.
Message buffer for holding data to be transmitted over a network.
#define HT_ERRORF(msg,...)
EventPtr event
Event object containing return address for operation result.
std::shared_ptr< Operation > OperationPtr
Smart pointer to Operation.
OperationList::nth_index< 1 >::type OperationExpirationTimeIndex
void add_delivery_info(int64_t operation_id, EventPtr &event)
Adds response delivery information and/or delivers response.
Declarations for ResponseManager.