db/view/view_building: send coordinator's term in the RPC

To avoid case when an old coordinator (which hasn't been stopped yet)
dictates what should be done, add raft term to the `work_on_view_building_tasks`
RPC.
The worker needs to check if the term matches the current term from raft
server, and deny the request when the term is bad.
This commit is contained in:
Michał Jadwiszczak
2025-11-07 04:46:08 +01:00
committed by Piotr Dulikowski
parent 24d69b4005
commit fb8cbf1615
6 changed files with 36 additions and 26 deletions

View File

@@ -487,7 +487,7 @@ future<std::optional<std::vector<utils::UUID>>> view_building_coordinator::work_
bool rpc_failed = false;
try {
remote_results = co_await ser::view_rpc_verbs::send_work_on_view_building_tasks(&_messaging, replica.host, _as, replica.shard, tasks);
remote_results = co_await ser::view_rpc_verbs::send_work_on_view_building_tasks(&_messaging, replica.host, _as, _term, replica.shard, tasks);
} catch (...) {
vbc_logger.log(log_level::warn, rate_limit, "Work on tasks {} on replica {}, failed with error: {}",
tasks, replica, std::current_exception());

View File

@@ -22,6 +22,7 @@
#include "replica/database.hh"
#include "service/storage_proxy.hh"
#include "service/raft/raft_group0_client.hh"
#include "service/raft/raft_group0.hh"
#include "schema/schema_fwd.hh"
#include "idl/view.dist.hh"
#include "sstables/sstables.hh"
@@ -114,11 +115,11 @@ static locator::tablet_id get_sstable_tablet_id(const locator::tablet_map& table
return tablet_id;
}
view_building_worker::view_building_worker(replica::database& db, db::system_keyspace& sys_ks, service::migration_notifier& mnotifier, service::raft_group0_client& group0_client, view_update_generator& vug, netw::messaging_service& ms, view_building_state_machine& vbsm)
view_building_worker::view_building_worker(replica::database& db, db::system_keyspace& sys_ks, service::migration_notifier& mnotifier, service::raft_group0& group0, view_update_generator& vug, netw::messaging_service& ms, view_building_state_machine& vbsm)
: _db(db)
, _sys_ks(sys_ks)
, _mnotifier(mnotifier)
, _group0_client(group0_client)
, _group0(group0)
, _vug(vug)
, _messaging(ms)
, _vb_state_machine(vbsm)
@@ -224,7 +225,7 @@ future<> view_building_worker::create_staging_sstable_tasks() {
utils::chunked_vector<canonical_mutation> cmuts;
auto guard = co_await _group0_client.start_operation(_as);
auto guard = co_await _group0.client().start_operation(_as);
auto my_host_id = _db.get_token_metadata().get_topology().my_host_id();
for (auto& [table_id, sst_infos]: _sstables_to_register) {
for (auto& sst_info: sst_infos) {
@@ -232,14 +233,14 @@ future<> view_building_worker::create_staging_sstable_tasks() {
utils::UUID_gen::get_time_UUID(), view_building_task::task_type::process_staging, false,
table_id, ::table_id{}, {my_host_id, sst_info.shard}, sst_info.last_token
};
auto mut = co_await _group0_client.sys_ks().make_view_building_task_mutation(guard.write_timestamp(), task);
auto mut = co_await _group0.client().sys_ks().make_view_building_task_mutation(guard.write_timestamp(), task);
cmuts.emplace_back(std::move(mut));
}
}
vbw_logger.debug("Creating {} process_staging view_building_tasks", cmuts.size());
auto cmd = _group0_client.prepare_command(service::write_mutations{std::move(cmuts)}, guard, "create view building tasks");
co_await _group0_client.add_entry(std::move(cmd), std::move(guard), _as);
auto cmd = _group0.client().prepare_command(service::write_mutations{std::move(cmuts)}, guard, "create view building tasks");
co_await _group0.client().add_entry(std::move(cmd), std::move(guard), _as);
// Move staging sstables from `_sstables_to_register` (on shard0) to `_staging_sstables` on corresponding shards.
// Firstly reorgenize `_sstables_to_register` for easier movement.
@@ -342,7 +343,7 @@ future<> view_building_worker::run_view_building_state_observer() {
bool sleep = false;
try {
vbw_logger.trace("view_building_state_observer() iteration");
auto read_apply_mutex_holder = co_await _group0_client.hold_read_apply_mutex(_as);
auto read_apply_mutex_holder = co_await _group0.client().hold_read_apply_mutex(_as);
co_await update_built_views();
co_await check_for_aborted_tasks();
@@ -376,7 +377,7 @@ future<> view_building_worker::update_built_views() {
auto schema = _db.find_schema(table_id);
return std::make_pair(schema->ks_name(), schema->cf_name());
};
auto& sys_ks = _group0_client.sys_ks();
auto& sys_ks = _group0.client().sys_ks();
std::set<std::pair<sstring, sstring>> built_views;
for (auto& [id, statuses]: _vb_state_machine.views_state.status_map) {
@@ -431,9 +432,9 @@ future<> view_building_worker::check_for_aborted_tasks() {
}
void view_building_worker::init_messaging_service() {
ser::view_rpc_verbs::register_work_on_view_building_tasks(&_messaging, [this] (shard_id shard, std::vector<utils::UUID> ids) -> future<std::vector<utils::UUID>> {
return container().invoke_on(shard, [ids = std::move(ids)] (auto& vbw) mutable -> future<std::vector<utils::UUID>> {
return vbw.work_on_tasks(std::move(ids));
ser::view_rpc_verbs::register_work_on_view_building_tasks(&_messaging, [this] (raft::term_t term, shard_id shard, std::vector<utils::UUID> ids) -> future<std::vector<utils::UUID>> {
return container().invoke_on(shard, [term, ids = std::move(ids)] (auto& vbw) mutable -> future<std::vector<utils::UUID>> {
return vbw.work_on_tasks(term, std::move(ids));
});
});
}
@@ -711,15 +712,23 @@ void view_building_worker::cleanup_staging_sstables(locator::effective_replicati
_staging_sstables[table_id].erase(first, last);
}
future<view_building_state> view_building_worker::get_latest_view_building_state() {
return smp::submit_to(0, [&sharded_vbw = container()] {
future<view_building_state> view_building_worker::get_latest_view_building_state(raft::term_t term) {
return smp::submit_to(0, [&sharded_vbw = container(), term] () -> future<view_building_state> {
auto& vbw = sharded_vbw.local();
auto guard = vbw._group0_client.start_operation(vbw._as);
return vbw._vb_state_machine.building_state;
// auto guard = vbw._group0.client().start_operation(vbw._as);
auto& raft_server = vbw._group0.group0_server();
auto group0_holder = vbw._group0.hold_group0_gate();
co_await raft_server.read_barrier(&vbw._as);
if (raft_server.get_current_term() != term) {
throw std::runtime_error(fmt::format("Invalid raft term. Got {} but current term is {}", term, raft_server.get_current_term()));
}
co_return vbw._vb_state_machine.building_state;
});
}
future<std::vector<utils::UUID>> view_building_worker::work_on_tasks(std::vector<utils::UUID> ids) {
future<std::vector<utils::UUID>> view_building_worker::work_on_tasks(raft::term_t term, std::vector<utils::UUID> ids) {
auto collect_completed_tasks = [&] {
std::vector<utils::UUID> completed;
for (auto& id: ids) {
@@ -744,7 +753,7 @@ future<std::vector<utils::UUID>> view_building_worker::work_on_tasks(std::vector
}
lock.return_all();
auto building_state = co_await get_latest_view_building_state();
auto building_state = co_await get_latest_view_building_state(term);
lock = co_await get_units(_state._mutex, 1, _as);
co_await _state.update_processing_base_table(_db, building_state, _as);

View File

@@ -16,6 +16,7 @@
#include <unordered_set>
#include "locator/abstract_replication_strategy.hh"
#include "locator/tablets.hh"
#include "raft/raft.hh"
#include "seastar/core/gate.hh"
#include "db/view/view_building_state.hh"
#include "sstables/shared_sstable.hh"
@@ -31,7 +32,7 @@ class messaging_service;
}
namespace service {
class raft_group0_client;
class raft_group0;
}
namespace db {
@@ -119,7 +120,7 @@ private:
replica::database& _db;
db::system_keyspace& _sys_ks;
service::migration_notifier& _mnotifier;
service::raft_group0_client& _group0_client;
service::raft_group0& _group0;
view_update_generator& _vug;
netw::messaging_service& _messaging;
view_building_state_machine& _vb_state_machine;
@@ -138,7 +139,7 @@ private:
public:
view_building_worker(replica::database& db, db::system_keyspace& sys_ks, service::migration_notifier& mnotifier,
service::raft_group0_client& group0_client, view_update_generator& vug, netw::messaging_service& ms,
service::raft_group0& group0, view_update_generator& vug, netw::messaging_service& ms,
view_building_state_machine& vbsm);
future<> init();
@@ -157,7 +158,7 @@ public:
void cleanup_staging_sstables(locator::effective_replication_map_ptr erm, table_id table_id, locator::tablet_id tid);
private:
future<view_building_state> get_latest_view_building_state();
future<view_building_state> get_latest_view_building_state(raft::term_t term);
future<> check_for_aborted_tasks();
future<> run_view_building_state_observer();
@@ -175,7 +176,7 @@ private:
void init_messaging_service();
future<> uninit_messaging_service();
future<std::vector<utils::UUID>> work_on_tasks(std::vector<utils::UUID> ids);
future<std::vector<utils::UUID>> work_on_tasks(raft::term_t term, std::vector<utils::UUID> ids);
};
}

View File

@@ -18,4 +18,4 @@ class update_backlog {
}
}
verb [[cancellable]] work_on_view_building_tasks(shard_id shard, std::vector<utils::UUID> tasks_ids) -> std::vector<utils::UUID>
verb [[cancellable]] work_on_view_building_tasks(raft::term_t term, shard_id shard, std::vector<utils::UUID> tasks_ids) -> std::vector<utils::UUID>

View File

@@ -1764,7 +1764,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
});
checkpoint(stop_signal, "starting the view building worker");
view_building_worker.start(std::ref(db), std::ref(sys_ks), std::ref(mm_notifier), std::ref(group0_client), std::ref(view_update_generator), std::ref(messaging), std::ref(vbsm)).get();
view_building_worker.start(std::ref(db), std::ref(sys_ks), std::ref(mm_notifier), std::ref(group0_service), std::ref(view_update_generator), std::ref(messaging), std::ref(vbsm)).get();
auto stop_view_building_worker = defer_verbose_shutdown("view building worker", [] {
view_building_worker.stop().get();
});

View File

@@ -1083,7 +1083,7 @@ private:
group0_service.setup_group0_if_exist(_sys_ks.local(), _ss.local(), _qp.local(), _mm.local()).get();
_view_building_worker.start(std::ref(_db), std::ref(_sys_ks), std::ref(_mnotifier), std::ref(group0_client), std::ref(_view_update_generator), std::ref(_ms), std::ref(_view_building_state_machine)).get();
_view_building_worker.start(std::ref(_db), std::ref(_sys_ks), std::ref(_mnotifier), std::ref(group0_service), std::ref(_view_update_generator), std::ref(_ms), std::ref(_view_building_state_machine)).get();
auto stop_view_building_worker = defer_verbose_shutdown("view building worker", [this] {
_view_building_worker.stop().get();
});