tablets: Store "stage" in transition info
It's needed to implement tablet migration. It stores the current step of the tablet migration state machine. The state machine will be advanced by the topology change coordinator. See the "Tablet migration" section of topology-over-raft.md.
This commit is contained in:
@@ -135,6 +135,36 @@ const tablet_transition_info* tablet_map::get_tablet_transition_info(tablet_id i
|
||||
return &i->second;
|
||||
}
|
||||
|
||||
// The names are persisted in system tables so should not be changed.
|
||||
static const std::unordered_map<tablet_transition_stage, sstring> tablet_transition_stage_to_name = {
|
||||
{tablet_transition_stage::allow_write_both_read_old, "allow_write_both_read_old"},
|
||||
{tablet_transition_stage::write_both_read_old, "write_both_read_old"},
|
||||
{tablet_transition_stage::write_both_read_new, "write_both_read_new"},
|
||||
{tablet_transition_stage::streaming, "streaming"},
|
||||
{tablet_transition_stage::use_new, "use_new"},
|
||||
{tablet_transition_stage::cleanup, "cleanup"},
|
||||
};
|
||||
|
||||
static const std::unordered_map<sstring, tablet_transition_stage> tablet_transition_stage_from_name = std::invoke([] {
|
||||
std::unordered_map<sstring, tablet_transition_stage> result;
|
||||
for (auto&& [v, s] : tablet_transition_stage_to_name) {
|
||||
result.emplace(s, v);
|
||||
}
|
||||
return result;
|
||||
});
|
||||
|
||||
// Returns the name registered for `stage` in tablet_transition_stage_to_name.
// A stage with no entry in that table is reported through on_internal_error().
sstring tablet_transition_stage_to_string(tablet_transition_stage stage) {
    const auto entry = tablet_transition_stage_to_name.find(stage);
    const bool known = entry != tablet_transition_stage_to_name.end();
    if (!known) {
        on_internal_error(tablet_logger, format("Invalid tablet transition stage: {}", static_cast<int>(stage)));
    }
    return entry->second;
}
|
||||
|
||||
// Translates a stage name back into its tablet_transition_stage value.
// The lookup uses unordered_map::at(), so a name that is not present in
// tablet_transition_stage_from_name results in std::out_of_range.
tablet_transition_stage tablet_transition_stage_from_string(const sstring& name) {
    const auto& stages_by_name = tablet_transition_stage_from_name;
    return stages_by_name.at(name);
}
|
||||
|
||||
// Prints a tablet_id as its size_t value.
std::ostream& operator<<(std::ostream& out, tablet_id id) {
    out << static_cast<size_t>(id);
    return out;
}
|
||||
@@ -156,7 +186,7 @@ std::ostream& operator<<(std::ostream& out, const tablet_map& r) {
|
||||
}
|
||||
out << format("\n [{}]: last_token={}, replicas={}", tid, r.get_last_token(tid), tablet.replicas);
|
||||
if (auto tr = r.get_tablet_transition_info(tid)) {
|
||||
out << format(", new_replicas={}, pending={}", tr->next, tr->pending_replica);
|
||||
out << format(", stage={}, new_replicas={}, pending={}", tr->stage, tr->next, tr->pending_replica);
|
||||
}
|
||||
first = false;
|
||||
tid = *r.next_tablet(tid);
|
||||
@@ -352,3 +382,8 @@ effective_replication_map_ptr tablet_aware_replication_strategy::do_make_replica
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Formats a tablet transition stage as its name, as produced by
// locator::tablet_transition_stage_to_string().
//
// The formatter specialization derives from fmt::formatter<std::string_view>,
// so parse() accepts the usual string format specs (width, fill, alignment,
// precision). Delegating to the inherited format() makes those parsed specs
// take effect instead of being silently dropped by a hard-coded "{}".
// Output for a plain "{}" is unchanged.
auto fmt::formatter<locator::tablet_transition_stage>::format(const locator::tablet_transition_stage& stage, fmt::format_context& ctx) const
        -> decltype(ctx.out()) {
    // Keep the owning string alive for the duration of the base-class call;
    // the base formatter only sees a non-owning view of it.
    const auto name = locator::tablet_transition_stage_to_string(stage);
    const std::string_view view = name;
    return fmt::formatter<std::string_view>::format(view, ctx);
}
|
||||
|
||||
@@ -55,9 +55,35 @@ struct tablet_info {
|
||||
bool operator==(const tablet_info&) const = default;
|
||||
};
|
||||
|
||||
/// States of the tablet migration state machine.
///
/// A stage plays two roles:
///
///  1. It tells the topology change coordinator which action must be performed
///     on behalf of the tablet before the tablet may move to the next step.
///     Once the stage has been advanced, the cluster-wide invariants relevant
///     to the tablet and associated with the new stage are expected to hold.
///
///  2. It determines which replicas the coordinator uses for reads and writes.
///     The replica selectors kept in tablet_transition_info::writes and
///     tablet_transition_info::reads are derived directly from the stage
///     stored in group0.
///
/// See "Tablet migration" in docs/dev/topology-over-raft.md
enum class tablet_transition_stage {
    allow_write_both_read_old,
    write_both_read_old,
    streaming,
    write_both_read_new,
    use_new,
    cleanup,
};
|
||||
|
||||
sstring tablet_transition_stage_to_string(tablet_transition_stage);
|
||||
tablet_transition_stage tablet_transition_stage_from_string(const sstring&);
|
||||
|
||||
/// Used for storing tablet state transition during topology changes.
|
||||
/// Describes transition of a single tablet.
|
||||
struct tablet_transition_info {
|
||||
tablet_transition_stage stage;
|
||||
tablet_replica_set next;
|
||||
tablet_replica pending_replica; // Optimization (next - tablet_info::replicas)
|
||||
|
||||
@@ -232,3 +258,8 @@ struct hash<locator::tablet_replica> {
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
template <>
|
||||
struct fmt::formatter<locator::tablet_transition_stage> : fmt::formatter<std::string_view> {
|
||||
auto format(const locator::tablet_transition_stage&, fmt::format_context& ctx) const -> decltype(ctx.out());
|
||||
};
|
||||
|
||||
@@ -75,6 +75,7 @@ tablet_map_to_mutation(const tablet_map& tablets, table_id id, const sstring& ke
|
||||
auto ck = clustering_key::from_single_value(*s, data_value(dht::token::to_int64(last_token)).serialize_nonnull());
|
||||
m.set_clustered_cell(ck, "replicas", make_set_value(replica_set_type, replicas_to_data_value(tablet.replicas)), ts);
|
||||
if (auto tr_info = tablets.get_tablet_transition_info(tid)) {
|
||||
m.set_clustered_cell(ck, "stage", tablet_transition_stage_to_string(tr_info->stage), ts);
|
||||
m.set_clustered_cell(ck, "new_replicas", make_set_value(replica_set_type, replicas_to_data_value(tr_info->next)), ts);
|
||||
}
|
||||
tid = *tablets.next_tablet(tid);
|
||||
@@ -153,7 +154,9 @@ future<tablet_metadata> read_tablet_metadata(cql3::query_processor& qp) {
|
||||
new_tablet_replicas = deserialize_replica_set(row.get_view("new_replicas"));
|
||||
}
|
||||
|
||||
if (!new_tablet_replicas.empty()) {
|
||||
if (row.has("stage")) {
|
||||
auto stage = tablet_transition_stage_from_string(row.get_as<sstring>("stage"));
|
||||
|
||||
std::unordered_set<tablet_replica> pending(new_tablet_replicas.begin(), new_tablet_replicas.end());
|
||||
for (auto&& r : tablet_replicas) {
|
||||
pending.erase(r);
|
||||
@@ -162,7 +165,7 @@ future<tablet_metadata> read_tablet_metadata(cql3::query_processor& qp) {
|
||||
throw std::runtime_error(format("Too many pending replicas for table {} tablet {}: {}",
|
||||
table, current->tid, pending));
|
||||
}
|
||||
current->map.set_tablet_transition_info(current->tid, tablet_transition_info{
|
||||
current->map.set_tablet_transition_info(current->tid, tablet_transition_info{stage,
|
||||
std::move(new_tablet_replicas), *pending.begin()});
|
||||
}
|
||||
|
||||
|
||||
@@ -124,6 +124,7 @@ SEASTAR_TEST_CASE(test_tablet_metadata_persistence) {
|
||||
tb = *tmap.next_tablet(tb);
|
||||
|
||||
tmap.set_tablet_transition_info(tb, tablet_transition_info{
|
||||
tablet_transition_stage::allow_write_both_read_old,
|
||||
tablet_replica_set {
|
||||
tablet_replica {h3, 3},
|
||||
tablet_replica {h1, 7},
|
||||
@@ -133,6 +134,7 @@ SEASTAR_TEST_CASE(test_tablet_metadata_persistence) {
|
||||
|
||||
tb = *tmap.next_tablet(tb);
|
||||
tmap.set_tablet_transition_info(tb, tablet_transition_info{
|
||||
tablet_transition_stage::use_new,
|
||||
tablet_replica_set {
|
||||
tablet_replica {h1, 4},
|
||||
tablet_replica {h2, 2},
|
||||
@@ -243,6 +245,7 @@ SEASTAR_TEST_CASE(test_get_shard) {
|
||||
}
|
||||
});
|
||||
tmap.set_tablet_transition_info(tid, tablet_transition_info {
|
||||
tablet_transition_stage::allow_write_both_read_old,
|
||||
tablet_replica_set {
|
||||
tablet_replica {h1, 0},
|
||||
tablet_replica {h2, 3},
|
||||
@@ -307,6 +310,7 @@ SEASTAR_TEST_CASE(test_sharder) {
|
||||
}
|
||||
});
|
||||
tmap.set_tablet_transition_info(tid, tablet_transition_info {
|
||||
tablet_transition_stage::use_new,
|
||||
tablet_replica_set {
|
||||
tablet_replica {h1, 1},
|
||||
tablet_replica {h2, 3},
|
||||
|
||||
Reference in New Issue
Block a user