server, raft_group0_client: remove the default nullptr values

The previous commit has fixed 5 bugs of the same type - incorrectly
passing the default nullptr to one of the changed functions. At
least some of these bugs wouldn't appear if there was no default
value. It's much harder to make this kind of a bug if you have to
write "nullptr". It's also much easier to detect it in review.

Moreover, these default values are rarely used outside tests.
Keeping them is just not worth the time spent on debugging.
This commit is contained in:
Patryk Jędrzejczak
2024-01-05 18:08:44 +01:00
parent 3d4af4ecf1
commit df2034ebd7
7 changed files with 29 additions and 22 deletions

View File

@@ -81,14 +81,14 @@ public:
// server interface
future<> add_entry(command command, wait_type type, seastar::abort_source* as = nullptr) override;
future<> set_configuration(config_member_set c_new, seastar::abort_source* as = nullptr) override;
future<> add_entry(command command, wait_type type, seastar::abort_source* as) override;
future<> set_configuration(config_member_set c_new, seastar::abort_source* as) override;
raft::configuration get_configuration() const override;
future<> start() override;
future<> abort(sstring reason) override;
bool is_alive() const override;
term_t get_current_term() const override;
future<> read_barrier(seastar::abort_source* as = nullptr) override;
future<> read_barrier(seastar::abort_source* as) override;
void wait_until_candidate() override;
future<> wait_election_done() override;
future<> wait_log_idx_term(std::pair<index_t, term_t> idx_log) override;
@@ -100,7 +100,7 @@ public:
raft::server_id id() const override;
void set_applier_queue_max_size(size_t queue_max_size) override;
future<> stepdown(logical_clock::duration timeout) override;
future<> modify_config(std::vector<config_member> add, std::vector<server_id> del, seastar::abort_source* as = nullptr) override;
future<> modify_config(std::vector<config_member> add, std::vector<server_id> del, seastar::abort_source* as) override;
future<entry_id> add_entry_on_leader(command command, seastar::abort_source* as);
void register_metrics() override;
size_t max_command_size() const override;
@@ -276,7 +276,7 @@ private:
// A helper to wait for a leader to get elected
future<> wait_for_leader(seastar::abort_source* as);
future<> wait_for_state_change(seastar::abort_source* as = nullptr) override;
future<> wait_for_state_change(seastar::abort_source* as) override;
// Get "safe to read" index from a leader
future<read_barrier_reply> get_read_idx(server_id leader, seastar::abort_source* as);

View File

