sstables_loader: Implement tablet based load-and-stream

Similar treatment to repair is given to load-and-stream.

Jumps into a new streaming session for every tablet, so we guarantee
data will be segregated into tablets co-habiting the same shard.

Fixes #17315.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
Raphael S. Carvalho
2024-02-19 20:51:46 -03:00
parent b9158e36ef
commit ab498489fe

View File

@@ -148,12 +148,74 @@ protected:
future<> stream_sstable_mutations(const dht::partition_range&, std::vector<sstables::shared_sstable>, bool primary_replica_only);
};
class tablet_sstable_streamer : public sstable_streamer {
const locator::tablet_map& _tablet_map;
public:
tablet_sstable_streamer(netw::messaging_service& ms, replica::database& db, ::table_id table_id, std::vector<sstables::shared_sstable> sstables)
: sstable_streamer(ms, db, table_id, std::move(sstables))
, _tablet_map(_erm->get_token_metadata().tablets().get_tablet_map(table_id)) {
}
virtual future<> stream(bool primary_replica_only) override;
private:
future<> stream_fully_contained_sstables(const dht::partition_range& pr, std::vector<sstables::shared_sstable> sstables,
bool primary_replica_only) {
// FIXME: fully contained sstables can be optimized.
return stream_sstables(pr, std::move(sstables), primary_replica_only);
}
};
future<> sstable_streamer::stream(bool primary_replica_only) {
const auto full_partition_range = dht::partition_range::make_open_ended_both_sides();
co_await stream_sstables(full_partition_range, std::move(_sstables), primary_replica_only);
}
future<> tablet_sstable_streamer::stream(bool primary_replica_only) {
// sstables are sorted by first key in reverse order.
auto sstable_it = _sstables.rbegin();
for (auto tablet_id : _tablet_map.tablet_ids()) {
auto tablet_range = _tablet_map.get_token_range(tablet_id);
auto sstable_token_range = [] (const sstables::shared_sstable& sst) {
return dht::token_range(sst->get_first_decorated_key().token(),
sst->get_last_decorated_key().token());
};
std::vector<sstables::shared_sstable> sstables_fully_contained;
std::vector<sstables::shared_sstable> sstables_partially_contained;
// sstable is exhausted if its last key is before the current tablet range
auto exhausted = [&tablet_range] (const sstables::shared_sstable& sst) {
return tablet_range.before(sst->get_last_decorated_key().token(), dht::token_comparator{});
};
while (sstable_it != _sstables.rend() && exhausted(*sstable_it)) {
sstable_it++;
}
for (; sstable_it != _sstables.rend(); sstable_it++) {
auto sst_token_range = sstable_token_range(*sstable_it);
// sstables are sorted by first key, so we're done with current tablet when
// the next sstable doesn't overlap with its owned token range.
if (!tablet_range.overlaps(sst_token_range, dht::token_comparator{})) {
break;
}
if (tablet_range.contains(sst_token_range, dht::token_comparator{})) {
sstables_fully_contained.push_back(*sstable_it);
} else {
sstables_partially_contained.push_back(*sstable_it);
}
co_await coroutine::maybe_yield();
}
auto tablet_pr = dht::to_partition_range(tablet_range);
co_await stream_sstables(tablet_pr, std::move(sstables_partially_contained), primary_replica_only);
co_await stream_fully_contained_sstables(tablet_pr, std::move(sstables_fully_contained), primary_replica_only);
}
}
future<> sstable_streamer::stream_sstables(const dht::partition_range& pr, std::vector<sstables::shared_sstable> sstables,
bool primary_replica_only) {
while (!sstables.empty()) {
@@ -284,13 +346,22 @@ future<> sstable_streamer::stream_sstable_mutations(const dht::partition_range&
co_return;
}
template <typename... Args>
static std::unique_ptr<sstable_streamer> make_sstable_streamer(bool uses_tablets, Args&&... args) {
if (uses_tablets) {
return std::make_unique<tablet_sstable_streamer>(std::forward<Args>(args)...);
}
return std::make_unique<sstable_streamer>(std::forward<Args>(args)...);
}
future<> sstables_loader::load_and_stream(sstring ks_name, sstring cf_name,
::table_id table_id, std::vector<sstables::shared_sstable> sstables, bool primary_replica_only) {
// streamer guarantees topology stability, for correctness, by holding effective_replication_map
// throughout its lifetime.
sstable_streamer streamer(_messaging, _db.local(), table_id, std::move(sstables));
auto streamer = make_sstable_streamer(_db.local().find_column_family(table_id).uses_tablets(),
_messaging, _db.local(), table_id, std::move(sstables));
co_await streamer.stream(primary_replica_only);
co_await streamer->stream(primary_replica_only);
}
// For more details, see distributed_loader::process_upload_dir().