test: move away from make_permit()

Use the most appropriate up-front admission variant.
This commit is contained in:
Botond Dénes
2021-04-08 13:29:28 +03:00
parent 7bfa40a2f1
commit c07db00b70
12 changed files with 45 additions and 59 deletions

View File

@@ -647,7 +647,7 @@ SEASTAR_TEST_CASE(test_commitlog_replay_invalid_key){
}
{
auto rd = mt.make_flat_reader(s, db.get_reader_concurrency_semaphore().make_permit(s.get(), "test"));
auto rd = mt.make_flat_reader(s, db.get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test"));
auto close_rd = deferred_close(rd);
auto mopt = read_mutation_from_flat_mutation_reader(rd, db::no_timeout).get0();
BOOST_REQUIRE(mopt);

View File

@@ -427,7 +427,7 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_fragment_mutate_exception_safety) {
reader_concurrency_semaphore sem(1, 100, get_name());
auto stop_sem = deferred_stop(sem);
auto permit = sem.make_permit(s.schema().get(), get_name());
auto permit = sem.make_tracking_only_permit(s.schema().get(), get_name());
const auto available_res = sem.available_resources();
const sstring val(1024, 'a');

View File

@@ -1025,7 +1025,7 @@ public:
});
_reader.close().get();
_reader = make_restricted_flat_reader(std::move(ms), schema, semaphore.make_permit(schema.get(), "reader-wrapper"));
_reader = make_restricted_flat_reader(std::move(ms), schema, semaphore.make_tracking_only_permit(schema.get(), "reader-wrapper"));
}
reader_wrapper(
@@ -2625,7 +2625,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_streaming_reader) {
auto& local_partitioner = schema->get_sharder();
auto remote_partitioner = dht::sharder(local_partitioner.shard_count() - 1, local_partitioner.sharding_ignore_msb());
auto tested_reader = make_multishard_streaming_reader(env.db(), schema, tests::make_permit(),
auto tested_reader = make_multishard_streaming_reader(env.db(), schema, make_reader_permit(env),
[sharder = dht::selective_token_range_sharder(remote_partitioner, token_range, 0)] () mutable -> std::optional<dht::partition_range> {
if (auto next = sharder.next()) {
return dht::to_partition_range(*next);
@@ -3191,7 +3191,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_trim_range_tombstones) {
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name());
auto stop_sem = deferred_stop(semaphore);
simple_schema s;
auto permit = semaphore.make_permit(s.schema().get(), get_name());
auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name());
const auto pkey = s.make_pkey();
size_t max_buffer_size = 512;
@@ -3285,7 +3285,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_self_validation) {
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name());
auto stop_sem = deferred_stop(semaphore);
simple_schema s;
auto permit = semaphore.make_permit(s.schema().get(), get_name());
auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name());
auto pkeys = s.make_pkeys(4);
std::ranges::sort(pkeys, dht::decorated_key::less_comparator(s.schema()));
@@ -3643,7 +3643,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_recreate_before_fast_forward_to)
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name());
auto stop_sem = deferred_stop(semaphore);
simple_schema s;
auto permit = semaphore.make_permit(s.schema().get(), get_name());
auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name());
auto pkeys = s.make_pkeys(6);
boost::sort(pkeys, dht::decorated_key::less_comparator(s.schema()));
@@ -3694,7 +3694,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_drop_flags) {
reader_concurrency_semaphore semaphore(1, 0, get_name());
auto stop_sem = deferred_stop(semaphore);
simple_schema s;
auto permit = semaphore.make_permit(s.schema().get(), get_name());
auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name());
auto pkeys = s.make_pkeys(2);
std::sort(pkeys.begin(), pkeys.end(), [&s] (const auto& pk1, const auto& pk2) {

View File

@@ -116,7 +116,7 @@ private:
Querier make_querier(const dht::partition_range& range) {
return Querier(_mutation_source,
_s.schema(),
_sem.make_permit(_s.schema().get(), "make-querier"),
_sem.make_tracking_only_permit(_s.schema().get(), "make-querier"),
range,
_s.schema()->full_slice(),
service::get_local_sstable_query_read_priority(),
@@ -757,16 +757,13 @@ SEASTAR_THREAD_TEST_CASE(test_immediate_evict_on_insert) {
test_querier_cache t;
auto& sem = t.get_semaphore();
auto permit1 = sem.make_permit(t.get_schema().get(), get_name());
auto permit2 = sem.make_permit(t.get_schema().get(), get_name());
permit1.wait_admission(0, db::no_timeout).get();
auto permit1 = sem.obtain_permit(t.get_schema().get(), get_name(), 0, db::no_timeout).get0();
auto resources = permit1.consume_resources(reader_resources(sem.available_resources().count, 0));
BOOST_CHECK_EQUAL(sem.available_resources().count, 0);
auto fut = permit2.wait_admission(1, db::no_timeout);
auto fut = sem.obtain_permit(t.get_schema().get(), get_name(), 1, db::no_timeout);
BOOST_CHECK_EQUAL(sem.waiters(), 1);
@@ -792,8 +789,8 @@ SEASTAR_THREAD_TEST_CASE(test_unique_inactive_read_handle) {
.with_column("v", int32_type)
.build();
auto sem1_h1 = sem1.register_inactive_read(make_empty_flat_reader(schema, sem1.make_permit(schema.get(), get_name())));
auto sem2_h1 = sem2.register_inactive_read(make_empty_flat_reader(schema, sem2.make_permit(schema.get(), get_name())));
auto sem1_h1 = sem1.register_inactive_read(make_empty_flat_reader(schema, sem1.make_tracking_only_permit(schema.get(), get_name())));
auto sem2_h1 = sem2.register_inactive_read(make_empty_flat_reader(schema, sem2.make_tracking_only_permit(schema.get(), get_name())));
// Sanity check that lookup still works with empty handle.
BOOST_REQUIRE(!sem1.unregister_inactive_read(reader_concurrency_semaphore::inactive_read_handle{}));

View File

@@ -38,7 +38,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_clear_inactive_reads)
auto stop_sem = deferred_stop(semaphore);
for (int i = 0; i < 10; ++i) {
handles.emplace_back(semaphore.register_inactive_read(make_empty_flat_reader(s.schema(), semaphore.make_permit(s.schema().get(), get_name()))));
handles.emplace_back(semaphore.register_inactive_read(make_empty_flat_reader(s.schema(), semaphore.make_tracking_only_permit(s.schema().get(), get_name()))));
}
BOOST_REQUIRE(std::all_of(handles.begin(), handles.end(), [] (const reader_concurrency_semaphore::inactive_read_handle& handle) { return bool(handle); }));
@@ -50,7 +50,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_clear_inactive_reads)
handles.clear();
for (int i = 0; i < 10; ++i) {
handles.emplace_back(semaphore.register_inactive_read(make_empty_flat_reader(s.schema(), semaphore.make_permit(s.schema().get(), get_name()))));
handles.emplace_back(semaphore.register_inactive_read(make_empty_flat_reader(s.schema(), semaphore.make_tracking_only_permit(s.schema().get(), get_name()))));
}
BOOST_REQUIRE(std::all_of(handles.begin(), handles.end(), [] (const reader_concurrency_semaphore::inactive_read_handle& handle) { return bool(handle); }));
@@ -68,14 +68,14 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_destroyed_permit_rele
// Not admitted, active
{
auto permit = semaphore.make_permit(s.schema().get(), get_name());
auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name());
auto units2 = permit.consume_memory(1024);
}
BOOST_REQUIRE(semaphore.available_resources() == initial_resources);
// Not admitted, inactive
{
auto permit = semaphore.make_permit(s.schema().get(), get_name());
auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name());
auto units2 = permit.consume_memory(1024);
auto handle = semaphore.register_inactive_read(make_empty_flat_reader(s.schema(), permit));
@@ -85,17 +85,15 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_destroyed_permit_rele
// Admitted, active
{
auto permit = semaphore.make_permit(s.schema().get(), get_name());
auto units1 = permit.wait_admission(1024, db::no_timeout).get0();
auto units2 = permit.consume_memory(1024);
auto permit = semaphore.obtain_permit(s.schema().get(), get_name(), 1024, db::no_timeout).get0();
auto units1 = permit.consume_memory(1024);
}
BOOST_REQUIRE(semaphore.available_resources() == initial_resources);
// Admitted, inactive
{
auto permit = semaphore.make_permit(s.schema().get(), get_name());
auto units1 = permit.wait_admission(1024, db::no_timeout).get0();
auto units2 = permit.consume_memory(1024);
auto permit = semaphore.obtain_permit(s.schema().get(), get_name(), 1024, db::no_timeout).get0();
auto units1 = permit.consume_memory(1024);
auto handle = semaphore.register_inactive_read(make_empty_flat_reader(s.schema(), permit));
BOOST_REQUIRE(semaphore.try_evict_one_inactive_read());
@@ -378,8 +376,7 @@ SEASTAR_TEST_CASE(reader_restriction_file_tracking) {
return async([&] {
reader_concurrency_semaphore semaphore(100, 4 * 1024, get_name());
auto stop_sem = deferred_stop(semaphore);
auto permit = semaphore.make_permit(nullptr, get_name());
permit.wait_admission(0, db::no_timeout).get();
auto permit = semaphore.obtain_permit(nullptr, get_name(), 0, db::no_timeout).get();
{
auto tracked_file = make_tracked_file(file(shared_ptr<file_impl>(make_shared<dummy_file_impl>())), permit);
@@ -439,14 +436,11 @@ SEASTAR_TEST_CASE(reader_concurrency_semaphore_timeout) {
{
auto timeout = db::timeout_clock::now() + std::chrono::duration_cast<db::timeout_clock::time_point::duration>(std::chrono::milliseconds{1});
auto permit1 = semaphore.make_permit(nullptr, "permit1");
std::optional<reader_permit::resource_units> permit1_res = permit1.wait_admission(new_reader_base_cost, timeout).get();
reader_permit_opt permit1 = semaphore.obtain_permit(nullptr, "permit1", new_reader_base_cost, timeout).get();
auto permit2 = semaphore.make_permit(nullptr, "permit2");
auto permit2_fut = permit2.wait_admission(new_reader_base_cost, timeout);
auto permit2_fut = semaphore.obtain_permit(nullptr, "permit2", new_reader_base_cost, timeout);
auto permit3 = semaphore.make_permit(nullptr, "permit3");
auto permit3_fut = permit3.wait_admission(new_reader_base_cost, timeout);
auto permit3_fut = semaphore.obtain_permit(nullptr, "permit3", new_reader_base_cost, timeout);
BOOST_REQUIRE_EQUAL(semaphore.waiters(), 2);
@@ -459,7 +453,7 @@ SEASTAR_TEST_CASE(reader_concurrency_semaphore_timeout) {
} else {
// We need special cleanup when the test failed to avoid invalid
// memory access.
permit1_res.reset();
permit1 = {};
BOOST_CHECK(eventually_true([&] { return permit2_fut.available(); }));
{
@@ -484,24 +478,20 @@ SEASTAR_TEST_CASE(reader_concurrency_semaphore_max_queue_length) {
auto stop_sem = deferred_stop(semaphore);
{
auto permit1 = semaphore.make_permit(nullptr, "permit1");
auto permit1_res = permit1.wait_admission(new_reader_base_cost, db::no_timeout).get();
reader_permit_opt permit1 = semaphore.obtain_permit(nullptr, "permit1", new_reader_base_cost, db::no_timeout).get();
auto permit2 = semaphore.make_permit(nullptr, "permit2");
auto permit2_fut = permit2.wait_admission(new_reader_base_cost, db::no_timeout);
auto permit2_fut = semaphore.obtain_permit(nullptr, "permit2", new_reader_base_cost, db::no_timeout);
auto permit3 = semaphore.make_permit(nullptr, "permit3");
auto permit3_fut = permit3.wait_admission(new_reader_base_cost, db::no_timeout);
auto permit3_fut = semaphore.obtain_permit(nullptr, "permit3", new_reader_base_cost, db::no_timeout);
BOOST_REQUIRE_EQUAL(semaphore.waiters(), 2);
auto permit4 = semaphore.make_permit(nullptr, "permit4");
auto permit4_fut = permit4.wait_admission(new_reader_base_cost, db::no_timeout);
auto permit4_fut = semaphore.obtain_permit(nullptr, "permit4", new_reader_base_cost, db::no_timeout);
// The queue should now be full.
BOOST_REQUIRE_THROW(permit4_fut.get(), std::runtime_error);
permit1_res.reset();
permit1 = {};
{
auto res = permit2_fut.get0();
}

View File

@@ -439,7 +439,7 @@ SEASTAR_TEST_CASE(test_view_update_generator) {
sstables::sstable_writer_config sst_cfg = e.db().local().get_user_sstables_manager().configure_writer("test");
auto& pc = service::get_local_streaming_priority();
auto permit = e.local_db().get_reader_concurrency_semaphore().make_permit(s.get(), "test");
auto permit = e.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test");
sst->write_components(flat_mutation_reader_from_mutations(std::move(permit), {m}), 1ul, s, sst_cfg, {}, pc).get();
sst->open_data().get();
t->add_sstable_and_update_cache(sst).get();
@@ -549,7 +549,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_deadlock) {
sstables::sstable_writer_config sst_cfg = e.local_db().get_user_sstables_manager().configure_writer("test");
auto& pc = service::get_local_streaming_priority();
auto permit = e.local_db().get_reader_concurrency_semaphore().make_permit(s.get(), "test");
auto permit = e.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test");
sst->write_components(flat_mutation_reader_from_mutations(std::move(permit), {m}), 1ul, s, sst_cfg, {}, pc).get();
sst->open_data().get();
t->add_sstable_and_update_cache(sst).get();
@@ -626,7 +626,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_register_semaphore_unit_leak
sstables::sstable_writer_config sst_cfg = e.local_db().get_user_sstables_manager().configure_writer("test");
auto& pc = service::get_local_streaming_priority();
auto permit = e.local_db().get_reader_concurrency_semaphore().make_permit(s.get(), "test");
auto permit = e.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test");
sst->write_components(flat_mutation_reader_from_mutations(std::move(permit), {m}), 1ul, s, sst_cfg, {}, pc).get();
sst->open_data().get();
t->add_sstable_and_update_cache(sst).get();
@@ -702,7 +702,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering) {
class consumer_verifier {
schema_ptr _schema;
reader_permit _permit;
reader_concurrency_semaphore& _semaphore;
const partition_size_map& _partition_rows;
std::vector<mutation>& _collected_muts;
std::unique_ptr<row_locker> _rl;
@@ -727,7 +727,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering) {
void check(mutation mut) {
// First we check that we would be able to create a reader, even
// though the staging reader consumed all resources.
auto res_units = _permit.wait_admission(new_reader_base_cost, db::timeout_clock::now()).get0();
auto permit = _semaphore.obtain_permit(_schema.get(), "consumer_verifier", new_reader_base_cost, db::timeout_clock::now()).get0();
const size_t current_rows = rows_in_mut(mut);
const auto total_rows = _partition_rows.at(mut.decorated_key());
@@ -773,7 +773,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering) {
public:
consumer_verifier(schema_ptr schema, reader_concurrency_semaphore& sem, const partition_size_map& partition_rows, std::vector<mutation>& collected_muts, bool& ok)
: _schema(std::move(schema))
, _permit(sem.make_permit(_schema.get(), "consumer_verifier"))
, _semaphore(sem)
, _partition_rows(partition_rows)
, _collected_muts(collected_muts)
, _rl(std::make_unique<row_locker>(_schema))
@@ -832,7 +832,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering) {
return less(a.decorated_key(), b.decorated_key());
});
auto permit = sem.make_permit(schema.get(), get_name());
auto permit = sem.obtain_permit_nowait(schema.get(), get_name(), new_reader_base_cost, db::no_timeout).get0();
auto mt = make_lw_shared<memtable>(schema);
for (const auto& mut : muts) {

View File

@@ -320,7 +320,7 @@ public:
table_name = std::move(table_name)] (database& db) mutable {
auto& cf = db.find_column_family(ks_name, table_name);
auto schema = cf.schema();
auto permit = db.get_reader_concurrency_semaphore().make_permit(schema.get(), "require_column_has_value()");
auto permit = db.get_reader_concurrency_semaphore().make_tracking_only_permit(schema.get(), "require_column_has_value()");
return cf.find_partition_slow(schema, permit, pkey)
.then([schema, ckey, column_name, exp] (column_family::const_mutation_partition_ptr p) {
assert(p != nullptr);
@@ -784,7 +784,7 @@ future<> do_with_cql_env_thread(std::function<void(cql_test_env&)> func, cql_tes
}
reader_permit make_reader_permit(cql_test_env& env) {
return env.local_db().get_reader_concurrency_semaphore().make_permit(nullptr, "test");
return env.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(nullptr, "test");
}
namespace debug {

View File

@@ -39,7 +39,7 @@ public:
}
reader_concurrency_semaphore& semaphore() { return *_semaphore; };
reader_permit make_permit() { return _semaphore->make_permit(nullptr, "test"); }
reader_permit make_permit() { return _semaphore->make_tracking_only_permit(nullptr, "test"); }
};
} // namespace tests

View File

@@ -82,7 +82,7 @@ public:
test_env_sstables_manager& manager() { return *_mgr; }
reader_concurrency_semaphore& semaphore() { return *_semaphore; }
reader_permit make_reader_permit(const schema* const s = nullptr, const char* n = "test") { return _semaphore->make_permit(s, n); }
reader_permit make_reader_permit(const schema* const s = nullptr, const char* n = "test") { return _semaphore->make_tracking_only_permit(s, n); }
future<> working_sst(schema_ptr schema, sstring dir, unsigned long generation) {
return reusable_sst(std::move(schema), dir, generation).then([] (auto ptr) { return make_ready_future<>(); });

View File

@@ -187,8 +187,7 @@ void execute_reads(const schema& s, reader_concurrency_semaphore& sem, unsigned
if (sem.waiters()) {
testlog.trace("Waiting for queue to drain");
auto permit = sem.make_permit(&s, "drain");
permit.wait_admission(1, db::no_timeout).get();
sem.obtain_permit(&s, "drain", 1, db::no_timeout).get();
}
}

View File

@@ -79,7 +79,7 @@ reader_concurrency_semaphore_wrapper::~reader_concurrency_semaphore_wrapper() {
}
reader_permit reader_concurrency_semaphore_wrapper::make_permit() {
return _semaphore->make_permit(nullptr, "perf");
return _semaphore->make_tracking_only_permit(nullptr, "perf");
}
} // namespace perf

View File

@@ -67,7 +67,7 @@ struct table {
}
reader_permit make_permit() {
return semaphore.make_permit(s.schema().get(), "test");
return semaphore.make_tracking_only_permit(s.schema().get(), "test");
}
future<> stop() noexcept {
return semaphore.stop();