service: make address_map raft independent

We want to start using address map class outside for raft, so lets make
it work on host_id instead of raft::servers_id and move is outside of
raft.
This commit is contained in:
Gleb Natapov
2024-10-10 15:11:52 +03:00
parent cc1b5aaf51
commit be5caec54e
14 changed files with 463 additions and 446 deletions

View File

@@ -1195,7 +1195,7 @@ future<int> repair_service::do_repair_start(sstring keyspace, std::unordered_map
}
auto host2ip = [&addr_map = _addr_map] (locator::host_id host) -> future<gms::inet_address> {
auto ip = addr_map.local().find(raft::server_id(host.uuid()));
auto ip = addr_map.local().find(host);
if (!ip) {
throw std::runtime_error(format("Could not get ip address for host {} from raft_address_map", host));
}
@@ -2433,7 +2433,7 @@ future<> repair_service::repair_tablet(locator::tablet_metadata_guard& guard, lo
auto tablet_id = gid.tablet;
auto host2ip = [&addr_map = _addr_map] (locator::host_id host) -> future<gms::inet_address> {
auto ip = addr_map.local().find(raft::server_id(host.uuid()));
auto ip = addr_map.local().find(host);
if (!ip) {
throw std::runtime_error(format("Could not get ip address for host {} from raft_address_map", host));
}

383
service/address_map.hh Normal file
View File

@@ -0,0 +1,383 @@
/*
* Copyright (C) 2024-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include "utils/assert.hh"
#include "gms/inet_address.hh"
#include "gms/generation-number.hh"
#include <seastar/core/lowres_clock.hh>
#include <seastar/core/on_internal_error.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/util/log.hh>
#include <boost/intrusive/list.hpp>
#include <chrono>
#include <source_location>
#include "locator/host_id.hh"
namespace bi = boost::intrusive;
namespace db {
class system_keyspace;
}
namespace service {
extern seastar::logger rslog;
// This class provides an abstraction of expirable server address mappings
// used by the messaging service and raft rpc module to store host id to ip maping
template <typename Clock>
class address_map_t : public peering_sharded_service<address_map_t<Clock>> {
// Expiring mappings stay in the cache for 1 hour (if not accessed during this time period)
static constexpr std::chrono::hours default_expiry_period{1};
static constexpr size_t initial_buckets_count = 16;
using clock_duration = typename Clock::duration;
using clock_time_point = typename Clock::time_point;
class expiring_entry_ptr;
// An `inet_address` optionally equipped with a pointer to an entry
// in LRU list of 'expiring entries'. If the pointer is set, it means that this
// `timestamped_entry` is expiring; the corresponding LRU list entry contains
// the last access time and we periodically delete elements from the LRU list
// when they become too old.
struct timestamped_entry {
std::optional<gms::inet_address> _addr;
// The address map's source of IP addresses is gossip,
// which can reorder events it delivers. It is therefore
// possible that we get an outdated IP address after
// the map has been updated with a new one, and revert
// the mapping to an incorrect one (see #14274). To
// protect against outdated information we mark each
// entry with its generation number, when available,
// and drop updates with outdated generations. 0 means
// there is no generation available - e.g. it's set when
// we load the persisted map state from system.peers at
// boot.
gms::generation_type _generation_number;
std::unique_ptr<expiring_entry_ptr> _lru_entry;
explicit timestamped_entry(gms::generation_type generation_number,
std::optional<gms::inet_address> addr)
: _addr(std::move(addr)), _generation_number(generation_number), _lru_entry(nullptr)
{
}
bool expiring() const {
return _lru_entry != nullptr;
}
};
class expiring_entry_ptr : public bi::list_base_hook<> {
public:
// Base type for LRU list of expiring entries.
//
// When an entry is created with state, an
// entry in this list is created, holding a pointer to the base entry
// which contains the data.
//
// The LRU list is maintained in such a way that MRU (most recently used)
// entries are at the beginning of the list while LRU entries move to the
// end.
using list_type = bi::list<expiring_entry_ptr>;
explicit expiring_entry_ptr(list_type& l, const locator::host_id& entry_id)
: _expiring_list(l), _last_accessed(Clock::now()), _entry_id(entry_id)
{
_expiring_list.push_front(*this);
}
~expiring_entry_ptr() {
_expiring_list.erase(_expiring_list.iterator_to(*this));
}
// Update last access timestamp and move ourselves to the front of LRU list.
void touch() {
_last_accessed = Clock::now();
_expiring_list.erase(_expiring_list.iterator_to(*this));
_expiring_list.push_front(*this);
}
// Test whether the entry has expired or not given a base time point and
// an expiration period (the time period since the last access lies within
// the given expiration period time frame).
bool expired(clock_duration expiry_period) const {
auto last_access_delta = Clock::now() - _last_accessed;
return expiry_period < last_access_delta;
}
const locator::host_id& entry_id() {
return _entry_id;
}
private:
list_type& _expiring_list;
clock_time_point _last_accessed;
const locator::host_id& _entry_id;
};
using map_type = std::unordered_map<locator::host_id, timestamped_entry>;
using map_iterator = typename map_type::iterator;
using expiring_list_type = typename expiring_entry_ptr::list_type;
using expiring_list_iterator = typename expiring_list_type::iterator;
// LRU list to hold expiring entries.
//
// Marked as `mutable` since the `find` function, which should naturally
// be `const`, updates the entry's timestamp and thus requires
// non-const access.
mutable expiring_list_type _expiring_list;
// Container to hold address mappings (both permanent and expiring).
// Declared as `mutable` for the same reasons as `_expiring_list`.
//
// It's important that _map is declared after _expiring_list, so it's
// destroyed first: when we destroy _map, the LRU entries corresponding
// to expiring entries are also destroyed, which unlinks them from the list,
// so the list must still exist.
mutable map_type _map;
// Timer that executes the cleanup procedure to erase expired
// entries from the mappings container.
//
// Rearmed automatically in the following cases:
// * A new expiring entry is created
// * If there are still some expiring entries left in the LRU list after
// the cleanup is finished.
seastar::timer<Clock> _timer;
clock_duration _expiry_period;
std::optional<future<>> _replication_fiber{make_ready_future<>()};
void drop_expired_entries(bool force = false) {
auto list_it = _expiring_list.rbegin();
while (list_it != _expiring_list.rend() && (list_it->expired(_expiry_period) || force)) {
// Remove from both LRU list and base storage
auto map_it = _map.find(list_it->entry_id());
if (map_it == _map.end()) {
on_internal_error(rslog, format(
"address_map::drop_expired_entries: missing entry with id {}", list_it->entry_id()));
}
_map.erase(map_it);
// Point at the oldest entry again
list_it = _expiring_list.rbegin();
}
if (!_expiring_list.empty()) {
// Rearm the timer in case there are still some expiring entries
_timer.arm(_expiry_period);
}
}
void add_expiring_entry(const locator::host_id& entry_id, timestamped_entry& entry) {
entry._lru_entry = std::make_unique<expiring_entry_ptr>(_expiring_list, entry_id);
if (!_timer.armed()) {
_timer.arm(_expiry_period);
}
}
template <std::invocable<address_map_t&> F>
void replicate(F f, seastar::compat::source_location l = seastar::compat::source_location::current()) {
if (this_shard_id() != 0) {
auto msg = format("address_map::{}() called on shard {} != 0",
l.function_name(), this_shard_id());
on_internal_error(rslog, msg);
}
if (!_replication_fiber) {
return;
}
_replication_fiber = _replication_fiber->then([this, f = std::move(f), l] () -> future<> {
try {
co_await this->container().invoke_on_others([f] (address_map_t& self) {
f(self);
});
} catch (...) {
rslog.error("address_map_t::replicate (called from {}) failed: {}",
l.function_name(), std::current_exception());
}
});
}
void replicate_set_nonexpiring(const locator::host_id& id) {
replicate([id] (address_map_t& self) {
self.handle_set_nonexpiring(id);
});
}
void replicate_set_expiring(const locator::host_id& id) {
replicate([id] (address_map_t& self) {
self.handle_set_expiring(id);
});
}
void replicate_add_or_update_entry(const locator::host_id& id,
gms::generation_type generation_number, const gms::inet_address& ip_addr,
bool update_if_exists) {
replicate([id, generation_number, ip_addr, update_if_exists] (address_map_t& self) {
self.handle_add_or_update_entry(id, generation_number, ip_addr, update_if_exists);
});
}
void handle_set_nonexpiring(const locator::host_id& id) {
auto [it, _] = _map.try_emplace(id, timestamped_entry{gms::generation_type{}, std::nullopt});
auto& entry = it->second;
if (entry.expiring()) {
entry._lru_entry = nullptr;
}
}
void handle_set_expiring(const locator::host_id& id) {
auto it = _map.find(id);
if (it == _map.end()) {
return;
}
auto& entry = it->second;
if (entry.expiring()) {
return;
}
add_expiring_entry(it->first, entry);
}
void handle_add_or_update_entry(const locator::host_id& id,
gms::generation_type generation_number, const gms::inet_address& ip_addr,
bool update_if_exists) {
auto [it, emplaced] = _map.try_emplace(id, timestamped_entry{generation_number, ip_addr});
auto& entry = it->second;
if (emplaced) {
add_expiring_entry(it->first, entry);
} else if ((update_if_exists && generation_number >= entry._generation_number) || !entry._addr) {
entry._addr = ip_addr;
entry._generation_number = generation_number;
if (entry.expiring()) {
entry._lru_entry->touch(); // Re-insert in the front of _expiring_list
}
}
}
public:
address_map_t()
: _map(initial_buckets_count),
_timer([this] { drop_expired_entries(); }),
_expiry_period(default_expiry_period)
{}
future<> stop() {
SCYLLA_ASSERT(_replication_fiber);
co_await *std::exchange(_replication_fiber, std::nullopt);
}
~address_map_t() {
SCYLLA_ASSERT(!_replication_fiber);
}
// Find a mapping with a given id.
//
// If a mapping is expiring, the last access timestamp is updated automatically.
std::optional<gms::inet_address> find(locator::host_id id) const {
auto it = _map.find(id);
if (it == _map.end()) {
return std::nullopt;
}
auto& entry = it->second;
if (entry.expiring()) {
// Touch the entry to update it's access timestamp and move it to the front of LRU list
entry._lru_entry->touch();
}
return entry._addr;
}
// Find an id with a given mapping.
//
// If a mapping is expiring, the last access timestamp is updated automatically.
//
// The only purpose of this function is to allow passing IPs with the
// --ignore-dead-nodes parameter in raft_removenode and raft_replace. As this
// feature is deprecated, we should also remove this function when the
// deprecation period ends.
std::optional<locator::host_id> find_by_addr(gms::inet_address addr) const {
rslog.warn("Finding Raft nodes by IP addresses is deprecated. Please use Host IDs instead.");
if (addr == gms::inet_address{}) {
on_internal_error(rslog, "address_map::find_by_addr: called with an empty address");
}
auto it = std::find_if(_map.begin(), _map.end(), [&](auto&& mapping) { return mapping.second._addr == addr; });
if (it == _map.end()) {
return std::nullopt;
}
auto& entry = it->second;
if (entry.expiring()) {
entry._lru_entry->touch();
}
return it->first;
}
// Convert an expiring entry to a non-expiring one, or
// insert a new non-expiring entry if the entry is missing.
// Called on Raft configuration changes to mark the new
// member of Raft configuration as a non-temporary member
// of the address map. The configuration member may be
// lacking an IP address but it will be added later.
// Can only be called on shard 0.
// The expiring state is replicated to other shards.
void set_nonexpiring(locator::host_id id) {
handle_set_nonexpiring(id);
replicate_set_nonexpiring(id);
}
// Convert a non-expiring entry to an expiring one,
// eventually erasing it from the mapping. Never inserts an
// entry if it doesn't exist.
// Can be called only on shard 0.
// The expiring state is replicated to other shards.
void set_expiring(locator::host_id id) {
handle_set_expiring(id);
replicate_set_expiring(id);
}
// Insert a new mapping with an IP address if it doesn't
// exist yet. Creates a mapping only on the current shard. Doesn't
// update the mapping if it already exists.
// The purpose of this function is to cache an IP address
// on a local shard while gossiper messages are still
// arriving.
// Used primarily from Raft RPC to speed up Raft at boot.
void opt_add_entry(locator::host_id id, gms::inet_address addr) {
if (addr == gms::inet_address{}) {
on_internal_error(rslog, format("IP address missing for {}", id));
}
handle_add_or_update_entry(id, gms::generation_type{}, addr, false);
}
// Insert or update entry with a new IP address on all shards.
// Used when we get a gossip notification about a node IP
// address. Overrides the current IP address if present,
// as long as the generation of the new entry is greater.
// If no entry is present, creates an expiring entry - there
// must be a separate Raft configuration change event (@sa
// set_nonexpiring()) to mark the entry as non expiring.
void add_or_update_entry(locator::host_id id, gms::inet_address addr,
gms::generation_type generation_number = {}) {
if (addr == gms::inet_address{}) {
on_internal_error(rslog, format("IP address missing for {}", id));
}
handle_add_or_update_entry(id, generation_number, addr, true);
replicate_add_or_update_entry(id, generation_number, addr, true);
}
// Drop all expiring entries immediately, without waiting for expiry.
// Used for testing
void force_drop_expiring_entries() {
drop_expired_entries(true);
}
};
} // end of namespace service

View File

@@ -14,8 +14,8 @@
namespace service {
template <typename C> class raft_address_map_t;
using raft_address_map = raft_address_map_t<seastar::lowres_clock>;
template <typename C> class address_map_t;
using raft_address_map = address_map_t<seastar::lowres_clock>;
// Address of a discovery peer
struct discovery_peer {

View File

@@ -50,7 +50,7 @@ void group0_state_id_handler::refresh() {
std::vector<raft::server_id> group0_members_missing_state_id;
const auto& group0_members_state_ids = group0_members | std::ranges::views::transform([&](const auto& member) -> std::optional<utils::UUID> {
const auto endpoint_addr = _address_map.find(member.addr.id);
const auto endpoint_addr = _address_map.find(locator::host_id{member.addr.id.uuid()});
if (!endpoint_addr) {
group0_members_missing_endpoint.push_back(member.addr.id);
return std::nullopt;

View File

@@ -322,7 +322,7 @@ future<> group0_state_machine::load_snapshot(raft::snapshot_id id) {
future<> group0_state_machine::transfer_snapshot(raft::server_id from_id, raft::snapshot_descriptor snp) {
// FIXME: The translation will ultimately be done by messaging_service
auto from_ip = _address_map.find(from_id);
auto from_ip = _address_map.find(locator::host_id{from_id.uuid()});
if (!from_ip.has_value()) {
// This is virtually impossible. We've just received the
// snapshot from the sender and must have updated our

View File

@@ -29,8 +29,8 @@ class storage_proxy;
class storage_service;
struct group0_state_machine_merger;
template <typename C> class raft_address_map_t;
using raft_address_map = raft_address_map_t<seastar::lowres_clock>;
template <typename C> class address_map_t;
using raft_address_map = address_map_t<seastar::lowres_clock>;
struct schema_change {
// Mutations of schema tables (such as `system_schema.keyspaces`, `system_schema.tables` etc.)

View File

@@ -7,383 +7,17 @@
*/
#pragma once
#include "utils/assert.hh"
#include "gms/inet_address.hh"
#include "gms/generation-number.hh"
#include "raft/raft.hh"
#include <seastar/core/lowres_clock.hh>
#include <seastar/core/on_internal_error.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/util/log.hh>
#include <boost/intrusive/list.hpp>
#include <chrono>
#include <source_location>
namespace bi = boost::intrusive;
namespace db {
class system_keyspace;
}
#include "service/address_map.hh"
namespace service {
extern seastar::logger rslog;
using raft_ticker_type = seastar::timer<lowres_clock>;
// TODO: should be configurable.
static constexpr raft_ticker_type::duration raft_tick_interval = std::chrono::milliseconds(100);
// This class provides an abstraction of expirable server address mappings
// used by the raft rpc module to store connection info for servers in a raft group.
template <typename Clock>
class raft_address_map_t : public peering_sharded_service<raft_address_map_t<Clock>> {
// Expiring mappings stay in the cache for 1 hour (if not accessed during this time period)
static constexpr std::chrono::hours default_expiry_period{1};
static constexpr size_t initial_buckets_count = 16;
using clock_duration = typename Clock::duration;
using clock_time_point = typename Clock::time_point;
class expiring_entry_ptr;
// An `inet_address` optionally equipped with a pointer to an entry
// in LRU list of 'expiring entries'. If the pointer is set, it means that this
// `timestamped_entry` is expiring; the corresponding LRU list entry contains
// the last access time and we periodically delete elements from the LRU list
// when they become too old.
struct timestamped_entry {
std::optional<gms::inet_address> _addr;
// The address map's source of IP addresses is gossip,
// which can reorder events it delivers. It is therefore
// possible that we get an outdated IP address after
// the map has been updated with a new one, and revert
// the mapping to an incorrect one (see #14274). To
// protect against outdated information we mark each
// entry with its generation number, when available,
// and drop updates with outdated generations. 0 means
// there is no generation available - e.g. it's set when
// we load the persisted map state from system.peers at
// boot.
gms::generation_type _generation_number;
std::unique_ptr<expiring_entry_ptr> _lru_entry;
explicit timestamped_entry(gms::generation_type generation_number,
std::optional<gms::inet_address> addr)
: _addr(std::move(addr)), _generation_number(generation_number), _lru_entry(nullptr)
{
}
bool expiring() const {
return _lru_entry != nullptr;
}
};
class expiring_entry_ptr : public bi::list_base_hook<> {
public:
// Base type for LRU list of expiring entries.
//
// When an entry is created with state, an
// entry in this list is created, holding a pointer to the base entry
// which contains the data.
//
// The LRU list is maintained in such a way that MRU (most recently used)
// entries are at the beginning of the list while LRU entries move to the
// end.
using list_type = bi::list<expiring_entry_ptr>;
explicit expiring_entry_ptr(list_type& l, const raft::server_id& entry_id)
: _expiring_list(l), _last_accessed(Clock::now()), _entry_id(entry_id)
{
_expiring_list.push_front(*this);
}
~expiring_entry_ptr() {
_expiring_list.erase(_expiring_list.iterator_to(*this));
}
// Update last access timestamp and move ourselves to the front of LRU list.
void touch() {
_last_accessed = Clock::now();
_expiring_list.erase(_expiring_list.iterator_to(*this));
_expiring_list.push_front(*this);
}
// Test whether the entry has expired or not given a base time point and
// an expiration period (the time period since the last access lies within
// the given expiration period time frame).
bool expired(clock_duration expiry_period) const {
auto last_access_delta = Clock::now() - _last_accessed;
return expiry_period < last_access_delta;
}
const raft::server_id& entry_id() {
return _entry_id;
}
private:
list_type& _expiring_list;
clock_time_point _last_accessed;
const raft::server_id& _entry_id;
};
using map_type = std::unordered_map<raft::server_id, timestamped_entry>;
using map_iterator = typename map_type::iterator;
using expiring_list_type = typename expiring_entry_ptr::list_type;
using expiring_list_iterator = typename expiring_list_type::iterator;
// LRU list to hold expiring entries.
//
// Marked as `mutable` since the `find` function, which should naturally
// be `const`, updates the entry's timestamp and thus requires
// non-const access.
mutable expiring_list_type _expiring_list;
// Container to hold address mappings (both permanent and expiring).
// Declared as `mutable` for the same reasons as `_expiring_list`.
//
// It's important that _map is declared after _expiring_list, so it's
// destroyed first: when we destroy _map, the LRU entries corresponding
// to expiring entries are also destroyed, which unlinks them from the list,
// so the list must still exist.
mutable map_type _map;
// Timer that executes the cleanup procedure to erase expired
// entries from the mappings container.
//
// Rearmed automatically in the following cases:
// * A new expiring entry is created
// * If there are still some expiring entries left in the LRU list after
// the cleanup is finished.
seastar::timer<Clock> _timer;
clock_duration _expiry_period;
std::optional<future<>> _replication_fiber{make_ready_future<>()};
void drop_expired_entries(bool force = false) {
auto list_it = _expiring_list.rbegin();
while (list_it != _expiring_list.rend() && (list_it->expired(_expiry_period) || force)) {
// Remove from both LRU list and base storage
auto map_it = _map.find(list_it->entry_id());
if (map_it == _map.end()) {
on_internal_error(rslog, format(
"raft_address_map::drop_expired_entries: missing entry with id {}", list_it->entry_id()));
}
_map.erase(map_it);
// Point at the oldest entry again
list_it = _expiring_list.rbegin();
}
if (!_expiring_list.empty()) {
// Rearm the timer in case there are still some expiring entries
_timer.arm(_expiry_period);
}
}
void add_expiring_entry(const raft::server_id& entry_id, timestamped_entry& entry) {
entry._lru_entry = std::make_unique<expiring_entry_ptr>(_expiring_list, entry_id);
if (!_timer.armed()) {
_timer.arm(_expiry_period);
}
}
template <std::invocable<raft_address_map_t&> F>
void replicate(F f, seastar::compat::source_location l = seastar::compat::source_location::current()) {
if (this_shard_id() != 0) {
auto msg = format("raft_address_map::{}() called on shard {} != 0",
l.function_name(), this_shard_id());
on_internal_error(rslog, msg);
}
if (!_replication_fiber) {
return;
}
_replication_fiber = _replication_fiber->then([this, f = std::move(f), l] () -> future<> {
try {
co_await this->container().invoke_on_others([f] (raft_address_map_t& self) {
f(self);
});
} catch (...) {
rslog.error("raft_address_map_t::replicate (called from {}) failed: {}",
l.function_name(), std::current_exception());
}
});
}
void replicate_set_nonexpiring(const raft::server_id& id) {
replicate([id] (raft_address_map_t& self) {
self.handle_set_nonexpiring(id);
});
}
void replicate_set_expiring(const raft::server_id& id) {
replicate([id] (raft_address_map_t& self) {
self.handle_set_expiring(id);
});
}
void replicate_add_or_update_entry(const raft::server_id& id,
gms::generation_type generation_number, const gms::inet_address& ip_addr,
bool update_if_exists) {
replicate([id, generation_number, ip_addr, update_if_exists] (raft_address_map_t& self) {
self.handle_add_or_update_entry(id, generation_number, ip_addr, update_if_exists);
});
}
void handle_set_nonexpiring(const raft::server_id& id) {
auto [it, _] = _map.try_emplace(id, timestamped_entry{gms::generation_type{}, std::nullopt});
auto& entry = it->second;
if (entry.expiring()) {
entry._lru_entry = nullptr;
}
}
void handle_set_expiring(const raft::server_id& id) {
auto it = _map.find(id);
if (it == _map.end()) {
return;
}
auto& entry = it->second;
if (entry.expiring()) {
return;
}
add_expiring_entry(it->first, entry);
}
void handle_add_or_update_entry(const raft::server_id& id,
gms::generation_type generation_number, const gms::inet_address& ip_addr,
bool update_if_exists) {
auto [it, emplaced] = _map.try_emplace(id, timestamped_entry{generation_number, ip_addr});
auto& entry = it->second;
if (emplaced) {
add_expiring_entry(it->first, entry);
} else if ((update_if_exists && generation_number >= entry._generation_number) || !entry._addr) {
entry._addr = ip_addr;
entry._generation_number = generation_number;
if (entry.expiring()) {
entry._lru_entry->touch(); // Re-insert in the front of _expiring_list
}
}
}
public:
raft_address_map_t()
: _map(initial_buckets_count),
_timer([this] { drop_expired_entries(); }),
_expiry_period(default_expiry_period)
{}
future<> stop() {
SCYLLA_ASSERT(_replication_fiber);
co_await *std::exchange(_replication_fiber, std::nullopt);
}
~raft_address_map_t() {
SCYLLA_ASSERT(!_replication_fiber);
}
// Find a mapping with a given id.
//
// If a mapping is expiring, the last access timestamp is updated automatically.
std::optional<gms::inet_address> find(raft::server_id id) const {
auto it = _map.find(id);
if (it == _map.end()) {
return std::nullopt;
}
auto& entry = it->second;
if (entry.expiring()) {
// Touch the entry to update it's access timestamp and move it to the front of LRU list
entry._lru_entry->touch();
}
return entry._addr;
}
// Find an id with a given mapping.
//
// If a mapping is expiring, the last access timestamp is updated automatically.
//
// The only purpose of this function is to allow passing IPs with the
// --ignore-dead-nodes parameter in raft_removenode and raft_replace. As this
// feature is deprecated, we should also remove this function when the
// deprecation period ends.
std::optional<raft::server_id> find_by_addr(gms::inet_address addr) const {
rslog.warn("Finding Raft nodes by IP addresses is deprecated. Please use Host IDs instead.");
if (addr == gms::inet_address{}) {
on_internal_error(rslog, "raft_address_map::find_by_addr: called with an empty address");
}
auto it = std::find_if(_map.begin(), _map.end(), [&](auto&& mapping) { return mapping.second._addr == addr; });
if (it == _map.end()) {
return std::nullopt;
}
auto& entry = it->second;
if (entry.expiring()) {
entry._lru_entry->touch();
}
return it->first;
}
// Convert an expiring entry to a non-expiring one, or
// insert a new non-expiring entry if the entry is missing.
// Called on Raft configuration changes to mark the new
// member of Raft configuration as a non-temporary member
// of the address map. The configuration member may be
// lacking an IP address but it will be added later.
// Can only be called on shard 0.
// The expiring state is replicated to other shards.
void set_nonexpiring(raft::server_id id) {
handle_set_nonexpiring(id);
replicate_set_nonexpiring(id);
}
// Convert a non-expiring entry to an expiring one,
// eventually erasing it from the mapping. Never inserts an
// entry if it doesn't exist.
// Can be called only on shard 0.
// The expiring state is replicated to other shards.
void set_expiring(raft::server_id id) {
handle_set_expiring(id);
replicate_set_expiring(id);
}
// Insert a new mapping with an IP address if it doesn't
// exist yet. Creates a mapping only on the current shard. Doesn't
// update the mapping if it already exists.
// The purpose of this function is to cache an IP address
// on a local shard while gossiper messages are still
// arriving.
// Used primarily from Raft RPC to speed up Raft at boot.
void opt_add_entry(raft::server_id id, gms::inet_address addr) {
if (addr == gms::inet_address{}) {
on_internal_error(rslog, format("IP address missing for {}", id));
}
handle_add_or_update_entry(id, gms::generation_type{}, addr, false);
}
// Insert or update entry with a new IP address on all shards.
// Used when we get a gossip notification about a node IP
// address. Overrides the current IP address if present,
// as long as the generation of the new entry is greater.
// If no entry is present, creates an expiring entry - there
// must be a separate Raft configuration change event (@sa
// set_nonexpiring()) to mark the entry as non expiring.
void add_or_update_entry(raft::server_id id, gms::inet_address addr,
gms::generation_type generation_number = {}) {
if (addr == gms::inet_address{}) {
on_internal_error(rslog, format("IP address missing for {}", id));
}
handle_add_or_update_entry(id, generation_number, addr, true);
replicate_add_or_update_entry(id, generation_number, addr, true);
}
// Drop all expiring entries immediately, without waiting for expiry.
// Used for testing
void force_drop_expiring_entries() {
drop_expired_entries(true);
}
};
using raft_address_map = raft_address_map_t<seastar::lowres_clock>;
using raft_address_map = address_map_t<seastar::lowres_clock>;
} // end of namespace service

View File

@@ -115,19 +115,19 @@ public:
virtual void on_configuration_change(raft::server_address_set add, raft::server_address_set del) override {
for (const auto& addr: add) {
auto ip_for_id = _address_map.find(addr.id);
auto ip_for_id = _address_map.find(locator::host_id{addr.id.uuid()});
if (!ip_for_id) {
// Make sure that the addresses of new nodes in the configuration are in the address map
auto ips = _gossiper.get_nodes_with_host_id(locator::host_id(addr.id.uuid()));
for (auto ip : ips) {
if (_gossiper.is_normal(ip)) {
_address_map.add_or_update_entry(addr.id, ip);
_address_map.add_or_update_entry(locator::host_id{addr.id.uuid()}, ip);
}
}
}
// Entries explicitly managed via `rpc::on_configuration_change() should NOT be
// expirable.
_address_map.set_nonexpiring(addr.id);
_address_map.set_nonexpiring(locator::host_id{addr.id.uuid()});
// Notify the direct failure detector that it should track
// (or liveness of a specific raft server id.
if (addr != _my_id) {
@@ -139,7 +139,7 @@ public:
// RPC 'send' may yield before resolving IP address,
// e.g. on _shutdown_gate, so keep the deleted
// entries in the map for a bit.
_address_map.set_expiring(addr.id);
_address_map.set_expiring(locator::host_id{addr.id.uuid()});
_direct_fd.remove_endpoint(addr.id.id);
}
}
@@ -600,7 +600,7 @@ struct group0_members {
}
std::optional<gms::inet_address> get_inet_addr(const raft::config_member& member) const {
return _address_map.find(member.addr.id);
return _address_map.find(locator::host_id{member.addr.id.uuid()});
}
std::vector<gms::inet_address> get_inet_addrs(seastar::compat::source_location l =
@@ -610,7 +610,7 @@ struct group0_members {
std::vector<raft::server_id> missing;
ret.reserve(members.size());
for (const auto& srv: members) {
auto addr = _address_map.find(srv.addr.id);
auto addr = _address_map.find(locator::host_id{srv.addr.id.uuid()});
if (!addr.has_value()) {
missing.push_back(srv.addr.id);
} else {
@@ -723,7 +723,7 @@ future<> raft_group0::setup_group0(
replace_info->raft_id, replace_info->ip_addr);
// `opt_add_entry` is shard-local, but that's fine - we only need this info on shard 0.
_raft_gr.address_map().opt_add_entry(replace_info->raft_id, replace_info->ip_addr);
_raft_gr.address_map().opt_add_entry(locator::host_id{replace_info->raft_id.uuid()}, replace_info->ip_addr);
}
std::vector<gms::inet_address> seeds;

View File

@@ -112,7 +112,7 @@ void raft_group_registry::init_rpc_verbs() {
// a previously learned gossiper address: otherwise an RPC from
// a node outside of the config could permanently
// change the address map of a healthy cluster.
self._address_map.opt_add_entry(from, std::move(addr));
self._address_map.opt_add_entry(locator::host_id{from.uuid()}, std::move(addr));
// Execute the actual message handling code
if constexpr (is_one_way) {
handler(rpc);
@@ -490,7 +490,7 @@ raft_server_with_timeouts::run_with_timeout(Op&& op, const char* op_name,
if (voters_count > 0 && dead_voters.size() >= (voters_count + 1) / 2) {
std::string dead_voters_str;
for (const auto id: dead_voters) {
const auto ip = am.find(id);
const auto ip = am.find(locator::host_id{id.uuid()});
if (ip) {
fmt::format_to(std::back_inserter(dead_voters_str), ",{}", *ip);
} else {
@@ -547,7 +547,7 @@ future<> raft_server_with_timeouts::read_barrier(seastar::abort_source* as, std:
future<bool> direct_fd_pinger::ping(direct_failure_detector::pinger::endpoint_id id, abort_source& as) {
auto dst_id = raft::server_id{id};
auto addr = _address_map.find(dst_id);
auto addr = _address_map.find(locator::host_id{dst_id.uuid()});
if (!addr) {
{
auto& rate_limit = _rate_limits.try_get_recent_entry(id, std::chrono::minutes(5));

View File

@@ -42,7 +42,7 @@ raft_rpc::one_way_rpc(sloc loc, raft::server_id id,
loc.file_name(), loc.line(), loc.function_name(), id);
return make_ready_future<>();
}
auto ip_addr = _address_map.find(id);
auto ip_addr = _address_map.find(locator::host_id{id.uuid()});
if (!ip_addr) {
rlogger.debug("{}:{}: {} dropping outgoing message to {} - IP address not found",
loc.file_name(), loc.line(), loc.function_name(), id);
@@ -70,7 +70,7 @@ raft_rpc::two_way_rpc(sloc loc, raft::server_id id,
if (!_failure_detector->is_alive(id)) {
return make_exception_future<Ret>(raft::destination_not_alive_error(id, loc));
}
auto ip_addr = _address_map.find(id);
auto ip_addr = _address_map.find(locator::host_id{id.uuid()});
if (!ip_addr) {
const auto msg = format("Failed to send {} {}: ip address not found", loc.function_name(), id);
return make_exception_future<Ret>(raft::transport_error(msg));
@@ -93,7 +93,7 @@ future<> raft_rpc::send_append_entries(raft::server_id id, const raft::append_re
rlogger.debug("Failed to send append_entires to {}: node is not seen as alive by the failure detector", id);
co_return;
}
auto ip_addr = _address_map.find(id);
auto ip_addr = _address_map.find(locator::host_id{id.uuid()});
if (!ip_addr) {
const auto msg = format("Failed to send append_entires to {}: ip address not found", id);
co_await coroutine::return_exception_ptr(std::make_exception_ptr(raft::transport_error(msg)));

View File

@@ -420,7 +420,7 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
if (!used_ips) {
used_ips.emplace();
for (const auto& [sid, rs]: boost::range::join(t.normal_nodes, t.transition_nodes)) {
if (const auto used_ip = am.find(sid)) {
if (const auto used_ip = am.find(locator::host_id{sid.uuid()})) {
used_ips->insert(*used_ip);
}
}
@@ -461,7 +461,7 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
auto process_left_node = [&] (raft::server_id id) -> future<> {
locator::host_id host_id{id.uuid()};
if (const auto ip = am.find(id)) {
if (const auto ip = am.find(host_id)) {
co_await remove_ip(*ip, host_id, true);
}
@@ -469,14 +469,14 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
update_topology(host_id, std::nullopt, t.left_nodes_rs.at(id));
}
_group0->modifiable_address_map().set_expiring(id);
_group0->modifiable_address_map().set_expiring(host_id);
// However if we do that, we need to also implement unbanning a node and do it if `removenode` is aborted.
co_await _messaging.local().ban_host(host_id);
};
auto process_normal_node = [&] (raft::server_id id, const replica_state& rs) -> future<> {
locator::host_id host_id{id.uuid()};
auto ip = am.find(id);
auto ip = am.find(host_id);
rtlogger.trace("loading topology: raft id={} ip={} node state={} dc={} rack={} tokens state={} tokens={} shards={}",
id, ip, rs.state, rs.datacenter, rs.rack, _topology_state_machine._topology.tstate, rs.ring.value().tokens, rs.shard_count, rs.cleanup);
@@ -529,7 +529,7 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
auto process_transition_node = [&](raft::server_id id, const replica_state& rs) -> future<> {
locator::host_id host_id{id.uuid()};
auto ip = am.find(id);
auto ip = am.find(host_id);
rtlogger.trace("loading topology: raft id={} ip={} node state={} dc={} rack={} tokens state={} tokens={}",
id, ip, rs.state, rs.datacenter, rs.rack, _topology_state_machine._topology.tstate,
@@ -586,7 +586,7 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
case node_state::replacing: {
SCYLLA_ASSERT(_topology_state_machine._topology.req_param.contains(id));
auto replaced_id = std::get<replace_param>(_topology_state_machine._topology.req_param[id]).replaced_id;
auto existing_ip = am.find(replaced_id);
auto existing_ip = am.find(locator::host_id{replaced_id.uuid()});
if (!existing_ip) {
// FIXME: What if not known?
on_fatal_internal_error(rtlogger, ::format("Cannot map id of a node being replaced {} to its ip", replaced_id));
@@ -640,7 +640,7 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
co_await process_transition_node(id, rs);
}
for (const auto& [id, rs]: t.new_nodes) {
_group0->modifiable_address_map().set_nonexpiring(id);
_group0->modifiable_address_map().set_nonexpiring(locator::host_id{id.uuid()});
}
for (auto id : t.get_excluded_nodes()) {
locator::node* n = tmptr->get_topology().find_node(locator::host_id(id.uuid()));
@@ -950,7 +950,7 @@ class storage_service::raft_ip_address_updater: public gms::i_endpoint_state_cha
if (!app_state_ptr) {
co_return;
}
raft::server_id id(utils::UUID(app_state_ptr->value()));
locator::host_id id(utils::UUID(app_state_ptr->value()));
rslog.debug("raft_ip_address_updater::on_endpoint_change({}) {} {}", ev, endpoint, id);
const auto prev_ip = _address_map.find(id);
@@ -987,17 +987,16 @@ class storage_service::raft_ip_address_updater: public gms::i_endpoint_state_cha
// To avoid this, we don't wait for sync_raft_topology_nodes to finish.
(void)futurize_invoke(ensure_alive([this, id, endpoint, h = _ss._async_gate.hold()]() -> future<> {
auto guard = co_await _ss._group0->client().hold_read_apply_mutex(_ss._abort_source);
const auto hid = locator::host_id{id.uuid()};
if (_address_map.find(id) != endpoint ||
_ss.get_token_metadata().get_endpoint_for_host_id_if_known(hid) == endpoint)
_ss.get_token_metadata().get_endpoint_for_host_id_if_known(id) == endpoint)
{
co_return;
}
co_await utils::get_local_injector().inject("ip-change-raft-sync-delay", std::chrono::milliseconds(500));
storage_service::nodes_to_notify_after_sync nodes_to_notify;
auto lock = co_await _ss.get_token_metadata_lock();
co_await _ss.mutate_token_metadata([this, hid, &nodes_to_notify](mutable_token_metadata_ptr t) -> future<> {
nodes_to_notify = co_await _ss.sync_raft_topology_nodes(std::move(t), hid, {});
co_await _ss.mutate_token_metadata([this, id, &nodes_to_notify](mutable_token_metadata_ptr t) -> future<> {
nodes_to_notify = co_await _ss.sync_raft_topology_nodes(std::move(t), id, {});
}, storage_service::acquire_merge_lock::no);
co_await _ss.notify_nodes_after_sync(std::move(nodes_to_notify));
}));
@@ -1197,10 +1196,11 @@ std::unordered_set<raft::server_id> storage_service::find_raft_nodes_from_hoeps(
if (hoep.has_host_id()) {
id = raft::server_id{hoep.id().uuid()};
} else {
id = _group0->address_map().find_by_addr(hoep.endpoint());
if (!id) {
auto hid = _group0->address_map().find_by_addr(hoep.endpoint());
if (!hid) {
throw std::runtime_error(::format("Cannot find a mapping to IP {}", hoep.endpoint()));
}
id = raft::server_id{hid->uuid()};
}
if (!_topology_state_machine._topology.find(*id)) {
throw std::runtime_error(::format("Node {} is not found in the cluster", *id));
@@ -1936,7 +1936,7 @@ future<> storage_service::join_topology(sharded<db::system_distributed_keyspace>
std::unordered_set<inet_address> ips;
const auto& am = _group0->address_map();
for (auto id : _topology_state_machine._topology.normal_nodes | std::views::keys) {
auto ip = am.find(id);
auto ip = am.find(locator::host_id{id.uuid()});
if (ip) {
ips.insert(*ip);
}
@@ -2928,11 +2928,11 @@ void storage_service::set_group0(raft_group0& group0) {
future<> storage_service::init_address_map(raft_address_map& address_map, gms::generation_type new_generation) {
for (auto [ip, host] : co_await _sys_ks.local().load_host_ids()) {
address_map.add_or_update_entry(raft::server_id(host.uuid()), ip);
address_map.add_or_update_entry(host, ip);
}
const auto& topology = get_token_metadata().get_topology();
raft::server_id myid{topology.my_host_id().uuid()};
address_map.add_or_update_entry(myid,topology.my_address(), new_generation);
auto myid = topology.my_host_id();
address_map.add_or_update_entry(myid, topology.my_address(), new_generation);
// Make my entry non expiring
address_map.set_nonexpiring(myid);
_raft_ip_address_updater = make_shared<raft_ip_address_updater>(address_map, *this);
@@ -4044,7 +4044,7 @@ future<> storage_service::raft_removenode(locator::host_id host_id, locator::hos
}
const auto& am = _group0->address_map();
auto ip = am.find(id);
auto ip = am.find(host_id);
if (!ip) {
// What to do if there is no mapping? Wait and retry?
on_fatal_internal_error(rtlogger, ::format("Remove node cannot find a mapping from node id {} to its ip", id));
@@ -5569,7 +5569,7 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
case raft_topology_cmd::command::barrier_and_drain: {
if (_topology_state_machine._topology.tstate == topology::transition_state::write_both_read_old) {
for (auto& n : _topology_state_machine._topology.transition_nodes) {
if (!_group0->address_map().find(n.first)) {
if (!_group0->address_map().find(locator::host_id{n.first.uuid()})) {
rtlogger.error("The topology transition is in a double write state but the IP of the node in transition is not known");
break;
}
@@ -5688,7 +5688,7 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
} else {
dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_token_metadata_ptr()->get_my_id(),
locator::endpoint_dc_rack{rs.datacenter, rs.rack}, rs.ring.value().tokens, get_token_metadata_ptr());
auto existing_ip = _group0->address_map().find(replaced_id);
auto existing_ip = _group0->address_map().find(locator::host_id(replaced_id.uuid()));
SCYLLA_ASSERT(existing_ip);
co_await bs.bootstrap(streaming::stream_reason::replace, _gossiper, _topology_state_machine._topology.session, *existing_ip);
}
@@ -5725,7 +5725,7 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
auto id = it->first;
rtlogger.debug("streaming to remove node {}", id);
const auto& am = _group0->address_map();
auto ip = am.find(id); // map node id to ip
auto ip = am.find(locator::host_id{id.uuid()}); // map node id to ip
SCYLLA_ASSERT (ip); // what to do if address is unknown?
tasks::task_info parent_info{tasks::task_id{it->second.request_id}, 0};
auto task = co_await get_task_manager_module().make_and_start_task<node_ops::streaming_task_impl>(parent_info,
@@ -5740,7 +5740,7 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
// FIXME: we should not need to translate ids to IPs here. See #6403.
std::list<gms::inet_address> ignored_ips;
for (const auto& ignored_id : _topology_state_machine._topology.ignored_nodes) {
auto ip = _group0->address_map().find(ignored_id);
auto ip = _group0->address_map().find(locator::host_id{ignored_id.uuid()});
if (!ip) {
on_fatal_internal_error(rtlogger, ::format("Cannot find a mapping from node id {} to its ip", ignored_id));
}
@@ -5843,7 +5843,7 @@ future<> storage_service::update_fence_version(token_metadata::version_t new_ver
}
inet_address storage_service::host2ip(locator::host_id host) const {
auto ip = _group0->address_map().find(raft::server_id(host.uuid()));
auto ip = _group0->address_map().find(host);
if (!ip) {
throw std::runtime_error(::format("Cannot map host {} to ip", host));
}
@@ -6933,7 +6933,7 @@ future<join_node_response_result> storage_service::join_node_response_handler(jo
if (ignored_ids.contains(id)) {
continue;
}
if (auto ip = amap.find(id)) {
if (auto ip = amap.find(locator::host_id{id.uuid()})) {
sync_nodes.push_back(*ip);
} else {
untranslated_ids.push_back(id);

View File

@@ -66,7 +66,7 @@ future<inet_address> wait_for_ip(raft::server_id id, const raft_address_map& am,
const auto timeout = std::chrono::seconds{30};
const auto deadline = lowres_clock::now() + timeout;
while (true) {
const auto ip = am.find(id);
const auto ip = am.find(locator::host_id{id.uuid()});
if (ip) {
co_return *ip;
}
@@ -358,7 +358,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
}
inet_address id2ip(locator::host_id id) const {
auto ip = _address_map.find(raft::server_id(id.uuid()));
auto ip = _address_map.find(id);
if (!ip) {
throw std::runtime_error(::format("no ip address mapping for {}", id));
}
@@ -366,7 +366,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
}
future<> exec_direct_command_helper(raft::server_id id, uint64_t cmd_index, const raft_topology_cmd& cmd) {
auto ip = _address_map.find(id);
auto ip = _address_map.find(locator::host_id{id.uuid()});
if (!ip) {
rtlogger.warn("cannot send command {} with term {} and index {} "
"to {} because mapping to ip is not available",

View File

@@ -48,8 +48,8 @@ class server;
namespace service {
template <typename Clock>
class raft_address_map_t;
using raft_address_map = raft_address_map_t<seastar::lowres_clock>;
class address_map_t;
using raft_address_map = address_map_t<seastar::lowres_clock>;
class raft_group0;
class tablet_allocator;

View File

@@ -12,8 +12,7 @@
#include <chrono>
#include <ranges>
#include "raft/raft.hh"
#include "service/raft/raft_address_map.hh"
#include "service/address_map.hh"
#include "gms/inet_address.hh"
#include "utils/UUID.hh"
@@ -22,11 +21,12 @@
#include <seastar/util/later.hh>
#include <seastar/util/defer.hh>
using namespace raft;
using namespace service;
using namespace std::chrono_literals;
using namespace seastar::testing;
using server_id = locator::host_id;
// Can be used to wait for delivery of messages that were sent to other shards.
future<> ping_shards() {
if (smp::count == 1) {
@@ -53,7 +53,7 @@ SEASTAR_THREAD_TEST_CASE(test_raft_address_map_operations) {
using seastar::manual_clock;
{
sharded<raft_address_map_t<manual_clock>> m_svc;
sharded<address_map_t<manual_clock>> m_svc;
m_svc.start().get();
auto stop_map = defer([&m_svc] { m_svc.stop().get(); });
auto& m = m_svc.local();
@@ -72,7 +72,7 @@ SEASTAR_THREAD_TEST_CASE(test_raft_address_map_operations) {
}
{
// m.add_or_update_entry() adds an expiring entry
sharded<raft_address_map_t<manual_clock>> m_svc;
sharded<address_map_t<manual_clock>> m_svc;
m_svc.start().get();
auto stop_map = defer([&m_svc] { m_svc.stop().get(); });
auto& m = m_svc.local();
@@ -87,7 +87,7 @@ SEASTAR_THREAD_TEST_CASE(test_raft_address_map_operations) {
{
// Set two expirable entries with different timestamps, check for
// automatic rearming of expiration timer after the first one expires.
sharded<raft_address_map_t<manual_clock>> m_svc;
sharded<address_map_t<manual_clock>> m_svc;
m_svc.start().get();
auto stop_map = defer([&m_svc] { m_svc.stop().get(); });
auto& m = m_svc.local();
@@ -107,7 +107,7 @@ SEASTAR_THREAD_TEST_CASE(test_raft_address_map_operations) {
{
// Do not throw on re-mapping address for the same id
// - happens when IP address changes after a node restart
sharded<raft_address_map_t<manual_clock>> m_svc;
sharded<address_map_t<manual_clock>> m_svc;
m_svc.start().get();
auto stop_map = defer([&m_svc] { m_svc.stop().get(); });
auto& m = m_svc.local();
@@ -120,7 +120,7 @@ SEASTAR_THREAD_TEST_CASE(test_raft_address_map_operations) {
{
// Check that add_or_update_entry() doesn't transition the entry type
// to expiring.
sharded<raft_address_map_t<manual_clock>> m_svc;
sharded<address_map_t<manual_clock>> m_svc;
m_svc.start().get();
auto stop_map = defer([&m_svc] { m_svc.stop().get(); });
auto& m = m_svc.local();
@@ -133,7 +133,7 @@ SEASTAR_THREAD_TEST_CASE(test_raft_address_map_operations) {
}
{
// Check that set_expiring() doesn't insert a new entry
sharded<raft_address_map_t<manual_clock>> m_svc;
sharded<address_map_t<manual_clock>> m_svc;
m_svc.start().get();
auto stop_map = defer([&m_svc] { m_svc.stop().get(); });
auto& m = m_svc.local();
@@ -144,7 +144,7 @@ SEASTAR_THREAD_TEST_CASE(test_raft_address_map_operations) {
{
// Check that set_nonexpiring() inserts a new non-expiring entry if the
// entry is missing
sharded<raft_address_map_t<manual_clock>> m_svc;
sharded<address_map_t<manual_clock>> m_svc;
m_svc.start().get();
auto stop_map = defer([&m_svc] { m_svc.stop().get(); });
auto& m = m_svc.local();
@@ -162,7 +162,7 @@ SEASTAR_THREAD_TEST_CASE(test_raft_address_map_operations) {
}
{
// Check that add_or_update_entry() throws when called without an actual IP address
sharded<raft_address_map_t<manual_clock>> m_svc;
sharded<address_map_t<manual_clock>> m_svc;
m_svc.start().get();
auto stop_map = defer([&m_svc] { m_svc.stop().get(); });
auto& m = m_svc.local();
@@ -172,7 +172,7 @@ SEASTAR_THREAD_TEST_CASE(test_raft_address_map_operations) {
}
{
// Check that opt_add_entry() throws when called without an actual IP address
sharded<raft_address_map_t<manual_clock>> m_svc;
sharded<address_map_t<manual_clock>> m_svc;
m_svc.start().get();
auto stop_map = defer([&m_svc] { m_svc.stop().get(); });
auto& m = m_svc.local();
@@ -183,7 +183,7 @@ SEASTAR_THREAD_TEST_CASE(test_raft_address_map_operations) {
{
// Check that add_or_update_entry() doesn't overwrite a new IP address
// with an obsolete one
sharded<raft_address_map_t<manual_clock>> m_svc;
sharded<address_map_t<manual_clock>> m_svc;
m_svc.start().get();
auto stop_map = defer([&m_svc] { m_svc.stop().get(); });
auto& m = m_svc.local();
@@ -195,7 +195,7 @@ SEASTAR_THREAD_TEST_CASE(test_raft_address_map_operations) {
{
// Check that add_or_update_entry() adds an IP address if the entry
// doesn't have it regardless of the generation number
sharded<raft_address_map_t<manual_clock>> m_svc;
sharded<address_map_t<manual_clock>> m_svc;
m_svc.start().get();
auto stop_map = defer([&m_svc] { m_svc.stop().get(); });
auto& m = m_svc.local();
@@ -206,7 +206,7 @@ SEASTAR_THREAD_TEST_CASE(test_raft_address_map_operations) {
}
{
// Check the basic functionality of find_by_addr()
sharded<raft_address_map_t<manual_clock>> m_svc;
sharded<address_map_t<manual_clock>> m_svc;
m_svc.start().get();
auto stop_map = defer([&m_svc] { m_svc.stop().get(); });
auto& m = m_svc.local();
@@ -221,7 +221,7 @@ SEASTAR_THREAD_TEST_CASE(test_raft_address_map_operations) {
}
{
// Check that find_by_addr() properly updates timestamps of entries
sharded<raft_address_map_t<manual_clock>> m_svc;
sharded<address_map_t<manual_clock>> m_svc;
m_svc.start().get();
auto stop_map = defer([&m_svc] { m_svc.stop().get(); });
auto& m = m_svc.local();
@@ -238,7 +238,7 @@ SEASTAR_THREAD_TEST_CASE(test_raft_address_map_operations) {
}
{
// Check that find_by_addr() throws when called without an actual IP address
sharded<raft_address_map_t<manual_clock>> m_svc;
sharded<address_map_t<manual_clock>> m_svc;
m_svc.start().get();
auto stop_map = defer([&m_svc] { m_svc.stop().get(); });
auto& m = m_svc.local();
@@ -265,7 +265,7 @@ SEASTAR_THREAD_TEST_CASE(test_raft_address_map_replication) {
using seastar::manual_clock;
{
sharded<raft_address_map_t<manual_clock>> m_svc;
sharded<address_map_t<manual_clock>> m_svc;
m_svc.start().get();
auto stop_map = defer([&m_svc] { m_svc.stop().get(); });
auto& m = m_svc.local();
@@ -275,7 +275,7 @@ SEASTAR_THREAD_TEST_CASE(test_raft_address_map_replication) {
m.add_or_update_entry(id1, addr1);
m.set_nonexpiring(id1);
ping_shards().get();
m_svc.invoke_on(1, [] (raft_address_map_t<manual_clock>& m) {
m_svc.invoke_on(1, [] (address_map_t<manual_clock>& m) {
BOOST_CHECK(m.find(id1) && *m.find(id1) == addr1);
manual_clock::advance(expiration_time);
BOOST_CHECK(m.find(id1) && *m.find(id1) == addr1);
@@ -285,7 +285,7 @@ SEASTAR_THREAD_TEST_CASE(test_raft_address_map_replication) {
m.set_expiring(id1);
BOOST_CHECK(m.find(id1) && *m.find(id1) == addr1);
ping_shards().get();
m_svc.invoke_on(1, [] (raft_address_map_t<manual_clock>& m) {
m_svc.invoke_on(1, [] (address_map_t<manual_clock>& m) {
BOOST_CHECK(m.find(id1) && *m.find(id1) == addr1);
manual_clock::advance(expiration_time);
BOOST_CHECK(!m.find(id1));
@@ -296,18 +296,18 @@ SEASTAR_THREAD_TEST_CASE(test_raft_address_map_replication) {
// Expiring entries are replicated
m.add_or_update_entry(id1, addr1);
ping_shards().get();
m_svc.invoke_on(1, [] (raft_address_map_t<manual_clock>& m) {
m_svc.invoke_on(1, [] (address_map_t<manual_clock>& m) {
BOOST_CHECK(m.find(id1));
}).get();
// Can't call add_or_update_entry on shard other than 0
m_svc.invoke_on(1, [] (raft_address_map_t<manual_clock>& m) {
m_svc.invoke_on(1, [] (address_map_t<manual_clock>& m) {
scoped_no_abort_on_internal_error abort_guard;
BOOST_CHECK_THROW(m.add_or_update_entry(id1, addr2), std::runtime_error);
}).get();
// Can add expiring entries on shard other than 0 - and they indeed expire
m_svc.invoke_on(1, [] (raft_address_map_t<manual_clock>& m) {
m_svc.invoke_on(1, [] (address_map_t<manual_clock>& m) {
m.opt_add_entry(id2, addr2);
BOOST_CHECK(m.find(id2) && *m.find(id2) == addr2);
manual_clock::advance(expiration_time);
@@ -317,25 +317,25 @@ SEASTAR_THREAD_TEST_CASE(test_raft_address_map_replication) {
// Add entry on two shards, make it non-expiring on shard 0,
// the non-expiration must be replicated
m_svc.invoke_on(1, [] (raft_address_map_t<manual_clock>& m) {
m_svc.invoke_on(1, [] (address_map_t<manual_clock>& m) {
m.opt_add_entry(id2, addr2);
}).get();
m.set_nonexpiring(id2);
ping_shards().get();
m_svc.invoke_on(1, [] (raft_address_map_t<manual_clock>& m) {
m_svc.invoke_on(1, [] (address_map_t<manual_clock>& m) {
manual_clock::advance(expiration_time);
BOOST_CHECK(m.find(id2) && *m.find(id2) == addr2);
}).get();
// Cannot set it to expiring on shard 1
m_svc.invoke_on(1, [] (raft_address_map_t<manual_clock>& m) {
m_svc.invoke_on(1, [] (address_map_t<manual_clock>& m) {
scoped_no_abort_on_internal_error abort_guard;
BOOST_CHECK_THROW(m.set_expiring(id2), std::runtime_error);
}).get();
// Cannot set it to non-expiring on shard 1
m.set_expiring(id2);
m_svc.invoke_on(1, [] (raft_address_map_t<manual_clock>& m) {
m_svc.invoke_on(1, [] (address_map_t<manual_clock>& m) {
scoped_no_abort_on_internal_error abort_guard;
BOOST_CHECK_THROW(m.set_nonexpiring(id2), std::runtime_error);
}).get();