db/view/view_building_coordinator: batch finished tasks reporting
In previous implementation to execute view building tasks, the
coordinator needed to firstly set their states to `STARTED`
and then it needed to remove them before it could start the next ones.
This logic required a lot of group0 commits, especially in large
clusters with higher number of nodes and big tablet count.
After previous commit to the view building worker, the coordinator
can start view building tasks without setting the `STARTED` state
and deleting finished tasks.
This patch adjusts the coordinator to save finished tasks locally,
so it can continue to execute next ones and the finished tasks are
periodically removed from the group0 by `finished_task_gc_fiber()`.
(cherry picked from commit eb04af5020)
This commit is contained in:
@@ -32,6 +32,8 @@
|
||||
#include "utils/error_injection.hh"
|
||||
#include "utils/log.hh"
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
static logging::logger vbc_logger("view_building_coordinator");
|
||||
|
||||
namespace db {
|
||||
@@ -104,6 +106,8 @@ future<> view_building_coordinator::run() {
|
||||
_vb_sm.event.broadcast();
|
||||
});
|
||||
|
||||
auto finished_tasks_gc_fiber = finished_task_gc_fiber();
|
||||
|
||||
while (!_as.abort_requested()) {
|
||||
co_await utils::get_local_injector().inject("view_building_coordinator_pause_main_loop", utils::wait_for_message(std::chrono::minutes(2)));
|
||||
if (utils::get_local_injector().enter("view_building_coordinator_skip_main_loop")) {
|
||||
@@ -121,12 +125,7 @@ future<> view_building_coordinator::run() {
|
||||
continue;
|
||||
}
|
||||
|
||||
auto started_new_work = co_await work_on_view_building(std::move(*guard_opt));
|
||||
if (started_new_work) {
|
||||
// If any tasks were started, do another iteration, so the coordinator can attach itself to the tasks (via RPC)
|
||||
vbc_logger.debug("view building coordinator started new tasks, do next iteration without waiting for event");
|
||||
continue;
|
||||
}
|
||||
co_await work_on_view_building(std::move(*guard_opt));
|
||||
co_await await_event();
|
||||
} catch (...) {
|
||||
handle_coordinator_error(std::current_exception());
|
||||
@@ -142,6 +141,66 @@ future<> view_building_coordinator::run() {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
co_await std::move(finished_tasks_gc_fiber);
|
||||
}
|
||||
|
||||
future<> view_building_coordinator::finished_task_gc_fiber() {
|
||||
static auto task_gc_interval = 200ms;
|
||||
|
||||
while (!_as.abort_requested()) {
|
||||
try {
|
||||
co_await clean_finished_tasks();
|
||||
co_await sleep_abortable(task_gc_interval, _as);
|
||||
} catch (abort_requested_exception&) {
|
||||
vbc_logger.debug("view_building_coordinator::finished_task_gc_fiber got abort_requested_exception");
|
||||
} catch (service::group0_concurrent_modification&) {
|
||||
vbc_logger.info("view_building_coordinator::finished_task_gc_fiber got group0_concurrent_modification");
|
||||
} catch (raft::request_aborted&) {
|
||||
vbc_logger.debug("view_building_coordinator::finished_task_gc_fiber got raft::request_aborted");
|
||||
} catch (service::term_changed_error&) {
|
||||
vbc_logger.debug("view_building_coordinator::finished_task_gc_fiber notices term change {} -> {}", _term, _raft.get_current_term());
|
||||
} catch (raft::commit_status_unknown&) {
|
||||
vbc_logger.warn("view_building_coordinator::finished_task_gc_fiber got raft::commit_status_unknown");
|
||||
} catch (...) {
|
||||
vbc_logger.error("view_building_coordinator::finished_task_gc_fiber got error: {}", std::current_exception());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
future<> view_building_coordinator::clean_finished_tasks() {
|
||||
// Avoid acquiring a group0 operation if there are no tasks.
|
||||
if (_finished_tasks.empty()) {
|
||||
co_return;
|
||||
}
|
||||
|
||||
auto guard = co_await start_operation();
|
||||
auto lock = co_await get_unique_lock(_mutex);
|
||||
|
||||
if (!_vb_sm.building_state.currently_processed_base_table || std::ranges::all_of(_finished_tasks, [] (auto& e) { return e.second.empty(); })) {
|
||||
co_return;
|
||||
}
|
||||
|
||||
view_building_task_mutation_builder builder(guard.write_timestamp());
|
||||
for (auto& [replica, tasks]: _finished_tasks) {
|
||||
for (auto& task_id: tasks) {
|
||||
// The task might be aborted in the meantime. In this case we cannot remove it because we need it to create a new task.
|
||||
//
|
||||
// TODO: When we're aborting a view building task (for instance due to tablet migration),
|
||||
// we can look if we already finished it (check if it's in `_finished_tasks`).
|
||||
// If yes, we can just remove it instead of aborting it.
|
||||
auto task_opt = _vb_sm.building_state.get_task(*_vb_sm.building_state.currently_processed_base_table, replica, task_id);
|
||||
if (task_opt && task_opt->get().state != view_building_task::task_state::aborted) {
|
||||
builder.del_task(task_id);
|
||||
vbc_logger.debug("Removing finished task with ID: {}", task_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
co_await commit_mutations(std::move(guard), {builder.build()}, "remove finished view building tasks");
|
||||
for (auto& [_, tasks_set]: _finished_tasks) {
|
||||
tasks_set.clear();
|
||||
}
|
||||
}
|
||||
|
||||
future<std::optional<service::group0_guard>> view_building_coordinator::update_state(service::group0_guard guard) {
|
||||
@@ -301,18 +360,15 @@ future<> view_building_coordinator::update_views_statuses(const service::group0_
|
||||
}
|
||||
}
|
||||
|
||||
future<bool> view_building_coordinator::work_on_view_building(service::group0_guard guard) {
|
||||
future<> view_building_coordinator::work_on_view_building(service::group0_guard guard) {
|
||||
if (!_vb_sm.building_state.currently_processed_base_table) {
|
||||
vbc_logger.debug("No base table is selected, nothing to do.");
|
||||
co_return false;
|
||||
}
|
||||
|
||||
utils::chunked_vector<mutation> muts;
|
||||
std::unordered_set<locator::tablet_replica> _remote_work_keys_to_erase;
|
||||
// Acquire unique lock of `_finished_tasks` to ensure each replica has its own entry in it
|
||||
// and to select tasks for them.
|
||||
auto lock = co_await get_unique_lock(_mutex);
|
||||
for (auto& replica: get_replicas_with_tasks()) {
|
||||
// Check whether the coordinator already waits for the remote work on the replica to be finished.
|
||||
// If so: check if the work is done and and remove the shared_future, skip this replica otherwise.
|
||||
bool skip_work_on_this_replica = false;
|
||||
if (_remote_work.contains(replica)) {
|
||||
if (!_remote_work[replica].available()) {
|
||||
vbc_logger.debug("Replica {} is still doing work", replica);
|
||||
@@ -320,21 +376,7 @@ future<bool> view_building_coordinator::work_on_view_building(service::group0_gu
|
||||
}
|
||||
|
||||
auto remote_results_opt = co_await _remote_work[replica].get_future();
|
||||
if (remote_results_opt) {
|
||||
auto results_muts = co_await update_state_after_work_is_done(guard, replica, std::move(*remote_results_opt));
|
||||
muts.insert(muts.end(), std::make_move_iterator(results_muts.begin()), std::make_move_iterator(results_muts.end()));
|
||||
// If the replica successfully finished its work, we need to commit mutations generated above before selecting next task
|
||||
skip_work_on_this_replica = !results_muts.empty();
|
||||
}
|
||||
|
||||
// If there were no mutations for this replica, we can just remove the entry from `_remote_work` map
|
||||
// and start new work in the same iteration.
|
||||
// Otherwise, the entry needs to be removed after the mutations are committed successfully.
|
||||
if (skip_work_on_this_replica) {
|
||||
_remote_work_keys_to_erase.insert(replica);
|
||||
} else {
|
||||
_remote_work.erase(replica);
|
||||
}
|
||||
_remote_work.erase(replica);
|
||||
}
|
||||
|
||||
const bool ignore_gossiper = utils::get_local_injector().enter("view_building_coordinator_ignore_gossiper");
|
||||
@@ -343,31 +385,16 @@ future<bool> view_building_coordinator::work_on_view_building(service::group0_gu
|
||||
continue;
|
||||
}
|
||||
|
||||
if (skip_work_on_this_replica) {
|
||||
continue;
|
||||
if (!_finished_tasks.contains(replica)) {
|
||||
_finished_tasks.insert({replica, {}});
|
||||
}
|
||||
|
||||
if (auto already_started_ids = _vb_sm.building_state.get_started_tasks(*_vb_sm.building_state.currently_processed_base_table, replica); !already_started_ids.empty()) {
|
||||
// If the replica has any task in `STARTED` state, attach the coordinator to the work.
|
||||
attach_to_started_tasks(replica, std::move(already_started_ids));
|
||||
} else if (auto todo_ids = select_tasks_for_replica(replica); !todo_ids.empty()) {
|
||||
// If the replica has no started tasks and there are tasks to do, mark them as started.
|
||||
// The coordinator will attach itself to the work in next iteration.
|
||||
auto new_mutations = co_await start_tasks(guard, std::move(todo_ids));
|
||||
muts.insert(muts.end(), std::make_move_iterator(new_mutations.begin()), std::make_move_iterator(new_mutations.end()));
|
||||
if (auto todo_ids = select_tasks_for_replica(replica); !todo_ids.empty()) {
|
||||
start_remote_worker(replica, std::move(todo_ids));
|
||||
} else {
|
||||
vbc_logger.debug("Nothing to do for replica {}", replica);
|
||||
}
|
||||
}
|
||||
|
||||
if (!muts.empty()) {
|
||||
co_await commit_mutations(std::move(guard), std::move(muts), "start view building tasks");
|
||||
for (auto& key: _remote_work_keys_to_erase) {
|
||||
_remote_work.erase(key);
|
||||
}
|
||||
co_return true;
|
||||
}
|
||||
co_return false;
|
||||
}
|
||||
|
||||
std::set<locator::tablet_replica> view_building_coordinator::get_replicas_with_tasks() {
|
||||
@@ -404,7 +431,29 @@ std::vector<utils::UUID> view_building_coordinator::select_tasks_for_replica(loc
|
||||
}
|
||||
|
||||
auto& tablet_map = _db.get_token_metadata().tablets().get_tablet_map(*_vb_sm.building_state.currently_processed_base_table);
|
||||
for (auto& [token, tasks]: _vb_sm.building_state.collect_tasks_by_last_token(*_vb_sm.building_state.currently_processed_base_table, replica)) {
|
||||
auto tasks_by_last_token = _vb_sm.building_state.collect_tasks_by_last_token(*_vb_sm.building_state.currently_processed_base_table, replica);
|
||||
|
||||
// Remove completed tasks in `_finished_tasks` from `tasks_by_last_token`
|
||||
auto it = tasks_by_last_token.begin();
|
||||
while (it != tasks_by_last_token.end()) {
|
||||
auto task_it = it->second.begin();
|
||||
while (task_it != it->second.end()) {
|
||||
if (_finished_tasks.at(replica).contains(task_it->id)) {
|
||||
task_it = it->second.erase(task_it);
|
||||
} else {
|
||||
++task_it;
|
||||
}
|
||||
}
|
||||
|
||||
// Remove the entry from `tasks_by_last_token` if its vector is empty
|
||||
if (it->second.empty()) {
|
||||
it = tasks_by_last_token.erase(it);
|
||||
} else {
|
||||
++it;
|
||||
}
|
||||
}
|
||||
|
||||
for (auto& [token, tasks]: tasks_by_last_token) {
|
||||
auto tid = tablet_map.get_tablet_id(token);
|
||||
if (tablet_map.get_tablet_transition_info(tid)) {
|
||||
vbc_logger.debug("Tablet {} on replica {} is in transition.", tid, replica);
|
||||
@@ -426,18 +475,7 @@ std::vector<utils::UUID> view_building_coordinator::select_tasks_for_replica(loc
|
||||
return {};
|
||||
}
|
||||
|
||||
future<utils::chunked_vector<mutation>> view_building_coordinator::start_tasks(const service::group0_guard& guard, std::vector<utils::UUID> tasks) {
|
||||
vbc_logger.info("Starting tasks {}", tasks);
|
||||
|
||||
utils::chunked_vector<mutation> muts;
|
||||
for (auto& t: tasks) {
|
||||
auto mut = co_await _sys_ks.make_update_view_building_task_state_mutation(guard.write_timestamp(), t, view_building_task::task_state::started);
|
||||
muts.push_back(std::move(mut));
|
||||
}
|
||||
co_return muts;
|
||||
}
|
||||
|
||||
void view_building_coordinator::attach_to_started_tasks(const locator::tablet_replica& replica, std::vector<utils::UUID> tasks) {
|
||||
void view_building_coordinator::start_remote_worker(const locator::tablet_replica& replica, std::vector<utils::UUID> tasks) {
|
||||
vbc_logger.debug("Attaching to started tasks {} on replica {}", tasks, replica);
|
||||
shared_future<std::optional<std::vector<utils::UUID>>> work = work_on_tasks(replica, std::move(tasks));
|
||||
_remote_work.insert({replica, std::move(work)});
|
||||
@@ -464,36 +502,16 @@ future<std::optional<std::vector<utils::UUID>>> view_building_coordinator::work_
|
||||
co_return std::nullopt;
|
||||
}
|
||||
|
||||
// In `view_building_coordinator::work_on_view_building()` we made sure that,
|
||||
// each replica has its own entry in the `_finished_tasks`, so now we can just take a shared lock
|
||||
// and insert its of finished tasks to this replica bucket as there is at most one instance of this method for each replica.
|
||||
auto lock = co_await get_shared_lock(_mutex);
|
||||
_finished_tasks.at(replica).insert_range(remote_results);
|
||||
|
||||
_vb_sm.event.broadcast();
|
||||
co_return remote_results;
|
||||
}
|
||||
|
||||
// Mark finished task as done (remove them from the table).
|
||||
// Retry failed tasks if possible (if failed tasks wasn't aborted).
|
||||
future<utils::chunked_vector<mutation>> view_building_coordinator::update_state_after_work_is_done(const service::group0_guard& guard, const locator::tablet_replica& replica, std::vector<utils::UUID> results) {
|
||||
vbc_logger.debug("Got results from replica {}, finished tasks: {}", replica, results);
|
||||
|
||||
utils::chunked_vector<mutation> muts;
|
||||
for (auto& result: results) {
|
||||
if (!_vb_sm.building_state.currently_processed_base_table) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// A task can be aborted by deleting it or by setting its state to `ABORTED`.
|
||||
// If the task was aborted by changing the state,
|
||||
// we shouldn't remove it here because it might be needed
|
||||
// to generate updated after tablet operation (migration/resize)
|
||||
// is finished.
|
||||
auto task_opt = _vb_sm.building_state.get_task(*_vb_sm.building_state.currently_processed_base_table, replica, result);
|
||||
if (task_opt && task_opt->get().state != view_building_task::task_state::aborted) {
|
||||
// Otherwise, the task was completed successfully and we can remove it.
|
||||
auto delete_mut = co_await _sys_ks.make_remove_view_building_task_mutation(guard.write_timestamp(), result);
|
||||
muts.push_back(std::move(delete_mut));
|
||||
}
|
||||
}
|
||||
co_return muts;
|
||||
}
|
||||
|
||||
future<> view_building_coordinator::stop() {
|
||||
co_await coroutine::parallel_for_each(std::move(_remote_work), [] (auto&& remote_work) -> future<> {
|
||||
co_await remote_work.second.get_future();
|
||||
|
||||
@@ -55,6 +55,8 @@ class view_building_coordinator : public service::endpoint_lifecycle_subscriber
|
||||
abort_source& _as;
|
||||
|
||||
std::unordered_map<locator::tablet_replica, shared_future<std::optional<std::vector<utils::UUID>>>> _remote_work;
|
||||
shared_mutex _mutex; // guards `_finished_tasks` field
|
||||
std::unordered_map<locator::tablet_replica, std::unordered_set<utils::UUID>> _finished_tasks;
|
||||
|
||||
public:
|
||||
view_building_coordinator(replica::database& db, raft::server& raft, service::raft_group0& group0,
|
||||
@@ -84,9 +86,11 @@ private:
|
||||
future<> commit_mutations(service::group0_guard guard, utils::chunked_vector<mutation> mutations, std::string_view description);
|
||||
void handle_coordinator_error(std::exception_ptr eptr);
|
||||
|
||||
future<> finished_task_gc_fiber();
|
||||
future<> clean_finished_tasks();
|
||||
|
||||
future<std::optional<service::group0_guard>> update_state(service::group0_guard guard);
|
||||
// Returns if any new tasks were started
|
||||
future<bool> work_on_view_building(service::group0_guard guard);
|
||||
future<> work_on_view_building(service::group0_guard guard);
|
||||
|
||||
future<> mark_view_build_status_started(const service::group0_guard& guard, table_id view_id, utils::chunked_vector<mutation>& out);
|
||||
future<> mark_all_remaining_view_build_statuses_started(const service::group0_guard& guard, table_id base_id, utils::chunked_vector<mutation>& out);
|
||||
@@ -95,10 +99,8 @@ private:
|
||||
std::set<locator::tablet_replica> get_replicas_with_tasks();
|
||||
std::vector<utils::UUID> select_tasks_for_replica(locator::tablet_replica replica);
|
||||
|
||||
future<utils::chunked_vector<mutation>> start_tasks(const service::group0_guard& guard, std::vector<utils::UUID> tasks);
|
||||
void attach_to_started_tasks(const locator::tablet_replica& replica, std::vector<utils::UUID> tasks);
|
||||
void start_remote_worker(const locator::tablet_replica& replica, std::vector<utils::UUID> tasks);
|
||||
future<std::optional<std::vector<utils::UUID>>> work_on_tasks(locator::tablet_replica replica, std::vector<utils::UUID> tasks);
|
||||
future<utils::chunked_vector<mutation>> update_state_after_work_is_done(const service::group0_guard& guard, const locator::tablet_replica& replica, std::vector<utils::UUID> results);
|
||||
};
|
||||
|
||||
void abort_view_building_tasks(const db::view::view_building_state_machine& vb_sm,
|
||||
|
||||
Reference in New Issue
Block a user