cdc: Replace db::config with generation_service::config

This is to push the service towards general idea that each
component should have its own config and db::config to stay
in main.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
Pavel Emelyanov
2021-09-22 10:32:30 +03:00
parent b879d3f3a5
commit db623c5f64
6 changed files with 25 additions and 14 deletions

View File

@@ -29,7 +29,6 @@
#include "keys.hh"
#include "schema_builder.hh"
#include "database.hh"
#include "db/config.hh"
#include "db/system_keyspace.hh"
#include "db/system_distributed_keyspace.hh"
#include "dht/token-sharding.hh"
@@ -337,13 +336,12 @@ future<cdc::generation_id> generation_service::make_new_generation(const std::un
using namespace std::chrono_literals;
const locator::token_metadata_ptr tmptr = _token_metadata.get();
auto gen = topology_description_generator(_cfg.murmur3_partitioner_ignore_msb_bits(), bootstrap_tokens, tmptr, _gossiper).generate();
std::chrono::milliseconds ring_delay(_cfg.ring_delay_ms());
auto gen = topology_description_generator(_cfg.ignore_msb_bits, bootstrap_tokens, tmptr, _gossiper).generate();
// We need to call this as late in the procedure as possible.
// In the V2 format we can do this after inserting the generation data into the table;
// in the V1 format we must do it before (because the timestamp is the partition key in the V1 format).
auto new_generation_timestamp = [add_delay, ring_delay] {
auto new_generation_timestamp = [add_delay, ring_delay = _cfg.ring_delay] {
auto ts = db_clock::now();
if (add_delay && ring_delay != 0ms) {
ts += 2 * ring_delay + duration_cast<milliseconds>(generation_leeway);
@@ -597,7 +595,7 @@ future<> generation_service::maybe_rewrite_streams_descriptions() {
co_return;
}
if (_db.get_config().cdc_dont_rewrite_streams()) {
if (_cfg.dont_rewrite_streams) {
cdc_log.warn("Stream rewriting disabled. Manual administrator intervention may be required...");
co_return;
}
@@ -694,10 +692,10 @@ constexpr char could_not_retrieve_msg_template[]
= "Could not retrieve CDC streams with timestamp {} upon gossip event. Reason: \"{}\". Action: {}.";
generation_service::generation_service(
const db::config& cfg, gms::gossiper& g, sharded<db::system_distributed_keyspace>& sys_dist_ks,
config cfg, gms::gossiper& g, sharded<db::system_distributed_keyspace>& sys_dist_ks,
abort_source& abort_src, const locator::shared_token_metadata& stm, gms::feature_service& f,
database& db)
: _cfg(cfg)
: _cfg(std::move(cfg))
, _gossiper(g)
, _sys_dist_ks(sys_dist_ks)
, _abort_src(abort_src)

View File

@@ -29,7 +29,6 @@
namespace db {
class system_distributed_keyspace;
class config;
}
namespace gms {
@@ -50,13 +49,20 @@ namespace cdc {
class generation_service : public peering_sharded_service<generation_service>
, public async_sharded_service<generation_service>
, public gms::i_endpoint_state_change_subscriber {
public:
struct config {
unsigned ignore_msb_bits;
std::chrono::milliseconds ring_delay;
bool dont_rewrite_streams = false;
};
private:
bool _stopped = false;
// The node has joined the token ring. Set to `true` on `after_join` call.
bool _joined = false;
const db::config& _cfg;
config _cfg;
gms::gossiper& _gossiper;
sharded<db::system_distributed_keyspace>& _sys_dist_ks;
abort_source& _abort_src;
@@ -83,7 +89,7 @@ class generation_service : public peering_sharded_service<generation_service>
*/
std::optional<cdc::generation_id> _gen_id;
public:
generation_service(const db::config&, gms::gossiper&,
generation_service(config cfg, gms::gossiper&,
sharded<db::system_distributed_keyspace>&, abort_source&, const locator::shared_token_metadata&,
gms::feature_service&, database& db);

View File

@@ -35,7 +35,6 @@
#include "cdc/metadata.hh"
#include "bytes.hh"
#include "database.hh"
#include "db/config.hh"
#include "db/schema_tables.hh"
#include "partition_slice_builder.hh"
#include "schema.hh"

View File

@@ -1100,7 +1100,11 @@ int main(int ac, char** av) {
* every time it accesses it (because it may have been stopped already), then take local_shared()
* which will prevent sys_dist_ks from being destroyed while the service operates on it.
*/
cdc_generation_service.start(std::ref(*cfg), std::ref(gossiper), std::ref(sys_dist_ks),
cdc::generation_service::config cdc_config;
cdc_config.ignore_msb_bits = cfg->murmur3_partitioner_ignore_msb_bits();
cdc_config.ring_delay = std::chrono::milliseconds(cfg->ring_delay_ms());
cdc_config.dont_rewrite_streams = cfg->cdc_dont_rewrite_streams();
cdc_generation_service.start(std::move(cdc_config), std::ref(gossiper), std::ref(sys_dist_ks),
std::ref(stop_signal.as_sharded_abort_source()), std::ref(token_metadata), std::ref(feature_service), std::ref(db)).get();
auto stop_cdc_generation_service = defer_verbose_shutdown("CDC Generation Management service", [] {
cdc_generation_service.stop().get();

View File

@@ -121,7 +121,8 @@ SEASTAR_TEST_CASE(test_boot_shutdown){
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notif), std::ref(feature_service), std::ref(token_metadata), std::ref(abort_sources), std::ref(sst_dir_semaphore)).get();
auto stop_db = defer([&] { db.stop().get(); });
cdc_generation_service.start(std::ref(*cfg), std::ref(gms::get_gossiper()), std::ref(sys_dist_ks), std::ref(abort_sources), std::ref(token_metadata), std::ref(feature_service), std::ref(db)).get();
cdc::generation_service::config cdc_cfg;
cdc_generation_service.start(std::move(cdc_cfg), std::ref(gms::get_gossiper()), std::ref(sys_dist_ks), std::ref(abort_sources), std::ref(token_metadata), std::ref(feature_service), std::ref(db)).get();
auto stop_cdc_generation_service = defer([&cdc_generation_service] {
cdc_generation_service.stop().get();
});

View File

@@ -693,7 +693,10 @@ public:
stop_raft_rpc.cancel();
}
cdc_generation_service.start(std::ref(*cfg), std::ref(gossiper), std::ref(sys_dist_ks), std::ref(abort_sources), std::ref(token_metadata), std::ref(feature_service), std::ref(db)).get();
cdc::generation_service::config cdc_config;
cdc_config.ignore_msb_bits = cfg->murmur3_partitioner_ignore_msb_bits();
cdc_config.ring_delay = std::chrono::milliseconds(cfg->ring_delay_ms());
cdc_generation_service.start(std::ref(cdc_config), std::ref(gossiper), std::ref(sys_dist_ks), std::ref(abort_sources), std::ref(token_metadata), std::ref(feature_service), std::ref(db)).get();
auto stop_cdc_generation_service = defer([&cdc_generation_service] {
cdc_generation_service.stop().get();
});