direct_failure_detector: run direct failure detector in the gossiper scheduling group

When direct failure detector was introduces the idea was that it will
run on the same connection raft group0 verbs are running, but in
60f1053087 raft verbs were moved to run on the gossiper connection
while DIRECT_FD_PING was left where it was. This patch move it to
gossiper connection as well and fix the pinger code to run in gossiper
scheduling group.

(cherry picked from commit 86dde50c0d)
This commit is contained in:
Gleb Natapov
2025-12-01 15:32:17 +02:00
parent 37010db61a
commit c33c09336b
7 changed files with 19 additions and 14 deletions

View File

@@ -1638,7 +1638,7 @@ sharded<locator::shared_token_metadata> token_metadata;
fd.start(
std::ref(fd_pinger), std::ref(fd_clock),
service::direct_fd_clock::base::duration{std::chrono::milliseconds{100}}.count(),
service::direct_fd_clock::base::duration{std::chrono::milliseconds{cfg->direct_failure_detector_ping_timeout_in_ms()}}.count()).get();
service::direct_fd_clock::base::duration{std::chrono::milliseconds{cfg->direct_failure_detector_ping_timeout_in_ms()}}.count(), dbcfg.gossip_scheduling_group).get();
auto stop_fd = defer_verbose_shutdown("direct_failure_detector", [] {
fd.stop().get();

View File

@@ -674,6 +674,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
case messaging_verb::RAFT_ADD_ENTRY:
case messaging_verb::RAFT_MODIFY_CONFIG:
case messaging_verb::RAFT_PULL_SNAPSHOT:
case messaging_verb::DIRECT_FD_PING:
// See comment above `TOPOLOGY_INDEPENDENT_IDX`.
// DO NOT put any 'hot' (e.g. data path) verbs in this group,
// only verbs which are 'rare' and 'cheap'.
@@ -732,7 +733,6 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
case messaging_verb::PAXOS_ACCEPT:
case messaging_verb::PAXOS_LEARN:
case messaging_verb::PAXOS_PRUNE:
case messaging_verb::DIRECT_FD_PING:
return 2;
case messaging_verb::MUTATION_DONE:
case messaging_verb::MUTATION_FAILED:

View File

@@ -6,6 +6,7 @@
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "seastar/core/scheduling.hh"
#include "utils/assert.hh"
#include <unordered_set>
@@ -17,6 +18,7 @@
#include <seastar/core/condition-variable.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/util/defer.hh>
#include <seastar/coroutine/switch_to.hh>
#include "utils/log.hh"
@@ -118,7 +120,7 @@ struct failure_detector::impl {
// Fetches endpoint updates from _endpoint_queue and performs the add/remove operation.
// Runs on shard 0 only.
future<> update_endpoint_fiber();
future<> update_endpoint_fiber(seastar::scheduling_group sg);
future<> _update_endpoint_fiber = make_ready_future<>();
// Workers running on this shard.
@@ -140,7 +142,7 @@ struct failure_detector::impl {
// The unregistering process requires cross-shard operations which we perform on this fiber.
future<> _destroy_subscriptions = make_ready_future<>();
impl(failure_detector& parent, pinger&, clock&, clock::interval_t ping_period, clock::interval_t ping_timeout);
impl(failure_detector& parent, pinger&, clock&, clock::interval_t ping_period, clock::interval_t ping_timeout, seastar::scheduling_group sg);
~impl();
// Inform update_endpoint_fiber() about an added/removed endpoint.
@@ -177,19 +179,19 @@ struct failure_detector::impl {
};
failure_detector::failure_detector(
pinger& pinger, clock& clock, clock::interval_t ping_period, clock::interval_t ping_timeout)
: _impl(std::make_unique<impl>(*this, pinger, clock, ping_period, ping_timeout))
pinger& pinger, clock& clock, clock::interval_t ping_period, clock::interval_t ping_timeout, seastar::scheduling_group sg)
: _impl(std::make_unique<impl>(*this, pinger, clock, ping_period, ping_timeout, sg))
{}
failure_detector::impl::impl(
failure_detector& parent, pinger& pinger, clock& clock, clock::interval_t ping_period, clock::interval_t ping_timeout)
failure_detector& parent, pinger& pinger, clock& clock, clock::interval_t ping_period, clock::interval_t ping_timeout, seastar::scheduling_group sg)
: _parent(parent), _pinger(pinger), _clock(clock), _ping_period(ping_period), _ping_timeout(ping_timeout) {
if (this_shard_id() != 0) {
return;
}
_num_workers.resize(smp::count, 0);
_update_endpoint_fiber = update_endpoint_fiber();
_update_endpoint_fiber = update_endpoint_fiber(sg);
}
void failure_detector::impl::send_update_endpoint(pinger::endpoint_id ep, endpoint_update update) {
@@ -205,9 +207,9 @@ void failure_detector::impl::send_update_endpoint(pinger::endpoint_id ep, endpoi
_endpoint_changed.signal();
}
future<> failure_detector::impl::update_endpoint_fiber() {
future<> failure_detector::impl::update_endpoint_fiber(seastar::scheduling_group sg) {
SCYLLA_ASSERT(this_shard_id() == 0);
co_await coroutine::switch_to(sg);
while (true) {
co_await _endpoint_changed.wait([this] { return !_endpoint_updates.empty(); });

View File

@@ -128,7 +128,10 @@ public:
// Duration after which a ping is aborted, so that next ping can be started
// (pings are sent sequentially).
clock::interval_t ping_timeout
clock::interval_t ping_timeout,
// Scheduling group used for fibers inside the failure detector.
seastar::scheduling_group sg
);
~failure_detector();

View File

@@ -851,7 +851,7 @@ private:
_fd.start(
std::ref(_fd_pinger), std::ref(fd_clock),
service::direct_fd_clock::base::duration{std::chrono::milliseconds{100}}.count(),
service::direct_fd_clock::base::duration{std::chrono::milliseconds{600}}.count()).get();
service::direct_fd_clock::base::duration{std::chrono::milliseconds{600}}.count(), gcfg.gossip_scheduling_group).get();
auto stop_fd = defer_verbose_shutdown("direct failure detector", [this] {
_fd.stop().get();

View File

@@ -132,7 +132,7 @@ SEASTAR_TEST_CASE(failure_detector_test) {
test_pinger pinger;
test_clock clock;
sharded<direct_failure_detector::failure_detector> fd;
co_await fd.start(std::ref(pinger), std::ref(clock), 10, 30);
co_await fd.start(std::ref(pinger), std::ref(clock), 10, 30, seastar::current_scheduling_group());
test_listener l1, l2;
auto sub1 = co_await fd.local().register_listener(l1, 95);

View File

@@ -1439,7 +1439,7 @@ public:
// _fd_service must be started before raft server,
// because as soon as raft server is started, it may start adding endpoints to the service.
// _fd_service is using _server's RPC, but not until the first endpoint is added.
co_await _fd_service->start(std::ref(*_fd_pinger), std::ref(*_fd_clock), fd_ping_period.count(), fd_ping_timeout.count());
co_await _fd_service->start(std::ref(*_fd_pinger), std::ref(*_fd_clock), fd_ping_period.count(), fd_ping_timeout.count(), seastar::current_scheduling_group());
_fd_subscription.emplace(co_await _fd_service->local().register_listener(*_fd_listener, _fd_convict_threshold.count()));
co_await _server->start();
}