token_metadata: drop the template
Replace token_metadata2 ->token_metadata, make token_metadata back non-template. No behavior changes, just compilation fixes.
This commit is contained in:
@@ -39,10 +39,7 @@ namespace gms {
|
||||
|
||||
namespace locator {
|
||||
|
||||
template <typename NodeId>
|
||||
class generic_token_metadata;
|
||||
using token_metadata = generic_token_metadata<gms::inet_address>;
|
||||
using token_metadata2 = generic_token_metadata<host_id>;
|
||||
class token_metadata;
|
||||
class shared_token_metadata;
|
||||
class snitch_ptr;
|
||||
|
||||
|
||||
@@ -192,7 +192,7 @@ bool should_propose_first_generation(const gms::inet_address& me, const gms::gos
|
||||
}) == stop_iteration::no;
|
||||
}
|
||||
|
||||
bool is_cdc_generation_optimal(const cdc::topology_description& gen, const locator::token_metadata2& tm) {
|
||||
bool is_cdc_generation_optimal(const cdc::topology_description& gen, const locator::token_metadata& tm) {
|
||||
if (tm.sorted_tokens().size() != gen.entries().size()) {
|
||||
// We probably have garbage streams from old generations
|
||||
cdc_log.info("Generation size does not match the token ring");
|
||||
@@ -324,7 +324,7 @@ topology_description limit_number_of_streams_if_needed(topology_description&& de
|
||||
}
|
||||
|
||||
// Compute a set of tokens that split the token ring into vnodes.
|
||||
static auto get_tokens(const std::unordered_set<dht::token>& bootstrap_tokens, const locator::token_metadata2_ptr tmptr) {
|
||||
static auto get_tokens(const std::unordered_set<dht::token>& bootstrap_tokens, const locator::token_metadata_ptr tmptr) {
|
||||
auto tokens = tmptr->sorted_tokens();
|
||||
auto it = tokens.insert(tokens.end(), bootstrap_tokens.begin(), bootstrap_tokens.end());
|
||||
std::sort(it, tokens.end());
|
||||
@@ -352,7 +352,7 @@ static token_range_description create_token_range_description(
|
||||
cdc::topology_description make_new_generation_description(
|
||||
const std::unordered_set<dht::token>& bootstrap_tokens,
|
||||
const noncopyable_function<std::pair<size_t, uint8_t>(dht::token)>& get_sharding_info,
|
||||
const locator::token_metadata2_ptr tmptr) {
|
||||
const locator::token_metadata_ptr tmptr) {
|
||||
const auto tokens = get_tokens(bootstrap_tokens, tmptr);
|
||||
|
||||
utils::chunked_vector<token_range_description> vnode_descriptions;
|
||||
@@ -378,7 +378,7 @@ db_clock::time_point new_generation_timestamp(bool add_delay, std::chrono::milli
|
||||
}
|
||||
|
||||
future<cdc::generation_id> generation_service::legacy_make_new_generation(const std::unordered_set<dht::token>& bootstrap_tokens, bool add_delay) {
|
||||
const locator::token_metadata2_ptr tmptr = _token_metadata.get();
|
||||
const locator::token_metadata_ptr tmptr = _token_metadata.get();
|
||||
|
||||
// Fetch sharding parameters for a node that owns vnode ending with this token
|
||||
// using gossiped application states.
|
||||
|
||||
@@ -137,7 +137,7 @@ bool should_propose_first_generation(const gms::inet_address& me, const gms::gos
|
||||
* Checks if the CDC generation is optimal, which is true if its `topology_description` is consistent
|
||||
* with `token_metadata`.
|
||||
*/
|
||||
bool is_cdc_generation_optimal(const cdc::topology_description& gen, const locator::token_metadata2& tm);
|
||||
bool is_cdc_generation_optimal(const cdc::topology_description& gen, const locator::token_metadata& tm);
|
||||
|
||||
/*
|
||||
* Generate a set of CDC stream identifiers such that for each shard
|
||||
@@ -157,7 +157,7 @@ bool is_cdc_generation_optimal(const cdc::topology_description& gen, const locat
|
||||
cdc::topology_description make_new_generation_description(
|
||||
const std::unordered_set<dht::token>& bootstrap_tokens,
|
||||
const noncopyable_function<std::pair<size_t, uint8_t> (dht::token)>& get_sharding_info,
|
||||
const locator::token_metadata2_ptr);
|
||||
const locator::token_metadata_ptr);
|
||||
|
||||
db_clock::time_point new_generation_timestamp(bool add_delay, std::chrono::milliseconds ring_delay);
|
||||
|
||||
|
||||
@@ -36,10 +36,7 @@ using schema_ptr = seastar::lw_shared_ptr<const schema>;
|
||||
|
||||
namespace locator {
|
||||
|
||||
template <typename NodeId>
|
||||
class generic_token_metadata;
|
||||
using token_metadata = generic_token_metadata<gms::inet_address>;
|
||||
using token_metadata2 = generic_token_metadata<host_id>;
|
||||
class token_metadata;
|
||||
|
||||
} // namespace locator
|
||||
|
||||
|
||||
@@ -253,7 +253,7 @@ create_keyspace_statement::execute(query_processor& qp, service::query_state& st
|
||||
});
|
||||
}
|
||||
|
||||
lw_shared_ptr<data_dictionary::keyspace_metadata> create_keyspace_statement::get_keyspace_metadata(const locator::token_metadata2& tm) {
|
||||
lw_shared_ptr<data_dictionary::keyspace_metadata> create_keyspace_statement::get_keyspace_metadata(const locator::token_metadata& tm) {
|
||||
_attrs->validate();
|
||||
return _attrs->as_ks_metadata(_name, tm);
|
||||
}
|
||||
|
||||
@@ -17,11 +17,7 @@
|
||||
|
||||
namespace locator {
|
||||
|
||||
template <typename NodeId>
|
||||
class generic_token_metadata;
|
||||
using token_metadata = generic_token_metadata<gms::inet_address>;
|
||||
using token_metadata2 = generic_token_metadata<host_id>;
|
||||
|
||||
class token_metadata;
|
||||
};
|
||||
|
||||
namespace data_dictionary {
|
||||
@@ -76,7 +72,7 @@ public:
|
||||
virtual future<::shared_ptr<messages::result_message>>
|
||||
execute(query_processor& qp, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) const override;
|
||||
|
||||
lw_shared_ptr<data_dictionary::keyspace_metadata> get_keyspace_metadata(const locator::token_metadata2& tm);
|
||||
lw_shared_ptr<data_dictionary::keyspace_metadata> get_keyspace_metadata(const locator::token_metadata& tm);
|
||||
};
|
||||
|
||||
std::vector<sstring> check_against_restricted_replication_strategies(
|
||||
|
||||
@@ -20,7 +20,7 @@ namespace statements {
|
||||
|
||||
static std::map<sstring, sstring> prepare_options(
|
||||
const sstring& strategy_class,
|
||||
const locator::token_metadata2& tm,
|
||||
const locator::token_metadata& tm,
|
||||
std::map<sstring, sstring> options,
|
||||
const std::map<sstring, sstring>& old_options = {}) {
|
||||
options.erase(ks_prop_defs::REPLICATION_STRATEGY_CLASS_KEY);
|
||||
@@ -111,13 +111,13 @@ std::optional<sstring> ks_prop_defs::get_replication_strategy_class() const {
|
||||
return _strategy_class;
|
||||
}
|
||||
|
||||
lw_shared_ptr<data_dictionary::keyspace_metadata> ks_prop_defs::as_ks_metadata(sstring ks_name, const locator::token_metadata2& tm) {
|
||||
lw_shared_ptr<data_dictionary::keyspace_metadata> ks_prop_defs::as_ks_metadata(sstring ks_name, const locator::token_metadata& tm) {
|
||||
auto sc = get_replication_strategy_class().value();
|
||||
return data_dictionary::keyspace_metadata::new_keyspace(ks_name, sc,
|
||||
prepare_options(sc, tm, get_replication_options()), get_boolean(KW_DURABLE_WRITES, true), std::vector<schema_ptr>{}, get_storage_options());
|
||||
}
|
||||
|
||||
lw_shared_ptr<data_dictionary::keyspace_metadata> ks_prop_defs::as_ks_metadata_update(lw_shared_ptr<data_dictionary::keyspace_metadata> old, const locator::token_metadata2& tm) {
|
||||
lw_shared_ptr<data_dictionary::keyspace_metadata> ks_prop_defs::as_ks_metadata_update(lw_shared_ptr<data_dictionary::keyspace_metadata> old, const locator::token_metadata& tm) {
|
||||
std::map<sstring, sstring> options;
|
||||
const auto& old_options = old->strategy_options();
|
||||
auto sc = get_replication_strategy_class();
|
||||
|
||||
@@ -26,10 +26,7 @@ namespace gms {
|
||||
}
|
||||
|
||||
namespace locator {
|
||||
template <typename NodeId>
|
||||
class generic_token_metadata;
|
||||
using token_metadata = generic_token_metadata<gms::inet_address>;
|
||||
using token_metadata2 = generic_token_metadata<host_id>;
|
||||
class token_metadata;
|
||||
class shared_token_metadata;
|
||||
struct snitch_ptr;
|
||||
class abstract_replication_strategy;
|
||||
@@ -54,8 +51,8 @@ public:
|
||||
std::map<sstring, sstring> get_replication_options() const;
|
||||
std::optional<sstring> get_replication_strategy_class() const;
|
||||
data_dictionary::storage_options get_storage_options() const;
|
||||
lw_shared_ptr<data_dictionary::keyspace_metadata> as_ks_metadata(sstring ks_name, const locator::token_metadata2&);
|
||||
lw_shared_ptr<data_dictionary::keyspace_metadata> as_ks_metadata_update(lw_shared_ptr<data_dictionary::keyspace_metadata> old, const locator::token_metadata2&);
|
||||
lw_shared_ptr<data_dictionary::keyspace_metadata> as_ks_metadata(sstring ks_name, const locator::token_metadata&);
|
||||
lw_shared_ptr<data_dictionary::keyspace_metadata> as_ks_metadata_update(lw_shared_ptr<data_dictionary::keyspace_metadata> old, const locator::token_metadata&);
|
||||
|
||||
#if 0
|
||||
public KSMetaData asKSMetadataUpdate(KSMetaData old) throws RequestValidationException
|
||||
|
||||
@@ -2573,7 +2573,7 @@ update_backlog node_update_backlog::add_fetch(unsigned shard, update_backlog bac
|
||||
return std::max(backlog, _max.load(std::memory_order_relaxed));
|
||||
}
|
||||
|
||||
future<bool> check_view_build_ongoing(db::system_distributed_keyspace& sys_dist_ks, const locator::token_metadata2& tm, const sstring& ks_name,
|
||||
future<bool> check_view_build_ongoing(db::system_distributed_keyspace& sys_dist_ks, const locator::token_metadata& tm, const sstring& ks_name,
|
||||
const sstring& cf_name) {
|
||||
using view_statuses_type = std::unordered_map<locator::host_id, sstring>;
|
||||
return sys_dist_ks.view_status(ks_name, cf_name).then([&tm] (view_statuses_type&& view_statuses) {
|
||||
@@ -2584,7 +2584,7 @@ future<bool> check_view_build_ongoing(db::system_distributed_keyspace& sys_dist_
|
||||
});
|
||||
}
|
||||
|
||||
future<bool> check_needs_view_update_path(db::system_distributed_keyspace& sys_dist_ks, const locator::token_metadata2& tm, const replica::table& t,
|
||||
future<bool> check_needs_view_update_path(db::system_distributed_keyspace& sys_dist_ks, const locator::token_metadata& tm, const replica::table& t,
|
||||
streaming::stream_reason reason) {
|
||||
if (is_internal_keyspace(t.schema()->ks_name())) {
|
||||
return make_ready_future<bool>(false);
|
||||
|
||||
@@ -24,15 +24,12 @@ class system_distributed_keyspace;
|
||||
}
|
||||
|
||||
namespace locator {
|
||||
template <typename NodeId>
|
||||
class generic_token_metadata;
|
||||
using token_metadata = generic_token_metadata<gms::inet_address>;
|
||||
using token_metadata2 = generic_token_metadata<host_id>;
|
||||
class token_metadata;
|
||||
}
|
||||
|
||||
namespace db::view {
|
||||
|
||||
future<bool> check_needs_view_update_path(db::system_distributed_keyspace& sys_dist_ks, const locator::token_metadata2& tm, const replica::table& t,
|
||||
future<bool> check_needs_view_update_path(db::system_distributed_keyspace& sys_dist_ks, const locator::token_metadata& tm, const replica::table& t,
|
||||
streaming::stream_reason reason);
|
||||
|
||||
}
|
||||
|
||||
@@ -65,7 +65,7 @@ public:
|
||||
|
||||
future<> execute(std::function<void(mutation)> mutation_sink) override {
|
||||
return _ss.get_ownership().then([&, mutation_sink] (std::map<gms::inet_address, float> ownership) {
|
||||
const locator::token_metadata2& tm = _ss.get_token_metadata();
|
||||
const locator::token_metadata& tm = _ss.get_token_metadata();
|
||||
|
||||
_gossiper.for_each_endpoint_state([&] (const gms::inet_address& endpoint, const gms::endpoint_state&) {
|
||||
mutation m(schema(), partition_key::from_single_value(*schema(), data_value(endpoint).serialize_nonnull()));
|
||||
|
||||
@@ -63,7 +63,7 @@ future<> boot_strapper::bootstrap(streaming::stream_reason reason, gms::gossiper
|
||||
}
|
||||
}
|
||||
|
||||
std::unordered_set<token> boot_strapper::get_random_bootstrap_tokens(const token_metadata2_ptr tmptr, size_t num_tokens, dht::check_token_endpoint check) {
|
||||
std::unordered_set<token> boot_strapper::get_random_bootstrap_tokens(const token_metadata_ptr tmptr, size_t num_tokens, dht::check_token_endpoint check) {
|
||||
if (num_tokens < 1) {
|
||||
throw std::runtime_error("num_tokens must be >= 1");
|
||||
}
|
||||
@@ -77,7 +77,7 @@ std::unordered_set<token> boot_strapper::get_random_bootstrap_tokens(const token
|
||||
return tokens;
|
||||
}
|
||||
|
||||
std::unordered_set<token> boot_strapper::get_bootstrap_tokens(const token_metadata2_ptr tmptr, const db::config& cfg, dht::check_token_endpoint check) {
|
||||
std::unordered_set<token> boot_strapper::get_bootstrap_tokens(const token_metadata_ptr tmptr, const db::config& cfg, dht::check_token_endpoint check) {
|
||||
std::unordered_set<sstring> initial_tokens;
|
||||
sstring tokens_string = cfg.initial_token();
|
||||
try {
|
||||
@@ -104,7 +104,7 @@ std::unordered_set<token> boot_strapper::get_bootstrap_tokens(const token_metada
|
||||
return get_random_bootstrap_tokens(tmptr, cfg.num_tokens(), check);
|
||||
}
|
||||
|
||||
std::unordered_set<token> boot_strapper::get_random_tokens(const token_metadata2_ptr tmptr, size_t num_tokens) {
|
||||
std::unordered_set<token> boot_strapper::get_random_tokens(const token_metadata_ptr tmptr, size_t num_tokens) {
|
||||
std::unordered_set<token> tokens;
|
||||
while (tokens.size() < num_tokens) {
|
||||
auto token = dht::token::get_random_token();
|
||||
|
||||
@@ -29,9 +29,7 @@ using check_token_endpoint = bool_class<struct check_token_endpoint_tag>;
|
||||
class boot_strapper {
|
||||
using inet_address = gms::inet_address;
|
||||
using token_metadata = locator::token_metadata;
|
||||
using token_metadata2 = locator::token_metadata2;
|
||||
using token_metadata_ptr = locator::token_metadata_ptr;
|
||||
using token_metadata2_ptr = locator::token_metadata2_ptr;
|
||||
using token = dht::token;
|
||||
distributed<replica::database>& _db;
|
||||
sharded<streaming::stream_manager>& _stream_manager;
|
||||
@@ -42,10 +40,10 @@ class boot_strapper {
|
||||
locator::endpoint_dc_rack _dr;
|
||||
/* token of the node being bootstrapped. */
|
||||
std::unordered_set<token> _tokens;
|
||||
const locator::token_metadata2_ptr _token_metadata_ptr;
|
||||
const locator::token_metadata_ptr _token_metadata_ptr;
|
||||
public:
|
||||
boot_strapper(distributed<replica::database>& db, sharded<streaming::stream_manager>& sm, abort_source& abort_source,
|
||||
locator::host_id addr, locator::endpoint_dc_rack dr, std::unordered_set<token> tokens, const token_metadata2_ptr tmptr)
|
||||
locator::host_id addr, locator::endpoint_dc_rack dr, std::unordered_set<token> tokens, const token_metadata_ptr tmptr)
|
||||
: _db(db)
|
||||
, _stream_manager(sm)
|
||||
, _abort_source(abort_source)
|
||||
@@ -62,14 +60,14 @@ public:
|
||||
* otherwise, if num_tokens == 1, pick a token to assume half the load of the most-loaded node.
|
||||
* else choose num_tokens tokens at random
|
||||
*/
|
||||
static std::unordered_set<token> get_bootstrap_tokens(const token_metadata2_ptr tmptr, const db::config& cfg, check_token_endpoint check);
|
||||
static std::unordered_set<token> get_bootstrap_tokens(const token_metadata_ptr tmptr, const db::config& cfg, check_token_endpoint check);
|
||||
|
||||
/**
|
||||
* Same as above but does not consult initialtoken config
|
||||
*/
|
||||
static std::unordered_set<token> get_random_bootstrap_tokens(const token_metadata2_ptr tmptr, size_t num_tokens, check_token_endpoint check);
|
||||
static std::unordered_set<token> get_random_bootstrap_tokens(const token_metadata_ptr tmptr, size_t num_tokens, check_token_endpoint check);
|
||||
|
||||
static std::unordered_set<token> get_random_tokens(const token_metadata2_ptr tmptr, size_t num_tokens);
|
||||
static std::unordered_set<token> get_random_tokens(const token_metadata_ptr tmptr, size_t num_tokens);
|
||||
#if 0
|
||||
public static class StringSerializer implements IVersionedSerializer<String>
|
||||
{
|
||||
@@ -93,7 +91,7 @@ public:
|
||||
#endif
|
||||
|
||||
private:
|
||||
const token_metadata2& get_token_metadata() {
|
||||
const token_metadata& get_token_metadata() {
|
||||
return *_token_metadata_ptr;
|
||||
}
|
||||
};
|
||||
|
||||
@@ -123,14 +123,14 @@ range_streamer::get_all_ranges_with_strict_sources_for(const sstring& keyspace_n
|
||||
auto& strat = erm->get_replication_strategy();
|
||||
|
||||
//Active ranges
|
||||
auto metadata_clone = locator::make_token_metadata2_ptr(get_token_metadata().clone_only_token_map().get0());
|
||||
auto range_addresses = strat.get_range_addresses(token_metadata(metadata_clone)).get0();
|
||||
auto metadata_clone = get_token_metadata().clone_only_token_map().get0();
|
||||
auto range_addresses = strat.get_range_addresses(metadata_clone).get0();
|
||||
|
||||
//Pending ranges
|
||||
metadata_clone->update_topology(_address, _dr);
|
||||
metadata_clone->update_normal_tokens(_tokens, _address).get();
|
||||
auto pending_range_addresses = strat.get_range_addresses(token_metadata(metadata_clone)).get0();
|
||||
metadata_clone->clear_gently().get();
|
||||
metadata_clone.update_topology(_address, _dr);
|
||||
metadata_clone.update_normal_tokens(_tokens, _address).get();
|
||||
auto pending_range_addresses = strat.get_range_addresses(metadata_clone).get0();
|
||||
metadata_clone.clear_gently().get();
|
||||
|
||||
//Collects the source that will have its range moved to the new node
|
||||
std::unordered_map<dht::token_range, std::vector<inet_address>> range_sources;
|
||||
|
||||
@@ -37,9 +37,7 @@ class range_streamer {
|
||||
public:
|
||||
using inet_address = gms::inet_address;
|
||||
using token_metadata = locator::token_metadata;
|
||||
using token_metadata2 = locator::token_metadata2;
|
||||
using token_metadata_ptr = locator::token_metadata_ptr;
|
||||
using token_metadata2_ptr = locator::token_metadata2_ptr;
|
||||
using stream_plan = streaming::stream_plan;
|
||||
using stream_state = streaming::stream_state;
|
||||
public:
|
||||
@@ -79,7 +77,7 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
range_streamer(distributed<replica::database>& db, sharded<streaming::stream_manager>& sm, const token_metadata2_ptr tmptr, abort_source& abort_source, std::unordered_set<token> tokens,
|
||||
range_streamer(distributed<replica::database>& db, sharded<streaming::stream_manager>& sm, const token_metadata_ptr tmptr, abort_source& abort_source, std::unordered_set<token> tokens,
|
||||
locator::host_id address, locator::endpoint_dc_rack dr, sstring description, streaming::stream_reason reason,
|
||||
service::frozen_topology_guard topo_guard,
|
||||
std::vector<sstring> tables = {})
|
||||
@@ -98,7 +96,7 @@ public:
|
||||
_abort_source.check();
|
||||
}
|
||||
|
||||
range_streamer(distributed<replica::database>& db, sharded<streaming::stream_manager>& sm, const token_metadata2_ptr tmptr, abort_source& abort_source,
|
||||
range_streamer(distributed<replica::database>& db, sharded<streaming::stream_manager>& sm, const token_metadata_ptr tmptr, abort_source& abort_source,
|
||||
locator::host_id address, locator::endpoint_dc_rack dr, sstring description, streaming::stream_reason reason, service::frozen_topology_guard topo_guard, std::vector<sstring> tables = {})
|
||||
: range_streamer(db, sm, std::move(tmptr), abort_source, std::unordered_set<token>(), address, std::move(dr), description, reason, std::move(topo_guard), std::move(tables)) {
|
||||
}
|
||||
@@ -147,7 +145,7 @@ private:
|
||||
#endif
|
||||
|
||||
// Can be called only before stream_async().
|
||||
const token_metadata2& get_token_metadata() {
|
||||
const token_metadata& get_token_metadata() {
|
||||
return *_token_metadata_ptr;
|
||||
}
|
||||
public:
|
||||
@@ -156,7 +154,7 @@ public:
|
||||
private:
|
||||
distributed<replica::database>& _db;
|
||||
sharded<streaming::stream_manager>& _stream_manager;
|
||||
token_metadata2_ptr _token_metadata_ptr;
|
||||
token_metadata_ptr _token_metadata_ptr;
|
||||
abort_source& _abort_source;
|
||||
std::unordered_set<token> _tokens;
|
||||
locator::host_id _address;
|
||||
|
||||
@@ -2634,7 +2634,7 @@ void gossiper::append_endpoint_state(std::stringstream& ss, const endpoint_state
|
||||
}
|
||||
}
|
||||
|
||||
locator::token_metadata2_ptr gossiper::get_token_metadata_ptr() const noexcept {
|
||||
locator::token_metadata_ptr gossiper::get_token_metadata_ptr() const noexcept {
|
||||
return _shared_token_metadata.get();
|
||||
}
|
||||
|
||||
|
||||
@@ -674,7 +674,7 @@ private:
|
||||
gossip_config _gcfg;
|
||||
// Get features supported by a particular node
|
||||
std::set<sstring> get_supported_features(inet_address endpoint) const;
|
||||
locator::token_metadata2_ptr get_token_metadata_ptr() const noexcept;
|
||||
locator::token_metadata_ptr get_token_metadata_ptr() const noexcept;
|
||||
public:
|
||||
void check_knows_remote_features(std::set<std::string_view>& local_features, const std::unordered_map<inet_address, sstring>& loaded_peer_features) const;
|
||||
// Get features supported by all the nodes this node knows about
|
||||
|
||||
@@ -19,7 +19,7 @@
|
||||
|
||||
namespace locator {
|
||||
|
||||
static endpoint_set resolve_endpoints(const host_id_set& host_ids, const token_metadata2& tm) {
|
||||
static endpoint_set resolve_endpoints(const host_id_set& host_ids, const token_metadata& tm) {
|
||||
endpoint_set result{};
|
||||
result.reserve(host_ids.size());
|
||||
for (const auto& host_id: host_ids) {
|
||||
@@ -68,9 +68,9 @@ void abstract_replication_strategy::validate_replication_strategy(const sstring&
|
||||
}
|
||||
}
|
||||
|
||||
future<endpoint_set> abstract_replication_strategy::calculate_natural_ips(const token& search_token, const token_metadata2_ptr& tm) const {
|
||||
const auto host_ids = co_await calculate_natural_endpoints(search_token, *tm);
|
||||
co_return resolve_endpoints(host_ids, *tm);
|
||||
future<endpoint_set> abstract_replication_strategy::calculate_natural_ips(const token& search_token, const token_metadata& tm) const {
|
||||
const auto host_ids = co_await calculate_natural_endpoints(search_token, tm);
|
||||
co_return resolve_endpoints(host_ids, tm);
|
||||
}
|
||||
|
||||
using strategy_class_registry = class_registry<
|
||||
@@ -87,7 +87,7 @@ inet_address_vector_replica_set vnode_effective_replication_map::get_natural_end
|
||||
return natural_endpoints;
|
||||
}
|
||||
|
||||
void maybe_remove_node_being_replaced(const token_metadata2& tm,
|
||||
void maybe_remove_node_being_replaced(const token_metadata& tm,
|
||||
const abstract_replication_strategy& rs,
|
||||
inet_address_vector_replica_set& natural_endpoints) {
|
||||
if (tm.is_any_node_being_replaced() &&
|
||||
@@ -264,10 +264,10 @@ abstract_replication_strategy::get_ranges(locator::host_id ep, token_metadata_pt
|
||||
future<dht::token_range_vector>
|
||||
abstract_replication_strategy::get_ranges(locator::host_id ep, const token_metadata& tm) const {
|
||||
dht::token_range_vector ret;
|
||||
if (!tm.get_new()->is_normal_token_owner(ep)) {
|
||||
if (!tm.is_normal_token_owner(ep)) {
|
||||
co_return ret;
|
||||
}
|
||||
const auto& sorted_tokens = tm.get_new()->sorted_tokens();
|
||||
const auto& sorted_tokens = tm.sorted_tokens();
|
||||
if (sorted_tokens.empty()) {
|
||||
on_internal_error(rslogger, "Token metadata is empty");
|
||||
}
|
||||
@@ -279,7 +279,7 @@ abstract_replication_strategy::get_ranges(locator::host_id ep, const token_metad
|
||||
// Using the common path would make the function quadratic in the number of endpoints.
|
||||
should_add = true;
|
||||
} else {
|
||||
auto eps = co_await calculate_natural_endpoints(tok, *tm.get_new());
|
||||
auto eps = co_await calculate_natural_endpoints(tok, tm);
|
||||
should_add = eps.contains(ep);
|
||||
}
|
||||
if (should_add) {
|
||||
@@ -327,7 +327,7 @@ vnode_effective_replication_map::get_primary_ranges_within_dc(inet_address ep) c
|
||||
|
||||
future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>>
|
||||
vnode_effective_replication_map::get_range_addresses() const {
|
||||
const token_metadata2& tm = *_tmptr;
|
||||
const token_metadata& tm = *_tmptr;
|
||||
std::unordered_map<dht::token_range, inet_address_vector_replica_set> ret;
|
||||
for (auto& t : tm.sorted_tokens()) {
|
||||
dht::token_range_vector ranges = tm.get_primary_ranges_for(t);
|
||||
@@ -342,10 +342,9 @@ vnode_effective_replication_map::get_range_addresses() const {
|
||||
future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>>
|
||||
abstract_replication_strategy::get_range_addresses(const token_metadata& tm) const {
|
||||
std::unordered_map<dht::token_range, inet_address_vector_replica_set> ret;
|
||||
auto tm_new = tm.get_new_strong();
|
||||
for (auto& t : tm_new->sorted_tokens()) {
|
||||
dht::token_range_vector ranges = tm_new->get_primary_ranges_for(t);
|
||||
auto eps = co_await calculate_natural_ips(t, tm_new);
|
||||
for (auto& t : tm.sorted_tokens()) {
|
||||
dht::token_range_vector ranges = tm.get_primary_ranges_for(t);
|
||||
auto eps = co_await calculate_natural_ips(t, tm);
|
||||
for (auto& r : ranges) {
|
||||
ret.emplace(r, eps.get_vector());
|
||||
}
|
||||
@@ -354,26 +353,26 @@ abstract_replication_strategy::get_range_addresses(const token_metadata& tm) con
|
||||
}
|
||||
|
||||
future<dht::token_range_vector>
|
||||
abstract_replication_strategy::get_pending_address_ranges(const token_metadata2_ptr tmptr, std::unordered_set<token> pending_tokens, locator::host_id pending_address, locator::endpoint_dc_rack dr) const {
|
||||
abstract_replication_strategy::get_pending_address_ranges(const token_metadata_ptr tmptr, std::unordered_set<token> pending_tokens, locator::host_id pending_address, locator::endpoint_dc_rack dr) const {
|
||||
dht::token_range_vector ret;
|
||||
auto temp = make_token_metadata2_ptr(co_await tmptr->clone_only_token_map());
|
||||
temp->update_topology(pending_address, std::move(dr));
|
||||
co_await temp->update_normal_tokens(pending_tokens, pending_address);
|
||||
for (const auto& t : temp->sorted_tokens()) {
|
||||
auto eps = co_await calculate_natural_endpoints(t, *temp);
|
||||
auto temp = co_await tmptr->clone_only_token_map();
|
||||
temp.update_topology(pending_address, std::move(dr));
|
||||
co_await temp.update_normal_tokens(pending_tokens, pending_address);
|
||||
for (const auto& t : temp.sorted_tokens()) {
|
||||
auto eps = co_await calculate_natural_endpoints(t, temp);
|
||||
if (eps.contains(pending_address)) {
|
||||
dht::token_range_vector r = temp->get_primary_ranges_for(t);
|
||||
dht::token_range_vector r = temp.get_primary_ranges_for(t);
|
||||
rslogger.debug("get_pending_address_ranges: token={} primary_range={} endpoint={}", t, r, pending_address);
|
||||
ret.insert(ret.end(), r.begin(), r.end());
|
||||
}
|
||||
}
|
||||
co_await temp->clear_gently();
|
||||
co_await temp.clear_gently();
|
||||
co_return ret;
|
||||
}
|
||||
|
||||
static const auto default_replication_map_key = dht::token::from_int64(0);
|
||||
|
||||
future<mutable_vnode_effective_replication_map_ptr> calculate_effective_replication_map(replication_strategy_ptr rs, token_metadata2_ptr tmptr) {
|
||||
future<mutable_vnode_effective_replication_map_ptr> calculate_effective_replication_map(replication_strategy_ptr rs, token_metadata_ptr tmptr) {
|
||||
replication_map replication_map;
|
||||
ring_mapping pending_endpoints;
|
||||
ring_mapping read_endpoints;
|
||||
@@ -441,11 +440,11 @@ future<mutable_vnode_effective_replication_map_ptr> calculate_effective_replicat
|
||||
}
|
||||
} else if (depend_on_token) {
|
||||
for (const auto &t : sorted_tokens) {
|
||||
auto eps = co_await rs->calculate_natural_ips(t, tmptr);
|
||||
auto eps = co_await rs->calculate_natural_ips(t, *tmptr);
|
||||
replication_map.emplace(t, std::move(eps).extract_vector());
|
||||
}
|
||||
} else {
|
||||
auto eps = co_await rs->calculate_natural_ips(default_replication_map_key, tmptr);
|
||||
auto eps = co_await rs->calculate_natural_ips(default_replication_map_key, *tmptr);
|
||||
replication_map.emplace(default_replication_map_key, std::move(eps).extract_vector());
|
||||
}
|
||||
|
||||
@@ -512,7 +511,7 @@ vnode_effective_replication_map::~vnode_effective_replication_map() {
|
||||
}
|
||||
|
||||
effective_replication_map::effective_replication_map(replication_strategy_ptr rs,
|
||||
token_metadata2_ptr tmptr,
|
||||
token_metadata_ptr tmptr,
|
||||
size_t replication_factor) noexcept
|
||||
: _rs(std::move(rs))
|
||||
, _tmptr(std::move(tmptr))
|
||||
@@ -520,11 +519,11 @@ effective_replication_map::effective_replication_map(replication_strategy_ptr rs
|
||||
, _validity_abort_source(std::make_unique<abort_source>())
|
||||
{ }
|
||||
|
||||
vnode_effective_replication_map::factory_key vnode_effective_replication_map::make_factory_key(const replication_strategy_ptr& rs, const token_metadata2_ptr& tmptr) {
|
||||
vnode_effective_replication_map::factory_key vnode_effective_replication_map::make_factory_key(const replication_strategy_ptr& rs, const token_metadata_ptr& tmptr) {
|
||||
return factory_key(rs->get_type(), rs->get_config_options(), tmptr->get_ring_version());
|
||||
}
|
||||
|
||||
future<vnode_effective_replication_map_ptr> effective_replication_map_factory::create_effective_replication_map(replication_strategy_ptr rs, token_metadata2_ptr tmptr) {
|
||||
future<vnode_effective_replication_map_ptr> effective_replication_map_factory::create_effective_replication_map(replication_strategy_ptr rs, token_metadata_ptr tmptr) {
|
||||
// lookup key on local shard
|
||||
auto key = vnode_effective_replication_map::make_factory_key(rs, tmptr);
|
||||
auto erm = find_effective_replication_map(key);
|
||||
|
||||
@@ -103,8 +103,8 @@ public:
|
||||
// is small, that implementation may not yield since by itself it won't cause a reactor stall (assuming practical
|
||||
// cluster sizes and number of tokens per node). The caller is responsible for yielding if they call this function
|
||||
// in a loop.
|
||||
virtual future<host_id_set> calculate_natural_endpoints(const token& search_token, const token_metadata2& tm) const = 0;
|
||||
future<endpoint_set> calculate_natural_ips(const token& search_token, const token_metadata2_ptr& tm) const;
|
||||
virtual future<host_id_set> calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const = 0;
|
||||
future<endpoint_set> calculate_natural_ips(const token& search_token, const token_metadata& tm) const;
|
||||
|
||||
virtual ~abstract_replication_strategy() {}
|
||||
static ptr_type create_replication_strategy(const sstring& strategy_name, const replication_strategy_config_options& config_options);
|
||||
@@ -119,7 +119,7 @@ public:
|
||||
|
||||
virtual void validate_options(const gms::feature_service&) const = 0;
|
||||
virtual std::optional<std::unordered_set<sstring>> recognized_options(const topology&) const = 0;
|
||||
virtual size_t get_replication_factor(const token_metadata2& tm) const = 0;
|
||||
virtual size_t get_replication_factor(const token_metadata& tm) const = 0;
|
||||
// Decide if the replication strategy allow removing the node being
|
||||
// replaced from the natural endpoints when a node is being replaced in the
|
||||
// cluster. LocalStrategy is the not allowed to do so because it always
|
||||
@@ -155,7 +155,7 @@ public:
|
||||
// Caller must ensure that token_metadata will not change throughout the call.
|
||||
future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>> get_range_addresses(const token_metadata& tm) const;
|
||||
|
||||
future<dht::token_range_vector> get_pending_address_ranges(const token_metadata2_ptr tmptr, std::unordered_set<token> pending_tokens, locator::host_id pending_address, locator::endpoint_dc_rack dr) const;
|
||||
future<dht::token_range_vector> get_pending_address_ranges(const token_metadata_ptr tmptr, std::unordered_set<token> pending_tokens, locator::host_id pending_address, locator::endpoint_dc_rack dr) const;
|
||||
};
|
||||
|
||||
using ring_mapping = boost::icl::interval_map<token, std::unordered_set<inet_address>>;
|
||||
@@ -175,17 +175,17 @@ using mutable_replication_strategy_ptr = seastar::shared_ptr<abstract_replicatio
|
||||
class effective_replication_map {
|
||||
protected:
|
||||
replication_strategy_ptr _rs;
|
||||
token_metadata2_ptr _tmptr;
|
||||
token_metadata_ptr _tmptr;
|
||||
size_t _replication_factor;
|
||||
std::unique_ptr<abort_source> _validity_abort_source;
|
||||
public:
|
||||
effective_replication_map(replication_strategy_ptr, token_metadata2_ptr, size_t replication_factor) noexcept;
|
||||
effective_replication_map(replication_strategy_ptr, token_metadata_ptr, size_t replication_factor) noexcept;
|
||||
effective_replication_map(effective_replication_map&&) noexcept = default;
|
||||
virtual ~effective_replication_map() = default;
|
||||
|
||||
const abstract_replication_strategy& get_replication_strategy() const noexcept { return *_rs; }
|
||||
const token_metadata2& get_token_metadata() const noexcept { return *_tmptr; }
|
||||
const token_metadata2_ptr& get_token_metadata_ptr() const noexcept { return _tmptr; }
|
||||
const token_metadata& get_token_metadata() const noexcept { return *_tmptr; }
|
||||
const token_metadata_ptr& get_token_metadata_ptr() const noexcept { return _tmptr; }
|
||||
const topology& get_topology() const noexcept { return _tmptr->get_topology(); }
|
||||
size_t get_replication_factor() const noexcept { return _replication_factor; }
|
||||
|
||||
@@ -255,7 +255,7 @@ protected:
|
||||
}
|
||||
public:
|
||||
virtual ~per_table_replication_strategy() = default;
|
||||
virtual effective_replication_map_ptr make_replication_map(table_id, token_metadata2_ptr) const = 0;
|
||||
virtual effective_replication_map_ptr make_replication_map(table_id, token_metadata_ptr) const = 0;
|
||||
};
|
||||
|
||||
// Holds the full replication_map resulting from applying the
|
||||
@@ -302,7 +302,7 @@ public: // effective_replication_map
|
||||
std::unique_ptr<token_range_splitter> make_splitter() const override;
|
||||
const dht::sharder& get_sharder(const schema& s) const override;
|
||||
public:
|
||||
explicit vnode_effective_replication_map(replication_strategy_ptr rs, token_metadata2_ptr tmptr, replication_map replication_map,
|
||||
explicit vnode_effective_replication_map(replication_strategy_ptr rs, token_metadata_ptr tmptr, replication_map replication_map,
|
||||
ring_mapping pending_endpoints, ring_mapping read_endpoints, size_t replication_factor) noexcept
|
||||
: effective_replication_map(std::move(rs), std::move(tmptr), replication_factor)
|
||||
, _replication_map(std::move(replication_map))
|
||||
@@ -357,7 +357,7 @@ private:
|
||||
const inet_address_vector_replica_set& do_get_natural_endpoints(const token& tok, bool is_vnode) const;
|
||||
|
||||
public:
|
||||
static factory_key make_factory_key(const replication_strategy_ptr& rs, const token_metadata2_ptr& tmptr);
|
||||
static factory_key make_factory_key(const replication_strategy_ptr& rs, const token_metadata_ptr& tmptr);
|
||||
|
||||
const factory_key& get_factory_key() const noexcept {
|
||||
return *_factory_key;
|
||||
@@ -382,7 +382,7 @@ using mutable_vnode_effective_replication_map_ptr = shared_ptr<vnode_effective_r
|
||||
using vnode_erm_ptr = vnode_effective_replication_map_ptr;
|
||||
using mutable_vnode_erm_ptr = mutable_vnode_effective_replication_map_ptr;
|
||||
|
||||
inline mutable_vnode_erm_ptr make_effective_replication_map(replication_strategy_ptr rs, token_metadata2_ptr tmptr, replication_map replication_map, ring_mapping pending_endpoints,
|
||||
inline mutable_vnode_erm_ptr make_effective_replication_map(replication_strategy_ptr rs, token_metadata_ptr tmptr, replication_map replication_map, ring_mapping pending_endpoints,
|
||||
ring_mapping read_endpoints, size_t replication_factor) {
|
||||
return seastar::make_shared<vnode_effective_replication_map>(
|
||||
std::move(rs), std::move(tmptr), std::move(replication_map),
|
||||
@@ -390,7 +390,7 @@ inline mutable_vnode_erm_ptr make_effective_replication_map(replication_strategy
|
||||
}
|
||||
|
||||
// Apply the replication strategy over the current configuration and the given token_metadata.
|
||||
future<mutable_vnode_erm_ptr> calculate_effective_replication_map(replication_strategy_ptr rs, token_metadata2_ptr tmptr);
|
||||
future<mutable_vnode_erm_ptr> calculate_effective_replication_map(replication_strategy_ptr rs, token_metadata_ptr tmptr);
|
||||
|
||||
// Class to hold a coherent view of a keyspace
|
||||
// effective replication map on all shards
|
||||
@@ -478,7 +478,7 @@ public:
|
||||
// vnode_effective_replication_map for the local shard.
|
||||
//
|
||||
// Therefore create should be called first on shard 0, then on all other shards.
|
||||
future<vnode_erm_ptr> create_effective_replication_map(replication_strategy_ptr rs, token_metadata2_ptr tmptr);
|
||||
future<vnode_erm_ptr> create_effective_replication_map(replication_strategy_ptr rs, token_metadata_ptr tmptr);
|
||||
|
||||
future<> stop() noexcept;
|
||||
|
||||
@@ -497,7 +497,7 @@ private:
|
||||
friend class vnode_effective_replication_map;
|
||||
};
|
||||
|
||||
void maybe_remove_node_being_replaced(const token_metadata2&,
|
||||
void maybe_remove_node_being_replaced(const token_metadata&,
|
||||
const abstract_replication_strategy&,
|
||||
inet_address_vector_replica_set& natural_endpoints);
|
||||
|
||||
|
||||
@@ -20,7 +20,7 @@ everywhere_replication_strategy::everywhere_replication_strategy(const replicati
|
||||
_natural_endpoints_depend_on_token = false;
|
||||
}
|
||||
|
||||
future<host_id_set> everywhere_replication_strategy::calculate_natural_endpoints(const token& search_token, const token_metadata2& tm) const {
|
||||
future<host_id_set> everywhere_replication_strategy::calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const {
|
||||
if (tm.sorted_tokens().empty()) {
|
||||
host_id_set result{host_id_vector_replica_set({host_id{}})};
|
||||
return make_ready_future<host_id_set>(std::move(result));
|
||||
@@ -29,7 +29,7 @@ future<host_id_set> everywhere_replication_strategy::calculate_natural_endpoints
|
||||
return make_ready_future<host_id_set>(host_id_set(all_endpoints.begin(), all_endpoints.end()));
|
||||
}
|
||||
|
||||
size_t everywhere_replication_strategy::get_replication_factor(const token_metadata2& tm) const {
|
||||
size_t everywhere_replication_strategy::get_replication_factor(const token_metadata& tm) const {
|
||||
return tm.sorted_tokens().empty() ? 1 : tm.count_normal_token_owners();
|
||||
}
|
||||
|
||||
|
||||
@@ -18,7 +18,7 @@ class everywhere_replication_strategy : public abstract_replication_strategy {
|
||||
public:
|
||||
everywhere_replication_strategy(const replication_strategy_config_options& config_options);
|
||||
|
||||
virtual future<host_id_set> calculate_natural_endpoints(const token& search_token, const token_metadata2& tm) const override;
|
||||
virtual future<host_id_set> calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const override;
|
||||
|
||||
virtual void validate_options(const gms::feature_service&) const override { /* noop */ }
|
||||
|
||||
@@ -27,7 +27,7 @@ public:
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
virtual size_t get_replication_factor(const token_metadata2& tm) const override;
|
||||
virtual size_t get_replication_factor(const token_metadata& tm) const override;
|
||||
|
||||
virtual bool allow_remove_node_being_replaced_from_natural_endpoints() const override {
|
||||
return true;
|
||||
|
||||
@@ -56,7 +56,7 @@ class load_sketch {
|
||||
}
|
||||
};
|
||||
std::unordered_map<host_id, node_load> _nodes;
|
||||
token_metadata2_ptr _tm;
|
||||
token_metadata_ptr _tm;
|
||||
private:
|
||||
tablet_replica_set get_replicas_for_tablet_load(const tablet_info& ti, const tablet_transition_info* trinfo) const {
|
||||
// We reflect migrations in the load as if they already happened,
|
||||
@@ -65,7 +65,7 @@ private:
|
||||
}
|
||||
|
||||
public:
|
||||
load_sketch(token_metadata2_ptr tm)
|
||||
load_sketch(token_metadata_ptr tm)
|
||||
: _tm(std::move(tm)) {
|
||||
}
|
||||
|
||||
|
||||
@@ -18,7 +18,7 @@ local_strategy::local_strategy(const replication_strategy_config_options& config
|
||||
_natural_endpoints_depend_on_token = false;
|
||||
}
|
||||
|
||||
future<host_id_set> local_strategy::calculate_natural_endpoints(const token& t, const token_metadata2& tm) const {
|
||||
future<host_id_set> local_strategy::calculate_natural_endpoints(const token& t, const token_metadata& tm) const {
|
||||
return make_ready_future<host_id_set>(host_id_set{host_id{}});
|
||||
}
|
||||
|
||||
@@ -30,7 +30,7 @@ std::optional<std::unordered_set<sstring>> local_strategy::recognized_options(co
|
||||
return {};
|
||||
}
|
||||
|
||||
size_t local_strategy::get_replication_factor(const token_metadata2&) const {
|
||||
size_t local_strategy::get_replication_factor(const token_metadata&) const {
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
||||
@@ -25,9 +25,9 @@ class local_strategy : public abstract_replication_strategy {
|
||||
public:
|
||||
local_strategy(const replication_strategy_config_options& config_options);
|
||||
virtual ~local_strategy() {};
|
||||
virtual size_t get_replication_factor(const token_metadata2&) const override;
|
||||
virtual size_t get_replication_factor(const token_metadata&) const override;
|
||||
|
||||
virtual future<host_id_set> calculate_natural_endpoints(const token& search_token, const token_metadata2& tm) const override;
|
||||
virtual future<host_id_set> calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const override;
|
||||
|
||||
virtual void validate_options(const gms::feature_service&) const override;
|
||||
|
||||
|
||||
@@ -160,7 +160,7 @@ class natural_endpoints_tracker {
|
||||
}
|
||||
};
|
||||
|
||||
const token_metadata2& _tm;
|
||||
const token_metadata& _tm;
|
||||
const topology& _tp;
|
||||
std::unordered_map<sstring, size_t> _dc_rep_factor;
|
||||
|
||||
@@ -189,7 +189,7 @@ class natural_endpoints_tracker {
|
||||
size_t _dcs_to_fill;
|
||||
|
||||
public:
|
||||
natural_endpoints_tracker(const token_metadata2& tm, const std::unordered_map<sstring, size_t>& dc_rep_factor)
|
||||
natural_endpoints_tracker(const token_metadata& tm, const std::unordered_map<sstring, size_t>& dc_rep_factor)
|
||||
: _tm(tm)
|
||||
, _tp(_tm.get_topology())
|
||||
, _dc_rep_factor(dc_rep_factor)
|
||||
@@ -239,7 +239,7 @@ public:
|
||||
|
||||
future<host_id_set>
|
||||
network_topology_strategy::calculate_natural_endpoints(
|
||||
const token& search_token, const token_metadata2& tm) const {
|
||||
const token& search_token, const token_metadata& tm) const {
|
||||
|
||||
natural_endpoints_tracker tracker(tm, _dc_rep_factor);
|
||||
|
||||
@@ -281,14 +281,14 @@ std::optional<std::unordered_set<sstring>> network_topology_strategy::recognized
|
||||
return opts;
|
||||
}
|
||||
|
||||
effective_replication_map_ptr network_topology_strategy::make_replication_map(table_id table, token_metadata2_ptr tm) const {
|
||||
effective_replication_map_ptr network_topology_strategy::make_replication_map(table_id table, token_metadata_ptr tm) const {
|
||||
if (!uses_tablets()) {
|
||||
on_internal_error(rslogger, format("make_replication_map() called for table {} but replication strategy not configured to use tablets", table));
|
||||
}
|
||||
return do_make_replication_map(table, shared_from_this(), std::move(tm), _rep_factor);
|
||||
}
|
||||
|
||||
future<tablet_map> network_topology_strategy::allocate_tablets_for_new_table(schema_ptr s, token_metadata2_ptr tm) const {
|
||||
future<tablet_map> network_topology_strategy::allocate_tablets_for_new_table(schema_ptr s, token_metadata_ptr tm) const {
|
||||
auto tablet_count = get_initial_tablets();
|
||||
auto aligned_tablet_count = 1ul << log2ceil(tablet_count);
|
||||
if (tablet_count != aligned_tablet_count) {
|
||||
|
||||
@@ -25,7 +25,7 @@ public:
|
||||
network_topology_strategy(
|
||||
const replication_strategy_config_options& config_options);
|
||||
|
||||
virtual size_t get_replication_factor(const token_metadata2&) const override {
|
||||
virtual size_t get_replication_factor(const token_metadata&) const override {
|
||||
return _rep_factor;
|
||||
}
|
||||
|
||||
@@ -43,15 +43,15 @@ public:
|
||||
}
|
||||
|
||||
public: // tablet_aware_replication_strategy
|
||||
virtual effective_replication_map_ptr make_replication_map(table_id, token_metadata2_ptr) const override;
|
||||
virtual future<tablet_map> allocate_tablets_for_new_table(schema_ptr, token_metadata2_ptr) const override;
|
||||
virtual effective_replication_map_ptr make_replication_map(table_id, token_metadata_ptr) const override;
|
||||
virtual future<tablet_map> allocate_tablets_for_new_table(schema_ptr, token_metadata_ptr) const override;
|
||||
protected:
|
||||
/**
|
||||
* calculate endpoints in one pass through the tokens by tracking our
|
||||
* progress in each DC, rack etc.
|
||||
*/
|
||||
virtual future<host_id_set> calculate_natural_endpoints(
|
||||
const token& search_token, const token_metadata2& tm) const override;
|
||||
const token& search_token, const token_metadata& tm) const override;
|
||||
|
||||
virtual void validate_options(const gms::feature_service&) const override;
|
||||
|
||||
|
||||
@@ -33,7 +33,7 @@ simple_strategy::simple_strategy(const replication_strategy_config_options& conf
|
||||
}
|
||||
}
|
||||
|
||||
future<host_id_set> simple_strategy::calculate_natural_endpoints(const token& t, const token_metadata2& tm) const {
|
||||
future<host_id_set> simple_strategy::calculate_natural_endpoints(const token& t, const token_metadata& tm) const {
|
||||
const std::vector<token>& tokens = tm.sorted_tokens();
|
||||
|
||||
if (tokens.empty()) {
|
||||
@@ -63,7 +63,7 @@ future<host_id_set> simple_strategy::calculate_natural_endpoints(const token& t,
|
||||
co_return endpoints;
|
||||
}
|
||||
|
||||
size_t simple_strategy::get_replication_factor(const token_metadata2&) const {
|
||||
size_t simple_strategy::get_replication_factor(const token_metadata&) const {
|
||||
return _replication_factor;
|
||||
}
|
||||
|
||||
|
||||
@@ -19,14 +19,14 @@ class simple_strategy : public abstract_replication_strategy {
|
||||
public:
|
||||
simple_strategy(const replication_strategy_config_options& config_options);
|
||||
virtual ~simple_strategy() {};
|
||||
virtual size_t get_replication_factor(const token_metadata2& tm) const override;
|
||||
virtual size_t get_replication_factor(const token_metadata& tm) const override;
|
||||
virtual void validate_options(const gms::feature_service&) const override;
|
||||
virtual std::optional<std::unordered_set<sstring>> recognized_options(const topology&) const override;
|
||||
virtual bool allow_remove_node_being_replaced_from_natural_endpoints() const override {
|
||||
return true;
|
||||
}
|
||||
|
||||
virtual future<host_id_set> calculate_natural_endpoints(const token& search_token, const token_metadata2& tm) const override;
|
||||
virtual future<host_id_set> calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const override;
|
||||
private:
|
||||
size_t _replication_factor = 1;
|
||||
};
|
||||
|
||||
@@ -45,7 +45,7 @@ public:
|
||||
return _abort_source;
|
||||
}
|
||||
|
||||
locator::token_metadata2_ptr get_token_metadata() {
|
||||
locator::token_metadata_ptr get_token_metadata() {
|
||||
return _erm->get_token_metadata_ptr();
|
||||
}
|
||||
|
||||
|
||||
@@ -38,13 +38,13 @@ protected:
|
||||
size_t get_initial_tablets() const { return _initial_tablets; }
|
||||
effective_replication_map_ptr do_make_replication_map(table_id,
|
||||
replication_strategy_ptr,
|
||||
token_metadata2_ptr,
|
||||
token_metadata_ptr,
|
||||
size_t replication_factor) const;
|
||||
|
||||
public:
|
||||
/// Generates tablet_map for a new table.
|
||||
/// Runs under group0 guard.
|
||||
virtual future<tablet_map> allocate_tablets_for_new_table(schema_ptr, token_metadata2_ptr) const = 0;
|
||||
virtual future<tablet_map> allocate_tablets_for_new_table(schema_ptr, token_metadata_ptr) const = 0;
|
||||
};
|
||||
|
||||
} // namespace locator
|
||||
|
||||
@@ -17,7 +17,7 @@ namespace locator {
|
||||
/// Implements sharder object which reflects assignment of tablets of a given table to local shards.
|
||||
/// Token ranges which don't have local tablets are reported to belong to shard 0.
|
||||
class tablet_sharder : public dht::sharder {
|
||||
const token_metadata2& _tm;
|
||||
const token_metadata& _tm;
|
||||
table_id _table;
|
||||
mutable const tablet_map* _tmap = nullptr;
|
||||
private:
|
||||
@@ -29,7 +29,7 @@ private:
|
||||
}
|
||||
}
|
||||
public:
|
||||
tablet_sharder(const token_metadata2& tm, table_id table)
|
||||
tablet_sharder(const token_metadata& tm, table_id table)
|
||||
: _tm(tm)
|
||||
, _table(table)
|
||||
{ }
|
||||
|
||||
@@ -348,7 +348,7 @@ private:
|
||||
public:
|
||||
tablet_effective_replication_map(table_id table,
|
||||
replication_strategy_ptr rs,
|
||||
token_metadata2_ptr tmptr,
|
||||
token_metadata_ptr tmptr,
|
||||
size_t replication_factor)
|
||||
: effective_replication_map(std::move(rs), std::move(tmptr), replication_factor)
|
||||
, _table(table)
|
||||
@@ -480,11 +480,11 @@ public:
|
||||
|
||||
virtual std::unique_ptr<token_range_splitter> make_splitter() const override {
|
||||
class splitter : public token_range_splitter {
|
||||
token_metadata2_ptr _tmptr; // To keep the tablet map alive.
|
||||
token_metadata_ptr _tmptr; // To keep the tablet map alive.
|
||||
const tablet_map& _tmap;
|
||||
std::optional<tablet_id> _next;
|
||||
public:
|
||||
splitter(token_metadata2_ptr tmptr, const tablet_map& tmap)
|
||||
splitter(token_metadata_ptr tmptr, const tablet_map& tmap)
|
||||
: _tmptr(std::move(tmptr))
|
||||
, _tmap(tmap)
|
||||
{ }
|
||||
@@ -548,7 +548,7 @@ std::unordered_set<sstring> tablet_aware_replication_strategy::recognized_tablet
|
||||
}
|
||||
|
||||
effective_replication_map_ptr tablet_aware_replication_strategy::do_make_replication_map(
|
||||
table_id table, replication_strategy_ptr rs, token_metadata2_ptr tm, size_t replication_factor) const {
|
||||
table_id table, replication_strategy_ptr rs, token_metadata_ptr tm, size_t replication_factor) const {
|
||||
return seastar::make_shared<tablet_effective_replication_map>(table, std::move(rs), std::move(tm), replication_factor);
|
||||
}
|
||||
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -43,10 +43,7 @@ class abstract_replication_strategy;
|
||||
|
||||
using token = dht::token;
|
||||
|
||||
template <typename NodeId>
|
||||
class generic_token_metadata;
|
||||
using token_metadata = generic_token_metadata<gms::inet_address>;
|
||||
using token_metadata2 = generic_token_metadata<locator::host_id>;
|
||||
class token_metadata;
|
||||
class tablet_metadata;
|
||||
|
||||
struct host_id_or_endpoint {
|
||||
@@ -71,29 +68,14 @@ struct host_id_or_endpoint {
|
||||
|
||||
// Map the host_id to endpoint based on whichever of them is set,
|
||||
// using the token_metadata
|
||||
template <typename NodeId>
|
||||
void resolve(const generic_token_metadata<NodeId>& tm);
|
||||
void resolve(const token_metadata& tm);
|
||||
};
|
||||
|
||||
template <typename NodeId>
|
||||
class token_metadata_impl;
|
||||
template <typename NodeId = gms::inet_address>
|
||||
struct topology_change_info;
|
||||
|
||||
class generic_token_metadata_base {
|
||||
public:
|
||||
struct config {
|
||||
topology::config topo_cfg;
|
||||
};
|
||||
using inet_address = gms::inet_address;
|
||||
using version_t = service::topology::version_t;
|
||||
using version_tracker_t = utils::phased_barrier::operation;
|
||||
};
|
||||
|
||||
template <typename NodeId = gms::inet_address>
|
||||
class generic_token_metadata final: public generic_token_metadata_base {
|
||||
std::unique_ptr<token_metadata_impl<NodeId>> _impl;
|
||||
std::variant<std::monostate, lw_shared_ptr<token_metadata2>, lw_shared_ptr<const token_metadata2>> _new_value;
|
||||
class token_metadata final {
|
||||
std::unique_ptr<token_metadata_impl> _impl;
|
||||
private:
|
||||
friend class token_metadata_ring_splitter;
|
||||
class tokens_iterator {
|
||||
@@ -105,30 +87,31 @@ private:
|
||||
using reference = token&;
|
||||
public:
|
||||
tokens_iterator() = default;
|
||||
tokens_iterator(const token& start, const token_metadata_impl<NodeId>* token_metadata);
|
||||
tokens_iterator(const token& start, const token_metadata_impl* token_metadata);
|
||||
bool operator==(const tokens_iterator& it) const;
|
||||
const token& operator*() const;
|
||||
tokens_iterator& operator++();
|
||||
private:
|
||||
std::vector<token>::const_iterator _cur_it;
|
||||
size_t _remaining = 0;
|
||||
const token_metadata_impl<NodeId>* _token_metadata = nullptr;
|
||||
const token_metadata_impl* _token_metadata = nullptr;
|
||||
|
||||
friend class token_metadata_impl<NodeId>;
|
||||
friend class token_metadata_impl;
|
||||
};
|
||||
|
||||
public:
|
||||
generic_token_metadata(config cfg);
|
||||
explicit generic_token_metadata(std::unique_ptr<token_metadata_impl<NodeId>> impl);
|
||||
template <typename T = NodeId>
|
||||
requires std::is_same_v<T, gms::inet_address>
|
||||
generic_token_metadata(std::unique_ptr<token_metadata_impl<NodeId>> impl, token_metadata2 new_value);
|
||||
template <typename T = NodeId>
|
||||
requires std::is_same_v<T, gms::inet_address>
|
||||
generic_token_metadata(lw_shared_ptr<const token_metadata2> new_value);
|
||||
generic_token_metadata(generic_token_metadata&&) noexcept; // Can't use "= default;" - hits some static_assert in unique_ptr
|
||||
generic_token_metadata& operator=(generic_token_metadata&&) noexcept;
|
||||
~generic_token_metadata();
|
||||
struct config {
|
||||
topology::config topo_cfg;
|
||||
};
|
||||
using inet_address = gms::inet_address;
|
||||
using version_t = service::topology::version_t;
|
||||
using version_tracker_t = utils::phased_barrier::operation;
|
||||
|
||||
token_metadata(config cfg);
|
||||
explicit token_metadata(std::unique_ptr<token_metadata_impl> impl);
|
||||
token_metadata(token_metadata&&) noexcept; // Can't use "= default;" - hits some static_assert in unique_ptr
|
||||
token_metadata& operator=(token_metadata&&) noexcept;
|
||||
~token_metadata();
|
||||
const std::vector<token>& sorted_tokens() const;
|
||||
const tablet_metadata& tablets() const;
|
||||
tablet_metadata& tablets();
|
||||
@@ -138,52 +121,21 @@ public:
|
||||
//
|
||||
// Note: the function is not exception safe!
|
||||
// It must be called only on a temporary copy of the token_metadata
|
||||
future<> update_normal_tokens(std::unordered_set<token> tokens, NodeId endpoint);
|
||||
future<> update_normal_tokens(std::unordered_set<token> tokens, host_id endpoint);
|
||||
const token& first_token(const token& start) const;
|
||||
size_t first_token_index(const token& start) const;
|
||||
std::optional<NodeId> get_endpoint(const token& token) const;
|
||||
std::vector<token> get_tokens(const NodeId& addr) const;
|
||||
const std::unordered_map<token, NodeId>& get_token_to_endpoint() const;
|
||||
const std::unordered_set<NodeId>& get_leaving_endpoints() const;
|
||||
const std::unordered_map<token, NodeId>& get_bootstrap_tokens() const;
|
||||
|
||||
template <typename T = NodeId>
|
||||
requires std::is_same_v<T, gms::inet_address>
|
||||
token_metadata2* get_new() {
|
||||
if (holds_alternative<lw_shared_ptr<token_metadata2>>(_new_value)) {
|
||||
return get<lw_shared_ptr<token_metadata2>>(_new_value).get();
|
||||
}
|
||||
throw_with_backtrace<std::runtime_error>("no mutable new value");
|
||||
}
|
||||
|
||||
template <typename T = NodeId>
|
||||
requires std::is_same_v<T, gms::inet_address>
|
||||
const token_metadata2* get_new() const {
|
||||
if (holds_alternative<lw_shared_ptr<token_metadata2>>(_new_value)) {
|
||||
return get<lw_shared_ptr<token_metadata2>>(_new_value).get();
|
||||
}
|
||||
if (holds_alternative<lw_shared_ptr<const token_metadata2>>(_new_value)) {
|
||||
return get<lw_shared_ptr<const token_metadata2>>(_new_value).get();
|
||||
}
|
||||
throw_with_backtrace<std::runtime_error>("no new value");
|
||||
}
|
||||
|
||||
template <typename T = NodeId>
|
||||
requires std::is_same_v<T, gms::inet_address>
|
||||
lw_shared_ptr<const token_metadata2> get_new_strong() const {
|
||||
if (holds_alternative<lw_shared_ptr<token_metadata2>>(_new_value)) {
|
||||
return get<lw_shared_ptr<token_metadata2>>(_new_value);
|
||||
}
|
||||
if (holds_alternative<lw_shared_ptr<const token_metadata2>>(_new_value)) {
|
||||
return get<lw_shared_ptr<const token_metadata2>>(_new_value);
|
||||
}
|
||||
throw_with_backtrace<std::runtime_error>("no new value");
|
||||
}
|
||||
std::optional<host_id> get_endpoint(const token& token) const;
|
||||
std::vector<token> get_tokens(const host_id& addr) const;
|
||||
const std::unordered_map<token, host_id>& get_token_to_endpoint() const;
|
||||
const std::unordered_set<host_id>& get_leaving_endpoints() const;
|
||||
const std::unordered_map<token, host_id>& get_bootstrap_tokens() const;
|
||||
|
||||
/**
|
||||
* Update or add endpoint given its inet_address and endpoint_dc_rack.
|
||||
* Update or add a node for a given host_id.
|
||||
* The other arguments (dc, state, shard_count) are optional, i.e. the corresponding node
|
||||
* fields won't be updated if std::nullopt is passed.
|
||||
*/
|
||||
void update_topology(NodeId ep, std::optional<endpoint_dc_rack> opt_dr, std::optional<node::state> opt_st = std::nullopt,
|
||||
void update_topology(host_id ep, std::optional<endpoint_dc_rack> opt_dr, std::optional<node::state> opt_st = std::nullopt,
|
||||
std::optional<shard_id> shard_count = std::nullopt);
|
||||
/**
|
||||
* Creates an iterable range of the sorted tokens starting at the token t
|
||||
@@ -235,39 +187,39 @@ public:
|
||||
/// Returns host_id of the local node.
|
||||
host_id get_my_id() const;
|
||||
|
||||
void add_bootstrap_token(token t, NodeId endpoint);
|
||||
void add_bootstrap_token(token t, host_id endpoint);
|
||||
|
||||
void add_bootstrap_tokens(std::unordered_set<token> tokens, NodeId endpoint);
|
||||
void add_bootstrap_tokens(std::unordered_set<token> tokens, host_id endpoint);
|
||||
|
||||
void remove_bootstrap_tokens(std::unordered_set<token> tokens);
|
||||
|
||||
void add_leaving_endpoint(NodeId endpoint);
|
||||
void del_leaving_endpoint(NodeId endpoint);
|
||||
void add_leaving_endpoint(host_id endpoint);
|
||||
void del_leaving_endpoint(host_id endpoint);
|
||||
|
||||
void remove_endpoint(NodeId endpoint);
|
||||
void remove_endpoint(host_id endpoint);
|
||||
|
||||
// Checks if the node is part of the token ring. If yes, the node is one of
|
||||
// the nodes that owns the tokens and inside the set _normal_token_owners.
|
||||
bool is_normal_token_owner(NodeId endpoint) const;
|
||||
bool is_normal_token_owner(host_id endpoint) const;
|
||||
|
||||
bool is_leaving(NodeId endpoint) const;
|
||||
bool is_leaving(host_id endpoint) const;
|
||||
|
||||
// Is this node being replaced by another node
|
||||
bool is_being_replaced(NodeId endpoint) const;
|
||||
bool is_being_replaced(host_id endpoint) const;
|
||||
|
||||
// Is any node being replaced by another node
|
||||
bool is_any_node_being_replaced() const;
|
||||
|
||||
void add_replacing_endpoint(NodeId existing_node, NodeId replacing_node);
|
||||
void add_replacing_endpoint(host_id existing_node, host_id replacing_node);
|
||||
|
||||
void del_replacing_endpoint(NodeId existing_node);
|
||||
void del_replacing_endpoint(host_id existing_node);
|
||||
|
||||
/**
|
||||
* Create a full copy of token_metadata using asynchronous continuations.
|
||||
* The caller must ensure that the cloned object will not change if
|
||||
* the function yields.
|
||||
*/
|
||||
future<generic_token_metadata> clone_async() const noexcept;
|
||||
future<token_metadata> clone_async() const noexcept;
|
||||
|
||||
/**
|
||||
* Create a copy of TokenMetadata with only tokenToEndpointMap. That is, pending ranges,
|
||||
@@ -275,7 +227,7 @@ public:
|
||||
* The caller must ensure that the cloned object will not change if
|
||||
* the function yields.
|
||||
*/
|
||||
future<generic_token_metadata> clone_only_token_map() const noexcept;
|
||||
future<token_metadata> clone_only_token_map() const noexcept;
|
||||
/**
|
||||
* Create a copy of TokenMetadata with tokenToEndpointMap reflecting situation after all
|
||||
* current leave operations have finished.
|
||||
@@ -284,7 +236,7 @@ public:
|
||||
*
|
||||
* @return a future holding a new token metadata
|
||||
*/
|
||||
future<generic_token_metadata> clone_after_all_left() const noexcept;
|
||||
future<token_metadata> clone_after_all_left() const noexcept;
|
||||
|
||||
/**
|
||||
* Gently clear the token_metadata members.
|
||||
@@ -304,16 +256,14 @@ public:
|
||||
static boost::icl::interval<token>::interval_type range_to_interval(range<dht::token> r);
|
||||
static range<dht::token> interval_to_range(boost::icl::interval<token>::interval_type i);
|
||||
|
||||
future<> update_topology_change_info(dc_rack_fn<NodeId>& get_dc_rack);
|
||||
future<> update_topology_change_info(dc_rack_fn<host_id>& get_dc_rack);
|
||||
|
||||
const std::optional<topology_change_info<NodeId>>& get_topology_change_info() const;
|
||||
const std::optional<topology_change_info>& get_topology_change_info() const;
|
||||
|
||||
token get_predecessor(token t) const;
|
||||
|
||||
const std::unordered_set<NodeId>& get_all_endpoints() const;
|
||||
const std::unordered_set<host_id>& get_all_endpoints() const;
|
||||
|
||||
template <typename T = NodeId>
|
||||
requires std::is_same_v<T, locator::host_id>
|
||||
std::unordered_set<gms::inet_address> get_all_ips() const;
|
||||
|
||||
/* Returns the number of different endpoints that own tokens in the ring.
|
||||
@@ -334,26 +284,20 @@ public:
|
||||
version_t get_version() const;
|
||||
void set_version(version_t version);
|
||||
|
||||
friend class token_metadata_impl<NodeId>;
|
||||
friend class token_metadata_impl;
|
||||
friend class shared_token_metadata;
|
||||
private:
|
||||
void set_version_tracker(version_tracker_t tracker);
|
||||
};
|
||||
|
||||
extern template class generic_token_metadata<locator::host_id>;
|
||||
extern template class generic_token_metadata<gms::inet_address>;
|
||||
extern template void host_id_or_endpoint::resolve(const token_metadata& tm);
|
||||
extern template void host_id_or_endpoint::resolve(const token_metadata2& tm);
|
||||
|
||||
template <typename NodeId>
|
||||
struct topology_change_info {
|
||||
lw_shared_ptr<generic_token_metadata<NodeId>> target_token_metadata;
|
||||
lw_shared_ptr<generic_token_metadata<NodeId>> base_token_metadata;
|
||||
lw_shared_ptr<token_metadata> target_token_metadata;
|
||||
lw_shared_ptr<token_metadata> base_token_metadata;
|
||||
std::vector<dht::token> all_tokens;
|
||||
token_metadata::read_new_t read_new;
|
||||
|
||||
topology_change_info(lw_shared_ptr<generic_token_metadata<NodeId>> target_token_metadata_,
|
||||
lw_shared_ptr<generic_token_metadata<NodeId>> base_token_metadata_,
|
||||
topology_change_info(lw_shared_ptr<token_metadata> target_token_metadata_,
|
||||
lw_shared_ptr<token_metadata> base_token_metadata_,
|
||||
std::vector<dht::token> all_tokens_,
|
||||
token_metadata::read_new_t read_new_);
|
||||
future<> clear_gently();
|
||||
@@ -367,13 +311,8 @@ mutable_token_metadata_ptr make_token_metadata_ptr(Args... args) {
|
||||
return make_lw_shared<token_metadata>(std::forward<Args>(args)...);
|
||||
}
|
||||
|
||||
template <typename... Args>
|
||||
mutable_token_metadata2_ptr make_token_metadata2_ptr(Args... args) {
|
||||
return make_lw_shared<token_metadata2>(std::forward<Args>(args)...);
|
||||
}
|
||||
|
||||
class shared_token_metadata {
|
||||
mutable_token_metadata2_ptr _shared;
|
||||
mutable_token_metadata_ptr _shared;
|
||||
token_metadata_lock_func _lock_func;
|
||||
|
||||
// We use this barrier during the transition to a new token_metadata version to ensure that the
|
||||
@@ -392,13 +331,13 @@ class shared_token_metadata {
|
||||
// includes its own invocation as an operation in the new phase.
|
||||
utils::phased_barrier _versions_barrier;
|
||||
shared_future<> _stale_versions_in_use{make_ready_future<>()};
|
||||
token_metadata2::version_t _fence_version = 0;
|
||||
token_metadata::version_t _fence_version = 0;
|
||||
|
||||
public:
|
||||
// used to construct the shared object as a sharded<> instance
|
||||
// lock_func returns semaphore_units<>
|
||||
explicit shared_token_metadata(token_metadata_lock_func lock_func, token_metadata2::config cfg)
|
||||
: _shared(make_token_metadata2_ptr(std::move(cfg)))
|
||||
explicit shared_token_metadata(token_metadata_lock_func lock_func, token_metadata::config cfg)
|
||||
: _shared(make_token_metadata_ptr(std::move(cfg)))
|
||||
, _lock_func(std::move(lock_func))
|
||||
{
|
||||
_shared->set_version_tracker(_versions_barrier.start());
|
||||
@@ -407,18 +346,18 @@ public:
|
||||
shared_token_metadata(const shared_token_metadata& x) = delete;
|
||||
shared_token_metadata(shared_token_metadata&& x) = default;
|
||||
|
||||
token_metadata2_ptr get() const noexcept {
|
||||
token_metadata_ptr get() const noexcept {
|
||||
return _shared;
|
||||
}
|
||||
|
||||
void set(mutable_token_metadata2_ptr tmptr) noexcept;
|
||||
void set(mutable_token_metadata_ptr tmptr) noexcept;
|
||||
|
||||
future<> stale_versions_in_use() const {
|
||||
return _stale_versions_in_use.get_future();
|
||||
}
|
||||
|
||||
void update_fence_version(token_metadata2::version_t version);
|
||||
token_metadata2::version_t get_fence_version() const noexcept {
|
||||
void update_fence_version(token_metadata::version_t version);
|
||||
token_metadata::version_t get_fence_version() const noexcept {
|
||||
return _fence_version;
|
||||
}
|
||||
|
||||
@@ -438,7 +377,7 @@ public:
|
||||
// If the functor is successful, the mutated clone
|
||||
// is set back to to the shared_token_metadata,
|
||||
// otherwise, the clone is destroyed.
|
||||
future<> mutate_token_metadata(seastar::noncopyable_function<future<> (token_metadata2&)> func);
|
||||
future<> mutate_token_metadata(seastar::noncopyable_function<future<> (token_metadata&)> func);
|
||||
|
||||
// mutate_token_metadata_on_all_shards acquires the shared_token_metadata lock,
|
||||
// clones the token_metadata (using clone_async)
|
||||
@@ -450,7 +389,7 @@ public:
|
||||
// otherwise, the clone is destroyed.
|
||||
//
|
||||
// Must be called on shard 0.
|
||||
static future<> mutate_on_all_shards(sharded<shared_token_metadata>& stm, seastar::noncopyable_function<future<> (token_metadata2&)> func);
|
||||
static future<> mutate_on_all_shards(sharded<shared_token_metadata>& stm, seastar::noncopyable_function<future<> (token_metadata&)> func);
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -11,13 +11,9 @@
|
||||
|
||||
namespace locator {
|
||||
|
||||
template <typename NodeId>
|
||||
class generic_token_metadata;
|
||||
using token_metadata = generic_token_metadata<gms::inet_address>;
|
||||
class token_metadata;
|
||||
|
||||
using token_metadata_ptr = lw_shared_ptr<const token_metadata>;
|
||||
using mutable_token_metadata_ptr = lw_shared_ptr<token_metadata>;
|
||||
using token_metadata2 = generic_token_metadata<host_id>;
|
||||
using token_metadata2_ptr = lw_shared_ptr<const token_metadata2>;
|
||||
using mutable_token_metadata2_ptr = lw_shared_ptr<token_metadata2>;
|
||||
|
||||
} // namespace locator
|
||||
|
||||
@@ -40,6 +40,6 @@ public:
|
||||
virtual std::optional<dht::token> next_token() = 0;
|
||||
};
|
||||
|
||||
std::unique_ptr<locator::token_range_splitter> make_splitter(token_metadata2_ptr);
|
||||
std::unique_ptr<locator::token_range_splitter> make_splitter(token_metadata_ptr);
|
||||
|
||||
}
|
||||
@@ -424,7 +424,6 @@ private:
|
||||
return _nodes_by_endpoint;
|
||||
};
|
||||
|
||||
template <typename NodeId>
|
||||
friend class token_metadata_impl;
|
||||
public:
|
||||
void test_compare_endpoints(const inet_address& address, const inet_address& a1, const inet_address& a2) const;
|
||||
|
||||
@@ -54,7 +54,7 @@ get_range_to_address_map(locator::effective_replication_map_ptr erm,
|
||||
|
||||
// Caller is responsible to hold token_metadata valid until the returned future is resolved
|
||||
static future<std::vector<token>>
|
||||
get_tokens_in_local_dc(const locator::token_metadata2& tm) {
|
||||
get_tokens_in_local_dc(const locator::token_metadata& tm) {
|
||||
std::vector<token> filtered_tokens;
|
||||
auto local_dc_filter = tm.get_topology().get_local_dc_filter();
|
||||
for (auto token : tm.sorted_tokens()) {
|
||||
|
||||
2
main.cc
2
main.cc
@@ -1206,7 +1206,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
const auto listen_address = utils::resolve(cfg->listen_address, family).get0();
|
||||
const auto host_id = initialize_local_info_thread(sys_ks, snitch, listen_address, *cfg, broadcast_addr, broadcast_rpc_addr);
|
||||
|
||||
shared_token_metadata::mutate_on_all_shards(token_metadata, [host_id, endpoint = broadcast_addr] (locator::token_metadata2& tm) {
|
||||
shared_token_metadata::mutate_on_all_shards(token_metadata, [host_id, endpoint = broadcast_addr] (locator::token_metadata& tm) {
|
||||
// Makes local host id available in topology cfg as soon as possible.
|
||||
// Raft topology discard the endpoint-to-id map, so the local id can
|
||||
// still be found in the config.
|
||||
|
||||
@@ -25,10 +25,7 @@ class storage_service;
|
||||
}
|
||||
|
||||
namespace locator {
|
||||
template <typename NodeId>
|
||||
class generic_token_metadata;
|
||||
using token_metadata = generic_token_metadata<gms::inet_address>;
|
||||
using token_metadata2 = generic_token_metadata<host_id>;
|
||||
class token_metadata;
|
||||
}
|
||||
|
||||
class node_ops_info {
|
||||
@@ -139,7 +136,7 @@ public:
|
||||
sstring desc;
|
||||
locator::host_id host_id; // Host ID of the node operand (i.e. added, replaced, or leaving node)
|
||||
gms::inet_address endpoint; // IP address of the node operand (i.e. added, replaced, or leaving node)
|
||||
lw_shared_ptr<const locator::token_metadata2> tmptr;
|
||||
lw_shared_ptr<const locator::token_metadata> tmptr;
|
||||
std::unordered_set<gms::inet_address> sync_nodes;
|
||||
std::unordered_set<gms::inet_address> ignore_nodes;
|
||||
node_ops_cmd_request req;
|
||||
|
||||
@@ -1492,7 +1492,7 @@ std::optional<double> repair::data_sync_repair_task_impl::expected_children_numb
|
||||
return smp::count;
|
||||
}
|
||||
|
||||
future<> repair_service::bootstrap_with_repair(locator::token_metadata2_ptr tmptr, std::unordered_set<dht::token> bootstrap_tokens) {
|
||||
future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr, std::unordered_set<dht::token> bootstrap_tokens) {
|
||||
assert(this_shard_id() == 0);
|
||||
using inet_address = gms::inet_address;
|
||||
return seastar::async([this, tmptr = std::move(tmptr), tokens = std::move(bootstrap_tokens)] () mutable {
|
||||
@@ -1531,14 +1531,14 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata2_ptr tmpt
|
||||
auto replication_factor = erm->get_replication_factor();
|
||||
|
||||
//Active ranges
|
||||
auto metadata_clone = locator::make_token_metadata2_ptr(tmptr->clone_only_token_map().get0());
|
||||
auto range_addresses = strat.get_range_addresses(locator::token_metadata(metadata_clone)).get0();
|
||||
auto metadata_clone = tmptr->clone_only_token_map().get0();
|
||||
auto range_addresses = strat.get_range_addresses(metadata_clone).get0();
|
||||
|
||||
//Pending ranges
|
||||
metadata_clone->update_topology(myid, myloc, locator::node::state::bootstrapping);
|
||||
metadata_clone->update_normal_tokens(tokens, myid).get();
|
||||
auto pending_range_addresses = strat.get_range_addresses(locator::token_metadata(metadata_clone)).get0();
|
||||
metadata_clone->clear_gently().get();
|
||||
metadata_clone.update_topology(myid, myloc, locator::node::state::bootstrapping);
|
||||
metadata_clone.update_normal_tokens(tokens, myid).get();
|
||||
auto pending_range_addresses = strat.get_range_addresses(metadata_clone).get0();
|
||||
metadata_clone.clear_gently().get();
|
||||
|
||||
//Collects the source that will have its range moved to the new node
|
||||
std::unordered_map<dht::token_range, repair_neighbors> range_sources;
|
||||
@@ -1669,7 +1669,7 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata2_ptr tmpt
|
||||
});
|
||||
}
|
||||
|
||||
future<> repair_service::do_decommission_removenode_with_repair(locator::token_metadata2_ptr tmptr, gms::inet_address leaving_node, shared_ptr<node_ops_info> ops) {
|
||||
future<> repair_service::do_decommission_removenode_with_repair(locator::token_metadata_ptr tmptr, gms::inet_address leaving_node, shared_ptr<node_ops_info> ops) {
|
||||
assert(this_shard_id() == 0);
|
||||
using inet_address = gms::inet_address;
|
||||
return seastar::async([this, tmptr = std::move(tmptr), leaving_node = std::move(leaving_node), ops] () mutable {
|
||||
@@ -1720,15 +1720,15 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m
|
||||
// Find (for each range) all nodes that store replicas for these ranges as well
|
||||
for (auto& r : ranges) {
|
||||
auto end_token = r.end() ? r.end()->value() : dht::maximum_token();
|
||||
auto eps = strat.calculate_natural_ips(end_token, tmptr).get0();
|
||||
auto eps = strat.calculate_natural_ips(end_token, *tmptr).get0();
|
||||
current_replica_endpoints.emplace(r, std::move(eps));
|
||||
seastar::thread::maybe_yield();
|
||||
}
|
||||
auto temp = locator::make_token_metadata2_ptr(tmptr->clone_after_all_left().get0());
|
||||
auto temp = tmptr->clone_after_all_left().get0();
|
||||
// leaving_node might or might not be 'leaving'. If it was not leaving (that is, removenode
|
||||
// command was used), it is still present in temp and must be removed.
|
||||
if (temp->is_normal_token_owner(leaving_node_id)) {
|
||||
temp->remove_endpoint(leaving_node_id);
|
||||
if (temp.is_normal_token_owner(leaving_node_id)) {
|
||||
temp.remove_endpoint(leaving_node_id);
|
||||
}
|
||||
std::unordered_map<dht::token_range, repair_neighbors> range_sources;
|
||||
dht::token_range_vector ranges_for_removenode;
|
||||
@@ -1843,7 +1843,7 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m
|
||||
}
|
||||
}
|
||||
}
|
||||
temp->clear_gently().get();
|
||||
temp.clear_gently().get();
|
||||
if (reason == streaming::stream_reason::decommission) {
|
||||
container().invoke_on_all([nr_ranges_skipped] (repair_service& rs) {
|
||||
rs.get_metrics().decommission_finished_ranges += nr_ranges_skipped;
|
||||
@@ -1865,13 +1865,13 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m
|
||||
});
|
||||
}
|
||||
|
||||
future<> repair_service::decommission_with_repair(locator::token_metadata2_ptr tmptr) {
|
||||
future<> repair_service::decommission_with_repair(locator::token_metadata_ptr tmptr) {
|
||||
assert(this_shard_id() == 0);
|
||||
auto my_address = tmptr->get_topology().my_address();
|
||||
return do_decommission_removenode_with_repair(std::move(tmptr), my_address, {});
|
||||
}
|
||||
|
||||
future<> repair_service::removenode_with_repair(locator::token_metadata2_ptr tmptr, gms::inet_address leaving_node, shared_ptr<node_ops_info> ops) {
|
||||
future<> repair_service::removenode_with_repair(locator::token_metadata_ptr tmptr, gms::inet_address leaving_node, shared_ptr<node_ops_info> ops) {
|
||||
assert(this_shard_id() == 0);
|
||||
return do_decommission_removenode_with_repair(std::move(tmptr), std::move(leaving_node), std::move(ops)).then([this] {
|
||||
rlogger.debug("Triggering off-strategy compaction for all non-system tables on removenode completion");
|
||||
@@ -1884,7 +1884,7 @@ future<> repair_service::removenode_with_repair(locator::token_metadata2_ptr tmp
|
||||
});
|
||||
}
|
||||
|
||||
future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata2_ptr tmptr, sstring op, sstring source_dc, streaming::stream_reason reason, std::unordered_set<gms::inet_address> ignore_nodes) {
|
||||
future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata_ptr tmptr, sstring op, sstring source_dc, streaming::stream_reason reason, std::unordered_set<gms::inet_address> ignore_nodes) {
|
||||
assert(this_shard_id() == 0);
|
||||
return seastar::async([this, tmptr = std::move(tmptr), source_dc = std::move(source_dc), op = std::move(op), reason, ignore_nodes = std::move(ignore_nodes)] () mutable {
|
||||
auto& db = get_db().local();
|
||||
@@ -1898,7 +1898,7 @@ future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata2
|
||||
}
|
||||
auto& strat = erm->get_replication_strategy();
|
||||
// Okay to yield since tm is immutable
|
||||
dht::token_range_vector ranges = strat.get_ranges(myid, locator::token_metadata(tmptr)).get0();
|
||||
dht::token_range_vector ranges = strat.get_ranges(myid, tmptr).get0();
|
||||
auto nr_tables = get_nr_tables(db, keyspace_name);
|
||||
nr_ranges_total += ranges.size() * nr_tables;
|
||||
|
||||
@@ -1922,7 +1922,7 @@ future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata2
|
||||
continue;
|
||||
}
|
||||
auto& strat = erm->get_replication_strategy();
|
||||
dht::token_range_vector ranges = strat.get_ranges(myid, locator::token_metadata(tmptr)).get0();
|
||||
dht::token_range_vector ranges = strat.get_ranges(myid, *tmptr).get0();
|
||||
auto& topology = erm->get_token_metadata().get_topology();
|
||||
std::unordered_map<dht::token_range, repair_neighbors> range_sources;
|
||||
auto nr_tables = get_nr_tables(db, keyspace_name);
|
||||
@@ -1931,7 +1931,7 @@ future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata2
|
||||
auto& r = *it;
|
||||
seastar::thread::maybe_yield();
|
||||
auto end_token = r.end() ? r.end()->value() : dht::maximum_token();
|
||||
auto neighbors = boost::copy_range<std::vector<gms::inet_address>>(strat.calculate_natural_ips(end_token, tmptr).get0() |
|
||||
auto neighbors = boost::copy_range<std::vector<gms::inet_address>>(strat.calculate_natural_ips(end_token, *tmptr).get0() |
|
||||
boost::adaptors::filtered([myip, &source_dc, &topology, &ignore_nodes] (const gms::inet_address& node) {
|
||||
if (node == myip) {
|
||||
return false;
|
||||
@@ -1969,7 +1969,7 @@ future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata2
|
||||
});
|
||||
}
|
||||
|
||||
future<> repair_service::rebuild_with_repair(locator::token_metadata2_ptr tmptr, sstring source_dc) {
|
||||
future<> repair_service::rebuild_with_repair(locator::token_metadata_ptr tmptr, sstring source_dc) {
|
||||
assert(this_shard_id() == 0);
|
||||
auto op = sstring("rebuild_with_repair");
|
||||
if (source_dc.empty()) {
|
||||
@@ -1985,7 +1985,7 @@ future<> repair_service::rebuild_with_repair(locator::token_metadata2_ptr tmptr,
|
||||
});
|
||||
}
|
||||
|
||||
future<> repair_service::replace_with_repair(locator::token_metadata2_ptr tmptr, std::unordered_set<dht::token> replacing_tokens, std::unordered_set<gms::inet_address> ignore_nodes) {
|
||||
future<> repair_service::replace_with_repair(locator::token_metadata_ptr tmptr, std::unordered_set<dht::token> replacing_tokens, std::unordered_set<gms::inet_address> ignore_nodes) {
|
||||
assert(this_shard_id() == 0);
|
||||
auto cloned_tm = co_await tmptr->clone_async();
|
||||
auto op = sstring("replace_with_repair");
|
||||
@@ -1994,7 +1994,7 @@ future<> repair_service::replace_with_repair(locator::token_metadata2_ptr tmptr,
|
||||
auto reason = streaming::stream_reason::replace;
|
||||
// update a cloned version of tmptr
|
||||
// no need to set the original version
|
||||
auto cloned_tmptr = make_token_metadata2_ptr(std::move(cloned_tm));
|
||||
auto cloned_tmptr = make_token_metadata_ptr(std::move(cloned_tm));
|
||||
cloned_tmptr->update_topology(tmptr->get_my_id(), myloc, locator::node::state::replacing);
|
||||
co_await cloned_tmptr->update_normal_tokens(replacing_tokens, tmptr->get_my_id());
|
||||
co_return co_await do_rebuild_replace_with_repair(std::move(cloned_tmptr), std::move(op), myloc.dc, reason, std::move(ignore_nodes));
|
||||
|
||||
@@ -669,7 +669,7 @@ void flush_rows(schema_ptr s, std::list<repair_row>& rows, lw_shared_ptr<repair_
|
||||
lw_shared_ptr<const decorated_key_with_hash> last_dk;
|
||||
bool do_small_table_optimization = erm && small_table_optimization;
|
||||
auto* strat = do_small_table_optimization ? &erm->get_replication_strategy() : nullptr;
|
||||
auto tm = do_small_table_optimization ? erm->get_token_metadata_ptr() : nullptr;
|
||||
auto* tm = do_small_table_optimization ? &erm->get_token_metadata() : nullptr;
|
||||
auto myip = do_small_table_optimization ? erm->get_topology().my_address() : gms::inet_address();
|
||||
for (auto& r : rows) {
|
||||
thread::maybe_yield();
|
||||
@@ -679,7 +679,7 @@ void flush_rows(schema_ptr s, std::list<repair_row>& rows, lw_shared_ptr<repair_
|
||||
const auto& dk = r.get_dk_with_hash()->dk;
|
||||
if (do_small_table_optimization) {
|
||||
// Check if the token is owned by the node
|
||||
auto eps = strat->calculate_natural_ips(dk.token(), tm).get0();
|
||||
auto eps = strat->calculate_natural_ips(dk.token(), *tm).get0();
|
||||
if (!eps.contains(myip)) {
|
||||
rlogger.trace("master: ignore row, token={}", dk.token());
|
||||
continue;
|
||||
@@ -1900,7 +1900,7 @@ public:
|
||||
}
|
||||
if (small_table_optimization) {
|
||||
auto& strat = erm.get_replication_strategy();
|
||||
auto& tm = erm.get_token_metadata_ptr();
|
||||
const auto& tm = erm.get_token_metadata();
|
||||
std::list<repair_row> tmp;
|
||||
for (auto& row : row_diff) {
|
||||
repair_row r = std::move(row);
|
||||
|
||||
@@ -138,14 +138,14 @@ public:
|
||||
|
||||
// The tokens are the tokens assigned to the bootstrap node.
|
||||
// all repair-based node operation entry points must be called on shard 0
|
||||
future<> bootstrap_with_repair(locator::token_metadata2_ptr tmptr, std::unordered_set<dht::token> bootstrap_tokens);
|
||||
future<> decommission_with_repair(locator::token_metadata2_ptr tmptr);
|
||||
future<> removenode_with_repair(locator::token_metadata2_ptr tmptr, gms::inet_address leaving_node, shared_ptr<node_ops_info> ops);
|
||||
future<> rebuild_with_repair(locator::token_metadata2_ptr tmptr, sstring source_dc);
|
||||
future<> replace_with_repair(locator::token_metadata2_ptr tmptr, std::unordered_set<dht::token> replacing_tokens, std::unordered_set<gms::inet_address> ignore_nodes);
|
||||
future<> bootstrap_with_repair(locator::token_metadata_ptr tmptr, std::unordered_set<dht::token> bootstrap_tokens);
|
||||
future<> decommission_with_repair(locator::token_metadata_ptr tmptr);
|
||||
future<> removenode_with_repair(locator::token_metadata_ptr tmptr, gms::inet_address leaving_node, shared_ptr<node_ops_info> ops);
|
||||
future<> rebuild_with_repair(locator::token_metadata_ptr tmptr, sstring source_dc);
|
||||
future<> replace_with_repair(locator::token_metadata_ptr tmptr, std::unordered_set<dht::token> replacing_tokens, std::unordered_set<gms::inet_address> ignore_nodes);
|
||||
private:
|
||||
future<> do_decommission_removenode_with_repair(locator::token_metadata2_ptr tmptr, gms::inet_address leaving_node, shared_ptr<node_ops_info> ops);
|
||||
future<> do_rebuild_replace_with_repair(locator::token_metadata2_ptr tmptr, sstring op, sstring source_dc, streaming::stream_reason reason, std::unordered_set<gms::inet_address> ignore_nodes);
|
||||
future<> do_decommission_removenode_with_repair(locator::token_metadata_ptr tmptr, gms::inet_address leaving_node, shared_ptr<node_ops_info> ops);
|
||||
future<> do_rebuild_replace_with_repair(locator::token_metadata_ptr tmptr, sstring op, sstring source_dc, streaming::stream_reason reason, std::unordered_set<gms::inet_address> ignore_nodes);
|
||||
|
||||
// Must be called on shard 0
|
||||
future<> sync_data_using_repair(sstring keyspace,
|
||||
|
||||
@@ -1561,7 +1561,7 @@ public:
|
||||
}
|
||||
|
||||
const locator::shared_token_metadata& get_shared_token_metadata() const { return _shared_token_metadata; }
|
||||
const locator::token_metadata2& get_token_metadata() const { return *_shared_token_metadata.get(); }
|
||||
const locator::token_metadata& get_token_metadata() const { return *_shared_token_metadata.get(); }
|
||||
|
||||
wasm::manager& wasm() noexcept { return _wasm; }
|
||||
const wasm::manager& wasm() const noexcept { return _wasm; }
|
||||
|
||||
@@ -303,7 +303,7 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
locator::token_metadata2_ptr forward_service::get_token_metadata_ptr() const noexcept {
|
||||
locator::token_metadata_ptr forward_service::get_token_metadata_ptr() const noexcept {
|
||||
return _shared_token_metadata.get();
|
||||
}
|
||||
|
||||
|
||||
@@ -159,7 +159,7 @@ private:
|
||||
// Used to execute a `forward_request` on a shard.
|
||||
future<query::forward_result> execute_on_this_shard(query::forward_request req, std::optional<tracing::trace_info> tr_info);
|
||||
|
||||
locator::token_metadata2_ptr get_token_metadata_ptr() const noexcept;
|
||||
locator::token_metadata_ptr get_token_metadata_ptr() const noexcept;
|
||||
|
||||
void register_metrics();
|
||||
void init_messaging_service();
|
||||
|
||||
@@ -2286,7 +2286,7 @@ bool paxos_response_handler::learned(gms::inet_address ep) {
|
||||
}
|
||||
|
||||
static inet_address_vector_replica_set
|
||||
replica_ids_to_endpoints(const locator::token_metadata2& tm, const std::vector<locator::host_id>& replica_ids) {
|
||||
replica_ids_to_endpoints(const locator::token_metadata& tm, const std::vector<locator::host_id>& replica_ids) {
|
||||
inet_address_vector_replica_set endpoints;
|
||||
endpoints.reserve(replica_ids.size());
|
||||
|
||||
@@ -2300,7 +2300,7 @@ replica_ids_to_endpoints(const locator::token_metadata2& tm, const std::vector<l
|
||||
}
|
||||
|
||||
static std::vector<locator::host_id>
|
||||
endpoints_to_replica_ids(const locator::token_metadata2& tm, const inet_address_vector_replica_set& endpoints) {
|
||||
endpoints_to_replica_ids(const locator::token_metadata& tm, const inet_address_vector_replica_set& endpoints) {
|
||||
std::vector<locator::host_id> replica_ids;
|
||||
replica_ids.reserve(endpoints.size());
|
||||
|
||||
@@ -6545,7 +6545,7 @@ storage_proxy::stop() {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
locator::token_metadata2_ptr storage_proxy::get_token_metadata_ptr() const noexcept {
|
||||
locator::token_metadata_ptr storage_proxy::get_token_metadata_ptr() const noexcept {
|
||||
return _shared_token_metadata.get();
|
||||
}
|
||||
|
||||
|
||||
@@ -220,7 +220,7 @@ public:
|
||||
return _erm_factory;
|
||||
}
|
||||
|
||||
locator::token_metadata2_ptr get_token_metadata_ptr() const noexcept;
|
||||
locator::token_metadata_ptr get_token_metadata_ptr() const noexcept;
|
||||
|
||||
query::max_result_size get_max_result_size(const query::partition_slice& slice) const;
|
||||
query::tombstone_limit get_tombstone_limit() const;
|
||||
|
||||
@@ -283,7 +283,7 @@ static future<> set_gossip_tokens(gms::gossiper& g,
|
||||
});
|
||||
}
|
||||
|
||||
static std::unordered_map<token, gms::inet_address> get_token_to_endpoint(const locator::token_metadata2& tm) {
|
||||
static std::unordered_map<token, gms::inet_address> get_token_to_endpoint(const locator::token_metadata& tm) {
|
||||
const auto& map = tm.get_token_to_endpoint();
|
||||
std::unordered_map<token, gms::inet_address> result;
|
||||
result.reserve(map.size());
|
||||
@@ -405,7 +405,7 @@ future<> storage_service::topology_state_load() {
|
||||
co_await _messaging.local().ban_host(locator::host_id{id.uuid()});
|
||||
}
|
||||
|
||||
co_await mutate_token_metadata(seastar::coroutine::lambda([this, &id2ip, &am] (mutable_token_metadata2_ptr tmptr) -> future<> {
|
||||
co_await mutate_token_metadata(seastar::coroutine::lambda([this, &id2ip, &am] (mutable_token_metadata_ptr tmptr) -> future<> {
|
||||
co_await tmptr->clear_gently(); // drop previous state
|
||||
|
||||
tmptr->set_version(_topology_state_machine._topology.version);
|
||||
@@ -925,11 +925,11 @@ class topology_coordinator {
|
||||
// True if an ongoing topology change should be rolled back
|
||||
bool _rollback = false;
|
||||
|
||||
const locator::token_metadata2& get_token_metadata() const noexcept {
|
||||
const locator::token_metadata& get_token_metadata() const noexcept {
|
||||
return *_shared_tm.get();
|
||||
}
|
||||
|
||||
locator::token_metadata2_ptr get_token_metadata_ptr() const noexcept {
|
||||
locator::token_metadata_ptr get_token_metadata_ptr() const noexcept {
|
||||
return _shared_tm.get();
|
||||
}
|
||||
|
||||
@@ -1207,7 +1207,7 @@ class topology_coordinator {
|
||||
// If there's a bootstrapping node, its tokens should be included in the new generation.
|
||||
// Pass them and a reference to the bootstrapping node's replica_state through `binfo`.
|
||||
future<std::pair<utils::UUID, utils::chunked_vector<mutation>>> prepare_new_cdc_generation_data(
|
||||
locator::token_metadata2_ptr tmptr, const group0_guard& guard, std::optional<bootstrapping_info> binfo) {
|
||||
locator::token_metadata_ptr tmptr, const group0_guard& guard, std::optional<bootstrapping_info> binfo) {
|
||||
auto get_sharding_info = [&] (dht::token end) -> std::pair<size_t, uint8_t> {
|
||||
if (binfo && binfo->bootstrap_tokens.contains(end)) {
|
||||
return {binfo->rs.shard_count, binfo->rs.ignore_msb};
|
||||
@@ -1269,7 +1269,7 @@ class topology_coordinator {
|
||||
// (bootstrapping is quick if there is no data in the cluster, but usually if one has 100 nodes they
|
||||
// have tons of data, so indeed streaming/repair will take much longer (hours/days)).
|
||||
future<std::tuple<utils::UUID, group0_guard, canonical_mutation>> prepare_and_broadcast_cdc_generation_data(
|
||||
locator::token_metadata2_ptr tmptr, group0_guard guard, std::optional<bootstrapping_info> binfo) {
|
||||
locator::token_metadata_ptr tmptr, group0_guard guard, std::optional<bootstrapping_info> binfo) {
|
||||
auto [gen_uuid, gen_mutations] = co_await prepare_new_cdc_generation_data(tmptr, guard, binfo);
|
||||
|
||||
if (gen_mutations.empty()) {
|
||||
@@ -3414,7 +3414,7 @@ future<> storage_service::join_token_ring(sharded<db::system_distributed_keyspac
|
||||
}
|
||||
|
||||
slogger.debug("Setting tokens to {}", bootstrap_tokens);
|
||||
co_await mutate_token_metadata([this, &bootstrap_tokens] (mutable_token_metadata2_ptr tmptr) -> future<> {
|
||||
co_await mutate_token_metadata([this, &bootstrap_tokens] (mutable_token_metadata_ptr tmptr) -> future<> {
|
||||
// This node must know about its chosen tokens before other nodes do
|
||||
// since they may start sending writes to this node after it gossips status = NORMAL.
|
||||
// Therefore, in case we haven't updated _token_metadata with our tokens yet, do it now.
|
||||
@@ -3493,7 +3493,7 @@ future<> storage_service::mark_existing_views_as_built() {
|
||||
});
|
||||
}
|
||||
|
||||
std::unordered_set<gms::inet_address> storage_service::parse_node_list(sstring comma_separated_list, const token_metadata2& tm) {
|
||||
std::unordered_set<gms::inet_address> storage_service::parse_node_list(sstring comma_separated_list, const token_metadata& tm) {
|
||||
std::vector<sstring> ignore_nodes_strs = utils::split_comma_separated_list(std::move(comma_separated_list));
|
||||
std::unordered_set<gms::inet_address> ignore_nodes;
|
||||
for (const sstring& n : ignore_nodes_strs) {
|
||||
@@ -3555,7 +3555,7 @@ future<> storage_service::bootstrap(std::unordered_set<token>& bootstrap_tokens,
|
||||
// When is_repair_based_node_ops_enabled is true, the bootstrap node
|
||||
// will use node_ops_cmd to bootstrap, node_ops_cmd will update the pending ranges.
|
||||
slogger.debug("bootstrap: update pending ranges: endpoint={} bootstrap_tokens={}", get_broadcast_address(), bootstrap_tokens);
|
||||
mutate_token_metadata([this, &bootstrap_tokens] (mutable_token_metadata2_ptr tmptr) {
|
||||
mutate_token_metadata([this, &bootstrap_tokens] (mutable_token_metadata_ptr tmptr) {
|
||||
auto endpoint = get_broadcast_address();
|
||||
tmptr->update_topology(tmptr->get_my_id(), _snitch.local()->get_location(), locator::node::state::bootstrapping);
|
||||
tmptr->add_bootstrap_tokens(bootstrap_tokens, tmptr->get_my_id());
|
||||
@@ -3650,25 +3650,20 @@ future<> storage_service::handle_state_bootstrap(inet_address endpoint, gms::per
|
||||
// continue.
|
||||
auto tmlock = co_await get_token_metadata_lock();
|
||||
auto tmptr = co_await get_mutable_token_metadata_ptr();
|
||||
auto update_tm = [&]<typename NodeId>(locator::generic_token_metadata<NodeId>& tm, NodeId n, std::optional<locator::endpoint_dc_rack> dc_rack) {
|
||||
if (tm.is_normal_token_owner(n)) {
|
||||
// If isLeaving is false, we have missed both LEAVING and LEFT. However, if
|
||||
// isLeaving is true, we have only missed LEFT. Waiting time between completing
|
||||
// leave operation and rebootstrapping is relatively short, so the latter is quite
|
||||
// common (not enough time for gossip to spread). Therefore we report only the
|
||||
// former in the log.
|
||||
if (!tm.is_leaving(n)) {
|
||||
slogger.info("Node {} state jump to bootstrap", n);
|
||||
}
|
||||
tm.remove_endpoint(n);
|
||||
}
|
||||
|
||||
tm.update_topology(n, dc_rack, locator::node::state::bootstrapping);
|
||||
tm.add_bootstrap_tokens(tokens, n);
|
||||
};
|
||||
const auto dc_rack = get_dc_rack_for(endpoint);
|
||||
const auto host_id = _gossiper.get_host_id(endpoint);
|
||||
update_tm(*tmptr, host_id, dc_rack);
|
||||
if (tmptr->is_normal_token_owner(host_id)) {
|
||||
// If isLeaving is false, we have missed both LEAVING and LEFT. However, if
|
||||
// isLeaving is true, we have only missed LEFT. Waiting time between completing
|
||||
// leave operation and rebootstrapping is relatively short, so the latter is quite
|
||||
// common (not enough time for gossip to spread). Therefore we report only the
|
||||
// former in the log.
|
||||
if (!tmptr->is_leaving(host_id)) {
|
||||
slogger.info("Node {} state jump to bootstrap", host_id);
|
||||
}
|
||||
tmptr->remove_endpoint(host_id);
|
||||
}
|
||||
tmptr->update_topology(host_id, get_dc_rack_for(endpoint), locator::node::state::bootstrapping);
|
||||
tmptr->add_bootstrap_tokens(tokens, host_id);
|
||||
tmptr->update_host_id(host_id, endpoint);
|
||||
|
||||
co_await update_topology_change_info(tmptr, ::format("handle_state_bootstrap {}", endpoint));
|
||||
@@ -4302,13 +4297,13 @@ future<> storage_service::join_cluster(sharded<db::system_distributed_keyspace>&
|
||||
co_return co_await join_token_ring(sys_dist_ks, proxy, std::move(initial_contact_nodes), std::move(loaded_endpoints), std::move(loaded_peer_features), get_ring_delay());
|
||||
}
|
||||
|
||||
future<> storage_service::replicate_to_all_cores(mutable_token_metadata2_ptr tmptr) noexcept {
|
||||
future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmptr) noexcept {
|
||||
assert(this_shard_id() == 0);
|
||||
|
||||
slogger.debug("Replicating token_metadata to all cores");
|
||||
std::exception_ptr ex;
|
||||
|
||||
std::vector<mutable_token_metadata2_ptr> pending_token_metadata_ptr;
|
||||
std::vector<mutable_token_metadata_ptr> pending_token_metadata_ptr;
|
||||
pending_token_metadata_ptr.resize(smp::count);
|
||||
std::vector<std::unordered_map<sstring, locator::vnode_effective_replication_map_ptr>> pending_effective_replication_maps;
|
||||
pending_effective_replication_maps.resize(smp::count);
|
||||
@@ -4339,7 +4334,7 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata2_ptr tmp
|
||||
pending_token_metadata_ptr[base_shard] = tmptr;
|
||||
// clone a local copy of updated token_metadata on all other shards
|
||||
co_await smp::invoke_on_others(base_shard, [&, tmptr] () -> future<> {
|
||||
pending_token_metadata_ptr[this_shard_id()] = make_token_metadata2_ptr(co_await tmptr->clone_async());
|
||||
pending_token_metadata_ptr[this_shard_id()] = make_token_metadata_ptr(co_await tmptr->clone_async());
|
||||
});
|
||||
|
||||
// Precalculate new effective_replication_map for all keyspaces
|
||||
@@ -5496,7 +5491,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
slogger.warn("{}", msg);
|
||||
throw std::runtime_error(msg);
|
||||
}
|
||||
mutate_token_metadata([coordinator, &req, this] (mutable_token_metadata2_ptr tmptr) mutable {
|
||||
mutate_token_metadata([coordinator, &req, this] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
for (auto& node : req.leaving_nodes) {
|
||||
slogger.info("removenode[{}]: Added node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator);
|
||||
tmptr->add_leaving_endpoint(tmptr->get_host_id(node));
|
||||
@@ -5504,7 +5499,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
return update_topology_change_info(tmptr, ::format("removenode {}", req.leaving_nodes));
|
||||
}).get();
|
||||
node_ops_insert(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable {
|
||||
return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata2_ptr tmptr) mutable {
|
||||
return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
for (auto& node : req.leaving_nodes) {
|
||||
slogger.info("removenode[{}]: Removed node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator);
|
||||
tmptr->del_leaving_endpoint(tmptr->get_host_id(node));
|
||||
@@ -5544,7 +5539,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
slogger.warn("{}", msg);
|
||||
throw std::runtime_error(msg);
|
||||
}
|
||||
mutate_token_metadata([coordinator, &req, this] (mutable_token_metadata2_ptr tmptr) mutable {
|
||||
mutate_token_metadata([coordinator, &req, this] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
for (auto& node : req.leaving_nodes) {
|
||||
slogger.info("decommission[{}]: Added node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator);
|
||||
tmptr->add_leaving_endpoint(tmptr->get_host_id(node));
|
||||
@@ -5552,7 +5547,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
return update_topology_change_info(tmptr, ::format("decommission {}", req.leaving_nodes));
|
||||
}).get();
|
||||
node_ops_insert(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable {
|
||||
return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata2_ptr tmptr) mutable {
|
||||
return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
for (auto& node : req.leaving_nodes) {
|
||||
slogger.info("decommission[{}]: Removed node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator);
|
||||
tmptr->del_leaving_endpoint(tmptr->get_host_id(node));
|
||||
@@ -5605,7 +5600,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
if (!coordinator_host_id) {
|
||||
throw std::runtime_error("Coordinator host_id not found");
|
||||
}
|
||||
mutate_token_metadata([coordinator, coordinator_host_id, &req, this] (mutable_token_metadata2_ptr tmptr) mutable {
|
||||
mutate_token_metadata([coordinator, coordinator_host_id, &req, this] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
for (auto& x: req.replace_nodes) {
|
||||
auto existing_node = x.first;
|
||||
auto replacing_node = x.second;
|
||||
@@ -5631,7 +5626,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
node_ops_insert(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, coordinator_host_id, req = std::move(req)] () mutable {
|
||||
return mutate_token_metadata([this, coordinator, coordinator_host_id, req = std::move(req)] (mutable_token_metadata2_ptr tmptr) mutable {
|
||||
return mutate_token_metadata([this, coordinator, coordinator_host_id, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
for (auto& x: req.replace_nodes) {
|
||||
auto existing_node = x.first;
|
||||
auto replacing_node = x.second;
|
||||
@@ -5682,7 +5677,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
if (!coordinator_host_id) {
|
||||
throw std::runtime_error("Coordinator host_id not found");
|
||||
}
|
||||
mutate_token_metadata([coordinator, coordinator_host_id, &req, this] (mutable_token_metadata2_ptr tmptr) mutable {
|
||||
mutate_token_metadata([coordinator, coordinator_host_id, &req, this] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
for (auto& x: req.bootstrap_nodes) {
|
||||
auto& endpoint = x.first;
|
||||
auto tokens = std::unordered_set<dht::token>(x.second.begin(), x.second.end());
|
||||
@@ -5697,7 +5692,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
return update_topology_change_info(tmptr, ::format("bootstrap {}", req.bootstrap_nodes));
|
||||
}).get();
|
||||
node_ops_insert(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable {
|
||||
return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata2_ptr tmptr) mutable {
|
||||
return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
for (auto& x: req.bootstrap_nodes) {
|
||||
auto& endpoint = x.first;
|
||||
auto tokens = std::unordered_set<dht::token>(x.second.begin(), x.second.end());
|
||||
@@ -5914,12 +5909,12 @@ storage_service::get_changed_ranges_for_leaving(locator::vnode_effective_replica
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
|
||||
auto temp = locator::make_token_metadata2_ptr(co_await get_token_metadata_ptr()->clone_after_all_left());
|
||||
auto temp = co_await get_token_metadata_ptr()->clone_after_all_left();
|
||||
|
||||
// endpoint might or might not be 'leaving'. If it was not leaving (that is, removenode
|
||||
// command was used), it is still present in temp and must be removed.
|
||||
if (const auto host_id = temp->get_host_id_if_known(endpoint); host_id && temp->is_normal_token_owner(*host_id)) {
|
||||
temp->remove_endpoint(*host_id);
|
||||
if (const auto host_id = temp.get_host_id_if_known(endpoint); host_id && temp.is_normal_token_owner(*host_id)) {
|
||||
temp.remove_endpoint(*host_id);
|
||||
}
|
||||
|
||||
std::unordered_multimap<dht::token_range, inet_address> changed_ranges;
|
||||
@@ -5960,7 +5955,7 @@ storage_service::get_changed_ranges_for_leaving(locator::vnode_effective_replica
|
||||
// E.g. everywhere_replication_strategy
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
co_await temp->clear_gently();
|
||||
co_await temp.clear_gently();
|
||||
|
||||
co_return changed_ranges;
|
||||
}
|
||||
@@ -6080,7 +6075,7 @@ future<> storage_service::excise(std::unordered_set<token> tokens, inet_address
|
||||
future<> storage_service::leave_ring() {
|
||||
co_await _cdc_gens.local().leave_ring();
|
||||
co_await _sys_ks.local().set_bootstrap_state(db::system_keyspace::bootstrap_state::NEEDS_BOOTSTRAP);
|
||||
co_await mutate_token_metadata([this] (mutable_token_metadata2_ptr tmptr) {
|
||||
co_await mutate_token_metadata([this] (mutable_token_metadata_ptr tmptr) {
|
||||
auto endpoint = get_broadcast_address();
|
||||
const auto my_id = tmptr->get_my_id();
|
||||
tmptr->remove_endpoint(my_id);
|
||||
@@ -6242,7 +6237,7 @@ future<locator::token_metadata_lock> storage_service::get_token_metadata_lock()
|
||||
// db::schema_tables::do_merge_schema.
|
||||
//
|
||||
// Note: must be called on shard 0.
|
||||
future<> storage_service::mutate_token_metadata(std::function<future<> (mutable_token_metadata2_ptr)> func, acquire_merge_lock acquire_merge_lock) noexcept {
|
||||
future<> storage_service::mutate_token_metadata(std::function<future<> (mutable_token_metadata_ptr)> func, acquire_merge_lock acquire_merge_lock) noexcept {
|
||||
assert(this_shard_id() == 0);
|
||||
std::optional<token_metadata_lock> tmlock;
|
||||
|
||||
@@ -6254,7 +6249,7 @@ future<> storage_service::mutate_token_metadata(std::function<future<> (mutable_
|
||||
co_await replicate_to_all_cores(std::move(tmptr));
|
||||
}
|
||||
|
||||
future<> storage_service::update_topology_change_info(mutable_token_metadata2_ptr tmptr, sstring reason) {
|
||||
future<> storage_service::update_topology_change_info(mutable_token_metadata_ptr tmptr, sstring reason) {
|
||||
assert(this_shard_id() == 0);
|
||||
|
||||
try {
|
||||
@@ -6282,7 +6277,7 @@ future<> storage_service::update_topology_change_info(mutable_token_metadata2_pt
|
||||
}
|
||||
|
||||
future<> storage_service::update_topology_change_info(sstring reason, acquire_merge_lock acquire_merge_lock) {
|
||||
return mutate_token_metadata([this, reason = std::move(reason)] (mutable_token_metadata2_ptr tmptr) mutable {
|
||||
return mutate_token_metadata([this, reason = std::move(reason)] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
return update_topology_change_info(std::move(tmptr), std::move(reason));
|
||||
}, acquire_merge_lock);
|
||||
}
|
||||
@@ -6324,7 +6319,7 @@ future<> storage_service::load_tablet_metadata() {
|
||||
future<> storage_service::snitch_reconfigured() {
|
||||
assert(this_shard_id() == 0);
|
||||
auto& snitch = _snitch.local();
|
||||
co_await mutate_token_metadata([&snitch] (mutable_token_metadata2_ptr tmptr) -> future<> {
|
||||
co_await mutate_token_metadata([&snitch] (mutable_token_metadata_ptr tmptr) -> future<> {
|
||||
// re-read local rack and DC info
|
||||
tmptr->update_topology(tmptr->get_my_id(), snitch->get_location());
|
||||
return make_ready_future<>();
|
||||
|
||||
@@ -113,12 +113,9 @@ private:
|
||||
using endpoint_details = dht::endpoint_details;
|
||||
using boot_strapper = dht::boot_strapper;
|
||||
using token_metadata = locator::token_metadata;
|
||||
using token_metadata2 = locator::token_metadata2;
|
||||
using shared_token_metadata = locator::shared_token_metadata;
|
||||
using token_metadata_ptr = locator::token_metadata_ptr;
|
||||
using token_metadata2_ptr = locator::token_metadata2_ptr;
|
||||
using mutable_token_metadata_ptr = locator::mutable_token_metadata_ptr;
|
||||
using mutable_token_metadata2_ptr = locator::mutable_token_metadata2_ptr;
|
||||
using token_metadata_lock = locator::token_metadata_lock;
|
||||
using application_state = gms::application_state;
|
||||
using inet_address = gms::inet_address;
|
||||
@@ -214,24 +211,24 @@ private:
|
||||
// db::schema_tables::do_merge_schema.
|
||||
//
|
||||
// Note: must be called on shard 0.
|
||||
future<> mutate_token_metadata(std::function<future<> (mutable_token_metadata2_ptr)> func, acquire_merge_lock aml = acquire_merge_lock::yes) noexcept;
|
||||
future<> mutate_token_metadata(std::function<future<> (mutable_token_metadata_ptr)> func, acquire_merge_lock aml = acquire_merge_lock::yes) noexcept;
|
||||
|
||||
// Update pending ranges locally and then replicate to all cores.
|
||||
// Should be serialized under token_metadata_lock.
|
||||
// Must be called on shard 0.
|
||||
future<> update_topology_change_info(mutable_token_metadata2_ptr tmptr, sstring reason);
|
||||
future<> update_topology_change_info(mutable_token_metadata_ptr tmptr, sstring reason);
|
||||
future<> update_topology_change_info(sstring reason, acquire_merge_lock aml = acquire_merge_lock::yes);
|
||||
future<> keyspace_changed(const sstring& ks_name);
|
||||
void register_metrics();
|
||||
future<> snitch_reconfigured();
|
||||
|
||||
future<mutable_token_metadata2_ptr> get_mutable_token_metadata_ptr() noexcept {
|
||||
return _shared_token_metadata.get()->clone_async().then([] (token_metadata2 tm) {
|
||||
future<mutable_token_metadata_ptr> get_mutable_token_metadata_ptr() noexcept {
|
||||
return _shared_token_metadata.get()->clone_async().then([] (token_metadata tm) {
|
||||
// bump the token_metadata ring_version
|
||||
// to invalidate cached token/replication mappings
|
||||
// when the modified token_metadata is committed.
|
||||
tm.invalidate_cached_rings();
|
||||
return make_ready_future<mutable_token_metadata2_ptr>(make_token_metadata2_ptr(std::move(tm)));
|
||||
return make_ready_future<mutable_token_metadata_ptr>(make_token_metadata_ptr(std::move(tm)));
|
||||
});
|
||||
}
|
||||
|
||||
@@ -258,11 +255,11 @@ public:
|
||||
return _erm_factory;
|
||||
}
|
||||
|
||||
token_metadata2_ptr get_token_metadata_ptr() const noexcept {
|
||||
token_metadata_ptr get_token_metadata_ptr() const noexcept {
|
||||
return _shared_token_metadata.get();
|
||||
}
|
||||
|
||||
const locator::token_metadata2& get_token_metadata() const noexcept {
|
||||
const locator::token_metadata& get_token_metadata() const noexcept {
|
||||
return *_shared_token_metadata.get();
|
||||
}
|
||||
|
||||
@@ -328,7 +325,7 @@ private:
|
||||
|
||||
public:
|
||||
|
||||
static std::unordered_set<gms::inet_address> parse_node_list(sstring comma_separated_list, const locator::token_metadata2& tm);
|
||||
static std::unordered_set<gms::inet_address> parse_node_list(sstring comma_separated_list, const locator::token_metadata& tm);
|
||||
|
||||
future<> check_for_endpoint_collision(std::unordered_set<gms::inet_address> initial_contact_nodes,
|
||||
const std::unordered_map<gms::inet_address, sstring>& loaded_peer_features);
|
||||
@@ -480,7 +477,7 @@ private:
|
||||
std::optional<locator::endpoint_dc_rack> get_dc_rack_for(inet_address endpoint);
|
||||
private:
|
||||
// Should be serialized under token_metadata_lock.
|
||||
future<> replicate_to_all_cores(mutable_token_metadata2_ptr tmptr) noexcept;
|
||||
future<> replicate_to_all_cores(mutable_token_metadata_ptr tmptr) noexcept;
|
||||
sharded<db::system_keyspace>& _sys_ks;
|
||||
sharded<db::system_distributed_keyspace>& _sys_dist_ks;
|
||||
locator::snitch_signal_slot_t _snitch_reconfigure;
|
||||
|
||||
@@ -199,7 +199,7 @@ class load_balancer {
|
||||
|
||||
std::optional<locator::load_sketch> target_load_sketch;
|
||||
|
||||
future<load_sketch&> get_load_sketch(const token_metadata2_ptr& tm) {
|
||||
future<load_sketch&> get_load_sketch(const token_metadata_ptr& tm) {
|
||||
if (!target_load_sketch) {
|
||||
target_load_sketch.emplace(tm);
|
||||
co_await target_load_sketch->populate(id);
|
||||
@@ -255,7 +255,7 @@ class load_balancer {
|
||||
const size_t max_write_streaming_load = 2;
|
||||
const size_t max_read_streaming_load = 4;
|
||||
|
||||
token_metadata2_ptr _tm;
|
||||
token_metadata_ptr _tm;
|
||||
load_balancer_stats_manager& _stats;
|
||||
private:
|
||||
tablet_replica_set get_replicas_for_tablet_load(const tablet_info& ti, const tablet_transition_info* trinfo) const {
|
||||
@@ -290,7 +290,7 @@ private:
|
||||
}
|
||||
|
||||
public:
|
||||
load_balancer(token_metadata2_ptr tm, load_balancer_stats_manager& stats)
|
||||
load_balancer(token_metadata_ptr tm, load_balancer_stats_manager& stats)
|
||||
: _tm(std::move(tm))
|
||||
, _stats(stats)
|
||||
{ }
|
||||
@@ -819,7 +819,7 @@ public:
|
||||
_stopped = true;
|
||||
}
|
||||
|
||||
future<migration_plan> balance_tablets(token_metadata2_ptr tm) {
|
||||
future<migration_plan> balance_tablets(token_metadata_ptr tm) {
|
||||
load_balancer lb(tm, _load_balancer_stats);
|
||||
co_return co_await lb.make_plan();
|
||||
}
|
||||
@@ -869,7 +869,7 @@ future<> tablet_allocator::stop() {
|
||||
return impl().stop();
|
||||
}
|
||||
|
||||
future<migration_plan> tablet_allocator::balance_tablets(locator::token_metadata2_ptr tm) {
|
||||
future<migration_plan> tablet_allocator::balance_tablets(locator::token_metadata_ptr tm) {
|
||||
return impl().balance_tablets(tm);
|
||||
}
|
||||
|
||||
|
||||
@@ -90,7 +90,7 @@ public:
|
||||
///
|
||||
/// The algorithm takes care of limiting the streaming load on the system, also by taking active migrations into account.
|
||||
///
|
||||
future<migration_plan> balance_tablets(locator::token_metadata2_ptr);
|
||||
future<migration_plan> balance_tablets(locator::token_metadata_ptr);
|
||||
|
||||
/// Should be called when the node is no longer a leader.
|
||||
void on_leadership_lost();
|
||||
|
||||
@@ -267,7 +267,7 @@ SEASTAR_THREAD_TEST_CASE(test_load_sketch) {
|
||||
}
|
||||
});
|
||||
|
||||
stm.mutate_token_metadata([&] (token_metadata2& tm) {
|
||||
stm.mutate_token_metadata([&] (token_metadata& tm) {
|
||||
tm.update_host_id(host1, ip1);
|
||||
tm.update_host_id(host2, ip2);
|
||||
tm.update_host_id(host3, ip3);
|
||||
@@ -312,7 +312,7 @@ SEASTAR_THREAD_TEST_CASE(test_load_sketch) {
|
||||
|
||||
std::vector<unsigned> node3_shards(node3_shard_count, 0);
|
||||
|
||||
stm.mutate_token_metadata([&] (token_metadata2& tm) {
|
||||
stm.mutate_token_metadata([&] (token_metadata& tm) {
|
||||
tablet_metadata tab_meta;
|
||||
tablet_map tmap(4);
|
||||
|
||||
|
||||
@@ -72,7 +72,7 @@ static void check_ranges_are_sorted(vnode_effective_replication_map_ptr erm, gms
|
||||
|
||||
void strategy_sanity_check(
|
||||
replication_strategy_ptr ars_ptr,
|
||||
const token_metadata2_ptr& tm,
|
||||
const token_metadata_ptr& tm,
|
||||
const std::map<sstring, sstring>& options) {
|
||||
|
||||
const network_topology_strategy* nts_ptr =
|
||||
@@ -95,7 +95,7 @@ void strategy_sanity_check(
|
||||
|
||||
void endpoints_check(
|
||||
replication_strategy_ptr ars_ptr,
|
||||
const token_metadata2_ptr& tm,
|
||||
const token_metadata_ptr& tm,
|
||||
const inet_address_vector_replica_set& endpoints,
|
||||
const locator::topology& topo) {
|
||||
|
||||
@@ -156,7 +156,7 @@ auto d2t = [](double d) -> int64_t {
|
||||
void full_ring_check(const std::vector<ring_point>& ring_points,
|
||||
const std::map<sstring, sstring>& options,
|
||||
replication_strategy_ptr ars_ptr,
|
||||
locator::token_metadata2_ptr tmptr) {
|
||||
locator::token_metadata_ptr tmptr) {
|
||||
auto& tm = *tmptr;
|
||||
const auto& topo = tm.get_topology();
|
||||
strategy_sanity_check(ars_ptr, tmptr, options);
|
||||
@@ -190,7 +190,7 @@ void full_ring_check(const std::vector<ring_point>& ring_points,
|
||||
void full_ring_check(const tablet_map& tmap,
|
||||
const std::map<sstring, sstring>& options,
|
||||
replication_strategy_ptr rs_ptr,
|
||||
locator::token_metadata2_ptr tmptr) {
|
||||
locator::token_metadata_ptr tmptr) {
|
||||
auto& tm = *tmptr;
|
||||
const auto& topo = tm.get_topology();
|
||||
|
||||
@@ -250,7 +250,7 @@ void simple_test() {
|
||||
};
|
||||
|
||||
// Initialize the token_metadata
|
||||
stm.mutate_token_metadata([&] (token_metadata2& tm) -> future<> {
|
||||
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
|
||||
auto& topo = tm.get_topology();
|
||||
for (const auto& [ring_point, endpoint, id] : ring_points) {
|
||||
std::unordered_set<token> tokens;
|
||||
@@ -292,7 +292,7 @@ void simple_test() {
|
||||
// points will be taken from the cache when it shouldn't and the
|
||||
// corresponding check will fail.
|
||||
//
|
||||
stm.mutate_token_metadata([] (token_metadata2& tm) {
|
||||
stm.mutate_token_metadata([] (token_metadata& tm) {
|
||||
tm.invalidate_cached_rings();
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
@@ -357,7 +357,7 @@ void heavy_origin_test() {
|
||||
}
|
||||
}
|
||||
|
||||
stm.mutate_token_metadata([&] (token_metadata2& tm) -> future<> {
|
||||
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
|
||||
auto& topo = tm.get_topology();
|
||||
for (const auto& [ring_point, endpoint, id] : ring_points) {
|
||||
topo.add_node(id, endpoint, make_endpoint_dc_rack(endpoint), locator::node::state::normal);
|
||||
@@ -413,7 +413,7 @@ SEASTAR_THREAD_TEST_CASE(NetworkTopologyStrategy_tablets_test) {
|
||||
};
|
||||
|
||||
// Initialize the token_metadata
|
||||
stm.mutate_token_metadata([&] (token_metadata2& tm) -> future<> {
|
||||
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
|
||||
auto& topo = tm.get_topology();
|
||||
for (const auto& [ring_point, endpoint, id] : ring_points) {
|
||||
std::unordered_set<token> tokens;
|
||||
@@ -524,7 +524,7 @@ static bool has_sufficient_replicas(
|
||||
}
|
||||
|
||||
static locator::host_id_set calculate_natural_endpoints(
|
||||
const token& search_token, const token_metadata2& tm,
|
||||
const token& search_token, const token_metadata& tm,
|
||||
const locator::topology& topo,
|
||||
const std::unordered_map<sstring, size_t>& datacenters) {
|
||||
//
|
||||
@@ -650,7 +650,7 @@ static void test_equivalence(const shared_token_metadata& stm, const locator::to
|
||||
return std::make_pair(p.first, to_sstring(p.second));
|
||||
})));
|
||||
|
||||
const token_metadata2& tm = *stm.get();
|
||||
const token_metadata& tm = *stm.get();
|
||||
for (size_t i = 0; i < 1000; ++i) {
|
||||
auto token = dht::token::get_random_token();
|
||||
auto expected = calculate_natural_endpoints(token, tm, topo, datacenters);
|
||||
@@ -737,7 +737,7 @@ SEASTAR_THREAD_TEST_CASE(testCalculateEndpoints) {
|
||||
}
|
||||
}
|
||||
|
||||
stm.mutate_token_metadata([&] (token_metadata2& tm) -> future<> {
|
||||
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
|
||||
generate_topology(tm.get_topology(), datacenters, nodes);
|
||||
for (auto&& i : endpoint_tokens) {
|
||||
co_await tm.update_normal_tokens(std::move(i.second), i.first);
|
||||
@@ -835,7 +835,7 @@ SEASTAR_THREAD_TEST_CASE(test_topology_compare_endpoints) {
|
||||
|
||||
semaphore sem(1);
|
||||
shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, tm_cfg);
|
||||
stm.mutate_token_metadata([&] (token_metadata2& tm) {
|
||||
stm.mutate_token_metadata([&] (token_metadata& tm) {
|
||||
auto& topo = tm.get_topology();
|
||||
generate_topology(topo, datacenters, nodes);
|
||||
|
||||
@@ -882,7 +882,7 @@ SEASTAR_THREAD_TEST_CASE(test_topology_tracks_local_node) {
|
||||
|
||||
BOOST_REQUIRE(stm.get()->get_topology().get_location() == ip1_dc_rack);
|
||||
|
||||
stm.mutate_token_metadata([&] (token_metadata2& tm) {
|
||||
stm.mutate_token_metadata([&] (token_metadata& tm) {
|
||||
tm.update_host_id(host2, ip2);
|
||||
tm.update_host_id(host1, ip1); // this_node added last on purpose
|
||||
return make_ready_future<>();
|
||||
@@ -905,7 +905,7 @@ SEASTAR_THREAD_TEST_CASE(test_topology_tracks_local_node) {
|
||||
|
||||
// Removing local node
|
||||
|
||||
stm.mutate_token_metadata([&] (token_metadata2& tm) {
|
||||
stm.mutate_token_metadata([&] (token_metadata& tm) {
|
||||
tm.remove_endpoint(host1);
|
||||
tm.update_host_id(host3, ip3);
|
||||
return make_ready_future<>();
|
||||
@@ -918,7 +918,7 @@ SEASTAR_THREAD_TEST_CASE(test_topology_tracks_local_node) {
|
||||
|
||||
// Removing node with no local node
|
||||
|
||||
stm.mutate_token_metadata([&] (token_metadata2& tm) {
|
||||
stm.mutate_token_metadata([&] (token_metadata& tm) {
|
||||
tm.remove_endpoint(host2);
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
@@ -930,7 +930,7 @@ SEASTAR_THREAD_TEST_CASE(test_topology_tracks_local_node) {
|
||||
|
||||
// Repopulate after clear_gently()
|
||||
|
||||
stm.mutate_token_metadata([&] (token_metadata2& tm) -> future<> {
|
||||
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
|
||||
co_await tm.clear_gently();
|
||||
tm.update_host_id(host2, ip2);
|
||||
tm.update_host_id(host1, ip1); // this_node added last on purpose
|
||||
@@ -953,7 +953,7 @@ SEASTAR_THREAD_TEST_CASE(test_topology_tracks_local_node) {
|
||||
|
||||
// get_location() should pick up endpoint_dc_rack from node info
|
||||
|
||||
stm.mutate_token_metadata([&] (token_metadata2& tm) -> future<> {
|
||||
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
|
||||
co_await tm.clear_gently();
|
||||
tm.get_topology().add_or_update_endpoint(ip1, host1, ip1_dc_rack_v2, node::state::being_decommissioned);
|
||||
}).get();
|
||||
|
||||
@@ -41,7 +41,7 @@ SEASTAR_TEST_CASE(test_get_restricted_ranges) {
|
||||
|
||||
std::vector<dht::ring_position> ring = make_ring(s, 10);
|
||||
|
||||
auto check = [&s](locator::token_metadata2_ptr tmptr, dht::partition_range input,
|
||||
auto check = [&s](locator::token_metadata_ptr tmptr, dht::partition_range input,
|
||||
dht::partition_range_vector expected) {
|
||||
query_ranges_to_vnodes_generator ranges_to_vnodes(locator::make_splitter(tmptr), s, {input});
|
||||
auto actual = ranges_to_vnodes(1000);
|
||||
@@ -54,7 +54,7 @@ SEASTAR_TEST_CASE(test_get_restricted_ranges) {
|
||||
|
||||
{
|
||||
// Ring with minimum token
|
||||
auto tmptr = locator::make_token_metadata2_ptr(locator::token_metadata::config{});
|
||||
auto tmptr = locator::make_token_metadata_ptr(locator::token_metadata::config{});
|
||||
const auto host_id = locator::host_id{utils::UUID(0, 1)};
|
||||
tmptr->update_topology(host_id, locator::endpoint_dc_rack{"dc1", "rack1"});
|
||||
tmptr->update_normal_tokens(std::unordered_set<dht::token>({dht::minimum_token()}), host_id).get();
|
||||
@@ -69,7 +69,7 @@ SEASTAR_TEST_CASE(test_get_restricted_ranges) {
|
||||
}
|
||||
|
||||
{
|
||||
auto tmptr = locator::make_token_metadata2_ptr(locator::token_metadata::config{});
|
||||
auto tmptr = locator::make_token_metadata_ptr(locator::token_metadata::config{});
|
||||
const auto id1 = locator::host_id{utils::UUID(0, 1)};
|
||||
const auto id2 = locator::host_id{utils::UUID(0, 2)};
|
||||
tmptr->update_topology(id1, locator::endpoint_dc_rack{"dc1", "rack1"});
|
||||
|
||||
@@ -433,7 +433,7 @@ SEASTAR_TEST_CASE(test_sharder) {
|
||||
|
||||
auto table1 = table_id(utils::UUID_gen::get_time_UUID());
|
||||
|
||||
token_metadata2 tokm(token_metadata::config{ .topo_cfg{ .this_host_id = h1 } });
|
||||
token_metadata tokm(token_metadata::config{ .topo_cfg{ .this_host_id = h1 } });
|
||||
tokm.get_topology().add_or_update_endpoint(tokm.get_topology().my_address(), h1);
|
||||
|
||||
std::vector<tablet_id> tablet_ids;
|
||||
@@ -591,7 +591,7 @@ SEASTAR_THREAD_TEST_CASE(test_token_ownership_splitting) {
|
||||
|
||||
// Reflects the plan in a given token metadata as if the migrations were fully executed.
|
||||
static
|
||||
void apply_plan(token_metadata2& tm, const migration_plan& plan) {
|
||||
void apply_plan(token_metadata& tm, const migration_plan& plan) {
|
||||
for (auto&& mig : plan.migrations()) {
|
||||
tablet_map& tmap = tm.tablets().get_tablet_map(mig.tablet.table);
|
||||
auto tinfo = tmap.get_tablet_info(mig.tablet.tablet);
|
||||
@@ -611,7 +611,7 @@ tablet_transition_info migration_to_transition_info(const tablet_migration_info&
|
||||
|
||||
// Reflects the plan in a given token metadata as if the migrations were started but not yet executed.
|
||||
static
|
||||
void apply_plan_as_in_progress(token_metadata2& tm, const migration_plan& plan) {
|
||||
void apply_plan_as_in_progress(token_metadata& tm, const migration_plan& plan) {
|
||||
for (auto&& mig : plan.migrations()) {
|
||||
tablet_map& tmap = tm.tablets().get_tablet_map(mig.tablet.table);
|
||||
auto tinfo = tmap.get_tablet_info(mig.tablet.tablet);
|
||||
@@ -626,7 +626,7 @@ void rebalance_tablets(tablet_allocator& talloc, shared_token_metadata& stm) {
|
||||
if (plan.empty()) {
|
||||
break;
|
||||
}
|
||||
stm.mutate_token_metadata([&] (token_metadata2& tm) {
|
||||
stm.mutate_token_metadata([&] (token_metadata& tm) {
|
||||
apply_plan(tm, plan);
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
@@ -640,7 +640,7 @@ void rebalance_tablets_as_in_progress(tablet_allocator& talloc, shared_token_met
|
||||
if (plan.empty()) {
|
||||
break;
|
||||
}
|
||||
stm.mutate_token_metadata([&] (token_metadata2& tm) {
|
||||
stm.mutate_token_metadata([&] (token_metadata& tm) {
|
||||
apply_plan_as_in_progress(tm, plan);
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
@@ -650,7 +650,7 @@ void rebalance_tablets_as_in_progress(tablet_allocator& talloc, shared_token_met
|
||||
// Completes any in progress tablet migrations.
|
||||
static
|
||||
void execute_transitions(shared_token_metadata& stm) {
|
||||
stm.mutate_token_metadata([&] (token_metadata2& tm) {
|
||||
stm.mutate_token_metadata([&] (token_metadata& tm) {
|
||||
for (auto&& [tablet, tmap_] : tm.tablets().all_tables()) {
|
||||
auto& tmap = tmap_;
|
||||
for (auto&& [tablet, trinfo]: tmap.transitions()) {
|
||||
@@ -689,7 +689,7 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_empty_node) {
|
||||
}
|
||||
});
|
||||
|
||||
stm.mutate_token_metadata([&] (token_metadata2& tm) {
|
||||
stm.mutate_token_metadata([&] (token_metadata& tm) {
|
||||
tm.update_host_id(host1, ip1);
|
||||
tm.update_host_id(host2, ip2);
|
||||
tm.update_host_id(host3, ip3);
|
||||
@@ -783,7 +783,7 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_rf_met) {
|
||||
}
|
||||
});
|
||||
|
||||
stm.mutate_token_metadata([&](token_metadata2& tm) {
|
||||
stm.mutate_token_metadata([&](token_metadata& tm) {
|
||||
const unsigned shard_count = 2;
|
||||
|
||||
tm.update_host_id(host1, ip1);
|
||||
@@ -839,7 +839,7 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_rf_met) {
|
||||
BOOST_REQUIRE(load.get_avg_shard_load(host3) == 0);
|
||||
}
|
||||
|
||||
stm.mutate_token_metadata([&](token_metadata2& tm) {
|
||||
stm.mutate_token_metadata([&](token_metadata& tm) {
|
||||
tm.update_topology(host3, locator::endpoint_dc_rack::default_location, node::state::left);
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
@@ -885,7 +885,7 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_two_racks) {
|
||||
}
|
||||
});
|
||||
|
||||
stm.mutate_token_metadata([&](token_metadata2& tm) {
|
||||
stm.mutate_token_metadata([&](token_metadata& tm) {
|
||||
const unsigned shard_count = 1;
|
||||
|
||||
tm.update_host_id(host1, ip1);
|
||||
@@ -986,7 +986,7 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_rack_load_failure) {
|
||||
}
|
||||
});
|
||||
|
||||
stm.mutate_token_metadata([&](token_metadata2& tm) {
|
||||
stm.mutate_token_metadata([&](token_metadata& tm) {
|
||||
const unsigned shard_count = 1;
|
||||
|
||||
tm.update_host_id(host1, ip1);
|
||||
@@ -1060,7 +1060,7 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_rf_not_met) {
|
||||
}
|
||||
});
|
||||
|
||||
stm.mutate_token_metadata([&](token_metadata2& tm) {
|
||||
stm.mutate_token_metadata([&](token_metadata& tm) {
|
||||
const unsigned shard_count = 2;
|
||||
|
||||
tm.update_host_id(host1, ip1);
|
||||
@@ -1117,7 +1117,7 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_works_with_in_progress_transitions)
|
||||
}
|
||||
});
|
||||
|
||||
stm.mutate_token_metadata([&] (token_metadata2& tm) {
|
||||
stm.mutate_token_metadata([&] (token_metadata& tm) {
|
||||
tm.update_host_id(host1, ip1);
|
||||
tm.update_host_id(host2, ip2);
|
||||
tm.update_host_id(host3, ip3);
|
||||
@@ -1186,7 +1186,7 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancer_shuffle_mode) {
|
||||
}
|
||||
});
|
||||
|
||||
stm.mutate_token_metadata([&] (token_metadata2& tm) {
|
||||
stm.mutate_token_metadata([&] (token_metadata& tm) {
|
||||
tm.update_host_id(host1, ip1);
|
||||
tm.update_host_id(host2, ip2);
|
||||
tm.update_host_id(host3, ip3);
|
||||
@@ -1249,7 +1249,7 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_two_empty_nodes) {
|
||||
}
|
||||
});
|
||||
|
||||
stm.mutate_token_metadata([&] (token_metadata2& tm) {
|
||||
stm.mutate_token_metadata([&] (token_metadata& tm) {
|
||||
tm.update_host_id(host1, ip1);
|
||||
tm.update_host_id(host2, ip2);
|
||||
tm.update_host_id(host3, ip3);
|
||||
@@ -1405,7 +1405,7 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_random_load) {
|
||||
});
|
||||
|
||||
size_t total_tablet_count = 0;
|
||||
stm.mutate_token_metadata([&](token_metadata2& tm) {
|
||||
stm.mutate_token_metadata([&](token_metadata& tm) {
|
||||
tablet_metadata tmeta;
|
||||
|
||||
int i = 0;
|
||||
|
||||
@@ -28,8 +28,8 @@ namespace {
|
||||
};
|
||||
}
|
||||
|
||||
mutable_token_metadata2_ptr create_token_metadata(host_id this_host_id) {
|
||||
return make_lw_shared<token_metadata2>(token_metadata::config {
|
||||
mutable_token_metadata_ptr create_token_metadata(host_id this_host_id) {
|
||||
return make_lw_shared<token_metadata>(token_metadata::config {
|
||||
topology::config {
|
||||
.this_host_id = this_host_id,
|
||||
.local_dc_rack = get_dc_rack(this_host_id)
|
||||
@@ -38,7 +38,7 @@ namespace {
|
||||
}
|
||||
|
||||
template <typename Strategy>
|
||||
mutable_vnode_erm_ptr create_erm(mutable_token_metadata2_ptr tmptr, replication_strategy_config_options opts = {}) {
|
||||
mutable_vnode_erm_ptr create_erm(mutable_token_metadata_ptr tmptr, replication_strategy_config_options opts = {}) {
|
||||
dc_rack_fn<host_id> get_dc_rack_fn = get_dc_rack;
|
||||
tmptr->update_topology_change_info(get_dc_rack_fn).get();
|
||||
auto strategy = seastar::make_shared<Strategy>(std::move(opts));
|
||||
|
||||
@@ -640,7 +640,7 @@ private:
|
||||
host_id = linfo.host_id;
|
||||
_sys_ks.local().save_local_info(std::move(linfo), _snitch.local()->get_location(), my_address, my_address).get();
|
||||
}
|
||||
locator::shared_token_metadata::mutate_on_all_shards(_token_metadata, [hostid = host_id, &cfg_in] (locator::token_metadata2& tm) {
|
||||
locator::shared_token_metadata::mutate_on_all_shards(_token_metadata, [hostid = host_id, &cfg_in] (locator::token_metadata& tm) {
|
||||
auto& topo = tm.get_topology();
|
||||
topo.set_host_id_cfg(hostid);
|
||||
topo.add_or_update_endpoint(cfg_in.broadcast_address,
|
||||
|
||||
Reference in New Issue
Block a user