table_helper: remove qp.get_migration_manager() calls

Push those calls up the call stack, to `trace_keyspace_helper` module.
Pass `migration_manager` reference around together with
`query_processor` reference.
This commit is contained in:
Kamil Braun
2022-07-21 15:48:13 +02:00
parent 9e25a3cbed
commit 1b68e8582b
4 changed files with 32 additions and 29 deletions

View File

@@ -16,7 +16,7 @@
#include "replica/database.hh"
#include "service/migration_manager.hh"
future<> table_helper::setup_table(cql3::query_processor& qp, const sstring& create_cql) {
future<> table_helper::setup_table(cql3::query_processor& qp, service::migration_manager& mm, const sstring& create_cql) {
auto db = qp.db();
auto parsed = cql3::query_processor::parse_statement(create_cql);
@@ -32,8 +32,6 @@ future<> table_helper::setup_table(cql3::query_processor& qp, const sstring& cre
co_return;
}
auto& mm = qp.get_migration_manager();
auto group0_guard = co_await mm.start_group0_operation();
auto ts = group0_guard.write_timestamp();
@@ -58,7 +56,7 @@ future<> table_helper::setup_table(cql3::query_processor& qp, const sstring& cre
} catch (...) {}
}
future<> table_helper::cache_table_info(cql3::query_processor& qp, service::query_state& qs) {
future<> table_helper::cache_table_info(cql3::query_processor& qp, service::migration_manager& mm, service::query_state& qs) {
if (!_prepared_stmt) {
// if prepared statement has been invalidated - drop cached pointers
_insert_stmt = nullptr;
@@ -89,11 +87,11 @@ future<> table_helper::cache_table_info(cql3::query_processor& qp, service::quer
_insert_stmt = dynamic_pointer_cast<cql3::statements::modification_statement>(cql_stmt);
_is_fallback_stmt = true;
});
}).handle_exception([this, &qp] (auto eptr) {
}).handle_exception([this, &qp, &mm] (auto eptr) {
// One of the possible causes for an error here could be the table that doesn't exist.
//FIXME: discarded future.
(void)qp.container().invoke_on(0, [create_cql = _create_cql] (cql3::query_processor& qp) -> future<> {
co_return co_await table_helper::setup_table(qp, create_cql);
(void)qp.container().invoke_on(0, [&mm = mm.container(), create_cql = _create_cql] (cql3::query_processor& qp) -> future<> {
co_return co_await table_helper::setup_table(qp, mm.local(), create_cql);
});
// We throw the bad_column_family exception because the caller
@@ -108,8 +106,8 @@ future<> table_helper::cache_table_info(cql3::query_processor& qp, service::quer
});
}
future<> table_helper::insert(cql3::query_processor& qp, service::query_state& qs, noncopyable_function<cql3::query_options ()> opt_maker) {
return cache_table_info(qp, qs).then([this, &qp, &qs, opt_maker = std::move(opt_maker)] () mutable {
future<> table_helper::insert(cql3::query_processor& qp, service::migration_manager& mm, service::query_state& qs, noncopyable_function<cql3::query_options ()> opt_maker) {
return cache_table_info(qp, mm, qs).then([this, &qp, &qs, opt_maker = std::move(opt_maker)] () mutable {
return do_with(opt_maker(), [this, &qp, &qs] (auto& opts) {
opts.prepare(_prepared_stmt->bound_names);
return _insert_stmt->execute(qp, qs, opts);
@@ -117,7 +115,7 @@ future<> table_helper::insert(cql3::query_processor& qp, service::query_state& q
}).discard_result();
}
future<> table_helper::setup_keyspace(cql3::query_processor& qp, std::string_view keyspace_name, sstring replication_factor, service::query_state& qs, std::vector<table_helper*> tables) {
future<> table_helper::setup_keyspace(cql3::query_processor& qp, service::migration_manager& mm, std::string_view keyspace_name, sstring replication_factor, service::query_state& qs, std::vector<table_helper*> tables) {
if (this_shard_id() != 0) {
co_return;
}
@@ -129,7 +127,6 @@ future<> table_helper::setup_keyspace(cql3::query_processor& qp, std::string_vie
}
data_dictionary::database db = qp.db();
auto& mm = qp.get_migration_manager();
if (!db.has_keyspace(keyspace_name)) {
auto group0_guard = co_await mm.start_group0_operation();
@@ -146,7 +143,7 @@ future<> table_helper::setup_keyspace(cql3::query_processor& qp, std::string_vie
qs.get_client_state().set_keyspace(db.real_database(), keyspace_name);
// Create tables
co_await coroutine::parallel_for_each(tables, [&qp] (table_helper* t) {
return table_helper::setup_table(qp, t->_create_cql);
co_await coroutine::parallel_for_each(tables, [&qp, &mm] (table_helper* t) {
return table_helper::setup_table(qp, mm, t->_create_cql);
});
}

View File

@@ -12,6 +12,10 @@
#include "cql3/statements/prepared_statement.hh"
#include "service/query_state.hh"
namespace service {
class migration_manager;
}
namespace cql3 {
class query_processor;
namespace statements {
@@ -52,13 +56,13 @@ public:
* @return A future that resolves when the operation is complete. Any
* possible errors are ignored.
*/
static future<> setup_table(cql3::query_processor& qp, const sstring& create_cql);
static future<> setup_table(cql3::query_processor& qp, service::migration_manager& mm, const sstring& create_cql);
/**
* @return a future that resolves when the given t_helper is ready to be used for
* data insertion.
*/
future<> cache_table_info(cql3::query_processor& qp, service::query_state&);
future<> cache_table_info(cql3::query_processor& qp, service::migration_manager& mm, service::query_state&);
/**
* @return The table name
@@ -84,15 +88,15 @@ public:
*/
template <typename OptMaker, typename... Args>
requires seastar::CanInvoke<OptMaker, Args...>
future<> insert(cql3::query_processor& qp, service::query_state& qs, OptMaker opt_maker, Args... opt_maker_args) {
return insert(qp, qs, noncopyable_function<cql3::query_options ()>([opt_maker = std::move(opt_maker), args = std::make_tuple(std::move(opt_maker_args)...)] () mutable {
future<> insert(cql3::query_processor& qp, service::migration_manager& mm, service::query_state& qs, OptMaker opt_maker, Args... opt_maker_args) {
return insert(qp, mm, qs, noncopyable_function<cql3::query_options ()>([opt_maker = std::move(opt_maker), args = std::make_tuple(std::move(opt_maker_args)...)] () mutable {
return apply(opt_maker, std::move(args));
}));
}
future<> insert(cql3::query_processor& qp, service::query_state& qs, noncopyable_function<cql3::query_options ()> opt_maker);
future<> insert(cql3::query_processor& qp, service::migration_manager& mm, service::query_state& qs, noncopyable_function<cql3::query_options ()> opt_maker);
static future<> setup_keyspace(cql3::query_processor& qp, std::string_view keyspace_name, sstring replication_factor, service::query_state& qs, std::vector<table_helper*> tables);
static future<> setup_keyspace(cql3::query_processor& qp, service::migration_manager& mm, std::string_view keyspace_name, sstring replication_factor, service::query_state& qs, std::vector<table_helper*> tables);
/**
* Makes a monotonically increasing value in 100ns ("nanos") based on the given time

View File

@@ -210,7 +210,8 @@ trace_keyspace_helper::trace_keyspace_helper(tracing& tr)
future<> trace_keyspace_helper::start(cql3::query_processor& qp) {
_qp_anchor = &qp;
return table_helper::setup_keyspace(qp, KEYSPACE_NAME, "2", _dummy_query_state, { &_sessions, &_sessions_time_idx, &_events, &_slow_query_log, &_slow_query_log_time_idx });
auto& mm = qp.get_migration_manager();
return table_helper::setup_keyspace(qp, mm, KEYSPACE_NAME, "2", _dummy_query_state, { &_sessions, &_sessions_time_idx, &_events, &_slow_query_log, &_slow_query_log_time_idx });
}
void trace_keyspace_helper::write_one_session_records(lw_shared_ptr<one_session_records> records) {
@@ -383,12 +384,12 @@ std::vector<cql3::raw_value> trace_keyspace_helper::make_event_mutation_data(one
return values;
}
future<> trace_keyspace_helper::apply_events_mutation(cql3::query_processor& qp, lw_shared_ptr<one_session_records> records, std::deque<event_record>& events_records) {
future<> trace_keyspace_helper::apply_events_mutation(cql3::query_processor& qp, service::migration_manager& mm, lw_shared_ptr<one_session_records> records, std::deque<event_record>& events_records) {
if (events_records.empty()) {
return now();
}
return _events.cache_table_info(qp, _dummy_query_state).then([this, &qp, records, &events_records] {
return _events.cache_table_info(qp, mm, _dummy_query_state).then([this, &qp, records, &events_records] {
tlogger.trace("{}: storing {} events records: parent_id {} span_id {}", records->session_id, events_records.size(), records->parent_id, records->my_span_id);
std::vector<cql3::statements::batch_statement::single_statement> modifications(events_records.size(), cql3::statements::batch_statement::single_statement(_events.insert_stmt(), false));
@@ -433,15 +434,16 @@ future<> trace_keyspace_helper::flush_one_session_mutations(lw_shared_ptr<one_se
// is cleared on ::stop() after the gate is closed.
assert(_qp_anchor != nullptr);
cql3::query_processor& qp = *_qp_anchor;
return apply_events_mutation(qp, records, events_records).then([this, &qp, session_record_is_ready, records] {
auto& mm = qp.get_migration_manager();
return apply_events_mutation(qp, mm, records, events_records).then([this, &qp, &mm, session_record_is_ready, records] {
if (session_record_is_ready) {
// if session is finished - store a session and a session time index entries
tlogger.trace("{}: going to store a session event", records->session_id);
return _sessions.insert(qp, _dummy_query_state, make_session_mutation_data, std::ref(*records)).then([this, &qp, records] {
return _sessions.insert(qp, mm, _dummy_query_state, make_session_mutation_data, std::ref(*records)).then([this, &qp, &mm, records] {
tlogger.trace("{}: going to store a {} entry", records->session_id, _sessions_time_idx.name());
return _sessions_time_idx.insert(qp, _dummy_query_state, make_session_time_idx_mutation_data, std::ref(*records));
}).then([this, &qp, records] {
return _sessions_time_idx.insert(qp, mm, _dummy_query_state, make_session_time_idx_mutation_data, std::ref(*records));
}).then([this, &qp, &mm, records] {
if (!records->do_log_slow_query) {
return now();
}
@@ -449,9 +451,9 @@ future<> trace_keyspace_helper::flush_one_session_mutations(lw_shared_ptr<one_se
// if slow query log is requested - store a slow query log and a slow query log time index entries
auto start_time_id = utils::UUID_gen::get_time_UUID(table_helper::make_monotonic_UUID_tp(_slow_query_last_nanos, records->session_rec.started_at));
tlogger.trace("{}: going to store a slow query event", records->session_id);
return _slow_query_log.insert(qp, _dummy_query_state, make_slow_query_mutation_data, std::ref(*records), start_time_id).then([this, &qp, records, start_time_id] {
return _slow_query_log.insert(qp, mm, _dummy_query_state, make_slow_query_mutation_data, std::ref(*records), start_time_id).then([this, &qp, &mm, records, start_time_id] {
tlogger.trace("{}: going to store a {} entry", records->session_id, _slow_query_log_time_idx.name());
return _slow_query_log_time_idx.insert(qp, _dummy_query_state, make_slow_query_time_idx_mutation_data, std::ref(*records), start_time_id);
return _slow_query_log_time_idx.insert(qp, mm, _dummy_query_state, make_slow_query_time_idx_mutation_data, std::ref(*records), start_time_id);
});
});
} else {

View File

@@ -101,7 +101,7 @@ private:
* @note A caller must ensure that @param events_records is alive till the
* returned future resolves.
*/
future<> apply_events_mutation(cql3::query_processor& qp, lw_shared_ptr<one_session_records> records, std::deque<event_record>& events_records);
future<> apply_events_mutation(cql3::query_processor& qp, service::migration_manager& mm, lw_shared_ptr<one_session_records> records, std::deque<event_record>& events_records);
/**
* Create a mutation data for a new session record