Merge 'raft topology: move the topology coordinator to a dedicated file' from Piotr Dulikowski

The `topology_coordinator` is a large class (>1000 loc) which resides in
an even larger source file (storage_service.cc, ~7800 loc). This PR
moves the topology_coordinator class out of the storage_service.cc file
in order to improve modularity and recompilation times during
development.

As a first step, the `topology_mutation_builder` and
`topology_node_mutation_builder` classes are also moved from
storage_service.cc to their own, new header/source files as they are an
important abstraction used both by the topology coordinator code and
some other code in storage_service.cc that won't be moved.

Then, the `topology_coordinator` is moved out. The
`topology_coordinator` class is completely hidden in the new
topology_coordinator.cc file and can only be started and waited on to
finish via the new `run_topology_coordinator` function.

Fixes: scylladb/scylladb#16605

Closes scylladb/scylladb#16609

* github.com:scylladb/scylladb:
  service: move topology coordinator to a separate file
  storage_service: introduce run_topology_coordinator function
  service: move topology mutation builder out of storage_service
  storage_service: detemplate topology_node_mutation_builder::set
This commit is contained in:
Kamil Braun
2024-01-23 20:02:06 +01:00
7 changed files with 2754 additions and 2572 deletions

View File

@@ -1192,6 +1192,8 @@ scylla_core = (['message/messaging_service.cc',
'rust/wasmtime_bindings/src/lib.rs',
'utils/to_string.cc',
'service/topology_state_machine.cc',
'service/topology_mutation.cc',
'service/topology_coordinator.cc',
'node_ops/node_ops_ctl.cc'
] + [Antlr3Grammar('cql3/Cql.g')] + [Thrift('interface/cassandra.thrift', 'Cassandra')] \
+ scylla_raft_core

View File

@@ -27,6 +27,8 @@ target_sources(service
storage_proxy.cc
storage_service.cc
tablet_allocator.cc
topology_coordinator.cc
topology_mutation.cc
topology_state_machine.cc)
target_include_directories(service
PUBLIC

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,72 @@
/*
* Copyright (C) 2024-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include <chrono>
#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>
#include "log.hh"
#include "raft/raft.hh"
#include "gms/inet_address.hh"
#include "service/endpoint_lifecycle_subscriber.hh"
#include "service/topology_state_machine.hh"
// Forward declarations of the types that appear only by reference in the
// declarations below, so this header does not need their full definitions.
namespace db {
class system_keyspace;
class system_distributed_keyspace;
}
namespace gms {
class gossiper;
}
namespace netw {
class messaging_service;
}
namespace locator {
class shared_token_metadata;
}
namespace replica {
class database;
}
namespace raft {
class server;
}
namespace service {
// Forward declarations; only references to these types are used in this header.
template <typename Clock>
class raft_address_map_t;
using raft_address_map = raft_address_map_t<seastar::lowres_clock>;
class raft_group0;
class tablet_allocator;
// Logger object defined in another translation unit.
extern logging::logger rtlogger;
// Asynchronously yields the inet_address associated with raft server `id`.
// NOTE(review): presumably looks the address up in `am` and gives up when `as`
// is triggered -- confirm against the definition, which is not in this header.
future<gms::inet_address> wait_for_ip(raft::server_id id, const raft_address_map& am, seastar::abort_source& as);
// Callback type handed to run_topology_coordinator(): takes a raft term, a
// uint64_t and a raft_topology_cmd, and returns a future with the command result.
// NOTE(review): the meaning of the uint64_t argument is not visible here -- confirm.
using raft_topology_cmd_handler_type = noncopyable_function<future<raft_topology_cmd_result>(
raft::term_t, uint64_t, const raft_topology_cmd&)>;
// Entry point for running the topology coordinator, whose implementation lives
// in topology_coordinator.cc. The returned future lets the caller wait for the
// coordinator to finish. All service dependencies are passed by reference.
// NOTE(review): presumably the caller must keep them alive until the returned
// future resolves -- confirm.
future<> run_topology_coordinator(
seastar::sharded<db::system_distributed_keyspace>& sys_dist_ks, gms::gossiper& gossiper,
netw::messaging_service& messaging, locator::shared_token_metadata& shared_tm,
db::system_keyspace& sys_ks, replica::database& db, service::raft_group0& group0,
service::topology_state_machine& topo_sm, seastar::abort_source& as, raft::server& raft,
raft_topology_cmd_handler_type raft_topology_cmd_handler,
tablet_allocator& tablet_allocator,
std::chrono::milliseconds ring_delay,
endpoint_lifecycle_notifier& lifecycle_notifier);
}

View File

@@ -0,0 +1,282 @@
/*
* Copyright (C) 2024-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include "db/system_keyspace.hh"
#include "topology_mutation.hh"
#include "types/tuple.hh"
#include "types/types.hh"
#include "types/set.hh"
namespace db {
// Defined in another translation unit. Used below as the tuple type of the
// values stored in the "unpublished_cdc_generations" set.
extern thread_local data_type cdc_generation_ts_id_type;
}
namespace service {
// Creates a builder whose mutation uses the db::system_keyspace::topology()
// schema and targets the partition keyed by db::system_keyspace::TOPOLOGY.
// `ts` is the write timestamp used for every cell written through this builder.
topology_mutation_builder::topology_mutation_builder(api::timestamp_type ts) :
_s(db::system_keyspace::topology()),
_m(_s, partition_key::from_singular(*_s, db::system_keyspace::TOPOLOGY)),
_ts(ts) {
}
// Binds this builder to the clustered row keyed by `id.uuid()` inside the
// parent builder's mutation, and applies a row marker carrying the parent's
// timestamp to that row.
topology_node_mutation_builder::topology_node_mutation_builder(topology_mutation_builder& builder, raft::server_id id)
    : _builder(builder)
    , _r(builder._m.partition().clustered_row(*builder._s, clustering_key::from_singular(*builder._s, id.uuid())))
{
    _r.apply(row_marker(builder._ts));
}
// Writes `value` as a live atomic cell into the column named `cell` of the
// derived builder's row, using the derived builder's timestamp and TTL.
// The column must exist in the derived builder's schema (asserted).
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::apply_atomic(const char* cell, const data_value& value) {
    auto& me = self();
    const column_definition* col = me.schema().get_column_definition(cell);
    assert(col);
    const auto& col_type = *col->type;
    me.row().apply(*col, atomic_cell::make_live(col_type, me.timestamp(), col->type->decompose(value), me.ttl()));
    return me;
}
// Writes the elements of `c` into the set-typed column named `cell`.
//
// Elements are serialized with the set's element type and de-duplicated /
// ordered with that type's comparator before being turned into collection
// cells. In `overwrite` mode a collection tombstone at (timestamp - 1) is
// added so that the cells written here (at timestamp) survive it; in `update`
// mode no tombstone is added.
// The column must exist in the derived builder's schema (asserted).
template<typename Builder>
template<std::ranges::range C>
requires std::convertible_to<std::ranges::range_value_t<C>, data_value>
Builder& topology_mutation_builder_base<Builder>::apply_set(const char* cell, collection_apply_mode apply_mode, const C& c) {
    auto& me = self();
    const column_definition* col = me.schema().get_column_definition(cell);
    assert(col);

    const auto element_type = static_pointer_cast<const set_type_impl>(col->type)->get_elements_type();
    std::set<bytes, serialized_compare> unique_elements(element_type->as_less_comparator());
    for (const auto& element : c) {
        unique_elements.insert(element_type->decompose(data_value(element)));
    }

    collection_mutation_description update;
    if (apply_mode == collection_apply_mode::overwrite) {
        update.tomb = tombstone(me.timestamp() - 1, gc_clock::now());
    }
    update.cells.reserve(unique_elements.size());
    for (const bytes& key : unique_elements) {
        // Set members are stored as keys with an empty value.
        update.cells.emplace_back(key, atomic_cell::make_live(*bytes_type, me.timestamp(), bytes_view(), me.ttl()));
    }

    me.row().apply(*col, update.serialize(*col->type));
    return me;
}
// Deletes the column named `cell` from the derived builder's row at the
// builder's timestamp: a collection tombstone for multi-cell columns, a dead
// atomic cell otherwise. The column must exist in the schema (asserted).
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::del(const char* cell) {
    auto& me = self();
    const column_definition* col = me.schema().get_column_definition(cell);
    assert(col);
    if (col->type->is_multi_cell()) {
        collection_mutation_description erase_all;
        erase_all.tomb = tombstone{me.timestamp(), gc_clock::now()};
        me.row().apply(*col, erase_all.serialize(*col->type));
    } else {
        me.row().apply(*col, atomic_cell::make_dead(me.timestamp(), gc_clock::now()));
    }
    return me;
}
// Stores a node_state in column `cell` as its "{}"-formatted text.
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, node_state value) {
    const sstring text{::format("{}", value)};
    return apply_atomic(cell, text);
}
// Stores a topology_request in column `cell` as its "{}"-formatted text.
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, topology_request value) {
    const sstring text{::format("{}", value)};
    return apply_atomic(cell, text);
}
// Stores a string value in column `cell`.
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, const sstring& value) {
    Builder& updated = apply_atomic(cell, value);
    return updated;
}
// Stores a raft server id in column `cell` as its underlying UUID.
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, const raft::server_id& value) {
    const utils::UUID& as_uuid = value.uuid();
    return apply_atomic(cell, as_uuid);
}
// Stores an unsigned 32-bit value in column `cell` after converting it to
// int32_t.
// NOTE(review): values above INT32_MAX do not fit in int32_t -- confirm that
// callers never pass such values.
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, const uint32_t& value) {
    const auto as_signed = static_cast<int32_t>(value);
    return apply_atomic(cell, as_signed);
}
// Stores a cleanup_status in column `cell` as its "{}"-formatted text.
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, cleanup_status value) {
    const sstring text{::format("{}", value)};
    return apply_atomic(cell, text);
}
// Stores a UUID value in column `cell`.
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, const utils::UUID& value) {
    Builder& updated = apply_atomic(cell, value);
    return updated;
}
// Stores a boolean value in column `cell`.
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, bool value) {
    Builder& updated = apply_atomic(cell, value);
    return updated;
}
// Stores a C-string value in column `cell`.
// NOTE(review): this overload presumably exists so that string literals are
// not picked up by the `bool` overload via pointer-to-bool conversion -- confirm.
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, const char* value) {
    Builder& updated = apply_atomic(cell, value);
    return updated;
}
// Stores a db_clock time point in column `cell`.
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, const db_clock::time_point& value) {
    Builder& updated = apply_atomic(cell, value);
    return updated;
}
// Cells of the clustered row this node builder writes to.
row& topology_node_mutation_builder::row() {
    auto& cells = _r.cells();
    return cells;
}
// Write timestamp, taken from the parent topology_mutation_builder.
api::timestamp_type topology_node_mutation_builder::timestamp() const {
    const api::timestamp_type parent_ts = _builder._ts;
    return parent_ts;
}
// Schema of the table being written, taken from the parent builder.
const schema& topology_node_mutation_builder::schema() const {
    const auto& parent_schema = *_builder._s;
    return parent_schema;
}
// Overwrites the set-typed column `cell` with the UUIDs of `nodes_ids`.
topology_node_mutation_builder& topology_node_mutation_builder::set(const char* cell, const std::unordered_set<raft::server_id>& nodes_ids) {
    const auto to_uuid = [] (const auto& node_id) { return node_id.id; };
    return apply_set(cell, collection_apply_mode::overwrite, nodes_ids | boost::adaptors::transformed(to_uuid));
}
// Overwrites the set-typed column `cell` with the string form
// (dht::token::to_sstring()) of each token in `tokens`.
topology_node_mutation_builder& topology_node_mutation_builder::set(const char* cell, const std::unordered_set<dht::token>& tokens) {
    const auto to_text = [] (const auto& t) { return t.to_sstring(); };
    return apply_set(cell, collection_apply_mode::overwrite, tokens | boost::adaptors::transformed(to_text));
}
// Overwrites the set-typed column `cell` with `features`.
//
// The set is handed to apply_set() as-is: its elements are already sstring,
// which is what apply_set() converts to data_value, so wrapping it in an
// adaptor that copies every string first is unnecessary.
topology_node_mutation_builder& topology_node_mutation_builder::set(const char* cell, const std::set<sstring>& features) {
    return apply_set(cell, collection_apply_mode::overwrite, features);
}
// Produces the canonical_mutation from the parent builder's mutation, which
// is passed as an rvalue: the parent builder (and this one) should not be
// used to add further changes afterwards.
canonical_mutation topology_node_mutation_builder::build() {
    canonical_mutation result{std::move(_builder._m)};
    return result;
}
// Cells of the partition's static row, created on first use.
row& topology_mutation_builder::row() {
    auto& static_part = _m.partition().static_row();
    return static_part.maybe_create();
}
// Write timestamp supplied at construction.
api::timestamp_type topology_mutation_builder::timestamp() const {
    const api::timestamp_type write_ts = _ts;
    return write_ts;
}
// Schema of the table this builder writes to.
const schema& topology_mutation_builder::schema() const {
    const auto& table_schema = *_s;
    return table_schema;
}
// Stores the "{}"-formatted transition state in the "transition_state" column.
topology_mutation_builder& topology_mutation_builder::set_transition_state(topology::transition_state value) {
    const auto text = ::format("{}", value);
    return apply_atomic("transition_state", text);
}
// Writes `value` to the static cell "version" at the builder's timestamp.
topology_mutation_builder& topology_mutation_builder::set_version(topology::version_t value) {
_m.set_static_cell("version", value, _ts);
return *this;
}
// Writes `value` to the static cell "fence_version" at the builder's timestamp.
topology_mutation_builder& topology_mutation_builder::set_fence_version(topology::version_t value) {
_m.set_static_cell("fence_version", value, _ts);
return *this;
}
// Writes the UUID of `value` to the static cell "session" at the builder's
// timestamp.
topology_mutation_builder& topology_mutation_builder::set_session(session_id value) {
_m.set_static_cell("session", value.uuid(), _ts);
return *this;
}
// Writes `value` to the static cell "tablet_balancing_enabled" at the
// builder's timestamp.
topology_mutation_builder& topology_mutation_builder::set_tablet_balancing_enabled(bool value) {
_m.set_static_cell("tablet_balancing_enabled", value, _ts);
return *this;
}
// Deletes the "transition_state" column.
topology_mutation_builder& topology_mutation_builder::del_transition_state() {
    topology_mutation_builder& me = del("transition_state");
    return me;
}
// Deletes the "session" column.
topology_mutation_builder& topology_mutation_builder::del_session() {
    topology_mutation_builder& me = del("session");
    return me;
}
// Stores the current CDC generation id as two columns: its timestamp in
// "current_cdc_generation_timestamp" and its UUID in
// "current_cdc_generation_uuid".
topology_mutation_builder& topology_mutation_builder::set_current_cdc_generation_id(
        const cdc::generation_id_v2& value) {
    return apply_atomic("current_cdc_generation_timestamp", value.ts)
            .apply_atomic("current_cdc_generation_uuid", value.id);
}
// Stores `value` in the "new_cdc_generation_data_uuid" column.
topology_mutation_builder& topology_mutation_builder::set_new_cdc_generation_data_uuid(
        const utils::UUID& value) {
    topology_mutation_builder& me = apply_atomic("new_cdc_generation_data_uuid", value);
    return me;
}
// Overwrites the "unpublished_cdc_generations" set with `values`, each encoded
// as a (timestamp, timeuuid) tuple of type db::cdc_generation_ts_id_type.
topology_mutation_builder& topology_mutation_builder::set_unpublished_cdc_generations(const std::vector<cdc::generation_id_v2>& values) {
    const auto to_tuple = [] (const auto& gen_id) {
        return make_tuple_value(db::cdc_generation_ts_id_type, tuple_type_impl::native_type({gen_id.ts, timeuuid_native_type{gen_id.id}}));
    };
    auto tuples = values | boost::adaptors::transformed(to_tuple);
    return apply_set("unpublished_cdc_generations", collection_apply_mode::overwrite, tuples);
}
// Stores the "{}"-formatted request in the "global_topology_request" column.
topology_mutation_builder& topology_mutation_builder::set_global_topology_request(global_topology_request value) {
    const auto text = ::format("{}", value);
    return apply_atomic("global_topology_request", text);
}
// Adds `features` to the "enabled_features" set without removing entries
// already stored there (update mode: no collection tombstone is written).
//
// The set is handed to apply_set() as-is: its elements are already sstring,
// which is what apply_set() converts to data_value, so wrapping it in an
// adaptor that copies every string first is unnecessary.
topology_mutation_builder& topology_mutation_builder::add_enabled_features(const std::set<sstring>& features) {
    return apply_set("enabled_features", collection_apply_mode::update, features);
}
// Adds one generation id, encoded as a (timestamp, timeuuid) tuple, to the
// "unpublished_cdc_generations" set without removing existing entries.
topology_mutation_builder& topology_mutation_builder::add_unpublished_cdc_generation(const cdc::generation_id_v2& value) {
    std::vector<data_value> single;
    single.push_back(make_tuple_value(db::cdc_generation_ts_id_type, tuple_type_impl::native_type({value.ts, timeuuid_native_type{value.id}})));
    return apply_set("unpublished_cdc_generations", collection_apply_mode::update, single);
}
// Deletes the "global_topology_request" column.
topology_mutation_builder& topology_mutation_builder::del_global_topology_request() {
    topology_mutation_builder& me = del("global_topology_request");
    return me;
}
// Returns a builder for the row of node `n` within this mutation.
// Each call re-creates the embedded node builder, so a reference obtained from
// an earlier call no longer refers to a live object afterwards.
topology_node_mutation_builder& topology_mutation_builder::with_node(raft::server_id n) {
    return _node_builder.emplace(*this, n);
}
// Creates a builder for the db::system_keyspace::topology_requests() row of
// request `id`. The partition key is `id`, the write timestamp is the
// microsecond timestamp extracted from `id`, and the (single, empty-keyed)
// clustered row gets an expiring row marker whose TTL comes from ttl().
topology_request_tracking_mutation_builder::topology_request_tracking_mutation_builder(utils::UUID id)
    : _s(db::system_keyspace::topology_requests())
    , _m(_s, partition_key::from_singular(*_s, id))
    , _ts(utils::UUID_gen::micros_timestamp(id))
    , _r(_m.partition().clustered_row(*_s, clustering_key::make_empty()))
{
    const auto expires_at = gc_clock::now() + *ttl();
    _r.apply(row_marker(_ts, *ttl(), expires_at));
}
// Time-to-live for everything written by this builder: the time remaining
// until one month (std::chrono::months(1)) after the request's timestamp,
// measured against gc_clock::now() at the moment of the call.
// NOTE(review): for a request timestamp older than one month the result is
// not positive -- confirm that this cannot happen or is handled downstream.
ttl_opt topology_request_tracking_mutation_builder::ttl() const {
    const auto created = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::microseconds(_ts));
    const auto current = std::chrono::duration_cast<std::chrono::seconds>(gc_clock::now().time_since_epoch());
    return created + std::chrono::months(1) - current;
}
// Schema of the table this builder writes to.
const schema& topology_request_tracking_mutation_builder::schema() const {
    const auto& table_schema = *_s;
    return table_schema;
}
// Cells of the clustered row this builder writes to.
row& topology_request_tracking_mutation_builder::row() {
    auto& cells = _r.cells();
    return cells;
}
// Write timestamp, derived from the request id at construction.
api::timestamp_type topology_request_tracking_mutation_builder::timestamp() const {
    const api::timestamp_type write_ts = _ts;
    return write_ts;
}
// Marks the request as finished: records db_clock::now() in "end_time", stores
// the error text in "error" when one is given, and sets "done" to true.
topology_request_tracking_mutation_builder& topology_request_tracking_mutation_builder::done(std::optional<sstring> error) {
    set("end_time", db_clock::now());
    if (error.has_value()) {
        set("error", error.value());
    }
    return set("done", true);
}
// Explicit instantiations matching the `extern template` declarations in the
// header. The non-template members of the base are defined in this file, so
// they are instantiated here for each of the three builders.
template class topology_mutation_builder_base<topology_mutation_builder>;
template class topology_mutation_builder_base<topology_node_mutation_builder>;
template class topology_mutation_builder_base<topology_request_tracking_mutation_builder>;
} // namespace service

View File

@@ -0,0 +1,153 @@
/*
* Copyright (C) 2024-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include <concepts>
#include <cstdint>
#include <set>
#include <unordered_set>
#include <seastar/core/sstring.hh>
#include "dht/token.hh"
#include "mutation/canonical_mutation.hh"
#include "mutation/mutation.hh"
#include "raft/server.hh"
#include "schema/schema.hh"
#include "service/topology_state_machine.hh"
#include "timestamp.hh"
#include "utils/UUID.hh"
namespace service {
// CRTP base providing the cell-level write helpers shared by the mutation
// builders below. `Builder` is the derived class; the helpers reach into it
// through self() and expect it to provide schema(), row(), timestamp() and
// ttl(). Every helper returns the derived builder so calls can be chained.
template<typename Builder>
class topology_mutation_builder_base {
private:
// Downcast to the derived builder.
Builder& self() {
return *static_cast<Builder*>(this);
}
protected:
// How apply_set() treats what is already stored in the set column:
// `overwrite` additionally writes a collection tombstone just below the
// builder's timestamp, `update` only adds the given elements.
enum class collection_apply_mode {
overwrite,
update,
};
using builder_base = topology_mutation_builder_base<Builder>;
// Writes `value` as a live cell into the column named `cell`.
Builder& apply_atomic(const char* cell, const data_value& value);
// Writes the elements of `c` into the set-typed column named `cell`.
template<std::ranges::range C>
requires std::convertible_to<std::ranges::range_value_t<C>, data_value>
Builder& apply_set(const char* cell, collection_apply_mode apply_mode, const C& c);
// Typed setters for the column named `cell`; each converts `value` to the
// stored representation and forwards to apply_atomic().
Builder& set(const char* cell, node_state value);
Builder& set(const char* cell, topology_request value);
Builder& set(const char* cell, const sstring& value);
Builder& set(const char* cell, const raft::server_id& value);
Builder& set(const char* cell, const uint32_t& value);
Builder& set(const char* cell, cleanup_status value);
Builder& set(const char* cell, const utils::UUID& value);
Builder& set(const char* cell, bool value);
Builder& set(const char* cell, const char* value);
Builder& set(const char* cell, const db_clock::time_point& value);
// Deletes the column named `cell`.
Builder& del(const char* cell);
};
class topology_mutation_builder;
// Builds the per-node part of a topology mutation: writes go to the clustered
// row of one raft server inside the mutation owned by a parent
// topology_mutation_builder. Obtained through
// topology_mutation_builder::with_node().
class topology_node_mutation_builder
: public topology_mutation_builder_base<topology_node_mutation_builder> {
friend builder_base;
// Parent builder; owns the mutation, schema and timestamp used here.
topology_mutation_builder& _builder;
// The node's clustered row inside the parent's mutation.
deletable_row& _r;
private:
// Accessors used by the CRTP base.
row& row();
api::timestamp_type timestamp() const;
const schema& schema() const;
// Cells written by this builder carry no TTL.
ttl_opt ttl() const { return std::nullopt; }
public:
topology_node_mutation_builder(topology_mutation_builder&, raft::server_id);
using builder_base::set;
using builder_base::del;
// Set-valued columns; each call overwrites the stored set.
topology_node_mutation_builder& set(const char* cell, const std::unordered_set<raft::server_id>& nodes_ids);
topology_node_mutation_builder& set(const char* cell, const std::unordered_set<dht::token>& value);
topology_node_mutation_builder& set(const char* cell, const std::set<sstring>& value);
// Finalizes the parent's mutation; the parent's mutation is handed over as an
// rvalue, so neither builder should be used to add changes afterwards.
canonical_mutation build();
};
// Builds a mutation of the topology table. Setters declared directly on this
// class write to the partition's static row; per-node columns are written
// through the builder returned by with_node().
class topology_mutation_builder
: public topology_mutation_builder_base<topology_mutation_builder> {
friend builder_base;
friend class topology_node_mutation_builder;
schema_ptr _s;
mutation _m;
// Write timestamp used for all cells.
api::timestamp_type _ts;
// Created by with_node(); holds a reference back to this object.
// NOTE(review): because of that back-reference, copying or moving this
// builder after with_node() would leave the embedded node builder pointing at
// the source object -- confirm that callers never do so.
std::optional<topology_node_mutation_builder> _node_builder;
private:
// Accessors used by the CRTP base.
row& row();
api::timestamp_type timestamp() const;
const schema& schema() const;
// Cells written by this builder carry no TTL.
ttl_opt ttl() const { return std::nullopt; }
public:
topology_mutation_builder(api::timestamp_type ts);
topology_mutation_builder& set_transition_state(topology::transition_state);
topology_mutation_builder& set_version(topology::version_t);
topology_mutation_builder& set_fence_version(topology::version_t);
topology_mutation_builder& set_session(session_id);
topology_mutation_builder& set_tablet_balancing_enabled(bool);
topology_mutation_builder& set_current_cdc_generation_id(const cdc::generation_id_v2&);
topology_mutation_builder& set_new_cdc_generation_data_uuid(const utils::UUID& value);
// Replaces the stored set of unpublished CDC generations.
topology_mutation_builder& set_unpublished_cdc_generations(const std::vector<cdc::generation_id_v2>& values);
topology_mutation_builder& set_global_topology_request(global_topology_request);
// The add_* methods add to the stored set without removing existing entries.
topology_mutation_builder& add_enabled_features(const std::set<sstring>& value);
topology_mutation_builder& add_unpublished_cdc_generation(const cdc::generation_id_v2& value);
topology_mutation_builder& del_transition_state();
topology_mutation_builder& del_session();
topology_mutation_builder& del_global_topology_request();
// Returns a builder for the row of the given node. Each call re-creates the
// embedded node builder, so references from earlier calls must not be reused.
topology_node_mutation_builder& with_node(raft::server_id);
// Finalizes the mutation; it is handed over as an rvalue, so the builder
// should not be used to add changes afterwards.
canonical_mutation build() { return canonical_mutation{std::move(_m)}; }
};
// Builds a mutation of the topology requests table for a single request,
// identified by a UUID. The write timestamp is derived from that UUID, and
// all cells are written with the TTL returned by ttl().
class topology_request_tracking_mutation_builder :
public topology_mutation_builder_base<topology_request_tracking_mutation_builder> {
schema_ptr _s;
mutation _m;
api::timestamp_type _ts;
// The request's clustered row inside _m.
// NOTE(review): a copy of this builder would get its own _m while _r still
// refers to the source's row -- confirm that the builder is never copied.
deletable_row& _r;
public:
// Accessors used by the CRTP base (public here, unlike in the other builders).
row& row();
const schema& schema() const;
api::timestamp_type timestamp() const;
ttl_opt ttl() const;
topology_request_tracking_mutation_builder(utils::UUID id);
using builder_base::set;
using builder_base::del;
// Marks the request as finished, optionally recording an error message.
topology_request_tracking_mutation_builder& done(std::optional<sstring> error = std::nullopt);
// Finalizes the mutation; it is handed over as an rvalue, so the builder
// should not be used to add changes afterwards.
canonical_mutation build() { return canonical_mutation{std::move(_m)}; }
};
// The base's members are defined and explicitly instantiated in the
// accompanying source file; these declarations stop other translation units
// from instantiating them implicitly.
extern template class topology_mutation_builder_base<topology_mutation_builder>;
extern template class topology_mutation_builder_base<topology_node_mutation_builder>;
extern template class topology_mutation_builder_base<topology_request_tracking_mutation_builder>;
} // namespace service