gms: get rid of unused failure_detector
The legacy failure_detector is now unused and can be removed. TODO: integrate direct_failure_detector with the failure_detector API. Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
@@ -916,7 +916,6 @@ scylla_core = (['message/messaging_service.cc',
|
||||
'gms/versioned_value.cc',
|
||||
'gms/gossiper.cc',
|
||||
'gms/feature_service.cc',
|
||||
'gms/failure_detector.cc',
|
||||
'gms/gossip_digest_syn.cc',
|
||||
'gms/gossip_digest_ack.cc',
|
||||
'gms/gossip_digest_ack2.cc',
|
||||
|
||||
@@ -27,7 +27,6 @@
|
||||
#include "log.hh"
|
||||
#include "db_clock.hh"
|
||||
#include "unimplemented.hh"
|
||||
#include "gms/failure_detector.hh"
|
||||
#include "gms/gossiper.hh"
|
||||
#include "schema/schema_registry.hh"
|
||||
#include "idl/frozen_schema.dist.hh"
|
||||
|
||||
@@ -16,7 +16,6 @@
|
||||
|
||||
#include "dht/boot_strapper.hh"
|
||||
#include "dht/range_streamer.hh"
|
||||
#include "gms/failure_detector.hh"
|
||||
#include "gms/gossiper.hh"
|
||||
#include "log.hh"
|
||||
#include "db/config.hh"
|
||||
|
||||
@@ -13,7 +13,6 @@
|
||||
#include "utils/fb_utilities.hh"
|
||||
#include "replica/database.hh"
|
||||
#include "gms/gossiper.hh"
|
||||
#include "gms/failure_detector.hh"
|
||||
#include "log.hh"
|
||||
#include "streaming/stream_plan.hh"
|
||||
#include "streaming/stream_state.hh"
|
||||
|
||||
@@ -1,156 +0,0 @@
|
||||
/*
|
||||
*
|
||||
* Modified by ScyllaDB
|
||||
* Copyright (C) 2015-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: (AGPL-3.0-or-later and Apache-2.0)
|
||||
*/
|
||||
|
||||
#include <boost/range/adaptor/map.hpp>
|
||||
#include "gms/failure_detector.hh"
|
||||
#include "gms/gossiper.hh"
|
||||
#include "gms/i_failure_detection_event_listener.hh"
|
||||
#include "gms/endpoint_state.hh"
|
||||
#include "gms/application_state.hh"
|
||||
#include "gms/inet_address.hh"
|
||||
#include "log.hh"
|
||||
#include <iostream>
|
||||
#include <chrono>
|
||||
#include "replica/database.hh"
|
||||
|
||||
namespace gms {
|
||||
|
||||
// Logger with internal linkage, registered under the name "failure_detector".
static logging::logger logger("failure_detector");
|
||||
|
||||
// Namespace-scope definition of the static constexpr member failure_detector::DEFAULT_MAX_PAUSE.
constexpr std::chrono::milliseconds failure_detector::DEFAULT_MAX_PAUSE;
|
||||
|
||||
// Shorthand for the clock type exposed by arrival_window.
using clk = arrival_window::clk;
|
||||
|
||||
// Records a heartbeat that arrived at time `value` from endpoint `ep`.
//
// When no previous arrival has been recorded, the window is seeded with the
// configured initial interval. Otherwise the time elapsed since the previous
// arrival is added, provided it lies within [_min_interval, _max_interval];
// intervals outside that range are logged at debug level and dropped.
// The last-arrival timestamp is updated in every case.
void arrival_window::add(clk::time_point value, const gms::inet_address& ep) {
    const bool has_previous_arrival = _tlast > clk::time_point::min();
    if (!has_previous_arrival) {
        // We use a very large initial interval since the "right" average depends on the cluster size
        // and it's better to err high (false negatives, which will be corrected by waiting a bit longer)
        // than low (false positives, which cause "flapping").
        _arrival_intervals.add(_initial.count());
    } else {
        const auto elapsed = value - _tlast;
        const bool within_bounds = elapsed >= _min_interval && elapsed <= _max_interval;
        if (within_bounds) {
            _arrival_intervals.add(elapsed.count());
        } else {
            logger.debug("failure_detector: Ignoring interval time of {} for {}, mean={}, size={}", elapsed.count(), ep, mean(), size());
        }
    }
    _tlast = value;
}
|
||||
|
||||
double arrival_window::mean() const {
|
||||
return _arrival_intervals.mean();
|
||||
}
|
||||
|
||||
// Computes the suspicion value at time `tnow`: the time elapsed since the last
// recorded arrival divided by the mean inter-arrival interval.
//
// Precondition (asserted): at least one sample has been recorded.
double arrival_window::phi(clk::time_point tnow) {
    assert(_arrival_intervals.size() > 0 && _tlast > clk::time_point::min()); // should not be called before any samples arrive
    const auto since_last_arrival = (tnow - _tlast).count();
    const auto mean_interval = mean();
    const double suspicion = since_last_arrival / mean_interval;
    logger.debug("failure_detector: now={}, tlast={}, t={}, mean={}, phi={}",
            tnow.time_since_epoch().count(), _tlast.time_since_epoch().count(), since_last_arrival, mean_interval, suspicion);
    return suspicion;
}
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const arrival_window& w) {
|
||||
for (auto& x : w._arrival_intervals.deque()) {
|
||||
os << x << " ";
|
||||
}
|
||||
return os;
|
||||
}
|
||||
|
||||
void failure_detector::set_phi_convict_threshold(double phi) {
|
||||
_phi = phi;
|
||||
}
|
||||
|
||||
double failure_detector::get_phi_convict_threshold() {
|
||||
return _phi;
|
||||
}
|
||||
|
||||
// Records a heartbeat from `ep` at the current clock time.
//
// An endpoint that is already tracked has the arrival added to its window.
// For an unknown endpoint a new window is created, fed its first sample and
// then stored in the map.
void failure_detector::report(inet_address ep) {
    logger.trace("failure_detector: reporting {}", ep);
    const auto arrival_time = clk::now();
    if (auto known = _arrival_samples.find(ep); known != _arrival_samples.end()) {
        known->second.add(arrival_time, ep);
        return;
    }
    // avoid adding an empty ArrivalWindow to the Map
    arrival_window fresh_window(SAMPLE_SIZE, _initial, _max_interval, gossiper::INTERVAL);
    fresh_window.add(arrival_time, ep);
    _arrival_samples.emplace(ep, fresh_window);
}
|
||||
|
||||
// Runs inside seastar::async context
|
||||
void failure_detector::interpret(inet_address ep) {
|
||||
auto it = _arrival_samples.find(ep);
|
||||
if (it == _arrival_samples.end()) {
|
||||
return;
|
||||
}
|
||||
arrival_window& hb_wnd = it->second;
|
||||
auto now = clk::now();
|
||||
if (!_last_interpret) {
|
||||
*_last_interpret = now;
|
||||
}
|
||||
auto diff = std::chrono::duration_cast<std::chrono::milliseconds>(now - *_last_interpret);
|
||||
*_last_interpret = now;
|
||||
if (diff > get_max_local_pause()) {
|
||||
logger.warn("Not marking nodes down due to local pause of {} > {} (milliseconds)", diff.count(), get_max_local_pause().count());
|
||||
_last_paused = now;
|
||||
return;
|
||||
}
|
||||
if (clk::now() - _last_paused < get_max_local_pause()) {
|
||||
logger.debug("Still not marking nodes down due to local pause");
|
||||
return;
|
||||
}
|
||||
double phi = hb_wnd.phi(now);
|
||||
logger.trace("failure_detector: PHI for {} : {}", ep, phi);
|
||||
logger.trace("failure_detector: phi_convict_threshold={}", _phi);
|
||||
|
||||
if (PHI_FACTOR * phi > get_phi_convict_threshold()) {
|
||||
logger.trace("failure_detector: notifying listeners that {} is down", ep);
|
||||
logger.trace("failure_detector: intervals: {} mean: {}", hb_wnd, hb_wnd.mean());
|
||||
for (auto& listener : _fd_evnt_listeners) {
|
||||
logger.debug("failure_detector: convict ep={} phi={}", ep, phi);
|
||||
listener->convict(ep, phi);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Runs inside seastar::async context
|
||||
void failure_detector::force_conviction(inet_address ep) {
|
||||
logger.debug("failure_detector: Forcing conviction of {}", ep);
|
||||
for (auto& listener : _fd_evnt_listeners) {
|
||||
listener->convict(ep, get_phi_convict_threshold());
|
||||
}
|
||||
}
|
||||
|
||||
// Drops the arrival window tracked for `ep`; does nothing if none exists.
void failure_detector::remove(inet_address ep) {
    if (auto tracked = _arrival_samples.find(ep); tracked != _arrival_samples.end()) {
        _arrival_samples.erase(tracked);
    }
}
|
||||
|
||||
// Appends `listener` to the list of objects notified on conviction.
// Only the raw pointer is stored.
void failure_detector::register_failure_detection_event_listener(i_failure_detection_event_listener* listener) {
    _fd_evnt_listeners.push_back(listener);
}
|
||||
|
||||
// Removes every occurrence of `listener` from the notification list.
void failure_detector::unregister_failure_detection_event_listener(i_failure_detection_event_listener* listener) {
    _fd_evnt_listeners.remove_if([listener] (i_failure_detection_event_listener* registered) {
        return registered == listener;
    });
}
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const failure_detector& x) {
|
||||
for (auto& entry : x._arrival_samples) {
|
||||
const inet_address& ep = entry.first;
|
||||
const arrival_window& win = entry.second;
|
||||
os << ep << " : " << win << "\n";
|
||||
}
|
||||
return os;
|
||||
}
|
||||
|
||||
} // namespace gms
|
||||
@@ -1,163 +0,0 @@
|
||||
/*
|
||||
*
|
||||
* Modified by ScyllaDB
|
||||
* Copyright (C) 2015-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: (AGPL-3.0-or-later and Apache-2.0)
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <seastar/core/sstring.hh>
|
||||
#include <seastar/core/shared_ptr.hh>
|
||||
#include <seastar/core/distributed.hh>
|
||||
#include "utils/bounded_stats_deque.hh"
|
||||
#include <iosfwd>
|
||||
#include <cmath>
|
||||
#include <list>
|
||||
#include <map>
|
||||
#include <optional>
|
||||
|
||||
#include "gms/inet_address.hh"
|
||||
|
||||
|
||||
namespace gms {
|
||||
class i_failure_detection_event_listener;
|
||||
class endpoint_state;
|
||||
|
||||
class arrival_window {
|
||||
public:
|
||||
using clk = seastar::lowres_system_clock;
|
||||
private:
|
||||
clk::time_point _tlast{clk::time_point::min()};
|
||||
utils::bounded_stats_deque _arrival_intervals;
|
||||
std::chrono::milliseconds _initial;
|
||||
std::chrono::milliseconds _max_interval;
|
||||
std::chrono::milliseconds _min_interval;
|
||||
|
||||
// this is useless except to provide backwards compatibility in phi_convict_threshold,
|
||||
// because everyone seems pretty accustomed to the default of 8, and users who have
|
||||
// already tuned their phi_convict_threshold for their own environments won't need to
|
||||
// change.
|
||||
static constexpr double PHI_FACTOR{M_LOG10El};
|
||||
|
||||
public:
|
||||
arrival_window(int size, std::chrono::milliseconds initial,
|
||||
std::chrono::milliseconds max_interval, std::chrono::milliseconds min_interval)
|
||||
: _arrival_intervals(size)
|
||||
, _initial(initial)
|
||||
, _max_interval(max_interval)
|
||||
, _min_interval(min_interval) {
|
||||
}
|
||||
|
||||
void add(clk::time_point value, const gms::inet_address& ep);
|
||||
|
||||
double mean() const;
|
||||
|
||||
// see CASSANDRA-2597 for an explanation of the math at work here.
|
||||
double phi(clk::time_point tnow);
|
||||
|
||||
size_t size() { return _arrival_intervals.size(); }
|
||||
|
||||
clk::time_point last_update() const { return _tlast; }
|
||||
|
||||
friend std::ostream& operator<<(std::ostream& os, const arrival_window& w);
|
||||
|
||||
};
|
||||
|
||||
|
||||
/**
|
||||
* This FailureDetector is an implementation of the paper titled
|
||||
* "The Phi Accrual Failure Detector" by Hayashibara.
|
||||
* Check the paper and the <i>IFailureDetector</i> interface for details.
|
||||
*/
|
||||
class failure_detector {
|
||||
private:
|
||||
static constexpr int SAMPLE_SIZE = 1000;
|
||||
// this is useless except to provide backwards compatibility in phi_convict_threshold,
|
||||
// because everyone seems pretty accustomed to the default of 8, and users who have
|
||||
// already tuned their phi_convict_threshold for their own environments won't need to
|
||||
// change.
|
||||
static constexpr double PHI_FACTOR{M_LOG10El};
|
||||
|
||||
std::map<inet_address, arrival_window> _arrival_samples;
|
||||
std::list<i_failure_detection_event_listener*> _fd_evnt_listeners;
|
||||
double _phi = 8;
|
||||
std::chrono::milliseconds _initial;
|
||||
std::chrono::milliseconds _max_interval;
|
||||
|
||||
static constexpr std::chrono::milliseconds DEFAULT_MAX_PAUSE{5000};
|
||||
|
||||
std::chrono::milliseconds get_max_local_pause() {
|
||||
// FIXME: cassandra.max_local_pause_in_ms
|
||||
#if 0
|
||||
if (System.getProperty("cassandra.max_local_pause_in_ms") != null) {
|
||||
long pause = Long.parseLong(System.getProperty("cassandra.max_local_pause_in_ms"));
|
||||
logger.warn("Overriding max local pause time to {}ms", pause);
|
||||
return pause * 1000000L;
|
||||
} else {
|
||||
return DEFAULT_MAX_PAUSE;
|
||||
}
|
||||
#endif
|
||||
return DEFAULT_MAX_PAUSE;
|
||||
}
|
||||
|
||||
std::optional<arrival_window::clk::time_point> _last_interpret;
|
||||
arrival_window::clk::time_point _last_paused;
|
||||
|
||||
public:
|
||||
failure_detector(double phi, std::chrono::milliseconds initial, std::chrono::milliseconds max_interval)
|
||||
: _phi(phi), _initial(initial), _max_interval(max_interval) {
|
||||
}
|
||||
|
||||
const std::map<inet_address, arrival_window>& arrival_samples() const {
|
||||
return _arrival_samples;
|
||||
}
|
||||
|
||||
public:
|
||||
/**
|
||||
* Dump the inter arrival times for examination if necessary.
|
||||
*/
|
||||
#if 0
|
||||
void dumpInterArrivalTimes() {
|
||||
File file = FileUtils.createTempFile("failuredetector-", ".dat");
|
||||
|
||||
OutputStream os = null;
|
||||
try
|
||||
{
|
||||
os = new BufferedOutputStream(new FileOutputStream(file, true));
|
||||
os.write(toString().getBytes());
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
throw new FSWriteError(e, file);
|
||||
}
|
||||
finally
|
||||
{
|
||||
FileUtils.closeQuietly(os);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
void set_phi_convict_threshold(double phi);
|
||||
|
||||
double get_phi_convict_threshold();
|
||||
|
||||
void report(inet_address ep);
|
||||
|
||||
void interpret(inet_address ep);
|
||||
|
||||
void force_conviction(inet_address ep);
|
||||
|
||||
void remove(inet_address ep);
|
||||
|
||||
void register_failure_detection_event_listener(i_failure_detection_event_listener* listener);
|
||||
|
||||
void unregister_failure_detection_event_listener(i_failure_detection_event_listener* listener);
|
||||
|
||||
friend std::ostream& operator<<(std::ostream& os, const failure_detector& x);
|
||||
};
|
||||
|
||||
} // namespace gms
|
||||
@@ -18,7 +18,6 @@
|
||||
#include "gms/gossiper.hh"
|
||||
#include "gms/feature_service.hh"
|
||||
#include "gms/application_state.hh"
|
||||
#include "gms/failure_detector.hh"
|
||||
#include "gms/i_failure_detection_event_listener.hh"
|
||||
#include "gms/i_endpoint_state_change_subscriber.hh"
|
||||
#include "message/messaging_service.hh"
|
||||
|
||||
@@ -19,7 +19,6 @@
|
||||
#include "utils/atomic_vector.hh"
|
||||
#include "utils/UUID.hh"
|
||||
#include "utils/fb_utilities.hh"
|
||||
#include "gms/failure_detector.hh"
|
||||
#include "gms/versioned_value.hh"
|
||||
#include "gms/application_state.hh"
|
||||
#include "gms/endpoint_state.hh"
|
||||
@@ -30,7 +29,6 @@
|
||||
#include "utils/updateable_value.hh"
|
||||
#include "utils/in.hh"
|
||||
#include "message/messaging_service_fwd.hh"
|
||||
#include "direct_failure_detector/failure_detector.hh"
|
||||
#include <optional>
|
||||
#include <algorithm>
|
||||
#include <chrono>
|
||||
|
||||
1
init.cc
1
init.cc
@@ -7,7 +7,6 @@
|
||||
*/
|
||||
|
||||
#include "init.hh"
|
||||
#include "gms/failure_detector.hh"
|
||||
#include "utils/to_string.hh"
|
||||
#include "gms/inet_address.hh"
|
||||
#include "utils/fb_utilities.hh"
|
||||
|
||||
@@ -12,7 +12,6 @@
|
||||
|
||||
#include "message/messaging_service.hh"
|
||||
#include <seastar/core/distributed.hh>
|
||||
#include "gms/failure_detector.hh"
|
||||
#include "gms/gossiper.hh"
|
||||
#include "streaming/prepare_message.hh"
|
||||
#include "gms/gossip_digest_syn.hh"
|
||||
|
||||
@@ -22,7 +22,6 @@
|
||||
#include "query_result_merger.hh"
|
||||
#include <seastar/core/do_with.hh>
|
||||
#include "message/messaging_service.hh"
|
||||
#include "gms/failure_detector.hh"
|
||||
#include "gms/gossiper.hh"
|
||||
#include <seastar/core/future-util.hh>
|
||||
#include "db/read_repair_decision.hh"
|
||||
|
||||
@@ -27,7 +27,6 @@
|
||||
#include "service/raft/raft_group0.hh"
|
||||
#include "utils/to_string.hh"
|
||||
#include "gms/gossiper.hh"
|
||||
#include "gms/failure_detector.hh"
|
||||
#include "gms/feature_service.hh"
|
||||
#include <seastar/core/thread.hh>
|
||||
#include <sstream>
|
||||
|
||||
@@ -14,7 +14,6 @@
|
||||
|
||||
#include "db/system_distributed_keyspace.hh"
|
||||
#include "message/messaging_service.hh"
|
||||
#include "gms/failure_detector.hh"
|
||||
#include "gms/feature_service.hh"
|
||||
#include "gms/gossiper.hh"
|
||||
#include "gms/application_state.hh"
|
||||
|
||||
Reference in New Issue
Block a user