37 #include <boost/graph/topological_sort.hpp>
38 #include <boost/graph/graphviz.hpp>
44 #include <unordered_map>
47 using namespace boost;
51 : master_context(mctx), current_blocked(0), busy_count(0),
52 need_order_recompute(false), shutdown(false), paused(false) {
64 if (context->props->get_bool(
"Hypertable.Master.RecordGraphvizStream")) {
65 Path data_dir =
Path(context->props->get_str(
"Hypertable.DataDirectory"));
66 string run_dir = (data_dir /=
"/run").
string();
67 string filename = run_dir +
"/graphviz-stream";
68 m_graphviz_out = make_unique<std::ofstream>(filename.c_str(), ofstream::out|ofstream::app);
74 for (
size_t i=0; i<thread_count; ++i)
93 if (!operation->is_complete() || operation->is_perpetual()) {
99 else if (operation->get_remove_approval_mask() == 0)
108 for (
auto & operation : operations) {
116 if (!operation->is_complete() || operation->is_perpetual()) {
120 else if (operation->get_remove_approval_mask() == 0)
133 if (operation->exclusive()) {
136 "Dropping %s because another one is outstanding",
137 operation->name().c_str());
164 operation = iter->second.operation;
165 vertex = iter->second.vertex;
223 std::pair<DependencyIndex::iterator, DependencyIndex::iterator> bound;
224 bool unblocked_something =
false;
227 bound.first != bound.second; ++bound.first)
229 unblocked_something =
true;
232 bound.first != bound.second; ++bound.first)
234 unblocked_something =
true;
236 if (unblocked_something) {
252 (*iter)->obstructions(names);
256 for (
const auto &tag : names)
258 HT_INFOF(
"Activating %s with obstructions %s", (*iter)->label().c_str(), str.c_str());
261 if (names.count(name) > 0) {
262 PerpetualSet::iterator rm_iter = iter;
283 bool current_needs_loading =
true;
298 current_needs_loading =
true;
301 current_needs_loading =
304 if (current_needs_loading &&
334 operation->pre_run();
335 if (!operation->is_blocked())
336 operation->execute();
337 operation->post_run();
344 if (operation->is_complete())
346 else if (operation->is_blocked())
364 std::this_thread::sleep_for(std::chrono::milliseconds(5000));
370 catch (std::exception &e) {
371 std::cout << e.what() << std::endl;
379 operation->exclusivities(names);
380 for (DependencySet::iterator iter = names.begin(); iter != names.end(); ++iter)
383 operation->dependencies(names);
384 for (DependencySet::iterator iter = names.begin(); iter != names.end(); ++iter)
387 operation->obstructions(names);
388 for (DependencySet::iterator iter = names.begin(); iter != names.end(); ++iter) {
397 GraphTraits::in_edge_iterator in_i, in_end;
400 std::pair<DependencyIndex::iterator, DependencyIndex::iterator> bound;
404 bound.first != bound.second; ++bound.first)
409 bound.first != bound.second; ++bound.first)
414 bound.first != bound.second; ++bound.first) {
415 if (v == bound.first->second)
422 for (; in_i != in_end; ++in_i) {
425 if (names.find(name) != names.end())
428 if (in_i == in_end) {
440 std::pair<DependencyIndex::iterator, DependencyIndex::iterator> bound;
444 bound.first != bound.second; ++bound.first) {
445 if (bound.first->second == v)
451 bound.first != bound.second; ++bound.first)
460 (*iter)->obstructions(names);
461 if (names.count(name) > 0) {
462 PerpetualSet::iterator rm_iter = iter;
475 bound.first != bound.second; ++bound.first)
483 std::pair<DependencyIndex::iterator, DependencyIndex::iterator> bound;
487 bound.first != bound.second; ++bound.first) {
488 if (bound.first->second == v)
494 bound.first != bound.second; ++bound.first)
499 bound.first != bound.second; ++bound.first)
507 DependencyIndex::iterator iter, del_iter;
512 if (iter->second == v) {
523 DependencyIndex::iterator iter, del_iter;
528 if (iter->second == v) {
539 DependencyIndex::iterator iter, del_iter;
544 if (iter->second == v) {
568 std::ostringstream oss;
575 std::ostringstream oss;
578 oss <<
"Num vertices = " << num_vertices(
m_context.
graph) <<
"\n";
586 std::pair<GraphTraits::vertex_iterator, GraphTraits::vertex_iterator> vp;
587 std::set<Vertex> vset;
590 for (vp = vertices(
m_context.
graph); vp.first != vp.second; ++vp.first) {
591 vset.insert(*vp.first);
592 oss << i <<
": " <<
m_context.
ops[*vp.first]->label() <<
"\n";
594 oss <<
" live: " << ((
m_context.
live.count(*vp.first) > 0) ?
"true\n" :
"false\n");
595 oss <<
" exclusive: " << (
m_context.
ops[*vp.first]->exclusive() ?
"true\n" :
"false\n");
596 oss <<
" perpetual: " << (
m_context.
ops[*vp.first]->is_perpetual() ?
"true\n" :
"false\n");
597 oss <<
" blocked: " << (
m_context.
ops[*vp.first]->is_blocked() ?
"true\n" :
"false\n");
599 oss <<
" dependencies: (";
603 for (
const auto &
str : names) {
610 oss <<
" obstructions: (";
614 for (
const auto &
str : names) {
621 oss <<
" exclusivities: (";
625 for (
const auto &
str : names) {
640 if (vset.count(iter->vertex))
643 oss <<
"[retired]\n" ;
649 oss <<
"Execution order:\n";
654 if (vset.count(iter->vertex)) {
655 oss <<
m_context.
ops[iter->vertex]->label() <<
" (time=";
659 oss <<
"[retired]\n" ;
665 oss <<
"Graphviz:\n";
681 if (operation->exclusive())
688 if (operation->is_perpetual())
691 if (operation->get_remove_approval_mask() == 0 &&
711 std::vector<OperationPtr> sub_ops;
712 operation->fetch_sub_operations(sub_ops);
713 for (
auto op : sub_ops) {
728 property_map<OperationGraph, vertex_index_t>::type index =
get(vertex_index,
m_context.
graph);
730 std::pair<GraphTraits::vertex_iterator, GraphTraits::vertex_iterator> vp;
731 for (vp = vertices(
m_context.
graph); vp.first != vp.second; ++vp.first)
732 put(index, *vp.first, i++);
743 catch (std::invalid_argument &e) {
747 ExecutionList::iterator iter;
758 OperationGraph::out_edge_iterator j, j_end;
760 for (boost::tie(j, j_end) = out_edges(iter->vertex,
m_context.
graph); j != j_end; ++j)
767 std::vector<struct vertex_info> vvec;
770 vvec.push_back(*iter);
773 std::set<Vertex> vset;
776 std::sort(vvec.begin(), vvec.end(), ltvi);
777 m_context.execution_order.clear();
778 for (
size_t i=0; i<vvec.size(); i++) {
779 HT_ASSERT(vset.count(vvec[i].vertex) == 0);
780 vset.insert(vvec[i].vertex);
781 m_context.execution_order.push_back(vvec[i]);
784 m_context.execution_order_iter = m_context.execution_order.begin();
786 m_context.need_order_recompute =
false;
787 if (m_context.execution_order.empty())
788 m_context.idle_cond.notify_all();
ContextPtr & master_context
void update_operation(Vertex v, OperationPtr &operation)
Updates the dependency relationships of an operation.
std::string String
A String is simply a typedef to std::string.
Compatibility class for boost::filesystem::path.
ExecutionList::iterator current_iter
std::unordered_map< int64_t, OperationVertex > operation_hash
void graphviz_output(String &output)
Declarations for OperationProcessor.
Po::typed_value< String > * str(String *v=0)
void activate(const String &name)
void add_obstruction(Vertex v, const String &name)
void add_exclusivity(Vertex v, const String &name)
void recompute_order()
Recomputes operation execution order.
void unblock(const String &name)
boost::property_map< OperationGraph, operation_t >::type ops
std::condition_variable idle_cond
const char * get_text(int32_t state)
void state_description(String &output)
std::shared_ptr< Context > ContextPtr
Smart pointer to Context.
void retire_operation(Vertex v, OperationPtr &operation)
Retires (removes) an operation.
boost::property_map< OperationGraph, execution_time_t >::type exec_time
PerpetualSet perpetual_ops
void add_edge(Vertex v, Vertex u)
Compatibility class for boost::filesystem::path.
boost::property_map< OperationGraph, label_t >::type label
boost::property_map< OperationGraph, busy_t >::type busy
GraphTraits::vertex_descriptor Vertex
bool need_order_recompute
boost::property_map< OperationGraph, permanent_t >::type permanent
Compatibility Macros for C/C++.
ThreadContext(ContextPtr &mctx)
void add_edge_permanent(Vertex v, Vertex u)
std::unique_ptr< std::ofstream > m_graphviz_out
ExecutionList execution_order
void purge_from_dependency_index(Vertex v)
void add_operation(OperationPtr operation)
std::condition_variable cond
DependencyIndex dependency_index
DependencyIndex exclusivity_index
void add_operations(std::vector< OperationPtr > &operations)
void add_dependencies(Vertex v, OperationPtr &operation)
OperationPtr remove_operation(int64_t hash_code)
#define HT_INFOF(msg,...)
#define HT_THROWF(_code_, _fmt_,...)
This is a generic exception class for Hypertable.
OperationProcessor(ContextPtr &context, size_t thread_count)
void add_dependency(Vertex v, const String &name)
void add_operation_internal(OperationPtr &operation)
DependencyIndex obstruction_index
ExecutionList::iterator execution_order_iter
bool load_current()
Loads the m_context.current list with the operations to be executed.
std::set< String > DependencySet
Set of dependency strings.
std::set< int64_t > op_ids
std::shared_ptr< Operation > OperationPtr
Smart pointer to Operation.
String extensions and helpers: sets, maps, append operators etc.
ThreadContext & m_context
int code() const
Returns the error code.
void purge_from_exclusivity_index(Vertex v)
void purge_from_obstruction_index(Vertex v)