Merge 'Remove qctx from system.paxos table access methods' from Pavel Emelyanov

The "fix" is straightforward -- callers of system_keyspace::*paxos* methods need to get system keyspace from somewhere. This time the only caller is storage_proxy::remote that can have system keyspace via direct dependency reference.

Closes #14758

* github.com:scylladb/scylladb:
  db/system_keyspace: Move and use qctx::execute_cql_with_timeout()
  db/system_keyspace: Make paxos methods non-static
  service/paxos: Add db::system_keyspace& argument to some methods
  test: Optionally initialize proxy remote for cql_test_env
  proxy/remote: Keep sharded<db::system_keyspace>& dependency
This commit is contained in:
Botond Dénes
2023-07-20 16:53:25 +03:00
13 changed files with 121 additions and 88 deletions

View File

@@ -31,34 +31,6 @@ struct query_context {
return _qp.local().execute_internal(req, { data_value(std::forward<Args>(args))... }, cql3::query_processor::cache_internal::yes);
}
template <typename... Args>
future<::shared_ptr<cql3::untyped_result_set>> execute_cql_with_timeout(sstring req,
db::timeout_clock::time_point timeout,
Args&&... args) {
const db::timeout_clock::time_point now = db::timeout_clock::now();
const db::timeout_clock::duration d =
now < timeout ?
timeout - now :
// let the `storage_proxy` time out the query down the call chain
db::timeout_clock::duration::zero();
struct timeout_context {
std::unique_ptr<service::client_state> client_state;
service::query_state query_state;
timeout_context(db::timeout_clock::duration d)
: client_state(std::make_unique<service::client_state>(service::client_state::internal_tag{}, timeout_config{d, d, d, d, d, d, d}))
, query_state(*client_state, empty_service_permit())
{}
};
return do_with(timeout_context(d), [this, req = std::move(req), &args...] (auto& tctx) {
return _qp.local().execute_internal(req,
cql3::query_options::DEFAULT.get_consistency(),
tctx.query_state,
{ data_value(std::forward<Args>(args))... },
cql3::query_processor::cache_internal::yes);
});
}
cql3::query_processor& qp() {
return _qp.local();
}

View File

