Merge "Constify access to token_metadata" from Benny

"
We keep refrences to locator::token_metadata in many places.
Most of them are for read-only access and only a few want
to modify the token_metadata.

Recently, in 94995acedb,
we added yielding loops that access token_metadata in order
to avoid cpu stalls.  To make that possible we need to make
sure they token_metadata object they are traversing won't change
mid-loop.

This series is a first step in ensuring the serialization of
updates to shared token metadata to reading it.

Test: unit(dev)
Dtest: bootstrap_test:TestBootstrap.start_stop_test{,_node}, update_cluster_layout_tests.py -a next-gating(dev)
"

* tag 'constify-token-metadata-access-v2' of github.com:bhalevy/scylla:
  api/http_context: keep a const sharded<locator::token_metadata>&
  gossiper: keep a const token_metadata&
  storage_service: separate get_mutable_token_metadata
  range_streamer: keep a const token_metadata&
  storage_proxy: delete unused get_restricted_ranges declaration
  storage_proxy: keep a const token_metadata&
  storage_proxy: get rid of mutable get_token_metadata getter
  database: keep const token_metadata&
  database: keyspace_metadata: pass const locator::token_metadata& around
  everywhere_replication_strategy: move methods out of line
  replication_strategy: keep a const token_metadata&
  abstract_replication_strategy: get_ranges: accept const token_metadata&
  token_metadata: rename calculate_pending_ranges to update_pending_ranges
  token_metadata: mark const methods
  token_ranges: pending_endpoints_for: return empty vector if keyspace not found
  token_ranges: get_pending_ranges: return empty vector if keyspace not found
  token_ranges: get rid of unused get_pending_ranges variant
  replication_strategy: calculate_natural_endpoints: make token_metadata& param const
  token_metadata: add get_datacenter_racks() const variant
This commit is contained in:
Avi Kivity
2020-08-20 18:42:10 +03:00
26 changed files with 211 additions and 217 deletions

View File

@@ -423,7 +423,7 @@ future<executor::request_return_type> executor::describe_stream(client_state& cl
// TODO: label
// TODO: creation time
auto& tm = _proxy.get_token_metadata();
const auto& tm = _proxy.get_token_metadata();
// cannot really "resume" query, must iterate all data. because we cannot query neither "time" (pk) > something,
// or on expired...
// TODO: maybe add secondary index to topology table to enable this?

View File

@@ -39,11 +39,11 @@ struct http_context {
distributed<database>& db;
distributed<service::storage_proxy>& sp;
service::load_meter& lmeter;
sharded<locator::token_metadata>& token_metadata;
const sharded<locator::token_metadata>& token_metadata;
http_context(distributed<database>& _db,
distributed<service::storage_proxy>& _sp,
service::load_meter& _lm, sharded<locator::token_metadata>& _tm)
service::load_meter& _lm, const sharded<locator::token_metadata>& _tm)
: db(_db), sp(_sp), lmeter(_lm), token_metadata(_tm) {
}
};

View File

