storage_service: Advance tablets independently

This change makes the topology state machine advance each tablet
independently which allows them to finish migrations at different
speeds, not at the speed of the slowest tablet.

It will also open the possibility of starting new transitions concurrently
with already active ones.

This is implemented by having a single transition state "tablet
migration", and handling it by scanning all the transitions and
advancing tablet state machines. Updates and barriers are batched for
all tablets in each cycle.

One complication is the tracking of streaming sessions. The operations
are no longer nested in the scope of a single handle method, and
cannot be waited on explicitly, as that would inhibit progress of the
coordinator, which starts later migrations. They live as independent
fibers, which associated with tablets in a transient data structure
which lives within the coordinator instance. This data structure is
consulted for a given tablet in each cycle of the
handle_tablet_migration() pump to check if streaming has finished and
we can move the tablet to the next stage. If the pump has no work,
only then it waits for any streaming to finish by blocking on the
_topo_sm.event.
This commit is contained in:
Tomasz Grabiec
2023-07-16 23:23:57 +02:00
parent 2811b1df0a
commit 889f2ceb1e
3 changed files with 161 additions and 140 deletions

View File

@@ -388,17 +388,7 @@ future<> storage_service::topology_state_load(cdc::generation_service& cdc_gen_s
return read_new_t::no;
}
switch (*state) {
case topology::transition_state::tablet_allow_write_both_read_old:
[[fallthrough]];
case topology::transition_state::tablet_write_both_read_new:
[[fallthrough]];
case topology::transition_state::tablet_write_both_read_old:
[[fallthrough]];
case topology::transition_state::tablet_streaming:
[[fallthrough]];
case topology::transition_state::tablet_use_new:
[[fallthrough]];
case topology::transition_state::tablet_cleanup:
case topology::transition_state::tablet_migration:
[[fallthrough]];
case topology::transition_state::commit_cdc_generation:
[[fallthrough]];
@@ -1181,116 +1171,170 @@ class topology_coordinator {
co_return std::move(guard);
}
future<> set_tablet_transition_stage(std::vector<canonical_mutation>& out, group0_guard& guard,
locator::tablet_transition_stage stage) {
// Represents a two-state state machine which changes monotonically
// from "not executed" to "executed successfully". This state
// machine is transient, lives only on this coordinator.
// The transition is achieved by execution of an idempotent async
// operation which is tracked by a future. Even though the async
// action is idempotent, it is costly, so we want to avoid
// re-executing it if it was already started by this coordinator,
// that's why we track it.
using background_action_holder = std::optional<future<>>;
// Transient state of tablet migration which lives on this coordinator.
// It is guaranteed to die when migration is finished.
// Next migration of the same tablet is guaranteed to use a different instance.
struct tablet_migration_state {
background_action_holder streaming;
};
std::unordered_map<locator::global_tablet_id, tablet_migration_state> _tablets;
// Set to true when any action started on behalf of a background_action_holder
// for any tablet finishes, or fails and needs to be restarted.
bool _tablets_ready = false;
seastar::gate _async_gate;
// This function drives background_action_holder towards "executed successfully"
// by starting the action if it is not already running or if the previous instance
// of the action failed. If the action is already running, it does nothing.
// Returns true iff background_action_holder reached the "executed successfully" state.
bool advance_in_background(locator::global_tablet_id gid, background_action_holder& holder, const char* name,
std::function<future<>()> action) {
if (!holder || holder->failed()) {
holder = futurize_invoke(action)
.finally([this, g = _async_gate.hold(), gid, name] () noexcept {
slogger.trace("raft topology: {} for tablet {} resolved.", name, gid);
_tablets_ready = true;
_topo_sm.event.broadcast();
});
return false;
}
if (!holder->available()) {
slogger.trace("raft topology: Tablet {} still doing {}", gid, name);
return false;
}
return true;
}
future<> for_each_tablet_transition(std::function<void(const locator::tablet_map&,
schema_ptr,
locator::global_tablet_id,
const locator::tablet_transition_info&)> func) {
auto tm = get_token_metadata_ptr();
for (auto&& [table, tmap] : tm->tablets().all_tables()) {
co_await coroutine::maybe_yield();
auto s = _db.find_schema(table);
for (auto&& [tablet, trinfo] : tmap.transitions()) {
for (auto&& [tablet, trinfo]: tmap.transitions()) {
co_await coroutine::maybe_yield();
auto last_token = tmap.get_last_token(tablet);
out.emplace_back(
auto gid = locator::global_tablet_id {table, tablet};
func(tmap, s, gid, trinfo);
}
}
}
future<> handle_tablet_migration(group0_guard guard) {
// This step acts like a pump which advances state machines of individual tablets,
// batching barriers and group0 updates.
// If progress cannot be made, e.g. because all transitions are streaming, we block
// and wait for notification.
slogger.trace("raft topology: handle_tablet_migration()");
std::vector<canonical_mutation> updates;
bool needs_barrier = false;
bool has_transitions = false;
_tablets_ready = false;
co_await for_each_tablet_transition([&] (const locator::tablet_map& tmap,
schema_ptr s,
locator::global_tablet_id gid,
const locator::tablet_transition_info& trinfo) {
has_transitions = true;
auto last_token = tmap.get_last_token(gid.tablet);
auto& tablet_state = _tablets[gid];
table_id table = s->id();
auto transition_to = [&] (locator::tablet_transition_stage stage) {
slogger.trace("raft topology: Will set tablet {} stage to {}", gid, stage);
updates.emplace_back(
replica::tablet_mutation_builder(guard.write_timestamp(), s->ks_name(), table)
.set_stage(last_token, stage)
.build());
};
auto transition_to_with_barrier = [&] (locator::tablet_transition_stage stage) {
needs_barrier = true;
transition_to(stage);
};
switch (trinfo.stage) {
case locator::tablet_transition_stage::allow_write_both_read_old:
transition_to_with_barrier(locator::tablet_transition_stage::write_both_read_old);
break;
case locator::tablet_transition_stage::write_both_read_old:
transition_to_with_barrier(locator::tablet_transition_stage::streaming);
break;
// The state "streaming" is needed to ensure that stale stream_tablet() RPC doesn't
// get admitted before global_tablet_token_metadata_barrier() is finished for earlier
// stage in case of coordinator failover.
case locator::tablet_transition_stage::streaming:
if (advance_in_background(gid, tablet_state.streaming, "streaming", [&] {
slogger.info("raft topology: Initiating tablet streaming of {} to {}", gid, trinfo.pending_replica);
auto dst = trinfo.pending_replica.host;
return ser::storage_service_rpc_verbs::send_tablet_stream_data(&_messaging,
netw::msg_addr(id2ip(dst)), _as, gid);
})) {
transition_to(locator::tablet_transition_stage::write_both_read_new);
}
break;
case locator::tablet_transition_stage::write_both_read_new:
transition_to_with_barrier(locator::tablet_transition_stage::use_new);
break;
case locator::tablet_transition_stage::use_new:
transition_to_with_barrier(locator::tablet_transition_stage::cleanup);
break;
case locator::tablet_transition_stage::cleanup:
// FIXME: Actually perform the cleanup. Block on integration with compaction groups.
_tablets.erase(gid);
updates.emplace_back(
replica::tablet_mutation_builder(guard.write_timestamp(), s->ks_name(), table)
.del_transition(last_token)
.set_replicas(last_token, trinfo.next)
.build());
break;
}
}
}
});
future<> del_tablet_transitions(std::vector<canonical_mutation>& out, group0_guard& guard) {
auto tm = get_token_metadata_ptr();
for (auto&& [table, tmap] : tm->tablets().all_tables()) {
co_await coroutine::maybe_yield();
auto s = _db.find_schema(table);
for (auto&& [tablet, trinfo] : tmap.transitions()) {
co_await coroutine::maybe_yield();
auto last_token = tmap.get_last_token(tablet);
out.emplace_back(
replica::tablet_mutation_builder(guard.write_timestamp(), s->ks_name(), table)
.del_transition(last_token)
.set_replicas(last_token, trinfo.next)
.build());
}
}
}
future<> transition_tablets_to_stage(group0_guard guard, locator::tablet_transition_stage stage, topology::transition_state topo_state) {
std::vector<canonical_mutation> updates;
co_await set_tablet_transition_stage(updates, guard, stage);
updates.emplace_back(
topology_mutation_builder(guard.write_timestamp())
.set_transition_state(topo_state)
.set_version(_topo_sm._topology.version + 1)
.build());
co_await update_topology_state(std::move(guard), std::move(updates), format("Moved tablet migration to stage: {}", stage));
}
future<> handle_tablet_allow_write_both_read_old(group0_guard guard) {
guard = co_await global_tablet_token_metadata_barrier(std::move(guard));
co_await transition_tablets_to_stage(std::move(guard),
locator::tablet_transition_stage::write_both_read_old,
topology::transition_state::tablet_write_both_read_old);
}
future<> handle_tablet_write_both_read_old(group0_guard guard) {
guard = co_await global_tablet_token_metadata_barrier(std::move(guard));
co_await transition_tablets_to_stage(std::move(guard),
locator::tablet_transition_stage::streaming,
topology::transition_state::tablet_streaming);
}
// The state "streaming" is needed to ensure that stale stream_tablet() RPC doesn't
// get admitted before global_tablet_token_metadata_barrier() is finished in
// handle_tablet_write_both_read_old().
future<> handle_tablet_streaming(group0_guard guard) {
std::vector<future<>> calls;
auto tm = get_token_metadata_ptr();
for (auto&& [table, tmap] : tm->tablets().all_tables()) {
co_await coroutine::maybe_yield();
for (auto&& [tablet, trinfo] : tmap.transitions()) {
co_await coroutine::maybe_yield();
auto gid = locator::global_tablet_id{table, tablet};
auto dst = trinfo.pending_replica.host;
slogger.info("Initiating tablet streaming of {} to {}", gid, trinfo.pending_replica);
calls.emplace_back(futurize_invoke([&] {
return ser::storage_service_rpc_verbs::send_tablet_stream_data(&_messaging,
netw::msg_addr(id2ip(dst)), _as, gid);
}).handle_exception([gid, dst](auto ep) {
slogger.error("Failed to stream tablet {} to {}: {}", gid, dst, ep);
std::rethrow_exception(ep);
}));
}
if (needs_barrier) {
guard = co_await global_tablet_token_metadata_barrier(std::move(guard));
}
release_guard(std::move(guard));
co_await seastar::when_all_succeed(calls.begin(), calls.end());
// It's ok to execute planned updates after retaking the guard because as long
// as topology is in tablet_migration state only this coordinator has a right
// to advance the state machine of tablets.
guard = co_await start_operation();
co_await transition_tablets_to_stage(std::move(guard),
locator::tablet_transition_stage::write_both_read_new,
topology::transition_state::tablet_write_both_read_new);
}
if (!updates.empty()) {
updates.emplace_back(
topology_mutation_builder(guard.write_timestamp())
.set_version(_topo_sm._topology.version + 1)
.build());
co_await update_topology_state(std::move(guard), std::move(updates), format("Tablet migration"));
co_return;
}
future<> handle_tablet_write_both_read_new(group0_guard guard) {
guard = co_await global_tablet_token_metadata_barrier(std::move(guard));
co_await transition_tablets_to_stage(std::move(guard),
locator::tablet_transition_stage::use_new,
topology::transition_state::tablet_use_new);
}
if (has_transitions) {
// Streaming may have finished after we checked. To avoid missed notification, we need
// to check atomically with event.wait()
if (!_tablets_ready) {
slogger.trace("raft topology: Going to sleep with active tablet transitions");
co_await await_event();
}
co_return;
}
future<> handle_tablet_use_new(group0_guard guard) {
guard = co_await global_tablet_token_metadata_barrier(std::move(guard));
co_await transition_tablets_to_stage(std::move(guard),
locator::tablet_transition_stage::cleanup,
topology::transition_state::tablet_cleanup);
}
future<> handle_tablet_cleanup(group0_guard guard) {
// FIXME: Actually perform cleanup
std::vector<canonical_mutation> updates;
co_await del_tablet_transitions(updates, guard);
updates.emplace_back(
topology_mutation_builder(guard.write_timestamp())
.del_transition_state()
@@ -1531,24 +1575,8 @@ class topology_coordinator {
// Reads are fenced. We can now remove topology::transition_state and move node state to normal
}
break;
case topology::transition_state::tablet_allow_write_both_read_old:
co_await handle_tablet_allow_write_both_read_old(std::move(guard));
break;
case topology::transition_state::tablet_write_both_read_old:
co_await handle_tablet_write_both_read_old(std::move(guard));
break;
case topology::transition_state::tablet_streaming:
co_await handle_tablet_streaming(std::move(guard));
break;
case topology::transition_state::tablet_write_both_read_new:
co_await handle_tablet_write_both_read_new(std::move(guard));
break;
case topology::transition_state::tablet_use_new:
co_await handle_tablet_use_new(std::move(guard));
break;
case topology::transition_state::tablet_cleanup:
co_await handle_tablet_cleanup(std::move(guard));
break;
case topology::transition_state::tablet_migration:
co_await handle_tablet_migration(std::move(guard));
}
co_return true;
};
@@ -1797,7 +1825,7 @@ future<bool> topology_coordinator::maybe_start_tablet_migration(group0_guard gua
updates.emplace_back(
topology_mutation_builder(guard.write_timestamp())
.set_transition_state(topology::transition_state::tablet_allow_write_both_read_old)
.set_transition_state(topology::transition_state::tablet_migration)
.set_version(_topo_sm._topology.version + 1)
.build());
@@ -1838,10 +1866,15 @@ future<> topology_coordinator::run() {
sleep = true;
}
if (sleep) {
co_await seastar::sleep_abortable(std::chrono::seconds(1), _as);
try {
co_await seastar::sleep_abortable(std::chrono::seconds(1), _as);
} catch (...) {
slogger.debug("raft topology: sleep failed: {}", std::current_exception());
}
}
co_await coroutine::maybe_yield();
}
co_await _async_gate.close();
}
future<> storage_service::raft_state_monitor_fiber(raft::server& raft, cdc::generation_service& cdc_gen_svc, sharded<db::system_distributed_keyspace>& sys_dist_ks) {

View File

@@ -45,12 +45,7 @@ static std::unordered_map<topology::transition_state, sstring> transition_state_
{topology::transition_state::publish_cdc_generation, "publish cdc generation"},
{topology::transition_state::write_both_read_old, "write both read old"},
{topology::transition_state::write_both_read_new, "write both read new"},
{topology::transition_state::tablet_allow_write_both_read_old, "tablet allow write both read old"},
{topology::transition_state::tablet_write_both_read_old, "tablet write both read old"},
{topology::transition_state::tablet_write_both_read_new, "tablet write both read new"},
{topology::transition_state::tablet_streaming, "tablet streaming"},
{topology::transition_state::tablet_use_new, "tablet use new"},
{topology::transition_state::tablet_cleanup, "tablet cleanup"},
{topology::transition_state::tablet_migration, "tablet migration"},
};
std::ostream& operator<<(std::ostream& os, topology::transition_state s) {

View File

@@ -73,14 +73,7 @@ struct topology {
publish_cdc_generation,
write_both_read_old,
write_both_read_new,
// Tablet migration steps
tablet_allow_write_both_read_old,
tablet_write_both_read_old,
tablet_write_both_read_new,
tablet_streaming,
tablet_use_new,
tablet_cleanup,
tablet_migration,
};
std::optional<transition_state> tstate;