From b0b91bf5ec45bd437393f7729de2cfb778501ea4 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Wed, 31 May 2023 18:16:36 +0300 Subject: [PATCH 1/5] proxy/remote: Keep sharded<db::system_keyspace>& dependency This dependency will be needed to invoke the service::paxos_state:: methods, and all of those calls are made in storage_proxy::remote() methods only Signed-off-by: Pavel Emelyanov --- main.cc | 2 +- service/storage_proxy.cc | 13 +++++++++---- service/storage_proxy.hh | 2 +- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/main.cc b/main.cc index 21353c0ce8..2da58da815 100644 --- a/main.cc +++ b/main.cc @@ -1446,7 +1446,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl supervisor::notify("initializing migration manager RPC verbs"); mm.invoke_on_all(&service::migration_manager::init_messaging_service).get(); supervisor::notify("initializing storage proxy RPC verbs"); - proxy.invoke_on_all(&service::storage_proxy::start_remote, std::ref(messaging), std::ref(gossiper), std::ref(mm)).get(); + proxy.invoke_on_all(&service::storage_proxy::start_remote, std::ref(messaging), std::ref(gossiper), std::ref(mm), std::ref(sys_ks)).get(); auto stop_proxy_handlers = defer_verbose_shutdown("storage proxy RPC verbs", [&proxy] { proxy.invoke_on_all(&service::storage_proxy::stop_remote).get(); }); diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 02e6913de2..3ebf69b759 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -163,6 +163,7 @@ class storage_proxy::remote { netw::messaging_service& _ms; const gms::gossiper& _gossiper; migration_manager& _mm; + sharded& _sys_ks; netw::connection_drop_slot_t _connection_dropped; netw::connection_drop_registration_t _condrop_registration; @@ -170,8 +171,8 @@ class storage_proxy::remote { bool _stopped{false}; public: - remote(storage_proxy& sp, netw::messaging_service& ms, gms::gossiper& g, migration_manager& mm) - : _sp(sp), _ms(ms), _gossiper(g), _mm(mm) + remote(storage_proxy& sp, 
netw::messaging_service& ms, gms::gossiper& g, migration_manager& mm, sharded& sys_ks) + : _sp(sp), _ms(ms), _gossiper(g), _mm(mm), _sys_ks(sys_ks) , _connection_dropped(std::bind_front(&remote::connection_dropped, this)) , _condrop_registration(_ms.when_connection_drops(_connection_dropped)) { @@ -209,6 +210,10 @@ public: return _gossiper.is_alive(ep); } + db::system_keyspace& system_keyspace() { + return _sys_ks.local(); + } + // Note: none of the `send_*` functions use `remote` after yielding - by the first yield, // control is delegated to another service (messaging_service). Thus unfinished `send`s // do not make it unsafe to destroy the `remote` object. @@ -6233,8 +6238,8 @@ future<> storage_proxy::truncate_blocking(sstring keyspace, sstring cfname, std: return remote().send_truncate_blocking(std::move(keyspace), std::move(cfname), timeout_in_ms); } -void storage_proxy::start_remote(netw::messaging_service& ms, gms::gossiper& g, migration_manager& mm) { - _remote = std::make_unique(*this, ms, g, mm); +void storage_proxy::start_remote(netw::messaging_service& ms, gms::gossiper& g, migration_manager& mm, sharded& sys_ks) { + _remote = std::make_unique(*this, ms, g, mm, sys_ks); } future<> storage_proxy::stop_remote() { diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 26810a46fa..d1ae69b8eb 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -487,7 +487,7 @@ public: } // Start/stop the remote part of `storage_proxy` that is required for performing distributed queries. 
- void start_remote(netw::messaging_service&, gms::gossiper&, migration_manager&); + void start_remote(netw::messaging_service&, gms::gossiper&, migration_manager&, sharded& sys_ks); future<> stop_remote(); private: From b4fc1076e3229c0038e17bc410fde60441ad400b Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Wed, 19 Jul 2023 19:30:35 +0300 Subject: [PATCH 2/5] test: Optionally initialize proxy remote for cql_test_env Some test cases that use cql_test_env involve paxos state updates. Since these updates now go via proxy->remote->system_keyspace, those test cases need cql_test_env to initialize the remote part of the proxy too Signed-off-by: Pavel Emelyanov --- test/boost/cql_query_test.cc | 16 ++++++++++++---- test/boost/extensions_test.cc | 4 +++- test/boost/query_processor_test.cc | 4 +++- test/lib/cql_test_env.cc | 9 +++++++++ test/lib/cql_test_env.hh | 1 + 5 files changed, 28 insertions(+), 6 deletions(-) diff --git a/test/boost/cql_query_test.cc b/test/boost/cql_query_test.cc index ec41f8c1bc..12a9354e20 100644 --- a/test/boost/cql_query_test.cc +++ b/test/boost/cql_query_test.cc @@ -4682,6 +4682,8 @@ static void prepared_on_shard(cql_test_env& e, const sstring& query, } SEASTAR_TEST_CASE(test_null_value_tuple_floating_types_and_uuids) { + cql_test_config cfg; + cfg.need_remote_proxy = true; return do_with_cql_env_thread([] (cql_test_env& e) { auto test_for_single_type = [&e] (const shared_ptr& type, auto update_value) { cquery_nofail(e, format("CREATE TABLE IF NOT EXISTS t (k int PRIMARY KEY, test {})", type->cql3_type_name())); @@ -4703,10 +4705,12 @@ SEASTAR_TEST_CASE(test_null_value_tuple_floating_types_and_uuids) { test_for_single_type(float_type, 1.0f); test_for_single_type(uuid_type, utils::make_random_uuid()); test_for_single_type(timeuuid_type, utils::UUID("00000000-0000-1000-0000-000000000000")); - }); + }, std::move(cfg)); } SEASTAR_TEST_CASE(test_like_parameter_marker) { + cql_test_config cfg; + cfg.need_remote_proxy = true; return 
do_with_cql_env_thread([] (cql_test_env& e) { cquery_nofail(e, "CREATE TABLE t (pk int PRIMARY KEY, col text)"); cquery_nofail(e, "INSERT INTO t (pk, col) VALUES (1, 'aaa')"); @@ -4720,10 +4724,12 @@ SEASTAR_TEST_CASE(test_like_parameter_marker) { prepared_on_shard(e, query, {T("err"), I(1), T("a%")}, {{B(false), "chg"}}); prepared_on_shard(e, query, {T("chg"), I(2), T("b%")}, {{B(true), "bbb"}}); prepared_on_shard(e, query, {T("err"), I(1), T("a%")}, {{B(false), "chg"}}); - }); + }, std::move(cfg)); } SEASTAR_TEST_CASE(test_list_parameter_marker) { + cql_test_config cfg; + cfg.need_remote_proxy = true; return do_with_cql_env_thread([] (cql_test_env& e) { cquery_nofail(e, "CREATE TABLE t (k int PRIMARY KEY, v list)"); cquery_nofail(e, "INSERT INTO t (k, v) VALUES (1, [10, 20, 30])"); @@ -4742,10 +4748,12 @@ SEASTAR_TEST_CASE(test_list_parameter_marker) { prepared_on_shard(e, query, {list_of({100, 200, 300}), I(1), list_of({20, 21, 22})}, {{B(true), list_of({10, 20, 30})}}); - }); + }, std::move(cfg)); } SEASTAR_TEST_CASE(test_select_serial_consistency) { + cql_test_config cfg; + cfg.need_remote_proxy = true; return do_with_cql_env_thread([] (cql_test_env& e) { cquery_nofail(e, "CREATE TABLE t (a int, b int, primary key (a,b))"); cquery_nofail(e, "INSERT INTO t (a, b) VALUES (1, 1)"); @@ -4769,7 +4777,7 @@ SEASTAR_TEST_CASE(test_select_serial_consistency) { check_fails("select * from t where b > 0 allow filtering"); check_fails("select * from t where a in (1, 3)"); prepared_on_shard(e, "select * from t where a = 1", {}, {{I(1), I(1)}, {I(1), I(2)}}, db::consistency_level::SERIAL); - }); + }, std::move(cfg)); } diff --git a/test/boost/extensions_test.cc b/test/boost/extensions_test.cc index 341ac6bff6..6f1929b0aa 100644 --- a/test/boost/extensions_test.cc +++ b/test/boost/extensions_test.cc @@ -126,6 +126,8 @@ SEASTAR_TEST_CASE(paxos_grace_seconds_extension) { auto ext = std::make_shared(); ext->add_schema_extension(db::paxos_grace_seconds_extension::NAME); auto cfg 
= ::make_shared(ext); + cql_test_config cql_cfg(cfg); + cql_cfg.need_remote_proxy = true; return do_with_cql_env([] (cql_test_env& e) { // Verify that paxos_grace_seconds extensions gets recognized properly @@ -156,7 +158,7 @@ SEASTAR_TEST_CASE(paxos_grace_seconds_extension) { }); return f; - }, cfg); + }, std::move(cql_cfg)); } SEASTAR_TEST_CASE(test_extension_remove) { diff --git a/test/boost/query_processor_test.cc b/test/boost/query_processor_test.cc index 72bf15119d..b03b471ad7 100644 --- a/test/boost/query_processor_test.cc +++ b/test/boost/query_processor_test.cc @@ -202,6 +202,8 @@ auto make_options(clevel cl) { } // anonymous namespace SEASTAR_TEST_CASE(test_query_counters) { + cql_test_config cfg; + cfg.need_remote_proxy = true; return do_with_cql_env_thread([](cql_test_env& e) { // Executes a query and waits for it to complete. auto process_query = [&e](const sstring& query, clevel cl) mutable { @@ -278,7 +280,7 @@ SEASTAR_TEST_CASE(test_query_counters) { clevel::ANY); expected["ANY"] += 2; BOOST_CHECK_EQUAL(expected, get_query_metrics()); - }); + }, std::move(cfg)); } SEASTAR_TEST_CASE(test_select_full_scan_metrics) { diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index bb8eeda7f3..4ae2cb2fa1 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -897,6 +897,15 @@ public: sys_dist_ks.start(std::ref(qp), std::ref(mm), std::ref(proxy)).get(); + if (cfg_in.need_remote_proxy) { + proxy.invoke_on_all(&service::storage_proxy::start_remote, std::ref(ms), std::ref(gossiper), std::ref(mm), std::ref(sys_ks)).get(); + } + auto stop_proxy_remote = defer([&proxy, need = cfg_in.need_remote_proxy] { + if (need) { + proxy.invoke_on_all(&service::storage_proxy::stop_remote).get(); + } + }); + sl_controller.invoke_on_all([&sys_dist_ks, &sl_controller] (qos::service_level_controller& service) { qos::service_level_controller::service_level_distributed_data_accessor_ptr service_level_data_accessor = ::static_pointer_cast( diff --git 
a/test/lib/cql_test_env.hh b/test/lib/cql_test_env.hh index 5fb2341538..4fd458d766 100644 --- a/test/lib/cql_test_env.hh +++ b/test/lib/cql_test_env.hh @@ -90,6 +90,7 @@ public: std::optional dbcfg; std::set disabled_features; std::optional qp_mcfg; + bool need_remote_proxy = false; cql_test_config(); cql_test_config(const cql_test_config&); From d9ba8eb8df25c334b0889e781fa4ef3361d9c46e Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Wed, 31 May 2023 18:45:14 +0300 Subject: [PATCH 3/5] service/paxos: Add db::system_keyspace& argument to some methods The paxos_state's .prepare(), .accept(), .learn() and .prune() methods access the system keyspace via its static methods. The only caller of those (storage_proxy::remote) already has the sharded system keyspace reference and can pass its .local() instance as an argument Signed-off-by: Pavel Emelyanov --- service/paxos/paxos_state.cc | 8 ++++---- service/paxos/paxos_state.hh | 9 +++++---- service/storage_proxy.cc | 30 +++++++++++++++--------------- 3 files changed, 24 insertions(+), 23 deletions(-) diff --git a/service/paxos/paxos_state.cc b/service/paxos/paxos_state.cc index a0d0ab66fe..5569403c3b 100644 --- a/service/paxos/paxos_state.cc +++ b/service/paxos/paxos_state.cc @@ -43,7 +43,7 @@ future paxos_state::get_cas_lock(const dht::token& key, cloc co_return m; } -future paxos_state::prepare(storage_proxy& sp, tracing::trace_state_ptr tr_state, schema_ptr schema, +future paxos_state::prepare(storage_proxy& sp, db::system_keyspace& sys_ks, tracing::trace_state_ptr tr_state, schema_ptr schema, const query::read_command& cmd, const partition_key& key, utils::UUID ballot, bool only_digest, query::digest_algorithm da, clock_type::time_point timeout) { return utils::get_local_injector().inject("paxos_prepare_timeout", timeout, [&sp, &cmd, &key, ballot, tr_state, schema, only_digest, da, timeout] { @@ -139,7 +139,7 @@ future paxos_state::prepare(storage_proxy& sp, tracing::trace_ }); } -future paxos_state::accept(storage_proxy& sp, 
tracing::trace_state_ptr tr_state, schema_ptr schema, dht::token token, const proposal& proposal, +future paxos_state::accept(storage_proxy& sp, db::system_keyspace& sys_ks, tracing::trace_state_ptr tr_state, schema_ptr schema, dht::token token, const proposal& proposal, clock_type::time_point timeout) { return utils::get_local_injector().inject("paxos_accept_proposal_timeout", timeout, [&sp, token = std::move(token), &proposal, schema, tr_state, timeout] { @@ -178,7 +178,7 @@ future paxos_state::accept(storage_proxy& sp, tracing::trace_state_ptr tr_ }); } -future<> paxos_state::learn(storage_proxy& sp, schema_ptr schema, proposal decision, clock_type::time_point timeout, +future<> paxos_state::learn(storage_proxy& sp, db::system_keyspace& sys_ks, schema_ptr schema, proposal decision, clock_type::time_point timeout, tracing::trace_state_ptr tr_state) { if (utils::get_local_injector().enter("paxos_error_before_learn")) { return make_exception_future<>(utils::injected_error("injected_error_before_learn")); @@ -237,7 +237,7 @@ future<> paxos_state::learn(storage_proxy& sp, schema_ptr schema, proposal decis }); } -future<> paxos_state::prune(schema_ptr schema, const partition_key& key, utils::UUID ballot, clock_type::time_point timeout, +future<> paxos_state::prune(db::system_keyspace& sys_ks, schema_ptr schema, const partition_key& key, utils::UUID ballot, clock_type::time_point timeout, tracing::trace_state_ptr tr_state) { logger.debug("Delete paxos state for ballot {}", ballot); tracing::trace(tr_state, "Delete paxos state for ballot {}", ballot); diff --git a/service/paxos/paxos_state.hh b/service/paxos/paxos_state.hh index e582841838..c92d549e96 100644 --- a/service/paxos/paxos_state.hh +++ b/service/paxos/paxos_state.hh @@ -18,6 +18,7 @@ namespace service { class storage_proxy; } +namespace db { class system_keyspace; } namespace service::paxos { @@ -111,16 +112,16 @@ public: , _accepted_proposal(std::move(accepted)) , _most_recent_commit(std::move(commit)) {} // 
Replica RPC endpoint for Paxos "prepare" phase. - static future prepare(storage_proxy& sp, tracing::trace_state_ptr tr_state, schema_ptr schema, + static future prepare(storage_proxy& sp, db::system_keyspace& sys_ks, tracing::trace_state_ptr tr_state, schema_ptr schema, const query::read_command& cmd, const partition_key& key, utils::UUID ballot, bool only_digest, query::digest_algorithm da, clock_type::time_point timeout); // Replica RPC endpoint for Paxos "accept" phase. - static future accept(storage_proxy& sp, tracing::trace_state_ptr tr_state, schema_ptr schema, dht::token token, const proposal& proposal, + static future accept(storage_proxy& sp, db::system_keyspace& sys_ks, tracing::trace_state_ptr tr_state, schema_ptr schema, dht::token token, const proposal& proposal, clock_type::time_point timeout); // Replica RPC endpoint for Paxos "learn". - static future<> learn(storage_proxy& sp, schema_ptr schema, proposal decision, clock_type::time_point timeout, tracing::trace_state_ptr tr_state); + static future<> learn(storage_proxy& sp, db::system_keyspace& sys_ks, schema_ptr schema, proposal decision, clock_type::time_point timeout, tracing::trace_state_ptr tr_state); // Replica RPC endpoint for pruning Paxos table - static future<> prune(schema_ptr schema, const partition_key& key, utils::UUID ballot, clock_type::time_point timeout, + static future<> prune(db::system_keyspace& sys_ks, schema_ptr schema, const partition_key& key, utils::UUID ballot, clock_type::time_point timeout, tracing::trace_state_ptr tr_state); }; diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 3ebf69b759..3db4cc9e9b 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -567,9 +567,9 @@ private: return handle_write(src_addr, t, schema_version, std::move(decision), forward, reply_to, shard, response_id, trace_info, fencing_token{}, - /* apply_fn */ [] (shared_ptr& p, tracing::trace_state_ptr tr_state, schema_ptr s, + /* apply_fn */ [this] (shared_ptr& 
p, tracing::trace_state_ptr tr_state, schema_ptr s, const paxos::proposal& decision, clock_type::time_point timeout, fencing_token) { - return paxos::paxos_state::learn(*p, std::move(s), decision, timeout, tr_state); + return paxos::paxos_state::learn(*p, _sys_ks.local(), std::move(s), decision, timeout, tr_state); }, /* forward_fn */ [this] (shared_ptr&, netw::messaging_service::msg_addr addr, clock_type::time_point timeout, const paxos::proposal& m, gms::inet_address reply_to, unsigned shard, response_id_type response_id, @@ -770,7 +770,7 @@ private: cmd.max_result_size.emplace(cinfo.retrieve_auxiliary("max_result_size")); } - return get_schema_for_read(cmd.schema_version, src_addr, *timeout).then([&sp = _sp, cmd = std::move(cmd), key = std::move(key), ballot, + return get_schema_for_read(cmd.schema_version, src_addr, *timeout).then([&sp = _sp, &sys_ks = _sys_ks, cmd = std::move(cmd), key = std::move(key), ballot, only_digest, da, timeout, tr_state = std::move(tr_state), src_ip] (schema_ptr schema) mutable { dht::token token = dht::get_token(*schema, key); unsigned shard = schema->table().shard_of(token); @@ -778,9 +778,9 @@ private: sp.get_stats().replica_cross_shard_ops += !local; return sp.container().invoke_on(shard, sp._write_smp_service_group, [gs = global_schema_ptr(schema), gt = tracing::global_trace_state_ptr(std::move(tr_state)), cmd = make_lw_shared(std::move(cmd)), key = std::move(key), - ballot, only_digest, da, timeout, src_ip] (storage_proxy& sp) { + ballot, only_digest, da, timeout, src_ip, &sys_ks] (storage_proxy& sp) { tracing::trace_state_ptr tr_state = gt; - return paxos::paxos_state::prepare(sp, tr_state, gs, *cmd, key, ballot, only_digest, da, *timeout).then([src_ip, tr_state] (paxos::prepare_response r) { + return paxos::paxos_state::prepare(sp, sys_ks.local(), tr_state, gs, *cmd, key, ballot, only_digest, da, *timeout).then([src_ip, tr_state] (paxos::prepare_response r) { tracing::trace(tr_state, "paxos_prepare: handling is done, sending a 
response to /{}", src_ip); return make_foreign(std::make_unique(std::move(r))); }); @@ -800,15 +800,15 @@ private: tracing::trace(tr_state, "paxos_accept: message received from /{} ballot {}", src_ip, proposal); } - auto f = get_schema_for_read(proposal.update.schema_version(), src_addr, *timeout).then([&sp = _sp, tr_state = std::move(tr_state), + auto f = get_schema_for_read(proposal.update.schema_version(), src_addr, *timeout).then([&sp = _sp, &sys_ks = _sys_ks, tr_state = std::move(tr_state), proposal = std::move(proposal), timeout] (schema_ptr schema) mutable { dht::token token = proposal.update.decorated_key(*schema).token(); unsigned shard = schema->table().shard_of(token); bool local = shard == this_shard_id(); sp.get_stats().replica_cross_shard_ops += !local; return sp.container().invoke_on(shard, sp._write_smp_service_group, [gs = global_schema_ptr(schema), gt = tracing::global_trace_state_ptr(std::move(tr_state)), - proposal = std::move(proposal), timeout, token] (storage_proxy& sp) { - return paxos::paxos_state::accept(sp, gt, gs, token, proposal, *timeout); + proposal = std::move(proposal), timeout, token, &sys_ks] (storage_proxy& sp) { + return paxos::paxos_state::accept(sp, sys_ks.local(), gt, gs, token, proposal, *timeout); }); }); @@ -843,16 +843,16 @@ private: pruning++; auto d = defer([] { pruning--; }); - return get_schema_for_read(schema_id, src_addr, *timeout).then([&sp = _sp, key = std::move(key), ballot, + return get_schema_for_read(schema_id, src_addr, *timeout).then([&sp = _sp, &sys_ks = _sys_ks, key = std::move(key), ballot, timeout, tr_state = std::move(tr_state), src_ip, d = std::move(d)] (schema_ptr schema) mutable { dht::token token = dht::get_token(*schema, key); unsigned shard = schema->table().shard_of(token); bool local = shard == this_shard_id(); sp.get_stats().replica_cross_shard_ops += !local; return smp::submit_to(shard, sp._write_smp_service_group, [gs = global_schema_ptr(schema), gt = 
tracing::global_trace_state_ptr(std::move(tr_state)), - key = std::move(key), ballot, timeout, src_ip, d = std::move(d)] () { + key = std::move(key), ballot, timeout, src_ip, d = std::move(d), &sys_ks] () { tracing::trace_state_ptr tr_state = gt; - return paxos::paxos_state::prune(gs, key, ballot, *timeout, tr_state).then([src_ip, tr_state] () { + return paxos::paxos_state::prune(sys_ks.local(), gs, key, ballot, *timeout, tr_state).then([src_ip, tr_state] () { tracing::trace(tr_state, "paxos_prune: handling is done, sending a response to /{}", src_ip); return netw::messaging_service::no_wait(); }); @@ -1205,7 +1205,7 @@ public: fencing_token) override { tracing::trace(tr_state, "Executing a learn locally"); // TODO: Enforce per partition rate limiting in paxos - return paxos::paxos_state::learn(sp, _schema, *_proposal, timeout, tr_state); + return paxos::paxos_state::learn(sp, sp.remote().system_keyspace(), _schema, *_proposal, timeout, tr_state); } virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, const inet_address_vector_replica_set& forward, storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout, @@ -1891,7 +1891,7 @@ future paxos_response_handler::prepare_ballot(utils::UUI auto da = digest_algorithm(*_proxy); if (fbu::is_me(peer)) { tracing::trace(tr_state, "prepare_ballot: prepare {} locally", ballot); - response = co_await paxos::paxos_state::prepare(*_proxy, tr_state, _schema, *_cmd, _key.key(), ballot, only_digest, da, _timeout); + response = co_await paxos::paxos_state::prepare(*_proxy, _proxy->remote().system_keyspace(), tr_state, _schema, *_cmd, _key.key(), ballot, only_digest, da, _timeout); } else { response = co_await _proxy->remote().send_paxos_prepare(netw::msg_addr(peer), _timeout, tr_state, *_cmd, _key.key(), ballot, only_digest, da); } @@ -2050,7 +2050,7 @@ future paxos_response_handler::accept_proposal(lw_shared_ptrupdate.decorated_key(*_schema).token(), *proposal, _timeout); + accepted 
= co_await paxos::paxos_state::accept(*_proxy, _proxy->remote().system_keyspace(), tr_state, _schema, proposal->update.decorated_key(*_schema).token(), *proposal, _timeout); } else { accepted = co_await _proxy->remote().send_paxos_accept(netw::msg_addr(peer), _timeout, tr_state, *proposal); } @@ -2206,7 +2206,7 @@ void paxos_response_handler::prune(utils::UUID ballot) { (void)parallel_for_each(_live_endpoints, [this, ballot] (gms::inet_address peer) mutable { if (fbu::is_me(peer)) { tracing::trace(tr_state, "prune: prune {} locally", ballot); - return paxos::paxos_state::prune(_schema, _key.key(), ballot, _timeout, tr_state); + return paxos::paxos_state::prune(_proxy->remote().system_keyspace(), _schema, _key.key(), ballot, _timeout, tr_state); } else { tracing::trace(tr_state, "prune: send prune of {} to {}", ballot, peer); return _proxy->remote().send_paxos_prune(netw::msg_addr(peer), _timeout, tr_state, _schema->version(), _key.key(), ballot); From b9ef16c06fcc8bc58f6e81531d3b6c3cc152bf7e Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Wed, 19 Jul 2023 17:23:41 +0300 Subject: [PATCH 4/5] db/system_keyspace: Make paxos methods non-static The service::paxos_state methods that call those already have a system keyspace reference at hand and can call the methods on that object Signed-off-by: Pavel Emelyanov --- db/system_keyspace.hh | 10 +++++----- service/paxos/paxos_state.cc | 32 +++++++++++++++++--------------- 2 files changed, 22 insertions(+), 20 deletions(-) diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh index bb4099cfcf..d293ec8345 100644 --- a/db/system_keyspace.hh +++ b/db/system_keyspace.hh @@ -421,12 +421,12 @@ public: future> load_view_build_progress(); // Paxos related functions - static future load_paxos_state(partition_key_view key, schema_ptr s, gc_clock::time_point now, + future load_paxos_state(partition_key_view key, schema_ptr s, gc_clock::time_point now, db::timeout_clock::time_point timeout); - static future<> save_paxos_promise(const 
schema& s, const partition_key& key, const utils::UUID& ballot, db::timeout_clock::time_point timeout); - static future<> save_paxos_proposal(const schema& s, const service::paxos::proposal& proposal, db::timeout_clock::time_point timeout); - static future<> save_paxos_decision(const schema& s, const service::paxos::proposal& decision, db::timeout_clock::time_point timeout); - static future<> delete_paxos_decision(const schema& s, const partition_key& key, const utils::UUID& ballot, db::timeout_clock::time_point timeout); + future<> save_paxos_promise(const schema& s, const partition_key& key, const utils::UUID& ballot, db::timeout_clock::time_point timeout); + future<> save_paxos_proposal(const schema& s, const service::paxos::proposal& proposal, db::timeout_clock::time_point timeout); + future<> save_paxos_decision(const schema& s, const service::paxos::proposal& decision, db::timeout_clock::time_point timeout); + future<> delete_paxos_decision(const schema& s, const partition_key& key, const utils::UUID& ballot, db::timeout_clock::time_point timeout); // CDC related functions diff --git a/service/paxos/paxos_state.cc b/service/paxos/paxos_state.cc index 5569403c3b..d8287202b2 100644 --- a/service/paxos/paxos_state.cc +++ b/service/paxos/paxos_state.cc @@ -46,11 +46,11 @@ future paxos_state::get_cas_lock(const dht::token& key, cloc future paxos_state::prepare(storage_proxy& sp, db::system_keyspace& sys_ks, tracing::trace_state_ptr tr_state, schema_ptr schema, const query::read_command& cmd, const partition_key& key, utils::UUID ballot, bool only_digest, query::digest_algorithm da, clock_type::time_point timeout) { - return utils::get_local_injector().inject("paxos_prepare_timeout", timeout, [&sp, &cmd, &key, ballot, tr_state, schema, only_digest, da, timeout] { + return utils::get_local_injector().inject("paxos_prepare_timeout", timeout, [&sp, &sys_ks, &cmd, &key, ballot, tr_state, schema, only_digest, da, timeout] { dht::token token = dht::get_token(*schema, 
key); utils::latency_counter lc; lc.start(); - return with_locked_key(token, timeout, [&sp, &cmd, token, &key, ballot, tr_state, schema, only_digest, da, timeout] () mutable { + return with_locked_key(token, timeout, [&sp, &sys_ks, &cmd, token, &key, ballot, tr_state, schema, only_digest, da, timeout] () mutable { // When preparing, we need to use the same time as "now" (that's the time we use to decide if something // is expired or not) across nodes, otherwise we may have a window where a Most Recent Decision shows up // on some replica and not others during a new proposal (in storage_proxy::begin_and_repair_paxos()), and no @@ -58,8 +58,8 @@ future paxos_state::prepare(storage_proxy& sp, db::system_keys // tombstone that hides any re-submit). See CASSANDRA-12043 for details. auto now_in_sec = utils::UUID_gen::unix_timestamp_in_sec(ballot); - auto f = db::system_keyspace::load_paxos_state(key, schema, gc_clock::time_point(now_in_sec), timeout); - return f.then([&sp, &cmd, token = std::move(token), &key, ballot, tr_state, schema, only_digest, da, timeout] (paxos_state state) { + auto f = sys_ks.load_paxos_state(key, schema, gc_clock::time_point(now_in_sec), timeout); + return f.then([&sp, &sys_ks, &cmd, token = std::move(token), &key, ballot, tr_state, schema, only_digest, da, timeout] (paxos_state state) { // If received ballot is newer that the one we already accepted it has to be accepted as well, // but we will return the previously accepted proposal so that the new coordinator will use it instead of // its own. 
@@ -69,7 +69,9 @@ future paxos_state::prepare(storage_proxy& sp, db::system_keys if (utils::get_local_injector().enter("paxos_error_before_save_promise")) { return make_exception_future(utils::injected_error("injected_error_before_save_promise")); } - auto f1 = futurize_invoke(db::system_keyspace::save_paxos_promise, *schema, std::ref(key), ballot, timeout); + auto f1 = futurize_invoke([&] { + return sys_ks.save_paxos_promise(*schema, std::ref(key), ballot, timeout); + }); auto f2 = futurize_invoke([&] { return do_with(dht::partition_range_vector({dht::partition_range::make_singular({token, key})}), [&sp, tr_state, schema, &cmd, only_digest, da, timeout] (const dht::partition_range_vector& prv) { @@ -142,13 +144,13 @@ future paxos_state::prepare(storage_proxy& sp, db::system_keys future paxos_state::accept(storage_proxy& sp, db::system_keyspace& sys_ks, tracing::trace_state_ptr tr_state, schema_ptr schema, dht::token token, const proposal& proposal, clock_type::time_point timeout) { return utils::get_local_injector().inject("paxos_accept_proposal_timeout", timeout, - [&sp, token = std::move(token), &proposal, schema, tr_state, timeout] { + [&sp, &sys_ks, token = std::move(token), &proposal, schema, tr_state, timeout] { utils::latency_counter lc; lc.start(); - return with_locked_key(token, timeout, [&proposal, schema, tr_state, timeout] () mutable { + return with_locked_key(token, timeout, [&sys_ks, &proposal, schema, tr_state, timeout] () mutable { auto now_in_sec = utils::UUID_gen::unix_timestamp_in_sec(proposal.ballot); - auto f = db::system_keyspace::load_paxos_state(proposal.update.key(), schema, gc_clock::time_point(now_in_sec), timeout); - return f.then([&proposal, tr_state, schema, timeout] (paxos_state state) { + auto f = sys_ks.load_paxos_state(proposal.update.key(), schema, gc_clock::time_point(now_in_sec), timeout); + return f.then([&sys_ks, &proposal, tr_state, schema, timeout] (paxos_state state) { // Accept the proposal if we promised to accept it or 
the proposal is newer than the one we promised. // Otherwise the proposal was cutoff by another Paxos proposer and has to be rejected. if (proposal.ballot == state._promised_ballot || proposal.ballot.timestamp() > state._promised_ballot.timestamp()) { @@ -159,7 +161,7 @@ future paxos_state::accept(storage_proxy& sp, db::system_keyspace& sys_ks, return make_exception_future(utils::injected_error("injected_error_before_save_proposal")); } - return db::system_keyspace::save_paxos_proposal(*schema, proposal, timeout).then([] { + return sys_ks.save_paxos_proposal(*schema, proposal, timeout).then([] { if (utils::get_local_injector().enter("paxos_error_after_save_proposal")) { return make_exception_future(utils::injected_error("injected_error_after_save_proposal")); } @@ -187,7 +189,7 @@ future<> paxos_state::learn(storage_proxy& sp, db::system_keyspace& sys_ks, sche utils::latency_counter lc; lc.start(); - return do_with(std::move(decision), [&sp, tr_state = std::move(tr_state), schema, timeout] (proposal& decision) { + return do_with(std::move(decision), [&sp, &sys_ks, tr_state = std::move(tr_state), schema, timeout] (proposal& decision) { auto f = utils::get_local_injector().inject("paxos_state_learn_timeout", timeout); replica::table& cf = sp.get_db().local().find_column_family(schema); @@ -224,11 +226,11 @@ future<> paxos_state::learn(storage_proxy& sp, db::system_keyspace& sys_ks, sche logger.debug("Not committing decision {} as ballot timestamp predates last truncation time", decision); tracing::trace(tr_state, "Not committing decision {} as ballot timestamp predates last truncation time", decision); } - return f.then([&decision, schema, timeout] { + return f.then([&sys_ks, &decision, schema, timeout] { // We don't need to lock the partition key if there is no gap between loading paxos // state and saving it, and here we're just blindly updating. 
- return utils::get_local_injector().inject("paxos_timeout_after_save_decision", timeout, [&decision, schema, timeout] { - return db::system_keyspace::save_paxos_decision(*schema, decision, timeout); + return utils::get_local_injector().inject("paxos_timeout_after_save_decision", timeout, [&sys_ks, &decision, schema, timeout] { + return sys_ks.save_paxos_decision(*schema, decision, timeout); }); }); }).finally([&sp, schema, lc] () mutable { @@ -241,7 +243,7 @@ future<> paxos_state::prune(db::system_keyspace& sys_ks, schema_ptr schema, cons tracing::trace_state_ptr tr_state) { logger.debug("Delete paxos state for ballot {}", ballot); tracing::trace(tr_state, "Delete paxos state for ballot {}", ballot); - return db::system_keyspace::delete_paxos_decision(*schema, key, ballot, timeout); + return sys_ks.delete_paxos_decision(*schema, key, ballot, timeout); } } // end of namespace "service::paxos" From 8a87c87824a16b379f8d76606ead9b8b64b19b30 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Wed, 19 Jul 2023 17:27:24 +0300 Subject: [PATCH 5/5] db/system_keyspace: Move and use qctx::execute_cql_with_timeout() This template call is only used by system keyspace paxos methods. All those methods are no longer static and can use system_keyspace::_qp reference to real query processor instead of global qctx. The execute_cql_with_timeout() wrapper is moved to system_keyspace to make it work Signed-off-by: Pavel Emelyanov --- db/query_context.hh | 28 ---------------------------- db/system_keyspace.cc | 39 ++++++++++++++++++++++++++++++++++----- db/system_keyspace.hh | 2 ++ 3 files changed, 36 insertions(+), 33 deletions(-) diff --git a/db/query_context.hh b/db/query_context.hh index add1b9add3..383ea8e5ee 100644 --- a/db/query_context.hh +++ b/db/query_context.hh @@ -31,34 +31,6 @@ struct query_context { return _qp.local().execute_internal(req, { data_value(std::forward(args))... 
}, cql3::query_processor::cache_internal::yes); } - template - future<::shared_ptr> execute_cql_with_timeout(sstring req, - db::timeout_clock::time_point timeout, - Args&&... args) { - const db::timeout_clock::time_point now = db::timeout_clock::now(); - const db::timeout_clock::duration d = - now < timeout ? - timeout - now : - // let the `storage_proxy` time out the query down the call chain - db::timeout_clock::duration::zero(); - - struct timeout_context { - std::unique_ptr client_state; - service::query_state query_state; - timeout_context(db::timeout_clock::duration d) - : client_state(std::make_unique(service::client_state::internal_tag{}, timeout_config{d, d, d, d, d, d, d})) - , query_state(*client_state, empty_service_permit()) - {} - }; - return do_with(timeout_context(d), [this, req = std::move(req), &args...] (auto& tctx) { - return _qp.local().execute_internal(req, - cql3::query_options::DEFAULT.get_consistency(), - tctx.query_state, - { data_value(std::forward(args))... }, - cql3::query_processor::cache_internal::yes); - }); - } - cql3::query_processor& qp() { return _qp.local(); } diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 98e8e39499..9e73620cdc 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -2233,12 +2233,41 @@ future> system_keyspace::load_ }); } + +template +future<::shared_ptr> system_keyspace::execute_cql_with_timeout(sstring req, + db::timeout_clock::time_point timeout, + Args&&... args) { + const db::timeout_clock::time_point now = db::timeout_clock::now(); + const db::timeout_clock::duration d = + now < timeout ? 
+ timeout - now : + // let the `storage_proxy` time out the query down the call chain + db::timeout_clock::duration::zero(); + + struct timeout_context { + std::unique_ptr client_state; + service::query_state query_state; + timeout_context(db::timeout_clock::duration d) + : client_state(std::make_unique(service::client_state::internal_tag{}, timeout_config{d, d, d, d, d, d, d})) + , query_state(*client_state, empty_service_permit()) + {} + }; + return do_with(timeout_context(d), [this, req = std::move(req), &args...] (auto& tctx) { + return _qp.execute_internal(req, + cql3::query_options::DEFAULT.get_consistency(), + tctx.query_state, + { data_value(std::forward(args))... }, + cql3::query_processor::cache_internal::yes); + }); +} + future system_keyspace::load_paxos_state(partition_key_view key, schema_ptr s, gc_clock::time_point now, db::timeout_clock::time_point timeout) { static auto cql = format("SELECT * FROM system.{} WHERE row_key = ? AND cf_id = ?", PAXOS); // FIXME: we need execute_cql_with_now() (void)now; - auto f = qctx->execute_cql_with_timeout(cql, timeout, to_legacy(*key.get_compound_type(*s), key.representation()), s->id().uuid()); + auto f = execute_cql_with_timeout(cql, timeout, to_legacy(*key.get_compound_type(*s), key.representation()), s->id().uuid()); return f.then([s, key = std::move(key)] (shared_ptr results) mutable { if (results->empty()) { return service::paxos::paxos_state(); @@ -2277,7 +2306,7 @@ static int32_t paxos_ttl_sec(const schema& s) { future<> system_keyspace::save_paxos_promise(const schema& s, const partition_key& key, const utils::UUID& ballot, db::timeout_clock::time_point timeout) { static auto cql = format("UPDATE system.{} USING TIMESTAMP ? AND TTL ? SET promise = ? WHERE row_key = ? 
AND cf_id = ?", PAXOS); - return qctx->execute_cql_with_timeout(cql, + return execute_cql_with_timeout(cql, timeout, utils::UUID_gen::micros_timestamp(ballot), paxos_ttl_sec(s), @@ -2290,7 +2319,7 @@ future<> system_keyspace::save_paxos_promise(const schema& s, const partition_ke future<> system_keyspace::save_paxos_proposal(const schema& s, const service::paxos::proposal& proposal, db::timeout_clock::time_point timeout) { static auto cql = format("UPDATE system.{} USING TIMESTAMP ? AND TTL ? SET promise = ?, proposal_ballot = ?, proposal = ? WHERE row_key = ? AND cf_id = ?", PAXOS); partition_key_view key = proposal.update.key(); - return qctx->execute_cql_with_timeout(cql, + return execute_cql_with_timeout(cql, timeout, utils::UUID_gen::micros_timestamp(proposal.ballot), paxos_ttl_sec(s), @@ -2312,7 +2341,7 @@ future<> system_keyspace::save_paxos_decision(const schema& s, const service::pa static auto cql = format("UPDATE system.{} USING TIMESTAMP ? AND TTL ? SET proposal_ballot = null, proposal = null," " most_recent_commit_at = ?, most_recent_commit = ? WHERE row_key = ? AND cf_id = ?", PAXOS); partition_key_view key = decision.update.key(); - return qctx->execute_cql_with_timeout(cql, + return execute_cql_with_timeout(cql, timeout, utils::UUID_gen::micros_timestamp(decision.ballot), paxos_ttl_sec(s), @@ -2329,7 +2358,7 @@ future<> system_keyspace::delete_paxos_decision(const schema& s, const partition // guarantees that if there is more recent round it will not be affected. static auto cql = format("DELETE most_recent_commit FROM system.{} USING TIMESTAMP ? WHERE row_key = ? 
AND cf_id = ?", PAXOS); - return qctx->execute_cql_with_timeout(cql, + return execute_cql_with_timeout(cql, timeout, utils::UUID_gen::micros_timestamp(ballot), to_legacy(*key.get_compound_type(s), key.representation()), diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh index d293ec8345..a18613ca3f 100644 --- a/db/system_keyspace.hh +++ b/db/system_keyspace.hh @@ -502,6 +502,8 @@ public: private: future<::shared_ptr> execute_cql(const sstring& query_string, const std::initializer_list& values); + template + future<::shared_ptr> execute_cql_with_timeout(sstring req, db::timeout_clock::time_point timeout, Args&&... args); public: template