Merge 'Track tablet streaming under global sessions to prevent side-effects of failed streaming' from Tomasz Grabiec

Tablet streaming involves asynchronous RPCs to other replicas which transfer writes. We want side-effects from streaming to occur only within the migration stage in which the streaming was started. This is currently not guaranteed on failure. When the streaming master fails (e.g. due to an RPC failing), some streaming work may still be alive somewhere (e.g. an RPC on the wire) and will have side-effects at some later point.

This PR implements tracking of all operations involved in streaming which may have side-effects, which allows the topology change coordinator to fence them and wait for them to complete if they were already admitted.

The tracking and fencing is implemented by using global "sessions", created for streaming of a single tablet. Session is globally identified by UUID. The identifier is assigned by the topology change coordinator, and stored in system.tablets. Sessions are created and closed based on group0 state (tablet metadata) by the barrier command sent to each replica, which we already do on transitions between stages. Also, each barrier waits for sessions which have been closed to be drained.

The barrier is blocked only if there is some session with work which was left behind by unsuccessful streaming. In that case it should not be blocked for long, because the streaming process frequently checks whether the guard was left behind and stops if it was.

This mechanism of tracking is fault-tolerant: session id is stored in group0, so coordinator can make progress on failover. The barriers guarantee that session exists on all replicas, and that it will be closed on all replicas.

Closes scylladb/scylladb#15847

* github.com:scylladb/scylladb:
  test: tablets: Add test for failed streaming being fenced away
  error_injection: Introduce poll_for_message()
  error_injection: Make is_enabled() public
  api: Add API to kill connection to a particular host
  range_streamer: Do not block topology change barriers around streaming
  range_streamer, tablets: Do not keep token metadata around streaming
  tablets: Fail gracefully when migrating tablet has no pending replica
  storage_service, api: Add API to disable tablet balancing
  storage_service, api: Add API to migrate a tablet
  storage_service, raft topology: Run streaming under session topology guard
  storage_service, tablets: Use session to guard tablet streaming
  tablets: Add per-tablet session id field to tablet metadata
  service: range_streamer: Propagate topology_guard to receivers
  streaming: Always close the rpc::sink
  storage_service: Introduce concept of a topology_guard
  storage_service: Introduce session concept
  tablets: Fix topology_metadata_guard holding on to the old erm
  docs: Document the topology_guard mechanism
This commit is contained in:
Avi Kivity
2023-12-07 16:29:02 +02:00
43 changed files with 1326 additions and 80 deletions

View File

@@ -90,6 +90,30 @@
}
]
},
{
"path":"/v2/error_injection/disconnect/{ip}",
"operations":[
{
"method":"POST",
"summary":"Drop connection to a given IP",
"type":"void",
"nickname":"inject_disconnect",
"produces":[
"application/json"
],
"parameters":[
{
"name":"ip",
"description":"IP address to disconnect from",
"required":true,
"allowMultiple":false,
"type":"string",
"paramType":"path"
}
]
}
]
},
{
"path":"/v2/error_injection/injection",
"operations":[

View File

@@ -2465,6 +2465,102 @@
}
]
},
{
"path":"/storage_service/tablets/move",
"operations":[
{
"nickname":"move_tablet",
"method":"POST",
"summary":"Moves a tablet replica",
"type":"void",
"produces":[
"application/json"
],
"parameters":[
{
"name":"ks",
"description":"Keyspace name",
"required":true,
"allowMultiple":false,
"type":"string",
"paramType":"query"
},
{
"name":"table",
"description":"Table name",
"required":true,
"allowMultiple":false,
"type":"string",
"paramType":"query"
},
{
"name":"token",
"description":"Token owned by the tablet to move",
"required":true,
"allowMultiple":false,
"type":"integer",
"paramType":"query"
},
{
"name":"src_host",
"description":"Source host id",
"required":true,
"allowMultiple":false,
"type":"string",
"paramType":"query"
},
{
"name":"dst_host",
"description":"Destination host id",
"required":true,
"allowMultiple":false,
"type":"string",
"paramType":"query"
},
{
"name":"src_shard",
"description":"Source shard number",
"required":true,
"allowMultiple":false,
"type":"integer",
"paramType":"query"
},
{
"name":"dst_shard",
"description":"Destination shard number",
"required":true,
"allowMultiple":false,
"type":"integer",
"paramType":"query"
}
]
}
]
},
{
"path":"/storage_service/tablets/balancing",
"operations":[
{
"nickname":"tablet_balancing_enable",
"method":"POST",
"summary":"Enables or disables tablet load balancing",
"type":"void",
"produces":[
"application/json"
],
"parameters":[
{
"name":"enabled",
"description":"When set to false, tablet load balancing is disabled",
"required":true,
"allowMultiple":false,
"type":"boolean",
"paramType":"query"
}
]
}
]
},
{
"path":"/storage_service/metrics/total_hints",
"operations":[

View File

@@ -10,6 +10,7 @@
#include "message/messaging_service.hh"
#include <seastar/rpc/rpc_types.hh>
#include "api/api-doc/messaging_service.json.hh"
#include "api/api-doc/error_injection.json.hh"
#include <iostream>
#include <sstream>
@@ -19,6 +20,8 @@ using namespace netw;
namespace api {
namespace hf = httpd::error_injection_json;
using shard_info = messaging_service::shard_info;
using msg_addr = messaging_service::msg_addr;
@@ -142,6 +145,14 @@ void set_messaging_service(http_context& ctx, routes& r, sharded<netw::messaging
return make_ready_future<json::json_return_type>(res);
});
});
// Handler for the inject_disconnect operation (error injection API).
// Takes the "ip" path parameter, converts it to a msg_addr and calls
// remove_rpc_client() for that address on every instance of the sharded
// messaging_service. Replies with an empty (void) JSON result.
hf::inject_disconnect.set(r, [&ms] (std::unique_ptr<request> req) -> future<json::json_return_type> {
auto ip = msg_addr(req->param["ip"]);
// ip is captured by value: the lambda is run on all messaging_service instances.
co_await ms.invoke_on_all([ip] (netw::messaging_service& ms) {
ms.remove_rpc_client(ip);
});
co_return json::json_void();
});
}
void unset_messaging_service(http_context& ctx, routes& r) {
@@ -155,6 +166,7 @@ void unset_messaging_service(http_context& ctx, routes& r) {
get_respond_completed_messages.unset(r);
get_version.unset(r);
get_dropped_messages_by_ver.unset(r);
hf::inject_disconnect.unset(r);
}
}

View File