@@ -77,6 +77,7 @@ public:
// committed locally means simply that the commit index is beyond this entry's index.
//
// The caller may pass a pointer to an abort_source to make the operation abortable.
// It it passes nullptr, the operation is unabortable.
//
// Successful `add_entry` with `wait_type::committed` does not guarantee that `state_machine::apply` will be called
// locally for this entry. Between the commit and the application we may receive a snapshot containing this entry,
@@ -98,7 +99,7 @@ public:
// Thrown if abort() was called on the server instance.
// raft::not_a_leader
// Thrown if the node is not a leader and forwarding is not enabled through enable_forwarding config option.
virtual future<> add_entry(command command, wait_type type, seastar::abort_source* as = nullptr) = 0;
virtual future<> add_entry(command command, wait_type type, seastar::abort_source* as) = 0;
// Set a new cluster configuration. If the configuration is
// identical to the previous one does nothing.
@@ -124,6 +125,7 @@ public:
// returned even in case of a successful config change.
//
// The caller may pass a pointer to an abort_source to make the operation abortable.
// It it passes nullptr, the operation is unabortable.
//
// Exceptions:
// raft::conf_change_in_progress
@@ -135,7 +137,7 @@ public:
// forwarding the corresponding add_entry to the leader.
// raft::request_aborted
// Thrown if abort is requested before the operation finishes.
virtual future<> set_configuration(config_member_set c_new, seastar::abort_source* as = nullptr) = 0;
virtual future<> set_configuration(config_member_set c_new, seastar::abort_source* as) = 0;
// A simplified wrapper around set_configuration() which adds
// and deletes servers. Unlike set_configuration(),
@@ -158,6 +160,7 @@ public:
//
// The caller may pass a pointer to an abort_source to make the operation abortable.
// If abort is requested before the operation finishes, the future will contain `raft::request_aborted` exception.
// It the caller passes nullptr, the operation is unabortable.
//
// Exceptions:
// raft::commit_status_unknown
@@ -173,7 +176,7 @@ public:
// raft::conf_change_in_progress
// Thrown if the previous set_configuration/modify_config is not completed.
virtual future<> modify_config(std::vector<config_member> add,
std::vector<server_id> del, seastar::abort_source* as = nullptr) = 0;
std::vector<server_id> del, seastar::abort_source* as) = 0;
// Return the currently known configuration
virtual raft::configuration get_configuration() const = 0;
@@ -203,13 +206,14 @@ public:
// future has resolved successfully.
//
// The caller may pass a pointer to an abort_source to make the operation abortable.
// It it passes nullptr, the operation is unabortable.
//
// Exceptions:
// raft::request_aborted
// Thrown if abort is requested before the operation finishes.
// raft::stopped_error
// Thrown if abort() was called on the server instance.
virtual future<> read_barrier(seastar::abort_source* as = nullptr) = 0;
virtual future<> read_barrier(seastar::abort_source* as) = 0;
// Initiate leader stepdown process.
//
@@ -245,7 +249,10 @@ public:
// State changes can be coalesced, so it is not guaranteed that the caller will
// get notification about each one of them. The state can even be the same after
// the call as before, but term should be different.
virtual future<> wait_for_state_change(seastar::abort_source* as = nullptr) = 0;
//
// The caller may pass a pointer to an abort_source to make the function abortable.
// It it passes nullptr, the function is unabortable.
virtual future<> wait_for_state_change(seastar::abort_source* as) = 0;
// Ad hoc functions for testing
virtual void wait_until_candidate() = 0;

View File

