Merge '[Backport 2025.4] Introduce TTL and retries to address resolution' from Scylladb[bot]
In production environments, we observed cases where the S3 client would repeatedly fail to connect due to DNS entries becoming stale. Because the existing logic only attempted the first resolved address and lacked a way to refresh DNS state, the client could get stuck in a failure loop. Introduce RR TTL and connection failure retry to - re-resolve the RR in a timely manner - forcefully reset and re-resolve addresses - add a special case when the TTL is 0 and the record must be resolved for every request Fixes: CUSTOMER-96 Fixes: CUSTOMER-139 Should be backported to 2025.3/4 and 2026.1 since we already encountered it in the production clusters for 2025.3 - (cherry picked from commitbd9d5ad75b) - (cherry picked from commit359d0b7a3e) - (cherry picked from commitce0c7b5896) - (cherry picked from commit5b3e513cba) - (cherry picked from commit66a33619da) - (cherry picked from commit6eb7dba352) - (cherry picked from commita05a4593a6) - (cherry picked from commit3a31380b2c) - (cherry picked from commit912c48a806) Parent PR: #27891 Closes scylladb/scylladb#28404 * https://github.com/scylladb/scylladb: connection_factory: includes cleanup dns_connection_factory: refine the move constructor connection_factory: retry on failure connection_factory: introduce TTL timer connection_factory: get rid of shared_future in dns_connection_factory connection_factory: extract connection logic into a member connection_factory: remove unnecessary `else` connection_factory: use all resolved DNS addresses s3_test: remove client double-close
This commit is contained in:
@@ -766,7 +766,6 @@ void test_chunked_download_data_source(const client_maker_function& client_maker
|
||||
#endif
|
||||
|
||||
cln->delete_object(object_name).get();
|
||||
cln->close().get();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_chunked_download_data_source_with_delays_minio) {
|
||||
|
||||
111
utils/http.cc
111
utils/http.cc
@@ -23,39 +23,75 @@ future<shared_ptr<tls::certificate_credentials>> utils::http::system_trust_crede
|
||||
co_return system_trust_credentials;
|
||||
}
|
||||
|
||||
utils::http::dns_connection_factory::state::state(shared_ptr<tls::certificate_credentials> cin)
|
||||
: creds(std::move(cin))
|
||||
{}
|
||||
|
||||
future<> utils::http::dns_connection_factory::initialize(lw_shared_ptr<state> state, std::string host, int port, bool use_https, logging::logger& logger) {
|
||||
co_await coroutine::all(
|
||||
[state, host, port] () -> future<> {
|
||||
auto hent = co_await net::dns::get_host_by_name(host, net::inet_address::family::INET);
|
||||
state->addr = socket_address(hent.addr_list.front(), port);
|
||||
},
|
||||
[state, use_https] () -> future<> {
|
||||
if (use_https && !state->creds) {
|
||||
state->creds = co_await system_trust_credentials();
|
||||
}
|
||||
if (!use_https) {
|
||||
state->creds = {};
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
state->initialized = true;
|
||||
logger.debug("Initialized factory, address={} tls={}", state->addr, state->creds == nullptr ? "no" : "yes");
|
||||
future<> utils::http::dns_connection_factory::init_credentials() {
|
||||
if (_use_https && !_creds) {
|
||||
_creds = co_await system_trust_credentials();
|
||||
}
|
||||
if (!_use_https) {
|
||||
_creds = {};
|
||||
}
|
||||
_logger.debug("Initialized credentials, tls={}", _creds == nullptr ? "no" : "yes");
|
||||
}
|
||||
|
||||
utils::http::dns_connection_factory::dns_connection_factory(dns_connection_factory&&) = default;
|
||||
future<net::inet_address> utils::http::dns_connection_factory::get_address() {
|
||||
auto get_addr = [this] -> net::inet_address {
|
||||
const auto& addresses = _addr_list.value();
|
||||
return addresses[_addr_pos++ % addresses.size()];
|
||||
};
|
||||
|
||||
if (_addr_list) {
|
||||
co_return get_addr();
|
||||
}
|
||||
auto units = co_await get_units(_init_semaphore, 1);
|
||||
if (!_addr_list) {
|
||||
auto hent = co_await net::dns::get_host_by_name(_host, net::inet_address::family::INET);
|
||||
_address_ttl = std::ranges::min_element(hent.addr_entries, [](const net::hostent::address_entry& lhs, const net::hostent::address_entry& rhs) {
|
||||
return lhs.ttl < rhs.ttl;
|
||||
})->ttl;
|
||||
if (_address_ttl.count() == 0) {
|
||||
co_return hent.addr_entries[_addr_pos++ % hent.addr_entries.size()].addr;
|
||||
}
|
||||
_addr_list = hent.addr_entries | std::views::transform(&net::hostent::address_entry::addr) | std::ranges::to<std::vector>();
|
||||
_addr_update_timer.rearm(lowres_clock::now() + _address_ttl);
|
||||
}
|
||||
|
||||
co_return get_addr();
|
||||
}
|
||||
|
||||
future<shared_ptr<tls::certificate_credentials>> utils::http::dns_connection_factory::get_creds() {
|
||||
if (!_creds_init) [[unlikely]] {
|
||||
auto units = co_await get_units(_init_semaphore, 1);
|
||||
if (!_creds_init) {
|
||||
co_await init_credentials();
|
||||
_creds_init = true;
|
||||
}
|
||||
}
|
||||
co_return _creds;
|
||||
}
|
||||
|
||||
future<connected_socket> utils::http::dns_connection_factory::connect(net::inet_address address) {
|
||||
auto socket_addr = socket_address(address, _port);
|
||||
if (auto creds = co_await get_creds()) {
|
||||
_logger.debug("Making new HTTPS connection addr={} host={}", socket_addr, _host);
|
||||
co_return co_await tls::connect(creds, socket_addr, tls::tls_options{.server_name = _host});
|
||||
}
|
||||
_logger.debug("Making new HTTP connection addr={} host={}", socket_addr, _host);
|
||||
co_return co_await seastar::connect(socket_addr, {}, transport::TCP);
|
||||
}
|
||||
|
||||
utils::http::dns_connection_factory::dns_connection_factory(std::string host, int port, bool use_https, logging::logger& logger, shared_ptr<tls::certificate_credentials> certs)
|
||||
: _host(std::move(host))
|
||||
, _port(port)
|
||||
, _logger(logger)
|
||||
, _state(make_lw_shared<state>(std::move(certs)))
|
||||
, _done(initialize(_state, _host, _port, use_https, _logger))
|
||||
{}
|
||||
,_creds(std::move(certs))
|
||||
, _use_https(use_https)
|
||||
, _addr_update_timer([this] {
|
||||
if (auto units = try_get_units(_init_semaphore, 1)) {
|
||||
_addr_list.reset();
|
||||
}
|
||||
}) {
|
||||
_addr_update_timer.arm(lowres_clock::now());
|
||||
}
|
||||
|
||||
utils::http::dns_connection_factory::dns_connection_factory(std::string uri, logging::logger& logger, shared_ptr<tls::certificate_credentials> certs)
|
||||
: dns_connection_factory([&] {
|
||||
@@ -68,18 +104,21 @@ utils::http::dns_connection_factory::dns_connection_factory(std::string uri, log
|
||||
{}
|
||||
|
||||
future<connected_socket> utils::http::dns_connection_factory::make(abort_source*) {
|
||||
if (!_state->initialized) {
|
||||
_logger.debug("Waiting for factory to initialize");
|
||||
co_await _done.get_future();
|
||||
try {
|
||||
auto address = co_await get_address();
|
||||
co_return co_await connect(address);
|
||||
} catch (...) {
|
||||
// On failure, forcefully renew address resolution and try again
|
||||
_logger.debug("Connection failed, resetting address provider and retrying: {}", std::current_exception());
|
||||
}
|
||||
_addr_list.reset();
|
||||
auto address = co_await get_address();
|
||||
co_return co_await connect(address);
|
||||
}
|
||||
|
||||
if (_state->creds) {
|
||||
_logger.debug("Making new HTTPS connection addr={} host={}", _state->addr, _host);
|
||||
co_return co_await tls::connect(_state->creds, _state->addr, tls::tls_options{.server_name = _host});
|
||||
} else {
|
||||
_logger.debug("Making new HTTP connection addr={} host={}", _state->addr, _host);
|
||||
co_return co_await seastar::connect(_state->addr, {}, transport::TCP);
|
||||
}
|
||||
future<> utils::http::dns_connection_factory::close() {
|
||||
_addr_update_timer.cancel();
|
||||
co_await get_units(_init_semaphore, 1);
|
||||
}
|
||||
|
||||
static const char HTTPS[] = "https";
|
||||
|
||||
@@ -9,7 +9,6 @@
|
||||
#pragma once
|
||||
|
||||
#include <seastar/core/seastar.hh>
|
||||
#include <seastar/core/shared_future.hh>
|
||||
#include <seastar/http/client.hh>
|
||||
#include <seastar/net/dns.hh>
|
||||
#include <seastar/net/tls.hh>
|
||||
@@ -26,23 +25,26 @@ protected:
|
||||
std::string _host;
|
||||
int _port;
|
||||
logging::logger& _logger;
|
||||
struct state {
|
||||
bool initialized = false;
|
||||
socket_address addr;
|
||||
shared_ptr<tls::certificate_credentials> creds;
|
||||
state(shared_ptr<tls::certificate_credentials>);
|
||||
};
|
||||
lw_shared_ptr<state> _state;
|
||||
shared_future<> _done;
|
||||
semaphore _init_semaphore{1};
|
||||
bool _creds_init = false;
|
||||
std::optional<std::vector<net::inet_address>> _addr_list;
|
||||
shared_ptr<tls::certificate_credentials> _creds;
|
||||
uint16_t _addr_pos{0};
|
||||
bool _use_https;
|
||||
std::chrono::seconds _address_ttl{0};
|
||||
timer<lowres_clock> _addr_update_timer;
|
||||
|
||||
// This method can out-live the factory instance, in case `make()` is never called before the instance is destroyed.
|
||||
static future<> initialize(lw_shared_ptr<state> state, std::string host, int port, bool use_https, logging::logger& logger);
|
||||
future<> init_credentials();
|
||||
future<net::inet_address> get_address();
|
||||
future<shared_ptr<tls::certificate_credentials>> get_creds();
|
||||
future<connected_socket> connect(net::inet_address address);
|
||||
public:
|
||||
dns_connection_factory(dns_connection_factory&&);
|
||||
dns_connection_factory(dns_connection_factory&&) = default;
|
||||
dns_connection_factory(std::string host, int port, bool use_https, logging::logger& logger, shared_ptr<tls::certificate_credentials> = {});
|
||||
dns_connection_factory(std::string endpoint_url, logging::logger& logger, shared_ptr<tls::certificate_credentials> = {});
|
||||
|
||||
virtual future<connected_socket> make(abort_source*) override;
|
||||
future<> close() override;
|
||||
};
|
||||
|
||||
// simple URL parser, just enough to handle required aspects for normal endpoint usage
|
||||
|
||||
Reference in New Issue
Block a user