Merge "Enhance topology request status tracking" from Gleb

Currently to figure out if a topology request is complete a submitter
checks the topology state and tries to figure out from that the status
of the request. This is not exact. Lets look at rebuild handling for
instance. To figure out if request is completed the code waits for
request object to disappear from the topology, but if another rebuild
starts between the end of the previous one and the code noticing that
it completed the code will continue waiting for the next rebuild.
Another problem is that in case of operation failure there is no way to
pass an error back to the initiator.

This series solves those problems by assigning an id for each request and
tracking the status of each request in a separate table. The initiator
can query the request status from the table and see if the request was
completed successfully or if it failed with an error, which is also
evadable from the table.

The schema for the table is:

    CREATE TABLE system.topology_requests (
        id timeuuid PRIMARY KEY,

        initiating_host uuid,
        start_time timestamp,

        done boolean,
        error text,
        end_time timestamp,
    );

and all entries have TTL of one month.
This commit is contained in:
Tomasz Grabiec
2024-01-17 00:36:14 +01:00
9 changed files with 342 additions and 177 deletions

View File

@@ -53,6 +53,7 @@ namespace {
schema_tables::SCYLLA_TABLE_SCHEMA_HISTORY,
system_keyspace::BROADCAST_KV_STORE,
system_keyspace::TOPOLOGY,
system_keyspace::TOPOLOGY_REQUESTS,
system_keyspace::CDC_GENERATIONS_V3,
};
if (ks_name == system_keyspace::NAME && tables.contains(cf_name)) {
@@ -64,6 +65,7 @@ namespace {
system_keyspace::PAXOS,
system_keyspace::BROADCAST_KV_STORE,
system_keyspace::TOPOLOGY,
system_keyspace::TOPOLOGY_REQUESTS,
system_keyspace::CDC_GENERATIONS_V3,
};
if (ks_name == system_keyspace::NAME && tables.contains(cf_name)) {
@@ -223,6 +225,7 @@ schema_ptr system_keyspace::topology() {
.with_column("ignore_msb", int32_type)
.with_column("cleanup_status", utf8_type)
.with_column("supported_features", set_type_impl::get_instance(utf8_type, true))
.with_column("request_id", timeuuid_type)
.with_column("new_cdc_generation_data_uuid", timeuuid_type, column_kind::static_column)
.with_column("version", long_type, column_kind::static_column)
.with_column("fence_version", long_type, column_kind::static_column)
@@ -241,6 +244,23 @@ schema_ptr system_keyspace::topology() {
return schema;
}
schema_ptr system_keyspace::topology_requests() {
static thread_local auto schema = [] {
auto id = generate_legacy_id(NAME, TOPOLOGY_REQUESTS);
return schema_builder(NAME, TOPOLOGY_REQUESTS, std::optional(id))
.with_column("id", timeuuid_type, column_kind::partition_key)
.with_column("initiating_host", uuid_type)
.with_column("start_time", timestamp_type)
.with_column("done", boolean_type)
.with_column("error", utf8_type)
.with_column("end_time", timestamp_type)
.set_comment("Topology request tracking")
.with_version(generate_schema_version(id))
.build();
}();
return schema;
}
extern thread_local data_type cdc_streams_set_type;
/* An internal table used by nodes to store CDC generation data.
@@ -1919,7 +1939,7 @@ std::vector<schema_ptr> system_keyspace::all_tables(const db::config& cfg) {
r.insert(r.end(), {raft(), raft_snapshots(), raft_snapshot_config(), group0_history(), discovery()});
if (cfg.check_experimental(db::experimental_features_t::feature::CONSISTENT_TOPOLOGY_CHANGES)) {
r.insert(r.end(), {topology(), cdc_generations_v3()});
r.insert(r.end(), {topology(), cdc_generations_v3(), topology_requests()});
}
if (cfg.check_experimental(db::experimental_features_t::feature::BROADCAST_TABLES)) {
@@ -2499,6 +2519,7 @@ future<service::topology> system_keyspace::load_topology_state() {
size_t shard_count = row.get_as<int32_t>("shard_count");
uint8_t ignore_msb = row.get_as<int32_t>("ignore_msb");
sstring cleanup_status = row.get_as<sstring>("cleanup_status");
utils::UUID request_id = row.get_as<utils::UUID>("request_id");
service::node_state nstate = service::node_state_from_string(row.get_as<sstring>("node_state"));
@@ -2634,7 +2655,7 @@ future<service::topology> system_keyspace::load_topology_state() {
map->emplace(host_id, service::replica_state{
nstate, std::move(datacenter), std::move(rack), std::move(release_version),
ring_slice, shard_count, ignore_msb, std::move(supported_features),
service::cleanup_status_from_string(cleanup_status)});
service::cleanup_status_from_string(cleanup_status), request_id});
}
}
@@ -2851,6 +2872,23 @@ future<> system_keyspace::sstables_registry_list(sstring location, sstable_regis
});
}
future<service::topology_request_state> system_keyspace::get_topology_request_state(utils::UUID id) {
auto rs = co_await execute_cql(
format("SELECT done, error FROM system.{} WHERE id = {}", TOPOLOGY_REQUESTS, id));
if (!rs || rs->empty()) {
on_internal_error(slogger, format("no entry for request id {}", id));
}
auto& row = rs->one();
sstring error;
if (row.has("error")) {
error = row.get_as<sstring>("error");
}
co_return service::topology_request_state{row.get_as<bool>("done"), std::move(error)};
}
sstring system_keyspace_name() {
return system_keyspace::NAME;
}

View File

@@ -43,6 +43,7 @@ namespace paxos {
class proposal;
} // namespace service::paxos
struct topology_request_state;
}
namespace netw {
@@ -156,6 +157,7 @@ public:
static constexpr auto DISCOVERY = "discovery";
static constexpr auto BROADCAST_KV_STORE = "broadcast_kv_store";
static constexpr auto TOPOLOGY = "topology";
static constexpr auto TOPOLOGY_REQUESTS = "topology_requests";
static constexpr auto SSTABLES_REGISTRY = "sstables";
static constexpr auto CDC_GENERATIONS_V3 = "cdc_generations_v3";
static constexpr auto TABLETS = "tablets";
@@ -240,6 +242,7 @@ public:
static schema_ptr discovery();
static schema_ptr broadcast_kv_store();
static schema_ptr topology();
static schema_ptr topology_requests();
static schema_ptr sstables_registry();
static schema_ptr cdc_generations_v3();
static schema_ptr tablets();
@@ -517,6 +520,7 @@ public:
future<bool> get_must_synchronize_topology();
future<> set_must_synchronize_topology(bool);
future<service::topology_request_state> get_topology_request_state(utils::UUID id);
private:
static service::topology_features decode_topology_features_state(::shared_ptr<cql3::untyped_result_set> rs);

View File

@@ -21,6 +21,7 @@ struct join_node_request_params {
uint32_t shard_count;
uint32_t ignore_msb;
std::vector<sstring> supported_features;
utils::UUID request_id;
};
struct join_node_request_result {

View File

@@ -29,7 +29,6 @@ struct raft_topology_cmd {
barrier,
barrier_and_drain,
stream_ranges,
shutdown,
wait_for_ip
};
service::raft_topology_cmd::command cmd;
@@ -46,6 +45,7 @@ struct raft_topology_cmd_result {
struct raft_topology_snapshot {
std::vector<canonical_mutation> topology_mutations;
std::vector<canonical_mutation> cdc_generation_mutations;
std::vector<canonical_mutation> topology_requests_mutations;
};
struct raft_topology_pull_params {};

View File

@@ -27,6 +27,7 @@ struct join_node_request_params {
uint32_t shard_count;
uint32_t ignore_msb;
std::vector<sstring> supported_features;
utils::UUID request_id;
};
struct join_node_request_result {

View File

@@ -10,9 +10,11 @@
*/
#include "storage_service.hh"
#include "gc_clock.hh"
#include "service/topology_guard.hh"
#include "service/session.hh"
#include "dht/boot_strapper.hh"
#include <optional>
#include <seastar/core/distributed.hh>
#include <seastar/util/defer.hh>
#include <seastar/coroutine/as_future.hh>
@@ -627,7 +629,7 @@ future<> storage_service::topology_transition() {
future<> storage_service::merge_topology_snapshot(raft_topology_snapshot snp) {
std::vector<mutation> muts;
muts.reserve(snp.topology_mutations.size() + (snp.cdc_generation_mutations.size()));
muts.reserve(snp.topology_mutations.size() + snp.cdc_generation_mutations.size() + snp.topology_requests_mutations.size());
{
auto s = _db.local().find_schema(db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY);
boost::transform(snp.topology_mutations, std::back_inserter(muts), [s] (const canonical_mutation& m) {
@@ -640,6 +642,12 @@ future<> storage_service::merge_topology_snapshot(raft_topology_snapshot snp) {
return m.to_mutation(s);
});
}
if (snp.topology_requests_mutations.size()) {
auto s = _db.local().find_schema(db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY_REQUESTS);
boost::transform(snp.topology_requests_mutations, std::back_inserter(muts), [s] (const canonical_mutation& m) {
return m.to_mutation(s);
});
}
co_await _db.local().apply(freeze(muts), db::no_timeout);
}
@@ -748,6 +756,17 @@ protected:
template<std::ranges::range C>
requires std::convertible_to<std::ranges::range_value_t<C>, data_value>
Builder& apply_set(const char* cell, collection_apply_mode apply_mode, const C& c);
Builder& set(const char* cell, node_state value);
Builder& set(const char* cell, topology_request value);
Builder& set(const char* cell, const sstring& value);
Builder& set(const char* cell, const raft::server_id& value);
Builder& set(const char* cell, const uint32_t& value);
Builder& set(const char* cell, cleanup_status value);
Builder& set(const char* cell, const utils::UUID& value);
Builder& set(const char* cell, bool value);
Builder& set(const char* cell, const char* value);
Builder& set(const char* cell, const db_clock::time_point& value);
Builder& del(const char* cell);
};
@@ -765,23 +784,19 @@ private:
row& row();
api::timestamp_type timestamp() const;
const schema& schema() const;
ttl_opt ttl() const { return std::nullopt; }
public:
topology_node_mutation_builder(topology_mutation_builder&, raft::server_id);
topology_node_mutation_builder& set(const char* cell, node_state value);
topology_node_mutation_builder& set(const char* cell, topology_request value);
topology_node_mutation_builder& set(const char* cell, const sstring& value);
topology_node_mutation_builder& set(const char* cell, const raft::server_id& value);
using builder_base::set;
using builder_base::del;
topology_node_mutation_builder& set(const char* cell, const std::unordered_set<raft::server_id>& nodes_ids);
topology_node_mutation_builder& set(const char* cell, const std::unordered_set<dht::token>& value);
template<typename S>
requires std::constructible_from<sstring, S>
topology_node_mutation_builder& set(const char* cell, const std::set<S>& value);
topology_node_mutation_builder& set(const char* cell, const uint32_t& value);
topology_node_mutation_builder& set(const char* cell, cleanup_status value);
topology_node_mutation_builder& set(const char* cell, const utils::UUID& value);
topology_node_mutation_builder& del(const char* cell);
canonical_mutation build();
};
@@ -801,6 +816,7 @@ private:
row& row();
api::timestamp_type timestamp() const;
const schema& schema() const;
ttl_opt ttl() const { return std::nullopt; }
public:
topology_mutation_builder(api::timestamp_type ts);
@@ -840,7 +856,7 @@ template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::apply_atomic(const char* cell, const data_value& value) {
const column_definition* cdef = self().schema().get_column_definition(cell);
assert(cdef);
self().row().apply(*cdef, atomic_cell::make_live(*cdef->type, self().timestamp(), cdef->type->decompose(value)));
self().row().apply(*cdef, atomic_cell::make_live(*cdef->type, self().timestamp(), cdef->type->decompose(value), self().ttl()));
return self();
}
@@ -860,7 +876,7 @@ Builder& topology_mutation_builder_base<Builder>::apply_set(const char* cell, co
collection_mutation_description cm;
cm.cells.reserve(cset.size());
for (const bytes& raw : cset) {
cm.cells.emplace_back(raw, atomic_cell::make_live(*bytes_type, self().timestamp(), bytes_view()));
cm.cells.emplace_back(raw, atomic_cell::make_live(*bytes_type, self().timestamp(), bytes_view(), self().ttl()));
}
if (apply_mode == collection_apply_mode::overwrite) {
@@ -885,6 +901,57 @@ Builder& topology_mutation_builder_base<Builder>::del(const char* cell) {
return self();
}
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, node_state value) {
return apply_atomic(cell, sstring{::format("{}", value)});
}
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, topology_request value) {
return apply_atomic(cell, sstring{::format("{}", value)});
}
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, const sstring& value) {
return apply_atomic(cell, value);
}
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, const raft::server_id& value) {
return apply_atomic(cell, value.uuid());
}
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, const uint32_t& value) {
return apply_atomic(cell, int32_t(value));
}
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, cleanup_status value) {
return apply_atomic(cell, sstring{::format("{}", value)});
}
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, const utils::UUID& value) {
return apply_atomic(cell, value);
}
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, bool value) {
return apply_atomic(cell, value);
}
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, const char* value) {
return apply_atomic(cell, value);
}
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, const db_clock::time_point& value) {
return apply_atomic(cell, value);
}
row& topology_node_mutation_builder::row() {
return _r.cells();
}
@@ -897,39 +964,6 @@ const schema& topology_node_mutation_builder::schema() const {
return *_builder._s;
}
topology_node_mutation_builder& topology_node_mutation_builder::set(const char* cell, node_state value) {
return apply_atomic(cell, sstring{::format("{}", value)});
}
topology_node_mutation_builder& topology_node_mutation_builder::set(const char* cell, topology_request value) {
return apply_atomic(cell, sstring{::format("{}", value)});
}
topology_node_mutation_builder& topology_node_mutation_builder::set(const char* cell, const sstring& value) {
return apply_atomic(cell, value);
}
topology_node_mutation_builder& topology_node_mutation_builder::set(const char* cell, const raft::server_id& value) {
return apply_atomic(cell, value.uuid());
}
topology_node_mutation_builder& topology_node_mutation_builder::set(const char* cell, const uint32_t& value) {
return apply_atomic(cell, int32_t(value));
}
topology_node_mutation_builder& topology_node_mutation_builder::set(const char* cell, cleanup_status value) {
return apply_atomic(cell, sstring{::format("{}", value)});
}
topology_node_mutation_builder& topology_node_mutation_builder::set(
const char* cell, const utils::UUID& value) {
return apply_atomic(cell, value);
}
topology_node_mutation_builder& topology_node_mutation_builder::del(const char* cell) {
return builder_base::del(cell);
}
topology_node_mutation_builder& topology_node_mutation_builder::set(const char* cell, const std::unordered_set<raft::server_id>& nodes_ids) {
return apply_set(cell, collection_apply_mode::overwrite, nodes_ids | boost::adaptors::transformed([] (const auto& node_id) { return node_id.id; }));
}
@@ -1035,6 +1069,60 @@ topology_node_mutation_builder& topology_mutation_builder::with_node(raft::serve
return *_node_builder;
}
class topology_request_tracking_mutation_builder :
public topology_mutation_builder_base<topology_request_tracking_mutation_builder> {
schema_ptr _s;
mutation _m;
api::timestamp_type _ts;
deletable_row& _r;
public:
row& row();
const schema& schema() const;
api::timestamp_type timestamp() const;
ttl_opt ttl() const;
topology_request_tracking_mutation_builder(utils::UUID id);
using builder_base::set;
using builder_base::del;
topology_request_tracking_mutation_builder& done(std::optional<sstring> error = std::nullopt);
canonical_mutation build() { return canonical_mutation{std::move(_m)}; }
};
topology_request_tracking_mutation_builder::topology_request_tracking_mutation_builder(utils::UUID id) :
_s(db::system_keyspace::topology_requests()),
_m(_s, partition_key::from_singular(*_s, id)),
_ts(utils::UUID_gen::micros_timestamp(id)),
_r(_m.partition().clustered_row(*_s, clustering_key::make_empty())) {
_r.apply(row_marker(_ts, *ttl(), gc_clock::now() + *ttl()));
}
ttl_opt topology_request_tracking_mutation_builder::ttl() const {
return std::chrono::duration_cast<std::chrono::seconds>(std::chrono::microseconds(_ts)) + std::chrono::months(1)
- std::chrono::duration_cast<std::chrono::seconds>(gc_clock::now().time_since_epoch());
}
const schema& topology_request_tracking_mutation_builder::schema() const {
return *_s;
}
row& topology_request_tracking_mutation_builder::row() {
return _r.cells();
}
api::timestamp_type topology_request_tracking_mutation_builder::timestamp() const {
return _ts;
}
topology_request_tracking_mutation_builder& topology_request_tracking_mutation_builder::done(std::optional<sstring> error) {
set("end_time", db_clock::now());
if (error) {
set("error", *error);
}
return set("done", true);
}
future<> storage_service::sstable_cleanup_fiber(raft::server& server, sharded<service::storage_proxy>& proxy) noexcept {
while (!_group0_as.abort_requested()) {
bool err = false;
@@ -1160,8 +1248,9 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
using drop_guard_and_retake = bool_class<class retake_guard_tag>;
// True if an ongoing topology change should be rolled back
bool _rollback = false;
// Engaged if an ongoing topology change should be rolled back. The string inside
// will indicate a reason for the rollback.
std::optional<sstring> _rollback;
const locator::token_metadata& get_token_metadata() const noexcept {
return *_shared_tm.get();
@@ -1217,6 +1306,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
struct cancel_requests {
group0_guard guard;
std::unordered_set<raft::server_id> dead_nodes;
};
struct start_cleanup {
@@ -1298,7 +1388,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
// We did not find a request that has enough live node to proceed
// Cancel all requests to let admin know that no operation can succeed
slogger.warn("topology coordinator: cancel request queue because no request can proceed. Dead nodes: {}", dead_nodes);
return cancel_requests{std::move(guard)};
return cancel_requests{std::move(guard), std::move(dead_nodes)};
}
auto [id, req] = *next_req;
@@ -2181,7 +2271,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
return std::make_pair(false, std::move(guard));
}
future<> cancel_all_requests(group0_guard guard) {
future<> cancel_all_requests(group0_guard guard, std::unordered_set<raft::server_id> dead_nodes) {
std::vector<canonical_mutation> muts;
std::vector<raft::server_id> reject_join;
if (_topo_sm._topology.requests.empty()) {
@@ -2189,16 +2279,16 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
}
auto ts = guard.write_timestamp();
for (auto& [id, req] : _topo_sm._topology.requests) {
topology_mutation_builder builder(ts);
topology_request_tracking_mutation_builder rtbuilder(_topo_sm._topology.find(id)->second.request_id);
auto node_builder = builder.with_node(id).del("topology_request");
rtbuilder.done(fmt::format("Canceled. Dead nodes: {}", dead_nodes));
switch (req) {
case topology_request::replace:
[[fallthrough]];
case topology_request::join: {
topology_mutation_builder builder(ts);
builder.with_node(id)
.set("node_state", node_state::left)
.del("topology_request");
node_builder.set("node_state", node_state::left);
reject_join.emplace_back(id);
muts.emplace_back(builder.build());
try {
co_await wait_for_ip(id, _address_map, _as);
} catch (...) {
@@ -2211,13 +2301,11 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
case topology_request::rebuild:
[[fallthrough]];
case topology_request::remove: {
topology_mutation_builder builder(ts);
builder.with_node(id)
.del("topology_request");
muts.emplace_back(builder.build());
}
break;
}
muts.emplace_back(builder.build());
muts.emplace_back(rtbuilder.build());
}
co_await update_topology_state(std::move(guard), std::move(muts), "cancel all topology requests");
@@ -2251,7 +2339,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
}
if (auto* cancel = std::get_if<cancel_requests>(&work)) {
co_await cancel_all_requests(std::move(cancel->guard));
co_await cancel_all_requests(std::move(cancel->guard), std::move(cancel->dead_nodes));
co_return true;
}
@@ -2287,11 +2375,13 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
// stop the topology transition.
if (!accepted) {
topology_mutation_builder builder(node.guard.write_timestamp());
topology_request_tracking_mutation_builder rtbuilder(node.rs->request_id);
builder.del_transition_state()
.with_node(node.id)
.set("node_state", node_state::left);
rtbuilder.done("join is not accepted");
auto reason = ::format("bootstrap: failed to accept {}", node.id);
co_await update_topology_state(std::move(node.guard), {builder.build()}, reason);
co_await update_topology_state(std::move(node.guard), {builder.build(), rtbuilder.build()}, reason);
slogger.info("raft topology: node {} moved to left state", node.id);
@@ -2363,7 +2453,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
} catch (...) {
slogger.error("raft topology: transition_state::commit_cdc_generation, "
"raft_topology_cmd::command::barrier failed, error {}", std::current_exception());
_rollback = true;
_rollback = fmt::format("Failed to commit cdc generation: {}", std::current_exception());
break;
}
@@ -2441,7 +2531,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
throw;
} catch (...) {
slogger.error("raft topology: tablets draining failed with {}. Aborting the topology operation", std::current_exception());
_rollback = true;
_rollback = fmt::format("Failed to drain tablets: {}", std::current_exception());
}
break;
case topology::transition_state::write_both_read_old: {
@@ -2458,7 +2548,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
slogger.error("raft topology: transition_state::write_both_read_old, "
"global_token_metadata_barrier failed, error {}",
std::current_exception());
_rollback = true;
_rollback = fmt::format("global_token_metadata_barrier failed in write_both_read_old state {}", std::current_exception());
break;
}
@@ -2512,7 +2602,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
} catch (...) {
slogger.error("raft topology: send_raft_topology_cmd(stream_ranges) failed with exception"
" (node state is {}): {}", node.rs->state, std::current_exception());
_rollback = true;
_rollback = fmt::format("Failed stream ranges: {}", std::current_exception());
break;
}
// Streaming completed. We can now move tokens state to topology::transition_state::write_both_read_new
@@ -2551,6 +2641,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
co_await sleep_abortable(_ring_delay, _as);
node = retake_node(co_await start_operation(), node.id);
}
topology_request_tracking_mutation_builder rtbuilder(node.rs->request_id);
rtbuilder.done();
switch(node.rs->state) {
case node_state::bootstrapping: {
std::vector<canonical_mutation> muts;
@@ -2561,6 +2653,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
.with_node(node.id)
.set("node_state", node_state::normal);
muts.emplace_back(builder.build());
muts.emplace_back(rtbuilder.build());
co_await update_topology_state(take_guard(std::move(node)), std::move(muts),
"bootstrap: read fence completed");
}
@@ -2578,7 +2671,13 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
.del("tokens")
.set("node_state", next_state);
auto str = ::format("{}: read fence completed", node.rs->state);
co_await update_topology_state(take_guard(std::move(node)), {builder.build()}, std::move(str));
std::vector<canonical_mutation> muts;
muts.reserve(2);
muts.push_back(builder.build());
if (next_state == node_state::left) {
muts.push_back(rtbuilder.build());
}
co_await update_topology_state(take_guard(std::move(node)), std::move(muts), std::move(str));
}
break;
case node_state::replacing: {
@@ -2597,7 +2696,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
builder2.with_node(replaced_node_id)
.del("tokens")
.set("node_state", node_state::left);
co_await update_topology_state(take_guard(std::move(node)), {builder1.build(), builder2.build()},
co_await update_topology_state(take_guard(std::move(node)), {builder1.build(), builder2.build(), rtbuilder.build()},
"replace: read fence completed");
}
break;
@@ -2676,11 +2775,14 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
if (auto* reject = std::get_if<join_node_response_params::rejected>(&validation_result)) {
// Transition to left
topology_mutation_builder builder(node.guard.write_timestamp());
topology_request_tracking_mutation_builder rtbuilder(node.rs->request_id);
builder.with_node(node.id)
.del("topology_request")
.set("node_state", node_state::left);
rtbuilder.done("Join is rejected during validation");
auto reason = ::format("bootstrap: node rejected");
co_await update_topology_state(std::move(node.guard), {builder.build()}, reason);
co_await update_topology_state(std::move(node.guard), {builder.build(), rtbuilder.build()}, reason);
slogger.info("raft topology: rejected node moved to left state {}", node.id);
@@ -2703,6 +2805,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
// if the state is none there have to be either 'join' or 'replace' request
// if the state is normal there have to be either 'leave', 'remove' or 'rebuild' request
topology_mutation_builder builder(node.guard.write_timestamp());
topology_request_tracking_mutation_builder rtbuilder(node.rs->request_id);
rtbuilder.set("start_time", db_clock::now());
switch (node.request.value()) {
case topology_request::join: {
assert(!node.rs->ring);
@@ -2712,7 +2816,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
.set("node_state", node_state::bootstrapping)
.del("topology_request");
auto reason = ::format("bootstrap: accept node");
co_await update_topology_state(std::move(node.guard), {builder.build()}, reason);
co_await update_topology_state(std::move(node.guard), {builder.build(), rtbuilder.build()}, reason);
break;
}
case topology_request::leave:
@@ -2725,7 +2829,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
.with_node(node.id)
.set("node_state", node_state::decommissioning)
.del("topology_request");
co_await update_topology_state(take_guard(std::move(node)), {builder.build()},
co_await update_topology_state(take_guard(std::move(node)), {builder.build(), rtbuilder.build()},
"start decommission");
break;
case topology_request::remove: {
@@ -2735,7 +2839,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
if (_gossiper.is_alive(ip)) {
builder.with_node(node.id)
.del("topology_request");
co_await update_topology_state(take_guard(std::move(node)), {builder.build()},
rtbuilder.done("the node is alive");
co_await update_topology_state(take_guard(std::move(node)), {builder.build(), rtbuilder.build()},
"reject removenode");
slogger.warn("raft topology: rejected removenode operation for node {} "
"because it is alive", node.id);
@@ -2747,7 +2852,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
.with_node(node.id)
.set("node_state", node_state::removing)
.del("topology_request");
co_await update_topology_state(take_guard(std::move(node)), {builder.build()},
co_await update_topology_state(take_guard(std::move(node)), {builder.build(), rtbuilder.build()},
"start removenode");
break;
}
@@ -2757,7 +2862,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
.with_node(node.id)
.set("node_state", node_state::replacing)
.del("topology_request");
co_await update_topology_state(take_guard(std::move(node)), {builder.build()},
co_await update_topology_state(take_guard(std::move(node)), {builder.build(), rtbuilder.build()},
"replace: accept node");
break;
}
@@ -2766,7 +2871,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
builder.with_node(node.id)
.set("node_state", node_state::rebuilding)
.del("topology_request");
co_await update_topology_state(take_guard(std::move(node)), {builder.build()},
co_await update_topology_state(take_guard(std::move(node)), {builder.build(), rtbuilder.build()},
"start rebuilding");
break;
}
@@ -2777,10 +2882,12 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
node = co_await exec_direct_command(
std::move(node), raft_topology_cmd::command::stream_ranges);
topology_mutation_builder builder(node.guard.write_timestamp());
topology_request_tracking_mutation_builder rtbuilder(node.rs->request_id);
builder.with_node(node.id)
.set("node_state", node_state::normal)
.del("rebuild_option");
co_await update_topology_state(take_guard(std::move(node)), {builder.build()}, "rebuilding completed");
rtbuilder.done();
co_await update_topology_state(take_guard(std::move(node)), {builder.build(), rtbuilder.build()}, "rebuilding completed");
}
break;
case node_state::left_token_ring: {
@@ -2819,6 +2926,12 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
node = retake_node(co_await start_operation(), node.id);
}
topology_request_tracking_mutation_builder rtbuilder(node.rs->request_id);
rtbuilder.done();
co_await update_topology_state(take_guard(std::move(node)), {rtbuilder.build()}, "report request completion in left_token_ring sate");
// Tell the node to shut down.
// This is done to improve user experience when there are no failures.
// In the next state (`node_state::left`), the node will be banned by the rest of the cluster,
@@ -2831,7 +2944,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
auto node_id = node.id;
bool shutdown_failed = false;
try {
node = co_await exec_direct_command(std::move(node), raft_topology_cmd::command::shutdown);
node = co_await exec_direct_command(std::move(node), raft_topology_cmd::command::barrier);
} catch (...) {
slogger.warn("raft topology: failed to tell node {} to shut down - it may hang."
" It's safe to shut it down manually now. (Exception: {})",
@@ -2873,15 +2986,17 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
node = retake_node(std::move(node.guard), node.id);
topology_mutation_builder builder(node.guard.write_timestamp());
topology_request_tracking_mutation_builder rtbuilder(node.rs->request_id);
builder.set_fence_version(_topo_sm._topology.version) // fence requests in case the drain above failed
.set_transition_state(topology::transition_state::tablet_migration) // in case tablet drain failed we need to complete tablet transitions
.with_node(node.id)
.set("node_state", node_state::normal);
rtbuilder.done();
auto str = fmt::format("complete rollback of {} to state normal", node.id);
slogger.info("{}", str);
co_await update_topology_state(std::move(node.guard), {builder.build()}, str);
co_await update_topology_state(std::move(node.guard), {builder.build(), rtbuilder.build()}, str);
}
break;
case node_state::bootstrapping:
@@ -3157,17 +3272,19 @@ future<> topology_coordinator::rollback_current_topology_op(group0_guard&& guard
}
topology_mutation_builder builder(node.guard.write_timestamp());
topology_request_tracking_mutation_builder rtbuilder(node.rs->request_id);
builder.del_transition_state()
.set_version(_topo_sm._topology.version + 1)
.with_node(node.id)
.set("node_state", state);
rtbuilder.set("error", fmt::format("Rolled back: {}", *_rollback));
std::vector<canonical_mutation> muts;
// We are in the process of aborting remove or decommission which may have streamed some
// ranges to other nodes. Cleanup is needed.
muts = mark_nodes_as_cleanup_needed(node, true);
muts.emplace_back(builder.build());
muts.emplace_back(rtbuilder.build());
auto str = fmt::format("rollback {} after {} failure to state {} and setting cleanup flag", node.id, node.rs->state, state);
@@ -3195,7 +3312,7 @@ future<> topology_coordinator::run() {
if (_rollback) {
co_await rollback_current_topology_op(std::move(guard));
_rollback = false;
_rollback = std::nullopt;
continue;
}
@@ -3317,7 +3434,7 @@ std::unordered_set<raft::server_id> storage_service::find_raft_nodes_from_hoeps(
return ids;
}
canonical_mutation storage_service::build_mutation_from_join_params(const join_node_request_params& params, service::group0_guard& guard) {
std::vector<canonical_mutation> storage_service::build_mutation_from_join_params(const join_node_request_params& params, service::group0_guard& guard) {
topology_mutation_builder builder(guard.write_timestamp());
auto& node_builder = builder.with_node(params.host_id)
.set("node_state", node_state::none)
@@ -3346,8 +3463,12 @@ canonical_mutation storage_service::build_mutation_from_join_params(const join_n
node_builder
.set("topology_request", topology_request::join);
}
node_builder.set("request_id", params.request_id);
topology_request_tracking_mutation_builder rtbuilder(params.request_id);
rtbuilder.set("initiating_host",_group0->group0_server().id().uuid())
.set("done", false);
return builder.build();
return {builder.build(), rtbuilder.build()};
}
class join_node_rpc_handshaker : public service::group0_handshaker {
@@ -3409,7 +3530,7 @@ future<> storage_service::raft_initialize_discovery_leader(raft::server& raft_se
slogger.info("raft topology: adding myself as the first node to the topology");
auto guard = co_await _group0->client().start_operation(&_group0_as);
auto insert_join_request_mutation = build_mutation_from_join_params(params, guard);
auto insert_join_request_mutations = build_mutation_from_join_params(params, guard);
// We are the first node and we define the cluster.
// Set the enabled_features field to our features.
@@ -3417,7 +3538,8 @@ future<> storage_service::raft_initialize_discovery_leader(raft::server& raft_se
builder.add_enabled_features(boost::copy_range<std::set<sstring>>(params.supported_features));
auto enable_features_mutation = builder.build();
topology_change change{{std::move(enable_features_mutation), std::move(insert_join_request_mutation)}};
insert_join_request_mutations.push_back(std::move(enable_features_mutation));
topology_change change{std::move(insert_join_request_mutations)};
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard,
"bootstrap: adding myself as the first node to the topology");
try {
@@ -3755,6 +3877,7 @@ future<> storage_service::join_token_ring(sharded<db::system_distributed_keyspac
.shard_count = smp::count,
.ignore_msb = _db.local().get_config().murmur3_partitioner_ignore_msb_bits(),
.supported_features = boost::copy_range<std::vector<sstring>>(_feature_service.supported_feature_set()),
.request_id = utils::UUID_gen::get_time_UUID(),
};
if (raft_replace_info) {
@@ -3811,23 +3934,18 @@ future<> storage_service::join_token_ring(sharded<db::system_distributed_keyspac
// we do that here.
co_await raft_initialize_discovery_leader(*raft_server, join_params);
auto leaving = [&] {
return _topology_state_machine._topology.left_nodes.contains(raft_server->id()) ||
(_topology_state_machine._topology.transition_nodes.contains(raft_server->id()) &&
_topology_state_machine._topology.transition_nodes[raft_server->id()].state == node_state::left_token_ring);
};
sstring err;
// Wait until we enter one of the final states
co_await _topology_state_machine.event.when([this, raft_server, &leaving] {
return _topology_state_machine._topology.normal_nodes.contains(raft_server->id()) || leaving();
});
if (leaving()) {
if (_sys_ks.local().bootstrap_complete()) {
if (_sys_ks.local().bootstrap_complete()) {
if (_topology_state_machine._topology.left_nodes.contains(raft_server->id())) {
throw std::runtime_error("A node that already left the cluster cannot be restarted");
} else {
throw std::runtime_error(fmt::format("{} failed. See earlier errors", raft_replace_info ? "Replace" : "Bootstrap"));
}
} else {
err = co_await wait_for_topology_request_completion(join_params.request_id);
}
if (!err.empty()) {
throw std::runtime_error(fmt::format("{} failed. See earlier errors ({})", raft_replace_info ? "Replace" : "Bootstrap", err));
}
co_await update_topology_with_local_metadata(*raft_server);
@@ -5353,10 +5471,7 @@ void on_streaming_finished() {
future<> storage_service::raft_decommission() {
auto& raft_server = _group0->group0_server();
auto disengage_shutdown_promise = defer([this] {
_shutdown_request_promise = std::nullopt;
});
utils::UUID request_id;
while (true) {
auto guard = co_await _group0->client().start_operation(&_group0_as);
@@ -5379,10 +5494,15 @@ future<> storage_service::raft_decommission() {
slogger.info("raft topology: request decommission for: {}", raft_server.id());
topology_mutation_builder builder(guard.write_timestamp());
builder.with_node(raft_server.id())
.set("topology_request", topology_request::leave);
topology_change change{{builder.build()}};
.set("topology_request", topology_request::leave)
.set("request_id", guard.new_group0_state_id());
topology_request_tracking_mutation_builder rtbuilder(guard.new_group0_state_id());
rtbuilder.set("initiating_host",_group0->group0_server().id().uuid())
.set("done", false);
topology_change change{{builder.build(), rtbuilder.build()}};
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("decommission: request decommission for {}", raft_server.id()));
request_id = guard.new_group0_state_id();
try {
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as);
} catch (group0_concurrent_modification&) {
@@ -5392,40 +5512,14 @@ future<> storage_service::raft_decommission() {
break;
}
// Wait for the coordinator to tell us to shut down or for decomission request to disappear
bool abort_wait = false;
auto error = co_await wait_for_topology_request_completion(request_id);
auto f1 = _shutdown_request_promise.emplace().get_future().then([this, &abort_wait] {
// shutdown was signalled, abort the wait for the topology event
abort_wait = true;
_topology_state_machine.event.broadcast();
});
auto f2 = _topology_state_machine.event.wait([this, &raft_server, &abort_wait] {
if (abort_wait) {
return true; // the wait is aborted
}
// Wait for decommission request to be removed, but node stay as normal which means decommission failed
auto it = _topology_state_machine._topology.find(raft_server.id());
if (it->second.state == node_state::normal) {
auto rit = _topology_state_machine._topology.requests.find(raft_server.id());
if (rit == _topology_state_machine._topology.requests.end() || rit->second != topology_request::leave) {
_shutdown_request_promise->set_exception(std::runtime_error("Decommission failure"));
return true; // node is normal, but leave request is gone. It means decommission failed
}
}
return false;
});
slogger.info("raft topology: decommission: wait for completion");
auto res = co_await when_all(std::move(f1), std::move(f2));
if (!std::get<0>(res).failed()) {
if (error.empty()) {
// Need to set it otherwise gossiper will try to send shutdown on exit
co_await _gossiper.add_local_application_state({{ gms::application_state::STATUS, gms::versioned_value::left({}, _gossiper.now().time_since_epoch().count()) }});
} else {
constexpr auto err = "Decommission failed. See earlier errors";
slogger.error(err);
} else {
auto err = fmt::format("Decommission failed. See earlier errors ({})", error);
slogger.error("{}", err);
throw std::runtime_error(err);
}
}
@@ -5708,6 +5802,7 @@ void storage_service::run_replace_ops(std::unordered_set<token>& bootstrap_token
future<> storage_service::raft_removenode(locator::host_id host_id, std::list<locator::host_id_or_endpoint> ignore_nodes_params) {
auto id = raft::server_id{host_id.uuid()};
utils::UUID request_id;
while (true) {
auto guard = co_await _group0->client().start_operation(&_group0_as);
@@ -5745,10 +5840,15 @@ future<> storage_service::raft_removenode(locator::host_id host_id, std::list<lo
topology_mutation_builder builder(guard.write_timestamp());
builder.with_node(id)
.set("ignore_nodes", ignored_ids)
.set("topology_request", topology_request::remove);
topology_change change{{builder.build()}};
.set("topology_request", topology_request::remove)
.set("request_id", guard.new_group0_state_id());
topology_request_tracking_mutation_builder rtbuilder(guard.new_group0_state_id());
rtbuilder.set("initiating_host",_group0->group0_server().id().uuid())
.set("done", false);
topology_change change{{builder.build(), rtbuilder.build()}};
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("removenode: request remove for {}", id));
request_id = guard.new_group0_state_id();
try {
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as);
} catch (group0_concurrent_modification&) {
@@ -5760,33 +5860,19 @@ future<> storage_service::raft_removenode(locator::host_id host_id, std::list<lo
}
slogger.info("raft topology: removenode: wait for completion");
bool left = false;
co_await _topology_state_machine.event.when([this, id, &left] {
// Wait for this node to move to state left which means that removenode completed
// or wait for removenode request to be removed, but node stay as normal which means removenode failed
auto it = _topology_state_machine._topology.find(id);
if (!it) {
left = true;
return true; // node either left or on the way
}
if (it->second.state == node_state::normal) {
auto rit = _topology_state_machine._topology.requests.find(id);
if (rit == _topology_state_machine._topology.requests.end() || rit->second != topology_request::remove) {
return true; // node is normal, but remove request is gone. It means removenode failed
}
}
return false;
});
if (left) {
// Wait until request completes
auto error = co_await wait_for_topology_request_completion(request_id);
if (error.empty()) {
try {
co_await _group0->remove_from_raft_config(id);
} catch (raft::not_a_member&) {
slogger.info("raft topology removenode: already removed from the raft config by the topology coordinator");
}
} else {
constexpr auto err = "Removenode failed. See earlier errors";
slogger.error(err);
auto err = fmt::format("Removenode failed. See earlier errors ({})", error);
slogger.error("{}", err);
throw std::runtime_error(err);
}
}
@@ -6379,8 +6465,21 @@ future<> storage_service::do_cluster_cleanup() {
slogger.info("raft topology: cluster cleanup done");
}
future<sstring> storage_service::wait_for_topology_request_completion(utils::UUID id) {
while (true) {
auto [done, error] = co_await _sys_ks.local().get_topology_request_state(id);
if (done) {
co_return error;
}
co_await _topology_state_machine.event.when();
}
co_return sstring();
}
future<> storage_service::raft_rebuild(sstring source_dc) {
auto& raft_server = _group0->group0_server();
utils::UUID request_id;
while (true) {
auto guard = co_await _group0->client().start_operation(&_group0_as);
@@ -6404,10 +6503,16 @@ future<> storage_service::raft_rebuild(sstring source_dc) {
topology_mutation_builder builder(guard.write_timestamp());
builder.with_node(raft_server.id())
.set("topology_request", topology_request::rebuild)
.set("rebuild_option", source_dc);
topology_change change{{builder.build()}};
.set("rebuild_option", source_dc)
.set("request_id", guard.new_group0_state_id());
topology_request_tracking_mutation_builder rtbuilder(guard.new_group0_state_id());
rtbuilder.set("initiating_host",_group0->group0_server().id().uuid())
.set("done", false);
topology_change change{{builder.build(), rtbuilder.build()}};
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("rebuild: request rebuild for {} ({})", raft_server.id(), source_dc));
request_id = guard.new_group0_state_id();
try {
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as);
} catch (group0_concurrent_modification&) {
@@ -6417,10 +6522,11 @@ future<> storage_service::raft_rebuild(sstring source_dc) {
break;
}
// Wait until rebuild completes. We know it completes when the request parameter is empty
co_await _topology_state_machine.event.when([this, &raft_server] {
return !_topology_state_machine._topology.req_param.contains(raft_server.id());
});
// Wait until request completes
auto err = co_await wait_for_topology_request_completion(request_id);
if (!err.empty()) {
throw std::runtime_error(::format("rebuild failed: {}", err));
}
}
future<> storage_service::raft_check_and_repair_cdc_streams() {
@@ -7225,13 +7331,6 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
}
}
break;
case raft_topology_cmd::command::shutdown:
if (_shutdown_request_promise) {
std::exchange(_shutdown_request_promise, std::nullopt)->set_value();
} else {
slogger.warn("raft topology: got shutdown request while not decommissioning");
}
break;
case raft_topology_cmd::command::wait_for_ip: {
std::vector<raft::server_id> ids;
{
@@ -7893,9 +7992,25 @@ void storage_service::init_messaging_service(bool raft_topology_change_enabled)
});
}
std::vector<canonical_mutation> topology_requests_mutations;
{
// FIXME: make it an rwlock, here we only need to lock for reads,
// might be useful if multiple nodes are trying to pull concurrently.
auto read_apply_mutex_holder = co_await ss._group0->client().hold_read_apply_mutex();
auto rs = co_await db::system_keyspace::query_mutations(
ss._db, db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY_REQUESTS);
auto s = ss._db.local().find_schema(db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY_REQUESTS);
topology_requests_mutations.reserve(rs->partitions().size());
boost::range::transform(
rs->partitions(), std::back_inserter(topology_requests_mutations), [s] (const partition& p) {
return canonical_mutation{p.mut().unfreeze(s)};
});
}
co_return raft_topology_snapshot{
.topology_mutations = std::move(topology_mutations),
.cdc_generation_mutations = std::move(cdc_generation_mutations),
.topology_requests_mutations = std::move(topology_requests_mutations),
};
});
});

View File

@@ -751,8 +751,6 @@ private:
std::optional<shared_future<>> _rebuild_result;
std::unordered_map<raft::server_id, std::optional<shared_future<>>> _remove_result;
tablet_op_registry _tablet_ops;
// During decommission, the node waits for the coordinator to tell it to shut down.
std::optional<promise<>> _shutdown_request_promise;
struct {
raft::term_t term{0};
uint64_t last_index{0};
@@ -799,7 +797,7 @@ private:
// raft_group0_client::_read_apply_mutex must be held
future<> merge_topology_snapshot(raft_topology_snapshot snp);
canonical_mutation build_mutation_from_join_params(const join_node_request_params& params, service::group0_guard& guard);
std::vector<canonical_mutation> build_mutation_from_join_params(const join_node_request_params& params, service::group0_guard& guard);
future<join_node_request_result> join_node_request_handler(join_node_request_params params);
future<join_node_response_result> join_node_response_handler(join_node_response_params params);
@@ -810,6 +808,9 @@ private:
future<> _sstable_cleanup_fiber = make_ready_future<>();
future<> sstable_cleanup_fiber(raft::server& raft, sharded<service::storage_proxy>& proxy) noexcept;
// Waits for a topology request with a given ID to complete and return non empty error string
// if request completes with an error
future<sstring> wait_for_topology_request_completion(utils::UUID id);
// We need to be able to abort all group0 operation during shutdown, so we need special abort source for that
abort_source _group0_as;

View File

@@ -209,9 +209,6 @@ std::ostream& operator<<(std::ostream& os, const raft_topology_cmd::command& cmd
case raft_topology_cmd::command::stream_ranges:
os << "stream_ranges";
break;
case raft_topology_cmd::command::shutdown:
os << "shutdown";
break;
case raft_topology_cmd::command::wait_for_ip:
os << "wait_for_ip";
break;

View File

@@ -94,6 +94,7 @@ struct replica_state {
uint8_t ignore_msb;
std::set<sstring> supported_features;
cleanup_status cleanup;
utils::UUID request_id; // id of the current request for the node or the last one if no current one exists
};
struct topology_features {
@@ -186,6 +187,9 @@ struct raft_topology_snapshot {
// Mutations for system.cdc_generations_v3, contains all the CDC generation data.
std::vector<canonical_mutation> cdc_generation_mutations;
// Mutations for system.topology_requests table
std::vector<canonical_mutation> topology_requests_mutations;
};
struct raft_topology_pull_params {
@@ -205,7 +209,6 @@ struct raft_topology_cmd {
barrier_and_drain, // same + drain requests which use previous versions
stream_ranges, // request to stream data, return when streaming is
// done
shutdown, // a decommissioning node should shut down
wait_for_ip // wait for a joining node IP to appear in raft_address_map
};
command cmd;
@@ -234,6 +237,11 @@ struct fencing_token {
}
};
struct topology_request_state {
bool done;
sstring error;
};
std::ostream& operator<<(std::ostream& os, const fencing_token& fencing_token);
std::ostream& operator<<(std::ostream& os, topology::transition_state s);
topology::transition_state transition_state_from_string(const sstring& s);