db: view_update_generator: always clean up staging sstables
Since they are currently not cleaned up by cleanup compaction filter their tokens, processing only tokens owned by the current node (based on the keyspace replication strategy). Refs #9559 Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
@@ -15,6 +15,7 @@
|
||||
#include "sstables/sstables.hh"
|
||||
#include "sstables/progress_monitor.hh"
|
||||
#include "readers/evictable.hh"
|
||||
#include "dht/partition_filter.hh"
|
||||
|
||||
static logging::logger vug_logger("view_update_generator");
|
||||
|
||||
@@ -158,7 +159,8 @@ future<> view_update_generator::start() {
|
||||
::mutation_reader::forwarding::no);
|
||||
|
||||
inject_failure("view_update_generator_consume_staging_sstable");
|
||||
auto result = staging_sstable_reader.consume_in_thread(view_updating_consumer(s, std::move(permit), *t, sstables, _as, staging_sstable_reader_handle));
|
||||
auto result = staging_sstable_reader.consume_in_thread(view_updating_consumer(s, std::move(permit), *t, sstables, _as, staging_sstable_reader_handle),
|
||||
dht::incremental_owned_ranges_checker::make_partition_filter(_db.get_keyspace_local_ranges(s->ks_name())));
|
||||
staging_sstable_reader.close().get();
|
||||
if (result == stop_iteration::yes) {
|
||||
break;
|
||||
|
||||
@@ -10,6 +10,7 @@
|
||||
#include "sharder.hh"
|
||||
#include <seastar/core/seastar.hh>
|
||||
#include "dht/token-sharding.hh"
|
||||
#include "dht/partition_filter.hh"
|
||||
#include "utils/class_registrator.hh"
|
||||
#include "types.hh"
|
||||
#include "utils/murmur_hash.hh"
|
||||
@@ -362,4 +363,10 @@ split_range_to_shards(dht::partition_range pr, const schema& s) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
flat_mutation_reader_v2::filter incremental_owned_ranges_checker::make_partition_filter(const dht::token_range_vector& sorted_owned_ranges) {
|
||||
return [checker = incremental_owned_ranges_checker(sorted_owned_ranges)] (const dht::decorated_key& dk) mutable {
|
||||
return checker.belongs_to_current_node(dk.token());
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@
|
||||
#pragma once
|
||||
|
||||
#include "dht/i_partitioner.hh"
|
||||
#include "readers/flat_mutation_reader_v2.hh"
|
||||
|
||||
namespace dht {
|
||||
|
||||
@@ -33,6 +34,8 @@ public:
|
||||
|
||||
return _it != _sorted_owned_ranges.end() && _it->contains(t, dht::token_comparator());
|
||||
}
|
||||
|
||||
static flat_mutation_reader_v2::filter make_partition_filter(const dht::token_range_vector& sorted_owned_ranges);
|
||||
};
|
||||
|
||||
} // dht
|
||||
|
||||
Reference in New Issue
Block a user