Merge 'Add an API to trigger snapshot in Raft servers' from Kamil Braun

This allows the user of `raft::server` to cause it to create a snapshot
and truncate the Raft log (leaving no trailing entries; in the future we
may extend the API to specify number of trailing entries left if
needed). In a later commit we'll add a REST endpoint to Scylla to
trigger group 0 snapshots.

One use case for this API is to create group 0 snapshots in Scylla
deployments which upgraded to Raft in version 5.2 and started with an
empty Raft log with no snapshot at the beginning. This causes problems,
e.g. when a new node bootstraps to the cluster, it will not receive a
snapshot that would contain both schema and group 0 history, which would
then lead to inconsistent schema state and trigger assertion failures as
observed in scylladb/scylladb#16683.

In 5.4 the logic of initial group 0 setup was changed to start the Raft
log with a snapshot at index 1 (ff386e7a44)
but a problem remains with these existing deployments coming from 5.2:
we need a way to trigger a snapshot in them (other than performing 1000
arbitrary schema changes).

Another potential use case in the future would be to trigger snapshots
based on external memory pressure in tablet Raft groups (for strongly
consistent tables).

The PR adds the API to `raft::server` and an HTTP endpoint that uses it.

In a follow-up PR, we plan to modify group 0 server startup logic to automatically
call this API if it sees that no snapshot is present yet (to automatically
fix the aforementioned 5.2 deployments once they upgrade).

Closes scylladb/scylladb#16816

* github.com:scylladb/scylladb:
  raft: remove `empty()` from `fsm_output`
  test: add test for manual triggering of Raft snapshots
  api: add HTTP endpoint to trigger Raft snapshots
  raft: server: add `trigger_snapshot` API
  raft: server: track last persisted snapshot descriptor index
  raft: server: framework for handling server requests
  raft: server: inline `poll_fsm_output`
  raft: server: fix indentation
  raft: server: move `io_fiber`'s processing of `batch` to a separate function
  raft: move `poll_output()` from `fsm` to `server`
  raft: move `_sm_events` from `fsm` to `server`
  raft: fsm: remove constructor used only in tests
  raft: fsm: move trace message from `poll_output` to `has_output`
  raft: fsm: extract `has_output()`
  raft: pass `max_trailing_entries` through `fsm_output` to `store_snapshot_descriptor`
  raft: server: pass `*_aborted` to `set_exception` call
This commit is contained in:
Botond Dénes
2024-01-29 15:06:04 +02:00
17 changed files with 602 additions and 207 deletions

43
api/api-doc/raft.json Normal file
View File

@@ -0,0 +1,43 @@
{
"apiVersion":"0.0.1",
"swaggerVersion":"1.2",
"basePath":"{{Protocol}}://{{Host}}",
"resourcePath":"/raft",
"produces":[
"application/json"
],
"apis":[
{
"path":"/raft/trigger_snapshot/{group_id}",
"operations":[
{
"method":"POST",
"summary":"Triggers snapshot creation and log truncation for the given Raft group",
"type":"string",
"nickname":"trigger_snapshot",
"produces":[
"application/json"
],
"parameters":[
{
"name":"group_id",
"description":"The ID of the group which should get snapshotted",
"required":true,
"allowMultiple":false,
"type":"string",
"paramType":"path"
},
{
"name":"timeout",
"description":"Timeout in seconds after which the endpoint returns a failure. If not provided, 60s is used.",
"required":false,
"allowMultiple":false,
"type":"long",
"paramType":"query"
}
]
}
]
}
]
}

View File

