db,view: use chunked_vector for view updates

The number of view updates can grow large, especially in corner
cases like removing large base partitions. Chunked vector
prevents large allocations.
This commit is contained in:
Piotr Sarna
2021-06-14 15:27:49 +02:00
parent 6bdf8c4c46
commit a7f7716ecf
3 changed files with 11 additions and 11 deletions

View File

@@ -343,7 +343,7 @@ public:
, _updates(8, partition_key::hashing(*_view), partition_key::equality(*_view)) {
}
void move_to(std::vector<frozen_mutation_and_schema>& mutations) && {
void move_to(utils::chunked_vector<frozen_mutation_and_schema>& mutations) && {
std::transform(_updates.begin(), _updates.end(), std::back_inserter(mutations), [&, this] (auto&& m) {
auto mut = mutation(_view, dht::decorate_key(*_view, std::move(m.first)), std::move(m.second));
return frozen_mutation_and_schema{freeze(mut), std::move(_view)};
@@ -858,7 +858,7 @@ public:
, _now(now) {
}
future<std::vector<frozen_mutation_and_schema>> build();
future<utils::chunked_vector<frozen_mutation_and_schema>> build();
future<> close() noexcept {
return when_all_succeed(_updates.close(), _existings->close()).discard_result();
@@ -899,7 +899,7 @@ private:
}
};
future<std::vector<frozen_mutation_and_schema>> view_update_builder::build() {
future<utils::chunked_vector<frozen_mutation_and_schema>> view_update_builder::build() {
return advance_all().then([this] (auto&& ignored) {
assert(_update && _update->is_partition_start());
_key = std::move(std::move(_update)->as_partition_start().key().key());
@@ -914,7 +914,7 @@ future<std::vector<frozen_mutation_and_schema>> view_update_builder::build() {
});
});
}).then([this] {
std::vector<frozen_mutation_and_schema> mutations;
utils::chunked_vector<frozen_mutation_and_schema> mutations;
for (auto&& update : _view_updates) {
std::move(update).move_to(mutations);
}
@@ -1033,7 +1033,7 @@ future<stop_iteration> view_update_builder::on_results() {
return stop();
}
future<std::vector<frozen_mutation_and_schema>> generate_view_updates(
future<utils::chunked_vector<frozen_mutation_and_schema>> generate_view_updates(
const schema_ptr& base,
std::vector<view_and_base>&& views_to_update,
flat_mutation_reader&& updates,
@@ -1199,7 +1199,7 @@ static future<> apply_to_remote_endpoints(gms::inet_address target, inet_address
// for the writes to complete.
future<> mutate_MV(
dht::token base_token,
std::vector<frozen_mutation_and_schema> view_updates,
utils::chunked_vector<frozen_mutation_and_schema> view_updates,
db::view::stats& stats,
cf_stats& cf_stats,
tracing::trace_state_ptr tr_state,

View File

@@ -134,7 +134,7 @@ bool matches_view_filter(const schema& base, const view_info& view, const partit
bool clustering_prefix_matches(const schema& base, const partition_key& key, const clustering_key_prefix& ck, gc_clock::time_point now);
future<std::vector<frozen_mutation_and_schema>> generate_view_updates(
future<utils::chunked_vector<frozen_mutation_and_schema>> generate_view_updates(
const schema_ptr& base,
std::vector<view_and_base>&& views_to_update,
flat_mutation_reader&& updates,
@@ -152,7 +152,7 @@ struct wait_for_all_updates_tag {};
using wait_for_all_updates = bool_class<wait_for_all_updates_tag>;
future<> mutate_MV(
dht::token base_token,
std::vector<frozen_mutation_and_schema> view_updates,
utils::chunked_vector<frozen_mutation_and_schema> view_updates,
db::view::stats& stats,
cf_stats& cf_stats,
tracing::trace_state_ptr tr_state,

View File

@@ -1635,7 +1635,7 @@ std::vector<view_ptr> table::affected_views(const schema_ptr& base, const mutati
}));
}
static size_t memory_usage_of(const std::vector<frozen_mutation_and_schema>& ms) {
static size_t memory_usage_of(const utils::chunked_vector<frozen_mutation_and_schema>& ms) {
// Overhead of sending a view mutation, in terms of data structures used by the storage_proxy.
constexpr size_t base_overhead_bytes = 256;
return boost::accumulate(ms | boost::adaptors::transformed([] (const frozen_mutation_and_schema& m) {
@@ -1669,7 +1669,7 @@ future<> table::generate_and_propagate_view_updates(const schema_ptr& base,
std::move(views),
flat_mutation_reader_from_mutations(std::move(permit), {std::move(m)}),
std::move(existings),
now).then([this, base_token = std::move(base_token), tr_state = std::move(tr_state)] (std::vector<frozen_mutation_and_schema>&& updates) mutable {
now).then([this, base_token = std::move(base_token), tr_state = std::move(tr_state)] (utils::chunked_vector<frozen_mutation_and_schema>&& updates) mutable {
tracing::trace(tr_state, "Generated {} view update mutations", updates.size());
auto units = seastar::consume_units(*_config.view_update_concurrency_semaphore, memory_usage_of(updates));
return db::view::mutate_MV(std::move(base_token), std::move(updates), _view_stats, *_config.cf_stats, std::move(tr_state),
@@ -1780,7 +1780,7 @@ future<> table::populate_views(
std::move(views),
std::move(reader),
{ },
now).then([base_token = std::move(base_token), this] (std::vector<frozen_mutation_and_schema>&& updates) mutable {
now).then([base_token = std::move(base_token), this] (utils::chunked_vector<frozen_mutation_and_schema>&& updates) mutable {
size_t update_size = memory_usage_of(updates);
size_t units_to_wait_for = std::min(_config.view_update_concurrency_semaphore_limit, update_size);
return seastar::get_units(*_config.view_update_concurrency_semaphore, units_to_wait_for).then(