Merge 'repair: harden effective replication map' from Benny Halevy

As described in #11993, per-shard repair_info instances get the effective_replication_map on their own with no centralized synchronization.

This series ensures that the effective replication maps used by repair (and other associated structures like the token metadata and topology) are all in sync with the one used to initiate the repair operation.

While at it, the series includes other cleanups in this area in repair and view; these are not fixes, since the calls happen in synchronous functions that do not yield.

Fixes #11993

Closes #11994

* github.com:scylladb/scylladb:
  repair: pass erm down to get_hosts_participating_in_repair and get_neighbors
  repair: pass effective_replication_map down to repair_info
  repair: coroutinize sync_data_using_repair
  repair: futurize do_repair_start
  effective_replication_map: add global_effective_replication_map
  shared_token_metadata: get_lock is const
  repair: sync_data_using_repair: require to run on shard 0
  repair: require all node operations to be called on shard 0
  repair: repair_info: keep effective_replication_map
  repair: do_repair_start: use keyspace erm to get keyspace local ranges
  repair: do_repair_start: use keyspace erm for get_primary_ranges
  repair: do_repair_start: use keyspace erm for get_primary_ranges_within_dc
  repair: do_repair_start: check_in_shutdown first
  repair: get_db().local() where needed
  repair: get topology from erm/token_metdata_ptr
  view: get_view_natural_endpoint: get topology from erm
This commit is contained in:
Avi Kivity
2022-11-17 13:29:02 +02:00
7 changed files with 161 additions and 103 deletions

View File

