vector_store_client: Move to vector_search module

Vector search related implementation moved to a new module vector_search.
As the vector search functionality is going to be extended, it is
better to keep it in a separate module.
This commit is contained in:
Karol Nowacki
2025-09-18 11:37:19 +02:00
parent 0d2560c07f
commit eae71d3e91
13 changed files with 60 additions and 43 deletions

View File

@@ -282,6 +282,7 @@ add_subdirectory(tracing)
add_subdirectory(transport)
add_subdirectory(types)
add_subdirectory(utils)
add_subdirectory(vector_search)
add_version_library(scylla_version
release.cc)
@@ -323,7 +324,8 @@ set(scylla_libs
tracing
transport
types
utils)
utils
vector_search)
target_link_libraries(scylla PRIVATE
${scylla_libs})

View File

@@ -1256,7 +1256,7 @@ scylla_core = (['message/messaging_service.cc',
'node_ops/task_manager_module.cc',
'reader_concurrency_semaphore_group.cc',
'utils/disk_space_monitor.cc',
'service/vector_store_client.cc',
'vector_search/vector_store_client.cc',
] + [Antlr3Grammar('cql3/Cql.g')] \
+ scylla_raft_core
)

View File

@@ -27,7 +27,6 @@
#include "cql3/untyped_result_set.hh"
#include "db/config.hh"
#include "data_dictionary/data_dictionary.hh"
#include "service/vector_store_client.hh"
#include "utils/hashers.hh"
#include "utils/error_injection.hh"
#include "service/migration_manager.hh"
@@ -69,7 +68,7 @@ static service::query_state query_state_for_internal_call() {
return {service::client_state::for_internal_calls(), empty_service_permit()};
}
query_processor::query_processor(service::storage_proxy& proxy, data_dictionary::database db, service::migration_notifier& mn, service::vector_store_client& vsc, query_processor::memory_config mcfg, cql_config& cql_cfg, utils::loading_cache_config auth_prep_cache_cfg, lang::manager& langm)
query_processor::query_processor(service::storage_proxy& proxy, data_dictionary::database db, service::migration_notifier& mn, vector_search::vector_store_client& vsc, query_processor::memory_config mcfg, cql_config& cql_cfg, utils::loading_cache_config auth_prep_cache_cfg, lang::manager& langm)
: _migration_subscriber{std::make_unique<migration_subscriber>(this)}
, _proxy(proxy)
, _db(db)

View File

