raft topology: add fence_version

It's stored outside of topology table,
since it's updated not through RAFT, but
with a new 'fence' raft command.
The current value is cached in shared_token_metadata.
An initial fence version is loaded in main
during storage_service initialisation.
This commit is contained in:
Petr Gusev
2023-05-18 18:57:11 +04:00
parent 4f99302c2b
commit f6b019c229
10 changed files with 71 additions and 3 deletions

View File

@@ -3659,6 +3659,15 @@ future<service::topology> system_keyspace::load_topology_state() {
co_return ret;
}
future<int64_t> system_keyspace::get_topology_fence_version() {
auto opt = co_await get_scylla_local_param_as<int64_t>("topology_fence_version");
co_return opt.value_or<int64_t>(0);
}
future<> system_keyspace::update_topology_fence_version(int64_t value) {
return set_scylla_local_param_as<int64_t>("topology_fence_version", value);
}
future<cdc::topology_description>
system_keyspace::read_cdc_generation(utils::UUID id) {
std::vector<cdc::token_range_description> entries;

View File

@@ -454,6 +454,8 @@ public:
static future<bool> group0_history_contains(utils::UUID state_id);
static future<service::topology> load_topology_state();
future<int64_t> get_topology_fence_version();
future<> update_topology_fence_version(int64_t value);
// Read CDC generation data with the given UUID as key.
// Precondition: the data is known to be present in the table (because it was committed earlier through group 0).

View File

@@ -12,7 +12,8 @@ namespace service {
barrier,
barrier_and_drain,
stream_ranges,
fence_old_reads
fence_old_reads,
fence
};
service::raft_topology_cmd::command cmd;
};

View File

