view: generate view updates in smaller parts

In order to avoid large allocations and overly large mutations
generated from large view updates, the granularity of the process
is reduced from whole partitions to smaller chunks.
The view update builder now produces partial updates, no more
than 100 view rows at a time.
This commit is contained in:
Piotr Sarna
2021-06-22 13:38:30 +02:00
parent 1000d52cfa
commit bf0777e97a
3 changed files with 95 additions and 46 deletions

View File

@@ -67,7 +67,6 @@
#include "db/system_keyspace_view_types.hh"
#include "db/system_keyspace.hh"
#include "db/system_distributed_keyspace.hh"
#include "frozen_mutation.hh"
#include "gms/inet_address.hh"
#include "keys.hh"
#include "locator/network_topology_strategy.hh"
@@ -328,11 +327,13 @@ bool matches_view_filter(const schema& base, const view_info& view, const partit
});
}
void view_updates::move_to(utils::chunked_vector<frozen_mutation_and_schema>& mutations) && {
void view_updates::move_to(utils::chunked_vector<frozen_mutation_and_schema>& mutations) {
std::transform(_updates.begin(), _updates.end(), std::back_inserter(mutations), [&, this] (auto&& m) {
auto mut = mutation(_view, dht::decorate_key(*_view, std::move(m.first)), std::move(m.second));
return frozen_mutation_and_schema{freeze(mut), std::move(_view)};
return frozen_mutation_and_schema{freeze(mut), _view};
});
_updates.clear();
_op_count = 0;
}
mutation_partition& view_updates::partition_for(partition_key&& key) {
@@ -343,6 +344,10 @@ mutation_partition& view_updates::partition_for(partition_key&& key) {
return _updates.emplace(std::move(key), mutation_partition(_view)).first->second;
}
// Returns the number of view row operations recorded so far, i.e. how many
// times create_entry(), do_delete_old_entry() and update_entry() have bumped
// _op_count since move_to() last reset it to zero.
//
// This must be a pure read. view_update_builder::on_results() sums this value
// over all view_updates on every iteration to decide when to stop a batch;
// incrementing here would make the counter grow with the number of polls
// rather than with the number of generated view rows.
size_t view_updates::op_count() const {
    return _op_count;
}
row_marker view_updates::compute_row_marker(const clustering_row& base_row) const {
/*
* We need to compute both the timestamp and expiration.
@@ -584,6 +589,7 @@ void view_updates::create_entry(const partition_key& base_key, const clustering_
r.apply(marker);
r.apply(update.tomb());
add_cells_to_view(*_base, *_view, row(*_base, column_kind::regular_column, update.cells()), r.cells());
_op_count++;
}
/**
@@ -619,6 +625,7 @@ void view_updates::do_delete_old_entry(const partition_key& base_key, const clus
add_cells_to_view(*_base, *_view, std::move(diff), r.cells());
}
r.apply(update.tomb());
_op_count++;
}
/*
@@ -723,6 +730,7 @@ void view_updates::update_entry(const partition_key& base_key, const clustering_
auto diff = update.cells().difference(*_base, column_kind::regular_column, existing.cells());
add_cells_to_view(*_base, *_view, std::move(diff), r.cells());
_op_count++;
}
void view_updates::generate_update(
@@ -836,24 +844,33 @@ future<stop_iteration> view_update_builder::stop() const {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
future<utils::chunked_vector<frozen_mutation_and_schema>> view_update_builder::build() {
return advance_all().then([this] (auto&& ignored) {
assert(_update && _update->is_partition_start());
_key = std::move(std::move(_update)->as_partition_start().key().key());
_update_tombstone_tracker.set_partition_tombstone(_update->as_partition_start().partition_tombstone());
future<utils::chunked_vector<frozen_mutation_and_schema>> view_update_builder::build_some() {
return advance_all().then([this] (stop_iteration ignored) {
bool do_advance_updates = false;
bool do_advance_existings = false;
if (_update && _update->is_partition_start()) {
_key = std::move(std::move(_update)->as_partition_start().key().key());
_update_tombstone_tracker.set_partition_tombstone(_update->as_partition_start().partition_tombstone());
do_advance_updates = true;
}
if (_existing && _existing->is_partition_start()) {
_existing_tombstone_tracker.set_partition_tombstone(_existing->as_partition_start().partition_tombstone());
do_advance_existings = true;
}
}).then([this] {
return advance_all().then([this] (auto&& ignored) {
return repeat([this] {
return this->on_results();
});
if (do_advance_updates) {
return do_advance_existings ? advance_all() : advance_updates();
} else if (do_advance_existings) {
return advance_existings();
}
return make_ready_future<stop_iteration>(stop_iteration::no);
}).then([this] (stop_iteration ignored) {
return repeat([this] {
return this->on_results();
});
}).then([this] {
utils::chunked_vector<frozen_mutation_and_schema> mutations;
for (auto&& update : _view_updates) {
std::move(update).move_to(mutations);
for (auto& update : _view_updates) {
update.move_to(mutations);
}
return mutations;
});
@@ -887,6 +904,12 @@ static void apply_tracked_tombstones(range_tombstone_accumulator& tracker, clust
}
future<stop_iteration> view_update_builder::on_results() {
constexpr size_t max_rows_for_view_updates = 100;
size_t rows_for_view_updates = std::accumulate(_view_updates.begin(), _view_updates.end(), 0, [] (size_t acc, const view_updates& vu) {
return acc + vu.op_count();
});
const bool stop_updates = rows_for_view_updates >= max_rows_for_view_updates;
if (_update && !_update->is_end_of_partition() && _existing && !_existing->is_end_of_partition()) {
auto cmp = position_in_partition::tri_compare(*_schema)(_update->position(), _existing->position());
if (cmp < 0) {
@@ -902,7 +925,7 @@ future<stop_iteration> view_update_builder::on_results() {
: std::nullopt;
generate_update(std::move(update), std::move(existing));
}
return advance_updates();
return stop_updates ? stop() : advance_updates();
}
if (cmp > 0) {
// We have something existing but no update (which will happen either because it's a range tombstone marker in
@@ -921,7 +944,7 @@ future<stop_iteration> view_update_builder::on_results() {
generate_update(std::move(update), { std::move(existing) });
}
}
return advance_existings();
return stop_updates ? stop () : advance_existings();
}
// We're updating a row that had pre-existing data
if (_update->is_range_tombstone()) {
@@ -938,7 +961,7 @@ future<stop_iteration> view_update_builder::on_results() {
});
generate_update(std::move(*_update).as_clustering_row(), { std::move(*_existing).as_clustering_row() });
}
return advance_all();
return stop_updates ? stop() : advance_all();
}
auto tombstone = _update_tombstone_tracker.current_tombstone();
@@ -949,7 +972,7 @@ future<stop_iteration> view_update_builder::on_results() {
auto update = clustering_row(existing.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row());
generate_update(std::move(update), { std::move(existing) });
}
return advance_existings();
return stop_updates ? stop() : advance_existings();
}
// If we have updates and it's a range tombstone, it removes nothing pre-exisiting, so we can ignore it
@@ -964,13 +987,13 @@ future<stop_iteration> view_update_builder::on_results() {
: std::nullopt;
generate_update(std::move(*_update).as_clustering_row(), std::move(existing));
}
return advance_updates();
return stop_updates ? stop() : advance_updates();
}
return stop();
}
future<utils::chunked_vector<frozen_mutation_and_schema>> generate_view_updates(
future<view_update_builder> make_view_update_builder(
const schema_ptr& base,
std::vector<view_and_base>&& views_to_update,
flat_mutation_reader&& updates,
@@ -984,11 +1007,7 @@ future<utils::chunked_vector<frozen_mutation_and_schema>> generate_view_updates(
}
return view_updates(std::move(v));
}));
auto builder = std::make_unique<view_update_builder>(base, std::move(vs), std::move(updates), std::move(existings), now);
auto f = builder->build();
return f.finally([builder = std::move(builder)] {
return builder->close();
});
return make_ready_future<view_update_builder>(view_update_builder(base, std::move(vs), std::move(updates), std::move(existings), now));
}
future<query::clustering_row_ranges> calculate_affected_clustering_ranges(const schema& base,

View File

@@ -26,6 +26,7 @@
#include "query-request.hh"
#include "schema_fwd.hh"
#include "flat_mutation_reader.hh"
#include "frozen_mutation.hh"
class frozen_mutation_and_schema;
struct cf_stats;
@@ -140,6 +141,7 @@ class view_updates final {
schema_ptr _base;
base_info_ptr _base_info;
std::unordered_map<partition_key, mutation_partition, partition_key::hashing, partition_key::equality> _updates;
mutable size_t _op_count = 0;
public:
explicit view_updates(view_and_base vab)
: _view(std::move(vab.view))
@@ -149,9 +151,12 @@ public:
, _updates(8, partition_key::hashing(*_view), partition_key::equality(*_view)) {
}
void move_to(utils::chunked_vector<frozen_mutation_and_schema>& mutations) &&;
void move_to(utils::chunked_vector<frozen_mutation_and_schema>& mutations);
void generate_update(const partition_key& base_key, const clustering_row& update, const std::optional<clustering_row>& existing, gc_clock::time_point now);
size_t op_count() const;
private:
mutation_partition& partition_for(partition_key&& key);
row_marker compute_row_marker(const clustering_row& base_row) const;
@@ -195,7 +200,7 @@ public:
}
view_update_builder(view_update_builder&& other) noexcept = default;
future<utils::chunked_vector<frozen_mutation_and_schema>> build();
future<utils::chunked_vector<frozen_mutation_and_schema>> build_some();
future<> close() noexcept;
@@ -210,7 +215,7 @@ private:
future<stop_iteration> stop() const;
};
future<utils::chunked_vector<frozen_mutation_and_schema>> generate_view_updates(
future<view_update_builder> make_view_update_builder(
const schema_ptr& base,
std::vector<view_and_base>&& views_to_update,
flat_mutation_reader&& updates,

View File

@@ -1666,20 +1666,28 @@ future<> table::generate_and_propagate_view_updates(const schema_ptr& base,
tracing::trace_state_ptr tr_state,
gc_clock::time_point now) const {
auto base_token = m.token();
auto updates = co_await db::view::generate_view_updates(
db::view::view_update_builder builder = co_await db::view::make_view_update_builder(
base,
std::move(views),
flat_mutation_reader_from_mutations(std::move(permit), {std::move(m)}),
std::move(existings),
now);
tracing::trace(tr_state, "Generated {} view update mutations", updates.size());
auto units = seastar::consume_units(*_config.view_update_concurrency_semaphore, memory_usage_of(updates));
try {
co_await db::view::mutate_MV(std::move(base_token), std::move(updates), _view_stats, *_config.cf_stats, std::move(tr_state),
std::move(units), service::allow_hints::yes, db::view::wait_for_all_updates::no);
} catch (...) {
// ignore
while (true) {
try {
auto updates = co_await builder.build_some();
if (updates.empty()) {
break;
}
tracing::trace(tr_state, "Generated {} view update mutations", updates.size());
auto units = seastar::consume_units(*_config.view_update_concurrency_semaphore, memory_usage_of(updates));
co_await db::view::mutate_MV(base_token, std::move(updates), _view_stats, *_config.cf_stats, tr_state,
std::move(units), service::allow_hints::yes, db::view::wait_for_all_updates::no).handle_exception([] (auto ignored) { });
} catch (...) {
// ignore
}
}
co_await builder.close();
}
/**
@@ -1779,20 +1787,37 @@ future<> table::populate_views(
dht::token base_token,
flat_mutation_reader&& reader,
gc_clock::time_point now) {
auto& schema = reader.schema();
auto updates = co_await db::view::generate_view_updates(
auto schema = reader.schema();
db::view::view_update_builder builder = co_await db::view::make_view_update_builder(
schema,
std::move(views),
std::move(reader),
{ },
now);
size_t update_size = memory_usage_of(updates);
size_t units_to_wait_for = std::min(_config.view_update_concurrency_semaphore_limit, update_size);
auto units = co_await seastar::get_units(*_config.view_update_concurrency_semaphore, units_to_wait_for);
auto units_to_consume = update_size - units_to_wait_for;
units.adopt(seastar::consume_units(*_config.view_update_concurrency_semaphore, units_to_consume));
co_await db::view::mutate_MV(std::move(base_token), std::move(updates), _view_stats, *_config.cf_stats,
tracing::trace_state_ptr(), std::move(units), service::allow_hints::no, db::view::wait_for_all_updates::yes);
std::exception_ptr err;
while (true) {
try {
auto updates = co_await builder.build_some();
if (updates.empty()) {
break;
}
size_t update_size = memory_usage_of(updates);
size_t units_to_wait_for = std::min(_config.view_update_concurrency_semaphore_limit, update_size);
auto units = co_await seastar::get_units(*_config.view_update_concurrency_semaphore, units_to_wait_for);
units.adopt(seastar::consume_units(*_config.view_update_concurrency_semaphore, update_size - units_to_wait_for));
co_await db::view::mutate_MV(base_token, std::move(updates), _view_stats, *_config.cf_stats,
tracing::trace_state_ptr(), std::move(units), service::allow_hints::no, db::view::wait_for_all_updates::yes);
} catch (...) {
if (!err) {
err = std::current_exception();
}
}
}
co_await builder.close();
if (err) {
std::rethrow_exception(err);
}
}
void table::set_hit_rate(gms::inet_address addr, cache_temperature rate) {