From 52366f33e5f7b367e392bf00a6183d5561abba8f Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Thu, 30 Mar 2023 15:55:51 +0200 Subject: [PATCH 01/23] service/raft: group0_state_machine: signal topology state machine in `load_snapshot` The `_topology_state_machine.event` condition variable should be signalled whenever the topology state is updated, including on snapshot load. --- service/raft/group0_state_machine.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/service/raft/group0_state_machine.cc b/service/raft/group0_state_machine.cc index c48b678a9c..9107babdfe 100644 --- a/service/raft/group0_state_machine.cc +++ b/service/raft/group0_state_machine.cc @@ -128,6 +128,7 @@ future<> group0_state_machine::load_snapshot(raft::snapshot_id id) { // memory and thus needs to be protected with apply mutex auto read_apply_mutex_holder = co_await get_units(_client._read_apply_mutex, 1); co_await _ss.topology_state_load(); + _ss._topology_state_machine.event.signal(); } future<> group0_state_machine::transfer_snapshot(gms::inet_address from, raft::snapshot_descriptor snp) { From 0e8466291038c55c09e199be3fe864518573349d Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Tue, 28 Mar 2023 15:02:41 +0200 Subject: [PATCH 02/23] sys_dist_ks: move mutation size threshold calculation outside `get_cdc_generation_mutations` The function turns a `cdc::topology_description` into a vector of mutations. It decides when to push_back a new mutation (instead of extending an existing one) based on certain parameters. This calculation is specific to where we insert the mutation later. Move the calculation outside, to the function which does the insertion. `get_cdc_generation_mutations` will be used outside this function later. --- db/system_distributed_keyspace.cc | 51 +++++++++++++++---------------- 1 file changed, 25 insertions(+), 26 deletions(-) diff --git a/db/system_distributed_keyspace.cc b/db/system_distributed_keyspace.cc index 874db9a9d0..29ef093f00 100644 --- a/db/system_distributed_keyspace.cc +++ b/db/system_distributed_keyspace.cc @@ -506,39 +506,17 @@ system_distributed_keyspace::read_cdc_topology_description( static future> get_cdc_generation_mutations( const replica::database& db, utils::UUID id, - size_t num_replicas, - size_t concurrency, - const cdc::topology_description& desc) { - assert(num_replicas); + const cdc::topology_description& desc, + size_t mutation_size_threshold) { auto s = db.find_schema(system_distributed_keyspace::NAME_EVERYWHERE, system_distributed_keyspace::CDC_GENERATIONS_V2); - // To insert the data quickly and efficiently we send it in batches of multiple rows - // (each batch represented by a single mutation). We also send multiple such batches concurrently. - // However, we need to limit the memory consumption of the operation. - // I assume that the memory consumption grows linearly with the number of replicas - // (we send to all replicas ``at the same time''), with the batch size (the data must - // be copied for each replica?) and with concurrency. These assumptions may be too conservative - // but that won't hurt in a significant way (it may hurt the efficiency of the operation a little). - // Thus, if we want to limit the memory consumption to L, it should be true that - // mutation_size * num_replicas * concurrency <= L, hence - // mutation_size <= L / (num_replicas * concurrency). - // For example, say L = 10MB, concurrency = 10, num_replicas = 100; we get - // mutation_size <= 10MB / 1000 = 10KB. - // On the other hand we must have mutation_size >= size of a single row, - // so we will use mutation_size <= max(size of single row, L/(num_replicas*concurrency)). - - // It has been tested that sending 1MB batches to 3 replicas with concurrency 20 works OK, - // which would correspond to L ~= 60MB. Hence that's the limit we use here. - const size_t L = 60'000'000; - const auto new_mutation_threshold = std::max(size_t(1), L / (num_replicas * concurrency)); - auto ts = api::new_timestamp(); utils::chunked_vector res; res.emplace_back(s, partition_key::from_singular(*s, id)); res.back().set_static_cell(to_bytes("num_ranges"), int32_t(desc.entries().size()), ts); size_t size_estimate = 0; for (auto& e : desc.entries()) { - if (size_estimate >= new_mutation_threshold) { + if (size_estimate >= mutation_size_threshold) { res.emplace_back(s, partition_key::from_singular(*s, id)); size_estimate = 0; } @@ -568,8 +546,29 @@ system_distributed_keyspace::insert_cdc_generation( using namespace std::chrono_literals; const size_t concurrency = 10; + const size_t num_replicas = ctx.num_token_owners; - auto ms = co_await get_cdc_generation_mutations(_qp.db().real_database(), id, ctx.num_token_owners, concurrency, desc); + // To insert the data quickly and efficiently we send it in batches of multiple rows + // (each batch represented by a single mutation). We also send multiple such batches concurrently. + // However, we need to limit the memory consumption of the operation. + // I assume that the memory consumption grows linearly with the number of replicas + // (we send to all replicas ``at the same time''), with the batch size (the data must + // be copied for each replica?) and with concurrency. These assumptions may be too conservative + // but that won't hurt in a significant way (it may hurt the efficiency of the operation a little). + // Thus, if we want to limit the memory consumption to L, it should be true that + // mutation_size * num_replicas * concurrency <= L, hence + // mutation_size <= L / (num_replicas * concurrency). + // For example, say L = 10MB, concurrency = 10, num_replicas = 100; we get + // mutation_size <= 10MB / 1000 = 10KB. + // On the other hand we must have mutation_size >= size of a single row, + // so we will use mutation_size <= max(size of single row, L/(num_replicas*concurrency)). + + // It has been tested that sending 1MB batches to 3 replicas with concurrency 20 works OK, + // which would correspond to L ~= 60MB. Hence that's the limit we use here. + const size_t L = 60'000'000; + const auto mutation_size_threshold = std::max(size_t(1), L / (num_replicas * concurrency)); + + auto ms = co_await get_cdc_generation_mutations(_qp.db().real_database(), id, desc, mutation_size_threshold); co_await max_concurrent_for_each(ms, concurrency, [&] (mutation& m) -> future<> { co_await _sp.mutate( { std::move(m) }, From ed133db7092afffc02362d0e9f975fd60a7e749e Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Tue, 28 Mar 2023 15:06:10 +0200 Subject: [PATCH 03/23] sys_dist_ks: move find_schema outside `get_cdc_generation_mutations` The function will be reused for a different table. --- db/system_distributed_keyspace.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/db/system_distributed_keyspace.cc b/db/system_distributed_keyspace.cc index 29ef093f00..52b8286cca 100644 --- a/db/system_distributed_keyspace.cc +++ b/db/system_distributed_keyspace.cc @@ -504,12 +504,10 @@ system_distributed_keyspace::read_cdc_topology_description( } static future> get_cdc_generation_mutations( - const replica::database& db, + schema_ptr s, utils::UUID id, const cdc::topology_description& desc, size_t mutation_size_threshold) { - auto s = db.find_schema(system_distributed_keyspace::NAME_EVERYWHERE, system_distributed_keyspace::CDC_GENERATIONS_V2); - auto ts = api::new_timestamp(); utils::chunked_vector res; res.emplace_back(s, partition_key::from_singular(*s, id)); @@ -568,7 +566,9 @@ system_distributed_keyspace::insert_cdc_generation( const size_t L = 60'000'000; const auto mutation_size_threshold = std::max(size_t(1), L / (num_replicas * concurrency)); - auto ms = co_await get_cdc_generation_mutations(_qp.db().real_database(), id, desc, mutation_size_threshold); + auto s = _qp.db().real_database().find_schema( + system_distributed_keyspace::NAME_EVERYWHERE, system_distributed_keyspace::CDC_GENERATIONS_V2); + auto ms = co_await get_cdc_generation_mutations(s, id, desc, mutation_size_threshold); co_await max_concurrent_for_each(ms, concurrency, [&] (mutation& m) -> future<> { co_await _sp.mutate( { std::move(m) }, From 3e863d0e58a229d7b9a5da1b2394a83644c15233 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Tue, 28 Mar 2023 15:10:12 +0200 Subject: [PATCH 04/23] sys_dist_ks: make `get_cdc_generation_mutations` public It was a `static` function inside system_distributed_keyspace. Later it will be used for another table living in system_keyspace, so move it outside, to the CDC generations module, and make it accessible from other places. --- cdc/generation.cc | 38 +++++++++++++++++++++++++++++++ cdc/generation.hh | 8 +++++++ db/system_distributed_keyspace.cc | 36 ++--------------------------- 3 files changed, 48 insertions(+), 34 deletions(-) diff --git a/cdc/generation.cc b/cdc/generation.cc index eb75421e2b..720355ea77 100644 --- a/cdc/generation.cc +++ b/cdc/generation.cc @@ -20,6 +20,7 @@ #include "db/system_distributed_keyspace.hh" #include "dht/token-sharding.hh" #include "locator/token_metadata.hh" +#include "types/set.hh" #include "gms/application_state.hh" #include "gms/inet_address.hh" #include "gms/gossiper.hh" @@ -42,6 +43,10 @@ static unsigned get_sharding_ignore_msb(const gms::inet_address& endpoint, const return ep_state ? std::stoi(ep_state->value) : 0; } +namespace db { + extern thread_local data_type cdc_streams_set_type; +} + namespace cdc { extern const api::timestamp_clock::duration generation_leeway = @@ -274,6 +279,39 @@ bool should_propose_first_generation(const gms::inet_address& me, const gms::gos }); } +future> get_cdc_generation_mutations( + schema_ptr s, + utils::UUID id, + const cdc::topology_description& desc, + size_t mutation_size_threshold) { + auto ts = api::new_timestamp(); + utils::chunked_vector res; + res.emplace_back(s, partition_key::from_singular(*s, id)); + res.back().set_static_cell(to_bytes("num_ranges"), int32_t(desc.entries().size()), ts); + size_t size_estimate = 0; + for (auto& e : desc.entries()) { + if (size_estimate >= mutation_size_threshold) { + res.emplace_back(s, partition_key::from_singular(*s, id)); + size_estimate = 0; + } + + set_type_impl::native_type streams; + streams.reserve(e.streams.size()); + for (auto& stream: e.streams) { + streams.push_back(data_value(stream.to_bytes())); + } + + size_estimate += e.streams.size() * 20; + auto ckey = clustering_key::from_singular(*s, dht::token::to_int64(e.token_range_end)); + res.back().set_cell(ckey, to_bytes("streams"), make_set_value(db::cdc_streams_set_type, std::move(streams)), ts); + res.back().set_cell(ckey, to_bytes("ignore_msb"), int8_t(e.sharding_ignore_msb), ts); + + co_await coroutine::maybe_yield(); + } + + co_return res; +} + // non-static for testing size_t limit_of_streams_in_topology_description() { // Each stream takes 16B and we don't want to exceed 4MB so we can have diff --git a/cdc/generation.hh b/cdc/generation.hh index 3c3617bb79..6e026eed31 100644 --- a/cdc/generation.hh +++ b/cdc/generation.hh @@ -133,4 +133,12 @@ public: */ bool should_propose_first_generation(const gms::inet_address& me, const gms::gossiper&); +// Translates the CDC generation data given by a `cdc::topology_description` into a vector of mutations, +// using `mutation_size_threshold` to decide on the mutation sizes. The partition key of each mutation +// is given by `gen_uuid`. +// +// Works for only specific schemas: CDC_GENERATIONS_V2 (in system_distributed_keyspace). +future> get_cdc_generation_mutations( + schema_ptr, utils::UUID gen_uuid, const cdc::topology_description&, size_t mutation_size_threshold); + } // namespace cdc diff --git a/db/system_distributed_keyspace.cc b/db/system_distributed_keyspace.cc index 52b8286cca..d3f67069e5 100644 --- a/db/system_distributed_keyspace.cc +++ b/db/system_distributed_keyspace.cc @@ -50,6 +50,7 @@ namespace { }); } +extern thread_local data_type cdc_streams_set_type; thread_local data_type cdc_streams_set_type = set_type_impl::get_instance(bytes_type, false); /* See `token_range_description` struct */ @@ -503,39 +504,6 @@ system_distributed_keyspace::read_cdc_topology_description( }); } -static future> get_cdc_generation_mutations( - schema_ptr s, - utils::UUID id, - const cdc::topology_description& desc, - size_t mutation_size_threshold) { - auto ts = api::new_timestamp(); - utils::chunked_vector res; - res.emplace_back(s, partition_key::from_singular(*s, id)); - res.back().set_static_cell(to_bytes("num_ranges"), int32_t(desc.entries().size()), ts); - size_t size_estimate = 0; - for (auto& e : desc.entries()) { - if (size_estimate >= mutation_size_threshold) { - res.emplace_back(s, partition_key::from_singular(*s, id)); - size_estimate = 0; - } - - set_type_impl::native_type streams; - streams.reserve(e.streams.size()); - for (auto& stream: e.streams) { - streams.push_back(data_value(stream.to_bytes())); - } - - size_estimate += e.streams.size() * 20; - auto ckey = clustering_key::from_singular(*s, dht::token::to_int64(e.token_range_end)); - res.back().set_cell(ckey, to_bytes("streams"), make_set_value(cdc_streams_set_type, std::move(streams)), ts); - res.back().set_cell(ckey, to_bytes("ignore_msb"), int8_t(e.sharding_ignore_msb), ts); - - co_await coroutine::maybe_yield(); - } - - co_return res; -} - future<> system_distributed_keyspace::insert_cdc_generation( utils::UUID id, @@ -568,7 +536,7 @@ system_distributed_keyspace::insert_cdc_generation( auto s = _qp.db().real_database().find_schema( system_distributed_keyspace::NAME_EVERYWHERE, system_distributed_keyspace::CDC_GENERATIONS_V2); - auto ms = co_await get_cdc_generation_mutations(s, id, desc, mutation_size_threshold); + auto ms = co_await cdc::get_cdc_generation_mutations(s, id, desc, mutation_size_threshold); co_await max_concurrent_for_each(ms, concurrency, [&] (mutation& m) -> future<> { co_await _sp.mutate( { std::move(m) }, From 85f4f1830bf2f804bf16b6671f117ace21c2fafe Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Tue, 28 Mar 2023 15:40:56 +0200 Subject: [PATCH 05/23] cdc: generation: make `topology_description_generator::get_sharding_info` a parameter The function used to obtain the sharding info for a given node (its number of shards and ignore_msb_bits) was using gossiper application states. We want to reuse `topology_description_generator` to build CDC generations when doing Raft Group 0-based topology changes, so make `get_sharding_info` a parameter. --- cdc/generation.cc | 49 +++++++++++++++++++++++------------------------ 1 file changed, 24 insertions(+), 25 deletions(-) diff --git a/cdc/generation.cc b/cdc/generation.cc index 720355ea77..ef0208c336 100644 --- a/cdc/generation.cc +++ b/cdc/generation.cc @@ -184,10 +184,9 @@ static std::vector create_stream_ids( } class topology_description_generator final { - unsigned _ignore_msb_bits; const std::unordered_set& _bootstrap_tokens; const locator::token_metadata_ptr _tmptr; - const gms::gossiper& _gossiper; + const noncopyable_function (dht::token)>& _get_sharding_info; // Compute a set of tokens that split the token ring into vnodes auto get_tokens() const { @@ -200,28 +199,12 @@ class topology_description_generator final { return tokens; } - // Fetch sharding parameters for a node that owns vnode ending with this.end - // Returns pair. - std::pair get_sharding_info(dht::token end) const { - if (_bootstrap_tokens.contains(end)) { - return {smp::count, _ignore_msb_bits}; - } else { - auto endpoint = _tmptr->get_endpoint(end); - if (!endpoint) { - throw std::runtime_error( - format("Can't find endpoint for token {}", end)); - } - auto sc = get_shard_count(*endpoint, _gossiper); - return {sc > 0 ? sc : 1, get_sharding_ignore_msb(*endpoint, _gossiper)}; - } - } - token_range_description create_description(size_t index, dht::token start, dht::token end) const { token_range_description desc; desc.token_range_end = end; - auto [shard_count, ignore_msb] = get_sharding_info(end); + auto [shard_count, ignore_msb] = _get_sharding_info(end); desc.streams = create_stream_ids(index, start, end, shard_count, ignore_msb); desc.sharding_ignore_msb = ignore_msb; @@ -229,14 +212,14 @@ class topology_description_generator final { } public: topology_description_generator( - unsigned ignore_msb_bits, const std::unordered_set& bootstrap_tokens, const locator::token_metadata_ptr tmptr, - const gms::gossiper& gossiper) - : _ignore_msb_bits(ignore_msb_bits) - , _bootstrap_tokens(bootstrap_tokens) + // This function must return sharding parameters for a node that owns the vnode ending with + // the given token. Returns pair. + const noncopyable_function (dht::token)>& get_sharding_info) + : _bootstrap_tokens(bootstrap_tokens) , _tmptr(std::move(tmptr)) - , _gossiper(gossiper) + , _get_sharding_info(get_sharding_info) {} /* @@ -349,7 +332,23 @@ future generation_service::make_new_generation(const std::un using namespace std::chrono_literals; const locator::token_metadata_ptr tmptr = _token_metadata.get(); - auto gen = topology_description_generator(_cfg.ignore_msb_bits, bootstrap_tokens, tmptr, _gossiper).generate(); + + // Fetch sharding parameters for a node that owns vnode ending with this token + // using gossiped application states. + auto get_sharding_info = [&] (dht::token end) -> std::pair { + if (bootstrap_tokens.contains(end)) { + return {smp::count, _cfg.ignore_msb_bits}; + } else { + auto endpoint = tmptr->get_endpoint(end); + if (!endpoint) { + throw std::runtime_error( + format("Can't find endpoint for token {}", end)); + } + auto sc = get_shard_count(*endpoint, _gossiper); + return {sc > 0 ? sc : 1, get_sharding_ignore_msb(*endpoint, _gossiper)}; + } + }; + auto gen = topology_description_generator(bootstrap_tokens, tmptr, get_sharding_info).generate(); // We need to call this as late in the procedure as possible. // In the V2 format we can do this after inserting the generation data into the table; From 1e9cf3badd89a76dbb97f07ebf40a5a9cb51d55e Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Wed, 29 Mar 2023 12:35:44 +0200 Subject: [PATCH 06/23] cdc: generation: `get_cdc_generation_mutations`: take timestamp as parameter The function would generate a mutation timestamp for itself, take it as parameter instead. We'll use timestamps provided by Group 0 APIs when creating CDC generations during Group 0- based topology changes. --- cdc/generation.cc | 4 ++-- cdc/generation.hh | 5 +++-- db/system_distributed_keyspace.cc | 2 +- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/cdc/generation.cc b/cdc/generation.cc index ef0208c336..476ca75567 100644 --- a/cdc/generation.cc +++ b/cdc/generation.cc @@ -266,8 +266,8 @@ future> get_cdc_generation_mutations( schema_ptr s, utils::UUID id, const cdc::topology_description& desc, - size_t mutation_size_threshold) { - auto ts = api::new_timestamp(); + size_t mutation_size_threshold, + api::timestamp_type ts) { utils::chunked_vector res; res.emplace_back(s, partition_key::from_singular(*s, id)); res.back().set_static_cell(to_bytes("num_ranges"), int32_t(desc.entries().size()), ts); diff --git a/cdc/generation.hh b/cdc/generation.hh index 6e026eed31..bae9ba828c 100644 --- a/cdc/generation.hh +++ b/cdc/generation.hh @@ -135,10 +135,11 @@ bool should_propose_first_generation(const gms::inet_address& me, const gms::gos // Translates the CDC generation data given by a `cdc::topology_description` into a vector of mutations, // using `mutation_size_threshold` to decide on the mutation sizes. The partition key of each mutation -// is given by `gen_uuid`. +// is given by `gen_uuid`. The timestamp of each cell in each mutation is given by `mutation_timestamp`. // // Works for only specific schemas: CDC_GENERATIONS_V2 (in system_distributed_keyspace). future> get_cdc_generation_mutations( - schema_ptr, utils::UUID gen_uuid, const cdc::topology_description&, size_t mutation_size_threshold); + schema_ptr, utils::UUID gen_uuid, const cdc::topology_description&, + size_t mutation_size_threshold, api::timestamp_type mutation_timestamp); } // namespace cdc diff --git a/db/system_distributed_keyspace.cc b/db/system_distributed_keyspace.cc index d3f67069e5..0a37ed813e 100644 --- a/db/system_distributed_keyspace.cc +++ b/db/system_distributed_keyspace.cc @@ -536,7 +536,7 @@ system_distributed_keyspace::insert_cdc_generation( auto s = _qp.db().real_database().find_schema( system_distributed_keyspace::NAME_EVERYWHERE, system_distributed_keyspace::CDC_GENERATIONS_V2); - auto ms = co_await cdc::get_cdc_generation_mutations(s, id, desc, mutation_size_threshold); + auto ms = co_await cdc::get_cdc_generation_mutations(s, id, desc, mutation_size_threshold, api::new_timestamp()); co_await max_concurrent_for_each(ms, concurrency, [&] (mutation& m) -> future<> { co_await _sp.mutate( { std::move(m) }, From 59b692e79903c648ea098cefa6878bac5a31dc05 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Wed, 29 Mar 2023 12:38:46 +0200 Subject: [PATCH 07/23] service: raft: plumbing `cdc::generation_service&` Pass a reference to the service into places. It shall be used later, by the group 0 state machine and topology coordinator. --- main.cc | 2 +- service/raft/group0_state_machine.cc | 4 ++-- service/raft/group0_state_machine.hh | 7 ++++++- service/raft/raft_group0.cc | 7 ++++--- service/raft/raft_group0.hh | 6 +++++- service/storage_service.cc | 14 +++++++------- service/storage_service.hh | 8 ++++---- test/lib/cql_test_env.cc | 2 +- 8 files changed, 30 insertions(+), 20 deletions(-) diff --git a/main.cc b/main.cc index e74445f444..48d9709fc8 100644 --- a/main.cc +++ b/main.cc @@ -1475,7 +1475,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl service::raft_group0 group0_service{ stop_signal.as_local_abort_source(), raft_gr.local(), messaging, - gossiper.local(), qp.local(), mm.local(), feature_service.local(), sys_ks.local(), group0_client, ss.local()}; + gossiper.local(), qp.local(), mm.local(), feature_service.local(), sys_ks.local(), group0_client, ss.local(), cdc_generation_service.local()}; group0_service.start().get(); auto stop_group0_service = defer_verbose_shutdown("group 0 service", [&group0_service] { group0_service.abort().get(); diff --git a/service/raft/group0_state_machine.cc b/service/raft/group0_state_machine.cc index 9107babdfe..02f0f6a0db 100644 --- a/service/raft/group0_state_machine.cc +++ b/service/raft/group0_state_machine.cc @@ -107,7 +107,7 @@ future<> group0_state_machine::apply(std::vector command) { _client.set_query_result(cmd.new_state_id, std::move(result)); }, [&] (topology_change& chng) -> future<> { - return _ss.topology_transition(_sp, cmd.creator_addr, std::move(chng.mutations)); + return _ss.topology_transition(_sp, _cdc_gen_svc, cmd.creator_addr, std::move(chng.mutations)); } ), cmd.change); @@ -127,7 +127,7 @@ future<> group0_state_machine::load_snapshot(raft::snapshot_id id) { // topology_state_load applies persisted state machine state into // memory and thus needs to be protected with apply mutex auto read_apply_mutex_holder = co_await get_units(_client._read_apply_mutex, 1); - co_await _ss.topology_state_load(); + co_await _ss.topology_state_load(_cdc_gen_svc); _ss._topology_state_machine.event.signal(); } diff --git a/service/raft/group0_state_machine.hh b/service/raft/group0_state_machine.hh index d1535fb769..111a6eb274 100644 --- a/service/raft/group0_state_machine.hh +++ b/service/raft/group0_state_machine.hh @@ -13,6 +13,10 @@ #include "mutation/canonical_mutation.hh" #include "service/raft/raft_state_machine.hh" +namespace cdc { +class generation_service; +} + namespace service { class raft_group0_client; class migration_manager; @@ -73,8 +77,9 @@ class group0_state_machine : public raft_state_machine { migration_manager& _mm; storage_proxy& _sp; storage_service& _ss; + cdc::generation_service& _cdc_gen_svc; public: - group0_state_machine(raft_group0_client& client, migration_manager& mm, storage_proxy& sp, storage_service& ss) : _client(client), _mm(mm), _sp(sp), _ss(ss) {} + group0_state_machine(raft_group0_client& client, migration_manager& mm, storage_proxy& sp, storage_service& ss, cdc::generation_service& cdc_gen_svc) : _client(client), _mm(mm), _sp(sp), _ss(ss), _cdc_gen_svc(cdc_gen_svc) {} future<> apply(std::vector command) override; future take_snapshot() override; void drop_snapshot(raft::snapshot_id id) override; diff --git a/service/raft/raft_group0.cc b/service/raft/raft_group0.cc index 33fbfc44d4..661e232b7e 100644 --- a/service/raft/raft_group0.cc +++ b/service/raft/raft_group0.cc @@ -134,8 +134,9 @@ raft_group0::raft_group0(seastar::abort_source& abort_source, gms::feature_service& feat, db::system_keyspace& sys_ks, raft_group0_client& client, - storage_service& ss) - : _abort_source(abort_source), _raft_gr(raft_gr), _ms(ms), _gossiper(gs), _qp(qp), _mm(mm), _feat(feat), _sys_ks(sys_ks), _client(client), _ss(ss) + storage_service& ss, + cdc::generation_service& cdc_gen_svc) + : _abort_source(abort_source), _raft_gr(raft_gr), _ms(ms), _gossiper(gs), _qp(qp), _mm(mm), _feat(feat), _sys_ks(sys_ks), _client(client), _ss(ss), _cdc_gen_svc(cdc_gen_svc) , _status_for_monitoring(_raft_gr.is_enabled() ? status_for_monitoring::normal : status_for_monitoring::disabled) { register_metrics(); @@ -195,7 +196,7 @@ const raft::server_id& raft_group0::load_my_id() { } raft_server_for_group raft_group0::create_server_for_group0(raft::group_id gid, raft::server_id my_id) { - auto state_machine = std::make_unique(_client, _mm, _qp.proxy(), _ss); + auto state_machine = std::make_unique(_client, _mm, _qp.proxy(), _ss, _cdc_gen_svc); auto rpc = std::make_unique(_raft_gr.direct_fd(), *state_machine, _ms.local(), _raft_gr.address_map(), gid, my_id); // Keep a reference to a specific RPC class. auto& rpc_ref = *rpc; diff --git a/service/raft/raft_group0.hh b/service/raft/raft_group0.hh index ef8301cdd0..e191ca224f 100644 --- a/service/raft/raft_group0.hh +++ b/service/raft/raft_group0.hh @@ -16,6 +16,8 @@ namespace cql3 { class query_processor; } namespace db { class system_keyspace; } +namespace cdc { class generation_service; } + namespace gms { class gossiper; class feature_service; } namespace service { @@ -76,6 +78,7 @@ class raft_group0 { db::system_keyspace& _sys_ks; raft_group0_client& _client; service::storage_service& _ss; + cdc::generation_service& _cdc_gen_svc; // Status of leader discovery. Initially there is no group 0, // and the variant contains no state. During initial cluster @@ -114,7 +117,8 @@ public: gms::feature_service& feat, db::system_keyspace& sys_ks, raft_group0_client& client, - storage_service& ss); + storage_service& ss, + cdc::generation_service& cdc_gen_svc); // Initialises RPC verbs on all shards. // Call after construction but before using the object. diff --git a/service/storage_service.cc b/service/storage_service.cc index 2cbb66f72e..3338614f98 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -284,7 +284,7 @@ future<> storage_service::wait_for_ring_to_settle(std::chrono::milliseconds dela slogger.info("Checking bootstrapping/leaving nodes: ok"); } -future<> storage_service::topology_state_load() { +future<> storage_service::topology_state_load(cdc::generation_service& cdc_gen_svc) { #ifdef SEASTAR_DEBUG static bool running = false; assert(!running); // The function is not re-entrant @@ -407,7 +407,7 @@ future<> storage_service::topology_state_load() { })); } -future<> storage_service::topology_transition(storage_proxy& proxy, gms::inet_address from, std::vector cms) { +future<> storage_service::topology_transition(storage_proxy& proxy, cdc::generation_service& cdc_gen_svc, gms::inet_address from, std::vector cms) { assert(this_shard_id() == 0); // write new state into persistent storage std::vector mutations; @@ -424,7 +424,7 @@ future<> storage_service::topology_transition(storage_proxy& proxy, gms::inet_ad co_await proxy.mutate_locally(std::move(mutations), tracing::trace_state_ptr()); - co_await topology_state_load(); // reload new state + co_await topology_state_load(cdc_gen_svc); // reload new state _topology_state_machine.event.signal(); } @@ -520,7 +520,7 @@ topology_mutation_builder& topology_mutation_builder::set(const char* cell, cons return *this; } -future<> storage_service::topology_change_coordinator_fiber(raft::server& raft, raft::term_t term, sharded& sys_dist_ks, abort_source& as) { +future<> storage_service::topology_change_coordinator_fiber(raft::server& raft, raft::term_t term, cdc::generation_service& cdc_gen_svc, sharded& sys_dist_ks, abort_source& as) { slogger.info("raft topology: start topology coordinator fiber"); auto abort = as.subscribe([this] () noexcept { @@ -904,7 +904,7 @@ future<> storage_service::topology_change_coordinator_fiber(raft::server& raft, } } -future<> storage_service::raft_state_monitor_fiber(raft::server& raft, sharded& sys_dist_ks) { +future<> storage_service::raft_state_monitor_fiber(raft::server& raft, cdc::generation_service& cdc_gen_svc, sharded& sys_dist_ks) { std::optional as; try { while (!_abort_source.abort_requested()) { @@ -921,7 +921,7 @@ future<> storage_service::raft_state_monitor_fiber(raft::server& raft, sharded storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi slogger.info("topology changes are using raft"); // start topology coordinator fiber - _raft_state_monitor = raft_state_monitor_fiber(*raft_server, sys_dist_ks); + _raft_state_monitor = raft_state_monitor_fiber(*raft_server, cdc_gen_service, sys_dist_ks); // Need to start system_distributed_keyspace before bootstrap because bootstraping // process may access those tables. diff --git a/service/storage_service.hh b/service/storage_service.hh index 33c4132002..fd40bdc877 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -762,13 +762,13 @@ private: future<> _raft_state_monitor = make_ready_future<>(); // This fibers monitors raft state and start/stops the topology change // coordinator fiber - future<> raft_state_monitor_fiber(raft::server&, sharded& sys_dist_ks); + future<> raft_state_monitor_fiber(raft::server&, cdc::generation_service&, sharded& sys_dist_ks); // State machine that is responsible for topology change topology_state_machine _topology_state_machine; future<> _topology_change_coordinator = make_ready_future<>(); - future<> topology_change_coordinator_fiber(raft::server&, raft::term_t, sharded&, abort_source&); + future<> topology_change_coordinator_fiber(raft::server&, raft::term_t, cdc::generation_service&, sharded&, abort_source&); // Those futures hold results of streaming for various operations std::optional> _bootstrap_result; @@ -786,9 +786,9 @@ private: future<> update_topology_with_local_metadata(raft::server&); // This is called on all nodes for each new command received through raft - future<> topology_transition(storage_proxy& proxy, gms::inet_address, std::vector); + future<> topology_transition(storage_proxy& proxy, cdc::generation_service&, gms::inet_address, std::vector); // load topology state machine snapshot into memory - future<> topology_state_load(); + future<> topology_state_load(cdc::generation_service&); // Applies received raft snapshot to local state machine persistent storage future<> merge_topology_snapshot(raft_topology_snapshot snp); }; diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index 6ae1a46f9a..e483a46696 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -886,7 +886,7 @@ public: service::raft_group0 group0_service{ abort_sources.local(), raft_gr.local(), ms, - gossiper.local(), qp.local(), mm.local(), feature_service.local(), sys_ks.local(), group0_client, ss.local()}; + gossiper.local(), qp.local(), mm.local(), feature_service.local(), sys_ks.local(), group0_client, ss.local(), cdc_generation_service.local()}; group0_service.start().get(); auto stop_group0_service = defer([&group0_service] { group0_service.abort().get(); From 07382d634a22a9c0e3f42ba646eff3e27a0b94b9 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Wed, 29 Mar 2023 15:22:40 +0200 Subject: [PATCH 08/23] service: topology_state_machine: better error checking for state name (de)serialization For example: ``` std::ostream& operator<<(std::ostream& os, ring_slice::replication_state s) { os << replication_state_to_name_map[s]; return os; } ``` this would print an empty string if the state was missing from `replication_state_to_name_map` (because `operator[]` default-construct a value if it's missing). Use `find` instead and make it an error if the state is missing. Also turn `throw std::runtime_error` into `on_internal_error` in deserialization functions because failure to deserialize a state name is an internal error, not user error. --- service/topology_state_machine.cc | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/service/topology_state_machine.cc b/service/topology_state_machine.cc index 91e92eaf20..1b84b3173f 100644 --- a/service/topology_state_machine.cc +++ b/service/topology_state_machine.cc @@ -11,6 +11,8 @@ namespace service { +logging::logger tsmlogger("topology_state_machine"); + const std::pair* topology::find(raft::server_id id) { auto it = normal_nodes.find(id); if (it != normal_nodes.end()) { @@ -41,8 +43,11 @@ static std::unordered_map replication_st }; std::ostream& operator<<(std::ostream& os, ring_slice::replication_state s) { - os << replication_state_to_name_map[s]; - return os; + auto it = replication_state_to_name_map.find(s); + if (it == replication_state_to_name_map.end()) { + on_internal_error(tsmlogger, "cannot print replication_state"); + } + return os << it->second; } ring_slice::replication_state replication_state_from_string(const sstring& s) { @@ -51,7 +56,7 @@ ring_slice::replication_state replication_state_from_string(const sstring& s) { return e.first; } } - throw std::runtime_error(fmt::format("cannot map name {} to token_state", s)); + on_internal_error(tsmlogger, format("cannot map name {} to replication_state", s)); } static std::unordered_map node_state_to_name_map = { @@ -66,8 +71,11 @@ static std::unordered_map node_state_to_name_map = { }; std::ostream& operator<<(std::ostream& os, node_state s) { - os << node_state_to_name_map[s]; - return os; + auto it = node_state_to_name_map.find(s); + if (it == node_state_to_name_map.end()) { + on_internal_error(tsmlogger, "cannot print node_state"); + } + return os << it->second; } node_state node_state_from_string(const sstring& s) { @@ -76,7 +84,7 @@ node_state node_state_from_string(const sstring& s) { return e.first; } } - throw std::runtime_error(fmt::format("cannot map name {} to node_state", s)); + on_internal_error(tsmlogger, format("cannot map name {} to node_state", s)); } static std::unordered_map topology_request_to_name_map = { From 2233d8f54de02fcb7e24963519f6b3eb89c15fee Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Thu, 30 Mar 2023 16:55:58 +0200 Subject: [PATCH 09/23] db: system_keyspace: add storage for CDC generations managed by group 0 The `CDC_GENERATIONS_V3` table schema is a copy-paste of the `CDC_GENERATIONS_V2` schema. The difference is that V2 lives in `system_distributed_keyspace` and writes to it are distributed using regular `storage_proxy` replication mechanisms based on the token ring. The V3 table lives in `system_keyspace` and any mutations written to it will go through group 0. Also extend the `TOPOLOGY` schema with new columns: - `new_cdc_generation_data_uuid` will be stored as part of a bootstrapping node's `ring_slice`, it stores UUID of a newly introduced CDC generation which is used as partition key for the `CDC_GENERATIONS_V3` table to access this new generation's data. It's a regular column, meaning that every row (corresponding to a node) will have its own. - `current_cdc_generation_uuid` and `current_cdc_generation_timestamp` together form the ID of the newest CDC generation in the cluster. (the uuid is the data key for `CDC_GENERATIONS_V3`, the timestamp is when the CDC generation starts operating). Those are static columns since there's a single newest CDC generation. --- cdc/generation.hh | 3 ++- db/system_keyspace.cc | 46 ++++++++++++++++++++++++++++++++++++++++++- db/system_keyspace.hh | 2 ++ 3 files changed, 49 insertions(+), 2 deletions(-) diff --git a/cdc/generation.hh b/cdc/generation.hh index bae9ba828c..6341666e5a 100644 --- a/cdc/generation.hh +++ b/cdc/generation.hh @@ -137,7 +137,8 @@ bool should_propose_first_generation(const gms::inet_address& me, const gms::gos // using `mutation_size_threshold` to decide on the mutation sizes. The partition key of each mutation // is given by `gen_uuid`. The timestamp of each cell in each mutation is given by `mutation_timestamp`. // -// Works for only specific schemas: CDC_GENERATIONS_V2 (in system_distributed_keyspace). +// Works for only specific schemas: CDC_GENERATIONS_V2 (in system_distributed_keyspace) +// and CDC_GENERATIONS_V3 (in system_keyspace). future> get_cdc_generation_mutations( schema_ptr, utils::UUID gen_uuid, const cdc::topology_description&, size_t mutation_size_threshold, api::timestamp_type mutation_timestamp); diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index b1c52e64b8..15a40ac07b 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -83,6 +83,7 @@ namespace { system_keyspace::DISCOVERY, system_keyspace::BROADCAST_KV_STORE, system_keyspace::TOPOLOGY, + system_keyspace::CDC_GENERATIONS_V3, }; if (ks_name == system_keyspace::NAME && system_ks_null_shard_tables.contains(cf_name)) { props.use_null_sharder = true; @@ -98,6 +99,7 @@ namespace { system_keyspace::DISCOVERY, system_keyspace::BROADCAST_KV_STORE, system_keyspace::TOPOLOGY, + system_keyspace::CDC_GENERATIONS_V3, }; if (ks_name == system_keyspace::NAME && extra_durable_tables.contains(cf_name)) { props.wait_for_sync_to_commitlog = true; @@ -250,6 +252,9 @@ schema_ptr system_keyspace::topology() { .with_column("num_tokens", int32_type) .with_column("shard_count", int32_type) .with_column("ignore_msb", int32_type) + .with_column("new_cdc_generation_data_uuid", uuid_type) + .with_column("current_cdc_generation_uuid", uuid_type, column_kind::static_column) + .with_column("current_cdc_generation_timestamp", timestamp_type, column_kind::static_column) .set_comment("Current state of topology change machine") .with_version(generate_schema_version(id)) .build(); @@ -257,6 +262,45 @@ schema_ptr system_keyspace::topology() { return schema; } +extern thread_local data_type cdc_streams_set_type; + +/* An internal table used by nodes to store CDC generation data. + * Written to by Raft Group 0. */ +schema_ptr system_keyspace::cdc_generations_v3() { + thread_local auto schema = [] { + auto id = generate_legacy_id(NAME, CDC_GENERATIONS_V3); + return schema_builder(NAME, CDC_GENERATIONS_V3, {id}) + /* The unique identifier of this generation. */ + .with_column("id", uuid_type, column_kind::partition_key) + /* The generation describes a mapping from all tokens in the token ring to a set of stream IDs. + * This mapping is built from a bunch of smaller mappings, each describing how tokens in a + * subrange of the token ring are mapped to stream IDs; these subranges together cover the entire + * token ring. Each such range-local mapping is represented by a row of this table. The + * clustering key of the row is the end of the range being described by this row. The start of + * this range is the range_end of the previous row (in the clustering order, which is the integer + * order) or of the last row of this partition if this is the first the first row. */ + .with_column("range_end", long_type, column_kind::clustering_key) + /* The set of streams mapped to in this range. The number of streams mapped to a single range in + * a CDC generation is bounded from above by the number of shards on the owner of that range in + * the token ring. In other words, the number of elements of this set is bounded by the maximum + * of the number of shards over all nodes. The serialized size is obtained by counting about 20B + * for each stream. For example, if all nodes in the cluster have at most 128 shards, the + * serialized size of this set will be bounded by ~2.5 KB. */ + .with_column("streams", cdc_streams_set_type) + /* The value of the `ignore_msb` sharding parameter of the node which was the owner of this token + * range when the generation was first created. Together with the set of streams above it fully + * describes the mapping for this particular range. */ + .with_column("ignore_msb", byte_type) + /* Column used for sanity checking. For a given generation it's equal to the number of ranges in + * this generation; thus, after the generation is fully inserted, it must be equal to the number + * of rows in the partition. */ + .with_column("num_ranges", int32_type, column_kind::static_column) + .with_version(system_keyspace::generate_schema_version(id)) + .build(); + }(); + return schema; +} + schema_ptr system_keyspace::raft() { static thread_local auto schema = [] { auto id = generate_legacy_id(NAME, RAFT); @@ -2838,7 +2882,7 @@ std::vector system_keyspace::all_tables(const db::config& cfg) { r.insert(r.end(), {raft(), raft_snapshots(), raft_snapshot_config(), group0_history(), discovery()}); if (cfg.check_experimental(db::experimental_features_t::feature::RAFT)) { - r.insert(r.end(), {topology()}); + r.insert(r.end(), {topology(), cdc_generations_v3()}); } if (cfg.check_experimental(db::experimental_features_t::feature::BROADCAST_TABLES)) { diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh index 89a3befdde..cf838bd0f7 100644 --- a/db/system_keyspace.hh +++ b/db/system_keyspace.hh @@ -151,6 +151,7 @@ public: static constexpr auto BROADCAST_KV_STORE = "broadcast_kv_store"; static constexpr auto TOPOLOGY = "topology"; static constexpr auto SSTABLES_REGISTRY = "sstables"; + static constexpr auto CDC_GENERATIONS_V3 = "cdc_generations_v3"; struct v3 { static constexpr auto BATCHES = "batches"; @@ -233,6 +234,7 @@ public: static schema_ptr broadcast_kv_store(); static schema_ptr topology(); static schema_ptr sstables_registry(); + static schema_ptr cdc_generations_v3(); static table_schema_version generate_schema_version(table_id table_id, uint16_t offset = 0); From 3abe0f0ad6a788281267813d589c8b98f1892fc7 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Thu, 30 Mar 2023 16:57:49 +0200 Subject: [PATCH 10/23] cdc: generation: extract pure parts of `make_new_generation` outside `cdc::generation_service::make_new_cdc_generation` would create a new CDC generation and insert it into the `CDC_GENERATIONS_V2` table these days. For Raft-based topology chnages we'll do the data insertion somewhere else - in topology coordinator code. So extract the parts for calculating the CDC generation to free-standing functions (these are almost pure calculations, modulo accessing RNG). --- cdc/generation.cc | 37 +++++++++++++++++++++---------------- cdc/generation.hh | 7 +++++++ 2 files changed, 28 insertions(+), 16 deletions(-) diff --git a/cdc/generation.cc b/cdc/generation.cc index 476ca75567..ad54631930 100644 --- a/cdc/generation.cc +++ b/cdc/generation.cc @@ -327,10 +327,27 @@ topology_description limit_number_of_streams_if_needed(topology_description&& de return topology_description(std::move(entries)); } -future generation_service::make_new_generation(const std::unordered_set& bootstrap_tokens, bool add_delay) { +std::pair make_new_generation_data( + const std::unordered_set& bootstrap_tokens, + const noncopyable_function(dht::token)>& get_sharding_info, + const locator::token_metadata_ptr tmptr) { + auto gen = topology_description_generator(bootstrap_tokens, tmptr, get_sharding_info).generate(); + auto uuid = utils::make_random_uuid(); + return {uuid, std::move(gen)}; +} + +db_clock::time_point new_generation_timestamp(bool add_delay, std::chrono::milliseconds ring_delay) { using namespace std::chrono; using namespace std::chrono_literals; + auto ts = db_clock::now(); + if (add_delay && ring_delay != 0ms) { + ts += 2 * ring_delay + duration_cast(generation_leeway); + } + return ts; +} + +future generation_service::make_new_generation(const std::unordered_set& bootstrap_tokens, bool add_delay) { const locator::token_metadata_ptr tmptr = _token_metadata.get(); // Fetch sharding parameters for a node that owns vnode ending with this token @@ -348,31 +365,19 @@ future generation_service::make_new_generation(const std::un return {sc > 0 ? sc : 1, get_sharding_ignore_msb(*endpoint, _gossiper)}; } }; - auto gen = topology_description_generator(bootstrap_tokens, tmptr, get_sharding_info).generate(); - - // We need to call this as late in the procedure as possible. - // In the V2 format we can do this after inserting the generation data into the table; - // in the V1 format we must do it before (because the timestamp is the partition key in the V1 format). - auto new_generation_timestamp = [add_delay, ring_delay = _cfg.ring_delay] { - auto ts = db_clock::now(); - if (add_delay && ring_delay != 0ms) { - ts += 2 * ring_delay + duration_cast(generation_leeway); - } - return ts; - }; + auto [uuid, gen] = make_new_generation_data(bootstrap_tokens, get_sharding_info, tmptr); // Our caller should ensure that there are normal tokens in the token ring. auto normal_token_owners = tmptr->count_normal_token_owners(); assert(normal_token_owners); if (_feature_service.cdc_generations_v2) { - auto uuid = utils::make_random_uuid(); cdc_log.info("Inserting new generation data at UUID {}", uuid); // This may take a while. co_await _sys_dist_ks.local().insert_cdc_generation(uuid, gen, { normal_token_owners }); // Begin the race. - cdc::generation_id_v2 gen_id{new_generation_timestamp(), uuid}; + cdc::generation_id_v2 gen_id{new_generation_timestamp(add_delay, _cfg.ring_delay), uuid}; cdc_log.info("New CDC generation: {}", gen_id); co_return gen_id; @@ -401,7 +406,7 @@ future generation_service::make_new_generation(const std::un " a new node or running the checkAndRepairCdcStreams nodetool command."); // Begin the race. - cdc::generation_id_v1 gen_id{new_generation_timestamp()}; + cdc::generation_id_v1 gen_id{new_generation_timestamp(add_delay, _cfg.ring_delay)}; co_await _sys_dist_ks.local().insert_cdc_topology_description(gen_id, std::move(gen), { normal_token_owners }); diff --git a/cdc/generation.hh b/cdc/generation.hh index 6341666e5a..6bd9f2bcb8 100644 --- a/cdc/generation.hh +++ b/cdc/generation.hh @@ -133,6 +133,13 @@ public: */ bool should_propose_first_generation(const gms::inet_address& me, const gms::gossiper&); +std::pair make_new_generation_data( + const std::unordered_set& bootstrap_tokens, + const noncopyable_function (dht::token)>& get_sharding_info, + const locator::token_metadata_ptr); + +db_clock::time_point new_generation_timestamp(bool add_delay, std::chrono::milliseconds ring_delay); + // Translates the CDC generation data given by a `cdc::topology_description` into a vector of mutations, // using `mutation_size_threshold` to decide on the mutation sizes. The partition key of each mutation // is given by `gen_uuid`. The timestamp of each cell in each mutation is given by `mutation_timestamp`. From 22094f1509b18f2eea4eaab335c1086675fc5707 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Thu, 30 Mar 2023 17:04:09 +0200 Subject: [PATCH 11/23] db: system_keyspace: small refactor of `load_topology_state` The variables necessary for constructing a `ring_slice` are now living in a local block of code. This makes it easier to see which data is part of the `ring_slice` and will make it easier to add more data to `ring_slice` in following commits. Also add some more sanity checking. --- db/system_keyspace.cc | 47 ++++++++++++++++++++++++++++--------------- 1 file changed, 31 insertions(+), 16 deletions(-) diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 15a40ac07b..d2b3576661 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -3519,19 +3519,29 @@ future system_keyspace::load_topology_state() { service::node_state nstate = service::node_state_from_string(row.get_as("node_state")); - std::optional tstate; + std::optional ring_slice; if (row.has("replication_state")) { - tstate = service::replication_state_from_string(row.get_as("replication_state")); - } + auto repl_state = service::replication_state_from_string(row.get_as("replication_state")); - std::unordered_set t; + std::unordered_set tokens; + if (row.has("tokens")) { + auto blob = row.get_blob("tokens"); + auto cdef = topology()->get_column_definition("tokens"); + auto deserialized = cdef->type->deserialize(blob); + auto ts = value_cast(deserialized); + tokens = decode_tokens(ts); + } - if (row.has("tokens")) { - auto blob = row.get_blob("tokens"); - auto cdef = topology()->get_column_definition("tokens"); - auto deserialized = cdef->type->deserialize(blob); - auto tokens = value_cast(deserialized); - t = decode_tokens(tokens); + if (tokens.empty()) { + on_fatal_internal_error(slogger, format( + "load_topology_state: node {} has replication state ({}) but missing tokens", + host_id, repl_state)); + } + + ring_slice = service::ring_slice { + .state = repl_state, + .tokens = std::move(tokens), + }; } std::optional replaced_id; @@ -3589,23 +3599,28 @@ future system_keyspace::load_topology_state() { } } - if (!tstate && t.size() != 0) { - on_fatal_internal_error(slogger, "There cannot be tokens without the replication state"); - } std::unordered_map* map = nullptr; if (nstate == service::node_state::normal) { map = &ret.normal_nodes; + if (!ring_slice) { + on_fatal_internal_error(slogger, format( + "load_topology_state: node {} in normal state but missing ring slice", host_id)); + } } else if (nstate == service::node_state::left) { ret.left_nodes.emplace(host_id); } else if (nstate == service::node_state::none) { map = &ret.new_nodes; } else { map = &ret.transition_nodes; + if (!ring_slice) { + on_fatal_internal_error(slogger, format( + "load_topology_state: node {} in transitioning state but missing ring slice", host_id)); + } } if (map) { - map->emplace(host_id, service::replica_state{nstate, std::move(datacenter), std::move(rack), std::move(release_version), - tstate ? std::optional(service::ring_slice{*tstate, std::move(t)}) : std::nullopt, - shard_count, ignore_msb}); + map->emplace(host_id, service::replica_state{ + nstate, std::move(datacenter), std::move(rack), std::move(release_version), + ring_slice, shard_count, ignore_msb}); } } From 4e7628fa1610083922c65b6a7ca1802de958eb2b Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Thu, 20 Apr 2023 16:15:40 +0200 Subject: [PATCH 12/23] service: topology_state_machine: make topology::find const --- service/topology_state_machine.cc | 2 +- service/topology_state_machine.hh | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/service/topology_state_machine.cc b/service/topology_state_machine.cc index 1b84b3173f..c544780a73 100644 --- a/service/topology_state_machine.cc +++ b/service/topology_state_machine.cc @@ -13,7 +13,7 @@ namespace service { logging::logger tsmlogger("topology_state_machine"); -const std::pair* topology::find(raft::server_id id) { +const std::pair* topology::find(raft::server_id id) const { auto it = normal_nodes.find(id); if (it != normal_nodes.end()) { return &*it; diff --git a/service/topology_state_machine.hh b/service/topology_state_machine.hh index e7de7e4ede..8775a787d1 100644 --- a/service/topology_state_machine.hh +++ b/service/topology_state_machine.hh @@ -86,7 +86,7 @@ struct topology { std::unordered_map req_param; // Find only nodes in non 'left' state - const std::pair* find(raft::server_id id); + const std::pair* find(raft::server_id id) const; // Return true if node exists in any state including 'left' one bool contains(raft::server_id id); }; From 5942237a790ffc6addaf5222c370091802d68e66 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Thu, 30 Mar 2023 17:11:14 +0200 Subject: [PATCH 13/23] raft topology: create new CDC generation data during node bootstrap Calculate a new CDC generation using the bootstrapping node's tokens, translate it to mutation format, and insert this mutation to the CDC_GENERATIONS_V3 table through group 0 at the same time we assign tokens to the node in Raft topology. The partition key for this data is stored in the bootstrapping node's `ring_slice`. The data is inserted, but it's not used for anything yet, we'll do it in later commits. Two FIXMEs are left for follow-ups: - in `get_sharding_info` we shouldn't have to use the token owner's IP, but get the host ID directly from token metadata (#12279), - splitting the CDC generation data write into multiple commands. The comment elaborates. --- db/system_keyspace.cc | 7 +++ service/storage_service.cc | 73 +++++++++++++++++++++++++++++-- service/topology_state_machine.hh | 5 +++ 3 files changed, 82 insertions(+), 3 deletions(-) diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index d2b3576661..1a7d54ab26 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -3538,9 +3538,16 @@ future system_keyspace::load_topology_state() { host_id, repl_state)); } + if (!row.has("new_cdc_generation_data_uuid")) { + on_fatal_internal_error(slogger, format( + "load_topology_state: node {} has replication state ({}) but missing CDC generation data UUID", + host_id, repl_state)); + } + ring_slice = service::ring_slice { .state = repl_state, .tokens = std::move(tokens), + .new_cdc_generation_data_uuid = row.get_as("new_cdc_generation_data_uuid"), }; } diff --git a/service/storage_service.cc b/service/storage_service.cc index 3338614f98..5d0b457900 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -454,6 +454,7 @@ public: topology_mutation_builder& set(const char* cell, const raft::server_id& value); topology_mutation_builder& set(const char* cell, const std::unordered_set& value); topology_mutation_builder& set(const char* cell, const uint32_t& value); + topology_mutation_builder& set(const char* cell, const utils::UUID& value); topology_mutation_builder& del(const char* cell); canonical_mutation build() { return canonical_mutation{std::move(_m)}; } }; @@ -487,6 +488,14 @@ topology_mutation_builder& topology_mutation_builder::set(const char* cell, cons return *this; } +topology_mutation_builder& topology_mutation_builder::set( + const char* cell, const utils::UUID& value) { + auto cdef = _s->get_column_definition(cell); + assert(cdef); + _r.cells().apply(*cdef, atomic_cell::make_live(*cdef->type, _ts, cdef->type->decompose(value))); + return *this; +} + topology_mutation_builder& topology_mutation_builder::del(const char* cell) { auto cdef = _s->get_column_definition(cell); assert(cdef); @@ -743,12 +752,70 @@ future<> storage_service::topology_change_coordinator_fiber(raft::server& raft, auto tmptr = get_token_metadata_ptr(); auto bootstrap_tokens = boot_strapper::get_random_bootstrap_tokens(tmptr, num_tokens, dht::check_token_endpoint::yes); - // Write choosen tokens through raft. + auto get_sharding_info = [&] (dht::token end) -> std::pair { + if (bootstrap_tokens.contains(end)) { + return {node.rs->shard_count, node.rs->ignore_msb}; + } else { + // FIXME: token metadata should directly return host ID for given token. See #12279 + auto ep = tmptr->get_endpoint(end); + if (!ep) { + // get_sharding_info is only called for bootstrap tokens + // or for tokens present in token_metadata + on_internal_error(slogger, format( + "raft topology: make_new_cdc_generation_data: get_sharding_info:" + " can't find endpoint for token {}", end)); + } + + auto id = tmptr->get_host_id_if_known(*ep); + if (!id) { + on_internal_error(slogger, format( + "raft topology: make_new_cdc_generation_data: get_sharding_info:" + " can't find host ID for endpoint {}, owner of token {}", *ep, end)); + } + + auto ptr = node.topology->find(raft::server_id{id->uuid()}); + if (!ptr) { + on_internal_error(slogger, format( + "raft topology: make_new_cdc_generation_data: get_sharding_info:" + " couldn't find node {} in topology, owner of token {}", *id, end)); + } + + auto& rs = ptr->second; + return {rs.shard_count, rs.ignore_msb}; + } + }; + + auto [gen_uuid, gen_desc] = cdc::make_new_generation_data( + bootstrap_tokens, get_sharding_info, tmptr); + auto gen_table_schema = _db.local().find_schema( + db::system_keyspace::NAME, db::system_keyspace::CDC_GENERATIONS_V3); + + // FIXME: the CDC generation data can be large and not fit in a single command + // (for large clusters, it will introduce reactor stalls and go over commitlog entry + // size limit). We need to split it into multiple mutations by smartly picking + // a `mutation_size_threshold` and sending each mutation as a separate group 0 command. + // We also don't want to serialize the commands - there may be many of them, + // and we don't want to wait for a network round-trip to a quorum between each command. + // So we need to introduce a mechanism for group 0 to send a sequence of commands + // that can be committed concurrently. Also we need to be careful with memory consumption + // with many large mutations. + // See `system_distributed_keyspace::insert_cdc_generation` for inspiration how it + // was done when the mutations were stored in a regular distributed table. + const size_t mutation_size_threshold = 2'000'000; + auto gen_mutations = co_await cdc::get_cdc_generation_mutations( + gen_table_schema, gen_uuid, gen_desc, mutation_size_threshold, node.guard.write_timestamp()); + std::vector updates{gen_mutations.begin(), gen_mutations.end()}; + + // Write chosen tokens and CDC generation data through raft. builder.set("node_state", node_state::bootstrapping) .del("topology_request") .set("tokens", bootstrap_tokens) - .set("replication_state", ring_slice::replication_state::write_both_read_old); - co_await update_replica_state(std::move(node), {builder.build()}, "bootstrap: assign tokens"); + .set("replication_state", ring_slice::replication_state::write_both_read_old) + .set("new_cdc_generation_data_uuid", gen_uuid); + updates.push_back(builder.build()); + auto reason = format( + "bootstrap: assign tokens and insert CDC generation data (UUID: {})", gen_uuid); + co_await update_replica_state(std::move(node), {std::move(updates)}, reason); break; } case topology_request::leave: diff --git a/service/topology_state_machine.hh b/service/topology_state_machine.hh index 8775a787d1..c8f7a1289b 100644 --- a/service/topology_state_machine.hh +++ b/service/topology_state_machine.hh @@ -55,6 +55,11 @@ struct ring_slice { replication_state state; std::unordered_set tokens; + + // When a new node joins the cluster, always a new CDC generation is created. + // This is the UUID used to access the data of the CDC generation introduced + // when the node owning this ring_slice joined (it's the partition key in CDC_GENERATIONS_V3 table). + utils::UUID new_cdc_generation_data_uuid; }; struct replica_state { From 58baf998c1caf22d385a3164d7ab2e2976fbce16 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Thu, 30 Mar 2023 17:13:19 +0200 Subject: [PATCH 14/23] raft topology: commit a new CDC generation on node bootstrap After inserting new CDC generation data (see previous commit), we need to pick a timestamp for this generation and commit it, telling all nodes in the cluster to start using the generation for CDC log writes once their clocks cross that timestamp. We introduce a separate step to the bootstrap saga, before `write_both_read_old`, called `commit_cdc_generation`. In this step, the coordinator takes the `new_cdc_generation_data_uuid` stored in a bootstrapping node's `ring_slice` - which serves as the key to the table where the CDC generation data is stored - and combines it with a timestamp which it generates a bit into the future (as in old gossiper-based code, we use 2 * ring_delay, by default 1 minute). This gives us a CDC generation ID which we commit into the topology state as the `current_cdc_generation_id` while switching the saga to the next step, `write_both_read_old`. `system_keyspace::load_topology_state` is extended to load `current_cdc_generation_id`. For now, nodes don't react to `current_cdc_generation_id`. In later commit we'll extend `storage_service::topology_state_load` to start using the current CDC generation for CDC log table writes. The solution with specifying a timestamp into the future is the same as it is for gossip-based topology changes and it has the same consistency problem - if some node is temporarily partitioned away from the quorum, it might not learn about the new CDC generation before its clock crosses the generation's timestamp, causing it to temporarily send writes to the wrong CDC streams (until it learns about the new timestamp). I left a FIXME which describes an alternative solution which wasn't viable for gossiper-based topology changes, but it is viable when we have a fault-tolerant topology coordinator. --- db/system_keyspace.cc | 43 ++++++++++++++++++ service/storage_service.cc | 75 ++++++++++++++++++++++++++++++- service/topology_state_machine.cc | 1 + service/topology_state_machine.hh | 4 ++ 4 files changed, 122 insertions(+), 1 deletion(-) diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 1a7d54ab26..a9c1d37cd4 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -3631,6 +3631,49 @@ future system_keyspace::load_topology_state() { } } + { + // Here we access static columns, any row will do. + auto& some_row = *rs->begin(); + if (some_row.has("current_cdc_generation_uuid")) { + auto gen_uuid = some_row.get_as("current_cdc_generation_uuid"); + if (!some_row.has("current_cdc_generation_timestamp")) { + on_internal_error(slogger, format( + "load_topology_state: current CDC generation UUID ({}) present, but timestamp missing", gen_uuid)); + } + auto gen_ts = some_row.get_as("current_cdc_generation_timestamp"); + ret.current_cdc_generation_id = cdc::generation_id_v2 { + .ts = gen_ts, + .id = gen_uuid + }; + + // Sanity check for CDC generation data consistency. + { + auto gen_rows = co_await qctx->execute_cql( + format("SELECT count(range_end) as cnt, num_ranges FROM system.{} WHERE id = ?", + CDC_GENERATIONS_V3), + gen_uuid); + assert(gen_rows); + if (gen_rows->empty()) { + on_internal_error(slogger, format( + "load_topology_state: current CDC generation UUID ({}) present, but data missing", gen_uuid)); + } + auto& row = gen_rows->one(); + auto counted_ranges = row.get_as("cnt"); + auto num_ranges = row.get_as("num_ranges"); + if (counted_ranges != num_ranges) { + on_internal_error(slogger, format( + "load_topology_state: inconsistency in CDC generation data (UUID {}):" + " counted {} ranges, should be {}", gen_uuid, counted_ranges, num_ranges)); + } + } + } else { + if (!ret.normal_nodes.empty()) { + on_internal_error(slogger, + "load_topology_state: normal nodes present but no current CDC generation ID"); + } + } + } + co_return ret; } diff --git a/service/storage_service.cc b/service/storage_service.cc index 5d0b457900..837d12b42b 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -405,6 +405,11 @@ future<> storage_service::topology_state_load(cdc::generation_service& cdc_gen_s } } })); + + if (auto gen_id = _topology_state_machine._topology.current_cdc_generation_id) { + slogger.debug("topology_state_load: current CDC generation ID: {}", *gen_id); + // TODO start using the generation for CDC writes + } } future<> storage_service::topology_transition(storage_proxy& proxy, cdc::generation_service& cdc_gen_svc, gms::inet_address from, std::vector cms) { @@ -455,6 +460,7 @@ public: topology_mutation_builder& set(const char* cell, const std::unordered_set& value); topology_mutation_builder& set(const char* cell, const uint32_t& value); topology_mutation_builder& set(const char* cell, const utils::UUID& value); + topology_mutation_builder& set_current_cdc_generation_id(const cdc::generation_id_v2&); topology_mutation_builder& del(const char* cell); canonical_mutation build() { return canonical_mutation{std::move(_m)}; } }; @@ -529,6 +535,13 @@ topology_mutation_builder& topology_mutation_builder::set(const char* cell, cons return *this; } +topology_mutation_builder& topology_mutation_builder::set_current_cdc_generation_id( + const cdc::generation_id_v2& value) { + _m.set_static_cell("current_cdc_generation_timestamp", value.ts, _ts); + _m.set_static_cell("current_cdc_generation_uuid", value.id, _ts); + return *this; +} + future<> storage_service::topology_change_coordinator_fiber(raft::server& raft, raft::term_t term, cdc::generation_service& cdc_gen_svc, sharded& sys_dist_ks, abort_source& as) { slogger.info("raft topology: start topology coordinator fiber"); @@ -651,6 +664,66 @@ future<> storage_service::topology_change_coordinator_fiber(raft::server& raft, bool res; switch (node.rs->ring.value().state) { + case ring_slice::replication_state::commit_cdc_generation: { + // make sure all nodes know about new topology and have the new CDC generation data + // (we require all nodes to be alive for topo change for now) + std::tie(node, res) = co_await exec_global_command(std::move(node), raft_topology_cmd{raft_topology_cmd::command::barrier}, false, replaced_node); + if (!res) { + break; + } + + // We don't need to add delay to the generation timestamp if this is the first generation. + bool add_ts_delay = bool(node.topology->current_cdc_generation_id); + + // Begin the race. + // See the large FIXME below. + auto cdc_gen_ts = cdc::new_generation_timestamp(add_ts_delay, get_ring_delay()); + auto cdc_gen_uuid = node.rs->ring.value().new_cdc_generation_data_uuid; + cdc::generation_id_v2 cdc_gen_id { + .ts = cdc_gen_ts, + .id = cdc_gen_uuid, + }; + + { + // Sanity check. + // This could happen if the topology coordinator's clock is broken. + auto curr_gen_id = node.topology->current_cdc_generation_id; + if (curr_gen_id && curr_gen_id->ts >= cdc_gen_ts) { + on_internal_error(slogger, format( + "raft topology: new CDC generation has smaller timestamp than the previous one." + " Old generation ID: {}, new generation ID: {}", *curr_gen_id, cdc_gen_id)); + } + } + + // Tell all nodes to start using the new CDC generation by updating the topology + // with the generation's ID and timestamp. + // At the same time move the topology change procedure to the next step. + // + // FIXME: as in previous implementation with gossiper and ring_delay, this assumes that all nodes + // will learn about the new CDC generation before their clocks reach the generation's timestamp. + // With this group 0 based implementation, it means that the command must be committed, + // replicated and applied on all nodes before their clocks reach the generation's timestamp + // (i.e. within 2 * ring_delay = 60 seconds by default if clocks are synchronized). If this + // doesn't hold some coordinators might use the wrong CDC streams for some time and CDC stream + // readers will miss some data. It's likely that Raft replication doesn't converge as quickly + // as gossiping does. + // + // We could use a two-phase algorithm instead: first tell all nodes to prepare for using + // the new generation, then tell all nodes to commit. If some nodes don't manage to prepare + // in time, we abort the generation switch. If all nodes prepare, we commit. If a node prepares + // but doesn't receive a commit in time, it stops coordinating CDC-enabled writes until it + // receives a commit or abort. This solution does not have a safety problem like the one + // above, but it has an availability problem when nodes get disconnected from group 0 majority + // in the middle of a CDC generation switch (when they are prepared to switch but not + // committed) - they won't coordinate CDC-enabled writes until they reconnect to the + // majority and commit. + topology_mutation_builder builder(node.guard.write_timestamp(), node.id); + builder.set("replication_state", ring_slice::replication_state::write_both_read_old) + .set_current_cdc_generation_id(cdc_gen_id); + auto str = fmt::format("{}: committed new CDC generation, ID: {}", node.rs->state, cdc_gen_id); + co_await update_replica_state(std::move(node), {builder.build()}, std::move(str)); + } + break; case ring_slice::ring_slice::replication_state::write_both_read_old: { // make sure all nodes know about new topology (we require all nodes to be alive for topo change for now) std::tie(node, res) = co_await exec_global_command(std::move(node), raft_topology_cmd{raft_topology_cmd::command::barrier}, false, replaced_node); @@ -810,7 +883,7 @@ future<> storage_service::topology_change_coordinator_fiber(raft::server& raft, builder.set("node_state", node_state::bootstrapping) .del("topology_request") .set("tokens", bootstrap_tokens) - .set("replication_state", ring_slice::replication_state::write_both_read_old) + .set("replication_state", ring_slice::replication_state::commit_cdc_generation) .set("new_cdc_generation_data_uuid", gen_uuid); updates.push_back(builder.build()); auto reason = format( diff --git a/service/topology_state_machine.cc b/service/topology_state_machine.cc index c544780a73..9b0333d3c5 100644 --- a/service/topology_state_machine.cc +++ b/service/topology_state_machine.cc @@ -37,6 +37,7 @@ bool topology::contains(raft::server_id id) { } static std::unordered_map replication_state_to_name_map = { + {ring_slice::replication_state::commit_cdc_generation, "commit cdc generation"}, {ring_slice::replication_state::write_both_read_old, "write both read old"}, {ring_slice::replication_state::write_both_read_new, "write both read new"}, {ring_slice::replication_state::owner, "owner"}, diff --git a/service/topology_state_machine.hh b/service/topology_state_machine.hh index c8f7a1289b..5b894ae16c 100644 --- a/service/topology_state_machine.hh +++ b/service/topology_state_machine.hh @@ -17,6 +17,7 @@ #include #include #include +#include "cdc/generation_id.hh" #include "dht/token.hh" #include "raft/raft.hh" #include "utils/UUID.hh" @@ -48,6 +49,7 @@ using request_param = std::variant; struct ring_slice { enum class replication_state: uint8_t { + commit_cdc_generation, write_both_read_old, write_both_read_new, owner @@ -90,6 +92,8 @@ struct topology { // operation untill the node becomes normal std::unordered_map req_param; + std::optional current_cdc_generation_id; + // Find only nodes in non 'left' state const std::pair* find(raft::server_id id) const; // Return true if node exists in any state including 'left' one From 5f2b297f99b499c6b1a3e3a6b481d899daba604e Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Wed, 29 Mar 2023 18:06:43 +0200 Subject: [PATCH 15/23] raft topology: publish new CDC generation to the user description tables Once a new CDC generation is committed to the cluster by the topology coordinator, we also need to publish it to the user-facing description tables so CDC applications know which streams to read from. This uses regular distributed table writes underneath (tables living in the `system_distributed` keyspace) so it requires `token_metadata` to be nonempty. We need a hack for the case of bootstrapping the first node in the cluster - turning the tokens into normal tokens earlier in the procedure in `token_metadata`, but this is fine for the single-node case since no streaming is happening. --- db/system_keyspace.cc | 37 +++++++++++++++++++++++++++++++++++++ db/system_keyspace.hh | 8 ++++++++ service/storage_service.cc | 23 +++++++++++++++++++++-- 3 files changed, 66 insertions(+), 2 deletions(-) diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index a9c1d37cd4..c8893fb656 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -67,6 +67,7 @@ #include "service/topology_state_machine.hh" #include "sstables/open_info.hh" #include "sstables/generation_type.hh" +#include "cdc/generation.hh" using days = std::chrono::duration>; @@ -3677,6 +3678,42 @@ future system_keyspace::load_topology_state() { co_return ret; } +future +system_keyspace::read_cdc_generation(utils::UUID id) { + std::vector entries; + auto num_ranges = 0; + co_await _qp.local().query_internal( + format("SELECT range_end, streams, ignore_msb, num_ranges FROM {}.{} WHERE id = ?", + NAME, CDC_GENERATIONS_V3), + db::consistency_level::ONE, + { id }, + 1000, // for ~1KB rows, ~1MB page size + [&] (const cql3::untyped_result_set_row& row) { + std::vector streams; + row.get_list_data("streams", std::back_inserter(streams)); + entries.push_back(cdc::token_range_description{ + dht::token::from_int64(row.get_as("range_end")), + std::move(streams), + uint8_t(row.get_as("ignore_msb"))}); + num_ranges = row.get_as("num_ranges"); + return make_ready_future(stop_iteration::no); + }); + + if (entries.empty()) { + // The data must be present by precondition. + on_internal_error(slogger, format( + "read_cdc_generation: data for CDC generation {} not present", id)); + } + + if (entries.size() != num_ranges) { + throw std::runtime_error(format( + "read_cdc_generation: wrong number of rows. The `num_ranges` column claimed {} rows," + " but reading the partition returned {}.", num_ranges, entries.size())); + } + + co_return cdc::topology_description{std::move(entries)}; +} + future<> system_keyspace::sstables_registry_create_entry(sstring location, utils::UUID uuid, sstring status, sstables::entry_descriptor desc) { static const auto req = format("INSERT INTO system.{} (location, generation, uuid, status, version, format) VALUES (?, ?, ?, ?, ?, ?)", SSTABLES_REGISTRY); slogger.trace("Inserting {}.{}:{} into {}", location, desc.generation.value(), uuid, SSTABLES_REGISTRY); diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh index cf838bd0f7..ca12f07a65 100644 --- a/db/system_keyspace.hh +++ b/db/system_keyspace.hh @@ -68,6 +68,10 @@ namespace gms { class gossiper; } +namespace cdc { + class topology_description; +} + bool is_system_keyspace(std::string_view ks_name); namespace db { @@ -445,6 +449,10 @@ public: static future load_topology_state(); + // Read CDC generation data with the given UUID as key. + // Precondition: the data is known to be present in the table (because it was committed earlier through group 0). + future read_cdc_generation(utils::UUID id); + // The mutation appends the given state ID to the group 0 history table, with the given description if non-empty. // // If `gc_older_than` is provided, the mutation will also contain a tombstone that clears all entries whose diff --git a/service/storage_service.cc b/service/storage_service.cc index 837d12b42b..4d308c3ecd 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -371,8 +371,16 @@ future<> storage_service::topology_state_load(cdc::generation_service& cdc_gen_s co_await _sys_ks.local().update_peer_info(ip, "host_id", id.uuid()); } tmptr->update_topology(ip, locator::endpoint_dc_rack{rs.datacenter, rs.rack}); - tmptr->add_bootstrap_tokens(rs.ring.value().tokens, ip); - co_await update_pending_ranges(tmptr, format("bootstrapping node {}/{}", id, ip)); + if (_topology_state_machine._topology.normal_nodes.empty()) { + // This is the first node in the cluster. Insert the tokens as normal to the token ring early + // so we can perform writes to regular 'distributed' tables during the bootstrap procedure + // (such as the CDC generation write). + // It doesn't break anything to set the tokens to normal early in this single-node case. + co_await tmptr->update_normal_tokens(rs.ring.value().tokens, ip); + } else { + tmptr->add_bootstrap_tokens(rs.ring.value().tokens, ip); + co_await update_pending_ranges(tmptr, format("bootstrapping node {}/{}", id, ip)); + } break; case node_state::decommissioning: case node_state::removing: @@ -730,6 +738,17 @@ future<> storage_service::topology_change_coordinator_fiber(raft::server& raft, if (!res) { break; } + + // If a node is bootstrapping, we just committed a new CDC generation in the commit_cdc_generation step. + // Publish it to the user-facing distributed CDC description tables. + if (node.rs->state == node_state::bootstrapping) { + auto curr_gen_id = node.topology->current_cdc_generation_id.value(); + auto gen_data = co_await _sys_ks.local().read_cdc_generation(curr_gen_id.id); + + co_await sys_dist_ks.local().create_cdc_desc( + curr_gen_id.ts, gen_data, { get_token_metadata().count_normal_token_owners() }); + } + raft_topology_cmd cmd{raft_topology_cmd::command::stream_ranges}; if (node.rs->state == node_state::removing) { // tell all nodes to stream data of the removed node to new range owners From 4c99b4004bd6b5c0e25d459c9bd5121d411e1a79 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Wed, 29 Mar 2023 18:22:39 +0200 Subject: [PATCH 16/23] storage_service: use CDC generations introduced by Raft topology When a node notices that a new CDC generation was introduced in `storage_service::topology_state_load`, it updates its internal data structures that are used when coordinating writes to CDC log tables. --- cdc/generation.cc | 19 +++++++++++++++++++ cdc/generation_service.hh | 6 ++++++ service/storage_service.cc | 2 +- 3 files changed, 26 insertions(+), 1 deletion(-) diff --git a/cdc/generation.cc b/cdc/generation.cc index ad54631930..6682234905 100644 --- a/cdc/generation.cc +++ b/cdc/generation.cc @@ -908,6 +908,25 @@ future<> generation_service::check_and_repair_cdc_streams() { co_await _sys_ks.local().update_cdc_generation_id(new_gen_id); } +future<> generation_service::handle_cdc_generation(cdc::generation_id_v2 gen_id) { + auto ts = get_ts(gen_id); + if (co_await container().map_reduce(and_reducer(), [ts] (generation_service& svc) { + return !svc._cdc_metadata.prepare(ts); + })) { + co_return; + } + + auto gen_data = co_await _sys_ks.local().read_cdc_generation(gen_id.id); + + bool using_this_gen = co_await container().map_reduce(or_reducer(), [ts, &gen_data] (generation_service& svc) { + return svc._cdc_metadata.insert(ts, cdc::topology_description{gen_data}); + }); + + if (using_this_gen) { + cdc_log.info("Started using generation {}.", gen_id); + } +} + future<> generation_service::handle_cdc_generation(std::optional gen_id) { assert_shard_zero(__PRETTY_FUNCTION__); diff --git a/cdc/generation_service.hh b/cdc/generation_service.hh index 0e561b2290..40b97f4e78 100644 --- a/cdc/generation_service.hh +++ b/cdc/generation_service.hh @@ -135,6 +135,12 @@ public: */ future make_new_generation(const std::unordered_set& bootstrap_tokens, bool add_delay); + /* Retrieve the CDC generation with the given ID from local tables + * and start using it for CDC log writes if it's not obsolete. + * Precondition: the generation was committed using group 0 and locally applied. + */ + future<> handle_cdc_generation(cdc::generation_id_v2); + private: /* Retrieve the CDC generation which starts at the given timestamp (from a distributed table created for this purpose) * and start using it for CDC log writes if it's not obsolete. diff --git a/service/storage_service.cc b/service/storage_service.cc index 4d308c3ecd..eb1ae75123 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -416,7 +416,7 @@ future<> storage_service::topology_state_load(cdc::generation_service& cdc_gen_s if (auto gen_id = _topology_state_machine._topology.current_cdc_generation_id) { slogger.debug("topology_state_load: current CDC generation ID: {}", *gen_id); - // TODO start using the generation for CDC writes + co_await cdc_gen_svc.handle_cdc_generation(*gen_id); } } From f081de7cc57a59e0fd68742c085badfe61d67d97 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Thu, 30 Mar 2023 13:57:08 +0200 Subject: [PATCH 17/23] service: raft_group0_client: introduce `hold_read_apply_mutex` We'll use it in `storage_service` topology snapshot request handler. --- service/raft/group0_state_machine.cc | 6 +++--- service/raft/raft_group0_client.cc | 6 +++++- service/raft/raft_group0_client.hh | 3 ++- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/service/raft/group0_state_machine.cc b/service/raft/group0_state_machine.cc index 02f0f6a0db..cb7ea03a28 100644 --- a/service/raft/group0_state_machine.cc +++ b/service/raft/group0_state_machine.cc @@ -67,7 +67,7 @@ future<> group0_state_machine::apply(std::vector command) { cmd.prev_state_id, cmd.new_state_id, cmd.creator_addr, cmd.creator_id); slogger.trace("cmd.history_append: {}", cmd.history_append); - auto read_apply_mutex_holder = co_await get_units(_client._read_apply_mutex, 1); + auto read_apply_mutex_holder = co_await _client.hold_read_apply_mutex(); if (cmd.prev_state_id) { auto last_group0_state_id = co_await db::system_keyspace::get_last_group0_state_id(); @@ -126,7 +126,7 @@ void group0_state_machine::drop_snapshot(raft::snapshot_id id) { future<> group0_state_machine::load_snapshot(raft::snapshot_id id) { // topology_state_load applies persisted state machine state into // memory and thus needs to be protected with apply mutex - auto read_apply_mutex_holder = co_await get_units(_client._read_apply_mutex, 1); + auto read_apply_mutex_holder = co_await _client.hold_read_apply_mutex(); co_await _ss.topology_state_load(_cdc_gen_svc); _ss._topology_state_machine.event.signal(); } @@ -152,7 +152,7 @@ future<> group0_state_machine::transfer_snapshot(gms::inet_address from, raft::s // TODO ensure atomicity of snapshot application in presence of crashes (see TODO in `apply`) - auto read_apply_mutex_holder = co_await get_units(_client._read_apply_mutex, 1); + auto read_apply_mutex_holder = co_await _client.hold_read_apply_mutex(); co_await _mm.merge_schema_from(addr, std::move(*cm)); diff --git a/service/raft/raft_group0_client.cc b/service/raft/raft_group0_client.cc index 53f603c23a..8b8d14bad6 100644 --- a/service/raft/raft_group0_client.cc +++ b/service/raft/raft_group0_client.cc @@ -245,7 +245,7 @@ future raft_group0_client::start_operation(seastar::abort_source* // Take `_group0_read_apply_mutex` *after* read barrier. // Read barrier may wait for `group0_state_machine::apply` which also takes this mutex. - auto read_apply_holder = co_await get_units(_read_apply_mutex, 1); + auto read_apply_holder = co_await hold_read_apply_mutex(); auto observed_group0_state_id = co_await db::system_keyspace::get_last_group0_state_id(); auto new_group0_state_id = generate_group0_state_id(observed_group0_state_id); @@ -413,6 +413,10 @@ future<> raft_group0_client::wait_until_group0_upgraded(abort_source& as) { } } +future> raft_group0_client::hold_read_apply_mutex() { + return get_units(_read_apply_mutex, 1); +} + db::system_keyspace& raft_group0_client::sys_ks() { return _sys_ks; } diff --git a/service/raft/raft_group0_client.hh b/service/raft/raft_group0_client.hh index e700fc4676..b290b4637a 100644 --- a/service/raft/raft_group0_client.hh +++ b/service/raft/raft_group0_client.hh @@ -64,7 +64,6 @@ public: // Singleton that exists only on shard zero. Used to post commands to group zero class raft_group0_client { - friend class group0_state_machine; service::raft_group_registry& _raft_gr; db::system_keyspace& _sys_ks; @@ -166,6 +165,8 @@ public: // Wait until group 0 upgrade enters the `use_post_raft_procedures` state. future<> wait_until_group0_upgraded(abort_source&); + future> hold_read_apply_mutex(); + db::system_keyspace& sys_ks(); // for test only From 3b26135227672d840ed624e626459c5ed94b7275 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Thu, 30 Mar 2023 14:00:13 +0200 Subject: [PATCH 18/23] storage_service: hold group 0 apply mutex when reading topology snapshot This is a bugfix: we need to hold the mutex when loading topology data from tables, otherwise they might be concurrently modified by `group0_state_machine::apply` and the snapshot that we send won't make any sense. Also specify in comments that the lock must be held during `topology_transition`, `topology_state_load`, `merge_topology_snapshot`. --- service/storage_service.cc | 3 +++ service/storage_service.hh | 3 +++ 2 files changed, 6 insertions(+) diff --git a/service/storage_service.cc b/service/storage_service.cc index eb1ae75123..975cd72972 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -4822,6 +4822,9 @@ void storage_service::init_messaging_service(sharded& pr if (!ss._raft_topology_change_enabled) { co_return raft_topology_snapshot{}; } + // FIXME: make it an rwlock, here we only need to lock for reads, + // might be useful if multiple nodes are trying to pull concurrently. + auto read_apply_mutex_holder = co_await ss._group0->client().hold_read_apply_mutex(); auto rs = co_await db::system_keyspace::query_mutations(proxy, db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY); auto s = ss._db.local().find_schema(db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY); std::vector results; diff --git a/service/storage_service.hh b/service/storage_service.hh index fd40bdc877..3c7b7d3536 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -786,10 +786,13 @@ private: future<> update_topology_with_local_metadata(raft::server&); // This is called on all nodes for each new command received through raft + // raft_group0_client::_read_apply_mutex must be held future<> topology_transition(storage_proxy& proxy, cdc::generation_service&, gms::inet_address, std::vector); // load topology state machine snapshot into memory + // raft_group0_client::_read_apply_mutex must be held future<> topology_state_load(cdc::generation_service&); // Applies received raft snapshot to local state machine persistent storage + // raft_group0_client::_read_apply_mutex must be held future<> merge_topology_snapshot(raft_topology_snapshot snp); }; From 3d96bc5dba33facb0392fb74d0ba1f6d5bb8b285 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Thu, 30 Mar 2023 15:21:20 +0200 Subject: [PATCH 19/23] db: system_keyspace: introduce `query_mutations` with range/slice There is a `query_mutations` function which loads the entire contents of a given table into memory. There was no function for e.g. loading just a single partition in the form of mutations. Introduce one. --- db/system_keyspace.cc | 12 ++++++++++++ db/system_keyspace.hh | 7 +++++++ 2 files changed, 19 insertions(+) diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index c8893fb656..5fd5e51ea9 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -3003,6 +3003,18 @@ system_keyspace::query_mutations(distributed& proxy, con .then([] (rpc::tuple>, cache_temperature> rr_ht) { return std::get<0>(std::move(rr_ht)); }); } +future>> +system_keyspace::query_mutations(distributed& proxy, const sstring& ks_name, const sstring& cf_name, const dht::partition_range& partition_range, query::clustering_range row_range) { + auto& db = proxy.local().get_db().local(); + auto schema = db.find_schema(ks_name, cf_name); + auto slice = partition_slice_builder(*schema) + .with_range(std::move(row_range)) + .build(); + auto cmd = make_lw_shared(schema->id(), schema->version(), std::move(slice), proxy.local().get_max_result_size(slice), query::tombstone_limit::max); + return proxy.local().query_mutations_locally(std::move(schema), std::move(cmd), partition_range, db::no_timeout) + .then([] (rpc::tuple>, cache_temperature> rr_ht) { return std::get<0>(std::move(rr_ht)); }); +} + future> system_keyspace::query(distributed& proxy, const sstring& ks_name, const sstring& cf_name) { replica::database& db = proxy.local().get_db().local(); diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh index ca12f07a65..f6208d36ad 100644 --- a/db/system_keyspace.hh +++ b/db/system_keyspace.hh @@ -282,6 +282,13 @@ public: const sstring& ks_name, const sstring& cf_name); + future>> + static query_mutations(distributed& proxy, + const sstring& ks_name, + const sstring& cf_name, + const dht::partition_range& partition_range, + query::clustering_range row_ranges = query::clustering_range::make_open_ended_both_sides()); + // Returns all data from given system table. // Intended to be used by code which is not performance critical. static future> query(distributed& proxy, From 8afb15700b12e4aacd1195b3ccba0e890b7d9802 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Thu, 30 Mar 2023 15:24:02 +0200 Subject: [PATCH 20/23] storage_service: include current CDC generation data in topology snapshots Note that we don't need to include earlier CDC generations, just the current (i.e. latest) one. We might observe a problem when nodes are being bootstrapped in quick succession - I left a FIXME describing the problem and possible solutions. --- idl/storage_service.idl.hh | 3 +- service/raft/group0_state_machine.cc | 2 +- service/storage_service.cc | 78 ++++++++++++++++++++++------ service/topology_state_machine.hh | 6 ++- 4 files changed, 70 insertions(+), 19 deletions(-) diff --git a/idl/storage_service.idl.hh b/idl/storage_service.idl.hh index c2cb480350..ecfd53ec48 100644 --- a/idl/storage_service.idl.hh +++ b/idl/storage_service.idl.hh @@ -25,7 +25,8 @@ namespace service { }; struct raft_topology_snapshot { - std::vector mutations; + std::vector topology_mutations; + std::optional cdc_generation_mutation; }; struct raft_topology_pull_params {}; diff --git a/service/raft/group0_state_machine.cc b/service/raft/group0_state_machine.cc index cb7ea03a28..22024ee4a1 100644 --- a/service/raft/group0_state_machine.cc +++ b/service/raft/group0_state_machine.cc @@ -156,7 +156,7 @@ future<> group0_state_machine::transfer_snapshot(gms::inet_address from, raft::s co_await _mm.merge_schema_from(addr, std::move(*cm)); - if (!topology_snp.mutations.empty()) { + if (!topology_snp.topology_mutations.empty()) { co_await _ss.merge_topology_snapshot(std::move(topology_snp)); } diff --git a/service/storage_service.cc b/service/storage_service.cc index 975cd72972..5749284cf6 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -443,12 +443,18 @@ future<> storage_service::topology_transition(storage_proxy& proxy, cdc::generat } future<> storage_service::merge_topology_snapshot(raft_topology_snapshot snp) { - auto s = _db.local().find_schema(db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY); std::vector muts; - muts.reserve(snp.mutations.size()); - boost::transform(snp.mutations, std::back_inserter(muts), [s] (const canonical_mutation& m) { - return m.to_mutation(s); - }); + muts.reserve(snp.topology_mutations.size() + (snp.cdc_generation_mutation ? 1 : 0)); + { + auto s = _db.local().find_schema(db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY); + boost::transform(snp.topology_mutations, std::back_inserter(muts), [s] (const canonical_mutation& m) { + return m.to_mutation(s); + }); + } + if (snp.cdc_generation_mutation) { + auto s = _db.local().find_schema(db::system_keyspace::NAME, db::system_keyspace::CDC_GENERATIONS_V3); + muts.push_back(snp.cdc_generation_mutation->to_mutation(s)); + } co_await _db.local().apply(freeze(muts), db::no_timeout); } @@ -4822,17 +4828,57 @@ void storage_service::init_messaging_service(sharded& pr if (!ss._raft_topology_change_enabled) { co_return raft_topology_snapshot{}; } - // FIXME: make it an rwlock, here we only need to lock for reads, - // might be useful if multiple nodes are trying to pull concurrently. - auto read_apply_mutex_holder = co_await ss._group0->client().hold_read_apply_mutex(); - auto rs = co_await db::system_keyspace::query_mutations(proxy, db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY); - auto s = ss._db.local().find_schema(db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY); - std::vector results; - results.reserve(rs->partitions().size()); - boost::range::transform(rs->partitions(), std::back_inserter(results), [s] (const partition& p) { - return canonical_mutation{p.mut().unfreeze(s)}; - }); - co_return raft_topology_snapshot{std::move(results)}; + + std::vector topology_mutations; + std::optional curr_cdc_gen_id; + { + // FIXME: make it an rwlock, here we only need to lock for reads, + // might be useful if multiple nodes are trying to pull concurrently. + auto read_apply_mutex_holder = co_await ss._group0->client().hold_read_apply_mutex(); + auto rs = co_await db::system_keyspace::query_mutations( + proxy, db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY); + auto s = ss._db.local().find_schema(db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY); + topology_mutations.reserve(rs->partitions().size()); + boost::range::transform( + rs->partitions(), std::back_inserter(topology_mutations), [s] (const partition& p) { + return canonical_mutation{p.mut().unfreeze(s)}; + }); + + curr_cdc_gen_id = ss._topology_state_machine._topology.current_cdc_generation_id; + } + + std::optional cdc_generation_mutation; + if (curr_cdc_gen_id) { + // We don't need to fetch this data under group0 apply mutex, it's immutable. + // We only need to include the current CDC generation data in the snapshot, + // because nodes only load whatever `current_cdc_generation_id` points to in topology. + // + // FIXME: when we bootstrap nodes in quick succession, the timestamp of the newest CDC generation + // may be for some time larger than the clocks of our nodes. The last bootstrapped node + // will only receive the newest CDC generation and not earlier ones, so it will only be able + // to coordinate writes to CDC-enabled tables after its clock advances to reach the newest + // generation's timestamp. In other words, it may not be able to coordinate writes for some + // time after bootstrapping and drivers connecting to it will receive errors. + // To fix that, we could store in topology a small history of recent CDC generation IDs + // (garbage-collected with time) instead of just the last one, and load all of them. + // Alternatively, a node would wait for some time before switching to normal state. + auto s = ss._db.local().find_schema(db::system_keyspace::NAME, db::system_keyspace::CDC_GENERATIONS_V3); + auto key = dht::decorate_key(*s, partition_key::from_singular(*s, curr_cdc_gen_id->id)); + auto partition_range = dht::partition_range::make_singular(key); + auto rs = co_await db::system_keyspace::query_mutations( + proxy, db::system_keyspace::NAME, db::system_keyspace::CDC_GENERATIONS_V3, partition_range); + if (rs->partitions().size() != 1) { + on_internal_error(slogger, format( + "pull_raft_topology_snapshot: expected a single partition in CDC generation query," + ", got {} (generation ID: {})", rs->partitions().size(), *curr_cdc_gen_id)); + } + cdc_generation_mutation.emplace(rs->partitions().begin()->mut().unfreeze(s)); + } + + co_return raft_topology_snapshot{ + .topology_mutations = std::move(topology_mutations), + .cdc_generation_mutation = std::move(cdc_generation_mutation), + }; }); }); } diff --git a/service/topology_state_machine.hh b/service/topology_state_machine.hh index 5b894ae16c..9bf0d25072 100644 --- a/service/topology_state_machine.hh +++ b/service/topology_state_machine.hh @@ -101,7 +101,11 @@ struct topology { }; struct raft_topology_snapshot { - std::vector mutations; + // Mutations for the system.topology table. + std::vector topology_mutations; + + // Mutation for system.cdc_generations_v3, contains the current CDC generation data. + std::optional cdc_generation_mutation; }; struct raft_topology_pull_params { From d13a0b1930d388f6c557c732471382f1b17b28fb Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Thu, 30 Mar 2023 18:08:49 +0200 Subject: [PATCH 21/23] cdc: generation_service: add legacy_ prefix for gossiper-based functions Most of the code in the service exists to handle gossiper-based topology changes. Name the functions appropriately and add a note in the comments. --- cdc/generation.cc | 34 +++++++++++++++++----------------- cdc/generation_service.hh | 37 ++++++++++++++++++++++++++----------- service/storage_service.cc | 4 ++-- 3 files changed, 45 insertions(+), 30 deletions(-) diff --git a/cdc/generation.cc b/cdc/generation.cc index 6682234905..24c263bcf7 100644 --- a/cdc/generation.cc +++ b/cdc/generation.cc @@ -347,7 +347,7 @@ db_clock::time_point new_generation_timestamp(bool add_delay, std::chrono::milli return ts; } -future generation_service::make_new_generation(const std::unordered_set& bootstrap_tokens, bool add_delay) { +future generation_service::legacy_make_new_generation(const std::unordered_set& bootstrap_tokens, bool add_delay) { const locator::token_metadata_ptr tmptr = _token_metadata.get(); // Fetch sharding parameters for a node that owns vnode ending with this token @@ -747,7 +747,7 @@ future<> generation_service::after_join(std::optional&& star _joined = true; // Retrieve the latest CDC generation seen in gossip (if any). - co_await scan_cdc_generations(); + co_await legacy_scan_cdc_generations(); // Ensure that the new CDC stream description table has all required streams. // See the function's comment for details. @@ -778,7 +778,7 @@ future<> generation_service::on_change(gms::inet_address ep, gms::application_st auto gen_id = gms::versioned_value::cdc_generation_id_from_string(v.value); cdc_log.debug("Endpoint: {}, CDC generation ID change: {}", ep, gen_id); - return handle_cdc_generation(gen_id); + return legacy_handle_cdc_generation(gen_id); } future<> generation_service::check_and_repair_cdc_streams() { @@ -880,13 +880,13 @@ future<> generation_service::check_and_repair_cdc_streams() { if (!should_regenerate) { if (latest != _gen_id) { - co_await do_handle_cdc_generation(*latest); + co_await legacy_do_handle_cdc_generation(*latest); } cdc_log.info("CDC generation {} does not need repair", latest); co_return; } - const auto new_gen_id = co_await make_new_generation({}, true); + const auto new_gen_id = co_await legacy_make_new_generation({}, true); // Need to artificially update our STATUS so other nodes handle the generation ID change // FIXME: after 0e0282cd nodes do not require a STATUS update to react to CDC generation changes. @@ -898,7 +898,7 @@ future<> generation_service::check_and_repair_cdc_streams() { cdc_log.error("Aborting CDC generation repair due to missing STATUS"); co_return; } - // Update _gen_id first, so that do_handle_cdc_generation (which will get called due to the status update) + // Update _gen_id first, so that legacy_do_handle_cdc_generation (which will get called due to the status update) // won't try to update the gossiper, which would result in a deadlock inside add_local_application_state _gen_id = new_gen_id; co_await _gossiper.add_local_application_state({ @@ -927,7 +927,7 @@ future<> generation_service::handle_cdc_generation(cdc::generation_id_v2 gen_id) } } -future<> generation_service::handle_cdc_generation(std::optional gen_id) { +future<> generation_service::legacy_handle_cdc_generation(std::optional gen_id) { assert_shard_zero(__PRETTY_FUNCTION__); if (!gen_id) { @@ -953,10 +953,10 @@ future<> generation_service::handle_cdc_generation(std::optional generation_service::handle_cdc_generation(std::optional svc) -> future<> { @@ -979,7 +979,7 @@ void generation_service::async_handle_cdc_generation(cdc::generation_id gen_id) co_await sleep_abortable(std::chrono::seconds(5), svc->_abort_src); try { - bool using_this_gen = co_await svc->do_handle_cdc_generation_intercept_nonfatal_errors(gen_id); + bool using_this_gen = co_await svc->legacy_do_handle_cdc_generation_intercept_nonfatal_errors(gen_id); if (using_this_gen) { cdc_log.info("Starting to use generation {}", gen_id); co_await update_streams_description(gen_id, svc->get_sys_dist_ks(), @@ -1003,7 +1003,7 @@ void generation_service::async_handle_cdc_generation(cdc::generation_id gen_id) })(gen_id, shared_from_this())); } -future<> generation_service::scan_cdc_generations() { +future<> generation_service::legacy_scan_cdc_generations() { assert_shard_zero(__PRETTY_FUNCTION__); std::optional latest; @@ -1016,18 +1016,18 @@ future<> generation_service::scan_cdc_generations() { if (latest) { cdc_log.info("Latest generation seen during startup: {}", *latest); - co_await handle_cdc_generation(latest); + co_await legacy_handle_cdc_generation(latest); } else { cdc_log.info("No generation seen during startup."); } } -future generation_service::do_handle_cdc_generation_intercept_nonfatal_errors(cdc::generation_id gen_id) { +future generation_service::legacy_do_handle_cdc_generation_intercept_nonfatal_errors(cdc::generation_id gen_id) { assert_shard_zero(__PRETTY_FUNCTION__); - // Use futurize_invoke to catch all exceptions from do_handle_cdc_generation. + // Use futurize_invoke to catch all exceptions from legacy_do_handle_cdc_generation. return futurize_invoke([this, gen_id] { - return do_handle_cdc_generation(gen_id); + return legacy_do_handle_cdc_generation(gen_id); }).handle_exception([] (std::exception_ptr ep) -> future { try { std::rethrow_exception(ep); @@ -1047,7 +1047,7 @@ future generation_service::do_handle_cdc_generation_intercept_nonfatal_err }); } -future generation_service::do_handle_cdc_generation(cdc::generation_id gen_id) { +future generation_service::legacy_do_handle_cdc_generation(cdc::generation_id gen_id) { assert_shard_zero(__PRETTY_FUNCTION__); auto sys_dist_ks = get_sys_dist_ks(); diff --git a/cdc/generation_service.hh b/cdc/generation_service.hh index 40b97f4e78..5d2a342945 100644 --- a/cdc/generation_service.hh +++ b/cdc/generation_service.hh @@ -132,8 +132,11 @@ public: * so that other nodes learn about the generation before their clocks cross the generation's timestamp * (not guaranteed in the current implementation, but expected to be the common case; * we assume that `ring_delay` is enough for other nodes to learn about the new generation). + * + * Legacy: used for gossiper-based topology changes. */ - future make_new_generation(const std::unordered_set& bootstrap_tokens, bool add_delay); + future legacy_make_new_generation( + const std::unordered_set& bootstrap_tokens, bool add_delay); /* Retrieve the CDC generation with the given ID from local tables * and start using it for CDC log writes if it's not obsolete. @@ -144,27 +147,39 @@ public: private: /* Retrieve the CDC generation which starts at the given timestamp (from a distributed table created for this purpose) * and start using it for CDC log writes if it's not obsolete. + * + * Legacy: used for gossiper-based topology changes. */ - future<> handle_cdc_generation(std::optional); + future<> legacy_handle_cdc_generation(std::optional); - /* If `handle_cdc_generation` fails, it schedules an asynchronous retry in the background - * using `async_handle_cdc_generation`. + /* If `legacy_handle_cdc_generation` fails, it schedules an asynchronous retry in the background + * using `legacy_async_handle_cdc_generation`. + * + * Legacy: used for gossiper-based topology changes. */ - void async_handle_cdc_generation(cdc::generation_id); + void legacy_async_handle_cdc_generation(cdc::generation_id); - /* Wrapper around `do_handle_cdc_generation` which intercepts timeout/unavailability exceptions. - * Returns: do_handle_cdc_generation(ts). */ - future do_handle_cdc_generation_intercept_nonfatal_errors(cdc::generation_id); + /* Wrapper around `legacy_do_handle_cdc_generation` which intercepts timeout/unavailability exceptions. + * Returns: legacy_do_handle_cdc_generation(ts). + * + * Legacy: used for gossiper-based topology changes. + */ + future legacy_do_handle_cdc_generation_intercept_nonfatal_errors(cdc::generation_id); /* Returns `true` iff we started using the generation (it was not obsolete or already known), - * which means that this node might write some CDC log entries using streams from this generation. */ - future do_handle_cdc_generation(cdc::generation_id); + * which means that this node might write some CDC log entries using streams from this generation. + * + * Legacy: used for gossiper-based topology changes. + */ + future legacy_do_handle_cdc_generation(cdc::generation_id); /* Scan CDC generation timestamps gossiped by other nodes and retrieve the latest one. * This function should be called once at the end of the node startup procedure * (after the node is started and running normally, it will retrieve generations on gossip events instead). + * + * Legacy: used for gossiper-based topology changes. */ - future<> scan_cdc_generations(); + future<> legacy_scan_cdc_generations(); /* generation_service code might be racing with system_distributed_keyspace deinitialization * (the deinitialization order is broken). diff --git a/service/storage_service.cc b/service/storage_service.cc index 5749284cf6..53a59dcc69 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1607,7 +1607,7 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi && (!_sys_ks.local().bootstrap_complete() || cdc::should_propose_first_generation(get_broadcast_address(), _gossiper))) { try { - cdc_gen_id = co_await cdc_gen_service.make_new_generation(bootstrap_tokens, !is_first_node()); + cdc_gen_id = co_await cdc_gen_service.legacy_make_new_generation(bootstrap_tokens, !is_first_node()); } catch (...) { cdc_log.warn( "Could not create a new CDC generation: {}. This may make it impossible to use CDC or cause performance problems." @@ -1758,7 +1758,7 @@ future<> storage_service::bootstrap(cdc::generation_service& cdc_gen_service, st // We don't do any other generation switches (unless we crash before complecting bootstrap). assert(!cdc_gen_id); - cdc_gen_id = cdc_gen_service.make_new_generation(bootstrap_tokens, !is_first_node()).get0(); + cdc_gen_id = cdc_gen_service.legacy_make_new_generation(bootstrap_tokens, !is_first_node()).get0(); if (!bootstrap_rbno) { // When is_repair_based_node_ops_enabled is true, the bootstrap node From 16880015851be00a75630fda1f8ff569b687d944 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Thu, 30 Mar 2023 18:10:33 +0200 Subject: [PATCH 22/23] cdc: generation_service: add a FIXME --- cdc/generation.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/cdc/generation.cc b/cdc/generation.cc index 24c263bcf7..85f56fbd67 100644 --- a/cdc/generation.cc +++ b/cdc/generation.cc @@ -782,6 +782,7 @@ future<> generation_service::on_change(gms::inet_address ep, gms::application_st } future<> generation_service::check_and_repair_cdc_streams() { + // FIXME: support Raft group 0-based topology changes if (!_joined) { throw std::runtime_error("check_and_repair_cdc_streams: node not initialized yet"); } From 88aff50e8b95ab9806e41edcd65ca1f3035e2abb Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Thu, 6 Apr 2023 12:09:58 +0200 Subject: [PATCH 23/23] docs: cdc: describe generation changes using group 0 topology coordinator Update the `Generation switching` section: most of the existing description landed in `Gossiper-based topology changes` subsection, and a new subsection was added to describe Raft group 0 based topology changes. Marked as WIP - we expect further development in this area soon. The existing gossiper-based description was also updated a bit. --- docs/dev/cdc.md | 54 ++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 49 insertions(+), 5 deletions(-) diff --git a/docs/dev/cdc.md b/docs/dev/cdc.md index 0a5e4d33ec..42f7307dfb 100644 --- a/docs/dev/cdc.md +++ b/docs/dev/cdc.md @@ -105,7 +105,9 @@ Shard-colocation is an optimization. ### Generation switching -Having different generations operating at different points in time is necessary to maintain colocation in presence of topology changes. When a new node joins the cluster it modifies the token ring by refining existing vnodes into smaller vnodes. But before it does it, it will introduce a new CDC generation whose token ranges refine those new (smaller) vnodes (which means they also refine the old vnodes; that way writes will be colocated on both old and new replicas). +Having different generations operating at different points in time is necessary to maintain colocation in presence of topology changes. When a new node joins the cluster we modify the token ring by refining existing vnodes into smaller vnodes. But before we do it, we introduce a new CDC generation whose token ranges refine those new (smaller) vnodes (which means they also refine the old vnodes; that way writes will be colocated on both old and new replicas). + +#### Gossiper-based topology changes The joining node learns about the current vnodes, chooses tokens which will split them into smaller vnodes and creates a new `cdc::topology_description` which refines those smaller vnodes. This is done in the `cdc::topology_description_generator` class. It then inserts the generation description into an internal distributed table `cdc_generation_descriptions_v2` in the `system_distributed_everywhere` keyspace. The table is defined as follows: ``` @@ -124,12 +126,10 @@ Note that constructing the `cdc::topology_description` (which describes the gene The table lies in the `system_distributed_everywhere` keyspace which is replicated using the `Everywhere` strategy, meaning that the generation data is replicated by every node. The insert is performed using `CL=ALL`, allowing nodes to read the data locally later using `CL=ONE`. -The timestamp for the new generation is chosen after the data is inserted to the table. To choose the timestamp, the node takes its local time and adds a minute or two so that other nodes have a chance to learn about this generation before it starts operating. Thus, the node makes the following assumptions: +The timestamp for the new generation is chosen after the data is inserted to the table. To choose the timestamp, the node takes its local time and adds `2 * ring_delay` (a minute by default) so that other nodes have a chance to learn about this generation before it starts operating. Thus, the node makes the following assumptions: 1. its clock is not too desynchronized with other nodes' clocks, 2. the cluster is not partitioned. -Future patches will make the solution safe by using a two-phase-commit approach. - The timestamp and the randomly generated UUID together form a "generation ID" which uniquely identifies this generation and can be used to retrieve its data from the table and to learn when it starts operating. Next, the node starts gossiping the ID of the new generation together with its set of chosen tokens and status: @@ -152,12 +152,54 @@ CREATE TABLE system.cdc_local ( The timestamp and UUID forming the generation ID are kept under the `"cdc_local"` key in the `streams_timestamp` and `uuid` columns, respectively. When other nodes learn about the generation, they'll extract it from the `cdc_generation_descriptions_v2` table and insert it into their set of known CDC generations using `cdc::metadata::insert(db_clock::time_point, topology_description&&)`. -Notice that nodes learn about the generation together with the new node's tokens. When they learn about its tokens they'll immediately start sending writes to the new node (in the case of bootstrapping, it will become a pending replica). But the old generation will still be operating for a minute or two. Thus colocation will be lost for a while. This problem will be fixed when the two-phase-commit approach is implemented. +Notice that nodes learn about the generation together with the new node's tokens. When they learn about its tokens they'll immediately start sending writes to the new node (in the case of bootstrapping, it will become a pending replica). But the old generation will still be operating for `~ 2 * ring_delay`; during this short period of time we don't have complete colocation of CDC log writes with base writes (one replica may be different). We're not able to prevent a node learning about a new generation too late due to a network partition: if gossip doesn't reach the node in time, some writes might be sent to the wrong (old) generation. However, it could happen that a node learns about the generation from gossip in time, but then won't be able to extract it from `cdc_generation_descriptions_v2`. In that case we can still maintain consistency: the node will remember that there is a new generation even though it doesn't yet know what it is (it knows only the ID, in particular it knows the timestamp) using the `cdc::metadata::prepare(db_clock::time_point)` method, and then _reject_ writes for CDC-enabled tables that are supposed to use this new generation. The node will keep trying to read the generation's data in background until it succeeds or sees that it's not necessary anymore (e.g. because the generation was already superseded by a new generation). Thus we give up availability for safety. This likely won't happen if the administrator ensures that the cluster is not partitioned before bootstrapping a new node. This problem will also be mitigated with a future patch. +#### Raft group 0 based topology changes (WIP) + +When a node requests the cluster to join, the topology coordinator chooses tokens for the new node. This splits vnodes in the token ring into smaller vnodes. The coordinator then creates a new `cdc::topology_description` which refines those smaller vnodes. This is node using the `cdc::topology_description_generator` class. + +The generation data described by `cdc::topology_description` is then translated into mutations and committed to group 0 using Raft commands. When a node applies these commands (every node in the cluster eventually does that, being a member of group 0), it writes the data into a local table `system.cdc_generations_v3`. The table has the following schema: +``` +CREATE TABLE system.cdc_generations_v3 ( + id uuid, + range_end bigint, + ignore_msb tinyint, + num_ranges int static, + streams frozen>, + PRIMARY KEY (id, range_end) +) ... +``` + +The table's partition key is the `id uuid` column. The UUID used to insert a new generation into this table is randomly generated by the coordinator. + +The committed commands also update the `system.topology` table, storing the UUID in the `new_cdc_generation_data_uuid` column in the row which describes the joining node. Thanks to this, if the coordinator manages to insert the data but then fails, the next coordinator can resume from where the previous coordinator left off - using `new_cdc_generation_data_uuid` to continue with the generation switch. + +Note that the `cdc::topology_description` contains the stream IDs of the generation and describes the generation's mapping, so constructing and inserting it into this table does not require knowing the generation's timestamp. + +The coordinator then performs a global barrier, ensuring that every node managed to store the data locally before proceeding. + +Once the barrier finishes, the coordinator picks a timestamp for the new generation. To choose the timestamp, it takes its local time and adds `2 * ring_delay` (a minute by default) so that other nodes have a chance to learn about this timestamp before the generation starts operating (i.e. before their clocks cross the timestamp). Thus we make the following assumptions: +1. its clock is not too desynchronized with other nodes' clocks, +2. the cluster is not partitioned. + +FIXME: consider implementing a safe algorithm (using separate 'prepare' phase before committing the new generation timestamp). + +The timestamp and the randomly generated UUID together form a "generation ID" which uniquely identifies this generation and can be used to retrieve its data from the table and to learn when it starts operating. + +The coordinator commits the generation ID with a group 0 command which updates the static columns `current_cdc_generation_uuid` and `current_cdc_generation_timestamp` in the `system.topology` table. Each node, when applying this command, learns about the new CDC generation ID (the `storage_service::topology_state_load` function calls `cdc::generation_service::handle_cdc_generation`), retrieves the generation data from `system.cdc_generations_v3` using the UUID key, and inserts it into its in-memory set of known CDC generations using `cdc::metadata::insert(...)`. + +Nodes learn about the new generation together with the new node's tokens. When they learn about its tokens, they immediately start sending writes to the new node (it becomes a pending replica). But the old generation will still be operating for `~ 2 * ring_delay`; during this short period of time we don't have complete colocation of CDC log writes with base writes (one replica may be different). + +We're not able to prevent a node learning about a new generation too late due to a network partition: if the timestamp is not replicated to some node in time, some writes might be sent to the wrong (old) generation. (See FIXME above.) + +After committing the generation ID, the topology coordinator publishes the generation data to user-facing description tables (`system_distributed.cdc_streams_descriptions_v2` and `system_distributed.cdc_generation_timestamps`). + +#### Generation switching: other notes + Due to the need of maintaining colocation we don't allow the client to send writes with arbitrary timestamps. Suppose that a write is requested and the write coordinator's local clock has time `C` and the generation operating at time `C` has timestamp `T` (`T <= C`). Then we only allow the write if its timestamp is in the interval [`T`, `C + generation_leeway`), where `generation_leeway` is a small time-inteval constant (e.g. 5 seconds). Reason: we cannot allow writes before `T`, because they belong to the old generation whose token ranges might no longer refine the current vnodes, so the corresponding log write would not necessarily be colocated with the base write. We also cannot allow writes too far "into the future" because we don't know what generation will be operating at that time (the node which will introduce this generation might not have joined yet). But, as mentioned before, we assume that we'll learn about the next generation in time. Again --- the need for this assumption will be gone in a future patch. @@ -209,6 +251,8 @@ Note that the first phase of inserting stream IDs may fail in the middle; in tha ### Internal generation descriptions table V1 and upgrade procedure +FIXME: update this section once we implement upgrades for group 0 topology coordinator. The coordinator will have to create a new CDC generation when it's first enabled. + As the name suggests, `cdc_generation_descriptions_v2` is the second version of the generation description table. The previous schema was: ``` CREATE TABLE system_distributed.cdc_generation_descriptions (