service: move topology mutation builder out of storage_service

The topology_mutation_builder, topology_node_mutation_builder and
topology_request_tracking_mutation_builder are currently used by
storage service - mainly, but not exclusively, by the topology
coordinator logic. As we are going to extract the topology coordinator
to a separate file, we need to move the builders to their own file as
well so that they will be accessible both by the topology coordinator
and the storage service.
This commit is contained in:
Piotr Dulikowski
2023-12-11 17:27:00 +01:00
parent 6f11651222
commit 79c3ed7fdb
5 changed files with 438 additions and 383 deletions

View File

@@ -1192,6 +1192,7 @@ scylla_core = (['message/messaging_service.cc',
'rust/wasmtime_bindings/src/lib.rs',
'utils/to_string.cc',
'service/topology_state_machine.cc',
'service/topology_mutation.cc',
'node_ops/node_ops_ctl.cc'
] + [Antlr3Grammar('cql3/Cql.g')] + [Thrift('interface/cassandra.thrift', 'Cassandra')] \
+ scylla_raft_core

View File

@@ -27,6 +27,7 @@ target_sources(service
storage_proxy.cc
storage_service.cc
tablet_allocator.cc
topology_mutation.cc
topology_state_machine.cc)
target_include_directories(service
PUBLIC

View File

@@ -82,8 +82,8 @@
#include "service/raft/join_node.hh"
#include "idl/join_node.dist.hh"
#include "protocol_server.hh"
#include "types/set.hh"
#include "node_ops/node_ops_ctl.hh"
#include "service/topology_mutation.hh"
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/classification.hpp>
@@ -95,10 +95,6 @@ using inet_address = gms::inet_address;
extern logging::logger cdc_log;
namespace db {
extern thread_local data_type cdc_generation_ts_id_type;
}
namespace service {
static logging::logger slogger("storage_service");
@@ -753,384 +749,6 @@ public:
// }}} raft_ip_address_updater
template<typename Builder>
class topology_mutation_builder_base {
private:
Builder& self() {
return *static_cast<Builder*>(this);
}
protected:
enum class collection_apply_mode {
overwrite,
update,
};
using builder_base = topology_mutation_builder_base<Builder>;
Builder& apply_atomic(const char* cell, const data_value& value);
template<std::ranges::range C>
requires std::convertible_to<std::ranges::range_value_t<C>, data_value>
Builder& apply_set(const char* cell, collection_apply_mode apply_mode, const C& c);
Builder& set(const char* cell, node_state value);
Builder& set(const char* cell, topology_request value);
Builder& set(const char* cell, const sstring& value);
Builder& set(const char* cell, const raft::server_id& value);
Builder& set(const char* cell, const uint32_t& value);
Builder& set(const char* cell, cleanup_status value);
Builder& set(const char* cell, const utils::UUID& value);
Builder& set(const char* cell, bool value);
Builder& set(const char* cell, const char* value);
Builder& set(const char* cell, const db_clock::time_point& value);
Builder& del(const char* cell);
};
class topology_mutation_builder;
class topology_node_mutation_builder
: public topology_mutation_builder_base<topology_node_mutation_builder> {
friend builder_base;
topology_mutation_builder& _builder;
deletable_row& _r;
private:
row& row();
api::timestamp_type timestamp() const;
const schema& schema() const;
ttl_opt ttl() const { return std::nullopt; }
public:
topology_node_mutation_builder(topology_mutation_builder&, raft::server_id);
using builder_base::set;
using builder_base::del;
topology_node_mutation_builder& set(const char* cell, const std::unordered_set<raft::server_id>& nodes_ids);
topology_node_mutation_builder& set(const char* cell, const std::unordered_set<dht::token>& value);
topology_node_mutation_builder& set(const char* cell, const std::set<sstring>& value);
canonical_mutation build();
};
class topology_mutation_builder
: public topology_mutation_builder_base<topology_mutation_builder> {
friend builder_base;
friend class topology_node_mutation_builder;
schema_ptr _s;
mutation _m;
api::timestamp_type _ts;
std::optional<topology_node_mutation_builder> _node_builder;
private:
row& row();
api::timestamp_type timestamp() const;
const schema& schema() const;
ttl_opt ttl() const { return std::nullopt; }
public:
topology_mutation_builder(api::timestamp_type ts);
topology_mutation_builder& set_transition_state(topology::transition_state);
topology_mutation_builder& set_version(topology::version_t);
topology_mutation_builder& set_fence_version(topology::version_t);
topology_mutation_builder& set_session(session_id);
topology_mutation_builder& set_tablet_balancing_enabled(bool);
topology_mutation_builder& set_current_cdc_generation_id(const cdc::generation_id_v2&);
topology_mutation_builder& set_new_cdc_generation_data_uuid(const utils::UUID& value);
topology_mutation_builder& set_unpublished_cdc_generations(const std::vector<cdc::generation_id_v2>& values);
topology_mutation_builder& set_global_topology_request(global_topology_request);
topology_mutation_builder& add_enabled_features(const std::set<sstring>& value);
topology_mutation_builder& add_unpublished_cdc_generation(const cdc::generation_id_v2& value);
topology_mutation_builder& del_transition_state();
topology_mutation_builder& del_session();
topology_mutation_builder& del_global_topology_request();
topology_node_mutation_builder& with_node(raft::server_id);
canonical_mutation build() { return canonical_mutation{std::move(_m)}; }
};
topology_mutation_builder::topology_mutation_builder(api::timestamp_type ts) :
_s(db::system_keyspace::topology()),
_m(_s, partition_key::from_singular(*_s, db::system_keyspace::TOPOLOGY)),
_ts(ts) {
}
topology_node_mutation_builder::topology_node_mutation_builder(topology_mutation_builder& builder, raft::server_id id) :
_builder(builder),
_r(_builder._m.partition().clustered_row(*_builder._s, clustering_key::from_singular(*_builder._s, id.uuid()))) {
_r.apply(row_marker(_builder._ts));
}
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::apply_atomic(const char* cell, const data_value& value) {
const column_definition* cdef = self().schema().get_column_definition(cell);
assert(cdef);
self().row().apply(*cdef, atomic_cell::make_live(*cdef->type, self().timestamp(), cdef->type->decompose(value), self().ttl()));
return self();
}
template<typename Builder>
template<std::ranges::range C>
requires std::convertible_to<std::ranges::range_value_t<C>, data_value>
Builder& topology_mutation_builder_base<Builder>::apply_set(const char* cell, collection_apply_mode apply_mode, const C& c) {
const column_definition* cdef = self().schema().get_column_definition(cell);
assert(cdef);
auto vtype = static_pointer_cast<const set_type_impl>(cdef->type)->get_elements_type();
std::set<bytes, serialized_compare> cset(vtype->as_less_comparator());
for (const auto& v : c) {
cset.insert(vtype->decompose(data_value(v)));
}
collection_mutation_description cm;
cm.cells.reserve(cset.size());
for (const bytes& raw : cset) {
cm.cells.emplace_back(raw, atomic_cell::make_live(*bytes_type, self().timestamp(), bytes_view(), self().ttl()));
}
if (apply_mode == collection_apply_mode::overwrite) {
cm.tomb = tombstone(self().timestamp() - 1, gc_clock::now());
}
self().row().apply(*cdef, cm.serialize(*cdef->type));
return self();
}
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::del(const char* cell) {
auto cdef = self().schema().get_column_definition(cell);
assert(cdef);
if (!cdef->type->is_multi_cell()) {
self().row().apply(*cdef, atomic_cell::make_dead(self().timestamp(), gc_clock::now()));
} else {
collection_mutation_description cm;
cm.tomb = tombstone{self().timestamp(), gc_clock::now()};
self().row().apply(*cdef, cm.serialize(*cdef->type));
}
return self();
}
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, node_state value) {
return apply_atomic(cell, sstring{::format("{}", value)});
}
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, topology_request value) {
return apply_atomic(cell, sstring{::format("{}", value)});
}
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, const sstring& value) {
return apply_atomic(cell, value);
}
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, const raft::server_id& value) {
return apply_atomic(cell, value.uuid());
}
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, const uint32_t& value) {
return apply_atomic(cell, int32_t(value));
}
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, cleanup_status value) {
return apply_atomic(cell, sstring{::format("{}", value)});
}
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, const utils::UUID& value) {
return apply_atomic(cell, value);
}
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, bool value) {
return apply_atomic(cell, value);
}
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, const char* value) {
return apply_atomic(cell, value);
}
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, const db_clock::time_point& value) {
return apply_atomic(cell, value);
}
row& topology_node_mutation_builder::row() {
return _r.cells();
}
api::timestamp_type topology_node_mutation_builder::timestamp() const {
return _builder._ts;
}
const schema& topology_node_mutation_builder::schema() const {
return *_builder._s;
}
topology_node_mutation_builder& topology_node_mutation_builder::set(const char* cell, const std::unordered_set<raft::server_id>& nodes_ids) {
return apply_set(cell, collection_apply_mode::overwrite, nodes_ids | boost::adaptors::transformed([] (const auto& node_id) { return node_id.id; }));
}
topology_node_mutation_builder& topology_node_mutation_builder::set(const char* cell, const std::unordered_set<dht::token>& tokens) {
return apply_set(cell, collection_apply_mode::overwrite, tokens | boost::adaptors::transformed([] (const auto& t) { return t.to_sstring(); }));
}
topology_node_mutation_builder& topology_node_mutation_builder::set(const char* cell, const std::set<sstring>& features) {
return apply_set(cell, collection_apply_mode::overwrite, features | boost::adaptors::transformed([] (const auto& f) { return sstring(f); }));
}
canonical_mutation topology_node_mutation_builder::build() {
return canonical_mutation{std::move(_builder._m)};
}
row& topology_mutation_builder::row() {
return _m.partition().static_row().maybe_create();
}
api::timestamp_type topology_mutation_builder::timestamp() const {
return _ts;
}
const schema& topology_mutation_builder::schema() const {
return *_s;
}
topology_mutation_builder& topology_mutation_builder::set_transition_state(topology::transition_state value) {
return apply_atomic("transition_state", ::format("{}", value));
}
topology_mutation_builder& topology_mutation_builder::set_version(topology::version_t value) {
_m.set_static_cell("version", value, _ts);
return *this;
}
topology_mutation_builder& topology_mutation_builder::set_fence_version(topology::version_t value) {
_m.set_static_cell("fence_version", value, _ts);
return *this;
}
topology_mutation_builder& topology_mutation_builder::set_session(session_id value) {
_m.set_static_cell("session", value.uuid(), _ts);
return *this;
}
topology_mutation_builder& topology_mutation_builder::set_tablet_balancing_enabled(bool value) {
_m.set_static_cell("tablet_balancing_enabled", value, _ts);
return *this;
}
topology_mutation_builder& topology_mutation_builder::del_transition_state() {
return del("transition_state");
}
topology_mutation_builder& topology_mutation_builder::del_session() {
return del("session");
}
topology_mutation_builder& topology_mutation_builder::set_current_cdc_generation_id(
const cdc::generation_id_v2& value) {
apply_atomic("current_cdc_generation_timestamp", value.ts);
apply_atomic("current_cdc_generation_uuid", value.id);
return *this;
}
topology_mutation_builder& topology_mutation_builder::set_new_cdc_generation_data_uuid(
const utils::UUID& value) {
return apply_atomic("new_cdc_generation_data_uuid", value);
}
topology_mutation_builder& topology_mutation_builder::set_unpublished_cdc_generations(const std::vector<cdc::generation_id_v2>& values) {
auto dv = values | boost::adaptors::transformed([&] (const auto& v) {
return make_tuple_value(db::cdc_generation_ts_id_type, tuple_type_impl::native_type({v.ts, timeuuid_native_type{v.id}}));
});
return apply_set("unpublished_cdc_generations", collection_apply_mode::overwrite, std::move(dv));
}
topology_mutation_builder& topology_mutation_builder::set_global_topology_request(global_topology_request value) {
return apply_atomic("global_topology_request", ::format("{}", value));
}
topology_mutation_builder& topology_mutation_builder::add_enabled_features(const std::set<sstring>& features) {
return apply_set("enabled_features", collection_apply_mode::update, features | boost::adaptors::transformed([] (const auto& f) { return sstring(f); }));
}
topology_mutation_builder& topology_mutation_builder::add_unpublished_cdc_generation(const cdc::generation_id_v2& value) {
auto dv = make_tuple_value(db::cdc_generation_ts_id_type, tuple_type_impl::native_type({value.ts, timeuuid_native_type{value.id}}));
return apply_set("unpublished_cdc_generations", collection_apply_mode::update, std::vector<data_value>{std::move(dv)});
}
topology_mutation_builder& topology_mutation_builder::del_global_topology_request() {
return del("global_topology_request");
}
topology_node_mutation_builder& topology_mutation_builder::with_node(raft::server_id n) {
_node_builder.emplace(*this, n);
return *_node_builder;
}
class topology_request_tracking_mutation_builder :
public topology_mutation_builder_base<topology_request_tracking_mutation_builder> {
schema_ptr _s;
mutation _m;
api::timestamp_type _ts;
deletable_row& _r;
public:
row& row();
const schema& schema() const;
api::timestamp_type timestamp() const;
ttl_opt ttl() const;
topology_request_tracking_mutation_builder(utils::UUID id);
using builder_base::set;
using builder_base::del;
topology_request_tracking_mutation_builder& done(std::optional<sstring> error = std::nullopt);
canonical_mutation build() { return canonical_mutation{std::move(_m)}; }
};
topology_request_tracking_mutation_builder::topology_request_tracking_mutation_builder(utils::UUID id) :
_s(db::system_keyspace::topology_requests()),
_m(_s, partition_key::from_singular(*_s, id)),
_ts(utils::UUID_gen::micros_timestamp(id)),
_r(_m.partition().clustered_row(*_s, clustering_key::make_empty())) {
_r.apply(row_marker(_ts, *ttl(), gc_clock::now() + *ttl()));
}
ttl_opt topology_request_tracking_mutation_builder::ttl() const {
return std::chrono::duration_cast<std::chrono::seconds>(std::chrono::microseconds(_ts)) + std::chrono::months(1)
- std::chrono::duration_cast<std::chrono::seconds>(gc_clock::now().time_since_epoch());
}
const schema& topology_request_tracking_mutation_builder::schema() const {
return *_s;
}
row& topology_request_tracking_mutation_builder::row() {
return _r.cells();
}
api::timestamp_type topology_request_tracking_mutation_builder::timestamp() const {
return _ts;
}
topology_request_tracking_mutation_builder& topology_request_tracking_mutation_builder::done(std::optional<sstring> error) {
set("end_time", db_clock::now());
if (error) {
set("error", *error);
}
return set("done", true);
}
future<> storage_service::sstable_cleanup_fiber(raft::server& server, sharded<service::storage_proxy>& proxy) noexcept {
while (!_group0_as.abort_requested()) {
bool err = false;

View File

@@ -0,0 +1,282 @@
/*
* Copyright (C) 2024-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include "db/system_keyspace.hh"
#include "topology_mutation.hh"
#include "types/tuple.hh"
#include "types/types.hh"
#include "types/set.hh"
namespace db {
extern thread_local data_type cdc_generation_ts_id_type;
}
namespace service {
topology_mutation_builder::topology_mutation_builder(api::timestamp_type ts) :
_s(db::system_keyspace::topology()),
_m(_s, partition_key::from_singular(*_s, db::system_keyspace::TOPOLOGY)),
_ts(ts) {
}
topology_node_mutation_builder::topology_node_mutation_builder(topology_mutation_builder& builder, raft::server_id id) :
_builder(builder),
_r(_builder._m.partition().clustered_row(*_builder._s, clustering_key::from_singular(*_builder._s, id.uuid()))) {
_r.apply(row_marker(_builder._ts));
}
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::apply_atomic(const char* cell, const data_value& value) {
const column_definition* cdef = self().schema().get_column_definition(cell);
assert(cdef);
self().row().apply(*cdef, atomic_cell::make_live(*cdef->type, self().timestamp(), cdef->type->decompose(value), self().ttl()));
return self();
}
template<typename Builder>
template<std::ranges::range C>
requires std::convertible_to<std::ranges::range_value_t<C>, data_value>
Builder& topology_mutation_builder_base<Builder>::apply_set(const char* cell, collection_apply_mode apply_mode, const C& c) {
const column_definition* cdef = self().schema().get_column_definition(cell);
assert(cdef);
auto vtype = static_pointer_cast<const set_type_impl>(cdef->type)->get_elements_type();
std::set<bytes, serialized_compare> cset(vtype->as_less_comparator());
for (const auto& v : c) {
cset.insert(vtype->decompose(data_value(v)));
}
collection_mutation_description cm;
cm.cells.reserve(cset.size());
for (const bytes& raw : cset) {
cm.cells.emplace_back(raw, atomic_cell::make_live(*bytes_type, self().timestamp(), bytes_view(), self().ttl()));
}
if (apply_mode == collection_apply_mode::overwrite) {
cm.tomb = tombstone(self().timestamp() - 1, gc_clock::now());
}
self().row().apply(*cdef, cm.serialize(*cdef->type));
return self();
}
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::del(const char* cell) {
auto cdef = self().schema().get_column_definition(cell);
assert(cdef);
if (!cdef->type->is_multi_cell()) {
self().row().apply(*cdef, atomic_cell::make_dead(self().timestamp(), gc_clock::now()));
} else {
collection_mutation_description cm;
cm.tomb = tombstone{self().timestamp(), gc_clock::now()};
self().row().apply(*cdef, cm.serialize(*cdef->type));
}
return self();
}
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, node_state value) {
return apply_atomic(cell, sstring{::format("{}", value)});
}
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, topology_request value) {
return apply_atomic(cell, sstring{::format("{}", value)});
}
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, const sstring& value) {
return apply_atomic(cell, value);
}
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, const raft::server_id& value) {
return apply_atomic(cell, value.uuid());
}
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, const uint32_t& value) {
return apply_atomic(cell, int32_t(value));
}
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, cleanup_status value) {
return apply_atomic(cell, sstring{::format("{}", value)});
}
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, const utils::UUID& value) {
return apply_atomic(cell, value);
}
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, bool value) {
return apply_atomic(cell, value);
}
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, const char* value) {
return apply_atomic(cell, value);
}
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, const db_clock::time_point& value) {
return apply_atomic(cell, value);
}
row& topology_node_mutation_builder::row() {
return _r.cells();
}
api::timestamp_type topology_node_mutation_builder::timestamp() const {
return _builder._ts;
}
const schema& topology_node_mutation_builder::schema() const {
return *_builder._s;
}
topology_node_mutation_builder& topology_node_mutation_builder::set(const char* cell, const std::unordered_set<raft::server_id>& nodes_ids) {
return apply_set(cell, collection_apply_mode::overwrite, nodes_ids | boost::adaptors::transformed([] (const auto& node_id) { return node_id.id; }));
}
topology_node_mutation_builder& topology_node_mutation_builder::set(const char* cell, const std::unordered_set<dht::token>& tokens) {
return apply_set(cell, collection_apply_mode::overwrite, tokens | boost::adaptors::transformed([] (const auto& t) { return t.to_sstring(); }));
}
topology_node_mutation_builder& topology_node_mutation_builder::set(const char* cell, const std::set<sstring>& features) {
return apply_set(cell, collection_apply_mode::overwrite, features | boost::adaptors::transformed([] (const auto& f) { return sstring(f); }));
}
canonical_mutation topology_node_mutation_builder::build() {
return canonical_mutation{std::move(_builder._m)};
}
row& topology_mutation_builder::row() {
return _m.partition().static_row().maybe_create();
}
api::timestamp_type topology_mutation_builder::timestamp() const {
return _ts;
}
const schema& topology_mutation_builder::schema() const {
return *_s;
}
topology_mutation_builder& topology_mutation_builder::set_transition_state(topology::transition_state value) {
return apply_atomic("transition_state", ::format("{}", value));
}
topology_mutation_builder& topology_mutation_builder::set_version(topology::version_t value) {
_m.set_static_cell("version", value, _ts);
return *this;
}
topology_mutation_builder& topology_mutation_builder::set_fence_version(topology::version_t value) {
_m.set_static_cell("fence_version", value, _ts);
return *this;
}
topology_mutation_builder& topology_mutation_builder::set_session(session_id value) {
_m.set_static_cell("session", value.uuid(), _ts);
return *this;
}
topology_mutation_builder& topology_mutation_builder::set_tablet_balancing_enabled(bool value) {
_m.set_static_cell("tablet_balancing_enabled", value, _ts);
return *this;
}
topology_mutation_builder& topology_mutation_builder::del_transition_state() {
return del("transition_state");
}
topology_mutation_builder& topology_mutation_builder::del_session() {
return del("session");
}
topology_mutation_builder& topology_mutation_builder::set_current_cdc_generation_id(
const cdc::generation_id_v2& value) {
apply_atomic("current_cdc_generation_timestamp", value.ts);
apply_atomic("current_cdc_generation_uuid", value.id);
return *this;
}
topology_mutation_builder& topology_mutation_builder::set_new_cdc_generation_data_uuid(
const utils::UUID& value) {
return apply_atomic("new_cdc_generation_data_uuid", value);
}
topology_mutation_builder& topology_mutation_builder::set_unpublished_cdc_generations(const std::vector<cdc::generation_id_v2>& values) {
auto dv = values | boost::adaptors::transformed([&] (const auto& v) {
return make_tuple_value(db::cdc_generation_ts_id_type, tuple_type_impl::native_type({v.ts, timeuuid_native_type{v.id}}));
});
return apply_set("unpublished_cdc_generations", collection_apply_mode::overwrite, std::move(dv));
}
topology_mutation_builder& topology_mutation_builder::set_global_topology_request(global_topology_request value) {
return apply_atomic("global_topology_request", ::format("{}", value));
}
topology_mutation_builder& topology_mutation_builder::add_enabled_features(const std::set<sstring>& features) {
return apply_set("enabled_features", collection_apply_mode::update, features | boost::adaptors::transformed([] (const auto& f) { return sstring(f); }));
}
topology_mutation_builder& topology_mutation_builder::add_unpublished_cdc_generation(const cdc::generation_id_v2& value) {
auto dv = make_tuple_value(db::cdc_generation_ts_id_type, tuple_type_impl::native_type({value.ts, timeuuid_native_type{value.id}}));
return apply_set("unpublished_cdc_generations", collection_apply_mode::update, std::vector<data_value>{std::move(dv)});
}
topology_mutation_builder& topology_mutation_builder::del_global_topology_request() {
return del("global_topology_request");
}
topology_node_mutation_builder& topology_mutation_builder::with_node(raft::server_id n) {
_node_builder.emplace(*this, n);
return *_node_builder;
}
topology_request_tracking_mutation_builder::topology_request_tracking_mutation_builder(utils::UUID id) :
_s(db::system_keyspace::topology_requests()),
_m(_s, partition_key::from_singular(*_s, id)),
_ts(utils::UUID_gen::micros_timestamp(id)),
_r(_m.partition().clustered_row(*_s, clustering_key::make_empty())) {
_r.apply(row_marker(_ts, *ttl(), gc_clock::now() + *ttl()));
}
ttl_opt topology_request_tracking_mutation_builder::ttl() const {
return std::chrono::duration_cast<std::chrono::seconds>(std::chrono::microseconds(_ts)) + std::chrono::months(1)
- std::chrono::duration_cast<std::chrono::seconds>(gc_clock::now().time_since_epoch());
}
const schema& topology_request_tracking_mutation_builder::schema() const {
return *_s;
}
row& topology_request_tracking_mutation_builder::row() {
return _r.cells();
}
api::timestamp_type topology_request_tracking_mutation_builder::timestamp() const {
return _ts;
}
topology_request_tracking_mutation_builder& topology_request_tracking_mutation_builder::done(std::optional<sstring> error) {
set("end_time", db_clock::now());
if (error) {
set("error", *error);
}
return set("done", true);
}
template class topology_mutation_builder_base<topology_mutation_builder>;
template class topology_mutation_builder_base<topology_node_mutation_builder>;
template class topology_mutation_builder_base<topology_request_tracking_mutation_builder>;
} // namespace service

View File

@@ -0,0 +1,153 @@
/*
* Copyright (C) 2024-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include <concepts>
#include <cstdint>
#include <set>
#include <unordered_set>
#include <seastar/core/sstring.hh>
#include "dht/token.hh"
#include "mutation/canonical_mutation.hh"
#include "mutation/mutation.hh"
#include "raft/server.hh"
#include "schema/schema.hh"
#include "service/topology_state_machine.hh"
#include "timestamp.hh"
#include "utils/UUID.hh"
namespace service {
template<typename Builder>
class topology_mutation_builder_base {
private:
Builder& self() {
return *static_cast<Builder*>(this);
}
protected:
enum class collection_apply_mode {
overwrite,
update,
};
using builder_base = topology_mutation_builder_base<Builder>;
Builder& apply_atomic(const char* cell, const data_value& value);
template<std::ranges::range C>
requires std::convertible_to<std::ranges::range_value_t<C>, data_value>
Builder& apply_set(const char* cell, collection_apply_mode apply_mode, const C& c);
Builder& set(const char* cell, node_state value);
Builder& set(const char* cell, topology_request value);
Builder& set(const char* cell, const sstring& value);
Builder& set(const char* cell, const raft::server_id& value);
Builder& set(const char* cell, const uint32_t& value);
Builder& set(const char* cell, cleanup_status value);
Builder& set(const char* cell, const utils::UUID& value);
Builder& set(const char* cell, bool value);
Builder& set(const char* cell, const char* value);
Builder& set(const char* cell, const db_clock::time_point& value);
Builder& del(const char* cell);
};
class topology_mutation_builder;
class topology_node_mutation_builder
: public topology_mutation_builder_base<topology_node_mutation_builder> {
friend builder_base;
topology_mutation_builder& _builder;
deletable_row& _r;
private:
row& row();
api::timestamp_type timestamp() const;
const schema& schema() const;
ttl_opt ttl() const { return std::nullopt; }
public:
topology_node_mutation_builder(topology_mutation_builder&, raft::server_id);
using builder_base::set;
using builder_base::del;
topology_node_mutation_builder& set(const char* cell, const std::unordered_set<raft::server_id>& nodes_ids);
topology_node_mutation_builder& set(const char* cell, const std::unordered_set<dht::token>& value);
topology_node_mutation_builder& set(const char* cell, const std::set<sstring>& value);
canonical_mutation build();
};
class topology_mutation_builder
: public topology_mutation_builder_base<topology_mutation_builder> {
friend builder_base;
friend class topology_node_mutation_builder;
schema_ptr _s;
mutation _m;
api::timestamp_type _ts;
std::optional<topology_node_mutation_builder> _node_builder;
private:
row& row();
api::timestamp_type timestamp() const;
const schema& schema() const;
ttl_opt ttl() const { return std::nullopt; }
public:
topology_mutation_builder(api::timestamp_type ts);
topology_mutation_builder& set_transition_state(topology::transition_state);
topology_mutation_builder& set_version(topology::version_t);
topology_mutation_builder& set_fence_version(topology::version_t);
topology_mutation_builder& set_session(session_id);
topology_mutation_builder& set_tablet_balancing_enabled(bool);
topology_mutation_builder& set_current_cdc_generation_id(const cdc::generation_id_v2&);
topology_mutation_builder& set_new_cdc_generation_data_uuid(const utils::UUID& value);
topology_mutation_builder& set_unpublished_cdc_generations(const std::vector<cdc::generation_id_v2>& values);
topology_mutation_builder& set_global_topology_request(global_topology_request);
topology_mutation_builder& add_enabled_features(const std::set<sstring>& value);
topology_mutation_builder& add_unpublished_cdc_generation(const cdc::generation_id_v2& value);
topology_mutation_builder& del_transition_state();
topology_mutation_builder& del_session();
topology_mutation_builder& del_global_topology_request();
topology_node_mutation_builder& with_node(raft::server_id);
canonical_mutation build() { return canonical_mutation{std::move(_m)}; }
};
class topology_request_tracking_mutation_builder :
public topology_mutation_builder_base<topology_request_tracking_mutation_builder> {
schema_ptr _s;
mutation _m;
api::timestamp_type _ts;
deletable_row& _r;
public:
row& row();
const schema& schema() const;
api::timestamp_type timestamp() const;
ttl_opt ttl() const;
topology_request_tracking_mutation_builder(utils::UUID id);
using builder_base::set;
using builder_base::del;
topology_request_tracking_mutation_builder& done(std::optional<sstring> error = std::nullopt);
canonical_mutation build() { return canonical_mutation{std::move(_m)}; }
};
extern template class topology_mutation_builder_base<topology_mutation_builder>;
extern template class topology_mutation_builder_base<topology_node_mutation_builder>;
extern template class topology_mutation_builder_base<topology_request_tracking_mutation_builder>;
} // namespace service