From 754a7b54e4cb6309411e1f6926101b664dbf8f07 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Thu, 18 Jan 2024 16:23:35 +0100 Subject: [PATCH] raft: server: fix indentation --- raft/server.cc | 256 ++++++++++++++++++++++++------------------------- 1 file changed, 128 insertions(+), 128 deletions(-) diff --git a/raft/server.cc b/raft/server.cc index ecb8af01d7..55dd60f883 100644 --- a/raft/server.cc +++ b/raft/server.cc @@ -1007,138 +1007,138 @@ future server_impl::poll_fsm_output() { } future<> server_impl::process_fsm_output(index_t& last_stable, fsm_output&& batch) { - if (batch.term_and_vote) { - // Current term and vote are always persisted - // together. A vote may change independently of - // term, but it's safe to update both in this - // case. - co_await _persistence->store_term_and_vote(batch.term_and_vote->first, batch.term_and_vote->second); - _stats.store_term_and_vote++; - } + if (batch.term_and_vote) { + // Current term and vote are always persisted + // together. A vote may change independently of + // term, but it's safe to update both in this + // case. + co_await _persistence->store_term_and_vote(batch.term_and_vote->first, batch.term_and_vote->second); + _stats.store_term_and_vote++; + } - if (batch.snp) { - auto& [snp, is_local, max_trailing_entries] = *batch.snp; - logger.trace("[{}] io_fiber storing snapshot {}", _id, snp.id); - // Persist the snapshot - co_await _persistence->store_snapshot_descriptor(snp, max_trailing_entries); - _stats.store_snapshot++; - // If this is locally generated snapshot there is no need to - // load it. - if (!is_local) { - co_await _apply_entries.push_eventually(std::move(snp)); + if (batch.snp) { + auto& [snp, is_local, max_trailing_entries] = *batch.snp; + logger.trace("[{}] io_fiber storing snapshot {}", _id, snp.id); + // Persist the snapshot + co_await _persistence->store_snapshot_descriptor(snp, max_trailing_entries); + _stats.store_snapshot++; + // If this is locally generated snapshot there is no need to + // load it. + if (!is_local) { + co_await _apply_entries.push_eventually(std::move(snp)); + } + } + + for (const auto& snp_id: batch.snps_to_drop) { + _state_machine->drop_snapshot(snp_id); + } + + if (batch.log_entries.size()) { + auto& entries = batch.log_entries; + + if (last_stable >= entries[0]->idx) { + co_await _persistence->truncate_log(entries[0]->idx); + _stats.truncate_persisted_log++; + } + + utils::get_local_injector().inject("store_log_entries/test-failure", + [] { throw std::runtime_error("store_log_entries/test-failure"); }); + + // Combine saving and truncating into one call? + // will require persistence to keep track of last idx + co_await _persistence->store_log_entries(entries); + + last_stable = (*entries.crbegin())->idx; + _stats.persisted_log_entries += entries.size(); + } + + // Update RPC server address mappings. Add servers which are joining + // the cluster according to the new configuration (obtained from the + // last_conf_idx). + // + // It should be done prior to sending the messages since the RPC + // module needs to know who should it send the messages to (actual + // network addresses of the joining servers). + rpc_config_diff rpc_diff; + if (batch.configuration) { + rpc_diff = diff_address_sets(get_rpc_config(), *batch.configuration); + for (const auto& addr: rpc_diff.joining) { + add_to_rpc_config(addr); + } + _rpc->on_configuration_change(rpc_diff.joining, {}); + } + + // After entries are persisted we can send messages. + for (auto&& m : batch.messages) { + try { + send_message(m.first, std::move(m.second)); + } catch(...) { + // Not being able to send a message is not a critical error + logger.debug("[{}] io_fiber failed to send a message to {}: {}", _id, m.first, std::current_exception()); + } + } + + if (batch.configuration) { + for (const auto& addr: rpc_diff.leaving) { + abort_snapshot_transfer(addr.id); + remove_from_rpc_config(addr); + } + _rpc->on_configuration_change({}, rpc_diff.leaving); + } + + // Process committed entries. + if (batch.committed.size()) { + if (_non_joint_conf_commit_promise) { + for (const auto& e: batch.committed) { + const auto* cfg = get_if(&e->data); + if (cfg != nullptr && !cfg->is_joint()) { + std::exchange(_non_joint_conf_commit_promise, std::nullopt)->promise.set_value(); + break; } } + } + co_await _persistence->store_commit_idx(batch.committed.back()->idx); + _stats.queue_entries_for_apply += batch.committed.size(); + co_await _apply_entries.push_eventually(std::move(batch.committed)); + } - for (const auto& snp_id: batch.snps_to_drop) { - _state_machine->drop_snapshot(snp_id); - } - - if (batch.log_entries.size()) { - auto& entries = batch.log_entries; - - if (last_stable >= entries[0]->idx) { - co_await _persistence->truncate_log(entries[0]->idx); - _stats.truncate_persisted_log++; - } - - utils::get_local_injector().inject("store_log_entries/test-failure", - [] { throw std::runtime_error("store_log_entries/test-failure"); }); - - // Combine saving and truncating into one call? - // will require persistence to keep track of last idx - co_await _persistence->store_log_entries(entries); - - last_stable = (*entries.crbegin())->idx; - _stats.persisted_log_entries += entries.size(); - } - - // Update RPC server address mappings. Add servers which are joining - // the cluster according to the new configuration (obtained from the - // last_conf_idx). - // - // It should be done prior to sending the messages since the RPC - // module needs to know who should it send the messages to (actual - // network addresses of the joining servers). - rpc_config_diff rpc_diff; - if (batch.configuration) { - rpc_diff = diff_address_sets(get_rpc_config(), *batch.configuration); - for (const auto& addr: rpc_diff.joining) { - add_to_rpc_config(addr); - } - _rpc->on_configuration_change(rpc_diff.joining, {}); - } - - // After entries are persisted we can send messages. - for (auto&& m : batch.messages) { - try { - send_message(m.first, std::move(m.second)); - } catch(...) { - // Not being able to send a message is not a critical error - logger.debug("[{}] io_fiber failed to send a message to {}: {}", _id, m.first, std::current_exception()); - } - } - - if (batch.configuration) { - for (const auto& addr: rpc_diff.leaving) { - abort_snapshot_transfer(addr.id); - remove_from_rpc_config(addr); - } - _rpc->on_configuration_change({}, rpc_diff.leaving); - } - - // Process committed entries. - if (batch.committed.size()) { - if (_non_joint_conf_commit_promise) { - for (const auto& e: batch.committed) { - const auto* cfg = get_if(&e->data); - if (cfg != nullptr && !cfg->is_joint()) { - std::exchange(_non_joint_conf_commit_promise, std::nullopt)->promise.set_value(); - break; - } - } - } - co_await _persistence->store_commit_idx(batch.committed.back()->idx); - _stats.queue_entries_for_apply += batch.committed.size(); - co_await _apply_entries.push_eventually(std::move(batch.committed)); - } - - if (batch.max_read_id_with_quorum) { - while (!_reads.empty() && _reads.front().id <= batch.max_read_id_with_quorum) { - _reads.front().promise.set_value(_reads.front().idx); - _reads.pop_front(); - } - } - if (!_fsm->is_leader()) { - if (_stepdown_promise) { - std::exchange(_stepdown_promise, std::nullopt)->set_value(); - } - if (!_current_rpc_config.contains(_id)) { - // - It's important we push this after we pushed committed entries above. It - // will cause `applier_fiber` to drop waiters, which should be done after we - // notify all waiters for entries committed in this batch. - // - This may happen multiple times if `io_fiber` gets multiple batches when - // we're outside the configuration, but it should eventually (and generally - // quickly) stop happening (we're outside the config after all). - co_await _apply_entries.push_eventually(removed_from_config{}); - } - // request aborts of snapshot transfers - abort_snapshot_transfers(); - // abort all read barriers - for (auto& r : _reads) { - r.promise.set_value(not_a_leader{_fsm->current_leader()}); - } - _reads.clear(); - } else if (batch.abort_leadership_transfer) { - if (_stepdown_promise) { - std::exchange(_stepdown_promise, std::nullopt)->set_exception(timeout_error("Stepdown process timed out")); - } - } - if (_leader_promise && _fsm->current_leader()) { - std::exchange(_leader_promise, std::nullopt)->set_value(); - } - if (_state_change_promise && batch.state_changed) { - std::exchange(_state_change_promise, std::nullopt)->set_value(); - } + if (batch.max_read_id_with_quorum) { + while (!_reads.empty() && _reads.front().id <= batch.max_read_id_with_quorum) { + _reads.front().promise.set_value(_reads.front().idx); + _reads.pop_front(); + } + } + if (!_fsm->is_leader()) { + if (_stepdown_promise) { + std::exchange(_stepdown_promise, std::nullopt)->set_value(); + } + if (!_current_rpc_config.contains(_id)) { + // - It's important we push this after we pushed committed entries above. It + // will cause `applier_fiber` to drop waiters, which should be done after we + // notify all waiters for entries committed in this batch. + // - This may happen multiple times if `io_fiber` gets multiple batches when + // we're outside the configuration, but it should eventually (and generally + // quickly) stop happening (we're outside the config after all). + co_await _apply_entries.push_eventually(removed_from_config{}); + } + // request aborts of snapshot transfers + abort_snapshot_transfers(); + // abort all read barriers + for (auto& r : _reads) { + r.promise.set_value(not_a_leader{_fsm->current_leader()}); + } + _reads.clear(); + } else if (batch.abort_leadership_transfer) { + if (_stepdown_promise) { + std::exchange(_stepdown_promise, std::nullopt)->set_exception(timeout_error("Stepdown process timed out")); + } + } + if (_leader_promise && _fsm->current_leader()) { + std::exchange(_leader_promise, std::nullopt)->set_value(); + } + if (_state_change_promise && batch.state_changed) { + std::exchange(_state_change_promise, std::nullopt)->set_value(); + } } future<> server_impl::io_fiber(index_t last_stable) {