From bf0777e97a0abb3bebddf84c82de774ebfd9255d Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Tue, 22 Jun 2021 13:38:30 +0200 Subject: [PATCH] view: generate view updates in smaller parts In order to avoid large allocations and too large mutations generated from large view updates, granularity of the process is broken down from per-partition to smaller chunks. The view update builder now produces partial updates, no more than 100 view rows at a time. --- db/view/view.cc | 71 +++++++++++++++++++++++++++++++------------------ db/view/view.hh | 11 +++++--- table.cc | 59 ++++++++++++++++++++++++++++------------ 3 files changed, 95 insertions(+), 46 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index 084bebd5b3..eedc3be8ea 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -67,7 +67,6 @@ #include "db/system_keyspace_view_types.hh" #include "db/system_keyspace.hh" #include "db/system_distributed_keyspace.hh" -#include "frozen_mutation.hh" #include "gms/inet_address.hh" #include "keys.hh" #include "locator/network_topology_strategy.hh" @@ -328,11 +327,13 @@ bool matches_view_filter(const schema& base, const view_info& view, const partit }); } -void view_updates::move_to(utils::chunked_vector& mutations) && { +void view_updates::move_to(utils::chunked_vector& mutations) { std::transform(_updates.begin(), _updates.end(), std::back_inserter(mutations), [&, this] (auto&& m) { auto mut = mutation(_view, dht::decorate_key(*_view, std::move(m.first)), std::move(m.second)); - return frozen_mutation_and_schema{freeze(mut), std::move(_view)}; + return frozen_mutation_and_schema{freeze(mut), _view}; }); + _updates.clear(); + _op_count = 0; } mutation_partition& view_updates::partition_for(partition_key&& key) { @@ -343,6 +344,10 @@ mutation_partition& view_updates::partition_for(partition_key&& key) { return _updates.emplace(std::move(key), mutation_partition(_view)).first->second; } +size_t view_updates::op_count() const { + return _op_count++;; +} + row_marker view_updates::compute_row_marker(const clustering_row& base_row) const { /* * We need to compute both the timestamp and expiration. @@ -584,6 +589,7 @@ void view_updates::create_entry(const partition_key& base_key, const clustering_ r.apply(marker); r.apply(update.tomb()); add_cells_to_view(*_base, *_view, row(*_base, column_kind::regular_column, update.cells()), r.cells()); + _op_count++; } /** @@ -619,6 +625,7 @@ void view_updates::do_delete_old_entry(const partition_key& base_key, const clus add_cells_to_view(*_base, *_view, std::move(diff), r.cells()); } r.apply(update.tomb()); + _op_count++; } /* @@ -723,6 +730,7 @@ void view_updates::update_entry(const partition_key& base_key, const clustering_ auto diff = update.cells().difference(*_base, column_kind::regular_column, existing.cells()); add_cells_to_view(*_base, *_view, std::move(diff), r.cells()); + _op_count++; } void view_updates::generate_update( @@ -836,24 +844,33 @@ future view_update_builder::stop() const { return make_ready_future(stop_iteration::yes); } -future> view_update_builder::build() { - return advance_all().then([this] (auto&& ignored) { - assert(_update && _update->is_partition_start()); - _key = std::move(std::move(_update)->as_partition_start().key().key()); - _update_tombstone_tracker.set_partition_tombstone(_update->as_partition_start().partition_tombstone()); +future> view_update_builder::build_some() { + return advance_all().then([this] (stop_iteration ignored) { + bool do_advance_updates = false; + bool do_advance_existings = false; + if (_update && _update->is_partition_start()) { + _key = std::move(std::move(_update)->as_partition_start().key().key()); + _update_tombstone_tracker.set_partition_tombstone(_update->as_partition_start().partition_tombstone()); + do_advance_updates = true; + } if (_existing && _existing->is_partition_start()) { _existing_tombstone_tracker.set_partition_tombstone(_existing->as_partition_start().partition_tombstone()); + do_advance_existings = true; } - }).then([this] { - return advance_all().then([this] (auto&& ignored) { - return repeat([this] { - return this->on_results(); - }); + if (do_advance_updates) { + return do_advance_existings ? advance_all() : advance_updates(); + } else if (do_advance_existings) { + return advance_existings(); + } + return make_ready_future(stop_iteration::no); + }).then([this] (stop_iteration ignored) { + return repeat([this] { + return this->on_results(); }); }).then([this] { utils::chunked_vector mutations; - for (auto&& update : _view_updates) { - std::move(update).move_to(mutations); + for (auto& update : _view_updates) { + update.move_to(mutations); } return mutations; }); @@ -887,6 +904,12 @@ static void apply_tracked_tombstones(range_tombstone_accumulator& tracker, clust } future view_update_builder::on_results() { + constexpr size_t max_rows_for_view_updates = 100; + size_t rows_for_view_updates = std::accumulate(_view_updates.begin(), _view_updates.end(), 0, [] (size_t acc, const view_updates& vu) { + return acc + vu.op_count(); + }); + const bool stop_updates = rows_for_view_updates >= max_rows_for_view_updates; + if (_update && !_update->is_end_of_partition() && _existing && !_existing->is_end_of_partition()) { auto cmp = position_in_partition::tri_compare(*_schema)(_update->position(), _existing->position()); if (cmp < 0) { @@ -902,7 +925,7 @@ future view_update_builder::on_results() { : std::nullopt; generate_update(std::move(update), std::move(existing)); } - return advance_updates(); + return stop_updates ? stop() : advance_updates(); } if (cmp > 0) { // We have something existing but no update (which will happen either because it's a range tombstone marker in @@ -921,7 +944,7 @@ future view_update_builder::on_results() { generate_update(std::move(update), { std::move(existing) }); } } - return advance_existings(); + return stop_updates ? stop () : advance_existings(); } // We're updating a row that had pre-existing data if (_update->is_range_tombstone()) { @@ -938,7 +961,7 @@ future view_update_builder::on_results() { }); generate_update(std::move(*_update).as_clustering_row(), { std::move(*_existing).as_clustering_row() }); } - return advance_all(); + return stop_updates ? stop() : advance_all(); } auto tombstone = _update_tombstone_tracker.current_tombstone(); @@ -949,7 +972,7 @@ future view_update_builder::on_results() { auto update = clustering_row(existing.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row()); generate_update(std::move(update), { std::move(existing) }); } - return advance_existings(); + return stop_updates ? stop() : advance_existings(); } // If we have updates and it's a range tombstone, it removes nothing pre-exisiting, so we can ignore it @@ -964,13 +987,13 @@ future view_update_builder::on_results() { : std::nullopt; generate_update(std::move(*_update).as_clustering_row(), std::move(existing)); } - return advance_updates(); + return stop_updates ? stop() : advance_updates(); } return stop(); } -future> generate_view_updates( +future make_view_update_builder( const schema_ptr& base, std::vector&& views_to_update, flat_mutation_reader&& updates, @@ -984,11 +1007,7 @@ future> generate_view_updates( } return view_updates(std::move(v)); })); - auto builder = std::make_unique(base, std::move(vs), std::move(updates), std::move(existings), now); - auto f = builder->build(); - return f.finally([builder = std::move(builder)] { - return builder->close(); - }); + return make_ready_future(view_update_builder(base, std::move(vs), std::move(updates), std::move(existings), now)); } future calculate_affected_clustering_ranges(const schema& base, diff --git a/db/view/view.hh b/db/view/view.hh index eedc2adf4c..fc096da30e 100644 --- a/db/view/view.hh +++ b/db/view/view.hh @@ -26,6 +26,7 @@ #include "query-request.hh" #include "schema_fwd.hh" #include "flat_mutation_reader.hh" +#include "frozen_mutation.hh" class frozen_mutation_and_schema; struct cf_stats; @@ -140,6 +141,7 @@ class view_updates final { schema_ptr _base; base_info_ptr _base_info; std::unordered_map _updates; + mutable size_t _op_count = 0; public: explicit view_updates(view_and_base vab) : _view(std::move(vab.view)) @@ -149,9 +151,12 @@ public: , _updates(8, partition_key::hashing(*_view), partition_key::equality(*_view)) { } - void move_to(utils::chunked_vector& mutations) &&; + void move_to(utils::chunked_vector& mutations); void generate_update(const partition_key& base_key, const clustering_row& update, const std::optional& existing, gc_clock::time_point now); + + size_t op_count() const; + private: mutation_partition& partition_for(partition_key&& key); row_marker compute_row_marker(const clustering_row& base_row) const; @@ -195,7 +200,7 @@ public: } view_update_builder(view_update_builder&& other) noexcept = default; - future> build(); + future> build_some(); future<> close() noexcept; @@ -210,7 +215,7 @@ private: future stop() const; }; -future> generate_view_updates( +future make_view_update_builder( const schema_ptr& base, std::vector&& views_to_update, flat_mutation_reader&& updates, diff --git a/table.cc b/table.cc index 0ad62b46a4..cb04f38efe 100644 --- a/table.cc +++ b/table.cc @@ -1666,20 +1666,28 @@ future<> table::generate_and_propagate_view_updates(const schema_ptr& base, tracing::trace_state_ptr tr_state, gc_clock::time_point now) const { auto base_token = m.token(); - auto updates = co_await db::view::generate_view_updates( + db::view::view_update_builder builder = co_await db::view::make_view_update_builder( base, std::move(views), flat_mutation_reader_from_mutations(std::move(permit), {std::move(m)}), std::move(existings), now); - tracing::trace(tr_state, "Generated {} view update mutations", updates.size()); - auto units = seastar::consume_units(*_config.view_update_concurrency_semaphore, memory_usage_of(updates)); - try { - co_await db::view::mutate_MV(std::move(base_token), std::move(updates), _view_stats, *_config.cf_stats, std::move(tr_state), - std::move(units), service::allow_hints::yes, db::view::wait_for_all_updates::no); - } catch (...) { - // ignore + + while (true) { + try { + auto updates = co_await builder.build_some(); + if (updates.empty()) { + break; + } + tracing::trace(tr_state, "Generated {} view update mutations", updates.size()); + auto units = seastar::consume_units(*_config.view_update_concurrency_semaphore, memory_usage_of(updates)); + co_await db::view::mutate_MV(base_token, std::move(updates), _view_stats, *_config.cf_stats, tr_state, + std::move(units), service::allow_hints::yes, db::view::wait_for_all_updates::no).handle_exception([] (auto ignored) { }); + } catch (...) { + // ignore + } } + co_await builder.close(); } /** @@ -1779,20 +1787,37 @@ future<> table::populate_views( dht::token base_token, flat_mutation_reader&& reader, gc_clock::time_point now) { - auto& schema = reader.schema(); - auto updates = co_await db::view::generate_view_updates( + auto schema = reader.schema(); + db::view::view_update_builder builder = co_await db::view::make_view_update_builder( schema, std::move(views), std::move(reader), { }, now); - size_t update_size = memory_usage_of(updates); - size_t units_to_wait_for = std::min(_config.view_update_concurrency_semaphore_limit, update_size); - auto units = co_await seastar::get_units(*_config.view_update_concurrency_semaphore, units_to_wait_for); - auto units_to_consume = update_size - units_to_wait_for; - units.adopt(seastar::consume_units(*_config.view_update_concurrency_semaphore, units_to_consume)); - co_await db::view::mutate_MV(std::move(base_token), std::move(updates), _view_stats, *_config.cf_stats, - tracing::trace_state_ptr(), std::move(units), service::allow_hints::no, db::view::wait_for_all_updates::yes); + + std::exception_ptr err; + while (true) { + try { + auto updates = co_await builder.build_some(); + if (updates.empty()) { + break; + } + size_t update_size = memory_usage_of(updates); + size_t units_to_wait_for = std::min(_config.view_update_concurrency_semaphore_limit, update_size); + auto units = co_await seastar::get_units(*_config.view_update_concurrency_semaphore, units_to_wait_for); + units.adopt(seastar::consume_units(*_config.view_update_concurrency_semaphore, update_size - units_to_wait_for)); + co_await db::view::mutate_MV(base_token, std::move(updates), _view_stats, *_config.cf_stats, + tracing::trace_state_ptr(), std::move(units), service::allow_hints::no, db::view::wait_for_all_updates::yes); + } catch (...) { + if (!err) { + err = std::current_exception(); + } + } + } + co_await builder.close(); + if (err) { + std::rethrow_exception(err); + } } void table::set_hit_rate(gms::inet_address addr, cache_temperature rate) {