Merge 'Fix node replace with tablets for RF=N' from Tomasz Grabiec
This PR fixes a problem with replacing a node with tablets when RF=N. Currently, this will fail because tablet replica allocation for rebuild will not be able to find a viable destination, as the replacing node is not considered to be a candidate. It cannot be a candidate because replace rolls back on failure and we cannot roll back after tablets were migrated. The solution taken here is to not drain tablet replicas from replaced node during topology request but leave it to happen later after the replaced node is in left state and replacing node is in normal state. The replacing node waits for this draining to be complete on boot before the node is considered booted. Fixes https://github.com/scylladb/scylladb/issues/17025 Nodes in the left state will be kept in tablet replica sets for a while after node replace is done, until the new replica is rebuilt. So we need to know about those node's location (dc, rack) for two reasons: 1) algorithms which work with replica sets filter nodes based on their location. For example materialized views code which pairs base replicas with view replicas filters by datacenter first. 2) tablet scheduler needs to identify each node's location in order to make decisions about new replica placement. It's ok to not know the IP, and we don't keep it. Those nodes will not be present in the IP-based replica sets, e.g. those returned by get_natural_endpoints(), only in host_id-based replica sets. storage_proxy request coordination is not affected. Nodes in the left state are still not present in token ring, and not considered to be members of the ring (datacanter endpoints excludes them). In the future we could make the change even more transparent by only loading locator::node* for those nodes and keeping node* in tablet replica sets. Currently left nodes are never removed from topology, so will accumulate in memory. We could garbage-collect them from topology coordinator if a left node is absent in any replica set. That means we need a new state - left_for_real. Closes scylladb/scylladb#17388 * github.com:scylladb/scylladb: test: py: Add test for view replica pairing after replace raft, api: Add RESTful API to query current leader of a raft group test: test_tablets_removenode: Verify replacing when there is no spare node doc: topology-on-raft: Document replace behavior with tablets tablets, raft topology: Rebuild tablets after replacing node is normal tablets: load_balancer: Access node attributes via node struct tablets: load_balancer: Extract ensure_node() mv: Switch to using host_id-based replica set effective_replication_map: Introduce host_id-based get_replicas() raft topology: Keep nodes in the left state to topology tablets: Introduce read_required_hosts()
This commit is contained in:
@@ -38,6 +38,30 @@
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"path":"/raft/leader_host",
|
||||
"operations":[
|
||||
{
|
||||
"method":"GET",
|
||||
"summary":"Returns host ID of the current leader of the given Raft group",
|
||||
"type":"string",
|
||||
"nickname":"get_leader_host",
|
||||
"produces":[
|
||||
"application/json"
|
||||
],
|
||||
"parameters":[
|
||||
{
|
||||
"name":"group_id",
|
||||
"description":"The ID of the group. When absent, group0 is used.",
|
||||
"required":false,
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
"paramType":"query"
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
14
api/raft.cc
14
api/raft.cc
@@ -60,10 +60,24 @@ void set_raft(http_context&, httpd::routes& r, sharded<service::raft_group_regis
|
||||
|
||||
co_return json_void{};
|
||||
});
|
||||
r::get_leader_host.set(r, [&raft_gr] (std::unique_ptr<http::request> req) -> future<json_return_type> {
|
||||
return smp::submit_to(0, [&] {
|
||||
auto& srv = std::invoke([&] () -> raft::server& {
|
||||
if (req->query_parameters.contains("group_id")) {
|
||||
raft::group_id id{utils::UUID{req->get_query_param("group_id")}};
|
||||
return raft_gr.local().get_server(id);
|
||||
} else {
|
||||
return raft_gr.local().group0();
|
||||
}
|
||||
});
|
||||
return json_return_type(srv.current_leader().to_sstring());
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
void unset_raft(http_context&, httpd::routes& r) {
|
||||
r::trigger_snapshot.unset(r);
|
||||
r::get_leader_host.unset(r);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -2677,7 +2677,7 @@ static std::set<sstring> decode_features(const set_type_impl::native_type& featu
|
||||
return fset;
|
||||
}
|
||||
|
||||
future<service::topology> system_keyspace::load_topology_state() {
|
||||
future<service::topology> system_keyspace::load_topology_state(const std::unordered_set<locator::host_id>& force_load_hosts) {
|
||||
auto rs = co_await execute_cql(
|
||||
format("SELECT * FROM system.{} WHERE key = '{}'", TOPOLOGY, TOPOLOGY));
|
||||
assert(rs);
|
||||
@@ -2793,6 +2793,9 @@ future<service::topology> system_keyspace::load_topology_state() {
|
||||
}
|
||||
} else if (nstate == service::node_state::left) {
|
||||
ret.left_nodes.emplace(host_id);
|
||||
if (force_load_hosts.contains(locator::host_id(host_id.uuid()))) {
|
||||
map = &ret.left_nodes_rs;
|
||||
}
|
||||
} else if (nstate == service::node_state::none) {
|
||||
map = &ret.new_nodes;
|
||||
} else {
|
||||
|
||||
@@ -535,7 +535,9 @@ public:
|
||||
// Assumes that the history table exists, i.e. Raft experimental feature is enabled.
|
||||
future<bool> group0_history_contains(utils::UUID state_id);
|
||||
|
||||
future<service::topology> load_topology_state();
|
||||
// force_load_hosts is a set of hosts which must be loaded even if they are in the left state.
|
||||
future<service::topology> load_topology_state(const std::unordered_set<locator::host_id>& force_load_hosts);
|
||||
|
||||
future<std::optional<service::topology_features>> load_topology_features_state();
|
||||
|
||||
// Read CDC generation data with the given UUID as key.
|
||||
|
||||
@@ -1611,21 +1611,24 @@ get_view_natural_endpoint(
|
||||
const dht::token& view_token,
|
||||
bool use_legacy_self_pairing) {
|
||||
auto& topology = base_erm->get_token_metadata_ptr()->get_topology();
|
||||
auto my_address = topology.my_address();
|
||||
auto me = topology.my_host_id();
|
||||
auto my_datacenter = topology.get_datacenter();
|
||||
std::vector<gms::inet_address> base_endpoints, view_endpoints;
|
||||
for (auto&& base_endpoint : base_erm->get_natural_endpoints(base_token)) {
|
||||
std::vector<locator::host_id> base_endpoints, view_endpoints;
|
||||
|
||||
// We need to use get_replicas() for pairing to be stable in case base or view tablet
|
||||
// is rebuilding a replica which has left the ring. get_natural_endpoints() filters such replicas.
|
||||
for (auto&& base_endpoint : base_erm->get_replicas(base_token)) {
|
||||
if (!network_topology || topology.get_datacenter(base_endpoint) == my_datacenter) {
|
||||
base_endpoints.push_back(base_endpoint);
|
||||
}
|
||||
}
|
||||
|
||||
for (auto&& view_endpoint : view_erm->get_natural_endpoints(view_token)) {
|
||||
for (auto&& view_endpoint : view_erm->get_replicas(view_token)) {
|
||||
if (use_legacy_self_pairing) {
|
||||
// If this base replica is also one of the view replicas, we use
|
||||
// ourselves as the view replica.
|
||||
if (view_endpoint == my_address) {
|
||||
return view_endpoint;
|
||||
if (view_endpoint == me) {
|
||||
return topology.my_address();
|
||||
}
|
||||
// We have to remove any endpoint which is shared between the base
|
||||
// and the view, as it will select itself and throw off the counts
|
||||
@@ -1645,14 +1648,15 @@ get_view_natural_endpoint(
|
||||
}
|
||||
|
||||
assert(base_endpoints.size() == view_endpoints.size());
|
||||
auto base_it = std::find(base_endpoints.begin(), base_endpoints.end(), my_address);
|
||||
auto base_it = std::find(base_endpoints.begin(), base_endpoints.end(), me);
|
||||
if (base_it == base_endpoints.end()) {
|
||||
// This node is not a base replica of this key, so we return empty
|
||||
// FIXME: This case shouldn't happen, and if it happens, a view update
|
||||
// would be lost. We should reported or count this case.
|
||||
return {};
|
||||
}
|
||||
return view_endpoints[base_it - base_endpoints.begin()];
|
||||
auto replica = view_endpoints[base_it - base_endpoints.begin()];
|
||||
return topology.get_node(replica).endpoint();
|
||||
}
|
||||
|
||||
static future<> apply_to_remote_endpoints(service::storage_proxy& proxy, locator::effective_replication_map_ptr ermp,
|
||||
|
||||
@@ -15,6 +15,10 @@ Node state can be one of those:
|
||||
|
||||
Nodes in state left are never removed from the state.
|
||||
|
||||
Nodes in state `left` may still appear as tablet replicas in host_id-based replica sets
|
||||
(`effective_replication_map::get_replicas()`), but they never appear in IP-based replica sets, e.g. those returned by
|
||||
`effective_replication_map::get_natural_endpoints()`.
|
||||
|
||||
State transition diagram for nodes:
|
||||
```mermaid
|
||||
stateDiagram-v2
|
||||
@@ -114,11 +118,28 @@ that there are no tablet transitions in the system.
|
||||
Tablets are migrated in parallel and independently.
|
||||
|
||||
There is a variant of tablet migration track called tablet draining track, which is invoked
|
||||
as a step of certain topology operations (e.g. decommission, removenode, replace). Its goal is to readjust tablet replicas
|
||||
as a step of certain topology operations (e.g. decommission, removenode). Its goal is to readjust tablet replicas
|
||||
so that a given topology change can proceed. For example, when decommissioning a node, we
|
||||
need to migrate tablet replicas away from the node being decommissioned.
|
||||
Tablet draining happens before making changes to vnode-based replication.
|
||||
|
||||
## Node replace with tablets
|
||||
|
||||
Tablet replicas on the replaced node are rebuilt after the replacing node is already in the normal state and
|
||||
the replaced node is in the left state.
|
||||
|
||||
Until old replicas are rebuilt, the availability in the cluster is reduced. If another node becomes unavailable, we
|
||||
may have two unavailable replicas for some tablets. Admin needs to know that and not start rolling restart for example.
|
||||
To avoid surprises, the replaced node waits on boot for tablet replicas to finish rebuilding
|
||||
so that admin sees the replace as finished after availability was restored.
|
||||
|
||||
### Impact on repair
|
||||
|
||||
When tablet is rebuilt in the background after replace, its primary replica may be on the node which is no
|
||||
longer in topology. This means that running repair -pr on all nodes will not repair such a tablet, but it's fine because
|
||||
we decided that repair can be optimistic. It's safe with regards to tombstone gc because expiry is decided per table per token range
|
||||
based on actual repair time of that range. Unrepaired tablets will not have their token range marked as repaired.
|
||||
|
||||
# Tablet transitions
|
||||
|
||||
Tablets can undergo a process called "transition", which performs some maintenance action on the tablet which is
|
||||
|
||||
@@ -492,14 +492,24 @@ auto vnode_effective_replication_map::clone_data_gently() const -> future<std::u
|
||||
co_return std::move(result);
|
||||
}
|
||||
|
||||
inet_address_vector_replica_set vnode_effective_replication_map::do_get_natural_endpoints(const token& tok,
|
||||
host_id_vector_replica_set vnode_effective_replication_map::do_get_replicas(const token& tok,
|
||||
bool is_vnode) const
|
||||
{
|
||||
const token& key_token = _rs->natural_endpoints_depend_on_token()
|
||||
? (is_vnode ? tok : _tmptr->first_token(tok))
|
||||
: default_replication_map_key;
|
||||
const auto it = _replication_map.find(key_token);
|
||||
return resolve_endpoints<inet_address_vector_replica_set>(it->second, *_tmptr);
|
||||
return it->second;
|
||||
}
|
||||
|
||||
inet_address_vector_replica_set vnode_effective_replication_map::do_get_natural_endpoints(const token& tok,
|
||||
bool is_vnode) const
|
||||
{
|
||||
return resolve_endpoints<inet_address_vector_replica_set>(do_get_replicas(tok, is_vnode), *_tmptr);
|
||||
}
|
||||
|
||||
host_id_vector_replica_set vnode_effective_replication_map::get_replicas(const token& tok) const {
|
||||
return do_get_replicas(tok, false);
|
||||
}
|
||||
|
||||
inet_address_vector_replica_set vnode_effective_replication_map::get_natural_endpoints(const token& search_token) const {
|
||||
|
||||
@@ -210,10 +210,17 @@ public:
|
||||
/// operation which adds a replica which has the same address as the replaced replica.
|
||||
/// Use get_natural_endpoints_without_node_being_replaced() to get replicas without any pending replicas.
|
||||
/// This won't be necessary after we implement https://github.com/scylladb/scylladb/issues/6403.
|
||||
///
|
||||
/// Excludes replicas which are in the left state. After replace, the replaced replica may
|
||||
/// still be in the replica set of the tablet until tablet scheduler rebuilds the replacing replica.
|
||||
/// The old replica will not be listed here. This is necessary to support replace-with-the-same-ip
|
||||
/// scenario. Since we return IPs here, writes to the old replica would be incorrectly routed to the
|
||||
/// new replica.
|
||||
///
|
||||
/// The returned addresses are present in the topology object associated with this instance.
|
||||
virtual inet_address_vector_replica_set get_natural_endpoints(const token& search_token) const = 0;
|
||||
|
||||
/// Returns addresses of replicas for a given token.
|
||||
/// Does not include pending replicas.
|
||||
/// Returns a subset of replicas returned by get_natural_endpoints() without the pending replica.
|
||||
virtual inet_address_vector_replica_set get_natural_endpoints_without_node_being_replaced(const token& search_token) const = 0;
|
||||
|
||||
/// Returns the set of pending replicas for a given token.
|
||||
@@ -224,6 +231,12 @@ public:
|
||||
/// Returns a list of nodes to which a read request should be directed.
|
||||
virtual inet_address_vector_replica_set get_endpoints_for_reading(const token& search_token) const = 0;
|
||||
|
||||
/// Returns replicas for a given token.
|
||||
/// During topology change returns replicas which should be targets for writes, excluding the pending replica.
|
||||
/// Unlike get_natural_endpoints(), the replica set may include nodes in the left state which were
|
||||
/// replaced but not yet rebuilt.
|
||||
virtual host_id_vector_replica_set get_replicas(const token& search_token) const = 0;
|
||||
|
||||
virtual std::optional<tablet_routing_info> check_locality(const token& token) const = 0;
|
||||
|
||||
|
||||
@@ -311,6 +324,7 @@ public: // effective_replication_map
|
||||
inet_address_vector_replica_set get_natural_endpoints_without_node_being_replaced(const token& search_token) const override;
|
||||
inet_address_vector_topology_change get_pending_endpoints(const token& search_token) const override;
|
||||
inet_address_vector_replica_set get_endpoints_for_reading(const token& search_token) const override;
|
||||
host_id_vector_replica_set get_replicas(const token& search_token) const override;
|
||||
std::optional<tablet_routing_info> check_locality(const token& token) const override;
|
||||
bool has_pending_ranges(locator::host_id endpoint) const override;
|
||||
std::unique_ptr<token_range_splitter> make_splitter() const override;
|
||||
@@ -371,6 +385,7 @@ public:
|
||||
private:
|
||||
dht::token_range_vector do_get_ranges(noncopyable_function<stop_iteration(bool& add_range, const inet_address& natural_endpoint)> consider_range_for_endpoint) const;
|
||||
inet_address_vector_replica_set do_get_natural_endpoints(const token& tok, bool is_vnode) const;
|
||||
host_id_vector_replica_set do_get_replicas(const token& tok, bool is_vnode) const;
|
||||
stop_iteration for_each_natural_endpoint_until(const token& vnode_tok, const noncopyable_function<stop_iteration(const inet_address&)>& func) const;
|
||||
|
||||
public:
|
||||
|
||||
@@ -481,6 +481,17 @@ size_t tablet_metadata::external_memory_usage() const {
|
||||
return result;
|
||||
}
|
||||
|
||||
bool tablet_metadata::has_replica_on(host_id host) const {
|
||||
for (auto&& [id, map] : _tablets) {
|
||||
for (auto&& tablet : map.tablet_ids()) {
|
||||
if (map.get_shard(tablet, host)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
future<bool> check_tablet_replica_shards(const tablet_metadata& tm, host_id this_host) {
|
||||
bool valid = true;
|
||||
for (const auto& [table_id, tmap] : tm.all_tables()) {
|
||||
@@ -506,27 +517,30 @@ private:
|
||||
inet_address_vector_replica_set to_replica_set(const tablet_replica_set& replicas) const {
|
||||
inet_address_vector_replica_set result;
|
||||
result.reserve(replicas.size());
|
||||
auto& topo = _tmptr->get_topology();
|
||||
for (auto&& replica : replicas) {
|
||||
result.emplace_back(_tmptr->get_endpoint_for_host_id(replica.host));
|
||||
auto* node = topo.find_node(replica.host);
|
||||
if (node) {
|
||||
result.emplace_back(node->endpoint());
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
host_id_vector_replica_set to_host_set(const tablet_replica_set& replicas) const {
|
||||
host_id_vector_replica_set result;
|
||||
result.reserve(replicas.size());
|
||||
for (auto&& replica : replicas) {
|
||||
result.emplace_back(replica.host);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
const tablet_map& get_tablet_map() const {
|
||||
return _tmptr->tablets().get_tablet_map(_table);
|
||||
}
|
||||
public:
|
||||
tablet_effective_replication_map(table_id table,
|
||||
replication_strategy_ptr rs,
|
||||
token_metadata_ptr tmptr,
|
||||
size_t replication_factor)
|
||||
: effective_replication_map(std::move(rs), std::move(tmptr), replication_factor)
|
||||
, _table(table)
|
||||
, _sharder(*_tmptr, table)
|
||||
{ }
|
||||
|
||||
virtual ~tablet_effective_replication_map() = default;
|
||||
|
||||
virtual inet_address_vector_replica_set get_natural_endpoints(const token& search_token) const override {
|
||||
const tablet_replica_set& get_replicas_for_write(dht::token search_token) const {
|
||||
auto&& tablets = get_tablet_map();
|
||||
auto tablet = tablets.get_tablet_id(search_token);
|
||||
auto* info = tablets.get_tablet_transition_info(tablet);
|
||||
@@ -545,8 +559,27 @@ public:
|
||||
}
|
||||
on_internal_error(tablet_logger, format("Invalid replica selector", static_cast<int>(info->writes)));
|
||||
});
|
||||
tablet_logger.trace("get_natural_endpoints({}): table={}, tablet={}, replicas={}", search_token, _table, tablet, replicas);
|
||||
return to_replica_set(replicas);
|
||||
tablet_logger.trace("get_replicas_for_write({}): table={}, tablet={}, replicas={}", search_token, _table, tablet, replicas);
|
||||
return replicas;
|
||||
}
|
||||
public:
|
||||
tablet_effective_replication_map(table_id table,
|
||||
replication_strategy_ptr rs,
|
||||
token_metadata_ptr tmptr,
|
||||
size_t replication_factor)
|
||||
: effective_replication_map(std::move(rs), std::move(tmptr), replication_factor)
|
||||
, _table(table)
|
||||
, _sharder(*_tmptr, table)
|
||||
{ }
|
||||
|
||||
virtual ~tablet_effective_replication_map() = default;
|
||||
|
||||
virtual host_id_vector_replica_set get_replicas(const token& search_token) const override {
|
||||
return to_host_set(get_replicas_for_write(search_token));
|
||||
}
|
||||
|
||||
virtual inet_address_vector_replica_set get_natural_endpoints(const token& search_token) const override {
|
||||
return to_replica_set(get_replicas_for_write(search_token));
|
||||
}
|
||||
|
||||
virtual inet_address_vector_replica_set get_natural_endpoints_without_node_being_replaced(const token& search_token) const override {
|
||||
|
||||
@@ -451,6 +451,7 @@ public:
|
||||
const table_to_tablet_map& all_tables() const { return _tablets; }
|
||||
table_to_tablet_map& all_tables() { return _tablets; }
|
||||
size_t external_memory_usage() const;
|
||||
bool has_replica_on(host_id) const;
|
||||
public:
|
||||
void set_balancing_enabled(bool value) { _balancing_enabled = value; }
|
||||
void set_tablet_map(table_id, tablet_map);
|
||||
|
||||
@@ -344,14 +344,20 @@ void topology::index_node(const node* node) {
|
||||
}
|
||||
}
|
||||
|
||||
const auto& dc = node->dc_rack().dc;
|
||||
const auto& rack = node->dc_rack().rack;
|
||||
const auto& endpoint = node->endpoint();
|
||||
_dc_nodes[dc].emplace(node);
|
||||
_dc_rack_nodes[dc][rack].emplace(node);
|
||||
_dc_endpoints[dc].insert(endpoint);
|
||||
_dc_racks[dc][rack].insert(endpoint);
|
||||
_datacenters.insert(dc);
|
||||
// We keep location of left nodes because they may still appear in tablet replica sets
|
||||
// and algorithms expect to know which dc they belonged to. View replica pairing needs stable
|
||||
// replica indexes.
|
||||
// But we don't consider those nodes as members of the cluster so don't update dc registry.
|
||||
if (!node->left()) {
|
||||
const auto& dc = node->dc_rack().dc;
|
||||
const auto& rack = node->dc_rack().rack;
|
||||
const auto& endpoint = node->endpoint();
|
||||
_dc_nodes[dc].emplace(node);
|
||||
_dc_rack_nodes[dc][rack].emplace(node);
|
||||
_dc_endpoints[dc].insert(endpoint);
|
||||
_dc_racks[dc][rack].insert(endpoint);
|
||||
_datacenters.insert(dc);
|
||||
}
|
||||
|
||||
if (node->is_this_node()) {
|
||||
_this_node = node;
|
||||
@@ -551,7 +557,7 @@ std::weak_ordering topology::compare_endpoints(const inet_address& address, cons
|
||||
|
||||
void topology::for_each_node(std::function<void(const node*)> func) const {
|
||||
for (const auto& np : _nodes) {
|
||||
if (np) {
|
||||
if (np && !np->left()) {
|
||||
func(np.get());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -299,6 +299,41 @@ future<tablet_metadata> read_tablet_metadata(cql3::query_processor& qp) {
|
||||
co_return std::move(tm);
|
||||
}
|
||||
|
||||
future<std::unordered_set<locator::host_id>> read_required_hosts(cql3::query_processor& qp) {
|
||||
std::unordered_set<locator::host_id> hosts;
|
||||
|
||||
auto process_row = [&] (const cql3::untyped_result_set_row& row) {
|
||||
tablet_replica_set tablet_replicas;
|
||||
if (row.has("replicas")) {
|
||||
tablet_replicas = deserialize_replica_set(row.get_view("replicas"));
|
||||
}
|
||||
|
||||
for (auto&& r : tablet_replicas) {
|
||||
hosts.insert(r.host);
|
||||
}
|
||||
|
||||
if (row.has("new_replicas")) {
|
||||
tablet_replica_set new_tablet_replicas;
|
||||
new_tablet_replicas = deserialize_replica_set(row.get_view("new_replicas"));
|
||||
for (auto&& r : new_tablet_replicas) {
|
||||
hosts.insert(r.host);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
try {
|
||||
co_await qp.query_internal("select replicas, new_replicas from system.tablets",
|
||||
[&] (const cql3::untyped_result_set_row& row) -> future<stop_iteration> {
|
||||
process_row(row);
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
});
|
||||
} catch (...) {
|
||||
std::throw_with_nested(std::runtime_error("Failed to read tablet required hosts"));
|
||||
}
|
||||
|
||||
co_return std::move(hosts);
|
||||
}
|
||||
|
||||
future<std::vector<canonical_mutation>> read_tablet_mutations(seastar::sharded<replica::database>& db) {
|
||||
auto s = db::system_keyspace::tablets();
|
||||
auto rs = co_await db::system_keyspace::query_mutations(db, db::system_keyspace::NAME, db::system_keyspace::TABLETS);
|
||||
|
||||
@@ -64,6 +64,9 @@ future<> save_tablet_metadata(replica::database&, const locator::tablet_metadata
|
||||
/// Reads tablet metadata from system.tablets.
|
||||
future<locator::tablet_metadata> read_tablet_metadata(cql3::query_processor&);
|
||||
|
||||
/// Reads the set of hosts referenced by tablet replicas.
|
||||
future<std::unordered_set<locator::host_id>> read_required_hosts(cql3::query_processor&);
|
||||
|
||||
/// Reads tablet metadata from system.tablets in the form of mutations.
|
||||
future<std::vector<canonical_mutation>> read_tablet_mutations(seastar::sharded<database>&);
|
||||
|
||||
|
||||
@@ -409,6 +409,11 @@ future<> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tm
|
||||
co_await remove_ip(*ip, true);
|
||||
}
|
||||
|
||||
locator::host_id host_id{id.uuid()};
|
||||
if (t.left_nodes_rs.find(id) != t.left_nodes_rs.end()) {
|
||||
update_topology(host_id, std::nullopt, t.left_nodes_rs.at(id));
|
||||
}
|
||||
|
||||
// However if we do that, we need to also implement unbanning a node and do it if `removenode` is aborted.
|
||||
co_await _messaging.local().ban_host(locator::host_id{id.uuid()});
|
||||
};
|
||||
@@ -588,8 +593,13 @@ future<> storage_service::topology_state_load() {
|
||||
rtlogger.debug("reload raft topology state");
|
||||
std::unordered_set<raft::server_id> prev_normal = boost::copy_range<std::unordered_set<raft::server_id>>(_topology_state_machine._topology.normal_nodes | boost::adaptors::map_keys);
|
||||
|
||||
std::unordered_set<locator::host_id> tablet_hosts;
|
||||
if (_db.local().get_config().check_experimental(db::experimental_features_t::feature::TABLETS)) {
|
||||
tablet_hosts = co_await replica::read_required_hosts(_qp);
|
||||
}
|
||||
|
||||
// read topology state from disk and recreate token_metadata from it
|
||||
_topology_state_machine._topology = co_await _sys_ks.local().load_topology_state();
|
||||
_topology_state_machine._topology = co_await _sys_ks.local().load_topology_state(tablet_hosts);
|
||||
|
||||
if (_manage_topology_change_kind_from_group0) {
|
||||
_topology_change_kind_enabled = upgrade_state_to_topology_op_kind(_topology_state_machine._topology.upgrade_state);
|
||||
@@ -1307,6 +1317,19 @@ topology::upgrade_state_type storage_service::get_topology_upgrade_state() const
|
||||
return _topology_state_machine._topology.upgrade_state;
|
||||
}
|
||||
|
||||
future<> storage_service::await_tablets_rebuilt(raft::server_id replaced_id) {
|
||||
auto is_drained = [&] {
|
||||
return !get_token_metadata().tablets().has_replica_on(locator::host_id(replaced_id.uuid()));
|
||||
};
|
||||
if (!is_drained()) {
|
||||
slogger.info("Waiting for tablet replicas from the replaced node to be rebuilt");
|
||||
co_await _topology_state_machine.event.wait([&] {
|
||||
return is_drained();
|
||||
});
|
||||
}
|
||||
slogger.info("Tablet replicas from the replaced node have been rebuilt");
|
||||
}
|
||||
|
||||
future<> storage_service::join_token_ring(sharded<db::system_distributed_keyspace>& sys_dist_ks,
|
||||
sharded<service::storage_proxy>& proxy,
|
||||
sharded<gms::gossiper>& gossiper,
|
||||
@@ -1610,18 +1633,19 @@ future<> storage_service::join_token_ring(sharded<db::system_distributed_keyspac
|
||||
supervisor::notify("starting system distributed keyspace");
|
||||
co_await sys_dist_ks.invoke_on_all(&db::system_distributed_keyspace::start);
|
||||
|
||||
sstring err;
|
||||
|
||||
if (_sys_ks.local().bootstrap_complete()) {
|
||||
if (_topology_state_machine._topology.left_nodes.contains(raft_server->id())) {
|
||||
throw std::runtime_error("A node that already left the cluster cannot be restarted");
|
||||
}
|
||||
} else {
|
||||
err = co_await wait_for_topology_request_completion(join_params.request_id);
|
||||
}
|
||||
auto err = co_await wait_for_topology_request_completion(join_params.request_id);
|
||||
if (!err.empty()) {
|
||||
throw std::runtime_error(fmt::format("{} failed. See earlier errors ({})", raft_replace_info ? "Replace" : "Bootstrap", err));
|
||||
}
|
||||
|
||||
if (!err.empty()) {
|
||||
throw std::runtime_error(fmt::format("{} failed. See earlier errors ({})", raft_replace_info ? "Replace" : "Bootstrap", err));
|
||||
if (raft_replace_info) {
|
||||
co_await await_tablets_rebuilt(raft_replace_info->raft_id);
|
||||
}
|
||||
}
|
||||
|
||||
// If we were the first node in the cluster, at this point `upgrade_state` will be
|
||||
|
||||
@@ -839,6 +839,10 @@ public:
|
||||
topology::upgrade_state_type get_topology_upgrade_state() const;
|
||||
|
||||
node_state get_node_state(locator::host_id id);
|
||||
|
||||
// Waits for topology state in which none of tablets has replaced_id as a replica.
|
||||
// Must be called on shard 0.
|
||||
future<> await_tablets_rebuilt(raft::server_id replaced_id);
|
||||
private:
|
||||
// Tracks progress of the upgrade to topology coordinator.
|
||||
future<> _upgrade_to_topology_coordinator_fiber = make_ready_future<>();
|
||||
|
||||
@@ -218,6 +218,7 @@ class load_balancer {
|
||||
host_id id;
|
||||
uint64_t shard_count = 0;
|
||||
uint64_t tablet_count = 0;
|
||||
const locator::node* node; // never nullptr
|
||||
|
||||
// The average shard load on this node.
|
||||
load_type avg_load = 0;
|
||||
@@ -227,6 +228,18 @@ class load_balancer {
|
||||
|
||||
std::optional<locator::load_sketch> target_load_sketch;
|
||||
|
||||
const sstring& dc() const {
|
||||
return node->dc_rack().dc;
|
||||
}
|
||||
|
||||
const sstring& rack() const {
|
||||
return node->dc_rack().rack;
|
||||
}
|
||||
|
||||
locator::node::state state() const {
|
||||
return node->get_state();
|
||||
}
|
||||
|
||||
future<load_sketch&> get_load_sketch(const token_metadata_ptr& tm) {
|
||||
if (!target_load_sketch) {
|
||||
target_load_sketch.emplace(tm);
|
||||
@@ -599,38 +612,45 @@ public:
|
||||
|
||||
std::unordered_map<host_id, node_load> nodes;
|
||||
std::unordered_set<host_id> nodes_to_drain;
|
||||
|
||||
auto ensure_node = [&] (host_id host) {
|
||||
if (nodes.contains(host)) {
|
||||
return;
|
||||
}
|
||||
auto* node = topo.find_node(host);
|
||||
if (!node) {
|
||||
on_internal_error(lblogger, format("Node {} not found in topology", host));
|
||||
}
|
||||
node_load& load = nodes[host];
|
||||
load.id = host;
|
||||
load.node = node;
|
||||
load.shard_count = node->get_shard_count();
|
||||
load.shards.resize(load.shard_count);
|
||||
if (!load.shard_count) {
|
||||
throw std::runtime_error(format("Shard count of {} not found in topology", host));
|
||||
}
|
||||
};
|
||||
|
||||
topo.for_each_node([&] (const locator::node* node_ptr) {
|
||||
if (node_ptr->dc_rack().dc != dc) {
|
||||
return;
|
||||
}
|
||||
bool is_drained = node_ptr->get_state() == locator::node::state::being_decommissioned
|
||||
|| node_ptr->get_state() == locator::node::state::being_removed
|
||||
|| node_ptr->get_state() == locator::node::state::being_replaced;
|
||||
|| node_ptr->get_state() == locator::node::state::being_removed;
|
||||
if (node_ptr->get_state() == locator::node::state::normal || is_drained) {
|
||||
node_load& load = nodes[node_ptr->host_id()];
|
||||
load.id = node_ptr->host_id();
|
||||
load.shard_count = node_ptr->get_shard_count();
|
||||
load.shards.resize(load.shard_count);
|
||||
if (!load.shard_count) {
|
||||
throw std::runtime_error(format("Shard count of {} not found in topology", node_ptr->host_id()));
|
||||
}
|
||||
if (is_drained) {
|
||||
ensure_node(node_ptr->host_id());
|
||||
lblogger.info("Will drain node {} ({}) from DC {}", node_ptr->host_id(), node_ptr->get_state(), dc);
|
||||
nodes_to_drain.emplace(node_ptr->host_id());
|
||||
} else if (node_ptr->is_excluded() || _skiplist.contains(node_ptr->host_id())) {
|
||||
// Excluded nodes should not be chosen as targets for migration.
|
||||
lblogger.debug("Ignoring excluded or dead node {}: state={}", node_ptr->host_id(), node_ptr->get_state());
|
||||
nodes.erase(node_ptr->host_id());
|
||||
} else {
|
||||
ensure_node(node_ptr->host_id());
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
if (nodes.empty()) {
|
||||
lblogger.debug("No nodes to balance.");
|
||||
_stats.for_dc(dc).stop_balance++;
|
||||
co_return plan;
|
||||
}
|
||||
|
||||
// Compute tablet load on nodes.
|
||||
|
||||
for (auto&& [table, tmap_] : _tm->tablets().all_tables()) {
|
||||
@@ -639,6 +659,20 @@ public:
|
||||
co_await tmap.for_each_tablet([&, table = table] (tablet_id tid, const tablet_info& ti) -> future<> {
|
||||
auto trinfo = tmap.get_tablet_transition_info(tid);
|
||||
|
||||
// Check if any replica is on a node which has left.
|
||||
// When node is replaced we don't rebuild as part of topology request.
|
||||
for (auto&& r : ti.replicas) {
|
||||
auto* node = topo.find_node(r.host);
|
||||
if (!node) {
|
||||
on_internal_error(lblogger, format("Replica {} of tablet {} not found in topology",
|
||||
r, global_tablet_id{table, tid}));
|
||||
}
|
||||
if (node->left() && node->dc_rack().dc == dc) {
|
||||
ensure_node(r.host);
|
||||
nodes_to_drain.insert(r.host);
|
||||
}
|
||||
}
|
||||
|
||||
// We reflect migrations in the load as if they already happened,
|
||||
// optimistically assuming that they will succeed.
|
||||
for (auto&& replica : get_replicas_for_tablet_load(ti, trinfo)) {
|
||||
@@ -656,6 +690,12 @@ public:
|
||||
});
|
||||
}
|
||||
|
||||
if (nodes.empty()) {
|
||||
lblogger.debug("No nodes to balance.");
|
||||
_stats.for_dc(dc).stop_balance++;
|
||||
co_return plan;
|
||||
}
|
||||
|
||||
// Detect finished drain.
|
||||
|
||||
for (auto i = nodes_to_drain.begin(); i != nodes_to_drain.end();) {
|
||||
@@ -691,9 +731,8 @@ public:
|
||||
}
|
||||
|
||||
for (auto&& [host, load] : nodes) {
|
||||
auto& node = topo.get_node(host);
|
||||
lblogger.info("Node {}: rack={} avg_load={}, tablets={}, shards={}, state={}",
|
||||
host, node.dc_rack().rack, load.avg_load, load.tablet_count, load.shard_count, node.get_state());
|
||||
host, load.rack(), load.avg_load, load.tablet_count, load.shard_count, load.state());
|
||||
}
|
||||
|
||||
if (!min_load_node) {
|
||||
@@ -902,9 +941,9 @@ public:
|
||||
for (auto&& r : tmap.get_tablet_info(source_tablet.tablet).replicas) {
|
||||
replicas.insert(r.host);
|
||||
if (nodes.contains(r.host)) {
|
||||
const locator::node& node = topo.get_node(r.host);
|
||||
rack_load[node.dc_rack().rack] += 1;
|
||||
max_rack_load = std::max(max_rack_load, rack_load[node.dc_rack().rack]);
|
||||
const node_load& node = nodes[r.host];
|
||||
rack_load[node.rack()] += 1;
|
||||
max_rack_load = std::max(max_rack_load, rack_load[node.rack()]);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -925,14 +964,13 @@ public:
|
||||
continue;
|
||||
}
|
||||
|
||||
const locator::node& target_node = topo.get_node(new_target);
|
||||
const locator::node& source_node = topo.get_node(src_host);
|
||||
if (target_node.dc_rack().rack != source_node.dc_rack().rack
|
||||
&& (rack_load[target_node.dc_rack().rack] + 1 > max_rack_load)) {
|
||||
const node_load& target_node = nodes[new_target];
|
||||
const node_load& source_node = nodes[src_host];
|
||||
if (target_node.rack() != source_node.rack() && (rack_load[target_node.rack()] + 1 > max_rack_load)) {
|
||||
lblogger.debug("next best target {} (avg_load={}) skipped because it would overload rack {} "
|
||||
"with {} replicas of {}, current max is {}",
|
||||
new_target, nodes[new_target].avg_load, target_node.dc_rack().rack,
|
||||
rack_load[target_node.dc_rack().rack] + 1, source_tablet, max_rack_load);
|
||||
new_target, nodes[new_target].avg_load, target_node.rack(),
|
||||
rack_load[target_node.rack()] + 1, source_tablet, max_rack_load);
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -948,7 +986,7 @@ public:
|
||||
|
||||
target = nodes_by_load_dst.back();
|
||||
auto& target_info = nodes[target];
|
||||
const locator::node& target_node = topo.get_node(target);
|
||||
auto& src_info = nodes[src.host];
|
||||
auto push_back_target_node = seastar::defer([&] {
|
||||
std::push_heap(nodes_by_load_dst.begin(), nodes_by_load_dst.end(), nodes_dst_cmp);
|
||||
});
|
||||
@@ -1001,7 +1039,7 @@ public:
|
||||
std::unordered_map<sstring, int> rack_load; // Will be built if check_rack_load
|
||||
|
||||
if (nodes_to_drain.empty()) {
|
||||
check_rack_load = target_node.dc_rack().rack != topo.get_node(src.host).dc_rack().rack;
|
||||
check_rack_load = target_info.rack() != src_info.rack();
|
||||
for (auto&& r: tmap.get_tablet_info(source_tablet.tablet).replicas) {
|
||||
if (r.host == target) {
|
||||
has_replica_on_target = true;
|
||||
@@ -1026,10 +1064,10 @@ public:
|
||||
if (check_rack_load) {
|
||||
auto max_rack_load = std::max_element(rack_load.begin(), rack_load.end(),
|
||||
[] (auto& a, auto& b) { return a.second < b.second; })->second;
|
||||
auto new_rack_load = rack_load[target_node.dc_rack().rack] + 1;
|
||||
auto new_rack_load = rack_load[target_info.rack()] + 1;
|
||||
if (new_rack_load > max_rack_load) {
|
||||
lblogger.debug("candidate tablet {} skipped because it would increase load on rack {} to {}, max={}",
|
||||
source_tablet, target_node.dc_rack().rack, new_rack_load, max_rack_load);
|
||||
source_tablet, target_info.rack(), new_rack_load, max_rack_load);
|
||||
_stats.for_dc(dc).tablets_skipped_rack++;
|
||||
continue;
|
||||
}
|
||||
@@ -1038,9 +1076,8 @@ public:
|
||||
auto& target_load_sketch = co_await target_info.get_load_sketch(_tm);
|
||||
auto dst = global_shard_id {target, target_load_sketch.next_shard(target)};
|
||||
|
||||
const locator::node& src_node = topo.get_node(src.host);
|
||||
tablet_transition_kind kind = (src_node.get_state() == locator::node::state::being_removed
|
||||
|| src_node.get_state() == locator::node::state::being_replaced)
|
||||
tablet_transition_kind kind = (src_info.state() == locator::node::state::being_removed
|
||||
|| src_info.state() == locator::node::state::left)
|
||||
? tablet_transition_kind::rebuild : tablet_transition_kind::migration;
|
||||
auto mig = tablet_migration_info {kind, source_tablet, src, dst};
|
||||
auto mig_streaming_info = get_migration_streaming_info(topo, tmap.get_tablet_info(source_tablet.tablet), mig);
|
||||
|
||||
@@ -1208,6 +1208,11 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
// which happens outside the topology coordinator.
|
||||
bool has_updates = !updates.empty();
|
||||
if (has_updates) {
|
||||
co_await utils::get_local_injector().inject("tablet_transition_updates", [] (auto& handler) {
|
||||
rtlogger.info("tablet_transition_updates: start");
|
||||
return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(2));
|
||||
});
|
||||
|
||||
updates.emplace_back(
|
||||
topology_mutation_builder(guard.write_timestamp())
|
||||
.set_version(_topo_sm._topology.version + 1)
|
||||
|
||||
@@ -74,7 +74,11 @@ std::optional<request_param> topology::get_request_param(raft::server_id id) con
|
||||
};
|
||||
|
||||
std::unordered_set<raft::server_id> topology::get_excluded_nodes() const {
|
||||
return ignored_nodes;
|
||||
auto result = ignored_nodes;
|
||||
for (auto& [id, rs] : left_nodes_rs) {
|
||||
result.insert(id);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
std::set<sstring> calculate_not_yet_enabled_features(const std::set<sstring>& enabled_features, const auto& supported_features) {
|
||||
|
||||
@@ -132,6 +132,8 @@ struct topology {
|
||||
std::unordered_map<raft::server_id, replica_state> normal_nodes;
|
||||
// Nodes that are left
|
||||
std::unordered_set<raft::server_id> left_nodes;
|
||||
// Left nodes for which we need topology information.
|
||||
std::unordered_map<raft::server_id, replica_state> left_nodes_rs;
|
||||
// Nodes that are waiting to be joined by the topology coordinator
|
||||
std::unordered_map<raft::server_id, replica_state> new_nodes;
|
||||
// Nodes that are in the process to be added to the ring
|
||||
|
||||
@@ -364,3 +364,66 @@ SEASTAR_THREAD_TEST_CASE(test_load_sketch) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_left_node_is_kept_outside_dc) {
|
||||
auto id1 = host_id::create_random_id();
|
||||
auto ep1 = gms::inet_address("127.0.0.1");
|
||||
auto id2 = host_id::create_random_id();
|
||||
auto ep2 = gms::inet_address("127.0.0.2");
|
||||
auto id3 = host_id::create_random_id();
|
||||
auto ep3 = gms::inet_address("127.0.0.3");
|
||||
|
||||
const auto dc_rack1 = endpoint_dc_rack {
|
||||
.dc = "dc1",
|
||||
.rack = "rack1"
|
||||
};
|
||||
|
||||
topology::config cfg = {
|
||||
.this_endpoint = ep1,
|
||||
.local_dc_rack = dc_rack1
|
||||
};
|
||||
|
||||
auto topo = topology(cfg);
|
||||
|
||||
set_abort_on_internal_error(false);
|
||||
auto reset_on_internal_abort = seastar::defer([] {
|
||||
set_abort_on_internal_error(true);
|
||||
});
|
||||
|
||||
std::unordered_set<const locator::node*> nodes;
|
||||
|
||||
nodes.insert(topo.add_node(id2, ep2, dc_rack1, node::state::normal));
|
||||
nodes.insert(topo.add_node(id3, ep3, dc_rack1, node::state::left));
|
||||
|
||||
topo.for_each_node([&] (const locator::node* node) {
|
||||
BOOST_REQUIRE(node->host_id() != id3);
|
||||
});
|
||||
|
||||
{
|
||||
auto *n = topo.find_node(id3);
|
||||
BOOST_REQUIRE(n);
|
||||
BOOST_REQUIRE(n->get_state() == locator::node::state::left);
|
||||
}
|
||||
|
||||
// left nodes are not members.
|
||||
BOOST_REQUIRE(!topo.get_datacenter_endpoints().at(dc_rack1.dc).contains(ep3));
|
||||
|
||||
BOOST_REQUIRE(topo.get_datacenter(id3) == dc_rack1.dc);
|
||||
BOOST_REQUIRE(topo.get_rack(id3) == dc_rack1.rack);
|
||||
|
||||
auto topo2 = topo.clone_gently().get0();
|
||||
{
|
||||
auto *n = topo2.find_node(id3);
|
||||
BOOST_REQUIRE(n);
|
||||
BOOST_REQUIRE(n->get_state() == locator::node::state::left);
|
||||
}
|
||||
|
||||
// Make the DC empty of nodes
|
||||
topo.remove_node(id1);
|
||||
topo.remove_node(id2);
|
||||
// Left node location is still known
|
||||
BOOST_REQUIRE(topo.get_datacenter(id3) == dc_rack1.dc);
|
||||
BOOST_REQUIRE(topo.get_rack(id3) == dc_rack1.rack);
|
||||
|
||||
topo.clear_gently().get();
|
||||
}
|
||||
|
||||
@@ -263,6 +263,71 @@ SEASTAR_TEST_CASE(test_tablet_metadata_persistence) {
|
||||
}, tablet_cql_test_config());
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_read_required_hosts) {
|
||||
return do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
auto h1 = host_id(utils::UUID_gen::get_time_UUID());
|
||||
auto h2 = host_id(utils::UUID_gen::get_time_UUID());
|
||||
auto h3 = host_id(utils::UUID_gen::get_time_UUID());
|
||||
|
||||
tablet_metadata tm = read_tablet_metadata(e.local_qp()).get();
|
||||
|
||||
auto ts = current_timestamp(e);
|
||||
verify_tablet_metadata_persistence(e, tm, ts);
|
||||
BOOST_REQUIRE_EQUAL(std::unordered_set<locator::host_id>({}),
|
||||
read_required_hosts(e.local_qp()).get());
|
||||
|
||||
// Add table1
|
||||
auto table1 = add_table(e).get();
|
||||
{
|
||||
tablet_map tmap(1);
|
||||
tmap.set_tablet(tmap.first_tablet(), tablet_info {
|
||||
tablet_replica_set {
|
||||
tablet_replica {h1, 0},
|
||||
tablet_replica {h2, 3},
|
||||
}
|
||||
});
|
||||
tm.set_tablet_map(table1, std::move(tmap));
|
||||
}
|
||||
|
||||
ts = current_timestamp(e);
|
||||
verify_tablet_metadata_persistence(e, tm, ts);
|
||||
BOOST_REQUIRE_EQUAL(std::unordered_set<locator::host_id>({h1, h2}),
|
||||
read_required_hosts(e.local_qp()).get());
|
||||
|
||||
// Add table2
|
||||
auto table2 = add_table(e).get();
|
||||
{
|
||||
tablet_map tmap(2);
|
||||
auto tb = tmap.first_tablet();
|
||||
tmap.set_tablet(tb, tablet_info {
|
||||
tablet_replica_set {
|
||||
tablet_replica {h1, 0},
|
||||
}
|
||||
});
|
||||
tb = *tmap.next_tablet(tb);
|
||||
tmap.set_tablet(tb, tablet_info {
|
||||
tablet_replica_set {
|
||||
tablet_replica {h2, 0},
|
||||
}
|
||||
});
|
||||
tmap.set_tablet_transition_info(tb, tablet_transition_info{
|
||||
tablet_transition_stage::allow_write_both_read_old,
|
||||
tablet_transition_kind::migration,
|
||||
tablet_replica_set {
|
||||
tablet_replica {h3, 0},
|
||||
},
|
||||
tablet_replica {h3, 0}
|
||||
});
|
||||
tm.set_tablet_map(table2, std::move(tmap));
|
||||
}
|
||||
|
||||
ts = current_timestamp(e);
|
||||
verify_tablet_metadata_persistence(e, tm, ts);
|
||||
BOOST_REQUIRE_EQUAL(std::unordered_set<locator::host_id>({h1, h2, h3}),
|
||||
read_required_hosts(e.local_qp()).get());
|
||||
}, tablet_cql_test_config());
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_get_shard) {
|
||||
return do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
auto h1 = host_id(utils::UUID_gen::get_time_UUID());
|
||||
|
||||
@@ -129,6 +129,12 @@ static future<> test_basic_operations(app_template& app) {
|
||||
|
||||
testlog.info("Read mutations in {:.6f} [ms]", time_to_read_muts.count() * 1000);
|
||||
|
||||
auto time_to_read_hosts = duration_in_seconds([&] {
|
||||
replica::read_required_hosts(e.local_qp()).get();
|
||||
});
|
||||
|
||||
testlog.info("Read required hosts in {:.6f} [ms]", time_to_read_hosts.count() * 1000);
|
||||
|
||||
auto cm_size = 0;
|
||||
for (auto&& cm : muts) {
|
||||
cm_size += cm.representation().size();
|
||||
|
||||
@@ -313,6 +313,15 @@ class ScyllaRESTAPIClient():
|
||||
assert(type(data) == str)
|
||||
return data
|
||||
|
||||
async def get_raft_leader(self, node_ip: str, group_id: Optional[str] = None) -> HostID:
|
||||
"""Returns host ID of the current leader of the given raft group as seen by the registry on the contact node.
|
||||
When group_id is not specified, group0 is used."""
|
||||
params = {}
|
||||
if group_id:
|
||||
params["group_id"] = group_id
|
||||
data = await self.client.get_json("/raft/leader_host", host=node_ip, params=params)
|
||||
return HostID(data)
|
||||
|
||||
async def repair(self, node_ip: str, keyspace: str, table: str, ranges: str = '') -> None:
|
||||
"""Repair the given table and wait for it to complete"""
|
||||
if ranges:
|
||||
|
||||
@@ -14,9 +14,9 @@ import pytest
|
||||
import time
|
||||
from cassandra.cluster import Session # type: ignore # pylint: disable=no-name-in-module
|
||||
from cassandra.pool import Host # type: ignore # pylint: disable=no-name-in-module
|
||||
from test.pylib.internal_types import ServerInfo
|
||||
from test.pylib.internal_types import ServerInfo, HostID
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.util import wait_for, wait_for_cql_and_get_hosts, read_barrier
|
||||
from test.pylib.util import wait_for, wait_for_cql_and_get_hosts, read_barrier, get_available_host
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -77,6 +77,13 @@ async def get_current_group0_config(manager: ManagerClient, srv: ServerInfo) ->
|
||||
return result
|
||||
|
||||
|
||||
async def get_topology_coordinator(manager: ManagerClient) -> HostID:
|
||||
"""Get the host ID of the topology coordinator."""
|
||||
host = await get_available_host(manager.cql, time.time() + 60)
|
||||
await read_barrier(manager.cql, host)
|
||||
return await manager.api.get_raft_leader(host.address)
|
||||
|
||||
|
||||
async def check_token_ring_and_group0_consistency(manager: ManagerClient) -> None:
|
||||
"""Ensure that the normal token owners and group 0 members match
|
||||
according to each currently running server.
|
||||
|
||||
99
test/topology_experimental_raft/test_mv_tablets_replace.py
Normal file
99
test/topology_experimental_raft/test_mv_tablets_replace.py
Normal file
@@ -0,0 +1,99 @@
|
||||
#
|
||||
# Copyright (C) 2024-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
#
|
||||
from typing import List
|
||||
|
||||
from cassandra import ConsistencyLevel
|
||||
from cassandra.query import SimpleStatement
|
||||
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.scylla_cluster import ReplaceConfig
|
||||
from test.pylib.internal_types import ServerInfo, HostID
|
||||
|
||||
import pytest
|
||||
import asyncio
|
||||
import logging
|
||||
|
||||
from test.topology.conftest import skip_mode
|
||||
from test.topology.util import get_topology_coordinator
|
||||
from test.topology_experimental_raft.test_mv_tablets import get_tablet_replicas
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def find_server_by_host_id(manager: ManagerClient, servers: List[ServerInfo], host_id: HostID) -> ServerInfo:
|
||||
for s in servers:
|
||||
if await manager.get_host_id(s.server_id) == host_id:
|
||||
return s
|
||||
raise Exception(f"Host ID {host_id} not found in {servers}")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_tablet_mv_replica_pairing_during_replace(manager: ManagerClient):
|
||||
"""
|
||||
Verifies that view replica pairing is stable in the case of node replace.
|
||||
After replace, the node is in left state, but still present in the replica set.
|
||||
If view pairing code would use get_natural_endpoints(), which excludes left nodes,
|
||||
the pairing would be shifted during replace.
|
||||
"""
|
||||
|
||||
servers = await manager.servers_add(4)
|
||||
cql = manager.get_cql()
|
||||
await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2}"
|
||||
" AND tablets = {'initial': 1}")
|
||||
await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int)")
|
||||
await cql.run_async("CREATE MATERIALIZED VIEW test.tv AS SELECT * FROM test.test WHERE c IS NOT NULL AND pk IS NOT NULL PRIMARY KEY (c, pk) WITH SYNCHRONOUS_UPDATES = TRUE")
|
||||
|
||||
# Disable migrations concurrent with replace since we don't handle nodes going down during migration yet.
|
||||
# See https://github.com/scylladb/scylladb/issues/16527
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
base_replicas = await get_tablet_replicas(manager, servers[0], "test", "test", 0)
|
||||
logger.info(f'test.test replicas: {base_replicas}')
|
||||
view_replicas = await get_tablet_replicas(manager, servers[0], "test", "tv", 0)
|
||||
logger.info(f'test.tv replicas: {view_replicas}')
|
||||
server_to_replace = await find_server_by_host_id(manager, servers, HostID(str(view_replicas[0][0])))
|
||||
server_to_down = await find_server_by_host_id(manager, servers, HostID(str(base_replicas[0][0])))
|
||||
|
||||
logger.info('Downing a node to be replaced')
|
||||
await manager.server_stop(server_to_replace.server_id)
|
||||
|
||||
logger.info('Blocking tablet rebuild')
|
||||
coord = await get_topology_coordinator(manager)
|
||||
coord_serv = await find_server_by_host_id(manager, servers, coord)
|
||||
await manager.api.enable_injection(coord_serv.ip_addr, "tablet_transition_updates", one_shot=True)
|
||||
coord_log = await manager.server_open_log(coord_serv.server_id)
|
||||
coord_mark = await coord_log.mark()
|
||||
|
||||
logger.info('Replacing the node')
|
||||
replace_cfg = ReplaceConfig(replaced_id = server_to_replace.server_id, reuse_ip_addr = False, use_host_id = True)
|
||||
replace_task = asyncio.create_task(manager.server_add(replace_cfg))
|
||||
|
||||
await coord_log.wait_for('tablet_transition_updates: start', from_mark=coord_mark)
|
||||
|
||||
if server_to_down.server_id != server_to_replace.server_id:
|
||||
await manager.server_stop(server_to_down.server_id)
|
||||
|
||||
# The update is supposed to go to the second replica only, since the other one is downed.
|
||||
# If pairing would shift, the update to the view would be lost because the first replica
|
||||
# is the one which is in the left state.
|
||||
logger.info('Updating base table')
|
||||
await cql.run_async(SimpleStatement("INSERT INTO test.test (pk, c) VALUES (3, 4)", consistency_level=ConsistencyLevel.ONE))
|
||||
logger.info('Querying the view')
|
||||
assert [(4,3)] == list(await cql.run_async(SimpleStatement("SELECT * FROM test.tv WHERE c=4", consistency_level=ConsistencyLevel.ONE)))
|
||||
|
||||
if server_to_down.server_id != server_to_replace.server_id:
|
||||
await manager.server_start(server_to_down.server_id)
|
||||
|
||||
logger.info('Unblocking tablet rebuild')
|
||||
if coord_serv.server_id != server_to_down.server_id:
|
||||
await manager.api.message_injection(coord_serv.ip_addr, "tablet_transition_updates")
|
||||
|
||||
logger.info('Waiting for replace')
|
||||
await replace_task
|
||||
|
||||
logger.info('Querying')
|
||||
assert [(4,3)] == list(await cql.run_async("SELECT * FROM test.tv WHERE c=4"))
|
||||
@@ -30,20 +30,24 @@ async def run_async_cl_all(cql, query: str):
|
||||
@pytest.mark.asyncio
|
||||
async def test_replace(manager: ManagerClient):
|
||||
logger.info("Bootstrapping cluster")
|
||||
cmdline = ['--logger-log-level', 'storage_service=trace']
|
||||
cmdline = [
|
||||
'--logger-log-level', 'storage_service=trace',
|
||||
'--logger-log-level', 'raft_topology=trace',
|
||||
]
|
||||
|
||||
# 4 nodes so that we can find new tablet replica for the RF=3 table on removenode
|
||||
servers = await manager.servers_add(4, cmdline=cmdline)
|
||||
servers = await manager.servers_add(3, cmdline=cmdline)
|
||||
|
||||
cql = manager.get_cql()
|
||||
|
||||
await create_keyspace(cql, "test", 32, rf=1)
|
||||
await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);")
|
||||
|
||||
# We want RF=2 table to validate that quorum reads work after replacing node finishes
|
||||
# bootstrap which indicates that bootstrap waits for rebuilt.
|
||||
# Otherwise, some reads would fail to find a quorum.
|
||||
await create_keyspace(cql, "test2", 32, rf=2)
|
||||
await cql.run_async("CREATE TABLE test2.test (pk int PRIMARY KEY, c int);")
|
||||
|
||||
# RF=3
|
||||
await create_keyspace(cql, "test3", 32, rf=3)
|
||||
await cql.run_async("CREATE TABLE test3.test (pk int PRIMARY KEY, c int);")
|
||||
|
||||
@@ -54,23 +58,19 @@ async def test_replace(manager: ManagerClient):
|
||||
await asyncio.gather(*[run_async_cl_all(cql, f"INSERT INTO test2.test (pk, c) VALUES ({k}, {k});") for k in keys])
|
||||
await asyncio.gather(*[run_async_cl_all(cql, f"INSERT INTO test3.test (pk, c) VALUES ({k}, {k});") for k in keys])
|
||||
|
||||
async def check_ks(ks):
|
||||
logger.info(f"Checking {ks}")
|
||||
query = SimpleStatement(f"SELECT * FROM {ks}.test;", consistency_level=ConsistencyLevel.QUORUM)
|
||||
rows = await cql.run_async(query)
|
||||
assert len(rows) == len(keys)
|
||||
for r in rows:
|
||||
assert r.c == r.pk
|
||||
|
||||
async def check():
|
||||
# RF=1 table "test" will experience data loss so don't check it.
|
||||
# We include it to check that the system doesn't crash.
|
||||
|
||||
logger.info("Checking table test2")
|
||||
query = SimpleStatement("SELECT * FROM test2.test;", consistency_level=ConsistencyLevel.ONE)
|
||||
rows = await cql.run_async(query)
|
||||
assert len(rows) == len(keys)
|
||||
for r in rows:
|
||||
assert r.c == r.pk
|
||||
|
||||
logger.info("Checking table test3")
|
||||
query = SimpleStatement("SELECT * FROM test3.test;", consistency_level=ConsistencyLevel.ONE)
|
||||
rows = await cql.run_async(query)
|
||||
assert len(rows) == len(keys)
|
||||
for r in rows:
|
||||
assert r.c == r.pk
|
||||
# RF=1 keyspace will experience data loss so don't check it.
|
||||
# We include it in the test only to check that the system doesn't crash.
|
||||
await check_ks("test2")
|
||||
await check_ks("test3")
|
||||
|
||||
await check()
|
||||
|
||||
@@ -81,11 +81,21 @@ async def test_replace(manager: ManagerClient):
|
||||
logger.info('Replacing a node')
|
||||
await manager.server_stop(servers[0].server_id)
|
||||
replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = False, use_host_id = True)
|
||||
await manager.server_add(replace_cfg)
|
||||
servers.append(await manager.server_add(replace_cfg))
|
||||
servers = servers[1:]
|
||||
|
||||
await check()
|
||||
|
||||
# Verify that QUORUM reads from RF=3 table work when replacing finished and we down a single node.
|
||||
# This validates that replace waits for tablet rebuilt before finishing bootstrap, otherwise some reads
|
||||
# would fail to find a quorum.
|
||||
logger.info('Downing a node')
|
||||
await manager.server_stop_gracefully(servers[0].server_id)
|
||||
await manager.server_not_sees_other_server(servers[1].ip_addr, servers[0].ip_addr)
|
||||
await manager.server_not_sees_other_server(servers[2].ip_addr, servers[0].ip_addr)
|
||||
|
||||
await check_ks("test3")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_removenode(manager: ManagerClient):
|
||||
|
||||
Reference in New Issue
Block a user