tracing: use locator::topology rather than fb_utilities
Get my_address via query_processor->proxy and pass it to all static make_ methods, instead of getting it from utils::fb_utilities. Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
@@ -17,8 +17,8 @@
|
||||
#include "types/set.hh"
|
||||
#include "types/map.hh"
|
||||
#include "utils/UUID_gen.hh"
|
||||
#include "utils/fb_utilities.hh"
|
||||
#include "utils/class_registrator.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
|
||||
namespace tracing {
|
||||
|
||||
@@ -214,6 +214,10 @@ future<> trace_keyspace_helper::start(cql3::query_processor& qp, service::migrat
|
||||
return table_helper::setup_keyspace(qp, mm, KEYSPACE_NAME, "2", _dummy_query_state, { &_sessions, &_sessions_time_idx, &_events, &_slow_query_log, &_slow_query_log_time_idx });
|
||||
}
|
||||
|
||||
gms::inet_address trace_keyspace_helper::my_address() const noexcept {
|
||||
return _qp_anchor->proxy().local_db().get_token_metadata().get_topology().my_address();
|
||||
}
|
||||
|
||||
void trace_keyspace_helper::write_one_session_records(lw_shared_ptr<one_session_records> records) {
|
||||
// Future is waited on indirectly in `stop()` (via `_pending_writes`).
|
||||
(void)with_gate(_pending_writes, [this, records = std::move(records)] {
|
||||
@@ -244,7 +248,7 @@ void trace_keyspace_helper::write_records_bulk(records_bulk& bulk) {
|
||||
});
|
||||
}
|
||||
|
||||
cql3::query_options trace_keyspace_helper::make_session_mutation_data(const one_session_records& session_records) {
|
||||
cql3::query_options trace_keyspace_helper::make_session_mutation_data(gms::inet_address my_address, const one_session_records& session_records) {
|
||||
const session_record& record = session_records.session_rec;
|
||||
auto millis_since_epoch = std::chrono::duration_cast<std::chrono::milliseconds>(record.started_at.time_since_epoch()).count();
|
||||
std::vector<std::pair<data_value, data_value>> parameters_values_vector;
|
||||
@@ -269,7 +273,7 @@ cql3::query_options trace_keyspace_helper::make_session_mutation_data(const one_
|
||||
cql3::raw_value::make_value(uuid_type->decompose(session_records.session_id)),
|
||||
cql3::raw_value::make_value(utf8_type->decompose(type_to_string(record.command))),
|
||||
cql3::raw_value::make_value(inet_addr_type->decompose(record.client.addr())),
|
||||
cql3::raw_value::make_value(inet_addr_type->decompose(utils::fb_utilities::get_broadcast_address().addr())),
|
||||
cql3::raw_value::make_value(inet_addr_type->decompose(my_address.addr())),
|
||||
cql3::raw_value::make_value(int32_type->decompose(elapsed_to_micros(record.elapsed))),
|
||||
cql3::raw_value::make_value(make_map_value(my_map_type, map_type_impl::native_type(std::move(parameters_values_vector))).serialize()),
|
||||
cql3::raw_value::make_value(utf8_type->decompose(record.request)),
|
||||
@@ -284,7 +288,7 @@ cql3::query_options trace_keyspace_helper::make_session_mutation_data(const one_
|
||||
db::consistency_level::ANY, std::move(names), std::move(values), false, cql3::query_options::specific_options::DEFAULT);
|
||||
}
|
||||
|
||||
cql3::query_options trace_keyspace_helper::make_session_time_idx_mutation_data(const one_session_records& session_records) {
|
||||
cql3::query_options trace_keyspace_helper::make_session_time_idx_mutation_data(gms::inet_address my_address, const one_session_records& session_records) {
|
||||
auto started_at_duration = session_records.session_rec.started_at.time_since_epoch();
|
||||
// timestamp in minutes when the query began
|
||||
auto minutes_in_millis = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::duration_cast<std::chrono::minutes>(started_at_duration)).count();
|
||||
@@ -302,7 +306,7 @@ cql3::query_options trace_keyspace_helper::make_session_time_idx_mutation_data(c
|
||||
db::consistency_level::ANY, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT);
|
||||
}
|
||||
|
||||
cql3::query_options trace_keyspace_helper::make_slow_query_mutation_data(const one_session_records& session_records, const utils::UUID& start_time_id) {
|
||||
cql3::query_options trace_keyspace_helper::make_slow_query_mutation_data(gms::inet_address my_address, const one_session_records& session_records, const utils::UUID& start_time_id) {
|
||||
const session_record& record = session_records.session_rec;
|
||||
auto millis_since_epoch = std::chrono::duration_cast<std::chrono::milliseconds>(record.started_at.time_since_epoch()).count();
|
||||
|
||||
@@ -325,7 +329,7 @@ cql3::query_options trace_keyspace_helper::make_slow_query_mutation_data(const o
|
||||
auto my_set_type = set_type_impl::get_instance(utf8_type, true);
|
||||
|
||||
std::vector<cql3::raw_value> values({
|
||||
cql3::raw_value::make_value(inet_addr_type->decompose(utils::fb_utilities::get_broadcast_address().addr())),
|
||||
cql3::raw_value::make_value(inet_addr_type->decompose(my_address.addr())),
|
||||
cql3::raw_value::make_value(int32_type->decompose((int32_t)(this_shard_id()))),
|
||||
cql3::raw_value::make_value(uuid_type->decompose(session_records.session_id)),
|
||||
cql3::raw_value::make_value(timestamp_type->decompose(millis_since_epoch)),
|
||||
@@ -345,7 +349,7 @@ cql3::query_options trace_keyspace_helper::make_slow_query_mutation_data(const o
|
||||
db::consistency_level::ANY, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT);
|
||||
}
|
||||
|
||||
cql3::query_options trace_keyspace_helper::make_slow_query_time_idx_mutation_data(const one_session_records& session_records, const utils::UUID& start_time_id) {
|
||||
cql3::query_options trace_keyspace_helper::make_slow_query_time_idx_mutation_data(gms::inet_address my_address, const one_session_records& session_records, const utils::UUID& start_time_id) {
|
||||
auto started_at_duration = session_records.session_rec.started_at.time_since_epoch();
|
||||
// timestamp in minutes when the query began
|
||||
auto minutes_in_millis = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::duration_cast<std::chrono::minutes>(started_at_duration)).count();
|
||||
@@ -357,7 +361,7 @@ cql3::query_options trace_keyspace_helper::make_slow_query_time_idx_mutation_dat
|
||||
cql3::raw_value::make_value(timestamp_type->decompose(millis_since_epoch)),
|
||||
cql3::raw_value::make_value(uuid_type->decompose(session_records.session_id)),
|
||||
cql3::raw_value::make_value(timeuuid_type->decompose(start_time_id)),
|
||||
cql3::raw_value::make_value(inet_addr_type->decompose(utils::fb_utilities::get_broadcast_address().addr())),
|
||||
cql3::raw_value::make_value(inet_addr_type->decompose(my_address.addr())),
|
||||
cql3::raw_value::make_value(int32_type->decompose(int32_t(this_shard_id()))),
|
||||
cql3::raw_value::make_value(int32_type->decompose(int32_t(session_records.session_rec.slow_query_record_ttl.count())))
|
||||
});
|
||||
@@ -366,14 +370,14 @@ cql3::query_options trace_keyspace_helper::make_slow_query_time_idx_mutation_dat
|
||||
db::consistency_level::ANY, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT);
|
||||
}
|
||||
|
||||
std::vector<cql3::raw_value> trace_keyspace_helper::make_event_mutation_data(one_session_records& session_records, const event_record& record) {
|
||||
std::vector<cql3::raw_value> trace_keyspace_helper::make_event_mutation_data(gms::inet_address my_address, one_session_records& session_records, const event_record& record) {
|
||||
auto backend_state_ptr = static_cast<trace_keyspace_backend_sesssion_state*>(session_records.backend_state_ptr.get());
|
||||
|
||||
std::vector<cql3::raw_value> values({
|
||||
cql3::raw_value::make_value(uuid_type->decompose(session_records.session_id)),
|
||||
cql3::raw_value::make_value(timeuuid_type->decompose(utils::UUID_gen::get_time_UUID(table_helper::make_monotonic_UUID_tp(backend_state_ptr->last_nanos, record.event_time_point)))),
|
||||
cql3::raw_value::make_value(utf8_type->decompose(record.message)),
|
||||
cql3::raw_value::make_value(inet_addr_type->decompose(utils::fb_utilities::get_broadcast_address().addr())),
|
||||
cql3::raw_value::make_value(inet_addr_type->decompose(my_address.addr())),
|
||||
cql3::raw_value::make_value(int32_type->decompose(elapsed_to_micros(record.elapsed))),
|
||||
cql3::raw_value::make_value(utf8_type->decompose(_local_tracing.get_thread_name())),
|
||||
cql3::raw_value::make_value(long_type->decompose(int64_t(session_records.parent_id.get_id()))),
|
||||
@@ -396,7 +400,7 @@ future<> trace_keyspace_helper::apply_events_mutation(cql3::query_processor& qp,
|
||||
std::vector<cql3::raw_value_vector_with_unset> values;
|
||||
|
||||
values.reserve(events_records.size());
|
||||
std::for_each(events_records.begin(), events_records.end(), [&values, all_records = records, this] (event_record& one_event_record) { values.emplace_back(make_event_mutation_data(*all_records, one_event_record)); });
|
||||
std::for_each(events_records.begin(), events_records.end(), [&values, all_records = records, this] (event_record& one_event_record) { values.emplace_back(make_event_mutation_data(my_address(), *all_records, one_event_record)); });
|
||||
|
||||
return do_with(
|
||||
cql3::query_options::make_batch_options(cql3::query_options(cql3::default_cql_config, db::consistency_level::ANY, std::nullopt, std::vector<cql3::raw_value>{}, false, cql3::query_options::specific_options::DEFAULT), std::move(values)),
|
||||
@@ -440,9 +444,9 @@ future<> trace_keyspace_helper::flush_one_session_mutations(lw_shared_ptr<one_se
|
||||
|
||||
// if session is finished - store a session and a session time index entries
|
||||
tlogger.trace("{}: going to store a session event", records->session_id);
|
||||
return _sessions.insert(qp, mm, _dummy_query_state, make_session_mutation_data, std::ref(*records)).then([this, &qp, &mm, records] {
|
||||
return _sessions.insert(qp, mm, _dummy_query_state, make_session_mutation_data, my_address(), std::ref(*records)).then([this, &qp, &mm, records] {
|
||||
tlogger.trace("{}: going to store a {} entry", records->session_id, _sessions_time_idx.name());
|
||||
return _sessions_time_idx.insert(qp, mm, _dummy_query_state, make_session_time_idx_mutation_data, std::ref(*records));
|
||||
return _sessions_time_idx.insert(qp, mm, _dummy_query_state, make_session_time_idx_mutation_data, my_address(), std::ref(*records));
|
||||
}).then([this, &qp, &mm, records] {
|
||||
if (!records->do_log_slow_query) {
|
||||
return now();
|
||||
@@ -451,9 +455,9 @@ future<> trace_keyspace_helper::flush_one_session_mutations(lw_shared_ptr<one_se
|
||||
// if slow query log is requested - store a slow query log and a slow query log time index entries
|
||||
auto start_time_id = utils::UUID_gen::get_time_UUID(table_helper::make_monotonic_UUID_tp(_slow_query_last_nanos, records->session_rec.started_at));
|
||||
tlogger.trace("{}: going to store a slow query event", records->session_id);
|
||||
return _slow_query_log.insert(qp, mm, _dummy_query_state, make_slow_query_mutation_data, std::ref(*records), start_time_id).then([this, &qp, &mm, records, start_time_id] {
|
||||
return _slow_query_log.insert(qp, mm, _dummy_query_state, make_slow_query_mutation_data, my_address(), std::ref(*records), start_time_id).then([this, &qp, &mm, records, start_time_id] {
|
||||
tlogger.trace("{}: going to store a {} entry", records->session_id, _slow_query_log_time_idx.name());
|
||||
return _slow_query_log_time_idx.insert(qp, mm, _dummy_query_state, make_slow_query_time_idx_mutation_data, std::ref(*records), start_time_id);
|
||||
return _slow_query_log_time_idx.insert(qp, mm, _dummy_query_state, make_slow_query_time_idx_mutation_data, my_address(), std::ref(*records), start_time_id);
|
||||
});
|
||||
});
|
||||
} else {
|
||||
|
||||
@@ -73,6 +73,9 @@ public:
|
||||
virtual std::unique_ptr<backend_session_state_base> allocate_session_state() const override;
|
||||
|
||||
private:
|
||||
// Valid only after start() sets _qp_anchor
|
||||
gms::inet_address my_address() const noexcept;
|
||||
|
||||
/**
|
||||
* Write records of a single tracing session
|
||||
*
|
||||
@@ -115,7 +118,7 @@ private:
|
||||
*
|
||||
* @return the relevant cql3::query_options object with the mutation data
|
||||
*/
|
||||
static cql3::query_options make_session_mutation_data(const one_session_records& all_records_handle);
|
||||
static cql3::query_options make_session_mutation_data(gms::inet_address my_address, const one_session_records& all_records_handle);
|
||||
|
||||
/**
|
||||
* Create a mutation data for a new session_idx record
|
||||
@@ -124,7 +127,7 @@ private:
|
||||
*
|
||||
* @return the relevant cql3::query_options object with the mutation data
|
||||
*/
|
||||
static cql3::query_options make_session_time_idx_mutation_data(const one_session_records& all_records_handle);
|
||||
static cql3::query_options make_session_time_idx_mutation_data(gms::inet_address my_address, const one_session_records& all_records_handle);
|
||||
|
||||
/**
|
||||
* Create mutation for a new slow_query_log record
|
||||
@@ -134,7 +137,7 @@ private:
|
||||
*
|
||||
* @return the relevant mutation
|
||||
*/
|
||||
static cql3::query_options make_slow_query_mutation_data(const one_session_records& all_records_handle, const utils::UUID& start_time_id);
|
||||
static cql3::query_options make_slow_query_mutation_data(gms::inet_address my_address, const one_session_records& all_records_handle, const utils::UUID& start_time_id);
|
||||
|
||||
/**
|
||||
* Create mutation for a new slow_query_log_time_idx record
|
||||
@@ -144,7 +147,7 @@ private:
|
||||
*
|
||||
* @return the relevant mutation
|
||||
*/
|
||||
static cql3::query_options make_slow_query_time_idx_mutation_data(const one_session_records& all_records_handle, const utils::UUID& start_time_id);
|
||||
static cql3::query_options make_slow_query_time_idx_mutation_data(gms::inet_address my_address, const one_session_records& all_records_handle, const utils::UUID& start_time_id);
|
||||
|
||||
/**
|
||||
* Create a mutation data for a new trace point record
|
||||
@@ -156,7 +159,7 @@ private:
|
||||
*
|
||||
* @return a vector with the mutation data
|
||||
*/
|
||||
std::vector<cql3::raw_value> make_event_mutation_data(one_session_records& session_records, const event_record& record);
|
||||
std::vector<cql3::raw_value> make_event_mutation_data(gms::inet_address my_address, one_session_records& session_records, const event_record& record);
|
||||
|
||||
/**
|
||||
* Converts a @param elapsed to an int32_t value of microseconds.
|
||||
|
||||
Reference in New Issue
Block a user