diff --git a/transport/server.cc b/transport/server.cc index 40ec149c56..1e0a612392 100644 --- a/transport/server.cc +++ b/transport/server.cc @@ -160,27 +160,51 @@ cql_server::cql_server(distributed& qp, auth::service& au namespace sm = seastar::metrics; _metrics.add_group("transport", { - sm::make_derive("cql-connections", _connects, + sm::make_derive("startups", _stats.startups, + sm::description("Counts the total number of received CQL STARTUP messages.")), + + sm::make_derive("auth_responses", _stats.auth_responses, + sm::description("Counts the total number of received CQL AUTH messages.")), + + sm::make_derive("options_requests", _stats.options_requests, + sm::description("Counts the total number of received CQL OPTIONS messages.")), + + sm::make_derive("query_requests", _stats.query_requests, + sm::description("Counts the total number of received CQL QUERY messages.")), + + sm::make_derive("prepare_requests", _stats.prepare_requests, + sm::description("Counts the total number of received CQL PREPARE messages.")), + + sm::make_derive("execute_requests", _stats.execute_requests, + sm::description("Counts the total number of received CQL EXECUTE messages.")), + + sm::make_derive("batch_requests", _stats.batch_requests, + sm::description("Counts the total number of received CQL BATCH messages.")), + + sm::make_derive("register_requests", _stats.register_requests, + sm::description("Counts the total number of received CQL REGISTER messages.")), + + sm::make_derive("cql-connections", _stats.connects, sm::description("Counts a number of client connections.")), - sm::make_gauge("current_connections", _connections, + sm::make_gauge("current_connections", _stats.connections, sm::description("Holds a current number of client connections.")), - sm::make_derive("requests_served", _requests_served, + sm::make_derive("requests_served", _stats.requests_served, sm::description("Counts a number of served requests.")), - sm::make_gauge("requests_serving", _requests_serving, + sm::make_gauge("requests_serving", _stats.requests_serving, sm::description("Holds a number of requests that are being processed right now.")), sm::make_gauge("requests_blocked_memory_current", [this] { return _memory_available.waiters(); }, sm::description( seastar::format("Holds the number of requests that are currently blocked due to reaching the memory quota limit ({}B). " "Non-zero value indicates that our bottleneck is memory and more specifically - the memory quota allocated for the \"CQL transport\" component.", _max_request_size))), - sm::make_derive("requests_blocked_memory", _requests_blocked_memory, + sm::make_derive("requests_blocked_memory", _stats.requests_blocked_memory, sm::description( seastar::format("Holds an incrementing counter with the requests that ever blocked due to reaching the memory quota limit ({}B). " "The first derivative of this value shows how often we block due to memory exhaustion in the \"CQL transport\" component.", _max_request_size))), - sm::make_derive("requests_shed", _requests_shed, + sm::make_derive("requests_shed", _stats.requests_shed, sm::description("Holds an incrementing counter with the requests that were shed due to overload (threshold configured via max_concurrent_requests_per_shard). " "The first derivative of this value shows how often we shed requests due to overload in the \"CQL transport\" component.")), sm::make_gauge("requests_memory_available", [this] { return _memory_available.current(); }, @@ -262,8 +286,8 @@ cql_server::do_accepts(int which, bool keepalive, socket_address server_addr) { fd.set_nodelay(true); fd.set_keepalive(keepalive); auto conn = make_shared(*this, server_addr, std::move(fd), std::move(addr)); - ++_connects; - ++_connections; + ++_stats.connects; + ++_stats.connections; // Move the processing into the background. (void)futurize_invoke([this, conn] { return advertise_new_connection(conn); // Notify any listeners about new connection. @@ -275,7 +299,7 @@ cql_server::do_accepts(int which, bool keepalive, socket_address server_addr) { } // Block while monitoring for lifetime/errors. return conn->process().finally([this, conn] { - --_connections; + --_stats.connections; return unadvertise_connection(conn); }).handle_exception([] (std::exception_ptr ep) { try { @@ -478,7 +502,7 @@ future>> auto stop_trace = defer([&] { tracing::stop_foreground(trace_state); }); - --_server._requests_serving; + --_server._stats.requests_serving; try { foreign_ptr> response = f.get0(); @@ -639,22 +663,22 @@ future<> cql_server::connection::process_request() { f.length, mem_estimate, _server._max_request_size))); } - if (_server._requests_serving > _server._max_concurrent_requests) { - ++_server._requests_shed; + if (_server._stats.requests_serving > _server._max_concurrent_requests) { + ++_server._stats.requests_shed; return make_exception_future<>( - exceptions::overloaded_exception(format("too many in-flight requests (configured via max_concurrent_requests_per_shard): {}", _server._requests_serving))); + exceptions::overloaded_exception(format("too many in-flight requests (configured via max_concurrent_requests_per_shard): {}", _server._stats.requests_serving))); } auto fut = get_units(_server._memory_available, mem_estimate); if (_server._memory_available.waiters()) { - ++_server._requests_blocked_memory; + ++_server._stats.requests_blocked_memory; } return fut.then([this, length = f.length, flags = f.flags, op, stream, tracing_requested] (semaphore_units<> mem_permit) { return this->read_and_decompress_frame(length, flags).then([this, op, stream, tracing_requested, mem_permit = make_service_permit(std::move(mem_permit))] (fragmented_temporary_buffer buf) mutable { - ++_server._requests_served; - ++_server._requests_serving; + ++_server._stats.requests_served; + ++_server._stats.requests_serving; _pending_requests_gate.enter(); auto leave = defer([this] { _pending_requests_gate.leave(); }); @@ -755,6 +779,7 @@ future cql_server::connection::read_and_decompress_ future> cql_server::connection::process_startup(uint16_t stream, request_reader in, service::client_state& client_state, tracing::trace_state_ptr trace_state) { + ++_server._stats.startups; auto options = in.read_string_map(); auto compression_opt = options.find("COMPRESSION"); if (compression_opt != options.end()) { @@ -816,6 +841,7 @@ future> cql_server::connection::process_st future> cql_server::connection::process_auth_response(uint16_t stream, request_reader in, service::client_state& client_state, tracing::trace_state_ptr trace_state) { + ++_server._stats.auth_responses; auto sasl_challenge = client_state.get_auth_service()->underlying_authenticator().new_sasl_challenge(); auto buf = in.read_raw_bytes_view(in.bytes_left()); auto challenge = sasl_challenge->evaluate_response(buf); @@ -839,6 +865,7 @@ future> cql_server::connection::process_au future> cql_server::connection::process_options(uint16_t stream, request_reader in, service::client_state& client_state, tracing::trace_state_ptr trace_state) { + ++_server._stats.options_requests; return make_ready_future>(make_supported(stream, std::move(trace_state))); } @@ -924,11 +951,14 @@ process_query_internal(service::client_state& client_state, distributed>> cql_server::connection::process_query(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit, tracing::trace_state_ptr trace_state) { + ++_server._stats.query_requests; return process(stream, in, client_state, std::move(permit), std::move(trace_state), process_query_internal); } future> cql_server::connection::process_prepare(uint16_t stream, request_reader in, service::client_state& client_state, tracing::trace_state_ptr trace_state) { + ++_server._stats.prepare_requests; + auto query = sstring(in.read_long_string_view()); tracing::add_query(trace_state, query); @@ -1031,6 +1061,7 @@ process_execute_internal(service::client_state& client_state, distributed>> cql_server::connection::process_execute(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit, tracing::trace_state_ptr trace_state) { + ++_server._stats.execute_requests; return process(stream, in, client_state, std::move(permit), std::move(trace_state), process_execute_internal); } @@ -1152,12 +1183,14 @@ process_batch_internal(service::client_state& client_state, distributed>> cql_server::connection::process_batch(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit, tracing::trace_state_ptr trace_state) { + ++_server._stats.batch_requests; return process(stream, in, client_state, permit, std::move(trace_state), process_batch_internal); } future> cql_server::connection::process_register(uint16_t stream, request_reader in, service::client_state& client_state, tracing::trace_state_ptr trace_state) { + ++_server._stats.register_requests; std::vector event_types; in.read_string_list(event_types); for (auto&& event_type : event_types) { diff --git a/transport/server.hh b/transport/server.hh index a5572b4ed5..59edd94d98 100644 --- a/transport/server.hh +++ b/transport/server.hh @@ -117,6 +117,26 @@ struct cql_server_config { }; class cql_server : public seastar::peering_sharded_service { +private: + struct transport_stats { + // server stats + uint64_t connects; + uint64_t connections; + uint64_t requests_served; + uint32_t requests_serving; + uint64_t requests_blocked_memory; + uint64_t requests_shed; + + // cql message stats + uint64_t startups; + uint64_t auth_responses; + uint64_t options_requests; + uint64_t query_requests; + uint64_t prepare_requests; + uint64_t execute_requests; + uint64_t batch_requests; + uint64_t register_requests; + }; private: class event_notifier; @@ -131,12 +151,7 @@ private: seastar::metrics::metric_groups _metrics; std::unique_ptr _notifier; private: - uint64_t _connects = 0; - uint64_t _connections = 0; - uint64_t _requests_served = 0; - uint32_t _requests_serving = 0; - uint64_t _requests_blocked_memory = 0; - uint64_t _requests_shed = 0; + transport_stats _stats = {}; auth::service& _auth_service; public: cql_server(distributed& qp, auth::service&,