From 7f29b7d8f68be1764f6895d03ee7128a47446dee Mon Sep 17 00:00:00 2001 From: Ferenc Szili Date: Tue, 22 Oct 2024 17:15:33 +0200 Subject: [PATCH 01/10] storage_proxy: propagate group0 client and TSM dependency This commit makes storage_proxy::remote dependent on raft_group0_client and topology_state_machine. storage_proxy::remote gets references to these via the call to start_remote(). These references will be needed to call storage_service::truncate_table_with_tablets(). --- main.cc | 2 +- service/storage_proxy.cc | 14 +++++++++----- service/storage_proxy.hh | 6 ++++-- test/lib/cql_test_env.cc | 2 +- 4 files changed, 15 insertions(+), 9 deletions(-) diff --git a/main.cc b/main.cc index bbfd51a1b7..ab0999a806 100644 --- a/main.cc +++ b/main.cc @@ -1790,7 +1790,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl static seastar::sharded mtg; mtg.start(cfg->large_memory_allocation_warning_threshold()).get(); supervisor::notify("initializing storage proxy RPC verbs"); - proxy.invoke_on_all(&service::storage_proxy::start_remote, std::ref(messaging), std::ref(gossiper), std::ref(mm), std::ref(sys_ks)).get(); + proxy.invoke_on_all(&service::storage_proxy::start_remote, std::ref(messaging), std::ref(gossiper), std::ref(mm), std::ref(sys_ks), std::ref(group0_client), std::ref(tsm)).get(); auto stop_proxy_handlers = defer_verbose_shutdown("storage proxy RPC verbs", [&proxy] { proxy.invoke_on_all(&service::storage_proxy::stop_remote).get(); }); diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 2ad4efa088..4783854503 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -238,15 +238,18 @@ class storage_proxy::remote { const gms::gossiper& _gossiper; migration_manager& _mm; sharded& _sys_ks; - + raft_group0_client& _group0_client; + topology_state_machine& _topology_state_machine; + netw::connection_drop_slot_t _connection_dropped; netw::connection_drop_registration_t _condrop_registration; bool _stopped{false}; public: - remote(storage_proxy& sp, netw::messaging_service& ms, gms::gossiper& g, migration_manager& mm, sharded& sys_ks) - : _sp(sp), _ms(ms), _gossiper(g), _mm(mm), _sys_ks(sys_ks) + remote(storage_proxy& sp, netw::messaging_service& ms, gms::gossiper& g, migration_manager& mm, sharded& sys_ks, + raft_group0_client& group0_client, topology_state_machine& tsm) + : _sp(sp), _ms(ms), _gossiper(g), _mm(mm), _sys_ks(sys_ks), _group0_client(group0_client), _topology_state_machine(tsm) , _connection_dropped(std::bind_front(&remote::connection_dropped, this)) , _condrop_registration(_ms.when_connection_drops(_connection_dropped)) { @@ -6610,8 +6613,9 @@ future<> storage_proxy::truncate_blocking(sstring keyspace, sstring cfname, std: return remote().send_truncate_blocking(std::move(keyspace), std::move(cfname), timeout_in_ms); } -void storage_proxy::start_remote(netw::messaging_service& ms, gms::gossiper& g, migration_manager& mm, sharded& sys_ks) { - _remote = std::make_unique(*this, ms, g, mm, sys_ks); +void storage_proxy::start_remote(netw::messaging_service& ms, gms::gossiper& g, migration_manager& mm, sharded& sys_ks, + raft_group0_client& group0_client, topology_state_machine& tsm) { + _remote = std::make_unique(*this, ms, g, mm, sys_ks, group0_client, tsm); } future<> storage_proxy::stop_remote() { diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 8e73e4943a..1783589a51 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -38,6 +38,7 @@ #include "replica/exceptions.hh" #include "locator/host_id.hh" #include "dht/token_range_endpoints.hh" +#include "service/storage_service.hh" class reconcilable_result; class frozen_mutation_and_schema; @@ -475,7 +476,8 @@ private: public: storage_proxy(distributed& db, config cfg, db::view::node_update_backlog& max_view_update_backlog, - scheduling_group_key stats_key, gms::feature_service& feat, const locator::shared_token_metadata& stm, locator::effective_replication_map_factory& erm_factory); + scheduling_group_key stats_key, gms::feature_service& feat, const locator::shared_token_metadata& stm, + locator::effective_replication_map_factory& erm_factory); ~storage_proxy(); const distributed& get_db() const { @@ -510,7 +512,7 @@ public: } // Start/stop the remote part of `storage_proxy` that is required for performing distributed queries. - void start_remote(netw::messaging_service&, gms::gossiper&, migration_manager&, sharded& sys_ks); + void start_remote(netw::messaging_service&, gms::gossiper&, migration_manager&, sharded& sys_ks, raft_group0_client&, topology_state_machine&); future<> stop_remote(); gms::inet_address my_address() const noexcept; diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index 74a2afaca7..3b21d951b0 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -930,7 +930,7 @@ private: }); if (cfg_in.need_remote_proxy) { - _proxy.invoke_on_all(&service::storage_proxy::start_remote, std::ref(_ms), std::ref(_gossiper), std::ref(_mm), std::ref(_sys_ks)).get(); + _proxy.invoke_on_all(&service::storage_proxy::start_remote, std::ref(_ms), std::ref(_gossiper), std::ref(_mm), std::ref(_sys_ks), std::ref(group0_client), std::ref(_topology_state_machine)).get(); } auto stop_proxy_remote = defer([this, need = cfg_in.need_remote_proxy] { if (need) { From 3ac44109e3bf5daadd6f26df592850083b4e604d Mon Sep 17 00:00:00 2001 From: Ferenc Szili Date: Mon, 25 Nov 2024 18:35:03 +0100 Subject: [PATCH 02/10] system.topology_requests: change schema This commit adds the new column in the system.topology_requests table which are needed for the new global topology request. --- db/system_keyspace.cc | 4 ++++ db/system_keyspace.hh | 1 + service/topology_mutation.cc | 5 +++++ service/topology_mutation.hh | 1 + 4 files changed, 11 insertions(+) diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 3b30f85038..4fd17eef9a 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -283,6 +283,7 @@ schema_ptr system_keyspace::topology_requests() { .with_column("done", boolean_type) .with_column("error", utf8_type) .with_column("end_time", timestamp_type) + .with_column("truncate_table_id", uuid_type) .set_comment("Topology request tracking") .with_hash_version() .build(); @@ -3388,6 +3389,9 @@ system_keyspace::topology_requests_entry system_keyspace::topology_request_row_t if (row.has("end_time")) { entry.end_time = row.get_as("end_time"); } + if (row.has("truncate_table_id")) { + entry.truncate_table_id = table_id(row.get_as("truncate_table_id")); + } return entry; } diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh index 55a8183c49..a6f051a432 100644 --- a/db/system_keyspace.hh +++ b/db/system_keyspace.hh @@ -406,6 +406,7 @@ public: sstring error; db_clock::time_point end_time; db_clock::time_point ts; + table_id truncate_table_id; }; using topology_requests_entries = std::unordered_map; diff --git a/service/topology_mutation.cc b/service/topology_mutation.cc index ab7ab10182..c9be2916af 100644 --- a/service/topology_mutation.cc +++ b/service/topology_mutation.cc @@ -313,6 +313,11 @@ topology_request_tracking_mutation_builder& topology_request_tracking_mutation_b return set("done", true); } +topology_request_tracking_mutation_builder& topology_request_tracking_mutation_builder::set_truncate_table_data(const table_id& table_id) { + apply_atomic("truncate_table_id", table_id.uuid()); + return *this; +} + template class topology_mutation_builder_base; template class topology_mutation_builder_base; template class topology_mutation_builder_base; diff --git a/service/topology_mutation.hh b/service/topology_mutation.hh index 4d5f886a20..eb5514355c 100644 --- a/service/topology_mutation.hh +++ b/service/topology_mutation.hh @@ -151,6 +151,7 @@ public: using builder_base::del; topology_request_tracking_mutation_builder& set(const char* cell, topology_request value); topology_request_tracking_mutation_builder& done(std::optional error = std::nullopt); + topology_request_tracking_mutation_builder& set_truncate_table_data(const table_id& table_id); canonical_mutation build() { return canonical_mutation{std::move(_m)}; } }; From bfbfc0fea9ed0779dd713a2894c61f4166b74be3 Mon Sep 17 00:00:00 2001 From: Ferenc Szili Date: Wed, 23 Oct 2024 12:03:06 +0200 Subject: [PATCH 03/10] feature_service: added cluster feature for system.topology schema change This patch adds a feature serive which protects the system.topology schema change against situations where clusters are incompletely upgraded to new a version and could be rolled back. --- gms/feature_service.hh | 1 + 1 file changed, 1 insertion(+) diff --git a/gms/feature_service.hh b/gms/feature_service.hh index 5f1d4e6951..9fe07e0d85 100644 --- a/gms/feature_service.hh +++ b/gms/feature_service.hh @@ -116,6 +116,7 @@ public: gms::feature collection_indexing { *this, "COLLECTION_INDEXING"sv }; gms::feature large_collection_detection { *this, "LARGE_COLLECTION_DETECTION"sv }; gms::feature range_tombstone_and_dead_rows_detection { *this, "RANGE_TOMBSTONE_AND_DEAD_ROWS_DETECTION"sv }; + gms::feature truncate_as_topology_operation { *this, "TRUNCATE_AS_TOPOLOGY_OPERATION"sv }; gms::feature secondary_indexes_on_static_columns { *this, "SECONDARY_INDEXES_ON_STATIC_COLUMNS"sv }; gms::feature tablets { *this, "TABLETS"sv }; gms::feature uuid_sstable_identifiers { *this, "UUID_SSTABLE_IDENTIFIERS"sv }; From 36d35d229762ddcd5a6189bd9dd5f77e5bf54cc0 Mon Sep 17 00:00:00 2001 From: Ferenc Szili Date: Wed, 23 Oct 2024 12:43:53 +0200 Subject: [PATCH 04/10] RPC: add truncate_with_tablets RPC with frozen_topology_guard This change introduces a new truncate_with_tablets RPC with a parameter of type service::frozen_topology_guard. This is materialized on replica nodes into a topology_guard which guarantees that truncate is performed under a global session, which, in turn, makes sure that we don't execute truncate as a result of stale RPCs. Also, this RPC does not have a timeout. Timeout will be handled on the coordinator side, and the truncate operation will not be allowed to time out. --- idl/storage_proxy.idl.hh | 1 + message/messaging_service.cc | 2 ++ message/messaging_service.hh | 3 ++- service/storage_proxy.cc | 8 +++++++- service/topology_coordinator.cc | 3 +++ 5 files changed, 15 insertions(+), 2 deletions(-) diff --git a/idl/storage_proxy.idl.hh b/idl/storage_proxy.idl.hh index 4a857585b2..b02870d273 100644 --- a/idl/storage_proxy.idl.hh +++ b/idl/storage_proxy.idl.hh @@ -32,6 +32,7 @@ verb [[with_client_info, with_timeout]] read_data (query::read_command cmd [[ref verb [[with_client_info, with_timeout]] read_mutation_data (query::read_command cmd [[ref]], ::compat::wrapping_partition_range pr, service::fencing_token fence [[version 5.4.0]]) -> reconcilable_result [[lw_shared_ptr]], cache_temperature [[version 2.0.0]], replica::exception_variant [[version 5.1.0]]; verb [[with_client_info, with_timeout]] read_digest (query::read_command cmd [[ref]], ::compat::wrapping_partition_range pr, query::digest_algorithm digest [[version 3.0.0]], db::per_partition_rate_limit::info rate_limit_info [[version 5.1.0]], service::fencing_token fence [[version 5.4.0]]) -> query::result_digest, api::timestamp_type [[version 1.2.0]], cache_temperature [[version 2.0.0]], replica::exception_variant [[version 5.1.0]], std::optional [[version 5.2.0]]; verb [[with_timeout]] truncate (sstring, sstring); +verb [[]] truncate_with_tablets (sstring ks_name, sstring cf_name, service::frozen_topology_guard frozen_guard); verb [[with_client_info, with_timeout]] paxos_prepare (query::read_command cmd [[ref]], partition_key key [[ref]], utils::UUID ballot, bool only_digest, query::digest_algorithm da, std::optional trace_info [[ref]]) -> service::paxos::prepare_response [[unique_ptr]]; verb [[with_client_info, with_timeout]] paxos_accept (service::paxos::proposal proposal [[ref]], std::optional trace_info [[ref]]) -> bool; verb [[with_client_info, with_timeout, one_way]] paxos_learn (service::paxos::proposal decision [[ref]], inet_address_vector_replica_set forward [[ref]], gms::inet_address reply_to, unsigned shard, uint64_t response_id, std::optional trace_info [[ref]], host_id_vector_replica_set forward_id [[ref, version 6.3.0]], locator::host_id reply_to_id [[version 6.3.0]]); diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 9c505ea3cf..3f7cc10219 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -45,6 +45,7 @@ #include "serializer.hh" #include "db/per_partition_rate_limit_info.hh" #include "service/topology_state_machine.hh" +#include "service/topology_guard.hh" #include "service/raft/join_node.hh" #include "idl/consistency_level.dist.hh" #include "idl/tracing.dist.hh" @@ -643,6 +644,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) { case messaging_verb::READ_DIGEST: case messaging_verb::DEFINITIONS_UPDATE: case messaging_verb::TRUNCATE: + case messaging_verb::TRUNCATE_WITH_TABLETS: case messaging_verb::MIGRATION_REQUEST: case messaging_verb::SCHEMA_CHECK: case messaging_verb::COUNTER_MUTATION: diff --git a/message/messaging_service.hh b/message/messaging_service.hh index aa1f67bd68..62c710ea15 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -203,7 +203,8 @@ enum class messaging_verb : int32_t { JOIN_NODE_QUERY = 73, TASKS_GET_CHILDREN = 74, TABLET_REPAIR = 75, - LAST = 76, + TRUNCATE_WITH_TABLETS = 76, + LAST = 77, }; } // namespace netw diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 4783854503..5dcd1ddc7c 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -263,6 +263,7 @@ public: ser::storage_proxy_rpc_verbs::register_read_mutation_data(&_ms, std::bind_front(&remote::handle_read_mutation_data, this)); ser::storage_proxy_rpc_verbs::register_read_digest(&_ms, std::bind_front(&remote::handle_read_digest, this)); ser::storage_proxy_rpc_verbs::register_truncate(&_ms, std::bind_front(&remote::handle_truncate, this)); + ser::storage_proxy_rpc_verbs::register_truncate_with_tablets(&_ms, std::bind_front(&remote::handle_truncate_with_tablets, this)); // Register PAXOS verb handlers ser::storage_proxy_rpc_verbs::register_paxos_prepare(&_ms, std::bind_front(&remote::handle_paxos_prepare, this)); ser::storage_proxy_rpc_verbs::register_paxos_accept(&_ms, std::bind_front(&remote::handle_paxos_accept, this)); @@ -925,7 +926,12 @@ private: } future<> handle_truncate(rpc::opt_time_point timeout, sstring ksname, sstring cfname) { - return replica::database::truncate_table_on_all_shards(_sp._db, _sys_ks, ksname, cfname); + co_await replica::database::truncate_table_on_all_shards(_sp._db, _sys_ks, ksname, cfname); + } + + future<> handle_truncate_with_tablets(sstring ksname, sstring cfname, service::frozen_topology_guard frozen_guard) { + topology_guard guard(frozen_guard); + co_await replica::database::truncate_table_on_all_shards(_sp._db, _sys_ks, ksname, cfname); } future>> diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc index 9206537c9f..7053cacbae 100644 --- a/service/topology_coordinator.cc +++ b/service/topology_coordinator.cc @@ -51,6 +51,9 @@ #include "idl/join_node.dist.hh" #include "idl/storage_service.dist.hh" +#include "replica/exceptions.hh" +#include "service/paxos/prepare_response.hh" +#include "idl/storage_proxy.dist.hh" #include "service/topology_coordinator.hh" From fa3ec6e6334dbf91930b67bb620116ef1ab12e54 Mon Sep 17 00:00:00 2001 From: Ferenc Szili Date: Tue, 3 Dec 2024 17:42:39 +0100 Subject: [PATCH 05/10] storage_service: move logic of wait_for_topology_request_completion() This change moves to logic of storage_service::wait_for_topology_request_completion() into topology_state_machine. --- service/storage_service.cc | 17 +---------------- service/topology_state_machine.cc | 20 ++++++++++++++++++++ service/topology_state_machine.hh | 5 +++++ 3 files changed, 26 insertions(+), 16 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index dfd98002ec..a1e8fc6220 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -4678,22 +4678,7 @@ future<> storage_service::do_cluster_cleanup() { } future storage_service::wait_for_topology_request_completion(utils::UUID id, bool require_entry) { - rtlogger.debug("Start waiting for topology request completion (request id {})", id); - while (true) { - auto c = _topology_state_machine.reload_count; - auto [done, error] = co_await _sys_ks.local().get_topology_request_state(id, require_entry); - if (done) { - rtlogger.debug("Request with id {} is completed with status: {}", id, error.empty() ? sstring("success") : error); - co_return error; - } - if (c == _topology_state_machine.reload_count) { - // wait only if the state was not reloaded while we were preempted - rtlogger.debug("Waiting for a topology event while waiting for topology request completion (request id {})", id); - co_await _topology_state_machine.event.when(); - } - } - - co_return sstring(); + co_return co_await _topology_state_machine.wait_for_request_completion(_sys_ks.local(), id, require_entry); } future<> storage_service::wait_for_topology_not_busy() { diff --git a/service/topology_state_machine.cc b/service/topology_state_machine.cc index be783b52b8..56989e3055 100644 --- a/service/topology_state_machine.cc +++ b/service/topology_state_machine.cc @@ -9,6 +9,7 @@ #include "topology_state_machine.hh" #include "utils/log.hh" +#include "db/system_keyspace.hh" #include @@ -251,6 +252,25 @@ future<> topology_state_machine::await_not_busy() { } } +future topology_state_machine::wait_for_request_completion(db::system_keyspace& sys_ks, utils::UUID id, bool require_entry) { + tsmlogger.debug("Start waiting for topology request completion (request id {})", id); + while (true) { + auto c = reload_count; + auto [done, error] = co_await sys_ks.get_topology_request_state(id, require_entry); + if (done) { + tsmlogger.debug("Request with id {} is completed with status: {}", id, error.empty() ? sstring("success") : error); + co_return error; + } + if (c == reload_count) { + // wait only if the state was not reloaded while we were preempted + tsmlogger.debug("Waiting for a topology event while waiting for topology request completion (request id {})", id); + co_await event.when(); + } + } + + co_return sstring(); +} + } auto fmt::formatter::format(service::cleanup_status status, diff --git a/service/topology_state_machine.hh b/service/topology_state_machine.hh index 49d30fb393..25f4656bf0 100644 --- a/service/topology_state_machine.hh +++ b/service/topology_state_machine.hh @@ -22,6 +22,10 @@ #include "service/session.hh" #include "mutation/canonical_mutation.hh" +namespace db { + class system_keyspace; +} + namespace service { enum class node_state: uint16_t { @@ -231,6 +235,7 @@ struct topology_state_machine { size_t reload_count = 0; future<> await_not_busy(); + future wait_for_request_completion(db::system_keyspace& sys_ks, utils::UUID id, bool require_entry); }; // Raft leader uses this command to drive bootstrap process on other nodes From 93cfeb91603cd213e9b3f91da62acd7f0bbb5287 Mon Sep 17 00:00:00 2001 From: Ferenc Szili Date: Tue, 19 Nov 2024 17:33:12 +0100 Subject: [PATCH 06/10] truncate: make TRUNCATE a global topology operation This commit adds the code needed to create a TRUNCATE global topology request. It also adds the handler for this request to the topology coordinator. The execution of the truncate operation is not canceled on a timeout, but the query coordinator side will return a timeout error. --- service/storage_proxy.cc | 78 +++++++++++++++++++- service/topology_coordinator.cc | 115 ++++++++++++++++++++++++++++++ service/topology_state_machine.cc | 1 + service/topology_state_machine.hh | 1 + 4 files changed, 194 insertions(+), 1 deletion(-) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 5dcd1ddc7c..44a6531ef8 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -59,6 +59,7 @@ #include "service/migration_manager.hh" #include "service/client_state.hh" #include "service/paxos/proposal.hh" +#include "service/topology_mutation.hh" #include "locator/token_metadata.hh" #include #include @@ -240,7 +241,9 @@ class storage_proxy::remote { sharded& _sys_ks; raft_group0_client& _group0_client; topology_state_machine& _topology_state_machine; - + abort_source _group0_as; + future<> _truncate_table_fiber = make_ready_future<>(); + netw::connection_drop_slot_t _connection_dropped; netw::connection_drop_registration_t _condrop_registration; @@ -276,6 +279,8 @@ public: // Must call before destroying the `remote` object. future<> stop() { + _group0_as.request_abort(); + co_await std::move(_truncate_table_fiber); co_await ser::storage_proxy_rpc_verbs::unregister(&_ms); _stopped = true; } @@ -449,6 +454,25 @@ public: return ser::storage_proxy_rpc_verbs::send_paxos_prune(&_ms, addr, timeout, schema_id, key, ballot, tracing::make_trace_info(tr_state)); } + future<> truncate_with_tablets(sstring ks_name, sstring cf_name, std::chrono::milliseconds timeout_in_ms) { + if (!_truncate_table_fiber.available()) { + throw std::runtime_error("Another TRUNCATE TABLE is ongoing, please retry."); + } + auto sem = make_lw_shared(0); + _truncate_table_fiber = request_truncate_with_tablets(ks_name, cf_name).then_wrapped([sem] (auto f) { + if (f.failed()) { + sem->broken(f.get_exception()); + } else { + sem->signal(1); + } + }); + try { + co_await sem->wait(timeout_in_ms, 1); + } catch (seastar::semaphore_timed_out&) { + throw std::runtime_error(format("Timeout during TRUNCATE TABLE of {}.{}", ks_name, cf_name)); + } + } + future<> send_truncate_blocking(sstring keyspace, sstring cfname, std::chrono::milliseconds timeout_in_ms) { if (!_gossiper.get_unreachable_token_owners().empty()) { slogger.info("Cannot perform truncate, some hosts are down"); @@ -1044,6 +1068,58 @@ private: cf->drop_hit_rate(*id); } } + + future<> request_truncate_with_tablets(sstring ks_name, sstring cf_name) { + if (this_shard_id() != 0) { + // group0 is only set on shard 0 + co_return co_await _sp.container().invoke_on(0, [&] (storage_proxy& sp) { + return sp.remote().request_truncate_with_tablets(ks_name, cf_name); + }); + } + + // Create the global topology request + utils::UUID global_request_id; + while (true) { + group0_guard guard = co_await _group0_client.start_operation(_group0_as, raft_timeout{}); + + if (_topology_state_machine._topology.global_request.has_value()) { + throw exceptions::invalid_request_exception("Another global topology request is ongoing, please retry."); + } + + global_request_id = guard.new_group0_state_id(); + + const table_id table_id = _sp.local_db().find_uuid(ks_name, cf_name); + + std::vector updates; + updates.emplace_back(topology_mutation_builder(guard.write_timestamp()) + .set_global_topology_request(global_topology_request::truncate_table) + .set_global_topology_request_id(global_request_id) + .set_session(session_id(global_request_id)) + .build()); + + updates.emplace_back(topology_request_tracking_mutation_builder(global_request_id) + .set_truncate_table_data(table_id) + .set("done", false) + .set("start_time", db_clock::now()) + .build()); + + topology_change change{std::move(updates)}; + sstring reason = "Truncating table"; + group0_command g0_cmd = _group0_client.prepare_command(std::move(change), guard, reason); + try { + co_await _group0_client.add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{}); + break; + } catch (group0_concurrent_modification&) { + slogger.debug("request_truncate_with_tablets: concurrent modification, retrying"); + } + } + + // Wait for the topology request to complete + sstring error = co_await _topology_state_machine.wait_for_request_completion(_sys_ks.local(), global_request_id, true); + if (!error.empty()) { + throw std::runtime_error(error); + } + } }; using namespace exceptions; diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc index 7053cacbae..6273f04f91 100644 --- a/service/topology_coordinator.cc +++ b/service/topology_coordinator.cc @@ -836,6 +836,121 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { co_await _group0.client().add_entry(std::move(g0_cmd), std::move(guard), _as); } break; + case global_topology_request::truncate_table: { + // Execute a barrier to make sure the nodes we are performing truncate on see the session + // and are able to create a topology_guard using the frozen_guard we are sending over RPC + // TODO: Exclude nodes which don't contain replicas of the table we are truncating + guard = co_await global_tablet_token_metadata_barrier(std::move(guard)); + + const utils::UUID& global_request_id = _topo_sm._topology.global_request_id.value(); + std::optional error; + // We should perform TRUNCATE only if the session is still valid. It could be cleared if a previous truncate + // handler performed the truncate and cleared the session, but crashed before finalizing the request + if (_topo_sm._topology.session) { + const auto topology_requests_entry = co_await _sys_ks.get_topology_request_entry(global_request_id, true); + const table_id& table_id = topology_requests_entry.truncate_table_id; + lw_shared_ptr table = _db.get_tables_metadata().get_table_if_exists(table_id); + + if (table) { + const sstring& ks_name = table->schema()->ks_name(); + const sstring& cf_name = table->schema()->cf_name(); + + rtlogger.info("Performing TRUNCATE TABLE global topology request for {}.{}", ks_name, cf_name); + + // Collect the IDs of the hosts with replicas, but ignore excluded nodes + std::unordered_set replica_hosts; + const std::unordered_set excluded_nodes = _topo_sm._topology.get_excluded_nodes(); + const locator::tablet_map& tmap = get_token_metadata_ptr()->tablets().get_tablet_map(table_id); + co_await tmap.for_each_tablet([&] (locator::tablet_id tid, const locator::tablet_info& tinfo) { + for (const locator::tablet_replica& replica: tinfo.replicas) { + if (!excluded_nodes.contains(raft::server_id(replica.host.uuid()))) { + replica_hosts.insert(replica.host); + } + } + return make_ready_future<>(); + }); + + // Release the guard to avoid blocking group0 for long periods of time while invoking RPCs + release_guard(std::move(guard)); + + // Check if all the nodes with replicas are alive + for (const locator::host_id& replica_host: replica_hosts) { + if (!_gossiper.is_alive(replica_host)) { + throw std::runtime_error(::format("Cannot perform TRUNCATE on table {}.{} because host {} is down", ks_name, cf_name, replica_host)); + } + } + + // Send the RPC to all replicas + const service::frozen_topology_guard frozen_guard { _topo_sm._topology.session }; + co_await coroutine::parallel_for_each(replica_hosts, [&] (const locator::host_id& host_id) -> future<> { + co_await ser::storage_proxy_rpc_verbs::send_truncate_with_tablets(&_messaging, host_id, ks_name, cf_name, frozen_guard); + }); + } else { + error = ::format("Table with UUID {} does not exist.", table_id); + } + + // Clear the session and save the error message + while (true) { + if (!guard) { + guard = co_await start_operation(); + } + + std::vector updates; + updates.push_back(topology_mutation_builder(guard.write_timestamp()) + .del_session() + .build()); + if (error) { + updates.push_back(topology_request_tracking_mutation_builder(global_request_id) + .set("error", *error) + .build()); + } + + sstring reason = "Clear truncate session"; + topology_change change{std::move(updates)}; + group0_command g0_cmd = _group0.client().prepare_command(std::move(change), guard, reason); + try { + co_await _group0.client().add_entry(std::move(g0_cmd), std::move(guard), _as); + break; + } catch (group0_concurrent_modification&) { + rtlogger.info("handle_global_request(): concurrent modification, retrying"); + } + } + } + + // Execute a barrier to ensure the TRUNCATE RPC can't run on any nodes after this point + if (!guard) { + guard = co_await start_operation(); + } + guard = co_await global_tablet_token_metadata_barrier(std::move(guard)); + + // Finalize the request + while (true) { + if (!guard) { + guard = co_await start_operation(); + } + std::vector updates; + updates.push_back(topology_mutation_builder(guard.write_timestamp()) + .del_global_topology_request() + .del_global_topology_request_id() + .build()); + updates.push_back(topology_request_tracking_mutation_builder(global_request_id) + .set("end_time", db_clock::now()) + .set("done", true) + .build()); + + sstring reason = "Truncate has completed"; + topology_change change{std::move(updates)}; + group0_command g0_cmd = _group0.client().prepare_command(std::move(change), guard, reason); + try { + co_await _group0.client().add_entry(std::move(g0_cmd), std::move(guard), _as); + break; + } catch (group0_concurrent_modification&) { + rtlogger.info("handle_global_request(): concurrent modification, retrying"); + } + } + + break; + } } } diff --git a/service/topology_state_machine.cc b/service/topology_state_machine.cc index 56989e3055..3c30937670 100644 --- a/service/topology_state_machine.cc +++ b/service/topology_state_machine.cc @@ -204,6 +204,7 @@ static std::unordered_map global_topology_requ {global_topology_request::new_cdc_generation, "new_cdc_generation"}, {global_topology_request::cleanup, "cleanup"}, {global_topology_request::keyspace_rf_change, "keyspace_rf_change"}, + {global_topology_request::truncate_table, "truncate_table"}, }; global_topology_request global_topology_request_from_string(const sstring& s) { diff --git a/service/topology_state_machine.hh b/service/topology_state_machine.hh index 25f4656bf0..e9eda1b3b6 100644 --- a/service/topology_state_machine.hh +++ b/service/topology_state_machine.hh @@ -76,6 +76,7 @@ enum class global_topology_request: uint16_t { new_cdc_generation, cleanup, keyspace_rf_change, + truncate_table, }; struct ring_slice { From 4cd7a1acaba128474d00c8dd9e52ef16c646d20f Mon Sep 17 00:00:00 2001 From: Ferenc Szili Date: Tue, 19 Nov 2024 18:26:34 +0100 Subject: [PATCH 07/10] storage_proxy: use new TRUNCATE for tablets This change adds branching based on keyspace replication method, and uses the new TRUNCATE for keyspaces with tablets. --- service/storage_proxy.cc | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 44a6531ef8..2bc976a40e 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -6688,11 +6688,14 @@ db::hints::manager& storage_proxy::hints_manager_for(db::write_type type) { future<> storage_proxy::truncate_blocking(sstring keyspace, sstring cfname, std::chrono::milliseconds timeout_in_ms) { slogger.debug("Starting a blocking truncate operation on keyspace {}, CF {}", keyspace, cfname); - if (local_db().find_keyspace(keyspace).get_replication_strategy().get_type() == locator::replication_strategy_type::local) { - return replica::database::truncate_table_on_all_shards(_db, remote().system_keyspace().container(), keyspace, cfname); + const replica::keyspace& ks = local_db().find_keyspace(keyspace); + if (ks.get_replication_strategy().get_type() == locator::replication_strategy_type::local) { + co_await replica::database::truncate_table_on_all_shards(_db, remote().system_keyspace().container(), keyspace, cfname); + } else if (ks.uses_tablets() && features().truncate_as_topology_operation) { + co_await remote().truncate_with_tablets(std::move(keyspace), std::move(cfname), timeout_in_ms); + } else { + co_await remote().send_truncate_blocking(std::move(keyspace), std::move(cfname), timeout_in_ms); } - - return remote().send_truncate_blocking(std::move(keyspace), std::move(cfname), timeout_in_ms); } void storage_proxy::start_remote(netw::messaging_service& ms, gms::gossiper& g, migration_manager& mm, sharded& sys_ks, From e65a235fd5a6f77912982ca3f67fa8a4a845efff Mon Sep 17 00:00:00 2001 From: Ferenc Szili Date: Tue, 3 Dec 2024 18:18:54 +0100 Subject: [PATCH 08/10] test: add tests for truncate with tablets This patch adds the unit tests for truncate with tablets. test_truncate_while_migration() triggers a tablet migration, then runs a TRUNCATE TABLE for the table containing the tablet being migrated. test_truncate_with_concurrent_drop() starts a truncate, then attempts to drop the table while it is being truncated. test_truncate_while_node_restart() validates the case where a replica node is restarted while truncate is running. test_truncate_with_coordinator_crash() validates if truncate is correctly completed in cases where the topology coordinator has crashed or restarted after the truncate session is cleared, but before the truncate request is finalized. --- service/storage_service.cc | 4 + service/topology_coordinator.cc | 10 + .../test_truncate_with_tablets.py | 215 ++++++++++++++++++ 3 files changed, 229 insertions(+) create mode 100644 test/topology_custom/test_truncate_with_tablets.py diff --git a/service/storage_service.cc b/service/storage_service.cc index a1e8fc6220..0d1c8130ff 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -6025,6 +6025,10 @@ future<> storage_service::stream_tablet(locator::global_tablet_id tablet) { throw std::runtime_error(fmt::format("Cannot stream within the same node using regular migration, tablet: {}, shard {} -> {}", tablet, leaving_replica->shard, trinfo->pending_replica->shard)); } + co_await utils::get_local_injector().inject("migration_streaming_wait", [] (auto& handler) { + rtlogger.info("migration_streaming_wait: start"); + return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(2)); + }); auto& table = _db.local().find_column_family(tablet.table); std::vector tables = {table.schema()->cf_name()}; auto my_id = tm->get_my_id(); diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc index 6273f04f91..251bddc032 100644 --- a/service/topology_coordinator.cc +++ b/service/topology_coordinator.cc @@ -873,6 +873,11 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { // Release the guard to avoid blocking group0 for long periods of time while invoking RPCs release_guard(std::move(guard)); + co_await utils::get_local_injector().inject("truncate_table_wait", [] (auto& handler) { + rtlogger.info("truncate_table_wait: start"); + return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(2)); + }); + // Check if all the nodes with replicas are alive for (const locator::host_id& replica_host: replica_hosts) { if (!_gossiper.is_alive(replica_host)) { @@ -917,6 +922,11 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { } } + utils::get_local_injector().inject("truncate_crash_after_session_clear", [] { + rtlogger.info("truncate_crash_after_session_clear hit, killing the node"); + _exit(1); + }); + // Execute a barrier to ensure the TRUNCATE RPC can't run on any nodes after this point if (!guard) { guard = co_await start_operation(); diff --git a/test/topology_custom/test_truncate_with_tablets.py b/test/topology_custom/test_truncate_with_tablets.py new file mode 100644 index 0000000000..da500c7515 --- /dev/null +++ b/test/topology_custom/test_truncate_with_tablets.py @@ -0,0 +1,215 @@ +# +# Copyright (C) 2024-present ScyllaDB +# +# SPDX-License-Identifier: AGPL-3.0-or-later +# +from cassandra.query import SimpleStatement, ConsistencyLevel +from cassandra.protocol import InvalidRequest +from test.pylib.manager_client import ManagerClient +from test.topology.conftest import skip_mode +from test.topology.util import get_topology_coordinator +from test.pylib.tablets import get_all_tablet_replicas +from test.pylib.util import wait_for_cql_and_get_hosts +import time +import pytest +import logging +import asyncio + +logger = logging.getLogger(__name__) + +@pytest.mark.asyncio +@skip_mode('release', 'error injections are not supported in release mode') +async def test_truncate_while_migration(manager: ManagerClient): + + logger.info('Bootstrapping cluster') + cfg = { 'enable_tablets': True, + 'error_injections_at_startup': ['migration_streaming_wait'] + } + + servers = [] + servers.append(await manager.server_add(config=cfg)) + + cql = manager.get_cql() + + # Create a keyspace with tablets and initial_tablets == 2, then insert data + await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}") + await cql.run_async('CREATE TABLE test.test (pk int PRIMARY KEY, c int);') + + keys = range(1024) + await asyncio.gather(*[cql.run_async(f'INSERT INTO test.test (pk, c) VALUES ({k}, {k});') for k in keys]) + + # Add a node to the cluster. This will cause the tablet load balancer to migrate one tablet to the new node + servers.append(await manager.server_add(config=cfg)) + + # Wait for tablet streaming to start + pending_node = servers[1] + pending_log = await manager.server_open_log(pending_node.server_id) + + await pending_log.wait_for('migration_streaming_wait: start') + await manager.api.message_injection(pending_node.ip_addr, 'migration_streaming_wait') + + # Do a TRUNCATE TABLE while the tablet is being streamed + await cql.run_async('TRUNCATE TABLE test.test') + + # Wait for streaming to complete + await pending_log.wait_for('raft_topology - Streaming for tablet migration of.*successful') + + # Check if we have any data + row = await cql.run_async(SimpleStatement('SELECT COUNT(*) FROM test.test', consistency_level=ConsistencyLevel.ALL)) + assert row[0].count == 0 + + +async def get_raft_leader_and_log(manager: ManagerClient, servers): + raft_leader_host_id = await get_topology_coordinator(manager) + for s in servers: + if raft_leader_host_id == await manager.get_host_id(s.server_id): + raft_leader = s + break + raft_leader_log = await manager.server_open_log(raft_leader.server_id) + return (raft_leader, raft_leader_log) + + +@pytest.mark.asyncio +@skip_mode('release', 'error injections are not supported in release mode') +async def test_truncate_with_concurrent_drop(manager: ManagerClient): + + logger.info('Bootstrapping cluster') + cfg = { 'enable_tablets': True, + 'error_injections_at_startup': ['truncate_table_wait'] + } + + servers = [] + servers.append(await manager.server_add(config=cfg)) + servers.append(await manager.server_add(config=cfg)) + servers.append(await manager.server_add(config=cfg)) + + cql = manager.get_cql() + hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60) + + # Create a keyspace with tablets and initial_tablets == 2, then insert data + await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}") + await cql.run_async('CREATE TABLE test.test (pk int PRIMARY KEY, c int);') + + keys = range(1024) + await asyncio.gather(*[cql.run_async(f'INSERT INTO test.test (pk, c) VALUES ({k}, {k});') for k in keys]) + + (raft_leader, raft_leader_log) = await get_raft_leader_and_log(manager, servers) + + if raft_leader == servers[0]: + trunc_host = hosts[1] + drop_host = hosts[2] + elif raft_leader == servers[1]: + trunc_host = hosts[0] + drop_host = hosts[2] + elif raft_leader == servers[2]: + trunc_host = hosts[0] + drop_host = hosts[1] + else: + assert False, 'Unable to determine raft leader' + + # Start a TRUNCATE in the background + trunc_future = cql.run_async('TRUNCATE TABLE test.test', host=trunc_host) + # Wait for the topology coordinator to reach a point wher it is about to start sending the truncate RPCs + await raft_leader_log.wait_for('truncate_table_wait: start') + # Execute DROP TABLE + await cql.run_async('DROP TABLE test.test', host=drop_host) + # Release TRUNCATE table in topology coordinator + await manager.api.message_injection(raft_leader.ip_addr, 'truncate_table_wait') + # Check we received an error + with pytest.raises(InvalidRequest, match='unconfigured table test'): + await trunc_future + + +@pytest.mark.asyncio +@skip_mode('release', 'error injections are not supported in release mode') +async def test_truncate_while_node_restart(manager: ManagerClient): + + logger.info('Bootstrapping cluster') + cfg = { 'enable_tablets': True } + + servers = [] + servers.append(await manager.server_add(config=cfg)) + servers.append(await manager.server_add(config=cfg)) + servers.append(await manager.server_add(config=cfg)) + + cql = manager.get_cql() + hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60) + + # Create a keyspace with tablets and initial_tablets == 2, then insert data + await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}") + await cql.run_async('CREATE TABLE test.test (pk int PRIMARY KEY, c int);') + + keys = range(1024) + await asyncio.gather(*[cql.run_async(f'INSERT INTO test.test (pk, c) VALUES ({k}, {k});') for k in keys]) + + (raft_leader, raft_leader_log) = await get_raft_leader_and_log(manager, servers) + + # Decide which node to restart; select a node with a replica but not the raft leader + tablet_replicas = await get_all_tablet_replicas(manager, raft_leader, 'test', 'test') + replica_hosts = [tr.replicas[0][0] for tr in tablet_replicas] + for s in servers: + if s != raft_leader: + host_id = await manager.get_host_id(s.server_id) + if host_id in replica_hosts: + restart_node = s + break + + # Shutdown the node containing a replica + await manager.server_stop_gracefully(restart_node.server_id) + # Start truncating in the background + trunc_future = cql.run_async('TRUNCATE TABLE test.test', host=hosts[0]) + # Restart the node + await manager.server_start(restart_node.server_id) + # Wait for truncate to complete + await trunc_future + + # Check if truncate was successful + row = await cql.run_async(SimpleStatement('SELECT COUNT(*) FROM test.test', consistency_level=ConsistencyLevel.ALL)) + assert row[0].count == 0 + + +@pytest.mark.xfail(reason="issue #21719") +@pytest.mark.asyncio +@skip_mode('release', 'error injections are not supported in release mode') +async def test_truncate_with_coordinator_crash(manager: ManagerClient): + + logger.info('Bootstrapping cluster') + cfg = { 'enable_tablets': True } + + servers = [] + servers.append(await manager.server_add(config=cfg)) + servers.append(await manager.server_add(config=cfg)) + + cql = manager.get_cql() + hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60) + + # Create a keyspace with tablets and initial_tablets == 2, then insert data + await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}") + await cql.run_async('CREATE TABLE test.test (pk int PRIMARY KEY, c int);') + + keys = range(1024) + await asyncio.gather(*[cql.run_async(f'INSERT INTO test.test (pk, c) VALUES ({k}, {k});') for k in keys]) + + (raft_leader, raft_leader_log) = await get_raft_leader_and_log(manager, servers) + + if raft_leader == servers[0]: + trunc_host = hosts[1] + else: + trunc_host = hosts[0] + + # Enable injection to crash the raft leader after truncate cleared the session ID + await manager.api.enable_injection(raft_leader.ip_addr, 'truncate_crash_after_session_clear', one_shot=False) + + # Start a TRUNCATE in the background + trunc_future = cql.run_async('TRUNCATE TABLE test.test', host=trunc_host) + # Wait for the topology coordinator to crash + await raft_leader_log.wait_for('truncate_crash_after_session_clear hit, killing the node') + await manager.server_stop(raft_leader.server_id) + # Restart the crashed node + await manager.server_start(raft_leader.server_id) + # Wait for truncate to complete + await trunc_future + + # Check if we have any data + row = await cql.run_async(SimpleStatement('SELECT COUNT(*) FROM test.test', consistency_level=ConsistencyLevel.ALL)) + assert row[0].count == 0 From 781f0a23974cdb70fa3cdbaf1f9746836eb45559 Mon Sep 17 00:00:00 2001 From: Ferenc Szili Date: Fri, 25 Oct 2024 12:08:39 +0200 Subject: [PATCH 09/10] storage_proxy: fix indentation and remove empty catch/rethrow This change fixes code indentation in storage_proxy::remote::send_truncate_blocking() It also removes an empty catch and rethrow block. --- service/storage_proxy.cc | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 2bc976a40e..cb982af6fe 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -495,12 +495,10 @@ public: co_await coroutine::parallel_for_each(all_endpoints, [&] (auto ep) { return send_truncate(ep, timeout, keyspace, cfname); }); - } catch (rpc::timeout_error& e) { - slogger.trace("Truncation of {} timed out: {}", cfname, e.what()); - throw; - } catch (...) { - throw; - } + } catch (rpc::timeout_error& e) { + slogger.trace("Truncation of {} timed out: {}", cfname, e.what()); + throw; + } } private: From 49cc771bda26ea8b3a57f2be205bb4a9a99d518c Mon Sep 17 00:00:00 2001 From: Ferenc Szili Date: Fri, 1 Nov 2024 15:42:08 +0100 Subject: [PATCH 10/10] docs: docs: topology-over-raft: Document truncate_table request --- docs/dev/topology-over-raft.md | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/docs/dev/topology-over-raft.md b/docs/dev/topology-over-raft.md index 7d2b304dec..13c462ea4a 100644 --- a/docs/dev/topology-over-raft.md +++ b/docs/dev/topology-over-raft.md @@ -154,8 +154,20 @@ transition. An example of this is the `rebuilding` state which does not change the topology but requires streaming data. Separately from the per-node requests, there is also a 'global' request field -for operations that don't affect any specific node but the entire cluster, -such as `check_and_repair_cdc_streams`. +for operations that don't affect any specific node but the entire cluster. These +are the currently supported global topology operations: + +- `new_cdc_generation` aka `check_and_repair_cdc_streams` +- `cleanup` +- `keyspace_rf_change` +- `truncate_table` Truncate for keyspaces with tablets is implemented as a topology + request in order to serialize it with other topology operations (migration, repair) + and avoid issues with data resurrection when truncate is executed during tablet + migrations. Truncate has only one implicit transition stage. When the topology + coordinator executes the truncate request, it issues truncate RPCs to nodes which + contain replicas of the table being truncated. It uses [sessions](#Topology guards) + to make sure that no stale RPCs are executed outside of the scope of the request. + # Load balancing @@ -591,8 +603,8 @@ There are also a few static columns for cluster-global properties: - `global_topology_request_id` - if set, contains global topology request's id, which is a new group0's state id - `new_cdc_generation_data_uuid` - used in `commit_cdc_generation` state, the time UUID of the generation to be committed - `upgrade_state` - describes the progress of the upgrade to raft-based topology. -- 'new_keyspace_rf_change_ks_name' - the name of the KS that is being the target of the scheduled ALTER KS statement -- 'new_keyspace_rf_change_data' - the KS options to be used when executing the scheduled ALTER KS statement +- `new_keyspace_rf_change_ks_name` - the name of the KS that is being the target of the scheduled ALTER KS statement +- `new_keyspace_rf_change_data` - the KS options to be used when executing the scheduled ALTER KS statement # Join procedure