test/boost/mutation_test: add test for multishard permit safety
Add a test checking that the multishard reader will not deadlock, when created with an admitted permit, on a semaphore with a single count resource.
This commit is contained in:
@@ -4284,3 +4284,75 @@ SEASTAR_THREAD_TEST_CASE(test_generating_reader_v2) {
|
||||
};
|
||||
run_mutation_source_tests(populator_v2, false);
|
||||
}
|
||||
|
||||
// Check that the multishard reader is safe to create with an admitted permit,
|
||||
// i.e. a permit which already has a count resource (and memory resources).
|
||||
// Create semaphroes with a single count resource, admit a permit and create a
|
||||
// multishard reader with said permit and ensure this doesn't end up in a
|
||||
// deadlock (timeout) when the multishard reader creates the shard reader on the
|
||||
// same shard.
|
||||
SEASTAR_TEST_CASE(test_multishard_reader_safe_to_create_with_admitted_permit) {
|
||||
class semaphore_factory : public test_reader_lifecycle_policy::semaphore_factory {
|
||||
std::vector<foreign_ptr<lw_shared_ptr<reader_concurrency_semaphore>>>& _semaphores;
|
||||
public:
|
||||
explicit semaphore_factory(std::vector<foreign_ptr<lw_shared_ptr<reader_concurrency_semaphore>>>& semaphores) : _semaphores(semaphores) { }
|
||||
virtual lw_shared_ptr<reader_concurrency_semaphore> create(sstring name) override {
|
||||
auto semaphore = _semaphores.at(this_shard_id()).release();
|
||||
_semaphores[this_shard_id()] = make_foreign(semaphore);
|
||||
return semaphore;
|
||||
}
|
||||
virtual future<> stop(reader_concurrency_semaphore& semaphore) override {
|
||||
return make_ready_future<>(); // NOOP, we stop the semaphore in the layer above
|
||||
}
|
||||
};
|
||||
|
||||
return do_with_cql_env_thread([] (cql_test_env& env) {
|
||||
simple_schema s;
|
||||
|
||||
std::vector<foreign_ptr<lw_shared_ptr<reader_concurrency_semaphore>>> semaphores;
|
||||
semaphores.resize(smp::count);
|
||||
parallel_for_each(std::views::iota(0u, smp::count), [&semaphores] (shard_id shard) {
|
||||
return smp::submit_to(shard, [&semaphores] {
|
||||
semaphores[this_shard_id()] = make_foreign(make_lw_shared<reader_concurrency_semaphore>(
|
||||
reader_concurrency_semaphore::for_tests{},
|
||||
seastar::format("{}:{}", get_name(), this_shard_id()),
|
||||
1,
|
||||
1 * 1024 * 1024));
|
||||
});
|
||||
}).get();
|
||||
auto stop_semaphores = defer([&semaphores] {
|
||||
parallel_for_each(std::views::iota(0u, smp::count), [&semaphores] (shard_id shard) {
|
||||
return smp::submit_to(shard, [&semaphores] () -> future<> {
|
||||
auto semaphore = semaphores[this_shard_id()].release();
|
||||
co_await semaphore->stop();
|
||||
});
|
||||
}).get();
|
||||
});
|
||||
|
||||
std::map<dht::token, unsigned> pkeys_by_tokens;
|
||||
for (unsigned i = 0; i < smp::count * 2; ++i) {
|
||||
pkeys_by_tokens.emplace(s.make_pkey(i).token(), i);
|
||||
}
|
||||
auto sharder = std::make_unique<dummy_sharder>(s.schema()->get_sharder(), std::move(pkeys_by_tokens));
|
||||
|
||||
auto reader_factory = [] (
|
||||
schema_ptr schema,
|
||||
reader_permit permit,
|
||||
const dht::partition_range&,
|
||||
const query::partition_slice&,
|
||||
tracing::trace_state_ptr,
|
||||
mutation_reader::forwarding) {
|
||||
return make_empty_flat_reader_v2(std::move(schema), std::move(permit));
|
||||
};
|
||||
|
||||
// timeout is used to break the deadlock in case this test fails
|
||||
auto permit = semaphores.at(this_shard_id())->obtain_permit(s.schema(), "multishard_reader", 128 * 1024, db::timeout_clock::now() + 60s, {}).get();
|
||||
auto lifecycle_policy = seastar::make_shared<test_reader_lifecycle_policy>(reader_factory, std::make_unique<semaphore_factory>(semaphores));
|
||||
|
||||
auto reader = make_multishard_combining_reader_v2_for_tests(*sharder, std::move(lifecycle_policy), s.schema(), std::move(permit),
|
||||
query::full_partition_range, s.schema()->full_slice());
|
||||
auto close_reader = deferred_close(reader);
|
||||
|
||||
reader().get();
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user