35 m_connections_iter = m_connections.end();
36 m_disk_threshold =
Config::properties->get_i32(
"Hypertable.Master.DiskThreshold.Percentage");
40 lock_guard<mutex> lock(m_mutex);
44 if (find_server_by_location_unlocked(rsc->location(), tmp_rsc))
45 HT_FATALF(
"Attempt to add %s which conflicts with previously added %s",
46 rsc->to_str().c_str(), tmp_rsc->to_str().c_str());
51 HT_FATALF(
"Attempt to add %s which conflicts with previously added entry",
52 rsc->to_str().c_str());
58 lock_guard<mutex> lock(m_mutex);
61 auto iter = hash_index.find(location);
62 if (iter != hash_index.end()) {
65 hash_index.erase(iter);
66 m_connections_iter = m_connections.begin();
74 lock_guard<mutex> lock(m_mutex);
76 LocationIndex::iterator orig_iter;
78 HT_INFOF(
"connect_server(%s, '%s', local=%s, public=%s)",
79 rsc->location().c_str(), hostname.c_str(),
80 local_addr.
format().c_str(), public_addr.
format().c_str());
82 if ((orig_iter = location_index.find(rsc->location())) == location_index.end()) {
84 if (rsc->connect(hostname, local_addr, public_addr))
90 bool needs_reindex =
false;
94 if (rsc->connected()) {
95 HT_ERRORF(
"Attempted to connect '%s' but failed because already connected.",
96 rsc->location().c_str());
100 if (hostname != rsc->hostname()) {
101 HT_INFOF(
"Changing hostname for %s from '%s' to '%s'",
102 rsc->location().c_str(), rsc->hostname().c_str(),
104 needs_reindex =
true;
107 if (local_addr != rsc->local_addr()) {
108 HT_INFOF(
"Changing local address for %s from '%s' to '%s'",
109 rsc->location().c_str(), rsc->local_addr().format().c_str(),
110 local_addr.
format().c_str());
111 needs_reindex =
true;
114 if (public_addr != rsc->public_addr()) {
115 HT_INFOF(
"Changing public address for %s from '%s' to '%s'",
116 rsc->location().c_str(), rsc->public_addr().format().c_str(),
117 public_addr.
format().c_str());
118 needs_reindex =
true;
121 if (orig_iter->rsc->connect(hostname, local_addr, public_addr))
125 location_index.erase(orig_iter);
127 m_connections_iter = m_connections.begin();
134 lock_guard<mutex> lock(m_mutex);
135 if (rsc->disconnect()) {
144 lock_guard<mutex> lock(m_mutex);
146 LocationIndex::iterator iter = location_index.find(location);
147 if (iter != location_index.end() && iter->rsc->connected())
154 lock_guard<mutex> lock(m_mutex);
155 return find_server_by_location_unlocked(location, rsc);
160 LocationIndex::iterator lookup_iter;
162 if ((lookup_iter = hash_index.find(location)) == hash_index.end()) {
172 rsc = lookup_iter->rsc;
179 lock_guard<mutex> lock(m_mutex);
182 pair<HostnameIndex::iterator, HostnameIndex::iterator> p
183 = hash_index.equal_range(hostname);
184 if (p.first != p.second) {
186 if (++p.first == p.second)
195 lock_guard<mutex> lock(m_mutex);
197 PublicAddrIndex::iterator lookup_iter;
199 if ((lookup_iter = hash_index.find(addr)) == hash_index.end()) {
203 rsc = lookup_iter->rsc;
209 lock_guard<mutex> lock(m_mutex);
212 for (pair<LocalAddrIndex::iterator, LocalAddrIndex::iterator> p
213 = hash_index.equal_range(addr);
214 p.first != p.second; ++p.first) {
215 if (p.first->connected()) {
226 lock_guard<mutex> lock(m_mutex);
227 double minimum_disk_use = 100;
230 if (m_connections.empty())
233 if (m_connections_iter == m_connections.end())
234 m_connections_iter = m_connections.begin();
236 auto saved_iter = m_connections_iter;
239 ++m_connections_iter;
240 if (m_connections_iter == m_connections.end())
241 m_connections_iter = m_connections.begin();
242 if (m_connections_iter->rsc->connected() &&
243 !m_connections_iter->rsc->is_recovering()) {
244 if (m_connections_iter->rsc->get_disk_fill_percentage() <
245 (double)m_disk_threshold) {
246 rsc = m_connections_iter->rsc;
249 if (m_connections_iter->rsc->get_disk_fill_percentage() < minimum_disk_use) {
250 minimum_disk_use = m_connections_iter->rsc->get_disk_fill_percentage();
251 urgent_server = m_connections_iter->rsc;
254 }
while (m_connections_iter != saved_iter);
256 if (urgent && urgent_server) {
265 lock_guard<mutex> lock(m_mutex);
267 for (
auto &entry : m_connections) {
268 if (!entry.rsc->get_removed())
276 lock_guard<mutex> lock(m_mutex);
277 for (
auto &entry : m_connections) {
278 if (!entry.rsc->get_removed())
279 servers.push_back(entry.rsc);
284 vector<RangeServerConnectionPtr> &connections) {
285 lock_guard<mutex> lock(m_mutex);
287 LocationIndex::iterator iter;
289 for (
auto &location : locations) {
290 auto iter = location_index.find(location);
291 if (iter != location_index.end() && !iter->rsc->is_recovering())
292 connections.push_back(iter->rsc);
297 lock_guard<mutex> lock(m_mutex);
298 for (
const auto rsc : unbalanced)
303 lock_guard<mutex> lock(m_mutex);
304 for (
auto &entry : m_connections) {
305 if (!entry.rsc->get_removed() && !entry.rsc->is_recovering() && !entry.rsc->get_balanced())
312 lock_guard<mutex> lock(m_mutex);
314 LocationIndex::iterator lookup_iter;
316 for (
auto &state : states) {
317 if ((lookup_iter = hash_index.find(state.location)) == hash_index.end())
319 lookup_iter->rsc->set_disk_fill_percentage(state.disk_usage);
bool find_server_by_location(const String &location, RangeServerConnectionPtr &rsc)
std::set< String > StringSet
STL Set managing Strings.
ConnectionList::nth_index< 1 >::type LocationIndex
bool find_server_by_hostname(const String &hostname, RangeServerConnectionPtr &rsc)
std::shared_ptr< RangeServerConnection > RangeServerConnectionPtr
PropertiesPtr properties
This singleton map stores all options.
std::string String
A String is simply a typedef to std::string.
bool next_available_server(RangeServerConnectionPtr &rsc, bool urgent=false)
void get_servers(std::vector< RangeServerConnectionPtr > &servers)
void set_servers_balanced(const std::vector< RangeServerConnectionPtr > &servers)
void set_range_server_state(std::vector< RangeServerState > &states)
bool connect_server(RangeServerConnectionPtr &rsc, const String &hostname, InetAddr local_addr, InetAddr public_addr)
bool find_server_by_location_unlocked(const String &location, RangeServerConnectionPtr &rsc)
Encapsulates an internet address.
bool disconnect_server(RangeServerConnectionPtr &rsc)
Compatibility Macros for C/C++.
bool exist_unbalanced_servers()
RangeServerConnectionManager()
ConnectionList::nth_index< 2 >::type HostnameIndex
ConnectionList::nth_index< 3 >::type PublicAddrIndex
String format(int sep= ':') const
Returns a string with a dotted notation ("127.0.0.1:8080") including the port.
#define HT_FATALF(msg,...)
bool find_server_by_local_addr(InetAddr addr, RangeServerConnectionPtr &rsc)
bool is_connected(const String &location)
void add_server(RangeServerConnectionPtr &rsc)
Adds a range server connection.
#define HT_INFOF(msg,...)
ConnectionList::nth_index< 4 >::type LocalAddrIndex
bool remove_server(const std::string &location, RangeServerConnectionPtr &rsc)
Removes a range server connection.
#define HT_ERRORF(msg,...)
void get_valid_connections(StringSet &locations, std::vector< RangeServerConnectionPtr > &connections)
bool find_server_by_public_addr(InetAddr addr, RangeServerConnectionPtr &rsc)