diff --git a/configure.py b/configure.py index 0c065856e9..f71b4e8935 100755 --- a/configure.py +++ b/configure.py @@ -490,6 +490,7 @@ scylla_tests = set([ 'test/boost/exceptions_fallback_test', 'test/boost/exceptions_optimized_test', 'test/boost/expr_test', + 'test/boost/file_stream_test', 'test/boost/flush_queue_test', 'test/boost/fragmented_temporary_buffer_test', 'test/boost/frozen_mutation_test', @@ -1082,6 +1083,7 @@ scylla_core = (['message/messaging_service.cc', 'streaming/stream_request.cc', 'streaming/stream_summary.cc', 'streaming/stream_transfer_task.cc', + 'streaming/stream_blob.cc', 'streaming/stream_receive_task.cc', 'streaming/stream_plan.cc', 'streaming/progress_info.cc', @@ -1313,6 +1315,7 @@ idls = ['idl/gossip_digest.idl.hh', 'idl/group0.idl.hh', 'idl/hinted_handoff.idl.hh', 'idl/storage_proxy.idl.hh', + 'idl/sstables.idl.hh', 'idl/group0_state_machine.idl.hh', 'idl/mapreduce_request.idl.hh', 'idl/replica_exception.idl.hh', diff --git a/db/config.cc b/db/config.cc index 42dce007f3..65eca08626 100644 --- a/db/config.cc +++ b/db/config.cc @@ -760,6 +760,7 @@ db::config::config(std::shared_ptr exts) "Throttles streaming I/O to the specified total throughput (in MiBs/s) across the entire system. Streaming I/O includes the one performed by repair and both RBNO and legacy topology operations such as adding or removing a node. Setting the value to 0 disables stream throttling.") , stream_plan_ranges_fraction(this, "stream_plan_ranges_fraction", liveness::LiveUpdate, value_status::Used, 0.1, "Specify the fraction of ranges to stream in a single stream plan. Value is between 0 and 1.") + , enable_file_stream(this, "enable_file_stream", liveness::LiveUpdate, value_status::Used, true, "Set true to use file based stream for tablet instead of mutation based stream") , trickle_fsync(this, "trickle_fsync", value_status::Unused, false, "When doing sequential writing, enabling this option tells fsync to force the operating system to flush the dirty buffers at a set interval trickle_fsync_interval_in_kb. Enable this parameter to avoid sudden dirty buffer flushing from impacting read latencies. Recommended to use on SSDs, but not on HDDs.") , trickle_fsync_interval_in_kb(this, "trickle_fsync_interval_in_kb", value_status::Unused, 10240, diff --git a/db/config.hh b/db/config.hh index a0e8a486e1..52f2f934e7 100644 --- a/db/config.hh +++ b/db/config.hh @@ -243,6 +243,7 @@ public: named_value inter_dc_stream_throughput_outbound_megabits_per_sec; named_value stream_io_throughput_mb_per_sec; named_value stream_plan_ranges_fraction; + named_value enable_file_stream; named_value trickle_fsync; named_value trickle_fsync_interval_in_kb; named_value auto_bootstrap; diff --git a/docs/architecture/tablets.rst b/docs/architecture/tablets.rst index 0176adb7e8..605b27bf16 100644 --- a/docs/architecture/tablets.rst +++ b/docs/architecture/tablets.rst @@ -72,6 +72,25 @@ to a new node. .. image:: images/tablets-load-balancing.png +File-based Streaming +======================== + +:label-tip:`ScyllaDB Enterprise` + +File-based streaming is a ScyllaDB Enterprise-only feature that optimizes +tablet migration. + +In ScyllaDB Open Source, migrating tablets is performed by streaming mutation +fragments, which involves deserializing SSTable files into mutation fragments +and re-serializing them back into SSTables on the other node. +In ScyllaDB Enterprise, migrating tablets is performed by streaming entire +SStables, which does not require (de)serializing or processing mutation fragments. +As a result, less data is streamed over the network, and less CPU is consumed, +especially for data models that contain small cells. + +File-based streaming is used for tablet migration in all +:ref:`keyspaces created with tablets enabled `. + .. _tablets-enable-tablets: Enabling Tablets diff --git a/gms/feature_service.hh b/gms/feature_service.hh index 78b801d389..23fcdd89ea 100644 --- a/gms/feature_service.hh +++ b/gms/feature_service.hh @@ -160,6 +160,7 @@ public: gms::feature in_memory_tables { *this, "IN_MEMORY_TABLES"sv }; gms::feature workload_prioritization { *this, "WORKLOAD_PRIORITIZATION"sv }; + gms::feature file_stream { *this, "FILE_STREAM"sv }; gms::feature compression_dicts { *this, "COMPRESSION_DICTS"sv }; public: diff --git a/idl/CMakeLists.txt b/idl/CMakeLists.txt index 3caffe6ca0..7f5c9082ac 100644 --- a/idl/CMakeLists.txt +++ b/idl/CMakeLists.txt @@ -51,6 +51,7 @@ set(idl_headers raft_storage.idl.hh group0.idl.hh hinted_handoff.idl.hh + sstables.idl.hh storage_proxy.idl.hh storage_service.idl.hh group0_state_machine.idl.hh diff --git a/idl/sstables.idl.hh b/idl/sstables.idl.hh new file mode 100644 index 0000000000..a2e87c7f3f --- /dev/null +++ b/idl/sstables.idl.hh @@ -0,0 +1,19 @@ +/* + * Copyright 2024-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ + + +namespace sstables { + +enum class sstable_state : uint8_t { + normal, + staging, + quarantine, + upload, +}; + +} diff --git a/idl/streaming.idl.hh b/idl/streaming.idl.hh index 61539410d0..03d4dc16d0 100644 --- a/idl/streaming.idl.hh +++ b/idl/streaming.idl.hh @@ -9,8 +9,10 @@ #include "idl/range.idl.hh" #include "idl/token.idl.hh" #include "idl/uuid.idl.hh" +#include "idl/sstables.idl.hh" #include "streaming/stream_fwd.hh" +#include "streaming/stream_blob.hh" namespace service { @@ -68,8 +70,65 @@ enum class stream_mutation_fragments_cmd : uint8_t { end_of_stream, }; +class file_stream_id final { + utils::UUID uuid(); +}; + +enum class stream_blob_cmd : uint8_t { + ok, + error, + data, + end_of_stream, +}; + +enum class file_ops : uint16_t { + stream_sstables, + load_sstables, +}; + +class stream_blob_data { + temporary_buffer buf; +}; + +class stream_blob_cmd_data { + streaming::stream_blob_cmd cmd; + std::optional data; +}; + +class stream_blob_meta { + streaming::file_stream_id ops_id; + table_id table; + sstring filename; + seastar::shard_id dst_shard_id; + streaming::file_ops fops; + service::frozen_topology_guard topo_guard; + std::optional sstable_state; +}; + +class node_and_shard { + locator::host_id node; + seastar::shard_id shard; +}; + +class stream_files_request { + streaming::file_stream_id ops_id; + sstring keyspace_name; + sstring table_name; + table_id table; + dht::token_range range; + std::vector targets; + service::frozen_topology_guard topo_guard; +}; + +class stream_files_response { + size_t stream_bytes; +}; + verb [[with_client_info]] prepare_message (streaming::prepare_message msg, streaming::plan_id plan_id, sstring description, streaming::stream_reason reason [[version 3.1.0]], service::session_id session [[version 6.0.0]]) -> streaming::prepare_message; verb [[with_client_info]] prepare_done_message (streaming::plan_id plan_id, unsigned dst_cpu_id); verb [[with_client_info]] stream_mutation_done (streaming::plan_id plan_id, dht::token_range_vector ranges, table_id cf_id, unsigned dst_cpu_id); verb [[with_client_info]] complete_message (streaming::plan_id plan_id, unsigned dst_cpu_id, bool failed [[version 2.1.0]]); + +verb [[with_client_info, cancellable]] tablet_stream_files (streaming::stream_files_request req) -> streaming::stream_files_response; + } diff --git a/idl/uuid.idl.hh b/idl/uuid.idl.hh index b69946fe04..ee15886bfe 100644 --- a/idl/uuid.idl.hh +++ b/idl/uuid.idl.hh @@ -11,6 +11,7 @@ #include "query_id.hh" #include "locator/host_id.hh" #include "tasks/types.hh" +#include "service/session.hh" namespace utils { class UUID final { @@ -42,3 +43,4 @@ class host_id final { }; } // namespace locator + diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 2734449f71..fe90dea9fd 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -41,6 +41,7 @@ #include "repair/repair.hh" #include "streaming/stream_reason.hh" #include "streaming/stream_mutation_fragments_cmd.hh" +#include "streaming/stream_blob.hh" #include "cache_temperature.hh" #include "raft/raft.hh" #include "service/raft/group0_fwd.hh" @@ -1270,6 +1271,39 @@ future<> messaging_service::unregister_stream_mutation_fragments() { return unregister_handler(messaging_verb::STREAM_MUTATION_FRAGMENTS); } +// Wrapper for STREAM_BLOB +rpc::sink messaging_service::make_sink_for_stream_blob(rpc::source& source) { + return source.make_sink(); +} + +future, rpc::source>> +messaging_service::make_sink_and_source_for_stream_blob(streaming::stream_blob_meta meta, locator::host_id id) { + if (is_shutting_down()) { + co_await coroutine::return_exception(rpc::closed_error()); + } + auto rpc_client = get_rpc_client(messaging_verb::STREAM_BLOB, addr_for_host_id(id), id); + auto sink = co_await rpc_client->make_stream_sink(); + std::exception_ptr ex; + try { + auto rpc_handler = rpc()->make_client (streaming::stream_blob_meta, rpc::sink)>(messaging_verb::STREAM_BLOB); + auto source = co_await rpc_handler(*rpc_client, meta, sink); + co_return std::make_tuple(std::move(sink), std::move(source)); + } catch (...) { + ex = std::current_exception(); + } + // Reach here only in case of error + co_await sink.close(); + co_return coroutine::return_exception_ptr(ex); +} + +void messaging_service::register_stream_blob(std::function> (const rpc::client_info& cinfo, streaming::stream_blob_meta meta, rpc::source source)>&& func) { + register_handler(this, messaging_verb::STREAM_BLOB, std::move(func)); +} + +future<> messaging_service::unregister_stream_blob() { + return unregister_handler(messaging_verb::STREAM_BLOB); +} + template future, rpc::source>> do_make_sink_source(messaging_verb verb, uint32_t repair_meta_id, shard_id dst_shard_id, shared_ptr rpc_client, std::unique_ptr& rpc) { diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 083caee6ea..ab4182d912 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -37,6 +37,10 @@ namespace streaming { class prepare_message; enum class stream_mutation_fragments_cmd : uint8_t; + enum class stream_blob_cmd : uint8_t; + class stream_blob_data; + class stream_blob_meta; + class stream_blob_cmd_data; } namespace gms { @@ -411,6 +415,13 @@ public: rpc::sink make_sink_for_stream_mutation_fragments(rpc::source>& source); future, rpc::source>> make_sink_and_source_for_stream_mutation_fragments(table_schema_version schema_id, streaming::plan_id plan_id, table_id cf_id, uint64_t estimated_partitions, streaming::stream_reason reason, service::session_id session, locator::host_id id); + // Wrapper for STREAM_BLOB + // The receiver of STREAM_BLOB sends streaming::stream_blob_cmd_data as status code to the sender to notify any error on the receiver side. + void register_stream_blob(std::function> (const rpc::client_info& cinfo, streaming::stream_blob_meta meta, rpc::source source)>&& func); + future<> unregister_stream_blob(); + rpc::sink make_sink_for_stream_blob(rpc::source& source); + future, rpc::source>> make_sink_and_source_for_stream_blob(streaming::stream_blob_meta meta, locator::host_id id); + // Wrapper for REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM future, rpc::source>> make_sink_and_source_for_repair_get_row_diff_with_rpc_stream(uint32_t repair_meta_id, shard_id dst_cpu_id, locator::host_id id); rpc::sink make_sink_for_repair_get_row_diff_with_rpc_stream(rpc::source& source); diff --git a/service/storage_service.cc b/service/storage_service.cc index 668bf414c7..ee79533112 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -63,6 +63,7 @@ #include "locator/local_strategy.hh" #include "utils/user_provided_param.hh" #include "version.hh" +#include "streaming/stream_blob.hh" #include "dht/range_streamer.hh" #include #include @@ -93,10 +94,12 @@ #include #include #include +#include "utils/pretty_printers.hh" #include "utils/stall_free.hh" #include "utils/error_injection.hh" #include "locator/util.hh" #include "idl/storage_service.dist.hh" +#include "idl/streaming.dist.hh" #include "service/storage_proxy.hh" #include "service/raft/join_node.hh" #include "idl/join_node.dist.hh" @@ -6082,6 +6085,52 @@ future<> storage_service::stream_tablet(locator::global_tablet_id tablet) { } }); + if (trinfo->transition != locator::tablet_transition_kind::intranode_migration && _feature_service.file_stream && _db.local().get_config().enable_file_stream()) { + co_await utils::get_local_injector().inject("migration_streaming_wait", [] (auto& handler) { + rtlogger.info("migration_streaming_wait: start"); + return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(2)); + }); + + auto dst_node = trinfo->pending_replica->host; + auto dst_shard_id = trinfo->pending_replica->shard; + auto transition = trinfo->transition; + + // Release token_metadata_ptr early so it will no block barriers for other migrations + // Don't access trinfo after this. + tm = {}; + + co_await utils::get_local_injector().inject("stream_sstable_files", [&] (auto& handler) -> future<> { + slogger.info("stream_sstable_files: waiting"); + while (!handler.poll_for_message()) { + co_await sleep_abortable(std::chrono::milliseconds(5), guard.get_abort_source()); + } + slogger.info("stream_sstable_files: released"); + }); + + for (auto src : streaming_info.read_from) { + // Use file stream for tablet to stream data + auto ops_id = streaming::file_stream_id::create_random_id(); + auto start_time = std::chrono::steady_clock::now(); + size_t stream_bytes = 0; + try { + auto& table = _db.local().find_column_family(tablet.table); + slogger.debug("stream_sstables[{}] Streaming for tablet {} of {} started table={}.{} range={} src={}", + ops_id, transition, tablet, table.schema()->ks_name(), table.schema()->cf_name(), range, src); + auto resp = co_await streaming::tablet_stream_files(ops_id, table, range, src.host, dst_node, dst_shard_id, _messaging.local(), _abort_source, topo_guard); + stream_bytes = resp.stream_bytes; + slogger.debug("stream_sstables[{}] Streaming for tablet migration of {} successful", ops_id, tablet); + auto duration = std::chrono::duration(std::chrono::steady_clock::now() - start_time); + auto bw = utils::pretty_printed_throughput(stream_bytes, duration);; + slogger.info("stream_sstables[{}] Streaming for tablet migration of {} finished table={}.{} range={} stream_bytes={} stream_time={} stream_bw={}", + ops_id, tablet, table.schema()->ks_name(), table.schema()->cf_name(), range, stream_bytes, duration, bw); + } catch (...) { + slogger.warn("stream_sstables[{}] Streaming for tablet migration of {} from {} failed: {}", ops_id, tablet, leaving_replica, std::current_exception()); + throw; + } + } + } else { // Caution: following code is intentionally unindented to be in sync with OSS + + if (trinfo->transition == locator::tablet_transition_kind::intranode_migration) { if (!leaving_replica || leaving_replica->host != tm->get_my_id()) { throw std::runtime_error(fmt::format("Invalid leaving replica for intra-node migration, tablet: {}, leaving: {}", @@ -6133,6 +6182,8 @@ future<> storage_service::stream_tablet(locator::global_tablet_id tablet) { slogger.info("Streaming for tablet migration of {} finished table={}.{} range={}", tablet, table.schema()->ks_name(), table.schema()->cf_name(), range); } + } // Traditional streaming vs file-based streaming. + // If new pending tablet replica needs splitting, streaming waits for it to complete. // That's to provide a guarantee that once migration is over, the coordinator can finalize // splitting under the promise that compaction groups of tablets are all split, ready @@ -7042,6 +7093,21 @@ void storage_service::init_messaging_service() { return handler(ss); }); }; + ser::streaming_rpc_verbs::register_tablet_stream_files(&_messaging.local(), + [this] (const rpc::client_info& cinfo, streaming::stream_files_request req) -> future { + streaming::stream_files_response resp; + resp.stream_bytes = co_await container().map_reduce0([req] (storage_service& ss) -> future { + auto res = co_await streaming::tablet_stream_files_handler(ss._db.local(), ss._messaging.local(), req, [&ss] (locator::host_id host) -> future { + return ss.container().invoke_on(0, [host] (storage_service& ss) { + return ss.host2ip(host); + }); + }); + co_return res.stream_bytes; + }, + size_t(0), + std::plus()); + co_return resp; + }); ser::storage_service_rpc_verbs::register_raft_topology_cmd(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, raft::term_t term, uint64_t cmd_index, raft_topology_cmd cmd) { return handle_raft_rpc(dst_id, [cmd = std::move(cmd), term, cmd_index] (auto& ss) { return ss.raft_topology_cmd_handler(term, cmd_index, cmd); @@ -7177,7 +7243,8 @@ future<> storage_service::uninit_messaging_service() { return when_all_succeed( ser::node_ops_rpc_verbs::unregister(&_messaging.local()), ser::storage_service_rpc_verbs::unregister(&_messaging.local()), - ser::join_node_rpc_verbs::unregister(&_messaging.local()) + ser::join_node_rpc_verbs::unregister(&_messaging.local()), + ser::streaming_rpc_verbs::unregister_tablet_stream_files(&_messaging.local()) ).discard_result(); } diff --git a/sstables/sstables.cc b/sstables/sstables.cc index c5e9f43403..d99594915f 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -3388,6 +3388,223 @@ std::string to_string(const shared_sstable& sst, bool include_origin) { fmt::format("{}:level={:d}", sst->get_filename(), sst->get_sstable_level()); } +std::string sstable_stream_source::component_basename() const { + return _sst->component_basename(_type); +} + +sstable_stream_source::sstable_stream_source(shared_sstable sst, component_type type) + : _sst(std::move(sst)) + , _type(type) +{} + +std::vector> create_stream_sources(const sstables::sstable_files_snapshot& snapshot) { + std::vector> result; + result.reserve(snapshot.files.size()); + + class sstable_stream_source_impl : public sstable_stream_source { + file _file; + public: + sstable_stream_source_impl(shared_sstable table, component_type type, file f) + : sstable_stream_source(std::move(table), type) + , _file(std::move(f)) + {} + future> input(const file_input_stream_options& options) const override { + if (_type == component_type::Scylla) { + // Filter out any node-local info (i.e. extensions) + // and reserialize data. Load into a temp object. + // TODO/FIXME. Not all extension attributes might + // need removing. In fact, it might be wrong (in the future) + // to do so. ATM we know this is safe and correct, but really + // extensions should remove themselves if required. + scylla_metadata tmp; + uint64_t size = co_await _file.size(); + auto r = file_random_access_reader(_file, size, default_sstable_buffer_size); + co_await parse(*_sst->get_schema(), _sst->get_version(), r, tmp); + co_await r.close(); + + tmp.remove_extension_attributes(); + + std::vector> bufs; + // TODO: move to seastar. Based on memory_data_sink, but allowing us + // to actually move away the buffers later. I don't want to modify + // util classes in an enterprise patch. + class buffer_data_sink_impl : public data_sink_impl { + std::vector>& _bufs; + public: + buffer_data_sink_impl(std::vector>& bufs) + : _bufs(bufs) + {} + future<> put(net::packet data) override { + throw std::logic_error("unsupported operation"); + } + future<> put(temporary_buffer buf) override { + _bufs.emplace_back(std::move(buf)); + return make_ready_future<>(); + } + future<> flush() override { + return make_ready_future<>(); + } + future<> close() override { + return make_ready_future<>(); + } + size_t buffer_size() const noexcept override { + return 128*1024; + } + }; + + co_await seastar::async([&] { + file_writer fw(data_sink(std::make_unique(bufs))); + write(_sst->get_version(), fw, tmp); + fw.close(); + }); + // TODO: move to seastar. Based on buffer_input... in utils, but + // handles potential 1+ buffers + class buffer_data_source_impl : public data_source_impl { + private: + std::vector> _bufs; + size_t _index = 0; + public: + buffer_data_source_impl(std::vector>&& bufs) + : _bufs(std::move(bufs)) + {} + buffer_data_source_impl(buffer_data_source_impl&&) noexcept = default; + buffer_data_source_impl& operator=(buffer_data_source_impl&&) noexcept = default; + + future> get() override { + if (_index < _bufs.size()) { + return make_ready_future>(std::move(_bufs.at(_index++))); + } + return make_ready_future>(); + } + future> skip(uint64_t n) override { + while (n > 0 && _index < _bufs.size()) { + auto& buf = _bufs.at(_index); + auto min = std::min(n, buf.size()); + buf.trim_front(min); + if (buf.empty()) { + ++_index; + } + n -= min; + } + return get(); + } + }; + co_return input_stream(data_source(std::make_unique(std::move(bufs)))); + } + co_return make_file_input_stream(_file, options); + } + }; + + auto& files = snapshot.files; + + auto add = [&](component_type type, file f) { + result.emplace_back(std::make_unique(snapshot.sst, type, std::move(f))); + }; + + try { + add(component_type::TOC, files.at(component_type::TOC)); + add(component_type::Scylla, files.at(component_type::Scylla)); + } catch (std::out_of_range&) { + std::throw_with_nested(std::invalid_argument("Missing required sstable component")); + } + for (auto&& [type, f] : files) { + if (type != component_type::TOC && type != component_type::Scylla) { + add(type, std::move(f)); + } + } + + return result; +} + +class sstable_stream_sink_impl : public sstable_stream_sink { + shared_sstable _sst; + component_type _type; + bool _last_component; +public: + sstable_stream_sink_impl(shared_sstable sst, component_type type, bool last_component) + : _sst(std::move(sst)) + , _type(type) + , _last_component(last_component) + {} +private: + future<> load_metadata() const { + auto metafile = _sst->filename(sstables::component_type::Scylla); + if (!co_await file_exists(metafile)) { + // for compatibility with streaming a non-scylla table (no scylla component) + co_return; + } + if (!_sst->get_shared_components().scylla_metadata) { + sstables::scylla_metadata tmp; + co_await _sst->read_simple(tmp); + _sst->get_shared_components().scylla_metadata = std::move(tmp); + } + } + future<> save_metadata() const { + if (!_sst->get_shared_components().scylla_metadata) { + co_return; + } + file_output_stream_options options; + options.buffer_size = default_sstable_buffer_size; + co_await seastar::async([&] { + auto w = _sst->make_component_file_writer(component_type::Scylla, std::move(options), open_flags::wo | open_flags::create).get(); + write(_sst->get_version(), w, *_sst->get_shared_components().scylla_metadata); + w.close(); + }); + } +public: + future> output(const file_open_options& foptions, const file_output_stream_options& stream_options) override { + assert(_type != component_type::TOC); + // TOC and scylla components are guaranteed not to depend on metadata. Ignore these (chicken, egg) + bool load_save_meta = _type != component_type::TemporaryTOC && _type != component_type::Scylla; + + // otherwise, first load scylla metadata from disk as written so far. + if (load_save_meta) { + co_await load_metadata(); + } + // now we can open the component file. any extensions applied should write info into metadata + auto f = co_await _sst->open_file(_type, open_flags::wo | open_flags::create, foptions); + + // Save back to disk. + if (load_save_meta) { + co_await save_metadata(); + } + + co_return co_await make_file_output_stream(std::move(f), stream_options); + } + future close_and_seal() override { + if (_last_component) { + // If we are the last component in a sequence, we can seal the table. + co_await _sst->_storage->seal(*_sst); + co_return std::move(_sst); + } + _sst = {}; + co_return nullptr; + } + future<> abort() override { + if (!_sst) { + co_return; + } + auto filename = fs::path(_sst->_storage->prefix()) / std::string_view(_sst->component_basename(_type)); + // TODO: if we are the last component (or really always), should we remove all component files? + // For now, this remains the responsibility of calling code (see handle_tablet_migration etc) + co_await remove_file(filename.native()); + } +}; + +std::unique_ptr create_stream_sink(schema_ptr schema, sstables_manager& sstm, const data_dictionary::storage_options& s_opts, sstable_state state, std::string_view component_filename, bool last_component) { + auto desc = parse_path(component_filename, schema->ks_name(), schema->cf_name()); + auto sst = sstm.make_sstable(schema, s_opts, desc.generation, state, desc.version, desc.format); + + auto type = desc.component; + // Don't write actual TOC. Write temp, if successful, storage::seal will rename this to actual + // TOC (see above close_and_seal). + if (type == component_type::TOC) { + type = component_type::TemporaryTOC; + } + + return std::make_unique(std::move(sst), type, last_component); +} + generation_type generation_type::from_string(const std::string& s) { int64_t int_value; diff --git a/sstables/sstables.hh b/sstables/sstables.hh index 008888eff3..e394d6a691 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -182,6 +182,8 @@ public: future<> commit(); }; +class sstable_stream_sink_impl; + class sstable : public enable_lw_shared_from_this { friend ::sstable_assertions; public: @@ -258,6 +260,10 @@ public: // It's up to the storage driver how to implement this. future<> change_state(sstable_state to, delayed_commit_changes* delay = nullptr); + sstable_state state() const { + return _state; + } + // Filesystem-specific call to grab an sstable from upload dir and // put it into the desired destination assigning the given generation future<> pick_up_from_upload(sstable_state to, generation_type new_generation); @@ -519,6 +525,7 @@ private: return filename(dir, _schema->ks_name(), _schema->cf_name(), _version, _generation, _format, f); } + friend class sstable_stream_sink_impl; friend class sstable_directory; friend class filesystem_storage; friend class s3_storage; @@ -788,7 +795,6 @@ public: shareable_components& get_shared_components() const { return *_components; } - schema_ptr get_schema() const { return _schema; } @@ -1135,6 +1141,50 @@ struct sstable_files_snapshot { std::unordered_map files; }; +// A sstable_stream_source gives back +// component input streams suitable for streaming to other nodes, +// in appropriate order. Data will be decrypted and sanitized as required. +class sstable_stream_source { +protected: + shared_sstable _sst; + component_type _type; +public: + sstable_stream_source(shared_sstable, component_type); + virtual ~sstable_stream_source() = default; + + // Input stream for data appropriate for stream transfer for this component + virtual future> input(const file_input_stream_options&) const = 0; + + // source sstable + const sstable& source() const { + return *_sst; + } + // component + component_type type() const { + return _type; + } + std::string component_basename() const; +}; + +// Translates the result of gathering readable snapshot files into ordered items for streaming. +std::vector> create_stream_sources(const sstables::sstable_files_snapshot&); + +class sstable_stream_sink { +public: + virtual ~sstable_stream_sink() = default; + // Stream to the component file + virtual future> output(const file_open_options&, const file_output_stream_options&) = 0; + // closes this component. If this is the last component in a set (see "last_component" in creating method below) + // the table on disk will be sealed. + // Returns sealed sstable if last, or nullptr otherwise. + virtual future close_and_seal() = 0; + virtual future<> abort() = 0; +}; + +// Creates a sink object which can receive a component file sourced from above source object data. + +std::unique_ptr create_stream_sink(schema_ptr, sstables_manager&, const data_dictionary::storage_options&, sstable_state, std::string_view component_filename, bool last_component); + } // namespace sstables template <> struct fmt::formatter : fmt::formatter { diff --git a/sstables/types.hh b/sstables/types.hh index 51dd558684..4e0930b8cf 100644 --- a/sstables/types.hh +++ b/sstables/types.hh @@ -626,6 +626,9 @@ struct scylla_metadata { const extension_attributes* get_extension_attributes() const { return data.get(); } + void remove_extension_attributes() { + data.data.erase(scylla_metadata_type::ExtensionAttributes); + } extension_attributes& get_or_create_extension_attributes() { auto* ext = data.get(); if (ext == nullptr) { diff --git a/streaming/CMakeLists.txt b/streaming/CMakeLists.txt index 5a6d3da492..7456f6ed40 100644 --- a/streaming/CMakeLists.txt +++ b/streaming/CMakeLists.txt @@ -14,6 +14,7 @@ target_sources(streaming stream_session_state.cc stream_summary.cc stream_task.cc + stream_blob.cc stream_transfer_task.cc) target_include_directories(streaming PUBLIC diff --git a/streaming/stream_blob.cc b/streaming/stream_blob.cc new file mode 100644 index 0000000000..cbe8dee139 --- /dev/null +++ b/streaming/stream_blob.cc @@ -0,0 +1,705 @@ +/* + * Copyright (C) 2023-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ + +#include "message/messaging_service.hh" +#include "streaming/stream_blob.hh" +#include "streaming/stream_plan.hh" +#include "gms/inet_address.hh" +#include "utils/pretty_printers.hh" +#include "utils/error_injection.hh" +#include "locator/host_id.hh" +#include "replica/database.hh" +#include "sstables/sstables.hh" +#include "sstables/sstables_manager.hh" +#include "sstables/sstable_version.hh" +#include "sstables/generation_type.hh" +#include "sstables/types.hh" +#include "idl/streaming.dist.hh" +#include "service/topology_guard.hh" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace streaming { + +static logging::logger blogger("stream_blob"); + +constexpr size_t file_stream_buffer_size = 128 * 1024; +constexpr size_t file_stream_write_behind = 10; +constexpr size_t file_stream_read_ahead = 4; + +static sstables::sstable_state sstable_state(const streaming::stream_blob_meta& meta) { + return meta.sstable_state.value_or(sstables::sstable_state::normal); +} + +static future<> load_sstable_for_tablet(const file_stream_id& ops_id, replica::database& db, table_id table, sstables::sstable_state state, sstring filename, seastar::shard_id shard) { + blogger.debug("stream_sstables[{}] Loading sstable {} on shard {}", ops_id, filename, shard); + auto s = db.find_column_family(table).schema(); + auto data_path = std::filesystem::path(filename); + auto desc = sstables::parse_path(data_path, s->ks_name(), s->cf_name()); + co_await db.container().invoke_on(shard, [id = s->id(), desc, state] (replica::database& db) -> future<> { + replica::table& t = db.find_column_family(id); + auto erm = t.get_effective_replication_map(); + auto& sstm = t.get_sstables_manager(); + auto sst = sstm.make_sstable(t.schema(), t.get_storage_options(), desc.generation, state, desc.version, desc.format); + co_await sst->load(erm->get_sharder(*t.schema())); + co_await t.add_sstable_and_update_cache(sst); + }); + blogger.info("stream_sstables[{}] Loaded sstable {} on shard {} successfully", ops_id, filename, shard); +} + +static utils::pretty_printed_throughput get_bw(size_t total_size, std::chrono::steady_clock::time_point start_time) { + auto duration = std::chrono::steady_clock::now() - start_time; + return utils::pretty_printed_throughput(total_size, duration); +} + +// For tablet stream checks +class tablet_stream_status { +public: + bool finished = false; + void check_valid_stream(); +}; + +void tablet_stream_status::check_valid_stream() { + if (finished) { + throw std::runtime_error("The stream has finished already"); + } +} + +static thread_local std::unordered_map> tablet_streams; + +future<> mark_tablet_stream_start(file_stream_id ops_id) { + return seastar::smp::invoke_on_all([ops_id] { + auto status = make_lw_shared(); + tablet_streams.emplace(ops_id, status); + }); +} + +future<> mark_tablet_stream_done(file_stream_id ops_id) { + return seastar::smp::invoke_on_all([ops_id] { + auto it = tablet_streams.find(ops_id); + if (it == tablet_streams.end()) { + return; + } + auto status = it->second; + if (status) { + status->finished = true; + } + tablet_streams.erase(ops_id); + }); +} + +lw_shared_ptr get_tablet_stream(file_stream_id ops_id) { + auto status = tablet_streams[ops_id]; + if (!status) { + auto msg = format("stream_sstables[{}] Could not find ops_id={}", ops_id, ops_id); + blogger.warn("{}", msg); + throw std::runtime_error(msg); + } + return status; +} + +static void may_inject_error(const streaming::stream_blob_meta& meta, bool may_inject, const sstring& error) { + if (may_inject) { + if (rand() % 500 == 0) { + auto msg = format("fstream[{}] Injected file stream error={} file={}", + meta.ops_id, error, meta.filename); + blogger.warn("{}", msg); + throw std::runtime_error(msg); + } + } +} + +future<> stream_blob_handler(replica::database& db, + netw::messaging_service& ms, + gms::inet_address from, + streaming::stream_blob_meta meta, + rpc::sink sink, + rpc::source source, + stream_blob_create_output_fn create_output, + bool inject_errors) { + bool fstream_closed = false; + bool sink_closed = false; + bool status_sent = false; + size_t total_size = 0; + auto start_time = std::chrono::steady_clock::now(); + std::optional> fstream; + std::exception_ptr error; + stream_blob_finish_fn finish; + + // Will log a message when streaming is done. Used to synchronize tests. + lw_shared_ptr log_done; + if (utils::get_local_injector().is_enabled("stream_mutation_fragments")) { + log_done = make_lw_shared(seastar::make_shared(seastar::defer([] { + blogger.info("stream_mutation_fragments: done (tablets)"); + }))); + } + + try { + auto status = get_tablet_stream(meta.ops_id); + auto guard = service::topology_guard(meta.topo_guard); + + // Reject any file_ops that is not support by this node + if (meta.fops != streaming::file_ops::stream_sstables && + meta.fops != streaming::file_ops::load_sstables) { + auto msg = format("fstream[{}] Unsupported file_ops={} peer={} file={}", + meta.ops_id, int(meta.fops), from, meta.filename); + blogger.warn("{}", msg); + throw std::runtime_error(msg); + } + + blogger.debug("fstream[{}] Follower started peer={} file={}", + meta.ops_id, from, meta.filename); + + auto [f, out] = co_await create_output(db, meta); + finish = std::move(f); + fstream = std::move(out); + + bool got_end_of_stream = false; + for (;;) { + try { + auto opt = co_await source(); + if (!opt) { + break; + } + + co_await utils::get_local_injector().inject("stream_mutation_fragments", [&guard] (auto& handler) -> future<> { + blogger.info("stream_mutation_fragments: waiting (tablets)"); + while (!handler.poll_for_message()) { + guard.check(); + co_await sleep(std::chrono::milliseconds(5)); + } + blogger.info("stream_mutation_fragments: released (tablets)"); + }); + + stream_blob_cmd_data& cmd_data = std::get<0>(*opt); + auto cmd = cmd_data.cmd; + if (cmd == streaming::stream_blob_cmd::error) { + blogger.warn("fstream[{}] Follower got stream_blob_cmd::error from peer={} file={}", + meta.ops_id, from, meta.filename); + throw std::runtime_error(format("Got stream_blob_cmd::error from peer={} file={}", from, meta.filename)); + } else if (cmd == streaming::stream_blob_cmd::end_of_stream) { + blogger.debug("fstream[{}] Follower got stream_blob_cmd::end_of_stream from peer={} file={}", + meta.ops_id, from, meta.filename); + got_end_of_stream = true; + } else if (cmd == streaming::stream_blob_cmd::data) { + std::optional data = std::move(cmd_data.data); + if (data) { + total_size += data->size(); + blogger.trace("fstream[{}] Follower received data from peer={} data={}", meta.ops_id, from, data->size()); + status->check_valid_stream(); + if (!data->empty()) { + co_await fstream->write((char*)data->data(), data->size()); + } + } + } + } catch (seastar::rpc::stream_closed) { + // After we get streaming::stream_blob_cmd::end_of_stream which + // is the last message from peer, it does not matter if the + // source() is closed or not. + if (got_end_of_stream) { + break; + } else { + throw; + } + } catch (...) { + throw; + } + may_inject_error(meta, inject_errors, "rx_data"); + } + + // If we reach here, streaming::stream_blob_cmd::end_of_stream should be received. Otherwise there + // must be an error, e.g., the sender closed the stream without sending streaming::stream_blob_cmd::error. + if (!got_end_of_stream) { + throw std::runtime_error(format("fstream[{}] Follower failed to get end_of_stream", meta.ops_id)); + } + + status->check_valid_stream(); + co_await fstream->flush(); + co_await fstream->close(); + fstream_closed = true; + + may_inject_error(meta, inject_errors, "flush_and_close"); + + co_await finish(store_result::ok); + + // Send status code and close the sink + co_await sink(streaming::stream_blob_cmd_data(streaming::stream_blob_cmd::ok)); + status_sent = true; + co_await sink.close(); + sink_closed = true; + } catch (...) { + error = std::current_exception(); + } + if (error) { + blogger.warn("fstream[{}] Follower failed peer={} file={} received_size={} bw={} error={}", + meta.ops_id, from, meta.filename, total_size, get_bw(total_size, start_time), error); + if (!fstream_closed) { + try { + if (fstream) { + // Make sure fstream is always closed + co_await fstream->close(); + } + } catch (...) { + blogger.warn("fstream[{}] Follower failed to close the file stream: {}", + meta.ops_id, std::current_exception()); + // We could do nothing but continue to cleanup more + } + } + if (!status_sent) { + try { + may_inject_error(meta, inject_errors, "no_error_code_back"); + co_await sink(streaming::stream_blob_cmd_data(streaming::stream_blob_cmd::error)); + } catch (...) { + // Try our best to send the status code. + // If we could not send it, we could do nothing but close the sink. + blogger.warn("fstream[{}] Follower failed to send error code: {}", + meta.ops_id, std::current_exception()); + } + } + try { + if (!sink_closed) { + // Make sure sink is always closed + co_await sink.close(); + } + } catch (...) { + blogger.warn("fstream[{}] Follower failed to close the stream sink: {}", + meta.ops_id, std::current_exception()); + } + try { + // Drain everything in source + for (;;) { + auto opt = co_await source(); + if (!opt) { + break; + } + } + } catch (...) { + blogger.warn("fstream[{}] Follower failed to drain rpc stream source: {}", + meta.ops_id, std::current_exception()); + } + + try { + // Remove the file in case of error + if (finish) { + co_await finish(store_result::failure); + blogger.info("fstream[{}] Follower removed partial file={}", meta.ops_id, meta.filename); + } + } catch (...) { + blogger.warn("fstream[{}] Follower failed to remove partial file={}: {}", + meta.ops_id, meta.filename, std::current_exception()); + } + + // Do not call rethrow_exception(error) because the caller could do nothing but log + // the error. We have already logged the error here. + } else { + // Get some statistics + blogger.debug("fstream[{}] Follower finished peer={} file={} received_size={} bw={}", + meta.ops_id, from, meta.filename, total_size, get_bw(total_size, start_time)); + } + co_return; +} + + +future<> stream_blob_handler(replica::database& db, netw::messaging_service& ms, + gms::inet_address from, + streaming::stream_blob_meta meta, + rpc::sink sink, + rpc::source source) { + + co_await stream_blob_handler(db, ms, std::move(from), meta, std::move(sink), std::move(source), [](replica::database& db, const streaming::stream_blob_meta& meta) -> future { + auto foptions = file_open_options(); + foptions.sloppy_size = true; + foptions.extent_allocation_size_hint = 32 << 20; + + auto stream_options = file_output_stream_options(); + stream_options.buffer_size = file_stream_buffer_size; + stream_options.write_behind = file_stream_write_behind; + + auto& table = db.find_column_family(meta.table); + auto& sstm = table.get_sstables_manager(); + auto sstable_sink = sstables::create_stream_sink(table.schema(), sstm, table.get_storage_options(), sstable_state(meta), meta.filename, meta.fops == file_ops::load_sstables); + auto out = co_await sstable_sink->output(foptions, stream_options); + co_return output_result{ + [sstable_sink = std::move(sstable_sink), &meta, &db](store_result res) -> future<> { + if (res != store_result::ok) { + co_await sstable_sink->abort(); + co_return; + } + auto sst = co_await sstable_sink->close_and_seal(); + if (sst) { + auto filename = sst->toc_filename(); + sst = {}; + co_await load_sstable_for_tablet(meta.ops_id, db, meta.table, sstable_state(meta), std::move(filename), meta.dst_shard_id); + } + }, + std::move(out) + }; + }); +} + +// Get a new sstable name using the new generation +// For example: +// oldname: me-3ga1_0iiv_2e5uo2flv7lgdl2j0d-big-Index.db +// newgen: 3ga1_0iiv_3vj5c2flv7lgdl2j0d +// newname: me-3ga1_0iiv_3vj5c2flv7lgdl2j0d-big-Index.db +static std::string get_sstable_name_with_generation(const file_stream_id& ops_id, const std::string& oldname, const std::string& newgen) { + std::string newname = oldname; + // The generation name starts after the first '-'. + auto it = newname.find("-"); + if (it != std::string::npos) { + newname.replace(++it, newgen.size(), newgen); + return newname; + } else { + auto msg = fmt::format("fstream[{}] Failed to get sstable name for {} with generation {}", ops_id, oldname, newgen); + blogger.warn("{}", msg); + throw std::runtime_error(msg); + } +} +} + +template<> struct fmt::formatter : fmt::ostream_formatter {}; + +namespace streaming { + +// Send files in the files list to the nodes in targets list over network +// Returns number of bytes sent over network +future +tablet_stream_files(netw::messaging_service& ms, std::list sources, std::vector targets, table_id table, file_stream_id ops_id, host2ip_t host2ip, service::frozen_topology_guard topo_guard, bool inject_errors) { + size_t ops_total_size = 0; + if (targets.empty()) { + co_return ops_total_size; + } + if (sources.empty()) { + co_return ops_total_size; + } + + blogger.debug("fstream[{}] Master started sending n={}, sources={}, targets={}", + ops_id, sources.size(), sources, targets); + + struct sink_and_source { + gms::inet_address node; + rpc::sink sink; + rpc::source source; + bool sink_closed = false; + bool status_sent = false; + }; + + auto ops_start_time = std::chrono::steady_clock::now(); + streaming::stream_blob_meta meta; + meta.ops_id = ops_id; + meta.table = table; + meta.topo_guard = topo_guard; + std::exception_ptr error; + + auto stream_options = file_input_stream_options(); + stream_options.buffer_size = file_stream_buffer_size; + stream_options.read_ahead = file_stream_read_ahead; + + for (auto& info : sources) { + auto& filename = info.filename; + std::optional> fstream; + bool fstream_closed = false; + try { + meta.fops = info.fops; + meta.filename = info.filename; + meta.sstable_state = info.sstable_state; + fstream = co_await info.source(stream_options); + } catch (...) { + blogger.warn("fstream[{}] Master failed sources={} targets={} error={}", + ops_id, sources, targets, std::current_exception()); + throw; + } + + std::vector ss; + size_t total_size = 0; + auto start_time = std::chrono::steady_clock::now(); + bool got_error_from_peer = false; + try { + for (auto& x : targets) { + const auto& node = x.node; + meta.dst_shard_id = x.shard; + auto ip = co_await host2ip(node); + blogger.debug("fstream[{}] Master creating sink and source for node={}/{}, file={}, targets={}", ops_id, node, ip, filename, targets); + auto [sink, source] = co_await ms.make_sink_and_source_for_stream_blob(meta, node); + ss.push_back(sink_and_source{ip, std::move(sink), std::move(source)}); + } + + // This fiber sends data to peer node + auto send_data_to_peer = [&] () mutable -> future<> { + std::exception_ptr error; + try { + while (!got_error_from_peer) { + may_inject_error(meta, inject_errors, "read_data"); + auto buf = co_await fstream->read_up_to(file_stream_buffer_size); + if (buf.size() == 0) { + break; + } + streaming::stream_blob_data data(std::move(buf)); + auto data_size = data.size(); + stream_blob_cmd_data cmd_data(streaming::stream_blob_cmd::data, std::move(data)); + co_await coroutine::parallel_for_each(ss, [&] (sink_and_source& s) mutable -> future<> { + total_size += data_size; + ops_total_size += data_size; + blogger.trace("fstream[{}] Master sending file={} to node={} chunk_size={}", + ops_id, filename, s.node, data_size); + may_inject_error(meta, inject_errors, "tx_data"); + co_await s.sink(cmd_data); + }); + } + } catch (...) { + error = std::current_exception(); + } + if (error) { + // We have to close the stream otherwise if the stream is + // ok, the get_status_code_from_peer fiber below might + // wait for the source() forever. + for (auto& s : ss) { + try { + co_await s.sink.close(); + s.sink_closed = true; + } catch (...) { + } + } + std::rethrow_exception(error); + } + + if (fstream) { + co_await fstream->close(); + fstream_closed = true; + } + + for (auto& s : ss) { + blogger.debug("fstream[{}] Master done sending file={} to node={}", ops_id, filename, s.node); + if (!got_error_from_peer) { + co_await s.sink(streaming::stream_blob_cmd_data(streaming::stream_blob_cmd::end_of_stream)); + s.status_sent = true; + } + co_await s.sink.close(); + s.sink_closed = true; + } + }; + + // This fiber gets status code from peer node + auto get_status_code_from_peer = [&] () mutable -> future<> { + co_await coroutine::parallel_for_each(ss, [&] (sink_and_source& s) mutable -> future<> { + bool got_cmd_ok = false; + while (!got_error_from_peer) { + try { + auto opt = co_await s.source(); + if (opt) { + stream_blob_cmd_data& cmd_data = std::get<0>(*opt); + if (cmd_data.cmd == streaming::stream_blob_cmd::error) { + got_error_from_peer = true; + blogger.warn("fstream[{}] Master got stream_blob_cmd::error file={} peer={}", + ops_id, filename, s.node); + throw std::runtime_error(format("Got stream_blob_cmd::error from peer {}", s.node)); + } if (cmd_data.cmd == streaming::stream_blob_cmd::ok) { + got_cmd_ok = true; + } + blogger.debug("fstream[{}] Master got stream_blob_cmd={} file={} peer={}", + ops_id, int(cmd_data.cmd), filename, s.node); + } else { + break; + } + } catch (seastar::rpc::stream_closed) { + // After we get streaming::stream_blob_cmd::ok + // which is the last message from peer, it does not + // matter if the source() is closed or not. + if (got_cmd_ok) { + break; + } else { + throw; + } + } catch (...) { + throw; + } + } + }); + }; + + co_await coroutine::all(send_data_to_peer, get_status_code_from_peer); + } catch (...) { + error = std::current_exception(); + } + if (error) { + blogger.warn("fstream[{}] Master failed sending file={} to targets={} send_size={} bw={} error={}", + ops_id, filename, targets, total_size, get_bw(total_size, start_time), error); + // Error handling for fstream and sink + if (!fstream_closed) { + try { + if (fstream) { + co_await fstream->close(); + } + } catch (...) { + // We could do nothing but continue to cleanup more + blogger.warn("fstream[{}] Master failed to close file stream: {}", + ops_id, std::current_exception()); + } + } + for (auto& s : ss) { + try { + if (!s.status_sent && !s.sink_closed) { + co_await s.sink(streaming::stream_blob_cmd_data(streaming::stream_blob_cmd::error)); + s.status_sent = true; + } + } catch (...) { + // We could do nothing but continue to close + blogger.warn("fstream[{}] Master failed to send error code: {}", + ops_id, std::current_exception()); + } + try { + if (!s.sink_closed) { + co_await s.sink.close(); + s.sink_closed = true; + } + } catch (...) { + // We could do nothing but continue + blogger.warn("fstream[{}] Master failed to close rpc stream sink: {}", + ops_id, std::current_exception()); + } + + try { + // Drain everything in source + for (;;) { + auto opt = co_await s.source(); + if (!opt) { + break; + } + } + } catch (...) { + blogger.warn("fstream[{}] Master failed to drain rpc stream source: {}", + ops_id, std::current_exception()); + } + } + // Stop handling remaining files + break; + } else { + blogger.debug("fstream[{}] Master done sending file={} to targets={} send_size={} bw={}", + ops_id, filename, targets, total_size, get_bw(total_size, start_time)); + } + } + if (error) { + blogger.warn("fstream[{}] Master failed sending files_nr={} files={} targets={} send_size={} bw={} error={}", + ops_id, sources.size(), sources, targets, ops_total_size, get_bw(ops_total_size, ops_start_time), error); + std::rethrow_exception(error); + } else { + blogger.debug("fstream[{}] Master finished sending files_nr={} files={} targets={} send_size={} bw={}", + ops_id, sources.size(), sources, targets, ops_total_size, get_bw(ops_total_size, ops_start_time)); + } + co_return ops_total_size; +} + + +future tablet_stream_files_handler(replica::database& db, netw::messaging_service& ms, streaming::stream_files_request req, host2ip_t host2ip) { + stream_files_response resp; + auto& table = db.find_column_family(req.table); + auto sstables = co_await table.take_storage_snapshot(req.range); + co_await utils::get_local_injector().inject("order_sstables_for_streaming", [&sstables] (auto& handler) -> future<> { + if (sstables.size() == 3) { + // make sure the sstables are ordered so that the sstable containing shadowed data is streamed last + const std::string_view shadowed_file = handler.template get("shadowed_file").value(); + for (int index: {0, 1}) { + if (sstables[index].sst->component_basename(component_type::Data) == shadowed_file) { + std::swap(sstables[index], sstables[2]); + } + } + } + return make_ready_future<>(); + }); + auto files = std::list(); + + sstables::sstable_generation_generator sst_gen(0); + + for (auto& sst_snapshot : sstables) { + auto& sst = sst_snapshot.sst; + // stable state (across files) is a must for load to work on destination + auto sst_state = sst->state(); + + auto sources = create_stream_sources(sst_snapshot); + auto newgen = fmt::to_string(sst_gen(sstables::uuid_identifiers::yes)); + + for (auto&& s : sources) { + auto oldname = s->component_basename(); + auto newname = get_sstable_name_with_generation(req.ops_id, oldname, newgen); + + blogger.debug("fstream[{}] Get name oldname={}, newname={}", req.ops_id, oldname, newname); + + auto& info = files.emplace_back(); + info.fops = file_ops::stream_sstables; + info.sstable_state = sst_state; + info.filename = std::move(newname); + info.source = [s = std::move(s)](const file_input_stream_options& options) { + return s->input(options); + }; + } + // ensure we mark the end of each component sequence. + if (!files.empty()) { + files.back().fops = file_ops::load_sstables; + } + } + if (files.empty()) { + co_return resp; + } + blogger.debug("stream_sstables[{}] Started sending sstable_nr={} files_nr={} files={} range={}", + req.ops_id, sstables.size(), files.size(), files, req.range); + auto ops_start_time = std::chrono::steady_clock::now(); + size_t stream_bytes = co_await tablet_stream_files(ms, std::move(files), req.targets, req.table, req.ops_id, std::move(host2ip), req.topo_guard); + resp.stream_bytes = stream_bytes; + auto duration = std::chrono::steady_clock::now() - ops_start_time; + blogger.info("stream_sstables[{}] Finished sending sstable_nr={} files_nr={} files={} range={} stream_bytes={} stream_time={} stream_bw={}", + req.ops_id, sstables.size(), files.size(), files, req.range, stream_bytes, duration, get_bw(stream_bytes, ops_start_time)); + co_return resp; +} + +future tablet_stream_files(const file_stream_id& ops_id, + replica::table& table, + const dht::token_range& range, + const locator::host_id& src_host, + const locator::host_id& dst_host, + seastar::shard_id dst_shard_id, + netw::messaging_service& ms, + abort_source& as, + service::frozen_topology_guard topo_guard) { + stream_files_response resp; + std::exception_ptr error; + try { + co_await mark_tablet_stream_start(ops_id); + } catch (...) { + error = std::current_exception(); + } + if (!error) { + try { + streaming::stream_files_request req; + req.ops_id = ops_id; + req.keyspace_name = table.schema()->ks_name(), + req.table_name = table.schema()->cf_name(); + req.table = table.schema()->id(); + req.range = range; + req.targets = std::vector{node_and_shard{dst_host, dst_shard_id}}; + req.topo_guard = topo_guard; + resp = co_await ser::streaming_rpc_verbs::send_tablet_stream_files(&ms, src_host, as, req); + } catch (...) { + error = std::current_exception(); + } + } + co_await mark_tablet_stream_done(ops_id); + if (error) { + std::rethrow_exception(error); + } + co_return resp; +} + +} diff --git a/streaming/stream_blob.hh b/streaming/stream_blob.hh new file mode 100644 index 0000000000..47ad861bc8 --- /dev/null +++ b/streaming/stream_blob.hh @@ -0,0 +1,192 @@ +/* + * Copyright (C) 2023-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ + +#pragma once + +#include "message/messaging_service_fwd.hh" +#include +#include +#include +#include +#include +#include +#include +#include "utils/UUID.hh" +#include "dht/i_partitioner.hh" +#include "bytes.hh" +#include "replica/database_fwd.hh" +#include "locator/host_id.hh" +#include "service/topology_guard.hh" +#include "sstables/open_info.hh" + +#include +#include + +namespace streaming { + +using file_stream_id = utils::tagged_uuid; + +// - The file_ops::stream_sstables is used to stream a sstable file. +// +// - The file_ops::load_sstables is used to stream a sstable file and +// ask the receiver to load the sstable into the system. +enum class file_ops : uint16_t { + stream_sstables, + load_sstables, +}; + +// For STREAM_BLOB verb +enum class stream_blob_cmd : uint8_t { + ok, + error, + data, + end_of_stream, +}; + +class stream_blob_data { +public: + temporary_buffer buf; + stream_blob_data() = default; + stream_blob_data(temporary_buffer b) : buf(std::move(b)) {} + const char* data() const { + return buf.get(); + } + size_t size() const { + return buf.size(); + } + bool empty() const { + return buf.size() == 0; + } +}; + +class stream_blob_cmd_data { +public: + stream_blob_cmd cmd; + // The optional data contains value when the cmd is stream_blob_cmd::data. + // When the cmd is set to other values, e.g., stream_blob_cmd::error, the + // data contains no value. + std::optional data; + stream_blob_cmd_data(stream_blob_cmd c) : cmd(c) {} + stream_blob_cmd_data(stream_blob_cmd c, std::optional d) + : cmd(c) + , data(std::move(d)) + {} + stream_blob_cmd_data(stream_blob_cmd c, stream_blob_data d) + : cmd(c) + , data(std::move(d)) + {} + +}; + +class stream_blob_meta { +public: + file_stream_id ops_id; + table_id table; + sstring filename; + seastar::shard_id dst_shard_id; + streaming::file_ops fops; + service::frozen_topology_guard topo_guard; + std::optional sstable_state; + // We can extend this verb to send arbitary blob of data +}; + +enum class store_result { + ok, failure, +}; + +using stream_blob_source_fn = noncopyable_function>(const file_input_stream_options&)>; +using stream_blob_finish_fn = noncopyable_function(store_result)>; +using output_result = std::tuple>; +using stream_blob_create_output_fn = noncopyable_function(replica::database&, const streaming::stream_blob_meta&)>; + +struct stream_blob_info { + sstring filename; + streaming::file_ops fops; + std::optional sstable_state; + stream_blob_source_fn source; + + friend inline std::ostream& operator<<(std::ostream& os, const stream_blob_info& x) { + return os << x.filename; + } +}; + +// The handler for the STREAM_BLOB verb. +seastar::future<> stream_blob_handler(replica::database& db, netw::messaging_service& ms, gms::inet_address from, streaming::stream_blob_meta meta, rpc::sink sink, rpc::source source); + +// Exposed mainly for testing + +future<> stream_blob_handler(replica::database& db, + netw::messaging_service& ms, + gms::inet_address from, + streaming::stream_blob_meta meta, + rpc::sink sink, + rpc::source source, + stream_blob_create_output_fn, + bool may_inject_errors = false + ); + +// For TABLET_STREAM_FILES +class node_and_shard { +public: + locator::host_id node; + seastar::shard_id shard; + friend inline std::ostream& operator<<(std::ostream& os, const node_and_shard& x) { + return os << x.node << ":" << x.shard; + } + +}; + +} + +template <> struct fmt::formatter : fmt::ostream_formatter {}; + +namespace streaming { + +class stream_files_request { +public: + file_stream_id ops_id; + sstring keyspace_name; + sstring table_name; + table_id table; + dht::token_range range; + std::vector targets; + service::frozen_topology_guard topo_guard; +}; + +class stream_files_response { +public: + size_t stream_bytes = 0; +}; + +using host2ip_t = std::function (locator::host_id)>; + +// The handler for the TABLET_STREAM_FILES verb. The receiver of this verb will +// stream sstables files specified by the stream_files_request req. +future tablet_stream_files_handler(replica::database& db, netw::messaging_service& ms, streaming::stream_files_request req, host2ip_t host2ip); + +// Ask the src node to stream sstables to dst node for table in the given token range using TABLET_STREAM_FILES verb. +future tablet_stream_files(const file_stream_id& ops_id, replica::table& table, const dht::token_range& range, const locator::host_id& src, const locator::host_id& dst, seastar::shard_id dst_shard_id, netw::messaging_service& ms, abort_source& as, service::frozen_topology_guard topo_guard); + +// Exposed for testability +future tablet_stream_files(netw::messaging_service& ms, + std::list sources, + std::vector targets, + table_id table, + file_stream_id ops_id, + host2ip_t host2ip, + service::frozen_topology_guard topo_guard, + bool may_inject_errors = false + ); + + +future<> mark_tablet_stream_start(file_stream_id); +future<> mark_tablet_stream_done(file_stream_id); + +} + +template<> struct fmt::formatter; diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 4619f2b092..16d0128edd 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -19,6 +19,7 @@ #include "dht/auto_refreshing_sharder.hh" #include #include +#include "streaming/stream_blob.hh" #include "streaming/stream_session_state.hh" #include "service/migration_manager.hh" #include "mutation_writer/multishard_writer.hh" @@ -278,12 +279,21 @@ void stream_manager::init_messaging_service_handler(abort_source& as) { return make_ready_future<>(); } }); + ms.register_stream_blob([this] (const rpc::client_info& cinfo, streaming::stream_blob_meta meta, rpc::source source) { + auto from = netw::messaging_service::get_source(cinfo).addr; + auto sink = _ms.local().make_sink_for_stream_blob(source); + (void)stream_blob_handler(_db.local(), _ms.local(), from, meta, sink, source).handle_exception([ms = _ms.local().shared_from_this()] (std::exception_ptr eptr) { + sslog.warn("Failed to run stream blob handler: {}", eptr); + }); + return make_ready_future>(sink); + }); } future<> stream_manager::uninit_messaging_service_handler() { auto& ms = _ms.local(); return when_all_succeed( ser::streaming_rpc_verbs::unregister(&ms), + ms.unregister_stream_blob(), ms.unregister_stream_mutation_fragments()).discard_result(); } diff --git a/test/boost/CMakeLists.txt b/test/boost/CMakeLists.txt index 26fddfec57..e11f2426c7 100644 --- a/test/boost/CMakeLists.txt +++ b/test/boost/CMakeLists.txt @@ -251,6 +251,8 @@ add_scylla_test(string_format_test KIND BOOST) add_scylla_test(summary_test KIND BOOST) +add_scylla_test(file_stream_test + KIND SEASTAR) add_scylla_test(tagged_integer_test KIND SEASTAR) add_scylla_test(token_metadata_test diff --git a/test/boost/file_stream_test.cc b/test/boost/file_stream_test.cc new file mode 100644 index 0000000000..e704231837 --- /dev/null +++ b/test/boost/file_stream_test.cc @@ -0,0 +1,260 @@ +/* + * Copyright (C) 2023-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ + +#include "test/lib/cql_test_env.hh" +#include "streaming/stream_blob.hh" +#include "message/messaging_service.hh" +#include "test/lib/log.hh" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +future generate_file_hash(sstring filename) { + auto f = co_await seastar::open_file_dma(filename, seastar::open_flags::ro); + auto in = seastar::make_file_input_stream(std::move(f)); + CryptoPP::SHA256 hash; + unsigned char digest[CryptoPP::SHA256::DIGESTSIZE]; + std::stringstream ss; + while (true) { + auto buf = co_await in.read(); + if (buf.empty()) { + break; + } + hash.Update((const unsigned char*)buf.get(), buf.size()); + } + co_await in.close(); + hash.Final(digest); + for (int i = 0; i < CryptoPP::SHA256::DIGESTSIZE; i++) { + ss << std::hex << std::setw(2) << std::setfill('0') << (int)digest[i]; + } + co_return ss.str(); +} + +sstring generate_random_filename() { + char filename[L_tmpnam]; + std::tmpnam(filename); + return filename; +} + +future<> write_random_content_to_file(const sstring& filename, size_t content_size = 1024) { + auto f = co_await open_file_dma(filename, open_flags::rw | open_flags::create); + auto ostream = co_await make_file_output_stream(std::move(f)); + srand(time(nullptr)); + for (size_t i = 0; i < content_size; ++i) { + char c = rand() % 256; + co_await ostream.write(&c, 1); + } + co_await ostream.close(); +} + +using namespace streaming; + +static future +do_test_file_stream(replica::database& db, netw::messaging_service& ms, std::vector filelist, const std::string& suffix, bool inject_error, bool unsupported_file_ops = false) { + bool ret = false; + bool verb_register = false; + auto ops_id = file_stream_id::create_random_id(); + auto& global_db = db.container(); + auto& global_ms = ms.container(); + int n_retries = 0; + + do { + try { + if (!verb_register) { + co_await smp::invoke_on_all([&] { + return global_ms.local().register_stream_blob([&](const rpc::client_info& cinfo, streaming::stream_blob_meta meta, rpc::source source) { + auto from = netw::messaging_service::get_source(cinfo).addr; + auto sink = global_ms.local().make_sink_for_stream_blob(source); + (void)stream_blob_handler(global_db.local(), global_ms.local(), from, meta, sink, source, [&suffix](auto&, const streaming::stream_blob_meta& meta) -> future { + auto path = meta.filename + suffix; + auto f = co_await open_file_dma(path, open_flags::wo|open_flags::create); + auto out = co_await make_file_output_stream(std::move(f)); + co_return output_result{ + [path = std::move(path)](store_result res) -> future<> { + if (res != store_result::ok) { + co_await remove_file(path); + } + }, + std::move(out) + }; + }, inject_error).handle_exception([sink, source, ms = global_ms.local().shared_from_this()] (std::exception_ptr eptr) { + testlog.warn("Failed to run stream blob handler: {}", eptr); + }); + return make_ready_future>(sink); + }); + }); + } + verb_register = true; + auto table = table_id::create_random_id(); + auto files = std::list(); + auto hostid = db.get_token_metadata().get_my_id(); + seastar::shard_id dst_shard_id = 0; + co_await mark_tablet_stream_start(ops_id); + auto targets = std::vector{node_and_shard{hostid, dst_shard_id}}; + for (const auto& filename : filelist) { + auto fops = file_ops::stream_sstables; + fops = unsupported_file_ops ? file_ops(0xff55) : fops; + auto file = co_await open_file_dma(filename, open_flags::ro); + auto& info = files.emplace_back(); + info.filename = filename; + info.fops = fops; + info.source = [file = std::move(file)](const file_input_stream_options& foptions) mutable -> future> { + co_return make_file_input_stream(std::move(file), foptions); + }; + } + auto host2ip = [&global_db] (locator::host_id id) -> future { + co_return global_db.local().get_token_metadata().get_topology().my_address(); + }; + size_t stream_bytes = co_await tablet_stream_files(ms, std::move(files), targets, table, ops_id, host2ip, service::null_topology_guard, inject_error); + co_await mark_tablet_stream_done(ops_id); + testlog.info("do_test_file_stream[{}] status=ok files={} stream_bytes={}", ops_id, filelist.size(), stream_bytes); + ret = true; + } catch (seastar::rpc::stream_closed&) { + testlog.warn("do_test_file_stream[{}] status=fail error={} retry={}", ops_id, std::current_exception(), n_retries++); + if (n_retries < 3) { + testlog.info("Retrying send"); + continue; + } + } catch (...) { + testlog.warn("do_test_file_stream[{}] status=fail error={}", ops_id, std::current_exception()); + } + } while (false); + + if (verb_register) { + co_await smp::invoke_on_all([&global_ms] { + return global_ms.local().unregister_stream_blob(); + }); + } + co_return ret; +} + +void do_test_file_stream(bool inject_error) { + cql_test_config cfg; + cfg.ms_listen = true; + std::vector files; + std::vector files_rx; + std::vector hash_tx; + std::vector hash_rx; + size_t nr_files = 10; + size_t file_size = 0; + static const std::string suffix = ".rx"; + + while (files.size() != nr_files) { + auto name = generate_random_filename(); + files.push_back(name); + files_rx.push_back(name + suffix); + } + + size_t base_size = 1024; + +#ifdef SEASTAR_DEBUG + base_size = 1; +#endif + + for (auto& file : files) { + if (file_size == 0) { + file_size = 1 * 1024 * base_size; + } else { + file_size = (rand() % 10) * 1024 * base_size + rand() % base_size; + } + file_size = std::max(size_t(1), file_size); + testlog.info("file_tx={} file_size={}", file, file_size); + write_random_content_to_file(file, file_size).get(); + } + + do_with_cql_env_thread([files, inject_error] (auto& e) { + do_test_file_stream(e.local_db(), e.get_messaging_service().local(), files, suffix, inject_error).get(); + }, cfg).get(); + + bool cleanup = true; + for (auto& file : files) { + auto hash = generate_file_hash(file).get(); + testlog.info("file_tx={} hash={}", file, hash); + hash_tx.push_back(hash); + if (cleanup) { + seastar::remove_file(file).get(); + } + } + for (auto& file : files_rx) { + sstring hash = "SKIP"; + try { + hash = generate_file_hash(file).get(); + if (cleanup) { + seastar::remove_file(file).get(); + } + } catch (...) { + if (!inject_error) { + throw; + } + } + hash_rx.push_back(hash); + testlog.info("file_rx={} hash={}", file, hash); + } + + BOOST_REQUIRE(hash_tx.size() == hash_rx.size()); + for (size_t i = 0; i < hash_tx.size(); i++) { + testlog.info("Check tx_hash={} rx_hash={}", hash_tx[i], hash_rx[i]); + if (inject_error) { + BOOST_REQUIRE(hash_tx[i] == hash_rx[i] || sstring("SKIP") == hash_rx[i]); + } else { + BOOST_REQUIRE(hash_tx[i] == hash_rx[i]); + } + } +} + +void do_test_unsupported_file_ops() { + bool inject_error = false; + bool unsupported_file_ops = true; + + cql_test_config cfg; + cfg.ms_listen = true; + std::vector files; + size_t nr_files = 2; + size_t file_size = 1024; + + while (files.size() != nr_files) { + auto name = generate_random_filename(); + files.push_back(name); + } + + for (auto& file : files) { + testlog.info("file_tx={} file_size={}", file, file_size); + write_random_content_to_file(file, file_size).get(); + } + + do_with_cql_env_thread([files, inject_error, unsupported_file_ops] (auto& e) { + auto ok = do_test_file_stream(e.local_db(), e.get_messaging_service().local(), files, "", inject_error, unsupported_file_ops).get(); + // Stream with a unsupported file ops should fail + BOOST_REQUIRE(ok == false); + }, cfg).get(); + + for (auto& file : files) { + seastar::remove_file(file).get(); + } +} + +SEASTAR_THREAD_TEST_CASE(test_file_stream) { + bool inject_error = false; + do_test_file_stream(inject_error); +} + +SEASTAR_THREAD_TEST_CASE(test_file_stream_inject_error) { + bool inject_error = true; + do_test_file_stream(inject_error); +} + +SEASTAR_THREAD_TEST_CASE(test_unsupported_file_ops) { + do_test_unsupported_file_ops(); +} diff --git a/test/topology_custom/test_tablets2.py b/test/topology_custom/test_tablets2.py index 441f113234..fcfbbc2070 100644 --- a/test/topology_custom/test_tablets2.py +++ b/test/topology_custom/test_tablets2.py @@ -251,6 +251,7 @@ async def test_streaming_is_guarded_by_topology_guard(manager: ManagerClient): cmdline = [ '--logger-log-level', 'storage_service=trace', '--logger-log-level', 'raft_topology=trace', + '--enable-file-stream', 'false', ] servers = [await manager.server_add(cmdline=cmdline)] diff --git a/test/topology_custom/test_tablets_migration.py b/test/topology_custom/test_tablets_migration.py index 486b939155..f203679b7e 100644 --- a/test/topology_custom/test_tablets_migration.py +++ b/test/topology_custom/test_tablets_migration.py @@ -6,13 +6,15 @@ from cassandra.query import SimpleStatement, ConsistencyLevel from test.pylib.manager_client import ManagerClient from test.pylib.rest_client import HTTPError, read_barrier -from test.pylib.tablets import get_all_tablet_replicas +from test.pylib.tablets import get_tablet_replica, get_all_tablet_replicas from test.topology.conftest import skip_mode from test.topology.util import wait_for_cql_and_get_hosts import time import pytest import logging import asyncio +import os +import glob logger = logging.getLogger(__name__) @@ -289,3 +291,100 @@ async def test_tablet_back_and_forth_migration(manager: ManagerClient): await assert_rows(2) await cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({3}, {3});") await assert_rows(3) + +@pytest.mark.asyncio +@skip_mode('release', 'error injections are not supported in release mode') +async def test_staging_backlog_is_preserved_with_file_based_streaming(manager: ManagerClient): + logger.info("Bootstrapping cluster") + # the error injection will halt view updates from staging, allowing migration to transfer the view update backlog. + cfg = {'enable_user_defined_functions': False, 'enable_tablets': True, + 'error_injections_at_startup': ['view_update_generator_consume_staging_sstable']} + servers = [await manager.server_add(config=cfg)] + + await manager.api.disable_tablet_balancing(servers[0].ip_addr) + + cql = manager.get_cql() + await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};") + await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);") + await cql.run_async("CREATE MATERIALIZED VIEW test.mv1 AS \ + SELECT * FROM test.test WHERE pk IS NOT NULL AND c IS NOT NULL \ + PRIMARY KEY (c, pk);") + + logger.info("Populating single tablet") + keys = range(256) + await asyncio.gather(*[cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, {k});") for k in keys]) + + await manager.api.flush_keyspace(servers[0].ip_addr, "test") + + # check + async def check(expected): + rows = await cql.run_async("SELECT pk from test.test") + assert len(list(rows)) == len(expected) + await check(keys) + + logger.info("Adding new server") + servers.append(await manager.server_add(config=cfg)) + + async def get_table_dir(manager, server_id): + node_workdir = await manager.server_get_workdir(server_id) + return glob.glob(os.path.join(node_workdir, "data", "test", "test-*"))[0] + + s0_table_dir = await get_table_dir(manager, servers[0].server_id) + logger.info(f"Table dir in server 0: {s0_table_dir}") + + s1_table_dir = await get_table_dir(manager, servers[1].server_id) + logger.info(f"Table dir in server 1: {s1_table_dir}") + + # Explicitly close the driver to avoid reconnections if scylla fails to update gossiper state on shutdown. + # It's a problem until https://github.com/scylladb/scylladb/issues/15356 is fixed. + manager.driver_close() + cql = None + await manager.server_stop_gracefully(servers[0].server_id) + + def move_sstables_to_staging(table_dir: str): + table_staging_dir = os.path.join(table_dir, "staging") + logger.info(f"Moving sstables to staging dir: {table_staging_dir}") + for sst in glob.glob(os.path.join(table_dir, "*-Data.db")): + for src_path in glob.glob(os.path.join(table_dir, sst.removesuffix("-Data.db") + "*")): + dst_path = os.path.join(table_staging_dir, os.path.basename(src_path)) + logger.info(f"Moving sstable file {src_path} to {dst_path}") + os.rename(src_path, dst_path) + + def sstable_count_in_staging(table_dir: str): + table_staging_dir = os.path.join(table_dir, "staging") + return len(glob.glob(os.path.join(table_staging_dir, "*-Data.db"))) + + move_sstables_to_staging(s0_table_dir) + s0_sstables_in_staging = sstable_count_in_staging(s0_table_dir) + + await manager.server_start(servers[0].server_id) + cql = manager.get_cql() + await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60) + + tablet_token = 0 # Doesn't matter since there is one tablet + replica = await get_tablet_replica(manager, servers[0], 'test', 'test', tablet_token) + s1_host_id = await manager.get_host_id(servers[1].server_id) + dst_shard = 0 + + migration_task = asyncio.create_task( + manager.api.move_tablet(servers[0].ip_addr, "test", "test", replica[0], replica[1], s1_host_id, dst_shard, tablet_token)) + + logger.info("Waiting for migration to finish") + await migration_task + logger.info("Migration done") + + # FIXME: After https://github.com/scylladb/scylladb/issues/19149 is fixed, we can check that view updates complete + # after migration and then check for base-view consistency. By the time being, we only check that backlog is + # transferred by looking at staging directory. + + s1_sstables_in_staging = sstable_count_in_staging(s1_table_dir) + logger.info(f"SSTable count in staging dir of server 1: {s1_sstables_in_staging}") + + logger.info("Allowing view update generator to progress again") + for server in servers: + manager.api.disable_injection(server.ip_addr, 'view_update_generator_consume_staging_sstable') + + assert s0_sstables_in_staging > 0 + assert s0_sstables_in_staging == s1_sstables_in_staging + + await check(keys) diff --git a/test/topology_custom/test_topology_ops_encrypted.py b/test/topology_custom/test_topology_ops_encrypted.py new file mode 100644 index 0000000000..ac42c35e7c --- /dev/null +++ b/test/topology_custom/test_topology_ops_encrypted.py @@ -0,0 +1,85 @@ +# +# Copyright (C) 2023-present ScyllaDB +# +# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 +# +from test.pylib.scylla_cluster import ReplaceConfig +from test.pylib.manager_client import ManagerClient +from test.pylib.internal_types import ServerInfo +from test.pylib.util import unique_name, wait_for_cql_and_get_hosts +from test.topology.util import check_token_ring_and_group0_consistency, reconnect_driver +from test.topology_custom.test_topology_ops import check_node_log_for_failed_mutations, start_writes + +from cassandra.cluster import Session, ConsistencyLevel +from cassandra.query import SimpleStatement + +import asyncio +import time +import pytest +import logging + +logger = logging.getLogger(__name__) + +@pytest.mark.asyncio +@pytest.mark.parametrize("tablets_enabled", [True, False]) +async def test_topology_ops_encrypted(request, manager: ManagerClient, tablets_enabled: bool, tmp_path): + """Test basic topology operations using the topology coordinator. But encrypted.""" + d = tmp_path / "keys" + d.mkdir() + k = d / "system_key" + k.write_text('AES/CBC/PKCS5Padding:128:ApvJEoFpQmogvam18bb54g==') + cfg = {'enable_tablets' : tablets_enabled, + 'user_info_encryption': {'enabled': True, 'key_provider': 'LocalFileSystemKeyProviderFactory'}, + 'system_key_directory': d.as_posix()} + rf = 3 + num_nodes = rf + if tablets_enabled: + num_nodes += 1 + + logger.info("Bootstrapping first node") + servers = [await manager.server_add(config=cfg)] + + logger.info(f"Restarting node {servers[0]}") + await manager.server_stop_gracefully(servers[0].server_id) + await manager.server_start(servers[0].server_id) + + logger.info("Bootstrapping other nodes") + servers += await manager.servers_add(num_nodes, config=cfg) + + await wait_for_cql_and_get_hosts(manager.cql, servers, time.time() + 60) + cql = await reconnect_driver(manager) + finish_writes = await start_writes(cql, rf, ConsistencyLevel.ONE) + + logger.info(f"Decommissioning node {servers[0]}") + await manager.decommission_node(servers[0].server_id) + await check_token_ring_and_group0_consistency(manager) + servers = servers[1:] + + logger.info(f"Restarting node {servers[0]} when other nodes have bootstrapped") + await manager.server_stop_gracefully(servers[0].server_id) + await manager.server_start(servers[0].server_id) + + logger.info(f"Stopping node {servers[0]}") + await manager.server_stop_gracefully(servers[0].server_id) + await check_node_log_for_failed_mutations(manager, servers[0]) + + logger.info(f"Replacing node {servers[0]}") + replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = False, use_host_id = False) + servers = servers[1:] + [await manager.server_add(replace_cfg)] + await check_token_ring_and_group0_consistency(manager) + + logger.info(f"Stopping node {servers[0]}") + await manager.server_stop_gracefully(servers[0].server_id) + await check_node_log_for_failed_mutations(manager, servers[0]) + + logger.info(f"Removing node {servers[0]} using {servers[1]}") + await manager.remove_node(servers[1].server_id, servers[0].server_id) + await check_token_ring_and_group0_consistency(manager) + servers = servers[1:] + + logger.info("Checking results of the background writes") + await finish_writes() + + for server in servers: + await check_node_log_for_failed_mutations(manager, server) +