db/view/view_building_worker: register staging sstable to view building coordinator when needed
Change return type of `check_needs_view_update_path()`. Instead of
retrning bool which tells whether to use staging directory (and register
to `view_update_generator`) or use normal directory.
Now the function returns enum with possible values:
- `normal_directory` - use normal directory for the sstable
- `staging_directly_to_generator` - use staging directory and register
to `view_update_generator`
- `staging_managed_by_vbc` - use staging directory but don't register it
to `view_update_generator` but create view building tasks for
later
The third option is new, it's used when the table has any view which is
in building process currrently. In this case, registering it to `view_update_generator`
prematurely may lead to base-view inconsistency
(for example when a replica is in a pending state).
This commit is contained in:
@@ -3353,19 +3353,67 @@ future<> view_builder::register_staging_sstable(sstables::shared_sstable sst, lw
|
||||
return _vug.register_staging_sstable(std::move(sst), std::move(table));
|
||||
}
|
||||
|
||||
future<bool> check_needs_view_update_path(view_builder& vb, locator::token_metadata_ptr tmptr, const replica::table& t, streaming::stream_reason reason) {
|
||||
future<sstable_destination_decision> check_needs_view_update_path(view_builder& vb, locator::token_metadata_ptr tmptr, const replica::table& t, streaming::stream_reason reason) {
|
||||
if (is_internal_keyspace(t.schema()->ks_name())) {
|
||||
return make_ready_future<bool>(false);
|
||||
co_return sstable_destination_decision::normal_directory;
|
||||
}
|
||||
if (reason == streaming::stream_reason::repair && !t.views().empty()) {
|
||||
return make_ready_future<bool>(true);
|
||||
}
|
||||
return do_with(std::move(tmptr), t.views(), [&vb] (locator::token_metadata_ptr& tmptr, auto& views) {
|
||||
return map_reduce(views,
|
||||
|
||||
if (vb.get_db().find_keyspace(t.schema()->ks_name()).uses_tablets()) {
|
||||
// views are managed by view building coordinator
|
||||
if (t.views().empty()) {
|
||||
co_return sstable_destination_decision::normal_directory;
|
||||
}
|
||||
|
||||
auto build_status_map = co_await vb.get_sys_ks().get_view_build_status_map();
|
||||
auto views_names = t.views()
|
||||
| std::views::transform([] (const view_ptr& v) { return std::make_pair(v->ks_name(), v->cf_name()); })
|
||||
| std::ranges::to<std::set>();
|
||||
|
||||
bool any_started_building = false;
|
||||
bool any_started = false;
|
||||
bool all_success = true;
|
||||
for (auto& [view_name, statuses]: build_status_map) {
|
||||
if (!views_names.contains(view_name)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
any_started_building = true;
|
||||
any_started = any_started || std::ranges::any_of(statuses | std::views::values, [] (const build_status& status) {
|
||||
return status == build_status::STARTED;
|
||||
});
|
||||
all_success = all_success && std::ranges::all_of(statuses | std::views::values, [] (const build_status& status) {
|
||||
return status == build_status::SUCCESS;
|
||||
});
|
||||
}
|
||||
|
||||
if (!any_started_building) {
|
||||
// If all of the views didn't start building yet (and none of them is finished building)
|
||||
// the sstable can go to normal directory and no view update will be lost
|
||||
co_return sstable_destination_decision::normal_directory;
|
||||
} else if (any_started) {
|
||||
// If any of the views started building and didn't finished yet, we need to use view building
|
||||
// coordinator to schedule staging sstables processing to control if replica is not in a pending state.
|
||||
co_return sstable_destination_decision::staging_managed_by_vbc;
|
||||
} else if (all_success) {
|
||||
// If all of the views were built, the sstable can be registered to view update generator directly (in case of `stream_reason::repair`).
|
||||
co_return reason == streaming::stream_reason::repair ? sstable_destination_decision::staging_directly_to_generator : sstable_destination_decision::normal_directory;
|
||||
}
|
||||
// This should be unreachable, reaching this point would mean that,
|
||||
// any of the views started building, none of the status is `STARTED` and not all statuses are `SUCCESS`.
|
||||
// Since there are only 2 statuses: STARTED and SUCCESS, this is unreachable.
|
||||
on_internal_error(vlogger, fmt::format("check_needs_view_update_path() reached unreachable branch. any_started_building: {} | any_started: {} | all_success: {}", any_started_building, any_started, all_success));
|
||||
} else {
|
||||
// views are managed by view builder
|
||||
if (reason == streaming::stream_reason::repair && !t.views().empty()) {
|
||||
co_return sstable_destination_decision::staging_directly_to_generator;
|
||||
}
|
||||
|
||||
auto any_view_build_ongoing = co_await map_reduce(t.views(),
|
||||
[&] (const view_ptr& view) { return vb.check_view_build_ongoing(*tmptr, view->ks_name(), view->cf_name()); },
|
||||
false,
|
||||
std::logical_or<bool>());
|
||||
});
|
||||
co_return any_view_build_ongoing ? sstable_destination_decision::staging_directly_to_generator : sstable_destination_decision::normal_directory;
|
||||
}
|
||||
}
|
||||
|
||||
void view_updating_consumer::do_flush_buffer() {
|
||||
|
||||
@@ -195,6 +195,7 @@ public:
|
||||
static constexpr size_t batch_memory_max = 1024*1024;
|
||||
|
||||
replica::database& get_db() noexcept { return _db; }
|
||||
db::system_keyspace& get_sys_ks() noexcept { return _sys_ks; }
|
||||
|
||||
public:
|
||||
view_builder(replica::database&, db::system_keyspace&, db::system_distributed_keyspace&, service::migration_notifier&, view_update_generator& vug,
|
||||
|
||||
@@ -20,7 +20,13 @@ class table;
|
||||
namespace db::view {
|
||||
class view_builder;
|
||||
|
||||
future<bool> check_needs_view_update_path(view_builder& vb, locator::token_metadata_ptr tmptr, const replica::table& t,
|
||||
enum class sstable_destination_decision {
|
||||
normal_directory, // use normal sstable directory
|
||||
staging_directly_to_generator, // use staging directory and create view building task for the sstable
|
||||
staging_managed_by_vbc // use staging directory and register the sstable to view update generator
|
||||
};
|
||||
|
||||
future<sstable_destination_decision> check_needs_view_update_path(view_builder& vb, locator::token_metadata_ptr tmptr, const replica::table& t,
|
||||
streaming::stream_reason reason);
|
||||
|
||||
}
|
||||
|
||||
6
main.cc
6
main.cc
@@ -1767,7 +1767,7 @@ sharded<locator::shared_token_metadata> token_metadata;
|
||||
|
||||
checkpoint(stop_signal, "starting repair service");
|
||||
auto max_memory_repair = memory::stats().total_memory() * 0.1;
|
||||
repair.start(std::ref(tsm), std::ref(gossiper), std::ref(messaging), std::ref(db), std::ref(proxy), std::ref(bm), std::ref(sys_ks), std::ref(view_builder), std::ref(task_manager), std::ref(mm), max_memory_repair).get();
|
||||
repair.start(std::ref(tsm), std::ref(gossiper), std::ref(messaging), std::ref(db), std::ref(proxy), std::ref(bm), std::ref(sys_ks), std::ref(view_builder), std::ref(view_building_worker), std::ref(task_manager), std::ref(mm), max_memory_repair).get();
|
||||
auto stop_repair_service = defer_verbose_shutdown("repair service", [&repair] {
|
||||
repair.stop().get();
|
||||
});
|
||||
@@ -1781,7 +1781,7 @@ sharded<locator::shared_token_metadata> token_metadata;
|
||||
|
||||
debug::the_stream_manager = &stream_manager;
|
||||
checkpoint(stop_signal, "starting streaming service");
|
||||
stream_manager.start(std::ref(*cfg), std::ref(db), std::ref(view_builder), std::ref(messaging), std::ref(mm), std::ref(gossiper), maintenance_scheduling_group).get();
|
||||
stream_manager.start(std::ref(*cfg), std::ref(db), std::ref(view_builder), std::ref(view_building_worker), std::ref(messaging), std::ref(mm), std::ref(gossiper), maintenance_scheduling_group).get();
|
||||
auto stop_stream_manager = defer_verbose_shutdown("stream manager", [&stream_manager] {
|
||||
// FIXME -- keep the instances alive, just call .stop on them
|
||||
stream_manager.invoke_on_all(&streaming::stream_manager::stop).get();
|
||||
@@ -2136,7 +2136,7 @@ sharded<locator::shared_token_metadata> token_metadata;
|
||||
});
|
||||
|
||||
checkpoint(stop_signal, "starting sstables loader");
|
||||
sst_loader.start(std::ref(db), std::ref(messaging), std::ref(view_builder), std::ref(task_manager), std::ref(sstm), maintenance_scheduling_group).get();
|
||||
sst_loader.start(std::ref(db), std::ref(messaging), std::ref(view_builder), std::ref(view_building_worker), std::ref(task_manager), std::ref(sstm), maintenance_scheduling_group).get();
|
||||
auto stop_sst_loader = defer_verbose_shutdown("sstables loader", [&sst_loader] {
|
||||
sst_loader.stop().get();
|
||||
});
|
||||
|
||||
@@ -10,6 +10,7 @@
|
||||
#include <fmt/ranges.h>
|
||||
#include <seastar/util/defer.hh>
|
||||
#include "dht/auto_refreshing_sharder.hh"
|
||||
#include "db/view/view_building_worker.hh"
|
||||
#include "gms/endpoint_state.hh"
|
||||
#include "repair/repair.hh"
|
||||
#include "message/messaging_service.hh"
|
||||
@@ -419,6 +420,7 @@ class repair_writer_impl : public repair_writer::impl {
|
||||
mutation_fragment_queue _mq;
|
||||
sharded<replica::database>& _db;
|
||||
db::view::view_builder& _view_builder;
|
||||
sharded<db::view::view_building_worker>& _view_building_worker;
|
||||
streaming::stream_reason _reason;
|
||||
mutation_reader _queue_reader;
|
||||
service::frozen_topology_guard _topo_guard;
|
||||
@@ -429,6 +431,7 @@ public:
|
||||
reader_permit permit,
|
||||
sharded<replica::database>& db,
|
||||
db::view::view_builder& view_builder,
|
||||
sharded<db::view::view_building_worker>& view_building_worker,
|
||||
streaming::stream_reason reason,
|
||||
mutation_fragment_queue queue,
|
||||
mutation_reader queue_reader,
|
||||
@@ -439,6 +442,7 @@ public:
|
||||
, _mq(std::move(queue))
|
||||
, _db(db)
|
||||
, _view_builder(view_builder)
|
||||
, _view_building_worker(view_building_worker)
|
||||
, _reason(reason)
|
||||
, _queue_reader(std::move(queue_reader))
|
||||
, _topo_guard(topo_guard)
|
||||
@@ -546,7 +550,7 @@ void repair_writer_impl::create_writer(lw_shared_ptr<repair_writer> w) {
|
||||
rlogger.debug("repair_writer: keyspace={}, table={}, estimated_partitions={}", w->schema()->ks_name(), w->schema()->cf_name(), w->get_estimated_partitions());
|
||||
auto sharder = get_sharder_helper(t, *(w->schema()), _topo_guard);
|
||||
_writer_done = mutation_writer::distribute_reader_and_consume_on_shards(_schema, sharder.sharder, std::move(_queue_reader),
|
||||
streaming::make_streaming_consumer(sstables::repair_origin, _db, _view_builder, w->get_estimated_partitions(), _reason, is_offstrategy_supported(_reason),
|
||||
streaming::make_streaming_consumer(sstables::repair_origin, _db, _view_builder, _view_building_worker, w->get_estimated_partitions(), _reason, is_offstrategy_supported(_reason),
|
||||
_topo_guard, _repaired_at, w->get_sstable_list_to_mark_as_repaired()),
|
||||
t.stream_in_progress()).then([w] (uint64_t partitions) {
|
||||
rlogger.debug("repair_writer: keyspace={}, table={}, managed to write partitions={} to sstable",
|
||||
@@ -567,10 +571,11 @@ lw_shared_ptr<repair_writer> make_repair_writer(
|
||||
streaming::stream_reason reason,
|
||||
sharded<replica::database>& db,
|
||||
db::view::view_builder& view_builder,
|
||||
sharded<db::view::view_building_worker>& view_building_worker,
|
||||
service::frozen_topology_guard topo_guard) {
|
||||
auto [queue_reader, queue_handle] = make_queue_reader(schema, permit);
|
||||
auto queue = make_mutation_fragment_queue(schema, permit, std::move(queue_handle));
|
||||
auto i = std::make_unique<repair_writer_impl>(schema, repaired_at, permit, db, view_builder, reason, std::move(queue), std::move(queue_reader), topo_guard);
|
||||
auto i = std::make_unique<repair_writer_impl>(schema, repaired_at, permit, db, view_builder, view_building_worker, reason, std::move(queue), std::move(queue_reader), topo_guard);
|
||||
return make_lw_shared<repair_writer>(schema, permit, std::move(i));
|
||||
}
|
||||
|
||||
@@ -877,7 +882,7 @@ public:
|
||||
, _same_sharding_config(is_same_sharding_config(cf))
|
||||
, _nr_peer_nodes(nr_peer_nodes)
|
||||
, _repaired_at(repaired_at)
|
||||
, _repair_writer(make_repair_writer(_schema, _repaired_at, _permit, _reason, _db, rs.get_view_builder(), topo_guard))
|
||||
, _repair_writer(make_repair_writer(_schema, _repaired_at, _permit, _reason, _db, rs.get_view_builder(), rs.get_view_building_worker(), topo_guard))
|
||||
, _gate(format("repair_meta[{}]", repair_meta_id))
|
||||
, _sink_source_for_get_full_row_hashes(_repair_meta_id, _nr_peer_nodes,
|
||||
[&rs] (uint32_t repair_meta_id, std::optional<shard_id> dst_cpu_id_opt, locator::host_id addr) {
|
||||
@@ -3505,6 +3510,7 @@ repair_service::repair_service(sharded<service::topology_state_machine>& tsm,
|
||||
sharded<db::batchlog_manager>& bm,
|
||||
sharded<db::system_keyspace>& sys_ks,
|
||||
db::view::view_builder& vb,
|
||||
sharded<db::view::view_building_worker>& vbw,
|
||||
tasks::task_manager& tm,
|
||||
service::migration_manager& mm,
|
||||
size_t max_repair_memory)
|
||||
@@ -3516,6 +3522,7 @@ repair_service::repair_service(sharded<service::topology_state_machine>& tsm,
|
||||
, _bm(bm)
|
||||
, _sys_ks(sys_ks)
|
||||
, _view_builder(vb)
|
||||
, _view_building_worker(vbw)
|
||||
, _repair_module(seastar::make_shared<repair::task_manager_module>(tm, *this, max_repair_memory))
|
||||
, _mm(mm)
|
||||
, _node_ops_metrics(_repair_module)
|
||||
|
||||
@@ -38,6 +38,10 @@ class system_keyspace;
|
||||
class system_distributed_keyspace;
|
||||
class batchlog_manager;
|
||||
|
||||
namespace view {
|
||||
class view_building_worker;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
namespace gms {
|
||||
@@ -103,6 +107,7 @@ class repair_service : public seastar::peering_sharded_service<repair_service> {
|
||||
sharded<db::batchlog_manager>& _bm;
|
||||
sharded<db::system_keyspace>& _sys_ks;
|
||||
db::view::view_builder& _view_builder;
|
||||
sharded<db::view::view_building_worker>& _view_building_worker;
|
||||
shared_ptr<repair::task_manager_module> _repair_module;
|
||||
service::migration_manager& _mm;
|
||||
node_ops_metrics _node_ops_metrics;
|
||||
@@ -143,6 +148,7 @@ public:
|
||||
sharded<db::batchlog_manager>& bm,
|
||||
sharded<db::system_keyspace>& sys_ks,
|
||||
db::view::view_builder& vb,
|
||||
sharded<db::view::view_building_worker>& vbw,
|
||||
tasks::task_manager& tm,
|
||||
service::migration_manager& mm, size_t max_repair_memory);
|
||||
~repair_service();
|
||||
@@ -200,6 +206,7 @@ public:
|
||||
sharded<replica::database>& get_db() noexcept { return _db; }
|
||||
service::migration_manager& get_migration_manager() noexcept { return _mm; }
|
||||
db::view::view_builder& get_view_builder() noexcept { return _view_builder; }
|
||||
sharded<db::view::view_building_worker>& get_view_building_worker() noexcept { return _view_building_worker; }
|
||||
gms::gossiper& get_gossiper() noexcept { return _gossiper.local(); }
|
||||
size_t max_repair_memory() const { return _max_repair_memory; }
|
||||
seastar::semaphore& memory_sem() { return _memory_sem; }
|
||||
|
||||
@@ -6,6 +6,8 @@
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include "db/view/view_building_worker.hh"
|
||||
#include "sstables/shared_sstable.hh"
|
||||
#include "utils/assert.hh"
|
||||
#include <fmt/std.h>
|
||||
#include <seastar/core/coroutine.hh>
|
||||
@@ -123,15 +125,15 @@ distributed_loader::reshape(sharded<sstables::sstable_directory>& dir, sharded<r
|
||||
// Loads SSTables into the main directory (or staging) and returns how many were loaded
|
||||
future<size_t>
|
||||
distributed_loader::make_sstables_available(sstables::sstable_directory& dir, sharded<replica::database>& db,
|
||||
sharded<db::view::view_builder>& vb, bool needs_view_update, sstring ks, sstring cf) {
|
||||
sharded<db::view::view_builder>& vb, sharded<db::view::view_building_worker>& vbw, db::view::sstable_destination_decision needs_view_update, sstring ks, sstring cf) {
|
||||
|
||||
auto& table = db.local().find_column_family(ks, cf);
|
||||
auto new_sstables = std::vector<sstables::shared_sstable>();
|
||||
|
||||
co_await dir.do_for_each_sstable([&table, needs_view_update, &new_sstables] (sstables::shared_sstable sst) -> future<> {
|
||||
auto gen = table.calculate_generation_for_new_table();
|
||||
dblog.trace("Loading {} into {}, new generation {}", sst->get_filename(), needs_view_update ? "staging" : "base", gen);
|
||||
co_await sst->pick_up_from_upload(!needs_view_update ? sstables::sstable_state::normal : sstables::sstable_state::staging, gen);
|
||||
dblog.trace("Loading {} into {}, new generation {}", sst->get_filename(), needs_view_update == db::view::sstable_destination_decision::normal_directory ? "base" : "staging", gen);
|
||||
co_await sst->pick_up_from_upload(needs_view_update == db::view::sstable_destination_decision::normal_directory ? sstables::sstable_state::normal : sstables::sstable_state::staging, gen);
|
||||
// When loading an imported sst, set level to 0 because it may overlap with existing ssts on higher levels.
|
||||
sst->set_sstable_level(0);
|
||||
new_sstables.push_back(std::move(sst));
|
||||
@@ -147,23 +149,27 @@ distributed_loader::make_sstables_available(sstables::sstable_directory& dir, sh
|
||||
abort();
|
||||
});
|
||||
|
||||
co_await coroutine::parallel_for_each(new_sstables, [&vb, &table] (sstables::shared_sstable sst) -> future<> {
|
||||
if (sst->requires_view_building()) {
|
||||
co_await vb.local().register_staging_sstable(sst, table.shared_from_this());
|
||||
}
|
||||
});
|
||||
if (needs_view_update == db::view::sstable_destination_decision::staging_managed_by_vbc) {
|
||||
co_await vbw.local().register_staging_sstable_tasks(new_sstables, table.shared_from_this());
|
||||
} else if (needs_view_update == db::view::sstable_destination_decision::staging_directly_to_generator) {
|
||||
co_await coroutine::parallel_for_each(new_sstables, [&vb, &table] (sstables::shared_sstable sst) -> future<> {
|
||||
if (sst->requires_view_building()) {
|
||||
co_await vb.local().register_staging_sstable(sst, table.shared_from_this());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
co_return new_sstables.size();
|
||||
}
|
||||
|
||||
future<>
|
||||
distributed_loader::process_upload_dir(distributed<replica::database>& db, sharded<db::view::view_builder>& vb, sstring ks, sstring cf, bool skip_cleanup, bool skip_reshape) {
|
||||
distributed_loader::process_upload_dir(distributed<replica::database>& db, sharded<db::view::view_builder>& vb, sharded<db::view::view_building_worker>& vbw, sstring ks, sstring cf, bool skip_cleanup, bool skip_reshape) {
|
||||
const auto& rs = db.local().find_keyspace(ks).get_replication_strategy();
|
||||
if (rs.is_per_table()) {
|
||||
on_internal_error(dblog, "process_upload_dir is not supported with tablets");
|
||||
}
|
||||
|
||||
return seastar::async([&db, &vb, ks = std::move(ks), cf = std::move(cf), skip_cleanup, skip_reshape] {
|
||||
return seastar::async([&db, &vb, &vbw, ks = std::move(ks), cf = std::move(cf), skip_cleanup, skip_reshape] {
|
||||
auto global_table = get_table_on_all_shards(db, ks, cf).get();
|
||||
|
||||
sharded<sstables::sstable_directory> directory;
|
||||
@@ -208,10 +214,10 @@ distributed_loader::process_upload_dir(distributed<replica::database>& db, shard
|
||||
}
|
||||
|
||||
// Move to staging directory to avoid clashes with future uploads. Unique generation number ensures no collisions.
|
||||
const bool use_view_update_path = db::view::check_needs_view_update_path(vb.local(), erm->get_token_metadata_ptr(), *global_table, streaming::stream_reason::repair).get();
|
||||
const auto use_view_update_path = db::view::check_needs_view_update_path(vb.local(), erm->get_token_metadata_ptr(), *global_table, streaming::stream_reason::repair).get();
|
||||
|
||||
size_t loaded = directory.map_reduce0([&db, ks, cf, use_view_update_path, &vb] (sstables::sstable_directory& dir) {
|
||||
return make_sstables_available(dir, db, vb, use_view_update_path, ks, cf);
|
||||
size_t loaded = directory.map_reduce0([&db, ks, cf, use_view_update_path, &vb, &vbw] (sstables::sstable_directory& dir) {
|
||||
return make_sstables_available(dir, db, vb, vbw, use_view_update_path, ks, cf);
|
||||
}, size_t(0), std::plus<size_t>()).get();
|
||||
|
||||
dblog.info("Loaded {} SSTables", loaded);
|
||||
|
||||
@@ -33,7 +33,9 @@ namespace db {
|
||||
class config;
|
||||
class system_keyspace;
|
||||
namespace view {
|
||||
enum class sstable_destination_decision;
|
||||
class view_builder;
|
||||
class view_building_worker;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -72,8 +74,8 @@ class distributed_loader {
|
||||
static future<> process_sstable_dir(sharded<sstables::sstable_directory>& dir, sstables::sstable_directory::process_flags flags);
|
||||
static future<> lock_table(global_table_ptr&, sharded<sstables::sstable_directory>& dir);
|
||||
static future<size_t> make_sstables_available(sstables::sstable_directory& dir,
|
||||
sharded<replica::database>& db, sharded<db::view::view_builder>& vb,
|
||||
bool needs_view_update, sstring ks, sstring cf);
|
||||
sharded<replica::database>& db, sharded<db::view::view_builder>& vb, sharded<db::view::view_building_worker>& vbw,
|
||||
db::view::sstable_destination_decision needs_view_update, sstring ks, sstring cf);
|
||||
static future<> populate_keyspace(distributed<replica::database>& db, sharded<db::system_keyspace>& sys_ks, keyspace& ks, sstring ks_name);
|
||||
static future<std::tuple<table_id, std::vector<std::vector<sstables::shared_sstable>>>>
|
||||
get_sstables_from(distributed<replica::database>& db, sstring ks, sstring cf, sstables::sstable_open_config cfg,
|
||||
@@ -91,7 +93,7 @@ public:
|
||||
get_sstables_from_upload_dir(distributed<replica::database>& db, sstring ks, sstring cf, sstables::sstable_open_config cfg);
|
||||
static future<std::tuple<table_id, std::vector<std::vector<sstables::shared_sstable>>>>
|
||||
get_sstables_from_object_store(distributed<replica::database>& db, sstring ks, sstring cf, std::vector<sstring> sstables, sstring endpoint, sstring bucket, sstring prefix, sstables::sstable_open_config cfg, std::function<seastar::abort_source*()> = {});
|
||||
static future<> process_upload_dir(distributed<replica::database>& db, sharded<db::view::view_builder>& vb, sstring ks_name, sstring cf_name, bool skip_cleanup, bool skip_reshape);
|
||||
static future<> process_upload_dir(distributed<replica::database>& db, sharded<db::view::view_builder>& vb, sharded<db::view::view_building_worker>& vbw, sstring ks_name, sstring cf_name, bool skip_cleanup, bool skip_reshape);
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -582,7 +582,7 @@ future<> sstables_loader::load_new_sstables(sstring ks_name, sstring cf_name,
|
||||
co_await loader.load_and_stream(ks_name, cf_name, table_id, std::move(sstables_on_shards[this_shard_id()]), primary_replica_only, true, scope, {});
|
||||
});
|
||||
} else {
|
||||
co_await replica::distributed_loader::process_upload_dir(_db, _view_builder, ks_name, cf_name, skip_cleanup, skip_reshape);
|
||||
co_await replica::distributed_loader::process_upload_dir(_db, _view_builder, _view_building_worker, ks_name, cf_name, skip_cleanup, skip_reshape);
|
||||
}
|
||||
} catch (...) {
|
||||
llog.warn("Done loading new SSTables for keyspace={}, table={}, load_and_stream={}, primary_replica_only={}, status=failed: {}",
|
||||
@@ -751,12 +751,14 @@ future<> sstables_loader::download_task_impl::run() {
|
||||
sstables_loader::sstables_loader(sharded<replica::database>& db,
|
||||
netw::messaging_service& messaging,
|
||||
sharded<db::view::view_builder>& vb,
|
||||
sharded<db::view::view_building_worker>& vbw,
|
||||
tasks::task_manager& tm,
|
||||
sstables::storage_manager& sstm,
|
||||
seastar::scheduling_group sg)
|
||||
: _db(db)
|
||||
, _messaging(messaging)
|
||||
, _view_builder(vb)
|
||||
, _view_building_worker(vbw)
|
||||
, _task_manager_module(make_shared<task_manager_module>(tm))
|
||||
, _storage_manager(sstm)
|
||||
, _sched_group(std::move(sg))
|
||||
|
||||
@@ -26,6 +26,7 @@ namespace netw { class messaging_service; }
|
||||
namespace db {
|
||||
namespace view {
|
||||
class view_builder;
|
||||
class view_building_worker;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -67,6 +68,7 @@ private:
|
||||
sharded<replica::database>& _db;
|
||||
netw::messaging_service& _messaging;
|
||||
sharded<db::view::view_builder>& _view_builder;
|
||||
sharded<db::view::view_building_worker>& _view_building_worker;
|
||||
shared_ptr<task_manager_module> _task_manager_module;
|
||||
sstables::storage_manager& _storage_manager;
|
||||
seastar::scheduling_group _sched_group;
|
||||
@@ -88,6 +90,7 @@ public:
|
||||
sstables_loader(sharded<replica::database>& db,
|
||||
netw::messaging_service& messaging,
|
||||
sharded<db::view::view_builder>& vb,
|
||||
sharded<db::view::view_building_worker>& vbw,
|
||||
tasks::task_manager& tm,
|
||||
sstables::storage_manager& sstm,
|
||||
seastar::scheduling_group sg);
|
||||
|
||||
@@ -8,8 +8,10 @@
|
||||
|
||||
#include <seastar/core/coroutine.hh>
|
||||
#include <seastar/core/with_scheduling_group.hh>
|
||||
#include <seastar/core/future.hh>
|
||||
|
||||
#include "consumer.hh"
|
||||
#include "db/view/view_building_worker.hh"
|
||||
#include "replica/database.hh"
|
||||
#include "mutation/mutation_source_metadata.hh"
|
||||
#include "db/view/view_builder.hh"
|
||||
@@ -22,13 +24,14 @@ namespace streaming {
|
||||
mutation_reader_consumer make_streaming_consumer(sstring origin,
|
||||
sharded<replica::database>& db,
|
||||
db::view::view_builder& vb,
|
||||
sharded<db::view::view_building_worker>& vbw,
|
||||
uint64_t estimated_partitions,
|
||||
stream_reason reason,
|
||||
sstables::offstrategy offstrategy,
|
||||
service::frozen_topology_guard frozen_guard,
|
||||
std::optional<int64_t> repaired_at,
|
||||
lw_shared_ptr<sstables::sstable_list> sstable_list_to_mark_as_repaired) {
|
||||
return [&db, &vb = vb.container(), estimated_partitions, reason, offstrategy, origin = std::move(origin), frozen_guard, repaired_at, sstable_list_to_mark_as_repaired] (mutation_reader reader) -> future<> {
|
||||
return [&db, &vb = vb.container(), &vbw, estimated_partitions, reason, offstrategy, origin = std::move(origin), frozen_guard, repaired_at, sstable_list_to_mark_as_repaired] (mutation_reader reader) -> future<> {
|
||||
std::exception_ptr ex;
|
||||
try {
|
||||
if (current_scheduling_group() != db.local().get_streaming_scheduling_group()) {
|
||||
@@ -38,7 +41,7 @@ mutation_reader_consumer make_streaming_consumer(sstring origin,
|
||||
|
||||
auto cf = db.local().find_column_family(reader.schema()).shared_from_this();
|
||||
auto guard = service::topology_guard(frozen_guard);
|
||||
bool use_view_update_path = co_await with_scheduling_group(db.local().get_gossip_scheduling_group(), [&] {
|
||||
auto use_view_update_path = co_await with_scheduling_group(db.local().get_gossip_scheduling_group(), [&] {
|
||||
return db::view::check_needs_view_update_path(vb.local(), db.local().get_token_metadata_ptr(), *cf, reason);
|
||||
});
|
||||
//FIXME: for better estimations this should be transmitted from remote
|
||||
@@ -48,11 +51,11 @@ mutation_reader_consumer make_streaming_consumer(sstring origin,
|
||||
// means partition estimation shouldn't be adjusted.
|
||||
const auto adjusted_estimated_partitions = (offstrategy) ? estimated_partitions : cs.adjust_partition_estimate(metadata, estimated_partitions, cf->schema());
|
||||
mutation_reader_consumer consumer =
|
||||
[cf = std::move(cf), adjusted_estimated_partitions, use_view_update_path, &vb, origin = std::move(origin),
|
||||
[cf = std::move(cf), adjusted_estimated_partitions, use_view_update_path, &vb, &vbw, origin = std::move(origin),
|
||||
offstrategy, repaired_at, sstable_list_to_mark_as_repaired, frozen_guard] (mutation_reader reader) {
|
||||
sstables::shared_sstable sst;
|
||||
try {
|
||||
sst = use_view_update_path ? cf->make_streaming_staging_sstable() : cf->make_streaming_sstable_for_write();
|
||||
sst = use_view_update_path == db::view::sstable_destination_decision::normal_directory ? cf->make_streaming_sstable_for_write() : cf->make_streaming_staging_sstable();
|
||||
} catch (...) {
|
||||
return current_exception_as_future().finally([reader = std::move(reader)] () mutable {
|
||||
return reader.close();
|
||||
@@ -77,11 +80,13 @@ mutation_reader_consumer make_streaming_consumer(sstring origin,
|
||||
cf->enable_off_strategy_trigger();
|
||||
}
|
||||
co_await cf->add_sstable_and_update_cache(sst, offstrategy);
|
||||
}).then([cf, s, sst, use_view_update_path, &vb]() mutable -> future<> {
|
||||
if (!use_view_update_path) {
|
||||
return make_ready_future<>();
|
||||
}).then([cf, s, sst, use_view_update_path, &vb, &vbw]() mutable -> future<> {
|
||||
if (use_view_update_path == db::view::sstable_destination_decision::staging_managed_by_vbc) {
|
||||
return vbw.local().register_staging_sstable_tasks({sst}, std::move(cf));
|
||||
} else if (use_view_update_path == db::view::sstable_destination_decision::staging_directly_to_generator) {
|
||||
return vb.local().register_staging_sstable(sst, std::move(cf));
|
||||
}
|
||||
return vb.local().register_staging_sstable(sst, std::move(cf));
|
||||
return make_ready_future<>();
|
||||
});
|
||||
};
|
||||
if (!offstrategy) {
|
||||
|
||||
@@ -20,6 +20,7 @@ class database;
|
||||
namespace db {
|
||||
namespace view {
|
||||
class view_builder;
|
||||
class view_building_worker;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -28,6 +29,7 @@ namespace streaming {
|
||||
mutation_reader_consumer make_streaming_consumer(sstring origin,
|
||||
sharded<replica::database>& db,
|
||||
db::view::view_builder& vb,
|
||||
sharded<db::view::view_building_worker>& vbw,
|
||||
uint64_t estimated_partitions,
|
||||
stream_reason reason,
|
||||
sstables::offstrategy offstrategy,
|
||||
|
||||
@@ -9,6 +9,7 @@
|
||||
*/
|
||||
|
||||
#include <seastar/core/distributed.hh>
|
||||
#include "db/view/view_building_worker.hh"
|
||||
#include "gms/gossiper.hh"
|
||||
#include "streaming/stream_manager.hh"
|
||||
#include "streaming/stream_result_future.hh"
|
||||
@@ -27,11 +28,13 @@ extern logging::logger sslog;
|
||||
stream_manager::stream_manager(db::config& cfg,
|
||||
sharded<replica::database>& db,
|
||||
db::view::view_builder& view_builder,
|
||||
sharded<db::view::view_building_worker>& view_building_worker,
|
||||
sharded<netw::messaging_service>& ms,
|
||||
sharded<service::migration_manager>& mm,
|
||||
gms::gossiper& gossiper, scheduling_group sg)
|
||||
: _db(db)
|
||||
, _view_builder(view_builder)
|
||||
, _view_building_worker(view_building_worker)
|
||||
, _ms(ms)
|
||||
, _mm(mm)
|
||||
, _gossiper(gossiper)
|
||||
|
||||
@@ -29,6 +29,7 @@ namespace db {
|
||||
class config;
|
||||
namespace view {
|
||||
class view_builder;
|
||||
class view_building_worker;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -84,6 +85,7 @@ class stream_manager : public gms::i_endpoint_state_change_subscriber, public en
|
||||
private:
|
||||
sharded<replica::database>& _db;
|
||||
db::view::view_builder& _view_builder;
|
||||
sharded<db::view::view_building_worker>& _view_building_worker;
|
||||
sharded<netw::messaging_service>& _ms;
|
||||
sharded<service::migration_manager>& _mm;
|
||||
gms::gossiper& _gossiper;
|
||||
@@ -105,6 +107,7 @@ private:
|
||||
public:
|
||||
stream_manager(db::config& cfg, sharded<replica::database>& db,
|
||||
db::view::view_builder& view_builder,
|
||||
sharded<db::view::view_building_worker>& view_building_worker,
|
||||
sharded<netw::messaging_service>& ms,
|
||||
sharded<service::migration_manager>& mm,
|
||||
gms::gossiper& gossiper, scheduling_group sg);
|
||||
|
||||
@@ -87,7 +87,7 @@ public:
|
||||
|
||||
mutation_reader_consumer
|
||||
stream_manager::make_streaming_consumer(uint64_t estimated_partitions, stream_reason reason, service::frozen_topology_guard topo_guard) {
|
||||
return streaming::make_streaming_consumer("streaming", _db, _view_builder, estimated_partitions, reason, is_offstrategy_supported(reason), topo_guard);
|
||||
return streaming::make_streaming_consumer("streaming", _db, _view_builder, _view_building_worker, estimated_partitions, reason, is_offstrategy_supported(reason), topo_guard);
|
||||
}
|
||||
|
||||
void stream_manager::init_messaging_service_handler(abort_source& as) {
|
||||
|
||||
@@ -6084,7 +6084,7 @@ SEASTAR_TEST_CASE(test_sstable_load_mixed_generation_type) {
|
||||
|
||||
// Load sstables with mixed generation types
|
||||
copy_directory("test/resource/sstables/mixed_generation_type", upload_dir);
|
||||
replica::distributed_loader::process_upload_dir(e.db(), e.view_builder(), "ks", "test", false, false).get();
|
||||
replica::distributed_loader::process_upload_dir(e.db(), e.view_builder(), e.view_building_worker(), "ks", "test", false, false).get();
|
||||
|
||||
// Verify the expected data is present
|
||||
assert_that(e.execute_cql("SELECT * FROM ks.test").get()).is_rows()
|
||||
|
||||
@@ -11,6 +11,7 @@
|
||||
#include <seastar/core/thread.hh>
|
||||
#include <seastar/util/defer.hh>
|
||||
#include "gms/generation-number.hh"
|
||||
#include "db/view/view_building_worker.hh"
|
||||
#include "replica/database_fwd.hh"
|
||||
#include "test/lib/cql_test_env.hh"
|
||||
#include "cdc/generation_service.hh"
|
||||
@@ -141,6 +142,7 @@ private:
|
||||
sharded<cql3::query_processor> _qp;
|
||||
sharded<auth::service> _auth_service;
|
||||
sharded<db::view::view_builder> _view_builder;
|
||||
sharded<db::view::view_building_worker> _view_building_worker;
|
||||
sharded<db::view::view_update_generator> _view_update_generator;
|
||||
sharded<service::migration_notifier> _mnotifier;
|
||||
sharded<qos::service_level_controller> _sl_controller;
|
||||
@@ -363,6 +365,10 @@ public:
|
||||
return _view_update_generator.local();
|
||||
}
|
||||
|
||||
virtual distributed<db::view::view_building_worker>& view_building_worker() override {
|
||||
return _view_building_worker;
|
||||
}
|
||||
|
||||
virtual service::migration_notifier& local_mnotifier() override {
|
||||
return _mnotifier.local();
|
||||
}
|
||||
@@ -922,7 +928,7 @@ private:
|
||||
_view_builder.stop().get();
|
||||
});
|
||||
|
||||
_stream_manager.start(std::ref(*cfg), std::ref(_db), std::ref(_view_builder), std::ref(_ms), std::ref(_mm), std::ref(_gossiper), scheduling_groups.streaming_scheduling_group).get();
|
||||
_stream_manager.start(std::ref(*cfg), std::ref(_db), std::ref(_view_builder), std::ref(_view_building_worker), std::ref(_ms), std::ref(_mm), std::ref(_gossiper), scheduling_groups.streaming_scheduling_group).get();
|
||||
auto stop_streaming = defer_verbose_shutdown("stream manager", [this] { _stream_manager.stop().get(); });
|
||||
|
||||
_ss.start(std::ref(abort_sources), std::ref(_db),
|
||||
@@ -1070,6 +1076,11 @@ private:
|
||||
|
||||
group0_service.setup_group0_if_exist(_sys_ks.local(), _ss.local(), _qp.local(), _mm.local()).get();
|
||||
|
||||
_view_building_worker.start(std::ref(_db), std::ref(_sys_ks), std::ref(_mnotifier), std::ref(group0_client), std::ref(_view_update_generator), std::ref(_ms), std::ref(_view_building_state_machine)).get();
|
||||
auto stop_view_building_worker = defer_verbose_shutdown("view building worker", [this] {
|
||||
_view_building_worker.stop().get();
|
||||
});
|
||||
|
||||
const auto generation_number = gms::generation_type(_sys_ks.local().increment_and_get_generation().get());
|
||||
|
||||
try {
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
#include <seastar/core/future.hh>
|
||||
#include <seastar/core/shared_ptr.hh>
|
||||
|
||||
#include "db/view/view_building_worker.hh"
|
||||
#include "db/view/view_update_generator.hh"
|
||||
#include "service/qos/service_level_controller.hh"
|
||||
#include "replica/database.hh"
|
||||
@@ -160,6 +161,8 @@ public:
|
||||
virtual distributed<db::view::view_builder>& view_builder() = 0;
|
||||
virtual db::view::view_builder& local_view_builder() = 0;
|
||||
|
||||
virtual distributed<db::view::view_building_worker>& view_building_worker() = 0;
|
||||
|
||||
virtual db::view::view_update_generator& local_view_update_generator() = 0;
|
||||
|
||||
virtual service::migration_notifier& local_mnotifier() = 0;
|
||||
|
||||
Reference in New Issue
Block a user