db/view/view_building_worker: don't organize staging sstables by last token

There was a problem with staging sstables after tablet merge.
Let's say there were 2 tablets and tablet 1 (lower last token)
had an staging sstable. Then a tablet merge occured, so there is only
one tablet now (higher last token).
But entries in `_staging_sstables`, which are grouped by last token, are
never adjusted.

Since there shouldn't be thousands of sstables, we can just hold list of
sstables per table and filter necessary entries when doing
`process_staging` view building task.

(cherry picked from commit 2e8c096930)
This commit is contained in:
Michał Jadwiszczak
2025-10-08 12:25:38 +02:00
committed by GitHub Action
parent 35e5f59c75
commit 67ba46d7ea
2 changed files with 32 additions and 11 deletions

View File

@@ -245,23 +245,21 @@ future<> view_building_worker::create_staging_sstable_tasks() {
// Firstly reorgenize `_sstables_to_register` for easier movement.
// This is done in separate loop after commiting the group0 command, because we need to move values from `_sstables_to_register`
// (`staging_sstable_task_info` is non-copyable because of `foreign_ptr` field).
std::unordered_map<shard_id, std::unordered_map<table_id, std::unordered_map<dht::token, std::vector<foreign_ptr<sstables::shared_sstable>>>>> new_sstables_per_shard;
std::unordered_map<shard_id, std::unordered_map<table_id, std::vector<foreign_ptr<sstables::shared_sstable>>>> new_sstables_per_shard;
for (auto& [table_id, sst_infos]: _sstables_to_register) {
for (auto& sst_info: sst_infos) {
new_sstables_per_shard[sst_info.shard][table_id][sst_info.last_token].push_back(std::move(sst_info.sst_foreign_ptr));
new_sstables_per_shard[sst_info.shard][table_id].push_back(std::move(sst_info.sst_foreign_ptr));
}
}
for (auto& [shard, sstables_per_table]: new_sstables_per_shard) {
co_await container().invoke_on(shard, [sstables_for_this_shard = std::move(sstables_per_table)] (view_building_worker& local_vbw) mutable {
for (auto& [tid, ssts_map]: sstables_for_this_shard) {
for (auto& [token, ssts]: ssts_map) {
for (auto& [tid, ssts]: sstables_for_this_shard) {
auto unwrapped_ssts = ssts | std::views::as_rvalue | std::views::transform([] (auto&& fptr) {
return fptr.unwrap_on_owner_shard();
}) | std::ranges::to<std::vector>();
auto& tid_ssts = local_vbw._staging_sstables[tid][token];
auto& tid_ssts = local_vbw._staging_sstables[tid];
tid_ssts.insert(tid_ssts.end(), std::make_move_iterator(unwrapped_ssts.begin()), std::make_move_iterator(unwrapped_ssts.end()));
}
}
});
}
@@ -328,7 +326,7 @@ std::unordered_map<table_id, std::vector<view_building_worker::staging_sstable_t
// or maybe it can be registered to view_update_generator directly.
tasks_to_create[table_id].emplace_back(table_id, shard, last_token, make_foreign(std::move(sstable)));
} else {
_staging_sstables[table_id][last_token].push_back(std::move(sstable));
_staging_sstables[table_id].push_back(std::move(sstable));
}
}
});
@@ -847,13 +845,36 @@ future<> view_building_worker::do_build_range(table_id base_id, std::vector<tabl
}
future<> view_building_worker::do_process_staging(table_id table_id, dht::token last_token) {
if (_staging_sstables[table_id][last_token].empty()) {
if (_staging_sstables[table_id].empty()) {
co_return;
}
auto table = _db.get_tables_metadata().get_table(table_id).shared_from_this();
auto sstables = std::exchange(_staging_sstables[table_id][last_token], {});
co_await _vug.process_staging_sstables(std::move(table), std::move(sstables));
auto& tablet_map = table->get_effective_replication_map()->get_token_metadata().tablets().get_tablet_map(table_id);
auto tid = tablet_map.get_tablet_id(last_token);
auto tablet_range = tablet_map.get_token_range(tid);
// Select sstables belonging to the tablet (identified by `last_token`)
std::vector<sstables::shared_sstable> sstables_to_process;
for (auto& sst: _staging_sstables[table_id]) {
auto sst_last_token = sst->get_last_decorated_key().token();
if (tablet_range.contains(sst_last_token, dht::token_comparator())) {
sstables_to_process.push_back(sst);
}
}
co_await _vug.process_staging_sstables(std::move(table), sstables_to_process);
try {
// Remove processed sstables from `_staging_sstables` map
auto lock = co_await get_units(_staging_sstables_mutex, 1, _as);
std::unordered_set<sstables::shared_sstable> sstables_to_remove(sstables_to_process.begin(), sstables_to_process.end());
auto [first, last] = std::ranges::remove_if(_staging_sstables[table_id], [&] (auto& sst) {
return sstables_to_remove.contains(sst);
});
_staging_sstables[table_id].erase(first, last);
} catch (semaphore_aborted&) {
vbw_logger.warn("Semaphore was aborted while waiting to removed processed sstables for table {}", table_id);
}
}
}

View File

@@ -160,7 +160,7 @@ private:
condition_variable _sstables_to_register_event;
semaphore _staging_sstables_mutex = semaphore(1);
std::unordered_map<table_id, std::vector<staging_sstable_task_info>> _sstables_to_register;
std::unordered_map<table_id, std::unordered_map<dht::token, std::vector<sstables::shared_sstable>>> _staging_sstables;
std::unordered_map<table_id, std::vector<sstables::shared_sstable>> _staging_sstables;
future<> _staging_sstables_registrator = make_ready_future<>();
public: