mutation_writer: remove now unused on-disk partition segregator

Also removes related tests, including the exception safety test which
just spins forever with the memtable method.
This commit is contained in:
Botond Dénes
2021-11-02 09:00:50 +02:00
parent f2f529855d
commit 74f2290e49
3 changed files with 0 additions and 212 deletions

View File

@@ -26,119 +26,6 @@
namespace mutation_writer {
class partition_based_splitting_mutation_writer {
struct bucket {
bucket_writer writer;
dht::decorated_key last_key;
size_t data_size = 0;
};
private:
schema_ptr _schema;
reader_permit _permit;
reader_consumer _consumer;
unsigned _max_buckets;
std::vector<bucket> _buckets;
bucket* _current_bucket = nullptr;
future<> write_to_bucket(mutation_fragment&& mf) {
_current_bucket->data_size += mf.memory_usage();
return _current_bucket->writer.consume(std::move(mf));
}
future<bucket*> create_bucket_for(const dht::decorated_key& key) {
if (_buckets.size() < _max_buckets) {
co_return &_buckets.emplace_back(bucket{bucket_writer(_schema, _permit, _consumer), key});
}
auto it = std::max_element(_buckets.begin(), _buckets.end(), [] (const bucket& a, const bucket& b) {
return a.data_size < b.data_size;
});
it->writer.consume_end_of_stream();
co_await it->writer.close();
try {
*it = bucket{bucket_writer(_schema, _permit, _consumer), key};
} catch (...) {
_buckets.erase(it);
throw;
}
co_return &*it;
}
public:
partition_based_splitting_mutation_writer(schema_ptr schema, reader_permit permit, reader_consumer consumer, unsigned max_buckets)
: _schema(std::move(schema))
, _permit(std::move(permit))
, _consumer(std::move(consumer))
, _max_buckets(max_buckets)
{}
future<> consume(partition_start ps) {
if (_buckets.empty()) {
_current_bucket = co_await create_bucket_for(ps.key());
} else if (dht::ring_position_tri_compare(*_schema, _current_bucket->last_key, ps.key()) < 0) {
// No need to change bucket, just update the last key.
_current_bucket->last_key = ps.key();
} else {
// Find the first bucket where this partition doesn't cause
// monotonicity violations. Prefer the buckets towards the head of the list.
auto it = std::find_if(_buckets.begin(), _buckets.end(), [this, &ps] (const bucket& b) {
return dht::ring_position_tri_compare(*_schema, b.last_key, ps.key()) < 0;
});
if (it == _buckets.end()) {
_current_bucket = co_await create_bucket_for(ps.key());
} else {
_current_bucket = &*it;
_current_bucket->last_key = ps.key();
}
}
co_return co_await write_to_bucket(mutation_fragment(*_schema, _permit, std::move(ps)));
}
future<> consume(static_row&& sr) {
return write_to_bucket(mutation_fragment(*_schema, _permit, std::move(sr)));
}
future<> consume(clustering_row&& cr) {
return write_to_bucket(mutation_fragment(*_schema, _permit, std::move(cr)));
}
future<> consume(range_tombstone&& rt) {
return write_to_bucket(mutation_fragment(*_schema, _permit, std::move(rt)));
}
future<> consume(partition_end&& pe) {
return write_to_bucket(mutation_fragment(*_schema, _permit, std::move(pe)));
}
void consume_end_of_stream() {
for (auto& bucket : _buckets) {
bucket.writer.consume_end_of_stream();
}
}
void abort(std::exception_ptr ep) {
for (auto& bucket : _buckets) {
bucket.writer.abort(ep);
}
}
future<> close() noexcept {
return parallel_for_each(_buckets, [] (bucket& bucket) {
return bucket.writer.close();
});
}
};
future<> segregate_by_partition(flat_mutation_reader producer, unsigned max_buckets, reader_consumer consumer) {
auto schema = producer.schema();
auto permit = producer.permit();
try {
return feed_writer(std::move(producer),
partition_based_splitting_mutation_writer(std::move(schema), std::move(permit), std::move(consumer), max_buckets));
} catch (...) {
return producer.close().then([ex = std::current_exception()] () mutable {
return make_exception_future<>(std::move(ex));
});
}
}
class partition_sorting_mutation_writer {
schema_ptr _schema;
reader_permit _permit;

View File

@@ -27,19 +27,6 @@
namespace mutation_writer {
// Given a producer that may contain partitions in the wrong order, or even
// contain partitions multiple times, separate them such that each output
// stream keeps the partition ordering guarantee. In other words, repair
// a stream that violates the ordering requirements by splitting it into output
// streams that honor it.
// This is useful for scrub compaction to split sstables containing out-of-order
// and/or duplicate partitions into sstables that honor the partition ordering.
//
// The parameter max_buckets limits the number of live buckets. When reaching the
// limit, an existing (the largest) bucket will be closed before a new one is
// created.
future<> segregate_by_partition(flat_mutation_reader producer, unsigned max_buckets, reader_consumer consumer);
struct segregate_config {
// For flushing the memtable which does the in-memory segregation (sorting)
// part.

View File

@@ -462,89 +462,3 @@ SEASTAR_THREAD_TEST_CASE(test_partition_based_splitting_mutation_writer) {
});
}).get();
}
// Check that the partition_based_splitting_mutation_writer limits the number of live buckets
SEASTAR_THREAD_TEST_CASE(test_partition_based_splitting_mutation_writer_bucket_limit) {
tests::reader_concurrency_semaphore_wrapper semaphore;
auto random_spec = tests::make_random_schema_specification(
get_name(),
std::uniform_int_distribution<size_t>(1, 2),
std::uniform_int_distribution<size_t>(0, 2),
std::uniform_int_distribution<size_t>(1, 2),
std::uniform_int_distribution<size_t>(0, 1));
auto random_schema = tests::random_schema{tests::random::get_int<uint32_t>(), *random_spec};
const auto max_buckets = 10u;
auto input_mutations = tests::generate_random_mutations(random_schema).get();
std::shuffle(input_mutations.begin(), input_mutations.end(), tests::random::gen());
while (input_mutations.size() < max_buckets * 4) {
input_mutations.push_back(input_mutations.at(tests::random::get_int<uint32_t>(input_mutations.size() - 1)));
}
unsigned num_buckets = 0;
mutation_writer::segregate_by_partition(flat_mutation_reader_from_mutations(semaphore.make_permit(), std::move(input_mutations)),
mutation_writer::on_disk_segregate_config{max_buckets}, [&num_buckets] (flat_mutation_reader rd) {
++num_buckets;
BOOST_REQUIRE(num_buckets <= max_buckets);
return async([&num_buckets, rd = std::move(rd)] () mutable {
assert_that(std::move(rd)).has_monotonic_positions();
BOOST_REQUIRE(num_buckets);
--num_buckets;
});
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_partition_based_splitting_mutation_writer_exception_safety) {
tests::reader_concurrency_semaphore_wrapper semaphore;
auto random_spec = tests::make_random_schema_specification(
get_name(),
std::uniform_int_distribution<size_t>(1, 2),
std::uniform_int_distribution<size_t>(0, 2),
std::uniform_int_distribution<size_t>(1, 2),
std::uniform_int_distribution<size_t>(0, 1));
auto random_schema = tests::random_schema{tests::random::get_int<uint32_t>(), *random_spec};
const auto max_buckets = 10u;
auto input_mutations = tests::generate_random_mutations(random_schema).get();
std::shuffle(input_mutations.begin(), input_mutations.end(), tests::random::gen());
while (input_mutations.size() < max_buckets * 4) {
input_mutations.push_back(input_mutations.at(tests::random::get_int<uint32_t>(input_mutations.size() - 1)));
}
auto& injector = memory::local_failure_injector();
uint64_t i = 0;
do {
auto reader = flat_mutation_reader_from_mutations(semaphore.make_permit(), input_mutations);
testlog.trace("i={}", i);
try {
injector.fail_after(i++);
mutation_writer::segregate_by_partition(std::move(reader), mutation_writer::segregate_config{default_priority_class(), 1000},
[&injector] (flat_mutation_reader rd) {
try {
injector.on_alloc_point();
} catch (...) {
(void)rd.close();
throw;
}
// We are not interested in the various ways the below can fail.
memory::scoped_critical_alloc_section _;
return async([rd = std::move(rd)] () mutable {
auto close_reader = deferred_close(rd);
try {
while (auto mfopt = rd().get());
} catch (...) {
}
});
}).get();
injector.cancel();
} catch (const std::bad_alloc&) {
}
} while (injector.failed());
}