system_keyspace: De-static cdc_set_rewritten()
Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
@@ -613,7 +613,7 @@ future<> generation_service::maybe_rewrite_streams_descriptions() {
|
||||
if (times_and_ttls.empty()) {
|
||||
// There's no point in rewriting old generations' streams (they don't contain any data).
|
||||
cdc_log.info("No CDC log tables present, not rewriting stream tables.");
|
||||
co_return co_await db::system_keyspace::cdc_set_rewritten(std::nullopt);
|
||||
co_return co_await _sys_ks.local().cdc_set_rewritten(std::nullopt);
|
||||
}
|
||||
|
||||
auto get_num_token_owners = [tm = _token_metadata.get()] { return tm->count_normal_token_owners(); };
|
||||
@@ -631,7 +631,7 @@ future<> generation_service::maybe_rewrite_streams_descriptions() {
|
||||
std::move(get_num_token_owners),
|
||||
_abort_src);
|
||||
|
||||
co_await db::system_keyspace::cdc_set_rewritten(last_rewritten);
|
||||
co_await _sys_ks.local().cdc_set_rewritten(last_rewritten);
|
||||
}
|
||||
|
||||
static void assert_shard_zero(const sstring& where) {
|
||||
|
||||
@@ -1793,12 +1793,12 @@ static const sstring CDC_REWRITTEN_KEY = "rewritten";
|
||||
|
||||
future<> system_keyspace::cdc_set_rewritten(std::optional<cdc::generation_id_v1> gen_id) {
|
||||
if (gen_id) {
|
||||
return qctx->execute_cql(
|
||||
return execute_cql(
|
||||
format("INSERT INTO system.{} (key, streams_timestamp) VALUES (?, ?)", v3::CDC_LOCAL),
|
||||
CDC_REWRITTEN_KEY, gen_id->ts).discard_result();
|
||||
} else {
|
||||
// Insert just the row marker.
|
||||
return qctx->execute_cql(
|
||||
return execute_cql(
|
||||
format("INSERT INTO system.{} (key) VALUES (?)", v3::CDC_LOCAL),
|
||||
CDC_REWRITTEN_KEY).discard_result();
|
||||
}
|
||||
|
||||
@@ -419,7 +419,7 @@ public:
|
||||
static future<std::optional<cdc::generation_id>> get_cdc_generation_id();
|
||||
|
||||
static future<bool> cdc_is_rewritten();
|
||||
static future<> cdc_set_rewritten(std::optional<cdc::generation_id_v1>);
|
||||
future<> cdc_set_rewritten(std::optional<cdc::generation_id_v1>);
|
||||
|
||||
static future<> enable_features_on_startup(sharded<gms::feature_service>& feat);
|
||||
|
||||
|
||||
@@ -550,7 +550,7 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi
|
||||
|
||||
// Don't try rewriting CDC stream description tables.
|
||||
// See cdc.md design notes, `Streams description table V1 and rewriting` section, for explanation.
|
||||
co_await db::system_keyspace::cdc_set_rewritten(std::nullopt);
|
||||
co_await _sys_ks.local().cdc_set_rewritten(std::nullopt);
|
||||
}
|
||||
|
||||
if (!cdc_gen_id) {
|
||||
|
||||
Reference in New Issue
Block a user