diff --git a/cdc/cdc.cc b/cdc/cdc.cc index d09aa931fc..3fab56d1ce 100644 --- a/cdc/cdc.cc +++ b/cdc/cdc.cc @@ -265,11 +265,6 @@ db_context::builder& db_context::builder::with_token_metadata(locator::token_met return *this; } -db_context::builder& db_context::builder::with_snitch(locator::snitch_ptr& snitch) { - _snitch = snitch; - return *this; -} - db_context::builder& db_context::builder::with_partitioner(dht::i_partitioner& partitioner) { _partitioner = partitioner; return *this; @@ -286,7 +281,6 @@ db_context db_context::builder::build() { _migration_notifier ? _migration_notifier->get() : service::get_local_storage_service().get_migration_notifier(), _token_metadata ? _token_metadata->get() : service::get_local_storage_service().get_token_metadata(), _cdc_metadata ? _cdc_metadata->get() : service::get_local_storage_service().get_cdc_metadata(), - _snitch ? _snitch->get() : locator::i_endpoint_snitch::get_local_snitch_ptr(), _partitioner ? _partitioner->get() : dht::global_partitioner() }; } diff --git a/cdc/cdc.hh b/cdc/cdc.hh index 992652a8d9..e081db12a7 100644 --- a/cdc/cdc.hh +++ b/cdc/cdc.hh @@ -41,7 +41,6 @@ using schema_ptr = seastar::lw_shared_ptr; namespace locator { -class snitch_ptr; class token_metadata; } // namespace locator @@ -103,7 +102,6 @@ struct db_context final { service::migration_notifier& _migration_notifier; locator::token_metadata& _token_metadata; cdc::metadata& _cdc_metadata; - locator::snitch_ptr& _snitch; dht::i_partitioner& _partitioner; class builder final { @@ -111,14 +109,12 @@ struct db_context final { std::optional> _migration_notifier; std::optional> _token_metadata; std::optional> _cdc_metadata; - std::optional> _snitch; std::optional> _partitioner; public: builder(service::storage_proxy& proxy); builder& with_migration_notifier(service::migration_notifier& migration_notifier); builder& with_token_metadata(locator::token_metadata& token_metadata); - builder& with_snitch(locator::snitch_ptr& snitch); builder& with_partitioner(dht::i_partitioner& partitioner); builder& with_cdc_metadata(cdc::metadata&); diff --git a/cdc/generation.cc b/cdc/generation.cc index b7db71897f..a68823971f 100644 --- a/cdc/generation.cc +++ b/cdc/generation.cc @@ -30,7 +30,6 @@ #include "db/system_distributed_keyspace.hh" #include "dht/i_partitioner.hh" #include "locator/token_metadata.hh" -#include "locator/snitch_base.hh" #include "gms/application_state.hh" #include "gms/inet_address.hh" #include "gms/gossiper.hh" @@ -39,6 +38,16 @@ extern logging::logger cdc_log; +static int get_shard_count(const gms::inet_address& endpoint, const gms::gossiper& g) { + auto ep_state = g.get_application_state_ptr(endpoint, gms::application_state::SHARD_COUNT); + return ep_state ? std::stoi(ep_state->value) : -1; +} + +static unsigned get_sharding_ignore_msb(const gms::inet_address& endpoint, const gms::gossiper& g) { + auto ep_state = g.get_application_state_ptr(endpoint, gms::application_state::IGNORE_MSB_BITS); + return ep_state ? std::stoi(ep_state->value) : 0; +} + namespace cdc { extern const api::timestamp_clock::duration generation_leeway = @@ -112,7 +121,7 @@ topology_description generate_topology_description( const std::unordered_set& bootstrap_tokens, const locator::token_metadata& token_metadata, const dht::i_partitioner& partitioner, - locator::snitch_ptr& snitch) { + const gms::gossiper& gossiper) { if (bootstrap_tokens.empty()) { throw std::runtime_error( "cdc: bootstrap tokens is empty in generate_topology_description"); @@ -138,9 +147,9 @@ topology_description generate_topology_description( if (!endpoint) { throw std::runtime_error(format("Can't find endpoint for token {}", entry.token_range_end)); } - auto sc = snitch->get_shard_count(*endpoint); + auto sc = get_shard_count(*endpoint, gossiper); entry.streams.resize(sc > 0 ? sc : 1); - entry.sharding_ignore_msb = snitch->get_ignore_msb_bits(*endpoint); + entry.sharding_ignore_msb = get_sharding_ignore_msb(*endpoint, gossiper); } spots_to_fill += entry.streams.size(); @@ -285,13 +294,14 @@ future get_local_streams_timestamp() { db_clock::time_point make_new_cdc_generation( const std::unordered_set& bootstrap_tokens, const locator::token_metadata& tm, + const gms::gossiper& g, db::system_distributed_keyspace& sys_dist_ks, std::chrono::milliseconds ring_delay, bool for_testing) { assert(!bootstrap_tokens.empty()); auto gen = generate_topology_description( - bootstrap_tokens, tm, dht::global_partitioner(), locator::i_endpoint_snitch::get_local_snitch_ptr()); + bootstrap_tokens, tm, dht::global_partitioner(), g); // Begin the race. auto ts = db_clock::now() + ( diff --git a/locator/snitch_base.cc b/locator/snitch_base.cc index 12a33c04df..d800cd824e 100644 --- a/locator/snitch_base.cc +++ b/locator/snitch_base.cc @@ -49,18 +49,6 @@ snitch_base::get_endpoint_info(inet_address endpoint, return ep_state ? std::optional(ep_state->value) : std::nullopt; } -int snitch_base::get_shard_count(inet_address endpoint) { - auto val = get_endpoint_info(endpoint, - gms::application_state::SHARD_COUNT); - return val ? std::stoi(*val) : -1; -} - -unsigned snitch_base::get_ignore_msb_bits(inet_address endpoint) { - auto val = get_endpoint_info(endpoint, - gms::application_state::IGNORE_MSB_BITS); - return val ? std::stoi(*val) : 0; -} - std::vector snitch_base::get_sorted_list_by_proximity( inet_address address, std::vector& unsorted_address) { diff --git a/locator/snitch_base.hh b/locator/snitch_base.hh index 01b7210920..c886ef3ec8 100644 --- a/locator/snitch_base.hh +++ b/locator/snitch_base.hh @@ -83,16 +83,6 @@ public: * returns a String representing the datacenter this endpoint belongs to */ virtual sstring get_datacenter(inet_address endpoint) = 0; - /** - * returns an int representing the number of shards this endpoint has - */ - virtual int get_shard_count(inet_address endpoint) = 0; - - /** - * returns an unsigned representing the value of ignore_msb_bits this - * endpoint has - */ - virtual unsigned get_ignore_msb_bits(inet_address endpoint) = 0; /** * returns a new List sorted by proximity to the given endpoint @@ -422,10 +412,6 @@ public: // virtual sstring get_datacenter(inet_address endpoint) = 0; // - virtual int get_shard_count(inet_address endpoint) override; - - virtual unsigned get_ignore_msb_bits(inet_address endpoint) override; - virtual std::vector get_sorted_list_by_proximity( inet_address address, std::vector& unsorted_address) override; diff --git a/service/storage_service.cc b/service/storage_service.cc index ca8f075e94..d14b14b53c 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -587,6 +587,9 @@ void storage_service::prepare_to_join(std::vector loaded_endpoints // gossip snitch infos (local DC and rack) gossip_snitch_info().get(); + // gossip local partitioner information (shard count and ignore_msb_bits) + gossip_sharding_info().get(); + // gossip Schema.emptyVersion forcing immediate check for schema updates (see MigrationManager#maybeScheduleSchemaPull) #if 0 if (!MessagingService.instance().isListening()) @@ -818,7 +821,8 @@ void storage_service::join_token_ring(int delay) { || cdc::should_propose_first_generation(get_broadcast_address(), _gossiper))) { _cdc_streams_ts = cdc::make_new_cdc_generation( - _bootstrap_tokens, _token_metadata, _sys_dist_ks.local(), get_ring_delay(), _for_testing); + _bootstrap_tokens, _token_metadata, _gossiper, + _sys_dist_ks.local(), get_ring_delay(), _for_testing); } } @@ -1041,7 +1045,8 @@ void storage_service::bootstrap() { assert(!_cdc_streams_ts); _cdc_streams_ts = cdc::make_new_cdc_generation( - _bootstrap_tokens, _token_metadata, _sys_dist_ks.local(), get_ring_delay(), _for_testing); + _bootstrap_tokens, _token_metadata, _gossiper, + _sys_dist_ks.local(), get_ring_delay(), _for_testing); } else { // We should not be able to join the cluster if other nodes support CDC but we don't. // The check should have been made somewhere in prepare_to_join (`check_knows_remote_features`). @@ -1875,6 +1880,11 @@ future<> storage_service::gossip_snitch_info() { return _gossiper.add_local_application_state({ { gms::application_state::DC, value_factory.datacenter(dc) }, { gms::application_state::RACK, value_factory.rack(rack) }, + }); +} + +future<> storage_service::gossip_sharding_info() { + return _gossiper.add_local_application_state({ { gms::application_state::SHARD_COUNT, value_factory.shard_count(smp::count) }, { gms::application_state::IGNORE_MSB_BITS, value_factory.ignore_msb_bits(dht::global_partitioner().sharding_ignore_msb()) }, }); diff --git a/service/storage_service.hh b/service/storage_service.hh index 4b30505ad9..cb9a19097e 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -222,6 +222,7 @@ public: } future<> gossip_snitch_info(); + future<> gossip_sharding_info(); distributed& db() { return _db;