From a8f11852dae4cb14ab9753dde3bdcb136ce6f923 Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Mon, 27 Nov 2023 13:09:40 +0200 Subject: [PATCH] vnode_effective_replication_map: pre calculate dirty endpoints during topology change Some topology change operations cause some nodes to lose ranges. This information is needed to know which nodes need to do cleanup after the topology operation completes. Precalculate it during erm creation. --- locator/abstract_replication_strategy.cc | 18 ++++++++++++++++-- locator/abstract_replication_strategy.hh | 16 +++++++++++++--- 2 files changed, 29 insertions(+), 5 deletions(-) diff --git a/locator/abstract_replication_strategy.cc b/locator/abstract_replication_strategy.cc index c2a90914e6..82f96014c4 100644 --- a/locator/abstract_replication_strategy.cc +++ b/locator/abstract_replication_strategy.cc @@ -375,6 +375,8 @@ future calculate_effective_replicat replication_map replication_map; ring_mapping pending_endpoints; ring_mapping read_endpoints; + std::unordered_set dirty_endpoints; + const auto depend_on_token = rs->natural_endpoints_depend_on_token(); const auto& sorted_tokens = tmptr->sorted_tokens(); replication_map.reserve(depend_on_token ? 
sorted_tokens.size() : 1); @@ -421,6 +423,14 @@ future calculate_effective_replicat } } + // If an endpoint is in current endpoints, but not in target endpoints, it means + // it loses a range and becomes dirty + for (auto& h : current_endpoints) { + if (!target_endpoints.contains(h)) { + dirty_endpoints.emplace(h); + } + } + // in order not to waste memory, we update read_endpoints only if the // new endpoints differs from the old one if (topology_changes->read_new && target_endpoints.get_vector() != current_endpoints.get_vector()) { @@ -446,7 +456,7 @@ future calculate_effective_replicat auto rf = rs->get_replication_factor(*tmptr); co_return make_effective_replication_map(std::move(rs), std::move(tmptr), std::move(replication_map), - std::move(pending_endpoints), std::move(read_endpoints), rf); + std::move(pending_endpoints), std::move(read_endpoints), std::move(dirty_endpoints), rf); } auto vnode_effective_replication_map::clone_data_gently() const -> future> { @@ -466,6 +476,10 @@ auto vnode_effective_replication_map::clone_data_gently() const -> futureread_endpoints += i; co_await coroutine::maybe_yield(); } + + // no need to yield while copying since this is bound by nodes, not vnodes + result->dirty_endpoints = _dirty_endpoints; + co_return std::move(result); } @@ -541,7 +555,7 @@ future effective_replication_map_factory::c auto rf = ref_erm->get_replication_factor(); auto local_data = co_await ref_erm->clone_data_gently(); new_erm = make_effective_replication_map(std::move(rs), std::move(tmptr), std::move(local_data->replication_map), - std::move(local_data->pending_endpoints), std::move(local_data->read_endpoints), rf); + std::move(local_data->pending_endpoints), std::move(local_data->read_endpoints), std::move(local_data->dirty_endpoints), rf); } else { new_erm = co_await calculate_effective_replication_map(std::move(rs), std::move(tmptr)); } diff --git a/locator/abstract_replication_strategy.hh b/locator/abstract_replication_strategy.hh index 
c5aa95fdb5..97d5eb2958 100644 --- a/locator/abstract_replication_strategy.hh +++ b/locator/abstract_replication_strategy.hh @@ -292,6 +292,7 @@ private: replication_map _replication_map; ring_mapping _pending_endpoints; ring_mapping _read_endpoints; + std::unordered_set _dirty_endpoints; std::optional _factory_key = std::nullopt; effective_replication_map_factory* _factory = nullptr; @@ -308,11 +309,12 @@ public: // effective_replication_map const dht::sharder& get_sharder(const schema& s) const override; public: explicit vnode_effective_replication_map(replication_strategy_ptr rs, token_metadata_ptr tmptr, replication_map replication_map, - ring_mapping pending_endpoints, ring_mapping read_endpoints, size_t replication_factor) noexcept + ring_mapping pending_endpoints, ring_mapping read_endpoints, std::unordered_set dirty_endpoints, size_t replication_factor) noexcept : effective_replication_map(std::move(rs), std::move(tmptr), replication_factor) , _replication_map(std::move(replication_map)) , _pending_endpoints(std::move(pending_endpoints)) , _read_endpoints(std::move(read_endpoints)) + , _dirty_endpoints(std::move(dirty_endpoints)) { } vnode_effective_replication_map() = delete; vnode_effective_replication_map(vnode_effective_replication_map&&) = default; @@ -322,6 +324,7 @@ public: replication_map replication_map; ring_mapping pending_endpoints; ring_mapping read_endpoints; + std::unordered_set dirty_endpoints; }; // boost::icl::interval_map is not no_throw_move_constructible -> can't return cloned_data by val, // since future_state requires T to be no_throw_move_constructible. @@ -355,6 +358,13 @@ public: future> get_range_addresses() const; + // Returns the set of dirty endpoints. An endpoint is dirty if it may have data + // for a range it no longer owns. Will be empty if there is no topology + // change. 
During a topology change it can be empty as well (for instance, for the everywhere strategy) + const std::unordered_set& get_dirty_endpoints() const { + return _dirty_endpoints; + } + private: dht::token_range_vector do_get_ranges(noncopyable_function consider_range_for_endpoint) const; inet_address_vector_replica_set do_get_natural_endpoints(const token& tok, bool is_vnode) const; @@ -387,10 +397,10 @@ using vnode_erm_ptr = vnode_effective_replication_map_ptr; using mutable_vnode_erm_ptr = mutable_vnode_effective_replication_map_ptr; inline mutable_vnode_erm_ptr make_effective_replication_map(replication_strategy_ptr rs, token_metadata_ptr tmptr, replication_map replication_map, ring_mapping pending_endpoints, - ring_mapping read_endpoints, size_t replication_factor) { + ring_mapping read_endpoints, std::unordered_set dirty_endpoints, size_t replication_factor) { return seastar::make_shared( std::move(rs), std::move(tmptr), std::move(replication_map), - std::move(pending_endpoints), std::move(read_endpoints), replication_factor); + std::move(pending_endpoints), std::move(read_endpoints), std::move(dirty_endpoints), replication_factor); } // Apply the replication strategy over the current configuration and the given token_metadata.