From d3d83869ce5e1112b48cfe05b64ad7b24842433d Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Thu, 26 Oct 2023 00:03:25 +0200 Subject: [PATCH] storage_service: Introduce session concept --- configure.py | 2 + service/session.cc | 74 +++++++++++++++++++++ service/session.hh | 129 ++++++++++++++++++++++++++++++++++++ service/storage_service.cc | 1 + test/boost/sessions_test.cc | 94 ++++++++++++++++++++++++++ utils/UUID.hh | 2 +- 6 files changed, 301 insertions(+), 1 deletion(-) create mode 100644 service/session.cc create mode 100644 service/session.hh create mode 100644 test/boost/sessions_test.cc diff --git a/configure.py b/configure.py index 6ee7431df0..621ed7aa4a 100755 --- a/configure.py +++ b/configure.py @@ -550,6 +550,7 @@ scylla_tests = set([ 'test/boost/network_topology_strategy_test', 'test/boost/token_metadata_test', 'test/boost/tablets_test', + 'test/boost/sessions_test', 'test/boost/nonwrapping_range_test', 'test/boost/observable_test', 'test/boost/partitioner_test', @@ -1091,6 +1092,7 @@ scylla_core = (['message/messaging_service.cc', 'locator/util.cc', 'service/client_state.cc', 'service/storage_service.cc', + 'service/session.cc', 'service/misc_services.cc', 'service/pager/paging_state.cc', 'service/pager/query_pagers.cc', diff --git a/service/session.cc b/service/session.cc new file mode 100644 index 0000000000..5490c34292 --- /dev/null +++ b/service/session.cc @@ -0,0 +1,74 @@ +/* + * Copyright (C) 2023-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +#include "service/session.hh" +#include "log.hh" + +namespace service { + +static logging::logger slogger("session"); + + +session::guard::guard(session& s) + : _session(&s) + , _holder(s._gate.hold()) { +} + +session::guard::~guard() { +} + +session_manager::session_manager() { + create_session(default_session_id); +} + +session::guard session_manager::enter_session(session_id id) { + auto i = _sessions.find(id); + if (i == _sessions.end()) { + throw std::runtime_error(fmt::format("Session not found: {}", id)); + } + auto guard = i->second->enter(); + slogger.debug("session {} entered", id); + return guard; +} + +void session_manager::create_session(session_id id) { + auto [i, created] = _sessions.emplace(id, std::make_unique(id)); + if (created) { + slogger.debug("session {} created", id); + } else { + slogger.debug("session {} already exists", id); + } +} + +void session_manager::initiate_close_of_sessions_except(const std::unordered_set& keep) { + for (auto&& [id, session] : _sessions) { + if (id != default_session_id && !keep.contains(id)) { + if (!session->is_closing()) { + _closing_sessions.push_front(*session); + } + session->start_closing(); + } + } +} + +future<> session_manager::drain_closing_sessions() { + auto lock = co_await get_units(_session_drain_sem, 1); + auto i = _closing_sessions.begin(); + while (i != _closing_sessions.end()) { + session& s = *i; + ++i; + auto id = s.id(); + slogger.debug("draining session {}", id); + co_await s.close(); + if (_sessions.erase(id)) { + slogger.debug("session {} closed", id); + } + } +} + +} // namespace service diff --git a/service/session.hh b/service/session.hh new file mode 100644 index 0000000000..64e8309a3f --- /dev/null +++ b/service/session.hh @@ -0,0 +1,129 @@ +/* + * Copyright (C) 2023-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +#pragma once + +#include "utils/UUID.hh" + +#include +#include +#include +#include + +#include +#include + +namespace service { + +using session_id = utils::tagged_uuid; + +// We want it be different than default-constructed session_id to catch mistakes. +constexpr session_id default_session_id = session_id( + utils::UUID(0x81e7fc5a8d4411ee, 0x8577325096b39f47)); // timeuuid 2023-11-27 16:46:27.182089.0 UTC + +/// Session is used to track execution of work related to some greater task, identified by session_id. +/// Work can enter the session using enter(), and is considered to be part of the session +/// as long as the guard returned by enter() is alive. +/// +/// Session goes over the following states monotonically: +/// 1) open - accepts work via enter() +/// 2) closing - rejects work via enter() +/// 3) closed - rejects work via enter(), and no guards are alive anymore +/// +/// Sessions are removed only after they are closed, it's impossible to have a session::guard of a session +/// which is not in the registry. +class session { +public: + using link_type = bi::list_member_hook>; +private: + session_id _id; + seastar::gate _gate; + std::optional> _closed; + link_type _link; +public: + using list_type = boost::intrusive::list, + boost::intrusive::constant_time_size>; +public: + class guard { + session* _session; + seastar::gate::holder _holder; + public: + explicit guard(session& s); + guard(const guard&) noexcept = default; + guard(guard&&) noexcept = default; + guard& operator=(guard&&) noexcept = default; + guard& operator=(const guard&) noexcept = default; + ~guard(); + + void check() const { + if (!valid()) { + throw seastar::abort_requested_exception(); + } + } + + [[nodiscard]] bool valid() const { + return !_session->_closed; + } + }; + + explicit session(session_id id) : _id(id) {} + + guard enter() { + return guard(*this); + }; + + /// No new work is admitted to enter the session after this. + /// Can be called many times. + void start_closing() noexcept { + if (!_closed) { + _closed = seastar::shared_future<>(_gate.close()); + } + } + + /// Returns true iff the session is not open. + /// Calling enter() in this state will fail. + bool is_closing() const { + return bool(_closed); + } + + session_id id() const { + return _id; + } + + /// Post-condition of successfully resolved future: There are no guards alive for this session, and + /// and it's impossible to create more such guards later. + /// Can be called concurrently. + future<> close() { + start_closing(); + return _closed->get_future(); + } +}; + +class session_manager { + std::unordered_map> _sessions; + session::list_type _closing_sessions; + seastar::semaphore _session_drain_sem{1}; +public: + session_manager(); + + session::guard enter_session(session_id id); + + /// Creates a session on this shard if it doesn't exist yet. + /// If the session already exists does nothing. + void create_session(session_id); + + /// Calls start_closing() on all sessions except those in keep. + void initiate_close_of_sessions_except(const std::unordered_set& keep); + + /// Post-condition: All sessions which are in closing state before the call will be in closed state after the call. + /// Can be called concurrently. + future<> drain_closing_sessions(); +}; + +} // namespace service diff --git a/service/storage_service.cc b/service/storage_service.cc index 037917e7c1..aa73b3ed5c 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -10,6 +10,7 @@ */ #include "storage_service.hh" +#include "service/session.hh" #include "dht/boot_strapper.hh" #include #include diff --git a/test/boost/sessions_test.cc b/test/boost/sessions_test.cc new file mode 100644 index 0000000000..a7bf2347db --- /dev/null +++ b/test/boost/sessions_test.cc @@ -0,0 +1,94 @@ +/* + * Copyright (C) 2023-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + + +#include "test/lib/scylla_test_case.hh" +#include +#include "test/lib/random_utils.hh" +#include "test/lib/cql_test_env.hh" +#include "test/lib/log.hh" + +#include "service/session.hh" + +using namespace service; + +SEASTAR_TEST_CASE(test_default_session_always_exists) { + return seastar::async([] { + session_manager mgr; + auto guard = mgr.enter_session(default_session_id); + guard.check(); + + mgr.initiate_close_of_sessions_except({}); + mgr.drain_closing_sessions().get(); + + guard = mgr.enter_session(default_session_id); + guard.check(); + }); +} + +SEASTAR_TEST_CASE(test_default_constructible_session_does_not_exist) { + return seastar::async([] { + session_manager mgr; + session_id id; + // For safety, we don't want to treat unset id same as default_session_id. + BOOST_REQUIRE_THROW(mgr.enter_session(id), std::runtime_error); + }); +} + +SEASTAR_TEST_CASE(test_session_closing) { + return seastar::async([] { + session_manager mgr; + + auto id = session_id(utils::make_random_uuid()); + auto id2 = session_id(utils::make_random_uuid()); + auto id3 = session_id(utils::make_random_uuid()); + auto id4 = session_id(utils::make_random_uuid()); + + BOOST_REQUIRE_THROW(mgr.enter_session(id), std::runtime_error); + + mgr.create_session(id); + mgr.create_session(id2); + + auto guard = mgr.enter_session(id); + auto guard2 = mgr.enter_session(id2); + + guard.check(); + guard2.check(); + + mgr.initiate_close_of_sessions_except({id}); + + BOOST_REQUIRE(guard.valid()); + BOOST_REQUIRE(!guard2.valid()); + + auto f = mgr.drain_closing_sessions(); + auto f2 = mgr.drain_closing_sessions(); // test concurrent drain + BOOST_REQUIRE(!f.available()); // blocked by guard2 + + // Concurrent wait drain + mgr.create_session(id3); + mgr.initiate_close_of_sessions_except({id}); + mgr.create_session(id3); // no-op + mgr.create_session(id4); + + { + auto _ = std::move(guard2); + } + + f.get(); + f2.get(); + + mgr.drain_closing_sessions().get(); + + BOOST_REQUIRE(guard.valid()); + + mgr.enter_session(id); + mgr.enter_session(id4); + BOOST_REQUIRE_THROW(mgr.enter_session(id2), std::runtime_error); + BOOST_REQUIRE_THROW(mgr.enter_session(id3), std::runtime_error); + }); +} diff --git a/utils/UUID.hh b/utils/UUID.hh index 563c4a3b69..bcc9fbcbf5 100644 --- a/utils/UUID.hh +++ b/utils/UUID.hh @@ -211,7 +211,7 @@ struct tagged_uuid { } static tagged_uuid create_random_id() noexcept { return tagged_uuid{utils::make_random_uuid()}; } static constexpr tagged_uuid create_null_id() noexcept { return tagged_uuid{}; } - explicit tagged_uuid(const utils::UUID& uuid) noexcept : id(uuid) {} + explicit constexpr tagged_uuid(const utils::UUID& uuid) noexcept : id(uuid) {} constexpr tagged_uuid() = default; const utils::UUID& uuid() const noexcept {