compaction_strategy: add add_interposer_consumer()

This will be the customization point for compaction strategies, used to
inject a specific interposer consumer that can manipulate the fragment
stream so that it satisfies the requirements of the compaction strategy.
For now the only candidate for injecting such an interposer is
time-window compaction strategy, which needs to write sstables that
only contains atoms belonging to the same time-window. By default no
interposer is injected.
Also add an accompanying customization point
`adjust_partition_estimate()` which returns the estimated per-sstable
partition-estimate that the interposer will produce.
This commit is contained in:
Botond Dénes
2019-03-11 10:36:27 +02:00
parent 3ce902a4be
commit a280dcfe4c
3 changed files with 32 additions and 0 deletions

View File

@@ -21,6 +21,9 @@
#pragma once
#include <seastar/core/future.hh>
#include <seastar/util/noncopyable_function.hh>
#include "schema_fwd.hh"
#include "sstables/shared_sstable.hh"
#include "exceptions/exceptions.hh"
@@ -29,6 +32,9 @@
class table;
using column_family = table;
class flat_mutation_reader;
struct mutation_source_metadata;
namespace sstables {
enum class compaction_strategy_type {
@@ -46,6 +52,8 @@ class sstable_set;
struct compaction_descriptor;
struct resharding_descriptor;
using reader_consumer = noncopyable_function<future<> (flat_mutation_reader)>;
class compaction_strategy {
::shared_ptr<compaction_strategy_impl> _compaction_strategy_impl;
public:
@@ -128,6 +136,10 @@ public:
sstable_set make_sstable_set(schema_ptr schema) const;
compaction_backlog_tracker& get_backlog_tracker();
uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate);
reader_consumer make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer end_consumer);
};
// Creates a compaction_strategy object from one of the strategies available.

View File

@@ -473,6 +473,14 @@ compaction_strategy_impl::get_resharding_jobs(column_family& cf, std::vector<sst
return jobs;
}
uint64_t compaction_strategy_impl::adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate) {
return partition_estimate;
}
reader_consumer compaction_strategy_impl::make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer end_consumer) {
return end_consumer;
}
// The backlog for TWCS is just the sum of the individual backlogs in each time window.
// We'll keep various SizeTiered backlog tracker objects-- one per window for the static SSTables.
// We then scan the current compacting and in-progress writes and matching them to existing time
@@ -803,6 +811,14 @@ compaction_backlog_tracker& compaction_strategy::get_backlog_tracker() {
return _compaction_strategy_impl->get_backlog_tracker();
}
uint64_t compaction_strategy::adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate) {
return _compaction_strategy_impl->adjust_partition_estimate(ms_meta, partition_estimate);
}
reader_consumer compaction_strategy::make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer end_consumer) {
return _compaction_strategy_impl->make_interposer_consumer(ms_meta, std::move(end_consumer));
}
compaction_strategy make_compaction_strategy(compaction_strategy_type strategy, const std::map<sstring, sstring>& options) {
::shared_ptr<compaction_strategy_impl> impl;

View File

@@ -95,5 +95,9 @@ public:
bool worth_dropping_tombstones(const shared_sstable& sst, gc_clock::time_point gc_before);
virtual compaction_backlog_tracker& get_backlog_tracker() = 0;
virtual uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate);
virtual reader_consumer make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer end_consumer);
};
}