raft: server: fix indentation

This commit is contained in:
Kamil Braun
2024-01-18 16:23:35 +01:00
parent 527780987b
commit 754a7b54e4

View File

@@ -1007,138 +1007,138 @@ future<fsm_output> server_impl::poll_fsm_output() {
}
future<> server_impl::process_fsm_output(index_t& last_stable, fsm_output&& batch) {
if (batch.term_and_vote) {
// Current term and vote are always persisted
// together. A vote may change independently of
// term, but it's safe to update both in this
// case.
co_await _persistence->store_term_and_vote(batch.term_and_vote->first, batch.term_and_vote->second);
_stats.store_term_and_vote++;
}
if (batch.term_and_vote) {
// Current term and vote are always persisted
// together. A vote may change independently of
// term, but it's safe to update both in this
// case.
co_await _persistence->store_term_and_vote(batch.term_and_vote->first, batch.term_and_vote->second);
_stats.store_term_and_vote++;
}
if (batch.snp) {
auto& [snp, is_local, max_trailing_entries] = *batch.snp;
logger.trace("[{}] io_fiber storing snapshot {}", _id, snp.id);
// Persist the snapshot
co_await _persistence->store_snapshot_descriptor(snp, max_trailing_entries);
_stats.store_snapshot++;
// If this is locally generated snapshot there is no need to
// load it.
if (!is_local) {
co_await _apply_entries.push_eventually(std::move(snp));
if (batch.snp) {
auto& [snp, is_local, max_trailing_entries] = *batch.snp;
logger.trace("[{}] io_fiber storing snapshot {}", _id, snp.id);
// Persist the snapshot
co_await _persistence->store_snapshot_descriptor(snp, max_trailing_entries);
_stats.store_snapshot++;
// If this is locally generated snapshot there is no need to
// load it.
if (!is_local) {
co_await _apply_entries.push_eventually(std::move(snp));
}
}
for (const auto& snp_id: batch.snps_to_drop) {
_state_machine->drop_snapshot(snp_id);
}
if (batch.log_entries.size()) {
auto& entries = batch.log_entries;
if (last_stable >= entries[0]->idx) {
co_await _persistence->truncate_log(entries[0]->idx);
_stats.truncate_persisted_log++;
}
utils::get_local_injector().inject("store_log_entries/test-failure",
[] { throw std::runtime_error("store_log_entries/test-failure"); });
// Combine saving and truncating into one call?
// will require persistence to keep track of last idx
co_await _persistence->store_log_entries(entries);
last_stable = (*entries.crbegin())->idx;
_stats.persisted_log_entries += entries.size();
}
// Update RPC server address mappings. Add servers which are joining
// the cluster according to the new configuration (obtained from the
// last_conf_idx).
//
// It should be done prior to sending the messages since the RPC
// module needs to know who should it send the messages to (actual
// network addresses of the joining servers).
rpc_config_diff rpc_diff;
if (batch.configuration) {
rpc_diff = diff_address_sets(get_rpc_config(), *batch.configuration);
for (const auto& addr: rpc_diff.joining) {
add_to_rpc_config(addr);
}
_rpc->on_configuration_change(rpc_diff.joining, {});
}
// After entries are persisted we can send messages.
for (auto&& m : batch.messages) {
try {
send_message(m.first, std::move(m.second));
} catch(...) {
// Not being able to send a message is not a critical error
logger.debug("[{}] io_fiber failed to send a message to {}: {}", _id, m.first, std::current_exception());
}
}
if (batch.configuration) {
for (const auto& addr: rpc_diff.leaving) {
abort_snapshot_transfer(addr.id);
remove_from_rpc_config(addr);
}
_rpc->on_configuration_change({}, rpc_diff.leaving);
}
// Process committed entries.
if (batch.committed.size()) {
if (_non_joint_conf_commit_promise) {
for (const auto& e: batch.committed) {
const auto* cfg = get_if<raft::configuration>(&e->data);
if (cfg != nullptr && !cfg->is_joint()) {
std::exchange(_non_joint_conf_commit_promise, std::nullopt)->promise.set_value();
break;
}
}
}
co_await _persistence->store_commit_idx(batch.committed.back()->idx);
_stats.queue_entries_for_apply += batch.committed.size();
co_await _apply_entries.push_eventually(std::move(batch.committed));
}
for (const auto& snp_id: batch.snps_to_drop) {
_state_machine->drop_snapshot(snp_id);
}
if (batch.log_entries.size()) {
auto& entries = batch.log_entries;
if (last_stable >= entries[0]->idx) {
co_await _persistence->truncate_log(entries[0]->idx);
_stats.truncate_persisted_log++;
}
utils::get_local_injector().inject("store_log_entries/test-failure",
[] { throw std::runtime_error("store_log_entries/test-failure"); });
// Combine saving and truncating into one call?
// will require persistence to keep track of last idx
co_await _persistence->store_log_entries(entries);
last_stable = (*entries.crbegin())->idx;
_stats.persisted_log_entries += entries.size();
}
// Update RPC server address mappings. Add servers which are joining
// the cluster according to the new configuration (obtained from the
// last_conf_idx).
//
// It should be done prior to sending the messages since the RPC
// module needs to know who should it send the messages to (actual
// network addresses of the joining servers).
rpc_config_diff rpc_diff;
if (batch.configuration) {
rpc_diff = diff_address_sets(get_rpc_config(), *batch.configuration);
for (const auto& addr: rpc_diff.joining) {
add_to_rpc_config(addr);
}
_rpc->on_configuration_change(rpc_diff.joining, {});
}
// After entries are persisted we can send messages.
for (auto&& m : batch.messages) {
try {
send_message(m.first, std::move(m.second));
} catch(...) {
// Not being able to send a message is not a critical error
logger.debug("[{}] io_fiber failed to send a message to {}: {}", _id, m.first, std::current_exception());
}
}
if (batch.configuration) {
for (const auto& addr: rpc_diff.leaving) {
abort_snapshot_transfer(addr.id);
remove_from_rpc_config(addr);
}
_rpc->on_configuration_change({}, rpc_diff.leaving);
}
// Process committed entries.
if (batch.committed.size()) {
if (_non_joint_conf_commit_promise) {
for (const auto& e: batch.committed) {
const auto* cfg = get_if<raft::configuration>(&e->data);
if (cfg != nullptr && !cfg->is_joint()) {
std::exchange(_non_joint_conf_commit_promise, std::nullopt)->promise.set_value();
break;
}
}
}
co_await _persistence->store_commit_idx(batch.committed.back()->idx);
_stats.queue_entries_for_apply += batch.committed.size();
co_await _apply_entries.push_eventually(std::move(batch.committed));
}
if (batch.max_read_id_with_quorum) {
while (!_reads.empty() && _reads.front().id <= batch.max_read_id_with_quorum) {
_reads.front().promise.set_value(_reads.front().idx);
_reads.pop_front();
}
}
if (!_fsm->is_leader()) {
if (_stepdown_promise) {
std::exchange(_stepdown_promise, std::nullopt)->set_value();
}
if (!_current_rpc_config.contains(_id)) {
// - It's important we push this after we pushed committed entries above. It
// will cause `applier_fiber` to drop waiters, which should be done after we
// notify all waiters for entries committed in this batch.
// - This may happen multiple times if `io_fiber` gets multiple batches when
// we're outside the configuration, but it should eventually (and generally
// quickly) stop happening (we're outside the config after all).
co_await _apply_entries.push_eventually(removed_from_config{});
}
// request aborts of snapshot transfers
abort_snapshot_transfers();
// abort all read barriers
for (auto& r : _reads) {
r.promise.set_value(not_a_leader{_fsm->current_leader()});
}
_reads.clear();
} else if (batch.abort_leadership_transfer) {
if (_stepdown_promise) {
std::exchange(_stepdown_promise, std::nullopt)->set_exception(timeout_error("Stepdown process timed out"));
}
}
if (_leader_promise && _fsm->current_leader()) {
std::exchange(_leader_promise, std::nullopt)->set_value();
}
if (_state_change_promise && batch.state_changed) {
std::exchange(_state_change_promise, std::nullopt)->set_value();
}
if (batch.max_read_id_with_quorum) {
while (!_reads.empty() && _reads.front().id <= batch.max_read_id_with_quorum) {
_reads.front().promise.set_value(_reads.front().idx);
_reads.pop_front();
}
}
if (!_fsm->is_leader()) {
if (_stepdown_promise) {
std::exchange(_stepdown_promise, std::nullopt)->set_value();
}
if (!_current_rpc_config.contains(_id)) {
// - It's important we push this after we pushed committed entries above. It
// will cause `applier_fiber` to drop waiters, which should be done after we
// notify all waiters for entries committed in this batch.
// - This may happen multiple times if `io_fiber` gets multiple batches when
// we're outside the configuration, but it should eventually (and generally
// quickly) stop happening (we're outside the config after all).
co_await _apply_entries.push_eventually(removed_from_config{});
}
// request aborts of snapshot transfers
abort_snapshot_transfers();
// abort all read barriers
for (auto& r : _reads) {
r.promise.set_value(not_a_leader{_fsm->current_leader()});
}
_reads.clear();
} else if (batch.abort_leadership_transfer) {
if (_stepdown_promise) {
std::exchange(_stepdown_promise, std::nullopt)->set_exception(timeout_error("Stepdown process timed out"));
}
}
if (_leader_promise && _fsm->current_leader()) {
std::exchange(_leader_promise, std::nullopt)->set_value();
}
if (_state_change_promise && batch.state_changed) {
std::exchange(_state_change_promise, std::nullopt)->set_value();
}
}
future<> server_impl::io_fiber(index_t last_stable) {