view: Capture v.u.generator on view_updating_consumer lambda

The consumer is in fact pushing the updates and _that_'s the component
that would really need the view_update_generator at hand. The consumer
is created from the generator itself so no troubles getting the pointer.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
Pavel Emelyanov
2023-03-28 11:21:13 +03:00
parent d5557ef0e2
commit 2652dffd89
3 changed files with 7 additions and 4 deletions

View File

@@ -35,6 +35,7 @@
#include "db/view/view.hh"
#include "db/view/view_builder.hh"
#include "db/view/view_updating_consumer.hh"
#include "db/view/view_update_generator.hh"
#include "db/system_keyspace_view_types.hh"
#include "db/system_keyspace.hh"
#include "db/system_distributed_keyspace.hh"
@@ -2597,10 +2598,10 @@ void view_updating_consumer::maybe_flush_buffer_mid_partition() {
}
}
view_updating_consumer::view_updating_consumer(schema_ptr schema, reader_permit permit, replica::table& table, std::vector<sstables::shared_sstable> excluded_sstables, const seastar::abort_source& as,
view_updating_consumer::view_updating_consumer(view_update_generator& gen, schema_ptr schema, reader_permit permit, replica::table& table, std::vector<sstables::shared_sstable> excluded_sstables, const seastar::abort_source& as,
evictable_reader_handle_v2& staging_reader_handle)
: view_updating_consumer(std::move(schema), std::move(permit), as, staging_reader_handle,
[table = table.shared_from_this(), excluded_sstables = std::move(excluded_sstables)] (mutation m) mutable {
[table = table.shared_from_this(), excluded_sstables = std::move(excluded_sstables), gen = gen.shared_from_this()] (mutation m) mutable {
auto s = m.schema();
return table->stream_view_replica_updates(std::move(s), std::move(m), db::no_timeout, excluded_sstables);
})

View File

@@ -165,7 +165,7 @@ future<> view_update_generator::start() {
::mutation_reader::forwarding::no);
inject_failure("view_update_generator_consume_staging_sstable");
auto result = staging_sstable_reader.consume_in_thread(view_updating_consumer(s, std::move(permit), *t, sstables, _as, staging_sstable_reader_handle),
auto result = staging_sstable_reader.consume_in_thread(view_updating_consumer(*this, s, std::move(permit), *t, sstables, _as, staging_sstable_reader_handle),
dht::incremental_owned_ranges_checker::make_partition_filter(_db.get_keyspace_local_ranges(s->ks_name())));
staging_sstable_reader.close().get();
if (result == stop_iteration::yes) {

View File

@@ -24,6 +24,8 @@ class evictable_reader_handle_v2;
namespace db::view {
class view_update_generator;
/*
* A consumer that pushes materialized view updates for each consumed mutation.
* It is expected to be run in seastar::async threaded context through consume_in_thread()
@@ -62,7 +64,7 @@ public:
, _view_update_pusher(std::move(view_update_pusher))
{ }
view_updating_consumer(schema_ptr schema, reader_permit permit, replica::table& table, std::vector<sstables::shared_sstable> excluded_sstables, const seastar::abort_source& as,
view_updating_consumer(view_update_generator& gen, schema_ptr schema, reader_permit permit, replica::table& table, std::vector<sstables::shared_sstable> excluded_sstables, const seastar::abort_source& as,
evictable_reader_handle_v2& staging_reader_handle);
view_updating_consumer(view_updating_consumer&&) = default;