batchlog_manager: do_batch_log_replay: use lambda coroutine

Ssimplify the function implemention and error handling
by invoking a lambda coroutine on shard 0 that keeps
a gate holder and semaphore units on its stack, for RAII-
style unwinding.

It then may invoke a function on another shard, using
the peered service container() to do the
replay on the destination shard.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2021-11-22 14:55:14 +02:00
parent 691afe1c4d
commit 85d0bbb4fc

View File

@@ -94,25 +94,22 @@ db::batchlog_manager::batchlog_manager(cql3::query_processor& qp, batchlog_manag
}
future<> db::batchlog_manager::do_batch_log_replay() {
return container().invoke_on(0, [] (auto& bm) {
bm._gate.enter();
return bm._sem.wait().then([&bm] {
return bm._cpu++ % smp::count;
});
}).then([this] (auto dest) {
return container().invoke_on(0, [] (auto& bm) -> future<> {
auto gate_holder = bm._gate.hold();
auto sem_units = co_await get_units(bm._sem, 1);
auto dest = bm._cpu++ % smp::count;
blogger.debug("Batchlog replay on shard {}: starts", dest);
return container().invoke_on(dest, [] (auto& bm) {
return with_gate(bm._gate, [&bm] {
return bm.replay_all_failed_batches();
if (dest == 0) {
co_await bm.replay_all_failed_batches();
} else {
co_await bm.container().invoke_on(dest, [] (auto& bm) {
return with_gate(bm._gate, [&bm] {
return bm.replay_all_failed_batches();
});
});
}).then([dest] {
blogger.debug("Batchlog replay on shard {}: done", dest);
});
}).finally([this] {
return container().invoke_on(0, [] (auto& bm) {
bm._sem.signal();
bm._gate.leave();
});
}
blogger.debug("Batchlog replay on shard {}: done", dest);
});
}