diff --git a/alternator/executor.cc b/alternator/executor.cc index bd5521e8ef..4eddf38ebd 100644 --- a/alternator/executor.cc +++ b/alternator/executor.cc @@ -58,7 +58,7 @@ logging::logger elogger("alternator-executor"); namespace alternator { -static future> create_keyspace(std::string_view keyspace_name, service::migration_manager& mm, gms::gossiper& gossiper, api::timestamp_type); +static future> create_keyspace(std::string_view keyspace_name, service::storage_proxy& sp, service::migration_manager& mm, gms::gossiper& gossiper, api::timestamp_type); static map_type attrs_type() { static thread_local auto t = map_type_impl::get_instance(utf8_type, bytes_type, true); @@ -1071,7 +1071,7 @@ static future create_table_on_shard0(tracing::tra auto ts = group0_guard.write_timestamp(); std::vector schema_mutations; try { - schema_mutations = co_await create_keyspace(keyspace_name, mm, gossiper, ts); + schema_mutations = co_await create_keyspace(keyspace_name, sp, mm, gossiper, ts); } catch (exceptions::already_exists_exception&) { if (sp.data_dictionary().has_schema(keyspace_name, table_name)) { co_return api_error::resource_in_use(format("Table {} already exists", table_name)); @@ -4337,11 +4337,12 @@ future executor::describe_endpoints(client_state& return make_ready_future(make_jsonable(std::move(response))); } -static std::map get_network_topology_options(gms::gossiper& gossiper, int rf) { +static std::map get_network_topology_options(service::storage_proxy& sp, gms::gossiper& gossiper, int rf) { std::map options; sstring rf_str = std::to_string(rf); + auto& topology = sp.get_token_metadata_ptr()->get_topology(); for (const gms::inet_address& addr : gossiper.get_live_members()) { - options.emplace(locator::i_endpoint_snitch::get_local_snitch_ptr()->get_datacenter(addr), rf_str); + options.emplace(topology.get_datacenter(addr), rf_str); }; return options; } @@ -4375,7 +4376,7 @@ future executor::describe_continuous_backups(clie // of nodes in the cluster: A cluster with 3 or more live nodes, gets RF=3. // A smaller cluster (presumably, a test only), gets RF=1. The user may // manually create the keyspace to override this predefined behavior. -static future> create_keyspace(std::string_view keyspace_name, service::migration_manager& mm, gms::gossiper& gossiper, api::timestamp_type ts) { +static future> create_keyspace(std::string_view keyspace_name, service::storage_proxy& sp, service::migration_manager& mm, gms::gossiper& gossiper, api::timestamp_type ts) { sstring keyspace_name_str(keyspace_name); int endpoint_count = gossiper.get_endpoint_states().size(); int rf = 3; @@ -4384,7 +4385,7 @@ static future> create_keyspace(std::string_view keyspace_n elogger.warn("Creating keyspace '{}' for Alternator with unsafe RF={} because cluster only has {} nodes.", keyspace_name_str, rf, endpoint_count); } - auto opts = get_network_topology_options(gossiper, rf); + auto opts = get_network_topology_options(sp, gossiper, rf); auto ksm = keyspace_metadata::new_keyspace(keyspace_name_str, "org.apache.cassandra.locator.NetworkTopologyStrategy", std::move(opts), true); co_return mm.prepare_new_keyspace_announcement(ksm, ts); diff --git a/alternator/server.cc b/alternator/server.cc index 118510fe4c..8232efd6ee 100644 --- a/alternator/server.cc +++ b/alternator/server.cc @@ -20,7 +20,6 @@ #include "auth.hh" #include #include "service/storage_proxy.hh" -#include "locator/snitch_base.hh" #include "gms/gossiper.hh" #include "utils/overloaded_functor.hh" #include "utils/fb_utilities.hh" @@ -201,10 +200,9 @@ protected: // It's very easy to get a list of all live nodes on the cluster, // using _gossiper().get_live_members(). But getting // just the list of live nodes in this DC needs more elaborate code: - sstring local_dc = locator::i_endpoint_snitch::get_local_snitch_ptr()->get_datacenter( - utils::fb_utilities::get_broadcast_address()); - std::unordered_set local_dc_nodes = - _proxy.get_token_metadata_ptr()->get_topology().get_datacenter_endpoints().at(local_dc); + auto& topology = _proxy.get_token_metadata_ptr()->get_topology(); + sstring local_dc = topology.get_datacenter(); + std::unordered_set local_dc_nodes = topology.get_datacenter_endpoints().at(local_dc); for (auto& ip : local_dc_nodes) { if (_gossiper.is_alive(ip)) { rjson::push_back(results, rjson::from_string(ip.to_sstring())); diff --git a/api/endpoint_snitch.cc b/api/endpoint_snitch.cc index 52fff3031a..44d5ee79f3 100644 --- a/api/endpoint_snitch.cc +++ b/api/endpoint_snitch.cc @@ -6,6 +6,7 @@ * SPDX-License-Identifier: AGPL-3.0-or-later */ +#include "locator/token_metadata.hh" #include "locator/snitch_base.hh" #include "endpoint_snitch.hh" #include "api/api-doc/endpoint_snitch_info.json.hh" @@ -19,12 +20,14 @@ void set_endpoint_snitch(http_context& ctx, routes& r) { return host.empty() ? gms::inet_address(utils::fb_utilities::get_broadcast_address()) : gms::inet_address(host); }; - httpd::endpoint_snitch_info_json::get_datacenter.set(r, [](const_req req) { - return locator::i_endpoint_snitch::get_local_snitch_ptr()->get_datacenter(host_or_broadcast(req)); + httpd::endpoint_snitch_info_json::get_datacenter.set(r, [&ctx](const_req req) { + auto& topology = ctx.shared_token_metadata.local().get()->get_topology(); + return topology.get_datacenter(host_or_broadcast(req)); }); - httpd::endpoint_snitch_info_json::get_rack.set(r, [](const_req req) { - return locator::i_endpoint_snitch::get_local_snitch_ptr()->get_rack(host_or_broadcast(req)); + httpd::endpoint_snitch_info_json::get_rack.set(r, [&ctx](const_req req) { + auto& topology = ctx.shared_token_metadata.local().get()->get_topology(); + return topology.get_rack(host_or_broadcast(req)); }); httpd::endpoint_snitch_info_json::get_snitch_name.set(r, [] (const_req req) { diff --git a/db/config.hh b/db/config.hh index d2ba29786c..5d33762731 100644 --- a/db/config.hh +++ b/db/config.hh @@ -20,6 +20,8 @@ #include "seastarx.hh" #include "utils/config_file.hh" #include "utils/enum_option.hh" +#include "utils/UUID.hh" +#include "gms/inet_address.hh" #include "db/hints/host_filter.hh" namespace seastar { diff --git a/db/hints/host_filter.cc b/db/hints/host_filter.cc index 45b5eca097..2bbdaa438b 100644 --- a/db/hints/host_filter.cc +++ b/db/hints/host_filter.cc @@ -8,6 +8,7 @@ #include #include +#include "locator/token_metadata.hh" #include "to_string.hh" #include "host_filter.hh" @@ -27,12 +28,12 @@ host_filter::host_filter(std::unordered_set allowed_dcs) , _dcs(std::move(allowed_dcs)) { } -bool host_filter::can_hint_for(locator::snitch_ptr& snitch, gms::inet_address ep) const { +bool host_filter::can_hint_for(const locator::topology& topo, gms::inet_address ep) const { switch (_enabled_kind) { case enabled_kind::enabled_for_all: return true; case enabled_kind::enabled_selectively: - return _dcs.contains(snitch->get_datacenter(ep)); + return _dcs.contains(topo.get_datacenter(ep)); case enabled_kind::disabled_for_all: return false; } diff --git a/db/hints/host_filter.hh b/db/hints/host_filter.hh index aa34fcbca2..06791e9b22 100644 --- a/db/hints/host_filter.hh +++ b/db/hints/host_filter.hh @@ -14,13 +14,14 @@ #include #include -#include "locator/snitch_base.hh" #include "seastarx.hh" namespace gms { class inet_address; } // namespace gms +namespace locator { class topology; } + namespace db { namespace hints { @@ -57,7 +58,7 @@ public: // Parses hint filtering configuration from a list of DCs. static host_filter parse_from_dc_list(sstring opt); - bool can_hint_for(locator::snitch_ptr& snitch, gms::inet_address ep) const; + bool can_hint_for(const locator::topology& topo, gms::inet_address ep) const; inline const std::unordered_set& get_dcs() const { return _dcs; diff --git a/db/hints/manager.cc b/db/hints/manager.cc index c7090b965d..d3db42bdec 100644 --- a/db/hints/manager.cc +++ b/db/hints/manager.cc @@ -49,7 +49,6 @@ const std::chrono::seconds manager::hints_flush_period = std::chrono::seconds(10 manager::manager(sstring hints_directory, host_filter filter, int64_t max_hint_window_ms, resource_manager& res_manager, distributed& db) : _hints_dir(fs::path(hints_directory) / format("{:d}", this_shard_id())) , _host_filter(std::move(filter)) - , _local_snitch_ptr(locator::i_endpoint_snitch::get_local_snitch_ptr()) , _max_hint_window_us(max_hint_window_ms * 1000) , _local_db(db.local()) , _resource_manager(res_manager) @@ -659,7 +658,7 @@ future<> manager::change_host_filter(host_filter filter) { // for some of them return lister::scan_dir(_hints_dir, { directory_entry_type::directory }, [this] (fs::path datadir, directory_entry de) { const ep_key_type ep = ep_key_type(de.name); - if (_ep_managers.contains(ep) || !_host_filter.can_hint_for(_local_snitch_ptr, ep)) { + if (_ep_managers.contains(ep) || !_host_filter.can_hint_for(_proxy_anchor->get_token_metadata_ptr()->get_topology(), ep)) { return make_ready_future<>(); } return get_ep_manager(ep).populate_segments_to_replay(); @@ -670,7 +669,7 @@ future<> manager::change_host_filter(host_filter filter) { }).finally([this] { // Remove endpoint managers which are rejected by the filter return parallel_for_each(_ep_managers, [this] (auto& pair) { - if (_host_filter.can_hint_for(_local_snitch_ptr, pair.first)) { + if (_host_filter.can_hint_for(_proxy_anchor->get_token_metadata_ptr()->get_topology(), pair.first)) { return make_ready_future<>(); } return pair.second.stop(drain::no).finally([this, ep = pair.first] { @@ -687,7 +686,7 @@ bool manager::check_dc_for(ep_key_type ep) const noexcept { // If target's DC is not a "hintable" DCs - don't hint. // If there is an end point manager then DC has already been checked and found to be ok. return _host_filter.is_enabled_for_all() || have_ep_manager(ep) || - _host_filter.can_hint_for(_local_snitch_ptr, ep); + _host_filter.can_hint_for(_proxy_anchor->get_token_metadata_ptr()->get_topology(), ep); } catch (...) { // if we failed to check the DC - block this hint return false; diff --git a/db/hints/manager.hh b/db/hints/manager.hh index f467f72303..b7b4fc8e54 100644 --- a/db/hints/manager.hh +++ b/db/hints/manager.hh @@ -21,7 +21,6 @@ #include #include #include -#include "locator/snitch_base.hh" #include "inet_address_vectors.hh" #include "db/commitlog/commitlog.hh" #include "utils/loading_shared_values.hh" @@ -515,7 +514,6 @@ private: host_filter _host_filter; shared_ptr _proxy_anchor; shared_ptr _gossiper_anchor; - locator::snitch_ptr& _local_snitch_ptr; int64_t _max_hint_window_us = 0; replica::database& _local_db; @@ -530,7 +528,7 @@ private: seastar::named_semaphore _drain_lock = {1, named_semaphore_exception_factory{"drain lock"}}; public: - manager(sstring hints_directory, host_filter filter, int64_t max_hint_window_ms, resource_manager&res_manager, distributed& db); + manager(sstring hints_directory, host_filter filter, int64_t max_hint_window_ms, resource_manager&res_manager, sharded& db); virtual ~manager(); manager(manager&&) = delete; manager& operator=(manager&&) = delete; diff --git a/db/view/view.cc b/db/view/view.cc index c6d9124f97..5be551cd4c 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1181,14 +1181,15 @@ static std::optional get_view_natural_endpoint(const sstring& keyspace_name, const dht::token& base_token, const dht::token& view_token) { auto &db = service::get_local_storage_proxy().local_db(); + auto& topology = service::get_local_storage_proxy().get_token_metadata_ptr()->get_topology(); auto& ks = db.find_keyspace(keyspace_name); auto erm = ks.get_effective_replication_map(); auto my_address = utils::fb_utilities::get_broadcast_address(); - auto my_datacenter = locator::i_endpoint_snitch::get_local_snitch_ptr()->get_datacenter(my_address); + auto my_datacenter = topology.get_datacenter(); bool network_topology = dynamic_cast(&ks.get_replication_strategy()); std::vector base_endpoints, view_endpoints; for (auto&& base_endpoint : erm->get_natural_endpoints(base_token)) { - if (!network_topology || locator::i_endpoint_snitch::get_local_snitch_ptr()->get_datacenter(base_endpoint) == my_datacenter) { + if (!network_topology || topology.get_datacenter(base_endpoint) == my_datacenter) { base_endpoints.push_back(base_endpoint); } } @@ -1206,7 +1207,7 @@ get_view_natural_endpoint(const sstring& keyspace_name, view_endpoint); if (it != base_endpoints.end()) { base_endpoints.erase(it); - } else if (!network_topology || locator::i_endpoint_snitch::get_local_snitch_ptr()->get_datacenter(view_endpoint) == my_datacenter) { + } else if (!network_topology || topology.get_datacenter(view_endpoint) == my_datacenter) { view_endpoints.push_back(view_endpoint); } } diff --git a/dht/range_streamer.cc b/dht/range_streamer.cc index a8bc5dd734..2ddead7dfd 100644 --- a/dht/range_streamer.cc +++ b/dht/range_streamer.cc @@ -47,7 +47,7 @@ range_streamer::get_range_fetch_map(const std::unordered_mapshould_include(address)) { + if (!filter->should_include(get_token_metadata().get_topology(), address)) { filtered = true; break; } diff --git a/dht/range_streamer.hh b/dht/range_streamer.hh index 2f6679a7a8..b9dcea0f4a 100644 --- a/dht/range_streamer.hh +++ b/dht/range_streamer.hh @@ -11,7 +11,6 @@ #pragma once #include "locator/token_metadata.hh" -#include "locator/snitch_base.hh" #include "streaming/stream_plan.hh" #include "streaming/stream_state.hh" #include "streaming/stream_reason.hh" @@ -27,6 +26,7 @@ class database; } namespace gms { class gossiper; } +namespace locator { class topology; } namespace dht { /** @@ -45,7 +45,7 @@ public: */ class i_source_filter { public: - virtual bool should_include(inet_address endpoint) = 0; + virtual bool should_include(const locator::topology&, inet_address endpoint) = 0; virtual ~i_source_filter() {} }; @@ -58,7 +58,7 @@ public: std::set _down_nodes; public: failure_detector_source_filter(std::set down_nodes) : _down_nodes(std::move(down_nodes)) { } - virtual bool should_include(inet_address endpoint) override { return !_down_nodes.contains(endpoint); } + virtual bool should_include(const locator::topology&, inet_address endpoint) override { return !_down_nodes.contains(endpoint); } }; /** @@ -71,9 +71,8 @@ public: single_datacenter_filter(const sstring& source_dc) : _source_dc(source_dc) { } - virtual bool should_include(inet_address endpoint) override { - auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr(); - return snitch_ptr->get_datacenter(endpoint) == _source_dc; + virtual bool should_include(const locator::topology& topo, inet_address endpoint) override { + return topo.get_datacenter(endpoint) == _source_dc; } }; diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 0382f669f9..e158b820db 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -40,6 +40,7 @@ #include #include "utils/generation-number.hh" #include "locator/token_metadata.hh" +#include "locator/snitch_base.hh" #include "utils/exceptions.hh" namespace gms { diff --git a/lang/lua.cc b/lang/lua.cc index bd158c36be..8d36725992 100644 --- a/lang/lua.cc +++ b/lang/lua.cc @@ -6,6 +6,8 @@ * SPDX-License-Identifier: AGPL-3.0-or-later */ +#include +#include #include "lua.hh" #include "exceptions/exceptions.hh" #include "concrete_types.hh" diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc index 846219addd..c49472a17f 100644 --- a/locator/token_metadata.cc +++ b/locator/token_metadata.cc @@ -21,6 +21,7 @@ #include #include #include "utils/stall_free.hh" +#include "utils/fb_utilities.hh" namespace locator { @@ -1313,6 +1314,27 @@ const endpoint_dc_rack& topology::get_location(const inet_address& ep) const { return _current_locations.at(ep); } +// FIXME -- both methods below should rather return data from the +// get_location() result, but to make it work two things are to be fixed: +// - topology should be aware of internal-ip conversions +// - topology should be pre-populated with data loaded from system ks + +sstring topology::get_rack() const { + return get_rack(utils::fb_utilities::get_broadcast_address()); +} + +sstring topology::get_rack(inet_address ep) const { + return i_endpoint_snitch::get_local_snitch_ptr()->get_rack(ep); +} + +sstring topology::get_datacenter() const { + return get_datacenter(utils::fb_utilities::get_broadcast_address()); +} + +sstring topology::get_datacenter(inet_address ep) const { + return i_endpoint_snitch::get_local_snitch_ptr()->get_datacenter(ep); +} + /////////////////// class topology end ///////////////////////////////////////// void shared_token_metadata::set(mutable_token_metadata_ptr tmptr) noexcept { diff --git a/locator/token_metadata.hh b/locator/token_metadata.hh index acaf10475c..a2d478459e 100644 --- a/locator/token_metadata.hh +++ b/locator/token_metadata.hh @@ -98,6 +98,11 @@ public: } const endpoint_dc_rack& get_location(const inet_address& ep) const; + sstring get_rack() const; + sstring get_rack(inet_address ep) const; + sstring get_datacenter() const; + sstring get_datacenter(inet_address ep) const; + private: /** multi-map: DC -> endpoints in that DC */ std::unordered_mapget_primary_ranges_within_dc(utils::fb_utilities::get_broadcast_address()); } -static sstring get_local_dc() { - return locator::i_endpoint_snitch::get_local_snitch_ptr()->get_datacenter( - utils::fb_utilities::get_broadcast_address()); -} - void repair_stats::add(const repair_stats& o) { round_nr += o.round_nr; round_nr_fast_path_already_synced += o.round_nr_fast_path_already_synced; @@ -1000,6 +995,7 @@ static future<> repair_ranges(lw_shared_ptr ri) { // itself does very little (mainly tell other nodes and CPUs what to do). int repair_service::do_repair_start(sstring keyspace, std::unordered_map options_map) { seastar::sharded& db = get_db(); + auto& topology = db.local().get_token_metadata().get_topology(); repair_tracker().check_in_shutdown(); repair_options options(options_map); @@ -1027,7 +1023,7 @@ int repair_service::do_repair_start(sstring keyspace, std::unordered_map 0 || options.hosts.size() > 0) { throw std::runtime_error("You need to run primary range repair on all nodes in the cluster."); @@ -1407,8 +1403,8 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr std::vector mandatory_neighbors; // All neighbors std::vector neighbors; - auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr(); - auto local_dc = get_local_dc(); + auto& topology = db.local().get_token_metadata().get_topology(); + auto local_dc = topology.get_datacenter(); auto get_node_losing_the_ranges = [&] (const std::vector& old_nodes, const std::unordered_set& new_nodes) { // Remove the new nodes from the old nodes list, so // that it contains only the node that will lose @@ -1436,7 +1432,7 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr auto get_old_endpoints_in_local_dc = [&] () { return boost::copy_range>(old_endpoints | boost::adaptors::filtered([&] (const gms::inet_address& node) { - return snitch_ptr->get_datacenter(node) == local_dc; + return topology.get_datacenter(node) == local_dc; }) ); }; @@ -1564,8 +1560,8 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m } std::unordered_map range_sources; dht::token_range_vector ranges_for_removenode; - auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr(); - auto local_dc = get_local_dc(); + auto& topology = db.local().get_token_metadata().get_topology(); + auto local_dc = topology.get_datacenter(); bool find_node_in_local_dc_only = strat.get_type() == locator::replication_strategy_type::network_topology; for (auto&r : ranges) { if (ops) { @@ -1586,7 +1582,7 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m } auto get_neighbors_set = [&] (const std::vector& nodes) { for (auto& node : nodes) { - if (snitch_ptr->get_datacenter(node) == local_dc) { + if (topology.get_datacenter(node) == local_dc) { return std::unordered_set{node};; } } @@ -1659,8 +1655,8 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m } } auto neighbors = boost::copy_range>(neighbors_set | - boost::adaptors::filtered([&local_dc, &snitch_ptr] (const gms::inet_address& node) { - return snitch_ptr->get_datacenter(node) == local_dc; + boost::adaptors::filtered([&local_dc, &topology] (const gms::inet_address& node) { + return topology.get_datacenter(node) == local_dc; }) ); @@ -1769,16 +1765,16 @@ future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata_ auto& r = *it; seastar::thread::maybe_yield(); auto end_token = r.end() ? r.end()->value() : dht::maximum_token(); - auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr(); + auto& topology = db.local().get_token_metadata().get_topology(); auto neighbors = boost::copy_range>(strat.calculate_natural_endpoints(end_token, *tmptr).get0() | - boost::adaptors::filtered([myip, &source_dc, &snitch_ptr, &ignore_nodes] (const gms::inet_address& node) { + boost::adaptors::filtered([myip, &source_dc, &topology, &ignore_nodes] (const gms::inet_address& node) { if (node == myip) { return false; } if (std::find(ignore_nodes.begin(), ignore_nodes.end(), node) != ignore_nodes.end()) { return false; } - return source_dc.empty() ? true : snitch_ptr->get_datacenter(node) == source_dc; + return source_dc.empty() ? true : topology.get_datacenter(node) == source_dc; }) ); rlogger.debug("{}: keyspace={}, range={}, neighbors={}", op, keyspace_name, r, neighbors); @@ -1811,7 +1807,8 @@ future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata_ future<> repair_service::rebuild_with_repair(locator::token_metadata_ptr tmptr, sstring source_dc) { auto op = sstring("rebuild_with_repair"); if (source_dc.empty()) { - source_dc = get_local_dc(); + auto& topology = get_db().local().get_token_metadata().get_topology(); + source_dc = topology.get_datacenter(); } auto reason = streaming::stream_reason::rebuild; co_await do_rebuild_replace_with_repair(std::move(tmptr), std::move(op), std::move(source_dc), reason, {}); @@ -1825,7 +1822,8 @@ future<> repair_service::rebuild_with_repair(locator::token_metadata_ptr tmptr, future<> repair_service::replace_with_repair(locator::token_metadata_ptr tmptr, std::unordered_set replacing_tokens, std::list ignore_nodes) { auto cloned_tm = co_await tmptr->clone_async(); auto op = sstring("replace_with_repair"); - auto source_dc = get_local_dc(); + auto& topology = get_db().local().get_token_metadata().get_topology(); + auto source_dc = topology.get_datacenter(); auto reason = streaming::stream_reason::replace; // update a cloned version of tmptr // no need to set the original version diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 524043e522..bece0f18ff 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -137,18 +137,6 @@ const dht::token& end_token(const dht::partition_range& r) { return r.end() ? r.end()->value().token() : max_token; } -static inline -sstring get_dc(gms::inet_address ep) { - auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr(); - return snitch_ptr->get_datacenter(ep); -} - -static inline -sstring get_local_dc() { - auto local_addr = utils::fb_utilities::get_broadcast_address(); - return get_dc(local_addr); -} - unsigned storage_proxy::cas_shard(const schema& s, dht::token token) { return dht::shard_of(s, token); } @@ -534,7 +522,7 @@ public: } if (_cl_achieved) { // For CL=ANY this can still be false for (auto&& ep : get_targets()) { - ++stats().background_replica_writes_failed.get_ep_stat(ep); + ++stats().background_replica_writes_failed.get_ep_stat(_proxy->get_token_metadata_ptr()->get_topology(), ep); } stats().background_writes_failed += int(!_targets.empty()); } @@ -733,8 +721,8 @@ class datacenter_sync_write_response_handler : public abstract_write_response_ha }; std::unordered_map _dc_responses; bool waited_for(gms::inet_address from) override { - auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr(); - sstring data_center = snitch_ptr->get_datacenter(from); + auto& topology = _proxy->get_token_metadata_ptr()->get_topology(); + sstring data_center = topology.get_datacenter(from); auto dc_resp = _dc_responses.find(data_center); if (dc_resp->second.acks < dc_resp->second.total_block_for) { @@ -748,17 +736,17 @@ public: std::unique_ptr mh, inet_address_vector_replica_set targets, const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit) : abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh), targets, std::move(tr_state), stats, std::move(permit), 0, dead_endpoints) { - auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr(); + auto& topology = _proxy->get_token_metadata_ptr()->get_topology(); for (auto& target : targets) { - auto dc = snitch_ptr->get_datacenter(target); + auto dc = topology.get_datacenter(target); if (!_dc_responses.contains(dc)) { - auto pending_for_dc = boost::range::count_if(pending_endpoints, [&snitch_ptr, &dc] (const gms::inet_address& ep){ - return snitch_ptr->get_datacenter(ep) == dc; + auto pending_for_dc = boost::range::count_if(pending_endpoints, [&topology, &dc] (const gms::inet_address& ep){ + return topology.get_datacenter(ep) == dc; }); - size_t total_endpoints_for_dc = boost::range::count_if(targets, [&snitch_ptr, &dc] (const gms::inet_address& ep){ - return snitch_ptr->get_datacenter(ep) == dc; + size_t total_endpoints_for_dc = boost::range::count_if(targets, [&topology, &dc] (const gms::inet_address& ep){ + return topology.get_datacenter(ep) == dc; }); _dc_responses.emplace(dc, dc_info{0, db::local_quorum_for(ks, dc) + pending_for_dc, total_endpoints_for_dc, 0}); _total_block_for += pending_for_dc; @@ -766,8 +754,8 @@ public: } } bool failure(gms::inet_address from, size_t count, error err) override { - auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr(); - const sstring& dc = snitch_ptr->get_datacenter(from); + auto& topology = _proxy->get_token_metadata_ptr()->get_topology(); + const sstring& dc = topology.get_datacenter(from); auto dc_resp = _dc_responses.find(dc); dc_resp->second.failures += count; @@ -1726,15 +1714,15 @@ void storage_proxy_stats::stats::register_stats() { }); } -inline uint64_t& storage_proxy_stats::split_stats::get_ep_stat(gms::inet_address ep) noexcept { +inline uint64_t& storage_proxy_stats::split_stats::get_ep_stat(const locator::topology& topo, gms::inet_address ep) noexcept { if (fbu::is_me(ep)) { return _local.val; } try { - sstring dc = get_dc(ep); + sstring dc = topo.get_datacenter(ep); if (_auto_register_metrics) { - register_metrics_for(ep); + register_metrics_for(dc, ep); } return _dc_stats[dc].val; } catch (...) { @@ -1753,10 +1741,9 @@ void storage_proxy_stats::split_stats::register_metrics_local() { }); } -void storage_proxy_stats::split_stats::register_metrics_for(gms::inet_address ep) { +void storage_proxy_stats::split_stats::register_metrics_for(sstring dc, gms::inet_address ep) { namespace sm = seastar::metrics; - sstring dc = get_dc(ep); // if this is the first time we see an endpoint from this DC - add a // corresponding collectd metric if (auto [ignored, added] = _dc_stats.try_emplace(dc); added) { @@ -2461,8 +2448,9 @@ storage_proxy::mutate_atomically_result(std::vector mutations, db::con [this]() -> inet_address_vector_replica_set { auto local_addr = utils::fb_utilities::get_broadcast_address(); auto& topology = _tmptr->get_topology(); - auto& local_endpoints = topology.get_datacenter_racks().at(get_local_dc()); - auto local_rack = locator::i_endpoint_snitch::get_local_snitch_ptr()->get_rack(local_addr); + auto local_dc = topology.get_datacenter(); + auto& local_endpoints = topology.get_datacenter_racks().at(local_dc); + auto local_rack = topology.get_rack(); auto chosen_endpoints = _p.gossiper().endpoint_filter(local_rack, local_endpoints); if (chosen_endpoints.empty()) { @@ -2725,11 +2713,13 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo auto& stats = handler_ptr->stats(); auto& handler = *handler_ptr; auto& global_stats = handler._proxy->_global_stats; + auto& topology = get_token_metadata_ptr()->get_topology(); + auto local_dc = topology.get_datacenter(); for(auto dest: handler.get_targets()) { - sstring dc = get_dc(dest); + sstring dc = topology.get_datacenter(dest); // read repair writes do not go through coordinator since mutations are per destination - if (handler.read_repair_write() || dc == get_local_dc()) { + if (handler.read_repair_write() || dc == local_dc) { local.emplace_back("", inet_address_vector_replica_set({dest})); } else { dc_groups[dc].push_back(dest); @@ -2776,9 +2766,9 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo got_response(response_id, coordinator, std::nullopt); } else { if (!handler.read_repair_write()) { - ++stats.writes_attempts.get_ep_stat(coordinator); + ++stats.writes_attempts.get_ep_stat(get_token_metadata_ptr()->get_topology(), coordinator); } else { - ++stats.read_repair_write_attempts.get_ep_stat(coordinator); + ++stats.read_repair_write_attempts.get_ep_stat(get_token_metadata_ptr()->get_topology(), coordinator); } if (coordinator == my_address) { @@ -2790,7 +2780,7 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo // Waited on indirectly. (void)f.handle_exception([response_id, forward_size, coordinator, handler_ptr, p = shared_from_this(), &stats] (std::exception_ptr eptr) { - ++stats.writes_errors.get_ep_stat(coordinator); + ++stats.writes_errors.get_ep_stat(p->get_token_metadata_ptr()->get_topology(), coordinator); error err = error::FAILURE; std::optional msg; try { @@ -3529,6 +3519,11 @@ private: _proxy->get_stats().foreground_reads -= int(_foreground); _foreground = false; } + + const locator::topology& get_topology() const noexcept { + return _proxy->get_token_metadata_ptr()->get_topology(); + } + public: abstract_read_executor(schema_ptr s, lw_shared_ptr cf, shared_ptr proxy, lw_shared_ptr cmd, dht::partition_range pr, db::consistency_level cl, size_t block_for, inet_address_vector_replica_set targets, tracing::trace_state_ptr trace_state, service_permit permit) : @@ -3552,7 +3547,7 @@ public: protected: future>, cache_temperature>> make_mutation_data_request(lw_shared_ptr cmd, gms::inet_address ep, clock_type::time_point timeout) { - ++_proxy->get_stats().mutation_data_read_attempts.get_ep_stat(ep); + ++_proxy->get_stats().mutation_data_read_attempts.get_ep_stat(get_topology(), ep); if (fbu::is_me(ep)) { tracing::trace(_trace_state, "read_mutation_data: querying locally"); return _proxy->query_mutations_locally(_schema, cmd, _partition_range, timeout, _trace_state); @@ -3566,7 +3561,7 @@ protected: } } future>, cache_temperature>> make_data_request(gms::inet_address ep, clock_type::time_point timeout, bool want_digest) { - ++_proxy->get_stats().data_read_attempts.get_ep_stat(ep); + ++_proxy->get_stats().data_read_attempts.get_ep_stat(get_topology(), ep); auto opts = want_digest ? query::result_options{query::result_request::result_and_digest, digest_algorithm(*_proxy)} : query::result_options{query::result_request::only_result, query::digest_algorithm::none}; @@ -3583,7 +3578,7 @@ protected: } } future> make_digest_request(gms::inet_address ep, clock_type::time_point timeout) { - ++_proxy->get_stats().digest_read_attempts.get_ep_stat(ep); + ++_proxy->get_stats().digest_read_attempts.get_ep_stat(get_topology(), ep); if (fbu::is_me(ep)) { tracing::trace(_trace_state, "read_digest: querying locally"); return _proxy->query_result_local_digest(_schema, _cmd, _partition_range, _trace_state, @@ -3608,10 +3603,10 @@ protected: auto v = f.get0(); _cf->set_hit_rate(ep, std::get<1>(v)); resolver->add_mutate_data(ep, std::get<0>(std::move(v))); - ++_proxy->get_stats().mutation_data_read_completed.get_ep_stat(ep); + ++_proxy->get_stats().mutation_data_read_completed.get_ep_stat(get_topology(), ep); register_request_latency(latency_clock::now() - start); } catch(...) { - ++_proxy->get_stats().mutation_data_read_errors.get_ep_stat(ep); + ++_proxy->get_stats().mutation_data_read_errors.get_ep_stat(get_topology(), ep); resolver->error(ep, std::current_exception()); } }); @@ -3626,11 +3621,11 @@ protected: auto v = f.get0(); _cf->set_hit_rate(ep, std::get<1>(v)); resolver->add_data(ep, std::get<0>(std::move(v))); - ++_proxy->get_stats().data_read_completed.get_ep_stat(ep); + ++_proxy->get_stats().data_read_completed.get_ep_stat(get_topology(), ep); _used_targets.push_back(ep); register_request_latency(latency_clock::now() - start); } catch(...) { - ++_proxy->get_stats().data_read_errors.get_ep_stat(ep); + ++_proxy->get_stats().data_read_errors.get_ep_stat(get_topology(), ep); resolver->error(ep, std::current_exception()); } }); @@ -3645,11 +3640,11 @@ protected: auto v = f.get0(); _cf->set_hit_rate(ep, std::get<2>(v)); resolver->add_digest(ep, std::get<0>(v), std::get<1>(v)); - ++_proxy->get_stats().digest_read_completed.get_ep_stat(ep); + ++_proxy->get_stats().digest_read_completed.get_ep_stat(get_topology(), ep); _used_targets.push_back(ep); register_request_latency(latency_clock::now() - start); } catch(...) { - ++_proxy->get_stats().digest_read_errors.get_ep_stat(ep); + ++_proxy->get_stats().digest_read_errors.get_ep_stat(get_topology(), ep); resolver->error(ep, std::current_exception()); } }); diff --git a/service/storage_proxy_stats.hh b/service/storage_proxy_stats.hh index eb9cf55815..3d344e0347 100644 --- a/service/storage_proxy_stats.hh +++ b/service/storage_proxy_stats.hh @@ -13,6 +13,8 @@ #include "utils/histogram.hh" #include +namespace locator { class topology; } + namespace service { namespace storage_proxy_stats { @@ -51,7 +53,7 @@ public: split_stats(const sstring& category, const sstring& short_description_prefix, const sstring& long_description_prefix, const sstring& op_type, bool auto_register_metrics = true); void register_metrics_local(); - void register_metrics_for(gms::inet_address ep); + void register_metrics_for(sstring dc, gms::inet_address ep); /** * Get a reference to the statistics counter corresponding to the given @@ -61,7 +63,7 @@ public: * * @return a reference to the requested counter */ - uint64_t& get_ep_stat(gms::inet_address ep) noexcept; + uint64_t& get_ep_stat(const locator::topology& topo, gms::inet_address ep) noexcept; }; struct write_stats { diff --git a/service/storage_service.cc b/service/storage_service.cc index 464eeff5de..9f3413841f 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -3072,7 +3072,9 @@ storage_service::describe_ring(const sstring& keyspace, bool include_only_local_ include_only_local_dc ? get_range_to_address_map_in_local_dc(keyspace) : get_range_to_address_map(keyspace); + auto tmptr = get_token_metadata_ptr(); for (auto entry : range_to_address_map) { + const auto& topology = tmptr->get_topology(); auto range = entry.first; auto addresses = entry.second; token_range_endpoints tr; @@ -3085,8 +3087,8 @@ storage_service::describe_ring(const sstring& keyspace, bool include_only_local_ for (auto endpoint : addresses) { endpoint_details details; details._host = endpoint; - details._datacenter = locator::i_endpoint_snitch::get_local_snitch_ptr()->get_datacenter(endpoint); - details._rack = locator::i_endpoint_snitch::get_local_snitch_ptr()->get_rack(endpoint); + details._datacenter = topology.get_datacenter(endpoint); + details._rack = topology.get_rack(endpoint); tr._rpc_endpoints.push_back(get_rpc_address(endpoint)); tr._endpoints.push_back(boost::lexical_cast(details._host)); tr._endpoint_details.push_back(details);