table_helper: Require local query processor in calls

Keeping the query processor reference on the table_helper in raii manner
seems waistful, the only user of it -- the trace_keyspace_helper -- has
a bunch of helpers on board, each would then keep its own copy for no
gain.

At the same time the trace_keyspace_helper already gets the query processor
for its needs, so it can share one with table_helper-s.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
Pavel Emelyanov
2020-09-18 16:01:36 +03:00
parent f5d39b9638
commit b18522a7ab
4 changed files with 22 additions and 23 deletions

View File

@@ -57,7 +57,7 @@ future<> table_helper::setup_table(cql3::query_processor& qp) const {
return service::get_local_migration_manager().announce_new_column_family(b.build(), false).discard_result().handle_exception([this] (auto ep) {});;
}
future<> table_helper::cache_table_info(service::query_state& qs) {
future<> table_helper::cache_table_info(cql3::query_processor& qp, service::query_state& qs) {
if (!_prepared_stmt) {
// if prepared statement has been invalidated - drop cached pointers
_insert_stmt = nullptr;
@@ -66,7 +66,6 @@ future<> table_helper::cache_table_info(service::query_state& qs) {
return now();
}
cql3::query_processor& qp = cql3::get_local_query_processor();
return qp.prepare(_insert_cql, qs.get_client_state(), false)
.then([this] (shared_ptr<cql_transport::messages::result_message::prepared> msg_ptr) noexcept {
_prepared_stmt = std::move(msg_ptr->get_prepared());
@@ -106,8 +105,8 @@ future<> table_helper::cache_table_info(service::query_state& qs) {
});
}
future<> table_helper::insert(service::query_state& qs, noncopyable_function<cql3::query_options ()> opt_maker) {
return cache_table_info(qs).then([this, &qs, opt_maker = std::move(opt_maker)] () mutable {
future<> table_helper::insert(cql3::query_processor& qp, service::query_state& qs, noncopyable_function<cql3::query_options ()> opt_maker) {
return cache_table_info(qp, qs).then([this, &qs, opt_maker = std::move(opt_maker)] () mutable {
return do_with(opt_maker(), [this, &qs] (auto& opts) {
opts.prepare(_prepared_stmt->bound_names);
return _insert_stmt->execute(service::get_storage_proxy().local(), qs, opts);
@@ -115,7 +114,7 @@ future<> table_helper::insert(service::query_state& qs, noncopyable_function<cql
}).discard_result();
}
future<> table_helper::setup_keyspace(const sstring& keyspace_name, sstring replication_factor, service::query_state& qs, std::vector<table_helper*> tables) {
future<> table_helper::setup_keyspace(cql3::query_processor& qp, const sstring& keyspace_name, sstring replication_factor, service::query_state& qs, std::vector<table_helper*> tables) {
if (this_shard_id() == 0) {
size_t n = tables.size();
for (size_t i = 0; i < n; ++i) {
@@ -123,8 +122,7 @@ future<> table_helper::setup_keyspace(const sstring& keyspace_name, sstring repl
throw std::invalid_argument("setup_keyspace called with table_helper for different keyspace");
}
}
return seastar::async([&keyspace_name, replication_factor, &qs, tables] {
cql3::query_processor& qp = cql3::get_local_query_processor();
return seastar::async([&qp, &keyspace_name, replication_factor, &qs, tables] {
database& db = qp.db();
// Create a keyspace

View File

@@ -72,7 +72,7 @@ public:
* @return a future that resolves when the given t_helper is ready to be used for
* data insertion.
*/
future<> cache_table_info(service::query_state&);
future<> cache_table_info(cql3::query_processor& qp, service::query_state&);
/**
* @return The table name
@@ -98,15 +98,15 @@ public:
*/
template <typename OptMaker, typename... Args>
requires seastar::CanInvoke<OptMaker, Args...>
future<> insert(service::query_state& qs, OptMaker opt_maker, Args... opt_maker_args) {
return insert(qs, noncopyable_function<cql3::query_options ()>([opt_maker = std::move(opt_maker), args = std::make_tuple(std::move(opt_maker_args)...)] () mutable {
future<> insert(cql3::query_processor& qp, service::query_state& qs, OptMaker opt_maker, Args... opt_maker_args) {
return insert(qp, qs, noncopyable_function<cql3::query_options ()>([opt_maker = std::move(opt_maker), args = std::make_tuple(std::move(opt_maker_args)...)] () mutable {
return apply(opt_maker, std::move(args));
}));
}
future<> insert(service::query_state& qs, noncopyable_function<cql3::query_options ()> opt_maker);
future<> insert(cql3::query_processor& qp, service::query_state& qs, noncopyable_function<cql3::query_options ()> opt_maker);
static future<> setup_keyspace(const sstring& keyspace_name, sstring replication_factor, service::query_state& qs, std::vector<table_helper*> tables);
static future<> setup_keyspace(cql3::query_processor& qp, const sstring& keyspace_name, sstring replication_factor, service::query_state& qs, std::vector<table_helper*> tables);
/**
* Makes a monotonically increasing value in 100ns ("nanos") based on the given time

View File

@@ -242,7 +242,8 @@ trace_keyspace_helper::trace_keyspace_helper(tracing& tr)
}
future<> trace_keyspace_helper::start() {
return table_helper::setup_keyspace(KEYSPACE_NAME, "2", _dummy_query_state, { &_sessions, &_sessions_time_idx, &_events, &_slow_query_log, &_slow_query_log_time_idx });
cql3::query_processor& qp = cql3::get_local_query_processor();
return table_helper::setup_keyspace(qp, KEYSPACE_NAME, "2", _dummy_query_state, { &_sessions, &_sessions_time_idx, &_events, &_slow_query_log, &_slow_query_log_time_idx });
}
void trace_keyspace_helper::write_one_session_records(lw_shared_ptr<one_session_records> records) {
@@ -415,17 +416,16 @@ std::vector<cql3::raw_value> trace_keyspace_helper::make_event_mutation_data(one
return values;
}
future<> trace_keyspace_helper::apply_events_mutation(lw_shared_ptr<one_session_records> records, std::deque<event_record>& events_records) {
future<> trace_keyspace_helper::apply_events_mutation(cql3::query_processor& qp, lw_shared_ptr<one_session_records> records, std::deque<event_record>& events_records) {
if (events_records.empty()) {
return now();
}
return _events.cache_table_info(_dummy_query_state).then([this, records, &events_records] {
return _events.cache_table_info(qp, _dummy_query_state).then([this, &qp, records, &events_records] {
tlogger.trace("{}: storing {} events records: parent_id {} span_id {}", records->session_id, events_records.size(), records->parent_id, records->my_span_id);
std::vector<cql3::statements::batch_statement::single_statement> modifications(events_records.size(), cql3::statements::batch_statement::single_statement(_events.insert_stmt(), false));
std::vector<std::vector<cql3::raw_value>> values;
auto& qp = cql3::get_local_query_processor();
values.reserve(events_records.size());
std::for_each(events_records.begin(), events_records.end(), [&values, all_records = records, this] (event_record& one_event_record) { values.emplace_back(make_event_mutation_data(*all_records, one_event_record)); });
@@ -462,15 +462,16 @@ future<> trace_keyspace_helper::flush_one_session_mutations(lw_shared_ptr<one_se
auto backend_state_ptr = static_cast<trace_keyspace_backend_sesssion_state*>(records->backend_state_ptr.get());
semaphore& write_sem = backend_state_ptr->write_sem;
return with_semaphore(write_sem, 1, [this, records, session_record_is_ready, &events_records] {
return apply_events_mutation(records, events_records).then([this, session_record_is_ready, records] {
cql3::query_processor& qp = cql3::get_local_query_processor();
return apply_events_mutation(qp, records, events_records).then([this, &qp, session_record_is_ready, records] {
if (session_record_is_ready) {
// if session is finished - store a session and a session time index entries
tlogger.trace("{}: going to store a session event", records->session_id);
return _sessions.insert(_dummy_query_state, make_session_mutation_data, std::ref(*records)).then([this, records] {
return _sessions.insert(qp, _dummy_query_state, make_session_mutation_data, std::ref(*records)).then([this, &qp, records] {
tlogger.trace("{}: going to store a {} entry", records->session_id, _sessions_time_idx.name());
return _sessions_time_idx.insert(_dummy_query_state, make_session_time_idx_mutation_data, std::ref(*records));
}).then([this, records] {
return _sessions_time_idx.insert(qp, _dummy_query_state, make_session_time_idx_mutation_data, std::ref(*records));
}).then([this, &qp, records] {
if (!records->do_log_slow_query) {
return now();
}
@@ -478,9 +479,9 @@ future<> trace_keyspace_helper::flush_one_session_mutations(lw_shared_ptr<one_se
// if slow query log is requested - store a slow query log and a slow query log time index entries
auto start_time_id = utils::UUID_gen::get_time_UUID(table_helper::make_monotonic_UUID_tp(_slow_query_last_nanos, records->session_rec.started_at));
tlogger.trace("{}: going to store a slow query event", records->session_id);
return _slow_query_log.insert(_dummy_query_state, make_slow_query_mutation_data, std::ref(*records), start_time_id).then([this, records, start_time_id] {
return _slow_query_log.insert(qp, _dummy_query_state, make_slow_query_mutation_data, std::ref(*records), start_time_id).then([this, &qp, records, start_time_id] {
tlogger.trace("{}: going to store a {} entry", records->session_id, _slow_query_log_time_idx.name());
return _slow_query_log_time_idx.insert(_dummy_query_state, make_slow_query_time_idx_mutation_data, std::ref(*records), start_time_id);
return _slow_query_log_time_idx.insert(qp, _dummy_query_state, make_slow_query_time_idx_mutation_data, std::ref(*records), start_time_id);
});
});
} else {

View File

@@ -131,7 +131,7 @@ private:
* @note A caller must ensure that @param events_records is alive till the
* returned future resolves.
*/
future<> apply_events_mutation(lw_shared_ptr<one_session_records> records, std::deque<event_record>& events_records);
future<> apply_events_mutation(cql3::query_processor& qp, lw_shared_ptr<one_session_records> records, std::deque<event_record>& events_records);
/**
* Create a mutation data for a new session record