db,compaction: use utils::chunked_vector for cache invalidation ranges
Instead of dht::partition_ranges_vector, which is an std::vector<> and
have been seen to cause large allocations when calculating ranges to be
invalidated after compaction:
seastar_memory - oversized allocation: 147456 bytes. This is non-fatal, but could lead to latency and/or fragmentation issues. Please report: at
[Backtrace #0]
void seastar::backtrace<seastar::current_backtrace_tasklocal()::$_0>(seastar::current_backtrace_tasklocal()::$_0&&, bool) at ./build/release/seastar/./seastar/include/seastar/util/backtrace.hh:89
(inlined by) seastar::current_backtrace_tasklocal() at ./build/release/seastar/./seastar/src/util/backtrace.cc:99
seastar::current_tasktrace() at ./build/release/seastar/./seastar/src/util/backtrace.cc:136
seastar::current_backtrace() at ./build/release/seastar/./seastar/src/util/backtrace.cc:169
seastar::memory::cpu_pages::warn_large_allocation(unsigned long) at ./build/release/seastar/./seastar/src/core/memory.cc:840
seastar::memory::cpu_pages::check_large_allocation(unsigned long) at ./build/release/seastar/./seastar/src/core/memory.cc:903
(inlined by) seastar::memory::cpu_pages::allocate_large(unsigned int, bool) at ./build/release/seastar/./seastar/src/core/memory.cc:910
(inlined by) seastar::memory::allocate_large(unsigned long, bool) at ./build/release/seastar/./seastar/src/core/memory.cc:1533
(inlined by) seastar::memory::allocate_slowpath(unsigned long) at ./build/release/seastar/./seastar/src/core/memory.cc:1679
seastar::memory::allocate(unsigned long) at ././seastar/src/core/memory.cc:1698
(inlined by) operator new(unsigned long) at ././seastar/src/core/memory.cc:2440
(inlined by) std::__new_allocator<interval<dht::ring_position>>::allocate(unsigned long, void const*) at /usr/lib/gcc/x86_64-redhat-linux/15/../../../../include/c++/15/bits/new_allocator.h:151
(inlined by) std::allocator<interval<dht::ring_position>>::allocate(unsigned long) at /usr/lib/gcc/x86_64-redhat-linux/15/../../../../include/c++/15/bits/allocator.h:203
(inlined by) std::allocator_traits<std::allocator<interval<dht::ring_position>>>::allocate(std::allocator<interval<dht::ring_position>>&, unsigned long) at /usr/lib/gcc/x86_64-redhat-linux/15/../../../../include/c++/15/bits/alloc_traits.h:614
(inlined by) std::_Vector_base<interval<dht::ring_position>, std::allocator<interval<dht::ring_position>>>::_M_allocate(unsigned long) at /usr/lib/gcc/x86_64-redhat-linux/15/../../../../include/c++/15/bits/stl_vector.h:387
(inlined by) std::vector<interval<dht::ring_position>, std::allocator<interval<dht::ring_position>>>::reserve(unsigned long) at /usr/lib/gcc/x86_64-redhat-linux/15/../../../../include/c++/15/bits/vector.tcc:79
dht::to_partition_ranges(utils::chunked_vector<interval<dht::token>, 131072ul> const&, seastar::bool_class<utils::can_yield_tag>) at ./dht/i_partitioner.cc:347
compaction::compaction::get_ranges_for_invalidation(std::vector<seastar::lw_shared_ptr<sstables::sstable>, std::allocator<seastar::lw_shared_ptr<sstables::sstable>>> const&) at ./compaction/compaction.cc:619
(inlined by) compaction::compaction::get_compaction_completion_desc(std::vector<seastar::lw_shared_ptr<sstables::sstable>, std::allocator<seastar::lw_shared_ptr<sstables::sstable>>>, std::vector<seastar::lw_shared_ptr<sstables::sstable>, std::allocator<seastar::lw_shared_ptr<sstables::sstable>>>) at ./compaction/compaction.cc:719
(inlined by) compaction::regular_compaction::replace_remaining_exhausted_sstables() at ./compaction/compaction.cc:1362
compaction::compaction::finish(std::chrono::time_point<db_clock, std::chrono::duration<long, std::ratio<1l, 1000l>>>, std::chrono::time_point<db_clock, std::chrono::duration<long, std::ratio<1l, 1000l>>>) at ./compaction/compaction.cc:1021
compaction::compaction::run(std::unique_ptr<compaction::compaction, std::default_delete<compaction::compaction>>)::$_0::operator()() at ./compaction/compaction.cc:1960
(inlined by) compaction::compaction_result std::__invoke_impl<compaction::compaction_result, compaction::compaction::run(std::unique_ptr<compaction::compaction, std::default_delete<compaction::compaction>>)::$_0>(std::__invoke_other, compaction::compaction::run(std::unique_ptr<compaction::compaction, std::default_delete<compaction::compaction>>)::$_0&&) at /usr/lib/gcc/x86_64-redhat-linux/15/../../../../include/c++/15/bits/invoke.h:63
(inlined by) std::__invoke_result<compaction::compaction::run(std::unique_ptr<compaction::compaction, std::default_delete<compaction::compaction>>)::$_0>::type std::__invoke<compaction::compaction::run(std::unique_ptr<compaction::compaction, std::default_delete<compaction::compaction>>)::$_0>(compaction::compaction::run(std::unique_ptr<compaction::compaction, std::default_delete<compaction::compaction>>)::$_0&&) at /usr/lib/gcc/x86_64-redhat-linux/15/../../../../include/c++/15/bits/invoke.h:98
(inlined by) decltype(auto) std::__apply_impl<compaction::compaction::run(std::unique_ptr<compaction::compaction, std::default_delete<compaction::compaction>>)::$_0, std::tuple<>>(compaction::compaction::run(std::unique_ptr<compaction::compaction, std::default_delete<compaction::compaction>>)::$_0&&, std::tuple<>&&, std::integer_sequence<unsigned long, ...>) at /usr/lib/gcc/x86_64-redhat-linux/15/../../../../include/c++/15/tuple:2920
(inlined by) decltype(auto) std::apply<compaction::compaction::run(std::unique_ptr<compaction::compaction, std::default_delete<compaction::compaction>>)::$_0, std::tuple<>>(compaction::compaction::run(std::unique_ptr<compaction::compaction, std::default_delete<compaction::compaction>>)::$_0&&, std::tuple<>&&) at /usr/lib/gcc/x86_64-redhat-linux/15/../../../../include/c++/15/tuple:2935
(inlined by) seastar::future<compaction::compaction_result> seastar::futurize<compaction::compaction_result>::apply<compaction::compaction::run(std::unique_ptr<compaction::compaction, std::default_delete<compaction::compaction>>)::$_0>(compaction::compaction::run(std::unique_ptr<compaction::compaction, std::default_delete<compaction::compaction>>)::$_0&&, std::tuple<>&&) at ././seastar/include/seastar/core/future.hh:1930
(inlined by) seastar::futurize<std::invoke_result<compaction::compaction::run(std::unique_ptr<compaction::compaction, std::default_delete<compaction::compaction>>)::$_0>::type>::type seastar::async<compaction::compaction::run(std::unique_ptr<compaction::compaction, std::default_delete<compaction::compaction>>)::$_0>(seastar::thread_attributes, compaction::compaction::run(std::unique_ptr<compaction::compaction, std::default_delete<compaction::compaction>>)::$_0&&)::'lambda'()::operator()() const at ././seastar/include/seastar/core/thread.hh:267
(inlined by) seastar::noncopyable_function<void ()>::direct_vtable_for<seastar::futurize<std::invoke_result<compaction::compaction::run(std::unique_ptr<compaction::compaction, std::default_delete<compaction::compaction>>)::$_0>::type>::type seastar::async<compaction::compaction::run(std::unique_ptr<compaction::compaction, std::default_delete<compaction::compaction>>)::$_0>(seastar::thread_attributes, compaction::compaction::run(std::unique_ptr<compaction::compaction, std::default_delete<compaction::compaction>>)::$_0&&)::'lambda'()>::call(seastar::noncopyable_function<void ()> const*) at ././seastar/include/seastar/util/noncopyable_function.hh:138
seastar::noncopyable_function<void ()>::operator()() const at ./build/release/seastar/./seastar/include/seastar/util/noncopyable_function.hh:224
(inlined by) seastar::thread_context::main() at ./build/release/seastar/./seastar/src/core/thread.cc:318
dht::partition_ranges_vector is used on the hot path, so just convert
the problematic user -- cache invalidation -- to use
utils::chunked_vector<dht::partition_range> instead.
Fixes: SCYLLADB-121
Closes scylladb/scylladb#28855
(cherry picked from commit 13ff9c4394)
Closes scylladb/scylladb#28975
This commit is contained in:
@@ -48,6 +48,7 @@
|
||||
#include "mutation/mutation_fragment_stream_validator.hh"
|
||||
#include "utils/assert.hh"
|
||||
#include "utils/error_injection.hh"
|
||||
#include "utils/chunked_vector.hh"
|
||||
#include "utils/pretty_printers.hh"
|
||||
#include "readers/multi_range.hh"
|
||||
#include "readers/compacting.hh"
|
||||
@@ -611,23 +612,23 @@ private:
|
||||
}
|
||||
|
||||
// Called in a seastar thread
|
||||
dht::partition_range_vector
|
||||
utils::chunked_vector<dht::partition_range>
|
||||
get_ranges_for_invalidation(const std::vector<sstables::shared_sstable>& sstables) {
|
||||
// If owned ranges is disengaged, it means no cleanup work was done and
|
||||
// so nothing needs to be invalidated.
|
||||
if (!_owned_ranges) {
|
||||
return dht::partition_range_vector{};
|
||||
return {};
|
||||
}
|
||||
auto owned_ranges = dht::to_partition_ranges(*_owned_ranges, utils::can_yield::yes);
|
||||
auto owned_ranges = dht::to_partition_ranges_chunked(*_owned_ranges).get();
|
||||
|
||||
auto non_owned_ranges = sstables
|
||||
| std::views::transform([] (const sstables::shared_sstable& sst) {
|
||||
seastar::thread::maybe_yield();
|
||||
return dht::partition_range::make({sst->get_first_decorated_key(), true},
|
||||
{sst->get_last_decorated_key(), true});
|
||||
}) | std::ranges::to<dht::partition_range_vector>();
|
||||
}) | std::ranges::to<utils::chunked_vector<dht::partition_range>>();
|
||||
|
||||
return dht::subtract_ranges(*_schema, non_owned_ranges, std::move(owned_ranges)).get();
|
||||
return dht::subtract_ranges(*_schema, std::move(non_owned_ranges), std::move(owned_ranges)).get();
|
||||
}
|
||||
protected:
|
||||
compaction(compaction_group_view& table_s, compaction_descriptor descriptor, compaction_data& cdata, compaction_progress_monitor& progress_monitor, use_backlog_tracker use_backlog_tracker)
|
||||
@@ -718,8 +719,8 @@ protected:
|
||||
|
||||
compaction_completion_desc
|
||||
get_compaction_completion_desc(std::vector<sstables::shared_sstable> input_sstables, std::vector<sstables::shared_sstable> output_sstables) {
|
||||
auto ranges_for_for_invalidation = get_ranges_for_invalidation(input_sstables);
|
||||
return compaction_completion_desc{std::move(input_sstables), std::move(output_sstables), std::move(ranges_for_for_invalidation)};
|
||||
auto ranges = get_ranges_for_invalidation(input_sstables);
|
||||
return compaction_completion_desc{std::move(input_sstables), std::move(output_sstables), std::move(ranges)};
|
||||
}
|
||||
|
||||
// Tombstone expiration is enabled based on the presence of sstable set.
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
#include "sstables/sstable_set.hh"
|
||||
#include "compaction_fwd.hh"
|
||||
#include "mutation_writer/token_group_based_splitting_writer.hh"
|
||||
#include "utils/chunked_vector.hh"
|
||||
|
||||
namespace compaction {
|
||||
|
||||
@@ -38,7 +39,7 @@ struct compaction_completion_desc {
|
||||
// New, fresh SSTables that should be added to SSTable set, replacing the old ones.
|
||||
std::vector<sstables::shared_sstable> new_sstables;
|
||||
// Set of compacted partition ranges that should be invalidated in the cache.
|
||||
dht::partition_range_vector ranges_for_cache_invalidation;
|
||||
utils::chunked_vector<dht::partition_range> ranges_for_cache_invalidation;
|
||||
};
|
||||
|
||||
// creates a new SSTable for a given shard
|
||||
|
||||
@@ -29,6 +29,7 @@
|
||||
#include "utils/assert.hh"
|
||||
#include "utils/updateable_value.hh"
|
||||
#include "utils/labels.hh"
|
||||
#include "utils/chunked_vector.hh"
|
||||
|
||||
namespace cache {
|
||||
|
||||
@@ -1215,10 +1216,10 @@ future<> row_cache::invalidate(external_updater eu, const dht::decorated_key& dk
|
||||
}
|
||||
|
||||
future<> row_cache::invalidate(external_updater eu, const dht::partition_range& range, cache_invalidation_filter filter) {
|
||||
return invalidate(std::move(eu), dht::partition_range_vector({range}), std::move(filter));
|
||||
return invalidate(std::move(eu), utils::chunked_vector<dht::partition_range>({range}), std::move(filter));
|
||||
}
|
||||
|
||||
future<> row_cache::invalidate(external_updater eu, dht::partition_range_vector&& ranges, cache_invalidation_filter filter) {
|
||||
future<> row_cache::invalidate(external_updater eu, utils::chunked_vector<dht::partition_range>&& ranges, cache_invalidation_filter filter) {
|
||||
return do_update(std::move(eu), [this, ranges = std::move(ranges), filter = std::move(filter)] mutable {
|
||||
return seastar::async([this, ranges = std::move(ranges), filter = std::move(filter)] {
|
||||
auto on_failure = defer([this] () noexcept {
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
#include "utils/histogram.hh"
|
||||
#include "mutation/partition_version.hh"
|
||||
#include "utils/double-decker.hh"
|
||||
#include "utils/chunked_vector.hh"
|
||||
#include "db/cache_tracker.hh"
|
||||
#include "readers/empty.hh"
|
||||
#include "readers/mutation_source.hh"
|
||||
@@ -457,7 +458,7 @@ public:
|
||||
// mutation source made prior to the call to invalidate().
|
||||
future<> invalidate(external_updater, const dht::decorated_key&);
|
||||
future<> invalidate(external_updater, const dht::partition_range& = query::full_partition_range, cache_invalidation_filter filter = [] (const auto&) { return true; });
|
||||
future<> invalidate(external_updater, dht::partition_range_vector&&, cache_invalidation_filter filter = [] (const auto&) { return true; });
|
||||
future<> invalidate(external_updater, utils::chunked_vector<dht::partition_range>&&, cache_invalidation_filter filter = [] (const auto&) { return true; });
|
||||
|
||||
// Evicts entries from cache.
|
||||
//
|
||||
|
||||
@@ -352,6 +352,16 @@ dht::partition_range_vector to_partition_ranges(const dht::token_range_vector& r
|
||||
return prs;
|
||||
}
|
||||
|
||||
future<utils::chunked_vector<dht::partition_range>> to_partition_ranges_chunked(const dht::token_range_vector& ranges) {
|
||||
utils::chunked_vector<dht::partition_range> prs;
|
||||
prs.reserve(ranges.size());
|
||||
for (auto& range : ranges) {
|
||||
prs.push_back(dht::to_partition_range(range));
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
co_return prs;
|
||||
}
|
||||
|
||||
std::map<unsigned, dht::partition_range_vector>
|
||||
split_range_to_shards(dht::partition_range pr, const schema& s, const sharder& raw_sharder) {
|
||||
std::map<unsigned, dht::partition_range_vector> ret;
|
||||
@@ -364,11 +374,11 @@ split_range_to_shards(dht::partition_range pr, const schema& s, const sharder& r
|
||||
return ret;
|
||||
}
|
||||
|
||||
future<dht::partition_range_vector> subtract_ranges(const schema& schema, const dht::partition_range_vector& source_ranges, dht::partition_range_vector ranges_to_subtract) {
|
||||
future<utils::chunked_vector<dht::partition_range>> subtract_ranges(const schema& schema, utils::chunked_vector<dht::partition_range> source_ranges, utils::chunked_vector<dht::partition_range> ranges_to_subtract) {
|
||||
auto cmp = dht::ring_position_comparator(schema);
|
||||
// optimize set of potentially overlapping ranges by deoverlapping them.
|
||||
auto ranges = dht::partition_range::deoverlap(source_ranges, cmp);
|
||||
dht::partition_range_vector res;
|
||||
auto ranges = dht::partition_range::deoverlap(std::move(source_ranges), cmp);
|
||||
utils::chunked_vector<dht::partition_range> res;
|
||||
res.reserve(ranges.size() * 2);
|
||||
|
||||
auto range = ranges.begin();
|
||||
|
||||
@@ -91,6 +91,7 @@ inline token get_token(const schema& s, partition_key_view key) {
|
||||
|
||||
dht::partition_range to_partition_range(dht::token_range);
|
||||
dht::partition_range_vector to_partition_ranges(const dht::token_range_vector& ranges, utils::can_yield can_yield = utils::can_yield::no);
|
||||
future<utils::chunked_vector<dht::partition_range>> to_partition_ranges_chunked(const dht::token_range_vector& ranges);
|
||||
|
||||
// Each shard gets a sorted, disjoint vector of ranges
|
||||
std::map<unsigned, dht::partition_range_vector>
|
||||
@@ -105,7 +106,7 @@ std::unique_ptr<dht::i_partitioner> make_partitioner(sstring name);
|
||||
// Returns a sorted and deoverlapped list of ranges that are
|
||||
// the result of subtracting all ranges from ranges_to_subtract.
|
||||
// ranges_to_subtract must be sorted and deoverlapped.
|
||||
future<dht::partition_range_vector> subtract_ranges(const schema& schema, const dht::partition_range_vector& ranges, dht::partition_range_vector ranges_to_subtract);
|
||||
future<utils::chunked_vector<dht::partition_range>> subtract_ranges(const schema& schema, utils::chunked_vector<dht::partition_range> ranges, utils::chunked_vector<dht::partition_range> ranges_to_subtract);
|
||||
|
||||
// Returns a token_range vector split based on the given number of most-significant bits
|
||||
dht::token_range_vector split_token_range_msb(unsigned most_significant_bits);
|
||||
|
||||
@@ -719,7 +719,7 @@ SEASTAR_THREAD_TEST_CASE(test_dht_subtract_ranges) {
|
||||
|
||||
auto get_random_ranges = [&] (size_t max_count) {
|
||||
auto count = tests::random::get_int<size_t>(1, max_count);
|
||||
dht::partition_range_vector ranges;
|
||||
utils::chunked_vector<dht::partition_range> ranges;
|
||||
ranges.reserve(count);
|
||||
|
||||
for (size_t i = 0; i < count; i++) {
|
||||
|
||||
Reference in New Issue
Block a user