cql_metrics: Add counters for CQL request messages

This change adds metrics for counting request message types
listed in the CQL v.4 spec under section 4.1
(https://github.com/apache/cassandra/blob/trunk/doc/native_protocol_v4.spec).
To organize things properly, we introduce a new cql_server::transport_stats
object type for aggregating the message and server statistics.

Fixes #4888

Closes #7574
This commit is contained in:
Piotr Wojtczak
2020-11-09 18:55:02 +01:00
committed by Avi Kivity
parent d5a6aa4533
commit d9810ec8eb
2 changed files with 70 additions and 22 deletions

View File

@@ -160,27 +160,51 @@ cql_server::cql_server(distributed<cql3::query_processor>& qp, auth::service& au
namespace sm = seastar::metrics;
_metrics.add_group("transport", {
sm::make_derive("cql-connections", _connects,
sm::make_derive("startups", _stats.startups,
sm::description("Counts the total number of received CQL STARTUP messages.")),
sm::make_derive("auth_responses", _stats.auth_responses,
sm::description("Counts the total number of received CQL AUTH messages.")),
sm::make_derive("options_requests", _stats.options_requests,
sm::description("Counts the total number of received CQL OPTIONS messages.")),
sm::make_derive("query_requests", _stats.query_requests,
sm::description("Counts the total number of received CQL QUERY messages.")),
sm::make_derive("prepare_requests", _stats.prepare_requests,
sm::description("Counts the total number of received CQL PREPARE messages.")),
sm::make_derive("execute_requests", _stats.execute_requests,
sm::description("Counts the total number of received CQL EXECUTE messages.")),
sm::make_derive("batch_requests", _stats.batch_requests,
sm::description("Counts the total number of received CQL BATCH messages.")),
sm::make_derive("register_requests", _stats.register_requests,
sm::description("Counts the total number of received CQL REGISTER messages.")),
sm::make_derive("cql-connections", _stats.connects,
sm::description("Counts a number of client connections.")),
sm::make_gauge("current_connections", _connections,
sm::make_gauge("current_connections", _stats.connections,
sm::description("Holds a current number of client connections.")),
sm::make_derive("requests_served", _requests_served,
sm::make_derive("requests_served", _stats.requests_served,
sm::description("Counts a number of served requests.")),
sm::make_gauge("requests_serving", _requests_serving,
sm::make_gauge("requests_serving", _stats.requests_serving,
sm::description("Holds a number of requests that are being processed right now.")),
sm::make_gauge("requests_blocked_memory_current", [this] { return _memory_available.waiters(); },
sm::description(
seastar::format("Holds the number of requests that are currently blocked due to reaching the memory quota limit ({}B). "
"Non-zero value indicates that our bottleneck is memory and more specifically - the memory quota allocated for the \"CQL transport\" component.", _max_request_size))),
sm::make_derive("requests_blocked_memory", _requests_blocked_memory,
sm::make_derive("requests_blocked_memory", _stats.requests_blocked_memory,
sm::description(
seastar::format("Holds an incrementing counter with the requests that ever blocked due to reaching the memory quota limit ({}B). "
"The first derivative of this value shows how often we block due to memory exhaustion in the \"CQL transport\" component.", _max_request_size))),
sm::make_derive("requests_shed", _requests_shed,
sm::make_derive("requests_shed", _stats.requests_shed,
sm::description("Holds an incrementing counter with the requests that were shed due to overload (threshold configured via max_concurrent_requests_per_shard). "
"The first derivative of this value shows how often we shed requests due to overload in the \"CQL transport\" component.")),
sm::make_gauge("requests_memory_available", [this] { return _memory_available.current(); },
@@ -262,8 +286,8 @@ cql_server::do_accepts(int which, bool keepalive, socket_address server_addr) {
fd.set_nodelay(true);
fd.set_keepalive(keepalive);
auto conn = make_shared<connection>(*this, server_addr, std::move(fd), std::move(addr));
++_connects;
++_connections;
++_stats.connects;
++_stats.connections;
// Move the processing into the background.
(void)futurize_invoke([this, conn] {
return advertise_new_connection(conn); // Notify any listeners about new connection.
@@ -275,7 +299,7 @@ cql_server::do_accepts(int which, bool keepalive, socket_address server_addr) {
}
// Block while monitoring for lifetime/errors.
return conn->process().finally([this, conn] {
--_connections;
--_stats.connections;
return unadvertise_connection(conn);
}).handle_exception([] (std::exception_ptr ep) {
try {
@@ -478,7 +502,7 @@ future<foreign_ptr<std::unique_ptr<cql_server::response>>>
auto stop_trace = defer([&] {
tracing::stop_foreground(trace_state);
});
--_server._requests_serving;
--_server._stats.requests_serving;
try {
foreign_ptr<std::unique_ptr<cql_server::response>> response = f.get0();
@@ -639,22 +663,22 @@ future<> cql_server::connection::process_request() {
f.length, mem_estimate, _server._max_request_size)));
}
if (_server._requests_serving > _server._max_concurrent_requests) {
++_server._requests_shed;
if (_server._stats.requests_serving > _server._max_concurrent_requests) {
++_server._stats.requests_shed;
return make_exception_future<>(
exceptions::overloaded_exception(format("too many in-flight requests (configured via max_concurrent_requests_per_shard): {}", _server._requests_serving)));
exceptions::overloaded_exception(format("too many in-flight requests (configured via max_concurrent_requests_per_shard): {}", _server._stats.requests_serving)));
}
auto fut = get_units(_server._memory_available, mem_estimate);
if (_server._memory_available.waiters()) {
++_server._requests_blocked_memory;
++_server._stats.requests_blocked_memory;
}
return fut.then([this, length = f.length, flags = f.flags, op, stream, tracing_requested] (semaphore_units<> mem_permit) {
return this->read_and_decompress_frame(length, flags).then([this, op, stream, tracing_requested, mem_permit = make_service_permit(std::move(mem_permit))] (fragmented_temporary_buffer buf) mutable {
++_server._requests_served;
++_server._requests_serving;
++_server._stats.requests_served;
++_server._stats.requests_serving;
_pending_requests_gate.enter();
auto leave = defer([this] { _pending_requests_gate.leave(); });
@@ -755,6 +779,7 @@ future<fragmented_temporary_buffer> cql_server::connection::read_and_decompress_
future<std::unique_ptr<cql_server::response>> cql_server::connection::process_startup(uint16_t stream, request_reader in, service::client_state& client_state,
tracing::trace_state_ptr trace_state) {
++_server._stats.startups;
auto options = in.read_string_map();
auto compression_opt = options.find("COMPRESSION");
if (compression_opt != options.end()) {
@@ -816,6 +841,7 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_st
future<std::unique_ptr<cql_server::response>> cql_server::connection::process_auth_response(uint16_t stream, request_reader in, service::client_state& client_state,
tracing::trace_state_ptr trace_state) {
++_server._stats.auth_responses;
auto sasl_challenge = client_state.get_auth_service()->underlying_authenticator().new_sasl_challenge();
auto buf = in.read_raw_bytes_view(in.bytes_left());
auto challenge = sasl_challenge->evaluate_response(buf);
@@ -839,6 +865,7 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_au
future<std::unique_ptr<cql_server::response>> cql_server::connection::process_options(uint16_t stream, request_reader in, service::client_state& client_state,
tracing::trace_state_ptr trace_state) {
++_server._stats.options_requests;
return make_ready_future<std::unique_ptr<cql_server::response>>(make_supported(stream, std::move(trace_state)));
}
@@ -924,11 +951,14 @@ process_query_internal(service::client_state& client_state, distributed<cql3::qu
future<foreign_ptr<std::unique_ptr<cql_server::response>>>
cql_server::connection::process_query(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit, tracing::trace_state_ptr trace_state) {
++_server._stats.query_requests;
return process(stream, in, client_state, std::move(permit), std::move(trace_state), process_query_internal);
}
future<std::unique_ptr<cql_server::response>> cql_server::connection::process_prepare(uint16_t stream, request_reader in, service::client_state& client_state,
tracing::trace_state_ptr trace_state) {
++_server._stats.prepare_requests;
auto query = sstring(in.read_long_string_view());
tracing::add_query(trace_state, query);
@@ -1031,6 +1061,7 @@ process_execute_internal(service::client_state& client_state, distributed<cql3::
future<foreign_ptr<std::unique_ptr<cql_server::response>>> cql_server::connection::process_execute(uint16_t stream, request_reader in,
service::client_state& client_state, service_permit permit, tracing::trace_state_ptr trace_state) {
++_server._stats.execute_requests;
return process(stream, in, client_state, std::move(permit), std::move(trace_state), process_execute_internal);
}
@@ -1152,12 +1183,14 @@ process_batch_internal(service::client_state& client_state, distributed<cql3::qu
future<foreign_ptr<std::unique_ptr<cql_server::response>>>
cql_server::connection::process_batch(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit,
tracing::trace_state_ptr trace_state) {
++_server._stats.batch_requests;
return process(stream, in, client_state, permit, std::move(trace_state), process_batch_internal);
}
future<std::unique_ptr<cql_server::response>>
cql_server::connection::process_register(uint16_t stream, request_reader in, service::client_state& client_state,
tracing::trace_state_ptr trace_state) {
++_server._stats.register_requests;
std::vector<sstring> event_types;
in.read_string_list(event_types);
for (auto&& event_type : event_types) {

View File

@@ -117,6 +117,26 @@ struct cql_server_config {
};
class cql_server : public seastar::peering_sharded_service<cql_server> {
private:
struct transport_stats {
// server stats
uint64_t connects;
uint64_t connections;
uint64_t requests_served;
uint32_t requests_serving;
uint64_t requests_blocked_memory;
uint64_t requests_shed;
// cql message stats
uint64_t startups;
uint64_t auth_responses;
uint64_t options_requests;
uint64_t query_requests;
uint64_t prepare_requests;
uint64_t execute_requests;
uint64_t batch_requests;
uint64_t register_requests;
};
private:
class event_notifier;
@@ -131,12 +151,7 @@ private:
seastar::metrics::metric_groups _metrics;
std::unique_ptr<event_notifier> _notifier;
private:
uint64_t _connects = 0;
uint64_t _connections = 0;
uint64_t _requests_served = 0;
uint32_t _requests_serving = 0;
uint64_t _requests_blocked_memory = 0;
uint64_t _requests_shed = 0;
transport_stats _stats = {};
auth::service& _auth_service;
public:
cql_server(distributed<cql3::query_processor>& qp, auth::service&,