Entry point to AsyncComm service.
#include <Comm.h>
Public Member Functions | |
int | register_socket (int sd, const CommAddress &addr, RawSocketHandler *handler) |
Registers an externally managed socket with comm event loop.
int | connect (const CommAddress &addr, const DispatchHandlerPtr &default_handler) |
Establishes a TCP connection and attaches a default dispatch handler.
int | connect (const CommAddress &addr, const CommAddress &local_addr, const DispatchHandlerPtr &default_handler) |
Establishes a locally bound TCP connection and attaches a default dispatch handler.
int | set_alias (const InetAddr &addr, const InetAddr &alias) |
Sets an alias for a TCP connection.
int | add_proxy (const String &proxy, const String &hostname, const InetAddr &addr) |
Adds a proxy name for a TCP connection.
int | remove_proxy (const String &proxy) |
Removes a proxy name for a TCP connection.
bool | translate_proxy (const String &proxy, InetAddr *addr) |
Translates a proxy name to an IP address.
void | get_proxy_map (ProxyMapT &proxy_map) |
Returns the proxy map.
bool | wait_for_proxy_load (Timer &timer) |
Waits until a CommHeader::FLAGS_BIT_PROXY_MAP_UPDATE message is received from the proxy master.
void | listen (const CommAddress &addr, ConnectionHandlerFactoryPtr &chf) |
Creates listen (accept) socket on addr.
void | listen (const CommAddress &addr, ConnectionHandlerFactoryPtr &chf, const DispatchHandlerPtr &default_handler) |
Creates listen (accept) socket on addr and attaches a default dispatch handler.
int | send_request (const CommAddress &addr, uint32_t timeout_ms, CommBufPtr &cbuf, DispatchHandler *response_handler) |
Sends a request message over a connection, expecting a response.
int | send_response (const CommAddress &addr, CommBufPtr &cbuf) |
Sends a response message back over a connection.
void | create_datagram_receive_socket (CommAddress &addr, int tos, const DispatchHandlerPtr &handler) |
Creates a socket for receiving datagrams and attaches handler as the default dispatch handler.
int | send_datagram (const CommAddress &addr, const CommAddress &send_addr, CommBufPtr &cbuf) |
Sends a datagram to a remote address.
int | set_timer (uint32_t duration_millis, const DispatchHandlerPtr &handler) |
Sets a timer for duration_millis milliseconds in the future.
int | set_timer_absolute (ClockT::time_point expire_time, const DispatchHandlerPtr &handler) |
Sets a timer for absolute time expire_time.
void | cancel_timer (const DispatchHandlerPtr &handler) |
Cancels all scheduled timers registered with the dispatch handler handler.
void | close_socket (const CommAddress &addr) |
Closes the socket specified by the addr argument.
void | find_available_tcp_port (InetAddr &addr) |
Finds an unused TCP port starting from addr.
void | find_available_udp_port (InetAddr &addr) |
Finds an unused UDP port starting from addr.
Static Public Member Functions | |
static Comm * | instance () |
Creates/returns singleton instance of the Comm class.
static void | destroy () |
Destroys singleton instance of the Comm class.
Private Member Functions | |
Comm () | |
Private constructor (prevent non-singleton usage).
~Comm () | |
Destructor.
int | send_request (IOHandlerData *data_handler, uint32_t timeout_ms, CommBufPtr &cbuf, DispatchHandler *response_handler) |
Sends a request message over a connection.
int | connect_socket (int sd, const CommAddress &addr, const DispatchHandlerPtr &default_handler) |
Creates a TCP socket connection.
Private Attributes | |
HandlerMapPtr | m_handler_map |
Pointer to IOHandler map.
ReactorPtr | m_timer_reactor |
Pointer to dedicated reactor for handling timer events.
InetAddr | m_local_addr |
Local address initialized to primary interface and empty port.
bool | m_verbose |
Verbose flag.
Static Private Attributes | |
static Comm * | ms_instance = NULL |
Pointer to singleton instance of this class.
static std::atomic< uint32_t > | ms_next_request_id |
Atomic integer used for assigning request IDs.
static std::mutex | ms_mutex |
Mutex for serializing access to ms_instance.
Entry point to AsyncComm service.
There should be only one instance of this class per process and the static method ReactorFactory::initialize must be called prior to constructing this class in order to create the system-wide I/O reactor threads.
|
private |
Adds a proxy name for a TCP connection.
Hypertable uses proxy names (e.g. "rs1") to refer to servers so that the system can continue to operate properly even when servers are reassigned IP addresses, for example when Hypertable running on EBS volumes in AWS EC2 is stopped and restarted. This method adds a proxy name for the connection identified by addr
and pushes the new mapping to the remote end of all active connections.
proxy | Proxy name |
hostname | Hostname of remote machine |
addr | Connection address (remote address) |
void Comm::cancel_timer | ( | const DispatchHandlerPtr & | handler | ) |
void Comm::close_socket | ( | const CommAddress & | addr | ) |
Closes the socket specified by the addr argument.
This has the effect of closing the connection and removing it from the event demultiplexer (e.g. epoll). It also causes all outstanding requests on the connection to be purged.
addr | Connection or accept or datagram address |
int Comm::connect | ( | const CommAddress & | addr, |
const DispatchHandlerPtr & | default_handler | ||
) |
Establishes a TCP connection and attaches a default dispatch handler.
This method establishes a TCP connection to addr
and associates it with default_handler
as the default dispatch handler for the connection. The two types of events that are delivered via the handler are CONNECTION_ESTABLISHED and DISCONNECT. No ERROR events will be delivered via the handler because any error that occurs on the connection results in the connection being closed, producing a DISCONNECT event only. If this method fails and returns an error code, the connection will not have been set up and default_handler
will not have been installed. The default dispatch handler, default_handler
, will never be called back via the calling thread. It will be called back from a reactor thread. Because reactor threads are used to service I/O events on many different sockets, the default dispatch handler should return quickly from the callback. When calling back into the default dispatch handler, the calling reactor thread does not hold any locks, so the default dispatch handler callback may safely call back into the Comm object (e.g. send_response). Upon successful completion, addr
can be used to subsequently refer to the connection.
addr | Address to connect to |
default_handler | Smart pointer to default dispatch handler |
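Because the default dispatch handler runs on a reactor thread and should return quickly, one common pattern is for the callback to enqueue the event and let an application thread do the real work. The following is a minimal sketch of that pattern only; `EventType`, `Event`, and `QueueingHandler` are hypothetical stand-ins and are not the Hypertable `Event` or `DispatchHandler` types.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>

// Hypothetical stand-ins for the AsyncComm event types (not the real API).
enum class EventType { CONNECTION_ESTABLISHED, DISCONNECT };
struct Event { EventType type; };

// Handler whose callback only enqueues the event and returns immediately,
// so the thread that invokes it is never blocked by application work.
class QueueingHandler {
public:
  // In a real system this would be invoked from a reactor thread.
  void handle(const Event &event) {
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      m_queue.push_back(event);
    }
    m_cond.notify_one();
  }

  // Called from an application thread; blocks until an event is available.
  Event wait_for_event() {
    std::unique_lock<std::mutex> lock(m_mutex);
    m_cond.wait(lock, [this] { return !m_queue.empty(); });
    Event event = m_queue.front();
    m_queue.pop_front();
    return event;
  }

  // Number of events that have been queued but not yet consumed.
  std::size_t pending() {
    std::lock_guard<std::mutex> lock(m_mutex);
    return m_queue.size();
  }

private:
  std::mutex m_mutex;
  std::condition_variable m_cond;
  std::deque<Event> m_queue;
};
```

An application thread would loop on `wait_for_event()` and react to DISCONNECT by, for example, scheduling a reconnect, while the callback itself stays short.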
int Comm::connect | ( | const CommAddress & | addr, |
const CommAddress & | local_addr, | ||
const DispatchHandlerPtr & | default_handler | ||
) |
Establishes a locally bound TCP connection and attaches a default dispatch handler.
Establishes a TCP connection to addr
, binding the local side of the connection to local_addr
. A default dispatch handler is associated with the connection to receive CONNECTION_ESTABLISHED and DISCONNECT events. No ERROR events will be delivered via the handler because any error that occurs on the connection results in the connection being closed, producing a DISCONNECT event only. If this method fails and returns an error code, the connection will not have been set up and default_handler
will not have been installed. The default dispatch handler, default_handler
, will never be called back via the calling thread. It will be called back from a reactor thread. Because reactor threads are used to service I/O events on many different sockets, the default dispatch handler should return quickly from the callback. When calling back into the default dispatch handler, the calling reactor thread does not hold any locks, so the default dispatch handler callback may safely call back into the Comm object (e.g. send_response). Upon successful completion, addr
can be used to subsequently refer to the connection.
addr | Address to connect to |
local_addr | Local address to bind to |
default_handler | Smart pointer to default dispatch handler |
|
private |
Creates a TCP socket connection.
This method is called by the connect methods to set up a socket, connect to a remote address, and attach a data handler. If addr
is of type CommAddress::PROXY then it is translated. Then the socket is set up as follows:
O_NONBLOCK option is set
TCP_NODELAY option is set (Linux and Sun)
SO_NOSIGPIPE option is set (Apple and FreeBSD)
Then a data (TCP) handler is created for the socket and added to the handler map. Finally connect
is called and polling is started on the socket.
sd | Socket descriptor |
addr | Remote address to connect to |
default_handler | Default dispatch handler |
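The socket options described for connect_socket can be sketched with plain POSIX calls. This is an illustration under stated assumptions, not the Hypertable implementation: `configure_tcp_socket` is a hypothetical helper, and the preprocessor macros used to select the platform are the common compiler-defined ones rather than whatever the project itself uses.

```cpp
#include <cassert>
#include <fcntl.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <unistd.h>

// Hypothetical helper: applies the options listed for connect_socket to an
// already-created TCP socket. Returns true on success, false on failure.
bool configure_tcp_socket(int sd) {
  // Switch the descriptor to non-blocking mode, preserving existing flags.
  int flags = fcntl(sd, F_GETFL, 0);
  if (flags == -1 || fcntl(sd, F_SETFL, flags | O_NONBLOCK) == -1)
    return false;

  int one = 1;
  (void)one;  // unused on platforms matched by neither branch below
#if defined(__linux__) || defined(__sun)
  // Disable Nagle's algorithm so small messages are sent without delay.
  if (setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)) == -1)
    return false;
#elif defined(__APPLE__) || defined(__FreeBSD__)
  // Suppress SIGPIPE when writing to a connection closed by the peer.
  if (setsockopt(sd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one)) == -1)
    return false;
#endif
  return true;
}
```

Setting O_NONBLOCK before calling `connect` is what lets the connect attempt complete asynchronously and be reported through the event loop rather than blocking the caller.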
void Comm::create_datagram_receive_socket | ( | CommAddress & | addr, |
int | tos, | ||
const DispatchHandlerPtr & | handler | ||
) |
Creates a socket for receiving datagrams and attaches handler
as the default dispatch handler.
This socket can also be used for sending datagrams. The events delivered for this socket consist of either MESSAGE events or ERROR events. In setting up the datagram (UDP) socket, the following setup is performed:
O_NONBLOCK option is set on socket
Socket send and receive buffers are set to 4*32768 bytes
If tos is non-zero, IP_TOS and SO_PRIORITY options are set using tos as the argument (Linux only)
If tos is non-zero, IP_TOS option is set with IPTOS_LOWDELAY as the argument (Apple, Sun, and FreeBSD)
addr | Pointer to address structure |
tos | TOS value to set on IP packet |
handler | Default dispatch handler for socket |
Exception | Code set to Error::COMM_SOCKET_ERROR or Error::COMM_BIND_ERROR. |
|
static |
void Comm::find_available_tcp_port | ( | InetAddr & | addr | ) |
Finds an unused TCP port starting from addr
.
This method iterates through 15 ports starting with addr.sin_port
until it is able to bind to one. If an available port is found, addr.sin_port
will be set to the available port, otherwise the method will assert.
addr | Starting address template |
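The port search described above amounts to a bounded linear scan. The sketch below shows that logic in isolation; `find_available_port` and the `try_bind` predicate are hypothetical, with the predicate standing in for an actual `bind()` attempt. Unlike the documented method, which asserts when no port is found, this sketch returns -1.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>

// Tries `attempts` consecutive ports starting at `start` and returns the
// first one for which `try_bind` succeeds, or -1 if none of them does.
// `try_bind` is an assumption standing in for a real bind() call.
int find_available_port(uint16_t start,
                        const std::function<bool(uint16_t)> &try_bind,
                        int attempts = 15) {
  for (int i = 0; i < attempts; ++i) {
    uint32_t candidate = static_cast<uint32_t>(start) + i;
    if (candidate > 65535)
      break;  // ran past the end of the valid port range
    if (try_bind(static_cast<uint16_t>(candidate)))
      return static_cast<int>(candidate);
  }
  return -1;
}
```

Injecting the bind attempt as a predicate keeps the scan testable without opening sockets; production code would pass a callable that creates a socket and binds it to the candidate port.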
void Comm::find_available_udp_port | ( | InetAddr & | addr | ) |
Finds an unused UDP port starting from addr
.
This method iterates through 15 ports starting with addr.sin_port
until it is able to bind to one. If an available port is found, addr.sin_port
will be set to the available port, otherwise the method will assert.
addr | Starting address template |
void Comm::get_proxy_map | ( | ProxyMapT & | proxy_map | ) |
|
inlinestatic |
Creates/returns singleton instance of the Comm class.
This method will construct a new instance of the Comm class if it has not already been created. All calls to this method return a pointer to the same singleton instance of the Comm object between calls to destroy. The static method ReactorFactory::initialize must be called prior to calling this method for the first time, in order to create system-wide I/O reactor threads.
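The instance/destroy contract can be illustrated with a small mutex-guarded singleton. This is a sketch of the general pattern, assuming a static pointer and a static mutex like the ms_instance and ms_mutex attributes listed above; the `Service` class is hypothetical and omits the reactor initialization that Comm requires.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical class showing a mutex-guarded singleton with explicit destroy.
class Service {
public:
  // Creates the instance on first use; later calls return the same pointer
  // until destroy() is called.
  static Service *instance() {
    std::lock_guard<std::mutex> lock(ms_mutex);
    if (!ms_instance)
      ms_instance = new Service();
    return ms_instance;
  }

  // Deletes the instance; the next instance() call constructs a new one.
  static void destroy() {
    std::lock_guard<std::mutex> lock(ms_mutex);
    delete ms_instance;
    ms_instance = nullptr;
  }

private:
  Service() = default;   // private: prevents non-singleton construction
  ~Service() = default;  // private: only destroy() may delete the object

  static Service *ms_instance;
  static std::mutex ms_mutex;
};

Service *Service::ms_instance = nullptr;
std::mutex Service::ms_mutex;
```

Pointers obtained before a call to `destroy()` must not be used afterwards, which is why the documentation scopes the "same instance" guarantee to the interval between calls to destroy.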
void Comm::listen | ( | const CommAddress & | addr, |
ConnectionHandlerFactoryPtr & | chf | ||
) |
Creates listen (accept) socket on addr
.
New connections will be assigned dispatch handlers by calling the ConnectionHandlerFactory::get_instance method of the handler factory pointed to by chf
. Since no default dispatch handler is supplied for this listen (accept) socket, Event::CONNECTION_ESTABLISHED events are logged, but not delivered to the application.
addr | IP address and port on which to listen for connections |
chf | Smart pointer to connection handler factory |
Exception | Code set to Error::COMM_SOCKET_ERROR, Error::COMM_BIND_ERROR, Error::COMM_LISTEN_ERROR, Error::COMM_SEND_ERROR, or Error::COMM_RECEIVE_ERROR |
void Comm::listen | ( | const CommAddress & | addr, |
ConnectionHandlerFactoryPtr & | chf, | ||
const DispatchHandlerPtr & | default_handler | ||
) |
Creates listen (accept) socket on addr
and attaches a default dispatch handler.
New connections will be assigned dispatch handlers by calling the ConnectionHandlerFactory::get_instance method of the handler factory pointed to by chf
. default_handler
is registered as the default dispatch handler for the newly created listen (accept) socket and Event::CONNECTION_ESTABLISHED events will be delivered to the application via this handler.
addr | IP address and port on which to listen for connections |
chf | Smart pointer to connection handler factory |
default_handler | Smart pointer to default dispatch handler |
Exception | Code set to Error::COMM_SOCKET_ERROR, Error::COMM_BIND_ERROR, Error::COMM_LISTEN_ERROR, Error::COMM_SEND_ERROR, or Error::COMM_RECEIVE_ERROR |
int Comm::register_socket | ( | int | sd, |
const CommAddress & | addr, | ||
RawSocketHandler * | handler | ||
) |
Registers an externally managed socket with comm event loop.
This function allows an application to register a socket with the comm layer just for its event loop. The wire protocol is handled entirely by the handler
object.
sd | Socket descriptor |
addr | Address structure for connection identification purposes |
handler | Raw connection handler |
addr
already registered with comm layer.
Exception | On polling error |
int Comm::remove_proxy | ( | const String & | proxy | ) |
Removes a proxy name for a TCP connection.
Hypertable uses proxy names (e.g. "rs1") to refer to servers so that the system can continue to operate properly even when servers are reassigned IP addresses, for example when Hypertable running on EBS volumes in AWS EC2 is stopped and restarted. This method removes the proxy name proxy
locally and from the remote end of all active connections.
proxy | Proxy name to remove |
int Comm::send_datagram | ( | const CommAddress & | addr, |
const CommAddress & | send_addr, | ||
CommBufPtr & | cbuf | ||
) |
Sends a datagram to a remote address.
The remote address is specified by addr
and the local socket address to send it from is specified by send_addr
. The send_addr
argument must refer to a socket that was created with a call to create_datagram_receive_socket. If an error is encountered while trying to send the datagram, the associated handler will be decommissioned.
addr | Remote address to send datagram to |
send_addr | Local socket address to send from |
cbuf | Datagram message with valid header |
int Comm::send_request | ( | const CommAddress & | addr, |
uint32_t | timeout_ms, | ||
CommBufPtr & | cbuf, | ||
DispatchHandler * | response_handler | ||
) |
Sends a request message over a connection, expecting a response.
The connection is specified by addr
which is the remote end of the connection. The request message to send is encapsulated in cbuf
(see CommBuf) and should start with a valid header. The response_handler
argument will get called back with a response MESSAGE event, a TIMEOUT event if no response is received within the number of milliseconds specified by the timeout_ms argument, or an ERROR event (see below). The following errors may be returned by this method:
A return value of Error::COMM_NOT_CONNECTED implies that response_handler
was not installed and an Event::ERROR event will not be delivered. A return value of Error::COMM_BROKEN_CONNECTION implies that response_handler
was installed and an Event::ERROR event will be delivered to the application. If the server at the other end of the connection uses an ApplicationQueue to carry out requests, then the gid field in the header can be used to serialize request execution. For example, the following code serializes requests to the same file descriptor:
HeaderBuilder hbuilder(Header::PROTOCOL_DFSBROKER);
hbuilder.set_group_id(fd);
CommBuf *cbuf = new CommBuf(hbuilder, 14);
cbuf->AppendShort(COMMAND_READ);
cbuf->AppendInt(fd);
cbuf->AppendLong(amount);
This method locates the I/O handler associated with addr
and then calls the private method send_request to carry out the send request. If an error is encountered while trying to send the request, the associated handler will be decommissioned.
addr | Connection address (remote address) |
timeout_ms | Number of milliseconds to wait before delivering TIMEOUT event |
cbuf | Request message to send (see CommBuf) |
response_handler | Pointer to response handler associated with the request |
|
private |
Sends a request message over a connection.
This method sets the CommHeader::FLAGS_BIT_REQUEST bit of the flags field of cbuf->header
. If response_handler
is 0, then the CommHeader::FLAGS_BIT_IGNORE_RESPONSE is also set. The CommHeader::id field of cbuf->header
is assigned by incrementing ms_next_request_id and the CommHeader::timeout_ms field of cbuf->header
is set to timeout_ms
. Finally, cbuf->write_header_and_reset()
is called and the message is sent via data_handler
.
data_handler | I/O handler for connection. |
timeout_ms | Number of milliseconds to wait before delivering TIMEOUT event |
cbuf | Request message to send (see CommBuf) |
response_handler | Pointer to response handler associated with the request |
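The header preparation performed by the private send_request can be sketched as follows. The struct layout, the flag bit values, and the choice of post-increment are assumptions made for illustration; only the field names and the order of steps come from the description above.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical flag values and header layout, chosen for illustration only.
constexpr uint16_t FLAGS_BIT_REQUEST = 0x0001;
constexpr uint16_t FLAGS_BIT_IGNORE_RESPONSE = 0x0002;

struct RequestHeader {
  uint16_t flags = 0;
  uint32_t id = 0;
  uint32_t timeout_ms = 0;
};

// Shared counter from which request IDs are drawn; atomic so that
// concurrent senders never receive the same ID.
std::atomic<uint32_t> next_request_id{1};

// Fills in the request-specific header fields before the message is sent.
void prepare_request(RequestHeader &header, uint32_t timeout_ms,
                     bool has_response_handler) {
  header.flags |= FLAGS_BIT_REQUEST;
  if (!has_response_handler)
    header.flags |= FLAGS_BIT_IGNORE_RESPONSE;  // caller expects no reply
  header.id = next_request_id.fetch_add(1);     // unique per request
  header.timeout_ms = timeout_ms;
}
```

Using an atomic counter avoids taking a lock on the send path while still guaranteeing that each in-flight request can be matched to its response by ID.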
response_handler
is only installed on Error::OK or Error::COMM_BROKEN_CONNECTION).
int Comm::send_response | ( | const CommAddress & | addr, |
CommBufPtr & | cbuf | ||
) |
Sends a response message back over a connection.
It is assumed that the CommHeader::id field of the header matches the id field of the request to which this is a response. The connection is specified by addr
, which is the remote end of the connection. The response message to send is encapsulated in the cbuf (see CommBuf) object and should start with a valid header. The following code snippet illustrates how a simple response message gets created to send back to a client in response to a request message:
CommHeader header;
header.initialize_from_request_header(request_event->header);
CommBufPtr cbp(new CommBuf(header, 4));
cbp->append_i32(Error::OK);
If an error is encountered while trying to send the response, the associated handler will be decommissioned.
addr | Connection address (remote address) |
cbuf | Response message to send (must have valid header with matching request id) |
Sets an alias for a TCP connection.
RangeServers listen on a well-known port defined by the Hypertable.RangeServer.Port
configuration property (default = 15865). However, each RangeServer connects to the Master using an ephemeral port, because binding the outgoing connection to the well-known port would conflict with its listen socket. So that the Master can refer to the RangeServer using the well-known port, an alias address can be registered and subsequently used to reference the connection.
addr | Connection address (remote address) |
alias | Alias address |
addr
does not refer to an established connection.
int Comm::set_timer | ( | uint32_t | duration_millis, |
const DispatchHandlerPtr & | handler | ||
) |
Sets a timer for duration_millis
milliseconds in the future.
This method will cause an Event::TIMER event to be generated after duration_millis
milliseconds have elapsed. handler
is the dispatch handler registered with the timer to receive the Event::TIMER event. This timer registration is one shot. To set up a periodic timer event, the timer must be re-registered each time it is handled.
handler
, so if the reference count to handler
is zero when this method is called, it will be deleted after the event is delivered. To prevent this from happening, the caller should hold a smart pointer (DispatchHandlerPtr) to handler
.
duration_millis | Number of milliseconds to wait |
handler | Dispatch handler to receive Event::TIMER event upon expiration |
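Since timer registration is one shot, a periodic timer is obtained by re-registering from inside the timer callback. The sketch below demonstrates that idea with a hypothetical single-threaded `TimerQueue` driven by a simulated clock; it is not the Comm timer reactor, and `Heartbeat` is an illustrative name.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <map>
#include <utility>

// Hypothetical single-threaded timer queue driven by a simulated clock.
class TimerQueue {
public:
  using Callback = std::function<void()>;

  // One-shot registration: the callback fires once, duration_ms from now.
  void set_timer(uint32_t duration_ms, Callback cb) {
    m_timers.emplace(m_now + duration_ms, std::move(cb));
  }

  // Advances the simulated clock and fires every timer that has expired.
  void advance(uint64_t ms) {
    m_now += ms;
    while (!m_timers.empty() && m_timers.begin()->first <= m_now) {
      Callback cb = std::move(m_timers.begin()->second);
      m_timers.erase(m_timers.begin());
      cb();  // the callback may call set_timer() to re-register itself
    }
  }

private:
  uint64_t m_now = 0;
  std::multimap<uint64_t, Callback> m_timers;
};

// Periodic behaviour built on top of one-shot timers.
struct Heartbeat {
  Heartbeat(TimerQueue &q, uint32_t period) : queue(q), period_ms(period), fired(0) {}

  void start() { queue.set_timer(period_ms, [this] { on_timer(); }); }

  void on_timer() {
    ++fired;
    start();  // re-register; without this the timer would fire only once
  }

  TimerQueue &queue;
  uint32_t period_ms;
  int fired;
};
```

Note that re-registering from the callback measures the next period from the time the callback runs, so a slow callback causes the schedule to drift; set_timer_absolute is the natural choice when fixed wall-clock deadlines matter.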
int Comm::set_timer_absolute | ( | ClockT::time_point | expire_time, |
const DispatchHandlerPtr & | handler | ||
) |
Sets a timer for absolute time expire_time
.
This method will cause an Event::TIMER event to be generated at the absolute time specified by expire_time
. handler
is the dispatch handler registered with the timer to receive Event::TIMER events. This timer registration is one shot. To set up a periodic timer event, the timer must be re-registered each time it is handled.
handler
, so if the reference count to handler
is zero when this method is called, it will be deleted after the event is delivered. To prevent this from happening, the caller should hold a smart pointer (DispatchHandlerPtr) to handler
.
expire_time | Absolute expiration time |
handler | Dispatch handler to receive Event::TIMER event upon expiration |
Translates a proxy name to an IP address.
Hypertable uses proxy names (e.g. "rs1") to refer to servers so that the system can continue to operate properly even when servers are reassigned IP addresses, for example when Hypertable running on EBS volumes in AWS EC2 is stopped and restarted. This method translates proxy
to its associated address that was registered with a prior call to add_proxy(). If addr
is NULL, then the method just checks to see if the proxy name has been registered and represents a valid mapping.
proxy | Proxy name to translate |
addr | Address of object to hold translated address |
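The add/translate/remove behaviour of proxy names can be sketched with a small map. `ProxyMap` and `Endpoint` here are hypothetical stand-ins for ProxyMapT and InetAddr, and the sketch covers only the local lookup; it does not propagate mappings to remote connections as add_proxy and remove_proxy do.

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>
#include <string>
#include <unordered_map>

// Hypothetical stand-in for an IP address and port.
struct Endpoint {
  std::string host;
  uint16_t port;
};

// Hypothetical local proxy-name table guarded by a mutex.
class ProxyMap {
public:
  // Registers proxy, or updates its address if it is already registered.
  void add(const std::string &proxy, const Endpoint &addr) {
    std::lock_guard<std::mutex> lock(m_mutex);
    m_map[proxy] = addr;
  }

  // Removes proxy; returns false if it was not registered.
  bool remove(const std::string &proxy) {
    std::lock_guard<std::mutex> lock(m_mutex);
    return m_map.erase(proxy) > 0;
  }

  // Copies the mapped address into *addr when addr is non-null. With a null
  // addr it only reports whether the proxy name is registered.
  bool translate(const std::string &proxy, Endpoint *addr) const {
    std::lock_guard<std::mutex> lock(m_mutex);
    auto it = m_map.find(proxy);
    if (it == m_map.end())
      return false;
    if (addr)
      *addr = it->second;
    return true;
  }

private:
  mutable std::mutex m_mutex;
  std::unordered_map<std::string, Endpoint> m_map;
};
```

Callers that always go through `translate` keep working after a server is re-registered under the same proxy name with a new address, which is the scenario the proxy mechanism is designed for.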
bool Comm::wait_for_proxy_load | ( | Timer & | timer | ) |
Waits until a CommHeader::FLAGS_BIT_PROXY_MAP_UPDATE message is received from the proxy master.
timer | Expiration timer |
Mutex for serializing access to ms_instance