@@ -30,7 +30,7 @@ bool is_broadcast_table_statement(const sstring& keyspace, const sstring& column
future<query_result> execute(service::raft_group0_client& group0_client, const query& query) {
auto group0_cmd = group0_client.prepare_command(broadcast_table_query{query}, "broadcast_tables query");
auto guard = group0_client.create_result_guard(group0_cmd.new_state_id);
co_await group0_client.add_entry_unguarded(std::move(group0_cmd));
co_await group0_client.add_entry_unguarded(std::move(group0_cmd), nullptr);
co_return guard.get();
}

View File

@@ -160,7 +160,7 @@ void raft_group0::init_rpc_verbs(raft_group0& shard0_this) {
ser::group0_rpc_verbs::register_group0_modify_config(&shard0_this._ms.local(),
[&shard0_this] (const rpc::client_info&, rpc::opt_time_point, raft::group_id gid, std::vector<raft::config_member> add, std::vector<raft::server_id> del) {
return smp::submit_to(0, [&shard0_this, gid, add = std::move(add), del = std::move(del)]() mutable {
return shard0_this._raft_gr.get_server(gid).modify_config(std::move(add), std::move(del));
return shard0_this._raft_gr.get_server(gid).modify_config(std::move(add), std::move(del), nullptr);
});
});

View File

@@ -104,9 +104,9 @@ public:
// Call after `system_keyspace` is initialized.
future<> init();
future<> add_entry(group0_command group0_cmd, group0_guard guard, seastar::abort_source* as = nullptr);
future<> add_entry(group0_command group0_cmd, group0_guard guard, seastar::abort_source* as);
future<> add_entry_unguarded(group0_command group0_cmd, seastar::abort_source* as = nullptr);
future<> add_entry_unguarded(group0_command group0_cmd, seastar::abort_source* as);
// Ensures that all previously finished operations on group 0 are visible on this node;
// in particular, performs a Raft read barrier on group 0.
@@ -128,7 +128,7 @@ public:
// FIXME?: this is kind of annoying for the user.
// we could forward the call to shard 0, have group0_guard keep a foreign_ptr to the internal data structures on shard 0,
// and add_entry would again forward to shard 0.
future<group0_guard> start_operation(seastar::abort_source* as = nullptr);
future<group0_guard> start_operation(seastar::abort_source* as);
template<typename Command>
requires std::same_as<Command, broadcast_table_query> || std::same_as<Command, write_mutations>

View File

@@ -26,9 +26,9 @@ SEASTAR_THREAD_TEST_CASE(test_check_abort_on_client_api) {
return sstring(e.what()) == sstring("Raft instance is stopped, reason: \"test crash\"");
};
BOOST_CHECK_EXCEPTION(cluster.add_entries(1, 0).get0(), raft::stopped_error, check_error);
BOOST_CHECK_EXCEPTION(cluster.get_server(0).modify_config({}, {to_raft_id(0)}).get0(), raft::stopped_error, check_error);
BOOST_CHECK_EXCEPTION(cluster.get_server(0).read_barrier().get0(), raft::stopped_error, check_error);
BOOST_CHECK_EXCEPTION(cluster.get_server(0).set_configuration({}).get0(), raft::stopped_error, check_error);
BOOST_CHECK_EXCEPTION(cluster.get_server(0).modify_config({}, {to_raft_id(0)}, nullptr).get0(), raft::stopped_error, check_error);
BOOST_CHECK_EXCEPTION(cluster.get_server(0).read_barrier(nullptr).get0(), raft::stopped_error, check_error);
BOOST_CHECK_EXCEPTION(cluster.get_server(0).set_configuration({}, nullptr).get0(), raft::stopped_error, check_error);
}
SEASTAR_THREAD_TEST_CASE(test_release_memory_if_add_entry_throws) {

View File

@@ -951,7 +951,7 @@ future<> raft_cluster<Clock>::add_entry(size_t val, std::optional<size_t> server
while (true) {
try {
auto& at = _servers[server ? *server : _leader].server;
co_await at->add_entry(create_command(val), raft::wait_type::committed);
co_await at->add_entry(create_command(val), raft::wait_type::committed, nullptr);
break;
} catch (raft::commit_status_unknown& e) {
// FIXME: in some cases when we get `commit_status_unknown` the entry may have been applied.
@@ -1201,7 +1201,7 @@ future<> raft_cluster<Clock>::change_configuration(set_config sc) {
}
tlogger.debug("Changing configuration on leader {}", _leader);
co_await _servers[_leader].server->set_configuration(std::move(set));
co_await _servers[_leader].server->set_configuration(std::move(set), nullptr);
if (!new_config.contains(_leader)) {
co_await free_election();
@@ -1211,7 +1211,7 @@ future<> raft_cluster<Clock>::change_configuration(set_config sc) {
// Add a dummy entry to confirm new configuration was committed
try {
co_await _servers[_leader].server->add_entry(create_command(dummy_command),
raft::wait_type::committed);
raft::wait_type::committed, nullptr);
} catch (raft::not_a_leader& e) {
// leader stepped down, implying config fully changed
} catch (raft::commit_status_unknown& e) {}
@@ -1338,7 +1338,7 @@ future<> raft_cluster<Clock>::tick(::tick t) {
template <typename Clock>
future<> raft_cluster<Clock>::read(read_value r) {
co_await _servers[r.node_idx].server->read_barrier();
co_await _servers[r.node_idx].server->read_barrier(nullptr);
auto val = _servers[r.node_idx].sm->hasher->finalize_uint64();
auto expected = hasher_int::hash_range(r.expected_index).finalize_uint64();
BOOST_CHECK_MESSAGE(val == expected,