Merge "Obtain dc/rack from topology, not snitch" from Pavel Emelyanov

"
The way dc/rack info is maintained is very intricate.

The dc/rack strings originate at snitch, get propagated via gossiper,
get notified to storage service which, in turn, stores them into the
system keyspace and token metadata. Code that needs to get dc/rack
for a given endpoint calls snitch which tries to get the data from
gossiper and if failed goes and loads it from system keyspace cache.
Also there's "internal IP" thing hanging arond that loops messaging
service in both -- updating and getting the info.

The plan is to make topology (that currently sits on token metadata)
stay the only "source of truth" regarding the endpoints' dc/rack and
internal IP info. The dc/rack mappings are put into topology already,
but it cannot yet fully replace snitch for two reasons:

- it doesn't map internal IP to endpoint
- it doesn't get data stored in system keyspace

So what this patch set does is patches most of the dc/rack getters
to call topology methods. The topology is temporarily patched to
just call the respective snitch methods. This removes a big portion
of calls for global snitch instance.

After the set the places that still explicitly rely on snitch to
provide dc/rack are

- messaging service: needs internal IP knowledge on topology
- db/consistency_level: is all "global", needs heavier patching
- tests: just later
"

* 'br-get-dc-rack-from-topology-2' of https://github.com/xemul/scylla:
  proxy stats: Get rack/datacenter from topology
  proxy stats: Push topology arg to get_ep_stats
  api: Get rack/datacenter from topology
  hints: Remove snitch dependency
  hints: Get rack/datacenter from topology
  alternator: Get rack/datacenter from topology
  range_streamer: Get rack/datacenter from topology
  repair: Get rack/datacenter from topology
  view: Get rack/datacenter from topology
  storage_service: Get rack/datacenter from topology
  proxy: Get rack/datacenter from topology
  topology: Add get_rack/_datacenter methods
This commit is contained in:
Botond Dénes
2022-06-23 10:01:36 +03:00
19 changed files with 133 additions and 103 deletions

View File