@@ -1190,6 +1190,19 @@ void shared_token_metadata::set(mutable_token_metadata_ptr tmptr) noexcept {
_shared->set_version_tracker(_versions_barrier.start());
}
void shared_token_metadata::update_fence_version(token_metadata::version_t version) {
if (const auto current_version = _shared->get_version(); version > current_version) {
on_internal_error(tlogger,
format("shared_token_metadata: invalid new fence version, can't be greater than the current version, "
"current version {}, new fence version {}", current_version, version));
}
if (version < _fence_version) {
on_internal_error(tlogger,
format("shared_token_metadata: must not set decreasing fence version: {} -> {}", _fence_version, version));
}
_fence_version = version;
}
future<> shared_token_metadata::mutate_token_metadata(seastar::noncopyable_function<future<> (token_metadata&)> func) {
auto lk = co_await get_lock();
auto tm = co_await _shared->clone_async();

View File

@@ -332,6 +332,7 @@ class shared_token_metadata {
// includes its own invocation as an operation in the new phase.
utils::phased_barrier _versions_barrier;
shared_future<> _stale_versions_in_use{make_ready_future<>()};
token_metadata::version_t _fence_version = 0;
public:
// used to construct the shared object as a sharded<> instance
@@ -356,6 +357,11 @@ public:
return _stale_versions_in_use.get_future();
}
void update_fence_version(token_metadata::version_t version);
token_metadata::version_t get_fence_version() const noexcept {
return _fence_version;
}
// Token metadata changes are serialized
// using the schema_tables merge_lock.
//

View File

@@ -1542,6 +1542,11 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
auto stop_ss_msg = defer_verbose_shutdown("storage service messaging", [&ss] {
ss.local().uninit_messaging_service_part().get();
});
// It's essential to load fencing_version prior to starting the messaging service,
// since incoming messages may require fencing.
ss.local().update_fence_version(sys_ks.local().get_topology_fence_version().get()).get();
api::set_server_messaging_service(ctx, messaging).get();
auto stop_messaging_api = defer_verbose_shutdown("messaging service API", [&ctx] {
api::unset_server_messaging_service(ctx).get();

View File

@@ -4908,13 +4908,19 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(shar
// Return an error since the command is from outdated leader
co_return result;
}
// We capture the topology version right after the checks
// above, before any yields. This is crucial since _topology_state_machine._topology
// might be altered concurrently while this method is running,
// which can cause the fence command to apply an invalid fence version.
const auto version = _topology_state_machine._topology.version;
switch (cmd.cmd) {
case raft_topology_cmd::command::barrier:
// we already did read barrier above
result.status = raft_topology_cmd_result::command_status::success;
break;
case raft_topology_cmd::command::barrier_and_drain: {
const auto version = _topology_state_machine._topology.version;
co_await container().invoke_on_all([version] (storage_service& ss) -> future<> {
const auto current_version = ss._shared_token_metadata.get()->get_version();
slogger.debug("Got raft_topology_cmd::barrier_and_drain, version {}, current version {}",
@@ -5078,6 +5084,17 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(shar
//co_await sleep_abortable(_db.local().get_config().read_request_timeout_in_ms() * std::chrono::milliseconds(1), _abort_source);
result.status = raft_topology_cmd_result::command_status::success;
break;
case raft_topology_cmd::command::fence: {
// We can have several concurrent fence commands in case topology change
// coordinator migrated to another node. The update_fence_version function
// checks that the version doesn't decrease, we do the check and persist
// the new version under the same lock to avoid raises.
auto holder = co_await get_units(_raft_topology_cmd_handler_state._operation_mutex, 1);
co_await update_fence_version(version);
co_await _sys_ks.local().update_topology_fence_version(version);
result.status = raft_topology_cmd_result::command_status::success;
break;
}
}
} catch (...) {
slogger.error("raft topology: raft_topology_cmd failed with: {}", std::current_exception());
@@ -5085,6 +5102,13 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(shar
co_return result;
}
future<> storage_service::update_fence_version(token_metadata::version_t new_version) {
return container().invoke_on_all([new_version] (storage_service& ss) {
slogger.debug("update_fence_version, version {}", new_version);
ss._shared_token_metadata.update_fence_version(new_version);
});
}
void storage_service::init_messaging_service(sharded<service::storage_proxy>& proxy, sharded<db::system_distributed_keyspace>& sys_dist_ks) {
_messaging.local().register_node_ops_cmd([this] (const rpc::client_info& cinfo, node_ops_cmd_request req) {
auto coordinator = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");

View File

@@ -744,6 +744,7 @@ private:
public:
future<bool> is_cleanup_allowed(sstring keyspace);
bool is_repair_based_node_ops_enabled(streaming::stream_reason reason);
future<> update_fence_version(token_metadata::version_t version);
private:
std::unordered_set<gms::inet_address> _normal_state_handled_on_boot;
@@ -768,6 +769,9 @@ private:
std::optional<shared_future<>> _decomission_result;
std::optional<shared_future<>> _rebuild_result;
std::unordered_map<raft::server_id, std::optional<shared_future<>>> _remove_result;
struct {
semaphore _operation_mutex{1};
} _raft_topology_cmd_handler_state;
future<raft_topology_cmd_result> raft_topology_cmd_handler(sharded<db::system_distributed_keyspace>& sys_dist_ks, raft::term_t term, const raft_topology_cmd& cmd);

View File

@@ -146,6 +146,9 @@ std::ostream& operator<<(std::ostream& os, const raft_topology_cmd::command& cmd
case raft_topology_cmd::command::fence_old_reads:
os << "fence_old_reads";
break;
case raft_topology_cmd::command::fence:
os << "fence";
break;
}
return os;
}

View File

@@ -138,7 +138,8 @@ struct raft_topology_cmd {
barrier_and_drain, // same + drain requests which use previous versions
stream_ranges, // reqeust to stream data, return when streaming is
// done
fence_old_reads // wait for all reads started before to complete
fence_old_reads, // wait for all reads started before to complete
fence // erect the fence against requests with stale versions
};
command cmd;
};