view: Plug view update generator to database
The database is low-level service and currently view update generator implicitly depend on it via storage proxy. However, database does need to push view updates with the help of mutate_MV helper, thus adding the dependency loop. This patch exploits the fact that view updates start being pushed late enough, by that time all other service, including proxy and view update generator, seem to be up and running. This allows a "weak dependency" from database to view update generator, like there's one from database to system keyspace already. So in this patch the v.u.g. puts the shared-from-this pointer onto the database at the time it starts. On stop it removes this pointer after database is drained and (hopefully) all view updates are pushed. Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
@@ -97,6 +97,7 @@ view_update_generator::view_update_generator(replica::database& db, sharded<serv
|
||||
setup_metrics();
|
||||
discover_staging_sstables();
|
||||
(void)_proxy;
|
||||
_db.plug_view_update_generator(*this);
|
||||
}
|
||||
|
||||
view_update_generator::~view_update_generator() {}
|
||||
@@ -206,6 +207,7 @@ future<> view_update_generator::start() {
|
||||
}
|
||||
|
||||
future<> view_update_generator::stop() {
|
||||
_db.unplug_view_update_generator();
|
||||
_as.request_abort();
|
||||
_pending_sstables.signal();
|
||||
return std::move(_started).then([this] {
|
||||
|
||||
@@ -28,7 +28,7 @@ class storage_proxy;
|
||||
|
||||
namespace db::view {
|
||||
|
||||
class view_update_generator {
|
||||
class view_update_generator : public async_sharded_service<view_update_generator> {
|
||||
public:
|
||||
static constexpr size_t registration_queue_size = 5;
|
||||
|
||||
|
||||
@@ -47,6 +47,7 @@
|
||||
#include "timeout_config.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
#include "db/operation_type.hh"
|
||||
#include "db/view/view_update_generator.hh"
|
||||
|
||||
#include "utils/human_readable.hh"
|
||||
#include "utils/fb_utilities.hh"
|
||||
@@ -2719,6 +2720,14 @@ void database::unplug_system_keyspace() noexcept {
|
||||
_large_data_handler->unplug_system_keyspace();
|
||||
}
|
||||
|
||||
void database::plug_view_update_generator(db::view::view_update_generator& generator) noexcept {
|
||||
_view_update_generator = generator.shared_from_this();
|
||||
}
|
||||
|
||||
void database::unplug_view_update_generator() noexcept {
|
||||
_view_update_generator = nullptr;
|
||||
}
|
||||
|
||||
} // namespace replica
|
||||
|
||||
template <typename T>
|
||||
|
||||
@@ -123,6 +123,10 @@ class table_selector;
|
||||
|
||||
future<> system_keyspace_make(db::system_keyspace& sys_ks, distributed<replica::database>& db, distributed<service::storage_service>& ss, sharded<gms::gossiper>& g, sharded<service::raft_group_registry>& raft_gr, db::config& cfg, system_table_load_phase phase);
|
||||
|
||||
namespace view {
|
||||
class view_update_generator;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class mutation_reordered_with_truncate_exception : public std::exception {};
|
||||
@@ -1323,6 +1327,7 @@ private:
|
||||
db::timeout_semaphore _view_update_concurrency_sem{max_memory_pending_view_updates()};
|
||||
|
||||
cache_tracker _row_cache_tracker;
|
||||
seastar::shared_ptr<db::view::view_update_generator> _view_update_generator;
|
||||
|
||||
inheriting_concrete_execution_stage<
|
||||
future<>,
|
||||
@@ -1400,6 +1405,9 @@ public:
|
||||
void plug_system_keyspace(db::system_keyspace& sys_ks) noexcept;
|
||||
void unplug_system_keyspace() noexcept;
|
||||
|
||||
void plug_view_update_generator(db::view::view_update_generator& generator) noexcept;
|
||||
void unplug_view_update_generator() noexcept;
|
||||
|
||||
private:
|
||||
future<> flush_non_system_column_families();
|
||||
future<> flush_system_column_families();
|
||||
|
||||
Reference in New Issue
Block a user