vector_search: add unreachable node detection time config

Add option `vector_store_unreachable_node_detection_time_in_ms` to
control parameters related to detecting unreachable vector store nodes.
This parameter is used to set the TCP connect timeout, keepalive
parameters, and TCP_USER_TIMEOUT. By configuring these parameters,
we can detect unreachable vector store nodes faster and trigger
failover mechanisms in a timely manner.
This commit is contained in:
Karol Nowacki
2026-02-17 09:37:12 +01:00
committed by Nadav Har'El
parent 686029f52c
commit 9269ca9cf7
9 changed files with 45 additions and 40 deletions

View File

@@ -1234,6 +1234,9 @@ db::config::config(std::shared_ptr<db::extensions> exts)
"A comma-separated list of primary vector store node URIs. These nodes are preferred for vector search operations.")
, vector_store_secondary_uri(this, "vector_store_secondary_uri", liveness::LiveUpdate, value_status::Used, "",
"A comma-separated list of secondary vector store node URIs. These nodes are used as a fallback when all primary nodes are unavailable, and are typically located in a different availability zone for high availability.")
, vector_store_unreachable_node_detection_time_in_ms(this, "vector_store_unreachable_node_detection_time_in_ms", liveness::LiveUpdate, value_status::Used, 5000,
"Time in milliseconds for detecting an unreachable vector store node. This value is applied to the TCP connect timeout, keepalive parameters, and TCP_USER_TIMEOUT. "
"When any of these mechanisms detects that a node is unreachable within this window, the client fails over to the next available vector store node. Minimum value is 5000.")
, vector_store_encryption_options(this, "vector_store_encryption_options", value_status::Used, {},
"Options for encrypted connections to the vector store. These options are used for HTTPS URIs in `vector_store_primary_uri` and `vector_store_secondary_uri`. The available options are:\n"
"* truststore: (Default: <not set, use system truststore>) Location of the truststore containing the trusted certificate for authenticating remote servers.")

View File

@@ -366,6 +366,7 @@ public:
named_value<string_map> request_scheduler_options;
named_value<sstring> vector_store_primary_uri;
named_value<sstring> vector_store_secondary_uri;
named_value<uint32_t> vector_store_unreachable_node_detection_time_in_ms;
named_value<string_map> vector_store_encryption_options;
named_value<sstring> authenticator;
named_value<sstring> internode_authenticator;

View File

