26 #ifndef Hypertable_RangeServer_MaintenanceQueue_h
27 #define Hypertable_RangeServer_MaintenanceQueue_h
39 #include <condition_variable>
82 std::condition_variable
cond;
98 MaintenanceTask *task = 0;
106 auto now = std::chrono::steady_clock::now();
128 now = std::chrono::steady_clock::now();
145 ms_cond.wait(lock, [](){
return !
ms_pause; });
157 bool message_logged =
false;
160 message_logged =
true;
164 HT_INFOF(
"Maintenance Task '%s' aborted, will retry in %u "
167 task->
start_time = std::chrono::steady_clock::now();
177 HT_INFOF(
"Maintenance Task '%s' failed because range is busy, dropping task ...",
236 assert (worker_count > 0);
238 m_threads.create_thread(Worker);
248 m_state.
cond.notify_all();
256 m_threads.join_all();
264 std::lock_guard<std::mutex> lock(m_state.
mutex);
265 HT_INFO(
"Stopping maintenance queue");
272 std::lock_guard<std::mutex> lock(m_state.
mutex);
276 ms_cond.notify_all();
277 HT_INFO(
"Starting maintenance queue");
285 template<
typename _Function>
287 std::lock_guard<std::mutex> lock(m_state.
mutex);
288 TaskQueue filtered_queue;
289 MaintenanceTask *task = 0;
290 while (!m_state.
queue.empty()) {
291 task = m_state.
queue.top();
299 filtered_queue.push(task);
301 m_state.
queue = filtered_queue;
311 std::lock_guard<std::mutex> lock(m_state.
mutex);
312 return m_state.
ranges.count(range) > 0;
319 void add(MaintenanceTask *task) {
320 std::lock_guard<std::mutex> lock(m_state.
mutex);
321 m_state.
queue.push(task);
324 m_state.
cond.notify_one();
333 std::lock_guard<std::mutex> lock(m_state.
mutex);
334 return (
size_t)m_state.
queue.size() + (size_t)m_state.
inflight;
344 std::lock_guard<std::mutex> lock(m_state.
mutex);
353 std::lock_guard<std::mutex> lock(m_state.
mutex);
355 (size_t)m_worker_count;
362 std::lock_guard<std::mutex> lock(m_state.
mutex);
369 std::unique_lock<std::mutex> lock(m_state.
mutex);
379 std::unique_lock<std::mutex> lock(m_state.
mutex);
380 return m_state.
empty_cond.wait_until(lock, deadline, [
this](){
398 #endif // Hypertable_RangeServer_MaintenanceQueue_h
void stop()
Stops (suspends) queue processing.
static std::condition_variable ms_cond
std::priority_queue< MaintenanceTask *, std::vector< MaintenanceTask * >, LtMaintenanceTask > TaskQueue
chrono::time_point< fast_clock > time_point
void join()
Waits for a shutdown to complete.
size_t size()
Returns the size of the queue.
uint32_t inflight_levels[MAX_LEVELS]
MaintenanceQueueState & m_state
MaintenanceQueueState m_state
uint32_t lowest_inflight_level()
Determine the lowest level of the tasks currently being executed.
Represents a table row range.
bool wait_for_empty(ClockT::time_point deadline)
Waits for queue to become empty with deadline.
std::shared_ptr< MaintenanceQueue > MaintenanceQueuePtr
Smart pointer to MaintenanceQueue.
void shutdown()
Shuts down the maintenance queue.
boost::thread_group ThreadGroup
Logging routines and macros.
Importing boost::thread and boost::thread_group into the Hypertable namespace.
std::condition_variable empty_cond
void wait_for_empty()
Waits for queue to become empty.
bool contains(Range *range)
Returns true if queue contains a maintenance task for range.
std::condition_variable cond
int64_t generation()
Returns queue generation number.
uint32_t get_retry_delay()
bool full()
Returns true if any tasks are in queue or all worker threads are busy executing tasks.
std::chrono::time_point< std::chrono::steady_clock > start_time
#define HT_INFOF(msg,...)
Worker(MaintenanceQueueState &state)
std::set< Range * > ranges
This is a generic exception class for Hypertable.
void start()
Starts queue processing.
bool empty()
Returns true if maintenance queue is empty.
void drop_range_tasks(_Function __f)
Drops range maintenance tasks from the queue.
Queue for periodic maintenance work.
int worker_count()
Returns the number of worker threads configured for the queue.
bool operator()(const MaintenanceTask *sm1, const MaintenanceTask *sm2) const
Error codes, Exception handling, error logging.
MaintenanceQueue(int worker_count)
Constructor to set up the maintenance queue.
int code() const
Returns the error code.
void add(MaintenanceTask *task)
Adds a maintenance task to the queue.