@@ -33,6 +33,7 @@
#include "task_manager.hh"
#include "task_manager_test.hh"
#include "tasks.hh"
#include "raft.hh"
logging::logger apilog("api");
@@ -326,6 +327,18 @@ future<> unset_server_tasks_compaction_module(http_context& ctx) {
return ctx.http_server.set_routes([&ctx] (routes& r) { unset_tasks_compaction_module(ctx, r); });
}
// Registers the "raft" API documentation section and installs the Raft HTTP
// routes on the HTTP server owned by `ctx`.
future<> set_server_raft(http_context& ctx, sharded<service::raft_group_registry>& raft_gr) {
    auto registry_builder = std::make_shared<api_registry_builder>(ctx.api_doc);
    auto install_routes = [registry_builder, &ctx, &raft_gr] (routes& r) {
        registry_builder->register_function(r, "raft", "The Raft API");
        set_raft(ctx, r, raft_gr);
    };
    return ctx.http_server.set_routes(std::move(install_routes));
}
// Removes the Raft HTTP routes from the HTTP server owned by `ctx`.
future<> unset_server_raft(http_context& ctx) {
    auto remove_routes = [&ctx] (routes& r) {
        unset_raft(ctx, r);
    };
    return ctx.http_server.set_routes(std::move(remove_routes));
}
void req_params::process(const request& req) {
// Process mandatory parameters
for (auto& [name, ent] : params) {

View File

@@ -23,6 +23,7 @@ class load_meter;
class storage_proxy;
class storage_service;
class raft_group0_client;
class raft_group_registry;
} // namespace service
@@ -128,5 +129,7 @@ future<> set_server_task_manager_test(http_context& ctx, sharded<tasks::task_man
future<> unset_server_task_manager_test(http_context& ctx);
future<> set_server_tasks_compaction_module(http_context& ctx, sharded<service::storage_service>& ss, sharded<db::snapshot_ctl>& snap_ctl);
future<> unset_server_tasks_compaction_module(http_context& ctx);
future<> set_server_raft(http_context&, sharded<service::raft_group_registry>&);
future<> unset_server_raft(http_context&);
}

70
api/raft.cc Normal file
View File

@@ -0,0 +1,70 @@
/*
* Copyright (C) 2024-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include <seastar/core/coroutine.hh>
#include "api/api.hh"
#include "api/api-doc/raft.json.hh"
#include "service/raft/raft_group_registry.hh"
using namespace seastar::httpd;
extern logging::logger apilog;
namespace api {
namespace r = httpd::raft_json;
using namespace json;
// Installs the handler for the `trigger_snapshot` Raft endpoint on `r`.
//
// The handler reads the `group_id` path parameter and the optional `timeout`
// query parameter (in seconds; 60 is used when it is not provided). It then
// looks for a server of that group in the `raft_group_registry` of every shard
// and calls `trigger_snapshot()` on each server found, bounded by the timeout.
// If no shard has a server for the group, the request fails with
// `std::runtime_error`.
void set_raft(http_context&, httpd::routes& r, sharded<service::raft_group_registry>& raft_gr) {
    r::trigger_snapshot.set(r, [&raft_gr] (std::unique_ptr<http::request> req) -> future<json_return_type> {
        raft::group_id gid{utils::UUID{req->param["group_id"]}};
        auto timeout_dur = std::invoke([timeout_str = req->get_query_param("timeout")] {
            if (timeout_str.empty()) {
                return std::chrono::seconds{60};
            }
            // `std::stoll` stops at the first character it cannot parse, so the
            // number of consumed characters must be checked as well; otherwise
            // input such as "10abc" would be silently accepted as 10.
            size_t parsed_len = 0;
            auto dur = std::stoll(timeout_str, &parsed_len);
            if (parsed_len != timeout_str.size() || dur <= 0) {
                throw std::runtime_error{"Timeout must be a positive number."};
            }
            return std::chrono::seconds{dur};
        });

        // Set by whichever shard finds a server for `gid`.
        std::atomic<bool> found_srv{false};
        co_await raft_gr.invoke_on_all([gid, timeout_dur, &found_srv] (service::raft_group_registry& raft_gr) -> future<> {
            auto* srv = raft_gr.find_server(gid);
            if (!srv) {
                co_return;
            }

            found_srv = true;
            // The abort source handed to `trigger_snapshot` fires once the timeout expires.
            abort_on_expiry aoe(lowres_clock::now() + timeout_dur);
            apilog.info("Triggering Raft group {} snapshot", gid);
            auto result = co_await srv->trigger_snapshot(&aoe.abort_source());
            if (result) {
                apilog.info("New snapshot for Raft group {} created", gid);
            } else {
                apilog.info("Could not create new snapshot for Raft group {}, no new entries applied", gid);
            }
        });

        if (!found_srv) {
            throw std::runtime_error{fmt::format("Server for group ID {} not found", gid)};
        }

        co_return json_void{};
    });
}
// Removes the `trigger_snapshot` handler from `r`.
void unset_raft(http_context&, httpd::routes& r) {
r::trigger_snapshot.unset(r);
}
}

18
api/raft.hh Normal file
View File

@@ -0,0 +1,18 @@
/*
* Copyright (C) 2023-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include "api_init.hh"
namespace api {
// Installs the Raft HTTP API handlers on `r`; handlers look up Raft servers through `raft_gr`.
void set_raft(http_context& ctx, httpd::routes& r, sharded<service::raft_group_registry>& raft_gr);
// Removes the Raft HTTP API handlers from `r`.
void unset_raft(http_context& ctx, httpd::routes& r);
}

View File

@@ -1246,6 +1246,8 @@ api = ['api/api.cc',
Json2Code('api/api-doc/error_injection.json'),
'api/authorization_cache.cc',
Json2Code('api/api-doc/authorization_cache.json'),
'api/raft.cc',
Json2Code('api/api-doc/raft.json'),
]
alternator = [

View File

@@ -1450,6 +1450,11 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
supervisor::notify("starting Raft Group Registry service");
raft_gr.invoke_on_all(&service::raft_group_registry::start).get();
api::set_server_raft(ctx, raft_gr).get();
auto stop_raft_api = defer_verbose_shutdown("Raft API", [&ctx] {
api::unset_server_raft(ctx).get();
});
group0_client.init().get();
// schema migration, if needed, is also done on shard 0

View File

@@ -19,9 +19,10 @@ leader::~leader() {
}
fsm::fsm(server_id id, term_t current_term, server_id voted_for, log log,
index_t commit_idx, failure_detector& failure_detector, fsm_config config) :
index_t commit_idx, failure_detector& failure_detector, fsm_config config,
seastar::condition_variable& sm_events) :
_my_id(id), _current_term(current_term), _voted_for(voted_for),
_log(std::move(log)), _failure_detector(failure_detector), _config(config) {
_log(std::move(log)), _failure_detector(failure_detector), _config(config), _sm_events(sm_events) {
if (id == raft::server_id{}) {
throw std::invalid_argument("raft::fsm: raft instance cannot have id zero");
}
@@ -41,10 +42,6 @@ fsm::fsm(server_id id, term_t current_term, server_id voted_for, log log,
}
}
fsm::fsm(server_id id, term_t current_term, server_id voted_for, log log,
failure_detector& failure_detector, fsm_config config) :
fsm(id, current_term, voted_for, std::move(log), index_t{0}, failure_detector, config) {}
future<semaphore_units<>> fsm::wait_for_memory_permit(seastar::abort_source* as, size_t size) {
check_is_leader();
@@ -303,23 +300,15 @@ void fsm::become_candidate(bool is_prevote, bool is_leadership_transfer) {
}
}
future<fsm_output> fsm::poll_output() {
logger.trace("fsm::poll_output() {} stable index: {} last index: {}",
bool fsm::has_output() const {
logger.trace("fsm::has_output() {} stable index: {} last index: {}",
_my_id, _log.stable_idx(), _log.last_idx());
while (true) {
auto diff = _log.last_idx() - _log.stable_idx();
auto diff = _log.last_idx() - _log.stable_idx();
if (diff > 0 || !_messages.empty() || !_observed.is_equal(*this) || _output.max_read_id_with_quorum ||
(is_leader() && leader_state().last_read_id_changed) || _output.snp || !_output.snps_to_drop.empty() || _output.state_changed) {
break;
}
co_await _sm_events.wait();
}
while (utils::get_local_injector().enter("fsm::poll_output/pause")) {
co_await seastar::sleep(std::chrono::milliseconds(100));
}
co_return get_output();
return diff > 0 || !_messages.empty() || !_observed.is_equal(*this) || _output.max_read_id_with_quorum
|| (is_leader() && leader_state().last_read_id_changed) || _output.snp || !_output.snps_to_drop.empty()
|| _output.state_changed;
}
fsm_output fsm::get_output() {
@@ -1029,7 +1018,7 @@ bool fsm::apply_snapshot(snapshot_descriptor snp, size_t max_trailing_entries, s
// If the snapshot is local, _commit_idx is larger than snp.idx.
// Otherwise snp.idx becomes the new commit index.
_commit_idx = std::max(_commit_idx, snp.idx);
_output.snp.emplace(fsm_output::applied_snapshot{snp, local});
_output.snp.emplace(fsm_output::applied_snapshot{snp, local, max_trailing_entries});
size_t units = _log.apply_snapshot(std::move(snp), max_trailing_entries, max_trailing_bytes);
if (is_leader()) {
logger.trace("apply_snapshot[{}]: signal {} available units", _my_id, units);
@@ -1142,7 +1131,6 @@ void fsm::stop() {
// (in particular, abort waits on log_limiter_semaphore and prevent new ones).
become_follower({});
}
_sm_events.broken();
}
std::ostream& operator<<(std::ostream& os, const fsm& f) {

View File

@@ -21,6 +21,11 @@ struct fsm_output {
struct applied_snapshot {
snapshot_descriptor snp;
bool is_local;
// Always 0 for non-local snapshots.
size_t max_trailing_entries;
// FIXME: include max_trailing_bytes here and in store_snapshot_descriptor
};
std::optional<std::pair<term_t, server_id>> term_and_vote;
std::vector<log_entry_ptr> log_entries;
@@ -41,14 +46,6 @@ struct fsm_output {
bool state_changed = false;
// Set to true if a leadership transfer was aborted since the last output
bool abort_leadership_transfer;
// True if there is no new output
bool empty() const {
return !term_and_vote &&
log_entries.size() == 0 && messages.size() == 0 &&
committed.size() == 0 && !snp && snps_to_drop.empty() &&
!configuration && !max_read_id_with_quorum;
}
};
struct fsm_config {
@@ -141,9 +138,13 @@ struct leader {
// in-memory state machine with a catch-all API step(message)
// method. The method handles any kind of input and performs the
// needed state machine state transitions. To get state machine output
// poll_output() function has to be called. This call produces an output
// get_output() function has to be called. To check first if
// any new output is present, call has_output(). To wait for
// new output events, use the sm_events condition variable passed
// to fsm constructor; `fsm` signals it each time new output may appear.
// The get_output() call produces an output
// object, which encapsulates a list of actions that must be
// performed until the next poll_output() call can be made. The time is
// performed until the next get_output() call can be made. The time is
// represented with a logical timer. The client is responsible for
// periodically invoking tick() method, which advances the state
// machine time and allows it to track such events as election or
@@ -231,7 +232,7 @@ private:
std::vector<std::pair<server_id, rpc_message>> _messages;
// Signaled when there is an IO event to process.
seastar::condition_variable _sm_events;
seastar::condition_variable& _sm_events;
// Called when one of the replicas advances its match index
// so it may be the case that some entries are committed now.
@@ -343,10 +344,8 @@ protected: // For testing
public:
explicit fsm(server_id id, term_t current_term, server_id voted_for, log log,
index_t commit_idx, failure_detector& failure_detector, fsm_config conf);
explicit fsm(server_id id, term_t current_term, server_id voted_for, log log,
failure_detector& failure_detector, fsm_config conf);
index_t commit_idx, failure_detector& failure_detector, fsm_config conf,
seastar::condition_variable& sm_events);
bool is_leader() const {
return std::holds_alternative<leader>(_state);
@@ -414,12 +413,9 @@ public:
// committed to the persistent Raft log afterwards.
template<typename T> const log_entry& add_entry(T command);
// Wait until there is, and return state machine output that
// needs to be handled.
// This includes a list of the entries that need
// to be logged. The logged entries are eventually
// discarded from the state machine after applying a snapshot.
future<fsm_output> poll_output();
// Check if there is any state machine output
// that `get_output()` will return.
bool has_output() const;
// Get state machine output, if there is any. Doesn't
// wait. It is public for use in testing.
@@ -432,7 +428,7 @@ public:
// Feed one Raft RPC message into the state machine.
// Advances the state machine state and generates output,
// accessible via poll_output().
// accessible via get_output().
template <typename Message>
void step(server_id from, Message&& msg);

View File

@@ -105,6 +105,8 @@ public:
void register_metrics() override;
size_t max_command_size() const override;
private:
seastar::condition_variable _events;
std::unique_ptr<rpc> _rpc;
std::unique_ptr<state_machine> _state_machine;
std::unique_ptr<persistence> _persistence;
@@ -120,6 +122,8 @@ private:
std::optional<shared_promise<>> _state_change_promise;
// Index of the last entry applied to `_state_machine`.
index_t _applied_idx;
// Index of the last persisted snapshot descriptor.
index_t _snapshot_desc_idx;
std::list<active_read> _reads;
std::multimap<index_t, awaited_index> _awaited_indexes;
@@ -132,13 +136,20 @@ private:
// Signaled when apply index is changed
condition_variable _applied_index_changed;
// Signaled when _snapshot_desc_idx is changed
condition_variable _snapshot_desc_idx_changed;
struct stop_apply_fiber{}; // exception to send when the apply fiber needs to be stopped
struct removed_from_config{}; // sent to applier_fiber when we're not a leader and we're outside the current configuration
struct trigger_snapshot_msg{}; // sent to applier_fiber to make it take a snapshot on request
using applier_fiber_message = std::variant<
std::vector<log_entry_ptr>,
snapshot_descriptor,
removed_from_config>;
removed_from_config,
trigger_snapshot_msg>;
queue<applier_fiber_message> _apply_entries = queue<applier_fiber_message>(10);
struct stats {
@@ -212,6 +223,16 @@ private:
};
absl::flat_hash_map<server_id, append_request_queue> _append_request_status;
// Requests made to the server that are pending processing.
struct server_requests {
// Set when a snapshot was requested.
bool snapshot = false;
// True if no request is pending.
bool empty() const {
return !snapshot;
}
};
server_requests _new_server_requests;
// Called to commit entries (on a leader or otherwise).
void notify_waiters(std::map<index_t, op_status>& waiters, const std::vector<log_entry_ptr>& entries);
@@ -223,10 +244,15 @@ private:
// to be applied.
void signal_applied();
// This fiber processes FSM output by doing the following steps in order:
// Processes FSM output by doing the following steps in order:
// - persist the current term and vote
// - persist unstable log entries on disk.
// - send out messages
future<> process_fsm_output(index_t& stable_idx, fsm_output&&);
future<> process_server_requests(server_requests&&);
// Processes new FSM outputs and server requests as they appear.
future<> io_fiber(index_t stable_idx);
// This fiber runs in the background and applies committed entries.
@@ -278,6 +304,8 @@ private:
future<> wait_for_state_change(seastar::abort_source* as) override;
virtual future<bool> trigger_snapshot(seastar::abort_source* as) override;
// Get "safe to read" index from a leader
future<read_barrier_reply> get_read_idx(server_id leader, seastar::abort_source* as);
// Wait for an entry with a specific term to get committed or
@@ -352,12 +380,14 @@ future<> server_impl::start() {
.append_request_threshold = _config.append_request_threshold,
.max_log_size = _config.max_log_size,
.enable_prevoting = _config.enable_prevoting
});
},
_events);
_applied_idx = index_t{0};
_snapshot_desc_idx = index_t{0};
if (snapshot.id) {
co_await _state_machine->load_snapshot(snapshot.id);
_applied_idx = snapshot.idx;
_snapshot_desc_idx = _applied_idx = snapshot.idx;
}
if (!rpc_config.current.empty()) {
@@ -432,6 +462,54 @@ future<> server_impl::wait_for_state_change(seastar::abort_source* as) {
}
}
// Requests a snapshot and waits until the persisted snapshot descriptor index
// reaches the applied index observed at the time of the call.
//
// Returns `false` immediately if `_applied_idx <= _snapshot_desc_idx`, i.e. there is
// nothing new to snapshot. Otherwise returns `true` once the wait is satisfied.
//
// `as` may be nullptr. If it is provided and an abort is requested, or if the
// condition variable gets broken while waiting, `request_aborted` is thrown.
future<bool> server_impl::trigger_snapshot(seastar::abort_source* as) {
// NOTE(review): presumably throws if the server was already aborted - confirm against its definition.
check_not_aborted();
if (_applied_idx <= _snapshot_desc_idx) {
logger.debug(
"[{}] trigger_snapshot: last persisted snapshot descriptor index is up-to-date"
", applied index: {}, persisted snapshot descriptor index: {}, last fsm log index: {}"
", last fsm snapshot index: {}", _id, _applied_idx, _snapshot_desc_idx,
_fsm->log_last_idx(), _fsm->log_last_snapshot_idx());
co_return false;
}
// Record the request and wake up `io_fiber`, which waits on `_events`.
_new_server_requests.snapshot = true;
_events.signal();
// Wait for persisted snapshot index to catch up to this index.
auto awaited_idx = _applied_idx;
logger.debug("[{}] snapshot request waiting for index {}", _id, awaited_idx);
try {
optimized_optional<abort_source::subscription> sub;
if (as) {
as->check();
// On abort, wake up the wait below so that its predicate observes `abort_requested()`.
sub = as->subscribe([this] () noexcept { _snapshot_desc_idx_changed.broadcast(); });
assert(sub); // due to `check()` above
}
co_await _snapshot_desc_idx_changed.when([this, as, awaited_idx] {
return (as && as->abort_requested()) || awaited_idx <= _snapshot_desc_idx;
});
// The wait may have ended because of an abort rather than because the index caught up.
if (as) {
as->check();
}
} catch (abort_requested_exception&) {
throw request_aborted();
} catch (seastar::broken_condition_variable&) {
throw request_aborted();
}
logger.debug(
"[{}] snapshot request satisfied, awaited index {}, persisted snapshot descriptor index: {}"
", current applied index {}, last fsm log index {}, last fsm snapshot index {}",
_id, awaited_idx, _snapshot_desc_idx, _applied_idx,
_fsm->log_last_idx(), _fsm->log_last_snapshot_idx());
co_return true;
}
future<> server_impl::wait_for_entry(entry_id eid, wait_type type, seastar::abort_source* as) {
// The entry may have been already committed and even applied
// in case it was forwarded to the leader. In this case
@@ -986,144 +1064,175 @@ static rpc_config_diff diff_address_sets(const server_address_set& prev, const c
return result;
}
// Handles a single batch of FSM output. In order:
// - persists the term and vote, the snapshot descriptor and the new log entries,
// - adds joining servers to the RPC configuration, sends out messages,
//   then removes leaving servers,
// - persists the commit index and queues committed entries for `applier_fiber`,
// - resolves pending read, stepdown, leader and state change promises.
//
// `last_stable` is updated to the index of the last log entry persisted here.
future<> server_impl::process_fsm_output(index_t& last_stable, fsm_output&& batch) {
if (batch.term_and_vote) {
// Current term and vote are always persisted
// together. A vote may change independently of
// term, but it's safe to update both in this
// case.
co_await _persistence->store_term_and_vote(batch.term_and_vote->first, batch.term_and_vote->second);
_stats.store_term_and_vote++;
}
if (batch.snp) {
auto& [snp, is_local, max_trailing_entries] = *batch.snp;
logger.trace("[{}] io_fiber storing snapshot {}", _id, snp.id);
// Persist the snapshot
co_await _persistence->store_snapshot_descriptor(snp, max_trailing_entries);
// Publish the new persisted descriptor index and wake up anyone waiting for it
// (e.g. `trigger_snapshot`).
_snapshot_desc_idx = snp.idx;
_snapshot_desc_idx_changed.broadcast();
_stats.store_snapshot++;
// If this is locally generated snapshot there is no need to
// load it.
if (!is_local) {
co_await _apply_entries.push_eventually(std::move(snp));
}
}
for (const auto& snp_id: batch.snps_to_drop) {
_state_machine->drop_snapshot(snp_id);
}
if (batch.log_entries.size()) {
auto& entries = batch.log_entries;
if (last_stable >= entries[0]->idx) {
co_await _persistence->truncate_log(entries[0]->idx);
_stats.truncate_persisted_log++;
}
utils::get_local_injector().inject("store_log_entries/test-failure",
[] { throw std::runtime_error("store_log_entries/test-failure"); });
// Combine saving and truncating into one call?
// will require persistence to keep track of last idx
co_await _persistence->store_log_entries(entries);
last_stable = (*entries.crbegin())->idx;
_stats.persisted_log_entries += entries.size();
}
// Update RPC server address mappings. Add servers which are joining
// the cluster according to the new configuration (obtained from the
// last_conf_idx).
//
// It should be done prior to sending the messages since the RPC
// module needs to know who should it send the messages to (actual
// network addresses of the joining servers).
rpc_config_diff rpc_diff;
if (batch.configuration) {
rpc_diff = diff_address_sets(get_rpc_config(), *batch.configuration);
for (const auto& addr: rpc_diff.joining) {
add_to_rpc_config(addr);
}
_rpc->on_configuration_change(rpc_diff.joining, {});
}
// After entries are persisted we can send messages.
for (auto&& m : batch.messages) {
try {
send_message(m.first, std::move(m.second));
} catch(...) {
// Not being able to send a message is not a critical error
logger.debug("[{}] io_fiber failed to send a message to {}: {}", _id, m.first, std::current_exception());
}
}
if (batch.configuration) {
for (const auto& addr: rpc_diff.leaving) {
abort_snapshot_transfer(addr.id);
remove_from_rpc_config(addr);
}
_rpc->on_configuration_change({}, rpc_diff.leaving);
}
// Process committed entries.
if (batch.committed.size()) {
if (_non_joint_conf_commit_promise) {
for (const auto& e: batch.committed) {
const auto* cfg = get_if<raft::configuration>(&e->data);
if (cfg != nullptr && !cfg->is_joint()) {
std::exchange(_non_joint_conf_commit_promise, std::nullopt)->promise.set_value();
break;
}
}
}
co_await _persistence->store_commit_idx(batch.committed.back()->idx);
_stats.queue_entries_for_apply += batch.committed.size();
co_await _apply_entries.push_eventually(std::move(batch.committed));
}
if (batch.max_read_id_with_quorum) {
while (!_reads.empty() && _reads.front().id <= batch.max_read_id_with_quorum) {
_reads.front().promise.set_value(_reads.front().idx);
_reads.pop_front();
}
}
if (!_fsm->is_leader()) {
if (_stepdown_promise) {
std::exchange(_stepdown_promise, std::nullopt)->set_value();
}
if (!_current_rpc_config.contains(_id)) {
// - It's important we push this after we pushed committed entries above. It
// will cause `applier_fiber` to drop waiters, which should be done after we
// notify all waiters for entries committed in this batch.
// - This may happen multiple times if `io_fiber` gets multiple batches when
// we're outside the configuration, but it should eventually (and generally
// quickly) stop happening (we're outside the config after all).
co_await _apply_entries.push_eventually(removed_from_config{});
}
// request aborts of snapshot transfers
abort_snapshot_transfers();
// abort all read barriers
for (auto& r : _reads) {
r.promise.set_value(not_a_leader{_fsm->current_leader()});
}
_reads.clear();
} else if (batch.abort_leadership_transfer) {
if (_stepdown_promise) {
std::exchange(_stepdown_promise, std::nullopt)->set_exception(timeout_error("Stepdown process timed out"));
}
}
if (_leader_promise && _fsm->current_leader()) {
std::exchange(_leader_promise, std::nullopt)->set_value();
}
if (_state_change_promise && batch.state_changed) {
std::exchange(_state_change_promise, std::nullopt)->set_value();
}
}
// Handles pending server requests: a snapshot request is forwarded to
// `applier_fiber` as a `trigger_snapshot_msg` through the `_apply_entries` queue.
future<> server_impl::process_server_requests(server_requests&& requests) {
    if (!requests.snapshot) {
        co_return;
    }
    co_await _apply_entries.push_eventually(trigger_snapshot_msg{});
}
future<> server_impl::io_fiber(index_t last_stable) {
logger.trace("[{}] io_fiber start", _id);
try {
while (true) {
auto batch = co_await _fsm->poll_output();
bool has_fsm_output = false;
bool has_server_request = false;
co_await _events.when([this, &has_fsm_output, &has_server_request] {
has_fsm_output = _fsm->has_output();
has_server_request = !_new_server_requests.empty();
return has_fsm_output || has_server_request;
});
while (utils::get_local_injector().enter("poll_fsm_output/pause")) {
co_await seastar::sleep(std::chrono::milliseconds(100));
}
_stats.polls++;
if (batch.term_and_vote) {
// Current term and vote are always persisted
// together. A vote may change independently of
// term, but it's safe to update both in this
// case.
co_await _persistence->store_term_and_vote(batch.term_and_vote->first, batch.term_and_vote->second);
_stats.store_term_and_vote++;
if (has_fsm_output) {
auto batch = _fsm->get_output();
co_await process_fsm_output(last_stable, std::move(batch));
}
if (batch.snp) {
auto& [snp, is_local] = *batch.snp;
logger.trace("[{}] io_fiber storing snapshot {}", _id, snp.id);
// Persist the snapshot
co_await _persistence->store_snapshot_descriptor(snp, is_local ? _config.snapshot_trailing : 0);
_stats.store_snapshot++;
// If this is locally generated snapshot there is no need to
// load it.
if (!is_local) {
co_await _apply_entries.push_eventually(std::move(snp));
}
}
for (const auto& snp_id: batch.snps_to_drop) {
_state_machine->drop_snapshot(snp_id);
}
if (batch.log_entries.size()) {
auto& entries = batch.log_entries;
if (last_stable >= entries[0]->idx) {
co_await _persistence->truncate_log(entries[0]->idx);
_stats.truncate_persisted_log++;
}
utils::get_local_injector().inject("store_log_entries/test-failure",
[] { throw std::runtime_error("store_log_entries/test-failure"); });
// Combine saving and truncating into one call?
// will require persistence to keep track of last idx
co_await _persistence->store_log_entries(entries);
last_stable = (*entries.crbegin())->idx;
_stats.persisted_log_entries += entries.size();
}
// Update RPC server address mappings. Add servers which are joining
// the cluster according to the new configuration (obtained from the
// last_conf_idx).
//
// It should be done prior to sending the messages since the RPC
// module needs to know who should it send the messages to (actual
// network addresses of the joining servers).
rpc_config_diff rpc_diff;
if (batch.configuration) {
rpc_diff = diff_address_sets(get_rpc_config(), *batch.configuration);
for (const auto& addr: rpc_diff.joining) {
add_to_rpc_config(addr);
}
_rpc->on_configuration_change(rpc_diff.joining, {});
}
// After entries are persisted we can send messages.
for (auto&& m : batch.messages) {
try {
send_message(m.first, std::move(m.second));
} catch(...) {
// Not being able to send a message is not a critical error
logger.debug("[{}] io_fiber failed to send a message to {}: {}", _id, m.first, std::current_exception());
}
}
if (batch.configuration) {
for (const auto& addr: rpc_diff.leaving) {
abort_snapshot_transfer(addr.id);
remove_from_rpc_config(addr);
}
_rpc->on_configuration_change({}, rpc_diff.leaving);
}
// Process committed entries.
if (batch.committed.size()) {
if (_non_joint_conf_commit_promise) {
for (const auto& e: batch.committed) {
const auto* cfg = get_if<raft::configuration>(&e->data);
if (cfg != nullptr && !cfg->is_joint()) {
std::exchange(_non_joint_conf_commit_promise, std::nullopt)->promise.set_value();
break;
}
}
}
co_await _persistence->store_commit_idx(batch.committed.back()->idx);
_stats.queue_entries_for_apply += batch.committed.size();
co_await _apply_entries.push_eventually(std::move(batch.committed));
}
if (batch.max_read_id_with_quorum) {
while (!_reads.empty() && _reads.front().id <= batch.max_read_id_with_quorum) {
_reads.front().promise.set_value(_reads.front().idx);
_reads.pop_front();
}
}
if (!_fsm->is_leader()) {
if (_stepdown_promise) {
std::exchange(_stepdown_promise, std::nullopt)->set_value();
}
if (!_current_rpc_config.contains(_id)) {
// - It's important we push this after we pushed committed entries above. It
// will cause `applier_fiber` to drop waiters, which should be done after we
// notify all waiters for entries committed in this batch.
// - This may happen multiple times if `io_fiber` gets multiple batches when
// we're outside the configuration, but it should eventually (and generally
// quickly) stop happening (we're outside the config after all).
co_await _apply_entries.push_eventually(removed_from_config{});
}
// request aborts of snapshot transfers
abort_snapshot_transfers();
// abort all read barriers
for (auto& r : _reads) {
r.promise.set_value(not_a_leader{_fsm->current_leader()});
}
_reads.clear();
} else if (batch.abort_leadership_transfer) {
if (_stepdown_promise) {
std::exchange(_stepdown_promise, std::nullopt)->set_exception(timeout_error("Stepdown process timed out"));
}
}
if (_leader_promise && _fsm->current_leader()) {
std::exchange(_leader_promise, std::nullopt)->set_value();
}
if (_state_change_promise && batch.state_changed) {
std::exchange(_state_change_promise, std::nullopt)->set_value();
if (has_server_request) {
auto requests = std::exchange(_new_server_requests, server_requests{});
co_await process_server_requests(std::move(requests));
}
}
} catch (seastar::broken_condition_variable&) {
@@ -1260,9 +1369,9 @@ future<> server_impl::applier_fiber() {
// Note that at this point (after the `co_await`), _fsm may already have applied a later snapshot.
// That's fine, `_fsm->apply_snapshot` will simply ignore our current attempt; we will soon receive
// a later snapshot from the queue.
if (!_fsm->apply_snapshot(snp,
force_snapshot ? 0 : _config.snapshot_trailing,
force_snapshot ? 0 : _config.snapshot_trailing_size, true)) {
auto max_trailing = force_snapshot ? 0 : _config.snapshot_trailing;
auto max_trailing_bytes = force_snapshot ? 0 : _config.snapshot_trailing_size;
if (!_fsm->apply_snapshot(snp, max_trailing, max_trailing_bytes, true)) {
logger.trace("[{}] applier fiber: while taking snapshot term={} idx={} id={},"
" fsm received a later snapshot at idx={}", _id, snp.term, snp.idx, snp.id, _fsm->log_last_snapshot_idx());
}
@@ -1284,6 +1393,23 @@ future<> server_impl::applier_fiber() {
// it may never know the status of entries it submitted.
drop_waiters();
co_return;
},
[this] (const trigger_snapshot_msg&) -> future<> {
auto applied_term = _fsm->log_term_for(_applied_idx);
// last truncation index <= snapshot index <= applied index
assert(applied_term);
snapshot_descriptor snp;
snp.term = *applied_term;
snp.idx = _applied_idx;
snp.config = _fsm->log_last_conf_for(_applied_idx);
logger.trace("[{}] taking snapshot at term={}, idx={} due to request", _id, snp.term, snp.idx);
snp.id = co_await _state_machine->take_snapshot();
if (!_fsm->apply_snapshot(snp, 0, 0, true)) {
logger.trace("[{}] while taking snapshot term={} idx={} id={} due to request,"
" fsm received a later snapshot at idx={}", _id, snp.term, snp.idx, snp.id, _fsm->log_last_snapshot_idx());
}
_stats.snapshots_taken++;
}
), v);
@@ -1437,6 +1563,8 @@ future<> server_impl::abort(sstring reason) {
_aborted = std::move(reason);
logger.trace("[{}]: abort() called", _id);
_fsm->stop();
_events.broken();
_snapshot_desc_idx_changed.broken();
// IO and applier fibers may update waiters and start new snapshot
// transfers, so abort them first
@@ -1493,7 +1621,7 @@ future<> server_impl::abort(sstring reason) {
}
if (_state_change_promise) {
_state_change_promise->set_exception(stopped_error());
_state_change_promise->set_exception(stopped_error(*_aborted));
}
abort_snapshot_transfers();

View File

@@ -254,6 +254,22 @@ public:
// If it passes nullptr, the function is unabortable.
virtual future<> wait_for_state_change(seastar::abort_source* as) = 0;
// Manually trigger snapshot creation and log truncation.
//
// Does nothing and returns `false` if the current apply index is less than or equal to
// the last persisted snapshot descriptor index.
//
// Otherwise returns `true`; when the future resolves, it is guaranteed that the snapshot descriptor
// is persisted, but not that the snapshot is loaded to the state machine yet (it will be eventually).
//
// The request may be resolved by the regular snapshotting mechanisms (e.g. a snapshot
// is created because the Raft log grows too large). In this case there is no guarantee
// how many trailing entries will be left behind the snapshot. However,
// if there are no operations running on the server concurrently with the request and all
// committed entries are already applied, the created snapshot is guaranteed to leave
// zero trailing entries.
virtual future<bool> trigger_snapshot(seastar::abort_source* as) = 0;
// Ad hoc functions for testing
virtual void wait_until_candidate() = 0;
virtual future<> wait_election_done() = 0;

View File

@@ -100,7 +100,7 @@ SEASTAR_TEST_CASE(test_group0_cmd_merge) {
raft::command cmd;
ser::serialize(cmd, group0_cmd);
auto merges = mm.canonical_mutation_merge_count;
utils::get_local_injector().enable("fsm::poll_output/pause");
utils::get_local_injector().enable("poll_fsm_output/pause");
auto f = when_all(
group0.add_entry(cmd, raft::wait_type::applied, nullptr),
group0.add_entry(cmd, raft::wait_type::applied, nullptr),
@@ -108,7 +108,7 @@ SEASTAR_TEST_CASE(test_group0_cmd_merge) {
// Sleep is needed for all the entries added above to hit the log
seastar::sleep(std::chrono::milliseconds(100)).get();
// After unpause all entries added above will be committed and applied together
utils::get_local_injector().disable("fsm::poll_output/pause");
utils::get_local_injector().disable("poll_fsm_output/pause");
f.get(); // Wait for apply to complete
// There should be two calls to migration manager since two out of
// three commands should be merged.

View File

@@ -298,7 +298,7 @@ BOOST_AUTO_TEST_CASE(test_vote_from_any_state) {
server_id id1{utils::UUID(0, 1)}, id2{utils::UUID(0, 2)}, id3{utils::UUID(0, 3)};
raft::configuration cfg = config_from_ids({id1, id2, id3});
raft::log log{raft::snapshot_descriptor{.config = cfg}};
raft::fsm fsm(id1, term_t{}, server_id{}, std::move(log), fd, fsm_cfg);
fsm_debug fsm(id1, term_t{}, server_id{}, std::move(log), fd, fsm_cfg);
// Follower
BOOST_CHECK(fsm.is_follower());
@@ -360,7 +360,7 @@ BOOST_AUTO_TEST_CASE(test_log_replication_1) {
server_id id1{utils::UUID(0, 1)}, id2{utils::UUID(0, 2)}, id3{utils::UUID(0, 3)};
raft::configuration cfg = config_from_ids({id1, id2, id3});
raft::log log{raft::snapshot_descriptor{.config = cfg}};
raft::fsm fsm(id1, term_t{}, server_id{}, std::move(log), trivial_failure_detector, fsm_cfg);
fsm_debug fsm(id1, term_t{}, server_id{}, std::move(log), trivial_failure_detector, fsm_cfg);
election_timeout(fsm);
BOOST_CHECK(fsm.is_candidate());
@@ -425,7 +425,7 @@ BOOST_AUTO_TEST_CASE(test_log_replication_2) {
server_id id1{utils::UUID(0, 1)}, id2{utils::UUID(0, 2)}, id3{utils::UUID(0, 3)};
raft::configuration cfg = config_from_ids({id1, id2, id3});
raft::log log{raft::snapshot_descriptor{.config = cfg}};
raft::fsm fsm(id1, term_t{}, server_id{}, std::move(log), trivial_failure_detector, fsm_cfg);
fsm_debug fsm(id1, term_t{}, server_id{}, std::move(log), trivial_failure_detector, fsm_cfg);
election_timeout(fsm);
output = fsm.get_output();
@@ -485,7 +485,7 @@ BOOST_AUTO_TEST_CASE(test_single_node_commit) {
server_id id1{utils::UUID(0, 1)};
raft::configuration cfg = config_from_ids({id1});
raft::log log{raft::snapshot_descriptor{.config = cfg}};
raft::fsm fsm(id1, term_t{}, server_id{}, std::move(log), trivial_failure_detector, fsm_cfg);
fsm_debug fsm(id1, term_t{}, server_id{}, std::move(log), trivial_failure_detector, fsm_cfg);
BOOST_CHECK(fsm.is_leader()); // Single node skips candidate state
output = fsm.get_output();
@@ -578,11 +578,11 @@ BOOST_AUTO_TEST_CASE(test_dueling_candidates) {
server_id id1{utils::UUID(0, 1)}, id2{utils::UUID(0, 2)}, id3{utils::UUID(0, 3)};
raft::configuration cfg = config_from_ids({id1, id2, id3});
raft::log log1{raft::snapshot_descriptor{.config = cfg}};
raft::fsm fsm1(id1, term_t{}, server_id{}, std::move(log1), trivial_failure_detector, fsm_cfg);
fsm_debug fsm1(id1, term_t{}, server_id{}, std::move(log1), trivial_failure_detector, fsm_cfg);
raft::log log2{raft::snapshot_descriptor{.config = cfg}};
raft::fsm fsm2(id2, term_t{}, server_id{}, std::move(log2), trivial_failure_detector, fsm_cfg);
fsm_debug fsm2(id2, term_t{}, server_id{}, std::move(log2), trivial_failure_detector, fsm_cfg);
raft::log log3{raft::snapshot_descriptor{.config = cfg}};
raft::fsm fsm3(id3, term_t{}, server_id{}, std::move(log3), trivial_failure_detector, fsm_cfg);
fsm_debug fsm3(id3, term_t{}, server_id{}, std::move(log3), trivial_failure_detector, fsm_cfg);
// fsm1 and fsm3 don't see each other
make_candidate(fsm1);
@@ -621,11 +621,11 @@ BOOST_AUTO_TEST_CASE(test_dueling_pre_candidates) {
server_id id1{utils::UUID(0, 1)}, id2{utils::UUID(0, 2)}, id3{utils::UUID(0, 3)};
raft::configuration cfg = config_from_ids({id1, id2, id3});
raft::log log1{raft::snapshot_descriptor{.config = cfg}};
raft::fsm fsm1(id1, term_t{}, server_id{}, std::move(log1), trivial_failure_detector, fsm_cfg_pre);
fsm_debug fsm1(id1, term_t{}, server_id{}, std::move(log1), trivial_failure_detector, fsm_cfg_pre);
raft::log log2{raft::snapshot_descriptor{.config = cfg}};
raft::fsm fsm2(id2, term_t{}, server_id{}, std::move(log2), trivial_failure_detector, fsm_cfg_pre);
fsm_debug fsm2(id2, term_t{}, server_id{}, std::move(log2), trivial_failure_detector, fsm_cfg_pre);
raft::log log3{raft::snapshot_descriptor{.config = cfg}};
raft::fsm fsm3(id3, term_t{}, server_id{}, std::move(log3), trivial_failure_detector, fsm_cfg_pre);
fsm_debug fsm3(id3, term_t{}, server_id{}, std::move(log3), trivial_failure_detector, fsm_cfg_pre);
// fsm1 and fsm3 don't see each other
make_candidate(fsm1);
@@ -667,7 +667,7 @@ BOOST_AUTO_TEST_CASE(test_single_node_pre_candidate) {
server_id id1{utils::UUID(0, 1)};
raft::configuration cfg = config_from_ids({id1});
raft::log log1{raft::snapshot_descriptor{.config = cfg}};
raft::fsm fsm1(id1, term_t{}, server_id{}, std::move(log1), trivial_failure_detector, fsm_cfg_pre);
fsm_debug fsm1(id1, term_t{}, server_id{}, std::move(log1), trivial_failure_detector, fsm_cfg_pre);
BOOST_CHECK(fsm1.is_leader());
}
@@ -743,7 +743,7 @@ void handle_proposal(unsigned nodes, std::vector<int> accepting_int) {
raft::configuration cfg = config_from_ids(ids);
raft::log log1{raft::snapshot_descriptor{.config = cfg}};
raft::fsm fsm1(raft::server_id{utils::UUID(0, 1)}, term_t{}, server_id{}, std::move(log1),
fsm_debug fsm1(raft::server_id{utils::UUID(0, 1)}, term_t{}, server_id{}, std::move(log1),
trivial_failure_detector, fsm_cfg);
// promote 1 to become leader (i.e. gets votes)

View File

@@ -304,7 +304,7 @@ void test_election_single_node_helper(raft::fsm_config fcfg) {
server_id id1 = id();
raft::configuration cfg = config_from_ids({id1});
raft::log log{raft::snapshot_descriptor{.config = cfg}};
raft::fsm fsm(id1, term_t{}, server_id{}, std::move(log), trivial_failure_detector, fcfg);
fsm_debug fsm(id1, term_t{}, server_id{}, std::move(log), trivial_failure_detector, fcfg);
election_timeout(fsm);
@@ -529,7 +529,7 @@ BOOST_AUTO_TEST_CASE(test_election_two_nodes_prevote) {
raft::configuration cfg = config_from_ids({id1, id2});
raft::log log{raft::snapshot_descriptor{.config = cfg}};
raft::fsm fsm(id1, term_t{}, server_id{}, std::move(log), trivial_failure_detector, fcfg);
fsm_debug fsm(id1, term_t{}, server_id{}, std::move(log), trivial_failure_detector, fcfg);
// Initial state is follower
BOOST_CHECK(fsm.is_follower());
@@ -595,7 +595,7 @@ BOOST_AUTO_TEST_CASE(test_election_four_nodes_prevote) {
raft::configuration cfg = config_from_ids({id1, id2, id3, id4});
raft::log log{raft::snapshot_descriptor{.config = cfg}};
raft::fsm fsm(id1, term_t{}, server_id{}, std::move(log), fd, fcfg);
fsm_debug fsm(id1, term_t{}, server_id{}, std::move(log), fd, fcfg);
// Initial state is follower
BOOST_CHECK(fsm.is_follower());
@@ -652,7 +652,7 @@ BOOST_AUTO_TEST_CASE(test_log_matching_rule) {
log.emplace_back(seastar::make_lw_shared<raft::log_entry>(raft::log_entry{term_t{10}, index_t{1000}}));
log.stable_to(log.last_idx());
raft::fsm fsm(id1, term_t{10}, server_id{}, std::move(log), trivial_failure_detector, fsm_cfg);
fsm_debug fsm(id1, term_t{10}, server_id{}, std::move(log), trivial_failure_detector, fsm_cfg);
// Initial state is follower
BOOST_CHECK(fsm.is_follower());
@@ -929,7 +929,7 @@ BOOST_AUTO_TEST_CASE(test_leader_stepdown) {
{server_addr_from_id(id1), true}, {server_addr_from_id(id2), true}, {server_addr_from_id(id3), false}});
raft::log log(raft::snapshot_descriptor{.config = cfg});
raft::fsm fsm(id1, term_t{1}, /* voted for */ server_id{}, std::move(log), trivial_failure_detector, fsm_cfg);
fsm_debug fsm(id1, term_t{1}, /* voted for */ server_id{}, std::move(log), trivial_failure_detector, fsm_cfg);
// Check that we move to candidate state on timeout_now message
fsm.step(id2, raft::timeout_now{fsm.get_current_term()});
@@ -1034,7 +1034,7 @@ BOOST_AUTO_TEST_CASE(test_leader_stepdown) {
{server_addr_from_id(id1), true}, {server_addr_from_id(id2), true}, {server_addr_from_id(id3), true}});
raft::log log2(raft::snapshot_descriptor{.config = cfg});
raft::fsm fsm2(id1, term_t{1}, /* voted for */ server_id{}, std::move(log2), trivial_failure_detector, fsm_cfg);
fsm_debug fsm2(id1, term_t{1}, /* voted for */ server_id{}, std::move(log2), trivial_failure_detector, fsm_cfg);
election_timeout(fsm2);
// Turn to a leader
@@ -1152,7 +1152,7 @@ BOOST_AUTO_TEST_CASE(test_confchange_a_to_b) {
// A somewhat awkward way to obtain B's log for restart
log.emplace_back(make_lw_shared<raft::log_entry>(B.add_entry(config_from_ids({A_id}))));
log.stable_to(log.last_idx());
raft::fsm B_1(B_id, B.get_current_term(), B_id, std::move(log), trivial_failure_detector, fsm_cfg);
fsm_debug B_1(B_id, B.get_current_term(), B_id, std::move(log), trivial_failure_detector, fsm_cfg);
election_timeout(B_1);
communicate(A, B_1);
BOOST_CHECK(B_1.is_follower());
@@ -1469,7 +1469,7 @@ BOOST_AUTO_TEST_CASE(test_zero) {
BOOST_AUTO_TEST_CASE(test_reordered_reject) {
auto id1 = id();
raft::fsm fsm1(id1, term_t{1}, server_id{},
fsm_debug fsm1(id1, term_t{1}, server_id{},
raft::log{raft::snapshot_descriptor{.config = config_from_ids({id1})}},
trivial_failure_detector, fsm_cfg);
@@ -1481,7 +1481,7 @@ BOOST_AUTO_TEST_CASE(test_reordered_reject) {
(void)fsm1.get_output();
auto id2 = id();
raft::fsm fsm2(id2, term_t{1}, server_id{},
fsm_debug fsm2(id2, term_t{1}, server_id{},
raft::log{raft::snapshot_descriptor{.config = raft::configuration{}}},
trivial_failure_detector, fsm_cfg);

View File

@@ -94,7 +94,8 @@ communicate_impl(std::function<bool()> stop_pred, raft_routing_map& map) {
has_traffic = false;
for (auto e : map) {
raft::fsm& from = *e.second;
for (auto output = from.get_output(); !output.empty(); output = from.get_output()) {
for (bool has_output = from.has_output(); has_output; has_output = from.has_output()) {
auto output = from.get_output();
if (stop_pred()) {
return;
}

View File

@@ -63,9 +63,20 @@ raft::command create_command(T val) {
extern raft::fsm_config fsm_cfg;
extern raft::fsm_config fsm_cfg_pre;
class fsm_debug : public raft::fsm {
// Holds the condition variable that `fsm_debug` passes to the `raft::fsm` constructor.
// It is kept in its own base class, listed before `raft::fsm` in `fsm_debug`'s base list,
// so that it is constructed before the `raft::fsm` base that receives it.
struct sm_events_container {
seastar::condition_variable sm_events;
};
class fsm_debug : public sm_events_container, public raft::fsm {
public:
using raft::fsm::fsm;
explicit fsm_debug(raft::server_id id, raft::term_t current_term, raft::server_id voted_for, raft::log log,
raft::failure_detector& failure_detector, raft::fsm_config conf)
: sm_events_container()
, fsm(id, current_term, voted_for, std::move(log), raft::index_t{0}, failure_detector, conf, sm_events) {
}
void become_follower(raft::server_id leader) {
raft::fsm::become_follower(leader);
}

View File

@@ -0,0 +1,101 @@
#
# Copyright (C) 2024-present ScyllaDB
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
import asyncio
import pytest
import time
import logging
from test.pylib.manager_client import ManagerClient
from test.pylib.util import wait_for, wait_for_cql_and_get_hosts, read_barrier
logger = logging.getLogger(__name__)
async def get_raft_log_size(cql, host) -> int:
    """Return the number of Raft log rows stored on `host`.

    Runs `select count("index") from system.raft` against the given host
    and returns the single count value from the first result row.
    """
    query = "select count(\"index\") from system.raft"
    return (await cql.run_async(query, host=host))[0][0]
async def get_raft_snap_id(cql, host) -> str:
    """Return the snapshot ID recorded on `host`.

    Runs `select snapshot_id from system.raft_snapshots` against the given
    host and returns the `snapshot_id` attribute of the first result row.
    """
    query = "select snapshot_id from system.raft_snapshots"
    return (await cql.run_async(query, host=host))[0].snapshot_id
async def trigger_snapshot(manager: ManagerClient, group0_id: str, ip_addr) -> None:
    """Ask the node at `ip_addr` to snapshot the Raft group `group0_id`.

    Sends a POST to `/raft/trigger_snapshot/{group0_id}` through the manager's
    API client and waits for the request to complete.
    """
    await manager.api.client.post(f"/raft/trigger_snapshot/{group0_id}", host=ip_addr)
@pytest.mark.asyncio
async def test_raft_snapshot_request(manager: ManagerClient):
    """Check manual triggering of group 0 Raft snapshots on a 3-node cluster.

    First part: triggering a snapshot on a node changes its snapshot ID and
    leaves its Raft log table empty.

    Second part: a node that was down while the others appended a command and
    took a snapshot ends up with a new snapshot ID and an empty log once it
    is started again.
    """
    servers = await manager.servers_add(3)
    cql = manager.get_cql()

    s1 = servers[0]
    h1 = (await wait_for_cql_and_get_hosts(cql, [s1], time.time() + 60))[0]

    group0_id = (await cql.run_async(
        "select value from system.scylla_local where key = 'raft_group0_id'",
        host=h1))[0].value

    # Verify that snapshotting updates the snapshot ID and truncates the log.
    log_size = await get_raft_log_size(cql, h1)
    logger.info(f"Log size on {s1}: {log_size}")
    snap_id = await get_raft_snap_id(cql, h1)
    logger.info(f"Snapshot ID on {s1}: {snap_id}")
    assert log_size > 0
    await trigger_snapshot(manager, group0_id, s1.ip_addr)
    new_log_size = await get_raft_log_size(cql, h1)
    logger.info(f"New log size on {s1}: {new_log_size}")
    new_snap_id = await get_raft_snap_id(cql, h1)
    logger.info(f"New snapshot ID on {s1}: {new_snap_id}")
    assert new_log_size == 0
    assert new_snap_id != snap_id

    # If a server misses a command and a snapshot is created on the leader,
    # the server once alive should eventually receive that snapshot.
    s2 = servers[2]
    h2 = (await wait_for_cql_and_get_hosts(cql, [s2], time.time() + 60))[0]
    s2_log_size = await get_raft_log_size(cql, h2)
    logger.info(f"Log size on {s2}: {s2_log_size}")
    s2_snap_id = await get_raft_snap_id(cql, h2)
    logger.info(f"Snapshot ID on {s2}: {s2_snap_id}")
    await manager.server_stop_gracefully(s2.server_id)
    logger.info(f"Stopped {s2}")

    # Restarting the two servers will cause a newly elected leader to append a dummy command.
    await asyncio.gather(*(manager.server_restart(s.server_id) for s in servers[:2]))
    logger.info(f"Restarted {servers[:2]}")

    # Wait for one server to append the command and do a read_barrier on the other
    # to make sure both appended
    async def appended_command() -> int | None:
        await wait_for_cql_and_get_hosts(cql, [s1], time.time() + 60)
        s = await get_raft_log_size(cql, h1)
        if s > 0:
            return s
        return None
    log_size = await wait_for(appended_command, time.time() + 60)
    logger.info(f"{servers[0]} appended new command")
    h = (await wait_for_cql_and_get_hosts(cql, [servers[1]], time.time() + 60))[0]
    await read_barrier(cql, h)
    logger.info(f"Read barrier done on {servers[1]}")

    # We don't know who the leader is, so trigger a snapshot on both servers.
    for s in servers[:2]:
        await trigger_snapshot(manager, group0_id, s.ip_addr)
        h = (await wait_for_cql_and_get_hosts(cql, [s], time.time() + 60))[0]
        snap = await get_raft_snap_id(cql, h)
        # f-string prefix is required here so the server and snapshot ID are interpolated.
        logger.info(f"New snapshot ID on {s}: {snap}")

    await manager.server_start(s2.server_id)
    logger.info(f"Server {s2} restarted")
    await wait_for_cql_and_get_hosts(cql, [s2], time.time() + 60)

    # Poll until s2 reports a snapshot ID different from the one it had before it was stopped.
    async def received_snapshot() -> str | None:
        new_s2_snap_id = await get_raft_snap_id(cql, h2)
        if s2_snap_id != new_s2_snap_id:
            return new_s2_snap_id
        return None
    new_s2_snap_id = await wait_for(received_snapshot, time.time() + 60)
    logger.info(f"{s2} received new snapshot: {new_s2_snap_id}")
    new_s2_log_size = await get_raft_log_size(cql, h2)
    assert new_s2_log_size == 0