@@ -27,7 +27,7 @@ namespace {
logging::logger client_test_logger("client_test");
const auto REQUEST_TIMEOUT = utils::updateable_value<uint32_t>{100};
const auto UNREACHABLE_NODE_DETECTION_WINDOW = utils::updateable_value<uint32_t>{std::chrono::milliseconds(test::vector_search::STANDARD_WAIT).count()};
constexpr auto PATH = "/api/v1/indexes/ks/idx/ann";
constexpr auto CONTENT = R"({"vector": [0.1, 0.2, 0.3], "limit": 10})";
@@ -47,7 +47,7 @@ future<std::unique_ptr<vs_mock_server>> make_available(std::unique_ptr<unavailab
SEASTAR_TEST_CASE(is_up_after_construction) {
auto server = co_await make_vs_mock_server();
client client{client_test_logger, make_endpoint(server), REQUEST_TIMEOUT, shared_ptr<seastar::tls::certificate_credentials>{}};
client client{client_test_logger, make_endpoint(server), UNREACHABLE_NODE_DETECTION_WINDOW, shared_ptr<seastar::tls::certificate_credentials>{}};
BOOST_CHECK(client.is_up());
@@ -58,7 +58,7 @@ SEASTAR_TEST_CASE(is_up_after_construction) {
SEASTAR_TEST_CASE(is_up_when_server_returned_ok_status) {
abort_source_timeout as;
auto server = co_await make_vs_mock_server();
client client{client_test_logger, make_endpoint(server), REQUEST_TIMEOUT, shared_ptr<seastar::tls::certificate_credentials>{}};
client client{client_test_logger, make_endpoint(server), UNREACHABLE_NODE_DETECTION_WINDOW, shared_ptr<seastar::tls::certificate_credentials>{}};
auto res = co_await client.request(operation_type::POST, PATH, CONTENT, as.reset());
@@ -73,7 +73,7 @@ SEASTAR_TEST_CASE(is_up_when_server_returned_client_error_status) {
abort_source_timeout as;
auto server = co_await make_vs_mock_server();
server->next_ann_response(vs_mock_server::response{seastar::http::reply::status_type::bad_request, "Bad request"});
client client{client_test_logger, make_endpoint(server), REQUEST_TIMEOUT, shared_ptr<seastar::tls::certificate_credentials>{}};
client client{client_test_logger, make_endpoint(server), UNREACHABLE_NODE_DETECTION_WINDOW, shared_ptr<seastar::tls::certificate_credentials>{}};
auto res = co_await client.request(operation_type::POST, PATH, CONTENT, as.reset());
@@ -89,7 +89,7 @@ SEASTAR_TEST_CASE(is_up_when_request_is_aborted) {
abort_source as;
auto server = co_await make_vs_mock_server();
server->next_ann_response(vs_mock_server::response{seastar::http::reply::status_type::ok, "{}"});
client client{client_test_logger, make_endpoint(server), REQUEST_TIMEOUT, shared_ptr<seastar::tls::certificate_credentials>{}};
client client{client_test_logger, make_endpoint(server), UNREACHABLE_NODE_DETECTION_WINDOW, shared_ptr<seastar::tls::certificate_credentials>{}};
as.request_abort();
auto res = co_await client.request(operation_type::POST, PATH, CONTENT, as);
@@ -107,7 +107,7 @@ SEASTAR_TEST_CASE(is_up_when_server_returned_server_error_status) {
auto server = co_await make_vs_mock_server();
server->next_ann_response(vs_mock_server::response{seastar::http::reply::status_type::internal_server_error, "Internal Server Error"});
client client{client_test_logger, make_endpoint(server), REQUEST_TIMEOUT, shared_ptr<seastar::tls::certificate_credentials>{}};
client client{client_test_logger, make_endpoint(server), UNREACHABLE_NODE_DETECTION_WINDOW, shared_ptr<seastar::tls::certificate_credentials>{}};
auto res = co_await client.request(operation_type::POST, PATH, CONTENT, as.reset());
@@ -124,7 +124,7 @@ SEASTAR_TEST_CASE(is_up_when_server_returned_service_unavailable_status) {
auto server = co_await make_vs_mock_server();
server->next_ann_response(vs_mock_server::response{seastar::http::reply::status_type::service_unavailable, "Service Unavailable"});
client client{client_test_logger, make_endpoint(server), REQUEST_TIMEOUT, shared_ptr<seastar::tls::certificate_credentials>{}};
client client{client_test_logger, make_endpoint(server), UNREACHABLE_NODE_DETECTION_WINDOW, shared_ptr<seastar::tls::certificate_credentials>{}};
auto res = co_await client.request(operation_type::POST, PATH, CONTENT, as.reset());
@@ -139,7 +139,7 @@ SEASTAR_TEST_CASE(is_up_when_server_returned_service_unavailable_status) {
SEASTAR_TEST_CASE(is_down_when_server_is_not_available) {
abort_source_timeout as;
auto down_server = co_await make_unavailable_server();
client client{client_test_logger, make_endpoint(down_server), REQUEST_TIMEOUT, shared_ptr<seastar::tls::certificate_credentials>{}};
client client{client_test_logger, make_endpoint(down_server), UNREACHABLE_NODE_DETECTION_WINDOW, shared_ptr<seastar::tls::certificate_credentials>{}};
auto res = co_await client.request(operation_type::POST, PATH, CONTENT, as.reset());
@@ -154,7 +154,7 @@ SEASTAR_TEST_CASE(is_down_when_server_is_not_available) {
SEASTAR_TEST_CASE(becomes_up_when_server_status_is_serving) {
abort_source_timeout as;
auto down_server = co_await make_unavailable_server();
client client{client_test_logger, make_endpoint(down_server), REQUEST_TIMEOUT, shared_ptr<seastar::tls::certificate_credentials>{}};
client client{client_test_logger, make_endpoint(down_server), UNREACHABLE_NODE_DETECTION_WINDOW, shared_ptr<seastar::tls::certificate_credentials>{}};
auto res = co_await client.request(operation_type::POST, PATH, CONTENT, as.reset());
auto server = co_await make_available(down_server);
@@ -179,7 +179,7 @@ SEASTAR_TEST_CASE(remains_down_when_server_status_is_not_serving) {
};
for (auto const& status : non_serving_statuses) {
auto down_server = co_await make_unavailable_server();
client client{client_test_logger, make_endpoint(down_server), REQUEST_TIMEOUT, shared_ptr<seastar::tls::certificate_credentials>{}};
client client{client_test_logger, make_endpoint(down_server), UNREACHABLE_NODE_DETECTION_WINDOW, shared_ptr<seastar::tls::certificate_credentials>{}};
auto res = co_await client.request(operation_type::POST, PATH, CONTENT, as.reset());
BOOST_CHECK(!res);
@@ -215,7 +215,7 @@ SEASTAR_TEST_CASE(is_down_when_connection_times_out) {
co_await client.close();
}
SEASTAR_TEST_CASE(connection_timeout_cannot_be_smaller_than_5s) {
SEASTAR_TEST_CASE(unreachable_node_detection_window_cannot_be_smaller_than_5s) {
abort_source_timeout as;
auto unreachable = co_await make_unreachable_socket();
client client{client_test_logger, client::endpoint_type{unreachable.host, unreachable.port, seastar::net::inet_address(unreachable.host)},

View File

@@ -927,7 +927,7 @@ SEASTAR_TEST_CASE(vector_store_client_single_status_check_after_concurrent_failu
}));
}
SEASTAR_TEST_CASE(vector_store_client_updates_backoff_max_time_from_read_request_timeout_cfg) {
SEASTAR_TEST_CASE(vector_store_client_updates_backoff_max_time_from_read_connection_timeout_cfg) {
auto unavail_s = co_await make_unavailable_server();
auto cfg = make_config();
cfg.db_config->vector_store_primary_uri.set(format("http://unavail.node:{}", unavail_s->port()));
@@ -940,7 +940,7 @@ SEASTAR_TEST_CASE(vector_store_client_updates_backoff_max_time_from_read_request
vs.start_background_tasks();
// Set request timeout to 100ms, hence max backoff time is 2x100ms = 200ms.
cfg.db_config->read_request_timeout_in_ms.set(100);
cfg.db_config->vector_store_unreachable_node_detection_time_in_ms.set(100);
// Trigger status checking by making ANN request to unavailable server.
auto result = co_await vs.ann("ks", "idx", schema, std::vector<float>{0.1, 0.2, 0.3}, 2, rjson::empty_object(), as.reset());
BOOST_CHECK(!result);
@@ -1189,8 +1189,9 @@ SEASTAR_TEST_CASE(vector_store_client_high_availability_unreachable) {
auto cfg = make_config();
cfg.db_config->vector_store_primary_uri.set(format("http://unreachable.node:{}", unreachable.port));
cfg.db_config->vector_store_secondary_uri.set(format("http://server.node:{}", server->port()));
cfg.db_config->request_timeout_in_ms.set(5000); // connection timeout to the vector store
cfg.query_timeout = make_query_timeout(std::chrono::seconds(10)); // CQL SELECT query timeout longer than connection timeout
cfg.db_config->vector_store_unreachable_node_detection_time_in_ms.set(5000); // unreachable node detection time for vector store
cfg.query_timeout = make_query_timeout(
std::chrono::seconds(10)); // CQL query timeout longer than vector store unreachable node detection time, to allow for fallback to secondary URI
co_await do_with_cql_env(
[&](cql_test_env& env) -> future<> {
auto schema = co_await create_test_table(env, "ks", "test");
@@ -1201,7 +1202,7 @@ SEASTAR_TEST_CASE(vector_store_client_high_availability_unreachable) {
auto result = co_await env.execute_cql("CREATE CUSTOM INDEX idx ON ks.test (embedding) USING 'vector_index'");
// Execute an ANN SELECT. The primary URI points to an unreachable socket,
// so the client's connection attempt to the primary will fail (request_timeout_in_ms = 5000).
// so the client's connection attempt to the primary will fail (vector_store_unreachable_node_detection_time_in_ms = 5000).
// The client is expected to transparently fall back to the secondary URI and
// complete the request successfully. The entire operation must complete within
// the configured CQL query timeout (10s), so the test expects a normal rows result.

View File

@@ -70,11 +70,11 @@ class client_connection_factory : public http::experimental::connection_factory
shared_ptr<tls::certificate_credentials> _creds;
public:
explicit client_connection_factory(
client::endpoint_type endpoint, shared_ptr<tls::certificate_credentials> creds, utils::updateable_value<uint32_t> connect_timeout_in_ms)
explicit client_connection_factory(client::endpoint_type endpoint, shared_ptr<tls::certificate_credentials> creds,
utils::updateable_value<uint32_t> unreachable_node_detection_time_in_ms)
: _endpoint(std::move(endpoint))
, _creds(std::move(creds))
, _connect_timeout_in_ms(std::move(connect_timeout_in_ms)) {
, _unreachable_node_detection_time_in_ms(std::move(unreachable_node_detection_time_in_ms)) {
}
future<connected_socket> make([[maybe_unused]] abort_source* as) override {
@@ -111,14 +111,14 @@ private:
std::chrono::milliseconds timeout() const {
constexpr std::chrono::milliseconds MIN_TIMEOUT = 5s;
auto timeout_ms = std::chrono::milliseconds(_connect_timeout_in_ms.get());
auto timeout_ms = std::chrono::milliseconds(_unreachable_node_detection_time_in_ms.get());
if (timeout_ms < MIN_TIMEOUT) {
timeout_ms = MIN_TIMEOUT;
}
return timeout_ms;
}
utils::updateable_value<uint32_t> _connect_timeout_in_ms;
utils::updateable_value<uint32_t> _unreachable_node_detection_time_in_ms;
};
bool is_server_unavailable(std::exception_ptr& err) {
@@ -144,12 +144,12 @@ auto constexpr BACKOFF_RETRY_MIN_TIME = 100ms;
} // namespace
client::client(logging::logger& logger, endpoint_type endpoint_, utils::updateable_value<uint32_t> request_timeout_in_ms,
client::client(logging::logger& logger, endpoint_type endpoint_, utils::updateable_value<uint32_t> unreachable_node_detection_time_in_ms,
::shared_ptr<seastar::tls::certificate_credentials> credentials)
: _endpoint(std::move(endpoint_))
, _http_client(std::make_unique<client_connection_factory>(_endpoint, std::move(credentials), request_timeout_in_ms))
, _http_client(std::make_unique<client_connection_factory>(_endpoint, std::move(credentials), unreachable_node_detection_time_in_ms))
, _logger(logger)
, _request_timeout(std::move(request_timeout_in_ms)) {
, _unreachable_node_detection_time_in_ms(std::move(unreachable_node_detection_time_in_ms)) {
}
seastar::future<client::request_result> client::request(
@@ -237,7 +237,7 @@ bool client::is_checking_status_in_progress() const {
}
std::chrono::milliseconds client::backoff_retry_max() const {
std::chrono::milliseconds ret{_request_timeout.get()};
std::chrono::milliseconds ret{_unreachable_node_detection_time_in_ms.get()};
return ret * 2;
}

View File

@@ -41,7 +41,7 @@ public:
using request_error = std::variant<aborted_error, service_unavailable_error>;
using request_result = std::expected<response, request_error>;
explicit client(logging::logger& logger, endpoint_type endpoint_, utils::updateable_value<uint32_t> request_timeout_in_ms,
explicit client(logging::logger& logger, endpoint_type endpoint_, utils::updateable_value<uint32_t> unreachable_node_detection_time_in_ms,
::shared_ptr<seastar::tls::certificate_credentials> credentials);
seastar::future<request_result> request(
@@ -71,7 +71,7 @@ private:
seastar::future<> _checking_status_future = seastar::make_ready_future();
seastar::abort_source _as;
logging::logger& _logger;
utils::updateable_value<uint32_t> _request_timeout;
utils::updateable_value<uint32_t> _unreachable_node_detection_time_in_ms;
};
} // namespace vector_search

View File

@@ -51,8 +51,8 @@ auto make_unexpected(const auto& err) {
} // namespace
clients::clients(
logging::logger& logger, refresh_trigger_callback trigger_refresh, utils::updateable_value<uint32_t> request_timeout_in_ms, truststore& truststore)
clients::clients(logging::logger& logger, refresh_trigger_callback trigger_refresh, utils::updateable_value<uint32_t> unreachable_node_detection_time_in_ms,
truststore& truststore)
: _producer([&]() -> future<clients_vec> {
return try_with_gate(_gate, [this] -> future<clients_vec> {
_trigger_refresh();
@@ -63,7 +63,7 @@ clients::clients(
, _trigger_refresh(std::move(trigger_refresh))
, _timeout(WAIT_FOR_CLIENT_TIMEOUT)
, _logger(logger)
, _request_timeout_in_ms(std::move(request_timeout_in_ms))
, _unreachable_node_detection_time_in_ms(std::move(unreachable_node_detection_time_in_ms))
, _truststore(truststore) {
}
@@ -172,7 +172,7 @@ future<> clients::close_old_clients() {
seastar::future<seastar::lw_shared_ptr<client>> clients::make_client(const uri& uri_, const seastar::net::inet_address& addr_) {
auto creds = uri_.schema == uri::schema_type::https ? co_await _truststore.get() : nullptr;
auto c = make_lw_shared<client>(_logger, client::endpoint_type{uri_.host, uri_.port, addr_}, _request_timeout_in_ms, creds);
auto c = make_lw_shared<client>(_logger, client::endpoint_type{uri_.host, uri_.port, addr_}, _unreachable_node_detection_time_in_ms, creds);
co_return c;
}

View File

@@ -36,8 +36,8 @@ public:
using get_clients_error = std::variant<aborted_error, addr_unavailable_error>;
using get_clients_result = std::expected<clients_vec, get_clients_error>;
explicit clients(
logging::logger& logger, refresh_trigger_callback trigger_refresh, utils::updateable_value<uint32_t> request_timeout_in_ms, truststore& truststore);
explicit clients(logging::logger& logger, refresh_trigger_callback trigger_refresh, utils::updateable_value<uint32_t> unreachable_node_detection_time_in_ms,
truststore& truststore);
seastar::future<request_result> request(
seastar::httpd::operation_type method, seastar::sstring path, std::optional<seastar::sstring> content, seastar::abort_source& as);
@@ -67,7 +67,7 @@ private:
std::chrono::milliseconds _timeout;
clients_vec _old_clients;
logging::logger& _logger;
utils::updateable_value<uint32_t> _request_timeout_in_ms;
utils::updateable_value<uint32_t> _unreachable_node_detection_time_in_ms;
truststore& _truststore;
};

View File

@@ -242,7 +242,7 @@ struct vector_store_client::impl {
clients _secondary_clients;
impl(utils::config_file::named_value<sstring> primary_uris, utils::config_file::named_value<sstring> secondary_uris,
utils::config_file::named_value<uint32_t> read_request_timeout_in_ms,
utils::config_file::named_value<uint32_t> unreachable_node_detection_time_in_ms,
utils::config_file::named_value<utils::config_file::string_map> encryption_options, invoke_on_others_func invoke_on_others)
: _primary_uri_observer(primary_uris.observe([this](seastar::sstring uris_csv) {
handle_uris_changed(std::move(uris_csv), _primary_uris, _primary_clients);
@@ -269,14 +269,14 @@ struct vector_store_client::impl {
[this]() {
_dns.trigger_refresh();
},
read_request_timeout_in_ms, _truststore)
unreachable_node_detection_time_in_ms, _truststore)
, _secondary_clients(
vslogger,
[this]() {
_dns.trigger_refresh();
},
read_request_timeout_in_ms, _truststore) {
unreachable_node_detection_time_in_ms, _truststore) {
_metrics.add_group("vector_store", {seastar::metrics::make_gauge("dns_refreshes", seastar::metrics::description("Number of DNS refreshes"), [this] {
return _dns_refreshes;
}).aggregate({seastar::metrics::shard_label})});
@@ -381,7 +381,7 @@ struct vector_store_client::impl {
};
vector_store_client::vector_store_client(config const& cfg)
: _impl(std::make_unique<impl>(cfg.vector_store_primary_uri, cfg.vector_store_secondary_uri, cfg.read_request_timeout_in_ms,
: _impl(std::make_unique<impl>(cfg.vector_store_primary_uri, cfg.vector_store_secondary_uri, cfg.vector_store_unreachable_node_detection_time_in_ms,
cfg.vector_store_encryption_options, [this](auto func) {
return container().invoke_on_others([func = std::move(func)](auto& self) {
return func(*self._impl);
@@ -410,8 +410,8 @@ auto vector_store_client::get_index_status(keyspace_name keyspace, index_name na
return _impl->get_index_status(std::move(keyspace), std::move(name), as);
}
auto vector_store_client::ann(keyspace_name keyspace, index_name name, schema_ptr schema, vs_vector vs_vector, limit limit, const rjson::value& filter, abort_source& as)
-> future<std::expected<primary_keys, ann_error>> {
auto vector_store_client::ann(keyspace_name keyspace, index_name name, schema_ptr schema, vs_vector vs_vector, limit limit, const rjson::value& filter,
abort_source& as) -> future<std::expected<primary_keys, ann_error>> {
return _impl->ann(keyspace, name, schema, vs_vector, limit, filter, as);
}