test: Extract throttle object to separate header

This commit is contained in:
Tomasz Grabiec
2023-06-06 01:34:59 +02:00
parent 643e69af89
commit ad6d2b42f2
2 changed files with 70 additions and 54 deletions

View File

@@ -34,6 +34,7 @@
#include "test/lib/log.hh"
#include "test/lib/reader_concurrency_semaphore.hh"
#include "test/lib/random_utils.hh"
#include "utils/throttle.hh"
#include <boost/range/algorithm/min_element.hpp>
#include "readers/from_mutations_v2.hh"
@@ -1230,62 +1231,16 @@ SEASTAR_TEST_CASE(test_update_failure) {
}
#endif
class throttle {
unsigned _block_counter = 0;
promise<> _p; // valid when _block_counter != 0, resolves when goes down to 0
std::optional<promise<>> _entered;
bool _one_shot;
public:
// one_shot means whether only the first enter() after block() will block.
throttle(bool one_shot = false) : _one_shot(one_shot) {}
future<> enter() {
if (_block_counter && (!_one_shot || _entered)) {
promise<> p1;
promise<> p2;
auto f1 = p1.get_future();
// Intentional, the future is waited on indirectly.
(void)p2.get_future().then([p1 = std::move(p1), p3 = std::move(_p)] () mutable {
p1.set_value();
p3.set_value();
});
_p = std::move(p2);
if (_entered) {
_entered->set_value();
_entered.reset();
}
return f1;
} else {
return make_ready_future<>();
}
}
future<> block() {
++_block_counter;
_p = promise<>();
_entered = promise<>();
return _entered->get_future();
}
void unblock() {
assert(_block_counter);
if (--_block_counter == 0) {
_p.set_value();
}
}
};
class throttled_mutation_source {
private:
class impl : public enable_lw_shared_from_this<impl> {
mutation_source _underlying;
::throttle& _throttle;
utils::throttle& _throttle;
private:
class reader : public delegating_reader_v2 {
throttle& _throttle;
utils::throttle& _throttle;
public:
reader(throttle& t, flat_mutation_reader_v2 r)
reader(utils::throttle& t, flat_mutation_reader_v2 r)
: delegating_reader_v2(std::move(r))
, _throttle(t)
{}
@@ -1296,7 +1251,7 @@ private:
}
};
public:
impl(::throttle& t, mutation_source underlying)
impl(utils::throttle& t, mutation_source underlying)
: _underlying(std::move(underlying))
, _throttle(t)
{ }
@@ -1308,7 +1263,7 @@ private:
};
lw_shared_ptr<impl> _impl;
public:
throttled_mutation_source(throttle& t, mutation_source underlying)
throttled_mutation_source(utils::throttle& t, mutation_source underlying)
: _impl(make_lw_shared<impl>(t, std::move(underlying)))
{ }
@@ -1388,7 +1343,7 @@ SEASTAR_TEST_CASE(test_cache_population_and_update_race) {
auto s = make_schema();
tests::reader_concurrency_semaphore_wrapper semaphore;
memtable_snapshot_source memtables(s);
throttle thr;
utils::throttle thr;
auto cache_source = make_decorated_snapshot_source(snapshot_source([&] { return memtables(); }), [&] (mutation_source src) {
return throttled_mutation_source(thr, std::move(src));
});
@@ -1527,7 +1482,7 @@ SEASTAR_TEST_CASE(test_cache_population_and_clear_race) {
auto s = make_schema();
tests::reader_concurrency_semaphore_wrapper semaphore;
memtable_snapshot_source memtables(s);
throttle thr;
utils::throttle thr;
auto cache_source = make_decorated_snapshot_source(snapshot_source([&] { return memtables(); }), [&] (mutation_source src) {
return throttled_mutation_source(thr, std::move(src));
});
@@ -4268,7 +4223,7 @@ SEASTAR_TEST_CASE(test_eviction_of_upper_bound_of_population_range) {
cache_mt->apply(m1);
cache_tracker tracker;
throttle thr(true);
utils::throttle thr(true);
auto cache_source = make_decorated_snapshot_source(snapshot_source([&] { return cache_mt->as_data_source(); }),
[&] (mutation_source src) {
return throttled_mutation_source(thr, std::move(src));

61
utils/throttle.hh Normal file
View File

@@ -0,0 +1,61 @@
#pragma once
#include <seastar/core/future.hh>
#include <optional>
namespace utils {
using namespace seastar;
/// Synchronizes two processes (primary and secondary) in a way such that the primary process can know
/// that between block().get() and unblock() the secondary process is at a particular execution point, blocked in enter().
///
/// The primary calls block() to arm the throttle. The returned future resolves when the secondary calls enter().
/// enter() will return a future to the secondary which will resolve when the primary calls unblock().
class throttle {
unsigned _block_counter = 0;
promise<> _p; // valid when _block_counter != 0, resolves when goes down to 0
std::optional<promise<>> _entered;
bool _one_shot;
public:
// one_shot means whether only the first enter() after block() will block.
throttle(bool one_shot = false) : _one_shot(one_shot) {}
future<> enter() {
if (_block_counter && (!_one_shot || _entered)) {
promise<> p1;
promise<> p2;
auto f1 = p1.get_future();
// Intentional, the future is waited on indirectly.
(void)p2.get_future().then([p1 = std::move(p1), p3 = std::move(_p)] () mutable {
p1.set_value();
p3.set_value();
});
_p = std::move(p2);
if (_entered) {
_entered->set_value();
_entered.reset();
}
return f1;
} else {
return make_ready_future<>();
}
}
future<> block() {
++_block_counter;
_p = promise<>();
_entered = promise<>();
return _entered->get_future();
}
void unblock() {
assert(_block_counter);
if (--_block_counter == 0) {
_p.set_value();
}
}
};
} // namespace utils