distributed_loader: Kill table's _sstables_opened_but_not_loaded

_sstables_opened_but_not_loaded was needed because the old loader would
open sstables from all shards before loading them.
In the new loader, introduced with reshape, make_sstables_available()
is called on each shard after resharding and reshape finished, so
there's no need whatsoever for that mess.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <20210618200026.1002621-1-raphaelsc@scylladb.com>
This commit is contained in:
Raphael S. Carvalho
2021-06-18 17:00:25 -03:00
committed by Avi Kivity
parent ee28eb4100
commit 88119a5c81
2 changed files with 12 additions and 17 deletions

View File

@@ -414,11 +414,6 @@ private:
// have not been deleted yet, so must not GC any tombstones in other sstables
// that may delete data in these sstables:
std::vector<sstables::shared_sstable> _sstables_compacted_but_not_deleted;
// sstables that have been opened but not loaded yet, that's because refresh
// needs to load all opened sstables atomically, and now, we open a sstable
// in all shards at the same time, which makes it hard to store all sstables
// we need to load later on for all shards.
std::vector<sstables::shared_sstable> _sstables_opened_but_not_loaded;
// sstables that should not be compacted (e.g. because they need to be used
// to generate view updates later)
std::unordered_map<uint64_t, sstables::shared_sstable> _sstables_staging;

View File

@@ -325,25 +325,26 @@ distributed_loader::make_sstables_available(sstables::sstable_directory& dir, sh
auto& table = db.local().find_column_family(ks, cf);
return do_with(dht::ring_position::max(), dht::ring_position::min(), [&table, &dir, &view_update_generator, datadir = std::move(datadir)] (dht::ring_position& min, dht::ring_position& max) {
return dir.do_for_each_sstable([&table, datadir = std::move(datadir), &min, &max] (sstables::shared_sstable sst) {
return do_with(dht::ring_position::max(), dht::ring_position::min(), std::vector<sstables::shared_sstable>(),
[&table, &dir, &view_update_generator, datadir = std::move(datadir)] (dht::ring_position& min, dht::ring_position& max, std::vector<sstables::shared_sstable>& new_sstables) {
return dir.do_for_each_sstable([&table, datadir = std::move(datadir), &min, &max, &new_sstables] (sstables::shared_sstable sst) {
min = std::min(dht::ring_position(sst->get_first_decorated_key()), min, dht::ring_position_less_comparator(*table.schema()));
max = std::max(dht::ring_position(sst->get_last_decorated_key()) , max, dht::ring_position_less_comparator(*table.schema()));
auto gen = table.calculate_generation_for_new_table();
dblog.trace("Loading {} into {}, new generation {}", sst->get_filename(), datadir.native(), gen);
return sst->move_to_new_dir(datadir.native(), gen, true).then([&table, sst] {
table._sstables_opened_but_not_loaded.push_back(std::move(sst));
return sst->move_to_new_dir(datadir.native(), gen, true).then([&table, &new_sstables, sst] {
new_sstables.push_back(std::move(sst));
return make_ready_future<>();
});
}).then([&table, &min, &max] {
}).then([&table, &min, &max, &new_sstables] {
// nothing loaded
if (min.is_max() && max.is_min()) {
return make_ready_future<>();
}
return table.get_row_cache().invalidate(row_cache::external_updater([&table] () noexcept {
for (auto& sst : table._sstables_opened_but_not_loaded) {
return table.get_row_cache().invalidate(row_cache::external_updater([&table, &new_sstables] () noexcept {
for (auto& sst : new_sstables) {
try {
table.load_sstable(sst, true);
} catch (...) {
@@ -352,17 +353,16 @@ distributed_loader::make_sstables_available(sstables::sstable_directory& dir, sh
}
}
}), dht::partition_range::make({min, true}, {max, true}));
}).then([&view_update_generator, &table] {
return parallel_for_each(table._sstables_opened_but_not_loaded, [&view_update_generator, &table] (sstables::shared_sstable& sst) {
}).then([&view_update_generator, &table, &new_sstables] {
return parallel_for_each(new_sstables, [&view_update_generator, &table] (sstables::shared_sstable& sst) {
if (sst->requires_view_building()) {
return view_update_generator.local().register_staging_sstable(sst, table.shared_from_this());
}
return make_ready_future<>();
});
}).then_wrapped([&table] (future<> f) {
auto opened = std::exchange(table._sstables_opened_but_not_loaded, {});
}).then_wrapped([&new_sstables] (future<> f) {
if (!f.failed()) {
return make_ready_future<size_t>(opened.size());
return make_ready_future<size_t>(new_sstables.size());
} else {
return make_exception_future<size_t>(f.get_exception());
}