view: Make mutate_MV() method of view_update_generator

Nowadays its a static helper, but internally it depends on storage
proxy, so it grabs its global instance. Making it a method of view
update generator makes it possible to use the proxy dependency from the
generator.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
Pavel Emelyanov
2023-03-28 12:12:56 +03:00
parent e78e64a920
commit 7cabdc54a6
4 changed files with 34 additions and 21 deletions

View File

@@ -1614,7 +1614,7 @@ static bool should_update_synchronously(const schema& s) {
// to a modification of a single base partition, and apply them to the
// appropriate paired replicas. This is done asynchronously - we do not wait
// for the writes to complete.
future<> mutate_MV(
future<> view_update_generator::mutate_MV(
dht::token base_token,
utils::chunked_vector<frozen_mutation_and_schema> view_updates,
db::view::stats& stats,

View File

@@ -21,11 +21,6 @@ namespace replica {
struct cf_stats;
}
namespace service {
struct allow_hints_tag;
using allow_hints = bool_class<allow_hints_tag>;
}
namespace db {
namespace view {
@@ -315,18 +310,6 @@ future<query::clustering_row_ranges> calculate_affected_clustering_ranges(
bool needs_static_row(const mutation_partition& mp, const std::vector<view_and_base>& views);
struct wait_for_all_updates_tag {};
using wait_for_all_updates = bool_class<wait_for_all_updates_tag>;
future<> mutate_MV(
dht::token base_token,
utils::chunked_vector<frozen_mutation_and_schema> view_updates,
db::view::stats& stats,
replica::cf_stats& cf_stats,
tracing::trace_state_ptr tr_state,
db::timeout_semaphore_units pending_view_updates,
service::allow_hints allow_hints,
wait_for_all_updates wait_for_all);
/**
* create_virtual_column() adds a "virtual column" to a schema builder.
* The definition of a "virtual column" is based on the given definition

View File

@@ -9,7 +9,10 @@
#pragma once
#include "sstables/shared_sstable.hh"
#include "db/timeout_clock.hh"
#include "utils/chunked_vector.hh"
#include <seastar/core/sharded.hh>
#include <seastar/core/metrics_registration.hh>
#include <seastar/core/abort_source.hh>
#include <seastar/core/condition-variable.hh>
@@ -17,17 +20,34 @@
using namespace seastar;
struct frozen_mutation_and_schema;
namespace dht {
class token;
}
namespace tracing {
class trace_state_ptr;
}
namespace replica {
class database;
class table;
struct cf_stats;
}
namespace service {
class storage_proxy;
struct allow_hints_tag;
using allow_hints = bool_class<allow_hints_tag>;
}
namespace db::view {
class stats;
struct wait_for_all_updates_tag {};
using wait_for_all_updates = bool_class<wait_for_all_updates_tag>;
class view_update_generator : public async_sharded_service<view_update_generator> {
public:
static constexpr size_t registration_queue_size = 5;
@@ -52,6 +72,16 @@ public:
future<> stop();
future<> register_staging_sstable(sstables::shared_sstable sst, lw_shared_ptr<replica::table> table);
future<> mutate_MV(
dht::token base_token,
utils::chunked_vector<frozen_mutation_and_schema> view_updates,
db::view::stats& stats,
replica::cf_stats& cf_stats,
tracing::trace_state_ptr tr_state,
db::timeout_semaphore_units pending_view_updates,
service::allow_hints allow_hints,
wait_for_all_updates wait_for_all);
ssize_t available_register_units() const { return _registration_sem.available_units(); }
private:
bool should_throttle() const;

View File

@@ -34,7 +34,7 @@
#include "db/system_keyspace.hh"
#include "db/query_context.hh"
#include "query-result-writer.hh"
#include "db/view/view.hh"
#include "db/view/view_update_generator.hh"
#include <boost/range/adaptor/transformed.hpp>
#include <boost/range/adaptor/map.hpp>
#include "utils/error_injection.hh"
@@ -1999,7 +1999,7 @@ future<> table::generate_and_propagate_view_updates(shared_ptr<db::view::view_up
tracing::trace(tr_state, "Generated {} view update mutations", updates->size());
auto units = seastar::consume_units(*_config.view_update_concurrency_semaphore, memory_usage_of(*updates));
try {
co_await db::view::mutate_MV(base_token, std::move(*updates), _view_stats, *_config.cf_stats, tr_state,
co_await gen->mutate_MV(base_token, std::move(*updates), _view_stats, *_config.cf_stats, tr_state,
std::move(units), service::allow_hints::yes, db::view::wait_for_all_updates::no);
} catch (...) {
// Ignore exceptions: any individual failure to propagate a view update will be reported
@@ -2132,7 +2132,7 @@ future<> table::populate_views(
size_t units_to_wait_for = std::min(_config.view_update_concurrency_semaphore_limit, update_size);
auto units = co_await seastar::get_units(*_config.view_update_concurrency_semaphore, units_to_wait_for);
units.adopt(seastar::consume_units(*_config.view_update_concurrency_semaphore, update_size - units_to_wait_for));
co_await db::view::mutate_MV(base_token, std::move(*updates), _view_stats, *_config.cf_stats,
co_await gen->mutate_MV(base_token, std::move(*updates), _view_stats, *_config.cf_stats,
tracing::trace_state_ptr(), std::move(units), service::allow_hints::no, db::view::wait_for_all_updates::yes);
} catch (...) {
if (!err) {