From a280dcfe4c3b98746f164d8736aa9191aa482d25 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Mon, 11 Mar 2019 10:36:27 +0200 Subject: [PATCH] compaction_strategy: add add_interposer_consumer() This will be the customization point for compaction strategies, used to inject a specific interposer consumer that can manipulate the fragment stream so that it satisfies the requirements of the compaction strategy. For now the only candidate for injecting such an interposer is time-window compaction strategy, which needs to write sstables that only contains atoms belonging to the same time-window. By default no interposer is injected. Also add an accompanying customization point `adjust_partition_estimate()` which returns the estimated per-sstable partition-estimate that the interposer will produce. --- compaction_strategy.hh | 12 ++++++++++++ sstables/compaction_strategy.cc | 16 ++++++++++++++++ sstables/compaction_strategy_impl.hh | 4 ++++ 3 files changed, 32 insertions(+) diff --git a/compaction_strategy.hh b/compaction_strategy.hh index 677c03ea24..8e7670ca7d 100644 --- a/compaction_strategy.hh +++ b/compaction_strategy.hh @@ -21,6 +21,9 @@ #pragma once +#include +#include + #include "schema_fwd.hh" #include "sstables/shared_sstable.hh" #include "exceptions/exceptions.hh" @@ -29,6 +32,9 @@ class table; using column_family = table; +class flat_mutation_reader; +struct mutation_source_metadata; + namespace sstables { enum class compaction_strategy_type { @@ -46,6 +52,8 @@ class sstable_set; struct compaction_descriptor; struct resharding_descriptor; +using reader_consumer = noncopyable_function (flat_mutation_reader)>; + class compaction_strategy { ::shared_ptr _compaction_strategy_impl; public: @@ -128,6 +136,10 @@ public: sstable_set make_sstable_set(schema_ptr schema) const; compaction_backlog_tracker& get_backlog_tracker(); + + uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate); + + reader_consumer make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer end_consumer); }; // Creates a compaction_strategy object from one of the strategies available. diff --git a/sstables/compaction_strategy.cc b/sstables/compaction_strategy.cc index 36db679aa6..2336efc58d 100644 --- a/sstables/compaction_strategy.cc +++ b/sstables/compaction_strategy.cc @@ -473,6 +473,14 @@ compaction_strategy_impl::get_resharding_jobs(column_family& cf, std::vectorget_backlog_tracker(); } +uint64_t compaction_strategy::adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate) { + return _compaction_strategy_impl->adjust_partition_estimate(ms_meta, partition_estimate); +} + +reader_consumer compaction_strategy::make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer end_consumer) { + return _compaction_strategy_impl->make_interposer_consumer(ms_meta, std::move(end_consumer)); +} + compaction_strategy make_compaction_strategy(compaction_strategy_type strategy, const std::map& options) { ::shared_ptr impl; diff --git a/sstables/compaction_strategy_impl.hh b/sstables/compaction_strategy_impl.hh index e7b5dbad85..e2506c2084 100644 --- a/sstables/compaction_strategy_impl.hh +++ b/sstables/compaction_strategy_impl.hh @@ -95,5 +95,9 @@ public: bool worth_dropping_tombstones(const shared_sstable& sst, gc_clock::time_point gc_before); virtual compaction_backlog_tracker& get_backlog_tracker() = 0; + + virtual uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate); + + virtual reader_consumer make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer end_consumer); }; }