@@ -28,7 +28,7 @@
#include "transport/messages/result_message.hh"
#include "service/client_state.hh"
#include "service/broadcast_tables/experimental/query_result.hh"
#include "service/vector_store_client.hh"
#include "vector_search/vector_store_client.hh"
#include "utils/assert.hh"
#include "utils/observable.hh"
#include "service/raft/raft_group0_client.hh"
@@ -109,7 +109,7 @@ private:
service::storage_proxy& _proxy;
data_dictionary::database _db;
service::migration_notifier& _mnotifier;
service::vector_store_client& _vector_store_client;
vector_search::vector_store_client& _vector_store_client;
memory_config _mcfg;
const cql_config& _cql_config;
@@ -149,7 +149,8 @@ public:
static std::unique_ptr<statements::raw::parsed_statement> parse_statement(const std::string_view& query, dialect d);
static std::vector<std::unique_ptr<statements::raw::parsed_statement>> parse_statements(std::string_view queries, dialect d);
query_processor(service::storage_proxy& proxy, data_dictionary::database db, service::migration_notifier& mn, service::vector_store_client& vsc, memory_config mcfg, cql_config& cql_cfg, utils::loading_cache_config auth_prep_cache_cfg, lang::manager& langm);
query_processor(service::storage_proxy& proxy, data_dictionary::database db, service::migration_notifier& mn, vector_search::vector_store_client& vsc,
memory_config mcfg, cql_config& cql_cfg, utils::loading_cache_config auth_prep_cache_cfg, lang::manager& langm);
~query_processor();
@@ -179,11 +180,11 @@ public:
lang::manager& lang() { return _lang_manager; }
const service::vector_store_client& vector_store_client() const noexcept {
const vector_search::vector_store_client& vector_store_client() const noexcept {
return _vector_store_client;
}
service::vector_store_client& vector_store_client() noexcept {
vector_search::vector_store_client& vector_store_client() noexcept {
return _vector_store_client;
}

View File

@@ -23,7 +23,7 @@
#include <seastar/coroutine/exception.hh>
#include "service/broadcast_tables/experimental/lang.hh"
#include "service/qos/qos_common.hh"
#include "service/vector_store_client.hh"
#include "vector_search/vector_store_client.hh"
#include "transport/messages/result_message.hh"
#include "cql3/functions/as_json_function.hh"
#include "cql3/selection/selection.hh"
@@ -1223,9 +1223,8 @@ indexed_table_select_statement::actually_do_execute(query_processor& qp,
auto as = abort_source();
auto pkeys = co_await qp.vector_store_client().ann(_schema->ks_name(), _index.metadata().name(), _schema , std::move(ann_vector), limit, as);
if (!pkeys.has_value()) {
co_await coroutine::return_exception(exceptions::invalid_request_exception(
std::visit(service::vector_store_client::ann_error_visitor{}, pkeys.error())
));
co_await coroutine::return_exception(
exceptions::invalid_request_exception(std::visit(vector_search::vector_store_client::ann_error_visitor{}, pkeys.error())));
}
// If there are no clustering columns, we have to convert the partition keys to partition ranges.

View File

@@ -42,7 +42,6 @@
#include "service/migration_manager.hh"
#include "service/tablet_allocator.hh"
#include "service/load_meter.hh"
#include "service/vector_store_client.hh"
#include "service/view_update_backlog_broker.hh"
#include "service/qos/service_level_controller.hh"
#include "streaming/stream_session.hh"
@@ -64,6 +63,7 @@
#include "release.hh"
#include "repair/repair.hh"
#include "repair/row_level.hh"
#include "vector_search/vector_store_client.hh"
#include <cstdio>
#include <seastar/core/file.hh>
#include <unistd.h>
@@ -747,7 +747,7 @@ sharded<locator::shared_token_metadata> token_metadata;
sharded<service::mapreduce_service> mapreduce_service;
sharded<gms::gossiper> gossiper;
sharded<locator::snitch_ptr> snitch;
sharded<service::vector_store_client> vector_store_client;
sharded<vector_search::vector_store_client> vector_store_client;
// This worker wasn't designed to be used from multiple threads.
// If you are attempting to do that, make sure you know what you are doing.
@@ -1339,7 +1339,7 @@ sharded<locator::shared_token_metadata> token_metadata;
auto stop_vector_store_client = defer_verbose_shutdown("vector store client", [&vector_store_client] {
vector_store_client.stop().get();
});
vector_store_client.invoke_on_all(&service::vector_store_client::start_background_tasks).get();
vector_store_client.invoke_on_all(&vector_search::vector_store_client::start_background_tasks).get();
checkpoint(stop_signal, "starting query processor");
cql3::query_processor::memory_config qp_mcfg = {memory::stats().total_memory() / 256, memory::stats().total_memory() / 2560};

View File

@@ -33,8 +33,7 @@ target_sources(service
task_manager_module.cc
topology_coordinator.cc
topology_mutation.cc
topology_state_machine.cc
vector_store_client.cc)
topology_state_machine.cc)
target_include_directories(service
PUBLIC
${CMAKE_SOURCE_DIR})

View File

@@ -305,7 +305,7 @@ add_scylla_test(wrapping_interval_test
add_scylla_test(address_map_test
KIND SEASTAR)
add_scylla_test(vector_store_client_test
KIND SEASTAR)
KIND SEASTAR LIBRARIES vector_search)
add_scylla_test(combined_tests
KIND SEASTAR

View File

@@ -6,7 +6,7 @@
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "service/vector_store_client.hh"
#include "vector_search/vector_store_client.hh"
#include "db/config.hh"
#include "exceptions/exceptions.hh"
#include "cql3/statements/select_statement.hh"
@@ -33,8 +33,8 @@ namespace {
using namespace seastar;
using vector_store_client = service::vector_store_client;
using vector_store_client_tester = service::vector_store_client_tester;
using vector_store_client = vector_search::vector_store_client;
using vector_store_client_tester = vector_search::vector_store_client_tester;
using config = vector_store_client::config;
using configuration_exception = exceptions::configuration_exception;
using inet_address = seastar::net::inet_address;
@@ -132,10 +132,10 @@ auto create_test_table(cql_test_env& env, const sstring& ks, const sstring& cf)
}
class configure {
std::reference_wrapper<service::vector_store_client> vs_ref;
std::reference_wrapper<vector_search::vector_store_client> vs_ref;
public:
explicit configure(service::vector_store_client& vs)
explicit configure(vector_search::vector_store_client& vs)
: vs_ref(vs) {
with_dns_refresh_interval(seconds(2));
with_wait_for_client_timeout(milliseconds(100));

View File

@@ -176,7 +176,7 @@ private:
sharded<gms::gossip_address_map> _gossip_address_map;
sharded<service::direct_fd_pinger> _fd_pinger;
sharded<cdc::cdc_service> _cdc;
sharded<service::vector_store_client> _vector_store_client;
sharded<vector_search::vector_store_client> _vector_store_client;
db::config* _db_config;
service::raft_group0_client* _group0_client;

View File

@@ -0,0 +1,17 @@
add_library(vector_search STATIC)
target_sources(vector_search
PRIVATE
vector_store_client.cc)
target_link_libraries(vector_search
PUBLIC
Seastar::seastar
PRIVATE
cql3
db
utils
schema
scylla_dht
)
check_headers(check-headers vector_search
GLOB_RECURSE ${CMAKE_CURRENT_SOURCE_DIR}/*.hh)

View File

@@ -34,21 +34,21 @@ namespace {
using namespace std::chrono_literals;
using ann_error = service::vector_store_client::ann_error;
using ann_error = vector_search::vector_store_client::ann_error;
using configuration_exception = exceptions::configuration_exception;
using duration = lowres_clock::duration;
using vs_vector = service::vector_store_client::vs_vector;
using limit = service::vector_store_client::limit;
using host_name = service::vector_store_client::host_name;
using vs_vector = vector_search::vector_store_client::vs_vector;
using limit = vector_search::vector_store_client::limit;
using host_name = vector_search::vector_store_client::host_name;
using http_path = sstring;
using inet_address = seastar::net::inet_address;
using json_content = sstring;
using milliseconds = std::chrono::milliseconds;
using operation_type = httpd::operation_type;
using port_number = service::vector_store_client::port_number;
using primary_key = service::vector_store_client::primary_key;
using primary_keys = service::vector_store_client::primary_keys;
using service_reply_format_error = service::vector_store_client::service_reply_format_error;
using port_number = vector_search::vector_store_client::port_number;
using primary_key = vector_search::vector_store_client::primary_key;
using primary_keys = vector_search::vector_store_client::primary_keys;
using service_reply_format_error = vector_search::vector_store_client::service_reply_format_error;
using tcp_keepalive_params = net::tcp_keepalive_params;
using time_point = lowres_clock::time_point;
@@ -303,7 +303,7 @@ sstring response_content_to_sstring(const std::vector<temporary_buffer<char>>& b
} // namespace
namespace service {
namespace vector_search {
struct vector_store_client::impl {
@@ -650,4 +650,4 @@ auto vector_store_client_tester::resolve_hostname(vector_store_client& vsc, abor
co_return client.value()->addr();
}
} // namespace service
} // namespace vector_search

View File

@@ -28,7 +28,7 @@ namespace seastar::net {
class inet_address;
}
namespace service {
namespace vector_search {
/// A client with the vector-store service.
class vector_store_client final {
@@ -71,22 +71,22 @@ public:
using ann_error = std::variant<disabled, aborted, addr_unavailable, service_unavailable, service_error, service_reply_format_error>;
struct ann_error_visitor {
sstring operator()(service::vector_store_client::service_error e) const {
sstring operator()(vector_store_client::service_error e) const {
return fmt::format("Vector Store error: HTTP status {}", e.status);
}
sstring operator()(service::vector_store_client::disabled) const {
sstring operator()(vector_store_client::disabled) const {
return fmt::format("Vector Store is disabled");
}
sstring operator()(service::vector_store_client::aborted) const {
sstring operator()(vector_store_client::aborted) const {
return fmt::format("Vector Store request was aborted");
}
sstring operator()(service::vector_store_client::addr_unavailable) const {
sstring operator()(vector_store_client::addr_unavailable) const {
return fmt::format("Vector Store service address could not be fetched from DNS");
}
sstring operator()(service::vector_store_client::service_unavailable) const {
sstring operator()(vector_store_client::service_unavailable) const {
return fmt::format("Vector Store service is unavailable");
}
sstring operator()(service::vector_store_client::service_reply_format_error) const {
sstring operator()(vector_store_client::service_reply_format_error) const {
return fmt::format("Vector Store returned an invalid JSON");
}
};
@@ -127,4 +127,4 @@ struct vector_store_client_tester {
static auto resolve_hostname(vector_store_client& vsc, abort_source& as) -> future<std::optional<net::inet_address>>;
};
} // namespace service
} // namespace vector_search