52 #if defined(__APPLE__) || defined(__sun__) || defined(__FreeBSD__)
53 #include <arpa/inet.h>
54 #include <netinet/ip.h>
59 #include <netinet/in.h>
60 #include <netinet/tcp.h>
61 #include <sys/socket.h>
62 #include <sys/types.h>
63 #include <arpa/inet.h>
76 HT_ERROR(
"ReactorFactory::initialize must be called before creating "
77 "AsyncComm::comm object");
92 m_handler_map->decomission_all();
95 m_handler_map->wait_for_empty();
113 if (m_handler_map->checkout_handler(addr, &io_handler) ==
Error::OK) {
120 m_handler_map->insert_handler(io_handler);
125 HT_THROWF(error,
"Problem polling on raw socket bound to %s",
136 int error = m_handler_map->contains_data_handler(addr);
146 if ((sd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
148 HT_ERRORF(
"socket: %s", strerror(errno));
154 m_local_addr.sin_port = htons(port);
157 if ((::bind(sd, (
const sockaddr *)&m_local_addr,
sizeof(sockaddr_in))) < 0) {
158 if (errno == EADDRINUSE) {
163 HT_ERRORF(
"bind: %s: %s", m_local_addr.format().c_str(), strerror(errno));
169 return connect_socket(sd, addr, default_handler);
178 int error = m_handler_map->contains_data_handler(addr);
187 if ((sd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
189 HT_ERRORF(
"socket: %s", strerror(errno));
194 if ((::bind(sd, (
const sockaddr *)&local_addr.
inet,
sizeof(sockaddr_in))) < 0) {
196 HT_ERRORF(
"bind: %s: %s", local_addr.
to_str().c_str(), strerror(errno));
200 return connect_socket(sd, addr, default_handler);
205 return m_handler_map->set_alias(addr, alias);
211 listen(addr, chf, null_handler);
217 return m_handler_map->add_proxy(proxy, hostname, addr);
222 return m_handler_map->remove_proxy(proxy);
228 return m_handler_map->translate_proxy_address(proxy_addr, addr);
232 m_handler_map->get_proxy_map(proxy_map);
236 return m_handler_map->wait_for_proxy_map(timer);
249 if ((sd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
255 #if defined(__linux__)
256 if (setsockopt(sd, SOL_TCP, TCP_NODELAY, &one,
sizeof(one)) < 0 && m_verbose)
257 HT_ERRORF(
"setting TCP_NODELAY: %s", strerror(errno));
258 #elif defined(__sun__)
259 if (setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (
char*)&one,
sizeof(one)) < 0 && m_verbose)
260 HT_ERRORF(
"setting TCP_NODELAY: %s", strerror(errno));
261 #elif defined(__APPLE__) || defined(__FreeBSD__)
262 if (setsockopt(sd, SOL_SOCKET, SO_NOSIGPIPE, &one,
sizeof(one)) < 0 && m_verbose)
263 HT_WARNF(
"setsockopt(SO_NOSIGPIPE) failure: %s", strerror(errno));
264 if (setsockopt(sd, SOL_SOCKET, SO_REUSEPORT, &one,
sizeof(one)) < 0 && m_verbose)
265 HT_WARNF(
"setsockopt(SO_REUSEPORT) failure: %s", strerror(errno));
268 if (setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, &one,
sizeof(one)) < 0 && m_verbose)
269 HT_ERRORF(
"setting SO_REUSEADDR: %s", strerror(errno));
271 int bind_attempts = 0;
272 while ((::bind(sd, (
const sockaddr *)&addr.
inet,
sizeof(sockaddr_in))) < 0) {
273 if (bind_attempts == 24)
275 addr.
to_str().c_str(), strerror(errno));
277 HT_INFOF(
"Unable to bind to %s: %s, will retry in 10 seconds...",
278 addr.
to_str().c_str(), strerror(errno));
279 this_thread::sleep_for(chrono::milliseconds(10000));
283 if (::listen(sd, 1000) < 0)
286 handler =
new IOHandlerAccept(sd, default_handler, m_handler_map, chf);
287 m_handler_map->insert_handler(handler);
292 HT_THROWF(error,
"Problem polling on listen socket bound to %s",
305 if ((error = m_handler_map->checkout_handler(addr, &data_handler)) !=
Error::OK) {
313 return send_request(data_handler, timeout_ms, cbuf, resp_handler);
322 if (resp_handler == 0) {
327 cbuf->header.id = ms_next_request_id++;
328 if (cbuf->header.id == 0)
329 cbuf->header.id = ms_next_request_id++;
332 cbuf->header.timeout_ms = timeout_ms;
333 cbuf->write_header_and_reset();
335 int error = data_handler->
send_message(cbuf, timeout_ms, resp_handler);
337 m_handler_map->decomission_handler(data_handler);
347 if ((error = m_handler_map->checkout_handler(addr, &data_handler)) !=
Error::OK) {
357 cbuf->write_header_and_reset();
361 m_handler_map->decomission_handler(data_handler);
374 if ((sd = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
380 int bufsize = 4*32768;
382 if (setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (
char *)&bufsize,
sizeof(bufsize))
385 HT_ERRORF(
"setsockopt(SO_SNDBUF) failed - %s", strerror(errno));
387 if (setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (
char *)&bufsize,
sizeof(bufsize))
390 HT_ERRORF(
"setsockopt(SO_RCVBUF) failed - %s", strerror(errno));
397 #if defined(__linux__)
399 setsockopt(sd, SOL_IP, IP_TOS, &opt,
sizeof(opt));
401 setsockopt(sd, SOL_SOCKET, SO_PRIORITY, &opt,
sizeof(opt));
402 #elif defined(__APPLE__) || defined(__sun__) || defined(__FreeBSD__)
403 opt = IPTOS_LOWDELAY;
404 setsockopt(sd, IPPROTO_IP, IP_TOS, &opt,
sizeof(opt));
409 int bind_attempts = 0;
410 while ((::bind(sd, (
const sockaddr *)&addr.
inet,
sizeof(sockaddr_in))) < 0) {
411 if (bind_attempts == 24)
413 addr.
to_str().c_str(), strerror(errno));
415 HT_INFOF(
"Unable to bind to %s: %s, will retry in 10 seconds...",
416 addr.
to_str().c_str(), strerror(errno));
417 this_thread::sleep_for(chrono::milliseconds(10000));
425 m_handler_map->insert_handler(handler);
430 HT_THROWF(error,
"Problem polling on datagram socket bound to %s",
444 if ((error = m_handler_map->checkout_handler(send_addr, &handler)) !=
Error::OK) {
446 HT_ERRORF(
"Datagram send/local address %s not registered",
447 send_addr.
to_str().c_str());
456 cbuf->write_header_and_reset();
460 m_handler_map->decomission_handler(handler);
469 m_timer_reactor->add_timer(timer);
479 m_timer_reactor->add_timer(timer);
484 m_timer_reactor->cancel_timer(handler);
495 if (m_handler_map->checkout_handler(addr, &data_handler) ==
Error::OK)
496 handler = data_handler;
497 else if (m_handler_map->checkout_handler(addr, &datagram_handler) ==
Error::OK)
498 handler = datagram_handler;
499 else if (m_handler_map->checkout_handler(addr, &accept_handler) ==
Error::OK)
500 handler = accept_handler;
501 else if (m_handler_map->checkout_handler(addr, &raw_handler) ==
Error::OK)
502 handler = raw_handler;
508 m_handler_map->decomission_handler(handler);
515 uint16_t starting_port = ntohs(addr.sin_port);
517 for (
size_t i=0; i<15; i++) {
519 if ((sd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
520 HT_FATALF(
"socket(AF_INET, SOCK_STREAM, IPPROTO_TCP) failure: %s",
523 if (setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, &one,
sizeof(one)) < 0)
524 HT_FATALF(
"setting TCP socket SO_REUSEADDR: %s", strerror(errno));
526 #if defined(__APPLE__) || defined(__FreeBSD__)
527 if (setsockopt(sd, SOL_SOCKET, SO_REUSEPORT, &one,
sizeof(one)) < 0 && m_verbose)
528 HT_WARNF(
"setsockopt(SO_REUSEPORT) failure: %s", strerror(errno));
532 check_addr.sin_port = htons(starting_port+i);
534 if (::bind(sd, (
const sockaddr *)&check_addr,
sizeof(sockaddr_in)) == 0) {
536 addr.sin_port = check_addr.sin_port;
542 HT_FATALF(
"Unable to find available TCP port in range [%d..%d]",
543 (
int)addr.sin_port, (
int)addr.sin_port+14);
551 uint16_t starting_port = ntohs(addr.sin_port);
553 for (
size_t i=0; i<15; i++) {
555 if ((sd = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
556 HT_FATALF(
"socket(AF_INET, SOCK_DGRAM, 0) failure: %s", strerror(errno));
558 if (setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, &one,
sizeof(one)) < 0)
559 HT_FATALF(
"setting UDP socket SO_REUSEADDR: %s", strerror(errno));
561 #if defined(__APPLE__) || defined(__FreeBSD__)
562 if (setsockopt(sd, SOL_SOCKET, SO_REUSEPORT, &one,
sizeof(one)) < 0 && m_verbose)
563 HT_WARNF(
"setsockopt(SO_REUSEPORT) failure: %s", strerror(errno));
567 check_addr.sin_port = htons(starting_port+i);
569 if (::bind(sd, (
const sockaddr *)&addr,
sizeof(sockaddr_in)) == 0) {
571 addr.sin_port = check_addr.sin_port;
577 HT_FATALF(
"Unable to find available UDP port in range [%d..%d]",
578 (
int)addr.sin_port, (
int)addr.sin_port+14);
598 if (!m_handler_map->translate_proxy_address(addr, &inet_addr))
600 connectable_addr.
set_inet(inet_addr);
603 connectable_addr = addr;
608 #if defined(__linux__)
609 if (setsockopt(sd, SOL_TCP, TCP_NODELAY, &one,
sizeof(one)) < 0 && m_verbose)
610 HT_ERRORF(
"setsockopt(TCP_NODELAY) failure: %s", strerror(errno));
611 #elif defined(__sun__)
612 if (setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, &one,
sizeof(one)) < 0 && m_verbose)
613 HT_ERRORF(
"setsockopt(TCP_NODELAY) failure: %s", strerror(errno));
614 #elif defined(__APPLE__) || defined(__FreeBSD__)
615 if (setsockopt(sd, SOL_SOCKET, SO_NOSIGPIPE, &one,
sizeof(one)) < 0 && m_verbose)
616 HT_WARNF(
"setsockopt(SO_NOSIGPIPE) failure: %s", strerror(errno));
622 m_handler_map->insert_handler(handler);
624 while (::connect(sd, (
struct sockaddr *)&connectable_addr.
inet,
sizeof(
struct sockaddr_in))
626 if (errno == EINTR) {
627 this_thread::sleep_for(chrono::milliseconds(1000));
630 else if (errno == EINPROGRESS) {
635 HT_ERRORF(
"Polling problem on connection to %s: %s",
636 connectable_addr.
to_str().c_str(), strerror(errno));
637 m_handler_map->remove_handler(handler);
642 m_handler_map->remove_handler(handler);
653 HT_ERRORF(
"Polling problem on connection to %s: %s (%s)",
654 connectable_addr.
to_str().c_str(),
656 m_handler_map->remove_handler(handler);
int start_polling(int mode=PollEvent::READ)
Start polling on the handler with the poll interest specified in mode.
static bool verbose
Verbose mode.
int register_socket(int sd, const CommAddress &addr, RawSocketHandler *handler)
Registers an externally managed socket with comm event loop.
void set_proxy(const String &proxy)
Sets the proxy name for this connection.
static std::atomic< uint32_t > ms_next_request_id
Atomic integer used for assinging request IDs.
#define HT_WARNF(msg,...)
int send_message(const InetAddr &addr, CommBufPtr &cbp)
Sends a message.
PropertiesPtr properties
This singleton map stores all options.
std::string String
A String is simply a typedef to std::string.
I/O handler for datagram (UDP) sockets.
int connect(const CommAddress &addr, const DispatchHandlerPtr &default_handler)
Establishes a TCP connection and attaches a default dispatch handler.
chrono::time_point< fast_clock > time_point
Declarations for ReactorRunner.
Abstract base class for application dispatch handlers registered with AsyncComm.
bool wait_for_proxy_load(Timer &timer)
Waits until a CommHeader::FLAGS_BIT_PROXY_MAP_UPDATE message is received from the proxy master...
int set_alias(const InetAddr &addr, const InetAddr &alias)
Sets an alias for a TCP connection.
bool translate_proxy(const String &proxy, InetAddr *addr)
Translates a proxy name to an IP address.
static std::vector< ReactorPtr > ms_reactors
Vector of reactors (last position is timer reactor)
InetAddr get_address()
Gets the handler socket address.
static std::default_random_engine rng
Pseudo random number generator.
ClockT::time_point expire_time
Absolute expiration time.
int send_message(CommBufPtr &cbp, uint32_t timeout_ms=0, DispatchHandler *disp_handler=nullptr)
Sends message pointed to by cbp over socket associated with this I/O handler.
int send_datagram(const CommAddress &addr, const CommAddress &send_addr, CommBufPtr &cbuf)
Sends a datagram to a remote address.
static void destroy()
Destroys singleton instance of the Comm class.
Abstract base class for application raw socket handlers registered with AsyncComm.
void decrement_reference_count()
Decrement reference count.
void set_inet(sockaddr_in addr)
Sets address type to CommAddress::INET and inet value to addr.
DispatchHandlerPtr handler
Dispatch handler to receive TIMER event.
static HandlerMapPtr handler_map
Smart pointer to HandlerMap.
File system utility functions.
static void destroy()
This method shuts down the reactors.
std::shared_ptr< ConnectionHandlerFactory > ConnectionHandlerFactoryPtr
Smart pointer to ConnectionHandlerFactory.
static time_point now() noexcept
Declarations for IOHandlerAccept.
void find_available_tcp_port(InetAddr &addr)
Finds an unused TCP port starting from addr.
const char * get_text(int error)
Returns a descriptive error message.
Encapsulate an internet address.
void close_socket(const CommAddress &addr)
Closes the socket specified by the addr argument.
std::shared_ptr< CommBuf > CommBufPtr
Smart pointer to CommBuf.
void set_proxy(const String &str)
Sets address type to CommAddress::PROXY and proxy name to p.
std::unordered_map< String, ProxyAddressInfo > ProxyMapT
Forward mapping hash type from proxy name to ProxyAddressInfo.
Compatibility Macros for C/C++.
Base class for socket descriptor I/O handlers.
int connect_socket(int sd, const CommAddress &addr, const DispatchHandlerPtr &default_handler)
Creates a TCP socket connection.
I/O handler for TCP sockets.
String to_str() const
Returns string representation of address.
Time related declarations.
Declarations for IOHandlerData.
static bool set_flags(int fd, int flags)
Sets fcntl flags of a socket.
Comm()
Private constructor (prevent non-singleton usage).
int send_response(const CommAddress &addr, CommBufPtr &cbuf)
Sends a response message back over a connection.
static Comm * ms_instance
Pointer to singleton instance of this class.
bool is_inet() const
Returns true if address is of type CommAddress::INET.
#define HT_FATALF(msg,...)
void find_available_udp_port(InetAddr &addr)
Finds an unused UDP port starting from addr.
Writing can be performed without blocking.
static bool initialize(sockaddr_in *addr, const char *host, uint16_t port)
Initialize a sockaddr_in structure from host:port.
Entry point to AsyncComm service.
static std::mutex ms_mutex
Mutex for serializing access to ms_instance
void get_proxy_map(ProxyMapT &proxy_map)
Returns the proxy map.
#define HT_INFOF(msg,...)
#define HT_THROWF(_code_, _fmt_,...)
Internet address wrapper classes and utility functions.
void create_datagram_receive_socket(CommAddress &addr, int tos, const DispatchHandlerPtr &handler)
Creates a socket for receiving datagrams and attaches handler as the default dispatch handler...
Priority
Enumeration for reactor priority.
Declarations for ReactorFactory.
A timer class to keep timeout states across AsyncComm related calls.
std::shared_ptr< DispatchHandler > DispatchHandlerPtr
Smart pointer to DispatchHandler.
Response should be ignored.
void decrement_reference_count(IOHandler *handler)
Decrements the reference count of handler.
#define HT_ERRORF(msg,...)
void listen(const CommAddress &addr, ConnectionHandlerFactoryPtr &chf)
Creates listen (accept) socket on addr.
int set_timer_absolute(ClockT::time_point expire_time, const DispatchHandlerPtr &handler)
Sets a timer for absolute time expire_time.
#define HT_ON_OBJ_SCOPE_EXIT(...)
InetAddr inet
IPv4:port address.
int set_timer(uint32_t duration_millis, const DispatchHandlerPtr &handler)
Sets a timer for duration_millis milliseconds in the future.
void cancel_timer(const DispatchHandlerPtr &handler)
Cancels all scheduled timers registered with the dispatch handler handler.
I/O handler for accept (listen) sockets.
void alias(const String &cmdline_opt, const String &file_opt, bool overwrite)
Setup command line option alias for config file option.
bool is_proxy() const
Returns true if address is of type CommAddress::PROXY.
System information and statistics based on libsigar.
Error codes, Exception handling, error logging.
#define HT_THROW(_code_, _msg_)
int send_request(const CommAddress &addr, uint32_t timeout_ms, CommBufPtr &cbuf, DispatchHandler *response_handler)
Sends a request message over a connection, expecting a response.
int add_proxy(const String &proxy, const String &hostname, const InetAddr &addr)
Adds a proxy name for a TCP connection.
I/O handler for raw sockets.
static const NetInfo & net_info()
Retrieves updated Network information (see SystemInfo.h)
Address abstraction to hold either proxy name or IPv4:port address.
Executes user-defined functions when leaving the current scope.
static bool proxy_master
Set to true if this process is acting as "Proxy Master".
static void get_timer_reactor(ReactorPtr &reactor)
This method returns the timer reactor.
int remove_proxy(const String &proxy)
Removes a proxy name for a TCP connection.