43 #include <arpa/inet.h>
46 #include <sys/socket.h>
47 #include <sys/types.h>
48 #if defined(__APPLE__) || defined(__FreeBSD__)
49 #include <sys/event.h>
66 et_socket_read(
int fd,
void *vptr,
size_t n,
int *errnop,
bool *eofp) {
69 char *ptr = (
char *)vptr;
72 if ((nread = ::read(fd, ptr, nleft)) < 0) {
79 if (*errnop == EAGAIN || nleft < n)
84 else if (nread == 0) {
96 et_socket_writev(
int fd,
const iovec *vector,
int count,
int *errnop) {
98 while ((nwritten = writev(fd, vector, count)) <= 0) {
121 if (event->revents & POLLOUT) {
122 if (handle_write_readiness()) {
128 if (event->revents & POLLIN) {
132 nread = et_socket_read(m_sd, m_message_header_ptr,
133 m_message_header_remaining, &error, &eof);
134 if (nread == (
size_t)-1) {
135 if (errno != ECONNREFUSED) {
137 HT_INFOF(
"socket read(%d, len=%d) failure : %s", m_sd,
138 (
int)m_message_header_remaining, strerror(errno));
146 else if (nread < m_message_header_remaining) {
147 m_message_header_remaining -= nread;
148 m_message_header_ptr += nread;
154 m_message_header_ptr += nread;
155 handle_message_header(arrival_time);
162 nread = et_socket_read(m_sd, m_message_ptr, m_message_remaining,
164 if (nread == (
size_t)-1) {
166 HT_INFOF(
"socket read(%d, len=%d) failure : %s", m_sd,
167 (
int)m_message_header_remaining, strerror(errno));
171 else if (nread < m_message_remaining) {
172 m_message_ptr += nread;
173 m_message_remaining -= nread;
179 handle_message_body();
188 HT_DEBUGF(
"Received EOF on descriptor %d (%s:%d)", m_sd,
189 inet_ntoa(m_addr.sin_addr), ntohs(m_addr.sin_port));
194 if (event->revents & POLLERR) {
196 HT_INFOF(
"Received POLLERR on descriptor %d (%s:%d)", m_sd,
197 inet_ntoa(m_addr.sin_addr), ntohs(m_addr.sin_port));
202 if (event->revents & POLLHUP) {
203 HT_DEBUGF(
"Received POLLHUP on descriptor %d (%s:%d)", m_sd,
204 inet_ntoa(m_addr.sin_addr), ntohs(m_addr.sin_port));
209 HT_ASSERT((event->revents & POLLNVAL) == 0);
222 #if defined(__linux__)
233 if (event->events & EPOLLOUT) {
234 if (handle_write_readiness()) {
240 if (event->events & EPOLLIN) {
244 nread = et_socket_read(m_sd, m_message_header_ptr,
245 m_message_header_remaining, &error, &eof);
246 if (nread == (
size_t)-1) {
247 if (errno != ECONNREFUSED) {
249 HT_INFOF(
"socket read(%d, len=%d) failure : %s", m_sd,
250 (
int)m_message_header_remaining, strerror(errno));
258 else if (nread < m_message_header_remaining) {
259 m_message_header_remaining -= nread;
260 m_message_header_ptr += nread;
266 m_message_header_ptr += nread;
267 handle_message_header(arrival_time);
274 nread = et_socket_read(m_sd, m_message_ptr, m_message_remaining,
276 if (nread == (
size_t)-1) {
278 HT_INFOF(
"socket read(%d, len=%d) failure : %s", m_sd,
279 (
int)m_message_header_remaining, strerror(errno));
283 else if (nread < m_message_remaining) {
284 m_message_ptr += nread;
285 m_message_remaining -= nread;
291 handle_message_body();
300 if (event->events & POLLRDHUP) {
301 HT_DEBUGF(
"Received POLLRDHUP on descriptor %d (%s:%d)", m_sd,
302 inet_ntoa(m_addr.sin_addr), ntohs(m_addr.sin_port));
309 HT_DEBUGF(
"Received EOF on descriptor %d (%s:%d)", m_sd,
310 inet_ntoa(m_addr.sin_addr), ntohs(m_addr.sin_port));
316 if (event->events & EPOLLERR) {
318 HT_INFOF(
"Received EPOLLERR on descriptor %d (%s:%d)", m_sd,
319 inet_ntoa(m_addr.sin_addr), ntohs(m_addr.sin_port));
324 if (event->events & EPOLLHUP) {
325 HT_DEBUGF(
"Received EPOLLHUP on descriptor %d (%s:%d)", m_sd,
326 inet_ntoa(m_addr.sin_addr), ntohs(m_addr.sin_port));
341 #elif defined(__sun__)
352 if (event->portev_events & POLLOUT) {
353 if (handle_write_readiness()) {
355 HT_INFO(
"handle_disconnect() write readiness");
361 if (event->portev_events & POLLIN) {
365 nread = et_socket_read(m_sd, m_message_header_ptr,
366 m_message_header_remaining, &error, &eof);
367 if (nread == (
size_t)-1) {
368 if (errno != ECONNREFUSED) {
370 HT_INFOF(
"socket read(%d, len=%d) failure : %s", m_sd,
371 (
int)m_message_header_remaining, strerror(errno));
379 else if (nread < m_message_header_remaining) {
380 m_message_header_remaining -= nread;
381 m_message_header_ptr += nread;
387 m_message_header_ptr += nread;
388 handle_message_header(arrival_time);
395 nread = et_socket_read(m_sd, m_message_ptr, m_message_remaining,
397 if (nread == (
size_t)-1) {
399 HT_INFOF(
"socket read(%d, len=%d) failure : %s", m_sd,
400 (
int)m_message_header_remaining, strerror(errno));
404 else if (nread < m_message_remaining) {
405 m_message_ptr += nread;
406 m_message_remaining -= nread;
412 handle_message_body();
421 HT_DEBUGF(
"Received EOF on descriptor %d (%s:%d)", m_sd,
422 inet_ntoa(m_addr.sin_addr), ntohs(m_addr.sin_port));
428 if (event->portev_events & POLLERR) {
430 HT_INFOF(
"Received POLLERR on descriptor %d (%s:%d)", m_sd,
431 inet_ntoa(m_addr.sin_addr), ntohs(m_addr.sin_port));
436 if (event->portev_events & POLLHUP) {
437 HT_DEBUGF(
"Received POLLHUP on descriptor %d (%s:%d)", m_sd,
438 inet_ntoa(m_addr.sin_addr), ntohs(m_addr.sin_port));
443 if (event->portev_events & POLLREMOVE) {
444 HT_DEBUGF(
"Received POLLREMOVE on descriptor %d (%s:%d)", m_sd,
445 inet_ntoa(m_addr.sin_addr), ntohs(m_addr.sin_port));
454 test_and_set_error(e.
code());
462 #elif defined(__APPLE__) || defined(__FreeBSD__)
470 assert(m_sd == (
int)event->ident);
472 if (event->flags & EV_EOF) {
477 if (event->filter == EVFILT_WRITE) {
478 if (handle_write_readiness()) {
484 if (event->filter == EVFILT_READ) {
485 size_t available = (size_t)event->data;
487 while (available > 0) {
489 if (m_message_header_remaining <= available) {
491 m_message_header_remaining);
492 if (nread == (
size_t)-1) {
494 HT_INFOF(
"FileUtils::read(%d, len=%d) failure : %s", m_sd,
495 (
int)m_message_header_remaining, strerror(errno));
499 assert(nread == m_message_header_remaining);
501 m_message_header_ptr += nread;
502 handle_message_header(arrival_time);
506 if (nread == (
size_t)-1) {
508 HT_INFOF(
"FileUtils::read(%d, len=%d) failure : %s", m_sd,
509 (
int)available, strerror(errno));
513 assert(nread == available);
514 m_message_header_remaining -= nread;
515 m_message_header_ptr += nread;
520 if (m_message_remaining <= available) {
522 if (nread == (
size_t)-1) {
524 HT_INFOF(
"FileUtils::read(%d, len=%d) failure : %s", m_sd,
525 (
int)m_message_remaining, strerror(errno));
529 assert(nread == m_message_remaining);
531 handle_message_body();
535 if (nread == (
size_t)-1) {
537 HT_INFOF(
"FileUtils::read(%d, len=%d) failure : %s", m_sd,
538 (
int)available, strerror(errno));
542 assert(nread == available);
543 m_message_ptr += nread;
544 m_message_remaining -= nread;
554 test_and_set_error(e.
code());
567 size_t header_len = (size_t)m_message_header[1];
571 if (header_len > (
size_t)(m_message_header_ptr - m_message_header)) {
572 m_message_header_remaining = header_len - (size_t)(m_message_header_ptr
578 m_event->load_message_header(m_message_header, header_len);
579 m_event->arrival_time = arrival_time;
581 m_message_aligned =
false;
583 #if defined(__linux__)
584 if (m_event->header.alignment > 0) {
586 posix_memalign(&vptr, m_event->header.alignment,
587 m_event->header.total_len - header_len);
588 m_message = (uint8_t *)vptr;
589 m_message_aligned =
true;
592 m_message =
new uint8_t [m_event->header.total_len - header_len];
594 m_message =
new uint8_t [m_event->header.total_len - header_len];
596 m_message_ptr = m_message;
597 m_message_remaining = m_event->header.total_len - header_len;
598 m_message_header_remaining = 0;
608 m_event->header.total_len - m_event->header.header_len);
609 free_message_buffer();
614 (m_event->header.id == 0
615 || !m_reactor->remove_request(m_event->header.id, dh))) {
618 HT_WARNF(
"Received response for non-pending event (id=%d,version"
619 "=%d,total_len=%d)", m_event->header.id, m_event->header.version,
620 m_event->header.total_len);
622 free_message_buffer();
626 m_event->payload = m_message;
627 m_event->payload_len = m_event->header.total_len
628 - m_event->header.header_len;
629 m_event->payload_aligned = m_message_aligned;
631 lock_guard<mutex> lock(m_mutex);
632 m_event->set_proxy(m_proxy);
635 deliver_event(m_event, dh);
638 reset_incoming_message_state();
646 bool deliver_conn_estab_event =
false;
651 lock_guard<mutex> lock(m_mutex);
653 if (m_connected ==
false) {
654 socklen_t name_len =
sizeof(m_local_addr);
656 socklen_t sockerr_len =
sizeof(sockerr);
658 if (getsockopt(m_sd, SOL_SOCKET, SO_ERROR, &sockerr, &sockerr_len) < 0) {
660 HT_INFOF(
"getsockopt(SO_ERROR) failed - %s", strerror(errno));
665 HT_INFOF(
"connect() completion error - %s", strerror(sockerr));
669 int bufsize = 4*32768;
670 if (setsockopt(m_sd, SOL_SOCKET, SO_SNDBUF, (
char *)&bufsize,
671 sizeof(bufsize)) < 0) {
673 HT_INFOF(
"setsockopt(SO_SNDBUF) failed - %s", strerror(errno));
675 if (setsockopt(m_sd, SOL_SOCKET, SO_RCVBUF, (
char *)&bufsize,
676 sizeof(bufsize)) < 0) {
678 HT_INFOF(
"setsockopt(SO_RCVBUF) failed - %s", strerror(errno));
682 if (setsockopt(m_sd, SOL_SOCKET, SO_KEEPALIVE, &one,
sizeof(one)) < 0) {
684 HT_ERRORF(
"setsockopt(SO_KEEPALIVE) failure: %s", strerror(errno));
687 if (getsockname(m_sd, (
struct sockaddr *)&m_local_addr, &name_len) < 0) {
689 HT_INFOF(
"getsockname(%d) failed - %s", m_sd, strerror(errno));
695 deliver_conn_estab_event =
true;
699 if ((error = flush_send_queue()) !=
Error::OK) {
700 HT_DEBUG(
"error flushing send queue");
706 if (m_send_queue.empty()) {
718 if (deliver_conn_estab_event) {
723 HT_ERRORF(
"Problem sending proxy map to %s - %s",
730 deliver_event(event);
740 lock_guard<mutex> lock(m_mutex);
741 bool initially_empty = m_send_queue.empty() ?
true :
false;
748 if (cbp->header.id != 0 && disp_handler != 0
750 auto expire_time =
ClockT::now() + chrono::milliseconds(timeout_ms);
751 m_reactor->add_request(cbp->header.id,
this, disp_handler, expire_time);
756 m_send_queue.push_back(cbp);
759 if ((error = flush_send_queue()) !=
Error::OK) {
769 if (initially_empty && !m_send_queue.empty()) {
772 HT_ERRORF(
"Adding Write interest failed; error=%u", (
unsigned)error);
774 else if (!initially_empty && m_send_queue.empty()) {
777 HT_INFOF(
"Removing Write interest failed; error=%u", (
unsigned)error);
788 #if defined(__linux__)
791 ssize_t nwritten, towrite, remaining;
796 while (!m_send_queue.empty()) {
802 remaining = cbp->data.size - (cbp->data_ptr - cbp->data.base);
804 vec[0].iov_base = (
void *)cbp->data_ptr;
805 vec[0].iov_len = remaining;
809 if (cbp->ext.base != 0) {
810 remaining = cbp->ext.size - (cbp->ext_ptr - cbp->ext.base);
812 vec[count].iov_base = (
void *)cbp->ext_ptr;
813 vec[count].iov_len = remaining;
814 towrite += remaining;
819 nwritten = et_socket_writev(m_sd, vec, count, &error);
820 if (nwritten == (ssize_t)-1) {
824 HT_WARNF(
"FileUtils::writev(%d, len=%d) failed : %s", m_sd, (
int)towrite,
828 else if (nwritten < towrite) {
834 HT_WARNF(
"FileUtils::writev(%d, len=%d) failed : %s", m_sd,
835 (
int)towrite, strerror(error));
840 remaining = cbp->data.size - (cbp->data_ptr - cbp->data.base);
842 if (nwritten < remaining) {
843 cbp->data_ptr += nwritten;
850 nwritten -= remaining;
851 cbp->data_ptr += remaining;
854 if (cbp->ext.base != 0) {
855 cbp->ext_ptr += nwritten;
864 m_send_queue.pop_front();
870 #elif defined(__APPLE__) || defined (__sun__) || defined(__FreeBSD__)
873 ssize_t nwritten, towrite, remaining;
877 while (!m_send_queue.empty()) {
883 remaining = cbp->data.size - (cbp->data_ptr - cbp->data.base);
885 vec[0].iov_base = (
void *)cbp->data_ptr;
886 vec[0].iov_len = remaining;
890 if (cbp->ext.base != 0) {
891 remaining = cbp->ext.size - (cbp->ext_ptr - cbp->ext.base);
893 vec[count].iov_base = (
void *)cbp->ext_ptr;
894 vec[count].iov_len = remaining;
895 towrite += remaining;
901 if (nwritten == (ssize_t)-1) {
903 HT_WARNF(
"FileUtils::writev(%d, len=%d) failed : %s", m_sd, (
int)towrite,
907 else if (nwritten < towrite) {
910 remaining = cbp->data.size - (cbp->data_ptr - cbp->data.base);
912 if (nwritten < remaining) {
913 cbp->data_ptr += nwritten;
917 nwritten -= remaining;
918 cbp->data_ptr += remaining;
921 if (cbp->ext.base != 0) {
922 cbp->ext_ptr += nwritten;
928 m_send_queue.pop_front();
static bool verbose
Verbose mode.
#define HT_WARNF(msg,...)
static bool read(const String &fname, String &contents)
Reads a whole file into a String.
chrono::time_point< fast_clock > time_point
Declarations for ReactorRunner.
Abstract base class for application dispatch handlers registered with AsyncComm.
static bool ms_epollet
Use "edge triggered" epoll.
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
int send_message(CommBufPtr &cbp, uint32_t timeout_ms=0, DispatchHandler *disp_handler=nullptr)
Sends message pointed to by cbp over socket associated with this I/O handler.
Connection established event.
bool handle_event(struct pollfd *event, ClockT::time_point arrival_time) override
Handle poll() interface events.
static HandlerMapPtr handler_map
Smart pointer to HandlerMap.
File system utility functions.
static time_point now() noexcept
void handle_message_body()
Processes a message body.
const char * get_text(int error)
Returns a descriptive error message.
std::shared_ptr< CommBuf > CommBufPtr
Smart pointer to CommBuf.
Compatibility Macros for C/C++.
Time related declarations.
Declarations for IOHandlerData.
#define HT_DEBUGF(msg,...)
Writing can be performed without blocking.
bool handle_write_readiness()
Handles write readiness by completing connection and flushing send queue.
static ssize_t writev(int fd, const struct iovec *vector, int count)
Atomically writes data from multiple buffers to a file descriptor.
#define HT_INFOF(msg,...)
Internet address wrapper classes and utility functions.
Request/response message event.
This is a generic exception class for Hypertable.
Response should be ignored.
int flush_send_queue()
Flushes send queue.
#define HT_ERRORF(msg,...)
void handle_disconnect()
Decommissions the handler.
Error codes, Exception handling, error logging.
int code() const
Returns the error code.
void handle_message_header(ClockT::time_point arrival_time)
Processes a message header.
static bool proxy_master
Set to true if this process is acting as "Proxy Master".