0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
ResponseManager.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 3
9  * of the License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
28 #include <Common/Compat.h>
29 
30 #include "ResponseManager.h"
31 
32 #include <chrono>
33 
34 using namespace Hypertable;
35 using namespace std;
36 
// Body of the worker thread run method (presumably ResponseManager::operator()();
// the signature line is absent from this listing -- confirm against the original file).
// Repeats until m_context->shutdown is observed: waits on the context condition
// variable, drops expired operation/delivery records, and records the removal of
// non-ephemeral operations through m_context->mml_writer.
// Index 1 of each container is walked from begin() and abandoned at the first
// non-expired entry, i.e. the code relies on it being ordered by expiration time
// (NOTE(review): ordering is defined in ResponseManagerContext -- confirm).
38  ResponseManagerContext::OperationExpirationTimeIndex &op_expiration_time_index = m_context->expirable_ops.get<1>();
39  ResponseManagerContext::OperationExpirationTimeIndex::iterator op_iter;
40  ResponseManagerContext::DeliveryExpirationTimeIndex &delivery_expiration_time_index = m_context->delivery_list.get<1>();
41  ResponseManagerContext::DeliveryExpirationTimeIndex::iterator delivery_iter;
42  ClockT::time_point now, expire_time;
43  bool timed_wait;
44  bool shutdown = false;
// NOTE(review): `operations` is filled and cleared below but never read in this body.
45  std::vector<OperationPtr> operations;
46  std::vector<MetaLog::EntityPtr> entities;
47 
48  try {
49 
50  while (!shutdown) {
51 
// Inner scope bounds the lifetime of the lock; record_removal() below runs after it is released.
52  {
53  unique_lock<mutex> lock(m_context->mutex);
54 
// Pick the wait deadline: the earlier of the first operation expiration and the
// first delivery expiration, if either container is non-empty.
55  timed_wait = false;
56  if (!op_expiration_time_index.empty()) {
57  expire_time = op_expiration_time_index.begin()->expiration_time();
58  timed_wait = true;
59  }
60 
61  if (!delivery_expiration_time_index.empty()) {
62  if (!timed_wait || delivery_expiration_time_index.begin()->expiration_time < expire_time)
63  expire_time = delivery_expiration_time_index.begin()->expiration_time;
64  timed_wait = true;
65  }
66 
// No predicate is passed to the wait calls: any wake-up (notification, timeout or
// spurious) simply runs one pass of the processing below.
67  if (timed_wait)
68  m_context->cond.wait_until(lock, expire_time);
69  else
70  m_context->cond.wait(lock);
71 
// Snapshot the flag under the lock; the current pass still completes before the loop exits.
72  shutdown = m_context->shutdown;
73 
74  now = ClockT::now();
75 
// Move operations whose expiration time is strictly before `now` to the removal queue.
76  op_iter = op_expiration_time_index.begin();
77  while (op_iter != op_expiration_time_index.end()) {
78  if (op_iter->expiration_time() < now) {
79  m_context->removal_queue.push_back(op_iter->op);
80  op_iter = op_expiration_time_index.erase(op_iter);
81  }
82  else
83  break;
84  }
85 
// Discard delivery records whose expiration time is strictly before `now`.
86  delivery_iter = delivery_expiration_time_index.begin();
87  while (delivery_iter != delivery_expiration_time_index.end()) {
88  if (delivery_iter->expiration_time < now)
89  delivery_iter = delivery_expiration_time_index.erase(delivery_iter);
90  else
91  break;
92  }
93 
// Drain the removal queue, keeping only non-ephemeral operations for the removal record.
94  operations.clear();
95  entities.clear();
96  if (!m_context->removal_queue.empty()) {
97  for (auto & operation : m_context->removal_queue) {
98  if (!operation->ephemeral()) {
99  HT_ASSERT(m_context->mml_writer);
100  operations.push_back(operation);
101  entities.push_back(operation);
102  }
103  }
104  m_context->removal_queue.clear();
105  }
106  }
107 
// Called outside the locked scope above.
108  if (!entities.empty())
109  m_context->mml_writer->record_removal(entities);
110 
111  }
112  }
// The try block encloses the whole loop, so an Exception is logged and the method returns.
113  catch (Exception &e) {
114  HT_ERROR_OUT << e << HT_END;
115  }
116 
117 }
118 
119 
120 void ResponseManager::add_delivery_info(int64_t operation_id, EventPtr &event) {
121  lock_guard<mutex> lock(m_context->mutex);
122  ResponseManagerContext::OperationIdentifierIndex &operation_identifier_index = m_context->expirable_ops.get<2>();
123  ResponseManagerContext::OperationIdentifierIndex::iterator iter;
125 
126  delivery_rec.id = operation_id;
127  if ((iter = operation_identifier_index.find(delivery_rec.id)) == operation_identifier_index.end()) {
128  delivery_rec.event = event;
129  delivery_rec.expiration_time = ClockT::now() +
130  chrono::milliseconds(event->header.timeout_ms);
131  m_context->delivery_list.push_back(delivery_rec);
132  }
133  else {
134  int error = Error::OK;
135  CommHeader header;
136  header.initialize_from_request_header(event->header);
137  CommBufPtr cbp(new CommBuf(header, iter->op->encoded_result_length()));
138  iter->op->encode_result( cbp->get_data_ptr_address() );
139  error = m_context->comm->send_response(event->addr, cbp);
140  if (error != Error::OK)
141  HT_ERRORF("Problem sending ID response back for %s operation (id=%lld) - %s",
142  iter->op->label().c_str(), (Lld)iter->op->id(), Error::get_text(error));
143  m_context->removal_queue.push_back(iter->op);
144  operation_identifier_index.erase(iter);
145  m_context->cond.notify_all();
146  }
147 }
148 
149 
// Body of the method that queues a completed operation and/or delivers its response
// (presumably ResponseManager::add_operation(OperationPtr &operation); the signature
// line is absent from this listing -- confirm against the original file).
// Under m_context->mutex: when no delivery record exists for operation->id(), the
// operation is appended to m_context->expirable_ops; otherwise its encoded result is
// sent to the address of the recorded event.
151  lock_guard<mutex> lock(m_context->mutex);
152  ResponseManagerContext::DeliveryIdentifierIndex &delivery_identifier_index = m_context->delivery_list.get<2>();
153  ResponseManagerContext::DeliveryIdentifierIndex::iterator iter;
154 
// Precondition enforced here: the operation's remove approval mask must be zero.
155  HT_ASSERT(operation->get_remove_approval_mask() == 0);
156 
157  if ((iter = delivery_identifier_index.find(operation->id())) == delivery_identifier_index.end())
158  m_context->expirable_ops.push_back(ResponseManagerContext::OperationRec(operation));
159  else {
// A delivery record is present: build the response from the recorded request header.
160  int error = Error::OK;
161  CommHeader header;
162  header.initialize_from_request_header(iter->event->header);
163  CommBufPtr cbp(new CommBuf(header, operation->encoded_result_length()));
164  operation->encode_result( cbp->get_data_ptr_address() );
165  error = m_context->comm->send_response(iter->event->addr, cbp);
// A failed send is only logged; the operation is still queued for removal below.
166  if (error != Error::OK)
167  HT_ERRORF("Problem sending ID response back for %s operation (id=%lld) - %s",
168  operation->label().c_str(), (Lld)operation->id(), Error::get_text(error));
// Queue the operation for removal, drop the delivery record, and wake threads waiting on cond.
169  m_context->removal_queue.push_back(operation);
170  delivery_identifier_index.erase(iter);
171  m_context->cond.notify_all();
172  }
173 }
174 
175 
// Body of the method that initiates the shutdown sequence (presumably
// ResponseManager::shutdown(); the signature line is absent from this listing -- confirm).
// Sets m_context->shutdown while holding m_context->mutex, then notifies every thread
// waiting on m_context->cond.
177  lock_guard<mutex> lock(m_context->mutex);
178  m_context->shutdown = true;
179  m_context->cond.notify_all();
180 }
void initialize_from_request_header(CommHeader &req_header)
Initializes header from req_header.
Definition: CommHeader.h:128
void shutdown()
Initiates shutdown sequence.
int64_t id
Corresponding operation identifier.
chrono::time_point< fast_clock > time_point
Definition: fast_clock.h:42
DeliveryList::nth_index< 1 >::type DeliveryExpirationTimeIndex
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
Definition: Event.h:228
STL namespace.
void operator()()
Worker thread run method.
ClockT::time_point expiration_time
Delivery expiration time specified by client.
#define HT_ASSERT(_e_)
Definition: Logger.h:396
Record holding completed operation information.
static time_point now() noexcept
Definition: fast_clock.cc:37
const char * get_text(int error)
Returns a descriptive error message.
Definition: Error.cc:330
std::shared_ptr< CommBuf > CommBufPtr
Smart pointer to CommBuf.
Definition: CommBuf.h:305
void add_operation(OperationPtr &operation)
Queues a completed operation and/or delivers response.
Compatibility Macros for C/C++.
OperationList::nth_index< 2 >::type OperationIdentifierIndex
#define HT_END
Definition: Logger.h:220
#define HT_ERROR_OUT
Definition: Logger.h:301
Hypertable definitions
long long int Lld
Shortcut for printf formats.
Definition: String.h:53
Header for messages transmitted via AsyncComm.
Definition: CommHeader.h:40
DeliveryList::nth_index< 2 >::type DeliveryIdentifierIndex
Record holding response delivery information for an operation.
This is a generic exception class for Hypertable.
Definition: Error.h:314
Message buffer for holding data to be transmitted over a network.
Definition: CommBuf.h:79
#define HT_ERRORF(msg,...)
Definition: Logger.h:300
EventPtr event
Event object containing return address for operation result.
std::shared_ptr< Operation > OperationPtr
Smart pointer to Operation.
Definition: Operation.h:609
OperationList::nth_index< 1 >::type OperationExpirationTimeIndex
void add_delivery_info(int64_t operation_id, EventPtr &event)
Adds response delivery information and/or delivers response.
Declarations for ResponseManager.