From b6f7c8da8b7317d6b9ec36260791cc1fb34a7ede Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Fri, 17 Jun 2022 14:51:11 +0300 Subject: [PATCH 01/12] topology: Add get_rack/_datacenter methods For now they just forward the request to snitch. Once topology is properly updated with boot-time dc/rack info and knows internal IPs, it will be able to serve the requests on its own. For convenience, overloads without arguments return the dc/rack of the current node. Signed-off-by: Pavel Emelyanov --- locator/token_metadata.cc | 22 ++++++++++++++++++++++ locator/token_metadata.hh | 5 +++++ 2 files changed, 27 insertions(+) diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc index 846219addd..c49472a17f 100644 --- a/locator/token_metadata.cc +++ b/locator/token_metadata.cc @@ -21,6 +21,7 @@ #include #include #include "utils/stall_free.hh" +#include "utils/fb_utilities.hh" namespace locator { @@ -1313,6 +1314,27 @@ const endpoint_dc_rack& topology::get_location(const inet_address& ep) const { return _current_locations.at(ep); } +// FIXME -- both methods below should rather return data from the +// get_location() result, but to make it work two things are to be fixed: +// - topology should be aware of internal-ip conversions +// - topology should be pre-populated with data loaded from system ks + +sstring topology::get_rack() const { + return get_rack(utils::fb_utilities::get_broadcast_address()); +} + +sstring topology::get_rack(inet_address ep) const { + return i_endpoint_snitch::get_local_snitch_ptr()->get_rack(ep); +} + +sstring topology::get_datacenter() const { + return get_datacenter(utils::fb_utilities::get_broadcast_address()); +} + +sstring topology::get_datacenter(inet_address ep) const { + return i_endpoint_snitch::get_local_snitch_ptr()->get_datacenter(ep); +} + /////////////////// class topology end ///////////////////////////////////////// void shared_token_metadata::set(mutable_token_metadata_ptr tmptr) noexcept { diff --git a/locator/token_metadata.hh 
b/locator/token_metadata.hh index acaf10475c..a2d478459e 100644 --- a/locator/token_metadata.hh +++ b/locator/token_metadata.hh @@ -98,6 +98,11 @@ public: } const endpoint_dc_rack& get_location(const inet_address& ep) const; + sstring get_rack() const; + sstring get_rack(inet_address ep) const; + sstring get_datacenter() const; + sstring get_datacenter(inet_address ep) const; + private: /** multi-map: DC -> endpoints in that DC */ std::unordered_map Date: Fri, 17 Jun 2022 15:16:29 +0300 Subject: [PATCH 02/12] proxy: Get rack/datacenter from topology Proxy has shared token metadata from which it can get the topology. This change obsoletes static get_local_dc() helper. Signed-off-by: Pavel Emelyanov --- service/storage_proxy.cc | 37 +++++++++++++++++-------------------- 1 file changed, 17 insertions(+), 20 deletions(-) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 524043e522..0ad5a9b94f 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -143,12 +143,6 @@ sstring get_dc(gms::inet_address ep) { return snitch_ptr->get_datacenter(ep); } -static inline -sstring get_local_dc() { - auto local_addr = utils::fb_utilities::get_broadcast_address(); - return get_dc(local_addr); -} - unsigned storage_proxy::cas_shard(const schema& s, dht::token token) { return dht::shard_of(s, token); } @@ -733,8 +727,8 @@ class datacenter_sync_write_response_handler : public abstract_write_response_ha }; std::unordered_map _dc_responses; bool waited_for(gms::inet_address from) override { - auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr(); - sstring data_center = snitch_ptr->get_datacenter(from); + auto& topology = _proxy->get_token_metadata_ptr()->get_topology(); + sstring data_center = topology.get_datacenter(from); auto dc_resp = _dc_responses.find(data_center); if (dc_resp->second.acks < dc_resp->second.total_block_for) { @@ -748,17 +742,17 @@ public: std::unique_ptr mh, inet_address_vector_replica_set targets, const 
inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit) : abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh), targets, std::move(tr_state), stats, std::move(permit), 0, dead_endpoints) { - auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr(); + auto& topology = _proxy->get_token_metadata_ptr()->get_topology(); for (auto& target : targets) { - auto dc = snitch_ptr->get_datacenter(target); + auto dc = topology.get_datacenter(target); if (!_dc_responses.contains(dc)) { - auto pending_for_dc = boost::range::count_if(pending_endpoints, [&snitch_ptr, &dc] (const gms::inet_address& ep){ - return snitch_ptr->get_datacenter(ep) == dc; + auto pending_for_dc = boost::range::count_if(pending_endpoints, [&topology, &dc] (const gms::inet_address& ep){ + return topology.get_datacenter(ep) == dc; }); - size_t total_endpoints_for_dc = boost::range::count_if(targets, [&snitch_ptr, &dc] (const gms::inet_address& ep){ - return snitch_ptr->get_datacenter(ep) == dc; + size_t total_endpoints_for_dc = boost::range::count_if(targets, [&topology, &dc] (const gms::inet_address& ep){ + return topology.get_datacenter(ep) == dc; }); _dc_responses.emplace(dc, dc_info{0, db::local_quorum_for(ks, dc) + pending_for_dc, total_endpoints_for_dc, 0}); _total_block_for += pending_for_dc; @@ -766,8 +760,8 @@ public: } } bool failure(gms::inet_address from, size_t count, error err) override { - auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr(); - const sstring& dc = snitch_ptr->get_datacenter(from); + auto& topology = _proxy->get_token_metadata_ptr()->get_topology(); + const sstring& dc = topology.get_datacenter(from); auto dc_resp = _dc_responses.find(dc); dc_resp->second.failures += count; @@ -2461,8 +2455,9 @@ storage_proxy::mutate_atomically_result(std::vector mutations, db::con [this]() -> 
inet_address_vector_replica_set { auto local_addr = utils::fb_utilities::get_broadcast_address(); auto& topology = _tmptr->get_topology(); - auto& local_endpoints = topology.get_datacenter_racks().at(get_local_dc()); - auto local_rack = locator::i_endpoint_snitch::get_local_snitch_ptr()->get_rack(local_addr); + auto local_dc = topology.get_datacenter(); + auto& local_endpoints = topology.get_datacenter_racks().at(local_dc); + auto local_rack = topology.get_rack(); auto chosen_endpoints = _p.gossiper().endpoint_filter(local_rack, local_endpoints); if (chosen_endpoints.empty()) { @@ -2725,11 +2720,13 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo auto& stats = handler_ptr->stats(); auto& handler = *handler_ptr; auto& global_stats = handler._proxy->_global_stats; + auto& topology = get_token_metadata_ptr()->get_topology(); + auto local_dc = topology.get_datacenter(); for(auto dest: handler.get_targets()) { - sstring dc = get_dc(dest); + sstring dc = topology.get_datacenter(dest); // read repair writes do not go through coordinator since mutations are per destination - if (handler.read_repair_write() || dc == get_local_dc()) { + if (handler.read_repair_write() || dc == local_dc) { local.emplace_back("", inet_address_vector_replica_set({dest})); } else { dc_groups[dc].push_back(dest); From 894cbeacc554648f37c516e34897e3069c123bd4 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Fri, 17 Jun 2022 15:19:22 +0300 Subject: [PATCH 03/12] storage_service: Get rack/datacenter from topology Same as in previous patch -- storage service has token metadata to get topology from. 
Signed-off-by: Pavel Emelyanov --- service/storage_service.cc | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 464eeff5de..9f3413841f 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -3072,7 +3072,9 @@ storage_service::describe_ring(const sstring& keyspace, bool include_only_local_ include_only_local_dc ? get_range_to_address_map_in_local_dc(keyspace) : get_range_to_address_map(keyspace); + auto tmptr = get_token_metadata_ptr(); for (auto entry : range_to_address_map) { + const auto& topology = tmptr->get_topology(); auto range = entry.first; auto addresses = entry.second; token_range_endpoints tr; @@ -3085,8 +3087,8 @@ storage_service::describe_ring(const sstring& keyspace, bool include_only_local_ for (auto endpoint : addresses) { endpoint_details details; details._host = endpoint; - details._datacenter = locator::i_endpoint_snitch::get_local_snitch_ptr()->get_datacenter(endpoint); - details._rack = locator::i_endpoint_snitch::get_local_snitch_ptr()->get_rack(endpoint); + details._datacenter = topology.get_datacenter(endpoint); + details._rack = topology.get_rack(endpoint); tr._rpc_endpoints.push_back(get_rpc_address(endpoint)); tr._endpoints.push_back(boost::lexical_cast(details._host)); tr._endpoint_details.push_back(details); From 17128eb54b661709ff35e0b2225c94166e67b898 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Fri, 17 Jun 2022 15:46:59 +0300 Subject: [PATCH 04/12] view: Get rack/datacenter from topology The view code already gets token metadata from global proxy instance. Do the same to get topology object. 
Signed-off-by: Pavel Emelyanov --- db/view/view.cc | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index 243d8df8a5..6666afda32 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1177,14 +1177,15 @@ static std::optional get_view_natural_endpoint(const sstring& keyspace_name, const dht::token& base_token, const dht::token& view_token) { auto &db = service::get_local_storage_proxy().local_db(); + auto& topology = service::get_local_storage_proxy().get_token_metadata_ptr()->get_topology(); auto& ks = db.find_keyspace(keyspace_name); auto erm = ks.get_effective_replication_map(); auto my_address = utils::fb_utilities::get_broadcast_address(); - auto my_datacenter = locator::i_endpoint_snitch::get_local_snitch_ptr()->get_datacenter(my_address); + auto my_datacenter = topology.get_datacenter(); bool network_topology = dynamic_cast(&ks.get_replication_strategy()); std::vector base_endpoints, view_endpoints; for (auto&& base_endpoint : erm->get_natural_endpoints(base_token)) { - if (!network_topology || locator::i_endpoint_snitch::get_local_snitch_ptr()->get_datacenter(base_endpoint) == my_datacenter) { + if (!network_topology || topology.get_datacenter(base_endpoint) == my_datacenter) { base_endpoints.push_back(base_endpoint); } } @@ -1202,7 +1203,7 @@ get_view_natural_endpoint(const sstring& keyspace_name, view_endpoint); if (it != base_endpoints.end()) { base_endpoints.erase(it); - } else if (!network_topology || locator::i_endpoint_snitch::get_local_snitch_ptr()->get_datacenter(view_endpoint) == my_datacenter) { + } else if (!network_topology || topology.get_datacenter(view_endpoint) == my_datacenter) { view_endpoints.push_back(view_endpoint); } } From b28db0294cd643fdf8ec98793e0262a9006861ec Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Fri, 17 Jun 2022 15:55:04 +0300 Subject: [PATCH 05/12] repair: Get rack/datacenter from topology Repair gets token metadata from its local database reference. 
Not perfect, repair should better have its own private token meta reference, but it's OK for now. The change obsoletes static get_local_dc helper. Signed-off-by: Pavel Emelyanov --- repair/repair.cc | 36 +++++++++++++++++------------------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/repair/repair.cc b/repair/repair.cc index 789f389447..c423ec35b5 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -678,11 +678,6 @@ static dht::token_range_vector get_primary_ranges_within_dc( return db.find_keyspace(keyspace).get_effective_replication_map()->get_primary_ranges_within_dc(utils::fb_utilities::get_broadcast_address()); } -static sstring get_local_dc() { - return locator::i_endpoint_snitch::get_local_snitch_ptr()->get_datacenter( - utils::fb_utilities::get_broadcast_address()); -} - void repair_stats::add(const repair_stats& o) { round_nr += o.round_nr; round_nr_fast_path_already_synced += o.round_nr_fast_path_already_synced; @@ -1000,6 +995,7 @@ static future<> repair_ranges(lw_shared_ptr ri) { // itself does very little (mainly tell other nodes and CPUs what to do). 
int repair_service::do_repair_start(sstring keyspace, std::unordered_map options_map) { seastar::sharded& db = get_db(); + auto& topology = db.local().get_token_metadata().get_topology(); repair_tracker().check_in_shutdown(); repair_options options(options_map); @@ -1027,7 +1023,7 @@ int repair_service::do_repair_start(sstring keyspace, std::unordered_map 0 || options.hosts.size() > 0) { throw std::runtime_error("You need to run primary range repair on all nodes in the cluster."); @@ -1407,8 +1403,8 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr std::vector mandatory_neighbors; // All neighbors std::vector neighbors; - auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr(); - auto local_dc = get_local_dc(); + auto& topology = db.local().get_token_metadata().get_topology(); + auto local_dc = topology.get_datacenter(); auto get_node_losing_the_ranges = [&] (const std::vector& old_nodes, const std::unordered_set& new_nodes) { // Remove the new nodes from the old nodes list, so // that it contains only the node that will lose @@ -1436,7 +1432,7 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr auto get_old_endpoints_in_local_dc = [&] () { return boost::copy_range>(old_endpoints | boost::adaptors::filtered([&] (const gms::inet_address& node) { - return snitch_ptr->get_datacenter(node) == local_dc; + return topology.get_datacenter(node) == local_dc; }) ); }; @@ -1564,8 +1560,8 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m } std::unordered_map range_sources; dht::token_range_vector ranges_for_removenode; - auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr(); - auto local_dc = get_local_dc(); + auto& topology = db.local().get_token_metadata().get_topology(); + auto local_dc = topology.get_datacenter(); bool find_node_in_local_dc_only = strat.get_type() == locator::replication_strategy_type::network_topology; for (auto&r : ranges) { 
if (ops) { @@ -1586,7 +1582,7 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m } auto get_neighbors_set = [&] (const std::vector& nodes) { for (auto& node : nodes) { - if (snitch_ptr->get_datacenter(node) == local_dc) { + if (topology.get_datacenter(node) == local_dc) { return std::unordered_set{node};; } } @@ -1659,8 +1655,8 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m } } auto neighbors = boost::copy_range>(neighbors_set | - boost::adaptors::filtered([&local_dc, &snitch_ptr] (const gms::inet_address& node) { - return snitch_ptr->get_datacenter(node) == local_dc; + boost::adaptors::filtered([&local_dc, &topology] (const gms::inet_address& node) { + return topology.get_datacenter(node) == local_dc; }) ); @@ -1769,16 +1765,16 @@ future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata_ auto& r = *it; seastar::thread::maybe_yield(); auto end_token = r.end() ? r.end()->value() : dht::maximum_token(); - auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr(); + auto& topology = db.local().get_token_metadata().get_topology(); auto neighbors = boost::copy_range>(strat.calculate_natural_endpoints(end_token, *tmptr).get0() | - boost::adaptors::filtered([myip, &source_dc, &snitch_ptr, &ignore_nodes] (const gms::inet_address& node) { + boost::adaptors::filtered([myip, &source_dc, &topology, &ignore_nodes] (const gms::inet_address& node) { if (node == myip) { return false; } if (std::find(ignore_nodes.begin(), ignore_nodes.end(), node) != ignore_nodes.end()) { return false; } - return source_dc.empty() ? true : snitch_ptr->get_datacenter(node) == source_dc; + return source_dc.empty() ? 
true : topology.get_datacenter(node) == source_dc; }) ); rlogger.debug("{}: keyspace={}, range={}, neighbors={}", op, keyspace_name, r, neighbors); @@ -1811,7 +1807,8 @@ future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata_ future<> repair_service::rebuild_with_repair(locator::token_metadata_ptr tmptr, sstring source_dc) { auto op = sstring("rebuild_with_repair"); if (source_dc.empty()) { - source_dc = get_local_dc(); + auto& topology = get_db().local().get_token_metadata().get_topology(); + source_dc = topology.get_datacenter(); } auto reason = streaming::stream_reason::rebuild; co_await do_rebuild_replace_with_repair(std::move(tmptr), std::move(op), std::move(source_dc), reason, {}); @@ -1825,7 +1822,8 @@ future<> repair_service::rebuild_with_repair(locator::token_metadata_ptr tmptr, future<> repair_service::replace_with_repair(locator::token_metadata_ptr tmptr, std::unordered_set replacing_tokens, std::list ignore_nodes) { auto cloned_tm = co_await tmptr->clone_async(); auto op = sstring("replace_with_repair"); - auto source_dc = get_local_dc(); + auto& topology = get_db().local().get_token_metadata().get_topology(); + auto source_dc = topology.get_datacenter(); auto reason = streaming::stream_reason::replace; // update a cloned version of tmptr // no need to set the original version From 5e2fa32c8c2f25ba1a55658c23cd7fba114c3af4 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Fri, 17 Jun 2022 16:17:00 +0300 Subject: [PATCH 06/12] range_streamer: Get rack/datacenter from topology It's needed in source filter classes so range-streamer passes the topology reference into its methods. Nice side effect -- snitch header goes away from range-streamer one. 
Signed-off-by: Pavel Emelyanov --- dht/range_streamer.cc | 2 +- dht/range_streamer.hh | 11 +++++------ 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/dht/range_streamer.cc b/dht/range_streamer.cc index a8bc5dd734..2ddead7dfd 100644 --- a/dht/range_streamer.cc +++ b/dht/range_streamer.cc @@ -47,7 +47,7 @@ range_streamer::get_range_fetch_map(const std::unordered_mapshould_include(address)) { + if (!filter->should_include(get_token_metadata().get_topology(), address)) { filtered = true; break; } diff --git a/dht/range_streamer.hh b/dht/range_streamer.hh index 2f6679a7a8..b9dcea0f4a 100644 --- a/dht/range_streamer.hh +++ b/dht/range_streamer.hh @@ -11,7 +11,6 @@ #pragma once #include "locator/token_metadata.hh" -#include "locator/snitch_base.hh" #include "streaming/stream_plan.hh" #include "streaming/stream_state.hh" #include "streaming/stream_reason.hh" @@ -27,6 +26,7 @@ class database; } namespace gms { class gossiper; } +namespace locator { class topology; } namespace dht { /** @@ -45,7 +45,7 @@ public: */ class i_source_filter { public: - virtual bool should_include(inet_address endpoint) = 0; + virtual bool should_include(const locator::topology&, inet_address endpoint) = 0; virtual ~i_source_filter() {} }; @@ -58,7 +58,7 @@ public: std::set _down_nodes; public: failure_detector_source_filter(std::set down_nodes) : _down_nodes(std::move(down_nodes)) { } - virtual bool should_include(inet_address endpoint) override { return !_down_nodes.contains(endpoint); } + virtual bool should_include(const locator::topology&, inet_address endpoint) override { return !_down_nodes.contains(endpoint); } }; /** @@ -71,9 +71,8 @@ public: single_datacenter_filter(const sstring& source_dc) : _source_dc(source_dc) { } - virtual bool should_include(inet_address endpoint) override { - auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr(); - return snitch_ptr->get_datacenter(endpoint) == _source_dc; + virtual bool should_include(const locator::topology& 
topo, inet_address endpoint) override { + return topo.get_datacenter(endpoint) == _source_dc; } }; From 98a4d41e316048cce548ef7d7994faf01bafc61d Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Fri, 17 Jun 2022 16:23:22 +0300 Subject: [PATCH 07/12] alternator: Get rack/datacenter from topology It's needed in two places, both can get topology from the proxy's token metadata. Signed-off-by: Pavel Emelyanov --- alternator/executor.cc | 13 +++++++------ alternator/server.cc | 8 +++----- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/alternator/executor.cc b/alternator/executor.cc index bd5521e8ef..4eddf38ebd 100644 --- a/alternator/executor.cc +++ b/alternator/executor.cc @@ -58,7 +58,7 @@ logging::logger elogger("alternator-executor"); namespace alternator { -static future> create_keyspace(std::string_view keyspace_name, service::migration_manager& mm, gms::gossiper& gossiper, api::timestamp_type); +static future> create_keyspace(std::string_view keyspace_name, service::storage_proxy& sp, service::migration_manager& mm, gms::gossiper& gossiper, api::timestamp_type); static map_type attrs_type() { static thread_local auto t = map_type_impl::get_instance(utf8_type, bytes_type, true); @@ -1071,7 +1071,7 @@ static future create_table_on_shard0(tracing::tra auto ts = group0_guard.write_timestamp(); std::vector schema_mutations; try { - schema_mutations = co_await create_keyspace(keyspace_name, mm, gossiper, ts); + schema_mutations = co_await create_keyspace(keyspace_name, sp, mm, gossiper, ts); } catch (exceptions::already_exists_exception&) { if (sp.data_dictionary().has_schema(keyspace_name, table_name)) { co_return api_error::resource_in_use(format("Table {} already exists", table_name)); @@ -4337,11 +4337,12 @@ future executor::describe_endpoints(client_state& return make_ready_future(make_jsonable(std::move(response))); } -static std::map get_network_topology_options(gms::gossiper& gossiper, int rf) { +static std::map 
get_network_topology_options(service::storage_proxy& sp, gms::gossiper& gossiper, int rf) { std::map options; sstring rf_str = std::to_string(rf); + auto& topology = sp.get_token_metadata_ptr()->get_topology(); for (const gms::inet_address& addr : gossiper.get_live_members()) { - options.emplace(locator::i_endpoint_snitch::get_local_snitch_ptr()->get_datacenter(addr), rf_str); + options.emplace(topology.get_datacenter(addr), rf_str); }; return options; } @@ -4375,7 +4376,7 @@ future executor::describe_continuous_backups(clie // of nodes in the cluster: A cluster with 3 or more live nodes, gets RF=3. // A smaller cluster (presumably, a test only), gets RF=1. The user may // manually create the keyspace to override this predefined behavior. -static future> create_keyspace(std::string_view keyspace_name, service::migration_manager& mm, gms::gossiper& gossiper, api::timestamp_type ts) { +static future> create_keyspace(std::string_view keyspace_name, service::storage_proxy& sp, service::migration_manager& mm, gms::gossiper& gossiper, api::timestamp_type ts) { sstring keyspace_name_str(keyspace_name); int endpoint_count = gossiper.get_endpoint_states().size(); int rf = 3; @@ -4384,7 +4385,7 @@ static future> create_keyspace(std::string_view keyspace_n elogger.warn("Creating keyspace '{}' for Alternator with unsafe RF={} because cluster only has {} nodes.", keyspace_name_str, rf, endpoint_count); } - auto opts = get_network_topology_options(gossiper, rf); + auto opts = get_network_topology_options(sp, gossiper, rf); auto ksm = keyspace_metadata::new_keyspace(keyspace_name_str, "org.apache.cassandra.locator.NetworkTopologyStrategy", std::move(opts), true); co_return mm.prepare_new_keyspace_announcement(ksm, ts); diff --git a/alternator/server.cc b/alternator/server.cc index 118510fe4c..8232efd6ee 100644 --- a/alternator/server.cc +++ b/alternator/server.cc @@ -20,7 +20,6 @@ #include "auth.hh" #include #include "service/storage_proxy.hh" -#include "locator/snitch_base.hh" 
#include "gms/gossiper.hh" #include "utils/overloaded_functor.hh" #include "utils/fb_utilities.hh" @@ -201,10 +200,9 @@ protected: // It's very easy to get a list of all live nodes on the cluster, // using _gossiper().get_live_members(). But getting // just the list of live nodes in this DC needs more elaborate code: - sstring local_dc = locator::i_endpoint_snitch::get_local_snitch_ptr()->get_datacenter( - utils::fb_utilities::get_broadcast_address()); - std::unordered_set local_dc_nodes = - _proxy.get_token_metadata_ptr()->get_topology().get_datacenter_endpoints().at(local_dc); + auto& topology = _proxy.get_token_metadata_ptr()->get_topology(); + sstring local_dc = topology.get_datacenter(); + std::unordered_set local_dc_nodes = topology.get_datacenter_endpoints().at(local_dc); for (auto& ip : local_dc_nodes) { if (_gossiper.is_alive(ip)) { rjson::push_back(results, rjson::from_string(ip.to_sstring())); From 9b6312687b3130a9bbdc4f851aa2219b32dd6c2c Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Fri, 17 Jun 2022 16:36:08 +0300 Subject: [PATCH 08/12] hints: Get rack/datacenter from topology The topology reference is obtained from the proxy anchor pointer sitting on manager. 
Signed-off-by: Pavel Emelyanov --- db/hints/host_filter.cc | 5 +++-- db/hints/host_filter.hh | 4 +++- db/hints/manager.cc | 6 +++--- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/db/hints/host_filter.cc b/db/hints/host_filter.cc index 45b5eca097..2bbdaa438b 100644 --- a/db/hints/host_filter.cc +++ b/db/hints/host_filter.cc @@ -8,6 +8,7 @@ #include #include +#include "locator/token_metadata.hh" #include "to_string.hh" #include "host_filter.hh" @@ -27,12 +28,12 @@ host_filter::host_filter(std::unordered_set allowed_dcs) , _dcs(std::move(allowed_dcs)) { } -bool host_filter::can_hint_for(locator::snitch_ptr& snitch, gms::inet_address ep) const { +bool host_filter::can_hint_for(const locator::topology& topo, gms::inet_address ep) const { switch (_enabled_kind) { case enabled_kind::enabled_for_all: return true; case enabled_kind::enabled_selectively: - return _dcs.contains(snitch->get_datacenter(ep)); + return _dcs.contains(topo.get_datacenter(ep)); case enabled_kind::disabled_for_all: return false; } diff --git a/db/hints/host_filter.hh b/db/hints/host_filter.hh index aa34fcbca2..c98ba457b9 100644 --- a/db/hints/host_filter.hh +++ b/db/hints/host_filter.hh @@ -21,6 +21,8 @@ namespace gms { class inet_address; } // namespace gms +namespace locator { class topology; } + namespace db { namespace hints { @@ -57,7 +59,7 @@ public: // Parses hint filtering configuration from a list of DCs. 
static host_filter parse_from_dc_list(sstring opt); - bool can_hint_for(locator::snitch_ptr& snitch, gms::inet_address ep) const; + bool can_hint_for(const locator::topology& topo, gms::inet_address ep) const; inline const std::unordered_set& get_dcs() const { return _dcs; diff --git a/db/hints/manager.cc b/db/hints/manager.cc index c7090b965d..d5f39ad621 100644 --- a/db/hints/manager.cc +++ b/db/hints/manager.cc @@ -659,7 +659,7 @@ future<> manager::change_host_filter(host_filter filter) { // for some of them return lister::scan_dir(_hints_dir, { directory_entry_type::directory }, [this] (fs::path datadir, directory_entry de) { const ep_key_type ep = ep_key_type(de.name); - if (_ep_managers.contains(ep) || !_host_filter.can_hint_for(_local_snitch_ptr, ep)) { + if (_ep_managers.contains(ep) || !_host_filter.can_hint_for(_proxy_anchor->get_token_metadata_ptr()->get_topology(), ep)) { return make_ready_future<>(); } return get_ep_manager(ep).populate_segments_to_replay(); @@ -670,7 +670,7 @@ future<> manager::change_host_filter(host_filter filter) { }).finally([this] { // Remove endpoint managers which are rejected by the filter return parallel_for_each(_ep_managers, [this] (auto& pair) { - if (_host_filter.can_hint_for(_local_snitch_ptr, pair.first)) { + if (_host_filter.can_hint_for(_proxy_anchor->get_token_metadata_ptr()->get_topology(), pair.first)) { return make_ready_future<>(); } return pair.second.stop(drain::no).finally([this, ep = pair.first] { @@ -687,7 +687,7 @@ bool manager::check_dc_for(ep_key_type ep) const noexcept { // If target's DC is not a "hintable" DCs - don't hint. // If there is an end point manager then DC has already been checked and found to be ok. return _host_filter.is_enabled_for_all() || have_ep_manager(ep) || - _host_filter.can_hint_for(_local_snitch_ptr, ep); + _host_filter.can_hint_for(_proxy_anchor->get_token_metadata_ptr()->get_topology(), ep); } catch (...) 
{ // if we failed to check the DC - block this hint return false; From 820be06ac1dd5882181cc119419c828eccdb2083 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Fri, 17 Jun 2022 16:50:34 +0300 Subject: [PATCH 09/12] hints: Remove snitch dependency After the previous patch, the hints manager class is left with an unused dependency on snitch. While removing it, it turns out that several unrelated places get needed headers indirectly via the host_filter.hh -> snitch_base.hh inclusion. Signed-off-by: Pavel Emelyanov --- db/config.hh | 2 ++ db/hints/host_filter.hh | 1 - db/hints/manager.cc | 1 - db/hints/manager.hh | 4 +--- gms/gossiper.cc | 1 + lang/lua.cc | 2 ++ 6 files changed, 6 insertions(+), 5 deletions(-) diff --git a/db/config.hh b/db/config.hh index d2ba29786c..5d33762731 100644 --- a/db/config.hh +++ b/db/config.hh @@ -20,6 +20,8 @@ #include "seastarx.hh" #include "utils/config_file.hh" #include "utils/enum_option.hh" +#include "utils/UUID.hh" +#include "gms/inet_address.hh" #include "db/hints/host_filter.hh" namespace seastar { diff --git a/db/hints/host_filter.hh b/db/hints/host_filter.hh index c98ba457b9..06791e9b22 100644 --- a/db/hints/host_filter.hh +++ b/db/hints/host_filter.hh @@ -14,7 +14,6 @@ #include #include -#include "locator/snitch_base.hh" #include "seastarx.hh" namespace gms { diff --git a/db/hints/manager.cc b/db/hints/manager.cc index d5f39ad621..d3db42bdec 100644 --- a/db/hints/manager.cc +++ b/db/hints/manager.cc @@ -49,7 +49,6 @@ const std::chrono::seconds manager::hints_flush_period = std::chrono::seconds(10 manager::manager(sstring hints_directory, host_filter filter, int64_t max_hint_window_ms, resource_manager& res_manager, distributed& db) : _hints_dir(fs::path(hints_directory) / format("{:d}", this_shard_id())) , _host_filter(std::move(filter)) - , _local_snitch_ptr(locator::i_endpoint_snitch::get_local_snitch_ptr()) , _max_hint_window_us(max_hint_window_ms * 1000) , _local_db(db.local()) , _resource_manager(res_manager) diff --git a/db/hints/manager.hh
b/db/hints/manager.hh index f467f72303..b7b4fc8e54 100644 --- a/db/hints/manager.hh +++ b/db/hints/manager.hh @@ -21,7 +21,6 @@ #include #include #include -#include "locator/snitch_base.hh" #include "inet_address_vectors.hh" #include "db/commitlog/commitlog.hh" #include "utils/loading_shared_values.hh" @@ -515,7 +514,6 @@ private: host_filter _host_filter; shared_ptr _proxy_anchor; shared_ptr _gossiper_anchor; - locator::snitch_ptr& _local_snitch_ptr; int64_t _max_hint_window_us = 0; replica::database& _local_db; @@ -530,7 +528,7 @@ private: seastar::named_semaphore _drain_lock = {1, named_semaphore_exception_factory{"drain lock"}}; public: - manager(sstring hints_directory, host_filter filter, int64_t max_hint_window_ms, resource_manager&res_manager, distributed& db); + manager(sstring hints_directory, host_filter filter, int64_t max_hint_window_ms, resource_manager&res_manager, sharded& db); virtual ~manager(); manager(manager&&) = delete; manager& operator=(manager&&) = delete; diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 0382f669f9..e158b820db 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -40,6 +40,7 @@ #include #include "utils/generation-number.hh" #include "locator/token_metadata.hh" +#include "locator/snitch_base.hh" #include "utils/exceptions.hh" namespace gms { diff --git a/lang/lua.cc b/lang/lua.cc index bd158c36be..8d36725992 100644 --- a/lang/lua.cc +++ b/lang/lua.cc @@ -6,6 +6,8 @@ * SPDX-License-Identifier: AGPL-3.0-or-later */ +#include +#include #include "lua.hh" #include "exceptions/exceptions.hh" #include "concrete_types.hh" From 3ab7c9320c0454a1d34710bc38c74afe8ea208f7 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Fri, 17 Jun 2022 16:54:06 +0300 Subject: [PATCH 10/12] api: Get rack/datacenter from topology The http_ctx already has token metadata on board, it's possible to get topology from it. 
Signed-off-by: Pavel Emelyanov --- api/endpoint_snitch.cc | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/api/endpoint_snitch.cc b/api/endpoint_snitch.cc index 52fff3031a..44d5ee79f3 100644 --- a/api/endpoint_snitch.cc +++ b/api/endpoint_snitch.cc @@ -6,6 +6,7 @@ * SPDX-License-Identifier: AGPL-3.0-or-later */ +#include "locator/token_metadata.hh" #include "locator/snitch_base.hh" #include "endpoint_snitch.hh" #include "api/api-doc/endpoint_snitch_info.json.hh" @@ -19,12 +20,14 @@ void set_endpoint_snitch(http_context& ctx, routes& r) { return host.empty() ? gms::inet_address(utils::fb_utilities::get_broadcast_address()) : gms::inet_address(host); }; - httpd::endpoint_snitch_info_json::get_datacenter.set(r, [](const_req req) { - return locator::i_endpoint_snitch::get_local_snitch_ptr()->get_datacenter(host_or_broadcast(req)); + httpd::endpoint_snitch_info_json::get_datacenter.set(r, [&ctx](const_req req) { + auto& topology = ctx.shared_token_metadata.local().get()->get_topology(); + return topology.get_datacenter(host_or_broadcast(req)); }); - httpd::endpoint_snitch_info_json::get_rack.set(r, [](const_req req) { - return locator::i_endpoint_snitch::get_local_snitch_ptr()->get_rack(host_or_broadcast(req)); + httpd::endpoint_snitch_info_json::get_rack.set(r, [&ctx](const_req req) { + auto& topology = ctx.shared_token_metadata.local().get()->get_topology(); + return topology.get_rack(host_or_broadcast(req)); }); httpd::endpoint_snitch_info_json::get_snitch_name.set(r, [] (const_req req) { From 8ffe2494305aecce4b4f59ae9f6732049f47e3a4 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Fri, 17 Jun 2022 17:11:37 +0300 Subject: [PATCH 11/12] proxy stats: Push topology arg to get_ep_stats The latter will need it to get dc info from. All the callers are either storage proxy or have storage proxy pointer/reference to get topology from. 
Signed-off-by: Pavel Emelyanov --- service/storage_proxy.cc | 33 +++++++++++++++++++-------------- service/storage_proxy_stats.hh | 4 +++- 2 files changed, 22 insertions(+), 15 deletions(-) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 0ad5a9b94f..9ca302e87d 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -528,7 +528,7 @@ public: } if (_cl_achieved) { // For CL=ANY this can still be false for (auto&& ep : get_targets()) { - ++stats().background_replica_writes_failed.get_ep_stat(ep); + ++stats().background_replica_writes_failed.get_ep_stat(_proxy->get_token_metadata_ptr()->get_topology(), ep); } stats().background_writes_failed += int(!_targets.empty()); } @@ -1720,7 +1720,7 @@ void storage_proxy_stats::stats::register_stats() { }); } -inline uint64_t& storage_proxy_stats::split_stats::get_ep_stat(gms::inet_address ep) noexcept { +inline uint64_t& storage_proxy_stats::split_stats::get_ep_stat(const locator::topology& topo, gms::inet_address ep) noexcept { if (fbu::is_me(ep)) { return _local.val; } @@ -2773,9 +2773,9 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo got_response(response_id, coordinator, std::nullopt); } else { if (!handler.read_repair_write()) { - ++stats.writes_attempts.get_ep_stat(coordinator); + ++stats.writes_attempts.get_ep_stat(get_token_metadata_ptr()->get_topology(), coordinator); } else { - ++stats.read_repair_write_attempts.get_ep_stat(coordinator); + ++stats.read_repair_write_attempts.get_ep_stat(get_token_metadata_ptr()->get_topology(), coordinator); } if (coordinator == my_address) { @@ -2787,7 +2787,7 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo // Waited on indirectly. 
(void)f.handle_exception([response_id, forward_size, coordinator, handler_ptr, p = shared_from_this(), &stats] (std::exception_ptr eptr) { - ++stats.writes_errors.get_ep_stat(coordinator); + ++stats.writes_errors.get_ep_stat(p->get_token_metadata_ptr()->get_topology(), coordinator); error err = error::FAILURE; std::optional msg; try { @@ -3526,6 +3526,11 @@ private: _proxy->get_stats().foreground_reads -= int(_foreground); _foreground = false; } + + const locator::topology& get_topology() const noexcept { + return _proxy->get_token_metadata_ptr()->get_topology(); + } + public: abstract_read_executor(schema_ptr s, lw_shared_ptr cf, shared_ptr proxy, lw_shared_ptr cmd, dht::partition_range pr, db::consistency_level cl, size_t block_for, inet_address_vector_replica_set targets, tracing::trace_state_ptr trace_state, service_permit permit) : @@ -3549,7 +3554,7 @@ public: protected: future>, cache_temperature>> make_mutation_data_request(lw_shared_ptr cmd, gms::inet_address ep, clock_type::time_point timeout) { - ++_proxy->get_stats().mutation_data_read_attempts.get_ep_stat(ep); + ++_proxy->get_stats().mutation_data_read_attempts.get_ep_stat(get_topology(), ep); if (fbu::is_me(ep)) { tracing::trace(_trace_state, "read_mutation_data: querying locally"); return _proxy->query_mutations_locally(_schema, cmd, _partition_range, timeout, _trace_state); @@ -3563,7 +3568,7 @@ protected: } } future>, cache_temperature>> make_data_request(gms::inet_address ep, clock_type::time_point timeout, bool want_digest) { - ++_proxy->get_stats().data_read_attempts.get_ep_stat(ep); + ++_proxy->get_stats().data_read_attempts.get_ep_stat(get_topology(), ep); auto opts = want_digest ? 
query::result_options{query::result_request::result_and_digest, digest_algorithm(*_proxy)} : query::result_options{query::result_request::only_result, query::digest_algorithm::none}; @@ -3580,7 +3585,7 @@ protected: } } future> make_digest_request(gms::inet_address ep, clock_type::time_point timeout) { - ++_proxy->get_stats().digest_read_attempts.get_ep_stat(ep); + ++_proxy->get_stats().digest_read_attempts.get_ep_stat(get_topology(), ep); if (fbu::is_me(ep)) { tracing::trace(_trace_state, "read_digest: querying locally"); return _proxy->query_result_local_digest(_schema, _cmd, _partition_range, _trace_state, @@ -3605,10 +3610,10 @@ protected: auto v = f.get0(); _cf->set_hit_rate(ep, std::get<1>(v)); resolver->add_mutate_data(ep, std::get<0>(std::move(v))); - ++_proxy->get_stats().mutation_data_read_completed.get_ep_stat(ep); + ++_proxy->get_stats().mutation_data_read_completed.get_ep_stat(get_topology(), ep); register_request_latency(latency_clock::now() - start); } catch(...) { - ++_proxy->get_stats().mutation_data_read_errors.get_ep_stat(ep); + ++_proxy->get_stats().mutation_data_read_errors.get_ep_stat(get_topology(), ep); resolver->error(ep, std::current_exception()); } }); @@ -3623,11 +3628,11 @@ protected: auto v = f.get0(); _cf->set_hit_rate(ep, std::get<1>(v)); resolver->add_data(ep, std::get<0>(std::move(v))); - ++_proxy->get_stats().data_read_completed.get_ep_stat(ep); + ++_proxy->get_stats().data_read_completed.get_ep_stat(get_topology(), ep); _used_targets.push_back(ep); register_request_latency(latency_clock::now() - start); } catch(...) 
{ - ++_proxy->get_stats().data_read_errors.get_ep_stat(ep); + ++_proxy->get_stats().data_read_errors.get_ep_stat(get_topology(), ep); resolver->error(ep, std::current_exception()); } }); @@ -3642,11 +3647,11 @@ protected: auto v = f.get0(); _cf->set_hit_rate(ep, std::get<2>(v)); resolver->add_digest(ep, std::get<0>(v), std::get<1>(v)); - ++_proxy->get_stats().digest_read_completed.get_ep_stat(ep); + ++_proxy->get_stats().digest_read_completed.get_ep_stat(get_topology(), ep); _used_targets.push_back(ep); register_request_latency(latency_clock::now() - start); } catch(...) { - ++_proxy->get_stats().digest_read_errors.get_ep_stat(ep); + ++_proxy->get_stats().digest_read_errors.get_ep_stat(get_topology(), ep); resolver->error(ep, std::current_exception()); } }); diff --git a/service/storage_proxy_stats.hh b/service/storage_proxy_stats.hh index eb9cf55815..efe0e278ae 100644 --- a/service/storage_proxy_stats.hh +++ b/service/storage_proxy_stats.hh @@ -13,6 +13,8 @@ #include "utils/histogram.hh" #include +namespace locator { class topology; } + namespace service { namespace storage_proxy_stats { @@ -61,7 +63,7 @@ public: * * @return a reference to the requested counter */ - uint64_t& get_ep_stat(gms::inet_address ep) noexcept; + uint64_t& get_ep_stat(const locator::topology& topo, gms::inet_address ep) noexcept; }; struct write_stats { From f0cafc35fd41c1383d047d889b8fb682c60f513f Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Fri, 17 Jun 2022 17:02:35 +0300 Subject: [PATCH 12/12] proxy stats: Get rack/datacenter from topology The reference is already at hand. The get_ep_stats() calls another helper that also maps endpoint to datacenter, but it can get the obtained dc sstring via argument. 
Signed-off-by: Pavel Emelyanov --- service/storage_proxy.cc | 13 +++---------- service/storage_proxy_stats.hh | 2 +- 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 9ca302e87d..bece0f18ff 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -137,12 +137,6 @@ const dht::token& end_token(const dht::partition_range& r) { return r.end() ? r.end()->value().token() : max_token; } -static inline -sstring get_dc(gms::inet_address ep) { - auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr(); - return snitch_ptr->get_datacenter(ep); -} - unsigned storage_proxy::cas_shard(const schema& s, dht::token token) { return dht::shard_of(s, token); } @@ -1726,9 +1720,9 @@ inline uint64_t& storage_proxy_stats::split_stats::get_ep_stat(const locator::to } try { - sstring dc = get_dc(ep); + sstring dc = topo.get_datacenter(ep); if (_auto_register_metrics) { - register_metrics_for(ep); + register_metrics_for(dc, ep); } return _dc_stats[dc].val; } catch (...) 
{ @@ -1747,10 +1741,9 @@ void storage_proxy_stats::split_stats::register_metrics_local() { }); } -void storage_proxy_stats::split_stats::register_metrics_for(gms::inet_address ep) { +void storage_proxy_stats::split_stats::register_metrics_for(sstring dc, gms::inet_address ep) { namespace sm = seastar::metrics; - sstring dc = get_dc(ep); // if this is the first time we see an endpoint from this DC - add a // corresponding collectd metric if (auto [ignored, added] = _dc_stats.try_emplace(dc); added) { diff --git a/service/storage_proxy_stats.hh b/service/storage_proxy_stats.hh index efe0e278ae..3d344e0347 100644 --- a/service/storage_proxy_stats.hh +++ b/service/storage_proxy_stats.hh @@ -53,7 +53,7 @@ public: split_stats(const sstring& category, const sstring& short_description_prefix, const sstring& long_description_prefix, const sstring& op_type, bool auto_register_metrics = true); void register_metrics_local(); - void register_metrics_for(gms::inet_address ep); + void register_metrics_for(sstring dc, gms::inet_address ep); /** * Get a reference to the statistics counter corresponding to the given