Merge 'Manage CDC generations when bootstrapping nodes using Raft Group 0 topology coordinator' from Kamil Braun
Introduce a new table `CDC_GENERATIONS_V3` (`system.cdc_generations_v3`). The table schema is a copy-paste of the `CDC_GENERATIONS_V2` schema. The difference is that V2 lives in `system_distributed_keyspace` and writes to it are distributed using regular `storage_proxy` replication mechanisms based on the token ring. The V3 table lives in `system_keyspace` and any mutations written to it will go through group 0. Extend the `TOPOLOGY` schema with new columns: - `new_cdc_generation_data_uuid` will be stored as part of a bootstrapping node's `ring_slice`, it stores UUID of a newly introduced CDC generation which is used as partition key for the `CDC_GENERATIONS_V3` table to access this new generation's data. It's a regular column, meaning that every row (corresponding to a node) will have its own. - `current_cdc_generation_uuid` and `current_cdc_generation_timestamp` together form the ID of the newest CDC generation in the cluster. (the uuid is the data key for `CDC_GENERATIONS_V3`, the timestamp is when the CDC generation starts operating). Those are static columns since there's a single newest CDC generation. When topology coordinator handles a request for node to join, calculate a new CDC generation using the bootstrapping node's tokens, translate it to mutation format, and insert this mutation to the CDC_GENERATIONS_V3 table through group 0 at the same time we assign tokens to the node in Raft topology. The partition key for this data is stored in the bootstrapping node's `ring_slice`. After inserting new CDC generation data , we need to pick a timestamp for this generation and commit it, telling all nodes in the cluster to start using the generation for CDC log writes once their clocks cross that timestamp. We introduce a separate step to the bootstrap saga, before `write_both_read_old`, called `commit_cdc_generation`. In this step, the coordinator takes the `new_cdc_generation_data_uuid` stored in a bootstrapping node's `ring_slice` - which serves as the key to the table where the CDC generation data is stored - and combines it with a timestamp which it generates a bit into the future (as in old gossiper-based code, we use 2 * ring_delay, by default 1 minute). This gives us a CDC generation ID which we commit into the topology state as the `current_cdc_generation_id` while switching the saga to the next step, `write_both_read_old`. Once a new CDC generation is committed to the cluster by the topology coordinator, we also need to publish it to the user-facing description tables so CDC applications know which streams to read from. This uses regular distributed table writes underneath (tables living in the `system_distributed` keyspace) so it requires `token_metadata` to be nonempty. We need a hack for the case of bootstrapping the first node in the cluster - turning the tokens into normal tokens earlier in the procedure in `token_metadata`, but this is fine for the single-node case since no streaming is happening. When a node notices that a new CDC generation was introduced in `storage_service::topology_state_load`, it updates its internal data structures that are used when coordinating writes to CDC log tables. We include the current CDC generation data in topology snapshot transfers. Some fixes and refactors included. Closes #13385 * github.com:scylladb/scylladb: docs: cdc: describe generation changes using group 0 topology coordinator cdc: generation_service: add a FIXME cdc: generation_service: add legacy_ prefix for gossiper-based functions storage_service: include current CDC generation data in topology snapshots db: system_keyspace: introduce `query_mutations` with range/slice storage_service: hold group 0 apply mutex when reading topology snapshot service: raft_group0_client: introduce `hold_read_apply_mutex` storage_service: use CDC generations introduced by Raft topology raft topology: publish new CDC generation to the user description tables raft topology: commit a new CDC generation on node bootstrap raft topology: create new CDC generation data during node bootstrap service: topology_state_machine: make topology::find const db: system_keyspace: small refactor of `load_topology_state` cdc: generation: extract pure parts of `make_new_generation` outside db: system_keyspace: add storage for CDC generations managed by group 0 service: topology_state_machine: better error checking for state name (de)serialization service: raft: plumbing `cdc::generation_service&` cdc: generation: `get_cdc_generation_mutations`: take timestamp as parameter cdc: generation: make `topology_description_generator::get_sharding_info` a parameter sys_dist_ks: make `get_cdc_generation_mutations` public sys_dist_ks: move find_schema outside `get_cdc_generation_mutations` sys_dist_ks: move mutation size threshold calculation outside `get_cdc_generation_mutations` service/raft: group0_state_machine: signal topology state machine in `load_snapshot`
This commit is contained in:
@@ -20,6 +20,7 @@
|
|||||||
#include "db/system_distributed_keyspace.hh"
|
#include "db/system_distributed_keyspace.hh"
|
||||||
#include "dht/token-sharding.hh"
|
#include "dht/token-sharding.hh"
|
||||||
#include "locator/token_metadata.hh"
|
#include "locator/token_metadata.hh"
|
||||||
|
#include "types/set.hh"
|
||||||
#include "gms/application_state.hh"
|
#include "gms/application_state.hh"
|
||||||
#include "gms/inet_address.hh"
|
#include "gms/inet_address.hh"
|
||||||
#include "gms/gossiper.hh"
|
#include "gms/gossiper.hh"
|
||||||
@@ -42,6 +43,10 @@ static unsigned get_sharding_ignore_msb(const gms::inet_address& endpoint, const
|
|||||||
return ep_state ? std::stoi(ep_state->value) : 0;
|
return ep_state ? std::stoi(ep_state->value) : 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
namespace db {
|
||||||
|
extern thread_local data_type cdc_streams_set_type;
|
||||||
|
}
|
||||||
|
|
||||||
namespace cdc {
|
namespace cdc {
|
||||||
|
|
||||||
extern const api::timestamp_clock::duration generation_leeway =
|
extern const api::timestamp_clock::duration generation_leeway =
|
||||||
@@ -179,10 +184,9 @@ static std::vector<stream_id> create_stream_ids(
|
|||||||
}
|
}
|
||||||
|
|
||||||
class topology_description_generator final {
|
class topology_description_generator final {
|
||||||
unsigned _ignore_msb_bits;
|
|
||||||
const std::unordered_set<dht::token>& _bootstrap_tokens;
|
const std::unordered_set<dht::token>& _bootstrap_tokens;
|
||||||
const locator::token_metadata_ptr _tmptr;
|
const locator::token_metadata_ptr _tmptr;
|
||||||
const gms::gossiper& _gossiper;
|
const noncopyable_function<std::pair<size_t, uint8_t> (dht::token)>& _get_sharding_info;
|
||||||
|
|
||||||
// Compute a set of tokens that split the token ring into vnodes
|
// Compute a set of tokens that split the token ring into vnodes
|
||||||
auto get_tokens() const {
|
auto get_tokens() const {
|
||||||
@@ -195,28 +199,12 @@ class topology_description_generator final {
|
|||||||
return tokens;
|
return tokens;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fetch sharding parameters for a node that owns vnode ending with this.end
|
|
||||||
// Returns <shard_count, ignore_msb> pair.
|
|
||||||
std::pair<size_t, uint8_t> get_sharding_info(dht::token end) const {
|
|
||||||
if (_bootstrap_tokens.contains(end)) {
|
|
||||||
return {smp::count, _ignore_msb_bits};
|
|
||||||
} else {
|
|
||||||
auto endpoint = _tmptr->get_endpoint(end);
|
|
||||||
if (!endpoint) {
|
|
||||||
throw std::runtime_error(
|
|
||||||
format("Can't find endpoint for token {}", end));
|
|
||||||
}
|
|
||||||
auto sc = get_shard_count(*endpoint, _gossiper);
|
|
||||||
return {sc > 0 ? sc : 1, get_sharding_ignore_msb(*endpoint, _gossiper)};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
token_range_description create_description(size_t index, dht::token start, dht::token end) const {
|
token_range_description create_description(size_t index, dht::token start, dht::token end) const {
|
||||||
token_range_description desc;
|
token_range_description desc;
|
||||||
|
|
||||||
desc.token_range_end = end;
|
desc.token_range_end = end;
|
||||||
|
|
||||||
auto [shard_count, ignore_msb] = get_sharding_info(end);
|
auto [shard_count, ignore_msb] = _get_sharding_info(end);
|
||||||
desc.streams = create_stream_ids(index, start, end, shard_count, ignore_msb);
|
desc.streams = create_stream_ids(index, start, end, shard_count, ignore_msb);
|
||||||
desc.sharding_ignore_msb = ignore_msb;
|
desc.sharding_ignore_msb = ignore_msb;
|
||||||
|
|
||||||
@@ -224,14 +212,14 @@ class topology_description_generator final {
|
|||||||
}
|
}
|
||||||
public:
|
public:
|
||||||
topology_description_generator(
|
topology_description_generator(
|
||||||
unsigned ignore_msb_bits,
|
|
||||||
const std::unordered_set<dht::token>& bootstrap_tokens,
|
const std::unordered_set<dht::token>& bootstrap_tokens,
|
||||||
const locator::token_metadata_ptr tmptr,
|
const locator::token_metadata_ptr tmptr,
|
||||||
const gms::gossiper& gossiper)
|
// This function must return sharding parameters for a node that owns the vnode ending with
|
||||||
: _ignore_msb_bits(ignore_msb_bits)
|
// the given token. Returns <shard_count, ignore_msb> pair.
|
||||||
, _bootstrap_tokens(bootstrap_tokens)
|
const noncopyable_function<std::pair<size_t, uint8_t> (dht::token)>& get_sharding_info)
|
||||||
|
: _bootstrap_tokens(bootstrap_tokens)
|
||||||
, _tmptr(std::move(tmptr))
|
, _tmptr(std::move(tmptr))
|
||||||
, _gossiper(gossiper)
|
, _get_sharding_info(get_sharding_info)
|
||||||
{}
|
{}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@@ -274,6 +262,39 @@ bool should_propose_first_generation(const gms::inet_address& me, const gms::gos
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
future<utils::chunked_vector<mutation>> get_cdc_generation_mutations(
|
||||||
|
schema_ptr s,
|
||||||
|
utils::UUID id,
|
||||||
|
const cdc::topology_description& desc,
|
||||||
|
size_t mutation_size_threshold,
|
||||||
|
api::timestamp_type ts) {
|
||||||
|
utils::chunked_vector<mutation> res;
|
||||||
|
res.emplace_back(s, partition_key::from_singular(*s, id));
|
||||||
|
res.back().set_static_cell(to_bytes("num_ranges"), int32_t(desc.entries().size()), ts);
|
||||||
|
size_t size_estimate = 0;
|
||||||
|
for (auto& e : desc.entries()) {
|
||||||
|
if (size_estimate >= mutation_size_threshold) {
|
||||||
|
res.emplace_back(s, partition_key::from_singular(*s, id));
|
||||||
|
size_estimate = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
set_type_impl::native_type streams;
|
||||||
|
streams.reserve(e.streams.size());
|
||||||
|
for (auto& stream: e.streams) {
|
||||||
|
streams.push_back(data_value(stream.to_bytes()));
|
||||||
|
}
|
||||||
|
|
||||||
|
size_estimate += e.streams.size() * 20;
|
||||||
|
auto ckey = clustering_key::from_singular(*s, dht::token::to_int64(e.token_range_end));
|
||||||
|
res.back().set_cell(ckey, to_bytes("streams"), make_set_value(db::cdc_streams_set_type, std::move(streams)), ts);
|
||||||
|
res.back().set_cell(ckey, to_bytes("ignore_msb"), int8_t(e.sharding_ignore_msb), ts);
|
||||||
|
|
||||||
|
co_await coroutine::maybe_yield();
|
||||||
|
}
|
||||||
|
|
||||||
|
co_return res;
|
||||||
|
}
|
||||||
|
|
||||||
// non-static for testing
|
// non-static for testing
|
||||||
size_t limit_of_streams_in_topology_description() {
|
size_t limit_of_streams_in_topology_description() {
|
||||||
// Each stream takes 16B and we don't want to exceed 4MB so we can have
|
// Each stream takes 16B and we don't want to exceed 4MB so we can have
|
||||||
@@ -306,36 +327,57 @@ topology_description limit_number_of_streams_if_needed(topology_description&& de
|
|||||||
return topology_description(std::move(entries));
|
return topology_description(std::move(entries));
|
||||||
}
|
}
|
||||||
|
|
||||||
future<cdc::generation_id> generation_service::make_new_generation(const std::unordered_set<dht::token>& bootstrap_tokens, bool add_delay) {
|
std::pair<utils::UUID, cdc::topology_description> make_new_generation_data(
|
||||||
|
const std::unordered_set<dht::token>& bootstrap_tokens,
|
||||||
|
const noncopyable_function<std::pair<size_t, uint8_t>(dht::token)>& get_sharding_info,
|
||||||
|
const locator::token_metadata_ptr tmptr) {
|
||||||
|
auto gen = topology_description_generator(bootstrap_tokens, tmptr, get_sharding_info).generate();
|
||||||
|
auto uuid = utils::make_random_uuid();
|
||||||
|
return {uuid, std::move(gen)};
|
||||||
|
}
|
||||||
|
|
||||||
|
db_clock::time_point new_generation_timestamp(bool add_delay, std::chrono::milliseconds ring_delay) {
|
||||||
using namespace std::chrono;
|
using namespace std::chrono;
|
||||||
using namespace std::chrono_literals;
|
using namespace std::chrono_literals;
|
||||||
|
|
||||||
const locator::token_metadata_ptr tmptr = _token_metadata.get();
|
auto ts = db_clock::now();
|
||||||
auto gen = topology_description_generator(_cfg.ignore_msb_bits, bootstrap_tokens, tmptr, _gossiper).generate();
|
if (add_delay && ring_delay != 0ms) {
|
||||||
|
ts += 2 * ring_delay + duration_cast<milliseconds>(generation_leeway);
|
||||||
|
}
|
||||||
|
return ts;
|
||||||
|
}
|
||||||
|
|
||||||
// We need to call this as late in the procedure as possible.
|
future<cdc::generation_id> generation_service::legacy_make_new_generation(const std::unordered_set<dht::token>& bootstrap_tokens, bool add_delay) {
|
||||||
// In the V2 format we can do this after inserting the generation data into the table;
|
const locator::token_metadata_ptr tmptr = _token_metadata.get();
|
||||||
// in the V1 format we must do it before (because the timestamp is the partition key in the V1 format).
|
|
||||||
auto new_generation_timestamp = [add_delay, ring_delay = _cfg.ring_delay] {
|
// Fetch sharding parameters for a node that owns vnode ending with this token
|
||||||
auto ts = db_clock::now();
|
// using gossiped application states.
|
||||||
if (add_delay && ring_delay != 0ms) {
|
auto get_sharding_info = [&] (dht::token end) -> std::pair<size_t, uint8_t> {
|
||||||
ts += 2 * ring_delay + duration_cast<milliseconds>(generation_leeway);
|
if (bootstrap_tokens.contains(end)) {
|
||||||
|
return {smp::count, _cfg.ignore_msb_bits};
|
||||||
|
} else {
|
||||||
|
auto endpoint = tmptr->get_endpoint(end);
|
||||||
|
if (!endpoint) {
|
||||||
|
throw std::runtime_error(
|
||||||
|
format("Can't find endpoint for token {}", end));
|
||||||
|
}
|
||||||
|
auto sc = get_shard_count(*endpoint, _gossiper);
|
||||||
|
return {sc > 0 ? sc : 1, get_sharding_ignore_msb(*endpoint, _gossiper)};
|
||||||
}
|
}
|
||||||
return ts;
|
|
||||||
};
|
};
|
||||||
|
auto [uuid, gen] = make_new_generation_data(bootstrap_tokens, get_sharding_info, tmptr);
|
||||||
|
|
||||||
// Our caller should ensure that there are normal tokens in the token ring.
|
// Our caller should ensure that there are normal tokens in the token ring.
|
||||||
auto normal_token_owners = tmptr->count_normal_token_owners();
|
auto normal_token_owners = tmptr->count_normal_token_owners();
|
||||||
assert(normal_token_owners);
|
assert(normal_token_owners);
|
||||||
|
|
||||||
if (_feature_service.cdc_generations_v2) {
|
if (_feature_service.cdc_generations_v2) {
|
||||||
auto uuid = utils::make_random_uuid();
|
|
||||||
cdc_log.info("Inserting new generation data at UUID {}", uuid);
|
cdc_log.info("Inserting new generation data at UUID {}", uuid);
|
||||||
// This may take a while.
|
// This may take a while.
|
||||||
co_await _sys_dist_ks.local().insert_cdc_generation(uuid, gen, { normal_token_owners });
|
co_await _sys_dist_ks.local().insert_cdc_generation(uuid, gen, { normal_token_owners });
|
||||||
|
|
||||||
// Begin the race.
|
// Begin the race.
|
||||||
cdc::generation_id_v2 gen_id{new_generation_timestamp(), uuid};
|
cdc::generation_id_v2 gen_id{new_generation_timestamp(add_delay, _cfg.ring_delay), uuid};
|
||||||
|
|
||||||
cdc_log.info("New CDC generation: {}", gen_id);
|
cdc_log.info("New CDC generation: {}", gen_id);
|
||||||
co_return gen_id;
|
co_return gen_id;
|
||||||
@@ -364,7 +406,7 @@ future<cdc::generation_id> generation_service::make_new_generation(const std::un
|
|||||||
" a new node or running the checkAndRepairCdcStreams nodetool command.");
|
" a new node or running the checkAndRepairCdcStreams nodetool command.");
|
||||||
|
|
||||||
// Begin the race.
|
// Begin the race.
|
||||||
cdc::generation_id_v1 gen_id{new_generation_timestamp()};
|
cdc::generation_id_v1 gen_id{new_generation_timestamp(add_delay, _cfg.ring_delay)};
|
||||||
|
|
||||||
co_await _sys_dist_ks.local().insert_cdc_topology_description(gen_id, std::move(gen), { normal_token_owners });
|
co_await _sys_dist_ks.local().insert_cdc_topology_description(gen_id, std::move(gen), { normal_token_owners });
|
||||||
|
|
||||||
@@ -705,7 +747,7 @@ future<> generation_service::after_join(std::optional<cdc::generation_id>&& star
|
|||||||
_joined = true;
|
_joined = true;
|
||||||
|
|
||||||
// Retrieve the latest CDC generation seen in gossip (if any).
|
// Retrieve the latest CDC generation seen in gossip (if any).
|
||||||
co_await scan_cdc_generations();
|
co_await legacy_scan_cdc_generations();
|
||||||
|
|
||||||
// Ensure that the new CDC stream description table has all required streams.
|
// Ensure that the new CDC stream description table has all required streams.
|
||||||
// See the function's comment for details.
|
// See the function's comment for details.
|
||||||
@@ -736,10 +778,11 @@ future<> generation_service::on_change(gms::inet_address ep, gms::application_st
|
|||||||
auto gen_id = gms::versioned_value::cdc_generation_id_from_string(v.value);
|
auto gen_id = gms::versioned_value::cdc_generation_id_from_string(v.value);
|
||||||
cdc_log.debug("Endpoint: {}, CDC generation ID change: {}", ep, gen_id);
|
cdc_log.debug("Endpoint: {}, CDC generation ID change: {}", ep, gen_id);
|
||||||
|
|
||||||
return handle_cdc_generation(gen_id);
|
return legacy_handle_cdc_generation(gen_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
future<> generation_service::check_and_repair_cdc_streams() {
|
future<> generation_service::check_and_repair_cdc_streams() {
|
||||||
|
// FIXME: support Raft group 0-based topology changes
|
||||||
if (!_joined) {
|
if (!_joined) {
|
||||||
throw std::runtime_error("check_and_repair_cdc_streams: node not initialized yet");
|
throw std::runtime_error("check_and_repair_cdc_streams: node not initialized yet");
|
||||||
}
|
}
|
||||||
@@ -838,13 +881,13 @@ future<> generation_service::check_and_repair_cdc_streams() {
|
|||||||
|
|
||||||
if (!should_regenerate) {
|
if (!should_regenerate) {
|
||||||
if (latest != _gen_id) {
|
if (latest != _gen_id) {
|
||||||
co_await do_handle_cdc_generation(*latest);
|
co_await legacy_do_handle_cdc_generation(*latest);
|
||||||
}
|
}
|
||||||
cdc_log.info("CDC generation {} does not need repair", latest);
|
cdc_log.info("CDC generation {} does not need repair", latest);
|
||||||
co_return;
|
co_return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const auto new_gen_id = co_await make_new_generation({}, true);
|
const auto new_gen_id = co_await legacy_make_new_generation({}, true);
|
||||||
|
|
||||||
// Need to artificially update our STATUS so other nodes handle the generation ID change
|
// Need to artificially update our STATUS so other nodes handle the generation ID change
|
||||||
// FIXME: after 0e0282cd nodes do not require a STATUS update to react to CDC generation changes.
|
// FIXME: after 0e0282cd nodes do not require a STATUS update to react to CDC generation changes.
|
||||||
@@ -856,7 +899,7 @@ future<> generation_service::check_and_repair_cdc_streams() {
|
|||||||
cdc_log.error("Aborting CDC generation repair due to missing STATUS");
|
cdc_log.error("Aborting CDC generation repair due to missing STATUS");
|
||||||
co_return;
|
co_return;
|
||||||
}
|
}
|
||||||
// Update _gen_id first, so that do_handle_cdc_generation (which will get called due to the status update)
|
// Update _gen_id first, so that legacy_do_handle_cdc_generation (which will get called due to the status update)
|
||||||
// won't try to update the gossiper, which would result in a deadlock inside add_local_application_state
|
// won't try to update the gossiper, which would result in a deadlock inside add_local_application_state
|
||||||
_gen_id = new_gen_id;
|
_gen_id = new_gen_id;
|
||||||
co_await _gossiper.add_local_application_state({
|
co_await _gossiper.add_local_application_state({
|
||||||
@@ -866,7 +909,26 @@ future<> generation_service::check_and_repair_cdc_streams() {
|
|||||||
co_await _sys_ks.local().update_cdc_generation_id(new_gen_id);
|
co_await _sys_ks.local().update_cdc_generation_id(new_gen_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
future<> generation_service::handle_cdc_generation(std::optional<cdc::generation_id> gen_id) {
|
future<> generation_service::handle_cdc_generation(cdc::generation_id_v2 gen_id) {
|
||||||
|
auto ts = get_ts(gen_id);
|
||||||
|
if (co_await container().map_reduce(and_reducer(), [ts] (generation_service& svc) {
|
||||||
|
return !svc._cdc_metadata.prepare(ts);
|
||||||
|
})) {
|
||||||
|
co_return;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto gen_data = co_await _sys_ks.local().read_cdc_generation(gen_id.id);
|
||||||
|
|
||||||
|
bool using_this_gen = co_await container().map_reduce(or_reducer(), [ts, &gen_data] (generation_service& svc) {
|
||||||
|
return svc._cdc_metadata.insert(ts, cdc::topology_description{gen_data});
|
||||||
|
});
|
||||||
|
|
||||||
|
if (using_this_gen) {
|
||||||
|
cdc_log.info("Started using generation {}.", gen_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
future<> generation_service::legacy_handle_cdc_generation(std::optional<cdc::generation_id> gen_id) {
|
||||||
assert_shard_zero(__PRETTY_FUNCTION__);
|
assert_shard_zero(__PRETTY_FUNCTION__);
|
||||||
|
|
||||||
if (!gen_id) {
|
if (!gen_id) {
|
||||||
@@ -892,10 +954,10 @@ future<> generation_service::handle_cdc_generation(std::optional<cdc::generation
|
|||||||
|
|
||||||
bool using_this_gen = false;
|
bool using_this_gen = false;
|
||||||
try {
|
try {
|
||||||
using_this_gen = co_await do_handle_cdc_generation_intercept_nonfatal_errors(*gen_id);
|
using_this_gen = co_await legacy_do_handle_cdc_generation_intercept_nonfatal_errors(*gen_id);
|
||||||
} catch (generation_handling_nonfatal_exception& e) {
|
} catch (generation_handling_nonfatal_exception& e) {
|
||||||
cdc_log.warn(could_not_retrieve_msg_template, gen_id, e.what(), "retrying in the background");
|
cdc_log.warn(could_not_retrieve_msg_template, gen_id, e.what(), "retrying in the background");
|
||||||
async_handle_cdc_generation(*gen_id);
|
legacy_async_handle_cdc_generation(*gen_id);
|
||||||
co_return;
|
co_return;
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
cdc_log.error(could_not_retrieve_msg_template, gen_id, std::current_exception(), "not retrying");
|
cdc_log.error(could_not_retrieve_msg_template, gen_id, std::current_exception(), "not retrying");
|
||||||
@@ -910,7 +972,7 @@ future<> generation_service::handle_cdc_generation(std::optional<cdc::generation
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void generation_service::async_handle_cdc_generation(cdc::generation_id gen_id) {
|
void generation_service::legacy_async_handle_cdc_generation(cdc::generation_id gen_id) {
|
||||||
assert_shard_zero(__PRETTY_FUNCTION__);
|
assert_shard_zero(__PRETTY_FUNCTION__);
|
||||||
|
|
||||||
(void)(([] (cdc::generation_id gen_id, shared_ptr<generation_service> svc) -> future<> {
|
(void)(([] (cdc::generation_id gen_id, shared_ptr<generation_service> svc) -> future<> {
|
||||||
@@ -918,7 +980,7 @@ void generation_service::async_handle_cdc_generation(cdc::generation_id gen_id)
|
|||||||
co_await sleep_abortable(std::chrono::seconds(5), svc->_abort_src);
|
co_await sleep_abortable(std::chrono::seconds(5), svc->_abort_src);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
bool using_this_gen = co_await svc->do_handle_cdc_generation_intercept_nonfatal_errors(gen_id);
|
bool using_this_gen = co_await svc->legacy_do_handle_cdc_generation_intercept_nonfatal_errors(gen_id);
|
||||||
if (using_this_gen) {
|
if (using_this_gen) {
|
||||||
cdc_log.info("Starting to use generation {}", gen_id);
|
cdc_log.info("Starting to use generation {}", gen_id);
|
||||||
co_await update_streams_description(gen_id, svc->get_sys_dist_ks(),
|
co_await update_streams_description(gen_id, svc->get_sys_dist_ks(),
|
||||||
@@ -942,7 +1004,7 @@ void generation_service::async_handle_cdc_generation(cdc::generation_id gen_id)
|
|||||||
})(gen_id, shared_from_this()));
|
})(gen_id, shared_from_this()));
|
||||||
}
|
}
|
||||||
|
|
||||||
future<> generation_service::scan_cdc_generations() {
|
future<> generation_service::legacy_scan_cdc_generations() {
|
||||||
assert_shard_zero(__PRETTY_FUNCTION__);
|
assert_shard_zero(__PRETTY_FUNCTION__);
|
||||||
|
|
||||||
std::optional<cdc::generation_id> latest;
|
std::optional<cdc::generation_id> latest;
|
||||||
@@ -955,18 +1017,18 @@ future<> generation_service::scan_cdc_generations() {
|
|||||||
|
|
||||||
if (latest) {
|
if (latest) {
|
||||||
cdc_log.info("Latest generation seen during startup: {}", *latest);
|
cdc_log.info("Latest generation seen during startup: {}", *latest);
|
||||||
co_await handle_cdc_generation(latest);
|
co_await legacy_handle_cdc_generation(latest);
|
||||||
} else {
|
} else {
|
||||||
cdc_log.info("No generation seen during startup.");
|
cdc_log.info("No generation seen during startup.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
future<bool> generation_service::do_handle_cdc_generation_intercept_nonfatal_errors(cdc::generation_id gen_id) {
|
future<bool> generation_service::legacy_do_handle_cdc_generation_intercept_nonfatal_errors(cdc::generation_id gen_id) {
|
||||||
assert_shard_zero(__PRETTY_FUNCTION__);
|
assert_shard_zero(__PRETTY_FUNCTION__);
|
||||||
|
|
||||||
// Use futurize_invoke to catch all exceptions from do_handle_cdc_generation.
|
// Use futurize_invoke to catch all exceptions from legacy_do_handle_cdc_generation.
|
||||||
return futurize_invoke([this, gen_id] {
|
return futurize_invoke([this, gen_id] {
|
||||||
return do_handle_cdc_generation(gen_id);
|
return legacy_do_handle_cdc_generation(gen_id);
|
||||||
}).handle_exception([] (std::exception_ptr ep) -> future<bool> {
|
}).handle_exception([] (std::exception_ptr ep) -> future<bool> {
|
||||||
try {
|
try {
|
||||||
std::rethrow_exception(ep);
|
std::rethrow_exception(ep);
|
||||||
@@ -986,7 +1048,7 @@ future<bool> generation_service::do_handle_cdc_generation_intercept_nonfatal_err
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
future<bool> generation_service::do_handle_cdc_generation(cdc::generation_id gen_id) {
|
future<bool> generation_service::legacy_do_handle_cdc_generation(cdc::generation_id gen_id) {
|
||||||
assert_shard_zero(__PRETTY_FUNCTION__);
|
assert_shard_zero(__PRETTY_FUNCTION__);
|
||||||
|
|
||||||
auto sys_dist_ks = get_sys_dist_ks();
|
auto sys_dist_ks = get_sys_dist_ks();
|
||||||
|
|||||||
@@ -133,4 +133,21 @@ public:
|
|||||||
*/
|
*/
|
||||||
bool should_propose_first_generation(const gms::inet_address& me, const gms::gossiper&);
|
bool should_propose_first_generation(const gms::inet_address& me, const gms::gossiper&);
|
||||||
|
|
||||||
|
std::pair<utils::UUID, cdc::topology_description> make_new_generation_data(
|
||||||
|
const std::unordered_set<dht::token>& bootstrap_tokens,
|
||||||
|
const noncopyable_function<std::pair<size_t, uint8_t> (dht::token)>& get_sharding_info,
|
||||||
|
const locator::token_metadata_ptr);
|
||||||
|
|
||||||
|
db_clock::time_point new_generation_timestamp(bool add_delay, std::chrono::milliseconds ring_delay);
|
||||||
|
|
||||||
|
// Translates the CDC generation data given by a `cdc::topology_description` into a vector of mutations,
|
||||||
|
// using `mutation_size_threshold` to decide on the mutation sizes. The partition key of each mutation
|
||||||
|
// is given by `gen_uuid`. The timestamp of each cell in each mutation is given by `mutation_timestamp`.
|
||||||
|
//
|
||||||
|
// Works for only specific schemas: CDC_GENERATIONS_V2 (in system_distributed_keyspace)
|
||||||
|
// and CDC_GENERATIONS_V3 (in system_keyspace).
|
||||||
|
future<utils::chunked_vector<mutation>> get_cdc_generation_mutations(
|
||||||
|
schema_ptr, utils::UUID gen_uuid, const cdc::topology_description&,
|
||||||
|
size_t mutation_size_threshold, api::timestamp_type mutation_timestamp);
|
||||||
|
|
||||||
} // namespace cdc
|
} // namespace cdc
|
||||||
|
|||||||
@@ -132,33 +132,54 @@ public:
|
|||||||
* so that other nodes learn about the generation before their clocks cross the generation's timestamp
|
* so that other nodes learn about the generation before their clocks cross the generation's timestamp
|
||||||
* (not guaranteed in the current implementation, but expected to be the common case;
|
* (not guaranteed in the current implementation, but expected to be the common case;
|
||||||
* we assume that `ring_delay` is enough for other nodes to learn about the new generation).
|
* we assume that `ring_delay` is enough for other nodes to learn about the new generation).
|
||||||
|
*
|
||||||
|
* Legacy: used for gossiper-based topology changes.
|
||||||
*/
|
*/
|
||||||
future<cdc::generation_id> make_new_generation(const std::unordered_set<dht::token>& bootstrap_tokens, bool add_delay);
|
future<cdc::generation_id> legacy_make_new_generation(
|
||||||
|
const std::unordered_set<dht::token>& bootstrap_tokens, bool add_delay);
|
||||||
|
|
||||||
|
/* Retrieve the CDC generation with the given ID from local tables
|
||||||
|
* and start using it for CDC log writes if it's not obsolete.
|
||||||
|
* Precondition: the generation was committed using group 0 and locally applied.
|
||||||
|
*/
|
||||||
|
future<> handle_cdc_generation(cdc::generation_id_v2);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
/* Retrieve the CDC generation which starts at the given timestamp (from a distributed table created for this purpose)
|
/* Retrieve the CDC generation which starts at the given timestamp (from a distributed table created for this purpose)
|
||||||
* and start using it for CDC log writes if it's not obsolete.
|
* and start using it for CDC log writes if it's not obsolete.
|
||||||
|
*
|
||||||
|
* Legacy: used for gossiper-based topology changes.
|
||||||
*/
|
*/
|
||||||
future<> handle_cdc_generation(std::optional<cdc::generation_id>);
|
future<> legacy_handle_cdc_generation(std::optional<cdc::generation_id>);
|
||||||
|
|
||||||
/* If `handle_cdc_generation` fails, it schedules an asynchronous retry in the background
|
/* If `legacy_handle_cdc_generation` fails, it schedules an asynchronous retry in the background
|
||||||
* using `async_handle_cdc_generation`.
|
* using `legacy_async_handle_cdc_generation`.
|
||||||
|
*
|
||||||
|
* Legacy: used for gossiper-based topology changes.
|
||||||
*/
|
*/
|
||||||
void async_handle_cdc_generation(cdc::generation_id);
|
void legacy_async_handle_cdc_generation(cdc::generation_id);
|
||||||
|
|
||||||
/* Wrapper around `do_handle_cdc_generation` which intercepts timeout/unavailability exceptions.
|
/* Wrapper around `legacy_do_handle_cdc_generation` which intercepts timeout/unavailability exceptions.
|
||||||
* Returns: do_handle_cdc_generation(ts). */
|
* Returns: legacy_do_handle_cdc_generation(ts).
|
||||||
future<bool> do_handle_cdc_generation_intercept_nonfatal_errors(cdc::generation_id);
|
*
|
||||||
|
* Legacy: used for gossiper-based topology changes.
|
||||||
|
*/
|
||||||
|
future<bool> legacy_do_handle_cdc_generation_intercept_nonfatal_errors(cdc::generation_id);
|
||||||
|
|
||||||
/* Returns `true` iff we started using the generation (it was not obsolete or already known),
|
/* Returns `true` iff we started using the generation (it was not obsolete or already known),
|
||||||
* which means that this node might write some CDC log entries using streams from this generation. */
|
* which means that this node might write some CDC log entries using streams from this generation.
|
||||||
future<bool> do_handle_cdc_generation(cdc::generation_id);
|
*
|
||||||
|
* Legacy: used for gossiper-based topology changes.
|
||||||
|
*/
|
||||||
|
future<bool> legacy_do_handle_cdc_generation(cdc::generation_id);
|
||||||
|
|
||||||
/* Scan CDC generation timestamps gossiped by other nodes and retrieve the latest one.
|
/* Scan CDC generation timestamps gossiped by other nodes and retrieve the latest one.
|
||||||
* This function should be called once at the end of the node startup procedure
|
* This function should be called once at the end of the node startup procedure
|
||||||
* (after the node is started and running normally, it will retrieve generations on gossip events instead).
|
* (after the node is started and running normally, it will retrieve generations on gossip events instead).
|
||||||
|
*
|
||||||
|
* Legacy: used for gossiper-based topology changes.
|
||||||
*/
|
*/
|
||||||
future<> scan_cdc_generations();
|
future<> legacy_scan_cdc_generations();
|
||||||
|
|
||||||
/* generation_service code might be racing with system_distributed_keyspace deinitialization
|
/* generation_service code might be racing with system_distributed_keyspace deinitialization
|
||||||
* (the deinitialization order is broken).
|
* (the deinitialization order is broken).
|
||||||
|
|||||||
@@ -50,6 +50,7 @@ namespace {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
extern thread_local data_type cdc_streams_set_type;
|
||||||
thread_local data_type cdc_streams_set_type = set_type_impl::get_instance(bytes_type, false);
|
thread_local data_type cdc_streams_set_type = set_type_impl::get_instance(bytes_type, false);
|
||||||
|
|
||||||
/* See `token_range_description` struct */
|
/* See `token_range_description` struct */
|
||||||
@@ -503,14 +504,15 @@ system_distributed_keyspace::read_cdc_topology_description(
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
static future<utils::chunked_vector<mutation>> get_cdc_generation_mutations(
|
future<>
|
||||||
const replica::database& db,
|
system_distributed_keyspace::insert_cdc_generation(
|
||||||
utils::UUID id,
|
utils::UUID id,
|
||||||
size_t num_replicas,
|
const cdc::topology_description& desc,
|
||||||
size_t concurrency,
|
context ctx) {
|
||||||
const cdc::topology_description& desc) {
|
using namespace std::chrono_literals;
|
||||||
assert(num_replicas);
|
|
||||||
auto s = db.find_schema(system_distributed_keyspace::NAME_EVERYWHERE, system_distributed_keyspace::CDC_GENERATIONS_V2);
|
const size_t concurrency = 10;
|
||||||
|
const size_t num_replicas = ctx.num_token_owners;
|
||||||
|
|
||||||
// To insert the data quickly and efficiently we send it in batches of multiple rows
|
// To insert the data quickly and efficiently we send it in batches of multiple rows
|
||||||
// (each batch represented by a single mutation). We also send multiple such batches concurrently.
|
// (each batch represented by a single mutation). We also send multiple such batches concurrently.
|
||||||
@@ -530,46 +532,11 @@ static future<utils::chunked_vector<mutation>> get_cdc_generation_mutations(
|
|||||||
// It has been tested that sending 1MB batches to 3 replicas with concurrency 20 works OK,
|
// It has been tested that sending 1MB batches to 3 replicas with concurrency 20 works OK,
|
||||||
// which would correspond to L ~= 60MB. Hence that's the limit we use here.
|
// which would correspond to L ~= 60MB. Hence that's the limit we use here.
|
||||||
const size_t L = 60'000'000;
|
const size_t L = 60'000'000;
|
||||||
const auto new_mutation_threshold = std::max(size_t(1), L / (num_replicas * concurrency));
|
const auto mutation_size_threshold = std::max(size_t(1), L / (num_replicas * concurrency));
|
||||||
|
|
||||||
auto ts = api::new_timestamp();
|
auto s = _qp.db().real_database().find_schema(
|
||||||
utils::chunked_vector<mutation> res;
|
system_distributed_keyspace::NAME_EVERYWHERE, system_distributed_keyspace::CDC_GENERATIONS_V2);
|
||||||
res.emplace_back(s, partition_key::from_singular(*s, id));
|
auto ms = co_await cdc::get_cdc_generation_mutations(s, id, desc, mutation_size_threshold, api::new_timestamp());
|
||||||
res.back().set_static_cell(to_bytes("num_ranges"), int32_t(desc.entries().size()), ts);
|
|
||||||
size_t size_estimate = 0;
|
|
||||||
for (auto& e : desc.entries()) {
|
|
||||||
if (size_estimate >= new_mutation_threshold) {
|
|
||||||
res.emplace_back(s, partition_key::from_singular(*s, id));
|
|
||||||
size_estimate = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
set_type_impl::native_type streams;
|
|
||||||
streams.reserve(e.streams.size());
|
|
||||||
for (auto& stream: e.streams) {
|
|
||||||
streams.push_back(data_value(stream.to_bytes()));
|
|
||||||
}
|
|
||||||
|
|
||||||
size_estimate += e.streams.size() * 20;
|
|
||||||
auto ckey = clustering_key::from_singular(*s, dht::token::to_int64(e.token_range_end));
|
|
||||||
res.back().set_cell(ckey, to_bytes("streams"), make_set_value(cdc_streams_set_type, std::move(streams)), ts);
|
|
||||||
res.back().set_cell(ckey, to_bytes("ignore_msb"), int8_t(e.sharding_ignore_msb), ts);
|
|
||||||
|
|
||||||
co_await coroutine::maybe_yield();
|
|
||||||
}
|
|
||||||
|
|
||||||
co_return res;
|
|
||||||
}
|
|
||||||
|
|
||||||
future<>
|
|
||||||
system_distributed_keyspace::insert_cdc_generation(
|
|
||||||
utils::UUID id,
|
|
||||||
const cdc::topology_description& desc,
|
|
||||||
context ctx) {
|
|
||||||
using namespace std::chrono_literals;
|
|
||||||
|
|
||||||
const size_t concurrency = 10;
|
|
||||||
|
|
||||||
auto ms = co_await get_cdc_generation_mutations(_qp.db().real_database(), id, ctx.num_token_owners, concurrency, desc);
|
|
||||||
co_await max_concurrent_for_each(ms, concurrency, [&] (mutation& m) -> future<> {
|
co_await max_concurrent_for_each(ms, concurrency, [&] (mutation& m) -> future<> {
|
||||||
co_await _sp.mutate(
|
co_await _sp.mutate(
|
||||||
{ std::move(m) },
|
{ std::move(m) },
|
||||||
|
|||||||
@@ -67,6 +67,7 @@
|
|||||||
#include "service/topology_state_machine.hh"
|
#include "service/topology_state_machine.hh"
|
||||||
#include "sstables/open_info.hh"
|
#include "sstables/open_info.hh"
|
||||||
#include "sstables/generation_type.hh"
|
#include "sstables/generation_type.hh"
|
||||||
|
#include "cdc/generation.hh"
|
||||||
|
|
||||||
using days = std::chrono::duration<int, std::ratio<24 * 3600>>;
|
using days = std::chrono::duration<int, std::ratio<24 * 3600>>;
|
||||||
|
|
||||||
@@ -83,6 +84,7 @@ namespace {
|
|||||||
system_keyspace::DISCOVERY,
|
system_keyspace::DISCOVERY,
|
||||||
system_keyspace::BROADCAST_KV_STORE,
|
system_keyspace::BROADCAST_KV_STORE,
|
||||||
system_keyspace::TOPOLOGY,
|
system_keyspace::TOPOLOGY,
|
||||||
|
system_keyspace::CDC_GENERATIONS_V3,
|
||||||
};
|
};
|
||||||
if (ks_name == system_keyspace::NAME && system_ks_null_shard_tables.contains(cf_name)) {
|
if (ks_name == system_keyspace::NAME && system_ks_null_shard_tables.contains(cf_name)) {
|
||||||
props.use_null_sharder = true;
|
props.use_null_sharder = true;
|
||||||
@@ -98,6 +100,7 @@ namespace {
|
|||||||
system_keyspace::DISCOVERY,
|
system_keyspace::DISCOVERY,
|
||||||
system_keyspace::BROADCAST_KV_STORE,
|
system_keyspace::BROADCAST_KV_STORE,
|
||||||
system_keyspace::TOPOLOGY,
|
system_keyspace::TOPOLOGY,
|
||||||
|
system_keyspace::CDC_GENERATIONS_V3,
|
||||||
};
|
};
|
||||||
if (ks_name == system_keyspace::NAME && extra_durable_tables.contains(cf_name)) {
|
if (ks_name == system_keyspace::NAME && extra_durable_tables.contains(cf_name)) {
|
||||||
props.wait_for_sync_to_commitlog = true;
|
props.wait_for_sync_to_commitlog = true;
|
||||||
@@ -250,6 +253,9 @@ schema_ptr system_keyspace::topology() {
|
|||||||
.with_column("num_tokens", int32_type)
|
.with_column("num_tokens", int32_type)
|
||||||
.with_column("shard_count", int32_type)
|
.with_column("shard_count", int32_type)
|
||||||
.with_column("ignore_msb", int32_type)
|
.with_column("ignore_msb", int32_type)
|
||||||
|
.with_column("new_cdc_generation_data_uuid", uuid_type)
|
||||||
|
.with_column("current_cdc_generation_uuid", uuid_type, column_kind::static_column)
|
||||||
|
.with_column("current_cdc_generation_timestamp", timestamp_type, column_kind::static_column)
|
||||||
.set_comment("Current state of topology change machine")
|
.set_comment("Current state of topology change machine")
|
||||||
.with_version(generate_schema_version(id))
|
.with_version(generate_schema_version(id))
|
||||||
.build();
|
.build();
|
||||||
@@ -257,6 +263,45 @@ schema_ptr system_keyspace::topology() {
|
|||||||
return schema;
|
return schema;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
extern thread_local data_type cdc_streams_set_type;
|
||||||
|
|
||||||
|
/* An internal table used by nodes to store CDC generation data.
|
||||||
|
* Written to by Raft Group 0. */
|
||||||
|
schema_ptr system_keyspace::cdc_generations_v3() {
|
||||||
|
thread_local auto schema = [] {
|
||||||
|
auto id = generate_legacy_id(NAME, CDC_GENERATIONS_V3);
|
||||||
|
return schema_builder(NAME, CDC_GENERATIONS_V3, {id})
|
||||||
|
/* The unique identifier of this generation. */
|
||||||
|
.with_column("id", uuid_type, column_kind::partition_key)
|
||||||
|
/* The generation describes a mapping from all tokens in the token ring to a set of stream IDs.
|
||||||
|
* This mapping is built from a bunch of smaller mappings, each describing how tokens in a
|
||||||
|
* subrange of the token ring are mapped to stream IDs; these subranges together cover the entire
|
||||||
|
* token ring. Each such range-local mapping is represented by a row of this table. The
|
||||||
|
* clustering key of the row is the end of the range being described by this row. The start of
|
||||||
|
* this range is the range_end of the previous row (in the clustering order, which is the integer
|
||||||
|
* order) or of the last row of this partition if this is the first the first row. */
|
||||||
|
.with_column("range_end", long_type, column_kind::clustering_key)
|
||||||
|
/* The set of streams mapped to in this range. The number of streams mapped to a single range in
|
||||||
|
* a CDC generation is bounded from above by the number of shards on the owner of that range in
|
||||||
|
* the token ring. In other words, the number of elements of this set is bounded by the maximum
|
||||||
|
* of the number of shards over all nodes. The serialized size is obtained by counting about 20B
|
||||||
|
* for each stream. For example, if all nodes in the cluster have at most 128 shards, the
|
||||||
|
* serialized size of this set will be bounded by ~2.5 KB. */
|
||||||
|
.with_column("streams", cdc_streams_set_type)
|
||||||
|
/* The value of the `ignore_msb` sharding parameter of the node which was the owner of this token
|
||||||
|
* range when the generation was first created. Together with the set of streams above it fully
|
||||||
|
* describes the mapping for this particular range. */
|
||||||
|
.with_column("ignore_msb", byte_type)
|
||||||
|
/* Column used for sanity checking. For a given generation it's equal to the number of ranges in
|
||||||
|
* this generation; thus, after the generation is fully inserted, it must be equal to the number
|
||||||
|
* of rows in the partition. */
|
||||||
|
.with_column("num_ranges", int32_type, column_kind::static_column)
|
||||||
|
.with_version(system_keyspace::generate_schema_version(id))
|
||||||
|
.build();
|
||||||
|
}();
|
||||||
|
return schema;
|
||||||
|
}
|
||||||
|
|
||||||
schema_ptr system_keyspace::raft() {
|
schema_ptr system_keyspace::raft() {
|
||||||
static thread_local auto schema = [] {
|
static thread_local auto schema = [] {
|
||||||
auto id = generate_legacy_id(NAME, RAFT);
|
auto id = generate_legacy_id(NAME, RAFT);
|
||||||
@@ -2838,7 +2883,7 @@ std::vector<schema_ptr> system_keyspace::all_tables(const db::config& cfg) {
|
|||||||
r.insert(r.end(), {raft(), raft_snapshots(), raft_snapshot_config(), group0_history(), discovery()});
|
r.insert(r.end(), {raft(), raft_snapshots(), raft_snapshot_config(), group0_history(), discovery()});
|
||||||
|
|
||||||
if (cfg.check_experimental(db::experimental_features_t::feature::RAFT)) {
|
if (cfg.check_experimental(db::experimental_features_t::feature::RAFT)) {
|
||||||
r.insert(r.end(), {topology()});
|
r.insert(r.end(), {topology(), cdc_generations_v3()});
|
||||||
}
|
}
|
||||||
|
|
||||||
if (cfg.check_experimental(db::experimental_features_t::feature::BROADCAST_TABLES)) {
|
if (cfg.check_experimental(db::experimental_features_t::feature::BROADCAST_TABLES)) {
|
||||||
@@ -2958,6 +3003,18 @@ system_keyspace::query_mutations(distributed<service::storage_proxy>& proxy, con
|
|||||||
.then([] (rpc::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_temperature> rr_ht) { return std::get<0>(std::move(rr_ht)); });
|
.then([] (rpc::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_temperature> rr_ht) { return std::get<0>(std::move(rr_ht)); });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
future<foreign_ptr<lw_shared_ptr<reconcilable_result>>>
|
||||||
|
system_keyspace::query_mutations(distributed<service::storage_proxy>& proxy, const sstring& ks_name, const sstring& cf_name, const dht::partition_range& partition_range, query::clustering_range row_range) {
|
||||||
|
auto& db = proxy.local().get_db().local();
|
||||||
|
auto schema = db.find_schema(ks_name, cf_name);
|
||||||
|
auto slice = partition_slice_builder(*schema)
|
||||||
|
.with_range(std::move(row_range))
|
||||||
|
.build();
|
||||||
|
auto cmd = make_lw_shared<query::read_command>(schema->id(), schema->version(), std::move(slice), proxy.local().get_max_result_size(slice), query::tombstone_limit::max);
|
||||||
|
return proxy.local().query_mutations_locally(std::move(schema), std::move(cmd), partition_range, db::no_timeout)
|
||||||
|
.then([] (rpc::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_temperature> rr_ht) { return std::get<0>(std::move(rr_ht)); });
|
||||||
|
}
|
||||||
|
|
||||||
future<lw_shared_ptr<query::result_set>>
|
future<lw_shared_ptr<query::result_set>>
|
||||||
system_keyspace::query(distributed<service::storage_proxy>& proxy, const sstring& ks_name, const sstring& cf_name) {
|
system_keyspace::query(distributed<service::storage_proxy>& proxy, const sstring& ks_name, const sstring& cf_name) {
|
||||||
replica::database& db = proxy.local().get_db().local();
|
replica::database& db = proxy.local().get_db().local();
|
||||||
@@ -3475,19 +3532,36 @@ future<service::topology> system_keyspace::load_topology_state() {
|
|||||||
|
|
||||||
service::node_state nstate = service::node_state_from_string(row.get_as<sstring>("node_state"));
|
service::node_state nstate = service::node_state_from_string(row.get_as<sstring>("node_state"));
|
||||||
|
|
||||||
std::optional<service::ring_slice::replication_state> tstate;
|
std::optional<service::ring_slice> ring_slice;
|
||||||
if (row.has("replication_state")) {
|
if (row.has("replication_state")) {
|
||||||
tstate = service::replication_state_from_string(row.get_as<sstring>("replication_state"));
|
auto repl_state = service::replication_state_from_string(row.get_as<sstring>("replication_state"));
|
||||||
}
|
|
||||||
|
|
||||||
std::unordered_set<dht::token> t;
|
std::unordered_set<dht::token> tokens;
|
||||||
|
if (row.has("tokens")) {
|
||||||
|
auto blob = row.get_blob("tokens");
|
||||||
|
auto cdef = topology()->get_column_definition("tokens");
|
||||||
|
auto deserialized = cdef->type->deserialize(blob);
|
||||||
|
auto ts = value_cast<set_type_impl::native_type>(deserialized);
|
||||||
|
tokens = decode_tokens(ts);
|
||||||
|
}
|
||||||
|
|
||||||
if (row.has("tokens")) {
|
if (tokens.empty()) {
|
||||||
auto blob = row.get_blob("tokens");
|
on_fatal_internal_error(slogger, format(
|
||||||
auto cdef = topology()->get_column_definition("tokens");
|
"load_topology_state: node {} has replication state ({}) but missing tokens",
|
||||||
auto deserialized = cdef->type->deserialize(blob);
|
host_id, repl_state));
|
||||||
auto tokens = value_cast<set_type_impl::native_type>(deserialized);
|
}
|
||||||
t = decode_tokens(tokens);
|
|
||||||
|
if (!row.has("new_cdc_generation_data_uuid")) {
|
||||||
|
on_fatal_internal_error(slogger, format(
|
||||||
|
"load_topology_state: node {} has replication state ({}) but missing CDC generation data UUID",
|
||||||
|
host_id, repl_state));
|
||||||
|
}
|
||||||
|
|
||||||
|
ring_slice = service::ring_slice {
|
||||||
|
.state = repl_state,
|
||||||
|
.tokens = std::move(tokens),
|
||||||
|
.new_cdc_generation_data_uuid = row.get_as<utils::UUID>("new_cdc_generation_data_uuid"),
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
std::optional<raft::server_id> replaced_id;
|
std::optional<raft::server_id> replaced_id;
|
||||||
@@ -3545,29 +3619,113 @@ future<service::topology> system_keyspace::load_topology_state() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!tstate && t.size() != 0) {
|
|
||||||
on_fatal_internal_error(slogger, "There cannot be tokens without the replication state");
|
|
||||||
}
|
|
||||||
std::unordered_map<raft::server_id, service::replica_state>* map = nullptr;
|
std::unordered_map<raft::server_id, service::replica_state>* map = nullptr;
|
||||||
if (nstate == service::node_state::normal) {
|
if (nstate == service::node_state::normal) {
|
||||||
map = &ret.normal_nodes;
|
map = &ret.normal_nodes;
|
||||||
|
if (!ring_slice) {
|
||||||
|
on_fatal_internal_error(slogger, format(
|
||||||
|
"load_topology_state: node {} in normal state but missing ring slice", host_id));
|
||||||
|
}
|
||||||
} else if (nstate == service::node_state::left) {
|
} else if (nstate == service::node_state::left) {
|
||||||
ret.left_nodes.emplace(host_id);
|
ret.left_nodes.emplace(host_id);
|
||||||
} else if (nstate == service::node_state::none) {
|
} else if (nstate == service::node_state::none) {
|
||||||
map = &ret.new_nodes;
|
map = &ret.new_nodes;
|
||||||
} else {
|
} else {
|
||||||
map = &ret.transition_nodes;
|
map = &ret.transition_nodes;
|
||||||
|
if (!ring_slice) {
|
||||||
|
on_fatal_internal_error(slogger, format(
|
||||||
|
"load_topology_state: node {} in transitioning state but missing ring slice", host_id));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (map) {
|
if (map) {
|
||||||
map->emplace(host_id, service::replica_state{nstate, std::move(datacenter), std::move(rack), std::move(release_version),
|
map->emplace(host_id, service::replica_state{
|
||||||
tstate ? std::optional<service::ring_slice>(service::ring_slice{*tstate, std::move(t)}) : std::nullopt,
|
nstate, std::move(datacenter), std::move(rack), std::move(release_version),
|
||||||
shard_count, ignore_msb});
|
ring_slice, shard_count, ignore_msb});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
// Here we access static columns, any row will do.
|
||||||
|
auto& some_row = *rs->begin();
|
||||||
|
if (some_row.has("current_cdc_generation_uuid")) {
|
||||||
|
auto gen_uuid = some_row.get_as<utils::UUID>("current_cdc_generation_uuid");
|
||||||
|
if (!some_row.has("current_cdc_generation_timestamp")) {
|
||||||
|
on_internal_error(slogger, format(
|
||||||
|
"load_topology_state: current CDC generation UUID ({}) present, but timestamp missing", gen_uuid));
|
||||||
|
}
|
||||||
|
auto gen_ts = some_row.get_as<db_clock::time_point>("current_cdc_generation_timestamp");
|
||||||
|
ret.current_cdc_generation_id = cdc::generation_id_v2 {
|
||||||
|
.ts = gen_ts,
|
||||||
|
.id = gen_uuid
|
||||||
|
};
|
||||||
|
|
||||||
|
// Sanity check for CDC generation data consistency.
|
||||||
|
{
|
||||||
|
auto gen_rows = co_await qctx->execute_cql(
|
||||||
|
format("SELECT count(range_end) as cnt, num_ranges FROM system.{} WHERE id = ?",
|
||||||
|
CDC_GENERATIONS_V3),
|
||||||
|
gen_uuid);
|
||||||
|
assert(gen_rows);
|
||||||
|
if (gen_rows->empty()) {
|
||||||
|
on_internal_error(slogger, format(
|
||||||
|
"load_topology_state: current CDC generation UUID ({}) present, but data missing", gen_uuid));
|
||||||
|
}
|
||||||
|
auto& row = gen_rows->one();
|
||||||
|
auto counted_ranges = row.get_as<int64_t>("cnt");
|
||||||
|
auto num_ranges = row.get_as<int32_t>("num_ranges");
|
||||||
|
if (counted_ranges != num_ranges) {
|
||||||
|
on_internal_error(slogger, format(
|
||||||
|
"load_topology_state: inconsistency in CDC generation data (UUID {}):"
|
||||||
|
" counted {} ranges, should be {}", gen_uuid, counted_ranges, num_ranges));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (!ret.normal_nodes.empty()) {
|
||||||
|
on_internal_error(slogger,
|
||||||
|
"load_topology_state: normal nodes present but no current CDC generation ID");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
co_return ret;
|
co_return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
future<cdc::topology_description>
|
||||||
|
system_keyspace::read_cdc_generation(utils::UUID id) {
|
||||||
|
std::vector<cdc::token_range_description> entries;
|
||||||
|
auto num_ranges = 0;
|
||||||
|
co_await _qp.local().query_internal(
|
||||||
|
format("SELECT range_end, streams, ignore_msb, num_ranges FROM {}.{} WHERE id = ?",
|
||||||
|
NAME, CDC_GENERATIONS_V3),
|
||||||
|
db::consistency_level::ONE,
|
||||||
|
{ id },
|
||||||
|
1000, // for ~1KB rows, ~1MB page size
|
||||||
|
[&] (const cql3::untyped_result_set_row& row) {
|
||||||
|
std::vector<cdc::stream_id> streams;
|
||||||
|
row.get_list_data<bytes>("streams", std::back_inserter(streams));
|
||||||
|
entries.push_back(cdc::token_range_description{
|
||||||
|
dht::token::from_int64(row.get_as<int64_t>("range_end")),
|
||||||
|
std::move(streams),
|
||||||
|
uint8_t(row.get_as<int8_t>("ignore_msb"))});
|
||||||
|
num_ranges = row.get_as<int32_t>("num_ranges");
|
||||||
|
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||||
|
});
|
||||||
|
|
||||||
|
if (entries.empty()) {
|
||||||
|
// The data must be present by precondition.
|
||||||
|
on_internal_error(slogger, format(
|
||||||
|
"read_cdc_generation: data for CDC generation {} not present", id));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (entries.size() != num_ranges) {
|
||||||
|
throw std::runtime_error(format(
|
||||||
|
"read_cdc_generation: wrong number of rows. The `num_ranges` column claimed {} rows,"
|
||||||
|
" but reading the partition returned {}.", num_ranges, entries.size()));
|
||||||
|
}
|
||||||
|
|
||||||
|
co_return cdc::topology_description{std::move(entries)};
|
||||||
|
}
|
||||||
|
|
||||||
future<> system_keyspace::sstables_registry_create_entry(sstring location, utils::UUID uuid, sstring status, sstables::entry_descriptor desc) {
|
future<> system_keyspace::sstables_registry_create_entry(sstring location, utils::UUID uuid, sstring status, sstables::entry_descriptor desc) {
|
||||||
static const auto req = format("INSERT INTO system.{} (location, generation, uuid, status, version, format) VALUES (?, ?, ?, ?, ?, ?)", SSTABLES_REGISTRY);
|
static const auto req = format("INSERT INTO system.{} (location, generation, uuid, status, version, format) VALUES (?, ?, ?, ?, ?, ?)", SSTABLES_REGISTRY);
|
||||||
slogger.trace("Inserting {}.{}:{} into {}", location, desc.generation.value(), uuid, SSTABLES_REGISTRY);
|
slogger.trace("Inserting {}.{}:{} into {}", location, desc.generation.value(), uuid, SSTABLES_REGISTRY);
|
||||||
|
|||||||
@@ -68,6 +68,10 @@ namespace gms {
|
|||||||
class gossiper;
|
class gossiper;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
namespace cdc {
|
||||||
|
class topology_description;
|
||||||
|
}
|
||||||
|
|
||||||
bool is_system_keyspace(std::string_view ks_name);
|
bool is_system_keyspace(std::string_view ks_name);
|
||||||
|
|
||||||
namespace db {
|
namespace db {
|
||||||
@@ -151,6 +155,7 @@ public:
|
|||||||
static constexpr auto BROADCAST_KV_STORE = "broadcast_kv_store";
|
static constexpr auto BROADCAST_KV_STORE = "broadcast_kv_store";
|
||||||
static constexpr auto TOPOLOGY = "topology";
|
static constexpr auto TOPOLOGY = "topology";
|
||||||
static constexpr auto SSTABLES_REGISTRY = "sstables";
|
static constexpr auto SSTABLES_REGISTRY = "sstables";
|
||||||
|
static constexpr auto CDC_GENERATIONS_V3 = "cdc_generations_v3";
|
||||||
|
|
||||||
struct v3 {
|
struct v3 {
|
||||||
static constexpr auto BATCHES = "batches";
|
static constexpr auto BATCHES = "batches";
|
||||||
@@ -233,6 +238,7 @@ public:
|
|||||||
static schema_ptr broadcast_kv_store();
|
static schema_ptr broadcast_kv_store();
|
||||||
static schema_ptr topology();
|
static schema_ptr topology();
|
||||||
static schema_ptr sstables_registry();
|
static schema_ptr sstables_registry();
|
||||||
|
static schema_ptr cdc_generations_v3();
|
||||||
|
|
||||||
static table_schema_version generate_schema_version(table_id table_id, uint16_t offset = 0);
|
static table_schema_version generate_schema_version(table_id table_id, uint16_t offset = 0);
|
||||||
|
|
||||||
@@ -276,6 +282,13 @@ public:
|
|||||||
const sstring& ks_name,
|
const sstring& ks_name,
|
||||||
const sstring& cf_name);
|
const sstring& cf_name);
|
||||||
|
|
||||||
|
future<foreign_ptr<lw_shared_ptr<reconcilable_result>>>
|
||||||
|
static query_mutations(distributed<service::storage_proxy>& proxy,
|
||||||
|
const sstring& ks_name,
|
||||||
|
const sstring& cf_name,
|
||||||
|
const dht::partition_range& partition_range,
|
||||||
|
query::clustering_range row_ranges = query::clustering_range::make_open_ended_both_sides());
|
||||||
|
|
||||||
// Returns all data from given system table.
|
// Returns all data from given system table.
|
||||||
// Intended to be used by code which is not performance critical.
|
// Intended to be used by code which is not performance critical.
|
||||||
static future<lw_shared_ptr<query::result_set>> query(distributed<service::storage_proxy>& proxy,
|
static future<lw_shared_ptr<query::result_set>> query(distributed<service::storage_proxy>& proxy,
|
||||||
@@ -443,6 +456,10 @@ public:
|
|||||||
|
|
||||||
static future<service::topology> load_topology_state();
|
static future<service::topology> load_topology_state();
|
||||||
|
|
||||||
|
// Read CDC generation data with the given UUID as key.
|
||||||
|
// Precondition: the data is known to be present in the table (because it was committed earlier through group 0).
|
||||||
|
future<cdc::topology_description> read_cdc_generation(utils::UUID id);
|
||||||
|
|
||||||
// The mutation appends the given state ID to the group 0 history table, with the given description if non-empty.
|
// The mutation appends the given state ID to the group 0 history table, with the given description if non-empty.
|
||||||
//
|
//
|
||||||
// If `gc_older_than` is provided, the mutation will also contain a tombstone that clears all entries whose
|
// If `gc_older_than` is provided, the mutation will also contain a tombstone that clears all entries whose
|
||||||
|
|||||||
@@ -105,7 +105,9 @@ Shard-colocation is an optimization.
|
|||||||
|
|
||||||
### Generation switching
|
### Generation switching
|
||||||
|
|
||||||
Having different generations operating at different points in time is necessary to maintain colocation in presence of topology changes. When a new node joins the cluster it modifies the token ring by refining existing vnodes into smaller vnodes. But before it does it, it will introduce a new CDC generation whose token ranges refine those new (smaller) vnodes (which means they also refine the old vnodes; that way writes will be colocated on both old and new replicas).
|
Having different generations operating at different points in time is necessary to maintain colocation in presence of topology changes. When a new node joins the cluster we modify the token ring by refining existing vnodes into smaller vnodes. But before we do it, we introduce a new CDC generation whose token ranges refine those new (smaller) vnodes (which means they also refine the old vnodes; that way writes will be colocated on both old and new replicas).
|
||||||
|
|
||||||
|
#### Gossiper-based topology changes
|
||||||
|
|
||||||
The joining node learns about the current vnodes, chooses tokens which will split them into smaller vnodes and creates a new `cdc::topology_description` which refines those smaller vnodes. This is done in the `cdc::topology_description_generator` class. It then inserts the generation description into an internal distributed table `cdc_generation_descriptions_v2` in the `system_distributed_everywhere` keyspace. The table is defined as follows:
|
The joining node learns about the current vnodes, chooses tokens which will split them into smaller vnodes and creates a new `cdc::topology_description` which refines those smaller vnodes. This is done in the `cdc::topology_description_generator` class. It then inserts the generation description into an internal distributed table `cdc_generation_descriptions_v2` in the `system_distributed_everywhere` keyspace. The table is defined as follows:
|
||||||
```
|
```
|
||||||
@@ -124,12 +126,10 @@ Note that constructing the `cdc::topology_description` (which describes the gene
|
|||||||
|
|
||||||
The table lies in the `system_distributed_everywhere` keyspace which is replicated using the `Everywhere` strategy, meaning that the generation data is replicated by every node. The insert is performed using `CL=ALL`, allowing nodes to read the data locally later using `CL=ONE`.
|
The table lies in the `system_distributed_everywhere` keyspace which is replicated using the `Everywhere` strategy, meaning that the generation data is replicated by every node. The insert is performed using `CL=ALL`, allowing nodes to read the data locally later using `CL=ONE`.
|
||||||
|
|
||||||
The timestamp for the new generation is chosen after the data is inserted to the table. To choose the timestamp, the node takes its local time and adds a minute or two so that other nodes have a chance to learn about this generation before it starts operating. Thus, the node makes the following assumptions:
|
The timestamp for the new generation is chosen after the data is inserted to the table. To choose the timestamp, the node takes its local time and adds `2 * ring_delay` (a minute by default) so that other nodes have a chance to learn about this generation before it starts operating. Thus, the node makes the following assumptions:
|
||||||
1. its clock is not too desynchronized with other nodes' clocks,
|
1. its clock is not too desynchronized with other nodes' clocks,
|
||||||
2. the cluster is not partitioned.
|
2. the cluster is not partitioned.
|
||||||
|
|
||||||
Future patches will make the solution safe by using a two-phase-commit approach.
|
|
||||||
|
|
||||||
The timestamp and the randomly generated UUID together form a "generation ID" which uniquely identifies this generation and can be used to retrieve its data from the table and to learn when it starts operating.
|
The timestamp and the randomly generated UUID together form a "generation ID" which uniquely identifies this generation and can be used to retrieve its data from the table and to learn when it starts operating.
|
||||||
|
|
||||||
Next, the node starts gossiping the ID of the new generation together with its set of chosen tokens and status:
|
Next, the node starts gossiping the ID of the new generation together with its set of chosen tokens and status:
|
||||||
@@ -152,12 +152,54 @@ CREATE TABLE system.cdc_local (
|
|||||||
The timestamp and UUID forming the generation ID are kept under the `"cdc_local"` key in the `streams_timestamp` and `uuid` columns, respectively.
|
The timestamp and UUID forming the generation ID are kept under the `"cdc_local"` key in the `streams_timestamp` and `uuid` columns, respectively.
|
||||||
|
|
||||||
When other nodes learn about the generation, they'll extract it from the `cdc_generation_descriptions_v2` table and insert it into their set of known CDC generations using `cdc::metadata::insert(db_clock::time_point, topology_description&&)`.
|
When other nodes learn about the generation, they'll extract it from the `cdc_generation_descriptions_v2` table and insert it into their set of known CDC generations using `cdc::metadata::insert(db_clock::time_point, topology_description&&)`.
|
||||||
Notice that nodes learn about the generation together with the new node's tokens. When they learn about its tokens they'll immediately start sending writes to the new node (in the case of bootstrapping, it will become a pending replica). But the old generation will still be operating for a minute or two. Thus colocation will be lost for a while. This problem will be fixed when the two-phase-commit approach is implemented.
|
Notice that nodes learn about the generation together with the new node's tokens. When they learn about its tokens they'll immediately start sending writes to the new node (in the case of bootstrapping, it will become a pending replica). But the old generation will still be operating for `~ 2 * ring_delay`; during this short period of time we don't have complete colocation of CDC log writes with base writes (one replica may be different).
|
||||||
|
|
||||||
We're not able to prevent a node learning about a new generation too late due to a network partition: if gossip doesn't reach the node in time, some writes might be sent to the wrong (old) generation.
|
We're not able to prevent a node learning about a new generation too late due to a network partition: if gossip doesn't reach the node in time, some writes might be sent to the wrong (old) generation.
|
||||||
However, it could happen that a node learns about the generation from gossip in time, but then won't be able to extract it from `cdc_generation_descriptions_v2`. In that case we can still maintain consistency: the node will remember that there is a new generation even though it doesn't yet know what it is (it knows only the ID, in particular it knows the timestamp) using the `cdc::metadata::prepare(db_clock::time_point)` method, and then _reject_ writes for CDC-enabled tables that are supposed to use this new generation. The node will keep trying to read the generation's data in background until it succeeds or sees that it's not necessary anymore (e.g. because the generation was already superseded by a new generation).
|
However, it could happen that a node learns about the generation from gossip in time, but then won't be able to extract it from `cdc_generation_descriptions_v2`. In that case we can still maintain consistency: the node will remember that there is a new generation even though it doesn't yet know what it is (it knows only the ID, in particular it knows the timestamp) using the `cdc::metadata::prepare(db_clock::time_point)` method, and then _reject_ writes for CDC-enabled tables that are supposed to use this new generation. The node will keep trying to read the generation's data in background until it succeeds or sees that it's not necessary anymore (e.g. because the generation was already superseded by a new generation).
|
||||||
Thus we give up availability for safety. This likely won't happen if the administrator ensures that the cluster is not partitioned before bootstrapping a new node. This problem will also be mitigated with a future patch.
|
Thus we give up availability for safety. This likely won't happen if the administrator ensures that the cluster is not partitioned before bootstrapping a new node. This problem will also be mitigated with a future patch.
|
||||||
|
|
||||||
|
#### Raft group 0 based topology changes (WIP)
|
||||||
|
|
||||||
|
When a node requests the cluster to join, the topology coordinator chooses tokens for the new node. This splits vnodes in the token ring into smaller vnodes. The coordinator then creates a new `cdc::topology_description` which refines those smaller vnodes. This is node using the `cdc::topology_description_generator` class.
|
||||||
|
|
||||||
|
The generation data described by `cdc::topology_description` is then translated into mutations and committed to group 0 using Raft commands. When a node applies these commands (every node in the cluster eventually does that, being a member of group 0), it writes the data into a local table `system.cdc_generations_v3`. The table has the following schema:
|
||||||
|
```
|
||||||
|
CREATE TABLE system.cdc_generations_v3 (
|
||||||
|
id uuid,
|
||||||
|
range_end bigint,
|
||||||
|
ignore_msb tinyint,
|
||||||
|
num_ranges int static,
|
||||||
|
streams frozen<set<blob>>,
|
||||||
|
PRIMARY KEY (id, range_end)
|
||||||
|
) ...
|
||||||
|
```
|
||||||
|
|
||||||
|
The table's partition key is the `id uuid` column. The UUID used to insert a new generation into this table is randomly generated by the coordinator.
|
||||||
|
|
||||||
|
The committed commands also update the `system.topology` table, storing the UUID in the `new_cdc_generation_data_uuid` column in the row which describes the joining node. Thanks to this, if the coordinator manages to insert the data but then fails, the next coordinator can resume from where the previous coordinator left off - using `new_cdc_generation_data_uuid` to continue with the generation switch.
|
||||||
|
|
||||||
|
Note that the `cdc::topology_description` contains the stream IDs of the generation and describes the generation's mapping, so constructing and inserting it into this table does not require knowing the generation's timestamp.
|
||||||
|
|
||||||
|
The coordinator then performs a global barrier, ensuring that every node managed to store the data locally before proceeding.
|
||||||
|
|
||||||
|
Once the barrier finishes, the coordinator picks a timestamp for the new generation. To choose the timestamp, it takes its local time and adds `2 * ring_delay` (a minute by default) so that other nodes have a chance to learn about this timestamp before the generation starts operating (i.e. before their clocks cross the timestamp). Thus we make the following assumptions:
|
||||||
|
1. its clock is not too desynchronized with other nodes' clocks,
|
||||||
|
2. the cluster is not partitioned.
|
||||||
|
|
||||||
|
FIXME: consider implementing a safe algorithm (using separate 'prepare' phase before committing the new generation timestamp).
|
||||||
|
|
||||||
|
The timestamp and the randomly generated UUID together form a "generation ID" which uniquely identifies this generation and can be used to retrieve its data from the table and to learn when it starts operating.
|
||||||
|
|
||||||
|
The coordinator commits the generation ID with a group 0 command which updates the static columns `current_cdc_generation_uuid` and `current_cdc_generation_timestamp` in the `system.topology` table. Each node, when applying this command, learns about the new CDC generation ID (the `storage_service::topology_state_load` function calls `cdc::generation_service::handle_cdc_generation`), retrieves the generation data from `system.cdc_generations_v3` using the UUID key, and inserts it into its in-memory set of known CDC generations using `cdc::metadata::insert(...)`.
|
||||||
|
|
||||||
|
Nodes learn about the new generation together with the new node's tokens. When they learn about its tokens, they immediately start sending writes to the new node (it becomes a pending replica). But the old generation will still be operating for `~ 2 * ring_delay`; during this short period of time we don't have complete colocation of CDC log writes with base writes (one replica may be different).
|
||||||
|
|
||||||
|
We're not able to prevent a node learning about a new generation too late due to a network partition: if the timestamp is not replicated to some node in time, some writes might be sent to the wrong (old) generation. (See FIXME above.)
|
||||||
|
|
||||||
|
After committing the generation ID, the topology coordinator publishes the generation data to user-facing description tables (`system_distributed.cdc_streams_descriptions_v2` and `system_distributed.cdc_generation_timestamps`).
|
||||||
|
|
||||||
|
#### Generation switching: other notes
|
||||||
|
|
||||||
Due to the need of maintaining colocation we don't allow the client to send writes with arbitrary timestamps.
|
Due to the need of maintaining colocation we don't allow the client to send writes with arbitrary timestamps.
|
||||||
Suppose that a write is requested and the write coordinator's local clock has time `C` and the generation operating at time `C` has timestamp `T` (`T <= C`). Then we only allow the write if its timestamp is in the interval [`T`, `C + generation_leeway`), where `generation_leeway` is a small time-inteval constant (e.g. 5 seconds).
|
Suppose that a write is requested and the write coordinator's local clock has time `C` and the generation operating at time `C` has timestamp `T` (`T <= C`). Then we only allow the write if its timestamp is in the interval [`T`, `C + generation_leeway`), where `generation_leeway` is a small time-inteval constant (e.g. 5 seconds).
|
||||||
Reason: we cannot allow writes before `T`, because they belong to the old generation whose token ranges might no longer refine the current vnodes, so the corresponding log write would not necessarily be colocated with the base write. We also cannot allow writes too far "into the future" because we don't know what generation will be operating at that time (the node which will introduce this generation might not have joined yet). But, as mentioned before, we assume that we'll learn about the next generation in time. Again --- the need for this assumption will be gone in a future patch.
|
Reason: we cannot allow writes before `T`, because they belong to the old generation whose token ranges might no longer refine the current vnodes, so the corresponding log write would not necessarily be colocated with the base write. We also cannot allow writes too far "into the future" because we don't know what generation will be operating at that time (the node which will introduce this generation might not have joined yet). But, as mentioned before, we assume that we'll learn about the next generation in time. Again --- the need for this assumption will be gone in a future patch.
|
||||||
@@ -209,6 +251,8 @@ Note that the first phase of inserting stream IDs may fail in the middle; in tha
|
|||||||
|
|
||||||
### Internal generation descriptions table V1 and upgrade procedure
|
### Internal generation descriptions table V1 and upgrade procedure
|
||||||
|
|
||||||
|
FIXME: update this section once we implement upgrades for group 0 topology coordinator. The coordinator will have to create a new CDC generation when it's first enabled.
|
||||||
|
|
||||||
As the name suggests, `cdc_generation_descriptions_v2` is the second version of the generation description table. The previous schema was:
|
As the name suggests, `cdc_generation_descriptions_v2` is the second version of the generation description table. The previous schema was:
|
||||||
```
|
```
|
||||||
CREATE TABLE system_distributed.cdc_generation_descriptions (
|
CREATE TABLE system_distributed.cdc_generation_descriptions (
|
||||||
|
|||||||
@@ -25,7 +25,8 @@ namespace service {
|
|||||||
};
|
};
|
||||||
|
|
||||||
struct raft_topology_snapshot {
|
struct raft_topology_snapshot {
|
||||||
std::vector<canonical_mutation> mutations;
|
std::vector<canonical_mutation> topology_mutations;
|
||||||
|
std::optional<canonical_mutation> cdc_generation_mutation;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct raft_topology_pull_params {};
|
struct raft_topology_pull_params {};
|
||||||
|
|||||||
2
main.cc
2
main.cc
@@ -1478,7 +1478,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
|||||||
|
|
||||||
service::raft_group0 group0_service{
|
service::raft_group0 group0_service{
|
||||||
stop_signal.as_local_abort_source(), raft_gr.local(), messaging,
|
stop_signal.as_local_abort_source(), raft_gr.local(), messaging,
|
||||||
gossiper.local(), qp.local(), mm.local(), feature_service.local(), sys_ks.local(), group0_client, ss.local()};
|
gossiper.local(), qp.local(), mm.local(), feature_service.local(), sys_ks.local(), group0_client, ss.local(), cdc_generation_service.local()};
|
||||||
group0_service.start().get();
|
group0_service.start().get();
|
||||||
auto stop_group0_service = defer_verbose_shutdown("group 0 service", [&group0_service] {
|
auto stop_group0_service = defer_verbose_shutdown("group 0 service", [&group0_service] {
|
||||||
group0_service.abort().get();
|
group0_service.abort().get();
|
||||||
|
|||||||
@@ -67,7 +67,7 @@ future<> group0_state_machine::apply(std::vector<raft::command_cref> command) {
|
|||||||
cmd.prev_state_id, cmd.new_state_id, cmd.creator_addr, cmd.creator_id);
|
cmd.prev_state_id, cmd.new_state_id, cmd.creator_addr, cmd.creator_id);
|
||||||
slogger.trace("cmd.history_append: {}", cmd.history_append);
|
slogger.trace("cmd.history_append: {}", cmd.history_append);
|
||||||
|
|
||||||
auto read_apply_mutex_holder = co_await get_units(_client._read_apply_mutex, 1);
|
auto read_apply_mutex_holder = co_await _client.hold_read_apply_mutex();
|
||||||
|
|
||||||
if (cmd.prev_state_id) {
|
if (cmd.prev_state_id) {
|
||||||
auto last_group0_state_id = co_await db::system_keyspace::get_last_group0_state_id();
|
auto last_group0_state_id = co_await db::system_keyspace::get_last_group0_state_id();
|
||||||
@@ -107,7 +107,7 @@ future<> group0_state_machine::apply(std::vector<raft::command_cref> command) {
|
|||||||
_client.set_query_result(cmd.new_state_id, std::move(result));
|
_client.set_query_result(cmd.new_state_id, std::move(result));
|
||||||
},
|
},
|
||||||
[&] (topology_change& chng) -> future<> {
|
[&] (topology_change& chng) -> future<> {
|
||||||
return _ss.topology_transition(_sp, cmd.creator_addr, std::move(chng.mutations));
|
return _ss.topology_transition(_sp, _cdc_gen_svc, cmd.creator_addr, std::move(chng.mutations));
|
||||||
}
|
}
|
||||||
), cmd.change);
|
), cmd.change);
|
||||||
|
|
||||||
@@ -126,8 +126,9 @@ void group0_state_machine::drop_snapshot(raft::snapshot_id id) {
|
|||||||
future<> group0_state_machine::load_snapshot(raft::snapshot_id id) {
|
future<> group0_state_machine::load_snapshot(raft::snapshot_id id) {
|
||||||
// topology_state_load applies persisted state machine state into
|
// topology_state_load applies persisted state machine state into
|
||||||
// memory and thus needs to be protected with apply mutex
|
// memory and thus needs to be protected with apply mutex
|
||||||
auto read_apply_mutex_holder = co_await get_units(_client._read_apply_mutex, 1);
|
auto read_apply_mutex_holder = co_await _client.hold_read_apply_mutex();
|
||||||
co_await _ss.topology_state_load();
|
co_await _ss.topology_state_load(_cdc_gen_svc);
|
||||||
|
_ss._topology_state_machine.event.signal();
|
||||||
}
|
}
|
||||||
|
|
||||||
future<> group0_state_machine::transfer_snapshot(gms::inet_address from, raft::snapshot_descriptor snp) {
|
future<> group0_state_machine::transfer_snapshot(gms::inet_address from, raft::snapshot_descriptor snp) {
|
||||||
@@ -151,11 +152,11 @@ future<> group0_state_machine::transfer_snapshot(gms::inet_address from, raft::s
|
|||||||
|
|
||||||
// TODO ensure atomicity of snapshot application in presence of crashes (see TODO in `apply`)
|
// TODO ensure atomicity of snapshot application in presence of crashes (see TODO in `apply`)
|
||||||
|
|
||||||
auto read_apply_mutex_holder = co_await get_units(_client._read_apply_mutex, 1);
|
auto read_apply_mutex_holder = co_await _client.hold_read_apply_mutex();
|
||||||
|
|
||||||
co_await _mm.merge_schema_from(addr, std::move(*cm));
|
co_await _mm.merge_schema_from(addr, std::move(*cm));
|
||||||
|
|
||||||
if (!topology_snp.mutations.empty()) {
|
if (!topology_snp.topology_mutations.empty()) {
|
||||||
co_await _ss.merge_topology_snapshot(std::move(topology_snp));
|
co_await _ss.merge_topology_snapshot(std::move(topology_snp));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -13,6 +13,10 @@
|
|||||||
#include "mutation/canonical_mutation.hh"
|
#include "mutation/canonical_mutation.hh"
|
||||||
#include "service/raft/raft_state_machine.hh"
|
#include "service/raft/raft_state_machine.hh"
|
||||||
|
|
||||||
|
namespace cdc {
|
||||||
|
class generation_service;
|
||||||
|
}
|
||||||
|
|
||||||
namespace service {
|
namespace service {
|
||||||
class raft_group0_client;
|
class raft_group0_client;
|
||||||
class migration_manager;
|
class migration_manager;
|
||||||
@@ -73,8 +77,9 @@ class group0_state_machine : public raft_state_machine {
|
|||||||
migration_manager& _mm;
|
migration_manager& _mm;
|
||||||
storage_proxy& _sp;
|
storage_proxy& _sp;
|
||||||
storage_service& _ss;
|
storage_service& _ss;
|
||||||
|
cdc::generation_service& _cdc_gen_svc;
|
||||||
public:
|
public:
|
||||||
group0_state_machine(raft_group0_client& client, migration_manager& mm, storage_proxy& sp, storage_service& ss) : _client(client), _mm(mm), _sp(sp), _ss(ss) {}
|
group0_state_machine(raft_group0_client& client, migration_manager& mm, storage_proxy& sp, storage_service& ss, cdc::generation_service& cdc_gen_svc) : _client(client), _mm(mm), _sp(sp), _ss(ss), _cdc_gen_svc(cdc_gen_svc) {}
|
||||||
future<> apply(std::vector<raft::command_cref> command) override;
|
future<> apply(std::vector<raft::command_cref> command) override;
|
||||||
future<raft::snapshot_id> take_snapshot() override;
|
future<raft::snapshot_id> take_snapshot() override;
|
||||||
void drop_snapshot(raft::snapshot_id id) override;
|
void drop_snapshot(raft::snapshot_id id) override;
|
||||||
|
|||||||
@@ -134,8 +134,9 @@ raft_group0::raft_group0(seastar::abort_source& abort_source,
|
|||||||
gms::feature_service& feat,
|
gms::feature_service& feat,
|
||||||
db::system_keyspace& sys_ks,
|
db::system_keyspace& sys_ks,
|
||||||
raft_group0_client& client,
|
raft_group0_client& client,
|
||||||
storage_service& ss)
|
storage_service& ss,
|
||||||
: _abort_source(abort_source), _raft_gr(raft_gr), _ms(ms), _gossiper(gs), _qp(qp), _mm(mm), _feat(feat), _sys_ks(sys_ks), _client(client), _ss(ss)
|
cdc::generation_service& cdc_gen_svc)
|
||||||
|
: _abort_source(abort_source), _raft_gr(raft_gr), _ms(ms), _gossiper(gs), _qp(qp), _mm(mm), _feat(feat), _sys_ks(sys_ks), _client(client), _ss(ss), _cdc_gen_svc(cdc_gen_svc)
|
||||||
, _status_for_monitoring(_raft_gr.is_enabled() ? status_for_monitoring::normal : status_for_monitoring::disabled)
|
, _status_for_monitoring(_raft_gr.is_enabled() ? status_for_monitoring::normal : status_for_monitoring::disabled)
|
||||||
{
|
{
|
||||||
register_metrics();
|
register_metrics();
|
||||||
@@ -195,7 +196,7 @@ const raft::server_id& raft_group0::load_my_id() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
raft_server_for_group raft_group0::create_server_for_group0(raft::group_id gid, raft::server_id my_id) {
|
raft_server_for_group raft_group0::create_server_for_group0(raft::group_id gid, raft::server_id my_id) {
|
||||||
auto state_machine = std::make_unique<group0_state_machine>(_client, _mm, _qp.proxy(), _ss);
|
auto state_machine = std::make_unique<group0_state_machine>(_client, _mm, _qp.proxy(), _ss, _cdc_gen_svc);
|
||||||
auto rpc = std::make_unique<group0_rpc>(_raft_gr.direct_fd(), *state_machine, _ms.local(), _raft_gr.address_map(), gid, my_id);
|
auto rpc = std::make_unique<group0_rpc>(_raft_gr.direct_fd(), *state_machine, _ms.local(), _raft_gr.address_map(), gid, my_id);
|
||||||
// Keep a reference to a specific RPC class.
|
// Keep a reference to a specific RPC class.
|
||||||
auto& rpc_ref = *rpc;
|
auto& rpc_ref = *rpc;
|
||||||
|
|||||||
@@ -16,6 +16,8 @@ namespace cql3 { class query_processor; }
|
|||||||
|
|
||||||
namespace db { class system_keyspace; }
|
namespace db { class system_keyspace; }
|
||||||
|
|
||||||
|
namespace cdc { class generation_service; }
|
||||||
|
|
||||||
namespace gms { class gossiper; class feature_service; }
|
namespace gms { class gossiper; class feature_service; }
|
||||||
|
|
||||||
namespace service {
|
namespace service {
|
||||||
@@ -76,6 +78,7 @@ class raft_group0 {
|
|||||||
db::system_keyspace& _sys_ks;
|
db::system_keyspace& _sys_ks;
|
||||||
raft_group0_client& _client;
|
raft_group0_client& _client;
|
||||||
service::storage_service& _ss;
|
service::storage_service& _ss;
|
||||||
|
cdc::generation_service& _cdc_gen_svc;
|
||||||
|
|
||||||
// Status of leader discovery. Initially there is no group 0,
|
// Status of leader discovery. Initially there is no group 0,
|
||||||
// and the variant contains no state. During initial cluster
|
// and the variant contains no state. During initial cluster
|
||||||
@@ -114,7 +117,8 @@ public:
|
|||||||
gms::feature_service& feat,
|
gms::feature_service& feat,
|
||||||
db::system_keyspace& sys_ks,
|
db::system_keyspace& sys_ks,
|
||||||
raft_group0_client& client,
|
raft_group0_client& client,
|
||||||
storage_service& ss);
|
storage_service& ss,
|
||||||
|
cdc::generation_service& cdc_gen_svc);
|
||||||
|
|
||||||
// Initialises RPC verbs on all shards.
|
// Initialises RPC verbs on all shards.
|
||||||
// Call after construction but before using the object.
|
// Call after construction but before using the object.
|
||||||
|
|||||||
@@ -245,7 +245,7 @@ future<group0_guard> raft_group0_client::start_operation(seastar::abort_source*
|
|||||||
|
|
||||||
// Take `_group0_read_apply_mutex` *after* read barrier.
|
// Take `_group0_read_apply_mutex` *after* read barrier.
|
||||||
// Read barrier may wait for `group0_state_machine::apply` which also takes this mutex.
|
// Read barrier may wait for `group0_state_machine::apply` which also takes this mutex.
|
||||||
auto read_apply_holder = co_await get_units(_read_apply_mutex, 1);
|
auto read_apply_holder = co_await hold_read_apply_mutex();
|
||||||
|
|
||||||
auto observed_group0_state_id = co_await db::system_keyspace::get_last_group0_state_id();
|
auto observed_group0_state_id = co_await db::system_keyspace::get_last_group0_state_id();
|
||||||
auto new_group0_state_id = generate_group0_state_id(observed_group0_state_id);
|
auto new_group0_state_id = generate_group0_state_id(observed_group0_state_id);
|
||||||
@@ -413,6 +413,10 @@ future<> raft_group0_client::wait_until_group0_upgraded(abort_source& as) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
future<semaphore_units<>> raft_group0_client::hold_read_apply_mutex() {
|
||||||
|
return get_units(_read_apply_mutex, 1);
|
||||||
|
}
|
||||||
|
|
||||||
db::system_keyspace& raft_group0_client::sys_ks() {
|
db::system_keyspace& raft_group0_client::sys_ks() {
|
||||||
return _sys_ks;
|
return _sys_ks;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -64,7 +64,6 @@ public:
|
|||||||
|
|
||||||
// Singleton that exists only on shard zero. Used to post commands to group zero
|
// Singleton that exists only on shard zero. Used to post commands to group zero
|
||||||
class raft_group0_client {
|
class raft_group0_client {
|
||||||
friend class group0_state_machine;
|
|
||||||
service::raft_group_registry& _raft_gr;
|
service::raft_group_registry& _raft_gr;
|
||||||
db::system_keyspace& _sys_ks;
|
db::system_keyspace& _sys_ks;
|
||||||
|
|
||||||
@@ -166,6 +165,8 @@ public:
|
|||||||
// Wait until group 0 upgrade enters the `use_post_raft_procedures` state.
|
// Wait until group 0 upgrade enters the `use_post_raft_procedures` state.
|
||||||
future<> wait_until_group0_upgraded(abort_source&);
|
future<> wait_until_group0_upgraded(abort_source&);
|
||||||
|
|
||||||
|
future<semaphore_units<>> hold_read_apply_mutex();
|
||||||
|
|
||||||
db::system_keyspace& sys_ks();
|
db::system_keyspace& sys_ks();
|
||||||
|
|
||||||
// for test only
|
// for test only
|
||||||
|
|||||||
@@ -283,7 +283,7 @@ future<> storage_service::wait_for_ring_to_settle(std::chrono::milliseconds dela
|
|||||||
slogger.info("Checking bootstrapping/leaving nodes: ok");
|
slogger.info("Checking bootstrapping/leaving nodes: ok");
|
||||||
}
|
}
|
||||||
|
|
||||||
future<> storage_service::topology_state_load() {
|
future<> storage_service::topology_state_load(cdc::generation_service& cdc_gen_svc) {
|
||||||
#ifdef SEASTAR_DEBUG
|
#ifdef SEASTAR_DEBUG
|
||||||
static bool running = false;
|
static bool running = false;
|
||||||
assert(!running); // The function is not re-entrant
|
assert(!running); // The function is not re-entrant
|
||||||
@@ -370,8 +370,16 @@ future<> storage_service::topology_state_load() {
|
|||||||
co_await _sys_ks.local().update_peer_info(ip, "host_id", id.uuid());
|
co_await _sys_ks.local().update_peer_info(ip, "host_id", id.uuid());
|
||||||
}
|
}
|
||||||
tmptr->update_topology(ip, locator::endpoint_dc_rack{rs.datacenter, rs.rack});
|
tmptr->update_topology(ip, locator::endpoint_dc_rack{rs.datacenter, rs.rack});
|
||||||
tmptr->add_bootstrap_tokens(rs.ring.value().tokens, ip);
|
if (_topology_state_machine._topology.normal_nodes.empty()) {
|
||||||
co_await update_pending_ranges(tmptr, format("bootstrapping node {}/{}", id, ip));
|
// This is the first node in the cluster. Insert the tokens as normal to the token ring early
|
||||||
|
// so we can perform writes to regular 'distributed' tables during the bootstrap procedure
|
||||||
|
// (such as the CDC generation write).
|
||||||
|
// It doesn't break anything to set the tokens to normal early in this single-node case.
|
||||||
|
co_await tmptr->update_normal_tokens(rs.ring.value().tokens, ip);
|
||||||
|
} else {
|
||||||
|
tmptr->add_bootstrap_tokens(rs.ring.value().tokens, ip);
|
||||||
|
co_await update_pending_ranges(tmptr, format("bootstrapping node {}/{}", id, ip));
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
case node_state::decommissioning:
|
case node_state::decommissioning:
|
||||||
case node_state::removing:
|
case node_state::removing:
|
||||||
@@ -404,9 +412,14 @@ future<> storage_service::topology_state_load() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}));
|
}));
|
||||||
|
|
||||||
|
if (auto gen_id = _topology_state_machine._topology.current_cdc_generation_id) {
|
||||||
|
slogger.debug("topology_state_load: current CDC generation ID: {}", *gen_id);
|
||||||
|
co_await cdc_gen_svc.handle_cdc_generation(*gen_id);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
future<> storage_service::topology_transition(storage_proxy& proxy, gms::inet_address from, std::vector<canonical_mutation> cms) {
|
future<> storage_service::topology_transition(storage_proxy& proxy, cdc::generation_service& cdc_gen_svc, gms::inet_address from, std::vector<canonical_mutation> cms) {
|
||||||
assert(this_shard_id() == 0);
|
assert(this_shard_id() == 0);
|
||||||
// write new state into persistent storage
|
// write new state into persistent storage
|
||||||
std::vector<mutation> mutations;
|
std::vector<mutation> mutations;
|
||||||
@@ -423,18 +436,24 @@ future<> storage_service::topology_transition(storage_proxy& proxy, gms::inet_ad
|
|||||||
|
|
||||||
co_await proxy.mutate_locally(std::move(mutations), tracing::trace_state_ptr());
|
co_await proxy.mutate_locally(std::move(mutations), tracing::trace_state_ptr());
|
||||||
|
|
||||||
co_await topology_state_load(); // reload new state
|
co_await topology_state_load(cdc_gen_svc); // reload new state
|
||||||
|
|
||||||
_topology_state_machine.event.signal();
|
_topology_state_machine.event.signal();
|
||||||
}
|
}
|
||||||
|
|
||||||
future<> storage_service::merge_topology_snapshot(raft_topology_snapshot snp) {
|
future<> storage_service::merge_topology_snapshot(raft_topology_snapshot snp) {
|
||||||
auto s = _db.local().find_schema(db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY);
|
|
||||||
std::vector<mutation> muts;
|
std::vector<mutation> muts;
|
||||||
muts.reserve(snp.mutations.size());
|
muts.reserve(snp.topology_mutations.size() + (snp.cdc_generation_mutation ? 1 : 0));
|
||||||
boost::transform(snp.mutations, std::back_inserter(muts), [s] (const canonical_mutation& m) {
|
{
|
||||||
return m.to_mutation(s);
|
auto s = _db.local().find_schema(db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY);
|
||||||
});
|
boost::transform(snp.topology_mutations, std::back_inserter(muts), [s] (const canonical_mutation& m) {
|
||||||
|
return m.to_mutation(s);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
if (snp.cdc_generation_mutation) {
|
||||||
|
auto s = _db.local().find_schema(db::system_keyspace::NAME, db::system_keyspace::CDC_GENERATIONS_V3);
|
||||||
|
muts.push_back(snp.cdc_generation_mutation->to_mutation(s));
|
||||||
|
}
|
||||||
co_await _db.local().apply(freeze(muts), db::no_timeout);
|
co_await _db.local().apply(freeze(muts), db::no_timeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -453,6 +472,8 @@ public:
|
|||||||
topology_mutation_builder& set(const char* cell, const raft::server_id& value);
|
topology_mutation_builder& set(const char* cell, const raft::server_id& value);
|
||||||
topology_mutation_builder& set(const char* cell, const std::unordered_set<dht::token>& value);
|
topology_mutation_builder& set(const char* cell, const std::unordered_set<dht::token>& value);
|
||||||
topology_mutation_builder& set(const char* cell, const uint32_t& value);
|
topology_mutation_builder& set(const char* cell, const uint32_t& value);
|
||||||
|
topology_mutation_builder& set(const char* cell, const utils::UUID& value);
|
||||||
|
topology_mutation_builder& set_current_cdc_generation_id(const cdc::generation_id_v2&);
|
||||||
topology_mutation_builder& del(const char* cell);
|
topology_mutation_builder& del(const char* cell);
|
||||||
canonical_mutation build() { return canonical_mutation{std::move(_m)}; }
|
canonical_mutation build() { return canonical_mutation{std::move(_m)}; }
|
||||||
};
|
};
|
||||||
@@ -486,6 +507,14 @@ topology_mutation_builder& topology_mutation_builder::set(const char* cell, cons
|
|||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
topology_mutation_builder& topology_mutation_builder::set(
|
||||||
|
const char* cell, const utils::UUID& value) {
|
||||||
|
auto cdef = _s->get_column_definition(cell);
|
||||||
|
assert(cdef);
|
||||||
|
_r.cells().apply(*cdef, atomic_cell::make_live(*cdef->type, _ts, cdef->type->decompose(value)));
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
topology_mutation_builder& topology_mutation_builder::del(const char* cell) {
|
topology_mutation_builder& topology_mutation_builder::del(const char* cell) {
|
||||||
auto cdef = _s->get_column_definition(cell);
|
auto cdef = _s->get_column_definition(cell);
|
||||||
assert(cdef);
|
assert(cdef);
|
||||||
@@ -519,7 +548,14 @@ topology_mutation_builder& topology_mutation_builder::set(const char* cell, cons
|
|||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
future<> storage_service::topology_change_coordinator_fiber(raft::server& raft, raft::term_t term, sharded<db::system_distributed_keyspace>& sys_dist_ks, abort_source& as) {
|
topology_mutation_builder& topology_mutation_builder::set_current_cdc_generation_id(
|
||||||
|
const cdc::generation_id_v2& value) {
|
||||||
|
_m.set_static_cell("current_cdc_generation_timestamp", value.ts, _ts);
|
||||||
|
_m.set_static_cell("current_cdc_generation_uuid", value.id, _ts);
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
|
future<> storage_service::topology_change_coordinator_fiber(raft::server& raft, raft::term_t term, cdc::generation_service& cdc_gen_svc, sharded<db::system_distributed_keyspace>& sys_dist_ks, abort_source& as) {
|
||||||
slogger.info("raft topology: start topology coordinator fiber");
|
slogger.info("raft topology: start topology coordinator fiber");
|
||||||
|
|
||||||
auto abort = as.subscribe([this] () noexcept {
|
auto abort = as.subscribe([this] () noexcept {
|
||||||
@@ -641,12 +677,83 @@ future<> storage_service::topology_change_coordinator_fiber(raft::server& raft,
|
|||||||
|
|
||||||
bool res;
|
bool res;
|
||||||
switch (node.rs->ring.value().state) {
|
switch (node.rs->ring.value().state) {
|
||||||
|
case ring_slice::replication_state::commit_cdc_generation: {
|
||||||
|
// make sure all nodes know about new topology and have the new CDC generation data
|
||||||
|
// (we require all nodes to be alive for topo change for now)
|
||||||
|
std::tie(node, res) = co_await exec_global_command(std::move(node), raft_topology_cmd{raft_topology_cmd::command::barrier}, false, replaced_node);
|
||||||
|
if (!res) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// We don't need to add delay to the generation timestamp if this is the first generation.
|
||||||
|
bool add_ts_delay = bool(node.topology->current_cdc_generation_id);
|
||||||
|
|
||||||
|
// Begin the race.
|
||||||
|
// See the large FIXME below.
|
||||||
|
auto cdc_gen_ts = cdc::new_generation_timestamp(add_ts_delay, get_ring_delay());
|
||||||
|
auto cdc_gen_uuid = node.rs->ring.value().new_cdc_generation_data_uuid;
|
||||||
|
cdc::generation_id_v2 cdc_gen_id {
|
||||||
|
.ts = cdc_gen_ts,
|
||||||
|
.id = cdc_gen_uuid,
|
||||||
|
};
|
||||||
|
|
||||||
|
{
|
||||||
|
// Sanity check.
|
||||||
|
// This could happen if the topology coordinator's clock is broken.
|
||||||
|
auto curr_gen_id = node.topology->current_cdc_generation_id;
|
||||||
|
if (curr_gen_id && curr_gen_id->ts >= cdc_gen_ts) {
|
||||||
|
on_internal_error(slogger, format(
|
||||||
|
"raft topology: new CDC generation has smaller timestamp than the previous one."
|
||||||
|
" Old generation ID: {}, new generation ID: {}", *curr_gen_id, cdc_gen_id));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Tell all nodes to start using the new CDC generation by updating the topology
|
||||||
|
// with the generation's ID and timestamp.
|
||||||
|
// At the same time move the topology change procedure to the next step.
|
||||||
|
//
|
||||||
|
// FIXME: as in previous implementation with gossiper and ring_delay, this assumes that all nodes
|
||||||
|
// will learn about the new CDC generation before their clocks reach the generation's timestamp.
|
||||||
|
// With this group 0 based implementation, it means that the command must be committed,
|
||||||
|
// replicated and applied on all nodes before their clocks reach the generation's timestamp
|
||||||
|
// (i.e. within 2 * ring_delay = 60 seconds by default if clocks are synchronized). If this
|
||||||
|
// doesn't hold some coordinators might use the wrong CDC streams for some time and CDC stream
|
||||||
|
// readers will miss some data. It's likely that Raft replication doesn't converge as quickly
|
||||||
|
// as gossiping does.
|
||||||
|
//
|
||||||
|
// We could use a two-phase algorithm instead: first tell all nodes to prepare for using
|
||||||
|
// the new generation, then tell all nodes to commit. If some nodes don't manage to prepare
|
||||||
|
// in time, we abort the generation switch. If all nodes prepare, we commit. If a node prepares
|
||||||
|
// but doesn't receive a commit in time, it stops coordinating CDC-enabled writes until it
|
||||||
|
// receives a commit or abort. This solution does not have a safety problem like the one
|
||||||
|
// above, but it has an availability problem when nodes get disconnected from group 0 majority
|
||||||
|
// in the middle of a CDC generation switch (when they are prepared to switch but not
|
||||||
|
// committed) - they won't coordinate CDC-enabled writes until they reconnect to the
|
||||||
|
// majority and commit.
|
||||||
|
topology_mutation_builder builder(node.guard.write_timestamp(), node.id);
|
||||||
|
builder.set("replication_state", ring_slice::replication_state::write_both_read_old)
|
||||||
|
.set_current_cdc_generation_id(cdc_gen_id);
|
||||||
|
auto str = fmt::format("{}: committed new CDC generation, ID: {}", node.rs->state, cdc_gen_id);
|
||||||
|
co_await update_replica_state(std::move(node), {builder.build()}, std::move(str));
|
||||||
|
}
|
||||||
|
break;
|
||||||
case ring_slice::ring_slice::replication_state::write_both_read_old: {
|
case ring_slice::ring_slice::replication_state::write_both_read_old: {
|
||||||
// make sure all nodes know about new topology (we require all nodes to be alive for topo change for now)
|
// make sure all nodes know about new topology (we require all nodes to be alive for topo change for now)
|
||||||
std::tie(node, res) = co_await exec_global_command(std::move(node), raft_topology_cmd{raft_topology_cmd::command::barrier}, false, replaced_node);
|
std::tie(node, res) = co_await exec_global_command(std::move(node), raft_topology_cmd{raft_topology_cmd::command::barrier}, false, replaced_node);
|
||||||
if (!res) {
|
if (!res) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If a node is bootstrapping, we just committed a new CDC generation in the commit_cdc_generation step.
|
||||||
|
// Publish it to the user-facing distributed CDC description tables.
|
||||||
|
if (node.rs->state == node_state::bootstrapping) {
|
||||||
|
auto curr_gen_id = node.topology->current_cdc_generation_id.value();
|
||||||
|
auto gen_data = co_await _sys_ks.local().read_cdc_generation(curr_gen_id.id);
|
||||||
|
|
||||||
|
co_await sys_dist_ks.local().create_cdc_desc(
|
||||||
|
curr_gen_id.ts, gen_data, { get_token_metadata().count_normal_token_owners() });
|
||||||
|
}
|
||||||
|
|
||||||
raft_topology_cmd cmd{raft_topology_cmd::command::stream_ranges};
|
raft_topology_cmd cmd{raft_topology_cmd::command::stream_ranges};
|
||||||
if (node.rs->state == node_state::removing) {
|
if (node.rs->state == node_state::removing) {
|
||||||
// tell all nodes to stream data of the removed node to new range owners
|
// tell all nodes to stream data of the removed node to new range owners
|
||||||
@@ -742,12 +849,70 @@ future<> storage_service::topology_change_coordinator_fiber(raft::server& raft,
|
|||||||
auto tmptr = get_token_metadata_ptr();
|
auto tmptr = get_token_metadata_ptr();
|
||||||
auto bootstrap_tokens = boot_strapper::get_random_bootstrap_tokens(tmptr, num_tokens, dht::check_token_endpoint::yes);
|
auto bootstrap_tokens = boot_strapper::get_random_bootstrap_tokens(tmptr, num_tokens, dht::check_token_endpoint::yes);
|
||||||
|
|
||||||
// Write choosen tokens through raft.
|
auto get_sharding_info = [&] (dht::token end) -> std::pair<size_t, uint8_t> {
|
||||||
|
if (bootstrap_tokens.contains(end)) {
|
||||||
|
return {node.rs->shard_count, node.rs->ignore_msb};
|
||||||
|
} else {
|
||||||
|
// FIXME: token metadata should directly return host ID for given token. See #12279
|
||||||
|
auto ep = tmptr->get_endpoint(end);
|
||||||
|
if (!ep) {
|
||||||
|
// get_sharding_info is only called for bootstrap tokens
|
||||||
|
// or for tokens present in token_metadata
|
||||||
|
on_internal_error(slogger, format(
|
||||||
|
"raft topology: make_new_cdc_generation_data: get_sharding_info:"
|
||||||
|
" can't find endpoint for token {}", end));
|
||||||
|
}
|
||||||
|
|
||||||
|
auto id = tmptr->get_host_id_if_known(*ep);
|
||||||
|
if (!id) {
|
||||||
|
on_internal_error(slogger, format(
|
||||||
|
"raft topology: make_new_cdc_generation_data: get_sharding_info:"
|
||||||
|
" can't find host ID for endpoint {}, owner of token {}", *ep, end));
|
||||||
|
}
|
||||||
|
|
||||||
|
auto ptr = node.topology->find(raft::server_id{id->uuid()});
|
||||||
|
if (!ptr) {
|
||||||
|
on_internal_error(slogger, format(
|
||||||
|
"raft topology: make_new_cdc_generation_data: get_sharding_info:"
|
||||||
|
" couldn't find node {} in topology, owner of token {}", *id, end));
|
||||||
|
}
|
||||||
|
|
||||||
|
auto& rs = ptr->second;
|
||||||
|
return {rs.shard_count, rs.ignore_msb};
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
auto [gen_uuid, gen_desc] = cdc::make_new_generation_data(
|
||||||
|
bootstrap_tokens, get_sharding_info, tmptr);
|
||||||
|
auto gen_table_schema = _db.local().find_schema(
|
||||||
|
db::system_keyspace::NAME, db::system_keyspace::CDC_GENERATIONS_V3);
|
||||||
|
|
||||||
|
// FIXME: the CDC generation data can be large and not fit in a single command
|
||||||
|
// (for large clusters, it will introduce reactor stalls and go over commitlog entry
|
||||||
|
// size limit). We need to split it into multiple mutations by smartly picking
|
||||||
|
// a `mutation_size_threshold` and sending each mutation as a separate group 0 command.
|
||||||
|
// We also don't want to serialize the commands - there may be many of them,
|
||||||
|
// and we don't want to wait for a network round-trip to a quorum between each command.
|
||||||
|
// So we need to introduce a mechanism for group 0 to send a sequence of commands
|
||||||
|
// that can be committed concurrently. Also we need to be careful with memory consumption
|
||||||
|
// with many large mutations.
|
||||||
|
// See `system_distributed_keyspace::insert_cdc_generation` for inspiration how it
|
||||||
|
// was done when the mutations were stored in a regular distributed table.
|
||||||
|
const size_t mutation_size_threshold = 2'000'000;
|
||||||
|
auto gen_mutations = co_await cdc::get_cdc_generation_mutations(
|
||||||
|
gen_table_schema, gen_uuid, gen_desc, mutation_size_threshold, node.guard.write_timestamp());
|
||||||
|
std::vector<canonical_mutation> updates{gen_mutations.begin(), gen_mutations.end()};
|
||||||
|
|
||||||
|
// Write chosen tokens and CDC generation data through raft.
|
||||||
builder.set("node_state", node_state::bootstrapping)
|
builder.set("node_state", node_state::bootstrapping)
|
||||||
.del("topology_request")
|
.del("topology_request")
|
||||||
.set("tokens", bootstrap_tokens)
|
.set("tokens", bootstrap_tokens)
|
||||||
.set("replication_state", ring_slice::replication_state::write_both_read_old);
|
.set("replication_state", ring_slice::replication_state::commit_cdc_generation)
|
||||||
co_await update_replica_state(std::move(node), {builder.build()}, "bootstrap: assign tokens");
|
.set("new_cdc_generation_data_uuid", gen_uuid);
|
||||||
|
updates.push_back(builder.build());
|
||||||
|
auto reason = format(
|
||||||
|
"bootstrap: assign tokens and insert CDC generation data (UUID: {})", gen_uuid);
|
||||||
|
co_await update_replica_state(std::move(node), {std::move(updates)}, reason);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case topology_request::leave:
|
case topology_request::leave:
|
||||||
@@ -903,7 +1068,7 @@ future<> storage_service::topology_change_coordinator_fiber(raft::server& raft,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
future<> storage_service::raft_state_monitor_fiber(raft::server& raft, sharded<db::system_distributed_keyspace>& sys_dist_ks) {
|
future<> storage_service::raft_state_monitor_fiber(raft::server& raft, cdc::generation_service& cdc_gen_svc, sharded<db::system_distributed_keyspace>& sys_dist_ks) {
|
||||||
std::optional<abort_source> as;
|
std::optional<abort_source> as;
|
||||||
try {
|
try {
|
||||||
while (!_abort_source.abort_requested()) {
|
while (!_abort_source.abort_requested()) {
|
||||||
@@ -920,7 +1085,7 @@ future<> storage_service::raft_state_monitor_fiber(raft::server& raft, sharded<d
|
|||||||
// We are the leader now but that can change any time!
|
// We are the leader now but that can change any time!
|
||||||
as.emplace();
|
as.emplace();
|
||||||
// start topology change coordinator in the background
|
// start topology change coordinator in the background
|
||||||
_topology_change_coordinator = topology_change_coordinator_fiber(raft, raft.get_current_term(), sys_dist_ks, *as);
|
_topology_change_coordinator = topology_change_coordinator_fiber(raft, raft.get_current_term(), cdc_gen_svc, sys_dist_ks, *as);
|
||||||
}
|
}
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
slogger.info("raft_state_monitor_fiber aborted with {}", std::current_exception());
|
slogger.info("raft_state_monitor_fiber aborted with {}", std::current_exception());
|
||||||
@@ -1277,7 +1442,7 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi
|
|||||||
slogger.info("topology changes are using raft");
|
slogger.info("topology changes are using raft");
|
||||||
|
|
||||||
// start topology coordinator fiber
|
// start topology coordinator fiber
|
||||||
_raft_state_monitor = raft_state_monitor_fiber(*raft_server, sys_dist_ks);
|
_raft_state_monitor = raft_state_monitor_fiber(*raft_server, cdc_gen_service, sys_dist_ks);
|
||||||
|
|
||||||
// Need to start system_distributed_keyspace before bootstrap because bootstraping
|
// Need to start system_distributed_keyspace before bootstrap because bootstraping
|
||||||
// process may access those tables.
|
// process may access those tables.
|
||||||
@@ -1441,7 +1606,7 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi
|
|||||||
&& (!_sys_ks.local().bootstrap_complete()
|
&& (!_sys_ks.local().bootstrap_complete()
|
||||||
|| cdc::should_propose_first_generation(get_broadcast_address(), _gossiper))) {
|
|| cdc::should_propose_first_generation(get_broadcast_address(), _gossiper))) {
|
||||||
try {
|
try {
|
||||||
cdc_gen_id = co_await cdc_gen_service.make_new_generation(bootstrap_tokens, !is_first_node());
|
cdc_gen_id = co_await cdc_gen_service.legacy_make_new_generation(bootstrap_tokens, !is_first_node());
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
cdc_log.warn(
|
cdc_log.warn(
|
||||||
"Could not create a new CDC generation: {}. This may make it impossible to use CDC or cause performance problems."
|
"Could not create a new CDC generation: {}. This may make it impossible to use CDC or cause performance problems."
|
||||||
@@ -1592,7 +1757,7 @@ future<> storage_service::bootstrap(cdc::generation_service& cdc_gen_service, st
|
|||||||
// We don't do any other generation switches (unless we crash before complecting bootstrap).
|
// We don't do any other generation switches (unless we crash before complecting bootstrap).
|
||||||
assert(!cdc_gen_id);
|
assert(!cdc_gen_id);
|
||||||
|
|
||||||
cdc_gen_id = cdc_gen_service.make_new_generation(bootstrap_tokens, !is_first_node()).get0();
|
cdc_gen_id = cdc_gen_service.legacy_make_new_generation(bootstrap_tokens, !is_first_node()).get0();
|
||||||
|
|
||||||
if (!bootstrap_rbno) {
|
if (!bootstrap_rbno) {
|
||||||
// When is_repair_based_node_ops_enabled is true, the bootstrap node
|
// When is_repair_based_node_ops_enabled is true, the bootstrap node
|
||||||
@@ -4662,14 +4827,57 @@ void storage_service::init_messaging_service(sharded<service::storage_proxy>& pr
|
|||||||
if (!ss._raft_topology_change_enabled) {
|
if (!ss._raft_topology_change_enabled) {
|
||||||
co_return raft_topology_snapshot{};
|
co_return raft_topology_snapshot{};
|
||||||
}
|
}
|
||||||
auto rs = co_await db::system_keyspace::query_mutations(proxy, db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY);
|
|
||||||
auto s = ss._db.local().find_schema(db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY);
|
std::vector<canonical_mutation> topology_mutations;
|
||||||
std::vector<canonical_mutation> results;
|
std::optional<cdc::generation_id_v2> curr_cdc_gen_id;
|
||||||
results.reserve(rs->partitions().size());
|
{
|
||||||
boost::range::transform(rs->partitions(), std::back_inserter(results), [s] (const partition& p) {
|
// FIXME: make it an rwlock, here we only need to lock for reads,
|
||||||
return canonical_mutation{p.mut().unfreeze(s)};
|
// might be useful if multiple nodes are trying to pull concurrently.
|
||||||
});
|
auto read_apply_mutex_holder = co_await ss._group0->client().hold_read_apply_mutex();
|
||||||
co_return raft_topology_snapshot{std::move(results)};
|
auto rs = co_await db::system_keyspace::query_mutations(
|
||||||
|
proxy, db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY);
|
||||||
|
auto s = ss._db.local().find_schema(db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY);
|
||||||
|
topology_mutations.reserve(rs->partitions().size());
|
||||||
|
boost::range::transform(
|
||||||
|
rs->partitions(), std::back_inserter(topology_mutations), [s] (const partition& p) {
|
||||||
|
return canonical_mutation{p.mut().unfreeze(s)};
|
||||||
|
});
|
||||||
|
|
||||||
|
curr_cdc_gen_id = ss._topology_state_machine._topology.current_cdc_generation_id;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::optional<canonical_mutation> cdc_generation_mutation;
|
||||||
|
if (curr_cdc_gen_id) {
|
||||||
|
// We don't need to fetch this data under group0 apply mutex, it's immutable.
|
||||||
|
// We only need to include the current CDC generation data in the snapshot,
|
||||||
|
// because nodes only load whatever `current_cdc_generation_id` points to in topology.
|
||||||
|
//
|
||||||
|
// FIXME: when we bootstrap nodes in quick succession, the timestamp of the newest CDC generation
|
||||||
|
// may be for some time larger than the clocks of our nodes. The last bootstrapped node
|
||||||
|
// will only receive the newest CDC generation and not earlier ones, so it will only be able
|
||||||
|
// to coordinate writes to CDC-enabled tables after its clock advances to reach the newest
|
||||||
|
// generation's timestamp. In other words, it may not be able to coordinate writes for some
|
||||||
|
// time after bootstrapping and drivers connecting to it will receive errors.
|
||||||
|
// To fix that, we could store in topology a small history of recent CDC generation IDs
|
||||||
|
// (garbage-collected with time) instead of just the last one, and load all of them.
|
||||||
|
// Alternatively, a node would wait for some time before switching to normal state.
|
||||||
|
auto s = ss._db.local().find_schema(db::system_keyspace::NAME, db::system_keyspace::CDC_GENERATIONS_V3);
|
||||||
|
auto key = dht::decorate_key(*s, partition_key::from_singular(*s, curr_cdc_gen_id->id));
|
||||||
|
auto partition_range = dht::partition_range::make_singular(key);
|
||||||
|
auto rs = co_await db::system_keyspace::query_mutations(
|
||||||
|
proxy, db::system_keyspace::NAME, db::system_keyspace::CDC_GENERATIONS_V3, partition_range);
|
||||||
|
if (rs->partitions().size() != 1) {
|
||||||
|
on_internal_error(slogger, format(
|
||||||
|
"pull_raft_topology_snapshot: expected a single partition in CDC generation query,"
|
||||||
|
", got {} (generation ID: {})", rs->partitions().size(), *curr_cdc_gen_id));
|
||||||
|
}
|
||||||
|
cdc_generation_mutation.emplace(rs->partitions().begin()->mut().unfreeze(s));
|
||||||
|
}
|
||||||
|
|
||||||
|
co_return raft_topology_snapshot{
|
||||||
|
.topology_mutations = std::move(topology_mutations),
|
||||||
|
.cdc_generation_mutation = std::move(cdc_generation_mutation),
|
||||||
|
};
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -762,13 +762,13 @@ private:
|
|||||||
future<> _raft_state_monitor = make_ready_future<>();
|
future<> _raft_state_monitor = make_ready_future<>();
|
||||||
// This fibers monitors raft state and start/stops the topology change
|
// This fibers monitors raft state and start/stops the topology change
|
||||||
// coordinator fiber
|
// coordinator fiber
|
||||||
future<> raft_state_monitor_fiber(raft::server&, sharded<db::system_distributed_keyspace>& sys_dist_ks);
|
future<> raft_state_monitor_fiber(raft::server&, cdc::generation_service&, sharded<db::system_distributed_keyspace>& sys_dist_ks);
|
||||||
|
|
||||||
// State machine that is responsible for topology change
|
// State machine that is responsible for topology change
|
||||||
topology_state_machine _topology_state_machine;
|
topology_state_machine _topology_state_machine;
|
||||||
|
|
||||||
future<> _topology_change_coordinator = make_ready_future<>();
|
future<> _topology_change_coordinator = make_ready_future<>();
|
||||||
future<> topology_change_coordinator_fiber(raft::server&, raft::term_t, sharded<db::system_distributed_keyspace>&, abort_source&);
|
future<> topology_change_coordinator_fiber(raft::server&, raft::term_t, cdc::generation_service&, sharded<db::system_distributed_keyspace>&, abort_source&);
|
||||||
|
|
||||||
// Those futures hold results of streaming for various operations
|
// Those futures hold results of streaming for various operations
|
||||||
std::optional<shared_future<>> _bootstrap_result;
|
std::optional<shared_future<>> _bootstrap_result;
|
||||||
@@ -786,10 +786,13 @@ private:
|
|||||||
future<> update_topology_with_local_metadata(raft::server&);
|
future<> update_topology_with_local_metadata(raft::server&);
|
||||||
|
|
||||||
// This is called on all nodes for each new command received through raft
|
// This is called on all nodes for each new command received through raft
|
||||||
future<> topology_transition(storage_proxy& proxy, gms::inet_address, std::vector<canonical_mutation>);
|
// raft_group0_client::_read_apply_mutex must be held
|
||||||
|
future<> topology_transition(storage_proxy& proxy, cdc::generation_service&, gms::inet_address, std::vector<canonical_mutation>);
|
||||||
// load topology state machine snapshot into memory
|
// load topology state machine snapshot into memory
|
||||||
future<> topology_state_load();
|
// raft_group0_client::_read_apply_mutex must be held
|
||||||
|
future<> topology_state_load(cdc::generation_service&);
|
||||||
// Applies received raft snapshot to local state machine persistent storage
|
// Applies received raft snapshot to local state machine persistent storage
|
||||||
|
// raft_group0_client::_read_apply_mutex must be held
|
||||||
future<> merge_topology_snapshot(raft_topology_snapshot snp);
|
future<> merge_topology_snapshot(raft_topology_snapshot snp);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -11,7 +11,9 @@
|
|||||||
|
|
||||||
namespace service {
|
namespace service {
|
||||||
|
|
||||||
const std::pair<const raft::server_id, replica_state>* topology::find(raft::server_id id) {
|
logging::logger tsmlogger("topology_state_machine");
|
||||||
|
|
||||||
|
const std::pair<const raft::server_id, replica_state>* topology::find(raft::server_id id) const {
|
||||||
auto it = normal_nodes.find(id);
|
auto it = normal_nodes.find(id);
|
||||||
if (it != normal_nodes.end()) {
|
if (it != normal_nodes.end()) {
|
||||||
return &*it;
|
return &*it;
|
||||||
@@ -35,14 +37,18 @@ bool topology::contains(raft::server_id id) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
static std::unordered_map<ring_slice::replication_state, sstring> replication_state_to_name_map = {
|
static std::unordered_map<ring_slice::replication_state, sstring> replication_state_to_name_map = {
|
||||||
|
{ring_slice::replication_state::commit_cdc_generation, "commit cdc generation"},
|
||||||
{ring_slice::replication_state::write_both_read_old, "write both read old"},
|
{ring_slice::replication_state::write_both_read_old, "write both read old"},
|
||||||
{ring_slice::replication_state::write_both_read_new, "write both read new"},
|
{ring_slice::replication_state::write_both_read_new, "write both read new"},
|
||||||
{ring_slice::replication_state::owner, "owner"},
|
{ring_slice::replication_state::owner, "owner"},
|
||||||
};
|
};
|
||||||
|
|
||||||
std::ostream& operator<<(std::ostream& os, ring_slice::replication_state s) {
|
std::ostream& operator<<(std::ostream& os, ring_slice::replication_state s) {
|
||||||
os << replication_state_to_name_map[s];
|
auto it = replication_state_to_name_map.find(s);
|
||||||
return os;
|
if (it == replication_state_to_name_map.end()) {
|
||||||
|
on_internal_error(tsmlogger, "cannot print replication_state");
|
||||||
|
}
|
||||||
|
return os << it->second;
|
||||||
}
|
}
|
||||||
|
|
||||||
ring_slice::replication_state replication_state_from_string(const sstring& s) {
|
ring_slice::replication_state replication_state_from_string(const sstring& s) {
|
||||||
@@ -51,7 +57,7 @@ ring_slice::replication_state replication_state_from_string(const sstring& s) {
|
|||||||
return e.first;
|
return e.first;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
throw std::runtime_error(fmt::format("cannot map name {} to token_state", s));
|
on_internal_error(tsmlogger, format("cannot map name {} to replication_state", s));
|
||||||
}
|
}
|
||||||
|
|
||||||
static std::unordered_map<node_state, sstring> node_state_to_name_map = {
|
static std::unordered_map<node_state, sstring> node_state_to_name_map = {
|
||||||
@@ -66,8 +72,11 @@ static std::unordered_map<node_state, sstring> node_state_to_name_map = {
|
|||||||
};
|
};
|
||||||
|
|
||||||
std::ostream& operator<<(std::ostream& os, node_state s) {
|
std::ostream& operator<<(std::ostream& os, node_state s) {
|
||||||
os << node_state_to_name_map[s];
|
auto it = node_state_to_name_map.find(s);
|
||||||
return os;
|
if (it == node_state_to_name_map.end()) {
|
||||||
|
on_internal_error(tsmlogger, "cannot print node_state");
|
||||||
|
}
|
||||||
|
return os << it->second;
|
||||||
}
|
}
|
||||||
|
|
||||||
node_state node_state_from_string(const sstring& s) {
|
node_state node_state_from_string(const sstring& s) {
|
||||||
@@ -76,7 +85,7 @@ node_state node_state_from_string(const sstring& s) {
|
|||||||
return e.first;
|
return e.first;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
throw std::runtime_error(fmt::format("cannot map name {} to node_state", s));
|
on_internal_error(tsmlogger, format("cannot map name {} to node_state", s));
|
||||||
}
|
}
|
||||||
|
|
||||||
static std::unordered_map<topology_request, sstring> topology_request_to_name_map = {
|
static std::unordered_map<topology_request, sstring> topology_request_to_name_map = {
|
||||||
|
|||||||
@@ -17,6 +17,7 @@
|
|||||||
#include <seastar/core/condition-variable.hh>
|
#include <seastar/core/condition-variable.hh>
|
||||||
#include <seastar/core/sstring.hh>
|
#include <seastar/core/sstring.hh>
|
||||||
#include <seastar/core/shared_ptr.hh>
|
#include <seastar/core/shared_ptr.hh>
|
||||||
|
#include "cdc/generation_id.hh"
|
||||||
#include "dht/token.hh"
|
#include "dht/token.hh"
|
||||||
#include "raft/raft.hh"
|
#include "raft/raft.hh"
|
||||||
#include "utils/UUID.hh"
|
#include "utils/UUID.hh"
|
||||||
@@ -48,6 +49,7 @@ using request_param = std::variant<raft::server_id, sstring, uint32_t>;
|
|||||||
|
|
||||||
struct ring_slice {
|
struct ring_slice {
|
||||||
enum class replication_state: uint8_t {
|
enum class replication_state: uint8_t {
|
||||||
|
commit_cdc_generation,
|
||||||
write_both_read_old,
|
write_both_read_old,
|
||||||
write_both_read_new,
|
write_both_read_new,
|
||||||
owner
|
owner
|
||||||
@@ -55,6 +57,11 @@ struct ring_slice {
|
|||||||
|
|
||||||
replication_state state;
|
replication_state state;
|
||||||
std::unordered_set<dht::token> tokens;
|
std::unordered_set<dht::token> tokens;
|
||||||
|
|
||||||
|
// When a new node joins the cluster, always a new CDC generation is created.
|
||||||
|
// This is the UUID used to access the data of the CDC generation introduced
|
||||||
|
// when the node owning this ring_slice joined (it's the partition key in CDC_GENERATIONS_V3 table).
|
||||||
|
utils::UUID new_cdc_generation_data_uuid;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct replica_state {
|
struct replica_state {
|
||||||
@@ -85,14 +92,20 @@ struct topology {
|
|||||||
// operation untill the node becomes normal
|
// operation untill the node becomes normal
|
||||||
std::unordered_map<raft::server_id, request_param> req_param;
|
std::unordered_map<raft::server_id, request_param> req_param;
|
||||||
|
|
||||||
|
std::optional<cdc::generation_id_v2> current_cdc_generation_id;
|
||||||
|
|
||||||
// Find only nodes in non 'left' state
|
// Find only nodes in non 'left' state
|
||||||
const std::pair<const raft::server_id, replica_state>* find(raft::server_id id);
|
const std::pair<const raft::server_id, replica_state>* find(raft::server_id id) const;
|
||||||
// Return true if node exists in any state including 'left' one
|
// Return true if node exists in any state including 'left' one
|
||||||
bool contains(raft::server_id id);
|
bool contains(raft::server_id id);
|
||||||
};
|
};
|
||||||
|
|
||||||
struct raft_topology_snapshot {
|
struct raft_topology_snapshot {
|
||||||
std::vector<canonical_mutation> mutations;
|
// Mutations for the system.topology table.
|
||||||
|
std::vector<canonical_mutation> topology_mutations;
|
||||||
|
|
||||||
|
// Mutation for system.cdc_generations_v3, contains the current CDC generation data.
|
||||||
|
std::optional<canonical_mutation> cdc_generation_mutation;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct raft_topology_pull_params {
|
struct raft_topology_pull_params {
|
||||||
|
|||||||
@@ -886,7 +886,7 @@ public:
|
|||||||
|
|
||||||
service::raft_group0 group0_service{
|
service::raft_group0 group0_service{
|
||||||
abort_sources.local(), raft_gr.local(), ms,
|
abort_sources.local(), raft_gr.local(), ms,
|
||||||
gossiper.local(), qp.local(), mm.local(), feature_service.local(), sys_ks.local(), group0_client, ss.local()};
|
gossiper.local(), qp.local(), mm.local(), feature_service.local(), sys_ks.local(), group0_client, ss.local(), cdc_generation_service.local()};
|
||||||
group0_service.start().get();
|
group0_service.start().get();
|
||||||
auto stop_group0_service = defer([&group0_service] {
|
auto stop_group0_service = defer([&group0_service] {
|
||||||
group0_service.abort().get();
|
group0_service.abort().get();
|
||||||
|
|||||||
Reference in New Issue
Block a user