From a7f7716ecf8e818b242eb0b1ce4f4b2e36546618 Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Mon, 14 Jun 2021 15:27:49 +0200 Subject: [PATCH] db,view: use chunked_vector for view updates The number of view updates can grow large, especially in corner cases like removing large base partitions. Storing them in a utils::chunked_vector instead of a std::vector avoids large contiguous allocations. --- db/view/view.cc | 12 ++++++------ db/view/view.hh | 4 ++-- table.cc | 6 +++--- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index 17efd546fe..71d997451a 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -343,7 +343,7 @@ public: , _updates(8, partition_key::hashing(*_view), partition_key::equality(*_view)) { } - void move_to(std::vector& mutations) && { + void move_to(utils::chunked_vector& mutations) && { std::transform(_updates.begin(), _updates.end(), std::back_inserter(mutations), [&, this] (auto&& m) { auto mut = mutation(_view, dht::decorate_key(*_view, std::move(m.first)), std::move(m.second)); return frozen_mutation_and_schema{freeze(mut), std::move(_view)}; @@ -858,7 +858,7 @@ public: , _now(now) { } - future> build(); + future> build(); future<> close() noexcept { return when_all_succeed(_updates.close(), _existings->close()).discard_result(); @@ -899,7 +899,7 @@ private: } }; -future> view_update_builder::build() { +future> view_update_builder::build() { return advance_all().then([this] (auto&& ignored) { assert(_update && _update->is_partition_start()); _key = std::move(std::move(_update)->as_partition_start().key().key()); @@ -914,7 +914,7 @@ future> view_update_builder::build() { }); }); }).then([this] { - std::vector mutations; + utils::chunked_vector mutations; for (auto&& update : _view_updates) { std::move(update).move_to(mutations); } @@ -1033,7 +1033,7 @@ future view_update_builder::on_results() { return stop(); } -future> generate_view_updates( +future> generate_view_updates( const schema_ptr& base, std::vector&& views_to_update, flat_mutation_reader&& 
updates, @@ -1199,7 +1199,7 @@ static future<> apply_to_remote_endpoints(gms::inet_address target, inet_address // for the writes to complete. future<> mutate_MV( dht::token base_token, - std::vector view_updates, + utils::chunked_vector view_updates, db::view::stats& stats, cf_stats& cf_stats, tracing::trace_state_ptr tr_state, diff --git a/db/view/view.hh b/db/view/view.hh index 8c6554c6c0..bd70d55bce 100644 --- a/db/view/view.hh +++ b/db/view/view.hh @@ -134,7 +134,7 @@ bool matches_view_filter(const schema& base, const view_info& view, const partit bool clustering_prefix_matches(const schema& base, const partition_key& key, const clustering_key_prefix& ck, gc_clock::time_point now); -future> generate_view_updates( +future> generate_view_updates( const schema_ptr& base, std::vector&& views_to_update, flat_mutation_reader&& updates, @@ -152,7 +152,7 @@ struct wait_for_all_updates_tag {}; using wait_for_all_updates = bool_class; future<> mutate_MV( dht::token base_token, - std::vector view_updates, + utils::chunked_vector view_updates, db::view::stats& stats, cf_stats& cf_stats, tracing::trace_state_ptr tr_state, diff --git a/table.cc b/table.cc index 398c912e59..6756beaa54 100644 --- a/table.cc +++ b/table.cc @@ -1635,7 +1635,7 @@ std::vector table::affected_views(const schema_ptr& base, const mutati })); } -static size_t memory_usage_of(const std::vector& ms) { +static size_t memory_usage_of(const utils::chunked_vector& ms) { // Overhead of sending a view mutation, in terms of data structures used by the storage_proxy. 
constexpr size_t base_overhead_bytes = 256; return boost::accumulate(ms | boost::adaptors::transformed([] (const frozen_mutation_and_schema& m) { @@ -1669,7 +1669,7 @@ future<> table::generate_and_propagate_view_updates(const schema_ptr& base, std::move(views), flat_mutation_reader_from_mutations(std::move(permit), {std::move(m)}), std::move(existings), - now).then([this, base_token = std::move(base_token), tr_state = std::move(tr_state)] (std::vector&& updates) mutable { + now).then([this, base_token = std::move(base_token), tr_state = std::move(tr_state)] (utils::chunked_vector&& updates) mutable { tracing::trace(tr_state, "Generated {} view update mutations", updates.size()); auto units = seastar::consume_units(*_config.view_update_concurrency_semaphore, memory_usage_of(updates)); return db::view::mutate_MV(std::move(base_token), std::move(updates), _view_stats, *_config.cf_stats, std::move(tr_state), @@ -1780,7 +1780,7 @@ future<> table::populate_views( std::move(views), std::move(reader), { }, - now).then([base_token = std::move(base_token), this] (std::vector&& updates) mutable { + now).then([base_token = std::move(base_token), this] (utils::chunked_vector&& updates) mutable { size_t update_size = memory_usage_of(updates); size_t units_to_wait_for = std::min(_config.view_update_concurrency_semaphore_limit, update_size); return seastar::get_units(*_config.view_update_concurrency_semaphore, units_to_wait_for).then(