db,view: limit the number of simultaneous view update futures
Previously the view update code generated a continuation for each view update and stored them all in a vector. In certain cases the number of updates can grow really large (to millions and beyond), so it's better to only store a limited amount of these futures at a time.
This commit is contained in:
@@ -1207,9 +1207,9 @@ future<> mutate_MV(
|
||||
service::allow_hints allow_hints,
|
||||
wait_for_all_updates wait_for_all)
|
||||
{
|
||||
auto fs = std::make_unique<std::vector<future<>>>();
|
||||
fs->reserve(view_updates.size());
|
||||
for (frozen_mutation_and_schema& mut : view_updates) {
|
||||
static constexpr size_t max_concurrent_updates = 128;
|
||||
co_await max_concurrent_for_each(view_updates, max_concurrent_updates,
|
||||
[base_token, &stats, &cf_stats, tr_state, &pending_view_updates, allow_hints, wait_for_all] (frozen_mutation_and_schema mut) mutable -> future<> {
|
||||
auto view_token = dht::get_token(*mut.s, mut.fm.key());
|
||||
auto& keyspace_name = mut.s->ks_name();
|
||||
auto target_endpoint = get_view_natural_endpoint(keyspace_name, base_token, view_token);
|
||||
@@ -1243,6 +1243,7 @@ future<> mutate_MV(
|
||||
}
|
||||
}
|
||||
|
||||
future<> local_view_update = make_ready_future<>();
|
||||
if (target_endpoint && *target_endpoint == my_address) {
|
||||
++stats.view_updates_pushed_local;
|
||||
++cf_stats.total_view_updates_pushed_local;
|
||||
@@ -1250,7 +1251,7 @@ future<> mutate_MV(
|
||||
auto mut_ptr = remote_endpoints.empty() ? std::make_unique<frozen_mutation>(std::move(mut.fm)) : std::make_unique<frozen_mutation>(mut.fm);
|
||||
tracing::trace(tr_state, "Locally applying view update for {}.{}; base token = {}; view token = {}",
|
||||
mut.s->ks_name(), mut.s->cf_name(), base_token, view_token);
|
||||
future<> local_view_update = service::get_local_storage_proxy().mutate_locally(mut.s, *mut_ptr, std::move(tr_state), db::commitlog::force_sync::no).then_wrapped(
|
||||
local_view_update = service::get_local_storage_proxy().mutate_locally(mut.s, *mut_ptr, std::move(tr_state), db::commitlog::force_sync::no).then_wrapped(
|
||||
[s = mut.s, &stats, &cf_stats, tr_state, base_token, view_token, my_address, mut_ptr = std::move(mut_ptr),
|
||||
units = sem_units.split(sem_units.count())] (future<>&& f) {
|
||||
--stats.writes;
|
||||
@@ -1266,7 +1267,6 @@ future<> mutate_MV(
|
||||
tracing::trace(tr_state, "Successfully applied local view update for {}", my_address);
|
||||
return make_ready_future<>();
|
||||
});
|
||||
fs->push_back(std::move(local_view_update));
|
||||
// We just applied a local update to the target endpoint, so it should now be removed
|
||||
// from the possible targets
|
||||
target_endpoint.reset();
|
||||
@@ -1279,6 +1279,7 @@ future<> mutate_MV(
|
||||
remote_endpoints.pop_back();
|
||||
}
|
||||
|
||||
future<> remote_view_update = make_ready_future<>();
|
||||
// If target_endpoint is engaged by this point, then either the update
|
||||
// is not local, or the local update was already applied but we still
|
||||
// have pending endpoints to send to.
|
||||
@@ -1305,16 +1306,15 @@ future<> mutate_MV(
|
||||
return make_ready_future<>();
|
||||
});
|
||||
if (wait_for_all) {
|
||||
fs->push_back(std::move(view_update));
|
||||
remote_view_update = std::move(view_update);
|
||||
} else {
|
||||
// The update is sent to background in order to preserve availability,
|
||||
// its parallelism is limited by view_update_concurrency_semaphore
|
||||
(void)view_update;
|
||||
}
|
||||
}
|
||||
}
|
||||
auto f = seastar::when_all_succeed(fs->begin(), fs->end());
|
||||
return f.finally([fs = std::move(fs)] { });
|
||||
return when_all_succeed(std::move(local_view_update), std::move(remote_view_update)).discard_result();
|
||||
});
|
||||
}
|
||||
|
||||
view_builder::view_builder(database& db, db::system_distributed_keyspace& sys_dist_ks, service::migration_notifier& mn)
|
||||
|
||||
Reference in New Issue
Block a user