From 10f8f13b9004fd5f73cab6a527be2fad291c044f Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Tue, 8 Nov 2022 19:21:53 +0200 Subject: [PATCH] db: view_update_generator: always clean up staging sstables Since they are currently not cleaned up by cleanup compaction filter their tokens, processing only tokens owned by the current node (based on the keyspace replication strategy). Refs #9559 Signed-off-by: Benny Halevy --- db/view/view_update_generator.cc | 4 +++- dht/i_partitioner.cc | 7 +++++++ dht/partition_filter.hh | 3 +++ 3 files changed, 13 insertions(+), 1 deletion(-) diff --git a/db/view/view_update_generator.cc b/db/view/view_update_generator.cc index 5021026856..8b3528b1a1 100644 --- a/db/view/view_update_generator.cc +++ b/db/view/view_update_generator.cc @@ -15,6 +15,7 @@ #include "sstables/sstables.hh" #include "sstables/progress_monitor.hh" #include "readers/evictable.hh" +#include "dht/partition_filter.hh" static logging::logger vug_logger("view_update_generator"); @@ -158,7 +159,8 @@ future<> view_update_generator::start() { ::mutation_reader::forwarding::no); inject_failure("view_update_generator_consume_staging_sstable"); - auto result = staging_sstable_reader.consume_in_thread(view_updating_consumer(s, std::move(permit), *t, sstables, _as, staging_sstable_reader_handle)); + auto result = staging_sstable_reader.consume_in_thread(view_updating_consumer(s, std::move(permit), *t, sstables, _as, staging_sstable_reader_handle), + dht::incremental_owned_ranges_checker::make_partition_filter(_db.get_keyspace_local_ranges(s->ks_name()))); staging_sstable_reader.close().get(); if (result == stop_iteration::yes) { break; diff --git a/dht/i_partitioner.cc b/dht/i_partitioner.cc index 517b721839..d9aab7383d 100644 --- a/dht/i_partitioner.cc +++ b/dht/i_partitioner.cc @@ -10,6 +10,7 @@ #include "sharder.hh" #include #include "dht/token-sharding.hh" +#include "dht/partition_filter.hh" #include "utils/class_registrator.hh" #include "types.hh" #include "utils/murmur_hash.hh" @@ -362,4 +363,10 @@ split_range_to_shards(dht::partition_range pr, const schema& s) { return ret; } +flat_mutation_reader_v2::filter incremental_owned_ranges_checker::make_partition_filter(const dht::token_range_vector& sorted_owned_ranges) { + return [checker = incremental_owned_ranges_checker(sorted_owned_ranges)] (const dht::decorated_key& dk) mutable { + return checker.belongs_to_current_node(dk.token()); + }; +} + } diff --git a/dht/partition_filter.hh b/dht/partition_filter.hh index 30f8058fde..d2660bc13c 100644 --- a/dht/partition_filter.hh +++ b/dht/partition_filter.hh @@ -10,6 +10,7 @@ #pragma once #include "dht/i_partitioner.hh" +#include "readers/flat_mutation_reader_v2.hh" namespace dht { @@ -33,6 +34,8 @@ public: return _it != _sorted_owned_ranges.end() && _it->contains(t, dht::token_comparator()); } + + static flat_mutation_reader_v2::filter make_partition_filter(const dht::token_range_vector& sorted_owned_ranges); }; } // dht