shared_token_metadata: mutate_token_metadata: replicate to all shards
storage_service::replicate_to_all_cores has a sophisticated way to mutate the token_metadata and effective_replication_map on shard 0 and cloning those to all other shards, applying the changes only mutate and clone succeeded on all shards so we don't end up with only some of the shards with the mutated copy if an error happend mid-way (and then we would need to roll-back the change for exception safety). shared_token_metadata::mutate_token_metadata is currently only called from a unit test that needs to mutate the token metadata only on shard 0, but a following patch will require doing that on all shards. This change adds this capbility by enforcing the call to be on shard 0m mutating the token_metdata into a temporary pending copy and cloning it on all other shards. Only then, when all shard succeeded, set the modified token_metadata on all shards. Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
@@ -19,6 +19,7 @@
|
||||
#include <seastar/core/coroutine.hh>
|
||||
#include <seastar/coroutine/maybe_yield.hh>
|
||||
#include <boost/range/adaptors.hpp>
|
||||
#include "seastar/core/smp.hh"
|
||||
#include "utils/stall_free.hh"
|
||||
#include "utils/fb_utilities.hh"
|
||||
|
||||
@@ -302,6 +303,7 @@ public:
|
||||
|
||||
void invalidate_cached_rings() {
|
||||
_ring_version = ++_static_ring_version;
|
||||
tlogger.debug("ring_version={}", _ring_version);
|
||||
}
|
||||
|
||||
friend class token_metadata;
|
||||
@@ -1253,6 +1255,32 @@ future<> shared_token_metadata::mutate_token_metadata(seastar::noncopyable_funct
|
||||
set(make_token_metadata_ptr(std::move(tm)));
|
||||
}
|
||||
|
||||
future<> shared_token_metadata::mutate_on_all_shards(sharded<shared_token_metadata>& stm, seastar::noncopyable_function<future<> (token_metadata&)> func) {
|
||||
auto base_shard = this_shard_id();
|
||||
assert(base_shard == 0);
|
||||
auto lk = co_await stm.local().get_lock();
|
||||
|
||||
std::vector<mutable_token_metadata_ptr> pending_token_metadata_ptr;
|
||||
pending_token_metadata_ptr.reserve(smp::count);
|
||||
auto tmptr = make_token_metadata_ptr(co_await stm.local().get()->clone_async());
|
||||
auto& tm = *tmptr;
|
||||
// bump the token_metadata ring_version
|
||||
// to invalidate cached token/replication mappings
|
||||
// when the modified token_metadata is committed.
|
||||
tm.invalidate_cached_rings();
|
||||
co_await func(tm);
|
||||
|
||||
// Apply the mutated token_metadata only after successfully cloning it on all shards.
|
||||
pending_token_metadata_ptr[base_shard] = tmptr;
|
||||
co_await smp::invoke_on_others(base_shard, [&] () -> future<> {
|
||||
pending_token_metadata_ptr[this_shard_id()] = make_token_metadata_ptr(co_await tm.clone_async());
|
||||
});
|
||||
|
||||
co_await stm.invoke_on_all([&] (shared_token_metadata& stm) {
|
||||
stm.set(std::move(pending_token_metadata_ptr[this_shard_id()]));
|
||||
});
|
||||
}
|
||||
|
||||
host_id_or_endpoint::host_id_or_endpoint(const sstring& s, param_type restrict) {
|
||||
switch (restrict) {
|
||||
case param_type::host_id:
|
||||
|
||||
@@ -23,6 +23,7 @@
|
||||
#include "range.hh"
|
||||
#include <seastar/core/shared_ptr.hh>
|
||||
#include <seastar/core/semaphore.hh>
|
||||
#include <seastar/core/sharded.hh>
|
||||
|
||||
#include "locator/types.hh"
|
||||
#include "locator/topology.hh"
|
||||
@@ -317,7 +318,7 @@ public:
|
||||
return _lock_func();
|
||||
}
|
||||
|
||||
// mutate_token_metadata acquires the shared_token_metadata lock,
|
||||
// mutate_token_metadata_on_all_shards acquires the shared_token_metadata lock,
|
||||
// clones the token_metadata (using clone_async)
|
||||
// and calls an asynchronous functor on
|
||||
// the cloned copy of the token_metadata to mutate it.
|
||||
@@ -326,6 +327,18 @@ public:
|
||||
// is set back to to the shared_token_metadata,
|
||||
// otherwise, the clone is destroyed.
|
||||
future<> mutate_token_metadata(seastar::noncopyable_function<future<> (token_metadata&)> func);
|
||||
|
||||
// mutate_token_metadata_on_all_shards acquires the shared_token_metadata lock,
|
||||
// clones the token_metadata (using clone_async)
|
||||
// and calls an asynchronous functor on
|
||||
// the cloned copy of the token_metadata to mutate it.
|
||||
//
|
||||
// If the functor is successful, the mutated clone
|
||||
// is set back to to the shared_token_metadata on all shards,
|
||||
// otherwise, the clone is destroyed.
|
||||
//
|
||||
// Must be called on shard 0.
|
||||
static future<> mutate_on_all_shards(sharded<shared_token_metadata>& stm, seastar::noncopyable_function<future<> (token_metadata&)> func);
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user