@@ -74,6 +74,21 @@ locator::host_id validate_host_id(const sstring& param) {
return hoep.id;
}
// Parses a boolean request parameter.
// Only the exact strings "true" and "false" are accepted; anything else
// is rejected with std::runtime_error.
bool validate_bool(const sstring& param) {
    const bool is_true = (param == "true");
    if (!is_true && !(param == "false")) {
        throw std::runtime_error("Parameter must be either 'true' or 'false'");
    }
    return is_true;
}
// Parses a request parameter as a signed 64-bit decimal integer.
//
// std::atoll() silently yields 0 for empty or non-numeric input and has
// undefined behavior when the value is out of range, so a malformed numeric
// parameter would be acted upon as if it were 0. This version rejects empty
// values, non-numeric values and values followed by trailing garbage.
// Leading whitespace and an optional sign are accepted, as before.
// Out-of-range values saturate to the limits of long long, as specified
// for std::strtoll().
//
// Throws std::runtime_error if the parameter is not a valid integer.
static
int64_t validate_int(const sstring& param) {
    const char* const begin = param.c_str();
    char* end = nullptr;
    const long long value = std::strtoll(begin, &end, 10);
    // end == begin: no digits were consumed; *end != '\0': trailing garbage.
    if (end == begin || *end != '\0') {
        throw std::runtime_error("Parameter must be a valid integer");
    }
    return value;
}
// splits a request parameter assumed to hold a comma-separated list of table names
// verify that the tables are found, otherwise a bad_param_exception exception is thrown
// containing the description of the respective no_such_column_family error.
@@ -1340,6 +1355,29 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
co_return json_void();
});
// Handler for the move_tablet operation.
// Query parameters read: src_host, src_shard, dst_host, dst_shard, token, ks, table.
// The table is resolved to its table id, and storage_service::move_tablet() is
// asked to move the tablet owning the given token from the
// {src_host, src_shard} replica to the {dst_host, dst_shard} replica.
// Replies with an empty (void) JSON result once move_tablet() resolves.
ss::move_tablet.set(r, [&ctx, &ss] (std::unique_ptr<http::request> req) -> future<json_return_type> {
auto src_host_id = validate_host_id(req->get_query_param("src_host"));
// The int64_t returned by validate_int() is converted to shard_id here.
shard_id src_shard_id = validate_int(req->get_query_param("src_shard"));
auto dst_host_id = validate_host_id(req->get_query_param("dst_host"));
shard_id dst_shard_id = validate_int(req->get_query_param("dst_shard"));
auto token = dht::token::from_int64(validate_int(req->get_query_param("token")));
auto ks = req->get_query_param("ks");
auto table = req->get_query_param("table");
// NOTE(review): presumably find_column_family() throws for an unknown keyspace/table -- confirm.
auto table_id = ctx.db.local().find_column_family(ks, table).schema()->id();
co_await ss.local().move_tablet(table_id, token,
locator::tablet_replica{src_host_id, src_shard_id},
locator::tablet_replica{dst_host_id, dst_shard_id});
co_return json_void();
});
// Handler for the tablet_balancing_enable operation.
// Reads the "enabled" query parameter, which must be "true" or "false"
// (validate_bool() throws otherwise), and passes it to
// storage_service::set_tablet_balancing_enabled().
// Replies with an empty (void) JSON result once that call resolves.
ss::tablet_balancing_enable.set(r, [&ss] (std::unique_ptr<http::request> req) -> future<json_return_type> {
auto enabled = validate_bool(req->get_query_param("enabled"));
co_await ss.local().set_tablet_balancing_enabled(enabled);
co_return json_void();
});
sp::get_schema_versions.set(r, [&ss](std::unique_ptr<http::request> req) {
return ss.local().describe_schema_versions().then([] (auto result) {
std::vector<sp::mapper_list> res;
@@ -1437,6 +1475,8 @@ void unset_storage_service(http_context& ctx, routes& r) {
ss::get_effective_ownership.unset(r);
ss::sstable_info.unset(r);
ss::reload_raft_topology_state.unset(r);
ss::move_tablet.unset(r);
ss::tablet_balancing_enable.unset(r);
sp::get_schema_versions.unset(r);
}

View File

@@ -550,6 +550,7 @@ scylla_tests = set([
'test/boost/network_topology_strategy_test',
'test/boost/token_metadata_test',
'test/boost/tablets_test',
'test/boost/sessions_test',
'test/boost/nonwrapping_range_test',
'test/boost/observable_test',
'test/boost/partitioner_test',
@@ -1091,6 +1092,7 @@ scylla_core = (['message/messaging_service.cc',
'locator/util.cc',
'service/client_state.cc',
'service/storage_service.cc',
'service/session.cc',
'service/misc_services.cc',
'service/pager/paging_state.cc',
'service/pager/query_pagers.cc',

View File

@@ -233,6 +233,8 @@ schema_ptr system_keyspace::topology() {
.with_column("unpublished_cdc_generations", set_type_impl::get_instance(cdc_generation_ts_id_type, true), column_kind::static_column)
.with_column("global_topology_request", utf8_type, column_kind::static_column)
.with_column("enabled_features", set_type_impl::get_instance(utf8_type, true), column_kind::static_column)
.with_column("session", uuid_type, column_kind::static_column)
.with_column("tablet_balancing_enabled", boolean_type, column_kind::static_column)
.set_comment("Current state of topology change machine")
.with_version(generate_schema_version(id))
.build();
@@ -2649,6 +2651,16 @@ future<service::topology> system_keyspace::load_topology_state() {
if (some_row.has("enabled_features")) {
ret.enabled_features = decode_features(deserialize_set_column(*topology(), some_row, "enabled_features"));
}
if (some_row.has("session")) {
ret.session = service::session_id(some_row.get_as<utils::UUID>("session"));
}
if (some_row.has("tablet_balancing_enabled")) {
ret.tablet_balancing_enabled = some_row.get_as<bool>("tablet_balancing_enabled");
} else {
ret.tablet_balancing_enabled = true;
}
}
co_return ret;

View File

@@ -0,0 +1,59 @@
/*
* Copyright (C) 2023-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include "dht/sharder.hh"
#include "replica/database.hh"
namespace dht {
/// A sharder which follows the table's current effective_replication_map.
/// The map (and the sharder obtained from it) is re-read from the table when this object
/// is constructed and again whenever the validity abort source of the currently held map fires.
/// As a result, the time during which a single token metadata version is kept is reduced.
/// The purpose of this class is having a sharder which can be passed to long-running processes
/// which do not need a single view on replication for the whole duration of the process,
/// and do not need to block topology change barriers.
///
/// No guarantees are made about how long token metadata version is kept alive.
/// It may be released before the next call.
///
/// The subscription callback captures `this`, so the object must stay at a fixed
/// address for as long as the subscription is alive.
class auto_refreshing_sharder : public dht::sharder {
// Table whose effective replication map is followed.
lw_shared_ptr<replica::table> _table;
// Effective replication map obtained by the last refresh().
locator::effective_replication_map_ptr _erm;
// Sharder obtained from _erm by the last refresh(); all sharder calls are forwarded to it.
// NOTE(review): presumably owned by (or kept alive through) _erm -- confirm.
const dht::sharder* _sharder;
// Subscription to _erm's validity abort source; triggers the next refresh().
optimized_optional<seastar::abort_source::subscription> _callback;
private:
// Re-reads the table's effective replication map, re-resolves the sharder for the
// table's schema and re-subscribes to the new map's validity abort source.
// Called from the constructor and from the subscription callback itself.
void refresh() {
_erm = _table->get_effective_replication_map();
_sharder = &_erm->get_sharder(*_table->schema());
_callback = _erm->get_validity_abort_source().subscribe([this] () noexcept {
refresh();
});
}
public:
// Takes shared ownership of the table and performs the initial refresh().
auto_refreshing_sharder(lw_shared_ptr<replica::table> table)
: _table(std::move(table))
{
refresh();
}
virtual ~auto_refreshing_sharder() = default;
// The dht::sharder overrides below delegate to the sharder obtained by the last refresh().
virtual unsigned shard_of(const dht::token& token) const override {
return _sharder->shard_of(token);
}
virtual std::optional<dht::shard_and_token> next_shard(const dht::token& t) const override {
return _sharder->next_shard(t);
}
virtual dht::token token_for_next_shard(const dht::token& t, shard_id shard, unsigned spans = 1) const override {
return _sharder->token_for_next_shard(t, shard, spans);
}
};
} // namespace dht

View File

@@ -27,7 +27,8 @@ static logging::logger blogger("boot_strapper");
namespace dht {
future<> boot_strapper::bootstrap(streaming::stream_reason reason, gms::gossiper& gossiper, inet_address replace_address) {
future<> boot_strapper::bootstrap(streaming::stream_reason reason, gms::gossiper& gossiper, service::frozen_topology_guard topo_guard,
inet_address replace_address) {
blogger.debug("Beginning bootstrap process: sorted_tokens={}", get_token_metadata().sorted_tokens());
sstring description;
if (reason == streaming::stream_reason::bootstrap) {
@@ -38,7 +39,7 @@ future<> boot_strapper::bootstrap(streaming::stream_reason reason, gms::gossiper
throw std::runtime_error("Wrong stream_reason provided: it can only be replace or bootstrap");
}
try {
auto streamer = make_lw_shared<range_streamer>(_db, _stream_manager, _token_metadata_ptr, _abort_source, _tokens, _address, _dr, description, reason);
auto streamer = make_lw_shared<range_streamer>(_db, _stream_manager, _token_metadata_ptr, _abort_source, _tokens, _address, _dr, description, reason, topo_guard);
auto nodes_to_filter = gossiper.get_unreachable_members();
if (reason == streaming::stream_reason::replace) {
nodes_to_filter.insert(std::move(replace_address));

View File

@@ -14,6 +14,7 @@
#include <unordered_set>
#include "replica/database_fwd.hh"
#include "streaming/stream_reason.hh"
#include "service/topology_guard.hh"
#include <seastar/core/distributed.hh>
#include <seastar/core/abort_source.hh>
@@ -52,7 +53,7 @@ public:
, _token_metadata_ptr(std::move(tmptr)) {
}
future<> bootstrap(streaming::stream_reason reason, gms::gossiper& gossiper, inet_address replace_address = {});
future<> bootstrap(streaming::stream_reason reason, gms::gossiper& gossiper, service::frozen_topology_guard, inet_address replace_address = {});
/**
* if initialtoken was specified, use that (split on comma).

View File

@@ -245,6 +245,7 @@ future<> range_streamer::add_ranges(const sstring& keyspace_name, locator::vnode
future<> range_streamer::stream_async() {
_nr_ranges_remaining = nr_ranges_to_stream();
_nr_total_ranges = _nr_ranges_remaining;
_token_metadata_ptr = nullptr;
logger.info("{} starts, nr_ranges_remaining={}", _description, _nr_ranges_remaining);
auto start = lowres_clock::now();
return do_for_each(_to_stream, [this, description = _description] (auto& stream) {
@@ -265,7 +266,8 @@ future<> range_streamer::stream_async() {
unsigned nr_ranges_streamed = 0;
size_t nr_ranges_total = range_vec.size();
auto do_streaming = [&] (dht::token_range_vector&& ranges_to_stream) {
auto sp = stream_plan(_stream_manager.local(), format("{}-{}-index-{:d}", description, keyspace, sp_index++), _reason);
auto sp = stream_plan(_stream_manager.local(), format("{}-{}-index-{:d}", description, keyspace, sp_index++),
_reason, _topo_guard);
auto abort_listener = _abort_source.subscribe([&] () noexcept { sp.abort(); });
_abort_source.check();
logger.info("{} with {} for keyspace={}, streaming [{}, {}) out of {} ranges",

View File

@@ -14,6 +14,7 @@
#include "streaming/stream_plan.hh"
#include "streaming/stream_state.hh"
#include "streaming/stream_reason.hh"
#include "service/topology_guard.hh"
#include "gms/inet_address.hh"
#include "range.hh"
#include <seastar/core/distributed.hh>
@@ -78,6 +79,7 @@ public:
range_streamer(distributed<replica::database>& db, sharded<streaming::stream_manager>& sm, const token_metadata_ptr tmptr, abort_source& abort_source, std::unordered_set<token> tokens,
inet_address address, locator::endpoint_dc_rack dr, sstring description, streaming::stream_reason reason,
service::frozen_topology_guard topo_guard,
std::vector<sstring> tables = {})
: _db(db)
, _stream_manager(sm)
@@ -89,13 +91,14 @@ public:
, _description(std::move(description))
, _reason(reason)
, _tables(std::move(tables))
, _topo_guard(topo_guard)
{
_abort_source.check();
}
range_streamer(distributed<replica::database>& db, sharded<streaming::stream_manager>& sm, const token_metadata_ptr tmptr, abort_source& abort_source,
inet_address address, locator::endpoint_dc_rack dr, sstring description, streaming::stream_reason reason, std::vector<sstring> tables = {})
: range_streamer(db, sm, std::move(tmptr), abort_source, std::unordered_set<token>(), address, std::move(dr), description, reason, std::move(tables)) {
inet_address address, locator::endpoint_dc_rack dr, sstring description, streaming::stream_reason reason, service::frozen_topology_guard topo_guard, std::vector<sstring> tables = {})
: range_streamer(db, sm, std::move(tmptr), abort_source, std::unordered_set<token>(), address, std::move(dr), description, reason, std::move(topo_guard), std::move(tables)) {
}
void add_source_filter(std::unique_ptr<i_source_filter> filter) {
@@ -141,6 +144,7 @@ private:
}
#endif
// Can be called only before stream_async().
const token_metadata& get_token_metadata() {
return *_token_metadata_ptr;
}
@@ -150,7 +154,7 @@ public:
private:
distributed<replica::database>& _db;
sharded<streaming::stream_manager>& _stream_manager;
const token_metadata_ptr _token_metadata_ptr;
token_metadata_ptr _token_metadata_ptr;
abort_source& _abort_source;
std::unordered_set<token> _tokens;
inet_address _address;
@@ -158,6 +162,7 @@ private:
sstring _description;
streaming::stream_reason _reason;
std::vector<sstring> _tables;
service::frozen_topology_guard _topo_guard;
std::unordered_multimap<sstring, std::unordered_map<inet_address, dht::token_range_vector>> _to_stream;
std::unordered_set<std::unique_ptr<i_source_filter>> _source_filters;
// Number of tx and rx ranges added

View File

@@ -170,6 +170,70 @@ When tablet is not in transition, the following invariants hold:
1. The storage layer (database) on any node contains writes for keys which belong to the tablet only if
that shard is one of the current tablet replicas.
# Topology guards
In addition to synchronizing with data access operations (e.g. CQL requests), we need to synchronize with
operations started by the topology change coordinator like streaming, repair, etc.
Those are distributed operations which involve several nodes.
The goal of tracking such operations is to make sure that they don't have any side-effects beyond the
topology change process (e.g. tablet migration) they were started in. This invariant is the basis for
reasoning about correctness. For example, if streaming which started in migration X runs concurrently
with later repair migration Y of the same tablet, we can run into consistency issues,
e.g. old streaming can resurrect deleted data.
Example scenario:
1. Tablet T has replicas {A, B, C}
2. Write (w1) of key K1 is replicated everywhere
3. Start migration of tablet T replica from A to D
4. Migration fails, but leaves behind an async part (s1) which later sends w1 to D
5. Migration is retried and completes
6. Write (w2) of key K1 which deletes the key is replicated everywhere {D, B, C}
7. Tablet T is repaired
8. Tombstone for w2 is garbage-collected on D
9. s1 is applied to D, and resurrects K1 (bad!)
For tablets, the time window of those migrations is much smaller than with vnodes, because tablet migrations
are started automatically and tablets themselves are smaller so operations complete faster.
We don't want to use tracking by effective_replication_map_ptr for streaming, because we want to allow
unrelated migrations to start concurrently with streaming of some tablet. Using effective_replication_map_ptr
precludes that because it blocks the global token metadata barrier, so the tablet state machine would not be
able to make progress.
The solution is to use a tracking mechanism called topology guards, which is built on top of the concept of
"sessions" managed in group0.
Each topology operation runs under a unique global session, identified by UUID. Session life cycle is controlled by
the topology change coordinator, and part of the group0 state machine. For example, for streaming, session is created
when entering the "streaming" stage, and cleared when entering the next stage. Each stage which needs a session gets
its own unique session id assigned by the topology change coordinator. For tablets, session id is stored with
a given tablet in tablet_transition_info. Retried operations running within the same stage use the same session.
When topology operation starts, it picks up the current session id from group0 (e.g. tablet_transition_info::session)
and keeps it in the object called frozen_topology_guard. This object acts as a permit which identifies the scope of the operation.
It is propagated everywhere where the operation may have side effects. It must be materialized into a topology_guard object around
sections which do have side effects. Materialization will succeed only if the session is still open. If it succeeds,
the topology_guard will keep the session guard alive, which is tracked under the local session object.
When topology change coordinator moves to the next stage, it clears the session in group0. The next stage will
execute a global token metadata barrier, which does two things: (1) executes a raft read barrier and (2) waits
for all guards created under no longer open sessions to be released. Doing (1) ensures that the local node
sees that the session should be closed. The local state machine will initiate close on all sessions which are
no longer present in group0. Doing (2) ensures that any operation started under no longer open sessions is finished.
So executing a global token metadata barrier ensures that only operations started under still open sessions are running
or can start later. Stale RPCs which carry the permit of an already closed session will be rejected after the barrier
when they try to materialize the permit into topology_guard.
Because sessions are managed in group0, there is no room for races between topology change coordinator's decisions
about sessions and actual state of sessions on each node. Each node follows the group0 history, and will arrive at the same view
about sessions. Also, stale topology coordinators cannot close or create sessions and mess with the intent of the new coordinator.
Because each tablet uses its own unique session, the global token metadata barrier is not blocked by active streaming.
VNode-based operations also use this mechanism, but they use a single session for the whole topology, as they don't
need any parallelism.
# Topology state persistence table

View File

@@ -71,10 +71,14 @@ read_replica_set_selector get_selector_for_reads(tablet_transition_stage stage)
on_internal_error(tablet_logger, format("Invalid tablet transition stage: {}", static_cast<int>(stage)));
}
tablet_transition_info::tablet_transition_info(tablet_transition_stage stage, tablet_replica_set next, tablet_replica pending_replica)
// Constructs transition info for a tablet.
// Stores the stage, the next replica set, the pending replica and the session id
// as given; the write and read replica set selectors are not passed in but
// derived from the stage via get_selector_for_writes() / get_selector_for_reads().
tablet_transition_info::tablet_transition_info(tablet_transition_stage stage,
tablet_replica_set next,
tablet_replica pending_replica,
service::session_id session_id)
: stage(stage)
, next(std::move(next))
, pending_replica(std::move(pending_replica))
, session_id(session_id)
, writes(get_selector_for_writes(stage))
, reads(get_selector_for_reads(stage))
{ }
@@ -280,6 +284,9 @@ std::ostream& operator<<(std::ostream& out, const tablet_map& r) {
out << format("\n [{}]: last_token={}, replicas={}", tid, r.get_last_token(tid), tablet.replicas);
if (auto tr = r.get_tablet_transition_info(tid)) {
out << format(", stage={}, new_replicas={}, pending={}", tr->stage, tr->next, tr->pending_replica);
if (tr->session_id) {
out << format(", session={}", tr->session_id);
}
}
first = false;
tid = *r.next_tablet(tid);

View File

@@ -11,6 +11,7 @@
#include "dht/token.hh"
#include "utils/small_vector.hh"
#include "locator/host_id.hh"
#include "service/session.hh"
#include "dht/i_partitioner_fwd.hh"
#include "schema/schema_fwd.hh"
#include "utils/chunked_vector.hh"
@@ -124,6 +125,11 @@ bool contains(const tablet_replica_set& rs, host_id host) {
return false;
}
inline
bool contains(const tablet_replica_set& rs, const tablet_replica& r) {
return std::ranges::any_of(rs, [&] (auto&& r_) { return r_ == r; });
}
/// Stores information about a single tablet.
struct tablet_info {
tablet_replica_set replicas;
@@ -171,10 +177,12 @@ struct tablet_transition_info {
tablet_transition_stage stage;
tablet_replica_set next;
tablet_replica pending_replica; // Optimization (next - tablet_info::replicas)
service::session_id session_id;
write_replica_set_selector writes;
read_replica_set_selector reads;
tablet_transition_info(tablet_transition_stage stage, tablet_replica_set next, tablet_replica pending_replica);
tablet_transition_info(tablet_transition_stage stage, tablet_replica_set next, tablet_replica pending_replica,
service::session_id session_id = {});
bool operator==(const tablet_transition_info&) const = default;
};
@@ -335,12 +343,17 @@ public:
using table_to_tablet_map = std::unordered_map<table_id, tablet_map>;
private:
table_to_tablet_map _tablets;
// When false, tablet load balancer will not try to rebalance tablets.
bool _balancing_enabled = true;
public:
bool balancing_enabled() const { return _balancing_enabled; }
const tablet_map& get_tablet_map(table_id id) const;
const table_to_tablet_map& all_tables() const { return _tablets; }
table_to_tablet_map& all_tables() { return _tablets; }
size_t external_memory_usage() const;
public:
void set_balancing_enabled(bool value) { _balancing_enabled = value; }
void set_tablet_map(table_id, tablet_map);
tablet_map& get_tablet_map(table_id id);
future<> clear_gently();

View File

@@ -989,15 +989,15 @@ rpc::sink<int32_t> messaging_service::make_sink_for_stream_mutation_fragments(rp
}
future<std::tuple<rpc::sink<frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd>, rpc::source<int32_t>>>
messaging_service::make_sink_and_source_for_stream_mutation_fragments(table_schema_version schema_id, streaming::plan_id plan_id, table_id cf_id, uint64_t estimated_partitions, streaming::stream_reason reason, msg_addr id) {
messaging_service::make_sink_and_source_for_stream_mutation_fragments(table_schema_version schema_id, streaming::plan_id plan_id, table_id cf_id, uint64_t estimated_partitions, streaming::stream_reason reason, service::session_id session, msg_addr id) {
using value_type = std::tuple<rpc::sink<frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd>, rpc::source<int32_t>>;
if (is_shutting_down()) {
return make_exception_future<value_type>(rpc::closed_error());
}
auto rpc_client = get_rpc_client(messaging_verb::STREAM_MUTATION_FRAGMENTS, id);
return rpc_client->make_stream_sink<netw::serializer, frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd>().then([this, plan_id, schema_id, cf_id, estimated_partitions, reason, rpc_client] (rpc::sink<frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd> sink) mutable {
auto rpc_handler = rpc()->make_client<rpc::source<int32_t> (streaming::plan_id, table_schema_version, table_id, uint64_t, streaming::stream_reason, rpc::sink<frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd>)>(messaging_verb::STREAM_MUTATION_FRAGMENTS);
return rpc_handler(*rpc_client , plan_id, schema_id, cf_id, estimated_partitions, reason, sink).then_wrapped([sink, rpc_client] (future<rpc::source<int32_t>> source) mutable {
return rpc_client->make_stream_sink<netw::serializer, frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd>().then([this, session, plan_id, schema_id, cf_id, estimated_partitions, reason, rpc_client] (rpc::sink<frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd> sink) mutable {
auto rpc_handler = rpc()->make_client<rpc::source<int32_t> (streaming::plan_id, table_schema_version, table_id, uint64_t, streaming::stream_reason, service::session_id, rpc::sink<frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd>)>(messaging_verb::STREAM_MUTATION_FRAGMENTS);
return rpc_handler(*rpc_client , plan_id, schema_id, cf_id, estimated_partitions, reason, session, sink).then_wrapped([sink, rpc_client] (future<rpc::source<int32_t>> source) mutable {
return (source.failed() ? sink.close() : make_ready_future<>()).then([sink = std::move(sink), source = std::move(source)] () mutable {
return make_ready_future<value_type>(value_type(std::move(sink), source.get0()));
});
@@ -1005,7 +1005,7 @@ messaging_service::make_sink_and_source_for_stream_mutation_fragments(table_sche
});
}
void messaging_service::register_stream_mutation_fragments(std::function<future<rpc::sink<int32_t>> (const rpc::client_info& cinfo, streaming::plan_id plan_id, table_schema_version schema_id, table_id cf_id, uint64_t estimated_partitions, rpc::optional<streaming::stream_reason>, rpc::source<frozen_mutation_fragment, rpc::optional<streaming::stream_mutation_fragments_cmd>> source)>&& func) {
void messaging_service::register_stream_mutation_fragments(std::function<future<rpc::sink<int32_t>> (const rpc::client_info& cinfo, streaming::plan_id plan_id, table_schema_version schema_id, table_id cf_id, uint64_t estimated_partitions, rpc::optional<streaming::stream_reason>, rpc::optional<service::session_id>, rpc::source<frozen_mutation_fragment, rpc::optional<streaming::stream_mutation_fragments_cmd>> source)>&& func) {
register_handler(this, messaging_verb::STREAM_MUTATION_FRAGMENTS, std::move(func));
}
@@ -1102,13 +1102,13 @@ future<> messaging_service::unregister_repair_get_full_row_hashes_with_rpc_strea
// PREPARE_MESSAGE
void messaging_service::register_prepare_message(std::function<future<streaming::prepare_message> (const rpc::client_info& cinfo,
streaming::prepare_message msg, streaming::plan_id plan_id, sstring description, rpc::optional<streaming::stream_reason> reason)>&& func) {
streaming::prepare_message msg, streaming::plan_id plan_id, sstring description, rpc::optional<streaming::stream_reason> reason, rpc::optional<service::session_id>)>&& func) {
register_handler(this, messaging_verb::PREPARE_MESSAGE, std::move(func));
}
future<streaming::prepare_message> messaging_service::send_prepare_message(msg_addr id, streaming::prepare_message msg, streaming::plan_id plan_id,
sstring description, streaming::stream_reason reason) {
sstring description, streaming::stream_reason reason, service::session_id session) {
return send_message<streaming::prepare_message>(this, messaging_verb::PREPARE_MESSAGE, id,
std::move(msg), plan_id, std::move(description), reason);
std::move(msg), plan_id, std::move(description), reason, session);
}
future<> messaging_service::unregister_prepare_message() {
return unregister_handler(messaging_verb::PREPARE_MESSAGE);

View File

@@ -20,6 +20,7 @@
#include "schema/schema_fwd.hh"
#include "streaming/stream_fwd.hh"
#include "locator/host_id.hh"
#include "service/session.hh"
#include <list>
#include <vector>
@@ -364,9 +365,9 @@ public:
// Wrapper for PREPARE_MESSAGE verb
void register_prepare_message(std::function<future<streaming::prepare_message> (const rpc::client_info& cinfo,
streaming::prepare_message msg, streaming::plan_id plan_id, sstring description, rpc::optional<streaming::stream_reason> reason)>&& func);
streaming::prepare_message msg, streaming::plan_id plan_id, sstring description, rpc::optional<streaming::stream_reason> reason, rpc::optional<service::session_id>)>&& func);
future<streaming::prepare_message> send_prepare_message(msg_addr id, streaming::prepare_message msg, streaming::plan_id plan_id,
sstring description, streaming::stream_reason);
sstring description, streaming::stream_reason, service::session_id);
future<> unregister_prepare_message();
// Wrapper for PREPARE_DONE_MESSAGE verb
@@ -376,10 +377,10 @@ public:
// Wrapper for STREAM_MUTATION_FRAGMENTS
// The receiver of STREAM_MUTATION_FRAGMENTS sends status code to the sender to notify any error on the receiver side. The status code is of type int32_t. 0 means successful, -1 means error, -2 means error and table is dropped, other status code value are reserved for future use.
void register_stream_mutation_fragments(std::function<future<rpc::sink<int32_t>> (const rpc::client_info& cinfo, streaming::plan_id plan_id, table_schema_version schema_id, table_id cf_id, uint64_t estimated_partitions, rpc::optional<streaming::stream_reason> reason_opt, rpc::source<frozen_mutation_fragment, rpc::optional<streaming::stream_mutation_fragments_cmd>> source)>&& func);
void register_stream_mutation_fragments(std::function<future<rpc::sink<int32_t>> (const rpc::client_info& cinfo, streaming::plan_id plan_id, table_schema_version schema_id, table_id cf_id, uint64_t estimated_partitions, rpc::optional<streaming::stream_reason> reason_opt, rpc::optional<service::session_id>, rpc::source<frozen_mutation_fragment, rpc::optional<streaming::stream_mutation_fragments_cmd>> source)>&& func);
future<> unregister_stream_mutation_fragments();
rpc::sink<int32_t> make_sink_for_stream_mutation_fragments(rpc::source<frozen_mutation_fragment, rpc::optional<streaming::stream_mutation_fragments_cmd>>& source);
future<std::tuple<rpc::sink<frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd>, rpc::source<int32_t>>> make_sink_and_source_for_stream_mutation_fragments(table_schema_version schema_id, streaming::plan_id plan_id, table_id cf_id, uint64_t estimated_partitions, streaming::stream_reason reason, msg_addr id);
future<std::tuple<rpc::sink<frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd>, rpc::source<int32_t>>> make_sink_and_source_for_stream_mutation_fragments(table_schema_version schema_id, streaming::plan_id plan_id, table_id cf_id, uint64_t estimated_partitions, streaming::stream_reason reason, service::session_id session, msg_addr id);
// Wrapper for REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM
future<std::tuple<rpc::sink<repair_hash_with_cmd>, rpc::source<repair_row_on_wire_with_cmd>>> make_sink_and_source_for_repair_get_row_diff_with_rpc_stream(uint32_t repair_meta_id, msg_addr id);

View File

@@ -495,8 +495,9 @@ void repair_writer_impl::create_writer(lw_shared_ptr<repair_writer> w) {
}
replica::table& t = _db.local().find_column_family(_schema->id());
rlogger.debug("repair_writer: keyspace={}, table={}, estimated_partitions={}", w->schema()->ks_name(), w->schema()->cf_name(), w->get_estimated_partitions());
service::frozen_topology_guard topo_guard = service::null_topology_guard; // FIXME: propagate
_writer_done = mutation_writer::distribute_reader_and_consume_on_shards(_schema, _schema->get_sharder(), std::move(_queue_reader),
streaming::make_streaming_consumer(sstables::repair_origin, _db, _sys_dist_ks, _view_update_generator, w->get_estimated_partitions(), _reason, is_offstrategy_supported(_reason)),
streaming::make_streaming_consumer(sstables::repair_origin, _db, _sys_dist_ks, _view_update_generator, w->get_estimated_partitions(), _reason, is_offstrategy_supported(_reason), topo_guard),
t.stream_in_progress()).then([w] (uint64_t partitions) {
rlogger.debug("repair_writer: keyspace={}, table={}, managed to write partitions={} to sstable",
w->schema()->ks_name(), w->schema()->cf_name(), partitions);

View File

@@ -1697,10 +1697,10 @@ table::table(schema_ptr schema, config config, lw_shared_ptr<const storage_optio
}
void table::update_effective_replication_map(locator::effective_replication_map_ptr erm) {
if (_erm) {
_erm->invalidate();
auto old_erm = std::exchange(_erm, std::move(erm));
if (old_erm) {
old_erm->invalidate();
}
_erm = std::move(erm);
}
partition_presence_checker

View File

@@ -11,6 +11,7 @@
#include "mutation/mutation.hh"
#include "db/system_keyspace.hh"
#include "replica/tablets.hh"
#include "service/session.hh"
namespace replica {
@@ -35,6 +36,8 @@ public:
tablet_mutation_builder& set_new_replicas(dht::token last_token, locator::tablet_replica_set replicas);
tablet_mutation_builder& set_replicas(dht::token last_token, locator::tablet_replica_set replicas);
tablet_mutation_builder& set_stage(dht::token last_token, locator::tablet_transition_stage stage);
tablet_mutation_builder& set_session(dht::token last_token, service::session_id);
tablet_mutation_builder& del_session(dht::token last_token);
tablet_mutation_builder& del_transition(dht::token last_token);
mutation build() {

View File

@@ -43,6 +43,7 @@ schema_ptr make_tablets_schema() {
.with_column("replicas", replica_set_type)
.with_column("new_replicas", replica_set_type)
.with_column("stage", utf8_type)
.with_column("session", uuid_type)
.with_version(db::system_keyspace::generate_schema_version(id))
.build();
}
@@ -82,6 +83,9 @@ tablet_map_to_mutation(const tablet_map& tablets, table_id id, const sstring& ke
if (auto tr_info = tablets.get_tablet_transition_info(tid)) {
m.set_clustered_cell(ck, "stage", tablet_transition_stage_to_string(tr_info->stage), ts);
m.set_clustered_cell(ck, "new_replicas", make_list_value(replica_set_type, replicas_to_data_value(tr_info->next)), ts);
if (tr_info->session_id) {
m.set_clustered_cell(ck, "session", data_value(tr_info->session_id.uuid()), ts);
}
}
tid = *tablets.next_tablet(tid);
co_await coroutine::maybe_yield();
@@ -107,6 +111,19 @@ tablet_mutation_builder::set_stage(dht::token last_token, locator::tablet_transi
return *this;
}
// Records `session_id` in the "session" column of the tablet row addressed by `last_token`.
// Returns *this so that calls can be chained.
tablet_mutation_builder&
tablet_mutation_builder::set_session(dht::token last_token, service::session_id session_id) {
    auto row_key = get_ck(last_token);
    _m.set_clustered_cell(row_key, "session", data_value(session_id.uuid()), _ts);
    return *this;
}
// Writes a dead cell (tombstone) into the "session" column of the tablet row
// addressed by `last_token`. Returns *this so that calls can be chained.
tablet_mutation_builder&
tablet_mutation_builder::del_session(dht::token last_token) {
    auto column = _s->get_column_definition("session");
    auto row_key = get_ck(last_token);
    _m.set_clustered_cell(row_key, *column, atomic_cell::make_dead(_ts, gc_clock::now()));
    return *this;
}
tablet_mutation_builder&
tablet_mutation_builder::del_transition(dht::token last_token) {
auto ck = get_ck(last_token);
@@ -114,6 +131,8 @@ tablet_mutation_builder::del_transition(dht::token last_token) {
_m.set_clustered_cell(ck, *stage_col, atomic_cell::make_dead(_ts, gc_clock::now()));
auto new_replicas_col = _s->get_column_definition("new_replicas");
_m.set_clustered_cell(ck, *new_replicas_col, atomic_cell::make_dead(_ts, gc_clock::now()));
auto session_col = _s->get_column_definition("session");
_m.set_clustered_cell(ck, *session_col, atomic_cell::make_dead(_ts, gc_clock::now()));
return *this;
}
@@ -194,12 +213,20 @@ future<tablet_metadata> read_tablet_metadata(cql3::query_processor& qp) {
for (auto&& r : tablet_replicas) {
pending.erase(r);
}
if (pending.size() == 0) {
throw std::runtime_error(format("Stage set but no pending replica for table {} tablet {}",
table, current->tid));
}
if (pending.size() > 1) {
throw std::runtime_error(format("Too many pending replicas for table {} tablet {}: {}",
table, current->tid, pending));
}
service::session_id session_id;
if (row.has("session")) {
session_id = service::session_id(row.get_as<utils::UUID>("session"));
}
current->map.set_tablet_transition_info(current->tid, tablet_transition_info{stage,
std::move(new_tablet_replicas), *pending.begin()});
std::move(new_tablet_replicas), *pending.begin(), session_id});
}
current->map.set_tablet(current->tid, tablet_info{std::move(tablet_replicas)});

74
service/session.cc Normal file
View File

@@ -0,0 +1,74 @@
/*
* Copyright (C) 2023-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include "service/session.hh"
#include "log.hh"
namespace service {
static logging::logger slogger("session");
// Builds a guard for session `s`: remembers a pointer to the session and takes a
// holder on the session's gate, which is kept for the lifetime of the guard.
// NOTE(review): seastar::gate::hold() is expected to throw when the gate is already
// closed, which is how entering a closing session gets rejected -- confirm.
session::guard::guard(session& s)
: _session(&s)
, _holder(s._gate.hold()) {
}
// Nothing is done explicitly here; the gate holder member is released by its own destructor.
session::guard::~guard() {
}
// Every manager starts with the session identified by default_session_id already registered.
session_manager::session_manager() {
create_session(default_session_id);
}
// Looks up the session registered under `id` and enters it.
// Returns the guard produced by session::enter().
// Throws std::runtime_error when no session with this id is registered.
session::guard session_manager::enter_session(session_id id) {
    const auto it = _sessions.find(id);
    if (it != _sessions.end()) {
        session::guard entered = it->second->enter();
        slogger.debug("session {} entered", id);
        return entered;
    }
    throw std::runtime_error(fmt::format("Session not found: {}", id));
}
// Registers a new session under `id` unless one is already registered,
// and logs which of the two cases happened.
void session_manager::create_session(session_id id) {
    const bool inserted = _sessions.emplace(id, std::make_unique<session>(id)).second;
    if (!inserted) {
        slogger.debug("session {} already exists", id);
        return;
    }
    slogger.debug("session {} created", id);
}
void session_manager::initiate_close_of_sessions_except(const std::unordered_set<session_id>& keep) {
for (auto&& [id, session] : _sessions) {
if (id != default_session_id && !keep.contains(id)) {
if (!session->is_closing()) {
_closing_sessions.push_front(*session);
}
session->start_closing();
}
}
}
// Walks _closing_sessions, waits for each session on it to finish closing and then
// removes that session from _sessions.
future<> session_manager::drain_closing_sessions() {
// Hold the single unit of _session_drain_sem for the whole walk, so only one drain
// iterates the list at a time.
auto lock = co_await get_units(_session_drain_sem, 1);
auto i = _closing_sessions.begin();
while (i != _closing_sessions.end()) {
session& s = *i;
// Step past `s` before suspending: the erase below removes the owning unique_ptr from
// _sessions, so `s` must not be the element the iterator still refers to afterwards.
++i;
// Copy the id now; `s` is not touched again after the erase.
auto id = s.id();
slogger.debug("draining session {}", id);
co_await s.close();
if (_sessions.erase(id)) {
slogger.debug("session {} closed", id);
}
}
}
} // namespace service

129
service/session.hh Normal file
View File

@@ -0,0 +1,129 @@
/*
* Copyright (C) 2023-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include "utils/UUID.hh"
#include <seastar/core/coroutine.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/shared_future.hh>
#include <seastar/core/semaphore.hh>
#include <boost/intrusive/list.hpp>
#include <unordered_set>
namespace service {
using session_id = utils::tagged_uuid<struct session_id_tag>;
// We want it to be different from a default-constructed session_id to catch mistakes.
constexpr session_id default_session_id = session_id(
utils::UUID(0x81e7fc5a8d4411ee, 0x8577325096b39f47)); // timeuuid 2023-11-27 16:46:27.182089.0 UTC
/// Session is used to track execution of work related to some greater task, identified by session_id.
/// Work can enter the session using enter(), and is considered to be part of the session
/// as long as the guard returned by enter() is alive.
///
/// Session goes over the following states monotonically:
/// 1) open - accepts work via enter()
/// 2) closing - rejects work via enter()
/// 3) closed - rejects work via enter(), and no guards are alive anymore
///
/// Sessions are removed only after they are closed, it's impossible to have a session::guard of a session
/// which is not in the registry.
class session {
public:
using link_type = bi::list_member_hook<bi::link_mode<bi::auto_unlink>>;
private:
session_id _id;
seastar::gate _gate;
std::optional<shared_future<>> _closed;
link_type _link;
public:
using list_type = boost::intrusive::list<session,
boost::intrusive::member_hook<session, session::link_type, &session::_link>,
boost::intrusive::constant_time_size<false>>;
public:
class guard {
session* _session;
seastar::gate::holder _holder;
public:
explicit guard(session& s);
guard(const guard&) noexcept = default;
guard(guard&&) noexcept = default;
guard& operator=(guard&&) noexcept = default;
guard& operator=(const guard&) noexcept = default;
~guard();
void check() const {
if (!valid()) {
throw seastar::abort_requested_exception();
}
}
[[nodiscard]] bool valid() const {
return !_session->_closed;
}
};
explicit session(session_id id) : _id(id) {}
guard enter() {
return guard(*this);
};
/// No new work is admitted to enter the session after this.
/// Can be called many times.
void start_closing() noexcept {
if (!_closed) {
_closed = seastar::shared_future<>(_gate.close());
}
}
/// Returns true iff the session is not open.
/// Calling enter() in this state will fail.
bool is_closing() const {
return bool(_closed);
}
session_id id() const {
return _id;
}
/// Post-condition of successfully resolved future: There are no guards alive for this session, and
/// and it's impossible to create more such guards later.
/// Can be called concurrently.
future<> close() {
start_closing();
return _closed->get_future();
}
};
/// Registry of sessions keyed by session_id.
/// Creates sessions, lets work enter them, and closes and drains the ones which are no longer wanted.
class session_manager {
// Owns every registered session.
std::unordered_map<session_id, std::unique_ptr<session>> _sessions;
// Sessions for which closing was initiated; walked by drain_closing_sessions().
session::list_type _closing_sessions;
// Single-unit semaphore taken by drain_closing_sessions() for the duration of a drain.
seastar::semaphore _session_drain_sem{1};
public:
session_manager();
/// Returns a guard for the session registered under id.
/// Throws std::runtime_error if there is no such session.
session::guard enter_session(session_id id);
/// Creates a session on this shard if it doesn't exist yet.
/// If the session already exists does nothing.
void create_session(session_id);
/// Calls start_closing() on all sessions except those in keep.
void initiate_close_of_sessions_except(const std::unordered_set<session_id>& keep);
/// Post-condition: All sessions which are in closing state before the call will be in closed state after the call.
/// Can be called concurrently.
future<> drain_closing_sessions();
};
} // namespace service

View File

@@ -10,6 +10,8 @@
*/
#include "storage_service.hh"
#include "service/topology_guard.hh"
#include "service/session.hh"
#include "dht/boot_strapper.hh"
#include <seastar/core/distributed.hh>
#include <seastar/util/defer.hh>
@@ -106,6 +108,12 @@ namespace service {
static logging::logger slogger("storage_service");
// One session_manager instance per thread (thread_local), private to this translation unit.
static thread_local session_manager topology_session_manager;
// Returns the calling thread's topology session manager.
session_manager& get_topology_session_manager() {
return topology_session_manager;
}
static constexpr std::chrono::seconds wait_for_live_nodes_timeout{30};
storage_service::storage_service(abort_source& abort_source,
@@ -525,6 +533,7 @@ future<> storage_service::topology_state_load() {
if (_db.local().get_config().check_experimental(db::experimental_features_t::feature::TABLETS)) {
tmptr->set_tablets(co_await replica::read_tablet_metadata(_qp));
tmptr->tablets().set_balancing_enabled(_topology_state_machine._topology.tablet_balancing_enabled);
}
}));
@@ -650,6 +659,8 @@ public:
topology_mutation_builder& set_transition_state(topology::transition_state);
topology_mutation_builder& set_version(topology::version_t);
topology_mutation_builder& set_fence_version(topology::version_t);
topology_mutation_builder& set_session(session_id);
topology_mutation_builder& set_tablet_balancing_enabled(bool);
topology_mutation_builder& set_current_cdc_generation_id(const cdc::generation_id_v2&);
topology_mutation_builder& set_new_cdc_generation_data_uuid(const utils::UUID& value);
topology_mutation_builder& set_unpublished_cdc_generations(const std::vector<cdc::generation_id_v2>& values);
@@ -659,6 +670,7 @@ public:
topology_mutation_builder& add_enabled_features(const std::set<S>& value);
topology_mutation_builder& add_unpublished_cdc_generation(const cdc::generation_id_v2& value);
topology_mutation_builder& del_transition_state();
topology_mutation_builder& del_session();
topology_mutation_builder& del_global_topology_request();
topology_node_mutation_builder& with_node(raft::server_id);
canonical_mutation build() { return canonical_mutation{std::move(_m)}; }
@@ -810,10 +822,24 @@ topology_mutation_builder& topology_mutation_builder::set_fence_version(topology
return *this;
}
// Stores the UUID of `value` in the static "session" cell of the mutation being built.
// Returns *this for chaining.
topology_mutation_builder& topology_mutation_builder::set_session(session_id value) {
_m.set_static_cell("session", value.uuid(), _ts);
return *this;
}
// Stores `value` in the static "tablet_balancing_enabled" cell of the mutation being built.
// Returns *this for chaining.
topology_mutation_builder& topology_mutation_builder::set_tablet_balancing_enabled(bool value) {
_m.set_static_cell("tablet_balancing_enabled", value, _ts);
return *this;
}
// Deletes the "transition_state" column via del(); returns whatever del() returns.
topology_mutation_builder& topology_mutation_builder::del_transition_state() {
return del("transition_state");
}
// Deletes the "session" column via del(); counterpart of set_session().
topology_mutation_builder& topology_mutation_builder::del_session() {
return del("session");
}
topology_mutation_builder& topology_mutation_builder::set_current_cdc_generation_id(
const cdc::generation_id_v2& value) {
apply_atomic("current_cdc_generation_timestamp", value.ts);
@@ -1635,10 +1661,13 @@ class topology_coordinator {
auto& tablet_state = _tablets[gid];
table_id table = s->id();
auto get_mutation_builder = [&] () {
return replica::tablet_mutation_builder(guard.write_timestamp(), s->ks_name(), table);
};
auto transition_to = [&] (locator::tablet_transition_stage stage) {
slogger.trace("raft topology: Will set tablet {} stage to {}", gid, stage);
updates.emplace_back(
replica::tablet_mutation_builder(guard.write_timestamp(), s->ks_name(), table)
updates.emplace_back(get_mutation_builder()
.set_stage(last_token, stage)
.build());
};
@@ -1658,7 +1687,15 @@ class topology_coordinator {
switch (trinfo.stage) {
case locator::tablet_transition_stage::allow_write_both_read_old:
transition_to_with_barrier(locator::tablet_transition_stage::write_both_read_old);
if (do_barrier()) {
slogger.trace("raft topology: Will set tablet {} stage to {}", gid, locator::tablet_transition_stage::write_both_read_old);
updates.emplace_back(get_mutation_builder()
.set_stage(last_token, locator::tablet_transition_stage::write_both_read_old)
// Create session a bit earlier to avoid adding barrier
// to the streaming stage to create sessions on replicas.
.set_session(last_token, session_id(utils::UUID_gen::get_time_UUID()))
.build());
}
break;
case locator::tablet_transition_stage::write_both_read_old:
transition_to_with_barrier(locator::tablet_transition_stage::streaming);
@@ -1677,7 +1714,11 @@ class topology_coordinator {
return ser::storage_service_rpc_verbs::send_tablet_stream_data(&_messaging,
netw::msg_addr(id2ip(dst)), _as, raft::server_id(dst.uuid()), gid);
})) {
transition_to(locator::tablet_transition_stage::write_both_read_new);
slogger.trace("raft topology: Will set tablet {} stage to {}", gid, locator::tablet_transition_stage::write_both_read_new);
updates.emplace_back(get_mutation_builder()
.set_stage(last_token, locator::tablet_transition_stage::write_both_read_new)
.del_session(last_token)
.build());
}
break;
case locator::tablet_transition_stage::write_both_read_new:
@@ -1701,8 +1742,7 @@ class topology_coordinator {
// See do_tablet_operation() doc.
if (do_barrier()) {
_tablets.erase(gid);
updates.emplace_back(
replica::tablet_mutation_builder(guard.write_timestamp(), s->ks_name(), table)
updates.emplace_back(get_mutation_builder()
.del_transition(last_token)
.set_replicas(last_token, trinfo.next)
.build());
@@ -1778,6 +1818,7 @@ class topology_coordinator {
updates.emplace_back(
topology_mutation_builder(guard.write_timestamp())
.set_transition_state(topology::transition_state::write_both_read_old)
.set_session(session_id(guard.new_group0_state_id()))
.set_version(_topo_sm._topology.version + 1)
.build());
} else {
@@ -1986,6 +2027,7 @@ class topology_coordinator {
builder.del_transition_state();
} else {
builder.set_transition_state(topology::transition_state::write_both_read_old);
builder.set_session(session_id(guard.new_group0_state_id()));
builder.set_version(_topo_sm._topology.version + 1);
}
auto str = ::format("committed new CDC generation, ID: {}", cdc_gen_id);
@@ -2079,6 +2121,7 @@ class topology_coordinator {
topology_mutation_builder builder(node.guard.write_timestamp());
builder
.set_transition_state(topology::transition_state::write_both_read_new)
.del_session()
.set_version(_topo_sm._topology.version + 1);
auto str = ::format("{}: streaming completed for node {}", node.rs->state, node.id);
co_await update_topology_state(take_guard(std::move(node)), {builder.build()}, std::move(str));
@@ -3531,7 +3574,7 @@ future<> storage_service::bootstrap(std::unordered_set<token>& bootstrap_tokens,
_gossiper.wait_for_range_setup().get();
dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_broadcast_address(), _snitch.local()->get_location(), bootstrap_tokens, get_token_metadata_ptr());
slogger.info("Starting to bootstrap...");
bs.bootstrap(streaming::stream_reason::bootstrap, _gossiper).get();
bs.bootstrap(streaming::stream_reason::bootstrap, _gossiper, null_topology_guard).get();
} else {
// Even with RBNO bootstrap we need to announce the new CDC generation immediately after it's created.
_gossiper.add_local_application_state({
@@ -4162,6 +4205,25 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt
std::vector<std::unordered_map<table_id, locator::effective_replication_map_ptr>> pending_table_erms;
pending_table_erms.resize(smp::count);
std::unordered_set<session_id> open_sessions;
// Collect open sessions
{
auto session = _topology_state_machine._topology.session;
if (session) {
open_sessions.insert(session);
}
for (auto&& [table_id, tmap]: tmptr->tablets().all_tables()) {
for (auto&& [tid, trinfo]: tmap.transitions()) {
if (trinfo.session_id) {
auto id = session_id(trinfo.session_id);
open_sessions.insert(id);
}
}
}
}
try {
auto base_shard = this_shard_id();
pending_token_metadata_ptr[base_shard] = tmptr;
@@ -4254,6 +4316,12 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt
cf.update_effective_replication_map(std::move(it->second));
it = table_erms.erase(it);
}
auto& session_mgr = get_topology_session_manager();
session_mgr.initiate_close_of_sessions_except(open_sessions);
for (auto id : open_sessions) {
session_mgr.create_session(id);
}
});
} catch (...) {
// applying the changes on all shards should never fail
@@ -4957,7 +5025,7 @@ void storage_service::run_replace_ops(std::unordered_set<token>& bootstrap_token
} else {
slogger.info("replace[{}]: Using streaming based node ops to sync data", uuid);
dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_broadcast_address(), _snitch.local()->get_location(), bootstrap_tokens, get_token_metadata_ptr());
bs.bootstrap(streaming::stream_reason::replace, _gossiper, replace_address).get();
bs.bootstrap(streaming::stream_reason::replace, _gossiper, null_topology_guard, replace_address).get();
}
on_streaming_finished();
@@ -5281,6 +5349,7 @@ void storage_service::node_ops_insert(node_ops_id ops_uuid,
future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_address coordinator, node_ops_cmd_request req) {
return seastar::async([this, coordinator, req = std::move(req)] () mutable {
auto ops_uuid = req.ops_uuid;
auto topo_guard = null_topology_guard;
slogger.debug("node_ops_cmd_handler cmd={}, ops_uuid={}", req.cmd, ops_uuid);
if (req.cmd == node_ops_cmd::query_pending_ops) {
@@ -5352,7 +5421,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
_repair.local().removenode_with_repair(get_token_metadata_ptr(), node, ops).get();
} else {
slogger.info("removenode[{}]: Started to sync data for removing node={} using stream, coordinator={}", req.ops_uuid, node, coordinator);
removenode_with_stream(node, as).get();
removenode_with_stream(node, topo_guard, as).get();
}
}
} else if (req.cmd == node_ops_cmd::removenode_abort) {
@@ -5653,7 +5722,7 @@ future<> storage_service::rebuild(sstring source_dc) {
co_await ss._repair.local().rebuild_with_repair(tmptr, std::move(source_dc));
} else {
auto streamer = make_lw_shared<dht::range_streamer>(ss._db, ss._stream_manager, tmptr, ss._abort_source,
ss.get_broadcast_address(), ss._snitch.local()->get_location(), "Rebuild", streaming::stream_reason::rebuild);
ss.get_broadcast_address(), ss._snitch.local()->get_location(), "Rebuild", streaming::stream_reason::rebuild, null_topology_guard);
streamer->add_source_filter(std::make_unique<dht::range_streamer::failure_detector_source_filter>(ss._gossiper.get_unreachable_members()));
if (source_dc != "") {
streamer->add_source_filter(std::make_unique<dht::range_streamer::single_datacenter_filter>(source_dc));
@@ -5812,8 +5881,10 @@ future<> storage_service::removenode_add_ranges(lw_shared_ptr<dht::range_streame
}
}
future<> storage_service::removenode_with_stream(gms::inet_address leaving_node, shared_ptr<abort_source> as_ptr) {
return seastar::async([this, leaving_node, as_ptr] {
future<> storage_service::removenode_with_stream(gms::inet_address leaving_node,
frozen_topology_guard topo_guard,
shared_ptr<abort_source> as_ptr) {
return seastar::async([this, leaving_node, as_ptr, topo_guard] {
auto tmptr = get_token_metadata_ptr();
abort_source as;
auto sub = _abort_source.subscribe([&as] () noexcept {
@@ -5829,7 +5900,7 @@ future<> storage_service::removenode_with_stream(gms::inet_address leaving_node,
as.request_abort();
}
});
auto streamer = make_lw_shared<dht::range_streamer>(_db, _stream_manager, tmptr, as, get_broadcast_address(), _snitch.local()->get_location(), "Removenode", streaming::stream_reason::removenode);
auto streamer = make_lw_shared<dht::range_streamer>(_db, _stream_manager, tmptr, as, get_broadcast_address(), _snitch.local()->get_location(), "Removenode", streaming::stream_reason::removenode, topo_guard);
removenode_add_ranges(streamer, leaving_node).get();
try {
streamer->stream_async().get();
@@ -5880,7 +5951,12 @@ future<> storage_service::leave_ring() {
future<>
storage_service::stream_ranges(std::unordered_map<sstring, std::unordered_multimap<dht::token_range, inet_address>> ranges_to_stream_by_keyspace) {
auto streamer = dht::range_streamer(_db, _stream_manager, get_token_metadata_ptr(), _abort_source, get_broadcast_address(), _snitch.local()->get_location(), "Unbootstrap", streaming::stream_reason::decommission);
auto streamer = dht::range_streamer(_db, _stream_manager, get_token_metadata_ptr(), _abort_source,
get_broadcast_address(),
_snitch.local()->get_location(),
"Unbootstrap",
streaming::stream_reason::decommission,
null_topology_guard);
for (auto& entry : ranges_to_stream_by_keyspace) {
const auto& keyspace = entry.first;
auto& ranges_with_endpoints = entry.second;
@@ -6078,6 +6154,7 @@ future<> storage_service::load_tablet_metadata() {
}
return mutate_token_metadata([this] (mutable_token_metadata_ptr tmptr) -> future<> {
tmptr->set_tablets(co_await replica::read_tablet_metadata(_qp));
tmptr->tablets().set_balancing_enabled(_topology_state_machine._topology.tablet_balancing_enabled);
}, acquire_merge_lock::no);
}
@@ -6202,6 +6279,8 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
}
co_await ss._shared_token_metadata.stale_versions_in_use();
co_await get_topology_session_manager().drain_closing_sessions();
slogger.debug("raft_topology_cmd::barrier_and_drain done");
});
result.status = raft_topology_cmd_result::command_status::success;
@@ -6238,7 +6317,7 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
} else {
dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_broadcast_address(),
locator::endpoint_dc_rack{rs.datacenter, rs.rack}, rs.ring.value().tokens, get_token_metadata_ptr());
co_await bs.bootstrap(streaming::stream_reason::bootstrap, _gossiper);
co_await bs.bootstrap(streaming::stream_reason::bootstrap, _gossiper, _topology_state_machine._topology.session);
}
}));
}
@@ -6265,7 +6344,7 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
auto replaced_id = std::get<replace_param>(_topology_state_machine._topology.req_param[raft_server.id()]).replaced_id;
auto existing_ip = _group0->address_map().find(replaced_id);
assert(existing_ip);
co_await bs.bootstrap(streaming::stream_reason::replace, _gossiper, *existing_ip);
co_await bs.bootstrap(streaming::stream_reason::replace, _gossiper, _topology_state_machine._topology.session, *existing_ip);
}
}));
}
@@ -6317,7 +6396,7 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
auto ops = seastar::make_shared<node_ops_info>(node_ops_id::create_random_id(), as, std::move(ignored_ips));
return _repair.local().removenode_with_repair(get_token_metadata_ptr(), *ip, ops);
} else {
return removenode_with_stream(*ip, as);
return removenode_with_stream(*ip, _topology_state_machine._topology.session, as);
}
}));
result.status = raft_topology_cmd_result::command_status::success;
@@ -6332,7 +6411,8 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
co_await _repair.local().rebuild_with_repair(tmptr, std::move(source_dc));
} else {
auto streamer = make_lw_shared<dht::range_streamer>(_db, _stream_manager, tmptr, _abort_source,
get_broadcast_address(), _snitch.local()->get_location(), "Rebuild", streaming::stream_reason::rebuild);
get_broadcast_address(), _snitch.local()->get_location(), "Rebuild", streaming::stream_reason::rebuild,
_topology_state_machine._topology.session);
streamer->add_source_filter(std::make_unique<dht::range_streamer::failure_detector_source_filter>(_gossiper.get_unreachable_members()));
if (source_dc != "") {
streamer->add_source_filter(std::make_unique<dht::range_streamer::single_datacenter_filter>(source_dc));
@@ -6500,6 +6580,10 @@ future<> storage_service::stream_tablet(locator::global_tablet_id tablet) {
if (trinfo->stage != locator::tablet_transition_stage::streaming) {
throw std::runtime_error(format("Tablet {} stage is not at streaming", tablet));
}
auto topo_guard = trinfo->session_id;
if (!trinfo->session_id) {
throw std::runtime_error(format("Tablet {} session is not set", tablet));
}
if (trinfo->pending_replica.host != tm->get_my_id()) {
throw std::runtime_error(format("Tablet {} has pending replica different than this one", tablet));
}
@@ -6519,7 +6603,8 @@ future<> storage_service::stream_tablet(locator::global_tablet_id tablet) {
std::vector<sstring> tables = {table.schema()->cf_name()};
auto streamer = make_lw_shared<dht::range_streamer>(_db, _stream_manager, std::move(tm), guard.get_abort_source(),
get_broadcast_address(), _snitch.local()->get_location(),
"Tablet migration", streaming::stream_reason::tablet_migration, std::move(tables));
"Tablet migration", streaming::stream_reason::tablet_migration, topo_guard, std::move(tables));
tm = nullptr;
streamer->add_source_filter(std::make_unique<dht::range_streamer::failure_detector_source_filter>(
_gossiper.get_unreachable_members()));
@@ -6567,6 +6652,121 @@ future<> storage_service::cleanup_tablet(locator::global_tablet_id tablet) {
});
}
// Requests migration of the tablet of `table` owning `token` from replica `src` to replica `dst`
// and waits until the tablet has no transition in progress anymore.
//
// The request is forwarded to shard 0 when invoked elsewhere. On shard 0 it is submitted as a
// group0 topology change which sets the tablet's new replica set and its stage to
// allow_write_both_read_old, and moves the topology into the tablet_migration transition state.
// The submission is retried on group0_concurrent_modification.
//
// Throws std::runtime_error when:
//   - the tablet has no replica on `src`,
//   - `dst.host` is not a known node,
//   - `dst.shard` is not a valid shard of `dst.host`,
//   - `src` and `dst` are on the same host.
future<> storage_service::move_tablet(table_id table, dht::token token, locator::tablet_replica src, locator::tablet_replica dst) {
    auto holder = _async_gate.hold();

    if (this_shard_id() != 0) {
        // group0 is only set on shard 0.
        co_return co_await container().invoke_on(0, [&] (auto& ss) {
            return ss.move_tablet(table, token, src, dst);
        });
    }

    while (true) {
        auto guard = co_await _group0->client().start_operation(&_abort_source);

        // Do not hold the group0 guard while waiting for the topology to become idle.
        while (_topology_state_machine._topology.is_busy()) {
            slogger.debug("move_tablet(): topology state machine is busy");
            release_guard(std::move(guard));
            co_await _topology_state_machine.event.wait();
            guard = co_await _group0->client().start_operation(&_abort_source);
        }

        std::vector<canonical_mutation> updates;
        auto ks_name = _db.local().find_schema(table)->ks_name();
        auto& tmap = get_token_metadata().tablets().get_tablet_map(table);
        auto tid = tmap.get_tablet_id(token);
        auto& tinfo = tmap.get_tablet_info(tid);
        auto last_token = tmap.get_last_token(tid);
        auto gid = locator::global_tablet_id{table, tid};

        // FIXME: Validate replication strategy constraints.

        if (!locator::contains(tinfo.replicas, src)) {
            throw std::runtime_error(format("Tablet {} has no replica on {}", gid, src));
        }
        auto* node = get_token_metadata().get_topology().find_node(dst.host);
        if (!node) {
            throw std::runtime_error(format("Unknown host: {}", dst.host));
        }
        if (dst.shard >= node->get_shard_count()) {
            // Both placeholders need an argument: the host first, then the offending shard.
            throw std::runtime_error(format("Host {} does not have shard {}", dst.host, dst.shard));
        }

        if (src.host == dst.host) {
            throw std::runtime_error("Migrating within the same node is not supported");
        }

        // NOTE(review): presumably unreachable, since equal replicas would also have equal hosts
        // and be rejected by the check above -- confirm whether this no-op case should come first.
        if (src == dst) {
            co_return;
        }

        updates.push_back(canonical_mutation(replica::tablet_mutation_builder(guard.write_timestamp(), ks_name, table)
            .set_new_replicas(last_token, locator::replace_replica(tinfo.replicas, src, dst))
            .set_stage(last_token, locator::tablet_transition_stage::allow_write_both_read_old)
            .build()));
        updates.push_back(canonical_mutation(topology_mutation_builder(guard.write_timestamp())
            .set_transition_state(topology::transition_state::tablet_migration)
            .set_version(_topology_state_machine._topology.version + 1)
            .build()));

        sstring reason = format("Moving tablet {} from {} to {}", gid, src, dst);
        slogger.info("raft topology: {}", reason);
        slogger.trace("raft topology: do update {} reason {}", updates, reason);
        topology_change change{std::move(updates)};
        group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, reason);
        try {
            co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard));
            break;
        } catch (group0_concurrent_modification&) {
            slogger.debug("move_tablet(): concurrent modification, retrying");
        }
    }

    // Wait for migration to finish.
    co_await _topology_state_machine.event.wait([&] {
        auto& tmap = get_token_metadata().tablets().get_tablet_map(table);
        return !tmap.get_tablet_transition_info(tmap.get_tablet_id(token));
    });
}
// Persists the tablet balancing switch through group0.
//
// Forwards itself to shard 0 when invoked on another shard. On shard 0 it waits until the
// topology is not busy, then submits a topology change which sets "tablet_balancing_enabled"
// to `enabled`, retrying on group0_concurrent_modification.
future<> storage_service::set_tablet_balancing_enabled(bool enabled) {
    auto holder = _async_gate.hold();

    if (this_shard_id() != 0) {
        // group0 is only set on shard 0.
        co_return co_await container().invoke_on(0, [&] (auto& ss) {
            return ss.set_tablet_balancing_enabled(enabled);
        });
    }

    for (;;) {
        group0_guard guard = co_await _group0->client().start_operation(&_abort_source);

        // Give the guard back while waiting for the topology to become idle, then take a fresh one.
        while (_topology_state_machine._topology.is_busy()) {
            slogger.debug("set_tablet_balancing_enabled(): topology is busy");
            release_guard(std::move(guard));
            co_await _topology_state_machine.event.wait();
            guard = co_await _group0->client().start_operation(&_abort_source);
        }

        std::vector<canonical_mutation> updates;
        updates.push_back(canonical_mutation(
                topology_mutation_builder(guard.write_timestamp())
                    .set_tablet_balancing_enabled(enabled)
                    .build()));

        sstring reason = format("Setting tablet balancing to {}", enabled);
        slogger.info("raft topology: {}", reason);

        topology_change change{std::move(updates)};
        group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, reason);

        try {
            co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard));
            break;
        } catch (group0_concurrent_modification&) {
            slogger.debug("set_tablet_balancing_enabled(): concurrent modification");
        }
    }
}
future<join_node_request_result> storage_service::join_node_request_handler(join_node_request_params params) {
join_node_request_result result;
slogger.info("raft topology: received request to join from host_id: {}", params.host_id);

View File

@@ -14,6 +14,7 @@
#include <seastar/core/shared_future.hh>
#include "gms/i_endpoint_state_change_subscriber.hh"
#include "service/endpoint_lifecycle_subscriber.hh"
#include "service/topology_guard.hh"
#include "locator/abstract_replication_strategy.hh"
#include "locator/tablets.hh"
#include "locator/tablet_metadata_guard.hh"
@@ -541,7 +542,7 @@ private:
*/
future<std::unordered_multimap<inet_address, dht::token_range>> get_new_source_ranges(locator::vnode_effective_replication_map_ptr erm, const dht::token_range_vector& ranges) const;
future<> removenode_with_stream(gms::inet_address leaving_node, shared_ptr<abort_source> as_ptr);
future<> removenode_with_stream(gms::inet_address leaving_node, frozen_topology_guard, shared_ptr<abort_source> as_ptr);
future<> removenode_add_ranges(lw_shared_ptr<dht::range_streamer> streamer, gms::inet_address leaving_node);
// needs to be modified to accept either a keyspace or ARS.
@@ -774,6 +775,11 @@ public:
// Precondition: the topology mutations were already written to disk; the function only transitions the in-memory state machine.
// Public for `reload_raft_topology_state` REST API.
future<> topology_transition();
public:
future<> move_tablet(table_id, dht::token, locator::tablet_replica src, locator::tablet_replica dst);
future<> set_tablet_balancing_enabled(bool);
private:
// load topology state machine snapshot into memory
// raft_group0_client::_read_apply_mutex must be held

View File

@@ -417,7 +417,7 @@ public:
}
if (nodes_to_drain.empty()) {
if (!shuffle && max_load == min_load) {
if (!shuffle && (max_load == min_load || !_tm->tablets().balancing_enabled())) {
// load is balanced.
// TODO: Evaluate and fix intra-node balance.
_stats.for_dc(dc).stop_balance++;

74
service/topology_guard.hh Normal file
View File

@@ -0,0 +1,74 @@
/*
* Copyright (C) 2023-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include "service/session.hh"
#include "replica/database_fwd.hh"
namespace service {
/// Represents topology_guard in transit.
/// Can be passed across nodes and copied across shards.
/// Guard in transit doesn't block barriers, but can be materialized into topology_guard
/// which does. If the guard was invalidated while in transit, materialization will fail.
/// Has a null state which represents no guard.
/// topology_guard created out of null frozen_topology_guard is valid and never aborted.
using frozen_topology_guard = session_id;
/// Represents a guard which is always valid and never aborted.
constexpr frozen_topology_guard null_topology_guard = default_session_id;
session_manager& get_topology_session_manager();
/// A guard used by operations started by the topology change coordinator whose scope
/// of operation is supposed to be limited to a single transition stage.
/// The coordinator invalidates guards, typically when it advances to the next stage.
/// Invalidated guards are still alive and block barriers, but their check() will throw, which
/// gives the holder a chance to interrupt the operation and release the guard early.
/// The token metadata barrier executed by the coordinator waits for guards to die, even the
/// invalidated ones.
///
/// Guards allow coordinators to make sure there are no side-effects from operations started
/// in earlier stages of topology change operation. If all side-effects are performed under
/// the guard's scope, then executing a token metadata barrier after invalidating the guard
/// guarantees that there can be no side-effects from earlier stages.
/// The token metadata barrier must contact all nodes which may have alive guards.
template <typename T> concept TopologyGuard = requires(T g, replica::table& tbl, frozen_topology_guard frozen) {
    // Acquisition: constructing from a table and a frozen guard yields T.
    { T(tbl, frozen) } -> std::same_as<T>;
    // Move construction yields T.
    { T(std::move(g)) } -> std::same_as<T>;
    // Invalidation check: check() is callable and has type void.
    { g.check() } -> std::same_as<void>;
};
/// A TopologyGuard implementation whose validity is bounded by the validity
/// of a session.
/// The topology change coordinator invalidates the guards by closing the session.
class session_topology_guard {
    session::guard _guard;
public:
    using frozen = session_id;

    /// Enters the session identified by `g` in the topology session manager and
    /// keeps the resulting session guard. The table argument is not used here.
    session_topology_guard(replica::table&, frozen_topology_guard g)
        : _guard(get_topology_session_manager().enter_session(g))
    { }

    /// Forwards to check() of the held session guard.
    void check() {
        _guard.check();
    }
};
static_assert(TopologyGuard<session_topology_guard>);
using topology_guard = session_topology_guard;
} // namespace service

View File

@@ -39,6 +39,10 @@ bool topology::contains(raft::server_id id) {
left_nodes.contains(id);
}
// The topology counts as busy exactly when tstate holds a value.
bool topology::is_busy() const {
    const bool has_transition_state = tstate.has_value();
    return has_transition_state;
}
std::set<sstring> calculate_not_yet_enabled_features(const std::set<sstring>& enabled_features, const auto& supported_features) {
std::set<sstring> to_enable;
bool first = true;

View File

@@ -21,6 +21,7 @@
#include "dht/token.hh"
#include "raft/raft.hh"
#include "utils/UUID.hh"
#include "service/session.hh"
#include "mutation/canonical_mutation.hh"
namespace service {
@@ -146,6 +147,12 @@ struct topology {
// Set of features that are considered to be enabled by the cluster.
std::set<sstring> enabled_features;
// Session used to create topology_guard for operations like streaming.
session_id session;
// When false, tablet load balancer will not try to rebalance tablets.
bool tablet_balancing_enabled = true;
// Find only nodes in non 'left' state
const std::pair<const raft::server_id, replica_state>* find(raft::server_id id) const;
// Return true if node exists in any state including 'left' one
@@ -155,6 +162,9 @@ struct topology {
// Are there any non-left nodes?
bool is_empty() const;
// Returns false iff we can safely start a new topology change.
bool is_busy() const;
// Calculates a set of features that are supported by all normal nodes but not yet enabled.
std::set<sstring> calculate_not_yet_enabled_features() const;
};

View File

@@ -184,7 +184,7 @@ future<> sstables_loader::load_and_stream(sstring ks_name, sstring cf_name,
for (auto& node : current_targets) {
if (!metas.contains(node)) {
auto [sink, source] = co_await ms.make_sink_and_source_for_stream_mutation_fragments(reader.schema()->version(),
ops_uuid, cf_id, estimated_partitions, reason, netw::messaging_service::msg_addr(node));
ops_uuid, cf_id, estimated_partitions, reason, service::default_session_id, netw::messaging_service::msg_addr(node));
llog.debug("load_and_stream: ops_uuid={}, make sink and source for node={}", ops_uuid, node);
metas.emplace(node, send_meta_data(node, std::move(sink), std::move(source)));
metas.at(node).receive();

View File

@@ -24,11 +24,13 @@ std::function<future<> (flat_mutation_reader_v2)> make_streaming_consumer(sstrin
sharded<db::view::view_update_generator>& vug,
uint64_t estimated_partitions,
stream_reason reason,
sstables::offstrategy offstrategy) {
return [&db, &sys_dist_ks, &vug, estimated_partitions, reason, offstrategy, origin = std::move(origin)] (flat_mutation_reader_v2 reader) -> future<> {
sstables::offstrategy offstrategy,
service::frozen_topology_guard frozen_guard) {
return [&db, &sys_dist_ks, &vug, estimated_partitions, reason, offstrategy, origin = std::move(origin), frozen_guard] (flat_mutation_reader_v2 reader) -> future<> {
std::exception_ptr ex;
try {
auto cf = db.local().find_column_family(reader.schema()).shared_from_this();
auto guard = service::topology_guard(*cf, frozen_guard);
auto use_view_update_path = co_await db::view::check_needs_view_update_path(sys_dist_ks.local(), db.local().get_token_metadata(), *cf, reason);
//FIXME: for better estimations this should be transmitted from remote
auto metadata = mutation_source_metadata{};

View File

@@ -8,6 +8,7 @@
#include "sstables/sstable_set.hh"
#include "streaming/stream_reason.hh"
#include "service/topology_guard.hh"
namespace replica {
class database;
@@ -28,6 +29,7 @@ std::function<future<>(flat_mutation_reader_v2)> make_streaming_consumer(sstring
sharded<db::view::view_update_generator>& vug,
uint64_t estimated_partitions,
stream_reason reason,
sstables::offstrategy offstrategy);
sstables::offstrategy offstrategy,
service::frozen_topology_guard);
}

View File

@@ -25,6 +25,7 @@ stream_plan& stream_plan::request_ranges(inet_address from, sstring keyspace, dh
auto session = _coordinator->get_or_create_session(_mgr, from);
session->add_stream_request(keyspace, std::move(ranges), std::move(column_families));
session->set_reason(_reason);
session->set_topo_guard(_topo_guard);
return *this;
}
@@ -37,6 +38,7 @@ stream_plan& stream_plan::transfer_ranges(inet_address to, sstring keyspace, dht
auto session = _coordinator->get_or_create_session(_mgr, to);
session->add_transfer_ranges(std::move(keyspace), std::move(ranges), std::move(column_families));
session->set_reason(_reason);
session->set_topo_guard(_topo_guard);
return *this;
}

View File

@@ -35,6 +35,7 @@ private:
plan_id _plan_id;
sstring _description;
stream_reason _reason;
service::frozen_topology_guard _topo_guard;
std::vector<stream_event_handler*> _handlers;
shared_ptr<stream_coordinator> _coordinator;
bool _range_added = false;
@@ -46,11 +47,13 @@ public:
*
* @param description Stream type that describes this StreamPlan
*/
stream_plan(stream_manager& mgr, sstring description, stream_reason reason = stream_reason::unspecified)
stream_plan(stream_manager& mgr, sstring description, stream_reason reason = stream_reason::unspecified,
service::frozen_topology_guard topo_guard = {})
: _mgr(mgr)
, _plan_id(plan_id{utils::UUID_gen::get_time_UUID()})
, _description(description)
, _reason(reason)
, _topo_guard(topo_guard)
, _coordinator(make_shared<stream_coordinator>())
{
}

View File

@@ -16,6 +16,7 @@
#include "streaming/stream_result_future.hh"
#include "streaming/stream_manager.hh"
#include "dht/i_partitioner.hh"
#include "dht/auto_refreshing_sharder.hh"
#include "streaming/stream_plan.hh"
#include <seastar/core/sleep.hh>
#include <seastar/core/thread.hh>
@@ -35,6 +36,8 @@
#include "streaming/stream_mutation_fragments_cmd.hh"
#include "consumer.hh"
#include "readers/generating_v2.hh"
#include "service/topology_guard.hh"
#include "utils/error_injection.hh"
namespace streaming {
@@ -88,17 +91,19 @@ public:
void stream_manager::init_messaging_service_handler(abort_source& as) {
auto& ms = _ms.local();
ms.register_prepare_message([this] (const rpc::client_info& cinfo, prepare_message msg, streaming::plan_id plan_id, sstring description, rpc::optional<stream_reason> reason_opt) {
ms.register_prepare_message([this] (const rpc::client_info& cinfo, prepare_message msg, streaming::plan_id plan_id, sstring description, rpc::optional<stream_reason> reason_opt, rpc::optional<service::session_id> session) {
const auto& src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
const auto& from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
auto dst_cpu_id = this_shard_id();
auto reason = reason_opt ? *reason_opt : stream_reason::unspecified;
return container().invoke_on(dst_cpu_id, [msg = std::move(msg), plan_id, description = std::move(description), from, src_cpu_id, reason] (auto& sm) mutable {
auto topo_guard = service::frozen_topology_guard(session.value_or(service::default_session_id));
return container().invoke_on(dst_cpu_id, [msg = std::move(msg), plan_id, description = std::move(description), from, src_cpu_id, reason, topo_guard] (auto& sm) mutable {
auto sr = stream_result_future::init_receiving_side(sm, plan_id, description, from);
auto session = sm.get_session(plan_id, from, "PREPARE_MESSAGE");
session->init(sr);
session->dst_cpu_id = src_cpu_id;
session->set_reason(reason);
session->set_topo_guard(topo_guard);
return session->prepare(std::move(msg.requests), std::move(msg.summaries));
});
});
@@ -110,25 +115,39 @@ void stream_manager::init_messaging_service_handler(abort_source& as) {
return make_ready_future<>();
});
});
ms.register_stream_mutation_fragments([this, &as] (const rpc::client_info& cinfo, streaming::plan_id plan_id, table_schema_version schema_id, table_id cf_id, uint64_t estimated_partitions, rpc::optional<stream_reason> reason_opt, rpc::source<frozen_mutation_fragment, rpc::optional<stream_mutation_fragments_cmd>> source) {
ms.register_stream_mutation_fragments([this, &as] (const rpc::client_info& cinfo, streaming::plan_id plan_id, table_schema_version schema_id, table_id cf_id, uint64_t estimated_partitions,
rpc::optional<stream_reason> reason_opt,
rpc::optional<service::session_id> session,
rpc::source<frozen_mutation_fragment, rpc::optional<stream_mutation_fragments_cmd>> source) {
auto from = netw::messaging_service::get_source(cinfo);
auto reason = reason_opt ? *reason_opt: stream_reason::unspecified;
sslog.trace("Got stream_mutation_fragments from {} reason {}", from, int(reason));
service::frozen_topology_guard topo_guard = session.value_or(service::default_session_id);
sslog.trace("Got stream_mutation_fragments from {} reason {}, session {}", from, int(reason), session);
if (!_sys_dist_ks.local_is_initialized() || !_view_update_generator.local_is_initialized()) {
return make_exception_future<rpc::sink<int>>(std::runtime_error(format("Node {} is not fully initialized for streaming, try again later",
_db.local().get_token_metadata().get_topology().my_address())));
}
return _mm.local().get_schema_for_write(schema_id, from, _ms.local(), as).then([this, from, estimated_partitions, plan_id, cf_id, source, reason] (schema_ptr s) mutable {
return _db.local().obtain_reader_permit(s, "stream-session", db::no_timeout, {}).then([this, from, estimated_partitions, plan_id, cf_id, source, reason, s] (reader_permit permit) mutable {
auto sink = _ms.local().make_sink_for_stream_mutation_fragments(source);
return _mm.local().get_schema_for_write(schema_id, from, _ms.local(), as).then([this, from, estimated_partitions, plan_id, cf_id, source, reason, topo_guard, &as] (schema_ptr s) mutable {
return _db.local().obtain_reader_permit(s, "stream-session", db::no_timeout, {}).then([this, from, estimated_partitions, plan_id, cf_id, source, reason, topo_guard, s, &as] (reader_permit permit) mutable {
struct stream_mutation_fragments_cmd_status {
bool got_cmd = false;
bool got_end_of_stream = false;
};
auto cmd_status = make_lw_shared<stream_mutation_fragments_cmd_status>();
auto offstrategy_update = make_lw_shared<offstrategy_trigger>(_db, cf_id, plan_id);
auto get_next_mutation_fragment = [&sm = container(), source, plan_id, from, s, cmd_status, offstrategy_update, permit] () mutable {
return source().then([&sm, plan_id, from, s, cmd_status, offstrategy_update, permit] (std::optional<std::tuple<frozen_mutation_fragment, rpc::optional<stream_mutation_fragments_cmd>>> opt) mutable {
auto guard = service::topology_guard(s->table(), topo_guard);
// Will log a message when streaming is done. Used to synchronize tests.
lw_shared_ptr<std::any> log_done;
if (utils::get_local_injector().is_enabled("stream_mutation_fragments")) {
log_done = make_lw_shared<std::any>(seastar::make_shared(seastar::defer([] {
sslog.info("stream_mutation_fragments: done");
})));
}
auto get_next_mutation_fragment = [guard = std::move(guard), &as, &sm = container(), source, plan_id, from, s, cmd_status, offstrategy_update, permit] () mutable {
guard.check();
return source().then([&sm, &guard, &as, plan_id, from, s, cmd_status, offstrategy_update, permit] (std::optional<std::tuple<frozen_mutation_fragment, rpc::optional<stream_mutation_fragments_cmd>>> opt) mutable {
if (opt) {
auto cmd = std::get<1>(*opt);
if (cmd) {
@@ -150,7 +169,19 @@ void stream_manager::init_messaging_service_handler(abort_source& as) {
auto mf = fmf.unfreeze(*s, permit);
sm.local().update_progress(plan_id, from.addr, progress_info::direction::IN, sz);
offstrategy_update->update();
return make_ready_future<mutation_fragment_opt>(std::move(mf));
return utils::get_local_injector().inject_with_handler("stream_mutation_fragments", [&guard, &as] (auto& handler) -> future<> {
auto& guard_ = guard;
auto& as_ = as;
sslog.info("stream_mutation_fragments: waiting");
while (!handler.poll_for_message()) {
guard_.check();
co_await sleep_abortable(std::chrono::milliseconds(5), as_);
}
sslog.info("stream_mutation_fragments: released");
}).then([mf = std::move(mf)] () mutable {
return mutation_fragment_opt(std::move(mf));
});
} else {
// If the sender has sent stream_mutation_fragments_cmd it means it is
// a node that understands the new protocol. It must send end_of_stream
@@ -162,18 +193,20 @@ void stream_manager::init_messaging_service_handler(abort_source& as) {
}
});
};
auto sink = _ms.local().make_sink_for_stream_mutation_fragments(source);
try {
// Make sure the table with cf_id is still present at this point.
// Close the sink in case the table is dropped.
auto& table = _db.local().find_column_family(cf_id);
auto erm = table.get_effective_replication_map();
auto op = table.stream_in_progress();
auto sharder_ptr = std::make_unique<dht::auto_refreshing_sharder>(table.shared_from_this());
auto& sharder = *sharder_ptr;
//FIXME: discarded future.
(void)mutation_writer::distribute_reader_and_consume_on_shards(s, erm->get_sharder(*s),
(void)mutation_writer::distribute_reader_and_consume_on_shards(s, sharder,
make_generating_reader_v1(s, permit, std::move(get_next_mutation_fragment)),
make_streaming_consumer("streaming", _db, _sys_dist_ks, _view_update_generator, estimated_partitions, reason, is_offstrategy_supported(reason)),
make_streaming_consumer("streaming", _db, _sys_dist_ks, _view_update_generator, estimated_partitions, reason, is_offstrategy_supported(reason), topo_guard),
std::move(op)
).then_wrapped([s, plan_id, from, sink, estimated_partitions, erm] (future<uint64_t> f) mutable {
).then_wrapped([s, plan_id, from, sink, estimated_partitions, log_done, sh_ptr = std::move(sharder_ptr)] (future<uint64_t> f) mutable {
int32_t status = 0;
uint64_t received_partitions = 0;
if (f.failed()) {
@@ -270,7 +303,7 @@ future<> stream_session::on_initialization_complete() {
}
auto id = msg_addr{this->peer, 0};
sslog.debug("[Stream #{}] SEND PREPARE_MESSAGE to {}", plan_id(), id);
return manager().ms().send_prepare_message(id, std::move(prepare), plan_id(), description(), get_reason()).then_wrapped([this, id] (auto&& f) {
return manager().ms().send_prepare_message(id, std::move(prepare), plan_id(), description(), get_reason(), topo_guard()).then_wrapped([this, id] (auto&& f) {
try {
auto msg = f.get0();
sslog.debug("[Stream #{}] GOT PREPARE_MESSAGE Reply from {}", this->plan_id(), this->peer);

View File

@@ -22,6 +22,7 @@
#include "streaming/stream_manager.hh"
#include "streaming/stream_reason.hh"
#include "streaming/session_info.hh"
#include "service/topology_guard.hh"
#include "query-request.hh"
#include <map>
#include <vector>
@@ -156,6 +157,7 @@ private:
session_info _session_info;
stream_reason _reason = stream_reason::unspecified;
service::frozen_topology_guard _topo_guard;
public:
stream_reason get_reason() const {
return _reason;
@@ -164,6 +166,14 @@ public:
_reason = reason;
}
// Stores the given frozen topology guard in _topo_guard, replacing the previous value.
void set_topo_guard(service::frozen_topology_guard topo_guard) {
_topo_guard = topo_guard;
}
// Returns a copy of the frozen topology guard currently stored in _topo_guard.
service::frozen_topology_guard topo_guard() const {
return _topo_guard;
}
// Adds `bytes` to the running _bytes_sent counter.
void add_bytes_sent(int64_t bytes) {
_bytes_sent += bytes;
}

View File

@@ -16,6 +16,7 @@
#include "streaming/stream_manager.hh"
#include "streaming/stream_reason.hh"
#include "streaming/stream_mutation_fragments_cmd.hh"
#include "service/topology_guard.hh"
#include "readers/mutation_fragment_v1_stream.hh"
#include "mutation/mutation_fragment_stream_validator.hh"
#include "mutation/frozen_mutation.hh"
@@ -50,6 +51,7 @@ struct send_info {
netw::messaging_service::msg_addr id;
uint32_t dst_cpu_id;
stream_reason reason;
service::frozen_topology_guard topo_guard;
size_t mutations_nr{0};
semaphore mutations_done{0};
bool error_logged = false;
@@ -60,13 +62,15 @@ struct send_info {
noncopyable_function<void(size_t)> update;
send_info(netw::messaging_service& ms_, streaming::plan_id plan_id_, replica::table& tbl_, reader_permit permit_,
dht::token_range_vector ranges_, netw::messaging_service::msg_addr id_,
uint32_t dst_cpu_id_, stream_reason reason_, noncopyable_function<void(size_t)> update_fn)
uint32_t dst_cpu_id_, stream_reason reason_, service::frozen_topology_guard topo_guard_,
noncopyable_function<void(size_t)> update_fn)
: ms(ms_)
, plan_id(plan_id_)
, cf_id(tbl_.schema()->id())
, id(id_)
, dst_cpu_id(dst_cpu_id_)
, reason(reason_)
, topo_guard(topo_guard_)
, cf(tbl_)
, ranges(std::move(ranges_))
, prs(dht::to_partition_ranges(ranges))
@@ -116,7 +120,7 @@ future<> send_mutation_fragments(lw_shared_ptr<send_info> si) {
}
return si->estimate_partitions().then([si] (size_t estimated_partitions) {
sslog.info("[Stream #{}] Start sending ks={}, cf={}, estimated_partitions={}, with new rpc streaming", si->plan_id, si->cf.schema()->ks_name(), si->cf.schema()->cf_name(), estimated_partitions);
return si->ms.make_sink_and_source_for_stream_mutation_fragments(si->reader.schema()->version(), si->plan_id, si->cf_id, estimated_partitions, si->reason, si->id).then_unpack([si] (rpc::sink<frozen_mutation_fragment, stream_mutation_fragments_cmd> sink, rpc::source<int32_t> source) mutable {
return si->ms.make_sink_and_source_for_stream_mutation_fragments(si->reader.schema()->version(), si->plan_id, si->cf_id, estimated_partitions, si->reason, si->topo_guard, si->id).then_unpack([si] (rpc::sink<frozen_mutation_fragment, stream_mutation_fragments_cmd> sink, rpc::source<int32_t> source) mutable {
auto got_error_from_peer = make_lw_shared<bool>(false);
auto table_is_dropped = make_lw_shared<bool>(false);
@@ -205,10 +209,11 @@ future<> stream_transfer_task::execute() {
sort_and_merge_ranges();
auto reason = session->get_reason();
auto& sm = session->manager();
return sm.container().invoke_on_all([plan_id, cf_id, id, dst_cpu_id, ranges=this->_ranges, reason] (stream_manager& sm) mutable {
auto topo_guard = session->topo_guard();
return sm.container().invoke_on_all([plan_id, cf_id, id, dst_cpu_id, ranges=this->_ranges, reason, topo_guard] (stream_manager& sm) mutable {
auto& tbl = sm.db().find_column_family(cf_id);
return sm.db().obtain_reader_permit(tbl, "stream-transfer-task", db::no_timeout, {}).then([&sm, &tbl, plan_id, cf_id, id, dst_cpu_id, ranges=std::move(ranges), reason] (reader_permit permit) mutable {
auto si = make_lw_shared<send_info>(sm.ms(), plan_id, tbl, std::move(permit), std::move(ranges), id, dst_cpu_id, reason, [&sm, plan_id, addr = id.addr] (size_t sz) {
return sm.db().obtain_reader_permit(tbl, "stream-transfer-task", db::no_timeout, {}).then([&sm, &tbl, plan_id, cf_id, id, dst_cpu_id, ranges=std::move(ranges), reason, topo_guard] (reader_permit permit) mutable {
auto si = make_lw_shared<send_info>(sm.ms(), plan_id, tbl, std::move(permit), std::move(ranges), id, dst_cpu_id, reason, topo_guard, [&sm, plan_id, addr = id.addr] (size_t sz) {
sm.update_progress(plan_id, addr, streaming::progress_info::direction::OUT, sz);
});
return si->has_relevant_range_on_this_shard().then([si, plan_id, cf_id] (bool has_relevant_range_on_this_shard) {

View File

@@ -0,0 +1,94 @@
/*
* Copyright (C) 2023-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include "test/lib/scylla_test_case.hh"
#include <seastar/testing/thread_test_case.hh>
#include "test/lib/random_utils.hh"
#include "test/lib/cql_test_env.hh"
#include "test/lib/log.hh"
#include "service/session.hh"
using namespace service;
// The session named by default_session_id can be entered on a fresh manager, and
// can be entered again after all sessions were asked to close and the drain finished.
SEASTAR_TEST_CASE(test_default_session_always_exists) {
    return seastar::async([] {
        session_manager sessions;

        auto default_guard = sessions.enter_session(default_session_id);
        default_guard.check();

        // Empty exception set: ask for everything to be closed, then wait for the drain.
        sessions.initiate_close_of_sessions_except({});
        sessions.drain_closing_sessions().get();

        default_guard = sessions.enter_session(default_session_id);
        default_guard.check();
    });
}
// Entering a session through a default-constructed session_id must fail.
SEASTAR_TEST_CASE(test_default_constructible_session_does_not_exist) {
    return seastar::async([] {
        session_manager sessions;
        session_id unset_id;

        // For safety, we don't want to treat unset id same as default_session_id.
        BOOST_REQUIRE_THROW(sessions.enter_session(unset_id), std::runtime_error);
    });
}
// Exercises creation, closing and draining of sessions on a single session_manager,
// including drains which overlap with further create/close requests.
SEASTAR_TEST_CASE(test_session_closing) {
return seastar::async([] {
session_manager mgr;
auto id = session_id(utils::make_random_uuid());
auto id2 = session_id(utils::make_random_uuid());
auto id3 = session_id(utils::make_random_uuid());
auto id4 = session_id(utils::make_random_uuid());
// Entering a session which was not created yet is expected to throw.
BOOST_REQUIRE_THROW(mgr.enter_session(id), std::runtime_error);
mgr.create_session(id);
mgr.create_session(id2);
auto guard = mgr.enter_session(id);
auto guard2 = mgr.enter_session(id2);
guard.check();
guard2.check();
// Ask to close everything except id: guard is expected to stay valid,
// guard2 is expected to become invalid.
mgr.initiate_close_of_sessions_except({id});
BOOST_REQUIRE(guard.valid());
BOOST_REQUIRE(!guard2.valid());
auto f = mgr.drain_closing_sessions();
auto f2 = mgr.drain_closing_sessions(); // test concurrent drain
BOOST_REQUIRE(!f.available()); // blocked by guard2
// Concurrent wait drain
// While both drains are pending: id3 is created and then covered by a second
// close request; id4 is created only after that request.
mgr.create_session(id3);
mgr.initiate_close_of_sessions_except({id});
mgr.create_session(id3); // no-op
mgr.create_session(id4);
// Destroy guard2 at the end of this scope so that the pending drains can finish.
{
auto _ = std::move(guard2);
}
f.get();
f2.get();
mgr.drain_closing_sessions().get();
// id was exempted from both close requests, so its guard is still valid and
// the session can be entered again; id4 can be entered as well.
BOOST_REQUIRE(guard.valid());
mgr.enter_session(id);
mgr.enter_session(id4);
// id2 and id3 were subject to a close request; entering them is expected to throw.
BOOST_REQUIRE_THROW(mgr.enter_session(id2), std::runtime_error);
BOOST_REQUIRE_THROW(mgr.enter_session(id3), std::runtime_error);
});
}

View File

@@ -152,7 +152,8 @@ SEASTAR_TEST_CASE(test_tablet_metadata_persistence) {
tablet_replica {h1, 4},
tablet_replica {h2, 2},
},
tablet_replica {h1, 4}
tablet_replica {h1, 4},
session_id(utils::UUID_gen::get_time_UUID())
});
}
@@ -1287,6 +1288,96 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_two_empty_nodes) {
}).get();
}
// Checks that balance_tablets() yields an empty plan while balancing is disabled in
// the tablet metadata, yields a non-empty plan again once it is re-enabled, and that
// either setting is still in effect after a further token metadata mutation.
SEASTAR_THREAD_TEST_CASE(test_load_balancer_disabling) {
    do_with_cql_env_thread([] (auto& e) {
        inet_address ip1("192.168.0.1");
        inet_address ip2("192.168.0.2");

        auto host1 = host_id(next_uuid());
        auto host2 = host_id(next_uuid());

        auto table1 = table_id(next_uuid());

        unsigned shard_count = 1;

        semaphore sem(1);
        shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{
            locator::topology::config{
                .this_endpoint = ip1,
                .local_dc_rack = locator::endpoint_dc_rack::default_location
            }
        });

        // host1 is loaded and host2 is empty, resulting in an imbalance.
        stm.mutate_token_metadata([&] (auto& tm) {
            tm.update_host_id(host1, ip1);
            tm.update_host_id(host2, ip2);
            tm.update_topology(ip1, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count);
            tm.update_topology(ip2, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count);

            tablet_map tmap(16);
            for (auto tid : tmap.tablet_ids()) {
                tmap.set_tablet(tid, tablet_info {
                    tablet_replica_set {
                        tablet_replica {host1, 0},
                    }
                });
            }
            tablet_metadata tmeta;
            tmeta.set_tablet_map(table1, std::move(tmap));
            tm.set_tablets(std::move(tmeta));
            return make_ready_future<>();
        }).get();

        // Requests a plan for the current token metadata and reports whether it is empty.
        auto plan_is_empty = [&] {
            auto plan = e.get_tablet_allocator().local().balance_tablets(stm.get()).get0();
            return plan.empty();
        };

        // Flips the balancing switch in the tablet metadata.
        auto set_balancing = [&] (bool on) {
            stm.mutate_token_metadata([&] (token_metadata& tm) {
                tm.tablets().set_balancing_enabled(on);
                return make_ready_future<>();
            }).get();
        };

        // Runs a mutation cycle which changes nothing.
        auto mutate_without_changes = [&] {
            stm.mutate_token_metadata([&] (token_metadata& tm) {
                return make_ready_future<>();
            }).get();
        };

        BOOST_REQUIRE(!plan_is_empty());

        // Disable load balancing
        set_balancing(false);
        BOOST_REQUIRE(plan_is_empty());

        // Check that cloning preserves the setting
        mutate_without_changes();
        BOOST_REQUIRE(plan_is_empty());

        // Enable load balancing back
        set_balancing(true);
        BOOST_REQUIRE(!plan_is_empty());

        // Check that cloning preserves the setting
        mutate_without_changes();
        BOOST_REQUIRE(!plan_is_empty());
    }).get();
}
SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_random_load) {
do_with_cql_env_thread([] (auto& e) {
const int n_hosts = 6;

View File

@@ -337,6 +337,10 @@ class ManagerClient():
raise Exception(f"Failed to get local host id address for server {server_id}") from exc
return HostID(host_id)
async def get_table_id(self, keyspace: str, table: str):
    """Return the ``id`` column of the system_schema.tables row for ``keyspace.table``.

    Raises:
        IndexError: when the query returns no row for the given keyspace and table.
    """
    rows = await self.cql.run_async(f"select id from system_schema.tables where keyspace_name = '{keyspace}' and table_name = '{table}'")
    if not rows:
        # Same exception type as indexing an empty result, but says which table was missing.
        raise IndexError(f"Table {keyspace}.{table} not found in system_schema.tables")
    return rows[0].id
async def server_sees_others(self, server_id: ServerNum, count: int, interval: float = 45.):
"""Wait till a server sees a minimum given count of other servers"""
if count < 1:

View File

@@ -209,6 +209,23 @@ class ScyllaRESTAPIClient():
await self.client.post(f"/v2/error_injection/injection/{injection}",
host=node_ip, params={"one_shot": str(one_shot)}, json={ key: str(value) for key, value in parameters.items() })
async def move_tablet(self, node_ip: str, ks: str, table: str, src_host: HostID, src_shard: int, dst_host: HostID, dst_shard: int, token: int) -> None:
    """POST a tablet move request to /storage_service/tablets/move on ``node_ip``.

    Hosts, shards and the token are converted to strings and sent as request params.
    """
    await self.client.post("/storage_service/tablets/move", host=node_ip, params={
        "ks": ks,
        "table": table,
        "src_host": str(src_host),
        "src_shard": str(src_shard),
        "dst_host": str(dst_host),
        "dst_shard": str(dst_shard),
        "token": str(token)
    })
async def enable_tablet_balancing(self, node_ip: str) -> None:
    """POST ``enabled=true`` to /storage_service/tablets/balancing on ``node_ip``."""
    await self.client.post("/storage_service/tablets/balancing", host=node_ip, params={"enabled": "true"})
async def disable_tablet_balancing(self, node_ip: str) -> None:
    """POST ``enabled=false`` to /storage_service/tablets/balancing on ``node_ip``."""
    await self.client.post("/storage_service/tablets/balancing", host=node_ip, params={"enabled": "false"})
async def disable_injection(self, node_ip: str, injection: str) -> None:
    """Send a DELETE for the named error injection to the node at ``node_ip``."""
    endpoint = f"/v2/error_injection/injection/{injection}"
    await self.client.delete(endpoint, host=node_ip)
@@ -221,6 +238,9 @@ class ScyllaRESTAPIClient():
async def message_injection(self, node_ip: str, injection: str) -> None:
    """POST to the ``message`` sub-resource of the named error injection on ``node_ip``."""
    endpoint = f"/v2/error_injection/injection/{injection}/message"
    await self.client.post(endpoint, host=node_ip)
async def inject_disconnect(self, node_ip: str, ip_to_disconnect_from: str) -> None:
    """POST to the error-injection ``disconnect`` endpoint for ``ip_to_disconnect_from`` on ``node_ip``."""
    endpoint = f"/v2/error_injection/disconnect/{ip_to_disconnect_from}"
    await self.client.post(endpoint, host=node_ip)
async def get_logger_level(self, node_ip: str, logger: str) -> str:
    """Fetch the text served at /system/logger/<logger> by the node at ``node_ip``."""
    endpoint = f"/system/logger/{logger}"
    level = await self.client.get_text(endpoint, host=node_ip)
    return level

View File

@@ -3,11 +3,15 @@
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
from uuid import UUID
from cassandra.query import SimpleStatement, ConsistencyLevel
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import inject_error_one_shot
from test.pylib.rest_client import inject_error_one_shot, HTTPError
from test.pylib.rest_client import inject_error
from test.pylib.util import wait_for_cql_and_get_hosts
from test.topology.conftest import skip_mode
from test.topology.util import reconnect_driver
import pytest
@@ -29,6 +33,21 @@ async def inject_error_on(manager, error_name, servers):
await asyncio.gather(*errs)
async def get_tablet_replicas(manager, keyspace_name, table_name, token):
    """Look up the replica set of the tablet that covers ``token``.

    Reads ``last_token`` and ``replicas`` for the table from ``system.tablets``
    and returns the ``replicas`` of the first returned row whose ``last_token``
    is greater than or equal to ``token``. Returns ``None`` when no row qualifies.
    """
    table_id = await manager.get_table_id(keyspace_name, table_name)
    query = (f"SELECT last_token, replicas FROM system.tablets where "
             f"keyspace_name = '{keyspace_name}' and "
             f"table_id = {table_id}")
    tablet_rows = await manager.cql.run_async(query)
    # NOTE(review): picking the first match assumes rows arrive in ascending
    # last_token order -- confirm against the system.tablets schema.
    return next((entry.replicas for entry in tablet_rows if entry.last_token >= token), None)
async def get_tablet_replica(manager, keyspace_name, table_name, token):
    """Return the first entry of the replica list of the tablet covering ``token``."""
    all_replicas = await get_tablet_replicas(manager, keyspace_name, table_name, token)
    first, *_ = all_replicas[:1] or [all_replicas[0]]
    return first
@pytest.mark.asyncio
async def test_tablet_metadata_propagates_with_schema_changes_in_snapshot_mode(manager: ManagerClient):
"""Test that you can create a table and insert and query data"""
@@ -199,3 +218,77 @@ async def test_topology_changes(manager: ManagerClient):
await check()
await cql.run_async("DROP KEYSPACE test;")
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_streaming_is_guarded_by_topology_guard(manager: ManagerClient):
"""Check that a streaming writer left behind by failed tablet streaming cannot resurrect data.

Scenario: a single-tablet table is migrated to a second node while the
replica-side streaming writer is parked on the "stream_mutation_fragments"
error injection. The connection between the nodes is then cut, the migration
is awaited to completion, the table is truncated, and only afterwards the
parked writer is released. The table must still be empty.
"""
logger.info("Bootstrapping cluster")
# Trace-level storage_service logging is enabled on every node started by this test.
cmdline = [
'--logger-log-level', 'storage_service=trace',
]
# Start with a single node.
servers = [await manager.server_add(cmdline=cmdline)]
# NOTE(review): presumably disabled so that the only tablet migration is the one
# requested explicitly below -- confirm.
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
# Replication factor 1 and a single initial tablet: the table has exactly one tablet replica.
await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', "
"'replication_factor': 1, 'initial_tablets': 1};")
await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);")
# The second node is added only after the table exists; it is the migration destination.
servers.append(await manager.server_add(cmdline=cmdline))
key = 7 # Whatever
tablet_token = 0 # Doesn't matter since there is one tablet
# Write one row so that streaming has something to transfer.
await cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({key}, 0)")
rows = await cql.run_async("SELECT pk from test.test")
assert len(list(rows)) == 1
# Current location of the tablet; indexed below as replica[0] (source host) and replica[1] (source shard).
replica = await get_tablet_replica(manager, 'test', 'test', tablet_token)
# NOTE(review): s0_host_id is not used anywhere else in this test.
s0_host_id = await manager.get_host_id(servers[0].server_id)
s1_host_id = await manager.get_host_id(servers[1].server_id)
dst_shard = 0
# Arm the one-shot injection on the destination node before starting the migration.
await manager.api.enable_injection(servers[1].ip_addr, "stream_mutation_fragments", one_shot=True)
# Remember the current log position so that wait_for() only matches messages logged from now on.
s1_log = await manager.server_open_log(servers[1].server_id)
s1_mark = await s1_log.mark()
# Run the migration in the background; the test has to act while it is still in progress.
migration_task = asyncio.create_task(
manager.api.move_tablet(servers[0].ip_addr, "test", "test", replica[0], replica[1], s1_host_id, dst_shard, tablet_token))
# Wait for the replica-side writer of streaming to reach a place where it has already
# received writes from the leaving replica but hasn't applied them yet.
# Once the writer reaches this place, it will wait for the message_injection() call below before proceeding.
# The place we block the writer in should not hold on to erm or topology_guard because that would block the
# migration below and prevent the test from proceeding.
await s1_log.wait_for('stream_mutation_fragments: waiting', from_mark=s1_mark)
# Advance the mark so the later wait only sees messages logged after this point.
s1_mark = await s1_log.mark()
# Should cause streaming to fail and be retried while leaving behind the replica-side writer.
await manager.api.inject_disconnect(servers[0].ip_addr, servers[1].ip_addr)
logger.info("Waiting for migration to finish")
await migration_task
logger.info("Migration done")
# Sanity test
rows = await cql.run_async("SELECT pk from test.test")
assert len(list(rows)) == 1
# Remove the row so that anything applied later by the abandoned writer would show up as resurrected data.
await cql.run_async("TRUNCATE test.test")
rows = await cql.run_async("SELECT pk from test.test")
assert len(list(rows)) == 0
# Release abandoned streaming
await manager.api.message_injection(servers[1].ip_addr, "stream_mutation_fragments")
await s1_log.wait_for('stream_mutation_fragments: done', from_mark=s1_mark)
# Verify that there is no data resurrection
rows = await cql.run_async("SELECT pk from test.test")
assert len(list(rows)) == 0
# Verify that moving the tablet back works
await manager.api.move_tablet(servers[0].ip_addr, "test", "test", s1_host_id, dst_shard, replica[0], replica[1], tablet_token)
rows = await cql.run_async("SELECT pk from test.test")
assert len(list(rows)) == 0

View File

@@ -211,7 +211,7 @@ struct tagged_uuid {
}
// Returns an id wrapping the UUID produced by utils::make_random_uuid().
static tagged_uuid create_random_id() noexcept { return tagged_uuid{utils::make_random_uuid()}; }
// Returns a default-constructed id; usable in constant expressions.
static constexpr tagged_uuid create_null_id() noexcept { return tagged_uuid{}; }
// Wraps an existing UUID. Explicit, so a raw utils::UUID never converts implicitly.
explicit tagged_uuid(const utils::UUID& uuid) noexcept : id(uuid) {}
// NOTE(review): same signature as the constructor on the previous line, with constexpr added.
// The two look like the before/after sides of a diff hunk -- confirm only one is kept.
explicit constexpr tagged_uuid(const utils::UUID& uuid) noexcept : id(uuid) {}
// Default constructor. NOTE(review): presumably leaves `id` as the null UUID -- confirm against utils::UUID.
constexpr tagged_uuid() = default;
const utils::UUID& uuid() const noexcept {

View File

@@ -157,6 +157,20 @@ public:
++_read_messages_counter;
}
// \brief Non-waiting check for a message that has not been read yet.
// If one exists, it is counted as read and true is returned; otherwise
// nothing changes and false is returned.
// Reports an internal error when the shared injection data is missing.
bool poll_for_message() {
    if (!_shared_data) {
        on_internal_error(errinj_logger, "injection_shared_data is not initialized");
    }
    const bool has_unread = _read_messages_counter < _shared_data->received_message_count;
    if (has_unread) {
        // Consume exactly one message per successful poll.
        ++_read_messages_counter;
    }
    return has_unread;
}
std::optional<std::string_view> get(std::string_view key) {
if (!_shared_data) {
on_internal_error(errinj_logger, "injection_shared_data is not initialized");
@@ -223,11 +237,6 @@ private:
// TODO: change to unordered_set once we have heterogeneous lookups
std::map<sstring, injection_data, str_less> _enabled;
// Tells whether the named injection is currently enabled: it must have an
// entry (get_data() yields something) that is not an ongoing one-shot.
bool is_enabled(const std::string_view& injection_name) const {
    auto entry = get_data(injection_name);
    return entry ? !entry->is_ongoing_oneshot() : false;
}
bool is_one_shot(const std::string_view& injection_name) const {
const auto it = _enabled.find(injection_name);
if (it == _enabled.end()) {
@@ -253,6 +262,13 @@ private:
}
public:
// \brief Tells whether an injection is enabled.
// \param injection_name error injection name to check
// \return true iff get_data() finds the injection and it is not an ongoing one-shot
bool is_enabled(const std::string_view& injection_name) const {
    auto found = get_data(injection_name);
    if (!found) {
        return false;
    }
    return !found->is_ongoing_oneshot();
}
// \brief Enter into error injection if it's enabled
// \param name error injection name to check
bool enter(const std::string_view& name) {
@@ -461,6 +477,10 @@ class error_injection<false> {
static thread_local error_injection _local;
using handler_fun = std::function<void()>;
public:
// Always reports the injection as not enabled; `name` is ignored.
// NOTE(review): the enclosing hunk header places this in error_injection<false>,
// presumably the variant compiled when injections are switched off -- confirm.
bool is_enabled(const std::string_view& name) const {
return false;
}
// Never enters an injection: returns false unconditionally and ignores `name`.
// NOTE(review): the enclosing hunk header places this in error_injection<false> -- confirm.
bool enter(const std::string_view& name) const {
return false;
}