NOTE(review): this patch was recovered from a copy whose line breaks had been
collapsed and whose template argument lists had been stripped.
  * Line layout and context indentation were rebuilt from the hunk line
    counts; confirm against the tree, or apply with --ignore-whitespace.
  * `scheduling_group_get_specific<cql_sg_stats>` was restored from the
    accessor's declared return type (cql_sg_stats&).
  * Still truncated -- restore from the tree before applying:
    `future>> response_f`, `foreign_ptr>&& response`, `static_cast(op)`,
    `future> get_client_data()`.
  * service_permit::adopt() now takes ownership of the units when the permit
    is empty instead of silently dropping them (the caller's units would
    otherwise be released as soon as they go out of scope). This assumes
    `_permit` is a seastar::lw_shared_ptr<seastar::semaphore_units<>> --
    TODO confirm. The service_permit.hh hunk header reflects the extra lines;
    its post-image index hash below is no longer accurate.

diff --git a/service_permit.hh b/service_permit.hh
index 0934d9e86a..5aeec14987 100644
--- a/service_permit.hh
+++ b/service_permit.hh
@@ -18,6 +18,16 @@ class service_permit {
     friend service_permit empty_service_permit();
 public:
     size_t count() const { return _permit ? _permit->count() : 0; };
+    // Merge additional semaphore units into this permit.
+    // Used to grow the permit after the actual resource cost is known.
+    // An empty permit takes ownership of the units instead of dropping them.
+    void adopt(seastar::semaphore_units<>&& units) {
+        if (_permit) {
+            _permit->adopt(std::move(units));
+        } else {
+            _permit = seastar::make_lw_shared<seastar::semaphore_units<>>(std::move(units));
+        }
+    }
 };
 
 inline service_permit make_service_permit(seastar::semaphore_units<>&& permit) {
diff --git a/transport/server.cc b/transport/server.cc
index f13a472765..7c63e1207e 100644
--- a/transport/server.cc
+++ b/transport/server.cc
@@ -240,6 +240,12 @@ void cql_sg_stats::register_metrics()
         );
     }
 
+    transport_metrics.emplace_back(
+        sm::make_gauge("cql_pending_response_memory", [this] { return _pending_response_memory; },
+                       sm::description("Holds the total memory in bytes consumed by responses waiting to be sent."),
+                       {{"scheduling_group_name", cur_sg_name}}).set_skip_when_empty()
+    );
+
     new_metrics.add_group("transport", std::move(transport_metrics));
     _metrics = std::exchange(new_metrics, {});
 }
@@ -817,6 +823,8 @@ future<> cql_server::connection::process_request() {
 
             future<> request_response_future = request_process_future.then_wrapped([this, buf = std::move(buf), mem_permit, leave = std::move(leave), stream] (future>> response_f) mutable {
                 try {
+                    auto& sg_stats = _server.get_cql_sg_stats();
+                    size_t pending_response_size = 0;
                     if (response_f.failed()) {
                         const auto message = format("request processing failed, error [{}]", response_f.get_exception());
                         clogger.error("{}: {}", _client_state.get_remote_address(), message);
@@ -824,9 +832,22 @@ future<> cql_server::connection::process_request() {
                                                   message,
                                                   tracing::trace_state_ptr()));
                     } else {
-                        write_response(response_f.get(), std::move(mem_permit), _compression);
+                        auto response = response_f.get();
+                        // Account for response body size exceeding the initial estimate.
+                        auto resp_size = response->size();
+                        auto permit_size = mem_permit.count();
+                        if (resp_size > permit_size) {
+                            auto extra = resp_size - permit_size;
+                            auto extra_units = consume_units(_server._memory_available, extra);
+                            mem_permit.adopt(std::move(extra_units));
+                        }
+                        pending_response_size = resp_size;
+                        sg_stats._pending_response_memory += pending_response_size;
+                        write_response(std::move(response), _compression);
                     }
-                    _ready_to_respond = _ready_to_respond.finally([leave = std::move(leave)] {});
+                    _ready_to_respond = _ready_to_respond.finally([leave = std::move(leave), permit = std::move(mem_permit), &sg_stats, pending_response_size] {
+                        sg_stats._pending_response_memory -= pending_response_size;
+                    });
                 } catch (...) {
                     clogger.error("{}: request processing failed: {}",
                             _client_state.get_remote_address(), std::current_exception());
@@ -1667,9 +1688,9 @@ cql_server::connection::make_schema_change_event(const event::schema_change& eve
     return response;
 }
 
-void cql_server::connection::write_response(foreign_ptr>&& response, service_permit permit, cql_compression compression)
+void cql_server::connection::write_response(foreign_ptr>&& response, cql_compression compression)
 {
-    _ready_to_respond = _ready_to_respond.then([this, compression, response = std::move(response), permit = std::move(permit)] () mutable {
+    _ready_to_respond = _ready_to_respond.then([this, compression, response = std::move(response)] () mutable {
         auto message = response->make_message(_version, compression);
         message.on_delete([response = std::move(response)] { });
         return _write_buf.write(std::move(message)).then([this] {
diff --git a/transport/server.hh b/transport/server.hh
index d67b74f9cd..a99314a4b5 100644
--- a/transport/server.hh
+++ b/transport/server.hh
@@ -135,6 +135,10 @@ struct cql_sg_stats {
     request_kind_stats& get_cql_opcode_stats(cql_binary_opcode op) { return _cql_requests_stats[static_cast(op)]; }
     void register_metrics();
     void rename_metrics();
+
+    // Track total memory consumed by responses waiting to be sent.
+    // Incremented when a response is queued, decremented when the write completes.
+    int64_t _pending_response_memory = 0;
 private:
     bool _use_metrics = false;
     seastar::metrics::metric_groups _metrics;
@@ -200,8 +204,11 @@ public:
     service::endpoint_lifecycle_subscriber* get_lifecycle_listener() const noexcept;
     service::migration_listener* get_migration_listener() const noexcept;
     qos::qos_configuration_change_subscriber* get_qos_configuration_listener() const noexcept;
+    cql_sg_stats& get_cql_sg_stats() {
+        return scheduling_group_get_specific<cql_sg_stats>(_stats_key);
+    }
     cql_sg_stats::request_kind_stats& get_cql_opcode_stats(cql_binary_opcode op) {
-        return scheduling_group_get_specific<cql_sg_stats>(_stats_key).get_cql_opcode_stats(op);
+        return get_cql_sg_stats().get_cql_opcode_stats(op);
     }
 
     future> get_client_data();
@@ -330,7 +337,8 @@ private:
     process_on_shard(shard_id shard, uint16_t stream, fragmented_temporary_buffer::istream is, service::client_state& cs,
             tracing::trace_state_ptr trace_state, cql3::dialect dialect, cql3::computed_function_values&& cached_vals, Process process_fn);
 
-    void write_response(foreign_ptr>&& response, service_permit permit = empty_service_permit(), cql_compression compression = cql_compression::none);
+    void write_response(foreign_ptr>&& response, cql_compression compression = cql_compression::none);
+
 
     friend event_notifier;
 };