locator: remove get_shard_count and get_ignore_msb_bits from snitch
Snitch forms a class hierarchy which get_shard_count and get_ignore_msb_bits ignore (their returned values only depend on the gossiper's state). Besides, these functions just don't belong there. Snitch has nothing to do with shard_count or ignore_msb_bits.
This commit is contained in:
@@ -265,11 +265,6 @@ db_context::builder& db_context::builder::with_token_metadata(locator::token_met
|
||||
return *this;
|
||||
}
|
||||
|
||||
db_context::builder& db_context::builder::with_snitch(locator::snitch_ptr& snitch) {
|
||||
_snitch = snitch;
|
||||
return *this;
|
||||
}
|
||||
|
||||
db_context::builder& db_context::builder::with_partitioner(dht::i_partitioner& partitioner) {
|
||||
_partitioner = partitioner;
|
||||
return *this;
|
||||
@@ -286,7 +281,6 @@ db_context db_context::builder::build() {
|
||||
_migration_notifier ? _migration_notifier->get() : service::get_local_storage_service().get_migration_notifier(),
|
||||
_token_metadata ? _token_metadata->get() : service::get_local_storage_service().get_token_metadata(),
|
||||
_cdc_metadata ? _cdc_metadata->get() : service::get_local_storage_service().get_cdc_metadata(),
|
||||
_snitch ? _snitch->get() : locator::i_endpoint_snitch::get_local_snitch_ptr(),
|
||||
_partitioner ? _partitioner->get() : dht::global_partitioner()
|
||||
};
|
||||
}
|
||||
|
||||
@@ -41,7 +41,6 @@ using schema_ptr = seastar::lw_shared_ptr<const schema>;
|
||||
|
||||
namespace locator {
|
||||
|
||||
class snitch_ptr;
|
||||
class token_metadata;
|
||||
|
||||
} // namespace locator
|
||||
@@ -103,7 +102,6 @@ struct db_context final {
|
||||
service::migration_notifier& _migration_notifier;
|
||||
locator::token_metadata& _token_metadata;
|
||||
cdc::metadata& _cdc_metadata;
|
||||
locator::snitch_ptr& _snitch;
|
||||
dht::i_partitioner& _partitioner;
|
||||
|
||||
class builder final {
|
||||
@@ -111,14 +109,12 @@ struct db_context final {
|
||||
std::optional<std::reference_wrapper<service::migration_notifier>> _migration_notifier;
|
||||
std::optional<std::reference_wrapper<locator::token_metadata>> _token_metadata;
|
||||
std::optional<std::reference_wrapper<cdc::metadata>> _cdc_metadata;
|
||||
std::optional<std::reference_wrapper<locator::snitch_ptr>> _snitch;
|
||||
std::optional<std::reference_wrapper<dht::i_partitioner>> _partitioner;
|
||||
public:
|
||||
builder(service::storage_proxy& proxy);
|
||||
|
||||
builder& with_migration_notifier(service::migration_notifier& migration_notifier);
|
||||
builder& with_token_metadata(locator::token_metadata& token_metadata);
|
||||
builder& with_snitch(locator::snitch_ptr& snitch);
|
||||
builder& with_partitioner(dht::i_partitioner& partitioner);
|
||||
builder& with_cdc_metadata(cdc::metadata&);
|
||||
|
||||
|
||||
@@ -30,7 +30,6 @@
|
||||
#include "db/system_distributed_keyspace.hh"
|
||||
#include "dht/i_partitioner.hh"
|
||||
#include "locator/token_metadata.hh"
|
||||
#include "locator/snitch_base.hh"
|
||||
#include "gms/application_state.hh"
|
||||
#include "gms/inet_address.hh"
|
||||
#include "gms/gossiper.hh"
|
||||
@@ -39,6 +38,16 @@
|
||||
|
||||
extern logging::logger cdc_log;
|
||||
|
||||
static int get_shard_count(const gms::inet_address& endpoint, const gms::gossiper& g) {
|
||||
auto ep_state = g.get_application_state_ptr(endpoint, gms::application_state::SHARD_COUNT);
|
||||
return ep_state ? std::stoi(ep_state->value) : -1;
|
||||
}
|
||||
|
||||
static unsigned get_sharding_ignore_msb(const gms::inet_address& endpoint, const gms::gossiper& g) {
|
||||
auto ep_state = g.get_application_state_ptr(endpoint, gms::application_state::IGNORE_MSB_BITS);
|
||||
return ep_state ? std::stoi(ep_state->value) : 0;
|
||||
}
|
||||
|
||||
namespace cdc {
|
||||
|
||||
extern const api::timestamp_clock::duration generation_leeway =
|
||||
@@ -112,7 +121,7 @@ topology_description generate_topology_description(
|
||||
const std::unordered_set<dht::token>& bootstrap_tokens,
|
||||
const locator::token_metadata& token_metadata,
|
||||
const dht::i_partitioner& partitioner,
|
||||
locator::snitch_ptr& snitch) {
|
||||
const gms::gossiper& gossiper) {
|
||||
if (bootstrap_tokens.empty()) {
|
||||
throw std::runtime_error(
|
||||
"cdc: bootstrap tokens is empty in generate_topology_description");
|
||||
@@ -138,9 +147,9 @@ topology_description generate_topology_description(
|
||||
if (!endpoint) {
|
||||
throw std::runtime_error(format("Can't find endpoint for token {}", entry.token_range_end));
|
||||
}
|
||||
auto sc = snitch->get_shard_count(*endpoint);
|
||||
auto sc = get_shard_count(*endpoint, gossiper);
|
||||
entry.streams.resize(sc > 0 ? sc : 1);
|
||||
entry.sharding_ignore_msb = snitch->get_ignore_msb_bits(*endpoint);
|
||||
entry.sharding_ignore_msb = get_sharding_ignore_msb(*endpoint, gossiper);
|
||||
}
|
||||
|
||||
spots_to_fill += entry.streams.size();
|
||||
@@ -285,13 +294,14 @@ future<db_clock::time_point> get_local_streams_timestamp() {
|
||||
db_clock::time_point make_new_cdc_generation(
|
||||
const std::unordered_set<dht::token>& bootstrap_tokens,
|
||||
const locator::token_metadata& tm,
|
||||
const gms::gossiper& g,
|
||||
db::system_distributed_keyspace& sys_dist_ks,
|
||||
std::chrono::milliseconds ring_delay,
|
||||
bool for_testing) {
|
||||
assert(!bootstrap_tokens.empty());
|
||||
|
||||
auto gen = generate_topology_description(
|
||||
bootstrap_tokens, tm, dht::global_partitioner(), locator::i_endpoint_snitch::get_local_snitch_ptr());
|
||||
bootstrap_tokens, tm, dht::global_partitioner(), g);
|
||||
|
||||
// Begin the race.
|
||||
auto ts = db_clock::now() + (
|
||||
|
||||
@@ -49,18 +49,6 @@ snitch_base::get_endpoint_info(inet_address endpoint,
|
||||
return ep_state ? std::optional(ep_state->value) : std::nullopt;
|
||||
}
|
||||
|
||||
int snitch_base::get_shard_count(inet_address endpoint) {
|
||||
auto val = get_endpoint_info(endpoint,
|
||||
gms::application_state::SHARD_COUNT);
|
||||
return val ? std::stoi(*val) : -1;
|
||||
}
|
||||
|
||||
unsigned snitch_base::get_ignore_msb_bits(inet_address endpoint) {
|
||||
auto val = get_endpoint_info(endpoint,
|
||||
gms::application_state::IGNORE_MSB_BITS);
|
||||
return val ? std::stoi(*val) : 0;
|
||||
}
|
||||
|
||||
std::vector<inet_address> snitch_base::get_sorted_list_by_proximity(
|
||||
inet_address address,
|
||||
std::vector<inet_address>& unsorted_address) {
|
||||
|
||||
@@ -83,16 +83,6 @@ public:
|
||||
* returns a String representing the datacenter this endpoint belongs to
|
||||
*/
|
||||
virtual sstring get_datacenter(inet_address endpoint) = 0;
|
||||
/**
|
||||
* returns an int representing the number of shards this endpoint has
|
||||
*/
|
||||
virtual int get_shard_count(inet_address endpoint) = 0;
|
||||
|
||||
/**
|
||||
* returns an unsigned representing the value of ignore_msb_bits this
|
||||
* endpoint has
|
||||
*/
|
||||
virtual unsigned get_ignore_msb_bits(inet_address endpoint) = 0;
|
||||
|
||||
/**
|
||||
* returns a new <tt>List</tt> sorted by proximity to the given endpoint
|
||||
@@ -422,10 +412,6 @@ public:
|
||||
// virtual sstring get_datacenter(inet_address endpoint) = 0;
|
||||
//
|
||||
|
||||
virtual int get_shard_count(inet_address endpoint) override;
|
||||
|
||||
virtual unsigned get_ignore_msb_bits(inet_address endpoint) override;
|
||||
|
||||
virtual std::vector<inet_address> get_sorted_list_by_proximity(
|
||||
inet_address address,
|
||||
std::vector<inet_address>& unsorted_address) override;
|
||||
|
||||
@@ -587,6 +587,9 @@ void storage_service::prepare_to_join(std::vector<inet_address> loaded_endpoints
|
||||
// gossip snitch infos (local DC and rack)
|
||||
gossip_snitch_info().get();
|
||||
|
||||
// gossip local partitioner information (shard count and ignore_msb_bits)
|
||||
gossip_sharding_info().get();
|
||||
|
||||
// gossip Schema.emptyVersion forcing immediate check for schema updates (see MigrationManager#maybeScheduleSchemaPull)
|
||||
#if 0
|
||||
if (!MessagingService.instance().isListening())
|
||||
@@ -818,7 +821,8 @@ void storage_service::join_token_ring(int delay) {
|
||||
|| cdc::should_propose_first_generation(get_broadcast_address(), _gossiper))) {
|
||||
|
||||
_cdc_streams_ts = cdc::make_new_cdc_generation(
|
||||
_bootstrap_tokens, _token_metadata, _sys_dist_ks.local(), get_ring_delay(), _for_testing);
|
||||
_bootstrap_tokens, _token_metadata, _gossiper,
|
||||
_sys_dist_ks.local(), get_ring_delay(), _for_testing);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1041,7 +1045,8 @@ void storage_service::bootstrap() {
|
||||
assert(!_cdc_streams_ts);
|
||||
|
||||
_cdc_streams_ts = cdc::make_new_cdc_generation(
|
||||
_bootstrap_tokens, _token_metadata, _sys_dist_ks.local(), get_ring_delay(), _for_testing);
|
||||
_bootstrap_tokens, _token_metadata, _gossiper,
|
||||
_sys_dist_ks.local(), get_ring_delay(), _for_testing);
|
||||
} else {
|
||||
// We should not be able to join the cluster if other nodes support CDC but we don't.
|
||||
// The check should have been made somewhere in prepare_to_join (`check_knows_remote_features`).
|
||||
@@ -1875,6 +1880,11 @@ future<> storage_service::gossip_snitch_info() {
|
||||
return _gossiper.add_local_application_state({
|
||||
{ gms::application_state::DC, value_factory.datacenter(dc) },
|
||||
{ gms::application_state::RACK, value_factory.rack(rack) },
|
||||
});
|
||||
}
|
||||
|
||||
future<> storage_service::gossip_sharding_info() {
|
||||
return _gossiper.add_local_application_state({
|
||||
{ gms::application_state::SHARD_COUNT, value_factory.shard_count(smp::count) },
|
||||
{ gms::application_state::IGNORE_MSB_BITS, value_factory.ignore_msb_bits(dht::global_partitioner().sharding_ignore_msb()) },
|
||||
});
|
||||
|
||||
@@ -222,6 +222,7 @@ public:
|
||||
}
|
||||
|
||||
future<> gossip_snitch_info();
|
||||
future<> gossip_sharding_info();
|
||||
|
||||
distributed<database>& db() {
|
||||
return _db;
|
||||
|
||||
Reference in New Issue
Block a user