@@ -58,7 +58,7 @@ logging::logger elogger("alternator-executor");
namespace alternator {
static future<std::vector<mutation>> create_keyspace(std::string_view keyspace_name, service::migration_manager& mm, gms::gossiper& gossiper, api::timestamp_type);
static future<std::vector<mutation>> create_keyspace(std::string_view keyspace_name, service::storage_proxy& sp, service::migration_manager& mm, gms::gossiper& gossiper, api::timestamp_type);
static map_type attrs_type() {
static thread_local auto t = map_type_impl::get_instance(utf8_type, bytes_type, true);
@@ -1071,7 +1071,7 @@ static future<executor::request_return_type> create_table_on_shard0(tracing::tra
auto ts = group0_guard.write_timestamp();
std::vector<mutation> schema_mutations;
try {
schema_mutations = co_await create_keyspace(keyspace_name, mm, gossiper, ts);
schema_mutations = co_await create_keyspace(keyspace_name, sp, mm, gossiper, ts);
} catch (exceptions::already_exists_exception&) {
if (sp.data_dictionary().has_schema(keyspace_name, table_name)) {
co_return api_error::resource_in_use(format("Table {} already exists", table_name));
@@ -4337,11 +4337,12 @@ future<executor::request_return_type> executor::describe_endpoints(client_state&
return make_ready_future<executor::request_return_type>(make_jsonable(std::move(response)));
}
static std::map<sstring, sstring> get_network_topology_options(gms::gossiper& gossiper, int rf) {
static std::map<sstring, sstring> get_network_topology_options(service::storage_proxy& sp, gms::gossiper& gossiper, int rf) {
std::map<sstring, sstring> options;
sstring rf_str = std::to_string(rf);
auto& topology = sp.get_token_metadata_ptr()->get_topology();
for (const gms::inet_address& addr : gossiper.get_live_members()) {
options.emplace(locator::i_endpoint_snitch::get_local_snitch_ptr()->get_datacenter(addr), rf_str);
options.emplace(topology.get_datacenter(addr), rf_str);
};
return options;
}
@@ -4375,7 +4376,7 @@ future<executor::request_return_type> executor::describe_continuous_backups(clie
// of nodes in the cluster: A cluster with 3 or more live nodes, gets RF=3.
// A smaller cluster (presumably, a test only), gets RF=1. The user may
// manually create the keyspace to override this predefined behavior.
static future<std::vector<mutation>> create_keyspace(std::string_view keyspace_name, service::migration_manager& mm, gms::gossiper& gossiper, api::timestamp_type ts) {
static future<std::vector<mutation>> create_keyspace(std::string_view keyspace_name, service::storage_proxy& sp, service::migration_manager& mm, gms::gossiper& gossiper, api::timestamp_type ts) {
sstring keyspace_name_str(keyspace_name);
int endpoint_count = gossiper.get_endpoint_states().size();
int rf = 3;
@@ -4384,7 +4385,7 @@ static future<std::vector<mutation>> create_keyspace(std::string_view keyspace_n
elogger.warn("Creating keyspace '{}' for Alternator with unsafe RF={} because cluster only has {} nodes.",
keyspace_name_str, rf, endpoint_count);
}
auto opts = get_network_topology_options(gossiper, rf);
auto opts = get_network_topology_options(sp, gossiper, rf);
auto ksm = keyspace_metadata::new_keyspace(keyspace_name_str, "org.apache.cassandra.locator.NetworkTopologyStrategy", std::move(opts), true);
co_return mm.prepare_new_keyspace_announcement(ksm, ts);

View File

@@ -20,7 +20,6 @@
#include "auth.hh"
#include <cctype>
#include "service/storage_proxy.hh"
#include "locator/snitch_base.hh"
#include "gms/gossiper.hh"
#include "utils/overloaded_functor.hh"
#include "utils/fb_utilities.hh"
@@ -201,10 +200,9 @@ protected:
// It's very easy to get a list of all live nodes on the cluster,
// using _gossiper().get_live_members(). But getting
// just the list of live nodes in this DC needs more elaborate code:
sstring local_dc = locator::i_endpoint_snitch::get_local_snitch_ptr()->get_datacenter(
utils::fb_utilities::get_broadcast_address());
std::unordered_set<gms::inet_address> local_dc_nodes =
_proxy.get_token_metadata_ptr()->get_topology().get_datacenter_endpoints().at(local_dc);
auto& topology = _proxy.get_token_metadata_ptr()->get_topology();
sstring local_dc = topology.get_datacenter();
std::unordered_set<gms::inet_address> local_dc_nodes = topology.get_datacenter_endpoints().at(local_dc);
for (auto& ip : local_dc_nodes) {
if (_gossiper.is_alive(ip)) {
rjson::push_back(results, rjson::from_string(ip.to_sstring()));

View File

@@ -6,6 +6,7 @@
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include "locator/token_metadata.hh"
#include "locator/snitch_base.hh"
#include "endpoint_snitch.hh"
#include "api/api-doc/endpoint_snitch_info.json.hh"
@@ -19,12 +20,14 @@ void set_endpoint_snitch(http_context& ctx, routes& r) {
return host.empty() ? gms::inet_address(utils::fb_utilities::get_broadcast_address()) : gms::inet_address(host);
};
httpd::endpoint_snitch_info_json::get_datacenter.set(r, [](const_req req) {
return locator::i_endpoint_snitch::get_local_snitch_ptr()->get_datacenter(host_or_broadcast(req));
httpd::endpoint_snitch_info_json::get_datacenter.set(r, [&ctx](const_req req) {
auto& topology = ctx.shared_token_metadata.local().get()->get_topology();
return topology.get_datacenter(host_or_broadcast(req));
});
httpd::endpoint_snitch_info_json::get_rack.set(r, [](const_req req) {
return locator::i_endpoint_snitch::get_local_snitch_ptr()->get_rack(host_or_broadcast(req));
httpd::endpoint_snitch_info_json::get_rack.set(r, [&ctx](const_req req) {
auto& topology = ctx.shared_token_metadata.local().get()->get_topology();
return topology.get_rack(host_or_broadcast(req));
});
httpd::endpoint_snitch_info_json::get_snitch_name.set(r, [] (const_req req) {

View File

@@ -20,6 +20,8 @@
#include "seastarx.hh"
#include "utils/config_file.hh"
#include "utils/enum_option.hh"
#include "utils/UUID.hh"
#include "gms/inet_address.hh"
#include "db/hints/host_filter.hh"
namespace seastar {

View File

@@ -8,6 +8,7 @@
#include <string_view>
#include <boost/algorithm/string.hpp>
#include "locator/token_metadata.hh"
#include "to_string.hh"
#include "host_filter.hh"
@@ -27,12 +28,12 @@ host_filter::host_filter(std::unordered_set<sstring> allowed_dcs)
, _dcs(std::move(allowed_dcs)) {
}
bool host_filter::can_hint_for(locator::snitch_ptr& snitch, gms::inet_address ep) const {
bool host_filter::can_hint_for(const locator::topology& topo, gms::inet_address ep) const {
switch (_enabled_kind) {
case enabled_kind::enabled_for_all:
return true;
case enabled_kind::enabled_selectively:
return _dcs.contains(snitch->get_datacenter(ep));
return _dcs.contains(topo.get_datacenter(ep));
case enabled_kind::disabled_for_all:
return false;
}

View File

@@ -14,13 +14,14 @@
#include <string_view>
#include <seastar/core/sstring.hh>
#include "locator/snitch_base.hh"
#include "seastarx.hh"
namespace gms {
class inet_address;
} // namespace gms
namespace locator { class topology; }
namespace db {
namespace hints {
@@ -57,7 +58,7 @@ public:
// Parses hint filtering configuration from a list of DCs.
static host_filter parse_from_dc_list(sstring opt);
bool can_hint_for(locator::snitch_ptr& snitch, gms::inet_address ep) const;
bool can_hint_for(const locator::topology& topo, gms::inet_address ep) const;
inline const std::unordered_set<sstring>& get_dcs() const {
return _dcs;

View File

@@ -49,7 +49,6 @@ const std::chrono::seconds manager::hints_flush_period = std::chrono::seconds(10
manager::manager(sstring hints_directory, host_filter filter, int64_t max_hint_window_ms, resource_manager& res_manager, distributed<replica::database>& db)
: _hints_dir(fs::path(hints_directory) / format("{:d}", this_shard_id()))
, _host_filter(std::move(filter))
, _local_snitch_ptr(locator::i_endpoint_snitch::get_local_snitch_ptr())
, _max_hint_window_us(max_hint_window_ms * 1000)
, _local_db(db.local())
, _resource_manager(res_manager)
@@ -659,7 +658,7 @@ future<> manager::change_host_filter(host_filter filter) {
// for some of them
return lister::scan_dir(_hints_dir, { directory_entry_type::directory }, [this] (fs::path datadir, directory_entry de) {
const ep_key_type ep = ep_key_type(de.name);
if (_ep_managers.contains(ep) || !_host_filter.can_hint_for(_local_snitch_ptr, ep)) {
if (_ep_managers.contains(ep) || !_host_filter.can_hint_for(_proxy_anchor->get_token_metadata_ptr()->get_topology(), ep)) {
return make_ready_future<>();
}
return get_ep_manager(ep).populate_segments_to_replay();
@@ -670,7 +669,7 @@ future<> manager::change_host_filter(host_filter filter) {
}).finally([this] {
// Remove endpoint managers which are rejected by the filter
return parallel_for_each(_ep_managers, [this] (auto& pair) {
if (_host_filter.can_hint_for(_local_snitch_ptr, pair.first)) {
if (_host_filter.can_hint_for(_proxy_anchor->get_token_metadata_ptr()->get_topology(), pair.first)) {
return make_ready_future<>();
}
return pair.second.stop(drain::no).finally([this, ep = pair.first] {
@@ -687,7 +686,7 @@ bool manager::check_dc_for(ep_key_type ep) const noexcept {
// If target's DC is not a "hintable" DCs - don't hint.
// If there is an end point manager then DC has already been checked and found to be ok.
return _host_filter.is_enabled_for_all() || have_ep_manager(ep) ||
_host_filter.can_hint_for(_local_snitch_ptr, ep);
_host_filter.can_hint_for(_proxy_anchor->get_token_metadata_ptr()->get_topology(), ep);
} catch (...) {
// if we failed to check the DC - block this hint
return false;

View File

@@ -21,7 +21,6 @@
#include <seastar/core/lowres_clock.hh>
#include <seastar/core/shared_mutex.hh>
#include <seastar/core/abort_source.hh>
#include "locator/snitch_base.hh"
#include "inet_address_vectors.hh"
#include "db/commitlog/commitlog.hh"
#include "utils/loading_shared_values.hh"
@@ -515,7 +514,6 @@ private:
host_filter _host_filter;
shared_ptr<service::storage_proxy> _proxy_anchor;
shared_ptr<gms::gossiper> _gossiper_anchor;
locator::snitch_ptr& _local_snitch_ptr;
int64_t _max_hint_window_us = 0;
replica::database& _local_db;
@@ -530,7 +528,7 @@ private:
seastar::named_semaphore _drain_lock = {1, named_semaphore_exception_factory{"drain lock"}};
public:
manager(sstring hints_directory, host_filter filter, int64_t max_hint_window_ms, resource_manager&res_manager, distributed<replica::database>& db);
manager(sstring hints_directory, host_filter filter, int64_t max_hint_window_ms, resource_manager&res_manager, sharded<replica::database>& db);
virtual ~manager();
manager(manager&&) = delete;
manager& operator=(manager&&) = delete;

View File

@@ -1181,14 +1181,15 @@ static std::optional<gms::inet_address>
get_view_natural_endpoint(const sstring& keyspace_name,
const dht::token& base_token, const dht::token& view_token) {
auto &db = service::get_local_storage_proxy().local_db();
auto& topology = service::get_local_storage_proxy().get_token_metadata_ptr()->get_topology();
auto& ks = db.find_keyspace(keyspace_name);
auto erm = ks.get_effective_replication_map();
auto my_address = utils::fb_utilities::get_broadcast_address();
auto my_datacenter = locator::i_endpoint_snitch::get_local_snitch_ptr()->get_datacenter(my_address);
auto my_datacenter = topology.get_datacenter();
bool network_topology = dynamic_cast<const locator::network_topology_strategy*>(&ks.get_replication_strategy());
std::vector<gms::inet_address> base_endpoints, view_endpoints;
for (auto&& base_endpoint : erm->get_natural_endpoints(base_token)) {
if (!network_topology || locator::i_endpoint_snitch::get_local_snitch_ptr()->get_datacenter(base_endpoint) == my_datacenter) {
if (!network_topology || topology.get_datacenter(base_endpoint) == my_datacenter) {
base_endpoints.push_back(base_endpoint);
}
}
@@ -1206,7 +1207,7 @@ get_view_natural_endpoint(const sstring& keyspace_name,
view_endpoint);
if (it != base_endpoints.end()) {
base_endpoints.erase(it);
} else if (!network_topology || locator::i_endpoint_snitch::get_local_snitch_ptr()->get_datacenter(view_endpoint) == my_datacenter) {
} else if (!network_topology || topology.get_datacenter(view_endpoint) == my_datacenter) {
view_endpoints.push_back(view_endpoint);
}
}

View File

@@ -47,7 +47,7 @@ range_streamer::get_range_fetch_map(const std::unordered_map<dht::token_range, s
auto filtered = false;
for (const auto& filter : source_filters) {
if (!filter->should_include(address)) {
if (!filter->should_include(get_token_metadata().get_topology(), address)) {
filtered = true;
break;
}

View File

@@ -11,7 +11,6 @@
#pragma once
#include "locator/token_metadata.hh"
#include "locator/snitch_base.hh"
#include "streaming/stream_plan.hh"
#include "streaming/stream_state.hh"
#include "streaming/stream_reason.hh"
@@ -27,6 +26,7 @@ class database;
}
namespace gms { class gossiper; }
namespace locator { class topology; }
namespace dht {
/**
@@ -45,7 +45,7 @@ public:
*/
class i_source_filter {
public:
virtual bool should_include(inet_address endpoint) = 0;
virtual bool should_include(const locator::topology&, inet_address endpoint) = 0;
virtual ~i_source_filter() {}
};
@@ -58,7 +58,7 @@ public:
std::set<gms::inet_address> _down_nodes;
public:
failure_detector_source_filter(std::set<gms::inet_address> down_nodes) : _down_nodes(std::move(down_nodes)) { }
virtual bool should_include(inet_address endpoint) override { return !_down_nodes.contains(endpoint); }
virtual bool should_include(const locator::topology&, inet_address endpoint) override { return !_down_nodes.contains(endpoint); }
};
/**
@@ -71,9 +71,8 @@ public:
single_datacenter_filter(const sstring& source_dc)
: _source_dc(source_dc) {
}
virtual bool should_include(inet_address endpoint) override {
auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr();
return snitch_ptr->get_datacenter(endpoint) == _source_dc;
virtual bool should_include(const locator::topology& topo, inet_address endpoint) override {
return topo.get_datacenter(endpoint) == _source_dc;
}
};

View File

@@ -40,6 +40,7 @@
#include <boost/algorithm/string/classification.hpp>
#include "utils/generation-number.hh"
#include "locator/token_metadata.hh"
#include "locator/snitch_base.hh"
#include "utils/exceptions.hh"
namespace gms {

View File

@@ -6,6 +6,8 @@
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include <boost/date_time/gregorian/greg_date.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include "lua.hh"
#include "exceptions/exceptions.hh"
#include "concrete_types.hh"

View File

@@ -21,6 +21,7 @@
#include <seastar/coroutine/maybe_yield.hh>
#include <boost/range/adaptors.hpp>
#include "utils/stall_free.hh"
#include "utils/fb_utilities.hh"
namespace locator {
@@ -1313,6 +1314,27 @@ const endpoint_dc_rack& topology::get_location(const inet_address& ep) const {
return _current_locations.at(ep);
}
// FIXME -- both methods below should rather return data from the
// get_location() result, but to make it work two things are to be fixed:
// - topology should be aware of internal-ip conversions
// - topology should be pre-populated with data loaded from system ks
sstring topology::get_rack() const {
return get_rack(utils::fb_utilities::get_broadcast_address());
}
sstring topology::get_rack(inet_address ep) const {
return i_endpoint_snitch::get_local_snitch_ptr()->get_rack(ep);
}
sstring topology::get_datacenter() const {
return get_datacenter(utils::fb_utilities::get_broadcast_address());
}
sstring topology::get_datacenter(inet_address ep) const {
return i_endpoint_snitch::get_local_snitch_ptr()->get_datacenter(ep);
}
/////////////////// class topology end /////////////////////////////////////////
void shared_token_metadata::set(mutable_token_metadata_ptr tmptr) noexcept {

View File

@@ -98,6 +98,11 @@ public:
}
const endpoint_dc_rack& get_location(const inet_address& ep) const;
sstring get_rack() const;
sstring get_rack(inet_address ep) const;
sstring get_datacenter() const;
sstring get_datacenter(inet_address ep) const;
private:
/** multi-map: DC -> endpoints in that DC */
std::unordered_map<sstring,

View File

@@ -678,11 +678,6 @@ static dht::token_range_vector get_primary_ranges_within_dc(
return db.find_keyspace(keyspace).get_effective_replication_map()->get_primary_ranges_within_dc(utils::fb_utilities::get_broadcast_address());
}
static sstring get_local_dc() {
return locator::i_endpoint_snitch::get_local_snitch_ptr()->get_datacenter(
utils::fb_utilities::get_broadcast_address());
}
void repair_stats::add(const repair_stats& o) {
round_nr += o.round_nr;
round_nr_fast_path_already_synced += o.round_nr_fast_path_already_synced;
@@ -1000,6 +995,7 @@ static future<> repair_ranges(lw_shared_ptr<repair_info> ri) {
// itself does very little (mainly tell other nodes and CPUs what to do).
int repair_service::do_repair_start(sstring keyspace, std::unordered_map<sstring, sstring> options_map) {
seastar::sharded<replica::database>& db = get_db();
auto& topology = db.local().get_token_metadata().get_topology();
repair_tracker().check_in_shutdown();
repair_options options(options_map);
@@ -1027,7 +1023,7 @@ int repair_service::do_repair_start(sstring keyspace, std::unordered_map<sstring
// when "primary_range" option is on, neither data_centers nor hosts
// may be set, except data_centers may contain only local DC (-local)
if (options.data_centers.size() == 1 &&
options.data_centers[0] == get_local_dc()) {
options.data_centers[0] == topology.get_datacenter()) {
ranges = get_primary_ranges_within_dc(db.local(), keyspace);
} else if (options.data_centers.size() > 0 || options.hosts.size() > 0) {
throw std::runtime_error("You need to run primary range repair on all nodes in the cluster.");
@@ -1407,8 +1403,8 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr
std::vector<gms::inet_address> mandatory_neighbors;
// All neighbors
std::vector<inet_address> neighbors;
auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr();
auto local_dc = get_local_dc();
auto& topology = db.local().get_token_metadata().get_topology();
auto local_dc = topology.get_datacenter();
auto get_node_losing_the_ranges = [&] (const std::vector<gms::inet_address>& old_nodes, const std::unordered_set<gms::inet_address>& new_nodes) {
// Remove the new nodes from the old nodes list, so
// that it contains only the node that will lose
@@ -1436,7 +1432,7 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr
auto get_old_endpoints_in_local_dc = [&] () {
return boost::copy_range<std::vector<gms::inet_address>>(old_endpoints |
boost::adaptors::filtered([&] (const gms::inet_address& node) {
return snitch_ptr->get_datacenter(node) == local_dc;
return topology.get_datacenter(node) == local_dc;
})
);
};
@@ -1564,8 +1560,8 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m
}
std::unordered_map<dht::token_range, repair_neighbors> range_sources;
dht::token_range_vector ranges_for_removenode;
auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr();
auto local_dc = get_local_dc();
auto& topology = db.local().get_token_metadata().get_topology();
auto local_dc = topology.get_datacenter();
bool find_node_in_local_dc_only = strat.get_type() == locator::replication_strategy_type::network_topology;
for (auto&r : ranges) {
if (ops) {
@@ -1586,7 +1582,7 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m
}
auto get_neighbors_set = [&] (const std::vector<inet_address>& nodes) {
for (auto& node : nodes) {
if (snitch_ptr->get_datacenter(node) == local_dc) {
if (topology.get_datacenter(node) == local_dc) {
return std::unordered_set<inet_address>{node};;
}
}
@@ -1659,8 +1655,8 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m
}
}
auto neighbors = boost::copy_range<std::vector<gms::inet_address>>(neighbors_set |
boost::adaptors::filtered([&local_dc, &snitch_ptr] (const gms::inet_address& node) {
return snitch_ptr->get_datacenter(node) == local_dc;
boost::adaptors::filtered([&local_dc, &topology] (const gms::inet_address& node) {
return topology.get_datacenter(node) == local_dc;
})
);
@@ -1769,16 +1765,16 @@ future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata_
auto& r = *it;
seastar::thread::maybe_yield();
auto end_token = r.end() ? r.end()->value() : dht::maximum_token();
auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr();
auto& topology = db.local().get_token_metadata().get_topology();
auto neighbors = boost::copy_range<std::vector<gms::inet_address>>(strat.calculate_natural_endpoints(end_token, *tmptr).get0() |
boost::adaptors::filtered([myip, &source_dc, &snitch_ptr, &ignore_nodes] (const gms::inet_address& node) {
boost::adaptors::filtered([myip, &source_dc, &topology, &ignore_nodes] (const gms::inet_address& node) {
if (node == myip) {
return false;
}
if (std::find(ignore_nodes.begin(), ignore_nodes.end(), node) != ignore_nodes.end()) {
return false;
}
return source_dc.empty() ? true : snitch_ptr->get_datacenter(node) == source_dc;
return source_dc.empty() ? true : topology.get_datacenter(node) == source_dc;
})
);
rlogger.debug("{}: keyspace={}, range={}, neighbors={}", op, keyspace_name, r, neighbors);
@@ -1811,7 +1807,8 @@ future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata_
future<> repair_service::rebuild_with_repair(locator::token_metadata_ptr tmptr, sstring source_dc) {
auto op = sstring("rebuild_with_repair");
if (source_dc.empty()) {
source_dc = get_local_dc();
auto& topology = get_db().local().get_token_metadata().get_topology();
source_dc = topology.get_datacenter();
}
auto reason = streaming::stream_reason::rebuild;
co_await do_rebuild_replace_with_repair(std::move(tmptr), std::move(op), std::move(source_dc), reason, {});
@@ -1825,7 +1822,8 @@ future<> repair_service::rebuild_with_repair(locator::token_metadata_ptr tmptr,
future<> repair_service::replace_with_repair(locator::token_metadata_ptr tmptr, std::unordered_set<dht::token> replacing_tokens, std::list<gms::inet_address> ignore_nodes) {
auto cloned_tm = co_await tmptr->clone_async();
auto op = sstring("replace_with_repair");
auto source_dc = get_local_dc();
auto& topology = get_db().local().get_token_metadata().get_topology();
auto source_dc = topology.get_datacenter();
auto reason = streaming::stream_reason::replace;
// update a cloned version of tmptr
// no need to set the original version

View File

@@ -137,18 +137,6 @@ const dht::token& end_token(const dht::partition_range& r) {
return r.end() ? r.end()->value().token() : max_token;
}
static inline
sstring get_dc(gms::inet_address ep) {
auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr();
return snitch_ptr->get_datacenter(ep);
}
static inline
sstring get_local_dc() {
auto local_addr = utils::fb_utilities::get_broadcast_address();
return get_dc(local_addr);
}
unsigned storage_proxy::cas_shard(const schema& s, dht::token token) {
return dht::shard_of(s, token);
}
@@ -534,7 +522,7 @@ public:
}
if (_cl_achieved) { // For CL=ANY this can still be false
for (auto&& ep : get_targets()) {
++stats().background_replica_writes_failed.get_ep_stat(ep);
++stats().background_replica_writes_failed.get_ep_stat(_proxy->get_token_metadata_ptr()->get_topology(), ep);
}
stats().background_writes_failed += int(!_targets.empty());
}
@@ -733,8 +721,8 @@ class datacenter_sync_write_response_handler : public abstract_write_response_ha
};
std::unordered_map<sstring, dc_info> _dc_responses;
bool waited_for(gms::inet_address from) override {
auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr();
sstring data_center = snitch_ptr->get_datacenter(from);
auto& topology = _proxy->get_token_metadata_ptr()->get_topology();
sstring data_center = topology.get_datacenter(from);
auto dc_resp = _dc_responses.find(data_center);
if (dc_resp->second.acks < dc_resp->second.total_block_for) {
@@ -748,17 +736,17 @@ public:
std::unique_ptr<mutation_holder> mh, inet_address_vector_replica_set targets, const inet_address_vector_topology_change& pending_endpoints,
inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit) :
abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh), targets, std::move(tr_state), stats, std::move(permit), 0, dead_endpoints) {
auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr();
auto& topology = _proxy->get_token_metadata_ptr()->get_topology();
for (auto& target : targets) {
auto dc = snitch_ptr->get_datacenter(target);
auto dc = topology.get_datacenter(target);
if (!_dc_responses.contains(dc)) {
auto pending_for_dc = boost::range::count_if(pending_endpoints, [&snitch_ptr, &dc] (const gms::inet_address& ep){
return snitch_ptr->get_datacenter(ep) == dc;
auto pending_for_dc = boost::range::count_if(pending_endpoints, [&topology, &dc] (const gms::inet_address& ep){
return topology.get_datacenter(ep) == dc;
});
size_t total_endpoints_for_dc = boost::range::count_if(targets, [&snitch_ptr, &dc] (const gms::inet_address& ep){
return snitch_ptr->get_datacenter(ep) == dc;
size_t total_endpoints_for_dc = boost::range::count_if(targets, [&topology, &dc] (const gms::inet_address& ep){
return topology.get_datacenter(ep) == dc;
});
_dc_responses.emplace(dc, dc_info{0, db::local_quorum_for(ks, dc) + pending_for_dc, total_endpoints_for_dc, 0});
_total_block_for += pending_for_dc;
@@ -766,8 +754,8 @@ public:
}
}
bool failure(gms::inet_address from, size_t count, error err) override {
auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr();
const sstring& dc = snitch_ptr->get_datacenter(from);
auto& topology = _proxy->get_token_metadata_ptr()->get_topology();
const sstring& dc = topology.get_datacenter(from);
auto dc_resp = _dc_responses.find(dc);
dc_resp->second.failures += count;
@@ -1726,15 +1714,15 @@ void storage_proxy_stats::stats::register_stats() {
});
}
inline uint64_t& storage_proxy_stats::split_stats::get_ep_stat(gms::inet_address ep) noexcept {
inline uint64_t& storage_proxy_stats::split_stats::get_ep_stat(const locator::topology& topo, gms::inet_address ep) noexcept {
if (fbu::is_me(ep)) {
return _local.val;
}
try {
sstring dc = get_dc(ep);
sstring dc = topo.get_datacenter(ep);
if (_auto_register_metrics) {
register_metrics_for(ep);
register_metrics_for(dc, ep);
}
return _dc_stats[dc].val;
} catch (...) {
@@ -1753,10 +1741,9 @@ void storage_proxy_stats::split_stats::register_metrics_local() {
});
}
void storage_proxy_stats::split_stats::register_metrics_for(gms::inet_address ep) {
void storage_proxy_stats::split_stats::register_metrics_for(sstring dc, gms::inet_address ep) {
namespace sm = seastar::metrics;
sstring dc = get_dc(ep);
// if this is the first time we see an endpoint from this DC - add a
// corresponding collectd metric
if (auto [ignored, added] = _dc_stats.try_emplace(dc); added) {
@@ -2461,8 +2448,9 @@ storage_proxy::mutate_atomically_result(std::vector<mutation> mutations, db::con
[this]() -> inet_address_vector_replica_set {
auto local_addr = utils::fb_utilities::get_broadcast_address();
auto& topology = _tmptr->get_topology();
auto& local_endpoints = topology.get_datacenter_racks().at(get_local_dc());
auto local_rack = locator::i_endpoint_snitch::get_local_snitch_ptr()->get_rack(local_addr);
auto local_dc = topology.get_datacenter();
auto& local_endpoints = topology.get_datacenter_racks().at(local_dc);
auto local_rack = topology.get_rack();
auto chosen_endpoints = _p.gossiper().endpoint_filter(local_rack, local_endpoints);
if (chosen_endpoints.empty()) {
@@ -2725,11 +2713,13 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo
auto& stats = handler_ptr->stats();
auto& handler = *handler_ptr;
auto& global_stats = handler._proxy->_global_stats;
auto& topology = get_token_metadata_ptr()->get_topology();
auto local_dc = topology.get_datacenter();
for(auto dest: handler.get_targets()) {
sstring dc = get_dc(dest);
sstring dc = topology.get_datacenter(dest);
// read repair writes do not go through coordinator since mutations are per destination
if (handler.read_repair_write() || dc == get_local_dc()) {
if (handler.read_repair_write() || dc == local_dc) {
local.emplace_back("", inet_address_vector_replica_set({dest}));
} else {
dc_groups[dc].push_back(dest);
@@ -2776,9 +2766,9 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo
got_response(response_id, coordinator, std::nullopt);
} else {
if (!handler.read_repair_write()) {
++stats.writes_attempts.get_ep_stat(coordinator);
++stats.writes_attempts.get_ep_stat(get_token_metadata_ptr()->get_topology(), coordinator);
} else {
++stats.read_repair_write_attempts.get_ep_stat(coordinator);
++stats.read_repair_write_attempts.get_ep_stat(get_token_metadata_ptr()->get_topology(), coordinator);
}
if (coordinator == my_address) {
@@ -2790,7 +2780,7 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo
// Waited on indirectly.
(void)f.handle_exception([response_id, forward_size, coordinator, handler_ptr, p = shared_from_this(), &stats] (std::exception_ptr eptr) {
++stats.writes_errors.get_ep_stat(coordinator);
++stats.writes_errors.get_ep_stat(p->get_token_metadata_ptr()->get_topology(), coordinator);
error err = error::FAILURE;
std::optional<sstring> msg;
try {
@@ -3529,6 +3519,11 @@ private:
_proxy->get_stats().foreground_reads -= int(_foreground);
_foreground = false;
}
const locator::topology& get_topology() const noexcept {
return _proxy->get_token_metadata_ptr()->get_topology();
}
public:
abstract_read_executor(schema_ptr s, lw_shared_ptr<replica::column_family> cf, shared_ptr<storage_proxy> proxy, lw_shared_ptr<query::read_command> cmd, dht::partition_range pr, db::consistency_level cl, size_t block_for,
inet_address_vector_replica_set targets, tracing::trace_state_ptr trace_state, service_permit permit) :
@@ -3552,7 +3547,7 @@ public:
protected:
future<rpc::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_temperature>> make_mutation_data_request(lw_shared_ptr<query::read_command> cmd, gms::inet_address ep, clock_type::time_point timeout) {
++_proxy->get_stats().mutation_data_read_attempts.get_ep_stat(ep);
++_proxy->get_stats().mutation_data_read_attempts.get_ep_stat(get_topology(), ep);
if (fbu::is_me(ep)) {
tracing::trace(_trace_state, "read_mutation_data: querying locally");
return _proxy->query_mutations_locally(_schema, cmd, _partition_range, timeout, _trace_state);
@@ -3566,7 +3561,7 @@ protected:
}
}
future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>> make_data_request(gms::inet_address ep, clock_type::time_point timeout, bool want_digest) {
++_proxy->get_stats().data_read_attempts.get_ep_stat(ep);
++_proxy->get_stats().data_read_attempts.get_ep_stat(get_topology(), ep);
auto opts = want_digest
? query::result_options{query::result_request::result_and_digest, digest_algorithm(*_proxy)}
: query::result_options{query::result_request::only_result, query::digest_algorithm::none};
@@ -3583,7 +3578,7 @@ protected:
}
}
future<rpc::tuple<query::result_digest, api::timestamp_type, cache_temperature>> make_digest_request(gms::inet_address ep, clock_type::time_point timeout) {
++_proxy->get_stats().digest_read_attempts.get_ep_stat(ep);
++_proxy->get_stats().digest_read_attempts.get_ep_stat(get_topology(), ep);
if (fbu::is_me(ep)) {
tracing::trace(_trace_state, "read_digest: querying locally");
return _proxy->query_result_local_digest(_schema, _cmd, _partition_range, _trace_state,
@@ -3608,10 +3603,10 @@ protected:
auto v = f.get0();
_cf->set_hit_rate(ep, std::get<1>(v));
resolver->add_mutate_data(ep, std::get<0>(std::move(v)));
++_proxy->get_stats().mutation_data_read_completed.get_ep_stat(ep);
++_proxy->get_stats().mutation_data_read_completed.get_ep_stat(get_topology(), ep);
register_request_latency(latency_clock::now() - start);
} catch(...) {
++_proxy->get_stats().mutation_data_read_errors.get_ep_stat(ep);
++_proxy->get_stats().mutation_data_read_errors.get_ep_stat(get_topology(), ep);
resolver->error(ep, std::current_exception());
}
});
@@ -3626,11 +3621,11 @@ protected:
auto v = f.get0();
_cf->set_hit_rate(ep, std::get<1>(v));
resolver->add_data(ep, std::get<0>(std::move(v)));
++_proxy->get_stats().data_read_completed.get_ep_stat(ep);
++_proxy->get_stats().data_read_completed.get_ep_stat(get_topology(), ep);
_used_targets.push_back(ep);
register_request_latency(latency_clock::now() - start);
} catch(...) {
++_proxy->get_stats().data_read_errors.get_ep_stat(ep);
++_proxy->get_stats().data_read_errors.get_ep_stat(get_topology(), ep);
resolver->error(ep, std::current_exception());
}
});
@@ -3645,11 +3640,11 @@ protected:
auto v = f.get0();
_cf->set_hit_rate(ep, std::get<2>(v));
resolver->add_digest(ep, std::get<0>(v), std::get<1>(v));
++_proxy->get_stats().digest_read_completed.get_ep_stat(ep);
++_proxy->get_stats().digest_read_completed.get_ep_stat(get_topology(), ep);
_used_targets.push_back(ep);
register_request_latency(latency_clock::now() - start);
} catch(...) {
++_proxy->get_stats().digest_read_errors.get_ep_stat(ep);
++_proxy->get_stats().digest_read_errors.get_ep_stat(get_topology(), ep);
resolver->error(ep, std::current_exception());
}
});

View File

@@ -13,6 +13,8 @@
#include "utils/histogram.hh"
#include <seastar/core/metrics.hh>
namespace locator { class topology; }
namespace service {
namespace storage_proxy_stats {
@@ -51,7 +53,7 @@ public:
split_stats(const sstring& category, const sstring& short_description_prefix, const sstring& long_description_prefix, const sstring& op_type, bool auto_register_metrics = true);
void register_metrics_local();
void register_metrics_for(gms::inet_address ep);
void register_metrics_for(sstring dc, gms::inet_address ep);
/**
* Get a reference to the statistics counter corresponding to the given
@@ -61,7 +63,7 @@ public:
*
* @return a reference to the requested counter
*/
uint64_t& get_ep_stat(gms::inet_address ep) noexcept;
uint64_t& get_ep_stat(const locator::topology& topo, gms::inet_address ep) noexcept;
};
struct write_stats {

View File

@@ -3072,7 +3072,9 @@ storage_service::describe_ring(const sstring& keyspace, bool include_only_local_
include_only_local_dc
? get_range_to_address_map_in_local_dc(keyspace)
: get_range_to_address_map(keyspace);
auto tmptr = get_token_metadata_ptr();
for (auto entry : range_to_address_map) {
const auto& topology = tmptr->get_topology();
auto range = entry.first;
auto addresses = entry.second;
token_range_endpoints tr;
@@ -3085,8 +3087,8 @@ storage_service::describe_ring(const sstring& keyspace, bool include_only_local_
for (auto endpoint : addresses) {
endpoint_details details;
details._host = endpoint;
details._datacenter = locator::i_endpoint_snitch::get_local_snitch_ptr()->get_datacenter(endpoint);
details._rack = locator::i_endpoint_snitch::get_local_snitch_ptr()->get_rack(endpoint);
details._datacenter = topology.get_datacenter(endpoint);
details._rack = topology.get_rack(endpoint);
tr._rpc_endpoints.push_back(get_rpc_address(endpoint));
tr._endpoints.push_back(boost::lexical_cast<std::string>(details._host));
tr._endpoint_details.push_back(details);