@@ -2233,12 +2233,41 @@ future<std::vector<system_keyspace::view_build_progress>> system_keyspace::load_
});
}
template <typename... Args>
future<::shared_ptr<cql3::untyped_result_set>> system_keyspace::execute_cql_with_timeout(sstring req,
db::timeout_clock::time_point timeout,
Args&&... args) {
const db::timeout_clock::time_point now = db::timeout_clock::now();
const db::timeout_clock::duration d =
now < timeout ?
timeout - now :
// let the `storage_proxy` time out the query down the call chain
db::timeout_clock::duration::zero();
struct timeout_context {
std::unique_ptr<service::client_state> client_state;
service::query_state query_state;
timeout_context(db::timeout_clock::duration d)
: client_state(std::make_unique<service::client_state>(service::client_state::internal_tag{}, timeout_config{d, d, d, d, d, d, d}))
, query_state(*client_state, empty_service_permit())
{}
};
return do_with(timeout_context(d), [this, req = std::move(req), &args...] (auto& tctx) {
return _qp.execute_internal(req,
cql3::query_options::DEFAULT.get_consistency(),
tctx.query_state,
{ data_value(std::forward<Args>(args))... },
cql3::query_processor::cache_internal::yes);
});
}
future<service::paxos::paxos_state> system_keyspace::load_paxos_state(partition_key_view key, schema_ptr s, gc_clock::time_point now,
db::timeout_clock::time_point timeout) {
static auto cql = format("SELECT * FROM system.{} WHERE row_key = ? AND cf_id = ?", PAXOS);
// FIXME: we need execute_cql_with_now()
(void)now;
auto f = qctx->execute_cql_with_timeout(cql, timeout, to_legacy(*key.get_compound_type(*s), key.representation()), s->id().uuid());
auto f = execute_cql_with_timeout(cql, timeout, to_legacy(*key.get_compound_type(*s), key.representation()), s->id().uuid());
return f.then([s, key = std::move(key)] (shared_ptr<cql3::untyped_result_set> results) mutable {
if (results->empty()) {
return service::paxos::paxos_state();
@@ -2277,7 +2306,7 @@ static int32_t paxos_ttl_sec(const schema& s) {
future<> system_keyspace::save_paxos_promise(const schema& s, const partition_key& key, const utils::UUID& ballot, db::timeout_clock::time_point timeout) {
static auto cql = format("UPDATE system.{} USING TIMESTAMP ? AND TTL ? SET promise = ? WHERE row_key = ? AND cf_id = ?", PAXOS);
return qctx->execute_cql_with_timeout(cql,
return execute_cql_with_timeout(cql,
timeout,
utils::UUID_gen::micros_timestamp(ballot),
paxos_ttl_sec(s),
@@ -2290,7 +2319,7 @@ future<> system_keyspace::save_paxos_promise(const schema& s, const partition_ke
future<> system_keyspace::save_paxos_proposal(const schema& s, const service::paxos::proposal& proposal, db::timeout_clock::time_point timeout) {
static auto cql = format("UPDATE system.{} USING TIMESTAMP ? AND TTL ? SET promise = ?, proposal_ballot = ?, proposal = ? WHERE row_key = ? AND cf_id = ?", PAXOS);
partition_key_view key = proposal.update.key();
return qctx->execute_cql_with_timeout(cql,
return execute_cql_with_timeout(cql,
timeout,
utils::UUID_gen::micros_timestamp(proposal.ballot),
paxos_ttl_sec(s),
@@ -2312,7 +2341,7 @@ future<> system_keyspace::save_paxos_decision(const schema& s, const service::pa
static auto cql = format("UPDATE system.{} USING TIMESTAMP ? AND TTL ? SET proposal_ballot = null, proposal = null,"
" most_recent_commit_at = ?, most_recent_commit = ? WHERE row_key = ? AND cf_id = ?", PAXOS);
partition_key_view key = decision.update.key();
return qctx->execute_cql_with_timeout(cql,
return execute_cql_with_timeout(cql,
timeout,
utils::UUID_gen::micros_timestamp(decision.ballot),
paxos_ttl_sec(s),
@@ -2329,7 +2358,7 @@ future<> system_keyspace::delete_paxos_decision(const schema& s, const partition
// guarantees that if there is more recent round it will not be affected.
static auto cql = format("DELETE most_recent_commit FROM system.{} USING TIMESTAMP ? WHERE row_key = ? AND cf_id = ?", PAXOS);
return qctx->execute_cql_with_timeout(cql,
return execute_cql_with_timeout(cql,
timeout,
utils::UUID_gen::micros_timestamp(ballot),
to_legacy(*key.get_compound_type(s), key.representation()),

View File

@@ -421,12 +421,12 @@ public:
future<std::vector<view_build_progress>> load_view_build_progress();
// Paxos related functions
static future<service::paxos::paxos_state> load_paxos_state(partition_key_view key, schema_ptr s, gc_clock::time_point now,
future<service::paxos::paxos_state> load_paxos_state(partition_key_view key, schema_ptr s, gc_clock::time_point now,
db::timeout_clock::time_point timeout);
static future<> save_paxos_promise(const schema& s, const partition_key& key, const utils::UUID& ballot, db::timeout_clock::time_point timeout);
static future<> save_paxos_proposal(const schema& s, const service::paxos::proposal& proposal, db::timeout_clock::time_point timeout);
static future<> save_paxos_decision(const schema& s, const service::paxos::proposal& decision, db::timeout_clock::time_point timeout);
static future<> delete_paxos_decision(const schema& s, const partition_key& key, const utils::UUID& ballot, db::timeout_clock::time_point timeout);
future<> save_paxos_promise(const schema& s, const partition_key& key, const utils::UUID& ballot, db::timeout_clock::time_point timeout);
future<> save_paxos_proposal(const schema& s, const service::paxos::proposal& proposal, db::timeout_clock::time_point timeout);
future<> save_paxos_decision(const schema& s, const service::paxos::proposal& decision, db::timeout_clock::time_point timeout);
future<> delete_paxos_decision(const schema& s, const partition_key& key, const utils::UUID& ballot, db::timeout_clock::time_point timeout);
// CDC related functions
@@ -502,6 +502,8 @@ public:
private:
future<::shared_ptr<cql3::untyped_result_set>> execute_cql(const sstring& query_string, const std::initializer_list<data_value>& values);
template <typename... Args>
future<::shared_ptr<cql3::untyped_result_set>> execute_cql_with_timeout(sstring req, db::timeout_clock::time_point timeout, Args&&... args);
public:
template <typename... Args>

View File

@@ -1446,7 +1446,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
supervisor::notify("initializing migration manager RPC verbs");
mm.invoke_on_all(&service::migration_manager::init_messaging_service).get();
supervisor::notify("initializing storage proxy RPC verbs");
proxy.invoke_on_all(&service::storage_proxy::start_remote, std::ref(messaging), std::ref(gossiper), std::ref(mm)).get();
proxy.invoke_on_all(&service::storage_proxy::start_remote, std::ref(messaging), std::ref(gossiper), std::ref(mm), std::ref(sys_ks)).get();
auto stop_proxy_handlers = defer_verbose_shutdown("storage proxy RPC verbs", [&proxy] {
proxy.invoke_on_all(&service::storage_proxy::stop_remote).get();
});

View File

@@ -43,14 +43,14 @@ future<paxos_state::guard> paxos_state::get_cas_lock(const dht::token& key, cloc
co_return m;
}
future<prepare_response> paxos_state::prepare(storage_proxy& sp, tracing::trace_state_ptr tr_state, schema_ptr schema,
future<prepare_response> paxos_state::prepare(storage_proxy& sp, db::system_keyspace& sys_ks, tracing::trace_state_ptr tr_state, schema_ptr schema,
const query::read_command& cmd, const partition_key& key, utils::UUID ballot,
bool only_digest, query::digest_algorithm da, clock_type::time_point timeout) {
return utils::get_local_injector().inject("paxos_prepare_timeout", timeout, [&sp, &cmd, &key, ballot, tr_state, schema, only_digest, da, timeout] {
return utils::get_local_injector().inject("paxos_prepare_timeout", timeout, [&sp, &sys_ks, &cmd, &key, ballot, tr_state, schema, only_digest, da, timeout] {
dht::token token = dht::get_token(*schema, key);
utils::latency_counter lc;
lc.start();
return with_locked_key(token, timeout, [&sp, &cmd, token, &key, ballot, tr_state, schema, only_digest, da, timeout] () mutable {
return with_locked_key(token, timeout, [&sp, &sys_ks, &cmd, token, &key, ballot, tr_state, schema, only_digest, da, timeout] () mutable {
// When preparing, we need to use the same time as "now" (that's the time we use to decide if something
// is expired or not) across nodes, otherwise we may have a window where a Most Recent Decision shows up
// on some replica and not others during a new proposal (in storage_proxy::begin_and_repair_paxos()), and no
@@ -58,8 +58,8 @@ future<prepare_response> paxos_state::prepare(storage_proxy& sp, tracing::trace_
// tombstone that hides any re-submit). See CASSANDRA-12043 for details.
auto now_in_sec = utils::UUID_gen::unix_timestamp_in_sec(ballot);
auto f = db::system_keyspace::load_paxos_state(key, schema, gc_clock::time_point(now_in_sec), timeout);
return f.then([&sp, &cmd, token = std::move(token), &key, ballot, tr_state, schema, only_digest, da, timeout] (paxos_state state) {
auto f = sys_ks.load_paxos_state(key, schema, gc_clock::time_point(now_in_sec), timeout);
return f.then([&sp, &sys_ks, &cmd, token = std::move(token), &key, ballot, tr_state, schema, only_digest, da, timeout] (paxos_state state) {
// If received ballot is newer that the one we already accepted it has to be accepted as well,
// but we will return the previously accepted proposal so that the new coordinator will use it instead of
// its own.
@@ -69,7 +69,9 @@ future<prepare_response> paxos_state::prepare(storage_proxy& sp, tracing::trace_
if (utils::get_local_injector().enter("paxos_error_before_save_promise")) {
return make_exception_future<prepare_response>(utils::injected_error("injected_error_before_save_promise"));
}
auto f1 = futurize_invoke(db::system_keyspace::save_paxos_promise, *schema, std::ref(key), ballot, timeout);
auto f1 = futurize_invoke([&] {
return sys_ks.save_paxos_promise(*schema, std::ref(key), ballot, timeout);
});
auto f2 = futurize_invoke([&] {
return do_with(dht::partition_range_vector({dht::partition_range::make_singular({token, key})}),
[&sp, tr_state, schema, &cmd, only_digest, da, timeout] (const dht::partition_range_vector& prv) {
@@ -139,16 +141,16 @@ future<prepare_response> paxos_state::prepare(storage_proxy& sp, tracing::trace_
});
}
future<bool> paxos_state::accept(storage_proxy& sp, tracing::trace_state_ptr tr_state, schema_ptr schema, dht::token token, const proposal& proposal,
future<bool> paxos_state::accept(storage_proxy& sp, db::system_keyspace& sys_ks, tracing::trace_state_ptr tr_state, schema_ptr schema, dht::token token, const proposal& proposal,
clock_type::time_point timeout) {
return utils::get_local_injector().inject("paxos_accept_proposal_timeout", timeout,
[&sp, token = std::move(token), &proposal, schema, tr_state, timeout] {
[&sp, &sys_ks, token = std::move(token), &proposal, schema, tr_state, timeout] {
utils::latency_counter lc;
lc.start();
return with_locked_key(token, timeout, [&proposal, schema, tr_state, timeout] () mutable {
return with_locked_key(token, timeout, [&sys_ks, &proposal, schema, tr_state, timeout] () mutable {
auto now_in_sec = utils::UUID_gen::unix_timestamp_in_sec(proposal.ballot);
auto f = db::system_keyspace::load_paxos_state(proposal.update.key(), schema, gc_clock::time_point(now_in_sec), timeout);
return f.then([&proposal, tr_state, schema, timeout] (paxos_state state) {
auto f = sys_ks.load_paxos_state(proposal.update.key(), schema, gc_clock::time_point(now_in_sec), timeout);
return f.then([&sys_ks, &proposal, tr_state, schema, timeout] (paxos_state state) {
// Accept the proposal if we promised to accept it or the proposal is newer than the one we promised.
// Otherwise the proposal was cutoff by another Paxos proposer and has to be rejected.
if (proposal.ballot == state._promised_ballot || proposal.ballot.timestamp() > state._promised_ballot.timestamp()) {
@@ -159,7 +161,7 @@ future<bool> paxos_state::accept(storage_proxy& sp, tracing::trace_state_ptr tr_
return make_exception_future<bool>(utils::injected_error("injected_error_before_save_proposal"));
}
return db::system_keyspace::save_paxos_proposal(*schema, proposal, timeout).then([] {
return sys_ks.save_paxos_proposal(*schema, proposal, timeout).then([] {
if (utils::get_local_injector().enter("paxos_error_after_save_proposal")) {
return make_exception_future<bool>(utils::injected_error("injected_error_after_save_proposal"));
}
@@ -178,7 +180,7 @@ future<bool> paxos_state::accept(storage_proxy& sp, tracing::trace_state_ptr tr_
});
}
future<> paxos_state::learn(storage_proxy& sp, schema_ptr schema, proposal decision, clock_type::time_point timeout,
future<> paxos_state::learn(storage_proxy& sp, db::system_keyspace& sys_ks, schema_ptr schema, proposal decision, clock_type::time_point timeout,
tracing::trace_state_ptr tr_state) {
if (utils::get_local_injector().enter("paxos_error_before_learn")) {
return make_exception_future<>(utils::injected_error("injected_error_before_learn"));
@@ -187,7 +189,7 @@ future<> paxos_state::learn(storage_proxy& sp, schema_ptr schema, proposal decis
utils::latency_counter lc;
lc.start();
return do_with(std::move(decision), [&sp, tr_state = std::move(tr_state), schema, timeout] (proposal& decision) {
return do_with(std::move(decision), [&sp, &sys_ks, tr_state = std::move(tr_state), schema, timeout] (proposal& decision) {
auto f = utils::get_local_injector().inject("paxos_state_learn_timeout", timeout);
replica::table& cf = sp.get_db().local().find_column_family(schema);
@@ -224,11 +226,11 @@ future<> paxos_state::learn(storage_proxy& sp, schema_ptr schema, proposal decis
logger.debug("Not committing decision {} as ballot timestamp predates last truncation time", decision);
tracing::trace(tr_state, "Not committing decision {} as ballot timestamp predates last truncation time", decision);
}
return f.then([&decision, schema, timeout] {
return f.then([&sys_ks, &decision, schema, timeout] {
// We don't need to lock the partition key if there is no gap between loading paxos
// state and saving it, and here we're just blindly updating.
return utils::get_local_injector().inject("paxos_timeout_after_save_decision", timeout, [&decision, schema, timeout] {
return db::system_keyspace::save_paxos_decision(*schema, decision, timeout);
return utils::get_local_injector().inject("paxos_timeout_after_save_decision", timeout, [&sys_ks, &decision, schema, timeout] {
return sys_ks.save_paxos_decision(*schema, decision, timeout);
});
});
}).finally([&sp, schema, lc] () mutable {
@@ -237,11 +239,11 @@ future<> paxos_state::learn(storage_proxy& sp, schema_ptr schema, proposal decis
});
}
future<> paxos_state::prune(schema_ptr schema, const partition_key& key, utils::UUID ballot, clock_type::time_point timeout,
future<> paxos_state::prune(db::system_keyspace& sys_ks, schema_ptr schema, const partition_key& key, utils::UUID ballot, clock_type::time_point timeout,
tracing::trace_state_ptr tr_state) {
logger.debug("Delete paxos state for ballot {}", ballot);
tracing::trace(tr_state, "Delete paxos state for ballot {}", ballot);
return db::system_keyspace::delete_paxos_decision(*schema, key, ballot, timeout);
return sys_ks.delete_paxos_decision(*schema, key, ballot, timeout);
}
} // end of namespace "service::paxos"

View File

@@ -18,6 +18,7 @@
namespace service {
class storage_proxy;
}
namespace db { class system_keyspace; }
namespace service::paxos {
@@ -111,16 +112,16 @@ public:
, _accepted_proposal(std::move(accepted))
, _most_recent_commit(std::move(commit)) {}
// Replica RPC endpoint for Paxos "prepare" phase.
static future<prepare_response> prepare(storage_proxy& sp, tracing::trace_state_ptr tr_state, schema_ptr schema,
static future<prepare_response> prepare(storage_proxy& sp, db::system_keyspace& sys_ks, tracing::trace_state_ptr tr_state, schema_ptr schema,
const query::read_command& cmd, const partition_key& key, utils::UUID ballot,
bool only_digest, query::digest_algorithm da, clock_type::time_point timeout);
// Replica RPC endpoint for Paxos "accept" phase.
static future<bool> accept(storage_proxy& sp, tracing::trace_state_ptr tr_state, schema_ptr schema, dht::token token, const proposal& proposal,
static future<bool> accept(storage_proxy& sp, db::system_keyspace& sys_ks, tracing::trace_state_ptr tr_state, schema_ptr schema, dht::token token, const proposal& proposal,
clock_type::time_point timeout);
// Replica RPC endpoint for Paxos "learn".
static future<> learn(storage_proxy& sp, schema_ptr schema, proposal decision, clock_type::time_point timeout, tracing::trace_state_ptr tr_state);
static future<> learn(storage_proxy& sp, db::system_keyspace& sys_ks, schema_ptr schema, proposal decision, clock_type::time_point timeout, tracing::trace_state_ptr tr_state);
// Replica RPC endpoint for pruning Paxos table
static future<> prune(schema_ptr schema, const partition_key& key, utils::UUID ballot, clock_type::time_point timeout,
static future<> prune(db::system_keyspace& sys_ks, schema_ptr schema, const partition_key& key, utils::UUID ballot, clock_type::time_point timeout,
tracing::trace_state_ptr tr_state);
};

View File

@@ -163,6 +163,7 @@ class storage_proxy::remote {
netw::messaging_service& _ms;
const gms::gossiper& _gossiper;
migration_manager& _mm;
sharded<db::system_keyspace>& _sys_ks;
netw::connection_drop_slot_t _connection_dropped;
netw::connection_drop_registration_t _condrop_registration;
@@ -170,8 +171,8 @@ class storage_proxy::remote {
bool _stopped{false};
public:
remote(storage_proxy& sp, netw::messaging_service& ms, gms::gossiper& g, migration_manager& mm)
: _sp(sp), _ms(ms), _gossiper(g), _mm(mm)
remote(storage_proxy& sp, netw::messaging_service& ms, gms::gossiper& g, migration_manager& mm, sharded<db::system_keyspace>& sys_ks)
: _sp(sp), _ms(ms), _gossiper(g), _mm(mm), _sys_ks(sys_ks)
, _connection_dropped(std::bind_front(&remote::connection_dropped, this))
, _condrop_registration(_ms.when_connection_drops(_connection_dropped))
{
@@ -209,6 +210,10 @@ public:
return _gossiper.is_alive(ep);
}
db::system_keyspace& system_keyspace() {
return _sys_ks.local();
}
// Note: none of the `send_*` functions use `remote` after yielding - by the first yield,
// control is delegated to another service (messaging_service). Thus unfinished `send`s
// do not make it unsafe to destroy the `remote` object.
@@ -562,9 +567,9 @@ private:
return handle_write(src_addr, t, schema_version, std::move(decision), forward, reply_to, shard,
response_id, trace_info,
fencing_token{},
/* apply_fn */ [] (shared_ptr<storage_proxy>& p, tracing::trace_state_ptr tr_state, schema_ptr s,
/* apply_fn */ [this] (shared_ptr<storage_proxy>& p, tracing::trace_state_ptr tr_state, schema_ptr s,
const paxos::proposal& decision, clock_type::time_point timeout, fencing_token) {
return paxos::paxos_state::learn(*p, std::move(s), decision, timeout, tr_state);
return paxos::paxos_state::learn(*p, _sys_ks.local(), std::move(s), decision, timeout, tr_state);
},
/* forward_fn */ [this] (shared_ptr<storage_proxy>&, netw::messaging_service::msg_addr addr, clock_type::time_point timeout, const paxos::proposal& m,
gms::inet_address reply_to, unsigned shard, response_id_type response_id,
@@ -765,7 +770,7 @@ private:
cmd.max_result_size.emplace(cinfo.retrieve_auxiliary<uint64_t>("max_result_size"));
}
return get_schema_for_read(cmd.schema_version, src_addr, *timeout).then([&sp = _sp, cmd = std::move(cmd), key = std::move(key), ballot,
return get_schema_for_read(cmd.schema_version, src_addr, *timeout).then([&sp = _sp, &sys_ks = _sys_ks, cmd = std::move(cmd), key = std::move(key), ballot,
only_digest, da, timeout, tr_state = std::move(tr_state), src_ip] (schema_ptr schema) mutable {
dht::token token = dht::get_token(*schema, key);
unsigned shard = schema->table().shard_of(token);
@@ -773,9 +778,9 @@ private:
sp.get_stats().replica_cross_shard_ops += !local;
return sp.container().invoke_on(shard, sp._write_smp_service_group, [gs = global_schema_ptr(schema), gt = tracing::global_trace_state_ptr(std::move(tr_state)),
cmd = make_lw_shared<query::read_command>(std::move(cmd)), key = std::move(key),
ballot, only_digest, da, timeout, src_ip] (storage_proxy& sp) {
ballot, only_digest, da, timeout, src_ip, &sys_ks] (storage_proxy& sp) {
tracing::trace_state_ptr tr_state = gt;
return paxos::paxos_state::prepare(sp, tr_state, gs, *cmd, key, ballot, only_digest, da, *timeout).then([src_ip, tr_state] (paxos::prepare_response r) {
return paxos::paxos_state::prepare(sp, sys_ks.local(), tr_state, gs, *cmd, key, ballot, only_digest, da, *timeout).then([src_ip, tr_state] (paxos::prepare_response r) {
tracing::trace(tr_state, "paxos_prepare: handling is done, sending a response to /{}", src_ip);
return make_foreign(std::make_unique<paxos::prepare_response>(std::move(r)));
});
@@ -795,15 +800,15 @@ private:
tracing::trace(tr_state, "paxos_accept: message received from /{} ballot {}", src_ip, proposal);
}
auto f = get_schema_for_read(proposal.update.schema_version(), src_addr, *timeout).then([&sp = _sp, tr_state = std::move(tr_state),
auto f = get_schema_for_read(proposal.update.schema_version(), src_addr, *timeout).then([&sp = _sp, &sys_ks = _sys_ks, tr_state = std::move(tr_state),
proposal = std::move(proposal), timeout] (schema_ptr schema) mutable {
dht::token token = proposal.update.decorated_key(*schema).token();
unsigned shard = schema->table().shard_of(token);
bool local = shard == this_shard_id();
sp.get_stats().replica_cross_shard_ops += !local;
return sp.container().invoke_on(shard, sp._write_smp_service_group, [gs = global_schema_ptr(schema), gt = tracing::global_trace_state_ptr(std::move(tr_state)),
proposal = std::move(proposal), timeout, token] (storage_proxy& sp) {
return paxos::paxos_state::accept(sp, gt, gs, token, proposal, *timeout);
proposal = std::move(proposal), timeout, token, &sys_ks] (storage_proxy& sp) {
return paxos::paxos_state::accept(sp, sys_ks.local(), gt, gs, token, proposal, *timeout);
});
});
@@ -838,16 +843,16 @@ private:
pruning++;
auto d = defer([] { pruning--; });
return get_schema_for_read(schema_id, src_addr, *timeout).then([&sp = _sp, key = std::move(key), ballot,
return get_schema_for_read(schema_id, src_addr, *timeout).then([&sp = _sp, &sys_ks = _sys_ks, key = std::move(key), ballot,
timeout, tr_state = std::move(tr_state), src_ip, d = std::move(d)] (schema_ptr schema) mutable {
dht::token token = dht::get_token(*schema, key);
unsigned shard = schema->table().shard_of(token);
bool local = shard == this_shard_id();
sp.get_stats().replica_cross_shard_ops += !local;
return smp::submit_to(shard, sp._write_smp_service_group, [gs = global_schema_ptr(schema), gt = tracing::global_trace_state_ptr(std::move(tr_state)),
key = std::move(key), ballot, timeout, src_ip, d = std::move(d)] () {
key = std::move(key), ballot, timeout, src_ip, d = std::move(d), &sys_ks] () {
tracing::trace_state_ptr tr_state = gt;
return paxos::paxos_state::prune(gs, key, ballot, *timeout, tr_state).then([src_ip, tr_state] () {
return paxos::paxos_state::prune(sys_ks.local(), gs, key, ballot, *timeout, tr_state).then([src_ip, tr_state] () {
tracing::trace(tr_state, "paxos_prune: handling is done, sending a response to /{}", src_ip);
return netw::messaging_service::no_wait();
});
@@ -1200,7 +1205,7 @@ public:
fencing_token) override {
tracing::trace(tr_state, "Executing a learn locally");
// TODO: Enforce per partition rate limiting in paxos
return paxos::paxos_state::learn(sp, _schema, *_proposal, timeout, tr_state);
return paxos::paxos_state::learn(sp, sp.remote().system_keyspace(), _schema, *_proposal, timeout, tr_state);
}
virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, const inet_address_vector_replica_set& forward,
storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout,
@@ -1886,7 +1891,7 @@ future<paxos::prepare_summary> paxos_response_handler::prepare_ballot(utils::UUI
auto da = digest_algorithm(*_proxy);
if (fbu::is_me(peer)) {
tracing::trace(tr_state, "prepare_ballot: prepare {} locally", ballot);
response = co_await paxos::paxos_state::prepare(*_proxy, tr_state, _schema, *_cmd, _key.key(), ballot, only_digest, da, _timeout);
response = co_await paxos::paxos_state::prepare(*_proxy, _proxy->remote().system_keyspace(), tr_state, _schema, *_cmd, _key.key(), ballot, only_digest, da, _timeout);
} else {
response = co_await _proxy->remote().send_paxos_prepare(netw::msg_addr(peer), _timeout, tr_state, *_cmd, _key.key(), ballot, only_digest, da);
}
@@ -2045,7 +2050,7 @@ future<bool> paxos_response_handler::accept_proposal(lw_shared_ptr<paxos::propos
try {
if (fbu::is_me(peer)) {
tracing::trace(tr_state, "accept_proposal: accept {} locally", *proposal);
accepted = co_await paxos::paxos_state::accept(*_proxy, tr_state, _schema, proposal->update.decorated_key(*_schema).token(), *proposal, _timeout);
accepted = co_await paxos::paxos_state::accept(*_proxy, _proxy->remote().system_keyspace(), tr_state, _schema, proposal->update.decorated_key(*_schema).token(), *proposal, _timeout);
} else {
accepted = co_await _proxy->remote().send_paxos_accept(netw::msg_addr(peer), _timeout, tr_state, *proposal);
}
@@ -2201,7 +2206,7 @@ void paxos_response_handler::prune(utils::UUID ballot) {
(void)parallel_for_each(_live_endpoints, [this, ballot] (gms::inet_address peer) mutable {
if (fbu::is_me(peer)) {
tracing::trace(tr_state, "prune: prune {} locally", ballot);
return paxos::paxos_state::prune(_schema, _key.key(), ballot, _timeout, tr_state);
return paxos::paxos_state::prune(_proxy->remote().system_keyspace(), _schema, _key.key(), ballot, _timeout, tr_state);
} else {
tracing::trace(tr_state, "prune: send prune of {} to {}", ballot, peer);
return _proxy->remote().send_paxos_prune(netw::msg_addr(peer), _timeout, tr_state, _schema->version(), _key.key(), ballot);
@@ -6233,8 +6238,8 @@ future<> storage_proxy::truncate_blocking(sstring keyspace, sstring cfname, std:
return remote().send_truncate_blocking(std::move(keyspace), std::move(cfname), timeout_in_ms);
}
void storage_proxy::start_remote(netw::messaging_service& ms, gms::gossiper& g, migration_manager& mm) {
_remote = std::make_unique<struct remote>(*this, ms, g, mm);
void storage_proxy::start_remote(netw::messaging_service& ms, gms::gossiper& g, migration_manager& mm, sharded<db::system_keyspace>& sys_ks) {
_remote = std::make_unique<struct remote>(*this, ms, g, mm, sys_ks);
}
future<> storage_proxy::stop_remote() {

View File

@@ -490,7 +490,7 @@ public:
}
// Start/stop the remote part of `storage_proxy` that is required for performing distributed queries.
void start_remote(netw::messaging_service&, gms::gossiper&, migration_manager&);
void start_remote(netw::messaging_service&, gms::gossiper&, migration_manager&, sharded<db::system_keyspace>& sys_ks);
future<> stop_remote();
private:

View File

@@ -4682,6 +4682,8 @@ static void prepared_on_shard(cql_test_env& e, const sstring& query,
}
SEASTAR_TEST_CASE(test_null_value_tuple_floating_types_and_uuids) {
cql_test_config cfg;
cfg.need_remote_proxy = true;
return do_with_cql_env_thread([] (cql_test_env& e) {
auto test_for_single_type = [&e] (const shared_ptr<const abstract_type>& type, auto update_value) {
cquery_nofail(e, format("CREATE TABLE IF NOT EXISTS t (k int PRIMARY KEY, test {})", type->cql3_type_name()));
@@ -4703,10 +4705,12 @@ SEASTAR_TEST_CASE(test_null_value_tuple_floating_types_and_uuids) {
test_for_single_type(float_type, 1.0f);
test_for_single_type(uuid_type, utils::make_random_uuid());
test_for_single_type(timeuuid_type, utils::UUID("00000000-0000-1000-0000-000000000000"));
});
}, std::move(cfg));
}
SEASTAR_TEST_CASE(test_like_parameter_marker) {
cql_test_config cfg;
cfg.need_remote_proxy = true;
return do_with_cql_env_thread([] (cql_test_env& e) {
cquery_nofail(e, "CREATE TABLE t (pk int PRIMARY KEY, col text)");
cquery_nofail(e, "INSERT INTO t (pk, col) VALUES (1, 'aaa')");
@@ -4720,10 +4724,12 @@ SEASTAR_TEST_CASE(test_like_parameter_marker) {
prepared_on_shard(e, query, {T("err"), I(1), T("a%")}, {{B(false), "chg"}});
prepared_on_shard(e, query, {T("chg"), I(2), T("b%")}, {{B(true), "bbb"}});
prepared_on_shard(e, query, {T("err"), I(1), T("a%")}, {{B(false), "chg"}});
});
}, std::move(cfg));
}
SEASTAR_TEST_CASE(test_list_parameter_marker) {
cql_test_config cfg;
cfg.need_remote_proxy = true;
return do_with_cql_env_thread([] (cql_test_env& e) {
cquery_nofail(e, "CREATE TABLE t (k int PRIMARY KEY, v list<int>)");
cquery_nofail(e, "INSERT INTO t (k, v) VALUES (1, [10, 20, 30])");
@@ -4742,10 +4748,12 @@ SEASTAR_TEST_CASE(test_list_parameter_marker) {
prepared_on_shard(e, query,
{list_of({100, 200, 300}), I(1), list_of({20, 21, 22})},
{{B(true), list_of({10, 20, 30})}});
});
}, std::move(cfg));
}
SEASTAR_TEST_CASE(test_select_serial_consistency) {
cql_test_config cfg;
cfg.need_remote_proxy = true;
return do_with_cql_env_thread([] (cql_test_env& e) {
cquery_nofail(e, "CREATE TABLE t (a int, b int, primary key (a,b))");
cquery_nofail(e, "INSERT INTO t (a, b) VALUES (1, 1)");
@@ -4769,7 +4777,7 @@ SEASTAR_TEST_CASE(test_select_serial_consistency) {
check_fails("select * from t where b > 0 allow filtering");
check_fails("select * from t where a in (1, 3)");
prepared_on_shard(e, "select * from t where a = 1", {}, {{I(1), I(1)}, {I(1), I(2)}}, db::consistency_level::SERIAL);
});
}, std::move(cfg));
}

View File

@@ -126,6 +126,8 @@ SEASTAR_TEST_CASE(paxos_grace_seconds_extension) {
auto ext = std::make_shared<db::extensions>();
ext->add_schema_extension<db::paxos_grace_seconds_extension>(db::paxos_grace_seconds_extension::NAME);
auto cfg = ::make_shared<db::config>(ext);
cql_test_config cql_cfg(cfg);
cql_cfg.need_remote_proxy = true;
return do_with_cql_env([] (cql_test_env& e) {
// Verify that paxos_grace_seconds extensions gets recognized properly
@@ -156,7 +158,7 @@ SEASTAR_TEST_CASE(paxos_grace_seconds_extension) {
});
return f;
}, cfg);
}, std::move(cql_cfg));
}
SEASTAR_TEST_CASE(test_extension_remove) {

View File

@@ -202,6 +202,8 @@ auto make_options(clevel cl) {
} // anonymous namespace
SEASTAR_TEST_CASE(test_query_counters) {
cql_test_config cfg;
cfg.need_remote_proxy = true;
return do_with_cql_env_thread([](cql_test_env& e) {
// Executes a query and waits for it to complete.
auto process_query = [&e](const sstring& query, clevel cl) mutable {
@@ -278,7 +280,7 @@ SEASTAR_TEST_CASE(test_query_counters) {
clevel::ANY);
expected["ANY"] += 2;
BOOST_CHECK_EQUAL(expected, get_query_metrics());
});
}, std::move(cfg));
}
SEASTAR_TEST_CASE(test_select_full_scan_metrics) {

View File

@@ -897,6 +897,15 @@ public:
sys_dist_ks.start(std::ref(qp), std::ref(mm), std::ref(proxy)).get();
if (cfg_in.need_remote_proxy) {
proxy.invoke_on_all(&service::storage_proxy::start_remote, std::ref(ms), std::ref(gossiper), std::ref(mm), std::ref(sys_ks)).get();
}
auto stop_proxy_remote = defer([&proxy, need = cfg_in.need_remote_proxy] {
if (need) {
proxy.invoke_on_all(&service::storage_proxy::stop_remote).get();
}
});
sl_controller.invoke_on_all([&sys_dist_ks, &sl_controller] (qos::service_level_controller& service) {
qos::service_level_controller::service_level_distributed_data_accessor_ptr service_level_data_accessor =
::static_pointer_cast<qos::service_level_controller::service_level_distributed_data_accessor>(

View File

@@ -90,6 +90,7 @@ public:
std::optional<replica::database_config> dbcfg;
std::set<sstring> disabled_features;
std::optional<cql3::query_processor::memory_config> qp_mcfg;
bool need_remote_proxy = false;
cql_test_config();
cql_test_config(const cql_test_config&);