40 lock_guard<mutex> lock(m_mutex);
42 == m_accept_handler_map.end());
47 lock_guard<mutex> lock(m_mutex);
49 == m_data_handler_map.end());
50 m_data_handler_map[handler->
get_address()] = handler;
56 lock_guard<mutex> lock(m_mutex);
58 == m_datagram_handler_map.end());
63 lock_guard<mutex> lock(m_mutex);
65 == m_raw_handler_map.end());
66 m_raw_handler_map[handler->
get_address()] = handler;
72 lock_guard<mutex> lock(m_mutex);
74 if ((*handler = lookup_accept_handler(addr.
inet)) == 0)
77 HT_ASSERT(!(*handler)->is_decomissioned());
79 (*handler)->increment_reference_count();
87 lock_guard<mutex> lock(m_mutex);
91 if ((error = translate_address(addr, &inet_addr)) !=
Error::OK)
94 if ((*handler = lookup_data_handler(inet_addr)) == 0)
97 HT_ASSERT(!(*handler)->is_decomissioned());
99 (*handler)->increment_reference_count();
106 lock_guard<mutex> lock(m_mutex);
108 if ((*handler = lookup_datagram_handler(addr.
inet)) == 0)
111 HT_ASSERT(!(*handler)->is_decomissioned());
113 (*handler)->increment_reference_count();
120 lock_guard<mutex> lock(m_mutex);
122 if ((*handler = lookup_raw_handler(addr.
inet)) == 0)
125 HT_ASSERT(!(*handler)->is_decomissioned());
127 (*handler)->increment_reference_count();
133 lock_guard<mutex> lock(m_mutex);
138 lock_guard<mutex> lock(m_mutex);
143 if ((error = translate_address(addr, &inet_addr)) !=
Error::OK)
146 if ((handler = lookup_data_handler(inet_addr)) == 0)
153 lock_guard<mutex> lock(m_mutex);
156 if (m_data_handler_map.find(alias) != m_data_handler_map.end())
159 if ((iter = m_data_handler_map.find(addr)) == m_data_handler_map.end())
162 (*iter).second->set_alias(alias);
163 m_data_handler_map[
alias] = (*iter).second;
179 if ((diter = m_data_handler_map.find(remote_addr)) != m_data_handler_map.end()) {
181 m_data_handler_map.erase(diter);
184 if ((diter = m_data_handler_map.find(remote_addr)) != m_data_handler_map.end()) {
186 m_data_handler_map.erase(diter);
189 else if ((dgiter = m_datagram_handler_map.find(local_addr))
190 != m_datagram_handler_map.end()) {
192 m_datagram_handler_map.erase(dgiter);
194 else if ((aiter = m_accept_handler_map.find(local_addr))
195 != m_accept_handler_map.end()) {
197 m_accept_handler_map.erase(aiter);
199 else if ((riter = m_raw_handler_map.find(remote_addr))
200 != m_raw_handler_map.end()) {
202 m_raw_handler_map.erase(riter);
210 lock_guard<mutex> lock(m_mutex);
211 return remove_handler_unlocked(handler);
215 if (remove_handler_unlocked(handler) !=
Error::OK) {
216 HT_ASSERT(m_decomissioned_handlers.count(handler) > 0);
219 m_decomissioned_handlers.insert(handler);
224 lock_guard<mutex> lock(m_mutex);
231 for (diter = m_data_handler_map.begin(); diter != m_data_handler_map.end(); ++diter) {
232 m_decomissioned_handlers.insert(diter->second);
233 diter->second->decomission();
235 m_data_handler_map.clear();
238 for (dgiter = m_datagram_handler_map.begin();
239 dgiter != m_datagram_handler_map.end(); ++dgiter) {
240 m_decomissioned_handlers.insert(dgiter->second);
241 dgiter->second->decomission();
243 m_datagram_handler_map.clear();
246 for (aiter = m_accept_handler_map.begin();
247 aiter != m_accept_handler_map.end(); ++aiter) {
248 m_decomissioned_handlers.insert(aiter->second);
249 aiter->second->decomission();
251 m_accept_handler_map.clear();
254 for (riter = m_raw_handler_map.begin();
255 riter != m_raw_handler_map.end(); ++riter) {
256 m_decomissioned_handlers.insert(riter->second);
257 riter->second->decomission();
259 m_raw_handler_map.clear();
264 lock_guard<mutex> lock(m_mutex);
265 bool is_decomissioned = m_decomissioned_handlers.count(handler) > 0;
274 if (!m_proxy_map.get_mapping(proxy_addr.
proxy, hostname, inet_addr))
282 unique_lock<mutex> lock(m_mutex);
283 m_cond.wait(lock, [
this](){
return m_decomissioned_handlers.empty(); });
288 lock_guard<mutex> lock(m_mutex);
289 HT_ASSERT(m_decomissioned_handlers.count(handler) > 0);
291 m_decomissioned_handlers.erase(handler);
292 if (m_decomissioned_handlers.empty())
300 lock_guard<mutex> lock(m_mutex);
303 m_proxy_map.update_mapping(proxy, hostname, addr, invalidated_map, new_map);
305 for (
const auto &v : new_map) {
306 IOHandler *handler = lookup_data_handler(v.second.addr);
311 return propagate_proxy_map(new_map);
315 lock_guard<mutex> lock(m_mutex);
317 m_proxy_map.remove_mapping(proxy, remove_map);
318 if (!remove_map.empty()) {
320 for (
const auto &v : remove_map) {
321 handler = lookup_data_handler(v.second.addr);
323 decomission_handler_unlocked(handler);
325 return propagate_proxy_map(remove_map);
332 m_proxy_map.get_map(proxy_map);
337 lock_guard<mutex> lock(m_mutex);
338 String mappings(message, message_len);
343 m_proxy_map.update_mappings(mappings, invalidated_map, new_map);
345 for (
const auto &v : invalidated_map) {
346 IOHandler *handler = lookup_data_handler(v.second.addr);
348 if (v.second.hostname ==
"--DELETED--")
349 decomission_handler_unlocked(handler);
353 for (
const auto &v : new_map) {
354 IOHandler *handler = lookup_data_handler(v.second.addr);
361 m_proxies_loaded =
true;
362 m_cond_proxy.notify_all();
366 lock_guard<mutex> lock(m_mutex);
368 CommBufPtr comm_buf = m_proxy_map.create_update_message();
369 comm_buf->write_header_and_reset();
375 unique_lock<mutex> lock(m_mutex);
377 auto drop_time = chrono::steady_clock::now() +
379 return m_cond_proxy.wait_until(lock, drop_time,
380 [
this](){
return m_proxies_loaded; });
386 if (mappings.empty())
392 for (
const auto &v : mappings)
393 mapping += v.first +
"\t" + v.second.hostname +
"\t" +
InetAddr::format(v.second.addr) +
"\n";
395 uint8_t *buffer =
new uint8_t [ mapping.length() + 1 ];
396 strcpy((
char *)buffer, mapping.c_str());
397 boost::shared_array<uint8_t> payload(buffer);
400 std::vector<IOHandler *> decomission;
401 for (iter = m_data_handler_map.begin(); iter != m_data_handler_map.end(); ++iter) {
404 CommBufPtr comm_buf = make_shared<CommBuf>(header, 0, payload, mapping.length()+1);
405 comm_buf->write_header_and_reset();
408 decomission.push_back(handler);
409 HT_ERRORF(
"Unable to propagate proxy mappings to %s - %s",
418 for (
auto handler : decomission)
419 decomission_handler_unlocked(handler);
430 if (!m_proxy_map.get_mapping(addr.
proxy, hostname, *inet_addr))
441 if (iter != m_accept_handler_map.end())
448 if (iter != m_data_handler_map.end())
455 if (iter != m_datagram_handler_map.end())
462 if (iter != m_raw_handler_map.end())
void set_proxy(const String &proxy)
Sets the proxy name for this connection.
virtual void disconnect()
Disconnect connection.
InetAddr get_local_address()
Get local socket address for connection.
std::string String
A String is simply a typedef to std::string.
I/O handler for datagram (UDP) sockets.
int checkout_handler(const CommAddress &addr, IOHandlerAccept **handler)
Checks out accept I/O handler associated with addr.
String format(const char *fmt,...)
Returns a String using printf-like format facilities. Vanilla snprintf is about 1.5x faster than this...
bool wait_for_proxy_map(Timer &timer)
Waits for proxy map to get updated from a proxy map update message received from the master...
void decomission_handler_unlocked(IOHandler *handler)
Decommissions handler.
InetAddr get_address()
Gets the handler socket address.
void update_proxy_map(const char *message, size_t message_len)
Updates the proxy map with a proxy map update message received from the proxy master.
void increment_reference_count()
Increment reference count.
int set_alias(const InetAddr &addr, const InetAddr &alias)
Sets an alias address for an existing TCP address in map.
int send_message(CommBufPtr &cbp, uint32_t timeout_ms=0, DispatchHandler *disp_handler=nullptr)
Sends message pointed to by cbp over socket associated with this I/O handler.
uint32_t remaining()
Returns the remaining time till expiry.
bool destroy_ok(IOHandler *handler)
Determines if handler can be destroyed.
int remove_handler_unlocked(IOHandler *handler)
Removes handler from map without locking m_mutex.
void decrement_reference_count()
Decrement reference count.
Unordered map specialization for InetAddr keys.
Declarations for IOHandlerAccept.
IOHandlerAccept * lookup_accept_handler(const InetAddr &addr)
Finds accept I/O handler associated with addr.
int remove_proxy(const String &proxy)
Removes a proxy name from the proxy map.
const char * get_text(int error)
Returns a descriptive error message.
Encapsulate an internet address.
std::shared_ptr< CommBuf > CommBufPtr
Smart pointer to CommBuf.
void decomission()
Decommission handler.
IOHandlerRaw * lookup_raw_handler(const InetAddr &addr)
Finds raw I/O handler associated with addr.
int remove_handler(IOHandler *handler)
Removes handler from map.
std::unordered_map< String, ProxyAddressInfo > ProxyMapT
Forward mapping hash type from proxy name to ProxyAddressInfo.
IOHandlerData * lookup_data_handler(const InetAddr &addr)
Finds data (TCP) I/O handler associated with addr.
Compatibility Macros for C/C++.
void insert_handler(IOHandlerAccept *handler)
Inserts an accept handler.
Base class for socket descriptor I/O handlers.
I/O handler for TCP sockets.
void purge_handler(IOHandler *handler)
Purges (removes) handler.
Declarations for HandlerMap.
String format(int sep= ':') const
Returns a string with a dotted notation ("127.0.0.1:8080") including the port.
void wait_for_empty()
Waits for map to become empty.
bool is_set() const
Returns true if address has been initialized.
InetAddr get_alias()
Get alias address for this connection.
void get_proxy_map(ProxyMapT &proxy_map)
Returns the proxy map.
void start()
Starts the timer.
int contains_data_handler(const CommAddress &addr)
Checks to see if addr is contained in map.
void decomission_all()
Decommissions all handlers.
Declarations for ReactorFactory.
A timer class to keep timeout states across AsyncComm related calls.
bool translate_proxy_address(const CommAddress &proxy_addr, InetAddr *addr)
Translates proxy_addr to its corresponding IPv4 address.
size_t reference_count()
Return reference count.
void decrement_reference_count(IOHandler *handler)
Decrements the reference count of handler.
int translate_address(const CommAddress &addr, InetAddr *inet_addr)
Translates addr to an InetAddr (IP address).
#define HT_ERRORF(msg,...)
InetAddr inet
IPv4:port address.
I/O handler for accept (listen) sockets.
int32_t propagate_proxy_map(IOHandlerData *handler)
Sends the current proxy map over connection identified by handler.
void alias(const String &cmdline_opt, const String &file_opt, bool overwrite)
Setup command line option alias for config file option.
bool is_proxy() const
Returns true if address is of type CommAddress::PROXY.
bool is_decomissioned()
Checks to see if handler is decommissioned.
IOHandlerDatagram * lookup_datagram_handler(const InetAddr &addr)
Finds datagram I/O handler associated with addr.
int add_proxy(const String &proxy, const String &hostname, const InetAddr &addr)
Adds or updates proxy information.
I/O handler for raw sockets.
Address abstraction to hold either proxy name or IPv4:port address.
static bool proxy_master
Set to true if this process is acting as "Proxy Master".