30 #define HT_DISABLE_LOG_DEBUG 1
42 #include <arpa/inet.h>
44 #include <netinet/in.h>
45 #include <sys/socket.h>
46 #include <sys/types.h>
47 #if defined(__APPLE__) || defined(__FreeBSD__)
48 #include <sys/event.h>
62 if (event->revents & POLLOUT) {
63 if ((error = handle_write_readiness()) !=
Error::OK) {
64 test_and_set_error(error);
67 deliver_event(event_ptr);
72 if (event->revents & POLLIN) {
73 ssize_t nread, payload_len;
74 struct sockaddr_in addr;
75 socklen_t fromlen =
sizeof(
struct sockaddr_in);
78 (
struct sockaddr *)&addr, &fromlen)) != (ssize_t)-1) {
81 event_ptr->load_message_header(m_message, (
size_t)m_message[1]);
83 payload_len = nread - (ssize_t)event_ptr->header.header_len;
84 event_ptr->payload_len = payload_len;
85 event_ptr->payload =
new uint8_t [payload_len];
86 event_ptr->arrival_time = arrival_time;
87 memcpy((
void *)event_ptr->payload, m_message + event_ptr->header.header_len,
89 deliver_event( event_ptr );
90 fromlen =
sizeof(
struct sockaddr_in);
93 if (errno != EAGAIN) {
94 HT_ERRORF(
"FileUtils::recvfrom(%d) failure : %s", m_sd, strerror(errno));
97 deliver_event(event_ptr);
104 if (event->events & POLLERR) {
105 HT_WARN_OUT <<
"Received EPOLLERR on descriptor " << m_sd <<
" ("
106 << m_addr.format() <<
")" <<
HT_END;
108 deliver_event(event_ptr);
113 HT_ASSERT((event->revents & POLLNVAL) == 0);
118 #if defined(__linux__)
126 if (event->events & EPOLLOUT) {
127 if ((error = handle_write_readiness()) !=
Error::OK) {
129 deliver_event(event_ptr);
135 if (event->events & EPOLLIN) {
136 ssize_t nread, payload_len;
138 socklen_t fromlen =
sizeof(
struct sockaddr_in);
141 (
struct sockaddr *)&addr, &fromlen)) != (ssize_t)-1) {
146 event_ptr->load_message_header(m_message, (
size_t)m_message[1]);
153 payload_len = nread - (ssize_t)event_ptr->header.header_len;
154 event_ptr->payload_len = payload_len;
155 event_ptr->payload =
new uint8_t [payload_len];
156 event_ptr->arrival_time = arrival_time;
157 memcpy((
void *)event_ptr->payload, m_message + event_ptr->header.header_len,
159 deliver_event( event_ptr );
160 fromlen =
sizeof(
struct sockaddr_in);
163 if (errno != EAGAIN) {
164 HT_ERRORF(
"FileUtils::recvfrom(%d) failure : %s", m_sd, strerror(errno));
167 deliver_event(event_ptr);
175 if (event->events & EPOLLERR) {
176 HT_WARN_OUT <<
"Received EPOLLERR on descriptor " << m_sd <<
" ("
177 << m_addr.format() <<
")" <<
HT_END;
179 deliver_event(event_ptr);
187 #elif defined(__sun__)
197 if (event->portev_events == POLLOUT) {
198 if ((error = handle_write_readiness()) !=
Error::OK) {
200 deliver_event(event_ptr);
206 if (event->portev_events == POLLIN) {
207 ssize_t nread, payload_len;
208 struct sockaddr_in addr;
209 socklen_t fromlen =
sizeof(
struct sockaddr_in);
212 (
struct sockaddr *)&addr, &fromlen)) != (ssize_t)-1) {
216 event_ptr->load_message_header(m_message, (
size_t)m_message[1]);
218 payload_len = nread - (ssize_t)event_ptr->header.header_len;
219 event_ptr->payload_len = payload_len;
220 event_ptr->payload =
new uint8_t [payload_len];
221 event_ptr->arrival_time = arrival_time;
222 memcpy((
void *)event_ptr->payload, m_message + event_ptr->header.header_len,
224 deliver_event(event_ptr);
225 fromlen =
sizeof(
struct sockaddr_in);
228 if (errno != EAGAIN) {
229 HT_ERRORF(
"FileUtils::recvfrom(%d) failure : %s", m_sd, strerror(errno));
232 deliver_event(event_ptr);
240 if (event->portev_events == POLLERR) {
241 HT_WARN_OUT <<
"Received EPOLLERR on descriptor " << m_sd <<
" ("
242 << m_addr.format() <<
")" <<
HT_END;
244 deliver_event(event_ptr);
249 if (event->portev_events == POLLREMOVE) {
250 HT_DEBUGF(
"Received POLLREMOVE on descriptor %d (%s:%d)", m_sd,
251 inet_ntoa(m_addr.sin_addr), ntohs(m_addr.sin_port));
267 #elif defined(__APPLE__) || defined(__FreeBSD__)
276 assert(m_sd == (
int)event->ident);
278 assert((event->flags & EV_EOF) == 0);
280 if (event->filter == EVFILT_WRITE) {
281 if ((error = handle_write_readiness()) !=
Error::OK) {
283 deliver_event(event_ptr);
289 if (event->filter == EVFILT_READ) {
290 size_t available = (size_t)event->data;
291 ssize_t nread, payload_len;
292 struct sockaddr_in addr;
293 socklen_t fromlen =
sizeof(
struct sockaddr_in);
296 (
struct sockaddr *)&addr, &fromlen)) == (ssize_t)-1) {
297 HT_ERRORF(
"FileUtils::recvfrom(%d, len=%d) failure : %s", m_sd,
298 (
int)available, strerror(errno));
301 deliver_event(event_ptr);
308 event_ptr->load_message_header(m_message, (
size_t)m_message[1]);
310 payload_len = nread - (ssize_t)event_ptr->header.header_len;
311 event_ptr->payload_len = payload_len;
312 event_ptr->payload =
new uint8_t [payload_len];
313 event_ptr->arrival_time = arrival_time;
314 memcpy((
void *)event_ptr->payload, m_message + event_ptr->header.header_len,
316 deliver_event(event_ptr);
328 lock_guard<mutex> lock(m_mutex);
331 if ((error = flush_send_queue()) !=
Error::OK) {
338 if (m_send_queue.empty())
350 lock_guard<mutex> lock(m_mutex);
352 bool initially_empty = m_send_queue.empty() ?
true :
false;
359 m_send_queue.push_back(
SendRec(addr, cbp));
361 if ((error = flush_send_queue()) !=
Error::OK) {
367 if (initially_empty && !m_send_queue.empty()) {
371 else if (!initially_empty && m_send_queue.empty()) {
384 ssize_t nsent, tosend;
386 while (!m_send_queue.empty()) {
388 SendRec &send_rec = m_send_queue.front();
390 tosend = send_rec.second->data.size - (send_rec.second->data_ptr
391 - send_rec.second->data.base);
393 assert(send_rec.second->ext.base == 0);
396 (sockaddr *)&send_rec.first,
397 sizeof(
struct sockaddr_in));
399 if (nsent == (ssize_t)-1) {
400 HT_WARNF(
"FileUtils::sendto(%d, len=%d, addr=%s:%d) failed : %s", m_sd,
401 (
int)tosend, inet_ntoa(send_rec.first.sin_addr),
402 ntohs(send_rec.first.sin_port), strerror(errno));
405 else if (nsent < tosend) {
406 HT_WARNF(
"Only sent %d bytes", (
int)nsent);
409 send_rec.second->data_ptr += nsent;
414 m_send_queue.pop_front();
#define HT_WARNF(msg,...)
int handle_write_readiness()
Handles write readiness.
int send_message(const InetAddr &addr, CommBufPtr &cbp)
Sends a message.
chrono::time_point< fast_clock > time_point
Declarations for ReactorRunner.
Declarations for IOHandlerDatagram.
int flush_send_queue()
Flushes send queue.
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
static ssize_t recvfrom(int fd, void *vptr, size_t n, struct sockaddr *from, socklen_t *fromlen)
Receives data from a network connection and returns the sender's address.
static ssize_t sendto(int fd, const void *vptr, size_t n, const sockaddr *to, socklen_t tolen)
Sends data through a network connection; if the socket is TCP then the address is ignored...
static HandlerMapPtr handler_map
Smart pointer to HandlerMap.
File system utility functions.
Encapsulate an internet address.
std::shared_ptr< CommBuf > CommBufPtr
Smart pointer to CommBuf.
Compatibility Macros for C/C++.
#define HT_DEBUGF(msg,...)
Writing can be performed without blocking.
Request/response message event.
This is a generic exception class for Hypertable.
#define HT_ERRORF(msg,...)
std::pair< struct sockaddr_in, CommBufPtr > SendRec
Send queue message record.
bool handle_event(struct pollfd *event, ClockT::time_point arrival_time) override
Handle poll() interface events.
Error codes, Exception handling, error logging.