view: mutate_MV: lookup base and view erms synchronously
Although at the moment storage_service::replicate_to_all_cores may yield between updating the base and view tables with a new effective_replication_map, scylladb/scylladb#21781 was submitted to change that so that they are updated atomically together. This change prepares for the above change, and is harmless at the moment. Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
@@ -1864,12 +1864,23 @@ future<> view_update_generator::mutate_MV(
|
||||
// TODO: Maybe allow users to set use_legacy_self_pairing explicitly
|
||||
// on a view, like we have the synchronous_updates_flag.
|
||||
bool use_legacy_self_pairing = !ks.uses_tablets();
|
||||
auto base_ermp = base->table().get_effective_replication_map();
|
||||
std::unordered_map<table_id, locator::effective_replication_map_ptr> erms;
|
||||
auto get_erm = [&] (table_id id) {
|
||||
auto it = erms.find(id);
|
||||
if (it == erms.end()) {
|
||||
it = erms.emplace(id, _db.find_column_family(id).get_effective_replication_map()).first;
|
||||
}
|
||||
return it->second;
|
||||
};
|
||||
auto base_ermp = get_erm(base->id());
|
||||
for (const auto& mut : view_updates) {
|
||||
(void)get_erm(mut.s->id());
|
||||
}
|
||||
static constexpr size_t max_concurrent_updates = 128;
|
||||
co_await utils::get_local_injector().inject("delay_before_get_view_natural_endpoint", 8000ms);
|
||||
co_await max_concurrent_for_each(view_updates, max_concurrent_updates, [&] (frozen_mutation_and_schema mut) mutable -> future<> {
|
||||
auto view_token = dht::get_token(*mut.s, mut.fm.key());
|
||||
auto view_ermp = mut.s->table().get_effective_replication_map();
|
||||
auto view_ermp = erms.at(mut.s->id());
|
||||
auto target_endpoint = get_view_natural_endpoint(base_ermp, view_ermp, network_topology, base_token, view_token, use_legacy_self_pairing, cf_stats);
|
||||
auto remote_endpoints = view_ermp->get_pending_replicas(view_token);
|
||||
auto sem_units = seastar::make_lw_shared<db::timeout_semaphore_units>(pending_view_updates.split(memory_usage_of(mut)));
|
||||
|
||||
Reference in New Issue
Block a user