diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc index bba99cf414..b972fec93e 100644 --- a/locator/token_metadata.cc +++ b/locator/token_metadata.cc @@ -19,6 +19,7 @@ #include #include #include +#include "seastar/core/smp.hh" #include "utils/stall_free.hh" #include "utils/fb_utilities.hh" @@ -302,6 +303,7 @@ public: void invalidate_cached_rings() { _ring_version = ++_static_ring_version; + tlogger.debug("ring_version={}", _ring_version); } friend class token_metadata; @@ -1253,6 +1255,32 @@ future<> shared_token_metadata::mutate_token_metadata(seastar::noncopyable_funct set(make_token_metadata_ptr(std::move(tm))); } +future<> shared_token_metadata::mutate_on_all_shards(sharded& stm, seastar::noncopyable_function (token_metadata&)> func) { + auto base_shard = this_shard_id(); + assert(base_shard == 0); + auto lk = co_await stm.local().get_lock(); + + std::vector pending_token_metadata_ptr; + pending_token_metadata_ptr.reserve(smp::count); + auto tmptr = make_token_metadata_ptr(co_await stm.local().get()->clone_async()); + auto& tm = *tmptr; + // bump the token_metadata ring_version + // to invalidate cached token/replication mappings + // when the modified token_metadata is committed. + tm.invalidate_cached_rings(); + co_await func(tm); + + // Apply the mutated token_metadata only after successfully cloning it on all shards. + pending_token_metadata_ptr[base_shard] = tmptr; + co_await smp::invoke_on_others(base_shard, [&] () -> future<> { + pending_token_metadata_ptr[this_shard_id()] = make_token_metadata_ptr(co_await tm.clone_async()); + }); + + co_await stm.invoke_on_all([&] (shared_token_metadata& stm) { + stm.set(std::move(pending_token_metadata_ptr[this_shard_id()])); + }); +} + host_id_or_endpoint::host_id_or_endpoint(const sstring& s, param_type restrict) { switch (restrict) { case param_type::host_id: diff --git a/locator/token_metadata.hh b/locator/token_metadata.hh index 453901fc50..bc3c37464c 100644 --- a/locator/token_metadata.hh +++ b/locator/token_metadata.hh @@ -23,6 +23,7 @@ #include "range.hh" #include #include +#include #include "locator/types.hh" #include "locator/topology.hh" @@ -317,7 +318,7 @@ public: return _lock_func(); } - // mutate_token_metadata acquires the shared_token_metadata lock, + // mutate_token_metadata_on_all_shards acquires the shared_token_metadata lock, // clones the token_metadata (using clone_async) // and calls an asynchronous functor on // the cloned copy of the token_metadata to mutate it. @@ -326,6 +327,18 @@ public: // is set back to to the shared_token_metadata, // otherwise, the clone is destroyed. future<> mutate_token_metadata(seastar::noncopyable_function (token_metadata&)> func); + + // mutate_token_metadata_on_all_shards acquires the shared_token_metadata lock, + // clones the token_metadata (using clone_async) + // and calls an asynchronous functor on + // the cloned copy of the token_metadata to mutate it. + // + // If the functor is successful, the mutated clone + // is set back to to the shared_token_metadata on all shards, + // otherwise, the clone is destroyed. + // + // Must be called on shard 0. + static future<> mutate_on_all_shards(sharded& stm, seastar::noncopyable_function (token_metadata&)> func); }; }