From 94f2e95a2f5d6058d6926a0089a5b4ac46444034 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Tue, 15 Nov 2022 18:00:02 +0200 Subject: [PATCH 01/16] view: get_view_natural_endpoint: get topology from erm Get the topology for the effective replication map rather than from the storage_proxy to ensure its synchronized with the natural endpoints. Since there's no preemption between the two calls currently there is no issue, so this is merely a clean up of the code and not supposed to fix anything. Signed-off-by: Benny Halevy --- db/view/view.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db/view/view.cc b/db/view/view.cc index 8c08b8393f..4d4ea7f888 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1394,9 +1394,9 @@ static std::optional get_view_natural_endpoint(const sstring& keyspace_name, const dht::token& base_token, const dht::token& view_token) { auto &db = service::get_local_storage_proxy().local_db(); - auto& topology = service::get_local_storage_proxy().get_token_metadata_ptr()->get_topology(); auto& ks = db.find_keyspace(keyspace_name); auto erm = ks.get_effective_replication_map(); + auto& topology = erm->get_token_metadata_ptr()->get_topology(); auto my_address = utils::fb_utilities::get_broadcast_address(); auto my_datacenter = topology.get_datacenter(); bool network_topology = dynamic_cast(&ks.get_replication_strategy()); From c22c4c852782bc6dbf0cb80d6c7a97f33b4bdfe4 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Tue, 15 Nov 2022 18:15:04 +0200 Subject: [PATCH 02/16] repair: get topology from erm/token_metdata_ptr We want the topology to be synchronized with the respective effective_replication_map / token_metadata. Signed-off-by: Benny Halevy --- repair/repair.cc | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/repair/repair.cc b/repair/repair.cc index 904b0b0308..91fc79f1b5 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -213,7 +213,7 @@ static std::vector get_neighbors(replica::database& db, remove_item(ret, utils::fb_utilities::get_broadcast_address()); if (!data_centers.empty()) { - auto dc_endpoints_map = db.get_token_metadata().get_topology().get_datacenter_endpoints(); + auto dc_endpoints_map = erm->get_token_metadata().get_topology().get_datacenter_endpoints(); std::unordered_set dc_endpoints; for (const sstring& dc : data_centers) { auto it = dc_endpoints_map.find(dc); @@ -1353,6 +1353,8 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr return seastar::async([this, tmptr = std::move(tmptr), tokens = std::move(bootstrap_tokens)] () mutable { seastar::sharded& db = get_db(); auto ks_erms = db.local().get_non_local_strategy_keyspaces_erms(); + auto& topology = tmptr->get_topology(); + auto local_dc = topology.get_datacenter(); auto myip = utils::fb_utilities::get_broadcast_address(); auto reason = streaming::stream_reason::bootstrap; // Calculate number of ranges to sync data @@ -1426,8 +1428,6 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr std::vector mandatory_neighbors; // All neighbors std::vector neighbors; - auto& topology = db.local().get_token_metadata().get_topology(); - auto local_dc = topology.get_datacenter(); auto get_node_losing_the_ranges = [&, &keyspace_name = keyspace_name] (const std::vector& old_nodes, const std::unordered_set& new_nodes) { // Remove the new nodes from the old nodes list, so // that it contains only the node that will lose @@ -1528,6 +1528,8 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m seastar::sharded& db = get_db(); auto myip = utils::fb_utilities::get_broadcast_address(); auto ks_erms = db.local().get_non_local_strategy_keyspaces_erms(); + auto& topology = tmptr->get_topology(); + auto local_dc = topology.get_datacenter(); bool is_removenode = myip != leaving_node; auto op = is_removenode ? "removenode_with_repair" : "decommission_with_repair"; streaming::stream_reason reason = is_removenode ? streaming::stream_reason::removenode : streaming::stream_reason::decommission; @@ -1577,8 +1579,6 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m } std::unordered_map range_sources; dht::token_range_vector ranges_for_removenode; - auto& topology = db.local().get_token_metadata().get_topology(); - auto local_dc = topology.get_datacenter(); bool find_node_in_local_dc_only = strat.get_type() == locator::replication_strategy_type::network_topology; for (auto&r : ranges) { seastar::thread::maybe_yield(); @@ -1767,6 +1767,7 @@ future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata_ } auto& strat = erm->get_replication_strategy(); dht::token_range_vector ranges = strat.get_ranges(myip, tmptr).get0(); + auto& topology = erm->get_token_metadata().get_topology(); std::unordered_map range_sources; auto nr_tables = get_nr_tables(db.local(), keyspace_name); rlogger.info("{}: started with keyspace={}, source_dc={}, nr_ranges={}, ignore_nodes={}", op, keyspace_name, source_dc, ranges.size() * nr_tables, ignore_nodes); @@ -1774,7 +1775,6 @@ future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata_ auto& r = *it; seastar::thread::maybe_yield(); auto end_token = r.end() ? r.end()->value() : dht::maximum_token(); - auto& topology = db.local().get_token_metadata().get_topology(); auto neighbors = boost::copy_range>(strat.calculate_natural_endpoints(end_token, *tmptr).get0() | boost::adaptors::filtered([myip, &source_dc, &topology, &ignore_nodes] (const gms::inet_address& node) { if (node == myip) { @@ -1816,7 +1816,7 @@ future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata_ future<> repair_service::rebuild_with_repair(locator::token_metadata_ptr tmptr, sstring source_dc) { auto op = sstring("rebuild_with_repair"); if (source_dc.empty()) { - auto& topology = get_db().local().get_token_metadata().get_topology(); + auto& topology = tmptr->get_topology(); source_dc = topology.get_datacenter(); } auto reason = streaming::stream_reason::rebuild; @@ -1831,7 +1831,7 @@ future<> repair_service::rebuild_with_repair(locator::token_metadata_ptr tmptr, future<> repair_service::replace_with_repair(locator::token_metadata_ptr tmptr, std::unordered_set replacing_tokens, std::list ignore_nodes) { auto cloned_tm = co_await tmptr->clone_async(); auto op = sstring("replace_with_repair"); - auto& topology = get_db().local().get_token_metadata().get_topology(); + auto& topology = tmptr->get_topology(); auto source_dc = topology.get_datacenter(); auto reason = streaming::stream_reason::replace; // update a cloned version of tmptr From 881eb0df83fe9f4526fae77f5e13cd848f440232 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Tue, 15 Nov 2022 18:33:44 +0200 Subject: [PATCH 03/16] repair: get_db().local() where needed In several places we get the sharded database using get_db() and then we only use db.local(). Simplify the code by keeping reference only to the local database upfront. Signed-off-by: Benny Halevy --- repair/repair.cc | 64 ++++++++++++++++++++++++------------------------ 1 file changed, 32 insertions(+), 32 deletions(-) diff --git a/repair/repair.cc b/repair/repair.cc index 91fc79f1b5..3bfc1b663c 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -1011,8 +1011,8 @@ static future<> repair_ranges(lw_shared_ptr ri) { // repairs). It is fine to always do this on one CPU, because the function // itself does very little (mainly tell other nodes and CPUs what to do). int repair_service::do_repair_start(sstring keyspace, std::unordered_map options_map) { - seastar::sharded& db = get_db(); - auto& topology = db.local().get_token_metadata().get_topology(); + auto& db = get_db().local(); + auto& topology = db.get_token_metadata().get_topology(); get_repair_module().check_in_shutdown(); repair_options options(options_map); @@ -1042,14 +1042,14 @@ int repair_service::do_repair_start(sstring keyspace, std::unordered_map 0 || options.hosts.size() > 0) { throw std::runtime_error("You need to run primary range repair on all nodes in the cluster."); } else { - ranges = get_primary_ranges(db.local(), keyspace); + ranges = get_primary_ranges(db, keyspace); } } else { - ranges = db.local().get_keyspace_local_ranges(keyspace); + ranges = db.get_keyspace_local_ranges(keyspace); } if (!options.data_centers.empty() && !options.hosts.empty()) { @@ -1103,7 +1103,7 @@ int repair_service::do_repair_start(sstring keyspace, std::unordered_map cfs = - options.column_families.size() ? options.column_families : list_column_families(db.local(), keyspace); + options.column_families.size() ? options.column_families : list_column_families(db, keyspace); if (cfs.empty()) { rlogger.info("repair[{}]: completed successfully: no tables to repair", id.uuid()); return id.id; @@ -1115,9 +1115,9 @@ int repair_service::do_repair_start(sstring keyspace, std::unordered_maptombstone_gc_options(); if (options.mode() == tombstone_gc_mode::repair) { needs_flush_before_repair = true; @@ -1126,9 +1126,9 @@ int repair_service::do_repair_start(sstring keyspace, std::unordered_map> repair_results; repair_results.reserve(smp::count); - auto table_ids = get_table_ids(db.local(), keyspace, cfs); + auto table_ids = get_table_ids(db, keyspace, cfs); abort_source as; auto off_strategy_updater = seastar::async([this, uuid = uuid.uuid(), &table_ids, &participants, &as] { auto tables = std::list(table_ids.begin(), table_ids.end()); @@ -1291,17 +1291,17 @@ future<> repair_service::do_sync_data_using_repair( std::unordered_map neighbors, streaming::stream_reason reason, shared_ptr ops_info) { - seastar::sharded& db = get_db(); + auto& db = get_db().local(); repair_uniq_id id = get_repair_module().new_repair_uniq_id(); rlogger.info("repair[{}]: sync data for keyspace={}, status=started", id.uuid(), keyspace); return get_repair_module().run(id, [this, id, &db, keyspace, ranges = std::move(ranges), neighbors = std::move(neighbors), reason, ops_info] () mutable { - auto cfs = list_column_families(db.local(), keyspace); + auto cfs = list_column_families(db, keyspace); if (cfs.empty()) { rlogger.warn("repair[{}]: sync data for keyspace={}, no table in this keyspace", id.uuid(), keyspace); return; } - auto table_ids = get_table_ids(db.local(), keyspace, cfs); + auto table_ids = get_table_ids(db, keyspace, cfs); std::vector> repair_results; repair_results.reserve(smp::count); if (get_repair_module().is_aborted(id.uuid())) { @@ -1339,7 +1339,7 @@ future<> repair_service::do_sync_data_using_repair( }).then([id, keyspace] { rlogger.info("repair[{}]: sync data for keyspace={}, status=succeeded", id.uuid(), keyspace); }).handle_exception([&db, id, keyspace] (std::exception_ptr ep) { - if (!db.local().has_keyspace(keyspace)) { + if (!db.has_keyspace(keyspace)) { rlogger.warn("repair[{}]: sync data for keyspace={}, status=failed: keyspace does not exist any more, ignoring it, {}", id.uuid(), keyspace, ep); return make_ready_future<>(); } @@ -1351,8 +1351,8 @@ future<> repair_service::do_sync_data_using_repair( future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr, std::unordered_set bootstrap_tokens) { using inet_address = gms::inet_address; return seastar::async([this, tmptr = std::move(tmptr), tokens = std::move(bootstrap_tokens)] () mutable { - seastar::sharded& db = get_db(); - auto ks_erms = db.local().get_non_local_strategy_keyspaces_erms(); + auto& db = get_db().local(); + auto ks_erms = db.get_non_local_strategy_keyspaces_erms(); auto& topology = tmptr->get_topology(); auto local_dc = topology.get_datacenter(); auto myip = utils::fb_utilities::get_broadcast_address(); @@ -1360,13 +1360,13 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr // Calculate number of ranges to sync data size_t nr_ranges_total = 0; for (const auto& [keyspace_name, erm] : ks_erms) { - if (!db.local().has_keyspace(keyspace_name)) { + if (!db.has_keyspace(keyspace_name)) { continue; } auto& strat = erm->get_replication_strategy(); dht::token_range_vector desired_ranges = strat.get_pending_address_ranges(tmptr, tokens, myip, _sys_ks.local().local_dc_rack()).get0(); seastar::thread::maybe_yield(); - auto nr_tables = get_nr_tables(db.local(), keyspace_name); + auto nr_tables = get_nr_tables(db, keyspace_name); nr_ranges_total += desired_ranges.size() * nr_tables; } container().invoke_on_all([nr_ranges_total] (repair_service& rs) { @@ -1375,7 +1375,7 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr }).get(); rlogger.info("bootstrap_with_repair: started with keyspaces={}, nr_ranges_total={}", ks_erms | boost::adaptors::map_keys, nr_ranges_total); for (const auto& [keyspace_name, erm] : ks_erms) { - if (!db.local().has_keyspace(keyspace_name)) { + if (!db.has_keyspace(keyspace_name)) { rlogger.info("bootstrap_with_repair: keyspace={} does not exist any more, ignoring it", keyspace_name); continue; } @@ -1398,7 +1398,7 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr //Collects the source that will have its range moved to the new node std::unordered_map range_sources; - auto nr_tables = get_nr_tables(db.local(), keyspace_name); + auto nr_tables = get_nr_tables(db, keyspace_name); rlogger.info("bootstrap_with_repair: started with keyspace={}, nr_ranges={}", keyspace_name, desired_ranges.size() * nr_tables); for (auto& desired_range : desired_ranges) { for (auto& x : range_addresses) { @@ -1525,9 +1525,9 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr future<> repair_service::do_decommission_removenode_with_repair(locator::token_metadata_ptr tmptr, gms::inet_address leaving_node, shared_ptr ops) { using inet_address = gms::inet_address; return seastar::async([this, tmptr = std::move(tmptr), leaving_node = std::move(leaving_node), ops] () mutable { - seastar::sharded& db = get_db(); + auto& db = get_db().local(); auto myip = utils::fb_utilities::get_broadcast_address(); - auto ks_erms = db.local().get_non_local_strategy_keyspaces_erms(); + auto ks_erms = db.get_non_local_strategy_keyspaces_erms(); auto& topology = tmptr->get_topology(); auto local_dc = topology.get_datacenter(); bool is_removenode = myip != leaving_node; @@ -1536,7 +1536,7 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m size_t nr_ranges_total = 0; for (const auto& [keyspace_name, erm] : ks_erms) { dht::token_range_vector ranges = erm->get_ranges(leaving_node); - auto nr_tables = get_nr_tables(db.local(), keyspace_name); + auto nr_tables = get_nr_tables(db, keyspace_name); nr_ranges_total += ranges.size() * nr_tables; } if (reason == streaming::stream_reason::decommission) { @@ -1552,14 +1552,14 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m } rlogger.info("{}: started with keyspaces={}, leaving_node={}", op, ks_erms | boost::adaptors::map_keys, leaving_node); for (const auto& [keyspace_name, erm] : ks_erms) { - if (!db.local().has_keyspace(keyspace_name)) { + if (!db.has_keyspace(keyspace_name)) { rlogger.info("{}: keyspace={} does not exist any more, ignoring it", op, keyspace_name); continue; } auto& strat = erm->get_replication_strategy(); // First get all ranges the leaving node is responsible for dht::token_range_vector ranges = erm->get_ranges(leaving_node); - auto nr_tables = get_nr_tables(db.local(), keyspace_name); + auto nr_tables = get_nr_tables(db, keyspace_name); rlogger.info("{}: started with keyspace={}, leaving_node={}, nr_ranges={}", op, keyspace_name, leaving_node, ranges.size() * nr_tables); size_t nr_ranges_total = ranges.size() * nr_tables; size_t nr_ranges_skipped = 0; @@ -1732,18 +1732,18 @@ future<> repair_service::removenode_with_repair(locator::token_metadata_ptr tmpt future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata_ptr tmptr, sstring op, sstring source_dc, streaming::stream_reason reason, std::list ignore_nodes) { return seastar::async([this, tmptr = std::move(tmptr), source_dc = std::move(source_dc), op = std::move(op), reason, ignore_nodes = std::move(ignore_nodes)] () mutable { - seastar::sharded& db = get_db(); - auto ks_erms = db.local().get_non_local_strategy_keyspaces_erms(); + auto& db = get_db().local(); + auto ks_erms = db.get_non_local_strategy_keyspaces_erms(); auto myip = utils::fb_utilities::get_broadcast_address(); size_t nr_ranges_total = 0; for (const auto& [keyspace_name, erm] : ks_erms) { - if (!db.local().has_keyspace(keyspace_name)) { + if (!db.has_keyspace(keyspace_name)) { continue; } auto& strat = erm->get_replication_strategy(); // Okay to yield since tm is immutable dht::token_range_vector ranges = strat.get_ranges(myip, tmptr).get0(); - auto nr_tables = get_nr_tables(db.local(), keyspace_name); + auto nr_tables = get_nr_tables(db, keyspace_name); nr_ranges_total += ranges.size() * nr_tables; } @@ -1761,7 +1761,7 @@ future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata_ rlogger.info("{}: started with keyspaces={}, source_dc={}, nr_ranges_total={}, ignore_nodes={}", op, ks_erms | boost::adaptors::map_keys, source_dc, nr_ranges_total, ignore_nodes); for (const auto& [keyspace_name, erm] : ks_erms) { size_t nr_ranges_skipped = 0; - if (!db.local().has_keyspace(keyspace_name)) { + if (!db.has_keyspace(keyspace_name)) { rlogger.info("{}: keyspace={} does not exist any more, ignoring it", op, keyspace_name); continue; } @@ -1769,7 +1769,7 @@ future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata_ dht::token_range_vector ranges = strat.get_ranges(myip, tmptr).get0(); auto& topology = erm->get_token_metadata().get_topology(); std::unordered_map range_sources; - auto nr_tables = get_nr_tables(db.local(), keyspace_name); + auto nr_tables = get_nr_tables(db, keyspace_name); rlogger.info("{}: started with keyspace={}, source_dc={}, nr_ranges={}, ignore_nodes={}", op, keyspace_name, source_dc, ranges.size() * nr_tables, ignore_nodes); for (auto it = ranges.begin(); it != ranges.end();) { auto& r = *it; From 59dc2567fdd3cb23a84261723239728110cb6343 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Tue, 15 Nov 2022 19:02:27 +0200 Subject: [PATCH 04/16] repair: do_repair_start: check_in_shutdown first Signed-off-by: Benny Halevy --- repair/repair.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/repair/repair.cc b/repair/repair.cc index 3bfc1b663c..c29dfe1b3e 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -1011,9 +1011,9 @@ static future<> repair_ranges(lw_shared_ptr ri) { // repairs). It is fine to always do this on one CPU, because the function // itself does very little (mainly tell other nodes and CPUs what to do). int repair_service::do_repair_start(sstring keyspace, std::unordered_map options_map) { + get_repair_module().check_in_shutdown(); auto& db = get_db().local(); auto& topology = db.get_token_metadata().get_topology(); - get_repair_module().check_in_shutdown(); repair_options options(options_map); From 9200e6b0058ec96bc84163cccd020a656c1d3b3b Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Tue, 15 Nov 2022 19:06:11 +0200 Subject: [PATCH 05/16] repair: do_repair_start: use keyspace erm for get_primary_ranges_within_dc Ensure the erm and topology are in sync. The function is synchronous so this change doesn't fix anything, just cleans up the code. Fix mistake in comment while at it. Signed-off-by: Benny Halevy --- repair/repair.cc | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/repair/repair.cc b/repair/repair.cc index c29dfe1b3e..0f0c606929 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -686,15 +686,6 @@ static dht::token_range_vector get_primary_ranges( utils::fb_utilities::get_broadcast_address()); } -// get_primary_ranges_within_dc() is similar to get_primary_ranges(), -// but instead of each range being assigned just one primary owner -// across the entire cluster, here each range is assigned a primary -// owner in each of the clusters. -static dht::token_range_vector get_primary_ranges_within_dc( - replica::database& db, sstring keyspace) { - return db.find_keyspace(keyspace).get_effective_replication_map()->get_primary_ranges_within_dc(utils::fb_utilities::get_broadcast_address()); -} - void repair_stats::add(const repair_stats& o) { round_nr += o.round_nr; round_nr_fast_path_already_synced += o.round_nr_fast_path_already_synced; @@ -1013,7 +1004,8 @@ static future<> repair_ranges(lw_shared_ptr ri) { int repair_service::do_repair_start(sstring keyspace, std::unordered_map options_map) { get_repair_module().check_in_shutdown(); auto& db = get_db().local(); - auto& topology = db.get_token_metadata().get_topology(); + auto erm = db.find_keyspace(keyspace).get_effective_replication_map(); + auto& topology = erm->get_token_metadata().get_topology(); repair_options options(options_map); @@ -1042,7 +1034,11 @@ int repair_service::do_repair_start(sstring keyspace, std::unordered_mapget_primary_ranges_within_dc(utils::fb_utilities::get_broadcast_address()); } else if (options.data_centers.size() > 0 || options.hosts.size() > 0) { throw std::runtime_error("You need to run primary range repair on all nodes in the cluster."); } else { From aaf74776c271ea613247375e1eb8efca1bb9a3b3 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Tue, 15 Nov 2022 19:14:02 +0200 Subject: [PATCH 06/16] repair: do_repair_start: use keyspace erm for get_primary_ranges Ensure that the primary ranges are in sync with the keyspace erm. The function is synchronous so this change doesn't fix anything, it just cleans up the code. Signed-off-by: Benny Halevy --- repair/repair.cc | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/repair/repair.cc b/repair/repair.cc index 0f0c606929..6a936d8c2d 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -675,17 +675,6 @@ future<> repair_info::repair_range(const dht::token_range& range, ::table_id tab }); } -static dht::token_range_vector get_primary_ranges_for_endpoint( - replica::database& db, sstring keyspace, gms::inet_address ep) { - return db.find_keyspace(keyspace).get_effective_replication_map()->get_primary_ranges(ep); -} - -static dht::token_range_vector get_primary_ranges( - replica::database& db, sstring keyspace) { - return get_primary_ranges_for_endpoint(db, keyspace, - utils::fb_utilities::get_broadcast_address()); -} - void repair_stats::add(const repair_stats& o) { round_nr += o.round_nr; round_nr_fast_path_already_synced += o.round_nr_fast_path_already_synced; @@ -1042,7 +1031,7 @@ int repair_service::do_repair_start(sstring keyspace, std::unordered_map 0 || options.hosts.size() > 0) { throw std::runtime_error("You need to run primary range repair on all nodes in the cluster."); } else { - ranges = get_primary_ranges(db, keyspace); + ranges = erm->get_primary_ranges(utils::fb_utilities::get_broadcast_address()); } } else { ranges = db.get_keyspace_local_ranges(keyspace); From c7d753cd44448c637014912504984749fcbc7b93 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Tue, 15 Nov 2022 19:09:32 +0200 Subject: [PATCH 07/16] repair: do_repair_start: use keyspace erm to get keyspace local ranges Rather than calling db.get_keyspace_local_ranges that looks up the keyspace and its erm again. We want all the inforamtion derived from the erm to be based on the same source. The function is synchronous so this changes doesn't fix anything, just cleans up the code. Signed-off-by: Benny Halevy --- repair/repair.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/repair/repair.cc b/repair/repair.cc index 6a936d8c2d..5074db436f 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -1034,7 +1034,8 @@ int repair_service::do_repair_start(sstring keyspace, std::unordered_mapget_primary_ranges(utils::fb_utilities::get_broadcast_address()); } } else { - ranges = db.get_keyspace_local_ranges(keyspace); + // get keyspace local ranges + ranges = erm->get_ranges(utils::fb_utilities::get_broadcast_address()); } if (!options.data_centers.empty() && !options.hosts.empty()) { From 64b0756adcf06571b5da624064c25d448dd7ac9e Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Tue, 15 Nov 2022 19:23:05 +0200 Subject: [PATCH 08/16] repair: repair_info: keep effective_replication_map Sampled when repair info is constructed. To be used throughout the repair process. Signed-off-by: Benny Halevy --- repair/repair.cc | 3 ++- repair/repair.hh | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/repair/repair.cc b/repair/repair.cc index 5074db436f..8070fd5442 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -562,6 +562,7 @@ repair_info::repair_info(repair_service& repair, , gossiper(repair.get_gossiper()) , sharder(get_sharder_for_tables(db, keyspace_, table_ids_)) , keyspace(keyspace_) + , erm(db.local().find_keyspace(keyspace).get_effective_replication_map()) , ranges(ranges_) , cfs(get_table_names(db.local(), table_ids_)) , table_ids(std::move(table_ids_)) @@ -570,7 +571,7 @@ repair_info::repair_info(repair_service& repair, , hosts(hosts_) , ignore_nodes(ignore_nodes_) , reason(reason_) - , total_rf(db.local().find_keyspace(keyspace).get_effective_replication_map()->get_replication_factor()) + , total_rf(erm->get_replication_factor()) , nr_ranges_total(ranges.size()) , _hints_batchlog_flushed(std::move(hints_batchlog_flushed)) { if (as != nullptr) { diff --git a/repair/repair.hh b/repair/repair.hh index 059f841e2f..ecf452e051 100644 --- a/repair/repair.hh +++ b/repair/repair.hh @@ -20,6 +20,7 @@ #include #include +#include "locator/abstract_replication_strategy.hh" #include "replica/database_fwd.hh" #include "frozen_mutation.hh" #include "utils/UUID.hh" @@ -180,6 +181,7 @@ public: gms::gossiper& gossiper; const dht::sharder& sharder; sstring keyspace; + locator::effective_replication_map_ptr erm; dht::token_range_vector ranges; std::vector cfs; std::vector table_ids; From 0c56c75cf837d27da38d21b66406199bb558810f Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Tue, 15 Nov 2022 19:45:01 +0200 Subject: [PATCH 09/16] repair: require all node operations to be called on shard 0 To simplify using of the effective_replication_map / token_metadata_ptr throught the operation. Signed-off-by: Benny Halevy --- repair/repair.cc | 7 +++++++ repair/row_level.hh | 1 + 2 files changed, 8 insertions(+) diff --git a/repair/repair.cc b/repair/repair.cc index 8070fd5442..e6c4632a17 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -1336,6 +1336,7 @@ future<> repair_service::do_sync_data_using_repair( } future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr, std::unordered_set bootstrap_tokens) { + assert(this_shard_id() == 0); using inet_address = gms::inet_address; return seastar::async([this, tmptr = std::move(tmptr), tokens = std::move(bootstrap_tokens)] () mutable { auto& db = get_db().local(); @@ -1510,6 +1511,7 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr } future<> repair_service::do_decommission_removenode_with_repair(locator::token_metadata_ptr tmptr, gms::inet_address leaving_node, shared_ptr ops) { + assert(this_shard_id() == 0); using inet_address = gms::inet_address; return seastar::async([this, tmptr = std::move(tmptr), leaving_node = std::move(leaving_node), ops] () mutable { auto& db = get_db().local(); @@ -1702,10 +1704,12 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m } future<> repair_service::decommission_with_repair(locator::token_metadata_ptr tmptr) { + assert(this_shard_id() == 0); return do_decommission_removenode_with_repair(std::move(tmptr), utils::fb_utilities::get_broadcast_address(), {}); } future<> repair_service::removenode_with_repair(locator::token_metadata_ptr tmptr, gms::inet_address leaving_node, shared_ptr ops) { + assert(this_shard_id() == 0); return do_decommission_removenode_with_repair(std::move(tmptr), std::move(leaving_node), std::move(ops)).then([this] { rlogger.debug("Triggering off-strategy compaction for all non-system tables on removenode completion"); seastar::sharded& db = get_db(); @@ -1718,6 +1722,7 @@ future<> repair_service::removenode_with_repair(locator::token_metadata_ptr tmpt } future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata_ptr tmptr, sstring op, sstring source_dc, streaming::stream_reason reason, std::list ignore_nodes) { + assert(this_shard_id() == 0); return seastar::async([this, tmptr = std::move(tmptr), source_dc = std::move(source_dc), op = std::move(op), reason, ignore_nodes = std::move(ignore_nodes)] () mutable { auto& db = get_db().local(); auto ks_erms = db.get_non_local_strategy_keyspaces_erms(); @@ -1801,6 +1806,7 @@ future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata_ } future<> repair_service::rebuild_with_repair(locator::token_metadata_ptr tmptr, sstring source_dc) { + assert(this_shard_id() == 0); auto op = sstring("rebuild_with_repair"); if (source_dc.empty()) { auto& topology = tmptr->get_topology(); @@ -1816,6 +1822,7 @@ future<> repair_service::rebuild_with_repair(locator::token_metadata_ptr tmptr, } future<> repair_service::replace_with_repair(locator::token_metadata_ptr tmptr, std::unordered_set replacing_tokens, std::list ignore_nodes) { + assert(this_shard_id() == 0); auto cloned_tm = co_await tmptr->clone_async(); auto op = sstring("replace_with_repair"); auto& topology = tmptr->get_topology(); diff --git a/repair/row_level.hh b/repair/row_level.hh index 2a29bf0309..efbc5bb425 100644 --- a/repair/row_level.hh +++ b/repair/row_level.hh @@ -136,6 +136,7 @@ public: int do_repair_start(sstring keyspace, std::unordered_map options_map); // The tokens are the tokens assigned to the bootstrap node. + // all repair-based node operation entry points must be called on shard 0 future<> bootstrap_with_repair(locator::token_metadata_ptr tmptr, std::unordered_set bootstrap_tokens); future<> decommission_with_repair(locator::token_metadata_ptr tmptr); future<> removenode_with_repair(locator::token_metadata_ptr tmptr, gms::inet_address leaving_node, shared_ptr ops); From d6b2124903a276c236601d2f78e0ddb9f216b1c8 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Tue, 15 Nov 2022 19:48:57 +0200 Subject: [PATCH 10/16] repair: sync_data_using_repair: require to run on shard 0 And with that do_sync_data_using_repair can be folded into sync_data_using_repair. This will simplify using the effective_replication_map throughout the operation. Signed-off-by: Benny Halevy --- repair/repair.cc | 11 +---------- repair/row_level.hh | 7 +------ 2 files changed, 2 insertions(+), 16 deletions(-) diff --git a/repair/repair.cc b/repair/repair.cc index e6c4632a17..21a461b865 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -1267,17 +1267,8 @@ future<> repair_service::sync_data_using_repair( if (ranges.empty()) { return make_ready_future<>(); } - return container().invoke_on(0, [keyspace = std::move(keyspace), ranges = std::move(ranges), neighbors = std::move(neighbors), reason, ops_info] (repair_service& local_repair) mutable { - return local_repair.do_sync_data_using_repair(std::move(keyspace), std::move(ranges), std::move(neighbors), reason, ops_info); - }); -} -future<> repair_service::do_sync_data_using_repair( - sstring keyspace, - dht::token_range_vector ranges, - std::unordered_map neighbors, - streaming::stream_reason reason, - shared_ptr ops_info) { + assert(this_shard_id() == 0); auto& db = get_db().local(); repair_uniq_id id = get_repair_module().new_repair_uniq_id(); diff --git a/repair/row_level.hh b/repair/row_level.hh index efbc5bb425..6eb1a4da70 100644 --- a/repair/row_level.hh +++ b/repair/row_level.hh @@ -146,18 +146,13 @@ private: future<> do_decommission_removenode_with_repair(locator::token_metadata_ptr tmptr, gms::inet_address leaving_node, shared_ptr ops); future<> do_rebuild_replace_with_repair(locator::token_metadata_ptr tmptr, sstring op, sstring source_dc, streaming::stream_reason reason, std::list ignore_nodes); + // Must be called on shard 0 future<> sync_data_using_repair(sstring keyspace, dht::token_range_vector ranges, std::unordered_map neighbors, streaming::stream_reason reason, shared_ptr ops_info); - future<> do_sync_data_using_repair(sstring keyspace, - dht::token_range_vector ranges, - std::unordered_map neighbors, - streaming::stream_reason reason, - shared_ptr ops_info); - future repair_update_system_table_handler( gms::inet_address from, repair_update_system_table_request req); From 2c677e294b3e7270c12fbfcd8f0a53c86ab2fc69 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Wed, 16 Nov 2022 20:07:02 +0200 Subject: [PATCH 11/16] shared_token_metadata: get_lock is const The lock is acquired using an a function that doesn't modify the shared_token_metadata object. Signed-off-by: Benny Halevy --- locator/token_metadata.hh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/locator/token_metadata.hh b/locator/token_metadata.hh index a3cfc43064..1918b9f8f9 100644 --- a/locator/token_metadata.hh +++ b/locator/token_metadata.hh @@ -407,7 +407,7 @@ public: // using the schema_tables merge_lock. // // Must be called on shard 0. - future get_lock() noexcept { + future get_lock() const noexcept { return _lock_func(); } From 4b9269b7e284b38b21e1eef5c7cfb02d4969ccbb Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Wed, 16 Nov 2022 20:31:49 +0200 Subject: [PATCH 12/16] effective_replication_map: add global_effective_replication_map Class to hold a coherent view of a keyspace effective replication map on all shards. To be used in a following patch to pass the sharded keyspace e_r_m:s to repair. Signed-off-by: Benny Halevy --- locator/abstract_replication_strategy.cc | 40 ++++++++++++++++++++++++ locator/abstract_replication_strategy.hh | 28 +++++++++++++++++ 2 files changed, 68 insertions(+) diff --git a/locator/abstract_replication_strategy.cc b/locator/abstract_replication_strategy.cc index 76e8fbd77d..ac0fd4704c 100644 --- a/locator/abstract_replication_strategy.cc +++ b/locator/abstract_replication_strategy.cc @@ -12,6 +12,8 @@ #include #include #include +#include +#include "replica/database.hh" #include "utils/stall_free.hh" namespace locator { @@ -468,6 +470,44 @@ void effective_replication_map_factory::submit_background_work(future<> fut) { }); } +future<> global_effective_replication_map::get_keyspace_erms(sharded& sharded_db, std::string_view keyspace_name) { + return sharded_db.invoke_on(0, [this, &sharded_db, keyspace_name] (replica::database& db) -> future<> { + // To ensure we get the same effective_replication_map + // on all shards, acquire the shared_token_metadata lock. + // + // As a sanity check compare the ring_version on each shard + // to the reference version on shard 0. + // + // This invariant is achieved by storage_service::mutate_token_metadata + // and storage_service::replicate_to_all_cores that first acquire the + // shared_token_metadata lock, then prepare a mutated token metadata + // that will have an incremented ring_version, use it to re-calculate + // all e_r_m:s and clone both on all shards. including the ring version, + // all under the lock. + auto lk = co_await db.get_shared_token_metadata().get_lock(); + auto erm = db.find_keyspace(keyspace_name).get_effective_replication_map(); + auto ring_version = erm->get_token_metadata().get_ring_version(); + _erms[0] = make_foreign(std::move(erm)); + co_await coroutine::parallel_for_each(boost::irange(1u, smp::count), [this, &sharded_db, keyspace_name, ring_version] (unsigned shard) -> future<> { + _erms[shard] = co_await sharded_db.invoke_on(shard, [keyspace_name, ring_version] (const replica::database& db) { + const auto& ks = db.find_keyspace(keyspace_name); + auto erm = ks.get_effective_replication_map(); + auto local_ring_version = erm->get_token_metadata().get_ring_version(); + if (local_ring_version != ring_version) { + on_internal_error(rslogger, format("Inconsistent effective_replication_map ring_verion {}, expected {}", local_ring_version, ring_version)); + } + return make_foreign(std::move(erm)); + }); + }); + }); +} + +future make_global_effective_replication_map(sharded& sharded_db, std::string_view keyspace_name) { + global_effective_replication_map ret; + co_await ret.get_keyspace_erms(sharded_db, keyspace_name); + co_return ret; +} + } // namespace locator std::ostream& operator<<(std::ostream& os, locator::replication_strategy_type t) { diff --git a/locator/abstract_replication_strategy.hh b/locator/abstract_replication_strategy.hh index d6bfadac65..bd0bd5487b 100644 --- a/locator/abstract_replication_strategy.hh +++ b/locator/abstract_replication_strategy.hh @@ -22,6 +22,7 @@ // forward declaration since replica/database.hh includes this file namespace replica { +class database; class keyspace; } @@ -265,6 +266,33 @@ inline mutable_effective_replication_map_ptr make_effective_replication_map(abst // Apply the replication strategy over the current configuration and the given token_metadata. future calculate_effective_replication_map(abstract_replication_strategy::ptr_type rs, token_metadata_ptr tmptr); +// Class to hold a coherent view of a keyspace +// effective replication map on all shards +class global_effective_replication_map { + std::vector> _erms; + +public: + global_effective_replication_map() : _erms(smp::count) {} + global_effective_replication_map(global_effective_replication_map&&) = default; + global_effective_replication_map& operator=(global_effective_replication_map&&) = default; + + future<> get_keyspace_erms(sharded& sharded_db, std::string_view keyspace_name); + + const effective_replication_map& get() const noexcept { + return *_erms[this_shard_id()]; + } + + const effective_replication_map& operator*() const noexcept { + return get(); + } + + const effective_replication_map* operator->() const noexcept { + return &get(); + } +}; + +future make_global_effective_replication_map(sharded& sharded_db, std::string_view keyspace_name); + } // namespace locator std::ostream& operator<<(std::ostream& os, locator::replication_strategy_type); From 58b1c17f5d1c115f82377d9bd78167f1df2ad51a Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Wed, 16 Nov 2022 21:24:49 +0200 Subject: [PATCH 13/16] repair: futurize do_repair_start Turn it into a coroutine to prepare for the next path that will co_await make_global_effective_replication_map. Signed-off-by: Benny Halevy --- repair/repair.cc | 6 +++--- repair/row_level.hh | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/repair/repair.cc b/repair/repair.cc index 21a461b865..1c68fa7aef 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -991,7 +991,7 @@ static future<> repair_ranges(lw_shared_ptr ri) { // CPU is that it allows us to keep some state (like a list of ongoing // repairs). It is fine to always do this on one CPU, because the function // itself does very little (mainly tell other nodes and CPUs what to do). -int repair_service::do_repair_start(sstring keyspace, std::unordered_map options_map) { +future repair_service::do_repair_start(sstring keyspace, std::unordered_map options_map) { get_repair_module().check_in_shutdown(); auto& db = get_db().local(); auto erm = db.find_keyspace(keyspace).get_effective_replication_map(); @@ -1093,7 +1093,7 @@ int repair_service::do_repair_start(sstring keyspace, std::unordered_map repair_start(seastar::sharded& repair, diff --git a/repair/row_level.hh b/repair/row_level.hh index 6eb1a4da70..4a84bef4b3 100644 --- a/repair/row_level.hh +++ b/repair/row_level.hh @@ -133,7 +133,7 @@ public: future<> cleanup_history(tasks::task_id repair_id); future<> load_history(); - int do_repair_start(sstring keyspace, std::unordered_map options_map); + future do_repair_start(sstring keyspace, std::unordered_map options_map); // The tokens are the tokens assigned to the bootstrap node. // all repair-based node operation entry points must be called on shard 0 From c47d36b53d433c1e416fc6515a1735dd7488a07c Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Wed, 16 Nov 2022 21:26:19 +0200 Subject: [PATCH 14/16] repair: coroutinize sync_data_using_repair Prepare for the next path that will co_await make_global_effective_replication_map. Signed-off-by: Benny Halevy --- repair/repair.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/repair/repair.cc b/repair/repair.cc index 1c68fa7aef..e44746ba17 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -1265,7 +1265,7 @@ future<> repair_service::sync_data_using_repair( streaming::stream_reason reason, shared_ptr ops_info) { if (ranges.empty()) { - return make_ready_future<>(); + co_return; } assert(this_shard_id() == 0); @@ -1273,7 +1273,7 @@ future<> repair_service::sync_data_using_repair( repair_uniq_id id = get_repair_module().new_repair_uniq_id(); rlogger.info("repair[{}]: sync data for keyspace={}, status=started", id.uuid(), keyspace); - return get_repair_module().run(id, [this, id, &db, keyspace, ranges = std::move(ranges), neighbors = std::move(neighbors), reason, ops_info] () mutable { + co_await get_repair_module().run(id, [this, id, &db, keyspace, ranges = std::move(ranges), neighbors = std::move(neighbors), reason, ops_info] () mutable { auto cfs = list_column_families(db, keyspace); if (cfs.empty()) { rlogger.warn("repair[{}]: sync data for keyspace={}, no table in this keyspace", id.uuid(), keyspace); From b69be61f41dd7007738732da8afd38fa6d8665f4 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Tue, 15 Nov 2022 20:04:10 +0200 Subject: [PATCH 15/16] repair: pass effective_replication_map down to repair_info And make sure the token_metadata ring version is same as the reference one (from the erm on shard 0), when starting the repair on each shard. Refs #11993 Signed-off-by: Benny Halevy --- repair/repair.cc | 40 +++++++++++++++++++++++----------------- repair/repair.hh | 1 + repair/row_level.hh | 2 ++ 3 files changed, 26 insertions(+), 17 deletions(-) diff --git a/repair/repair.cc b/repair/repair.cc index e44746ba17..465fbe2016 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -544,6 +544,7 @@ get_sharder_for_tables(seastar::sharded& db, const sstring& k repair_info::repair_info(repair_service& repair, const sstring& keyspace_, + locator::effective_replication_map_ptr erm_, const dht::token_range_vector& ranges_, std::vector table_ids_, repair_uniq_id id_, @@ -562,7 +563,7 @@ repair_info::repair_info(repair_service& repair, , gossiper(repair.get_gossiper()) , sharder(get_sharder_for_tables(db, keyspace_, table_ids_)) , keyspace(keyspace_) - , erm(db.local().find_keyspace(keyspace).get_effective_replication_map()) + , erm(std::move(erm_)) , ranges(ranges_) , cfs(get_table_names(db.local(), table_ids_)) , table_ids(std::move(table_ids_)) @@ -993,9 +994,11 @@ static future<> repair_ranges(lw_shared_ptr ri) { // itself does very little (mainly tell other nodes and CPUs what to do). future repair_service::do_repair_start(sstring keyspace, std::unordered_map options_map) { get_repair_module().check_in_shutdown(); - auto& db = get_db().local(); - auto erm = db.find_keyspace(keyspace).get_effective_replication_map(); - auto& topology = erm->get_token_metadata().get_topology(); + auto& sharded_db = get_db(); + auto& db = sharded_db.local(); + auto germs = make_lw_shared(co_await locator::make_global_effective_replication_map(sharded_db, keyspace)); + auto& erm = germs->get(); + auto& topology = erm.get_token_metadata().get_topology(); repair_options options(options_map); @@ -1028,15 +1031,15 @@ future repair_service::do_repair_start(sstring keyspace, std::unordered_map // but instead of each range being assigned just one primary owner // across the entire cluster, here each range is assigned a primary // owner in each of the DCs. - ranges = erm->get_primary_ranges_within_dc(utils::fb_utilities::get_broadcast_address()); + ranges = erm.get_primary_ranges_within_dc(utils::fb_utilities::get_broadcast_address()); } else if (options.data_centers.size() > 0 || options.hosts.size() > 0) { throw std::runtime_error("You need to run primary range repair on all nodes in the cluster."); } else { - ranges = erm->get_primary_ranges(utils::fb_utilities::get_broadcast_address()); + ranges = erm.get_primary_ranges(utils::fb_utilities::get_broadcast_address()); } } else { // get keyspace local ranges - ranges = erm->get_ranges(utils::fb_utilities::get_broadcast_address()); + ranges = erm.get_ranges(utils::fb_utilities::get_broadcast_address()); } if (!options.data_centers.empty() && !options.hosts.empty()) { @@ -1097,7 +1100,7 @@ future repair_service::do_repair_start(sstring keyspace, std::unordered_map } // Do it in the background. - (void)get_repair_module().run(id, [this, &db, id, keyspace = std::move(keyspace), + (void)get_repair_module().run(id, [this, &db, id, keyspace = std::move(keyspace), germs = std::move(germs), cfs = std::move(cfs), ranges = std::move(ranges), options = std::move(options), ignore_nodes = std::move(ignore_nodes)] () mutable { auto uuid = id.uuid(); @@ -1193,10 +1196,10 @@ future repair_service::do_repair_start(sstring keyspace, std::unordered_map for (auto shard : boost::irange(unsigned(0), smp::count)) { auto f = container().invoke_on(shard, [keyspace, table_ids, id, ranges, hints_batchlog_flushed, - data_centers = options.data_centers, hosts = options.hosts, ignore_nodes] (repair_service& local_repair) mutable { + data_centers = options.data_centers, hosts = options.hosts, ignore_nodes, germs] (repair_service& local_repair) mutable { local_repair.get_metrics().repair_total_ranges_sum += ranges.size(); auto ri = make_lw_shared(local_repair, - std::move(keyspace), std::move(ranges), std::move(table_ids), + std::move(keyspace), germs->get().shared_from_this(), std::move(ranges), std::move(table_ids), id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), streaming::stream_reason::repair, nullptr, hints_batchlog_flushed); return repair_ranges(ri); }); @@ -1260,6 +1263,7 @@ future<> repair_service::abort_all() { future<> repair_service::sync_data_using_repair( sstring keyspace, + locator::effective_replication_map_ptr erm, dht::token_range_vector ranges, std::unordered_map neighbors, streaming::stream_reason reason, @@ -1269,11 +1273,13 @@ future<> repair_service::sync_data_using_repair( } assert(this_shard_id() == 0); - auto& db = get_db().local(); + auto& sharded_db = get_db(); + auto& db = sharded_db.local(); + auto germs = make_lw_shared(co_await locator::make_global_effective_replication_map(sharded_db, keyspace)); repair_uniq_id id = get_repair_module().new_repair_uniq_id(); rlogger.info("repair[{}]: sync data for keyspace={}, status=started", id.uuid(), keyspace); - co_await get_repair_module().run(id, [this, id, &db, keyspace, ranges = std::move(ranges), neighbors = std::move(neighbors), reason, ops_info] () mutable { + co_await get_repair_module().run(id, [this, id, &db, keyspace, germs = std::move(germs), ranges = std::move(ranges), neighbors = std::move(neighbors), reason, ops_info] () mutable { auto cfs = list_column_families(db, keyspace); if (cfs.empty()) { rlogger.warn("repair[{}]: sync data for keyspace={}, no table in this keyspace", id.uuid(), keyspace); @@ -1286,14 +1292,14 @@ future<> repair_service::sync_data_using_repair( throw std::runtime_error("aborted by user request"); } for (auto shard : boost::irange(unsigned(0), smp::count)) { - auto f = container().invoke_on(shard, [keyspace, table_ids, id, ranges, neighbors, reason, ops_info] (repair_service& local_repair) mutable { + auto f = container().invoke_on(shard, [keyspace, table_ids, id, ranges, neighbors, reason, ops_info, germs] (repair_service& local_repair) mutable { auto data_centers = std::vector(); auto hosts = std::vector(); auto ignore_nodes = std::unordered_set(); bool hints_batchlog_flushed = false; abort_source* asp = ops_info ? ops_info->local_abort_source() : nullptr; auto ri = make_lw_shared(local_repair, - std::move(keyspace), std::move(ranges), std::move(table_ids), + std::move(keyspace), germs->get().shared_from_this(), std::move(ranges), std::move(table_ids), id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), reason, asp, hints_batchlog_flushed); ri->neighbors = std::move(neighbors); return repair_ranges(ri); @@ -1494,7 +1500,7 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr } } auto nr_ranges = desired_ranges.size(); - sync_data_using_repair(keyspace_name, std::move(desired_ranges), std::move(range_sources), reason, nullptr).get(); + sync_data_using_repair(keyspace_name, erm, std::move(desired_ranges), std::move(range_sources), reason, nullptr).get(); rlogger.info("bootstrap_with_repair: finished with keyspace={}, nr_ranges={}", keyspace_name, nr_ranges); } rlogger.info("bootstrap_with_repair: finished with keyspaces={}", ks_erms | boost::adaptors::map_keys); @@ -1686,7 +1692,7 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m ranges.swap(ranges_for_removenode); } auto nr_ranges_synced = ranges.size(); - sync_data_using_repair(keyspace_name, std::move(ranges), std::move(range_sources), reason, ops).get(); + sync_data_using_repair(keyspace_name, erm, std::move(ranges), std::move(range_sources), reason, ops).get(); rlogger.info("{}: finished with keyspace={}, leaving_node={}, nr_ranges={}, nr_ranges_synced={}, nr_ranges_skipped={}", op, keyspace_name, leaving_node, nr_ranges_total, nr_ranges_synced, nr_ranges_skipped); } @@ -1789,7 +1795,7 @@ future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata_ }).get(); } auto nr_ranges = ranges.size(); - sync_data_using_repair(keyspace_name, std::move(ranges), std::move(range_sources), reason, nullptr).get(); + sync_data_using_repair(keyspace_name, erm, std::move(ranges), std::move(range_sources), reason, nullptr).get(); rlogger.info("{}: finished with keyspace={}, source_dc={}, nr_ranges={}", op, keyspace_name, source_dc, nr_ranges); } rlogger.info("{}: finished with keyspaces={}, source_dc={}", op, ks_erms | boost::adaptors::map_keys, source_dc); diff --git a/repair/repair.hh b/repair/repair.hh index ecf452e051..325292af28 100644 --- a/repair/repair.hh +++ b/repair/repair.hh @@ -204,6 +204,7 @@ public: public: repair_info(repair_service& repair, const sstring& keyspace_, + locator::effective_replication_map_ptr erm_, const dht::token_range_vector& ranges_, std::vector table_ids_, repair_uniq_id id_, diff --git a/repair/row_level.hh b/repair/row_level.hh index 4a84bef4b3..cf744e1d39 100644 --- a/repair/row_level.hh +++ b/repair/row_level.hh @@ -13,6 +13,7 @@ #include "repair/repair.hh" #include "repair/repair_task.hh" #include "tasks/task_manager.hh" +#include "locator/abstract_replication_strategy.hh" #include #include @@ -148,6 +149,7 @@ private: // Must be called on shard 0 future<> sync_data_using_repair(sstring keyspace, + locator::effective_replication_map_ptr erm, dht::token_range_vector ranges, std::unordered_map neighbors, streaming::stream_reason reason, From 53fdf75cf9b404159d230a1b1a81bed3b221f289 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Tue, 15 Nov 2022 20:22:17 +0200 Subject: [PATCH 16/16] repair: pass erm down to get_hosts_participating_in_repair and get_neighbors Now that it is available in repair_info. Fixes #11993 Signed-off-by: Benny Halevy --- repair/repair.cc | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/repair/repair.cc b/repair/repair.cc index 465fbe2016..3d42e34356 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -199,21 +199,18 @@ void remove_item(Collection& c, T& item) { } // Return all of the neighbors with whom we share the provided range. -static std::vector get_neighbors(replica::database& db, +static std::vector get_neighbors( + const locator::effective_replication_map& erm, const sstring& ksname, query::range range, const std::vector& data_centers, const std::vector& hosts, const std::unordered_set& ignore_nodes) { - - replica::keyspace& ks = db.find_keyspace(ksname); - auto erm = ks.get_effective_replication_map(); - dht::token tok = range.end() ? range.end()->value() : dht::maximum_token(); - auto ret = erm->get_natural_endpoints(tok); + auto ret = erm.get_natural_endpoints(tok); remove_item(ret, utils::fb_utilities::get_broadcast_address()); if (!data_centers.empty()) { - auto dc_endpoints_map = erm->get_token_metadata().get_topology().get_datacenter_endpoints(); + auto dc_endpoints_map = erm.get_token_metadata().get_topology().get_datacenter_endpoints(); std::unordered_set dc_endpoints; for (const sstring& dc : data_centers) { auto it = dc_endpoints_map.find(dc); @@ -276,7 +273,7 @@ static std::vector get_neighbors(replica::database& db, } if (ret.size() < 1) { auto me = utils::fb_utilities::get_broadcast_address(); - auto others = erm->get_natural_endpoints(tok); + auto others = erm.get_natural_endpoints(tok); remove_item(others, me); throw std::runtime_error(fmt::format("Repair requires at least two " "endpoints that are neighbors before it can continue, " @@ -318,7 +315,8 @@ static std::vector get_neighbors(replica::database& db, #endif } -static future> get_hosts_participating_in_repair(replica::database& db, +static future> get_hosts_participating_in_repair( + const locator::effective_replication_map& erm, const sstring& ksname, const dht::token_range_vector& ranges, const std::vector& data_centers, @@ -332,7 +330,7 @@ static future> get_hosts_participating_in_repair(re participating_hosts.insert(utils::fb_utilities::get_broadcast_address()); co_await do_for_each(ranges, [&] (const dht::token_range& range) { - const auto nbs = get_neighbors(db, ksname, range, data_centers, hosts, ignore_nodes); + const auto nbs = get_neighbors(erm, ksname, range, data_centers, hosts, ignore_nodes); for (const auto& nb : nbs) { participating_hosts.insert(nb); } @@ -607,7 +605,7 @@ void repair_info::check_in_abort() { repair_neighbors repair_info::get_repair_neighbors(const dht::token_range& range) { return neighbors.empty() ? - repair_neighbors(get_neighbors(db.local(), keyspace, range, data_centers, hosts, ignore_nodes)) : + repair_neighbors(get_neighbors(*erm, keyspace, range, data_centers, hosts, ignore_nodes)) : neighbors[range]; } @@ -1116,7 +1114,7 @@ future repair_service::do_repair_start(sstring keyspace, std::unordered_map } bool hints_batchlog_flushed = false; - auto participants = get_hosts_participating_in_repair(db, keyspace, ranges, options.data_centers, options.hosts, ignore_nodes).get(); + auto participants = get_hosts_participating_in_repair(germs->get(), keyspace, ranges, options.data_centers, options.hosts, ignore_nodes).get(); if (needs_flush_before_repair) { auto waiting_nodes = db.get_token_metadata().get_all_endpoints(); std::erase_if(waiting_nodes, [&] (const auto& addr) {