code: Use qctx::evecute_cql methods, not global ones

There are global db::execute_cql() helpers that just forward
the args into qctx::execute_cql(). The former are going away,
so patch all callers to use qctx themselves.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
Pavel Emelyanov
2020-11-10 14:47:19 +03:00
parent 8bf6b1298c
commit 303ebe4a36
5 changed files with 43 additions and 43 deletions

View File

@@ -49,7 +49,7 @@ future<> notify_new_client(client_data cd) {
= format("INSERT INTO system.{} (address, port, client_type, connection_stage, shard_id, protocol_version, username) "
"VALUES (?, ?, ?, ?, ?, ?, ?);", db::system_keyspace::CLIENTS);
return db::execute_cql(req,
return db::qctx->execute_cql(req,
std::move(cd.ip), cd.port, to_string(cd.ct), to_string(cd.connection_stage), cd.shard_id,
cd.protocol_version.has_value() ? data_value(*cd.protocol_version) : data_value::make_null(int32_type),
cd.username.value_or("anonymous")).discard_result();
@@ -60,7 +60,7 @@ future<> notify_disconnected_client(net::inet_address addr, int port, client_typ
const static sstring req
= format("DELETE FROM system.{} where address=? AND port=? AND client_type=?;",
db::system_keyspace::CLIENTS);
return db::execute_cql(req, std::move(addr), port, to_string(ct)).discard_result();
return db::qctx->execute_cql(req, std::move(addr), port, to_string(ct)).discard_result();
}
future<> clear_clientlist() {

View File

@@ -101,6 +101,6 @@ struct notify_client_change {
= format("UPDATE system.{} SET {}=? WHERE address=? AND port=? AND client_type=?;",
db::system_keyspace::CLIENTS, column_literal<column_enum_val>);
return db::execute_cql(req, std::forward<T>(value), std::move(addr), port, to_string(ct)).discard_result();
return db::qctx->execute_cql(req, std::forward<T>(value), std::move(addr), port, to_string(ct)).discard_result();
}
};

View File

@@ -83,7 +83,7 @@ static future<> try_record(std::string_view large_table, const sstables::sstable
std::string pk_str = key_to_str(partition_key.to_partition_key(s), s);
auto timestamp = db_clock::now();
large_data_logger.warn("Writing large {} {}/{}: {}{} ({} bytes)", desc, ks_name, cf_name, pk_str, extra_path, size);
return db::execute_cql(req, ks_name, cf_name, sstable_name, size, pk_str, timestamp, args...)
return db::qctx->execute_cql(req, ks_name, cf_name, sstable_name, size, pk_str, timestamp, args...)
.discard_result()
.handle_exception([ks_name, cf_name, large_table, sstable_name] (std::exception_ptr ep) {
large_data_logger.warn("Failed to add a record to system.large_{}s: ks = {}, table = {}, sst = {} exception = {}",
@@ -133,7 +133,7 @@ future<> cql_table_large_data_handler::delete_large_data_entries(const schema& s
const sstring req =
format("DELETE FROM system.{} WHERE keyspace_name = ? AND table_name = ? AND sstable_name = ?",
large_table_name);
return db::execute_cql(req, s.ks_name(), s.cf_name(), sstable_name)
return db::qctx->execute_cql(req, s.ks_name(), s.cf_name(), sstable_name)
.discard_result()
.handle_exception([&s, sstable_name, large_table_name] (std::exception_ptr ep) {
large_data_logger.warn("Failed to drop entries from {}: ks = {}, table = {}, sst = {} exception = {}",

View File

@@ -233,7 +233,7 @@ future<> save_system_schema(const sstring & ksname) {
// delete old, possibly obsolete entries in schema tables
return parallel_for_each(all_table_names(schema_features::full()), [ksm] (sstring cf) {
auto deletion_timestamp = schema_creation_timestamp() - 1;
return db::execute_cql(format("DELETE FROM {}.{} USING TIMESTAMP {} WHERE keyspace_name = ?", NAME, cf,
return qctx->execute_cql(format("DELETE FROM {}.{} USING TIMESTAMP {} WHERE keyspace_name = ?", NAME, cf,
deletion_timestamp), ksm->name()).discard_result();
}).then([ksm] {
auto mvec = make_create_keyspace_mutations(ksm, schema_creation_timestamp(), true);

View File

@@ -1163,7 +1163,7 @@ static future<> setup_version(distributed<gms::feature_service>& feat, sharded<n
, db::system_keyspace::LOCAL);
auto& snitch = locator::i_endpoint_snitch::get_local_snitch_ptr();
return execute_cql(req, sstring(db::system_keyspace::LOCAL),
return qctx->execute_cql(req, sstring(db::system_keyspace::LOCAL),
version::release(),
cql3::query_processor::CQL_VERSION,
::cassandra::thrift_version,
@@ -1199,7 +1199,7 @@ struct local_cache {
static distributed<local_cache> _local_cache;
static future<> build_dc_rack_info() {
return execute_cql(format("SELECT peer, data_center, rack from system.{}", PEERS)).then([] (::shared_ptr<cql3::untyped_result_set> msg) {
return qctx->execute_cql(format("SELECT peer, data_center, rack from system.{}", PEERS)).then([] (::shared_ptr<cql3::untyped_result_set> msg) {
return do_for_each(*msg, [] (auto& row) {
net::inet_address peer = row.template get_as<net::inet_address>("peer");
if (!row.has("data_center") || !row.has("rack")) {
@@ -1221,7 +1221,7 @@ static future<> build_dc_rack_info() {
static future<> build_bootstrap_info() {
sstring req = format("SELECT bootstrapped FROM system.{} WHERE key = ? ", LOCAL);
return execute_cql(req, sstring(LOCAL)).then([] (auto msg) {
return qctx->execute_cql(req, sstring(LOCAL)).then([] (auto msg) {
static auto state_map = std::unordered_map<sstring, bootstrap_state>({
{ "NEEDS_BOOTSTRAP", bootstrap_state::NEEDS_BOOTSTRAP },
{ "COMPLETED", bootstrap_state::COMPLETED },
@@ -1411,7 +1411,7 @@ future<> update_tokens(gms::inet_address ep, const std::unordered_set<dht::token
sstring req = format("INSERT INTO system.{} (peer, tokens) VALUES (?, ?)", PEERS);
auto set_type = set_type_impl::get_instance(utf8_type, true);
return execute_cql(req, ep.addr(), make_set_value(set_type, prepare_tokens(tokens))).discard_result().then([] {
return qctx->execute_cql(req, ep.addr(), make_set_value(set_type, prepare_tokens(tokens))).discard_result().then([] {
return force_blocking_flush(PEERS);
});
}
@@ -1419,7 +1419,7 @@ future<> update_tokens(gms::inet_address ep, const std::unordered_set<dht::token
future<std::unordered_map<gms::inet_address, std::unordered_set<dht::token>>> load_tokens() {
sstring req = format("SELECT peer, tokens FROM system.{}", PEERS);
return execute_cql(req).then([] (::shared_ptr<cql3::untyped_result_set> cql_result) {
return qctx->execute_cql(req).then([] (::shared_ptr<cql3::untyped_result_set> cql_result) {
std::unordered_map<gms::inet_address, std::unordered_set<dht::token>> ret;
for (auto& row : *cql_result) {
auto peer = gms::inet_address(row.get_as<net::inet_address>("peer"));
@@ -1437,7 +1437,7 @@ future<std::unordered_map<gms::inet_address, std::unordered_set<dht::token>>> lo
future<std::unordered_map<gms::inet_address, utils::UUID>> load_host_ids() {
sstring req = format("SELECT peer, host_id FROM system.{}", PEERS);
return execute_cql(req).then([] (::shared_ptr<cql3::untyped_result_set> cql_result) {
return qctx->execute_cql(req).then([] (::shared_ptr<cql3::untyped_result_set> cql_result) {
std::unordered_map<gms::inet_address, utils::UUID> ret;
for (auto& row : *cql_result) {
auto peer = gms::inet_address(row.get_as<net::inet_address>("peer"));
@@ -1451,7 +1451,7 @@ future<std::unordered_map<gms::inet_address, utils::UUID>> load_host_ids() {
future<std::unordered_map<gms::inet_address, sstring>> load_peer_features() {
sstring req = format("SELECT peer, supported_features FROM system.{}", PEERS);
return execute_cql(req).then([] (::shared_ptr<cql3::untyped_result_set> cql_result) {
return qctx->execute_cql(req).then([] (::shared_ptr<cql3::untyped_result_set> cql_result) {
std::unordered_map<gms::inet_address, sstring> ret;
for (auto& row : *cql_result) {
if (row.has("supported_features")) {
@@ -1465,14 +1465,14 @@ future<std::unordered_map<gms::inet_address, sstring>> load_peer_features() {
future<> update_preferred_ip(gms::inet_address ep, gms::inet_address preferred_ip) {
sstring req = format("INSERT INTO system.{} (peer, preferred_ip) VALUES (?, ?)", PEERS);
return execute_cql(req, ep.addr(), preferred_ip.addr()).discard_result().then([] {
return qctx->execute_cql(req, ep.addr(), preferred_ip.addr()).discard_result().then([] {
return force_blocking_flush(PEERS);
});
}
future<std::unordered_map<gms::inet_address, gms::inet_address>> get_preferred_ips() {
sstring req = format("SELECT peer, preferred_ip FROM system.{}", PEERS);
return execute_cql(req).then([] (::shared_ptr<cql3::untyped_result_set> cql_res_set) {
return qctx->execute_cql(req).then([] (::shared_ptr<cql3::untyped_result_set> cql_res_set) {
std::unordered_map<gms::inet_address, gms::inet_address> res;
for (auto& r : *cql_res_set) {
@@ -1513,7 +1513,7 @@ future<> update_peer_info(gms::inet_address ep, sstring column_name, Value value
return update_cached_values(ep, column_name, value).then([ep, column_name, value] {
sstring req = format("INSERT INTO system.{} (peer, {}) VALUES (?, ?)", PEERS, column_name);
return execute_cql(req, ep.addr(), value).discard_result();
return qctx->execute_cql(req, ep.addr(), value).discard_result();
});
}
// sets are not needed, since tokens are updated by another method
@@ -1523,12 +1523,12 @@ template future<> update_peer_info<net::inet_address>(gms::inet_address ep, sstr
future<> set_scylla_local_param(const sstring& key, const sstring& value) {
sstring req = format("UPDATE system.{} SET value = ? WHERE key = ?", SCYLLA_LOCAL);
return execute_cql(req, value, key).discard_result();
return qctx->execute_cql(req, value, key).discard_result();
}
future<std::optional<sstring>> get_scylla_local_param(const sstring& key){
sstring req = format("SELECT value FROM system.{} WHERE key = ?", SCYLLA_LOCAL);
return execute_cql(req, key).then([] (::shared_ptr<cql3::untyped_result_set> res) {
return qctx->execute_cql(req, key).then([] (::shared_ptr<cql3::untyped_result_set> res) {
if (res->empty() || !res->one().has("value")) {
return std::optional<sstring>();
}
@@ -1538,7 +1538,7 @@ future<std::optional<sstring>> get_scylla_local_param(const sstring& key){
future<> update_schema_version(utils::UUID version) {
sstring req = format("INSERT INTO system.{} (key, schema_version) VALUES (?, ?)", LOCAL);
return execute_cql(req, sstring(LOCAL), version).discard_result();
return qctx->execute_cql(req, sstring(LOCAL), version).discard_result();
}
/**
@@ -1549,7 +1549,7 @@ future<> remove_endpoint(gms::inet_address ep) {
lc._cached_dc_rack_info.erase(ep);
}).then([ep] {
sstring req = format("DELETE FROM system.{} WHERE peer = ?", PEERS);
return execute_cql(req, ep.addr()).discard_result();
return qctx->execute_cql(req, ep.addr()).discard_result();
}).then([] {
return force_blocking_flush(PEERS);
});
@@ -1562,13 +1562,13 @@ future<> update_tokens(const std::unordered_set<dht::token>& tokens) {
sstring req = format("INSERT INTO system.{} (key, tokens) VALUES (?, ?)", LOCAL);
auto set_type = set_type_impl::get_instance(utf8_type, true);
return execute_cql(req, sstring(LOCAL), make_set_value(set_type, prepare_tokens(tokens))).discard_result().then([] {
return qctx->execute_cql(req, sstring(LOCAL), make_set_value(set_type, prepare_tokens(tokens))).discard_result().then([] {
return force_blocking_flush(LOCAL);
});
}
future<> update_cdc_streams_timestamp(db_clock::time_point tp) {
return execute_cql(format("INSERT INTO system.{} (key, streams_timestamp) VALUES (?, ?)",
return qctx->execute_cql(format("INSERT INTO system.{} (key, streams_timestamp) VALUES (?, ?)",
v3::CDC_LOCAL), sstring(v3::CDC_LOCAL), tp)
.discard_result().then([] { return force_blocking_flush(v3::CDC_LOCAL); });
}
@@ -1591,11 +1591,11 @@ future<> force_blocking_flush(sstring cfname) {
future<> check_health() {
using namespace cql_transport::messages;
sstring req = format("SELECT cluster_name FROM system.{} WHERE key=?", LOCAL);
return execute_cql(req, sstring(LOCAL)).then([] (::shared_ptr<cql3::untyped_result_set> msg) {
return qctx->execute_cql(req, sstring(LOCAL)).then([] (::shared_ptr<cql3::untyped_result_set> msg) {
if (msg->empty() || !msg->one().has("cluster_name")) {
// this is a brand new node
sstring ins_req = format("INSERT INTO system.{} (key, cluster_name) VALUES (?, ?)", LOCAL);
return execute_cql(ins_req, sstring(LOCAL), qctx->db().get_config().cluster_name()).discard_result();
return qctx->execute_cql(ins_req, sstring(LOCAL), qctx->db().get_config().cluster_name()).discard_result();
} else {
auto saved_cluster_name = msg->one().get_as<sstring>("cluster_name");
auto cluster_name = qctx->db().get_config().cluster_name();
@@ -1611,7 +1611,7 @@ future<> check_health() {
future<std::unordered_set<dht::token>> get_saved_tokens() {
sstring req = format("SELECT tokens FROM system.{} WHERE key = ?", LOCAL);
return execute_cql(req, sstring(LOCAL)).then([] (auto msg) {
return qctx->execute_cql(req, sstring(LOCAL)).then([] (auto msg) {
if (msg->empty() || !msg->one().has("tokens")) {
return make_ready_future<std::unordered_set<dht::token>>();
}
@@ -1637,7 +1637,7 @@ future<std::unordered_set<dht::token>> get_local_tokens() {
}
future<std::optional<db_clock::time_point>> get_saved_cdc_streams_timestamp() {
return execute_cql(format("SELECT streams_timestamp FROM system.{} WHERE key = ?", v3::CDC_LOCAL), sstring(v3::CDC_LOCAL))
return qctx->execute_cql(format("SELECT streams_timestamp FROM system.{} WHERE key = ?", v3::CDC_LOCAL), sstring(v3::CDC_LOCAL))
.then([] (::shared_ptr<cql3::untyped_result_set> msg)-> std::optional<db_clock::time_point> {
if (msg->empty() || !msg->one().has("streams_timestamp")) {
return {};
@@ -1674,7 +1674,7 @@ future<> set_bootstrap_state(bootstrap_state state) {
sstring state_name = state_to_name.at(state);
sstring req = format("INSERT INTO system.{} (key, bootstrapped) VALUES (?, ?)", LOCAL);
return execute_cql(req, sstring(LOCAL), state_name).discard_result().then([state] {
return qctx->execute_cql(req, sstring(LOCAL), state_name).discard_result().then([state] {
return force_blocking_flush(LOCAL).then([state] {
return _local_cache.invoke_on_all([state] (local_cache& lc) {
lc._state = state;
@@ -1764,7 +1764,7 @@ void make(database& db, bool durable, bool volatile_testing_only) {
future<utils::UUID> get_local_host_id() {
using namespace cql_transport::messages;
sstring req = format("SELECT host_id FROM system.{} WHERE key=?", LOCAL);
return execute_cql(req, sstring(LOCAL)).then([] (::shared_ptr<cql3::untyped_result_set> msg) {
return qctx->execute_cql(req, sstring(LOCAL)).then([] (::shared_ptr<cql3::untyped_result_set> msg) {
auto new_id = [] {
auto host_id = utils::make_random_uuid();
return set_local_host_id(host_id);
@@ -1780,7 +1780,7 @@ future<utils::UUID> get_local_host_id() {
future<utils::UUID> set_local_host_id(const utils::UUID& host_id) {
sstring req = format("INSERT INTO system.{} (key, host_id) VALUES (?, ?)", LOCAL);
return execute_cql(req, sstring(LOCAL), host_id).then([] (auto msg) {
return qctx->execute_cql(req, sstring(LOCAL), host_id).then([] (auto msg) {
return force_blocking_flush(LOCAL);
}).then([host_id] {
return host_id;
@@ -1872,7 +1872,7 @@ future<> update_compaction_history(utils::UUID uuid, sstring ksname, sstring cfn
, COMPACTION_HISTORY);
db_clock::time_point tp{db_clock::duration{compacted_at}};
return execute_cql(req, uuid, ksname, cfname, tp, bytes_in, bytes_out,
return qctx->execute_cql(req, uuid, ksname, cfname, tp, bytes_in, bytes_out,
make_map_value(map_type, prepare_rows_merged(rows_merged))).discard_result().handle_exception([] (auto ep) {
slogger.error("update compaction history failed: {}: ignored", ep);
});
@@ -1949,7 +1949,7 @@ mutation make_size_estimates_mutation(const sstring& ks, std::vector<range_estim
future<> register_view_for_building(sstring ks_name, sstring view_name, const dht::token& token) {
sstring req = format("INSERT INTO system.{} (keyspace_name, view_name, generation_number, cpu_id, first_token) VALUES (?, ?, ?, ?, ?)",
v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS);
return execute_cql(
return qctx->execute_cql(
std::move(req),
std::move(ks_name),
std::move(view_name),
@@ -1961,7 +1961,7 @@ future<> register_view_for_building(sstring ks_name, sstring view_name, const dh
future<> update_view_build_progress(sstring ks_name, sstring view_name, const dht::token& token) {
sstring req = format("INSERT INTO system.{} (keyspace_name, view_name, next_token, cpu_id) VALUES (?, ?, ?, ?)",
v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS);
return execute_cql(
return qctx->execute_cql(
std::move(req),
std::move(ks_name),
std::move(view_name),
@@ -1970,14 +1970,14 @@ future<> update_view_build_progress(sstring ks_name, sstring view_name, const dh
}
future<> remove_view_build_progress_across_all_shards(sstring ks_name, sstring view_name) {
return execute_cql(
return qctx->execute_cql(
format("DELETE FROM system.{} WHERE keyspace_name = ? AND view_name = ?", v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS),
std::move(ks_name),
std::move(view_name)).discard_result();
}
future<> remove_view_build_progress(sstring ks_name, sstring view_name) {
return execute_cql(
return qctx->execute_cql(
format("DELETE FROM system.{} WHERE keyspace_name = ? AND view_name = ? AND cpu_id = ?", v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS),
std::move(ks_name),
std::move(view_name),
@@ -1985,21 +1985,21 @@ future<> remove_view_build_progress(sstring ks_name, sstring view_name) {
}
future<> mark_view_as_built(sstring ks_name, sstring view_name) {
return execute_cql(
return qctx->execute_cql(
format("INSERT INTO system.{} (keyspace_name, view_name) VALUES (?, ?)", v3::BUILT_VIEWS),
std::move(ks_name),
std::move(view_name)).discard_result();
}
future<> remove_built_view(sstring ks_name, sstring view_name) {
return execute_cql(
return qctx->execute_cql(
format("DELETE FROM system.{} WHERE keyspace_name = ? AND view_name = ?", v3::BUILT_VIEWS),
std::move(ks_name),
std::move(view_name)).discard_result();
}
future<std::vector<view_name>> load_built_views() {
return execute_cql(format("SELECT * FROM system.{}", v3::BUILT_VIEWS)).then([] (::shared_ptr<cql3::untyped_result_set> cql_result) {
return qctx->execute_cql(format("SELECT * FROM system.{}", v3::BUILT_VIEWS)).then([] (::shared_ptr<cql3::untyped_result_set> cql_result) {
return boost::copy_range<std::vector<view_name>>(*cql_result
| boost::adaptors::transformed([] (const cql3::untyped_result_set::row& row) {
auto ks_name = row.get_as<sstring>("keyspace_name");
@@ -2010,7 +2010,7 @@ future<std::vector<view_name>> load_built_views() {
}
future<std::vector<view_build_progress>> load_view_build_progress() {
return execute_cql(format("SELECT keyspace_name, view_name, first_token, next_token, cpu_id FROM system.{}",
return qctx->execute_cql(format("SELECT keyspace_name, view_name, first_token, next_token, cpu_id FROM system.{}",
v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS)).then([] (::shared_ptr<cql3::untyped_result_set> cql_result) {
std::vector<view_build_progress> progress;
for (auto& row : *cql_result) {
@@ -2041,7 +2041,7 @@ future<service::paxos::paxos_state> load_paxos_state(partition_key_view key, sch
static auto cql = format("SELECT * FROM system.{} WHERE row_key = ? AND cf_id = ?", PAXOS);
// FIXME: we need execute_cql_with_now()
(void)now;
auto f = execute_cql_with_timeout(cql, timeout, to_legacy(*key.get_compound_type(*s), key.representation()), s->id());
auto f = qctx->execute_cql_with_timeout(cql, timeout, to_legacy(*key.get_compound_type(*s), key.representation()), s->id());
return f.then([s, key = std::move(key)] (shared_ptr<cql3::untyped_result_set> results) mutable {
if (results->empty()) {
return service::paxos::paxos_state();
@@ -2080,7 +2080,7 @@ static int32_t paxos_ttl_sec(const schema& s) {
future<> save_paxos_promise(const schema& s, const partition_key& key, const utils::UUID& ballot, db::timeout_clock::time_point timeout) {
static auto cql = format("UPDATE system.{} USING TIMESTAMP ? AND TTL ? SET promise = ? WHERE row_key = ? AND cf_id = ?", PAXOS);
return execute_cql_with_timeout(cql,
return qctx->execute_cql_with_timeout(cql,
timeout,
utils::UUID_gen::micros_timestamp(ballot),
paxos_ttl_sec(s),
@@ -2093,7 +2093,7 @@ future<> save_paxos_promise(const schema& s, const partition_key& key, const uti
future<> save_paxos_proposal(const schema& s, const service::paxos::proposal& proposal, db::timeout_clock::time_point timeout) {
static auto cql = format("UPDATE system.{} USING TIMESTAMP ? AND TTL ? SET promise = ?, proposal_ballot = ?, proposal = ? WHERE row_key = ? AND cf_id = ?", PAXOS);
partition_key_view key = proposal.update.key();
return execute_cql_with_timeout(cql,
return qctx->execute_cql_with_timeout(cql,
timeout,
utils::UUID_gen::micros_timestamp(proposal.ballot),
paxos_ttl_sec(s),
@@ -2115,7 +2115,7 @@ future<> save_paxos_decision(const schema& s, const service::paxos::proposal& de
static auto cql = format("UPDATE system.{} USING TIMESTAMP ? AND TTL ? SET proposal_ballot = null, proposal = null,"
" most_recent_commit_at = ?, most_recent_commit = ? WHERE row_key = ? AND cf_id = ?", PAXOS);
partition_key_view key = decision.update.key();
return execute_cql_with_timeout(cql,
return qctx->execute_cql_with_timeout(cql,
timeout,
utils::UUID_gen::micros_timestamp(decision.ballot),
paxos_ttl_sec(s),
@@ -2132,7 +2132,7 @@ future<> delete_paxos_decision(const schema& s, const partition_key& key, const
// guarantees that if there is more recent round it will not be affected.
static auto cql = format("DELETE most_recent_commit FROM system.{} USING TIMESTAMP ? WHERE row_key = ? AND cf_id = ?", PAXOS);
return execute_cql_with_timeout(cql,
return qctx->execute_cql_with_timeout(cql,
timeout,
utils::UUID_gen::micros_timestamp(ballot),
to_legacy(*key.get_compound_type(s), key.representation()),