@@ -1394,9 +1394,9 @@ static std::optional<gms::inet_address>
get_view_natural_endpoint(const sstring& keyspace_name,
const dht::token& base_token, const dht::token& view_token) {
auto &db = service::get_local_storage_proxy().local_db();
auto& topology = service::get_local_storage_proxy().get_token_metadata_ptr()->get_topology();
auto& ks = db.find_keyspace(keyspace_name);
auto erm = ks.get_effective_replication_map();
auto& topology = erm->get_token_metadata_ptr()->get_topology();
auto my_address = utils::fb_utilities::get_broadcast_address();
auto my_datacenter = topology.get_datacenter();
bool network_topology = dynamic_cast<const locator::network_topology_strategy*>(&ks.get_replication_strategy());

View File

@@ -12,6 +12,8 @@
#include <boost/range/algorithm/remove_if.hpp>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include "replica/database.hh"
#include "utils/stall_free.hh"
namespace locator {
@@ -468,6 +470,44 @@ void effective_replication_map_factory::submit_background_work(future<> fut) {
});
}
// Populate _erms with the effective_replication_map of `keyspace_name`
// as seen by every shard.
//
// The work is driven from shard 0 (via invoke_on(0, ...)): the shard-0
// e_r_m is taken first and its ring_version becomes the reference; then
// shards 1..smp::count-1 are queried in parallel and each result is stored
// in _erms[shard] wrapped in a foreign_ptr.
//
// `keyspace_name` is a non-owning view and `this` is captured by the
// lambdas below, so both the referenced string and this object must stay
// valid (and in place) until the returned future resolves.
//
// NOTE(review): _erms is written from the shard-0 continuation; the callers
// visible in this change run on shard 0 - confirm that this is a requirement.
future<> global_effective_replication_map::get_keyspace_erms(sharded<replica::database>& sharded_db, std::string_view keyspace_name) {
return sharded_db.invoke_on(0, [this, &sharded_db, keyspace_name] (replica::database& db) -> future<> {
// To ensure we get the same effective_replication_map
// on all shards, acquire the shared_token_metadata lock.
//
// As a sanity check compare the ring_version on each shard
// to the reference version on shard 0.
//
// This invariant is achieved by storage_service::mutate_token_metadata
// and storage_service::replicate_to_all_cores that first acquire the
// shared_token_metadata lock, then prepare a mutated token metadata
// that will have an incremented ring_version, use it to re-calculate
// all e_r_m:s and clone both on all shards, including the ring version,
// all under the lock.
// `lk` is held until this coroutine lambda finishes, i.e. across the
// per-shard fetches below.
auto lk = co_await db.get_shared_token_metadata().get_lock();
// Shard 0's e_r_m is the reference the other shards are compared against.
auto erm = db.find_keyspace(keyspace_name).get_effective_replication_map();
auto ring_version = erm->get_token_metadata().get_ring_version();
_erms[0] = make_foreign(std::move(erm));
// Shard 0 was handled above, so the range starts at 1.
co_await coroutine::parallel_for_each(boost::irange(1u, smp::count), [this, &sharded_db, keyspace_name, ring_version] (unsigned shard) -> future<> {
_erms[shard] = co_await sharded_db.invoke_on(shard, [keyspace_name, ring_version] (const replica::database& db) {
const auto& ks = db.find_keyspace(keyspace_name);
auto erm = ks.get_effective_replication_map();
auto local_ring_version = erm->get_token_metadata().get_ring_version();
// A mismatch means this shard's e_r_m is not in sync with shard 0's;
// report it as an internal error rather than proceeding.
if (local_ring_version != ring_version) {
on_internal_error(rslogger, format("Inconsistent effective_replication_map ring_verion {}, expected {}", local_ring_version, ring_version));
}
// Wrap in a foreign_ptr since the pointer is handed back to shard 0.
return make_foreign(std::move(erm));
});
});
});
}
// Convenience factory: default-construct a global_effective_replication_map,
// fill it for `keyspace_name` via get_keyspace_erms() and return it.
//
// `ret` lives in this coroutine's frame, so it stays valid while
// get_keyspace_erms() (which captures `this`) is in flight.
// `keyspace_name` is a non-owning view; the referenced string must outlive
// the returned future.
future<global_effective_replication_map> make_global_effective_replication_map(sharded<replica::database>& sharded_db, std::string_view keyspace_name) {
global_effective_replication_map ret;
co_await ret.get_keyspace_erms(sharded_db, keyspace_name);
co_return ret;
}
} // namespace locator
std::ostream& operator<<(std::ostream& os, locator::replication_strategy_type t) {

View File

@@ -22,6 +22,7 @@
// forward declaration since replica/database.hh includes this file
namespace replica {
class database;
class keyspace;
}
@@ -265,6 +266,33 @@ inline mutable_effective_replication_map_ptr make_effective_replication_map(abst
// Apply the replication strategy over the current configuration and the given token_metadata.
future<mutable_effective_replication_map_ptr> calculate_effective_replication_map(abstract_replication_strategy::ptr_type rs, token_metadata_ptr tmptr);
// Class to hold a coherent view of a keyspace
// effective replication map on all shards
//
// Holds one effective_replication_map pointer per shard, each wrapped in a
// foreign_ptr. Only move construction/assignment are declared, so the class
// is move-only.
class global_effective_replication_map {
// One slot per shard, indexed by shard id.
std::vector<foreign_ptr<effective_replication_map_ptr>> _erms;
public:
// Sizes _erms to smp::count default-constructed entries; they are filled
// in by get_keyspace_erms().
global_effective_replication_map() : _erms(smp::count) {}
global_effective_replication_map(global_effective_replication_map&&) = default;
global_effective_replication_map& operator=(global_effective_replication_map&&) = default;
// Fetches the e_r_m of `keyspace_name` from `sharded_db` into _erms.
future<> get_keyspace_erms(sharded<replica::database>& sharded_db, std::string_view keyspace_name);
// Returns the e_r_m stored for the calling shard (indexed by this_shard_id()).
// NOTE(review): dereferences the slot unconditionally - presumably only
// valid after get_keyspace_erms() has completed; confirm.
const effective_replication_map& get() const noexcept {
return *_erms[this_shard_id()];
}
// Pointer-like access; both operators forward to get().
const effective_replication_map& operator*() const noexcept {
return get();
}
const effective_replication_map* operator->() const noexcept {
return &get();
}
};
future<global_effective_replication_map> make_global_effective_replication_map(sharded<replica::database>& sharded_db, std::string_view keyspace_name);
} // namespace locator
std::ostream& operator<<(std::ostream& os, locator::replication_strategy_type);

View File

@@ -311,7 +311,7 @@ public:
// using the schema_tables merge_lock.
//
// Must be called on shard 0.
future<token_metadata_lock> get_lock() noexcept {
future<token_metadata_lock> get_lock() const noexcept {
return _lock_func();
}

View File

@@ -199,21 +199,18 @@ void remove_item(Collection& c, T& item) {
}
// Return all of the neighbors with whom we share the provided range.
static std::vector<gms::inet_address> get_neighbors(replica::database& db,
static std::vector<gms::inet_address> get_neighbors(
const locator::effective_replication_map& erm,
const sstring& ksname, query::range<dht::token> range,
const std::vector<sstring>& data_centers,
const std::vector<sstring>& hosts,
const std::unordered_set<gms::inet_address>& ignore_nodes) {
replica::keyspace& ks = db.find_keyspace(ksname);
auto erm = ks.get_effective_replication_map();
dht::token tok = range.end() ? range.end()->value() : dht::maximum_token();
auto ret = erm->get_natural_endpoints(tok);
auto ret = erm.get_natural_endpoints(tok);
remove_item(ret, utils::fb_utilities::get_broadcast_address());
if (!data_centers.empty()) {
auto dc_endpoints_map = db.get_token_metadata().get_topology().get_datacenter_endpoints();
auto dc_endpoints_map = erm.get_token_metadata().get_topology().get_datacenter_endpoints();
std::unordered_set<gms::inet_address> dc_endpoints;
for (const sstring& dc : data_centers) {
auto it = dc_endpoints_map.find(dc);
@@ -276,7 +273,7 @@ static std::vector<gms::inet_address> get_neighbors(replica::database& db,
}
if (ret.size() < 1) {
auto me = utils::fb_utilities::get_broadcast_address();
auto others = erm->get_natural_endpoints(tok);
auto others = erm.get_natural_endpoints(tok);
remove_item(others, me);
throw std::runtime_error(fmt::format("Repair requires at least two "
"endpoints that are neighbors before it can continue, "
@@ -318,7 +315,8 @@ static std::vector<gms::inet_address> get_neighbors(replica::database& db,
#endif
}
static future<std::list<gms::inet_address>> get_hosts_participating_in_repair(replica::database& db,
static future<std::list<gms::inet_address>> get_hosts_participating_in_repair(
const locator::effective_replication_map& erm,
const sstring& ksname,
const dht::token_range_vector& ranges,
const std::vector<sstring>& data_centers,
@@ -332,7 +330,7 @@ static future<std::list<gms::inet_address>> get_hosts_participating_in_repair(re
participating_hosts.insert(utils::fb_utilities::get_broadcast_address());
co_await do_for_each(ranges, [&] (const dht::token_range& range) {
const auto nbs = get_neighbors(db, ksname, range, data_centers, hosts, ignore_nodes);
const auto nbs = get_neighbors(erm, ksname, range, data_centers, hosts, ignore_nodes);
for (const auto& nb : nbs) {
participating_hosts.insert(nb);
}
@@ -544,6 +542,7 @@ get_sharder_for_tables(seastar::sharded<replica::database>& db, const sstring& k
repair_info::repair_info(repair_service& repair,
const sstring& keyspace_,
locator::effective_replication_map_ptr erm_,
const dht::token_range_vector& ranges_,
std::vector<table_id> table_ids_,
repair_uniq_id id_,
@@ -562,6 +561,7 @@ repair_info::repair_info(repair_service& repair,
, gossiper(repair.get_gossiper())
, sharder(get_sharder_for_tables(db, keyspace_, table_ids_))
, keyspace(keyspace_)
, erm(std::move(erm_))
, ranges(ranges_)
, cfs(get_table_names(db.local(), table_ids_))
, table_ids(std::move(table_ids_))
@@ -570,7 +570,7 @@ repair_info::repair_info(repair_service& repair,
, hosts(hosts_)
, ignore_nodes(ignore_nodes_)
, reason(reason_)
, total_rf(db.local().find_keyspace(keyspace).get_effective_replication_map()->get_replication_factor())
, total_rf(erm->get_replication_factor())
, nr_ranges_total(ranges.size())
, _hints_batchlog_flushed(std::move(hints_batchlog_flushed)) {
if (as != nullptr) {
@@ -605,7 +605,7 @@ void repair_info::check_in_abort() {
repair_neighbors repair_info::get_repair_neighbors(const dht::token_range& range) {
return neighbors.empty() ?
repair_neighbors(get_neighbors(db.local(), keyspace, range, data_centers, hosts, ignore_nodes)) :
repair_neighbors(get_neighbors(*erm, keyspace, range, data_centers, hosts, ignore_nodes)) :
neighbors[range];
}
@@ -675,26 +675,6 @@ future<> repair_info::repair_range(const dht::token_range& range, ::table_id tab
});
}
static dht::token_range_vector get_primary_ranges_for_endpoint(
replica::database& db, sstring keyspace, gms::inet_address ep) {
return db.find_keyspace(keyspace).get_effective_replication_map()->get_primary_ranges(ep);
}
static dht::token_range_vector get_primary_ranges(
replica::database& db, sstring keyspace) {
return get_primary_ranges_for_endpoint(db, keyspace,
utils::fb_utilities::get_broadcast_address());
}
// get_primary_ranges_within_dc() is similar to get_primary_ranges(),
// but instead of each range being assigned just one primary owner
// across the entire cluster, here each range is assigned a primary
// owner in each of the clusters.
static dht::token_range_vector get_primary_ranges_within_dc(
replica::database& db, sstring keyspace) {
return db.find_keyspace(keyspace).get_effective_replication_map()->get_primary_ranges_within_dc(utils::fb_utilities::get_broadcast_address());
}
void repair_stats::add(const repair_stats& o) {
round_nr += o.round_nr;
round_nr_fast_path_already_synced += o.round_nr_fast_path_already_synced;
@@ -1010,10 +990,13 @@ static future<> repair_ranges(lw_shared_ptr<repair_info> ri) {
// CPU is that it allows us to keep some state (like a list of ongoing
// repairs). It is fine to always do this on one CPU, because the function
// itself does very little (mainly tell other nodes and CPUs what to do).
int repair_service::do_repair_start(sstring keyspace, std::unordered_map<sstring, sstring> options_map) {
seastar::sharded<replica::database>& db = get_db();
auto& topology = db.local().get_token_metadata().get_topology();
future<int> repair_service::do_repair_start(sstring keyspace, std::unordered_map<sstring, sstring> options_map) {
get_repair_module().check_in_shutdown();
auto& sharded_db = get_db();
auto& db = sharded_db.local();
auto germs = make_lw_shared(co_await locator::make_global_effective_replication_map(sharded_db, keyspace));
auto& erm = germs->get();
auto& topology = erm.get_token_metadata().get_topology();
repair_options options(options_map);
@@ -1042,14 +1025,19 @@ int repair_service::do_repair_start(sstring keyspace, std::unordered_map<sstring
// may be set, except data_centers may contain only local DC (-local)
if (options.data_centers.size() == 1 &&
options.data_centers[0] == topology.get_datacenter()) {
ranges = get_primary_ranges_within_dc(db.local(), keyspace);
// get_primary_ranges_within_dc() is similar to get_primary_ranges(),
// but instead of each range being assigned just one primary owner
// across the entire cluster, here each range is assigned a primary
// owner in each of the DCs.
ranges = erm.get_primary_ranges_within_dc(utils::fb_utilities::get_broadcast_address());
} else if (options.data_centers.size() > 0 || options.hosts.size() > 0) {
throw std::runtime_error("You need to run primary range repair on all nodes in the cluster.");
} else {
ranges = get_primary_ranges(db.local(), keyspace);
ranges = erm.get_primary_ranges(utils::fb_utilities::get_broadcast_address());
}
} else {
ranges = db.local().get_keyspace_local_ranges(keyspace);
// get keyspace local ranges
ranges = erm.get_ranges(utils::fb_utilities::get_broadcast_address());
}
if (!options.data_centers.empty() && !options.hosts.empty()) {
@@ -1103,21 +1091,21 @@ int repair_service::do_repair_start(sstring keyspace, std::unordered_map<sstring
}
std::vector<sstring> cfs =
options.column_families.size() ? options.column_families : list_column_families(db.local(), keyspace);
options.column_families.size() ? options.column_families : list_column_families(db, keyspace);
if (cfs.empty()) {
rlogger.info("repair[{}]: completed successfully: no tables to repair", id.uuid());
return id.id;
co_return id.id;
}
// Do it in the background.
(void)get_repair_module().run(id, [this, &db, id, keyspace = std::move(keyspace),
(void)get_repair_module().run(id, [this, &db, id, keyspace = std::move(keyspace), germs = std::move(germs),
cfs = std::move(cfs), ranges = std::move(ranges), options = std::move(options), ignore_nodes = std::move(ignore_nodes)] () mutable {
auto uuid = id.uuid();
bool needs_flush_before_repair = false;
if (db.local().features().tombstone_gc_options) {
if (db.features().tombstone_gc_options) {
for (auto& table: cfs) {
auto s = db.local().find_column_family(keyspace, table).schema();
auto s = db.find_column_family(keyspace, table).schema();
const auto& options = s->tombstone_gc_options();
if (options.mode() == tombstone_gc_mode::repair) {
needs_flush_before_repair = true;
@@ -1126,9 +1114,9 @@ int repair_service::do_repair_start(sstring keyspace, std::unordered_map<sstring
}
bool hints_batchlog_flushed = false;
auto participants = get_hosts_participating_in_repair(db.local(), keyspace, ranges, options.data_centers, options.hosts, ignore_nodes).get();
auto participants = get_hosts_participating_in_repair(germs->get(), keyspace, ranges, options.data_centers, options.hosts, ignore_nodes).get();
if (needs_flush_before_repair) {
auto waiting_nodes = db.local().get_token_metadata().get_all_endpoints();
auto waiting_nodes = db.get_token_metadata().get_all_endpoints();
std::erase_if(waiting_nodes, [&] (const auto& addr) {
return ignore_nodes.contains(addr);
});
@@ -1161,7 +1149,7 @@ int repair_service::do_repair_start(sstring keyspace, std::unordered_map<sstring
std::vector<future<>> repair_results;
repair_results.reserve(smp::count);
auto table_ids = get_table_ids(db.local(), keyspace, cfs);
auto table_ids = get_table_ids(db, keyspace, cfs);
abort_source as;
auto off_strategy_updater = seastar::async([this, uuid = uuid.uuid(), &table_ids, &participants, &as] {
auto tables = std::list<table_id>(table_ids.begin(), table_ids.end());
@@ -1206,10 +1194,10 @@ int repair_service::do_repair_start(sstring keyspace, std::unordered_map<sstring
for (auto shard : boost::irange(unsigned(0), smp::count)) {
auto f = container().invoke_on(shard, [keyspace, table_ids, id, ranges, hints_batchlog_flushed,
data_centers = options.data_centers, hosts = options.hosts, ignore_nodes] (repair_service& local_repair) mutable {
data_centers = options.data_centers, hosts = options.hosts, ignore_nodes, germs] (repair_service& local_repair) mutable {
local_repair.get_metrics().repair_total_ranges_sum += ranges.size();
auto ri = make_lw_shared<repair_info>(local_repair,
std::move(keyspace), std::move(ranges), std::move(table_ids),
std::move(keyspace), germs->get().shared_from_this(), std::move(ranges), std::move(table_ids),
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), streaming::stream_reason::repair, nullptr, hints_batchlog_flushed);
return repair_ranges(ri);
});
@@ -1233,7 +1221,7 @@ int repair_service::do_repair_start(sstring keyspace, std::unordered_map<sstring
rlogger.warn("repair[{}]: repair_tracker run failed: {}", id.uuid(), ep);
});
return id.id;
co_return id.id;
}
future<int> repair_start(seastar::sharded<repair_service>& repair,
@@ -1273,49 +1261,43 @@ future<> repair_service::abort_all() {
future<> repair_service::sync_data_using_repair(
sstring keyspace,
locator::effective_replication_map_ptr erm,
dht::token_range_vector ranges,
std::unordered_map<dht::token_range, repair_neighbors> neighbors,
streaming::stream_reason reason,
shared_ptr<node_ops_info> ops_info) {
if (ranges.empty()) {
return make_ready_future<>();
co_return;
}
return container().invoke_on(0, [keyspace = std::move(keyspace), ranges = std::move(ranges), neighbors = std::move(neighbors), reason, ops_info] (repair_service& local_repair) mutable {
return local_repair.do_sync_data_using_repair(std::move(keyspace), std::move(ranges), std::move(neighbors), reason, ops_info);
});
}
future<> repair_service::do_sync_data_using_repair(
sstring keyspace,
dht::token_range_vector ranges,
std::unordered_map<dht::token_range, repair_neighbors> neighbors,
streaming::stream_reason reason,
shared_ptr<node_ops_info> ops_info) {
seastar::sharded<replica::database>& db = get_db();
assert(this_shard_id() == 0);
auto& sharded_db = get_db();
auto& db = sharded_db.local();
auto germs = make_lw_shared(co_await locator::make_global_effective_replication_map(sharded_db, keyspace));
repair_uniq_id id = get_repair_module().new_repair_uniq_id();
rlogger.info("repair[{}]: sync data for keyspace={}, status=started", id.uuid(), keyspace);
return get_repair_module().run(id, [this, id, &db, keyspace, ranges = std::move(ranges), neighbors = std::move(neighbors), reason, ops_info] () mutable {
auto cfs = list_column_families(db.local(), keyspace);
co_await get_repair_module().run(id, [this, id, &db, keyspace, germs = std::move(germs), ranges = std::move(ranges), neighbors = std::move(neighbors), reason, ops_info] () mutable {
auto cfs = list_column_families(db, keyspace);
if (cfs.empty()) {
rlogger.warn("repair[{}]: sync data for keyspace={}, no table in this keyspace", id.uuid(), keyspace);
return;
}
auto table_ids = get_table_ids(db.local(), keyspace, cfs);
auto table_ids = get_table_ids(db, keyspace, cfs);
std::vector<future<>> repair_results;
repair_results.reserve(smp::count);
if (get_repair_module().is_aborted(id.uuid())) {
throw std::runtime_error("aborted by user request");
}
for (auto shard : boost::irange(unsigned(0), smp::count)) {
auto f = container().invoke_on(shard, [keyspace, table_ids, id, ranges, neighbors, reason, ops_info] (repair_service& local_repair) mutable {
auto f = container().invoke_on(shard, [keyspace, table_ids, id, ranges, neighbors, reason, ops_info, germs] (repair_service& local_repair) mutable {
auto data_centers = std::vector<sstring>();
auto hosts = std::vector<sstring>();
auto ignore_nodes = std::unordered_set<gms::inet_address>();
bool hints_batchlog_flushed = false;
abort_source* asp = ops_info ? ops_info->local_abort_source() : nullptr;
auto ri = make_lw_shared<repair_info>(local_repair,
std::move(keyspace), std::move(ranges), std::move(table_ids),
std::move(keyspace), germs->get().shared_from_this(), std::move(ranges), std::move(table_ids),
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), reason, asp, hints_batchlog_flushed);
ri->neighbors = std::move(neighbors);
return repair_ranges(ri);
@@ -1339,7 +1321,7 @@ future<> repair_service::do_sync_data_using_repair(
}).then([id, keyspace] {
rlogger.info("repair[{}]: sync data for keyspace={}, status=succeeded", id.uuid(), keyspace);
}).handle_exception([&db, id, keyspace] (std::exception_ptr ep) {
if (!db.local().has_keyspace(keyspace)) {
if (!db.has_keyspace(keyspace)) {
rlogger.warn("repair[{}]: sync data for keyspace={}, status=failed: keyspace does not exist any more, ignoring it, {}", id.uuid(), keyspace, ep);
return make_ready_future<>();
}
@@ -1349,22 +1331,25 @@ future<> repair_service::do_sync_data_using_repair(
}
future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr, std::unordered_set<dht::token> bootstrap_tokens) {
assert(this_shard_id() == 0);
using inet_address = gms::inet_address;
return seastar::async([this, tmptr = std::move(tmptr), tokens = std::move(bootstrap_tokens)] () mutable {
seastar::sharded<replica::database>& db = get_db();
auto ks_erms = db.local().get_non_local_strategy_keyspaces_erms();
auto& db = get_db().local();
auto ks_erms = db.get_non_local_strategy_keyspaces_erms();
auto& topology = tmptr->get_topology();
auto local_dc = topology.get_datacenter();
auto myip = utils::fb_utilities::get_broadcast_address();
auto reason = streaming::stream_reason::bootstrap;
// Calculate number of ranges to sync data
size_t nr_ranges_total = 0;
for (const auto& [keyspace_name, erm] : ks_erms) {
if (!db.local().has_keyspace(keyspace_name)) {
if (!db.has_keyspace(keyspace_name)) {
continue;
}
auto& strat = erm->get_replication_strategy();
dht::token_range_vector desired_ranges = strat.get_pending_address_ranges(tmptr, tokens, myip, _sys_ks.local().local_dc_rack()).get0();
seastar::thread::maybe_yield();
auto nr_tables = get_nr_tables(db.local(), keyspace_name);
auto nr_tables = get_nr_tables(db, keyspace_name);
nr_ranges_total += desired_ranges.size() * nr_tables;
}
container().invoke_on_all([nr_ranges_total] (repair_service& rs) {
@@ -1373,7 +1358,7 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr
}).get();
rlogger.info("bootstrap_with_repair: started with keyspaces={}, nr_ranges_total={}", ks_erms | boost::adaptors::map_keys, nr_ranges_total);
for (const auto& [keyspace_name, erm] : ks_erms) {
if (!db.local().has_keyspace(keyspace_name)) {
if (!db.has_keyspace(keyspace_name)) {
rlogger.info("bootstrap_with_repair: keyspace={} does not exist any more, ignoring it", keyspace_name);
continue;
}
@@ -1396,7 +1381,7 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr
//Collects the source that will have its range moved to the new node
std::unordered_map<dht::token_range, repair_neighbors> range_sources;
auto nr_tables = get_nr_tables(db.local(), keyspace_name);
auto nr_tables = get_nr_tables(db, keyspace_name);
rlogger.info("bootstrap_with_repair: started with keyspace={}, nr_ranges={}", keyspace_name, desired_ranges.size() * nr_tables);
for (auto& desired_range : desired_ranges) {
for (auto& x : range_addresses) {
@@ -1426,8 +1411,6 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr
std::vector<gms::inet_address> mandatory_neighbors;
// All neighbors
std::vector<inet_address> neighbors;
auto& topology = db.local().get_token_metadata().get_topology();
auto local_dc = topology.get_datacenter();
auto get_node_losing_the_ranges = [&, &keyspace_name = keyspace_name] (const std::vector<gms::inet_address>& old_nodes, const std::unordered_set<gms::inet_address>& new_nodes) {
// Remove the new nodes from the old nodes list, so
// that it contains only the node that will lose
@@ -1515,7 +1498,7 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr
}
}
auto nr_ranges = desired_ranges.size();
sync_data_using_repair(keyspace_name, std::move(desired_ranges), std::move(range_sources), reason, nullptr).get();
sync_data_using_repair(keyspace_name, erm, std::move(desired_ranges), std::move(range_sources), reason, nullptr).get();
rlogger.info("bootstrap_with_repair: finished with keyspace={}, nr_ranges={}", keyspace_name, nr_ranges);
}
rlogger.info("bootstrap_with_repair: finished with keyspaces={}", ks_erms | boost::adaptors::map_keys);
@@ -1523,18 +1506,21 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr
}
future<> repair_service::do_decommission_removenode_with_repair(locator::token_metadata_ptr tmptr, gms::inet_address leaving_node, shared_ptr<node_ops_info> ops) {
assert(this_shard_id() == 0);
using inet_address = gms::inet_address;
return seastar::async([this, tmptr = std::move(tmptr), leaving_node = std::move(leaving_node), ops] () mutable {
seastar::sharded<replica::database>& db = get_db();
auto& db = get_db().local();
auto myip = utils::fb_utilities::get_broadcast_address();
auto ks_erms = db.local().get_non_local_strategy_keyspaces_erms();
auto ks_erms = db.get_non_local_strategy_keyspaces_erms();
auto& topology = tmptr->get_topology();
auto local_dc = topology.get_datacenter();
bool is_removenode = myip != leaving_node;
auto op = is_removenode ? "removenode_with_repair" : "decommission_with_repair";
streaming::stream_reason reason = is_removenode ? streaming::stream_reason::removenode : streaming::stream_reason::decommission;
size_t nr_ranges_total = 0;
for (const auto& [keyspace_name, erm] : ks_erms) {
dht::token_range_vector ranges = erm->get_ranges(leaving_node);
auto nr_tables = get_nr_tables(db.local(), keyspace_name);
auto nr_tables = get_nr_tables(db, keyspace_name);
nr_ranges_total += ranges.size() * nr_tables;
}
if (reason == streaming::stream_reason::decommission) {
@@ -1550,14 +1536,14 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m
}
rlogger.info("{}: started with keyspaces={}, leaving_node={}", op, ks_erms | boost::adaptors::map_keys, leaving_node);
for (const auto& [keyspace_name, erm] : ks_erms) {
if (!db.local().has_keyspace(keyspace_name)) {
if (!db.has_keyspace(keyspace_name)) {
rlogger.info("{}: keyspace={} does not exist any more, ignoring it", op, keyspace_name);
continue;
}
auto& strat = erm->get_replication_strategy();
// First get all ranges the leaving node is responsible for
dht::token_range_vector ranges = erm->get_ranges(leaving_node);
auto nr_tables = get_nr_tables(db.local(), keyspace_name);
auto nr_tables = get_nr_tables(db, keyspace_name);
rlogger.info("{}: started with keyspace={}, leaving_node={}, nr_ranges={}", op, keyspace_name, leaving_node, ranges.size() * nr_tables);
size_t nr_ranges_total = ranges.size() * nr_tables;
size_t nr_ranges_skipped = 0;
@@ -1577,8 +1563,6 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m
}
std::unordered_map<dht::token_range, repair_neighbors> range_sources;
dht::token_range_vector ranges_for_removenode;
auto& topology = db.local().get_token_metadata().get_topology();
auto local_dc = topology.get_datacenter();
bool find_node_in_local_dc_only = strat.get_type() == locator::replication_strategy_type::network_topology;
for (auto&r : ranges) {
seastar::thread::maybe_yield();
@@ -1706,7 +1690,7 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m
ranges.swap(ranges_for_removenode);
}
auto nr_ranges_synced = ranges.size();
sync_data_using_repair(keyspace_name, std::move(ranges), std::move(range_sources), reason, ops).get();
sync_data_using_repair(keyspace_name, erm, std::move(ranges), std::move(range_sources), reason, ops).get();
rlogger.info("{}: finished with keyspace={}, leaving_node={}, nr_ranges={}, nr_ranges_synced={}, nr_ranges_skipped={}",
op, keyspace_name, leaving_node, nr_ranges_total, nr_ranges_synced, nr_ranges_skipped);
}
@@ -1715,10 +1699,12 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m
}
future<> repair_service::decommission_with_repair(locator::token_metadata_ptr tmptr) {
assert(this_shard_id() == 0);
return do_decommission_removenode_with_repair(std::move(tmptr), utils::fb_utilities::get_broadcast_address(), {});
}
future<> repair_service::removenode_with_repair(locator::token_metadata_ptr tmptr, gms::inet_address leaving_node, shared_ptr<node_ops_info> ops) {
assert(this_shard_id() == 0);
return do_decommission_removenode_with_repair(std::move(tmptr), std::move(leaving_node), std::move(ops)).then([this] {
rlogger.debug("Triggering off-strategy compaction for all non-system tables on removenode completion");
seastar::sharded<replica::database>& db = get_db();
@@ -1731,19 +1717,20 @@ future<> repair_service::removenode_with_repair(locator::token_metadata_ptr tmpt
}
future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata_ptr tmptr, sstring op, sstring source_dc, streaming::stream_reason reason, std::list<gms::inet_address> ignore_nodes) {
assert(this_shard_id() == 0);
return seastar::async([this, tmptr = std::move(tmptr), source_dc = std::move(source_dc), op = std::move(op), reason, ignore_nodes = std::move(ignore_nodes)] () mutable {
seastar::sharded<replica::database>& db = get_db();
auto ks_erms = db.local().get_non_local_strategy_keyspaces_erms();
auto& db = get_db().local();
auto ks_erms = db.get_non_local_strategy_keyspaces_erms();
auto myip = utils::fb_utilities::get_broadcast_address();
size_t nr_ranges_total = 0;
for (const auto& [keyspace_name, erm] : ks_erms) {
if (!db.local().has_keyspace(keyspace_name)) {
if (!db.has_keyspace(keyspace_name)) {
continue;
}
auto& strat = erm->get_replication_strategy();
// Okay to yield since tm is immutable
dht::token_range_vector ranges = strat.get_ranges(myip, tmptr).get0();
auto nr_tables = get_nr_tables(db.local(), keyspace_name);
auto nr_tables = get_nr_tables(db, keyspace_name);
nr_ranges_total += ranges.size() * nr_tables;
}
@@ -1761,20 +1748,20 @@ future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata_
rlogger.info("{}: started with keyspaces={}, source_dc={}, nr_ranges_total={}, ignore_nodes={}", op, ks_erms | boost::adaptors::map_keys, source_dc, nr_ranges_total, ignore_nodes);
for (const auto& [keyspace_name, erm] : ks_erms) {
size_t nr_ranges_skipped = 0;
if (!db.local().has_keyspace(keyspace_name)) {
if (!db.has_keyspace(keyspace_name)) {
rlogger.info("{}: keyspace={} does not exist any more, ignoring it", op, keyspace_name);
continue;
}
auto& strat = erm->get_replication_strategy();
dht::token_range_vector ranges = strat.get_ranges(myip, tmptr).get0();
auto& topology = erm->get_token_metadata().get_topology();
std::unordered_map<dht::token_range, repair_neighbors> range_sources;
auto nr_tables = get_nr_tables(db.local(), keyspace_name);
auto nr_tables = get_nr_tables(db, keyspace_name);
rlogger.info("{}: started with keyspace={}, source_dc={}, nr_ranges={}, ignore_nodes={}", op, keyspace_name, source_dc, ranges.size() * nr_tables, ignore_nodes);
for (auto it = ranges.begin(); it != ranges.end();) {
auto& r = *it;
seastar::thread::maybe_yield();
auto end_token = r.end() ? r.end()->value() : dht::maximum_token();
auto& topology = db.local().get_token_metadata().get_topology();
auto neighbors = boost::copy_range<std::vector<gms::inet_address>>(strat.calculate_natural_endpoints(end_token, *tmptr).get0() |
boost::adaptors::filtered([myip, &source_dc, &topology, &ignore_nodes] (const gms::inet_address& node) {
if (node == myip) {
@@ -1806,7 +1793,7 @@ future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata_
}).get();
}
auto nr_ranges = ranges.size();
sync_data_using_repair(keyspace_name, std::move(ranges), std::move(range_sources), reason, nullptr).get();
sync_data_using_repair(keyspace_name, erm, std::move(ranges), std::move(range_sources), reason, nullptr).get();
rlogger.info("{}: finished with keyspace={}, source_dc={}, nr_ranges={}", op, keyspace_name, source_dc, nr_ranges);
}
rlogger.info("{}: finished with keyspaces={}, source_dc={}", op, ks_erms | boost::adaptors::map_keys, source_dc);
@@ -1814,9 +1801,10 @@ future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata_
}
future<> repair_service::rebuild_with_repair(locator::token_metadata_ptr tmptr, sstring source_dc) {
assert(this_shard_id() == 0);
auto op = sstring("rebuild_with_repair");
if (source_dc.empty()) {
auto& topology = get_db().local().get_token_metadata().get_topology();
auto& topology = tmptr->get_topology();
source_dc = topology.get_datacenter();
}
auto reason = streaming::stream_reason::rebuild;
@@ -1829,9 +1817,10 @@ future<> repair_service::rebuild_with_repair(locator::token_metadata_ptr tmptr,
}
future<> repair_service::replace_with_repair(locator::token_metadata_ptr tmptr, std::unordered_set<dht::token> replacing_tokens, std::list<gms::inet_address> ignore_nodes) {
assert(this_shard_id() == 0);
auto cloned_tm = co_await tmptr->clone_async();
auto op = sstring("replace_with_repair");
auto& topology = get_db().local().get_token_metadata().get_topology();
auto& topology = tmptr->get_topology();
auto source_dc = topology.get_datacenter();
auto reason = streaming::stream_reason::replace;
// update a cloned version of tmptr

View File

@@ -20,6 +20,7 @@
#include <seastar/core/condition-variable.hh>
#include <seastar/core/gate.hh>
#include "locator/abstract_replication_strategy.hh"
#include "replica/database_fwd.hh"
#include "frozen_mutation.hh"
#include "utils/UUID.hh"
@@ -180,6 +181,7 @@ public:
gms::gossiper& gossiper;
const dht::sharder& sharder;
sstring keyspace;
locator::effective_replication_map_ptr erm;
dht::token_range_vector ranges;
std::vector<sstring> cfs;
std::vector<table_id> table_ids;
@@ -202,6 +204,7 @@ public:
public:
repair_info(repair_service& repair,
const sstring& keyspace_,
locator::effective_replication_map_ptr erm_,
const dht::token_range_vector& ranges_,
std::vector<table_id> table_ids_,
repair_uniq_id id_,

View File

@@ -13,6 +13,7 @@
#include "repair/repair.hh"
#include "repair/repair_task.hh"
#include "tasks/task_manager.hh"
#include "locator/abstract_replication_strategy.hh"
#include <seastar/core/distributed.hh>
#include <seastar/util/bool_class.hh>
@@ -133,9 +134,10 @@ public:
future<> cleanup_history(tasks::task_id repair_id);
future<> load_history();
int do_repair_start(sstring keyspace, std::unordered_map<sstring, sstring> options_map);
future<int> do_repair_start(sstring keyspace, std::unordered_map<sstring, sstring> options_map);
// Repair-based bootstrap entry point.
//
// tmptr            - token metadata snapshot the operation works against.
// bootstrap_tokens - the tokens assigned to the bootstrap node.
//
// All repair-based node operation entry points (this one and the
// *_with_repair functions declared after it) must be called on shard 0.
future<> bootstrap_with_repair(locator::token_metadata_ptr tmptr, std::unordered_set<dht::token> bootstrap_tokens);
// Repair-based decommission entry point; takes the token metadata
// snapshot to operate on. As a repair-based node operation entry point
// it must be called on shard 0.
future<> decommission_with_repair(locator::token_metadata_ptr tmptr);
// Repair-based removenode entry point.
//
// tmptr        - token metadata snapshot the operation works against.
// leaving_node - address of the node being removed.
// ops          - node_ops_info associated with the operation
//                (NOTE(review): how it is consumed is not visible from
//                this declaration - confirm against the definition).
//
// As a repair-based node operation entry point it must be called on shard 0.
future<> removenode_with_repair(locator::token_metadata_ptr tmptr, gms::inet_address leaving_node, shared_ptr<node_ops_info> ops);
@@ -145,13 +147,9 @@ private:
// Private helper taking the token metadata snapshot, the leaving node's
// address and the operation's node_ops_info.
// NOTE(review): presumably the shared implementation behind
// decommission_with_repair() and removenode_with_repair() - confirm
// against the definition.
future<> do_decommission_removenode_with_repair(locator::token_metadata_ptr tmptr, gms::inet_address leaving_node, shared_ptr<node_ops_info> ops);
// Private helper that walks every non-local-strategy keyspace and, for
// each one that still exists, syncs this node's ranges via
// sync_data_using_repair().
// NOTE(review): presumably the shared implementation behind
// rebuild_with_repair() and replace_with_repair() - the call sites are
// not visible here, confirm.
//
// tmptr        - token metadata used to compute this node's ranges and the
//                natural endpoints of each range.
// op           - operation name, used as the prefix of the progress log
//                messages.
// source_dc    - datacenter name; logged and used when filtering the
//                candidate neighbor nodes for each range.
// reason       - stream reason forwarded to sync_data_using_repair().
// ignore_nodes - nodes consulted by the neighbor filter
//                (NOTE(review): presumably excluded from the repair
//                sources - confirm against the filter body).
future<> do_rebuild_replace_with_repair(locator::token_metadata_ptr tmptr, sstring op, sstring source_dc, streaming::stream_reason reason, std::list<gms::inet_address> ignore_nodes);
// Must be called on shard 0
future<> sync_data_using_repair(sstring keyspace,
dht::token_range_vector ranges,
std::unordered_map<dht::token_range, repair_neighbors> neighbors,
streaming::stream_reason reason,
shared_ptr<node_ops_info> ops_info);
future<> do_sync_data_using_repair(sstring keyspace,
locator::effective_replication_map_ptr erm,
dht::token_range_vector ranges,
std::unordered_map<dht::token_range, repair_neighbors> neighbors,
streaming::stream_reason reason,