30 #define HT_DISABLE_LOG_DEBUG 1
52 #include <sys/types.h>
54 #if defined(__APPLE__) || defined(__FreeBSD__)
55 #include <sys/event.h>
70 std::set<IOHandler *> removed_handlers;
72 bool did_delay =
false;
74 bool got_arrival_time =
false;
75 std::vector<struct pollfd> pollfds;
76 std::vector<IOHandler *> handlers;
80 uint32_t dispatch_delay {};
87 m_reactor->fetch_poll_array(pollfds, handlers);
89 while ((n = poll(&pollfds[0], pollfds.size(),
90 timeout.
get_millis())) >= 0 || errno == EINTR) {
92 if (record_arrival_time)
93 got_arrival_time =
false;
98 m_reactor->get_removed_handlers(removed_handlers);
101 for (
size_t i=0; i<pollfds.size(); i++) {
103 if (pollfds[i].revents == 0)
106 if (pollfds[i].fd == m_reactor->interrupt_sd()) {
111 errno != EAGAIN && errno != EINTR) {
112 HT_ERRORF(
"recv(interrupt_sd) failed - %s", strerror(errno));
117 if (handlers[i] && removed_handlers.count(handlers[i]) == 0) {
119 if (dispatch_delay && !did_delay && (pollfds[i].revents & POLLIN)) {
120 this_thread::sleep_for(chrono::milliseconds((
int)dispatch_delay));
123 if (record_arrival_time && !got_arrival_time
124 && (pollfds[i].revents & POLLIN)) {
126 got_arrival_time =
true;
128 if (handlers[i]->handle_event(&pollfds[i], arrival_time))
129 removed_handlers.insert(handlers[i]);
132 if (!removed_handlers.empty())
133 cleanup_and_remove_handlers(removed_handlers);
134 m_reactor->handle_timeouts(timeout);
138 m_reactor->fetch_poll_array(pollfds, handlers);
142 HT_ERRORF(
"poll() failed : %s", strerror(errno));
147 #if defined(__linux__)
148 struct epoll_event events[256];
150 while ((n = epoll_wait(m_reactor->poll_fd, events, 256,
151 timeout.
get_millis())) >= 0 || errno == EINTR) {
153 if (record_arrival_time)
154 got_arrival_time =
false;
159 m_reactor->get_removed_handlers(removed_handlers);
162 HT_DEBUGF(
"epoll_wait returned %d events", n);
163 for (
int i=0; i<n; i++) {
164 handler = (
IOHandler *)events[i].data.ptr;
165 if (handler && removed_handlers.count(handler) == 0) {
167 if (dispatch_delay && !did_delay && (events[i].events & EPOLLIN)) {
168 this_thread::sleep_for(chrono::milliseconds((
int)dispatch_delay));
171 if (record_arrival_time && !got_arrival_time
172 && (events[i].events & EPOLLIN)) {
174 got_arrival_time =
true;
177 removed_handlers.insert(handler);
180 if (!removed_handlers.empty())
181 cleanup_and_remove_handlers(removed_handlers);
182 m_reactor->handle_timeouts(timeout);
188 HT_ERRORF(
"epoll_wait(%d) failed : %s", m_reactor->poll_fd,
191 #elif defined(__sun__)
195 port_event_t *events;
199 events = (port_event_t *)calloc(33,
sizeof (port_event_t));
201 while ((ret = port_getn(m_reactor->poll_fd, events, 32,
203 errno == EINTR || errno == EAGAIN || errno == ETIME) {
207 if (record_arrival_time)
208 got_arrival_time =
false;
213 m_reactor->get_removed_handlers(removed_handlers);
214 for (
unsigned i=0; i<nget; i++) {
217 if (events[i].portev_source == PORT_SOURCE_ALERT)
220 handler = (
IOHandler *)events[i].portev_user;
221 if (handler && removed_handlers.count(handler) == 0) {
223 if (dispatch_delay && !did_delay && events[i].portev_events == POLLIN) {
224 this_thread::sleep_for(chrono::milliseconds((
int)dispatch_delay));
227 if (record_arrival_time && !got_arrival_time && events[i].portev_events == POLLIN) {
229 got_arrival_time =
true;
232 removed_handlers.insert(handler);
233 else if (removed_handlers.count(handler) == 0)
237 if (!removed_handlers.empty())
238 cleanup_and_remove_handlers(removed_handlers);
239 m_reactor->handle_timeouts(timeout);
246 HT_ERRORF(
"port_getn(%d) failed : %s", m_reactor->poll_fd,
253 #elif defined(__APPLE__) || defined(__FreeBSD__)
254 struct kevent events[32];
256 while ((n = kevent(m_reactor->kqd, NULL, 0, events, 32,
259 if (record_arrival_time)
260 got_arrival_time =
false;
265 m_reactor->get_removed_handlers(removed_handlers);
266 for (
int i=0; i<n; i++) {
268 if (handler && removed_handlers.count(handler) == 0) {
270 if (dispatch_delay && !did_delay && events[i].filter == EVFILT_READ) {
271 this_thread::sleep_for(chrono::milliseconds((
int)dispatch_delay));
274 if (record_arrival_time && !got_arrival_time && events[i].filter == EVFILT_READ) {
276 got_arrival_time =
true;
279 removed_handlers.insert(handler);
282 if (!removed_handlers.empty())
283 cleanup_and_remove_handlers(removed_handlers);
284 m_reactor->handle_timeouts(timeout);
290 HT_ERRORF(
"kevent(%d) failed : %s", m_reactor->kqd, strerror(errno));
302 for (
auto handler : handlers) {
306 if (!handler_map->destroy_ok(handler))
309 m_reactor->cancel_requests(handler);
312 m_reactor->remove_poll_interest(handler->get_sd());
314 #if defined(__linux__)
315 struct epoll_event event;
316 memset(&event, 0,
sizeof(
struct epoll_event));
317 if (epoll_ctl(m_reactor->poll_fd, EPOLL_CTL_DEL, handler->get_sd(), &event) < 0) {
319 HT_ERRORF(
"epoll_ctl(EPOLL_CTL_DEL, %d) failure, %s", handler->get_sd(),
322 #elif defined(__APPLE__) || defined(__FreeBSD__)
323 struct kevent devents[2];
324 EV_SET(&devents[0], handler->get_sd(), EVFILT_READ, EV_DELETE, 0, 0, 0);
325 EV_SET(&devents[1], handler->get_sd(), EVFILT_WRITE, EV_DELETE, 0, 0, 0);
326 if (kevent(m_reactor->kqd, devents, 2, NULL, 0, NULL) == -1
327 && errno != ENOENT) {
329 HT_ERRORF(
"kevent(%d) : %s", handler->get_sd(), strerror(errno));
331 #elif !defined(__sun__)
335 handler_map->purge_handler(handler);
int reset_poll_interest()
Resets poll interest by adding m_poll_interest to the polling interface for this handler.
PropertiesPtr properties
This singleton map stores all options.
chrono::time_point< fast_clock > time_point
Declarations for ReactorRunner.
void operator()()
Primary thread entry point.
Maintains next timeout for event polling loop.
bool has(const String &name)
Check existence of a configuration value.
#define HT_EXPECT(_e_, _code_)
void cleanup_and_remove_handlers(std::set< IOHandler * > &handlers)
Cleans up and removes a set of handlers.
static bool record_arrival_time
If set to true arrival time is recorded and passed into IOHandler::handle.
int get_millis()
Gets duration until next timeout in the form of milliseconds.
static HandlerMapPtr handler_map
Smart pointer to HandlerMap.
File system utility functions.
std::shared_ptr< HandlerMap > HandlerMapPtr
Smart pointer to HandlerMap.
static time_point now() noexcept
Logging routines and macros.
Compatibility Macros for C/C++.
Base class for socket descriptor I/O handlers.
Declarations for HandlerMap.
Time related declarations.
Declarations for IOHandlerData.
#define HT_DEBUGF(msg,...)
static bool shutdown
Flag indicating that reactor thread is being shut down.
virtual bool handle_event(struct pollfd *event, ClockT::time_point arrival_time)=0
Event handler method for Unix poll interface.
Declarations for ReactorFactory.
#define HT_ERRORF(msg,...)
static ssize_t recv(int fd, void *vptr, size_t n)
Receives data from a network connection.
struct timespec * get_timespec()
Gets duration until next timeout in the form of a pointer to timespec.
Declarations for IOHandler.