table: Carry v.u.generator down to do_push_view_replica_updates()

The latter is the place where mutate_MV is called and it needs the
view updates generator nearby.

The call-stack starts at database::do_apply(). As was described in one
of the previous patches, applying mutations that need updating views
happen late enough, so if the view updates generator is not plugged to
the database yet, it's OK to bail out with exception. If it's plugged,
it's carried over thus keeping the generator instance alive and waited
for on its stop.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
Pavel Emelyanov
2023-03-28 11:36:38 +03:00
parent ddc8c8b019
commit a95d3446fd
4 changed files with 21 additions and 16 deletions

View File

@@ -2606,7 +2606,7 @@ view_updating_consumer::view_updating_consumer(view_update_generator& gen, schem
: view_updating_consumer(std::move(schema), std::move(permit), as, staging_reader_handle,
[table = table.shared_from_this(), excluded_sstables = std::move(excluded_sstables), gen = gen.shared_from_this()] (mutation m) mutable {
auto s = m.schema();
return table->stream_view_replica_updates(std::move(s), std::move(m), db::no_timeout, excluded_sstables);
return table->stream_view_replica_updates(gen, std::move(s), std::move(m), db::no_timeout, excluded_sstables);
})
{ }

View File

@@ -1940,7 +1940,11 @@ future<> database::do_apply(schema_ptr s, const frozen_mutation& m, tracing::tra
row_locker::lock_holder lock;
if (!cf.views().empty()) {
auto lock_f = co_await coroutine::as_future(cf.push_view_replica_updates(s, m, timeout, std::move(tr_state), get_reader_concurrency_semaphore()));
if (!_view_update_generator) {
co_await coroutine::return_exception(std::runtime_error("view update generator not plugged to push updates"));
}
auto lock_f = co_await coroutine::as_future(cf.push_view_replica_updates(_view_update_generator, s, m, timeout, std::move(tr_state), get_reader_concurrency_semaphore()));
if (lock_f.failed()) {
auto ex = lock_f.get_exception();
if (is_timeout_exception(ex)) {

View File

@@ -1026,12 +1026,12 @@ public:
void remove_view(view_ptr v);
void clear_views();
const std::vector<view_ptr>& views() const;
future<row_locker::lock_holder> push_view_replica_updates(const schema_ptr& s, const frozen_mutation& fm, db::timeout_clock::time_point timeout,
future<row_locker::lock_holder> push_view_replica_updates(shared_ptr<db::view::view_update_generator> gen, const schema_ptr& s, const frozen_mutation& fm, db::timeout_clock::time_point timeout,
tracing::trace_state_ptr tr_state, reader_concurrency_semaphore& sem) const;
future<row_locker::lock_holder> push_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout,
future<row_locker::lock_holder> push_view_replica_updates(shared_ptr<db::view::view_update_generator> gen, const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout,
tracing::trace_state_ptr tr_state, reader_concurrency_semaphore& sem) const;
future<row_locker::lock_holder>
stream_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout,
stream_view_replica_updates(shared_ptr<db::view::view_update_generator> gen, const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout,
std::vector<sstables::shared_sstable>& excluded_sstables) const;
void add_coordinator_read_latency(utils::estimated_histogram::duration latency);
@@ -1067,10 +1067,10 @@ public:
size_t estimate_read_memory_cost() const;
private:
future<row_locker::lock_holder> do_push_view_replica_updates(schema_ptr s, mutation m, db::timeout_clock::time_point timeout, mutation_source source,
future<row_locker::lock_holder> do_push_view_replica_updates(shared_ptr<db::view::view_update_generator> gen, schema_ptr s, mutation m, db::timeout_clock::time_point timeout, mutation_source source,
tracing::trace_state_ptr tr_state, reader_concurrency_semaphore& sem, const io_priority_class& io_priority, query::partition_slice::option_set custom_opts) const;
std::vector<view_ptr> affected_views(const schema_ptr& base, const mutation& update) const;
future<> generate_and_propagate_view_updates(const schema_ptr& base,
future<> generate_and_propagate_view_updates(shared_ptr<db::view::view_update_generator> gen, const schema_ptr& base,
reader_permit permit,
std::vector<db::view::view_and_base>&& views,
mutation&& m,

View File

@@ -1967,7 +1967,7 @@ static size_t memory_usage_of(const utils::chunked_vector<frozen_mutation_and_sc
* but has simply some updated values.
* @return a future resolving to the mutations to apply to the views, which can be empty.
*/
future<> table::generate_and_propagate_view_updates(const schema_ptr& base,
future<> table::generate_and_propagate_view_updates(shared_ptr<db::view::view_update_generator> gen, const schema_ptr& base,
reader_permit permit,
std::vector<db::view::view_and_base>&& views,
mutation&& m,
@@ -2572,14 +2572,14 @@ future<> table::move_sstables_from_staging(std::vector<sstables::shared_sstable>
* Given an update for the base table, calculates the set of potentially affected views,
* generates the relevant updates, and sends them to the paired view replicas.
*/
future<row_locker::lock_holder> table::push_view_replica_updates(const schema_ptr& s, const frozen_mutation& fm,
future<row_locker::lock_holder> table::push_view_replica_updates(shared_ptr<db::view::view_update_generator> gen, const schema_ptr& s, const frozen_mutation& fm,
db::timeout_clock::time_point timeout, tracing::trace_state_ptr tr_state, reader_concurrency_semaphore& sem) const {
//FIXME: Avoid unfreezing here.
auto m = fm.unfreeze(s);
return push_view_replica_updates(s, std::move(m), timeout, std::move(tr_state), sem);
return push_view_replica_updates(std::move(gen), s, std::move(m), timeout, std::move(tr_state), sem);
}
future<row_locker::lock_holder> table::do_push_view_replica_updates(schema_ptr s, mutation m, db::timeout_clock::time_point timeout, mutation_source source,
future<row_locker::lock_holder> table::do_push_view_replica_updates(shared_ptr<db::view::view_update_generator> gen, schema_ptr s, mutation m, db::timeout_clock::time_point timeout, mutation_source source,
tracing::trace_state_ptr tr_state, reader_concurrency_semaphore& sem, const io_priority_class& io_priority, query::partition_slice::option_set custom_opts) const {
if (!_config.view_update_concurrency_semaphore->current()) {
// We don't have resources to generate view updates for this write. If we reached this point, we failed to
@@ -2605,7 +2605,7 @@ future<row_locker::lock_holder> table::do_push_view_replica_updates(schema_ptr s
const bool need_static = db::view::needs_static_row(m.partition(), views);
if (!need_regular && !need_static) {
tracing::trace(tr_state, "View updates do not require read-before-write");
co_await generate_and_propagate_view_updates(base, sem.make_tracking_only_permit(s.get(), "push-view-updates-1", timeout, tr_state), std::move(views), std::move(m), { }, tr_state, now);
co_await generate_and_propagate_view_updates(gen, base, sem.make_tracking_only_permit(s.get(), "push-view-updates-1", timeout, tr_state), std::move(views), std::move(m), { }, tr_state, now);
// In this case we are not doing a read-before-write, just a
// write, so no lock is needed.
co_return row_locker::lock_holder();
@@ -2640,7 +2640,7 @@ future<row_locker::lock_holder> table::do_push_view_replica_updates(schema_ptr s
auto pk = dht::partition_range::make_singular(m.decorated_key());
auto permit = sem.make_tracking_only_permit(base.get(), "push-view-updates-2", timeout, tr_state);
auto reader = source.make_reader_v2(base, permit, pk, slice, io_priority, tr_state, streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
co_await this->generate_and_propagate_view_updates(base, std::move(permit), std::move(views), std::move(m), std::move(reader), tr_state, now);
co_await this->generate_and_propagate_view_updates(gen, base, std::move(permit), std::move(views), std::move(m), std::move(reader), tr_state, now);
tracing::trace(tr_state, "View updates for {}.{} were generated and propagated", base->ks_name(), base->cf_name());
// return the local partition/row lock we have taken so it
// remains locked until the caller is done modifying this
@@ -2649,16 +2649,17 @@ future<row_locker::lock_holder> table::do_push_view_replica_updates(schema_ptr s
}
future<row_locker::lock_holder> table::push_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout,
future<row_locker::lock_holder> table::push_view_replica_updates(shared_ptr<db::view::view_update_generator> gen, const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout,
tracing::trace_state_ptr tr_state, reader_concurrency_semaphore& sem) const {
return do_push_view_replica_updates(s, std::move(m), timeout, as_mutation_source(),
return do_push_view_replica_updates(std::move(gen), s, std::move(m), timeout, as_mutation_source(),
std::move(tr_state), sem, service::get_local_sstable_query_read_priority(), {});
}
future<row_locker::lock_holder>
table::stream_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout,
table::stream_view_replica_updates(shared_ptr<db::view::view_update_generator> gen, const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout,
std::vector<sstables::shared_sstable>& excluded_sstables) const {
return do_push_view_replica_updates(
std::move(gen),
s,
std::move(m),
timeout,