From 79c3ed7fdb44ec698d68e6db49360fbb49a25e83 Mon Sep 17 00:00:00 2001 From: Piotr Dulikowski Date: Mon, 11 Dec 2023 17:27:00 +0100 Subject: [PATCH] service: move topology mutation builder out of storage_service The topology_mutation_builder, topology_node_mutation_builder and topology_request_tracking_mutation_builder are currently used by storage service - mainly, but not exclusively, by the topology coordinator logic. As we are going to extract the topology coordinator to a separate file, we need to move the builders to their own file as well so that they will be accessible both by the topology coordinator and the storage service. --- configure.py | 1 + service/CMakeLists.txt | 1 + service/storage_service.cc | 384 +---------------------------------- service/topology_mutation.cc | 282 +++++++++++++++++++++++++ service/topology_mutation.hh | 153 ++++++++++++++ 5 files changed, 438 insertions(+), 383 deletions(-) create mode 100644 service/topology_mutation.cc create mode 100644 service/topology_mutation.hh diff --git a/configure.py b/configure.py index 79aefe6ac2..e2325e1093 100755 --- a/configure.py +++ b/configure.py @@ -1192,6 +1192,7 @@ scylla_core = (['message/messaging_service.cc', 'rust/wasmtime_bindings/src/lib.rs', 'utils/to_string.cc', 'service/topology_state_machine.cc', + 'service/topology_mutation.cc', 'node_ops/node_ops_ctl.cc' ] + [Antlr3Grammar('cql3/Cql.g')] + [Thrift('interface/cassandra.thrift', 'Cassandra')] \ + scylla_raft_core diff --git a/service/CMakeLists.txt b/service/CMakeLists.txt index 5e4a3efbc6..8ef80fa101 100644 --- a/service/CMakeLists.txt +++ b/service/CMakeLists.txt @@ -27,6 +27,7 @@ target_sources(service storage_proxy.cc storage_service.cc tablet_allocator.cc + topology_mutation.cc topology_state_machine.cc) target_include_directories(service PUBLIC diff --git a/service/storage_service.cc b/service/storage_service.cc index b8ca4b6b64..8026407df9 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -82,8 +82,8 @@ #include "service/raft/join_node.hh" #include "idl/join_node.dist.hh" #include "protocol_server.hh" -#include "types/set.hh" #include "node_ops/node_ops_ctl.hh" +#include "service/topology_mutation.hh" #include #include @@ -95,10 +95,6 @@ using inet_address = gms::inet_address; extern logging::logger cdc_log; -namespace db { - extern thread_local data_type cdc_generation_ts_id_type; -} - namespace service { static logging::logger slogger("storage_service"); @@ -753,384 +749,6 @@ public: // }}} raft_ip_address_updater -template -class topology_mutation_builder_base { -private: - Builder& self() { - return *static_cast(this); - } - -protected: - enum class collection_apply_mode { - overwrite, - update, - }; - - using builder_base = topology_mutation_builder_base; - - Builder& apply_atomic(const char* cell, const data_value& value); - template - requires std::convertible_to, data_value> - Builder& apply_set(const char* cell, collection_apply_mode apply_mode, const C& c); - Builder& set(const char* cell, node_state value); - Builder& set(const char* cell, topology_request value); - Builder& set(const char* cell, const sstring& value); - Builder& set(const char* cell, const raft::server_id& value); - Builder& set(const char* cell, const uint32_t& value); - Builder& set(const char* cell, cleanup_status value); - Builder& set(const char* cell, const utils::UUID& value); - Builder& set(const char* cell, bool value); - Builder& set(const char* cell, const char* value); - Builder& set(const char* cell, const db_clock::time_point& value); - - Builder& del(const char* cell); -}; - -class topology_mutation_builder; - -class topology_node_mutation_builder - : public topology_mutation_builder_base { - - friend builder_base; - - topology_mutation_builder& _builder; - deletable_row& _r; - -private: - row& row(); - api::timestamp_type timestamp() const; - const schema& schema() const; - ttl_opt ttl() const { return std::nullopt; } - -public: - topology_node_mutation_builder(topology_mutation_builder&, raft::server_id); - - using builder_base::set; - using builder_base::del; - topology_node_mutation_builder& set(const char* cell, const std::unordered_set& nodes_ids); - topology_node_mutation_builder& set(const char* cell, const std::unordered_set& value); - topology_node_mutation_builder& set(const char* cell, const std::set& value); - - canonical_mutation build(); -}; - -class topology_mutation_builder - : public topology_mutation_builder_base { - - friend builder_base; - friend class topology_node_mutation_builder; - - schema_ptr _s; - mutation _m; - api::timestamp_type _ts; - - std::optional _node_builder; - -private: - row& row(); - api::timestamp_type timestamp() const; - const schema& schema() const; - ttl_opt ttl() const { return std::nullopt; } - -public: - topology_mutation_builder(api::timestamp_type ts); - topology_mutation_builder& set_transition_state(topology::transition_state); - topology_mutation_builder& set_version(topology::version_t); - topology_mutation_builder& set_fence_version(topology::version_t); - topology_mutation_builder& set_session(session_id); - topology_mutation_builder& set_tablet_balancing_enabled(bool); - topology_mutation_builder& set_current_cdc_generation_id(const cdc::generation_id_v2&); - topology_mutation_builder& set_new_cdc_generation_data_uuid(const utils::UUID& value); - topology_mutation_builder& set_unpublished_cdc_generations(const std::vector& values); - topology_mutation_builder& set_global_topology_request(global_topology_request); - topology_mutation_builder& add_enabled_features(const std::set& value); - topology_mutation_builder& add_unpublished_cdc_generation(const cdc::generation_id_v2& value); - topology_mutation_builder& del_transition_state(); - topology_mutation_builder& del_session(); - topology_mutation_builder& del_global_topology_request(); - topology_node_mutation_builder& with_node(raft::server_id); - canonical_mutation build() { return canonical_mutation{std::move(_m)}; } -}; - -topology_mutation_builder::topology_mutation_builder(api::timestamp_type ts) : - _s(db::system_keyspace::topology()), - _m(_s, partition_key::from_singular(*_s, db::system_keyspace::TOPOLOGY)), - _ts(ts) { -} - -topology_node_mutation_builder::topology_node_mutation_builder(topology_mutation_builder& builder, raft::server_id id) : - _builder(builder), - _r(_builder._m.partition().clustered_row(*_builder._s, clustering_key::from_singular(*_builder._s, id.uuid()))) { - _r.apply(row_marker(_builder._ts)); -} - -template -Builder& topology_mutation_builder_base::apply_atomic(const char* cell, const data_value& value) { - const column_definition* cdef = self().schema().get_column_definition(cell); - assert(cdef); - self().row().apply(*cdef, atomic_cell::make_live(*cdef->type, self().timestamp(), cdef->type->decompose(value), self().ttl())); - return self(); -} - -template -template -requires std::convertible_to, data_value> -Builder& topology_mutation_builder_base::apply_set(const char* cell, collection_apply_mode apply_mode, const C& c) { - const column_definition* cdef = self().schema().get_column_definition(cell); - assert(cdef); - auto vtype = static_pointer_cast(cdef->type)->get_elements_type(); - - std::set cset(vtype->as_less_comparator()); - for (const auto& v : c) { - cset.insert(vtype->decompose(data_value(v))); - } - - collection_mutation_description cm; - cm.cells.reserve(cset.size()); - for (const bytes& raw : cset) { - cm.cells.emplace_back(raw, atomic_cell::make_live(*bytes_type, self().timestamp(), bytes_view(), self().ttl())); - } - - if (apply_mode == collection_apply_mode::overwrite) { - cm.tomb = tombstone(self().timestamp() - 1, gc_clock::now()); - } - - self().row().apply(*cdef, cm.serialize(*cdef->type)); - return self(); -} - -template -Builder& topology_mutation_builder_base::del(const char* cell) { - auto cdef = self().schema().get_column_definition(cell); - assert(cdef); - if (!cdef->type->is_multi_cell()) { - self().row().apply(*cdef, atomic_cell::make_dead(self().timestamp(), gc_clock::now())); - } else { - collection_mutation_description cm; - cm.tomb = tombstone{self().timestamp(), gc_clock::now()}; - self().row().apply(*cdef, cm.serialize(*cdef->type)); - } - return self(); -} - - -template -Builder& topology_mutation_builder_base::set(const char* cell, node_state value) { - return apply_atomic(cell, sstring{::format("{}", value)}); -} - -template -Builder& topology_mutation_builder_base::set(const char* cell, topology_request value) { - return apply_atomic(cell, sstring{::format("{}", value)}); -} - -template -Builder& topology_mutation_builder_base::set(const char* cell, const sstring& value) { - return apply_atomic(cell, value); -} - -template -Builder& topology_mutation_builder_base::set(const char* cell, const raft::server_id& value) { - return apply_atomic(cell, value.uuid()); -} - -template -Builder& topology_mutation_builder_base::set(const char* cell, const uint32_t& value) { - return apply_atomic(cell, int32_t(value)); -} - -template -Builder& topology_mutation_builder_base::set(const char* cell, cleanup_status value) { - return apply_atomic(cell, sstring{::format("{}", value)}); -} - -template -Builder& topology_mutation_builder_base::set(const char* cell, const utils::UUID& value) { - return apply_atomic(cell, value); -} - -template -Builder& topology_mutation_builder_base::set(const char* cell, bool value) { - return apply_atomic(cell, value); -} - -template -Builder& topology_mutation_builder_base::set(const char* cell, const char* value) { - return apply_atomic(cell, value); -} - -template -Builder& topology_mutation_builder_base::set(const char* cell, const db_clock::time_point& value) { - return apply_atomic(cell, value); -} - -row& topology_node_mutation_builder::row() { - return _r.cells(); -} - -api::timestamp_type topology_node_mutation_builder::timestamp() const { - return _builder._ts; -} - -const schema& topology_node_mutation_builder::schema() const { - return *_builder._s; -} - -topology_node_mutation_builder& topology_node_mutation_builder::set(const char* cell, const std::unordered_set& nodes_ids) { - return apply_set(cell, collection_apply_mode::overwrite, nodes_ids | boost::adaptors::transformed([] (const auto& node_id) { return node_id.id; })); -} - -topology_node_mutation_builder& topology_node_mutation_builder::set(const char* cell, const std::unordered_set& tokens) { - return apply_set(cell, collection_apply_mode::overwrite, tokens | boost::adaptors::transformed([] (const auto& t) { return t.to_sstring(); })); -} - -topology_node_mutation_builder& topology_node_mutation_builder::set(const char* cell, const std::set& features) { - return apply_set(cell, collection_apply_mode::overwrite, features | boost::adaptors::transformed([] (const auto& f) { return sstring(f); })); -} - -canonical_mutation topology_node_mutation_builder::build() { - return canonical_mutation{std::move(_builder._m)}; -} - -row& topology_mutation_builder::row() { - return _m.partition().static_row().maybe_create(); -} - -api::timestamp_type topology_mutation_builder::timestamp() const { - return _ts; -} - -const schema& topology_mutation_builder::schema() const { - return *_s; -} - -topology_mutation_builder& topology_mutation_builder::set_transition_state(topology::transition_state value) { - return apply_atomic("transition_state", ::format("{}", value)); -} - -topology_mutation_builder& topology_mutation_builder::set_version(topology::version_t value) { - _m.set_static_cell("version", value, _ts); - return *this; -} - -topology_mutation_builder& topology_mutation_builder::set_fence_version(topology::version_t value) { - _m.set_static_cell("fence_version", value, _ts); - return *this; -} - -topology_mutation_builder& topology_mutation_builder::set_session(session_id value) { - _m.set_static_cell("session", value.uuid(), _ts); - return *this; -} - -topology_mutation_builder& topology_mutation_builder::set_tablet_balancing_enabled(bool value) { - _m.set_static_cell("tablet_balancing_enabled", value, _ts); - return *this; -} - -topology_mutation_builder& topology_mutation_builder::del_transition_state() { - return del("transition_state"); -} - -topology_mutation_builder& topology_mutation_builder::del_session() { - return del("session"); -} - -topology_mutation_builder& topology_mutation_builder::set_current_cdc_generation_id( - const cdc::generation_id_v2& value) { - apply_atomic("current_cdc_generation_timestamp", value.ts); - apply_atomic("current_cdc_generation_uuid", value.id); - return *this; -} - -topology_mutation_builder& topology_mutation_builder::set_new_cdc_generation_data_uuid( - const utils::UUID& value) { - return apply_atomic("new_cdc_generation_data_uuid", value); -} - -topology_mutation_builder& topology_mutation_builder::set_unpublished_cdc_generations(const std::vector& values) { - auto dv = values | boost::adaptors::transformed([&] (const auto& v) { - return make_tuple_value(db::cdc_generation_ts_id_type, tuple_type_impl::native_type({v.ts, timeuuid_native_type{v.id}})); - }); - return apply_set("unpublished_cdc_generations", collection_apply_mode::overwrite, std::move(dv)); -} - -topology_mutation_builder& topology_mutation_builder::set_global_topology_request(global_topology_request value) { - return apply_atomic("global_topology_request", ::format("{}", value)); -} - -topology_mutation_builder& topology_mutation_builder::add_enabled_features(const std::set& features) { - return apply_set("enabled_features", collection_apply_mode::update, features | boost::adaptors::transformed([] (const auto& f) { return sstring(f); })); -} - -topology_mutation_builder& topology_mutation_builder::add_unpublished_cdc_generation(const cdc::generation_id_v2& value) { - auto dv = make_tuple_value(db::cdc_generation_ts_id_type, tuple_type_impl::native_type({value.ts, timeuuid_native_type{value.id}})); - return apply_set("unpublished_cdc_generations", collection_apply_mode::update, std::vector{std::move(dv)}); -} - -topology_mutation_builder& topology_mutation_builder::del_global_topology_request() { - return del("global_topology_request"); -} - -topology_node_mutation_builder& topology_mutation_builder::with_node(raft::server_id n) { - _node_builder.emplace(*this, n); - return *_node_builder; -} - -class topology_request_tracking_mutation_builder : - public topology_mutation_builder_base { - schema_ptr _s; - mutation _m; - api::timestamp_type _ts; - deletable_row& _r; - -public: - - row& row(); - const schema& schema() const; - api::timestamp_type timestamp() const; - ttl_opt ttl() const; - - topology_request_tracking_mutation_builder(utils::UUID id); - using builder_base::set; - using builder_base::del; - topology_request_tracking_mutation_builder& done(std::optional error = std::nullopt); - canonical_mutation build() { return canonical_mutation{std::move(_m)}; } -}; - -topology_request_tracking_mutation_builder::topology_request_tracking_mutation_builder(utils::UUID id) : - _s(db::system_keyspace::topology_requests()), - _m(_s, partition_key::from_singular(*_s, id)), - _ts(utils::UUID_gen::micros_timestamp(id)), - _r(_m.partition().clustered_row(*_s, clustering_key::make_empty())) { - _r.apply(row_marker(_ts, *ttl(), gc_clock::now() + *ttl())); -} - -ttl_opt topology_request_tracking_mutation_builder::ttl() const { - return std::chrono::duration_cast(std::chrono::microseconds(_ts)) + std::chrono::months(1) - - std::chrono::duration_cast(gc_clock::now().time_since_epoch()); -} - -const schema& topology_request_tracking_mutation_builder::schema() const { - return *_s; -} - -row& topology_request_tracking_mutation_builder::row() { - return _r.cells(); -} - -api::timestamp_type topology_request_tracking_mutation_builder::timestamp() const { - return _ts; -} - -topology_request_tracking_mutation_builder& topology_request_tracking_mutation_builder::done(std::optional error) { - set("end_time", db_clock::now()); - if (error) { - set("error", *error); - } - return set("done", true); -} - future<> storage_service::sstable_cleanup_fiber(raft::server& server, sharded& proxy) noexcept { while (!_group0_as.abort_requested()) { bool err = false; diff --git a/service/topology_mutation.cc b/service/topology_mutation.cc new file mode 100644 index 0000000000..e2653bec48 --- /dev/null +++ b/service/topology_mutation.cc @@ -0,0 +1,282 @@ +/* + * Copyright (C) 2024-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +#include "db/system_keyspace.hh" +#include "topology_mutation.hh" +#include "types/tuple.hh" +#include "types/types.hh" +#include "types/set.hh" + +namespace db { + extern thread_local data_type cdc_generation_ts_id_type; +} + +namespace service { + +topology_mutation_builder::topology_mutation_builder(api::timestamp_type ts) : + _s(db::system_keyspace::topology()), + _m(_s, partition_key::from_singular(*_s, db::system_keyspace::TOPOLOGY)), + _ts(ts) { +} + +topology_node_mutation_builder::topology_node_mutation_builder(topology_mutation_builder& builder, raft::server_id id) : + _builder(builder), + _r(_builder._m.partition().clustered_row(*_builder._s, clustering_key::from_singular(*_builder._s, id.uuid()))) { + _r.apply(row_marker(_builder._ts)); +} + +template +Builder& topology_mutation_builder_base::apply_atomic(const char* cell, const data_value& value) { + const column_definition* cdef = self().schema().get_column_definition(cell); + assert(cdef); + self().row().apply(*cdef, atomic_cell::make_live(*cdef->type, self().timestamp(), cdef->type->decompose(value), self().ttl())); + return self(); +} + +template +template +requires std::convertible_to, data_value> +Builder& topology_mutation_builder_base::apply_set(const char* cell, collection_apply_mode apply_mode, const C& c) { + const column_definition* cdef = self().schema().get_column_definition(cell); + assert(cdef); + auto vtype = static_pointer_cast(cdef->type)->get_elements_type(); + + std::set cset(vtype->as_less_comparator()); + for (const auto& v : c) { + cset.insert(vtype->decompose(data_value(v))); + } + + collection_mutation_description cm; + cm.cells.reserve(cset.size()); + for (const bytes& raw : cset) { + cm.cells.emplace_back(raw, atomic_cell::make_live(*bytes_type, self().timestamp(), bytes_view(), self().ttl())); + } + + if (apply_mode == collection_apply_mode::overwrite) { + cm.tomb = tombstone(self().timestamp() - 1, gc_clock::now()); + } + + self().row().apply(*cdef, cm.serialize(*cdef->type)); + return self(); +} + +template +Builder& topology_mutation_builder_base::del(const char* cell) { + auto cdef = self().schema().get_column_definition(cell); + assert(cdef); + if (!cdef->type->is_multi_cell()) { + self().row().apply(*cdef, atomic_cell::make_dead(self().timestamp(), gc_clock::now())); + } else { + collection_mutation_description cm; + cm.tomb = tombstone{self().timestamp(), gc_clock::now()}; + self().row().apply(*cdef, cm.serialize(*cdef->type)); + } + return self(); +} + +template +Builder& topology_mutation_builder_base::set(const char* cell, node_state value) { + return apply_atomic(cell, sstring{::format("{}", value)}); +} + +template +Builder& topology_mutation_builder_base::set(const char* cell, topology_request value) { + return apply_atomic(cell, sstring{::format("{}", value)}); +} + +template +Builder& topology_mutation_builder_base::set(const char* cell, const sstring& value) { + return apply_atomic(cell, value); +} + +template +Builder& topology_mutation_builder_base::set(const char* cell, const raft::server_id& value) { + return apply_atomic(cell, value.uuid()); +} + +template +Builder& topology_mutation_builder_base::set(const char* cell, const uint32_t& value) { + return apply_atomic(cell, int32_t(value)); +} + +template +Builder& topology_mutation_builder_base::set(const char* cell, cleanup_status value) { + return apply_atomic(cell, sstring{::format("{}", value)}); +} + +template +Builder& topology_mutation_builder_base::set(const char* cell, const utils::UUID& value) { + return apply_atomic(cell, value); +} + +template +Builder& topology_mutation_builder_base::set(const char* cell, bool value) { + return apply_atomic(cell, value); +} + +template +Builder& topology_mutation_builder_base::set(const char* cell, const char* value) { + return apply_atomic(cell, value); +} + +template +Builder& topology_mutation_builder_base::set(const char* cell, const db_clock::time_point& value) { + return apply_atomic(cell, value); +} + +row& topology_node_mutation_builder::row() { + return _r.cells(); +} + +api::timestamp_type topology_node_mutation_builder::timestamp() const { + return _builder._ts; +} + +const schema& topology_node_mutation_builder::schema() const { + return *_builder._s; +} + +topology_node_mutation_builder& topology_node_mutation_builder::set(const char* cell, const std::unordered_set& nodes_ids) { + return apply_set(cell, collection_apply_mode::overwrite, nodes_ids | boost::adaptors::transformed([] (const auto& node_id) { return node_id.id; })); +} + +topology_node_mutation_builder& topology_node_mutation_builder::set(const char* cell, const std::unordered_set& tokens) { + return apply_set(cell, collection_apply_mode::overwrite, tokens | boost::adaptors::transformed([] (const auto& t) { return t.to_sstring(); })); +} + +topology_node_mutation_builder& topology_node_mutation_builder::set(const char* cell, const std::set& features) { + return apply_set(cell, collection_apply_mode::overwrite, features | boost::adaptors::transformed([] (const auto& f) { return sstring(f); })); +} + +canonical_mutation topology_node_mutation_builder::build() { + return canonical_mutation{std::move(_builder._m)}; +} + +row& topology_mutation_builder::row() { + return _m.partition().static_row().maybe_create(); +} + +api::timestamp_type topology_mutation_builder::timestamp() const { + return _ts; +} + +const schema& topology_mutation_builder::schema() const { + return *_s; +} + +topology_mutation_builder& topology_mutation_builder::set_transition_state(topology::transition_state value) { + return apply_atomic("transition_state", ::format("{}", value)); +} + +topology_mutation_builder& topology_mutation_builder::set_version(topology::version_t value) { + _m.set_static_cell("version", value, _ts); + return *this; +} + +topology_mutation_builder& topology_mutation_builder::set_fence_version(topology::version_t value) { + _m.set_static_cell("fence_version", value, _ts); + return *this; +} + +topology_mutation_builder& topology_mutation_builder::set_session(session_id value) { + _m.set_static_cell("session", value.uuid(), _ts); + return *this; +} + +topology_mutation_builder& topology_mutation_builder::set_tablet_balancing_enabled(bool value) { + _m.set_static_cell("tablet_balancing_enabled", value, _ts); + return *this; +} + +topology_mutation_builder& topology_mutation_builder::del_transition_state() { + return del("transition_state"); +} + +topology_mutation_builder& topology_mutation_builder::del_session() { + return del("session"); +} + +topology_mutation_builder& topology_mutation_builder::set_current_cdc_generation_id( + const cdc::generation_id_v2& value) { + apply_atomic("current_cdc_generation_timestamp", value.ts); + apply_atomic("current_cdc_generation_uuid", value.id); + return *this; +} + +topology_mutation_builder& topology_mutation_builder::set_new_cdc_generation_data_uuid( + const utils::UUID& value) { + return apply_atomic("new_cdc_generation_data_uuid", value); +} + +topology_mutation_builder& topology_mutation_builder::set_unpublished_cdc_generations(const std::vector& values) { + auto dv = values | boost::adaptors::transformed([&] (const auto& v) { + return make_tuple_value(db::cdc_generation_ts_id_type, tuple_type_impl::native_type({v.ts, timeuuid_native_type{v.id}})); + }); + return apply_set("unpublished_cdc_generations", collection_apply_mode::overwrite, std::move(dv)); +} + +topology_mutation_builder& topology_mutation_builder::set_global_topology_request(global_topology_request value) { + return apply_atomic("global_topology_request", ::format("{}", value)); +} + +topology_mutation_builder& topology_mutation_builder::add_enabled_features(const std::set& features) { + return apply_set("enabled_features", collection_apply_mode::update, features | boost::adaptors::transformed([] (const auto& f) { return sstring(f); })); +} + +topology_mutation_builder& topology_mutation_builder::add_unpublished_cdc_generation(const cdc::generation_id_v2& value) { + auto dv = make_tuple_value(db::cdc_generation_ts_id_type, tuple_type_impl::native_type({value.ts, timeuuid_native_type{value.id}})); + return apply_set("unpublished_cdc_generations", collection_apply_mode::update, std::vector{std::move(dv)}); +} + +topology_mutation_builder& topology_mutation_builder::del_global_topology_request() { + return del("global_topology_request"); +} + +topology_node_mutation_builder& topology_mutation_builder::with_node(raft::server_id n) { + _node_builder.emplace(*this, n); + return *_node_builder; +} + +topology_request_tracking_mutation_builder::topology_request_tracking_mutation_builder(utils::UUID id) : + _s(db::system_keyspace::topology_requests()), + _m(_s, partition_key::from_singular(*_s, id)), + _ts(utils::UUID_gen::micros_timestamp(id)), + _r(_m.partition().clustered_row(*_s, clustering_key::make_empty())) { + _r.apply(row_marker(_ts, *ttl(), gc_clock::now() + *ttl())); +} + +ttl_opt topology_request_tracking_mutation_builder::ttl() const { + return std::chrono::duration_cast(std::chrono::microseconds(_ts)) + std::chrono::months(1) + - std::chrono::duration_cast(gc_clock::now().time_since_epoch()); +} + +const schema& topology_request_tracking_mutation_builder::schema() const { + return *_s; +} + +row& topology_request_tracking_mutation_builder::row() { + return _r.cells(); +} + +api::timestamp_type topology_request_tracking_mutation_builder::timestamp() const { + return _ts; +} + +topology_request_tracking_mutation_builder& topology_request_tracking_mutation_builder::done(std::optional error) { + set("end_time", db_clock::now()); + if (error) { + set("error", *error); + } + return set("done", true); +} + +template class topology_mutation_builder_base; +template class topology_mutation_builder_base; +template class topology_mutation_builder_base; + +} // namespace service diff --git a/service/topology_mutation.hh b/service/topology_mutation.hh new file mode 100644 index 0000000000..4a5dcd2166 --- /dev/null +++ b/service/topology_mutation.hh @@ -0,0 +1,153 @@ +/* + * Copyright (C) 2024-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +#pragma once + +#include +#include +#include +#include + +#include + +#include "dht/token.hh" +#include "mutation/canonical_mutation.hh" +#include "mutation/mutation.hh" +#include "raft/server.hh" +#include "schema/schema.hh" +#include "service/topology_state_machine.hh" +#include "timestamp.hh" +#include "utils/UUID.hh" + +namespace service { + +template +class topology_mutation_builder_base { +private: + Builder& self() { + return *static_cast(this); + } + +protected: + enum class collection_apply_mode { + overwrite, + update, + }; + + using builder_base = topology_mutation_builder_base; + + Builder& apply_atomic(const char* cell, const data_value& value); + template + requires std::convertible_to, data_value> + Builder& apply_set(const char* cell, collection_apply_mode apply_mode, const C& c); + Builder& set(const char* cell, node_state value); + Builder& set(const char* cell, topology_request value); + Builder& set(const char* cell, const sstring& value); + Builder& set(const char* cell, const raft::server_id& value); + Builder& set(const char* cell, const uint32_t& value); + Builder& set(const char* cell, cleanup_status value); + Builder& set(const char* cell, const utils::UUID& value); + Builder& set(const char* cell, bool value); + Builder& set(const char* cell, const char* value); + Builder& set(const char* cell, const db_clock::time_point& value); + + Builder& del(const char* cell); +}; + +class topology_mutation_builder; + +class topology_node_mutation_builder + : public topology_mutation_builder_base { + + friend builder_base; + + topology_mutation_builder& _builder; + deletable_row& _r; + +private: + row& row(); + api::timestamp_type timestamp() const; + const schema& schema() const; + ttl_opt ttl() const { return std::nullopt; } + +public: + topology_node_mutation_builder(topology_mutation_builder&, raft::server_id); + + using builder_base::set; + using builder_base::del; + topology_node_mutation_builder& set(const char* cell, const std::unordered_set& nodes_ids); + topology_node_mutation_builder& set(const char* cell, const std::unordered_set& value); + topology_node_mutation_builder& set(const char* cell, const std::set& value); + + canonical_mutation build(); +}; + +class topology_mutation_builder + : public topology_mutation_builder_base { + + friend builder_base; + friend class topology_node_mutation_builder; + + schema_ptr _s; + mutation _m; + api::timestamp_type _ts; + + std::optional _node_builder; + +private: + row& row(); + api::timestamp_type timestamp() const; + const schema& schema() const; + ttl_opt ttl() const { return std::nullopt; } + +public: + topology_mutation_builder(api::timestamp_type ts); + topology_mutation_builder& set_transition_state(topology::transition_state); + topology_mutation_builder& set_version(topology::version_t); + topology_mutation_builder& set_fence_version(topology::version_t); + topology_mutation_builder& set_session(session_id); + topology_mutation_builder& set_tablet_balancing_enabled(bool); + topology_mutation_builder& set_current_cdc_generation_id(const cdc::generation_id_v2&); + topology_mutation_builder& set_new_cdc_generation_data_uuid(const utils::UUID& value); + topology_mutation_builder& set_unpublished_cdc_generations(const std::vector& values); + topology_mutation_builder& set_global_topology_request(global_topology_request); + topology_mutation_builder& add_enabled_features(const std::set& value); + topology_mutation_builder& add_unpublished_cdc_generation(const cdc::generation_id_v2& value); + topology_mutation_builder& del_transition_state(); + topology_mutation_builder& del_session(); + topology_mutation_builder& del_global_topology_request(); + topology_node_mutation_builder& with_node(raft::server_id); + canonical_mutation build() { return canonical_mutation{std::move(_m)}; } +}; + +class topology_request_tracking_mutation_builder : + public topology_mutation_builder_base { + + schema_ptr _s; + mutation _m; + api::timestamp_type _ts; + deletable_row& _r; + +public: + row& row(); + const schema& schema() const; + api::timestamp_type timestamp() const; + ttl_opt ttl() const; + + topology_request_tracking_mutation_builder(utils::UUID id); + using builder_base::set; + using builder_base::del; + topology_request_tracking_mutation_builder& done(std::optional error = std::nullopt); + canonical_mutation build() { return canonical_mutation{std::move(_m)}; } +}; + +extern template class topology_mutation_builder_base; +extern template class topology_mutation_builder_base; +extern template class topology_mutation_builder_base; + +} // namespace service