cross-shard-barrier: Add .abort() method

The method makes all the .arrive_and_wait()s in the current phase
resolve with a barrier_aborted_exception() exceptional future.

The barrier turns into a broken state and is not supposed to serve
any subsequent arrivals in any reasonable way.

The .abort() method is re-entrant in two senses. First, more than
one shard can abort a barrier, which is pretty natural. Second,
exception-safety fuses imply that if arrive_and_wait() resolves
with an exception, the caller will try to abort() the barrier as
well, even though the phase is already over. This case is also
"supported".

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
Pavel Emelyanov
2022-04-05 16:04:55 +03:00
parent 0f3cd6ad18
commit 8d7a7cbe21


@@ -18,6 +18,13 @@ using namespace seastar;
 namespace utils {
+class barrier_aborted_exception : public std::exception {
+public:
+    virtual const char* what() const noexcept override {
+        return "barrier aborted";
+    }
+};
 // Shards-coordination mechanism that allows shards to wait each other at
 // certain points. The barrier should be copied to each shard, then when
 // each shard calls .arrive_and_wait()-s it will be blocked and woken up
@@ -58,9 +65,10 @@ namespace utils {
 class cross_shard_barrier {
     struct barrier {
         std::atomic<int> counter;
+        std::atomic<bool> alive;
         std::vector<std::optional<promise<>>> wakeup;
-        barrier() : counter(smp::count) {
+        barrier() : counter(smp::count), alive(true) {
             wakeup.reserve(smp::count);
             for (unsigned i = 0; i < smp::count; i++) {
                 wakeup.emplace_back();
@@ -96,16 +104,43 @@ public:
         return i == 1 ? complete() : wait();
     }
+    /**
+     * Wakes up all arrivals with the barrier_aborted_exception() and
+     * returns this exception itself. Once called the barrier becomes
+     * unusable, any subsequent arrive_and_wait()s can (and actually
+     * will) hang forever.
+     */
+    void abort() noexcept {
+        // We can get here from shards that had already visited the
+        // arrive_and_wait() and got the exceptional future. In this
+        // case the counter would be set to smp::count and none of the
+        // fetch_add(-1)s below will make it call complete()
+        _b->alive.store(false);
+        auto i = _b->counter.fetch_add(-1);
+        if (i == 1) {
+            (void)complete().handle_exception([] (std::exception_ptr ignored) {});
+        }
+    }
 private:
     future<> complete() {
         _b->counter.fetch_add(smp::count);
-        return smp::invoke_on_all([this, sid = this_shard_id()] {
+        bool alive = _b->alive.load(std::memory_order_relaxed);
+        return smp::invoke_on_all([this, sid = this_shard_id(), alive] {
             if (this_shard_id() != sid) {
                 std::optional<promise<>>& w = _b->wakeup[this_shard_id()];
-                assert(w.has_value());
-                w->set_value();
-                w.reset();
+                if (alive) {
+                    assert(w.has_value());
+                    w->set_value();
+                    w.reset();
+                } else if (w.has_value()) {
+                    w->set_exception(barrier_aborted_exception());
+                    w.reset();
+                }
             }
+            return alive ? make_ready_future<>()
+                         : make_exception_future<>(barrier_aborted_exception());
         });
     }
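
The counter arithmetic behind abort() and complete() can be followed in a
small single-threaded model. This is only a sketch under assumptions, not
the Seastar-based code from the patch: toy_barrier, outcome, waiting,
result and completions are hypothetical names, shards are plain integer
indices, and the per-shard promises are replaced by a vector of outcomes.
It is meant to show why a late abort() from a shard that already received
the exceptional future does not trigger complete() a second time.

```cpp
#include <atomic>
#include <cassert>
#include <vector>

// Hypothetical stand-in for the state of a per-shard future.
enum class outcome { pending, released, aborted };

// Toy model of the barrier's counter logic (assumption: no real shards,
// calls are made sequentially from one thread).
struct toy_barrier {
    int shards;
    std::atomic<int> counter;
    std::atomic<bool> alive{true};
    std::vector<bool> waiting;      // models "wakeup[shard].has_value()"
    std::vector<outcome> result;    // what each shard's future resolved to
    int completions = 0;            // how many times complete() ran

    explicit toy_barrier(int n)
        : shards(n), counter(n), waiting(n, false), result(n, outcome::pending) {}

    // Models arrive_and_wait(): the last arrival completes the phase,
    // everybody else parks and waits to be woken up.
    void arrive(int shard) {
        if (counter.fetch_add(-1) == 1) {
            result[shard] = complete(shard) ? outcome::released : outcome::aborted;
        } else {
            waiting[shard] = true;
        }
    }

    // Models abort(): mark the barrier dead, then count as an arrival.
    // The result of complete() is deliberately ignored, as in the patch.
    void abort(int shard) {
        alive.store(false);
        if (counter.fetch_add(-1) == 1) {
            (void)complete(shard);
        }
    }

    // Models complete(): re-arm the counter, then wake every other shard
    // with either a value or the "aborted" outcome.
    bool complete(int sid) {
        counter.fetch_add(shards);
        ++completions;
        const bool a = alive.load(std::memory_order_relaxed);
        for (int s = 0; s < shards; ++s) {
            if (s == sid) {
                continue;
            }
            if (a) {
                assert(waiting[s]);  // in a healthy phase all others wait
                result[s] = outcome::released;
                waiting[s] = false;
            } else if (waiting[s]) {
                result[s] = outcome::aborted;
                waiting[s] = false;
            }
        }
        return a;
    }
};
```

With three shards, two arrivals followed by one abort() finish the phase
and re-arm the counter to 3. If the two woken shards then call abort()
themselves, each fetch_add(-1) observes 3 and then 2, never 1, so
complete() is not entered again in this model.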