@@ -587,7 +587,7 @@ db_context::builder& db_context::builder::with_migration_notifier(service::migra
return *this;
}
db_context::builder& db_context::builder::with_token_metadata(locator::token_metadata& token_metadata) {
db_context::builder& db_context::builder::with_token_metadata(const locator::token_metadata& token_metadata) {
_token_metadata = token_metadata;
return *this;
}

View File

@@ -100,19 +100,19 @@ public:
struct db_context final {
service::storage_proxy& _proxy;
service::migration_notifier& _migration_notifier;
locator::token_metadata& _token_metadata;
const locator::token_metadata& _token_metadata;
cdc::metadata& _cdc_metadata;
class builder final {
service::storage_proxy& _proxy;
std::optional<std::reference_wrapper<service::migration_notifier>> _migration_notifier;
std::optional<std::reference_wrapper<locator::token_metadata>> _token_metadata;
std::optional<std::reference_wrapper<const locator::token_metadata>> _token_metadata;
std::optional<std::reference_wrapper<cdc::metadata>> _cdc_metadata;
public:
builder(service::storage_proxy& proxy);
builder& with_migration_notifier(service::migration_notifier& migration_notifier);
builder& with_token_metadata(locator::token_metadata& token_metadata);
builder& with_token_metadata(const locator::token_metadata& token_metadata);
builder& with_cdc_metadata(cdc::metadata&);
db_context build();

View File

@@ -165,7 +165,7 @@ bool string_pair_eq::operator()(spair lhs, spair rhs) const {
utils::UUID database::empty_version = utils::UUID_gen::get_name_UUID(bytes{});
database::database(const db::config& cfg, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, locator::token_metadata& tm, abort_source& as)
database::database(const db::config& cfg, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, const locator::token_metadata& tm, abort_source& as)
: _stats(make_lw_shared<db_stats>())
, _cl_stats(std::make_unique<cell_locker_stats>())
, _cfg(cfg)
@@ -871,7 +871,7 @@ bool database::column_family_exists(const utils::UUID& uuid) const {
}
void
keyspace::create_replication_strategy(locator::token_metadata& tm, const std::map<sstring, sstring>& options) {
keyspace::create_replication_strategy(const locator::token_metadata& tm, const std::map<sstring, sstring>& options) {
using namespace locator;
_replication_strategy =
@@ -895,7 +895,7 @@ keyspace::set_replication_strategy(std::unique_ptr<locator::abstract_replication
_replication_strategy = std::move(replication_strategy);
}
void keyspace::update_from(locator::token_metadata& tm, ::lw_shared_ptr<keyspace_metadata> ksm) {
void keyspace::update_from(const locator::token_metadata& tm, ::lw_shared_ptr<keyspace_metadata> ksm) {
_metadata = std::move(ksm);
create_replication_strategy(tm, _metadata->strategy_options());
}
@@ -1011,7 +1011,7 @@ const column_family& database::find_column_family(const schema_ptr& schema) cons
using strategy_class_registry = class_registry<
locator::abstract_replication_strategy,
const sstring&,
locator::token_metadata&,
const locator::token_metadata&,
locator::snitch_ptr&,
const std::map<sstring, sstring>&>;
@@ -1044,7 +1044,7 @@ keyspace_metadata::keyspace_metadata(std::string_view name,
}
}
void keyspace_metadata::validate(locator::token_metadata& tm) const {
void keyspace_metadata::validate(const locator::token_metadata& tm) const {
using namespace locator;
abstract_replication_strategy::validate_replication_strategy(name(), strategy_name(), tm, strategy_options());
}

View File

@@ -1080,7 +1080,7 @@ public:
std::map<sstring, sstring> options,
bool durables_writes,
std::vector<schema_ptr> cf_defs = std::vector<schema_ptr>{});
void validate(locator::token_metadata& tm) const;
void validate(const locator::token_metadata& tm) const;
const sstring& name() const {
return _name;
}
@@ -1150,14 +1150,14 @@ private:
public:
explicit keyspace(lw_shared_ptr<keyspace_metadata> metadata, config cfg);
void update_from(locator::token_metadata& tm, lw_shared_ptr<keyspace_metadata>);
void update_from(const locator::token_metadata& tm, lw_shared_ptr<keyspace_metadata>);
/** Note: return by shared pointer value, since the meta data is
* semi-volatile. I.e. we could do alter keyspace at any time, and
* boom, it is replaced.
*/
lw_shared_ptr<keyspace_metadata> metadata() const;
void create_replication_strategy(locator::token_metadata& tm, const std::map<sstring, sstring>& options);
void create_replication_strategy(const locator::token_metadata& tm, const std::map<sstring, sstring>& options);
/**
* This should not really be return by reference, since replication
* strategy is also volatile in that it could be replaced at "any" time.
@@ -1337,7 +1337,7 @@ private:
service::migration_notifier& _mnotifier;
gms::feature_service& _feat;
locator::token_metadata& _token_metadata;
const locator::token_metadata& _token_metadata;
bool _supports_infinite_bound_range_deletions = false;
gms::feature::listener_registration _infinite_bound_range_deletions_reg;
@@ -1377,7 +1377,7 @@ public:
void set_enable_incremental_backups(bool val) { _enable_incremental_backups = val; }
future<> parse_system_tables(distributed<service::storage_proxy>&, distributed<service::migration_manager>&);
database(const db::config&, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, locator::token_metadata& tm, abort_source& as);
database(const db::config&, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, const locator::token_metadata& tm, abort_source& as);
database(database&&) = delete;
~database();
@@ -1403,7 +1403,6 @@ public:
}
const locator::token_metadata& get_token_metadata() const { return _token_metadata; }
locator::token_metadata& get_token_metadata() { return _token_metadata; }
service::migration_notifier& get_notifier() { return _mnotifier; }
const service::migration_notifier& get_notifier() const { return _mnotifier; }

View File

@@ -101,7 +101,7 @@ public:
}
};
range_streamer(distributed<database>& db, token_metadata& tm, abort_source& abort_source, std::unordered_set<token> tokens, inet_address address, sstring description, streaming::stream_reason reason)
range_streamer(distributed<database>& db, const token_metadata& tm, abort_source& abort_source, std::unordered_set<token> tokens, inet_address address, sstring description, streaming::stream_reason reason)
: _db(db)
, _metadata(tm)
, _abort_source(abort_source)
@@ -113,7 +113,7 @@ public:
_abort_source.check();
}
range_streamer(distributed<database>& db, token_metadata& tm, abort_source& abort_source, inet_address address, sstring description, streaming::stream_reason reason)
range_streamer(distributed<database>& db, const token_metadata& tm, abort_source& abort_source, inet_address address, sstring description, streaming::stream_reason reason)
: range_streamer(db, tm, abort_source, std::unordered_set<token>(), address, description, reason) {
}
@@ -165,7 +165,7 @@ public:
size_t nr_ranges_to_stream();
private:
distributed<database>& _db;
token_metadata& _metadata;
const token_metadata& _metadata;
abort_source& _abort_source;
std::unordered_set<token> _tokens;
inet_address _address;

View File

@@ -123,7 +123,7 @@ public:
void on_restart(inet_address, endpoint_state) override {}
};
gossiper::gossiper(abort_source& as, feature_service& features, locator::token_metadata& tokens, netw::messaging_service& ms, db::config& cfg)
gossiper::gossiper(abort_source& as, feature_service& features, const locator::token_metadata& tokens, netw::messaging_service& ms, db::config& cfg)
: _abort_source(as)
, _feature_service(features)
, _token_metadata(tokens)

View File

@@ -238,7 +238,7 @@ private:
// The value must be kept alive until completes and not change.
future<> replicate(inet_address, application_state key, const versioned_value& value);
public:
explicit gossiper(abort_source& as, feature_service& features, locator::token_metadata& tokens, netw::messaging_service& ms, db::config& cfg);
explicit gossiper(abort_source& as, feature_service& features, const locator::token_metadata& tokens, netw::messaging_service& ms, db::config& cfg);
void set_last_processed_message_at();
void set_last_processed_message_at(clk::time_point tp);
@@ -559,7 +559,7 @@ private:
abort_source& _abort_source;
condition_variable _features_condvar;
feature_service& _feature_service;
locator::token_metadata& _token_metadata;
const locator::token_metadata& _token_metadata;
netw::messaging_service& _messaging;
db::config& _cfg;
failure_detector _fd;

View File

@@ -30,7 +30,7 @@ logging::logger abstract_replication_strategy::logger("replication_strategy");
abstract_replication_strategy::abstract_replication_strategy(
const sstring& ks_name,
token_metadata& token_metadata,
const token_metadata& token_metadata,
snitch_ptr& snitch,
const std::map<sstring, sstring>& config_options,
replication_strategy_type my_type)
@@ -40,12 +40,12 @@ abstract_replication_strategy::abstract_replication_strategy(
, _snitch(snitch)
, _my_type(my_type) {}
std::unique_ptr<abstract_replication_strategy> abstract_replication_strategy::create_replication_strategy(const sstring& ks_name, const sstring& strategy_name, token_metadata& tk_metadata, const std::map<sstring, sstring>& config_options) {
std::unique_ptr<abstract_replication_strategy> abstract_replication_strategy::create_replication_strategy(const sstring& ks_name, const sstring& strategy_name, const token_metadata& tk_metadata, const std::map<sstring, sstring>& config_options) {
assert(locator::i_endpoint_snitch::get_local_snitch_ptr());
try {
return create_object<abstract_replication_strategy,
const sstring&,
token_metadata&,
const token_metadata&,
snitch_ptr&,
const std::map<sstring, sstring>&>
(strategy_name, ks_name, tk_metadata,
@@ -57,7 +57,7 @@ std::unique_ptr<abstract_replication_strategy> abstract_replication_strategy::cr
void abstract_replication_strategy::validate_replication_strategy(const sstring& ks_name,
const sstring& strategy_name,
token_metadata& token_metadata,
const token_metadata& token_metadata,
const std::map<sstring, sstring>& config_options)
{
auto strategy = create_replication_strategy(ks_name, strategy_name, token_metadata, config_options);
@@ -177,17 +177,17 @@ abstract_replication_strategy::get_ranges_in_thread(inet_address ep) const {
}
dht::token_range_vector
abstract_replication_strategy::get_ranges(inet_address ep, token_metadata& tm) const {
abstract_replication_strategy::get_ranges(inet_address ep, const token_metadata& tm) const {
return do_get_ranges(ep, tm, false);
}
dht::token_range_vector
abstract_replication_strategy::get_ranges_in_thread(inet_address ep, token_metadata& tm) const {
abstract_replication_strategy::get_ranges_in_thread(inet_address ep, const token_metadata& tm) const {
return do_get_ranges(ep, tm, true);
}
dht::token_range_vector
abstract_replication_strategy::do_get_ranges(inet_address ep, token_metadata& tm, bool can_yield) const {
abstract_replication_strategy::do_get_ranges(inet_address ep, const token_metadata& tm, bool can_yield) const {
dht::token_range_vector ret;
auto prev_tok = tm.sorted_tokens().back();
for (auto tok : tm.sorted_tokens()) {
@@ -244,7 +244,7 @@ abstract_replication_strategy::get_primary_ranges_within_dc(inet_address ep) {
}
std::unordered_multimap<inet_address, dht::token_range>
abstract_replication_strategy::get_address_ranges(token_metadata& tm) const {
abstract_replication_strategy::get_address_ranges(const token_metadata& tm) const {
std::unordered_multimap<inet_address, dht::token_range> ret;
for (auto& t : tm.sorted_tokens()) {
dht::token_range_vector r = tm.get_primary_ranges_for(t);
@@ -260,7 +260,7 @@ abstract_replication_strategy::get_address_ranges(token_metadata& tm) const {
}
std::unordered_map<dht::token_range, std::vector<inet_address>>
abstract_replication_strategy::get_range_addresses(token_metadata& tm) const {
abstract_replication_strategy::get_range_addresses(const token_metadata& tm) const {
std::unordered_map<dht::token_range, std::vector<inet_address>> ret;
for (auto& t : tm.sorted_tokens()) {
dht::token_range_vector ranges = tm.get_primary_ranges_for(t);
@@ -273,12 +273,12 @@ abstract_replication_strategy::get_range_addresses(token_metadata& tm) const {
}
dht::token_range_vector
abstract_replication_strategy::get_pending_address_ranges(token_metadata& tm, token pending_token, inet_address pending_address) {
abstract_replication_strategy::get_pending_address_ranges(const token_metadata& tm, token pending_token, inet_address pending_address) const {
return get_pending_address_ranges(tm, std::unordered_set<token>{pending_token}, pending_address);
}
dht::token_range_vector
abstract_replication_strategy::get_pending_address_ranges(token_metadata& tm, std::unordered_set<token> pending_tokens, inet_address pending_address) {
abstract_replication_strategy::get_pending_address_ranges(const token_metadata& tm, std::unordered_set<token> pending_tokens, inet_address pending_address) const {
dht::token_range_vector ret;
auto temp = tm.clone_only_token_map();
temp.update_normal_tokens(pending_tokens, pending_address);

View File

@@ -59,7 +59,7 @@ protected:
// TODO: Do we need this member at all?
//keyspace* _keyspace = nullptr;
std::map<sstring, sstring> _config_options;
token_metadata& _token_metadata;
const token_metadata& _token_metadata;
snitch_ptr& _snitch;
replication_strategy_type _my_type;
@@ -83,16 +83,16 @@ protected:
public:
abstract_replication_strategy(
const sstring& keyspace_name,
token_metadata& token_metadata,
const token_metadata& token_metadata,
snitch_ptr& snitch,
const std::map<sstring, sstring>& config_options,
replication_strategy_type my_type);
virtual std::vector<inet_address> calculate_natural_endpoints(const token& search_token, token_metadata& tm) const = 0;
virtual std::vector<inet_address> calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const = 0;
virtual ~abstract_replication_strategy() {}
static std::unique_ptr<abstract_replication_strategy> create_replication_strategy(const sstring& ks_name, const sstring& strategy_name, token_metadata& token_metadata, const std::map<sstring, sstring>& config_options);
static std::unique_ptr<abstract_replication_strategy> create_replication_strategy(const sstring& ks_name, const sstring& strategy_name, const token_metadata& token_metadata, const std::map<sstring, sstring>& config_options);
static void validate_replication_strategy(const sstring& ks_name,
const sstring& strategy_name,
token_metadata& token_metadata,
const token_metadata& token_metadata,
const std::map<sstring, sstring>& config_options);
virtual std::vector<inet_address> get_natural_endpoints(const token& search_token);
virtual std::vector<inet_address> get_natural_endpoints_without_node_being_replaced(const token& search_token);
@@ -116,10 +116,10 @@ public:
dht::token_range_vector get_ranges_in_thread(inet_address ep) const;
// Use the token_metadata provided by the caller instead of _token_metadata
dht::token_range_vector get_ranges(inet_address ep, token_metadata& tm) const;
dht::token_range_vector get_ranges_in_thread(inet_address ep, token_metadata& tm) const;
dht::token_range_vector get_ranges(inet_address ep, const token_metadata& tm) const;
dht::token_range_vector get_ranges_in_thread(inet_address ep, const token_metadata& tm) const;
private:
dht::token_range_vector do_get_ranges(inet_address ep, token_metadata& tm, bool can_yield) const;
dht::token_range_vector do_get_ranges(inet_address ep, const token_metadata& tm, bool can_yield) const;
public:
// get_primary_ranges() returns the list of "primary ranges" for the given
@@ -134,13 +134,13 @@ public:
// instead of one node globally.
dht::token_range_vector get_primary_ranges_within_dc(inet_address ep);
std::unordered_multimap<inet_address, dht::token_range> get_address_ranges(token_metadata& tm) const;
std::unordered_multimap<inet_address, dht::token_range> get_address_ranges(const token_metadata& tm) const;
std::unordered_map<dht::token_range, std::vector<inet_address>> get_range_addresses(token_metadata& tm) const;
std::unordered_map<dht::token_range, std::vector<inet_address>> get_range_addresses(const token_metadata& tm) const;
dht::token_range_vector get_pending_address_ranges(token_metadata& tm, token pending_token, inet_address pending_address);
dht::token_range_vector get_pending_address_ranges(const token_metadata& tm, token pending_token, inet_address pending_address) const;
dht::token_range_vector get_pending_address_ranges(token_metadata& tm, std::unordered_set<token> pending_tokens, inet_address pending_address);
dht::token_range_vector get_pending_address_ranges(const token_metadata& tm, std::unordered_set<token> pending_tokens, inet_address pending_address) const;
};
}

View File

@@ -40,12 +40,17 @@
#include "locator/everywhere_replication_strategy.hh"
#include "utils/class_registrator.hh"
#include "utils/fb_utilities.hh"
#include "locator/token_metadata.hh"
namespace locator {
everywhere_replication_strategy::everywhere_replication_strategy(const sstring& keyspace_name, token_metadata& token_metadata, snitch_ptr& snitch, const std::map<sstring, sstring>& config_options) :
everywhere_replication_strategy::everywhere_replication_strategy(const sstring& keyspace_name, const token_metadata& token_metadata, snitch_ptr& snitch, const std::map<sstring, sstring>& config_options) :
abstract_replication_strategy(keyspace_name, token_metadata, snitch, config_options, replication_strategy_type::everywhere_topology) {}
std::vector<inet_address> everywhere_replication_strategy::calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const {
return tm.get_all_endpoints();
}
std::vector<inet_address> everywhere_replication_strategy::get_natural_endpoints(const token& search_token) {
if (_token_metadata.sorted_tokens().empty()) {
return std::vector<inet_address>({utils::fb_utilities::get_broadcast_address()});
@@ -53,7 +58,11 @@ std::vector<inet_address> everywhere_replication_strategy::get_natural_endpoints
return calculate_natural_endpoints(search_token, _token_metadata);
}
using registry = class_registrator<abstract_replication_strategy, everywhere_replication_strategy, const sstring&, token_metadata&, snitch_ptr&, const std::map<sstring, sstring>&>;
size_t everywhere_replication_strategy::get_replication_factor() const {
return _token_metadata.get_all_endpoints_count();
}
using registry = class_registrator<abstract_replication_strategy, everywhere_replication_strategy, const sstring&, const token_metadata&, snitch_ptr&, const std::map<sstring, sstring>&>;
static registry registrator("org.apache.cassandra.locator.EverywhereStrategy");
static registry registrator_short_name("EverywhereStrategy");
}

View File

@@ -44,11 +44,9 @@
namespace locator {
class everywhere_replication_strategy : public abstract_replication_strategy {
public:
everywhere_replication_strategy(const sstring& keyspace_name, token_metadata& token_metadata, snitch_ptr& snitch, const std::map<sstring,sstring>& config_options);
everywhere_replication_strategy(const sstring& keyspace_name, const token_metadata& token_metadata, snitch_ptr& snitch, const std::map<sstring,sstring>& config_options);
virtual std::vector<inet_address> calculate_natural_endpoints(const token& search_token, token_metadata& tm) const override {
return tm.get_all_endpoints();
}
virtual std::vector<inet_address> calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const override;
std::vector<inet_address> get_natural_endpoints(const token& search_token) override;
virtual void validate_options() const override { /* noop */ }
@@ -58,9 +56,7 @@ public:
return std::nullopt;
}
virtual size_t get_replication_factor() const override {
return _token_metadata.get_all_endpoints_count();
}
virtual size_t get_replication_factor() const override;
virtual bool allow_remove_node_being_replaced_from_natural_endpoints() const override {
return true;

View File

@@ -231,7 +231,7 @@ future<> gossiping_property_file_snitch::reload_configuration() {
parallel_for_each(cpus.begin(), cpus.end(), [] (unsigned int c) {
return smp::submit_to(c, [] {
if (service::get_storage_service().local_is_initialized()) {
auto& tmd = service::get_local_storage_service().get_token_metadata();
auto& tmd = service::get_local_storage_service().get_mutable_token_metadata();
// initiate the token metadata endpoints cache reset
tmd.invalidate_cached_rings();

View File

@@ -27,14 +27,14 @@
namespace locator {
local_strategy::local_strategy(const sstring& keyspace_name, token_metadata& token_metadata, snitch_ptr& snitch, const std::map<sstring, sstring>& config_options) :
local_strategy::local_strategy(const sstring& keyspace_name, const token_metadata& token_metadata, snitch_ptr& snitch, const std::map<sstring, sstring>& config_options) :
abstract_replication_strategy(keyspace_name, token_metadata, snitch, config_options, replication_strategy_type::local) {}
std::vector<inet_address> local_strategy::get_natural_endpoints(const token& t) {
return calculate_natural_endpoints(t, _token_metadata);
}
std::vector<inet_address> local_strategy::calculate_natural_endpoints(const token& t, token_metadata& tm) const {
std::vector<inet_address> local_strategy::calculate_natural_endpoints(const token& t, const token_metadata& tm) const {
return std::vector<inet_address>({utils::fb_utilities::get_broadcast_address()});
}
@@ -50,7 +50,7 @@ size_t local_strategy::get_replication_factor() const {
return 1;
}
using registry = class_registrator<abstract_replication_strategy, local_strategy, const sstring&, token_metadata&, snitch_ptr&, const std::map<sstring, sstring>&>;
using registry = class_registrator<abstract_replication_strategy, local_strategy, const sstring&, const token_metadata&, snitch_ptr&, const std::map<sstring, sstring>&>;
static registry registrator("org.apache.cassandra.locator.LocalStrategy");
static registry registrator_short_name("LocalStrategy");

View File

@@ -36,9 +36,9 @@ using token = dht::token;
class local_strategy : public abstract_replication_strategy {
protected:
virtual std::vector<inet_address> calculate_natural_endpoints(const token& search_token, token_metadata& tm) const override;
virtual std::vector<inet_address> calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const override;
public:
local_strategy(const sstring& keyspace_name, token_metadata& token_metadata, snitch_ptr& snitch, const std::map<sstring, sstring>& config_options);
local_strategy(const sstring& keyspace_name, const token_metadata& token_metadata, snitch_ptr& snitch, const std::map<sstring, sstring>& config_options);
virtual ~local_strategy() {};
virtual size_t get_replication_factor() const;
/**

View File

@@ -59,7 +59,7 @@ bool operator==(const endpoint_dc_rack& d1, const endpoint_dc_rack& d2) {
network_topology_strategy::network_topology_strategy(
const sstring& keyspace_name,
token_metadata& token_metadata,
const token_metadata& token_metadata,
snitch_ptr& snitch,
const std::map<sstring, sstring>& config_options) :
abstract_replication_strategy(keyspace_name,
@@ -104,7 +104,7 @@ network_topology_strategy::network_topology_strategy(
std::vector<inet_address>
network_topology_strategy::calculate_natural_endpoints(
const token& search_token, token_metadata& tm) const {
const token& search_token, const token_metadata& tm) const {
using endpoint_set = utils::sequenced_set<inet_address>;
using endpoint_dc_rack_set = std::unordered_set<endpoint_dc_rack>;
@@ -200,20 +200,20 @@ network_topology_strategy::calculate_natural_endpoints(
// tracks the racks we have already placed replicas in
endpoint_dc_rack_set seen_racks;
topology& tp = tm.get_topology();
const topology& tp = tm.get_topology();
//
// all endpoints in each DC, so we can check when we have exhausted all
// the members of a DC
//
std::unordered_map<sstring,
const std::unordered_map<sstring,
std::unordered_set<inet_address>>&
all_endpoints = tp.get_datacenter_endpoints();
//
// all racks in a DC so we can check when we have exhausted all racks in a
// DC
//
std::unordered_map<sstring,
const std::unordered_map<sstring,
std::unordered_map<sstring,
std::unordered_set<inet_address>>>&
racks = tp.get_datacenter_racks();
@@ -274,7 +274,7 @@ std::optional<std::set<sstring>> network_topology_strategy::recognized_options()
return std::nullopt;
}
using registry = class_registrator<abstract_replication_strategy, network_topology_strategy, const sstring&, token_metadata&, snitch_ptr&, const std::map<sstring, sstring>&>;
using registry = class_registrator<abstract_replication_strategy, network_topology_strategy, const sstring&, const token_metadata&, snitch_ptr&, const std::map<sstring, sstring>&>;
static registry registrator("org.apache.cassandra.locator.NetworkTopologyStrategy");
static registry registrator_short_name("NetworkTopologyStrategy");
}

View File

@@ -49,7 +49,7 @@ class network_topology_strategy : public abstract_replication_strategy {
public:
network_topology_strategy(
const sstring& keyspace_name,
token_metadata& token_metadata,
const token_metadata& token_metadata,
snitch_ptr& snitch,
const std::map<sstring,sstring>& config_options);
@@ -76,7 +76,7 @@ protected:
* progress in each DC, rack etc.
*/
virtual std::vector<inet_address> calculate_natural_endpoints(
const token& search_token, token_metadata& tm) const override;
const token& search_token, const token_metadata& tm) const override;
virtual void validate_options() const override;

View File

@@ -27,7 +27,7 @@
namespace locator {
simple_strategy::simple_strategy(const sstring& keyspace_name, token_metadata& token_metadata, snitch_ptr& snitch, const std::map<sstring, sstring>& config_options) :
simple_strategy::simple_strategy(const sstring& keyspace_name, const token_metadata& token_metadata, snitch_ptr& snitch, const std::map<sstring, sstring>& config_options) :
abstract_replication_strategy(keyspace_name, token_metadata, snitch, config_options, replication_strategy_type::simple) {
for (auto& config_pair : config_options) {
auto& key = config_pair.first;
@@ -42,7 +42,7 @@ simple_strategy::simple_strategy(const sstring& keyspace_name, token_metadata& t
}
}
std::vector<inet_address> simple_strategy::calculate_natural_endpoints(const token& t, token_metadata& tm) const {
std::vector<inet_address> simple_strategy::calculate_natural_endpoints(const token& t, const token_metadata& tm) const {
const std::vector<token>& tokens = tm.sorted_tokens();
if (tokens.empty()) {
@@ -82,7 +82,7 @@ std::optional<std::set<sstring>>simple_strategy::recognized_options() const {
return {{ "replication_factor" }};
}
using registry = class_registrator<abstract_replication_strategy, simple_strategy, const sstring&, token_metadata&, snitch_ptr&, const std::map<sstring, sstring>&>;
using registry = class_registrator<abstract_replication_strategy, simple_strategy, const sstring&, const token_metadata&, snitch_ptr&, const std::map<sstring, sstring>&>;
static registry registrator("org.apache.cassandra.locator.SimpleStrategy");
static registry registrator_short_name("SimpleStrategy");

View File

@@ -30,9 +30,9 @@ namespace locator {
class simple_strategy : public abstract_replication_strategy {
protected:
virtual std::vector<inet_address> calculate_natural_endpoints(const token& search_token, token_metadata& tm) const override;
virtual std::vector<inet_address> calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const override;
public:
simple_strategy(const sstring& keyspace_name, token_metadata& token_metadata, snitch_ptr& snitch, const std::map<sstring, sstring>& config_options);
simple_strategy(const sstring& keyspace_name, const token_metadata& token_metadata, snitch_ptr& snitch, const std::map<sstring, sstring>& config_options);
virtual ~simple_strategy() {};
virtual size_t get_replication_factor() const override;
virtual void validate_options() const override;

View File

@@ -131,7 +131,7 @@ public:
return _topology;
}
void debug_show();
void debug_show() const;
#if 0
private static final Logger logger = LoggerFactory.getLogger(TokenMetadata.class);
@@ -357,15 +357,15 @@ public:
#endif
bool is_member(inet_address endpoint);
bool is_member(inet_address endpoint) const;
bool is_leaving(inet_address endpoint);
bool is_leaving(inet_address endpoint) const;
// Is this node being replaced by another node
bool is_being_replaced(inet_address endpoint);
bool is_being_replaced(inet_address endpoint) const;
// Is any node being replaced by another node
bool is_any_node_being_replaced();
bool is_any_node_being_replaced() const;
void add_replacing_endpoint(inet_address existing_node, inet_address replacing_node);
@@ -379,7 +379,7 @@ public:
* Create a copy of TokenMetadata with only tokenToEndpointMap. That is, pending ranges,
* bootstrap tokens and leaving endpoints are not included in the copy.
*/
token_metadata_impl clone_only_token_map() {
token_metadata_impl clone_only_token_map() const {
return token_metadata_impl(this->_token_to_endpoint_map, this->_endpoint_to_host_id_map, this->_topology);
}
#if 0
@@ -415,7 +415,7 @@ public:
*
* @return new token metadata
*/
token_metadata_impl clone_after_all_left() {
token_metadata_impl clone_after_all_left() const {
auto all_left_metadata = clone_only_token_map();
for (auto endpoint : _leaving_endpoints) {
@@ -432,7 +432,7 @@ public:
*
* @return new token metadata
*/
token_metadata_impl clone_after_all_settled();
token_metadata_impl clone_after_all_settled() const;
#if 0
public InetAddress getEndpoint(Token token)
{
@@ -448,21 +448,18 @@ public:
}
#endif
public:
dht::token_range_vector get_primary_ranges_for(std::unordered_set<token> tokens);
dht::token_range_vector get_primary_ranges_for(std::unordered_set<token> tokens) const;
dht::token_range_vector get_primary_ranges_for(token right);
dht::token_range_vector get_primary_ranges_for(token right) const;
static boost::icl::interval<token>::interval_type range_to_interval(range<dht::token> r);
static range<dht::token> interval_to_range(boost::icl::interval<token>::interval_type i);
private:
std::unordered_multimap<range<token>, inet_address>& get_pending_ranges_mm(sstring keyspace_name);
void set_pending_ranges(const sstring& keyspace_name, std::unordered_multimap<range<token>, inet_address> new_pending_ranges);
public:
/** a mutable map may be returned but caller should not modify it */
const std::unordered_map<range<token>, std::unordered_set<inet_address>>& get_pending_ranges(sstring keyspace_name);
std::vector<range<token>> get_pending_ranges(sstring keyspace_name, inet_address endpoint);
// returns an empty vector if keyspace_name not found
std::vector<range<token>> get_pending_ranges(sstring keyspace_name, inet_address endpoint) const;
/**
* Calculate pending ranges according to bootsrapping and leaving nodes. Reasoning is:
*
@@ -486,25 +483,25 @@ public:
* NOTE: This is heavy and ineffective operation. This will be done only once when a node
* changes state in the cluster, so it should be manageable.
*/
future<> calculate_pending_ranges(
token_metadata& unpimplified_this,
abstract_replication_strategy& strategy, const sstring& keyspace_name);
future<> update_pending_ranges(
const token_metadata& unpimplified_this,
const abstract_replication_strategy& strategy, const sstring& keyspace_name);
future<> calculate_pending_ranges_for_leaving(
token_metadata& unpimplified_this,
abstract_replication_strategy& strategy,
const token_metadata& unpimplified_this,
const abstract_replication_strategy& strategy,
lw_shared_ptr<std::unordered_multimap<range<token>, inet_address>> new_pending_ranges,
lw_shared_ptr<token_metadata> all_left_metadata);
lw_shared_ptr<token_metadata> all_left_metadata) const;
void calculate_pending_ranges_for_bootstrap(
abstract_replication_strategy& strategy,
const abstract_replication_strategy& strategy,
lw_shared_ptr<std::unordered_multimap<range<token>, inet_address>> new_pending_ranges,
lw_shared_ptr<token_metadata> all_left_metadata);
lw_shared_ptr<token_metadata> all_left_metadata) const;
future<> calculate_pending_ranges_for_replacing(
token_metadata& unpimplified_this,
abstract_replication_strategy& strategy,
lw_shared_ptr<std::unordered_multimap<range<token>, inet_address>> new_pending_ranges);
const token_metadata& unpimplified_this,
const abstract_replication_strategy& strategy,
lw_shared_ptr<std::unordered_multimap<range<token>, inet_address>> new_pending_ranges) const;
public:
token get_predecessor(token t);
token get_predecessor(token t) const;
#if 0
public Token getSuccessor(Token token)
@@ -730,9 +727,10 @@ public:
return sb.toString();
}
#endif
sstring print_pending_ranges();
sstring print_pending_ranges() const;
public:
std::vector<gms::inet_address> pending_endpoints_for(const token& token, const sstring& keyspace_name);
// returns empty vector if keyspace_name not found.
std::vector<gms::inet_address> pending_endpoints_for(const token& token, const sstring& keyspace_name) const;
#if 0
/**
* @deprecated retained for benefit of old tests
@@ -745,12 +743,12 @@ public:
public:
/** @return an endpoint to token multimap representation of tokenToEndpointMap (a copy) */
std::multimap<inet_address, token> get_endpoint_to_token_map_for_reading();
std::multimap<inet_address, token> get_endpoint_to_token_map_for_reading() const;
/**
* @return a (stable copy, won't be modified) Token to Endpoint map for all the normal and bootstrapping nodes
* in the cluster.
*/
std::map<token, inet_address> get_normal_and_bootstrapping_token_to_endpoint_map();
std::map<token, inet_address> get_normal_and_bootstrapping_token_to_endpoint_map() const;
#if 0
/**
@@ -1089,7 +1087,7 @@ std::optional<inet_address> token_metadata_impl::get_endpoint(const token& token
}
}
void token_metadata_impl::debug_show() {
void token_metadata_impl::debug_show() const {
auto reporter = std::make_shared<timer<lowres_clock>>();
reporter->set_callback ([reporter, this] {
fmt::print("Endpoint -> Token\n");
@@ -1162,7 +1160,7 @@ const std::unordered_map<inet_address, utils::UUID>& token_metadata_impl::get_en
return _endpoint_to_host_id_map;
}
bool token_metadata_impl::is_member(inet_address endpoint) {
bool token_metadata_impl::is_member(inet_address endpoint) const {
return _topology.has_endpoint(endpoint);
}
@@ -1231,15 +1229,15 @@ void token_metadata_impl::remove_bootstrap_tokens(std::unordered_set<token> toke
}
}
bool token_metadata_impl::is_leaving(inet_address endpoint) {
bool token_metadata_impl::is_leaving(inet_address endpoint) const {
return _leaving_endpoints.contains(endpoint);
}
bool token_metadata_impl::is_being_replaced(inet_address endpoint) {
bool token_metadata_impl::is_being_replaced(inet_address endpoint) const {
return _replacing_endpoints.contains(endpoint);
}
bool token_metadata_impl::is_any_node_being_replaced() {
bool token_metadata_impl::is_any_node_being_replaced() const {
return !_replacing_endpoints.empty();
}
@@ -1254,7 +1252,7 @@ void token_metadata_impl::remove_endpoint(inet_address endpoint) {
invalidate_cached_rings();
}
token token_metadata_impl::get_predecessor(token t) {
token token_metadata_impl::get_predecessor(token t) const {
auto& tokens = sorted_tokens();
auto it = std::lower_bound(tokens.begin(), tokens.end(), t);
if (it == tokens.end() || *it != t) {
@@ -1270,7 +1268,7 @@ token token_metadata_impl::get_predecessor(token t) {
}
}
dht::token_range_vector token_metadata_impl::get_primary_ranges_for(std::unordered_set<token> tokens) {
dht::token_range_vector token_metadata_impl::get_primary_ranges_for(std::unordered_set<token> tokens) const {
dht::token_range_vector ranges;
ranges.reserve(tokens.size() + 1); // one of the ranges will wrap
for (auto right : tokens) {
@@ -1283,7 +1281,7 @@ dht::token_range_vector token_metadata_impl::get_primary_ranges_for(std::unorder
return ranges;
}
dht::token_range_vector token_metadata_impl::get_primary_ranges_for(token right) {
dht::token_range_vector token_metadata_impl::get_primary_ranges_for(token right) const {
return get_primary_ranges_for(std::unordered_set<token>{right});
}
@@ -1361,34 +1359,27 @@ void token_metadata_impl::set_pending_ranges(const sstring& keyspace_name,
_pending_ranges_map[keyspace_name] = std::move(map);
}
std::unordered_multimap<range<token>, inet_address>&
token_metadata_impl::get_pending_ranges_mm(sstring keyspace_name) {
return _pending_ranges[keyspace_name];
}
const std::unordered_map<range<token>, std::unordered_set<inet_address>>&
token_metadata_impl::get_pending_ranges(sstring keyspace_name) {
return _pending_ranges_map[keyspace_name];
}
std::vector<range<token>>
token_metadata_impl::get_pending_ranges(sstring keyspace_name, inet_address endpoint) {
token_metadata_impl::get_pending_ranges(sstring keyspace_name, inet_address endpoint) const {
std::vector<range<token>> ret;
for (auto x : get_pending_ranges_mm(keyspace_name)) {
const auto it = _pending_ranges.find(keyspace_name);
if (it != _pending_ranges.end()) {
for (auto x : it->second) {
auto& range_token = x.first;
auto& ep = x.second;
if (ep == endpoint) {
ret.push_back(range_token);
}
}
}
return ret;
}
future<> token_metadata_impl::calculate_pending_ranges_for_leaving(
token_metadata& unpimplified_this,
abstract_replication_strategy& strategy,
const token_metadata& unpimplified_this,
const abstract_replication_strategy& strategy,
lw_shared_ptr<std::unordered_multimap<range<token>, inet_address>> new_pending_ranges,
lw_shared_ptr<token_metadata> all_left_metadata) {
lw_shared_ptr<token_metadata> all_left_metadata) const {
std::unordered_multimap<inet_address, dht::token_range> address_ranges = strategy.get_address_ranges(unpimplified_this);
// get all ranges that will be affected by leaving nodes
std::unordered_set<range<token>> affected_ranges;
@@ -1423,9 +1414,9 @@ future<> token_metadata_impl::calculate_pending_ranges_for_leaving(
}
future<> token_metadata_impl::calculate_pending_ranges_for_replacing(
token_metadata& unpimplified_this,
abstract_replication_strategy& strategy,
lw_shared_ptr<std::unordered_multimap<range<token>, inet_address>> new_pending_ranges) {
const token_metadata& unpimplified_this,
const abstract_replication_strategy& strategy,
lw_shared_ptr<std::unordered_multimap<range<token>, inet_address>> new_pending_ranges) const {
if (_replacing_endpoints.empty()) {
return make_ready_future<>();
}
@@ -1448,9 +1439,9 @@ future<> token_metadata_impl::calculate_pending_ranges_for_replacing(
}
void token_metadata_impl::calculate_pending_ranges_for_bootstrap(
abstract_replication_strategy& strategy,
const abstract_replication_strategy& strategy,
lw_shared_ptr<std::unordered_multimap<range<token>, inet_address>> new_pending_ranges,
lw_shared_ptr<token_metadata> all_left_metadata) {
lw_shared_ptr<token_metadata> all_left_metadata) const {
// For each of the bootstrapping nodes, simply add and remove them one by one to
// allLeftMetadata and check in between what their ranges would be.
std::unordered_multimap<inet_address, token> bootstrap_addresses;
@@ -1478,9 +1469,9 @@ void token_metadata_impl::calculate_pending_ranges_for_bootstrap(
}
}
future<> token_metadata_impl::calculate_pending_ranges(
token_metadata& unpimplified_this,
abstract_replication_strategy& strategy, const sstring& keyspace_name) {
future<> token_metadata_impl::update_pending_ranges(
const token_metadata& unpimplified_this,
const abstract_replication_strategy& strategy, const sstring& keyspace_name) {
auto new_pending_ranges = make_lw_shared<std::unordered_multimap<range<token>, inet_address>>();
tlogger.debug("calculate_pending_ranges: keyspace_name={}, bootstrap_tokens={}, leaving nodes={}, replacing_endpoints={}",
@@ -1520,7 +1511,7 @@ size_t token_metadata_impl::count_normal_token_owners() const {
return eps.size();
}
sstring token_metadata_impl::print_pending_ranges() {
sstring token_metadata_impl::print_pending_ranges() const {
std::stringstream ss;
for (auto& x : _pending_ranges) {
@@ -1553,7 +1544,7 @@ void token_metadata_impl::del_replacing_endpoint(inet_address existing_node) {
_replacing_endpoints.erase(existing_node);
}
token_metadata_impl token_metadata_impl::clone_after_all_settled() {
token_metadata_impl token_metadata_impl::clone_after_all_settled() const {
token_metadata_impl metadata = clone_only_token_map();
for (auto endpoint : _leaving_endpoints) {
@@ -1563,35 +1554,37 @@ token_metadata_impl token_metadata_impl::clone_after_all_settled() {
return metadata;
}
std::vector<gms::inet_address> token_metadata_impl::pending_endpoints_for(const token& token, const sstring& keyspace_name) {
// Fast path 0: no pending ranges at all
if (_pending_ranges_interval_map.empty()) {
std::vector<gms::inet_address> token_metadata_impl::pending_endpoints_for(const token& token, const sstring& keyspace_name) const {
// Fast path 0: pending ranges not found for this keyspace_name
const auto pr_it = _pending_ranges_interval_map.find(keyspace_name);
if (pr_it == _pending_ranges_interval_map.end()) {
return {};
}
// Fast path 1: no pending ranges for this keyspace_name
if (_pending_ranges_interval_map[keyspace_name].empty()) {
// Fast path 1: empty pending ranges for this keyspace_name
const auto& ks_map = pr_it->second;
if (ks_map.empty()) {
return {};
}
// Slow path: lookup pending ranges
std::vector<gms::inet_address> endpoints;
auto interval = range_to_interval(range<dht::token>(token));
auto it = _pending_ranges_interval_map[keyspace_name].find(interval);
if (it != _pending_ranges_interval_map[keyspace_name].end()) {
const auto it = ks_map.find(interval);
if (it != ks_map.end()) {
// interval_map does not work with std::vector, convert to std::vector of ips
endpoints = std::vector<gms::inet_address>(it->second.begin(), it->second.end());
}
return endpoints;
}
std::map<token, inet_address> token_metadata_impl::get_normal_and_bootstrapping_token_to_endpoint_map() {
std::map<token, inet_address> token_metadata_impl::get_normal_and_bootstrapping_token_to_endpoint_map() const {
std::map<token, inet_address> ret(_token_to_endpoint_map.begin(), _token_to_endpoint_map.end());
ret.insert(_bootstrap_tokens.begin(), _bootstrap_tokens.end());
return ret;
}
std::multimap<inet_address, token> token_metadata_impl::get_endpoint_to_token_map_for_reading() {
std::multimap<inet_address, token> token_metadata_impl::get_endpoint_to_token_map_for_reading() const {
std::multimap<inet_address, token> cloned;
for (const auto& x : _token_to_endpoint_map) {
cloned.emplace(x.second, x.first);
@@ -1758,7 +1751,7 @@ token_metadata::get_topology() const {
}
void
token_metadata::debug_show() {
token_metadata::debug_show() const {
_impl->debug_show();
}
@@ -1813,22 +1806,22 @@ token_metadata::remove_endpoint(inet_address endpoint) {
}
bool
token_metadata::is_member(inet_address endpoint) {
token_metadata::is_member(inet_address endpoint) const {
return _impl->is_member(endpoint);
}
bool
token_metadata::is_leaving(inet_address endpoint) {
token_metadata::is_leaving(inet_address endpoint) const {
return _impl->is_leaving(endpoint);
}
bool
token_metadata::is_being_replaced(inet_address endpoint) {
token_metadata::is_being_replaced(inet_address endpoint) const {
return _impl->is_being_replaced(endpoint);
}
bool
token_metadata::is_any_node_being_replaced() {
token_metadata::is_any_node_being_replaced() const {
return _impl->is_any_node_being_replaced();
}
@@ -1841,27 +1834,27 @@ void token_metadata::del_replacing_endpoint(inet_address existing_node) {
}
token_metadata
token_metadata::clone_only_token_map() {
token_metadata::clone_only_token_map() const {
return token_metadata(std::make_unique<token_metadata_impl>(_impl->clone_only_token_map()));
}
token_metadata
token_metadata::clone_after_all_left() {
token_metadata::clone_after_all_left() const {
return token_metadata(std::make_unique<token_metadata_impl>(_impl->clone_after_all_left()));
}
token_metadata
token_metadata::clone_after_all_settled() {
token_metadata::clone_after_all_settled() const {
return token_metadata(std::make_unique<token_metadata_impl>(_impl->clone_after_all_settled()));
}
dht::token_range_vector
token_metadata::get_primary_ranges_for(std::unordered_set<token> tokens) {
token_metadata::get_primary_ranges_for(std::unordered_set<token> tokens) const {
return _impl->get_primary_ranges_for(std::move(tokens));
}
dht::token_range_vector
token_metadata::get_primary_ranges_for(token right) {
token_metadata::get_primary_ranges_for(token right) const {
return _impl->get_primary_ranges_for(right);
}
@@ -1875,39 +1868,34 @@ token_metadata::interval_to_range(boost::icl::interval<token>::interval_type i)
return token_metadata_impl::interval_to_range(std::move(i));
}
const std::unordered_map<range<token>, std::unordered_set<inet_address>>&
token_metadata::get_pending_ranges(sstring keyspace_name) {
return _impl->get_pending_ranges(std::move(keyspace_name));
}
std::vector<range<token>>
token_metadata::get_pending_ranges(sstring keyspace_name, inet_address endpoint) {
token_metadata::get_pending_ranges(sstring keyspace_name, inet_address endpoint) const {
return _impl->get_pending_ranges(std::move(keyspace_name), endpoint);
}
future<>
token_metadata::calculate_pending_ranges(abstract_replication_strategy& strategy, const sstring& keyspace_name) {
return _impl->calculate_pending_ranges(*this, strategy, keyspace_name);
token_metadata::update_pending_ranges(const abstract_replication_strategy& strategy, const sstring& keyspace_name) {
return _impl->update_pending_ranges(*this, strategy, keyspace_name);
}
future<>
token_metadata::calculate_pending_ranges_for_leaving(
abstract_replication_strategy& strategy,
const abstract_replication_strategy& strategy,
lw_shared_ptr<std::unordered_multimap<range<token>, inet_address>> new_pending_ranges,
lw_shared_ptr<token_metadata> all_left_metadata) {
lw_shared_ptr<token_metadata> all_left_metadata) const {
return _impl->calculate_pending_ranges_for_leaving(*this, strategy, std::move(new_pending_ranges), std::move(all_left_metadata));
}
void
token_metadata::calculate_pending_ranges_for_bootstrap(
abstract_replication_strategy& strategy,
const abstract_replication_strategy& strategy,
lw_shared_ptr<std::unordered_multimap<range<token>, inet_address>> new_pending_ranges,
lw_shared_ptr<token_metadata> all_left_metadata) {
lw_shared_ptr<token_metadata> all_left_metadata) const {
_impl->calculate_pending_ranges_for_bootstrap(strategy, std::move(new_pending_ranges), std::move(all_left_metadata));
}
token
token_metadata::get_predecessor(token t) {
token_metadata::get_predecessor(token t) const {
return _impl->get_predecessor(t);
}
@@ -1927,22 +1915,22 @@ token_metadata::count_normal_token_owners() const {
}
sstring
token_metadata::print_pending_ranges() {
token_metadata::print_pending_ranges() const {
return _impl->print_pending_ranges();
}
std::vector<gms::inet_address>
token_metadata::pending_endpoints_for(const token& token, const sstring& keyspace_name) {
token_metadata::pending_endpoints_for(const token& token, const sstring& keyspace_name) const {
return _impl->pending_endpoints_for(token, keyspace_name);
}
std::multimap<inet_address, token>
token_metadata::get_endpoint_to_token_map_for_reading() {
token_metadata::get_endpoint_to_token_map_for_reading() const {
return _impl->get_endpoint_to_token_map_for_reading();
}
std::map<token, inet_address>
token_metadata::get_normal_and_bootstrapping_token_to_endpoint_map() {
token_metadata::get_normal_and_bootstrapping_token_to_endpoint_map() const {
return _impl->get_normal_and_bootstrapping_token_to_endpoint_map();
}

View File

@@ -113,6 +113,13 @@ public:
return _dc_racks;
}
const std::unordered_map<sstring,
std::unordered_map<sstring,
std::unordered_set<inet_address>>>&
get_datacenter_racks() const {
return _dc_racks;
}
const endpoint_dc_rack& get_location(const inet_address& ep) const;
private:
/** multi-map: DC -> endpoints in that DC */
@@ -190,7 +197,7 @@ public:
topology& get_topology();
const topology& get_topology() const;
void debug_show();
void debug_show() const;
/**
* Store an end-point to host ID mapping. Each ID must be unique, and
@@ -223,15 +230,15 @@ public:
void remove_endpoint(inet_address endpoint);
bool is_member(inet_address endpoint);
bool is_member(inet_address endpoint) const;
bool is_leaving(inet_address endpoint);
bool is_leaving(inet_address endpoint) const;
// Is this node being replaced by another node
bool is_being_replaced(inet_address endpoint);
bool is_being_replaced(inet_address endpoint) const;
// Is any node being replaced by another node
bool is_any_node_being_replaced();
bool is_any_node_being_replaced() const;
void add_replacing_endpoint(inet_address existing_node, inet_address replacing_node);
@@ -241,31 +248,29 @@ public:
* Create a copy of TokenMetadata with only tokenToEndpointMap. That is, pending ranges,
* bootstrap tokens and leaving endpoints are not included in the copy.
*/
token_metadata clone_only_token_map();
token_metadata clone_only_token_map() const;
/**
* Create a copy of TokenMetadata with tokenToEndpointMap reflecting situation after all
* current leave operations have finished.
*
* @return new token metadata
*/
token_metadata clone_after_all_left();
token_metadata clone_after_all_left() const;
/**
* Create a copy of TokenMetadata with tokenToEndpointMap reflecting situation after all
* current leave, and move operations have finished.
*
* @return new token metadata
*/
token_metadata clone_after_all_settled();
dht::token_range_vector get_primary_ranges_for(std::unordered_set<token> tokens);
token_metadata clone_after_all_settled() const;
dht::token_range_vector get_primary_ranges_for(std::unordered_set<token> tokens) const;
dht::token_range_vector get_primary_ranges_for(token right);
dht::token_range_vector get_primary_ranges_for(token right) const;
static boost::icl::interval<token>::interval_type range_to_interval(range<dht::token> r);
static range<dht::token> interval_to_range(boost::icl::interval<token>::interval_type i);
/** a mutable map may be returned but caller should not modify it */
const std::unordered_map<range<token>, std::unordered_set<inet_address>>& get_pending_ranges(sstring keyspace_name);
std::vector<range<token>> get_pending_ranges(sstring keyspace_name, inet_address endpoint);
// returns an empty vector if keyspace_name not found
std::vector<range<token>> get_pending_ranges(sstring keyspace_name, inet_address endpoint) const;
/**
* Calculate pending ranges according to bootsrapping and leaving nodes. Reasoning is:
*
@@ -289,20 +294,20 @@ public:
* NOTE: This is heavy and ineffective operation. This will be done only once when a node
* changes state in the cluster, so it should be manageable.
*/
future<> calculate_pending_ranges(abstract_replication_strategy& strategy, const sstring& keyspace_name);
future<> update_pending_ranges(const abstract_replication_strategy& strategy, const sstring& keyspace_name);
future<> calculate_pending_ranges_for_leaving(
abstract_replication_strategy& strategy,
const abstract_replication_strategy& strategy,
lw_shared_ptr<std::unordered_multimap<range<token>, inet_address>> new_pending_ranges,
lw_shared_ptr<token_metadata> all_left_metadata);
lw_shared_ptr<token_metadata> all_left_metadata) const;
void calculate_pending_ranges_for_bootstrap(
abstract_replication_strategy& strategy,
const abstract_replication_strategy& strategy,
lw_shared_ptr<std::unordered_multimap<range<token>, inet_address>> new_pending_ranges,
lw_shared_ptr<token_metadata> all_left_metadata);
lw_shared_ptr<token_metadata> all_left_metadata) const;
future<> calculate_pending_ranges_for_replacing(
abstract_replication_strategy& strategy,
lw_shared_ptr<std::unordered_multimap<range<token>, inet_address>> new_pending_ranges);
lw_shared_ptr<std::unordered_multimap<range<token>, inet_address>> new_pending_ranges) const;
token get_predecessor(token t);
token get_predecessor(token t) const;
std::vector<inet_address> get_all_endpoints() const;
size_t get_all_endpoints_count() const;
@@ -312,16 +317,17 @@ public:
size_t count_normal_token_owners() const;
sstring print_pending_ranges();
std::vector<gms::inet_address> pending_endpoints_for(const token& token, const sstring& keyspace_name);
sstring print_pending_ranges() const;
// returns empty vector if keyspace_name not found.
std::vector<gms::inet_address> pending_endpoints_for(const token& token, const sstring& keyspace_name) const;
/** @return an endpoint to token multimap representation of tokenToEndpointMap (a copy) */
std::multimap<inet_address, token> get_endpoint_to_token_map_for_reading();
std::multimap<inet_address, token> get_endpoint_to_token_map_for_reading() const;
/**
* @return a (stable copy, won't be modified) Token to Endpoint map for all the normal and bootstrapping nodes
* in the cluster.
*/
std::map<token, inet_address> get_normal_and_bootstrapping_token_to_endpoint_map();
std::map<token, inet_address> get_normal_and_bootstrapping_token_to_endpoint_map() const;
long get_ring_version() const;
void invalidate_cached_rings();

View File

@@ -1282,7 +1282,7 @@ bool paxos_response_handler::learned(gms::inet_address ep) {
}
static std::vector<gms::inet_address>
replica_ids_to_endpoints(locator::token_metadata& tm, const std::vector<utils::UUID>& replica_ids) {
replica_ids_to_endpoints(const locator::token_metadata& tm, const std::vector<utils::UUID>& replica_ids) {
std::vector<gms::inet_address> endpoints;
endpoints.reserve(replica_ids.size());
@@ -1296,7 +1296,7 @@ replica_ids_to_endpoints(locator::token_metadata& tm, const std::vector<utils::U
}
static std::vector<utils::UUID>
endpoints_to_replica_ids(locator::token_metadata& tm, const std::vector<gms::inet_address>& endpoints) {
endpoints_to_replica_ids(const locator::token_metadata& tm, const std::vector<gms::inet_address>& endpoints) {
std::vector<utils::UUID> replica_ids;
replica_ids.reserve(endpoints.size());
@@ -1774,7 +1774,7 @@ using namespace std::literals::chrono_literals;
storage_proxy::~storage_proxy() {}
storage_proxy::storage_proxy(distributed<database>& db, storage_proxy::config cfg, db::view::node_update_backlog& max_view_update_backlog,
scheduling_group_key stats_key, gms::feature_service& feat, locator::token_metadata& tm, netw::messaging_service& ms)
scheduling_group_key stats_key, gms::feature_service& feat, const locator::token_metadata& tm, netw::messaging_service& ms)
: _db(db)
, _token_metadata(tm)
, _read_smp_service_group(cfg.read_smp_service_group)
@@ -4597,7 +4597,7 @@ std::vector<gms::inet_address> storage_proxy::intersection(const std::vector<gms
return inter;
}
query_ranges_to_vnodes_generator::query_ranges_to_vnodes_generator(locator::token_metadata& tm, schema_ptr s, dht::partition_range_vector ranges, bool local) :
query_ranges_to_vnodes_generator::query_ranges_to_vnodes_generator(const locator::token_metadata& tm, schema_ptr s, dht::partition_range_vector ranges, bool local) :
_s(s), _ranges(std::move(ranges)), _i(_ranges.begin()), _local(local), _tm(tm) {}
dht::partition_range_vector query_ranges_to_vnodes_generator::operator()(size_t n) {

View File

@@ -126,10 +126,10 @@ class query_ranges_to_vnodes_generator {
dht::partition_range_vector _ranges;
dht::partition_range_vector::iterator _i; // iterator to current range in _ranges
bool _local;
locator::token_metadata& _tm;
const locator::token_metadata& _tm;
void process_one_range(size_t n, dht::partition_range_vector& ranges);
public:
query_ranges_to_vnodes_generator(locator::token_metadata& tm, schema_ptr s, dht::partition_range_vector ranges, bool local = false);
query_ranges_to_vnodes_generator(const locator::token_metadata& tm, schema_ptr s, dht::partition_range_vector ranges, bool local = false);
query_ranges_to_vnodes_generator(const query_ranges_to_vnodes_generator&) = delete;
query_ranges_to_vnodes_generator(query_ranges_to_vnodes_generator&&) = default;
// generate next 'n' vnodes, may return less than requested number of ranges
@@ -245,13 +245,12 @@ public:
const gms::feature_service& features() const { return _features; }
const locator::token_metadata& get_token_metadata() const { return _token_metadata; }
locator::token_metadata& get_token_metadata() { return _token_metadata; }
query::max_result_size get_max_result_size(const query::partition_slice& slice) const;
private:
distributed<database>& _db;
locator::token_metadata& _token_metadata;
const locator::token_metadata& _token_metadata;
smp_service_group _read_smp_service_group;
smp_service_group _write_smp_service_group;
smp_service_group _write_ack_smp_service_group;
@@ -437,7 +436,7 @@ private:
future<> mutate_counters(Range&& mutations, db::consistency_level cl, tracing::trace_state_ptr tr_state, service_permit permit, clock_type::time_point timeout);
public:
storage_proxy(distributed<database>& db, config cfg, db::view::node_update_backlog& max_view_update_backlog,
scheduling_group_key stats_key, gms::feature_service& feat, locator::token_metadata& tokens, netw::messaging_service& ms);
scheduling_group_key stats_key, gms::feature_service& feat, const locator::token_metadata& tokens, netw::messaging_service& ms);
~storage_proxy();
const distributed<database>& get_db() const {
return _db;
@@ -751,7 +750,4 @@ inline shared_ptr<storage_proxy> get_local_shared_storage_proxy() {
return _the_storage_proxy.local_shared();
}
dht::partition_range_vector get_restricted_ranges(locator::token_metadata&,
const schema&, dht::partition_range);
}

View File

@@ -2782,9 +2782,9 @@ future<> storage_service::do_update_pending_ranges() {
return do_for_each(keyspaces, [this] (auto& keyspace_name) {
auto& ks = this->_db.local().find_keyspace(keyspace_name);
auto& strategy = ks.get_replication_strategy();
slogger.debug("Calculating pending ranges for keyspace={} starts", keyspace_name);
return get_token_metadata().calculate_pending_ranges(strategy, keyspace_name).finally([&keyspace_name] {
slogger.debug("Calculating pending ranges for keyspace={} ends", keyspace_name);
slogger.debug("Updating pending ranges for keyspace={} starts", keyspace_name);
return get_mutable_token_metadata().update_pending_ranges(strategy, keyspace_name).finally([&keyspace_name] {
slogger.debug("Updating pending ranges for keyspace={} ends", keyspace_name);
});
});
});
@@ -2794,7 +2794,7 @@ future<> storage_service::do_update_pending_ranges() {
future<> storage_service::update_pending_ranges() {
return get_storage_service().invoke_on(0, [] (auto& ss){
return ss._update_pending_ranges_action.trigger_later().then([&ss] {
// calculate_pending_ranges will modify token_metadata, we need to repliate to other cores
// update_pending_ranges will modify token_metadata, we need to replicate to other cores
return ss.replicate_to_all_cores().then([s = ss.shared_from_this()] { });
});
});

View File

@@ -174,7 +174,7 @@ public:
return _token_metadata;
}
locator::token_metadata& get_token_metadata() {
locator::token_metadata& get_mutable_token_metadata() {
return _token_metadata;
}