28 #ifndef Hypertable_AyncComm_ApplicationQueue_h
29 #define Hypertable_AyncComm_ApplicationQueue_h
40 #include <condition_variable>
45 #include <unordered_map>
173 RequestQueue::iterator iter;
273 if (rec->group_state) {
275 rec->group_state->running =
false;
276 rec->group_state->outstanding--;
277 if (rec->group_state->outstanding == 0) {
279 delete rec->group_state;
342 : joined(false), m_dynamic_threads(dynamic_threads) {
345 assert (worker_count > 0);
346 for (
int i=0; i<worker_count; ++i) {
347 m_thread_ids.push_back(m_threads.create_thread(Worker)->get_id());
376 m_state.
cond.notify_all();
386 bool wait_for_idle(std::chrono::time_point<std::chrono::steady_clock> deadline,
387 int reserve_threads=0) {
388 std::unique_lock<std::mutex> lock(m_state.
mutex);
399 m_threads.join_all();
407 std::lock_guard<std::mutex> lock(m_state.
mutex);
409 m_state.
cond.notify_all();
417 std::lock_guard<std::mutex> lock(m_state.
mutex);
429 GroupStateMap::iterator uiter;
437 std::lock_guard<std::mutex> ulock(m_state.
mutex);
451 std::lock_guard<std::mutex> lock(m_state.
mutex);
455 Worker worker(m_state,
true);
460 m_state.
queue.push_back(rec);
461 m_state.
cond.notify_one();
482 std::lock_guard<std::mutex> lock(m_state.
mutex);
492 #endif // Hypertable_AyncComm_ApplicationQueue_h
virtual void add(ApplicationHandler *app_handler)
Adds a request (application request handler) to the application queue.
std::condition_variable quiesce_cond
Condition variable used to signal quiesced queue.
void start()
Starts application queue.
Declarations of ApplicationHandler.
GroupState * group_state
Pointer to GroupState to which request belongs.
virtual void add_unlocked(ApplicationHandler *app_handler)
Adds a request (application request handler) to the application queue.
ThreadGroup m_threads
Boost thread group for managing threads.
uint64_t group_id
Group ID.
ApplicationQueueState & m_state
Shared application queue state object.
int outstanding
Number of outstanding (uncompleted) requests in queue for this group.
std::condition_variable cond
Condition variable to signal pending handlers.
std::vector< Thread::id > get_thread_ids() const
Returns all the thread IDs for this threadgroup.
std::mutex mutex
Mutex for serializing concurrent access
void stop()
Stops (pauses) application queue, preventing non-urgent requests from being executed.
RequestQueue urgent_queue
Urgent request queue.
uint64_t get_group_id()
Returns the group ID that this handler belongs to.
Worker(ApplicationQueueState &qstate, bool one_shot=false)
size_t threads_total
Total initial threads.
void join()
Waits for a shutdown to complete.
boost::thread_group ThreadGroup
size_t threads_available
Idle thread count.
Logging routines and macros.
ApplicationHandler * handler
Pointer to ApplicationHandler.
ApplicationQueue(int worker_count, bool dynamic_threads=true)
Constructor initialized with worker thread count.
void remove_expired(RequestRec *rec)
Removes and deletes an expired request.
GroupStateMap group_state_map
Group ID to group state map.
Importing boost::thread and boost::thread_group into the Hypertable namespace.
ApplicationQueue()
Default constructor used by derived classes only.
bool wait_for_idle(std::chrono::time_point< std::chrono::steady_clock > deadline, int reserve_threads=0)
Wait for queue to become idle (with timeout).
Application queue state shared among worker threads.
bool m_one_shot
Set to true if thread should exit after executing request.
bool is_expired()
Returns true if request has expired.
Application queue worker thread function (functor)
std::list< RequestRec * > RequestQueue
Individual request queue.
bool paused
Flag indicating if queue has been paused.
virtual void run()=0
Carries out the request.
size_t backlog()
Returns the request backlog Returns the request backlog, which is the number of requests waiting on t...
bool running
true if a request from this group is being executed
std::unordered_map< uint64_t, GroupState * > GroupStateMap
Hash map of thread group ID to GroupState.
ApplicationQueueState m_state
Application queue state object.
bool shutdown
Flag indicating if shutdown is in progress.
RequestRec(ApplicationHandler *arh)
void shutdown()
Shuts down the application queue.
Tracks group execution state.
Base clase for application handlers.
virtual ~ApplicationQueue()
Destructor.
bool joined
Flag indicating if threads have joined after a shutdown.
Declarations for ApplicationQueueInterface.
bool m_dynamic_threads
Set to true if queue is configured to allow dynamic thread creation.
std::vector< Thread::id > m_thread_ids
Vector of thread IDs.
RequestQueue queue
Normal request queue.
std::shared_ptr< ApplicationQueue > ApplicationQueuePtr
Shared smart pointer to ApplicationQueue object.
String extensions and helpers: sets, maps, append operators etc.
void operator()()
Thread run method.
Abstract interface for application queue.
bool is_urgent()
Returns true if request is urgent.