46 #include <arpa/inet.h>
48 #include <netinet/in.h>
51 #include <sys/socket.h>
53 #include <sys/types.h>
55 #if defined(__APPLE__) || defined(__FreeBSD__)
56 #include <sys/event.h>
64 struct sockaddr_in addr;
67 #if defined(__linux__)
68 if ((poll_fd = epoll_create(256)) < 0) {
69 perror(
"epoll_create");
72 #elif defined(__sun__)
73 if ((poll_fd = port_create()) < 0) {
74 perror(
"creation of event port failed");
77 #elif defined(__APPLE__) || defined(__FreeBSD__)
89 if ((m_interrupt_sd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
90 HT_ERRORF(
"socket() failure: %s", strerror(errno));
98 memset(&addr, 0 ,
sizeof(sockaddr_in));
99 addr.sin_family = AF_INET;
100 addr.sin_addr.s_addr = inet_addr(
"127.0.0.1");
102 uint16_t port = (uint16_t)(49152 + std::uniform_int_distribution<>(0, 16382)(
ReactorFactory::rng));
103 addr.sin_port = htons(port);
106 if ((::bind(m_interrupt_sd, (sockaddr *)&addr,
sizeof(sockaddr_in))) < 0) {
107 if (errno == EADDRINUSE) {
108 ::close(m_interrupt_sd);
120 if (connect(m_interrupt_sd, (sockaddr *)&addr,
sizeof(addr)) < 0)
121 HT_INFOF(
"connect(interrupt_sd) to port %d failed - %s",
122 (
int)ntohs(addr.sin_port), strerror(errno));
125 lock_guard<mutex> lock(m_polldata_mutex);
126 if ((
size_t)m_interrupt_sd >= m_polldata.size()) {
127 size_t i = m_polldata.size();
128 m_polldata.resize(m_interrupt_sd+1);
129 for (; i<m_polldata.size(); i++) {
130 m_polldata[i].pollfd.fd = -1;
131 m_polldata[i].pollfd.events = 0;
132 m_polldata[i].pollfd.revents = 0;
133 m_polldata[i].handler = 0;
136 m_polldata[m_interrupt_sd].pollfd.fd = m_interrupt_sd;
137 m_polldata[m_interrupt_sd].pollfd.events = POLLIN;
141 #if defined(__linux__)
144 struct epoll_event event;
145 memset(&event, 0,
sizeof(
struct epoll_event));
146 event.events = EPOLLIN | EPOLLOUT | POLLRDHUP | EPOLLET;
147 if (epoll_ctl(poll_fd, EPOLL_CTL_ADD, m_interrupt_sd, &event) < 0) {
148 HT_ERRORF(
"epoll_ctl(%d, EPOLL_CTL_ADD, %d, EPOLLIN|EPOLLOUT|POLLRDHUP|"
149 "EPOLLET) failed : %s", poll_fd, m_interrupt_sd,
155 struct epoll_event event;
156 memset(&event, 0,
sizeof(
struct epoll_event));
157 if (epoll_ctl(poll_fd, EPOLL_CTL_ADD, m_interrupt_sd, &event) < 0) {
158 HT_ERRORF(
"epoll_ctl(%d, EPOLL_CTL_ADD, %d, 0) failed : %s",
159 poll_fd, m_interrupt_sd, strerror(errno));
171 vector<ExpireTimer> expired_timers;
178 lock_guard<mutex> lock(m_mutex);
184 while (m_request_cache.get_next_timeout(now, handler, dh,
185 &next_req_timeout)) {
192 next_timeout.
set(now, next_req_timeout);
193 m_next_wakeup = next_req_timeout;
200 if (!m_timer_heap.empty()) {
203 while (!m_timer_heap.empty()) {
204 timer = m_timer_heap.top();
213 expired_timers.push_back(timer);
223 for (
size_t i=0; i<expired_timers.size(); i++) {
225 if (expired_timers[i].handler)
226 expired_timers[i].handler->handle(event);
230 lock_guard<mutex> lock(m_mutex);
232 if (!m_timer_heap.empty()) {
233 timer = m_timer_heap.top();
245 poll_loop_continue();
257 m_interrupt_in_progress =
true;
264 HT_ERRORF(
"send(interrupt_sd) failed - %s", strerror(errno));
270 #if defined(__linux__)
280 HT_ERRORF(
"send(interrupt_sd) failed - %s", strerror(errno));
285 HT_ERRORF(
"recv(interrupt_sd) failed - %s", strerror(errno));
292 struct epoll_event event;
293 memset(&event, 0,
sizeof(
struct epoll_event));
294 event.events = EPOLLOUT;
295 if (epoll_ctl(poll_fd, EPOLL_CTL_MOD, m_interrupt_sd, &event) < 0) {
305 #elif defined(__sun__)
307 if (port_alert(poll_fd, PORT_ALERT_SET, 1, NULL) < 0) {
308 HT_ERRORF(
"port_alert(%d, PORT_ALERT_SET, 1, 0) failed - %s",
309 poll_fd, strerror(errno));
313 #elif defined(__APPLE__) || defined(__FreeBSD__)
316 EV_SET(&event, m_interrupt_sd, EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, 0);
318 if (kevent(kqd, &event, 1, 0, 0, 0) == -1) {
319 HT_ERRORF(
"kevent(sd=%d) : %s", m_interrupt_sd, strerror(errno));
334 m_interrupt_in_progress =
false;
338 #if defined(__linux__)
341 struct epoll_event event;
348 memset(&event, 0,
sizeof(
struct epoll_event));
349 event.events = EPOLLERR | EPOLLHUP;
351 if (epoll_ctl(poll_fd, EPOLL_CTL_MOD, m_interrupt_sd, &event) < 0) {
352 HT_ERRORF(
"epoll_ctl(EPOLL_CTL_MOD, sd=%d) : %s", m_interrupt_sd,
358 #elif defined(__sun__)
360 if (port_alert(poll_fd, PORT_ALERT_SET, 0, NULL) < 0) {
361 HT_ERRORF(
"port_alert(%d, PORT_ALERT_SET, 0, 0) failed - %s",
362 poll_fd, strerror(errno));
366 #elif defined(__APPLE__) || defined(__FreeBSD__)
367 struct kevent devent;
369 EV_SET(&devent, m_interrupt_sd, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
371 if (kevent(kqd, &devent, 1, 0, 0, 0) == -1 && errno != ENOENT) {
372 HT_ERRORF(
"kevent(sd=%d) : %s", m_interrupt_sd, strerror(errno));
378 m_interrupt_in_progress =
false;
384 lock_guard<mutex> lock(m_polldata_mutex);
387 if (m_polldata.size() <= (size_t)sd) {
388 size_t i = m_polldata.size();
389 m_polldata.resize(sd+1);
390 for (; i<m_polldata.size(); i++) {
392 m_polldata[i].pollfd.fd = -1;
396 m_polldata[sd].pollfd.fd = sd;
397 m_polldata[sd].pollfd.events = events;
398 m_polldata[sd].handler = handler;
401 lock_guard<mutex> lock(m_mutex);
402 error = poll_loop_interrupt();
405 m_polldata[sd].pollfd.fd = -1;
406 m_polldata[sd].pollfd.events = 0;
407 m_polldata[sd].handler = 0;
414 lock_guard<mutex> lock(m_polldata_mutex);
416 HT_ASSERT(m_polldata.size() > (size_t)sd);
417 if ((
size_t)sd == m_polldata.size()-1) {
421 }
while (last_entry > 0 && m_polldata[last_entry].pollfd.fd == -1);
422 m_polldata.resize(last_entry+1);
425 m_polldata[sd].pollfd.fd = -1;
426 m_polldata[sd].handler = 0;
429 lock_guard<mutex> lock(m_mutex);
430 return poll_loop_interrupt();
435 lock_guard<mutex> lock(m_polldata_mutex);
436 HT_ASSERT(m_polldata.size() > (size_t)sd);
437 m_polldata[sd].pollfd.events = events;
439 lock_guard<mutex> lock(m_mutex);
440 return poll_loop_interrupt();
445 std::vector<IOHandler *> &handlers) {
446 lock_guard<mutex> lock(m_polldata_mutex);
451 for (
size_t i=0; i<m_polldata.size(); i++) {
452 if (m_polldata[i].pollfd.fd != -1 && m_polldata[i].pollfd.events) {
453 fdarray.push_back(m_polldata[i].pollfd);
454 handlers.push_back(m_polldata[i].handler);
void handle_timeouts(PollTimeout &next_timeout)
Processes request timeouts and timers.
int remove_poll_interest(int sd)
Remove poll interest for socket (POSIX poll only).
int modify_poll_interest(int sd, short events)
Modify poll interest for socket (POSIX poll only).
chrono::time_point< fast_clock > time_point
Declarations for ReactorRunner.
Abstract base class for application dispatch handlers registered with AsyncComm.
static bool ms_epollet
Use "edge triggered" epoll.
Maintains next timeout for event polling loop.
int poll_loop_continue()
Reset state after call to poll_loop_interrupt.
static std::default_random_engine rng
Pseudo random number generator.
int poll_loop_interrupt()
Forces polling interface wait call to return.
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
int add_poll_interest(int sd, short events, IOHandler *handler)
Add poll interest for socket (POSIX poll only).
ClockT::time_point expire_time
Absolute expiration time.
static ssize_t send(int fd, const void *vptr, size_t n)
Sends data through a network connection.
void set(ClockT::time_point now, ClockT::time_point expire)
Sets the next timeout.
Socket descriptor poll state for use with POSIX poll()
File system utility functions.
static time_point now() noexcept
void fetch_poll_array(std::vector< struct pollfd > &fdarray, std::vector< IOHandler * > &handlers)
Fetches poll state vectors (POSIX poll only).
Logging routines and macros.
Compatibility Macros for C/C++.
Base class for socket descriptor I/O handlers.
I/O handler for TCP sockets.
Time related declarations.
Declarations for IOHandlerData.
String format(int sep= ':') const
Returns a string with a dotted notation ("127.0.0.1:8080") including the port.
static bool set_flags(int fd, int flags)
Sets fcntl flags of a socket.
Declarations for Reactor.
#define HT_FATALF(msg,...)
#define HT_INFOF(msg,...)
Declarations for ReactorFactory.
void set_indefinite()
Sets the next timeout to be an indefinite time in the future.
void deliver_event(EventPtr &event, DispatchHandler *dh=0)
Convenience method for delivering event to application.
#define HT_ERRORF(msg,...)
static ssize_t recv(int fd, void *vptr, size_t n)
Receives data from a network connection.
Error codes, Exception handling, error logging.