|
|
|
|
@@ -1008,6 +1008,16 @@ class topology_coordinator {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
future<node_to_work_on> global_token_metadata_barrier(node_to_work_on&& node) {
|
|
|
|
|
node = co_await exec_global_command(std::move(node),
|
|
|
|
|
raft_topology_cmd { raft_topology_cmd::command::barrier_and_drain },
|
|
|
|
|
true);
|
|
|
|
|
node = co_await exec_global_command(std::move(node),
|
|
|
|
|
raft_topology_cmd { raft_topology_cmd::command::fence },
|
|
|
|
|
true);
|
|
|
|
|
co_return std::move(node);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Returns `true` iff there was work to do.
|
|
|
|
|
future<bool> handle_topology_transition(group0_guard guard) {
|
|
|
|
|
auto tstate = _topo_sm._topology.tstate;
|
|
|
|
|
@@ -1097,7 +1107,8 @@ class topology_coordinator {
|
|
|
|
|
// majority and commit.
|
|
|
|
|
topology_mutation_builder builder(guard.write_timestamp());
|
|
|
|
|
builder.set_transition_state(topology::transition_state::publish_cdc_generation)
|
|
|
|
|
.set_current_cdc_generation_id(cdc_gen_id);
|
|
|
|
|
.set_current_cdc_generation_id(cdc_gen_id)
|
|
|
|
|
.set_version(_topo_sm._topology.version + 1);
|
|
|
|
|
auto str = ::format("committed new CDC generation, ID: {}", cdc_gen_id);
|
|
|
|
|
co_await update_topology_state(std::move(guard), {builder.build()}, std::move(str));
|
|
|
|
|
}
|
|
|
|
|
@@ -1126,11 +1137,10 @@ class topology_coordinator {
|
|
|
|
|
|
|
|
|
|
// make sure all nodes know about new topology (we require all nodes to be alive for topo change for now)
|
|
|
|
|
{
|
|
|
|
|
auto f = co_await coroutine::as_future(exec_global_command(
|
|
|
|
|
std::move(node), raft_topology_cmd{raft_topology_cmd::command::barrier}, false));
|
|
|
|
|
auto f = co_await coroutine::as_future(global_token_metadata_barrier(std::move(node)));
|
|
|
|
|
if (f.failed()) {
|
|
|
|
|
slogger.error("raft topology: transition_state::write_both_read_old, "
|
|
|
|
|
"raft_topology_cmd::command::barrier failed, error {}",
|
|
|
|
|
"global_token_metadata_barrier failed, error {}",
|
|
|
|
|
f.get_exception());
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
@@ -1160,7 +1170,9 @@ class topology_coordinator {
|
|
|
|
|
}
|
|
|
|
|
// Streaming completed. We can now move tokens state to topology::transition_state::write_both_read_new
|
|
|
|
|
topology_mutation_builder builder(node.guard.write_timestamp());
|
|
|
|
|
builder.set_transition_state(topology::transition_state::write_both_read_new);
|
|
|
|
|
builder
|
|
|
|
|
.set_transition_state(topology::transition_state::write_both_read_new)
|
|
|
|
|
.set_version(_topo_sm._topology.version + 1);
|
|
|
|
|
auto str = ::format("{}: streaming completed for node {}", node.rs->state, node.id);
|
|
|
|
|
co_await update_topology_state(take_guard(std::move(node)), {builder.build()}, std::move(str));
|
|
|
|
|
}
|
|
|
|
|
@@ -1171,11 +1183,10 @@ class topology_coordinator {
|
|
|
|
|
// In this state writes goes to old and new replicas but reads start to be done from new replicas
|
|
|
|
|
// Before we stop writing to old replicas we need to wait for all previous reads to complete
|
|
|
|
|
{
|
|
|
|
|
auto f = co_await coroutine::as_future(exec_global_command(
|
|
|
|
|
std::move(node), raft_topology_cmd{raft_topology_cmd::command::fence_old_reads}, true));
|
|
|
|
|
auto f = co_await coroutine::as_future(global_token_metadata_barrier(std::move(node)));
|
|
|
|
|
if (f.failed()) {
|
|
|
|
|
slogger.error("raft topology: transition_state::write_both_read_new, "
|
|
|
|
|
"raft_topology_cmd::command::fence_old_reads failed, error {}",
|
|
|
|
|
"global_token_metadata_barrier failed, error {}",
|
|
|
|
|
f.get_exception());
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
@@ -1185,6 +1196,7 @@ class topology_coordinator {
|
|
|
|
|
case node_state::bootstrapping: {
|
|
|
|
|
topology_mutation_builder builder(node.guard.write_timestamp());
|
|
|
|
|
builder.del_transition_state()
|
|
|
|
|
.set_version(_topo_sm._topology.version + 1)
|
|
|
|
|
.with_node(node.id)
|
|
|
|
|
.set("node_state", node_state::normal);
|
|
|
|
|
co_await update_topology_state(take_guard(std::move(node)), {builder.build()},
|
|
|
|
|
@@ -1195,6 +1207,7 @@ class topology_coordinator {
|
|
|
|
|
case node_state::removing: {
|
|
|
|
|
topology_mutation_builder builder(node.guard.write_timestamp());
|
|
|
|
|
builder.del_transition_state()
|
|
|
|
|
.set_version(_topo_sm._topology.version + 1)
|
|
|
|
|
.with_node(node.id)
|
|
|
|
|
.del("tokens")
|
|
|
|
|
.set("node_state", node_state::left);
|
|
|
|
|
@@ -1206,6 +1219,7 @@ class topology_coordinator {
|
|
|
|
|
topology_mutation_builder builder1(node.guard.write_timestamp());
|
|
|
|
|
// Move new node to 'normal'
|
|
|
|
|
builder1.del_transition_state()
|
|
|
|
|
.set_version(_topo_sm._topology.version + 1)
|
|
|
|
|
.with_node(node.id)
|
|
|
|
|
.set("node_state", node_state::normal);
|
|
|
|
|
|
|
|
|
|
@@ -1275,6 +1289,7 @@ class topology_coordinator {
|
|
|
|
|
// meaning that reads will go to the replica being decommissioned
|
|
|
|
|
// but writes will go to new owner as well
|
|
|
|
|
builder.set_transition_state(topology::transition_state::write_both_read_old)
|
|
|
|
|
.set_version(_topo_sm._topology.version + 1)
|
|
|
|
|
.with_node(node.id)
|
|
|
|
|
.set("node_state", node_state::decommissioning)
|
|
|
|
|
.del("topology_request");
|
|
|
|
|
@@ -1287,6 +1302,7 @@ class topology_coordinator {
|
|
|
|
|
// meaning that reads will go to the replica being removed (it is dead though)
|
|
|
|
|
// but writes will go to new owner as well
|
|
|
|
|
builder.set_transition_state(topology::transition_state::write_both_read_old)
|
|
|
|
|
.set_version(_topo_sm._topology.version + 1)
|
|
|
|
|
.with_node(node.id)
|
|
|
|
|
.set("node_state", node_state::removing)
|
|
|
|
|
.del("topology_request");
|
|
|
|
|
@@ -1303,6 +1319,7 @@ class topology_coordinator {
|
|
|
|
|
// and put them into write_both_read_old state meaning that reads will go
|
|
|
|
|
// to the replica being removed (it is dead though) but writes will go to new owner as well
|
|
|
|
|
builder.set_transition_state(topology::transition_state::write_both_read_old)
|
|
|
|
|
.set_version(_topo_sm._topology.version + 1)
|
|
|
|
|
.with_node(node.id)
|
|
|
|
|
.set("node_state", node_state::replacing)
|
|
|
|
|
.del("topology_request")
|
|
|
|
|
@@ -5111,12 +5128,6 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(shar
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
break;
|
|
|
|
|
case raft_topology_cmd::command::fence_old_reads:
|
|
|
|
|
// We need to make sure all reads that used old topology are completed
|
|
|
|
|
// The simplest way to do it for now is to sleep for read timeout
|
|
|
|
|
//co_await sleep_abortable(_db.local().get_config().read_request_timeout_in_ms() * std::chrono::milliseconds(1), _abort_source);
|
|
|
|
|
result.status = raft_topology_cmd_result::command_status::success;
|
|
|
|
|
break;
|
|
|
|
|
case raft_topology_cmd::command::fence: {
|
|
|
|
|
// We can have several concurrent fence commands in case topology change
|
|
|
|
|
// coordinator migrated to another node. The update_fence_version function
|
|
|
|
|
|