From 6cd310fc1a717ff19c9f766db904a3b4c9ed6842 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Mon, 27 Nov 2023 17:30:08 +0100 Subject: [PATCH 01/18] docs: Document the topology_guard mechanism --- docs/dev/topology-over-raft.md | 64 ++++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/docs/dev/topology-over-raft.md b/docs/dev/topology-over-raft.md index 84fe9727ef..e2c4984e93 100644 --- a/docs/dev/topology-over-raft.md +++ b/docs/dev/topology-over-raft.md @@ -170,6 +170,70 @@ When tablet is not in transition, the following invariants hold: 1. The storage layer (database) on any node contains writes for keys which belong to the tablet only if that shard is one of the current tablet replicas. +# Topology guards + +In addition to synchronizing with data access operations (e.g. CQL requests), we need to synchronize with +operations started by the topology change coordinator like streaming, repair, etc. +Those are distributed operations which involve several nodes. + +The goal of tracking such operations is to make sure that they don’t have any side-effects beyond the +topology change process (e.g. tablet migration) they were started in. This invariant is the basis for +reasoning about correctness. For example, if streaming which started in migration X runs concurrently +with later repair migration Y of the same tablet, we can run into consistency issues, +e.g. old streaming can resurrect deleted data. + +Example scenario: + + 1. Tablet T has replicas {A, B, C} + 2. Write (w1) of key K1 is replicated everywhere + 3. Start migration of tablet T replica from A to D + 4. Migration fails, but leaves behind an async part (s1) which later sends w1 to D + 5. Migration is retried and completes + 6. Write (w2) of Key K1 which deletes the key is replicated everywhere {D, B, C} + 7. Tablet T is repaired + 8. Tombstone for w2 is garbage-collected on D + 9. s1 is applied to D, and resurrects K1 (bad!) 
+ +For tablets, the time window of those migrations is much smaller than with vnodes, because tablet migrations +are started automatically and tablets themselves are smaller so operations complete faster. + +We don't want to use tracking by effective_replication_map_ptr for streaming, because we want to allow +unrelated migrations to start concurrently with streaming of some tablet. Using effective_replication_map_ptr +precludes that because it blocks the global token metadata barrier, so the tablet state machine would not be +able to make progress. + +The solution is to use a tracking mechanism called topology guards, which is built on top of the concept of +"sessions" managed in group0. + +Each topology operation runs under a unique global session, identified by UUID. The session life cycle is controlled by +the topology change coordinator, and is part of the group0 state machine. For example, for streaming, a session is created +when entering the "streaming" stage, and cleared when entering the next stage. Each stage which needs a session gets +its own unique session id assigned by the topology change coordinator. For tablets, the session id is stored with +a given tablet in tablet_transition_info. Retried operations running within the same stage use the same session. + +When a topology operation starts, it picks up the current session id from group0 (e.g. tablet_transition_info::session) +and keeps it in an object called frozen_topology_guard. This object acts as a permit which identifies the scope of the operation. +It is propagated everywhere the operation may have side effects. It must be materialized into a topology_guard object around +sections which do have side effects. Materialization will succeed only if the session is still open. If it succeeds, +the topology_guard will keep the session guard alive, which is tracked under the local session object. + +When the topology change coordinator moves to the next stage, it clears the session in group0. 
The next stage will +execute a global token metadata barrier, which does two things: (1) executes a raft read barrier and (2) waits +for all guards created under no longer open sessions to be released. Doing (1) ensures that the local node +sees that the session should be closed. The local state machine will initiate close on all sessions which are +no longer present in group0. Doing (2) ensures that any operation started under no longer open sessions is finished. +So executing a global token metadata barrier ensures that only operations started under still open sessions are running +or can start later. Stale RPCs which carry the permit of an already closed session will be rejected after the barrier +when they try to materialize the permit into topology_guard. + +Because sessions are managed in group0, there is no room for races between topology change coordinator's decisions +about sessions and actual state of sessions on each node. Each node follows the group0 history, and will arrive at the same view +about sessions. Also, stale topology coordinators cannot close or create sessions and mess with the intent of the new coordinator. + +Because each tablet uses its own unique session, the global token metadata barrier is not blocked by active streaming. + +VNode-based operations also use this mechanism, but they use a single session for the whole topology, as they don't +need any parallelism. # Topology state persistence table From 2d4cd9c57463ca2775c0cdfa79b96b27f0a71040 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Fri, 22 Sep 2023 19:02:10 +0200 Subject: [PATCH 02/18] tablets: Fix topology_metadata_guard holding on to the old erm Since abort callbacks are fired synchronously, we must change the table's erm before we do that so that the callbacks obtain the new erm. Otherwise, we will block barriers. 
--- replica/table.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/replica/table.cc b/replica/table.cc index d4042c5de8..249994536a 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -1695,10 +1695,10 @@ table::table(schema_ptr schema, config config, lw_shared_ptrinvalidate(); + auto old_erm = std::exchange(_erm, std::move(erm)); + if (old_erm) { + old_erm->invalidate(); } - _erm = std::move(erm); } partition_presence_checker From d3d83869ce5e1112b48cfe05b64ad7b24842433d Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Thu, 26 Oct 2023 00:03:25 +0200 Subject: [PATCH 03/18] storage_service: Introduce session concept --- configure.py | 2 + service/session.cc | 74 +++++++++++++++++++++ service/session.hh | 129 ++++++++++++++++++++++++++++++++++++ service/storage_service.cc | 1 + test/boost/sessions_test.cc | 94 ++++++++++++++++++++++++++ utils/UUID.hh | 2 +- 6 files changed, 301 insertions(+), 1 deletion(-) create mode 100644 service/session.cc create mode 100644 service/session.hh create mode 100644 test/boost/sessions_test.cc diff --git a/configure.py b/configure.py index 6ee7431df0..621ed7aa4a 100755 --- a/configure.py +++ b/configure.py @@ -550,6 +550,7 @@ scylla_tests = set([ 'test/boost/network_topology_strategy_test', 'test/boost/token_metadata_test', 'test/boost/tablets_test', + 'test/boost/sessions_test', 'test/boost/nonwrapping_range_test', 'test/boost/observable_test', 'test/boost/partitioner_test', @@ -1091,6 +1092,7 @@ scylla_core = (['message/messaging_service.cc', 'locator/util.cc', 'service/client_state.cc', 'service/storage_service.cc', + 'service/session.cc', 'service/misc_services.cc', 'service/pager/paging_state.cc', 'service/pager/query_pagers.cc', diff --git a/service/session.cc b/service/session.cc new file mode 100644 index 0000000000..5490c34292 --- /dev/null +++ b/service/session.cc @@ -0,0 +1,74 @@ +/* + * Copyright (C) 2023-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + 
+#include "service/session.hh" +#include "log.hh" + +namespace service { + +static logging::logger slogger("session"); + + +session::guard::guard(session& s) + : _session(&s) + , _holder(s._gate.hold()) { +} + +session::guard::~guard() { +} + +session_manager::session_manager() { + create_session(default_session_id); +} + +session::guard session_manager::enter_session(session_id id) { + auto i = _sessions.find(id); + if (i == _sessions.end()) { + throw std::runtime_error(fmt::format("Session not found: {}", id)); + } + auto guard = i->second->enter(); + slogger.debug("session {} entered", id); + return guard; +} + +void session_manager::create_session(session_id id) { + auto [i, created] = _sessions.emplace(id, std::make_unique<session>(id)); + if (created) { + slogger.debug("session {} created", id); + } else { + slogger.debug("session {} already exists", id); + } +} + +void session_manager::initiate_close_of_sessions_except(const std::unordered_set<session_id>& keep) { + for (auto&& [id, session] : _sessions) { + if (id != default_session_id && !keep.contains(id)) { + if (!session->is_closing()) { + _closing_sessions.push_front(*session); + } + session->start_closing(); + } + } +} + +future<> session_manager::drain_closing_sessions() { + auto lock = co_await get_units(_session_drain_sem, 1); + auto i = _closing_sessions.begin(); + while (i != _closing_sessions.end()) { + session& s = *i; + ++i; + auto id = s.id(); + slogger.debug("draining session {}", id); + co_await s.close(); + if (_sessions.erase(id)) { + slogger.debug("session {} closed", id); + } + } +} + +} // namespace service diff --git a/service/session.hh b/service/session.hh new file mode 100644 index 0000000000..64e8309a3f --- /dev/null +++ b/service/session.hh @@ -0,0 +1,129 @@ +/* + * Copyright (C) 2023-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +#pragma once + +#include "utils/UUID.hh" + +#include +#include +#include +#include + +#include +#include + +namespace service { + +using 
session_id = utils::tagged_uuid; + +// We want it to be different from a default-constructed session_id to catch mistakes. +constexpr session_id default_session_id = session_id( + utils::UUID(0x81e7fc5a8d4411ee, 0x8577325096b39f47)); // timeuuid 2023-11-27 16:46:27.182089.0 UTC + +/// Session is used to track execution of work related to some greater task, identified by session_id. +/// Work can enter the session using enter(), and is considered to be part of the session +/// as long as the guard returned by enter() is alive. +/// +/// Session goes through the following states monotonically: +/// 1) open - accepts work via enter() +/// 2) closing - rejects work via enter() +/// 3) closed - rejects work via enter(), and no guards are alive anymore +/// +/// Sessions are removed only after they are closed, so it's impossible to have a session::guard of a session +/// which is not in the registry. +class session { +public: + using link_type = bi::list_member_hook>; +private: + session_id _id; + seastar::gate _gate; + std::optional<seastar::shared_future<>> _closed; + link_type _link; +public: + using list_type = boost::intrusive::list, + boost::intrusive::constant_time_size>; +public: + class guard { + session* _session; + seastar::gate::holder _holder; + public: + explicit guard(session& s); + guard(const guard&) noexcept = default; + guard(guard&&) noexcept = default; + guard& operator=(guard&&) noexcept = default; + guard& operator=(const guard&) noexcept = default; + ~guard(); + + void check() const { + if (!valid()) { + throw seastar::abort_requested_exception(); + } + } + + [[nodiscard]] bool valid() const { + return !_session->_closed; + } + }; + + explicit session(session_id id) : _id(id) {} + + guard enter() { + return guard(*this); + }; + + /// No new work is admitted to enter the session after this. + /// Can be called many times. + void start_closing() noexcept { + if (!_closed) { + _closed = seastar::shared_future<>(_gate.close()); + } + } + + /// Returns true iff the session is not open. 
+ /// Calling enter() in this state will fail. + bool is_closing() const { + return bool(_closed); + } + + session_id id() const { + return _id; + } + + /// Post-condition of successfully resolved future: There are no guards alive for this session, + /// and it's impossible to create more such guards later. + /// Can be called concurrently. + future<> close() { + start_closing(); + return _closed->get_future(); + } +}; + +class session_manager { + std::unordered_map<session_id, std::unique_ptr<session>> _sessions; + session::list_type _closing_sessions; + seastar::semaphore _session_drain_sem{1}; +public: + session_manager(); + + session::guard enter_session(session_id id); + + /// Creates a session on this shard if it doesn't exist yet. + /// If the session already exists, does nothing. + void create_session(session_id); + + /// Calls start_closing() on all sessions except those in keep. + void initiate_close_of_sessions_except(const std::unordered_set<session_id>& keep); + + /// Post-condition: All sessions which are in closing state before the call will be in closed state after the call. + /// Can be called concurrently. 
+ future<> drain_closing_sessions(); +}; + +} // namespace service diff --git a/service/storage_service.cc b/service/storage_service.cc index 037917e7c1..aa73b3ed5c 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -10,6 +10,7 @@ */ #include "storage_service.hh" +#include "service/session.hh" #include "dht/boot_strapper.hh" #include #include diff --git a/test/boost/sessions_test.cc b/test/boost/sessions_test.cc new file mode 100644 index 0000000000..a7bf2347db --- /dev/null +++ b/test/boost/sessions_test.cc @@ -0,0 +1,94 @@ +/* + * Copyright (C) 2023-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + + +#include "test/lib/scylla_test_case.hh" +#include +#include "test/lib/random_utils.hh" +#include "test/lib/cql_test_env.hh" +#include "test/lib/log.hh" + +#include "service/session.hh" + +using namespace service; + +SEASTAR_TEST_CASE(test_default_session_always_exists) { + return seastar::async([] { + session_manager mgr; + auto guard = mgr.enter_session(default_session_id); + guard.check(); + + mgr.initiate_close_of_sessions_except({}); + mgr.drain_closing_sessions().get(); + + guard = mgr.enter_session(default_session_id); + guard.check(); + }); +} + +SEASTAR_TEST_CASE(test_default_constructible_session_does_not_exist) { + return seastar::async([] { + session_manager mgr; + session_id id; + // For safety, we don't want to treat unset id same as default_session_id. 
+ BOOST_REQUIRE_THROW(mgr.enter_session(id), std::runtime_error); + }); +} + +SEASTAR_TEST_CASE(test_session_closing) { + return seastar::async([] { + session_manager mgr; + + auto id = session_id(utils::make_random_uuid()); + auto id2 = session_id(utils::make_random_uuid()); + auto id3 = session_id(utils::make_random_uuid()); + auto id4 = session_id(utils::make_random_uuid()); + + BOOST_REQUIRE_THROW(mgr.enter_session(id), std::runtime_error); + + mgr.create_session(id); + mgr.create_session(id2); + + auto guard = mgr.enter_session(id); + auto guard2 = mgr.enter_session(id2); + + guard.check(); + guard2.check(); + + mgr.initiate_close_of_sessions_except({id}); + + BOOST_REQUIRE(guard.valid()); + BOOST_REQUIRE(!guard2.valid()); + + auto f = mgr.drain_closing_sessions(); + auto f2 = mgr.drain_closing_sessions(); // test concurrent drain + BOOST_REQUIRE(!f.available()); // blocked by guard2 + + // Concurrent wait drain + mgr.create_session(id3); + mgr.initiate_close_of_sessions_except({id}); + mgr.create_session(id3); // no-op + mgr.create_session(id4); + + { + auto _ = std::move(guard2); + } + + f.get(); + f2.get(); + + mgr.drain_closing_sessions().get(); + + BOOST_REQUIRE(guard.valid()); + + mgr.enter_session(id); + mgr.enter_session(id4); + BOOST_REQUIRE_THROW(mgr.enter_session(id2), std::runtime_error); + BOOST_REQUIRE_THROW(mgr.enter_session(id3), std::runtime_error); + }); +} diff --git a/utils/UUID.hh b/utils/UUID.hh index 563c4a3b69..bcc9fbcbf5 100644 --- a/utils/UUID.hh +++ b/utils/UUID.hh @@ -211,7 +211,7 @@ struct tagged_uuid { } static tagged_uuid create_random_id() noexcept { return tagged_uuid{utils::make_random_uuid()}; } static constexpr tagged_uuid create_null_id() noexcept { return tagged_uuid{}; } - explicit tagged_uuid(const utils::UUID& uuid) noexcept : id(uuid) {} + explicit constexpr tagged_uuid(const utils::UUID& uuid) noexcept : id(uuid) {} constexpr tagged_uuid() = default; const utils::UUID& uuid() const noexcept { From 
0e42fe4c3cf9ee781d8fad11b3489eaa190f672c Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Thu, 26 Oct 2023 00:20:53 +0200 Subject: [PATCH 04/18] storage_service: Introduce concept of a topology_guard topology_guard is used to track distributed operations started by the topology change coordinator, e.g. streaming, to make sure that those operations have no side effects after topology change coordinator moved to the next migration stage, of a given tablet or of the whole ring. topology_guard can be sent over the wire in the form of frozen_topology_guard. It can be materialized again on the other side. While in transit, it doesn't block the coordinator barriers. But if the coordinator moved on, materialization of the guard will fail. So tracking safety is preserved. In this patch, the guard implementation is based on tracking work under global sessions, but the concept is flexible and other mechanisms can be used without changing user code. --- service/storage_service.cc | 6 ++++ service/topology_guard.hh | 74 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 80 insertions(+) create mode 100644 service/topology_guard.hh diff --git a/service/storage_service.cc b/service/storage_service.cc index aa73b3ed5c..914dd6b138 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -108,6 +108,12 @@ namespace service { static logging::logger slogger("storage_service"); +static thread_local session_manager topology_session_manager; + +session_manager& get_topology_session_manager() { + return topology_session_manager; +} + static constexpr std::chrono::seconds wait_for_live_nodes_timeout{30}; storage_service::storage_service(abort_source& abort_source, diff --git a/service/topology_guard.hh b/service/topology_guard.hh new file mode 100644 index 0000000000..b6b1a45585 --- /dev/null +++ b/service/topology_guard.hh @@ -0,0 +1,74 @@ +/* + * Copyright (C) 2023-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +#pragma once + 
+#include "service/session.hh" +#include "replica/database_fwd.hh" + +namespace service { + +/// Represents topology_guard in transit. +/// Can be passed across nodes and copied across shards. +/// Guard in transit doesn't block barriers, but can be materialized into topology_guard +/// which does. If the guard was invalidated while in transit, materialization will fail. +/// Has a null state which represents no guard. +/// topology_guard created out of null frozen_topology_guard is valid and never aborted. +using frozen_topology_guard = session_id; + +/// Represents a guard which is always valid and never aborted. +constexpr frozen_topology_guard null_topology_guard = default_session_id; + +session_manager& get_topology_session_manager(); + +/// A guard used by operations started by the topology change coordinator whose scope +/// of operation is supposed to be limited to a single transition stage. +/// The coordinator invalidates guards, typically when it advances to the next stage. +/// Invalidated guards are still alive and block barriers, but their check() will throw, which +/// gives the holder a chance to interrupt the operation and release the guard early. +/// The token metadata barrier executed by the coordinator waits for guards to die, even the +/// invalidated ones. +/// +/// Guards allow coordinators to make sure there are no side-effects from operations started +/// in earlier stages of a topology change operation. If all side-effects are performed under +/// the guard's scope, then executing a token metadata barrier after invalidating the guard +/// guarantees that there can be no side-effects from earlier stages. +/// The token metadata barrier must contact all nodes which may have alive guards. +template <typename T> concept TopologyGuard = requires(T guard, replica::table& t, frozen_topology_guard frozen_guard) { + // Acquiring the guard. + { T(t, frozen_guard) } -> std::same_as<T>; + + // Moving the guard. 
+ { T(std::move(guard)) } -> std::same_as<T>; + + // Checking if the guard was invalidated. + { guard.check() } -> std::same_as<void>; +}; + +/// TopologyGuard implementation where the scope of the guard's validity +/// is limited to the scope of a session's validity. +/// The topology change coordinator invalidates the guards by closing the session. +class session_topology_guard { + session::guard _guard; +public: + using frozen = session_id; + + session_topology_guard(replica::table& t, frozen_topology_guard g) + : _guard(get_topology_session_manager().enter_session(g)) + { } + + void check() { + return _guard.check(); + } +}; + +static_assert(TopologyGuard<session_topology_guard>); + +using topology_guard = session_topology_guard; + +} // namespace service From 063095ea5040cdd5613e8a5ec64c8ae4f3462057 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Wed, 6 Dec 2023 18:33:00 +0100 Subject: [PATCH 05/18] streaming: Always close the rpc::sink rpc::sink::~sink aborts if not closed. There is a try/catch clause which ensures that close() is called, but there was code after the sink is created which is not covered by it. Move sink construction past that code. 
--- streaming/stream_session.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 58a61d7606..62998390ea 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -121,7 +121,6 @@ void stream_manager::init_messaging_service_handler(abort_source& as) { } return _mm.local().get_schema_for_write(schema_id, from, _ms.local(), as).then([this, from, estimated_partitions, plan_id, cf_id, source, reason] (schema_ptr s) mutable { return _db.local().obtain_reader_permit(s, "stream-session", db::no_timeout, {}).then([this, from, estimated_partitions, plan_id, cf_id, source, reason, s] (reader_permit permit) mutable { - auto sink = _ms.local().make_sink_for_stream_mutation_fragments(source); struct stream_mutation_fragments_cmd_status { bool got_cmd = false; bool got_end_of_stream = false; @@ -163,6 +162,7 @@ void stream_manager::init_messaging_service_handler(abort_source& as) { } }); }; + auto sink = _ms.local().make_sink_for_stream_mutation_fragments(source); try { // Make sure the table with cf_id is still present at this point. // Close the sink in case the table is dropped. 
From fd3c089ccc8be68bbf43787a3f4dfc6a9eb25b55 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Thu, 26 Oct 2023 00:35:19 +0200 Subject: [PATCH 06/18] service: range_streamer: Propagate topology_guard to receivers --- dht/boot_strapper.cc | 5 +++-- dht/boot_strapper.hh | 3 ++- dht/range_streamer.cc | 3 ++- dht/range_streamer.hh | 8 +++++-- message/messaging_service.cc | 16 +++++++------- message/messaging_service.hh | 9 ++++---- repair/row_level.cc | 3 ++- service/storage_service.cc | 36 ++++++++++++++++++++----------- service/storage_service.hh | 3 ++- sstables_loader.cc | 2 +- streaming/consumer.cc | 6 ++++-- streaming/consumer.hh | 4 +++- streaming/stream_plan.cc | 2 ++ streaming/stream_plan.hh | 5 ++++- streaming/stream_session.cc | 27 +++++++++++++++-------- streaming/stream_session.hh | 10 +++++++++ streaming/stream_transfer_task.cc | 15 ++++++++----- 17 files changed, 105 insertions(+), 52 deletions(-) diff --git a/dht/boot_strapper.cc b/dht/boot_strapper.cc index 8d3930314b..0de4e84e0d 100644 --- a/dht/boot_strapper.cc +++ b/dht/boot_strapper.cc @@ -27,7 +27,8 @@ static logging::logger blogger("boot_strapper"); namespace dht { -future<> boot_strapper::bootstrap(streaming::stream_reason reason, gms::gossiper& gossiper, inet_address replace_address) { +future<> boot_strapper::bootstrap(streaming::stream_reason reason, gms::gossiper& gossiper, service::frozen_topology_guard topo_guard, + inet_address replace_address) { blogger.debug("Beginning bootstrap process: sorted_tokens={}", get_token_metadata().sorted_tokens()); sstring description; if (reason == streaming::stream_reason::bootstrap) { @@ -38,7 +39,7 @@ future<> boot_strapper::bootstrap(streaming::stream_reason reason, gms::gossiper throw std::runtime_error("Wrong stream_reason provided: it can only be replace or bootstrap"); } try { - auto streamer = make_lw_shared(_db, _stream_manager, _token_metadata_ptr, _abort_source, _tokens, _address, _dr, description, reason); + auto streamer = 
make_lw_shared(_db, _stream_manager, _token_metadata_ptr, _abort_source, _tokens, _address, _dr, description, reason, topo_guard); auto nodes_to_filter = gossiper.get_unreachable_members(); if (reason == streaming::stream_reason::replace) { nodes_to_filter.insert(std::move(replace_address)); diff --git a/dht/boot_strapper.hh b/dht/boot_strapper.hh index e2bc9392f9..0c50c043e7 100644 --- a/dht/boot_strapper.hh +++ b/dht/boot_strapper.hh @@ -14,6 +14,7 @@ #include #include "replica/database_fwd.hh" #include "streaming/stream_reason.hh" +#include "service/topology_guard.hh" #include #include @@ -52,7 +53,7 @@ public: , _token_metadata_ptr(std::move(tmptr)) { } - future<> bootstrap(streaming::stream_reason reason, gms::gossiper& gossiper, inet_address replace_address = {}); + future<> bootstrap(streaming::stream_reason reason, gms::gossiper& gossiper, service::frozen_topology_guard, inet_address replace_address = {}); /** * if initialtoken was specified, use that (split on comma). diff --git a/dht/range_streamer.cc b/dht/range_streamer.cc index 1d94dc3f11..bd44500d5b 100644 --- a/dht/range_streamer.cc +++ b/dht/range_streamer.cc @@ -265,7 +265,8 @@ future<> range_streamer::stream_async() { unsigned nr_ranges_streamed = 0; size_t nr_ranges_total = range_vec.size(); auto do_streaming = [&] (dht::token_range_vector&& ranges_to_stream) { - auto sp = stream_plan(_stream_manager.local(), format("{}-{}-index-{:d}", description, keyspace, sp_index++), _reason); + auto sp = stream_plan(_stream_manager.local(), format("{}-{}-index-{:d}", description, keyspace, sp_index++), + _reason, _topo_guard); auto abort_listener = _abort_source.subscribe([&] () noexcept { sp.abort(); }); _abort_source.check(); logger.info("{} with {} for keyspace={}, streaming [{}, {}) out of {} ranges", diff --git a/dht/range_streamer.hh b/dht/range_streamer.hh index b95f222a9b..bc57bef422 100644 --- a/dht/range_streamer.hh +++ b/dht/range_streamer.hh @@ -14,6 +14,7 @@ #include "streaming/stream_plan.hh" 
#include "streaming/stream_state.hh" #include "streaming/stream_reason.hh" +#include "service/topology_guard.hh" #include "gms/inet_address.hh" #include "range.hh" #include @@ -78,6 +79,7 @@ public: range_streamer(distributed& db, sharded& sm, const token_metadata_ptr tmptr, abort_source& abort_source, std::unordered_set tokens, inet_address address, locator::endpoint_dc_rack dr, sstring description, streaming::stream_reason reason, + service::frozen_topology_guard topo_guard, std::vector tables = {}) : _db(db) , _stream_manager(sm) @@ -89,13 +91,14 @@ public: , _description(std::move(description)) , _reason(reason) , _tables(std::move(tables)) + , _topo_guard(topo_guard) { _abort_source.check(); } range_streamer(distributed& db, sharded& sm, const token_metadata_ptr tmptr, abort_source& abort_source, - inet_address address, locator::endpoint_dc_rack dr, sstring description, streaming::stream_reason reason, std::vector tables = {}) - : range_streamer(db, sm, std::move(tmptr), abort_source, std::unordered_set(), address, std::move(dr), description, reason, std::move(tables)) { + inet_address address, locator::endpoint_dc_rack dr, sstring description, streaming::stream_reason reason, service::frozen_topology_guard topo_guard, std::vector tables = {}) + : range_streamer(db, sm, std::move(tmptr), abort_source, std::unordered_set(), address, std::move(dr), description, reason, std::move(topo_guard), std::move(tables)) { } void add_source_filter(std::unique_ptr filter) { @@ -158,6 +161,7 @@ private: sstring _description; streaming::stream_reason _reason; std::vector _tables; + service::frozen_topology_guard _topo_guard; std::unordered_multimap> _to_stream; std::unordered_set> _source_filters; // Number of tx and rx ranges added diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 7f78b12403..7b0fd2b898 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -994,15 +994,15 @@ rpc::sink 
messaging_service::make_sink_for_stream_mutation_fragments(rp } future, rpc::source>> -messaging_service::make_sink_and_source_for_stream_mutation_fragments(table_schema_version schema_id, streaming::plan_id plan_id, table_id cf_id, uint64_t estimated_partitions, streaming::stream_reason reason, msg_addr id) { +messaging_service::make_sink_and_source_for_stream_mutation_fragments(table_schema_version schema_id, streaming::plan_id plan_id, table_id cf_id, uint64_t estimated_partitions, streaming::stream_reason reason, service::session_id session, msg_addr id) { using value_type = std::tuple, rpc::source>; if (is_shutting_down()) { return make_exception_future(rpc::closed_error()); } auto rpc_client = get_rpc_client(messaging_verb::STREAM_MUTATION_FRAGMENTS, id); - return rpc_client->make_stream_sink().then([this, plan_id, schema_id, cf_id, estimated_partitions, reason, rpc_client] (rpc::sink sink) mutable { - auto rpc_handler = rpc()->make_client (streaming::plan_id, table_schema_version, table_id, uint64_t, streaming::stream_reason, rpc::sink)>(messaging_verb::STREAM_MUTATION_FRAGMENTS); - return rpc_handler(*rpc_client , plan_id, schema_id, cf_id, estimated_partitions, reason, sink).then_wrapped([sink, rpc_client] (future> source) mutable { + return rpc_client->make_stream_sink().then([this, session, plan_id, schema_id, cf_id, estimated_partitions, reason, rpc_client] (rpc::sink sink) mutable { + auto rpc_handler = rpc()->make_client (streaming::plan_id, table_schema_version, table_id, uint64_t, streaming::stream_reason, service::session_id, rpc::sink)>(messaging_verb::STREAM_MUTATION_FRAGMENTS); + return rpc_handler(*rpc_client , plan_id, schema_id, cf_id, estimated_partitions, reason, session, sink).then_wrapped([sink, rpc_client] (future> source) mutable { return (source.failed() ? 
sink.close() : make_ready_future<>()).then([sink = std::move(sink), source = std::move(source)] () mutable { return make_ready_future(value_type(std::move(sink), source.get0())); }); @@ -1010,7 +1010,7 @@ messaging_service::make_sink_and_source_for_stream_mutation_fragments(table_sche }); } -void messaging_service::register_stream_mutation_fragments(std::function> (const rpc::client_info& cinfo, streaming::plan_id plan_id, table_schema_version schema_id, table_id cf_id, uint64_t estimated_partitions, rpc::optional, rpc::source> source)>&& func) { +void messaging_service::register_stream_mutation_fragments(std::function> (const rpc::client_info& cinfo, streaming::plan_id plan_id, table_schema_version schema_id, table_id cf_id, uint64_t estimated_partitions, rpc::optional, rpc::optional, rpc::source> source)>&& func) { register_handler(this, messaging_verb::STREAM_MUTATION_FRAGMENTS, std::move(func)); } @@ -1107,13 +1107,13 @@ future<> messaging_service::unregister_repair_get_full_row_hashes_with_rpc_strea // PREPARE_MESSAGE void messaging_service::register_prepare_message(std::function (const rpc::client_info& cinfo, - streaming::prepare_message msg, streaming::plan_id plan_id, sstring description, rpc::optional reason)>&& func) { + streaming::prepare_message msg, streaming::plan_id plan_id, sstring description, rpc::optional reason, rpc::optional)>&& func) { register_handler(this, messaging_verb::PREPARE_MESSAGE, std::move(func)); } future messaging_service::send_prepare_message(msg_addr id, streaming::prepare_message msg, streaming::plan_id plan_id, - sstring description, streaming::stream_reason reason) { + sstring description, streaming::stream_reason reason, service::session_id session) { return send_message(this, messaging_verb::PREPARE_MESSAGE, id, - std::move(msg), plan_id, std::move(description), reason); + std::move(msg), plan_id, std::move(description), reason, session); } future<> messaging_service::unregister_prepare_message() { return 
unregister_handler(messaging_verb::PREPARE_MESSAGE); diff --git a/message/messaging_service.hh b/message/messaging_service.hh index a869198be2..494abed440 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -20,6 +20,7 @@ #include "schema/schema_fwd.hh" #include "streaming/stream_fwd.hh" #include "locator/host_id.hh" +#include "service/session.hh" #include #include @@ -354,9 +355,9 @@ public: // Wrapper for PREPARE_MESSAGE verb void register_prepare_message(std::function (const rpc::client_info& cinfo, - streaming::prepare_message msg, streaming::plan_id plan_id, sstring description, rpc::optional reason)>&& func); + streaming::prepare_message msg, streaming::plan_id plan_id, sstring description, rpc::optional reason, rpc::optional)>&& func); future send_prepare_message(msg_addr id, streaming::prepare_message msg, streaming::plan_id plan_id, - sstring description, streaming::stream_reason); + sstring description, streaming::stream_reason, service::session_id); future<> unregister_prepare_message(); // Wrapper for PREPARE_DONE_MESSAGE verb @@ -366,10 +367,10 @@ public: // Wrapper for STREAM_MUTATION_FRAGMENTS // The receiver of STREAM_MUTATION_FRAGMENTS sends status code to the sender to notify any error on the receiver side. The status code is of type int32_t. 0 means successful, -1 means error, -2 means error and table is dropped, other status code value are reserved for future use. 
- void register_stream_mutation_fragments(std::function> (const rpc::client_info& cinfo, streaming::plan_id plan_id, table_schema_version schema_id, table_id cf_id, uint64_t estimated_partitions, rpc::optional reason_opt, rpc::source> source)>&& func); + void register_stream_mutation_fragments(std::function> (const rpc::client_info& cinfo, streaming::plan_id plan_id, table_schema_version schema_id, table_id cf_id, uint64_t estimated_partitions, rpc::optional reason_opt, rpc::optional, rpc::source> source)>&& func); future<> unregister_stream_mutation_fragments(); rpc::sink make_sink_for_stream_mutation_fragments(rpc::source>& source); - future, rpc::source>> make_sink_and_source_for_stream_mutation_fragments(table_schema_version schema_id, streaming::plan_id plan_id, table_id cf_id, uint64_t estimated_partitions, streaming::stream_reason reason, msg_addr id); + future, rpc::source>> make_sink_and_source_for_stream_mutation_fragments(table_schema_version schema_id, streaming::plan_id plan_id, table_id cf_id, uint64_t estimated_partitions, streaming::stream_reason reason, service::session_id session, msg_addr id); // Wrapper for REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM future, rpc::source>> make_sink_and_source_for_repair_get_row_diff_with_rpc_stream(uint32_t repair_meta_id, msg_addr id); diff --git a/repair/row_level.cc b/repair/row_level.cc index 34d045003a..911d7b844a 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -495,8 +495,9 @@ void repair_writer_impl::create_writer(lw_shared_ptr w) { } replica::table& t = _db.local().find_column_family(_schema->id()); rlogger.debug("repair_writer: keyspace={}, table={}, estimated_partitions={}", w->schema()->ks_name(), w->schema()->cf_name(), w->get_estimated_partitions()); + service::frozen_topology_guard topo_guard = service::null_topology_guard; // FIXME: propagate _writer_done = mutation_writer::distribute_reader_and_consume_on_shards(_schema, _schema->get_sharder(), std::move(_queue_reader), - 
streaming::make_streaming_consumer(sstables::repair_origin, _db, _sys_dist_ks, _view_update_generator, w->get_estimated_partitions(), _reason, is_offstrategy_supported(_reason)), + streaming::make_streaming_consumer(sstables::repair_origin, _db, _sys_dist_ks, _view_update_generator, w->get_estimated_partitions(), _reason, is_offstrategy_supported(_reason), topo_guard), t.stream_in_progress()).then([w] (uint64_t partitions) { rlogger.debug("repair_writer: keyspace={}, table={}, managed to write partitions={} to sstable", w->schema()->ks_name(), w->schema()->cf_name(), partitions); diff --git a/service/storage_service.cc b/service/storage_service.cc index 914dd6b138..f197b453bb 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -10,6 +10,7 @@ */ #include "storage_service.hh" +#include "service/topology_guard.hh" #include "service/session.hh" #include "dht/boot_strapper.hh" #include @@ -3539,7 +3540,7 @@ future<> storage_service::bootstrap(std::unordered_set& bootstrap_tokens, _gossiper.wait_for_range_setup().get(); dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_broadcast_address(), _snitch.local()->get_location(), bootstrap_tokens, get_token_metadata_ptr()); slogger.info("Starting to bootstrap..."); - bs.bootstrap(streaming::stream_reason::bootstrap, _gossiper).get(); + bs.bootstrap(streaming::stream_reason::bootstrap, _gossiper, null_topology_guard).get(); } else { // Even with RBNO bootstrap we need to announce the new CDC generation immediately after it's created. 
_gossiper.add_local_application_state({ @@ -4965,7 +4966,7 @@ void storage_service::run_replace_ops(std::unordered_set& bootstrap_token } else { slogger.info("replace[{}]: Using streaming based node ops to sync data", uuid); dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_broadcast_address(), _snitch.local()->get_location(), bootstrap_tokens, get_token_metadata_ptr()); - bs.bootstrap(streaming::stream_reason::replace, _gossiper, replace_address).get(); + bs.bootstrap(streaming::stream_reason::replace, _gossiper, null_topology_guard, replace_address).get(); } on_streaming_finished(); @@ -5289,6 +5290,7 @@ void storage_service::node_ops_insert(node_ops_id ops_uuid, future storage_service::node_ops_cmd_handler(gms::inet_address coordinator, node_ops_cmd_request req) { return seastar::async([this, coordinator, req = std::move(req)] () mutable { auto ops_uuid = req.ops_uuid; + auto topo_guard = null_topology_guard; slogger.debug("node_ops_cmd_handler cmd={}, ops_uuid={}", req.cmd, ops_uuid); if (req.cmd == node_ops_cmd::query_pending_ops) { @@ -5360,7 +5362,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad _repair.local().removenode_with_repair(get_token_metadata_ptr(), node, ops).get(); } else { slogger.info("removenode[{}]: Started to sync data for removing node={} using stream, coordinator={}", req.ops_uuid, node, coordinator); - removenode_with_stream(node, as).get(); + removenode_with_stream(node, topo_guard, as).get(); } } } else if (req.cmd == node_ops_cmd::removenode_abort) { @@ -5661,7 +5663,7 @@ future<> storage_service::rebuild(sstring source_dc) { co_await ss._repair.local().rebuild_with_repair(tmptr, std::move(source_dc)); } else { auto streamer = make_lw_shared(ss._db, ss._stream_manager, tmptr, ss._abort_source, - ss.get_broadcast_address(), ss._snitch.local()->get_location(), "Rebuild", streaming::stream_reason::rebuild); + ss.get_broadcast_address(), ss._snitch.local()->get_location(), "Rebuild", 
streaming::stream_reason::rebuild, null_topology_guard); streamer->add_source_filter(std::make_unique(ss._gossiper.get_unreachable_members())); if (source_dc != "") { streamer->add_source_filter(std::make_unique(source_dc)); @@ -5820,8 +5822,10 @@ future<> storage_service::removenode_add_ranges(lw_shared_ptr storage_service::removenode_with_stream(gms::inet_address leaving_node, shared_ptr as_ptr) { - return seastar::async([this, leaving_node, as_ptr] { +future<> storage_service::removenode_with_stream(gms::inet_address leaving_node, + frozen_topology_guard topo_guard, + shared_ptr as_ptr) { + return seastar::async([this, leaving_node, as_ptr, topo_guard] { auto tmptr = get_token_metadata_ptr(); abort_source as; auto sub = _abort_source.subscribe([&as] () noexcept { @@ -5837,7 +5841,7 @@ future<> storage_service::removenode_with_stream(gms::inet_address leaving_node, as.request_abort(); } }); - auto streamer = make_lw_shared(_db, _stream_manager, tmptr, as, get_broadcast_address(), _snitch.local()->get_location(), "Removenode", streaming::stream_reason::removenode); + auto streamer = make_lw_shared(_db, _stream_manager, tmptr, as, get_broadcast_address(), _snitch.local()->get_location(), "Removenode", streaming::stream_reason::removenode, topo_guard); removenode_add_ranges(streamer, leaving_node).get(); try { streamer->stream_async().get(); @@ -5888,7 +5892,12 @@ future<> storage_service::leave_ring() { future<> storage_service::stream_ranges(std::unordered_map> ranges_to_stream_by_keyspace) { - auto streamer = dht::range_streamer(_db, _stream_manager, get_token_metadata_ptr(), _abort_source, get_broadcast_address(), _snitch.local()->get_location(), "Unbootstrap", streaming::stream_reason::decommission); + auto streamer = dht::range_streamer(_db, _stream_manager, get_token_metadata_ptr(), _abort_source, + get_broadcast_address(), + _snitch.local()->get_location(), + "Unbootstrap", + streaming::stream_reason::decommission, + null_topology_guard); for (auto& entry : 
ranges_to_stream_by_keyspace) { const auto& keyspace = entry.first; auto& ranges_with_endpoints = entry.second; @@ -6246,7 +6255,7 @@ future storage_service::raft_topology_cmd_handler(raft } else { dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_broadcast_address(), locator::endpoint_dc_rack{rs.datacenter, rs.rack}, rs.ring.value().tokens, get_token_metadata_ptr()); - co_await bs.bootstrap(streaming::stream_reason::bootstrap, _gossiper); + co_await bs.bootstrap(streaming::stream_reason::bootstrap, _gossiper, null_topology_guard); } })); } @@ -6273,7 +6282,7 @@ future storage_service::raft_topology_cmd_handler(raft auto replaced_id = std::get(_topology_state_machine._topology.req_param[raft_server.id()]).replaced_id; auto existing_ip = _group0->address_map().find(replaced_id); assert(existing_ip); - co_await bs.bootstrap(streaming::stream_reason::replace, _gossiper, *existing_ip); + co_await bs.bootstrap(streaming::stream_reason::replace, _gossiper, null_topology_guard, *existing_ip); } })); } @@ -6325,7 +6334,7 @@ future storage_service::raft_topology_cmd_handler(raft auto ops = seastar::make_shared(node_ops_id::create_random_id(), as, std::move(ignored_ips)); return _repair.local().removenode_with_repair(get_token_metadata_ptr(), *ip, ops); } else { - return removenode_with_stream(*ip, as); + return removenode_with_stream(*ip, null_topology_guard, as); } })); result.status = raft_topology_cmd_result::command_status::success; @@ -6340,7 +6349,8 @@ future storage_service::raft_topology_cmd_handler(raft co_await _repair.local().rebuild_with_repair(tmptr, std::move(source_dc)); } else { auto streamer = make_lw_shared(_db, _stream_manager, tmptr, _abort_source, - get_broadcast_address(), _snitch.local()->get_location(), "Rebuild", streaming::stream_reason::rebuild); + get_broadcast_address(), _snitch.local()->get_location(), "Rebuild", streaming::stream_reason::rebuild, + null_topology_guard); 
streamer->add_source_filter(std::make_unique(_gossiper.get_unreachable_members())); if (source_dc != "") { streamer->add_source_filter(std::make_unique(source_dc)); @@ -6527,7 +6537,7 @@ future<> storage_service::stream_tablet(locator::global_tablet_id tablet) { std::vector tables = {table.schema()->cf_name()}; auto streamer = make_lw_shared(_db, _stream_manager, std::move(tm), guard.get_abort_source(), get_broadcast_address(), _snitch.local()->get_location(), - "Tablet migration", streaming::stream_reason::tablet_migration, std::move(tables)); + "Tablet migration", streaming::stream_reason::tablet_migration, null_topology_guard, std::move(tables)); streamer->add_source_filter(std::make_unique( _gossiper.get_unreachable_members())); diff --git a/service/storage_service.hh b/service/storage_service.hh index e6a0a527cf..75adbb818e 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -14,6 +14,7 @@ #include #include "gms/i_endpoint_state_change_subscriber.hh" #include "service/endpoint_lifecycle_subscriber.hh" +#include "service/topology_guard.hh" #include "locator/abstract_replication_strategy.hh" #include "locator/tablets.hh" #include "locator/tablet_metadata_guard.hh" @@ -538,7 +539,7 @@ private: */ future> get_new_source_ranges(locator::vnode_effective_replication_map_ptr erm, const dht::token_range_vector& ranges) const; - future<> removenode_with_stream(gms::inet_address leaving_node, shared_ptr as_ptr); + future<> removenode_with_stream(gms::inet_address leaving_node, frozen_topology_guard, shared_ptr as_ptr); future<> removenode_add_ranges(lw_shared_ptr streamer, gms::inet_address leaving_node); // needs to be modified to accept either a keyspace or ARS. 
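Reviewer note: the pattern this patch threads through the streaming code (a copyable `frozen_topology_guard` passed along with the operation, materialized into a `topology_guard` where side effects happen, and re-checked with `guard.check()` before each fragment is consumed) can be sketched in isolation as below. This is a simplified, single-threaded model under stated assumptions, not the ScyllaDB implementation: `session_registry`, `frozen_guard`, `guard`, and the integer `session_id` are hypothetical stand-ins, and the real types live in `service/session.hh` and `service/topology_guard.hh`.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <stdexcept>
#include <unordered_map>

// Hypothetical stand-in for service::session_id (a UUID in the real code).
using session_id = std::uint64_t;

// Local state of one session; flipped to closed when the coordinator
// moves on to the next stage.
struct session {
    bool closed = false;
};

// Hypothetical per-node registry of open sessions.
class session_registry {
    std::unordered_map<session_id, std::shared_ptr<session>> _sessions;
public:
    void open(session_id id) {
        _sessions[id] = std::make_shared<session>();
    }
    // Marks the session closed and forgets it; guards that still hold
    // the session object observe the closed flag.
    void close(session_id id) {
        auto it = _sessions.find(id);
        if (it != _sessions.end()) {
            it->second->closed = true;
            _sessions.erase(it);
        }
    }
    std::shared_ptr<session> find(session_id id) const {
        auto it = _sessions.find(id);
        return it == _sessions.end() ? nullptr : it->second;
    }
};

// Cheap, copyable permit that only names the session. It can be captured
// in lambdas and sent over RPC, like the frozen guard in the patch.
struct frozen_guard {
    session_id id;
};

// Materialized guard: construction succeeds only while the session is
// open, and check() fails once the session has been closed.
class guard {
    std::shared_ptr<session> _s;
public:
    guard(const session_registry& reg, frozen_guard fg) : _s(reg.find(fg.id)) {
        if (!_s) {
            throw std::runtime_error("session is not open");
        }
    }
    void check() const {
        if (_s->closed) {
            throw std::runtime_error("session was closed");
        }
    }
};
```

In this model a stale operation fails in one of two ways: if it has not yet materialized its guard, construction throws; if it already holds a guard, the next `check()` throws. Either way it stops before applying further side effects, which is the property the receiving side of `STREAM_MUTATION_FRAGMENTS` relies on in this patch.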
diff --git a/sstables_loader.cc b/sstables_loader.cc index e20bec640d..29a886efb9 100644 --- a/sstables_loader.cc +++ b/sstables_loader.cc @@ -184,7 +184,7 @@ future<> sstables_loader::load_and_stream(sstring ks_name, sstring cf_name, for (auto& node : current_targets) { if (!metas.contains(node)) { auto [sink, source] = co_await ms.make_sink_and_source_for_stream_mutation_fragments(reader.schema()->version(), - ops_uuid, cf_id, estimated_partitions, reason, netw::messaging_service::msg_addr(node)); + ops_uuid, cf_id, estimated_partitions, reason, service::default_session_id, netw::messaging_service::msg_addr(node)); llog.debug("load_and_stream: ops_uuid={}, make sink and source for node={}", ops_uuid, node); metas.emplace(node, send_meta_data(node, std::move(sink), std::move(source))); metas.at(node).receive(); diff --git a/streaming/consumer.cc b/streaming/consumer.cc index 6acbbe1fa4..85d792235b 100644 --- a/streaming/consumer.cc +++ b/streaming/consumer.cc @@ -24,11 +24,13 @@ std::function (flat_mutation_reader_v2)> make_streaming_consumer(sstrin sharded& vug, uint64_t estimated_partitions, stream_reason reason, - sstables::offstrategy offstrategy) { - return [&db, &sys_dist_ks, &vug, estimated_partitions, reason, offstrategy, origin = std::move(origin)] (flat_mutation_reader_v2 reader) -> future<> { + sstables::offstrategy offstrategy, + service::frozen_topology_guard frozen_guard) { + return [&db, &sys_dist_ks, &vug, estimated_partitions, reason, offstrategy, origin = std::move(origin), frozen_guard] (flat_mutation_reader_v2 reader) -> future<> { std::exception_ptr ex; try { auto cf = db.local().find_column_family(reader.schema()).shared_from_this(); + auto guard = service::topology_guard(*cf, frozen_guard); auto use_view_update_path = co_await db::view::check_needs_view_update_path(sys_dist_ks.local(), db.local().get_token_metadata(), *cf, reason); //FIXME: for better estimations this should be transmitted from remote auto metadata = 
mutation_source_metadata{}; diff --git a/streaming/consumer.hh b/streaming/consumer.hh index 302e2defaf..84e103a764 100644 --- a/streaming/consumer.hh +++ b/streaming/consumer.hh @@ -8,6 +8,7 @@ #include "sstables/sstable_set.hh" #include "streaming/stream_reason.hh" +#include "service/topology_guard.hh" namespace replica { class database; @@ -28,6 +29,7 @@ std::function(flat_mutation_reader_v2)> make_streaming_consumer(sstring sharded& vug, uint64_t estimated_partitions, stream_reason reason, - sstables::offstrategy offstrategy); + sstables::offstrategy offstrategy, + service::frozen_topology_guard); } diff --git a/streaming/stream_plan.cc b/streaming/stream_plan.cc index cbf6dba8cb..ccb203af6e 100644 --- a/streaming/stream_plan.cc +++ b/streaming/stream_plan.cc @@ -25,6 +25,7 @@ stream_plan& stream_plan::request_ranges(inet_address from, sstring keyspace, dh auto session = _coordinator->get_or_create_session(_mgr, from); session->add_stream_request(keyspace, std::move(ranges), std::move(column_families)); session->set_reason(_reason); + session->set_topo_guard(_topo_guard); return *this; } @@ -37,6 +38,7 @@ stream_plan& stream_plan::transfer_ranges(inet_address to, sstring keyspace, dht auto session = _coordinator->get_or_create_session(_mgr, to); session->add_transfer_ranges(std::move(keyspace), std::move(ranges), std::move(column_families)); session->set_reason(_reason); + session->set_topo_guard(_topo_guard); return *this; } diff --git a/streaming/stream_plan.hh b/streaming/stream_plan.hh index 2df1e4ed17..e5fe701fbb 100644 --- a/streaming/stream_plan.hh +++ b/streaming/stream_plan.hh @@ -35,6 +35,7 @@ private: plan_id _plan_id; sstring _description; stream_reason _reason; + service::frozen_topology_guard _topo_guard; std::vector _handlers; shared_ptr _coordinator; bool _range_added = false; @@ -46,11 +47,13 @@ public: * * @param description Stream type that describes this StreamPlan */ - stream_plan(stream_manager& mgr, sstring description, stream_reason 
reason = stream_reason::unspecified) + stream_plan(stream_manager& mgr, sstring description, stream_reason reason = stream_reason::unspecified, + service::frozen_topology_guard topo_guard = {}) : _mgr(mgr) , _plan_id(plan_id{utils::UUID_gen::get_time_UUID()}) , _description(description) , _reason(reason) + , _topo_guard(topo_guard) , _coordinator(make_shared()) { } diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 62998390ea..6c288cb4b8 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -36,6 +36,7 @@ #include "streaming/stream_mutation_fragments_cmd.hh" #include "consumer.hh" #include "readers/generating_v2.hh" +#include "service/topology_guard.hh" namespace streaming { @@ -89,17 +90,19 @@ public: void stream_manager::init_messaging_service_handler(abort_source& as) { auto& ms = _ms.local(); - ms.register_prepare_message([this] (const rpc::client_info& cinfo, prepare_message msg, streaming::plan_id plan_id, sstring description, rpc::optional reason_opt) { + ms.register_prepare_message([this] (const rpc::client_info& cinfo, prepare_message msg, streaming::plan_id plan_id, sstring description, rpc::optional reason_opt, rpc::optional session) { const auto& src_cpu_id = cinfo.retrieve_auxiliary("src_cpu_id"); const auto& from = cinfo.retrieve_auxiliary("baddr"); auto dst_cpu_id = this_shard_id(); auto reason = reason_opt ? 
*reason_opt : stream_reason::unspecified; - return container().invoke_on(dst_cpu_id, [msg = std::move(msg), plan_id, description = std::move(description), from, src_cpu_id, reason] (auto& sm) mutable { + auto topo_guard = service::frozen_topology_guard(session.value_or(service::default_session_id)); + return container().invoke_on(dst_cpu_id, [msg = std::move(msg), plan_id, description = std::move(description), from, src_cpu_id, reason, topo_guard] (auto& sm) mutable { auto sr = stream_result_future::init_receiving_side(sm, plan_id, description, from); auto session = sm.get_session(plan_id, from, "PREPARE_MESSAGE"); session->init(sr); session->dst_cpu_id = src_cpu_id; session->set_reason(reason); + session->set_topo_guard(topo_guard); return session->prepare(std::move(msg.requests), std::move(msg.summaries)); }); }); @@ -111,23 +114,29 @@ void stream_manager::init_messaging_service_handler(abort_source& as) { return make_ready_future<>(); }); }); - ms.register_stream_mutation_fragments([this, &as] (const rpc::client_info& cinfo, streaming::plan_id plan_id, table_schema_version schema_id, table_id cf_id, uint64_t estimated_partitions, rpc::optional reason_opt, rpc::source> source) { + ms.register_stream_mutation_fragments([this, &as] (const rpc::client_info& cinfo, streaming::plan_id plan_id, table_schema_version schema_id, table_id cf_id, uint64_t estimated_partitions, + rpc::optional reason_opt, + rpc::optional session, + rpc::source> source) { auto from = netw::messaging_service::get_source(cinfo); auto reason = reason_opt ? 
*reason_opt: stream_reason::unspecified; - sslog.trace("Got stream_mutation_fragments from {} reason {}", from, int(reason)); + service::frozen_topology_guard topo_guard = session.value_or(service::default_session_id); + sslog.trace("Got stream_mutation_fragments from {} reason {}, session {}", from, int(reason), session); if (!_sys_dist_ks.local_is_initialized() || !_view_update_generator.local_is_initialized()) { return make_exception_future>(std::runtime_error(format("Node {} is not fully initialized for streaming, try again later", utils::fb_utilities::get_broadcast_address()))); } - return _mm.local().get_schema_for_write(schema_id, from, _ms.local(), as).then([this, from, estimated_partitions, plan_id, cf_id, source, reason] (schema_ptr s) mutable { - return _db.local().obtain_reader_permit(s, "stream-session", db::no_timeout, {}).then([this, from, estimated_partitions, plan_id, cf_id, source, reason, s] (reader_permit permit) mutable { + return _mm.local().get_schema_for_write(schema_id, from, _ms.local(), as).then([this, from, estimated_partitions, plan_id, cf_id, source, reason, topo_guard] (schema_ptr s) mutable { + return _db.local().obtain_reader_permit(s, "stream-session", db::no_timeout, {}).then([this, from, estimated_partitions, plan_id, cf_id, source, reason, topo_guard, s] (reader_permit permit) mutable { struct stream_mutation_fragments_cmd_status { bool got_cmd = false; bool got_end_of_stream = false; }; auto cmd_status = make_lw_shared(); auto offstrategy_update = make_lw_shared(_db, cf_id, plan_id); - auto get_next_mutation_fragment = [&sm = container(), source, plan_id, from, s, cmd_status, offstrategy_update, permit] () mutable { + auto guard = service::topology_guard(s->table(), topo_guard); + auto get_next_mutation_fragment = [guard = std::move(guard), &sm = container(), source, plan_id, from, s, cmd_status, offstrategy_update, permit] () mutable { + guard.check(); return source().then([&sm, plan_id, from, s, cmd_status, 
offstrategy_update, permit] (std::optional>> opt) mutable { if (opt) { auto cmd = std::get<1>(*opt); @@ -172,7 +181,7 @@ void stream_manager::init_messaging_service_handler(abort_source& as) { //FIXME: discarded future. (void)mutation_writer::distribute_reader_and_consume_on_shards(s, erm->get_sharder(*s), make_generating_reader_v1(s, permit, std::move(get_next_mutation_fragment)), - make_streaming_consumer("streaming", _db, _sys_dist_ks, _view_update_generator, estimated_partitions, reason, is_offstrategy_supported(reason)), + make_streaming_consumer("streaming", _db, _sys_dist_ks, _view_update_generator, estimated_partitions, reason, is_offstrategy_supported(reason), topo_guard), std::move(op) ).then_wrapped([s, plan_id, from, sink, estimated_partitions, erm] (future f) mutable { int32_t status = 0; @@ -271,7 +280,7 @@ future<> stream_session::on_initialization_complete() { } auto id = msg_addr{this->peer, 0}; sslog.debug("[Stream #{}] SEND PREPARE_MESSAGE to {}", plan_id(), id); - return manager().ms().send_prepare_message(id, std::move(prepare), plan_id(), description(), get_reason()).then_wrapped([this, id] (auto&& f) { + return manager().ms().send_prepare_message(id, std::move(prepare), plan_id(), description(), get_reason(), topo_guard()).then_wrapped([this, id] (auto&& f) { try { auto msg = f.get0(); sslog.debug("[Stream #{}] GOT PREPARE_MESSAGE Reply from {}", this->plan_id(), this->peer); diff --git a/streaming/stream_session.hh b/streaming/stream_session.hh index d877d29e3e..762b96bed4 100644 --- a/streaming/stream_session.hh +++ b/streaming/stream_session.hh @@ -22,6 +22,7 @@ #include "streaming/stream_manager.hh" #include "streaming/stream_reason.hh" #include "streaming/session_info.hh" +#include "service/topology_guard.hh" #include "query-request.hh" #include #include @@ -156,6 +157,7 @@ private: session_info _session_info; stream_reason _reason = stream_reason::unspecified; + service::frozen_topology_guard _topo_guard; public: stream_reason 
get_reason() const { return _reason; @@ -164,6 +166,14 @@ public: _reason = reason; } + void set_topo_guard(service::frozen_topology_guard topo_guard) { + _topo_guard = topo_guard; + } + + service::frozen_topology_guard topo_guard() const { + return _topo_guard; + } + void add_bytes_sent(int64_t bytes) { _bytes_sent += bytes; } diff --git a/streaming/stream_transfer_task.cc b/streaming/stream_transfer_task.cc index 3aeaa9ae08..fcce0e6542 100644 --- a/streaming/stream_transfer_task.cc +++ b/streaming/stream_transfer_task.cc @@ -16,6 +16,7 @@ #include "streaming/stream_manager.hh" #include "streaming/stream_reason.hh" #include "streaming/stream_mutation_fragments_cmd.hh" +#include "service/topology_guard.hh" #include "readers/mutation_fragment_v1_stream.hh" #include "mutation/mutation_fragment_stream_validator.hh" #include "mutation/frozen_mutation.hh" @@ -50,6 +51,7 @@ struct send_info { netw::messaging_service::msg_addr id; uint32_t dst_cpu_id; stream_reason reason; + service::frozen_topology_guard topo_guard; size_t mutations_nr{0}; semaphore mutations_done{0}; bool error_logged = false; @@ -60,13 +62,15 @@ struct send_info { noncopyable_function update; send_info(netw::messaging_service& ms_, streaming::plan_id plan_id_, replica::table& tbl_, reader_permit permit_, dht::token_range_vector ranges_, netw::messaging_service::msg_addr id_, - uint32_t dst_cpu_id_, stream_reason reason_, noncopyable_function update_fn) + uint32_t dst_cpu_id_, stream_reason reason_, service::frozen_topology_guard topo_guard_, + noncopyable_function update_fn) : ms(ms_) , plan_id(plan_id_) , cf_id(tbl_.schema()->id()) , id(id_) , dst_cpu_id(dst_cpu_id_) , reason(reason_) + , topo_guard(topo_guard_) , cf(tbl_) , ranges(std::move(ranges_)) , prs(dht::to_partition_ranges(ranges)) @@ -116,7 +120,7 @@ future<> send_mutation_fragments(lw_shared_ptr si) { } return si->estimate_partitions().then([si] (size_t estimated_partitions) { sslog.info("[Stream #{}] Start sending ks={}, cf={}, 
estimated_partitions={}, with new rpc streaming", si->plan_id, si->cf.schema()->ks_name(), si->cf.schema()->cf_name(), estimated_partitions); - return si->ms.make_sink_and_source_for_stream_mutation_fragments(si->reader.schema()->version(), si->plan_id, si->cf_id, estimated_partitions, si->reason, si->id).then_unpack([si] (rpc::sink sink, rpc::source source) mutable { + return si->ms.make_sink_and_source_for_stream_mutation_fragments(si->reader.schema()->version(), si->plan_id, si->cf_id, estimated_partitions, si->reason, si->topo_guard, si->id).then_unpack([si] (rpc::sink sink, rpc::source source) mutable { auto got_error_from_peer = make_lw_shared(false); auto table_is_dropped = make_lw_shared(false); @@ -205,10 +209,11 @@ future<> stream_transfer_task::execute() { sort_and_merge_ranges(); auto reason = session->get_reason(); auto& sm = session->manager(); - return sm.container().invoke_on_all([plan_id, cf_id, id, dst_cpu_id, ranges=this->_ranges, reason] (stream_manager& sm) mutable { + auto topo_guard = session->topo_guard(); + return sm.container().invoke_on_all([plan_id, cf_id, id, dst_cpu_id, ranges=this->_ranges, reason, topo_guard] (stream_manager& sm) mutable { auto& tbl = sm.db().find_column_family(cf_id); - return sm.db().obtain_reader_permit(tbl, "stream-transfer-task", db::no_timeout, {}).then([&sm, &tbl, plan_id, cf_id, id, dst_cpu_id, ranges=std::move(ranges), reason] (reader_permit permit) mutable { - auto si = make_lw_shared(sm.ms(), plan_id, tbl, std::move(permit), std::move(ranges), id, dst_cpu_id, reason, [&sm, plan_id, addr = id.addr] (size_t sz) { + return sm.db().obtain_reader_permit(tbl, "stream-transfer-task", db::no_timeout, {}).then([&sm, &tbl, plan_id, cf_id, id, dst_cpu_id, ranges=std::move(ranges), reason, topo_guard] (reader_permit permit) mutable { + auto si = make_lw_shared(sm.ms(), plan_id, tbl, std::move(permit), std::move(ranges), id, dst_cpu_id, reason, topo_guard, [&sm, plan_id, addr = id.addr] (size_t sz) { 
sm.update_progress(plan_id, addr, streaming::progress_info::direction::OUT, sz); }); return si->has_relevant_range_on_this_shard().then([si, plan_id, cf_id] (bool has_relevant_range_on_this_shard) { From 5381792401f1c19968a1732556a246082217c6a0 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Thu, 2 Nov 2023 01:47:07 +0100 Subject: [PATCH 07/18] tablets: Add per-tablet session id field to tablet metadata range_streamer will pick it up when creating topology_guard. It's materialized in memory only for migrating tablets in tablet_transition_info. --- locator/tablets.cc | 9 ++++++++- locator/tablets.hh | 5 ++++- replica/tablet_mutation_builder.hh | 3 +++ replica/tablets.cc | 25 ++++++++++++++++++++++++- test/boost/tablets_test.cc | 3 ++- 5 files changed, 41 insertions(+), 4 deletions(-) diff --git a/locator/tablets.cc b/locator/tablets.cc index cb2564d3d2..5e7e81679a 100644 --- a/locator/tablets.cc +++ b/locator/tablets.cc @@ -71,10 +71,14 @@ read_replica_set_selector get_selector_for_reads(tablet_transition_stage stage) on_internal_error(tablet_logger, format("Invalid tablet transition stage: {}", static_cast(stage))); } -tablet_transition_info::tablet_transition_info(tablet_transition_stage stage, tablet_replica_set next, tablet_replica pending_replica) +tablet_transition_info::tablet_transition_info(tablet_transition_stage stage, + tablet_replica_set next, + tablet_replica pending_replica, + service::session_id session_id) : stage(stage) , next(std::move(next)) , pending_replica(std::move(pending_replica)) + , session_id(session_id) , writes(get_selector_for_writes(stage)) , reads(get_selector_for_reads(stage)) { } @@ -280,6 +284,9 @@ std::ostream& operator<<(std::ostream& out, const tablet_map& r) { out << format("\n [{}]: last_token={}, replicas={}", tid, r.get_last_token(tid), tablet.replicas); if (auto tr = r.get_tablet_transition_info(tid)) { out << format(", stage={}, new_replicas={}, pending={}", tr->stage, tr->next, tr->pending_replica); + if 
(tr->session_id) { + out << format(", session={}", tr->session_id); + } } first = false; tid = *r.next_tablet(tid); diff --git a/locator/tablets.hh b/locator/tablets.hh index 260aa5a983..13ba07fd29 100644 --- a/locator/tablets.hh +++ b/locator/tablets.hh @@ -11,6 +11,7 @@ #include "dht/token.hh" #include "utils/small_vector.hh" #include "locator/host_id.hh" +#include "service/session.hh" #include "dht/i_partitioner_fwd.hh" #include "schema/schema_fwd.hh" #include "utils/chunked_vector.hh" @@ -171,10 +172,12 @@ struct tablet_transition_info { tablet_transition_stage stage; tablet_replica_set next; tablet_replica pending_replica; // Optimization (next - tablet_info::replicas) + service::session_id session_id; write_replica_set_selector writes; read_replica_set_selector reads; - tablet_transition_info(tablet_transition_stage stage, tablet_replica_set next, tablet_replica pending_replica); + tablet_transition_info(tablet_transition_stage stage, tablet_replica_set next, tablet_replica pending_replica, + service::session_id session_id = {}); bool operator==(const tablet_transition_info&) const = default; }; diff --git a/replica/tablet_mutation_builder.hh b/replica/tablet_mutation_builder.hh index 80c5cc505f..c240c3ff0c 100644 --- a/replica/tablet_mutation_builder.hh +++ b/replica/tablet_mutation_builder.hh @@ -11,6 +11,7 @@ #include "mutation/mutation.hh" #include "db/system_keyspace.hh" #include "replica/tablets.hh" +#include "service/session.hh" namespace replica { @@ -35,6 +36,8 @@ public: tablet_mutation_builder& set_new_replicas(dht::token last_token, locator::tablet_replica_set replicas); tablet_mutation_builder& set_replicas(dht::token last_token, locator::tablet_replica_set replicas); tablet_mutation_builder& set_stage(dht::token last_token, locator::tablet_transition_stage stage); + tablet_mutation_builder& set_session(dht::token last_token, service::session_id); + tablet_mutation_builder& del_session(dht::token last_token); tablet_mutation_builder& 
del_transition(dht::token last_token); mutation build() { diff --git a/replica/tablets.cc b/replica/tablets.cc index 2f437a5873..e2f4c2cbfd 100644 --- a/replica/tablets.cc +++ b/replica/tablets.cc @@ -43,6 +43,7 @@ schema_ptr make_tablets_schema() { .with_column("replicas", replica_set_type) .with_column("new_replicas", replica_set_type) .with_column("stage", utf8_type) + .with_column("session", uuid_type) .with_version(db::system_keyspace::generate_schema_version(id)) .build(); } @@ -82,6 +83,9 @@ tablet_map_to_mutation(const tablet_map& tablets, table_id id, const sstring& ke if (auto tr_info = tablets.get_tablet_transition_info(tid)) { m.set_clustered_cell(ck, "stage", tablet_transition_stage_to_string(tr_info->stage), ts); m.set_clustered_cell(ck, "new_replicas", make_list_value(replica_set_type, replicas_to_data_value(tr_info->next)), ts); + if (tr_info->session_id) { + m.set_clustered_cell(ck, "session", data_value(tr_info->session_id.uuid()), ts); + } } tid = *tablets.next_tablet(tid); co_await coroutine::maybe_yield(); @@ -107,6 +111,19 @@ tablet_mutation_builder::set_stage(dht::token last_token, locator::tablet_transi return *this; } +tablet_mutation_builder& +tablet_mutation_builder::set_session(dht::token last_token, service::session_id session_id) { + _m.set_clustered_cell(get_ck(last_token), "session", data_value(session_id.uuid()), _ts); + return *this; +} + +tablet_mutation_builder& +tablet_mutation_builder::del_session(dht::token last_token) { + auto session_col = _s->get_column_definition("session"); + _m.set_clustered_cell(get_ck(last_token), *session_col, atomic_cell::make_dead(_ts, gc_clock::now())); + return *this; +} + tablet_mutation_builder& tablet_mutation_builder::del_transition(dht::token last_token) { auto ck = get_ck(last_token); @@ -114,6 +131,8 @@ tablet_mutation_builder::del_transition(dht::token last_token) { _m.set_clustered_cell(ck, *stage_col, atomic_cell::make_dead(_ts, gc_clock::now())); auto new_replicas_col = 
_s->get_column_definition("new_replicas"); _m.set_clustered_cell(ck, *new_replicas_col, atomic_cell::make_dead(_ts, gc_clock::now())); + auto session_col = _s->get_column_definition("session"); + _m.set_clustered_cell(ck, *session_col, atomic_cell::make_dead(_ts, gc_clock::now())); return *this; } @@ -198,8 +217,12 @@ future read_tablet_metadata(cql3::query_processor& qp) { throw std::runtime_error(format("Too many pending replicas for table {} tablet {}: {}", table, current->tid, pending)); } + service::session_id session_id; + if (row.has("session")) { + session_id = service::session_id(row.get_as("session")); + } current->map.set_tablet_transition_info(current->tid, tablet_transition_info{stage, - std::move(new_tablet_replicas), *pending.begin()}); + std::move(new_tablet_replicas), *pending.begin(), session_id}); } current->map.set_tablet(current->tid, tablet_info{std::move(tablet_replicas)}); diff --git a/test/boost/tablets_test.cc b/test/boost/tablets_test.cc index 69036975dd..4486263249 100644 --- a/test/boost/tablets_test.cc +++ b/test/boost/tablets_test.cc @@ -153,7 +153,8 @@ SEASTAR_TEST_CASE(test_tablet_metadata_persistence) { tablet_replica {h1, 4}, tablet_replica {h2, 2}, }, - tablet_replica {h1, 4} + tablet_replica {h1, 4}, + session_id(utils::UUID_gen::get_time_UUID()) }); } From 080169cad6f85a6fed17e2bd85310bb94d1b4a0f Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Thu, 2 Nov 2023 01:12:45 +0100 Subject: [PATCH 08/18] storage_service, tablets: Use session to guard tablet streaming --- service/storage_service.cc | 54 +++++++++++++++++++++++++++++++++----- 1 file changed, 47 insertions(+), 7 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index f197b453bb..37dfb6b359 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1644,10 +1644,13 @@ class topology_coordinator { auto& tablet_state = _tablets[gid]; table_id table = s->id(); + auto get_mutation_builder = [&] () { + return 
replica::tablet_mutation_builder(guard.write_timestamp(), s->ks_name(), table); + }; + auto transition_to = [&] (locator::tablet_transition_stage stage) { slogger.trace("raft topology: Will set tablet {} stage to {}", gid, stage); - updates.emplace_back( - replica::tablet_mutation_builder(guard.write_timestamp(), s->ks_name(), table) + updates.emplace_back(get_mutation_builder() .set_stage(last_token, stage) .build()); }; @@ -1667,7 +1670,15 @@ class topology_coordinator { switch (trinfo.stage) { case locator::tablet_transition_stage::allow_write_both_read_old: - transition_to_with_barrier(locator::tablet_transition_stage::write_both_read_old); + if (do_barrier()) { + slogger.trace("raft topology: Will set tablet {} stage to {}", gid, locator::tablet_transition_stage::write_both_read_old); + updates.emplace_back(get_mutation_builder() + .set_stage(last_token, locator::tablet_transition_stage::write_both_read_old) + // Create session a bit earlier to avoid adding barrier + // to the streaming stage to create sessions on replicas. + .set_session(last_token, session_id(utils::UUID_gen::get_time_UUID())) + .build()); + } break; case locator::tablet_transition_stage::write_both_read_old: transition_to_with_barrier(locator::tablet_transition_stage::streaming); @@ -1686,7 +1697,11 @@ class topology_coordinator { return ser::storage_service_rpc_verbs::send_tablet_stream_data(&_messaging, netw::msg_addr(id2ip(dst)), _as, raft::server_id(dst.uuid()), gid); })) { - transition_to(locator::tablet_transition_stage::write_both_read_new); + slogger.trace("raft topology: Will set tablet {} stage to {}", gid, locator::tablet_transition_stage::write_both_read_new); + updates.emplace_back(get_mutation_builder() + .set_stage(last_token, locator::tablet_transition_stage::write_both_read_new) + .del_session(last_token) + .build()); } break; case locator::tablet_transition_stage::write_both_read_new: @@ -1710,8 +1725,7 @@ class topology_coordinator { // See do_tablet_operation() doc. 
if (do_barrier()) { _tablets.erase(gid); - updates.emplace_back( - replica::tablet_mutation_builder(guard.write_timestamp(), s->ks_name(), table) + updates.emplace_back(get_mutation_builder() .del_transition(last_token) .set_replicas(last_token, trinfo.next) .build()); @@ -4171,6 +4185,20 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt std::vector> pending_table_erms; pending_table_erms.resize(smp::count); + std::unordered_set open_sessions; + + // Collect open sessions + { + for (auto&& [table_id, tmap]: tmptr->tablets().all_tables()) { + for (auto&& [tid, trinfo]: tmap.transitions()) { + if (trinfo.session_id) { + auto id = session_id(trinfo.session_id); + open_sessions.insert(id); + } + } + } + } + try { auto base_shard = this_shard_id(); pending_token_metadata_ptr[base_shard] = tmptr; @@ -4263,6 +4291,12 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt cf.update_effective_replication_map(std::move(it->second)); it = table_erms.erase(it); } + + auto& session_mgr = get_topology_session_manager(); + session_mgr.initiate_close_of_sessions_except(open_sessions); + for (auto id : open_sessions) { + session_mgr.create_session(id); + } }); } catch (...) 
{ // applying the changes on all shards should never fail @@ -6219,6 +6253,8 @@ future storage_service::raft_topology_cmd_handler(raft } co_await ss._shared_token_metadata.stale_versions_in_use(); + co_await get_topology_session_manager().drain_closing_sessions(); + slogger.debug("raft_topology_cmd::barrier_and_drain done"); }); result.status = raft_topology_cmd_result::command_status::success; @@ -6518,6 +6554,10 @@ future<> storage_service::stream_tablet(locator::global_tablet_id tablet) { if (trinfo->stage != locator::tablet_transition_stage::streaming) { throw std::runtime_error(format("Tablet {} stage is not at streaming", tablet)); } + auto topo_guard = trinfo->session_id; + if (!trinfo->session_id) { + throw std::runtime_error(format("Tablet {} session is not set", tablet)); + } if (trinfo->pending_replica.host != tm->get_my_id()) { throw std::runtime_error(format("Tablet {} has pending replica different than this one", tablet)); } @@ -6537,7 +6577,7 @@ future<> storage_service::stream_tablet(locator::global_tablet_id tablet) { std::vector tables = {table.schema()->cf_name()}; auto streamer = make_lw_shared(_db, _stream_manager, std::move(tm), guard.get_abort_source(), get_broadcast_address(), _snitch.local()->get_location(), - "Tablet migration", streaming::stream_reason::tablet_migration, null_topology_guard, std::move(tables)); + "Tablet migration", streaming::stream_reason::tablet_migration, topo_guard, std::move(tables)); streamer->add_source_filter(std::make_unique( _gossiper.get_unreachable_members())); From 31c995332ca23d9dc1e93b564f654bba821767e3 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Thu, 2 Nov 2023 00:37:21 +0100 Subject: [PATCH 09/18] storage_service, raft topology: Run streaming under session topology guard Prevents stale streaming operations from running beyond the topology operation they were started in. 
After the session field is cleared, or changed to something else, the old topology_guard used by streaming is interrupted and fenced and the next barrier will join with any remaining work. --- db/system_keyspace.cc | 5 +++++ service/storage_service.cc | 27 +++++++++++++++++++++++---- service/topology_state_machine.hh | 4 ++++ 3 files changed, 32 insertions(+), 4 deletions(-) diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index b039adff1f..9c5fb2f777 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -234,6 +234,7 @@ schema_ptr system_keyspace::topology() { .with_column("unpublished_cdc_generations", set_type_impl::get_instance(cdc_generation_ts_id_type, true), column_kind::static_column) .with_column("global_topology_request", utf8_type, column_kind::static_column) .with_column("enabled_features", set_type_impl::get_instance(utf8_type, true), column_kind::static_column) + .with_column("session", uuid_type, column_kind::static_column) .set_comment("Current state of topology change machine") .with_version(generate_schema_version(id)) .build(); @@ -2650,6 +2651,10 @@ future system_keyspace::load_topology_state() { if (some_row.has("enabled_features")) { ret.enabled_features = decode_features(deserialize_set_column(*topology(), some_row, "enabled_features")); } + + if (some_row.has("session")) { + ret.session = service::session_id(some_row.get_as("session")); + } } co_return ret; diff --git a/service/storage_service.cc b/service/storage_service.cc index 37dfb6b359..867d7c769c 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -659,6 +659,7 @@ public: topology_mutation_builder& set_transition_state(topology::transition_state); topology_mutation_builder& set_version(topology::version_t); topology_mutation_builder& set_fence_version(topology::version_t); + topology_mutation_builder& set_session(session_id); topology_mutation_builder& set_current_cdc_generation_id(const cdc::generation_id_v2&); topology_mutation_builder& 
set_new_cdc_generation_data_uuid(const utils::UUID& value); topology_mutation_builder& set_unpublished_cdc_generations(const std::vector& values); @@ -668,6 +669,7 @@ public: topology_mutation_builder& add_enabled_features(const std::set& value); topology_mutation_builder& add_unpublished_cdc_generation(const cdc::generation_id_v2& value); topology_mutation_builder& del_transition_state(); + topology_mutation_builder& del_session(); topology_mutation_builder& del_global_topology_request(); topology_node_mutation_builder& with_node(raft::server_id); canonical_mutation build() { return canonical_mutation{std::move(_m)}; } @@ -819,10 +821,19 @@ topology_mutation_builder& topology_mutation_builder::set_fence_version(topology return *this; } +topology_mutation_builder& topology_mutation_builder::set_session(session_id value) { + _m.set_static_cell("session", value.uuid(), _ts); + return *this; +} + topology_mutation_builder& topology_mutation_builder::del_transition_state() { return del("transition_state"); } +topology_mutation_builder& topology_mutation_builder::del_session() { + return del("session"); +} + topology_mutation_builder& topology_mutation_builder::set_current_cdc_generation_id( const cdc::generation_id_v2& value) { apply_atomic("current_cdc_generation_timestamp", value.ts); @@ -1801,6 +1812,7 @@ class topology_coordinator { updates.emplace_back( topology_mutation_builder(guard.write_timestamp()) .set_transition_state(topology::transition_state::write_both_read_old) + .set_session(session_id(guard.new_group0_state_id())) .set_version(_topo_sm._topology.version + 1) .build()); } else { @@ -2009,6 +2021,7 @@ class topology_coordinator { builder.del_transition_state(); } else { builder.set_transition_state(topology::transition_state::write_both_read_old); + builder.set_session(session_id(guard.new_group0_state_id())); builder.set_version(_topo_sm._topology.version + 1); } auto str = ::format("committed new CDC generation, ID: {}", cdc_gen_id); @@ -2102,6 
+2115,7 @@ class topology_coordinator { topology_mutation_builder builder(node.guard.write_timestamp()); builder .set_transition_state(topology::transition_state::write_both_read_new) + .del_session() .set_version(_topo_sm._topology.version + 1); auto str = ::format("{}: streaming completed for node {}", node.rs->state, node.id); co_await update_topology_state(take_guard(std::move(node)), {builder.build()}, std::move(str)); @@ -4189,6 +4203,11 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt // Collect open sessions { + auto session = _topology_state_machine._topology.session; + if (session) { + open_sessions.insert(session); + } + for (auto&& [table_id, tmap]: tmptr->tablets().all_tables()) { for (auto&& [tid, trinfo]: tmap.transitions()) { if (trinfo.session_id) { @@ -6291,7 +6310,7 @@ future storage_service::raft_topology_cmd_handler(raft } else { dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_broadcast_address(), locator::endpoint_dc_rack{rs.datacenter, rs.rack}, rs.ring.value().tokens, get_token_metadata_ptr()); - co_await bs.bootstrap(streaming::stream_reason::bootstrap, _gossiper, null_topology_guard); + co_await bs.bootstrap(streaming::stream_reason::bootstrap, _gossiper, _topology_state_machine._topology.session); } })); } @@ -6318,7 +6337,7 @@ future storage_service::raft_topology_cmd_handler(raft auto replaced_id = std::get(_topology_state_machine._topology.req_param[raft_server.id()]).replaced_id; auto existing_ip = _group0->address_map().find(replaced_id); assert(existing_ip); - co_await bs.bootstrap(streaming::stream_reason::replace, _gossiper, null_topology_guard, *existing_ip); + co_await bs.bootstrap(streaming::stream_reason::replace, _gossiper, _topology_state_machine._topology.session, *existing_ip); } })); } @@ -6370,7 +6389,7 @@ future storage_service::raft_topology_cmd_handler(raft auto ops = seastar::make_shared(node_ops_id::create_random_id(), as, std::move(ignored_ips)); return 
_repair.local().removenode_with_repair(get_token_metadata_ptr(), *ip, ops); } else { - return removenode_with_stream(*ip, null_topology_guard, as); + return removenode_with_stream(*ip, _topology_state_machine._topology.session, as); } })); result.status = raft_topology_cmd_result::command_status::success; @@ -6386,7 +6405,7 @@ future storage_service::raft_topology_cmd_handler(raft } else { auto streamer = make_lw_shared(_db, _stream_manager, tmptr, _abort_source, get_broadcast_address(), _snitch.local()->get_location(), "Rebuild", streaming::stream_reason::rebuild, - null_topology_guard); + _topology_state_machine._topology.session); streamer->add_source_filter(std::make_unique(_gossiper.get_unreachable_members())); if (source_dc != "") { streamer->add_source_filter(std::make_unique(source_dc)); diff --git a/service/topology_state_machine.hh b/service/topology_state_machine.hh index 1e9220d38f..9bd29ef7d0 100644 --- a/service/topology_state_machine.hh +++ b/service/topology_state_machine.hh @@ -21,6 +21,7 @@ #include "dht/token.hh" #include "raft/raft.hh" #include "utils/UUID.hh" +#include "service/session.hh" #include "mutation/canonical_mutation.hh" namespace service { @@ -146,6 +147,9 @@ struct topology { // Set of features that are considered to be enabled by the cluster. std::set enabled_features; + // Session used to create topology_guard for operations like streaming. + session_id session; + // Find only nodes in non 'left' state const std::pair* find(raft::server_id id) const; // Return true if node exists in any state including 'left' one From 1f57d1ea2836b4831d9acb6ba21b3bebeb95b94b Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Wed, 22 Nov 2023 17:28:29 +0100 Subject: [PATCH 10/18] storage_service, api: Add API to migrate a tablet Will be used in tests, or for hot fixes in production. 
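For illustration, a request to the new endpoint could be assembled roughly as in the sketch below. It only builds the query parameters for POST /storage_service/tablets/move, using the parameter names from the API doc added in this patch. The helper name move_tablet_params and the sample host IDs are hypothetical, and sending the request is left to whatever HTTP client the caller already uses.

```python
from typing import Dict

# Endpoint path as declared in the API doc added by this patch.
MOVE_TABLET_PATH = "/storage_service/tablets/move"


def move_tablet_params(ks: str, table: str, token: int,
                       src_host: str, src_shard: int,
                       dst_host: str, dst_shard: int) -> Dict[str, str]:
    """Build query parameters for the move_tablet request (hypothetical helper)."""
    if src_shard < 0 or dst_shard < 0:
        raise ValueError("shard numbers must be non-negative")
    if src_host == dst_host:
        # The handler in this patch rejects migrations within the same node,
        # so fail early on the client side as well.
        raise ValueError("source and destination hosts must differ")
    # All values travel as strings in the query string.
    return {
        "ks": ks,
        "table": table,
        "token": str(token),
        "src_host": src_host,
        "src_shard": str(src_shard),
        "dst_host": dst_host,
        "dst_shard": str(dst_shard),
    }


# Placeholder host IDs, for illustration only.
params = move_tablet_params(
    "ks1", "t1", -42,
    "11111111-1111-1111-1111-111111111111", 0,
    "22222222-2222-2222-2222-222222222222", 3,
)
```

The token may be any token owned by the tablet, so the caller does not need to know tablet boundaries.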
--- api/api-doc/storage_service.json | 72 +++++++++++++++++++++++++++++ api/storage_service.cc | 23 +++++++++ locator/tablets.hh | 5 ++ service/storage_service.cc | 77 +++++++++++++++++++++++++++++++ service/storage_service.hh | 4 ++ service/topology_state_machine.cc | 4 ++ service/topology_state_machine.hh | 3 ++ test/pylib/rest_client.py | 11 +++++ 8 files changed, 199 insertions(+) diff --git a/api/api-doc/storage_service.json b/api/api-doc/storage_service.json index 3faad75458..b80b8911f3 100644 --- a/api/api-doc/storage_service.json +++ b/api/api-doc/storage_service.json @@ -2465,6 +2465,78 @@ } ] }, + { + "path":"/storage_service/tablets/move", + "operations":[ + { + "nickname":"move_tablet", + "method":"POST", + "summary":"Moves a tablet replica", + "type":"void", + "produces":[ + "application/json" + ], + "parameters":[ + { + "name":"ks", + "description":"Keyspace name", + "required":true, + "allowMultiple":false, + "type":"string", + "paramType":"query" + }, + { + "name":"table", + "description":"Table name", + "required":true, + "allowMultiple":false, + "type":"string", + "paramType":"query" + }, + { + "name":"token", + "description":"Token owned by the tablet to move", + "required":true, + "allowMultiple":false, + "type":"integer", + "paramType":"query" + }, + { + "name":"src_host", + "description":"Source host id", + "required":true, + "allowMultiple":false, + "type":"string", + "paramType":"query" + }, + { + "name":"dst_host", + "description":"Destination host id", + "required":true, + "allowMultiple":false, + "type":"string", + "paramType":"query" + }, + { + "name":"src_shard", + "description":"Source shard number", + "required":true, + "allowMultiple":false, + "type":"integer", + "paramType":"query" + }, + { + "name":"dst_shard", + "description":"Destination shard number", + "required":true, + "allowMultiple":false, + "type":"integer", + "paramType":"query" + } + ] + } + ] + }, { "path":"/storage_service/metrics/total_hints", "operations":[ diff 
--git a/api/storage_service.cc b/api/storage_service.cc index 9dd955eb72..60c6afe000 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -74,6 +74,11 @@ locator::host_id validate_host_id(const sstring& param) { return hoep.id; } +static +int64_t validate_int(const sstring& param) { + return std::atoll(param.c_str()); +} + // splits a request parameter assumed to hold a comma-separated list of table names // verify that the tables are found, otherwise a bad_param_exception exception is thrown // containing the description of the respective no_such_column_family error. @@ -1359,6 +1364,23 @@ void set_storage_service(http_context& ctx, routes& r, sharded req) -> future { + auto src_host_id = validate_host_id(req->get_query_param("src_host")); + shard_id src_shard_id = validate_int(req->get_query_param("src_shard")); + auto dst_host_id = validate_host_id(req->get_query_param("dst_host")); + shard_id dst_shard_id = validate_int(req->get_query_param("dst_shard")); + auto token = dht::token::from_int64(validate_int(req->get_query_param("token"))); + auto ks = req->get_query_param("ks"); + auto table = req->get_query_param("table"); + auto table_id = ctx.db.local().find_column_family(ks, table).schema()->id(); + + co_await ss.local().move_tablet(table_id, token, + locator::tablet_replica{src_host_id, src_shard_id}, + locator::tablet_replica{dst_host_id, dst_shard_id}); + + co_return json_void(); + }); + sp::get_schema_versions.set(r, [&ss](std::unique_ptr req) { return ss.local().describe_schema_versions().then([] (auto result) { std::vector res; @@ -1456,6 +1478,7 @@ void unset_storage_service(http_context& ctx, routes& r) { ss::get_effective_ownership.unset(r); ss::sstable_info.unset(r); ss::reload_raft_topology_state.unset(r); + ss::move_tablet.unset(r); sp::get_schema_versions.unset(r); } diff --git a/locator/tablets.hh b/locator/tablets.hh index 13ba07fd29..8a3dce75da 100644 --- a/locator/tablets.hh +++ b/locator/tablets.hh @@ -125,6 +125,11 @@ bool 
contains(const tablet_replica_set& rs, host_id host) { return false; } +inline +bool contains(const tablet_replica_set& rs, const tablet_replica& r) { + return std::ranges::any_of(rs, [&] (auto&& r_) { return r_ == r; }); +} + /// Stores information about a single tablet. struct tablet_info { tablet_replica_set replicas; diff --git a/service/storage_service.cc b/service/storage_service.cc index 867d7c769c..af72cf103e 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -6644,6 +6644,83 @@ future<> storage_service::cleanup_tablet(locator::global_tablet_id tablet) { }); } +future<> storage_service::move_tablet(table_id table, dht::token token, locator::tablet_replica src, locator::tablet_replica dst) { + auto holder = _async_gate.hold(); + + if (this_shard_id() != 0) { + // group0 is only set on shard 0. + co_return co_await container().invoke_on(0, [&] (auto& ss) { + return ss.move_tablet(table, token, src, dst); + }); + } + + while (true) { + auto guard = co_await _group0->client().start_operation(&_abort_source); + + while (_topology_state_machine._topology.is_busy()) { + slogger.debug("move_tablet(): topology state machine is busy"); + release_guard(std::move(guard)); + co_await _topology_state_machine.event.wait(); + guard = co_await _group0->client().start_operation(&_abort_source); + } + + std::vector updates; + auto ks_name = _db.local().find_schema(table)->ks_name(); + auto& tmap = get_token_metadata().tablets().get_tablet_map(table); + auto tid = tmap.get_tablet_id(token); + auto& tinfo = tmap.get_tablet_info(tid); + auto last_token = tmap.get_last_token(tid); + auto gid = locator::global_tablet_id{table, tid}; + + // FIXME: Validate replication strategy constraints. 
+ + if (!locator::contains(tinfo.replicas, src)) { + throw std::runtime_error(format("Tablet {} has no replica on {}", gid, src)); + } + auto* node = get_token_metadata().get_topology().find_node(dst.host); + if (!node) { + throw std::runtime_error(format("Unknown host: {}", dst.host)); + } + if (dst.shard >= node->get_shard_count()) { + throw std::runtime_error(format("Host {} does not have shard {}", dst.host, dst.shard)); + } + if (src.host == dst.host) { + throw std::runtime_error("Migrating within the same node is not supported"); + } + + if (src == dst) { + co_return; + } + + updates.push_back(canonical_mutation(replica::tablet_mutation_builder(guard.write_timestamp(), ks_name, table) + .set_new_replicas(last_token, locator::replace_replica(tinfo.replicas, src, dst)) + .set_stage(last_token, locator::tablet_transition_stage::allow_write_both_read_old) + .build())); + updates.push_back(canonical_mutation(topology_mutation_builder(guard.write_timestamp()) + .set_transition_state(topology::transition_state::tablet_migration) + .set_version(_topology_state_machine._topology.version + 1) + .build())); + + sstring reason = format("Moving tablet {} from {} to {}", gid, src, dst); + slogger.info("raft topology: {}", reason); + slogger.trace("raft topology: do update {} reason {}", updates, reason); + topology_change change{std::move(updates)}; + group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, reason); + try { + co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard)); + break; + } catch (group0_concurrent_modification&) { + slogger.debug("move_tablet(): concurrent modification, retrying"); + } + } + + // Wait for migration to finish. 
+ co_await _topology_state_machine.event.wait([&] { + auto& tmap = get_token_metadata().tablets().get_tablet_map(table); + return !tmap.get_tablet_transition_info(tmap.get_tablet_id(token)); + }); +} + future storage_service::join_node_request_handler(join_node_request_params params) { join_node_request_result result; slogger.info("raft topology: received request to join from host_id: {}", params.host_id); diff --git a/service/storage_service.hh b/service/storage_service.hh index 75adbb818e..be7051d34f 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -772,6 +772,10 @@ public: // Precondition: the topology mutations were already written to disk; the function only transitions the in-memory state machine. // Public for `reload_raft_topology_state` REST API. future<> topology_transition(); + +public: + future<> move_tablet(table_id, dht::token, locator::tablet_replica src, locator::tablet_replica dst); + private: // load topology state machine snapshot into memory // raft_group0_client::_read_apply_mutex must be held diff --git a/service/topology_state_machine.cc b/service/topology_state_machine.cc index 4764ca1b75..06f598f592 100644 --- a/service/topology_state_machine.cc +++ b/service/topology_state_machine.cc @@ -39,6 +39,10 @@ bool topology::contains(raft::server_id id) { left_nodes.contains(id); } +bool topology::is_busy() const { + return tstate.has_value(); +} + std::set calculate_not_yet_enabled_features(const std::set& enabled_features, const auto& supported_features) { std::set to_enable; bool first = true; diff --git a/service/topology_state_machine.hh b/service/topology_state_machine.hh index 9bd29ef7d0..f85405c2ab 100644 --- a/service/topology_state_machine.hh +++ b/service/topology_state_machine.hh @@ -159,6 +159,9 @@ struct topology { // Are there any non-left nodes? bool is_empty() const; + // Returns false iff we can safely start a new topology change. 
+ bool is_busy() const; + // Calculates a set of features that are supported by all normal nodes but not yet enabled. std::set calculate_not_yet_enabled_features() const; }; diff --git a/test/pylib/rest_client.py b/test/pylib/rest_client.py index 79e4e4ed23..d2d982dd57 100644 --- a/test/pylib/rest_client.py +++ b/test/pylib/rest_client.py @@ -209,6 +209,17 @@ class ScyllaRESTAPIClient(): await self.client.post(f"/v2/error_injection/injection/{injection}", host=node_ip, params={"one_shot": str(one_shot)}, json={ key: str(value) for key, value in parameters.items() }) + async def move_tablet(self, node_ip: str, ks: str, table: str, src_host: HostID, src_shard: int, dst_host: HostID, dst_shard: int, token: int) -> None: + await self.client.post(f"/storage_service/tablets/move", host=node_ip, params={ + "ks": ks, + "table": table, + "src_host": str(src_host), + "src_shard": str(src_shard), + "dst_host": str(dst_host), + "dst_shard": str(dst_shard), + "token": str(token) + }) + async def disable_injection(self, node_ip: str, injection: str) -> None: await self.client.delete(f"/v2/error_injection/injection/{injection}", host=node_ip) From d1c1b592362ce6d5f0e94828b390b042e65cf758 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Wed, 22 Nov 2023 18:22:39 +0100 Subject: [PATCH 11/18] storage_service, api: Add API to disable tablet balancing Load balancing needs to be disabled before making a series of manual migrations so that we don't fight with the load balancer. Also will be used in tests to ensure tablets stick to expected locations. 
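A sketch of the workflow this enables, under stated assumptions: disable balancing, issue the manual moves, then turn balancing back on. The endpoint paths and the "enabled" parameter come from the API docs in this series. The function names are hypothetical, and re-enabling balancing at the end is an assumption about typical use rather than something this patch requires. Note that validate_bool() in the handler accepts only the literal strings "true" and "false", so Python's str(True) would be rejected.

```python
from typing import Dict, List, Tuple

# Endpoint paths as declared in the API docs of this patch series.
BALANCING_PATH = "/storage_service/tablets/balancing"
MOVE_PATH = "/storage_service/tablets/move"

Request = Tuple[str, str, Dict[str, str]]  # (method, path, query params)


def balancing_params(enabled: bool) -> Dict[str, str]:
    """Encode the flag as lowercase 'true'/'false', the only values the handler accepts."""
    return {"enabled": "true" if enabled else "false"}


def manual_migration_plan(moves: List[Dict[str, str]]) -> List[Request]:
    """Order the requests: disable balancing, apply each move, re-enable balancing.

    Hypothetical helper: each entry of `moves` is a ready-made parameter
    dictionary for the move endpoint.
    """
    plan: List[Request] = [("POST", BALANCING_PATH, balancing_params(False))]
    plan.extend(("POST", MOVE_PATH, move) for move in moves)
    # Assumption: the operator wants the balancer back once the moves are done.
    plan.append(("POST", BALANCING_PATH, balancing_params(True)))
    return plan


plan = manual_migration_plan([{"ks": "ks1", "table": "t1", "token": "0"}])
```

Building the plan as data keeps the ordering easy to check before any request is sent.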
--- api/api-doc/storage_service.json | 24 +++++++++ api/storage_service.cc | 17 ++++++ db/system_keyspace.cc | 7 +++ locator/tablets.hh | 5 ++ service/storage_service.cc | 46 ++++++++++++++++ service/storage_service.hh | 1 + service/tablet_allocator.cc | 2 +- service/topology_state_machine.hh | 3 ++ test/boost/tablets_test.cc | 90 +++++++++++++++++++++++++++++++ test/pylib/rest_client.py | 6 +++ 10 files changed, 200 insertions(+), 1 deletion(-) diff --git a/api/api-doc/storage_service.json b/api/api-doc/storage_service.json index b80b8911f3..46a3589791 100644 --- a/api/api-doc/storage_service.json +++ b/api/api-doc/storage_service.json @@ -2537,6 +2537,30 @@ } ] }, + { + "path":"/storage_service/tablets/balancing", + "operations":[ + { + "nickname":"tablet_balancing_enable", + "method":"POST", + "summary":"Enables or disables tablet load balancing", + "type":"void", + "produces":[ + "application/json" + ], + "parameters":[ + { + "name":"enabled", + "description":"When set to false, tablet load balancing is disabled", + "required":true, + "allowMultiple":false, + "type":"boolean", + "paramType":"query" + } + ] + } + ] + }, { "path":"/storage_service/metrics/total_hints", "operations":[ diff --git a/api/storage_service.cc b/api/storage_service.cc index 60c6afe000..0ef32b2b9d 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -74,6 +74,16 @@ locator::host_id validate_host_id(const sstring& param) { return hoep.id; } +bool validate_bool(const sstring& param) { + if (param == "true") { + return true; + } else if (param == "false") { + return false; + } else { + throw std::runtime_error("Parameter must be either 'true' or 'false'"); + } +} + static int64_t validate_int(const sstring& param) { return std::atoll(param.c_str()); @@ -1381,6 +1391,12 @@ void set_storage_service(http_context& ctx, routes& r, sharded req) -> future { + auto enabled = validate_bool(req->get_query_param("enabled")); + co_await ss.local().set_tablet_balancing_enabled(enabled); + co_return 
json_void(); + }); + sp::get_schema_versions.set(r, [&ss](std::unique_ptr req) { return ss.local().describe_schema_versions().then([] (auto result) { std::vector res; @@ -1479,6 +1495,7 @@ void unset_storage_service(http_context& ctx, routes& r) { ss::sstable_info.unset(r); ss::reload_raft_topology_state.unset(r); ss::move_tablet.unset(r); + ss::tablet_balancing_enable.unset(r); sp::get_schema_versions.unset(r); } diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 9c5fb2f777..d5eba6f689 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -235,6 +235,7 @@ schema_ptr system_keyspace::topology() { .with_column("global_topology_request", utf8_type, column_kind::static_column) .with_column("enabled_features", set_type_impl::get_instance(utf8_type, true), column_kind::static_column) .with_column("session", uuid_type, column_kind::static_column) + .with_column("tablet_balancing_enabled", boolean_type, column_kind::static_column) .set_comment("Current state of topology change machine") .with_version(generate_schema_version(id)) .build(); @@ -2655,6 +2656,12 @@ future system_keyspace::load_topology_state() { if (some_row.has("session")) { ret.session = service::session_id(some_row.get_as("session")); } + + if (some_row.has("tablet_balancing_enabled")) { + ret.tablet_balancing_enabled = some_row.get_as("tablet_balancing_enabled"); + } else { + ret.tablet_balancing_enabled = true; + } } co_return ret; diff --git a/locator/tablets.hh b/locator/tablets.hh index 8a3dce75da..48fc6b73d6 100644 --- a/locator/tablets.hh +++ b/locator/tablets.hh @@ -343,12 +343,17 @@ public: using table_to_tablet_map = std::unordered_map; private: table_to_tablet_map _tablets; + + // When false, tablet load balancer will not try to rebalance tablets. 
+ bool _balancing_enabled = true; public: + bool balancing_enabled() const { return _balancing_enabled; } const tablet_map& get_tablet_map(table_id id) const; const table_to_tablet_map& all_tables() const { return _tablets; } table_to_tablet_map& all_tables() { return _tablets; } size_t external_memory_usage() const; public: + void set_balancing_enabled(bool value) { _balancing_enabled = value; } void set_tablet_map(table_id, tablet_map); tablet_map& get_tablet_map(table_id id); future<> clear_gently(); diff --git a/service/storage_service.cc b/service/storage_service.cc index af72cf103e..a44db2af94 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -534,6 +534,7 @@ future<> storage_service::topology_state_load() { if (_db.local().get_config().check_experimental(db::experimental_features_t::feature::TABLETS)) { tmptr->set_tablets(co_await replica::read_tablet_metadata(_qp)); + tmptr->tablets().set_balancing_enabled(_topology_state_machine._topology.tablet_balancing_enabled); } })); @@ -660,6 +661,7 @@ public: topology_mutation_builder& set_version(topology::version_t); topology_mutation_builder& set_fence_version(topology::version_t); topology_mutation_builder& set_session(session_id); + topology_mutation_builder& set_tablet_balancing_enabled(bool); topology_mutation_builder& set_current_cdc_generation_id(const cdc::generation_id_v2&); topology_mutation_builder& set_new_cdc_generation_data_uuid(const utils::UUID& value); topology_mutation_builder& set_unpublished_cdc_generations(const std::vector& values); @@ -826,6 +828,11 @@ topology_mutation_builder& topology_mutation_builder::set_session(session_id val return *this; } +topology_mutation_builder& topology_mutation_builder::set_tablet_balancing_enabled(bool value) { + _m.set_static_cell("tablet_balancing_enabled", value, _ts); + return *this; +} + topology_mutation_builder& topology_mutation_builder::del_transition_state() { return del("transition_state"); } @@ -6148,6 +6155,7 @@ future<> 
storage_service::load_tablet_metadata() { } return mutate_token_metadata([this] (mutable_token_metadata_ptr tmptr) -> future<> { tmptr->set_tablets(co_await replica::read_tablet_metadata(_qp)); + tmptr->tablets().set_balancing_enabled(_topology_state_machine._topology.tablet_balancing_enabled); }, acquire_merge_lock::no); } @@ -6721,6 +6729,44 @@ future<> storage_service::move_tablet(table_id table, dht::token token, locator: }); } +future<> storage_service::set_tablet_balancing_enabled(bool enabled) { + auto holder = _async_gate.hold(); + + if (this_shard_id() != 0) { + // group0 is only set on shard 0. + co_return co_await container().invoke_on(0, [&] (auto& ss) { + return ss.set_tablet_balancing_enabled(enabled); + }); + } + + while (true) { + group0_guard guard = co_await _group0->client().start_operation(&_abort_source); + + while (_topology_state_machine._topology.is_busy()) { + slogger.debug("set_tablet_balancing_enabled(): topology is busy"); + release_guard(std::move(guard)); + co_await _topology_state_machine.event.wait(); + guard = co_await _group0->client().start_operation(&_abort_source); + } + + std::vector updates; + updates.push_back(canonical_mutation(topology_mutation_builder(guard.write_timestamp()) + .set_tablet_balancing_enabled(enabled) + .build())); + + sstring reason = format("Setting tablet balancing to {}", enabled); + slogger.info("raft topology: {}", reason); + topology_change change{std::move(updates)}; + group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, reason); + try { + co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard)); + break; + } catch (group0_concurrent_modification&) { + slogger.debug("set_tablet_balancing_enabled(): concurrent modification"); + } + } +} + future storage_service::join_node_request_handler(join_node_request_params params) { join_node_request_result result; slogger.info("raft topology: received request to join from host_id: {}", params.host_id); diff --git 
a/service/storage_service.hh b/service/storage_service.hh index be7051d34f..c0709a03d1 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -775,6 +775,7 @@ public: public: future<> move_tablet(table_id, dht::token, locator::tablet_replica src, locator::tablet_replica dst); + future<> set_tablet_balancing_enabled(bool); private: // load topology state machine snapshot into memory diff --git a/service/tablet_allocator.cc b/service/tablet_allocator.cc index b63c5b8113..b29e825ede 100644 --- a/service/tablet_allocator.cc +++ b/service/tablet_allocator.cc @@ -417,7 +417,7 @@ public: } if (nodes_to_drain.empty()) { - if (!shuffle && max_load == min_load) { + if (!shuffle && (max_load == min_load || !_tm->tablets().balancing_enabled())) { // load is balanced. // TODO: Evaluate and fix intra-node balance. _stats.for_dc(dc).stop_balance++; diff --git a/service/topology_state_machine.hh b/service/topology_state_machine.hh index f85405c2ab..6eff870d4f 100644 --- a/service/topology_state_machine.hh +++ b/service/topology_state_machine.hh @@ -150,6 +150,9 @@ struct topology { // Session used to create topology_guard for operations like streaming. session_id session; + // When false, tablet load balancer will not try to rebalance tablets. 
+ bool tablet_balancing_enabled = true; + // Find only nodes in non 'left' state const std::pair* find(raft::server_id id) const; // Return true if node exists in any state including 'left' one diff --git a/test/boost/tablets_test.cc b/test/boost/tablets_test.cc index 4486263249..56ceebe660 100644 --- a/test/boost/tablets_test.cc +++ b/test/boost/tablets_test.cc @@ -1289,6 +1289,96 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_two_empty_nodes) { }).get(); } +SEASTAR_THREAD_TEST_CASE(test_load_balancer_disabling) { + do_with_cql_env_thread([] (auto& e) { + inet_address ip1("192.168.0.1"); + inet_address ip2("192.168.0.2"); + + auto host1 = host_id(next_uuid()); + auto host2 = host_id(next_uuid()); + + auto table1 = table_id(next_uuid()); + + unsigned shard_count = 1; + + semaphore sem(1); + shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{ + locator::topology::config{ + .this_endpoint = ip1, + .local_dc_rack = locator::endpoint_dc_rack::default_location + } + }); + + // host1 is loaded and host2 is empty, resulting in an imbalance. 
+ stm.mutate_token_metadata([&] (auto& tm) { + tm.update_host_id(host1, ip1); + tm.update_host_id(host2, ip2); + tm.update_topology(ip1, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count); + tm.update_topology(ip2, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count); + + tablet_map tmap(16); + for (auto tid : tmap.tablet_ids()) { + tmap.set_tablet(tid, tablet_info { + tablet_replica_set { + tablet_replica {host1, 0}, + } + }); + } + tablet_metadata tmeta; + tmeta.set_tablet_map(table1, std::move(tmap)); + tm.set_tablets(std::move(tmeta)); + return make_ready_future<>(); + }).get(); + + { + auto plan = e.get_tablet_allocator().local().balance_tablets(stm.get()).get0(); + BOOST_REQUIRE(!plan.empty()); + } + + // Disable load balancing + stm.mutate_token_metadata([&] (token_metadata& tm) { + tm.tablets().set_balancing_enabled(false); + return make_ready_future<>(); + }).get(); + + { + auto plan = e.get_tablet_allocator().local().balance_tablets(stm.get()).get0(); + BOOST_REQUIRE(plan.empty()); + } + + // Check that cloning preserves the setting + stm.mutate_token_metadata([&] (token_metadata& tm) { + return make_ready_future<>(); + }).get(); + + { + auto plan = e.get_tablet_allocator().local().balance_tablets(stm.get()).get0(); + BOOST_REQUIRE(plan.empty()); + } + + // Enable load balancing back + stm.mutate_token_metadata([&] (token_metadata& tm) { + tm.tablets().set_balancing_enabled(true); + return make_ready_future<>(); + }).get(); + + { + auto plan = e.get_tablet_allocator().local().balance_tablets(stm.get()).get0(); + BOOST_REQUIRE(!plan.empty()); + } + + // Check that cloning preserves the setting + stm.mutate_token_metadata([&] (token_metadata& tm) { + return make_ready_future<>(); + }).get(); + + { + auto plan = e.get_tablet_allocator().local().balance_tablets(stm.get()).get0(); + BOOST_REQUIRE(!plan.empty()); + } + }).get(); +} + SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_random_load) { 
do_with_cql_env_thread([] (auto& e) { const int n_hosts = 6; diff --git a/test/pylib/rest_client.py b/test/pylib/rest_client.py index d2d982dd57..df3547cc69 100644 --- a/test/pylib/rest_client.py +++ b/test/pylib/rest_client.py @@ -220,6 +220,12 @@ class ScyllaRESTAPIClient(): "token": str(token) }) + async def enable_tablet_balancing(self, node_ip: str) -> None: + await self.client.post(f"/storage_service/tablets/balancing", host=node_ip, params={"enabled": "true"}) + + async def disable_tablet_balancing(self, node_ip: str) -> None: + await self.client.post(f"/storage_service/tablets/balancing", host=node_ip, params={"enabled": "false"}) + async def disable_injection(self, node_ip: str, injection: str) -> None: await self.client.delete(f"/v2/error_injection/injection/{injection}", host=node_ip) From 7a59acf2485d21b87f67a7ed7a72d0ea80bc5582 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Thu, 23 Nov 2023 00:29:28 +0100 Subject: [PATCH 12/18] tablets: Fail gracefully when migrating tablet has no pending replica Before the patch we SIGSEGV trying to access pending replica in this case. Fail early instead. 
--- replica/tablets.cc | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/replica/tablets.cc b/replica/tablets.cc index e2f4c2cbfd..33ba821b7a 100644 --- a/replica/tablets.cc +++ b/replica/tablets.cc @@ -213,6 +213,10 @@ future read_tablet_metadata(cql3::query_processor& qp) { for (auto&& r : tablet_replicas) { pending.erase(r); } + if (pending.size() == 0) { + throw std::runtime_error(format("Stage set but no pending replica for table {} tablet {}", + table, current->tid)); + } if (pending.size() > 1) { throw std::runtime_error(format("Too many pending replicas for table {} tablet {}: {}", table, current->tid, pending)); From c228f2c940d8e26378cc584bcaced81c650f0a84 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Thu, 3 Aug 2023 02:00:05 +0200 Subject: [PATCH 13/18] range_streamer, tablets: Do not keep token metadata around streaming It holds back global token metadata barrier during streaming, which limits parallelism of load balancing. --- dht/range_streamer.cc | 1 + dht/range_streamer.hh | 3 ++- service/storage_service.cc | 1 + 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/dht/range_streamer.cc b/dht/range_streamer.cc index bd44500d5b..c9055a78ce 100644 --- a/dht/range_streamer.cc +++ b/dht/range_streamer.cc @@ -245,6 +245,7 @@ future<> range_streamer::add_ranges(const sstring& keyspace_name, locator::vnode future<> range_streamer::stream_async() { _nr_ranges_remaining = nr_ranges_to_stream(); _nr_total_ranges = _nr_ranges_remaining; + _token_metadata_ptr = nullptr; logger.info("{} starts, nr_ranges_remaining={}", _description, _nr_ranges_remaining); auto start = lowres_clock::now(); return do_for_each(_to_stream, [this, description = _description] (auto& stream) { diff --git a/dht/range_streamer.hh b/dht/range_streamer.hh index bc57bef422..31a14d813d 100644 --- a/dht/range_streamer.hh +++ b/dht/range_streamer.hh @@ -144,6 +144,7 @@ private: } #endif + // Can be called only before stream_async(). 
const token_metadata& get_token_metadata() { return *_token_metadata_ptr; } @@ -153,7 +154,7 @@ public: private: distributed& _db; sharded& _stream_manager; - const token_metadata_ptr _token_metadata_ptr; + token_metadata_ptr _token_metadata_ptr; abort_source& _abort_source; std::unordered_set _tokens; inet_address _address; diff --git a/service/storage_service.cc b/service/storage_service.cc index a44db2af94..00d7b7fde4 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -6605,6 +6605,7 @@ future<> storage_service::stream_tablet(locator::global_tablet_id tablet) { auto streamer = make_lw_shared(_db, _stream_manager, std::move(tm), guard.get_abort_source(), get_broadcast_address(), _snitch.local()->get_location(), "Tablet migration", streaming::stream_reason::tablet_migration, topo_guard, std::move(tables)); + tm = nullptr; streamer->add_source_filter(std::make_unique( _gossiper.get_unreachable_members())); From 9dac0febcef5029fad69d380b7a6276a1ff27ca8 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Fri, 4 Aug 2023 19:42:15 +0200 Subject: [PATCH 14/18] range_streamer: Do not block topology change barriers around streaming Streaming was keeping effective_replication_map_ptr around the whole process, which blocks topology change barriers. This will inhibit progress of tablet load balancer or concurrent migrations, resulting in worse performance. Fix by switching to the most recent erm on sharder calls. multishard_writer calls shard_of() for each new partition. A better way would be to switch immediately when topology version changes, but this is left for later. 
--- dht/auto_refreshing_sharder.hh | 59 ++++++++++++++++++++++++++++++++++ streaming/stream_session.cc | 8 +++-- 2 files changed, 64 insertions(+), 3 deletions(-) create mode 100644 dht/auto_refreshing_sharder.hh diff --git a/dht/auto_refreshing_sharder.hh b/dht/auto_refreshing_sharder.hh new file mode 100644 index 0000000000..42ae6f4766 --- /dev/null +++ b/dht/auto_refreshing_sharder.hh @@ -0,0 +1,59 @@ +/* + * Copyright (C) 2023-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +#pragma once + +#include "dht/sharder.hh" +#include "replica/database.hh" + +namespace dht { + +/// A sharder which uses table's most up to date effective_replication_map in each invocation. +/// As a result, the time during which a single token metadata version is kept is reduced. +/// The purpose of this class is having a sharder which can be passed to long-running processes +/// which do not need a single view on replication for the whole duration of the process, +/// and do not need to block topology change barriers. +/// +/// No guarantees are made about how long token metadata version is kept alive. +/// It may be released before the next call. 
+class auto_refreshing_sharder : public dht::sharder { + lw_shared_ptr _table; + locator::effective_replication_map_ptr _erm; + const dht::sharder* _sharder; + optimized_optional _callback; +private: + void refresh() { + _erm = _table->get_effective_replication_map(); + _sharder = &_erm->get_sharder(*_table->schema()); + _callback = _erm->get_validity_abort_source().subscribe([this] () noexcept { + refresh(); + }); + } +public: + auto_refreshing_sharder(lw_shared_ptr table) + : _table(std::move(table)) + { + refresh(); + } + + virtual ~auto_refreshing_sharder() = default; + + virtual unsigned shard_of(const dht::token& token) const override { + return _sharder->shard_of(token); + } + + virtual std::optional next_shard(const dht::token& t) const override { + return _sharder->next_shard(t); + } + + virtual dht::token token_for_next_shard(const dht::token& t, shard_id shard, unsigned spans = 1) const override { + return _sharder->token_for_next_shard(t, shard, spans); + } +}; + +} // namespace dht diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 6c288cb4b8..706cf7bebb 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -16,6 +16,7 @@ #include "streaming/stream_result_future.hh" #include "streaming/stream_manager.hh" #include "dht/i_partitioner.hh" +#include "dht/auto_refreshing_sharder.hh" #include "utils/fb_utilities.hh" #include "streaming/stream_plan.hh" #include @@ -176,14 +177,15 @@ void stream_manager::init_messaging_service_handler(abort_source& as) { // Make sure the table with cf_id is still present at this point. // Close the sink in case the table is dropped. auto& table = _db.local().find_column_family(cf_id); - auto erm = table.get_effective_replication_map(); auto op = table.stream_in_progress(); + auto sharder_ptr = std::make_unique(table.shared_from_this()); + auto& sharder = *sharder_ptr; //FIXME: discarded future. 
- (void)mutation_writer::distribute_reader_and_consume_on_shards(s, erm->get_sharder(*s), + (void)mutation_writer::distribute_reader_and_consume_on_shards(s, sharder, make_generating_reader_v1(s, permit, std::move(get_next_mutation_fragment)), make_streaming_consumer("streaming", _db, _sys_dist_ks, _view_update_generator, estimated_partitions, reason, is_offstrategy_supported(reason), topo_guard), std::move(op) - ).then_wrapped([s, plan_id, from, sink, estimated_partitions, erm] (future f) mutable { + ).then_wrapped([s, plan_id, from, sink, estimated_partitions, sh_ptr = std::move(sharder_ptr)] (future f) mutable { int32_t status = 0; uint64_t received_partitions = 0; if (f.failed()) { From 733eb216014d043d409c19c1033f88ac859172fe Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Fri, 24 Nov 2023 14:49:47 +0100 Subject: [PATCH 15/18] api: Add API to kill connection to a particular host For testing failure scenarios. --- api/api-doc/error_injection.json | 24 ++++++++++++++++++++++++ api/messaging_service.cc | 12 ++++++++++++ test/pylib/rest_client.py | 3 +++ 3 files changed, 39 insertions(+) diff --git a/api/api-doc/error_injection.json b/api/api-doc/error_injection.json index 261acb4500..b01b1b828a 100644 --- a/api/api-doc/error_injection.json +++ b/api/api-doc/error_injection.json @@ -90,6 +90,30 @@ } ] }, + { + "path":"/v2/error_injection/disconnect/{ip}", + "operations":[ + { + "method":"POST", + "summary":"Drop connection to a given IP", + "type":"void", + "nickname":"inject_disconnect", + "produces":[ + "application/json" + ], + "parameters":[ + { + "name":"ip", + "description":"IP address to disconnect from", + "required":true, + "allowMultiple":false, + "type":"string", + "paramType":"path" + } + ] + } + ] + }, { "path":"/v2/error_injection/injection", "operations":[ diff --git a/api/messaging_service.cc b/api/messaging_service.cc index 9d23107122..7ed0ca594f 100644 --- a/api/messaging_service.cc +++ b/api/messaging_service.cc @@ -10,6 +10,7 @@ #include 
"message/messaging_service.hh" #include #include "api/api-doc/messaging_service.json.hh" +#include "api/api-doc/error_injection.json.hh" #include #include @@ -19,6 +20,8 @@ using namespace netw; namespace api { +namespace hf = httpd::error_injection_json; + using shard_info = messaging_service::shard_info; using msg_addr = messaging_service::msg_addr; @@ -142,6 +145,14 @@ void set_messaging_service(http_context& ctx, routes& r, sharded(res); }); }); + + hf::inject_disconnect.set(r, [&ms] (std::unique_ptr req) -> future { + auto ip = msg_addr(req->param["ip"]); + co_await ms.invoke_on_all([ip] (netw::messaging_service& ms) { + ms.remove_rpc_client(ip); + }); + co_return json::json_void(); + }); } void unset_messaging_service(http_context& ctx, routes& r) { @@ -155,6 +166,7 @@ void unset_messaging_service(http_context& ctx, routes& r) { get_respond_completed_messages.unset(r); get_version.unset(r); get_dropped_messages_by_ver.unset(r); + hf::inject_disconnect.unset(r); } } diff --git a/test/pylib/rest_client.py b/test/pylib/rest_client.py index df3547cc69..b237eda12d 100644 --- a/test/pylib/rest_client.py +++ b/test/pylib/rest_client.py @@ -238,6 +238,9 @@ class ScyllaRESTAPIClient(): async def message_injection(self, node_ip: str, injection: str) -> None: await self.client.post(f"/v2/error_injection/injection/{injection}/message", host=node_ip) + async def inject_disconnect(self, node_ip: str, ip_to_disconnect_from: str) -> None: + await self.client.post(f"/v2/error_injection/disconnect/{ip_to_disconnect_from}", host=node_ip) + async def get_logger_level(self, node_ip: str, logger: str) -> str: """Get logger level""" return await self.client.get_text(f"/system/logger/{logger}", host=node_ip) From ce0dc9e940ab84c15b524109de9ac6ac6e4aeb95 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Sat, 25 Nov 2023 22:48:56 +0100 Subject: [PATCH 16/18] error_injection: Make is_enabled() public --- utils/error_injection.hh | 16 +++++++++++----- 1 file changed, 11 insertions(+), 
5 deletions(-) diff --git a/utils/error_injection.hh b/utils/error_injection.hh index eb8bd194e1..eeb99e1696 100644 --- a/utils/error_injection.hh +++ b/utils/error_injection.hh @@ -223,11 +223,6 @@ private: // TODO: change to unordered_set once we have heterogeneous lookups std::map _enabled; - bool is_enabled(const std::string_view& injection_name) const { - auto data = get_data(injection_name); - return data && !data->is_ongoing_oneshot(); - } - bool is_one_shot(const std::string_view& injection_name) const { const auto it = _enabled.find(injection_name); if (it == _enabled.end()) { @@ -253,6 +248,13 @@ private: } public: + // \brief Returns true iff the injection is enabled. + // \param injection_name error injection name to check + bool is_enabled(const std::string_view& injection_name) const { + auto data = get_data(injection_name); + return data && !data->is_ongoing_oneshot(); + } + // \brief Enter into error injection if it's enabled // \param name error injection name to check bool enter(const std::string_view& name) { @@ -461,6 +463,10 @@ class error_injection { static thread_local error_injection _local; using handler_fun = std::function; public: + bool is_enabled(const std::string_view& name) const { + return false; + } + bool enter(const std::string_view& name) const { return false; } From 083a0279a940886313c5cbfdf269d638c1240a0b Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Sun, 26 Nov 2023 16:23:02 +0100 Subject: [PATCH 17/18] error_injection: Introduce poll_for_message() To allow more complex waiting, which involves other exit conditions. --- utils/error_injection.hh | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/utils/error_injection.hh b/utils/error_injection.hh index eeb99e1696..680d144a50 100644 --- a/utils/error_injection.hh +++ b/utils/error_injection.hh @@ -157,6 +157,20 @@ public: ++_read_messages_counter; } + // \brief Checks if there is an unreceived message. + // If yes, returns true and marks the message as received.
+ bool poll_for_message() { + if (!_shared_data) { + on_internal_error(errinj_logger, "injection_shared_data is not initialized"); + } + + if (_read_messages_counter < _shared_data->received_message_count) { + ++_read_messages_counter; + return true; + } + return false; + } + std::optional get(std::string_view key) { if (!_shared_data) { on_internal_error(errinj_logger, "injection_shared_data is not initialized"); From 7d0f4c10a26430dbc32b9749a469086543c6b488 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Thu, 23 Nov 2023 13:45:34 +0100 Subject: [PATCH 18/18] test: tablets: Add test for failed streaming being fenced away --- streaming/stream_session.cc | 34 +++++-- test/pylib/manager_client.py | 4 + .../test_tablets.py | 95 ++++++++++++++++++- 3 files changed, 126 insertions(+), 7 deletions(-) diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 706cf7bebb..534da45237 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -38,6 +38,7 @@ #include "consumer.hh" #include "readers/generating_v2.hh" #include "service/topology_guard.hh" +#include "utils/error_injection.hh" namespace streaming { @@ -127,8 +128,8 @@ void stream_manager::init_messaging_service_handler(abort_source& as) { return make_exception_future>(std::runtime_error(format("Node {} is not fully initialized for streaming, try again later", utils::fb_utilities::get_broadcast_address()))); } - return _mm.local().get_schema_for_write(schema_id, from, _ms.local(), as).then([this, from, estimated_partitions, plan_id, cf_id, source, reason, topo_guard] (schema_ptr s) mutable { - return _db.local().obtain_reader_permit(s, "stream-session", db::no_timeout, {}).then([this, from, estimated_partitions, plan_id, cf_id, source, reason, topo_guard, s] (reader_permit permit) mutable { + return _mm.local().get_schema_for_write(schema_id, from, _ms.local(), as).then([this, from, estimated_partitions, plan_id, cf_id, source, reason, topo_guard, &as] (schema_ptr s) mutable 
{ + return _db.local().obtain_reader_permit(s, "stream-session", db::no_timeout, {}).then([this, from, estimated_partitions, plan_id, cf_id, source, reason, topo_guard, s, &as] (reader_permit permit) mutable { struct stream_mutation_fragments_cmd_status { bool got_cmd = false; bool got_end_of_stream = false; @@ -136,9 +137,18 @@ void stream_manager::init_messaging_service_handler(abort_source& as) { auto cmd_status = make_lw_shared(); auto offstrategy_update = make_lw_shared(_db, cf_id, plan_id); auto guard = service::topology_guard(s->table(), topo_guard); - auto get_next_mutation_fragment = [guard = std::move(guard), &sm = container(), source, plan_id, from, s, cmd_status, offstrategy_update, permit] () mutable { + + // Will log a message when streaming is done. Used to synchronize tests. + lw_shared_ptr log_done; + if (utils::get_local_injector().is_enabled("stream_mutation_fragments")) { + log_done = make_lw_shared(seastar::make_shared(seastar::defer([] { + sslog.info("stream_mutation_fragments: done"); + }))); + } + + auto get_next_mutation_fragment = [guard = std::move(guard), &as, &sm = container(), source, plan_id, from, s, cmd_status, offstrategy_update, permit] () mutable { guard.check(); - return source().then([&sm, plan_id, from, s, cmd_status, offstrategy_update, permit] (std::optional>> opt) mutable { + return source().then([&sm, &guard, &as, plan_id, from, s, cmd_status, offstrategy_update, permit] (std::optional>> opt) mutable { if (opt) { auto cmd = std::get<1>(*opt); if (cmd) { @@ -160,7 +170,19 @@ void stream_manager::init_messaging_service_handler(abort_source& as) { auto mf = fmf.unfreeze(*s, permit); sm.local().update_progress(plan_id, from.addr, progress_info::direction::IN, sz); offstrategy_update->update(); - return make_ready_future(std::move(mf)); + + return utils::get_local_injector().inject_with_handler("stream_mutation_fragments", [&guard, &as] (auto& handler) -> future<> { + auto& guard_ = guard; + auto& as_ = as; + 
sslog.info("stream_mutation_fragments: waiting"); + while (!handler.poll_for_message()) { + guard_.check(); + co_await sleep_abortable(std::chrono::milliseconds(5), as_); + } + sslog.info("stream_mutation_fragments: released"); + }).then([mf = std::move(mf)] () mutable { + return mutation_fragment_opt(std::move(mf)); + }); } else { // If the sender has sent stream_mutation_fragments_cmd it means it is // a node that understands the new protocol. It must send end_of_stream @@ -185,7 +207,7 @@ void stream_manager::init_messaging_service_handler(abort_source& as) { make_generating_reader_v1(s, permit, std::move(get_next_mutation_fragment)), make_streaming_consumer("streaming", _db, _sys_dist_ks, _view_update_generator, estimated_partitions, reason, is_offstrategy_supported(reason), topo_guard), std::move(op) - ).then_wrapped([s, plan_id, from, sink, estimated_partitions, sh_ptr = std::move(sharder_ptr)] (future f) mutable { + ).then_wrapped([s, plan_id, from, sink, estimated_partitions, log_done, sh_ptr = std::move(sharder_ptr)] (future f) mutable { int32_t status = 0; uint64_t received_partitions = 0; if (f.failed()) { diff --git a/test/pylib/manager_client.py b/test/pylib/manager_client.py index f4d068e735..cb45841de7 100644 --- a/test/pylib/manager_client.py +++ b/test/pylib/manager_client.py @@ -337,6 +337,10 @@ class ManagerClient(): raise Exception(f"Failed to get local host id address for server {server_id}") from exc return HostID(host_id) + async def get_table_id(self, keyspace: str, table: str): + rows = await self.cql.run_async(f"select id from system_schema.tables where keyspace_name = '{keyspace}' and table_name = '{table}'") + return rows[0].id + async def server_sees_others(self, server_id: ServerNum, count: int, interval: float = 45.): """Wait till a server sees a minimum given count of other servers""" if count < 1: diff --git a/test/topology_experimental_raft/test_tablets.py b/test/topology_experimental_raft/test_tablets.py index 
0f8b98802d..d4b9297f5c 100644 --- a/test/topology_experimental_raft/test_tablets.py +++ b/test/topology_experimental_raft/test_tablets.py @@ -3,11 +3,15 @@ # # SPDX-License-Identifier: AGPL-3.0-or-later # +from uuid import UUID + +from cassandra.query import SimpleStatement, ConsistencyLevel from test.pylib.manager_client import ManagerClient -from test.pylib.rest_client import inject_error_one_shot +from test.pylib.rest_client import inject_error_one_shot, HTTPError from test.pylib.rest_client import inject_error from test.pylib.util import wait_for_cql_and_get_hosts +from test.topology.conftest import skip_mode from test.topology.util import reconnect_driver import pytest @@ -29,6 +33,21 @@ async def inject_error_on(manager, error_name, servers): await asyncio.gather(*errs) +async def get_tablet_replicas(manager, keyspace_name, table_name, token): + table_id = await manager.get_table_id(keyspace_name, table_name) + rows = await manager.cql.run_async(f"SELECT last_token, replicas FROM system.tablets where " + f"keyspace_name = '{keyspace_name}' and " + f"table_id = {table_id}") + for row in rows: + if row.last_token >= token: + return row.replicas + + +async def get_tablet_replica(manager, keyspace_name, table_name, token): + replicas = await get_tablet_replicas(manager, keyspace_name, table_name, token) + return replicas[0] + + @pytest.mark.asyncio async def test_tablet_metadata_propagates_with_schema_changes_in_snapshot_mode(manager: ManagerClient): """Test that you can create a table and insert and query data""" @@ -199,3 +218,77 @@ async def test_topology_changes(manager: ManagerClient): await check() await cql.run_async("DROP KEYSPACE test;") + + +@pytest.mark.asyncio +@skip_mode('release', 'error injections are not supported in release mode') +async def test_streaming_is_guarded_by_topology_guard(manager: ManagerClient): + logger.info("Bootstrapping cluster") + cmdline = [ + '--logger-log-level', 'storage_service=trace', + ] + servers = [await 
manager.server_add(cmdline=cmdline)] + + await manager.api.disable_tablet_balancing(servers[0].ip_addr) + + cql = manager.get_cql() + await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', " + "'replication_factor': 1, 'initial_tablets': 1};") + await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);") + + servers.append(await manager.server_add(cmdline=cmdline)) + + key = 7 # Whatever + tablet_token = 0 # Doesn't matter since there is one tablet + await cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({key}, 0)") + rows = await cql.run_async("SELECT pk from test.test") + assert len(list(rows)) == 1 + + replica = await get_tablet_replica(manager, 'test', 'test', tablet_token) + + s0_host_id = await manager.get_host_id(servers[0].server_id) + s1_host_id = await manager.get_host_id(servers[1].server_id) + dst_shard = 0 + + await manager.api.enable_injection(servers[1].ip_addr, "stream_mutation_fragments", one_shot=True) + s1_log = await manager.server_open_log(servers[1].server_id) + s1_mark = await s1_log.mark() + + migration_task = asyncio.create_task( + manager.api.move_tablet(servers[0].ip_addr, "test", "test", replica[0], replica[1], s1_host_id, dst_shard, tablet_token)) + + # Wait for the replica-side writer of streaming to reach a place where it has already + # received writes from the leaving replica but hasn't applied them yet. + # Once the writer reaches this place, it will wait for the message_injection() call below before proceeding. + # The place we block the writer in should not hold on to erm or topology_guard because that would block the migration + # below and prevent the test from proceeding. + await s1_log.wait_for('stream_mutation_fragments: waiting', from_mark=s1_mark) + s1_mark = await s1_log.mark() + + # Should cause streaming to fail and be retried while leaving behind the replica-side writer.
+ await manager.api.inject_disconnect(servers[0].ip_addr, servers[1].ip_addr) + + logger.info("Waiting for migration to finish") + await migration_task + logger.info("Migration done") + + # Sanity test + rows = await cql.run_async("SELECT pk from test.test") + assert len(list(rows)) == 1 + + await cql.run_async("TRUNCATE test.test") + rows = await cql.run_async("SELECT pk from test.test") + assert len(list(rows)) == 0 + + # Release abandoned streaming + await manager.api.message_injection(servers[1].ip_addr, "stream_mutation_fragments") + await s1_log.wait_for('stream_mutation_fragments: done', from_mark=s1_mark) + + # Verify that there is no data resurrection + rows = await cql.run_async("SELECT pk from test.test") + assert len(list(rows)) == 0 + + # Verify that moving the tablet back works + await manager.api.move_tablet(servers[0].ip_addr, "test", "test", s1_host_id, dst_shard, replica[0], replica[1], tablet_token) + rows = await cql.run_async("SELECT pk from test.test") + assert len(list(rows)) == 0