diff --git a/tracing/trace_keyspace_helper.cc b/tracing/trace_keyspace_helper.cc index 9e23282613..1edf82acc4 100644 --- a/tracing/trace_keyspace_helper.cc +++ b/tracing/trace_keyspace_helper.cc @@ -17,8 +17,8 @@ #include "types/set.hh" #include "types/map.hh" #include "utils/UUID_gen.hh" -#include "utils/fb_utilities.hh" #include "utils/class_registrator.hh" +#include "service/storage_proxy.hh" namespace tracing { @@ -214,6 +214,10 @@ future<> trace_keyspace_helper::start(cql3::query_processor& qp, service::migrat return table_helper::setup_keyspace(qp, mm, KEYSPACE_NAME, "2", _dummy_query_state, { &_sessions, &_sessions_time_idx, &_events, &_slow_query_log, &_slow_query_log_time_idx }); } +gms::inet_address trace_keyspace_helper::my_address() const noexcept { + return _qp_anchor->proxy().local_db().get_token_metadata().get_topology().my_address(); +} + void trace_keyspace_helper::write_one_session_records(lw_shared_ptr records) { // Future is waited on indirectly in `stop()` (via `_pending_writes`). (void)with_gate(_pending_writes, [this, records = std::move(records)] { @@ -244,7 +248,7 @@ void trace_keyspace_helper::write_records_bulk(records_bulk& bulk) { }); } -cql3::query_options trace_keyspace_helper::make_session_mutation_data(const one_session_records& session_records) { +cql3::query_options trace_keyspace_helper::make_session_mutation_data(gms::inet_address my_address, const one_session_records& session_records) { const session_record& record = session_records.session_rec; auto millis_since_epoch = std::chrono::duration_cast(record.started_at.time_since_epoch()).count(); std::vector> parameters_values_vector; @@ -269,7 +273,7 @@ cql3::query_options trace_keyspace_helper::make_session_mutation_data(const one_ cql3::raw_value::make_value(uuid_type->decompose(session_records.session_id)), cql3::raw_value::make_value(utf8_type->decompose(type_to_string(record.command))), cql3::raw_value::make_value(inet_addr_type->decompose(record.client.addr())), - cql3::raw_value::make_value(inet_addr_type->decompose(utils::fb_utilities::get_broadcast_address().addr())), + cql3::raw_value::make_value(inet_addr_type->decompose(my_address.addr())), cql3::raw_value::make_value(int32_type->decompose(elapsed_to_micros(record.elapsed))), cql3::raw_value::make_value(make_map_value(my_map_type, map_type_impl::native_type(std::move(parameters_values_vector))).serialize()), cql3::raw_value::make_value(utf8_type->decompose(record.request)), @@ -284,7 +288,7 @@ cql3::query_options trace_keyspace_helper::make_session_mutation_data(const one_ db::consistency_level::ANY, std::move(names), std::move(values), false, cql3::query_options::specific_options::DEFAULT); } -cql3::query_options trace_keyspace_helper::make_session_time_idx_mutation_data(const one_session_records& session_records) { +cql3::query_options trace_keyspace_helper::make_session_time_idx_mutation_data(gms::inet_address my_address, const one_session_records& session_records) { auto started_at_duration = session_records.session_rec.started_at.time_since_epoch(); // timestamp in minutes when the query began auto minutes_in_millis = std::chrono::duration_cast(std::chrono::duration_cast(started_at_duration)).count(); @@ -302,7 +306,7 @@ cql3::query_options trace_keyspace_helper::make_session_time_idx_mutation_data(c db::consistency_level::ANY, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT); } -cql3::query_options trace_keyspace_helper::make_slow_query_mutation_data(const one_session_records& session_records, const utils::UUID& start_time_id) { +cql3::query_options trace_keyspace_helper::make_slow_query_mutation_data(gms::inet_address my_address, const one_session_records& session_records, const utils::UUID& start_time_id) { const session_record& record = session_records.session_rec; auto millis_since_epoch = std::chrono::duration_cast(record.started_at.time_since_epoch()).count(); @@ -325,7 +329,7 @@ cql3::query_options trace_keyspace_helper::make_slow_query_mutation_data(const o auto my_set_type = set_type_impl::get_instance(utf8_type, true); std::vector values({ - cql3::raw_value::make_value(inet_addr_type->decompose(utils::fb_utilities::get_broadcast_address().addr())), + cql3::raw_value::make_value(inet_addr_type->decompose(my_address.addr())), cql3::raw_value::make_value(int32_type->decompose((int32_t)(this_shard_id()))), cql3::raw_value::make_value(uuid_type->decompose(session_records.session_id)), cql3::raw_value::make_value(timestamp_type->decompose(millis_since_epoch)), @@ -345,7 +349,7 @@ cql3::query_options trace_keyspace_helper::make_slow_query_mutation_data(const o db::consistency_level::ANY, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT); } -cql3::query_options trace_keyspace_helper::make_slow_query_time_idx_mutation_data(const one_session_records& session_records, const utils::UUID& start_time_id) { +cql3::query_options trace_keyspace_helper::make_slow_query_time_idx_mutation_data(gms::inet_address my_address, const one_session_records& session_records, const utils::UUID& start_time_id) { auto started_at_duration = session_records.session_rec.started_at.time_since_epoch(); // timestamp in minutes when the query began auto minutes_in_millis = std::chrono::duration_cast(std::chrono::duration_cast(started_at_duration)).count(); @@ -357,7 +361,7 @@ cql3::query_options trace_keyspace_helper::make_slow_query_time_idx_mutation_dat cql3::raw_value::make_value(timestamp_type->decompose(millis_since_epoch)), cql3::raw_value::make_value(uuid_type->decompose(session_records.session_id)), cql3::raw_value::make_value(timeuuid_type->decompose(start_time_id)), - cql3::raw_value::make_value(inet_addr_type->decompose(utils::fb_utilities::get_broadcast_address().addr())), + cql3::raw_value::make_value(inet_addr_type->decompose(my_address.addr())), cql3::raw_value::make_value(int32_type->decompose(int32_t(this_shard_id()))), cql3::raw_value::make_value(int32_type->decompose(int32_t(session_records.session_rec.slow_query_record_ttl.count()))) }); @@ -366,14 +370,14 @@ cql3::query_options trace_keyspace_helper::make_slow_query_time_idx_mutation_dat db::consistency_level::ANY, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT); } -std::vector trace_keyspace_helper::make_event_mutation_data(one_session_records& session_records, const event_record& record) { +std::vector trace_keyspace_helper::make_event_mutation_data(gms::inet_address my_address, one_session_records& session_records, const event_record& record) { auto backend_state_ptr = static_cast(session_records.backend_state_ptr.get()); std::vector values({ cql3::raw_value::make_value(uuid_type->decompose(session_records.session_id)), cql3::raw_value::make_value(timeuuid_type->decompose(utils::UUID_gen::get_time_UUID(table_helper::make_monotonic_UUID_tp(backend_state_ptr->last_nanos, record.event_time_point)))), cql3::raw_value::make_value(utf8_type->decompose(record.message)), - cql3::raw_value::make_value(inet_addr_type->decompose(utils::fb_utilities::get_broadcast_address().addr())), + cql3::raw_value::make_value(inet_addr_type->decompose(my_address.addr())), cql3::raw_value::make_value(int32_type->decompose(elapsed_to_micros(record.elapsed))), cql3::raw_value::make_value(utf8_type->decompose(_local_tracing.get_thread_name())), cql3::raw_value::make_value(long_type->decompose(int64_t(session_records.parent_id.get_id()))), @@ -396,7 +400,7 @@ future<> trace_keyspace_helper::apply_events_mutation(cql3::query_processor& qp, std::vector values; values.reserve(events_records.size()); - std::for_each(events_records.begin(), events_records.end(), [&values, all_records = records, this] (event_record& one_event_record) { values.emplace_back(make_event_mutation_data(*all_records, one_event_record)); }); + std::for_each(events_records.begin(), events_records.end(), [&values, all_records = records, this] (event_record& one_event_record) { values.emplace_back(make_event_mutation_data(my_address(), *all_records, one_event_record)); }); return do_with( cql3::query_options::make_batch_options(cql3::query_options(cql3::default_cql_config, db::consistency_level::ANY, std::nullopt, std::vector{}, false, cql3::query_options::specific_options::DEFAULT), std::move(values)), @@ -440,9 +444,9 @@ future<> trace_keyspace_helper::flush_one_session_mutations(lw_shared_ptrsession_id); - return _sessions.insert(qp, mm, _dummy_query_state, make_session_mutation_data, std::ref(*records)).then([this, &qp, &mm, records] { + return _sessions.insert(qp, mm, _dummy_query_state, make_session_mutation_data, my_address(), std::ref(*records)).then([this, &qp, &mm, records] { tlogger.trace("{}: going to store a {} entry", records->session_id, _sessions_time_idx.name()); - return _sessions_time_idx.insert(qp, mm, _dummy_query_state, make_session_time_idx_mutation_data, std::ref(*records)); + return _sessions_time_idx.insert(qp, mm, _dummy_query_state, make_session_time_idx_mutation_data, my_address(), std::ref(*records)); }).then([this, &qp, &mm, records] { if (!records->do_log_slow_query) { return now(); @@ -451,9 +455,9 @@ future<> trace_keyspace_helper::flush_one_session_mutations(lw_shared_ptrsession_rec.started_at)); tlogger.trace("{}: going to store a slow query event", records->session_id); - return _slow_query_log.insert(qp, mm, _dummy_query_state, make_slow_query_mutation_data, std::ref(*records), start_time_id).then([this, &qp, &mm, records, start_time_id] { + return _slow_query_log.insert(qp, mm, _dummy_query_state, make_slow_query_mutation_data, my_address(), std::ref(*records), start_time_id).then([this, &qp, &mm, records, start_time_id] { tlogger.trace("{}: going to store a {} entry", records->session_id, _slow_query_log_time_idx.name()); - return _slow_query_log_time_idx.insert(qp, mm, _dummy_query_state, make_slow_query_time_idx_mutation_data, std::ref(*records), start_time_id); + return _slow_query_log_time_idx.insert(qp, mm, _dummy_query_state, make_slow_query_time_idx_mutation_data, my_address(), std::ref(*records), start_time_id); }); }); } else { diff --git a/tracing/trace_keyspace_helper.hh b/tracing/trace_keyspace_helper.hh index 1696a3f536..ee52b716ad 100644 --- a/tracing/trace_keyspace_helper.hh +++ b/tracing/trace_keyspace_helper.hh @@ -73,6 +73,9 @@ public: virtual std::unique_ptr allocate_session_state() const override; private: + // Valid only after start() sets _qp_anchor + gms::inet_address my_address() const noexcept; + /** * Write records of a single tracing session * @@ -115,7 +118,7 @@ private: * * @return the relevant cql3::query_options object with the mutation data */ - static cql3::query_options make_session_mutation_data(const one_session_records& all_records_handle); + static cql3::query_options make_session_mutation_data(gms::inet_address my_address, const one_session_records& all_records_handle); /** * Create a mutation data for a new session_idx record @@ -124,7 +127,7 @@ private: * * @return the relevant cql3::query_options object with the mutation data */ - static cql3::query_options make_session_time_idx_mutation_data(const one_session_records& all_records_handle); + static cql3::query_options make_session_time_idx_mutation_data(gms::inet_address my_address, const one_session_records& all_records_handle); /** * Create mutation for a new slow_query_log record @@ -134,7 +137,7 @@ private: * * @return the relevant mutation */ - static cql3::query_options make_slow_query_mutation_data(const one_session_records& all_records_handle, const utils::UUID& start_time_id); + static cql3::query_options make_slow_query_mutation_data(gms::inet_address my_address, const one_session_records& all_records_handle, const utils::UUID& start_time_id); /** * Create mutation for a new slow_query_log_time_idx record @@ -144,7 +147,7 @@ private: * * @return the relevant mutation */ - static cql3::query_options make_slow_query_time_idx_mutation_data(const one_session_records& all_records_handle, const utils::UUID& start_time_id); + static cql3::query_options make_slow_query_time_idx_mutation_data(gms::inet_address my_address, const one_session_records& all_records_handle, const utils::UUID& start_time_id); /** * Create a mutation data for a new trace point record @@ -156,7 +159,7 @@ private: * * @return a vector with the mutation data */ - std::vector make_event_mutation_data(one_session_records& session_records, const event_record& record); + std::vector make_event_mutation_data(gms::inet_address my_address, one_session_records& session_records, const event_record& record); /** * Converts a @param elapsed to an int32_t value of microseconds.