0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
IOHandlerData.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 3
9  * of the License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
28 #include <Common/Compat.h>
29 
30 #include "IOHandlerData.h"
31 #include "ReactorRunner.h"
32 
33 #include <Common/Error.h>
34 #include <Common/FileUtils.h>
35 #include <Common/InetAddr.h>
36 #include <Common/Time.h>
37 
38 #include <cassert>
39 #include <chrono>
40 #include <iostream>
41 
42 extern "C" {
43 #include <arpa/inet.h>
44 #include <errno.h>
45 #include <stdlib.h>
46 #include <sys/socket.h>
47 #include <sys/types.h>
48 #if defined(__APPLE__) || defined(__FreeBSD__)
49 #include <sys/event.h>
50 #endif
51 #include <sys/uio.h>
52 }
53 
54 using namespace Hypertable;
55 using namespace std;
56 
57 
58 namespace {
59 
/// Reads up to n bytes from a descriptor, retrying when interrupted.
///
/// Keeps calling read() until n bytes have arrived, read() returns 0, or
/// read() fails with something other than EINTR.
///
/// @param fd Descriptor to read from
/// @param vptr Destination buffer; must have room for n bytes
/// @param n Number of bytes wanted
/// @param errnop Receives the errno of a failing read(). A would-block
///   condition is always reported as EAGAIN. Left untouched when no read()
///   call fails.
/// @param eofp Set to true when read() returns 0; left untouched otherwise
/// @return Number of bytes stored in vptr (possibly fewer than n), or -1 if
///   read() failed with an error other than would-block before any byte was
///   read
ssize_t
et_socket_read(int fd, void *vptr, size_t n, int *errnop, bool *eofp) {
  size_t nleft = n;
  char *ptr = static_cast<char *>(vptr);

  while (nleft > 0) {
    const ssize_t nread = ::read(fd, ptr, nleft);

    if (nread < 0) {
      if (errno == EINTR)
        continue;  // interrupted before any data was transferred; retry

      // POSIX permits EWOULDBLOCK to differ from EAGAIN.  Callers only test
      // for EAGAIN, so report both conditions as that single value.
      *errnop = (errno == EWOULDBLOCK) ? EAGAIN : errno;

      if (*errnop == EAGAIN || nleft < n)
        break;  // would block, or failure after a partial read

      return -1;  // hard error and nothing read
    }

    if (nread == 0) {
      *eofp = true;
      break;
    }

    nleft -= nread;
    ptr += nread;
  }
  return n - nleft;
}
94 
/// Writes a scatter/gather array with writev(), retrying when interrupted.
///
/// @param fd Descriptor to write to
/// @param vector Array of buffers to write
/// @param count Number of entries in vector
/// @param errnop Receives errno when -1 is returned; untouched otherwise
/// @return Number of bytes written (always > 0), or -1 if writev() failed
///   with an error other than EINTR or wrote nothing
ssize_t
et_socket_writev(int fd, const iovec *vector, int count, int *errnop) {
  while (true) {
    // writev() sets errno only when it fails, yet a zero return is also
    // routed through the errno check below.  Clear errno first so a stale
    // EINTR left by an earlier call cannot make this loop spin forever and
    // a stale error code cannot be reported for a call that did not fail.
    errno = 0;

    const ssize_t nwritten = ::writev(fd, vector, count);
    if (nwritten > 0)
      return nwritten;

    if (errno == EINTR)
      continue;  // interrupted; call writev() again

    *errnop = errno;
    return -1;
  }
}
108 
109 } // local namespace
110 
111 
// Handles an event reported through the poll() interface.
//
// Processing order as visible below: POLLOUT (write readiness), POLLIN
// (read header and/or body bytes until EAGAIN or EOF), EOF, POLLERR, POLLHUP.
// Every `return true` is preceded by handle_disconnect(); false is returned
// when no disconnect was triggered.
//
// event        - pollfd whose revents bits are examined
// arrival_time - forwarded to handle_message_header()
//
// NOTE(review): the listing's embedded numbering skips 136, 165, 195 and 213
// inside this function, so statements may be missing from this view.
112 bool
113 IOHandlerData::handle_event(struct pollfd *event,
114  ClockT::time_point arrival_time) {
115  int error = 0;
116  bool eof = false;
117 
118  //DisplayEvent(event);
119 
120  try {
121  if (event->revents & POLLOUT) {
122  if (handle_write_readiness()) {
123  handle_disconnect();
124  return true;
125  }
126  }
127 
128  if (event->revents & POLLIN) {
// nread is unsigned, so the -1 failure return of et_socket_read() is
// detected by comparing against (size_t)-1.
129  size_t nread;
// Keep reading until et_socket_read() reports EAGAIN or EOF.
130  while (true) {
131  if (!m_got_header) {
132  nread = et_socket_read(m_sd, m_message_header_ptr,
133  m_message_header_remaining, &error, &eof);
134  if (nread == (size_t)-1) {
// NOTE(review): tests and logs the global errno rather than the `error`
// value captured by et_socket_read() -- confirm they cannot differ here.
135  if (errno != ECONNREFUSED) {
137  HT_INFOF("socket read(%d, len=%d) failure : %s", m_sd,
138  (int)m_message_header_remaining, strerror(errno));
139  }
140  else
141  test_and_set_error(Error::COMM_CONNECT_ERROR);
142 
143  handle_disconnect();
144  return true;
145  }
146  else if (nread < m_message_header_remaining) {
// Short read: record progress and stop only if the socket would block.
147  m_message_header_remaining -= nread;
148  m_message_header_ptr += nread;
149  if (error == EAGAIN)
150  break;
151  error = 0;
152  }
153  else {
// All header bytes requested so far have arrived.
154  m_message_header_ptr += nread;
155  handle_message_header(arrival_time);
156  }
157 
158  if (eof)
159  break;
160  }
161  else { // got header
162  nread = et_socket_read(m_sd, m_message_ptr, m_message_remaining,
163  &error, &eof);
164  if (nread == (size_t)-1) {
// NOTE(review): this logs m_message_header_remaining although the read
// above requested m_message_remaining bytes -- confirm which was intended.
166  HT_INFOF("socket read(%d, len=%d) failure : %s", m_sd,
167  (int)m_message_header_remaining, strerror(errno));
168  handle_disconnect();
169  return true;
170  }
171  else if (nread < m_message_remaining) {
// Short read of the body: record progress and stop only on EAGAIN.
172  m_message_ptr += nread;
173  m_message_remaining -= nread;
174  if (error == EAGAIN)
175  break;
176  error = 0;
177  }
178  else
179  handle_message_body();
180 
181  if (eof)
182  break;
183  }
184  }
185  }
186 
// eof can only have been set by et_socket_read() in the POLLIN branch above.
187  if (eof) {
188  HT_DEBUGF("Received EOF on descriptor %d (%s:%d)", m_sd,
189  inet_ntoa(m_addr.sin_addr), ntohs(m_addr.sin_port));
190  handle_disconnect();
191  return true;
192  }
193 
194  if (event->revents & POLLERR) {
196  HT_INFOF("Received POLLERR on descriptor %d (%s:%d)", m_sd,
197  inet_ntoa(m_addr.sin_addr), ntohs(m_addr.sin_port));
198  handle_disconnect();
199  return true;
200  }
201 
202  if (event->revents & POLLHUP) {
203  HT_DEBUGF("Received POLLHUP on descriptor %d (%s:%d)", m_sd,
204  inet_ntoa(m_addr.sin_addr), ntohs(m_addr.sin_port));
205  handle_disconnect();
206  return true;
207  }
208 
209  HT_ASSERT((event->revents & POLLNVAL) == 0);
210 
211  }
// Any Hypertable exception raised while handling the event disconnects.
212  catch (Hypertable::Exception &e) {
214  HT_ERROR_OUT << e << HT_END;
215  handle_disconnect();
216  return true;
217  }
218 
219  return false;
220 }
221 
222 #if defined(__linux__)
223 
// Handles an event reported through the epoll interface (Linux build only,
// per the enclosing #if).
//
// Processing order as visible below: EPOLLOUT (write readiness), EPOLLIN
// (read header and/or body bytes until EAGAIN or EOF), then POLLRDHUP / EOF,
// EPOLLERR and EPOLLHUP.  Every `return true` is preceded by
// handle_disconnect(); false is returned when no disconnect was triggered.
//
// event        - epoll_event whose events bits are examined
// arrival_time - forwarded to handle_message_header()
//
// NOTE(review): the listing's embedded numbering skips 248, 277, 299, 317 and
// 332 inside this function, so statements may be missing from this view.
224 bool
225 IOHandlerData::handle_event(struct epoll_event *event,
226  ClockT::time_point arrival_time) {
227  int error = 0;
228  bool eof = false;
229 
230  //DisplayEvent(event);
231 
232  try {
233  if (event->events & EPOLLOUT) {
234  if (handle_write_readiness()) {
235  handle_disconnect();
236  return true;
237  }
238  }
239 
240  if (event->events & EPOLLIN) {
// nread is unsigned, so the -1 failure return of et_socket_read() is
// detected by comparing against (size_t)-1.
241  size_t nread;
// Keep reading until et_socket_read() reports EAGAIN or EOF.
242  while (true) {
243  if (!m_got_header) {
244  nread = et_socket_read(m_sd, m_message_header_ptr,
245  m_message_header_remaining, &error, &eof);
246  if (nread == (size_t)-1) {
// NOTE(review): tests and logs the global errno rather than the `error`
// value captured by et_socket_read() -- confirm they cannot differ here.
247  if (errno != ECONNREFUSED) {
249  HT_INFOF("socket read(%d, len=%d) failure : %s", m_sd,
250  (int)m_message_header_remaining, strerror(errno));
251  }
252  else
253  test_and_set_error(Error::COMM_CONNECT_ERROR);
254 
255  handle_disconnect();
256  return true;
257  }
258  else if (nread < m_message_header_remaining) {
// Short read: record progress and stop only if the socket would block.
259  m_message_header_remaining -= nread;
260  m_message_header_ptr += nread;
261  if (error == EAGAIN)
262  break;
263  error = 0;
264  }
265  else {
// All header bytes requested so far have arrived.
266  m_message_header_ptr += nread;
267  handle_message_header(arrival_time);
268  }
269 
270  if (eof)
271  break;
272  }
273  else { // got header
274  nread = et_socket_read(m_sd, m_message_ptr, m_message_remaining,
275  &error, &eof);
276  if (nread == (size_t)-1) {
// NOTE(review): this logs m_message_header_remaining although the read
// above requested m_message_remaining bytes -- confirm which was intended.
278  HT_INFOF("socket read(%d, len=%d) failure : %s", m_sd,
279  (int)m_message_header_remaining, strerror(errno));
280  handle_disconnect();
281  return true;
282  }
283  else if (nread < m_message_remaining) {
// Short read of the body: record progress and stop only on EAGAIN.
284  m_message_ptr += nread;
285  m_message_remaining -= nread;
286  if (error == EAGAIN)
287  break;
288  error = 0;
289  }
290  else
291  handle_message_body();
292 
293  if (eof)
294  break;
295  }
296  }
297  }
298 
// NOTE(review): the `else` at 307 has no matching `if` in this view;
// presumably the condition that selects between the POLLRDHUP check and the
// eof check sits on skipped line 299 -- confirm against the real source.
300  if (event->events & POLLRDHUP) {
301  HT_DEBUGF("Received POLLRDHUP on descriptor %d (%s:%d)", m_sd,
302  inet_ntoa(m_addr.sin_addr), ntohs(m_addr.sin_port));
303  handle_disconnect();
304  return true;
305  }
306  }
307  else {
308  if (eof) {
309  HT_DEBUGF("Received EOF on descriptor %d (%s:%d)", m_sd,
310  inet_ntoa(m_addr.sin_addr), ntohs(m_addr.sin_port));
311  handle_disconnect();
312  return true;
313  }
314  }
315 
316  if (event->events & EPOLLERR) {
318  HT_INFOF("Received EPOLLERR on descriptor %d (%s:%d)", m_sd,
319  inet_ntoa(m_addr.sin_addr), ntohs(m_addr.sin_port));
320  handle_disconnect();
321  return true;
322  }
323 
324  if (event->events & EPOLLHUP) {
325  HT_DEBUGF("Received EPOLLHUP on descriptor %d (%s:%d)", m_sd,
326  inet_ntoa(m_addr.sin_addr), ntohs(m_addr.sin_port));
327  handle_disconnect();
328  return true;
329  }
330  }
// Any Hypertable exception raised while handling the event disconnects.
331  catch (Hypertable::Exception &e) {
333  HT_ERROR_OUT << e << HT_END;
334  handle_disconnect();
335  return true;
336  }
337 
338  return false;
339 }
340 
341 #elif defined(__sun__)
342 
// Handles an event delivered as a port_event_t (built only when __sun__ is
// defined, per the enclosing #elif).
//
// Processing order as visible below: POLLOUT (write readiness), POLLIN
// (read header and/or body bytes until EAGAIN or EOF), EOF, POLLERR, POLLHUP,
// POLLREMOVE.  Every `return true` is preceded by handle_disconnect(); false
// is returned when no disconnect was triggered.
//
// event        - port event whose portev_events bits are examined
// arrival_time - forwarded to handle_message_header()
//
// NOTE(review): the listing's embedded numbering skips 354, 369, 398, 429 and
// 452 inside this function, so statements may be missing from this view.
343 bool IOHandlerData::handle_event(port_event_t *event,
344  ClockT::time_point arrival_time) {
345  int error = 0;
346  bool eof = false;
347 
348  //display_event(event);
349 
350  try {
351 
352  if (event->portev_events & POLLOUT) {
353  if (handle_write_readiness()) {
355  HT_INFO("handle_disconnect() write readiness");
356  handle_disconnect();
357  return true;
358  }
359  }
360 
361  if (event->portev_events & POLLIN) {
// nread is unsigned, so the -1 failure return of et_socket_read() is
// detected by comparing against (size_t)-1.
362  size_t nread;
// Keep reading until et_socket_read() reports EAGAIN or EOF.
363  while (true) {
364  if (!m_got_header) {
365  nread = et_socket_read(m_sd, m_message_header_ptr,
366  m_message_header_remaining, &error, &eof);
367  if (nread == (size_t)-1) {
// NOTE(review): tests and logs the global errno rather than the `error`
// value captured by et_socket_read() -- confirm they cannot differ here.
368  if (errno != ECONNREFUSED) {
370  HT_INFOF("socket read(%d, len=%d) failure : %s", m_sd,
371  (int)m_message_header_remaining, strerror(errno));
372  }
373  else
374  test_and_set_error(Error::COMM_CONNECT_ERROR);
375 
376  handle_disconnect();
377  return true;
378  }
379  else if (nread < m_message_header_remaining) {
// Short read: record progress and stop only if the socket would block.
380  m_message_header_remaining -= nread;
381  m_message_header_ptr += nread;
382  if (error == EAGAIN)
383  break;
384  error = 0;
385  }
386  else {
// All header bytes requested so far have arrived.
387  m_message_header_ptr += nread;
388  handle_message_header(arrival_time);
389  }
390 
391  if (eof)
392  break;
393  }
394  else { // got header
395  nread = et_socket_read(m_sd, m_message_ptr, m_message_remaining,
396  &error, &eof);
397  if (nread == (size_t)-1) {
// NOTE(review): this logs m_message_header_remaining although the read
// above requested m_message_remaining bytes -- confirm which was intended.
399  HT_INFOF("socket read(%d, len=%d) failure : %s", m_sd,
400  (int)m_message_header_remaining, strerror(errno));
401  handle_disconnect();
402  return true;
403  }
404  else if (nread < m_message_remaining) {
// Short read of the body: record progress and stop only on EAGAIN.
405  m_message_ptr += nread;
406  m_message_remaining -= nread;
407  if (error == EAGAIN)
408  break;
409  error = 0;
410  }
411  else
412  handle_message_body();
413 
414  if (eof)
415  break;
416  }
417  }
418  }
419 
// eof can only have been set by et_socket_read() in the POLLIN branch above.
420  if (eof) {
421  HT_DEBUGF("Received EOF on descriptor %d (%s:%d)", m_sd,
422  inet_ntoa(m_addr.sin_addr), ntohs(m_addr.sin_port));
423  handle_disconnect();
424  return true;
425  }
426 
427 
428  if (event->portev_events & POLLERR) {
430  HT_INFOF("Received POLLERR on descriptor %d (%s:%d)", m_sd,
431  inet_ntoa(m_addr.sin_addr), ntohs(m_addr.sin_port));
432  handle_disconnect();
433  return true;
434  }
435 
436  if (event->portev_events & POLLHUP) {
437  HT_DEBUGF("Received POLLHUP on descriptor %d (%s:%d)", m_sd,
438  inet_ntoa(m_addr.sin_addr), ntohs(m_addr.sin_port));
439  handle_disconnect();
440  return true;
441  }
442 
443  if (event->portev_events & POLLREMOVE) {
444  HT_DEBUGF("Received POLLREMOVE on descriptor %d (%s:%d)", m_sd,
445  inet_ntoa(m_addr.sin_addr), ntohs(m_addr.sin_port));
446  handle_disconnect();
447  return true;
448  }
449 
450  }
// A Hypertable exception records its code via test_and_set_error() and then
// disconnects.
451  catch (Hypertable::Exception &e) {
453  HT_ERROR_OUT << e << HT_END;
454  test_and_set_error(e.code());
455  handle_disconnect();
456  return true;
457  }
458 
459  return false;
460 }
461 
462 #elif defined(__APPLE__) || defined(__FreeBSD__)
463 
// Handles an event delivered as a struct kevent (built when __APPLE__ or
// __FreeBSD__ is defined, per the enclosing #elif).
//
// Disconnects immediately on EV_EOF, handles write readiness for
// EVFILT_WRITE, and for EVFILT_READ consumes event->data bytes, splitting
// them between the message header and the message body.  Every `return true`
// is preceded by handle_disconnect(); false is returned otherwise.
//
// event        - kevent whose flags, filter and data fields are examined
// arrival_time - forwarded to handle_message_header()
//
// NOTE(review): the listing's embedded numbering skips 493, 507, 523, 536 and
// 552 inside this function, so statements may be missing from this view.
464 bool IOHandlerData::handle_event(struct kevent *event,
465  ClockT::time_point arrival_time) {
466 
467  //DisplayEvent(event);
468 
469  try {
470  assert(m_sd == (int)event->ident);
471 
472  if (event->flags & EV_EOF) {
473  handle_disconnect();
474  return true;
475  }
476 
477  if (event->filter == EVFILT_WRITE) {
478  if (handle_write_readiness()) {
479  handle_disconnect();
480  return true;
481  }
482  }
483 
484  if (event->filter == EVFILT_READ) {
// The code treats event->data as the number of bytes that can be read and
// never asks FileUtils::read() for more than that.
485  size_t available = (size_t)event->data;
486  size_t nread;
487  while (available > 0) {
488  if (!m_got_header) {
489  if (m_message_header_remaining <= available) {
// Enough bytes are available to finish the header portion being read.
490  nread = FileUtils::read(m_sd, m_message_header_ptr,
491  m_message_header_remaining);
492  if (nread == (size_t)-1) {
494  HT_INFOF("FileUtils::read(%d, len=%d) failure : %s", m_sd,
495  (int)m_message_header_remaining, strerror(errno));
496  handle_disconnect();
497  return true;
498  }
499  assert(nread == m_message_header_remaining);
500  available -= nread;
501  m_message_header_ptr += nread;
502  handle_message_header(arrival_time);
503  }
504  else {
// Only part of the header is available: consume it and wait for more.
505  nread = FileUtils::read(m_sd, m_message_header_ptr, available);
506  if (nread == (size_t)-1) {
508  HT_INFOF("FileUtils::read(%d, len=%d) failure : %s", m_sd,
509  (int)available, strerror(errno));
510  handle_disconnect();
511  return true;
512  }
513  assert(nread == available);
514  m_message_header_remaining -= nread;
515  m_message_header_ptr += nread;
516  return false;
517  }
518  }
// Deliberately a fresh `if`, not `else`: m_got_header is re-tested after the
// handle_message_header() call above.
519  if (m_got_header) {
520  if (m_message_remaining <= available) {
// Enough bytes are available to finish the message body.
521  nread = FileUtils::read(m_sd, m_message_ptr, m_message_remaining);
522  if (nread == (size_t)-1) {
524  HT_INFOF("FileUtils::read(%d, len=%d) failure : %s", m_sd,
525  (int)m_message_remaining, strerror(errno));
526  handle_disconnect();
527  return true;
528  }
529  assert(nread == m_message_remaining);
530  available -= nread;
531  handle_message_body();
532  }
533  else {
// Only part of the body is available: consume it and end the loop.
534  nread = FileUtils::read(m_sd, m_message_ptr, available);
535  if (nread == (size_t)-1) {
537  HT_INFOF("FileUtils::read(%d, len=%d) failure : %s", m_sd,
538  (int)available, strerror(errno));
539  handle_disconnect();
540  return true;
541  }
542  assert(nread == available);
543  m_message_ptr += nread;
544  m_message_remaining -= nread;
545  available = 0;
546  }
547  }
548  }
549  }
550  }
// A Hypertable exception records its code via test_and_set_error() and then
// disconnects.
551  catch (Hypertable::Exception &e) {
553  HT_ERROR_OUT << e << HT_END;
554  test_and_set_error(e.code());
555  handle_disconnect();
556  return true;
557  }
558 
559  return false;
560 }
561 #else
563 #endif
564 
565 
567  size_t header_len = (size_t)m_message_header[1];
568 
569  // check to see if there is any variable length header
570  // after the fixed length portion that needs to be read
571  if (header_len > (size_t)(m_message_header_ptr - m_message_header)) {
572  m_message_header_remaining = header_len - (size_t)(m_message_header_ptr
573  - m_message_header);
574  return;
575  }
576 
577  m_event = make_shared<Event>(Event::MESSAGE, m_addr);
578  m_event->load_message_header(m_message_header, header_len);
579  m_event->arrival_time = arrival_time;
580 
581  m_message_aligned = false;
582 
583 #if defined(__linux__)
584  if (m_event->header.alignment > 0) {
585  void *vptr = 0;
586  posix_memalign(&vptr, m_event->header.alignment,
587  m_event->header.total_len - header_len);
588  m_message = (uint8_t *)vptr;
589  m_message_aligned = true;
590  }
591  else
592  m_message = new uint8_t [m_event->header.total_len - header_len];
593 #else
594  m_message = new uint8_t [m_event->header.total_len - header_len];
595 #endif
596  m_message_ptr = m_message;
597  m_message_remaining = m_event->header.total_len - header_len;
598  m_message_header_remaining = 0;
599  m_got_header = true;
600 }
601 
602 
604  DispatchHandler *dh {};
605 
606  if (m_event->header.flags & CommHeader::FLAGS_BIT_PROXY_MAP_UPDATE) {
607  ReactorRunner::handler_map->update_proxy_map((const char *)m_message,
608  m_event->header.total_len - m_event->header.header_len);
609  free_message_buffer();
610  m_event.reset();
611  //HT_INFO("proxy map update");
612  }
613  else if ((m_event->header.flags & CommHeader::FLAGS_BIT_REQUEST) == 0 &&
614  (m_event->header.id == 0
615  || !m_reactor->remove_request(m_event->header.id, dh))) {
616  if ((m_event->header.flags & CommHeader::FLAGS_BIT_IGNORE_RESPONSE) == 0) {
618  HT_WARNF("Received response for non-pending event (id=%d,version"
619  "=%d,total_len=%d)", m_event->header.id, m_event->header.version,
620  m_event->header.total_len);
621  }
622  free_message_buffer();
623  m_event.reset();
624  }
625  else {
626  m_event->payload = m_message;
627  m_event->payload_len = m_event->header.total_len
628  - m_event->header.header_len;
629  m_event->payload_aligned = m_message_aligned;
630  {
631  lock_guard<mutex> lock(m_mutex);
632  m_event->set_proxy(m_proxy);
633  }
634  //HT_INFOF("Just received messaage of size %d", m_event->header.total_len);
635  deliver_event(m_event, dh);
636  }
637 
638  reset_incoming_message_state();
639 }
640 
642  ReactorRunner::handler_map->decomission_handler(this);
643 }
644 
646  bool deliver_conn_estab_event = false;
647  bool rval = true;
648  int error = Error::OK;
649 
650  while (true) {
651  lock_guard<mutex> lock(m_mutex);
652 
653  if (m_connected == false) {
654  socklen_t name_len = sizeof(m_local_addr);
655  int sockerr = 0;
656  socklen_t sockerr_len = sizeof(sockerr);
657 
658  if (getsockopt(m_sd, SOL_SOCKET, SO_ERROR, &sockerr, &sockerr_len) < 0) {
660  HT_INFOF("getsockopt(SO_ERROR) failed - %s", strerror(errno));
661  }
662 
663  if (sockerr) {
665  HT_INFOF("connect() completion error - %s", strerror(sockerr));
666  break;
667  }
668 
669  int bufsize = 4*32768;
670  if (setsockopt(m_sd, SOL_SOCKET, SO_SNDBUF, (char *)&bufsize,
671  sizeof(bufsize)) < 0) {
673  HT_INFOF("setsockopt(SO_SNDBUF) failed - %s", strerror(errno));
674  }
675  if (setsockopt(m_sd, SOL_SOCKET, SO_RCVBUF, (char *)&bufsize,
676  sizeof(bufsize)) < 0) {
678  HT_INFOF("setsockopt(SO_RCVBUF) failed - %s", strerror(errno));
679  }
680 
681  int one = 1;
682  if (setsockopt(m_sd, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one)) < 0) {
684  HT_ERRORF("setsockopt(SO_KEEPALIVE) failure: %s", strerror(errno));
685  }
686 
687  if (getsockname(m_sd, (struct sockaddr *)&m_local_addr, &name_len) < 0) {
689  HT_INFOF("getsockname(%d) failed - %s", m_sd, strerror(errno));
690  break;
691  }
692 
693  //HT_INFO("Connection established.");
694  m_connected = true;
695  deliver_conn_estab_event = true;
696  }
697 
698  //HT_INFO("about to flush send queue");
699  if ((error = flush_send_queue()) != Error::OK) {
700  HT_DEBUG("error flushing send queue");
701  if (m_error == Error::OK)
702  m_error = error;
703  return true;
704  }
705  //HT_INFO("about to remove poll interest");
706  if (m_send_queue.empty()) {
707  if ((error = remove_poll_interest(PollEvent::WRITE)) != Error::OK) {
708  if (m_error == Error::OK)
709  m_error = error;
710  return true;
711  }
712  }
713 
714  rval = false;
715  break;
716  }
717 
718  if (deliver_conn_estab_event) {
720  if ((error = ReactorRunner::handler_map->propagate_proxy_map(this))
721  != Error::OK) {
723  HT_ERRORF("Problem sending proxy map to %s - %s",
724  m_addr.format().c_str(), Error::get_text(error));
725  return true;
726  }
727  }
728  EventPtr event = make_shared<Event>(Event::CONNECTION_ESTABLISHED, m_addr,
729  m_proxy, Error::OK);
730  deliver_event(event);
731  }
732 
733  return rval;
734 }
735 
736 
// Queues cbp for sending and, when connected, tries to flush the send queue.
//
// All work is done while holding m_mutex:
//   1. If the header id is non-zero, a dispatch handler was supplied and
//      FLAGS_BIT_REQUEST is set, the request is registered with the reactor
//      with an expiry of now + timeout_ms.
//   2. cbp is appended to m_send_queue.
//   3. If m_connected, flush_send_queue() is called; on failure the handler
//      is decomissioned and the error is returned.
//   4. WRITE poll interest is added when the queue went from empty to
//      non-empty, and removed when it went from non-empty to empty.
//
// cbp          - buffer to send
// timeout_ms   - request timeout in milliseconds, used for the expiry time
// disp_handler - handler registered with the request; may be 0
//
// Returns the resulting error code.  A non-OK code is also stored in m_error
// if m_error is still Error::OK.
//
// NOTE(review): the listing's embedded numbering skips 745 and 760 inside
// this function, so statements may be missing from this view.
737 int
738 IOHandlerData::send_message(CommBufPtr &cbp, uint32_t timeout_ms,
739  DispatchHandler *disp_handler) {
740  lock_guard<mutex> lock(m_mutex);
// Remembered so the poll-interest update below can tell whether this call
// changed the queue between empty and non-empty.
741  bool initially_empty = m_send_queue.empty() ? true : false;
742  int error = Error::OK;
743 
// NOTE(review): the statement governed by this `if` is not visible (skipped
// line 745); as shown here the condition would bind to the next `if`.
744  if (m_decomissioned)
746 
747  // If request, Add message ID to request cache
748  if (cbp->header.id != 0 && disp_handler != 0
749  && cbp->header.flags & CommHeader::FLAGS_BIT_REQUEST) {
750  auto expire_time = ClockT::now() + chrono::milliseconds(timeout_ms);
751  m_reactor->add_request(cbp->header.id, this, disp_handler, expire_time);
752  }
753 
754  //HT_INFOF("About to send message of size %d", cbp->header.total_len);
755 
756  m_send_queue.push_back(cbp);
757 
// While not yet connected the message only stays queued; no flush is tried.
758  if (m_connected) {
759  if ((error = flush_send_queue()) != Error::OK) {
761  HT_WARNF("Problem flushing send queue - %s", Error::get_text(error));
762  ReactorRunner::handler_map->decomission_handler(this);
763  if (m_error == Error::OK)
764  m_error = error;
765  return error;
766  }
767  }
768 
// Queue became non-empty: ask to be notified when the socket is writable.
769  if (initially_empty && !m_send_queue.empty()) {
770  error = add_poll_interest(PollEvent::WRITE);
771  if (error && ReactorFactory::verbose)
772  HT_ERRORF("Adding Write interest failed; error=%u", (unsigned)error);
773  }
// Queue was drained by the flush above: write notifications no longer needed.
774  else if (!initially_empty && m_send_queue.empty()) {
775  error = remove_poll_interest(PollEvent::WRITE);
776  if (error && ReactorFactory::verbose)
777  HT_INFOF("Removing Write interest failed; error=%u", (unsigned)error);
778  }
779 
780  // Set m_error if not already set
781  if (error != Error::OK && m_error == Error::OK)
782  m_error = error;
783 
784  return error;
785 }
786 
787 
788 #if defined(__linux__)
789 
791  ssize_t nwritten, towrite, remaining;
792  struct iovec vec[2];
793  int count;
794  int error = 0;
795 
796  while (!m_send_queue.empty()) {
797 
798  CommBufPtr &cbp = m_send_queue.front();
799 
800  count = 0;
801  towrite = 0;
802  remaining = cbp->data.size - (cbp->data_ptr - cbp->data.base);
803  if (remaining > 0) {
804  vec[0].iov_base = (void *)cbp->data_ptr;
805  vec[0].iov_len = remaining;
806  towrite = remaining;
807  ++count;
808  }
809  if (cbp->ext.base != 0) {
810  remaining = cbp->ext.size - (cbp->ext_ptr - cbp->ext.base);
811  if (remaining > 0) {
812  vec[count].iov_base = (void *)cbp->ext_ptr;
813  vec[count].iov_len = remaining;
814  towrite += remaining;
815  ++count;
816  }
817  }
818 
819  nwritten = et_socket_writev(m_sd, vec, count, &error);
820  if (nwritten == (ssize_t)-1) {
821  if (error == EAGAIN)
822  return Error::OK;
824  HT_WARNF("FileUtils::writev(%d, len=%d) failed : %s", m_sd, (int)towrite,
825  strerror(errno));
827  }
828  else if (nwritten < towrite) {
829  if (nwritten == 0) {
830  if (error == EAGAIN)
831  break;
832  if (error) {
834  HT_WARNF("FileUtils::writev(%d, len=%d) failed : %s", m_sd,
835  (int)towrite, strerror(error));
837  }
838  continue;
839  }
840  remaining = cbp->data.size - (cbp->data_ptr - cbp->data.base);
841  if (remaining > 0) {
842  if (nwritten < remaining) {
843  cbp->data_ptr += nwritten;
844  if (error == EAGAIN)
845  break;
846  error = 0;
847  continue;
848  }
849  else {
850  nwritten -= remaining;
851  cbp->data_ptr += remaining;
852  }
853  }
854  if (cbp->ext.base != 0) {
855  cbp->ext_ptr += nwritten;
856  if (error == EAGAIN)
857  break;
858  error = 0;
859  continue;
860  }
861  }
862 
863  // buffer written successfully, now remove from queue (destroys buffer)
864  m_send_queue.pop_front();
865  }
866 
867  return Error::OK;
868 }
869 
870 #elif defined(__APPLE__) || defined (__sun__) || defined(__FreeBSD__)
871 
873  ssize_t nwritten, towrite, remaining;
874  struct iovec vec[2];
875  int count;
876 
877  while (!m_send_queue.empty()) {
878 
879  CommBufPtr &cbp = m_send_queue.front();
880 
881  count = 0;
882  towrite = 0;
883  remaining = cbp->data.size - (cbp->data_ptr - cbp->data.base);
884  if (remaining > 0) {
885  vec[0].iov_base = (void *)cbp->data_ptr;
886  vec[0].iov_len = remaining;
887  towrite = remaining;
888  ++count;
889  }
890  if (cbp->ext.base != 0) {
891  remaining = cbp->ext.size - (cbp->ext_ptr - cbp->ext.base);
892  if (remaining > 0) {
893  vec[count].iov_base = (void *)cbp->ext_ptr;
894  vec[count].iov_len = remaining;
895  towrite += remaining;
896  ++count;
897  }
898  }
899 
900  nwritten = FileUtils::writev(m_sd, vec, count);
901  if (nwritten == (ssize_t)-1) {
903  HT_WARNF("FileUtils::writev(%d, len=%d) failed : %s", m_sd, (int)towrite,
904  strerror(errno));
906  }
907  else if (nwritten < towrite) {
908  if (nwritten == 0)
909  break;
910  remaining = cbp->data.size - (cbp->data_ptr - cbp->data.base);
911  if (remaining > 0) {
912  if (nwritten < remaining) {
913  cbp->data_ptr += nwritten;
914  break;
915  }
916  else {
917  nwritten -= remaining;
918  cbp->data_ptr += remaining;
919  }
920  }
921  if (cbp->ext.base != 0) {
922  cbp->ext_ptr += nwritten;
923  break;
924  }
925  }
926 
927  // buffer written successfully, now remove from queue (destroys buffer)
928  m_send_queue.pop_front();
929  }
930 
931  return Error::OK;
932 }
933 
934 #else
935  ImplementMe;
936 #endif
static bool verbose
Verbose mode.
#define HT_WARNF(msg,...)
Definition: Logger.h:290
static bool read(const String &fname, String &contents)
Reads a whole file into a String.
Definition: FileUtils.cc:59
chrono::time_point< fast_clock > time_point
Definition: fast_clock.h:42
Declarations for ReactorRunner.
Abstract base class for application dispatch handlers registered with AsyncComm.
static bool ms_epollet
Use "edge triggered" epoll.
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
Definition: Event.h:228
#define HT_INFO(msg)
Definition: Logger.h:271
STL namespace.
int send_message(CommBufPtr &cbp, uint32_t timeout_ms=0, DispatchHandler *disp_handler=nullptr)
Sends message pointed to by cbp over socket associated with this I/O handler.
Connection established event.
Definition: Event.h:61
bool handle_event(struct pollfd *event, ClockT::time_point arrival_time) override
Handle poll() interface events.
#define HT_ASSERT(_e_)
Definition: Logger.h:396
static HandlerMapPtr handler_map
Smart pointer to HandlerMap.
Definition: ReactorRunner.h:70
File system utility functions.
static time_point now() noexcept
Definition: fast_clock.cc:37
void handle_message_body()
Processes a message body.
const char * get_text(int error)
Returns a descriptive error message.
Definition: Error.cc:330
std::shared_ptr< CommBuf > CommBufPtr
Smart pointer to CommBuf.
Definition: CommBuf.h:305
#define HT_DEBUG(msg)
Definition: Logger.h:259
Compatibility Macros for C/C++.
#define HT_END
Definition: Logger.h:220
#define HT_ERROR_OUT
Definition: Logger.h:301
Time related declarations.
Declarations for IOHandlerData.
Hypertable definitions
#define HT_DEBUGF(msg,...)
Definition: Logger.h:260
Writing can be performed without blocking.
Definition: PollEvent.h:46
bool handle_write_readiness()
Handles write readiness by completing connection and flushing send queue.
static ssize_t writev(int fd, const struct iovec *vector, int count)
Atomically writes data from multiple buffers to a file descriptor.
Definition: FileUtils.cc:162
#define HT_INFOF(msg,...)
Definition: Logger.h:272
Internet address wrapper classes and utility functions.
Request/response message event.
Definition: Event.h:63
This is a generic exception class for Hypertable.
Definition: Error.h:314
int flush_send_queue()
Flushes send queue.
#define HT_ERRORF(msg,...)
Definition: Logger.h:300
void handle_disconnect()
Decomissions the handler.
Error codes, Exception handling, error logging.
int code() const
Returns the error code.
Definition: Error.h:391
void handle_message_header(ClockT::time_point arrival_time)
Processes a message header.
static bool proxy_master
Set to true if this process is acting as "Proxy Master".
ImplementMe