From f6b019c22994ce53a13fa0170878e67d6d09b647 Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Thu, 18 May 2023 18:57:11 +0400 Subject: [PATCH] raft topology: add fence_version It's stored outside of topology table, since it's updated not through RAFT, but with a new 'fence' raft command. The current value is cached in shared_token_metadata. An initial fence version is loaded in main during storage_service initialisation. --- db/system_keyspace.cc | 9 +++++++++ db/system_keyspace.hh | 2 ++ idl/storage_service.idl.hh | 3 ++- locator/token_metadata.cc | 13 +++++++++++++ locator/token_metadata.hh | 6 ++++++ main.cc | 5 +++++ service/storage_service.cc | 26 +++++++++++++++++++++++++- service/storage_service.hh | 4 ++++ service/topology_state_machine.cc | 3 +++ service/topology_state_machine.hh | 3 ++- 10 files changed, 71 insertions(+), 3 deletions(-) diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index d41b0ac4e4..37ac8f11be 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -3659,6 +3659,15 @@ future system_keyspace::load_topology_state() { co_return ret; } +future system_keyspace::get_topology_fence_version() { + auto opt = co_await get_scylla_local_param_as("topology_fence_version"); + co_return opt.value_or(0); +} + +future<> system_keyspace::update_topology_fence_version(int64_t value) { + return set_scylla_local_param_as("topology_fence_version", value); +} + future system_keyspace::read_cdc_generation(utils::UUID id) { std::vector entries; diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh index 0bf7e16724..7a349e968c 100644 --- a/db/system_keyspace.hh +++ b/db/system_keyspace.hh @@ -454,6 +454,8 @@ public: static future group0_history_contains(utils::UUID state_id); static future load_topology_state(); + future get_topology_fence_version(); + future<> update_topology_fence_version(int64_t value); // Read CDC generation data with the given UUID as key. 
// Precondition: the data is known to be present in the table (because it was committed earlier through group 0). diff --git a/idl/storage_service.idl.hh b/idl/storage_service.idl.hh index 0073caea18..9b45adbe1f 100644 --- a/idl/storage_service.idl.hh +++ b/idl/storage_service.idl.hh @@ -12,7 +12,8 @@ namespace service { barrier, barrier_and_drain, stream_ranges, - fence_old_reads + fence_old_reads, + fence }; service::raft_topology_cmd::command cmd; }; diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc index 391616b8ed..e2f00ec0dd 100644 --- a/locator/token_metadata.cc +++ b/locator/token_metadata.cc @@ -1190,6 +1190,19 @@ void shared_token_metadata::set(mutable_token_metadata_ptr tmptr) noexcept { _shared->set_version_tracker(_versions_barrier.start()); } +void shared_token_metadata::update_fence_version(token_metadata::version_t version) { + if (const auto current_version = _shared->get_version(); version > current_version) { + on_internal_error(tlogger, + format("shared_token_metadata: invalid new fence version, can't be greater than the current version, " + "current version {}, new fence version {}", current_version, version)); + } + if (version < _fence_version) { + on_internal_error(tlogger, + format("shared_token_metadata: must not set decreasing fence version: {} -> {}", _fence_version, version)); + } + _fence_version = version; +} + future<> shared_token_metadata::mutate_token_metadata(seastar::noncopyable_function (token_metadata&)> func) { auto lk = co_await get_lock(); auto tm = co_await _shared->clone_async(); diff --git a/locator/token_metadata.hh b/locator/token_metadata.hh index 2ff8a94a87..54794b9bee 100644 --- a/locator/token_metadata.hh +++ b/locator/token_metadata.hh @@ -332,6 +332,7 @@ class shared_token_metadata { // includes its own invocation as an operation in the new phase. 
utils::phased_barrier _versions_barrier; shared_future<> _stale_versions_in_use{make_ready_future<>()}; + token_metadata::version_t _fence_version = 0; public: // used to construct the shared object as a sharded<> instance @@ -356,6 +357,11 @@ public: return _stale_versions_in_use.get_future(); } + void update_fence_version(token_metadata::version_t version); + token_metadata::version_t get_fence_version() const noexcept { + return _fence_version; + } + // Token metadata changes are serialized // using the schema_tables merge_lock. // diff --git a/main.cc b/main.cc index 1864ed1031..17570f8f28 100644 --- a/main.cc +++ b/main.cc @@ -1542,6 +1542,11 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl auto stop_ss_msg = defer_verbose_shutdown("storage service messaging", [&ss] { ss.local().uninit_messaging_service_part().get(); }); + + // It's essential to load fencing_version prior to starting the messaging service, + // since incoming messages may require fencing. + ss.local().update_fence_version(sys_ks.local().get_topology_fence_version().get()).get(); + api::set_server_messaging_service(ctx, messaging).get(); auto stop_messaging_api = defer_verbose_shutdown("messaging service API", [&ctx] { api::unset_server_messaging_service(ctx).get(); diff --git a/service/storage_service.cc b/service/storage_service.cc index dcd5c82c78..44edf50587 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -4908,13 +4908,19 @@ future storage_service::raft_topology_cmd_handler(shar // Return an error since the command is from outdated leader co_return result; } + + // We capture the topology version right after the checks + // above, before any yields. This is crucial since _topology_state_machine._topology + // might be altered concurrently while this method is running, + // which can cause the fence command to apply an invalid fence version. 
+ const auto version = _topology_state_machine._topology.version; + switch (cmd.cmd) { case raft_topology_cmd::command::barrier: // we already did read barrier above result.status = raft_topology_cmd_result::command_status::success; break; case raft_topology_cmd::command::barrier_and_drain: { - const auto version = _topology_state_machine._topology.version; co_await container().invoke_on_all([version] (storage_service& ss) -> future<> { const auto current_version = ss._shared_token_metadata.get()->get_version(); slogger.debug("Got raft_topology_cmd::barrier_and_drain, version {}, current version {}", @@ -5078,6 +5084,17 @@ future storage_service::raft_topology_cmd_handler(shar //co_await sleep_abortable(_db.local().get_config().read_request_timeout_in_ms() * std::chrono::milliseconds(1), _abort_source); result.status = raft_topology_cmd_result::command_status::success; break; + case raft_topology_cmd::command::fence: { + // We can have several concurrent fence commands in case topology change + // coordinator migrated to another node. The update_fence_version function + // checks that the version doesn't decrease, we do the check and persist + // the new version under the same lock to avoid races. + auto holder = co_await get_units(_raft_topology_cmd_handler_state._operation_mutex, 1); + co_await update_fence_version(version); + co_await _sys_ks.local().update_topology_fence_version(version); + result.status = raft_topology_cmd_result::command_status::success; + break; + } } } catch (...) 
{ slogger.error("raft topology: raft_topology_cmd failed with: {}", std::current_exception()); @@ -5085,6 +5102,13 @@ future storage_service::raft_topology_cmd_handler(shar co_return result; } +future<> storage_service::update_fence_version(token_metadata::version_t new_version) { + return container().invoke_on_all([new_version] (storage_service& ss) { + slogger.debug("update_fence_version, version {}", new_version); + ss._shared_token_metadata.update_fence_version(new_version); + }); +} + void storage_service::init_messaging_service(sharded& proxy, sharded& sys_dist_ks) { _messaging.local().register_node_ops_cmd([this] (const rpc::client_info& cinfo, node_ops_cmd_request req) { auto coordinator = cinfo.retrieve_auxiliary("baddr"); diff --git a/service/storage_service.hh b/service/storage_service.hh index 18eabdc542..69787cc5c6 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -744,6 +744,7 @@ private: public: future is_cleanup_allowed(sstring keyspace); bool is_repair_based_node_ops_enabled(streaming::stream_reason reason); + future<> update_fence_version(token_metadata::version_t version); private: std::unordered_set _normal_state_handled_on_boot; @@ -768,6 +769,9 @@ private: std::optional> _decomission_result; std::optional> _rebuild_result; std::unordered_map>> _remove_result; + struct { + semaphore _operation_mutex{1}; + } _raft_topology_cmd_handler_state; future raft_topology_cmd_handler(sharded& sys_dist_ks, raft::term_t term, const raft_topology_cmd& cmd); diff --git a/service/topology_state_machine.cc b/service/topology_state_machine.cc index 1692975c55..1198de797a 100644 --- a/service/topology_state_machine.cc +++ b/service/topology_state_machine.cc @@ -146,6 +146,9 @@ std::ostream& operator<<(std::ostream& os, const raft_topology_cmd::command& cmd case raft_topology_cmd::command::fence_old_reads: os << "fence_old_reads"; break; + case raft_topology_cmd::command::fence: + os << "fence"; + break; } return os; } diff --git 
a/service/topology_state_machine.hh b/service/topology_state_machine.hh index 6562b3e723..a8c96fe746 100644 --- a/service/topology_state_machine.hh +++ b/service/topology_state_machine.hh @@ -138,7 +138,8 @@ struct raft_topology_cmd { barrier_and_drain, // same + drain requests which use previous versions stream_ranges, // reqeust to stream data, return when streaming is // done - fence_old_reads // wait for all reads started before to complete + fence_old_reads, // wait for all reads started before to complete + fence // erect the fence against requests with stale versions }; command cmd; };