From 243f2217dd66144085705bb9286d1528935304db Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Wed, 2 Aug 2023 21:09:39 +0300 Subject: [PATCH] wasm: Make wasm sharded The wasm::manager is just cql3::wasm_context renamed. It now sits in lang/wasm* and is started as a sharded service in main (and cql test env). This move also needs some headers shuffling, but it's not severe This change is required to make it possible for the wasm::manager to be shared (by reference) between q.p. and replica::database further Signed-off-by: Pavel Emelyanov --- cql3/query_processor.cc | 10 ++-------- cql3/query_processor.hh | 18 +++--------------- lang/wasm.cc | 6 ++++++ lang/wasm.hh | 15 +++++++++++++++ lang/wasm_instance_cache.hh | 5 ++++- main.cc | 6 +++++- test/lib/cql_test_env.cc | 6 +++++- 7 files changed, 40 insertions(+), 26 deletions(-) diff --git a/cql3/query_processor.cc b/cql3/query_processor.cc index 20e0abae74..fead4ca650 100644 --- a/cql3/query_processor.cc +++ b/cql3/query_processor.cc @@ -58,13 +58,7 @@ static service::query_state query_state_for_internal_call() { return {service::client_state::for_internal_calls(), empty_service_permit()}; } -wasm_context::wasm_context(std::optional& ctx) - : _engine(ctx ? std::move(ctx->engine) : nullptr) - , _instance_cache(ctx ? std::make_optional(ctx->cache_size, ctx->instance_size, ctx->timer_period) : std::nullopt) - , _alien_runner(ctx ? std::move(ctx->alien_runner) : nullptr) -{} - -query_processor::query_processor(service::storage_proxy& proxy, data_dictionary::database db, service::migration_notifier& mn, query_processor::memory_config mcfg, cql_config& cql_cfg, utils::loading_cache_config auth_prep_cache_cfg, std::optional wasm_ctx) +query_processor::query_processor(service::storage_proxy& proxy, data_dictionary::database db, service::migration_notifier& mn, query_processor::memory_config mcfg, cql_config& cql_cfg, utils::loading_cache_config auth_prep_cache_cfg, wasm::manager& wasm) : _migration_subscriber{std::make_unique(this)} , _proxy(proxy) , _db(db) @@ -77,7 +71,7 @@ query_processor::query_processor(service::storage_proxy& proxy, data_dictionary: , _authorized_prepared_cache_config_action([this] { update_authorized_prepared_cache_config(); return make_ready_future<>(); }) , _authorized_prepared_cache_update_interval_in_ms_observer(_db.get_config().permissions_update_interval_in_ms.observe(_auth_prepared_cache_cfg_cb)) , _authorized_prepared_cache_validity_in_ms_observer(_db.get_config().permissions_validity_in_ms.observe(_auth_prepared_cache_cfg_cb)) - , _wasm(wasm_ctx) + , _wasm(wasm) { namespace sm = seastar::metrics; namespace stm = statements; diff --git a/cql3/query_processor.hh b/cql3/query_processor.hh index c20d036b0c..718830fcca 100644 --- a/cql3/query_processor.hh +++ b/cql3/query_processor.hh @@ -21,14 +21,13 @@ #include "cql3/authorized_prepared_statements_cache.hh" #include "cql3/statements/prepared_statement.hh" #include "exceptions/exceptions.hh" -#include "lang/wasm_instance_cache.hh" #include "service/migration_listener.hh" #include "transport/messages/result_message.hh" #include "service/qos/service_level_controller.hh" #include "service/client_state.hh" #include "service/broadcast_tables/experimental/query_result.hh" #include "utils/observable.hh" -#include "lang/wasm_alien_thread_runner.hh" +#include "lang/wasm.hh" namespace service { @@ -88,17 +87,6 @@ public: class cql_config; class query_options; class cql_statement; -class query_processor; - -class wasm_context { - std::shared_ptr> _engine; - std::optional _instance_cache; - std::shared_ptr _alien_runner; - -public: - wasm_context(std::optional&); - friend class query_processor; -}; class query_processor : public seastar::peering_sharded_service { public: @@ -140,7 +128,7 @@ private: // don't bother with expiration on those. std::unordered_map> _internal_statements; - wasm_context _wasm; + wasm::manager& _wasm; public: static const sstring CQL_VERSION; @@ -155,7 +143,7 @@ public: static std::unique_ptr parse_statement(const std::string_view& query); static std::vector> parse_statements(std::string_view queries); - query_processor(service::storage_proxy& proxy, data_dictionary::database db, service::migration_notifier& mn, memory_config mcfg, cql_config& cql_cfg, utils::loading_cache_config auth_prep_cache_cfg, std::optional wasm_ctx); + query_processor(service::storage_proxy& proxy, data_dictionary::database db, service::migration_notifier& mn, memory_config mcfg, cql_config& cql_cfg, utils::loading_cache_config auth_prep_cache_cfg, wasm::manager& wasm); ~query_processor(); diff --git a/lang/wasm.cc b/lang/wasm.cc index 4405abe6f5..a50442fda9 100644 --- a/lang/wasm.cc +++ b/lang/wasm.cc @@ -35,6 +35,12 @@ startup_context::startup_context(db::config& cfg, replica::database_config& dbcf , timer_period(std::chrono::milliseconds(cfg.wasm_cache_timeout_in_ms())) { } +manager::manager(const std::optional& ctx) + : _engine(ctx ? ctx->engine : nullptr) + , _instance_cache(ctx ? std::make_optional(ctx->cache_size, ctx->instance_size, ctx->timer_period) : std::nullopt) + , _alien_runner(ctx ? ctx->alien_runner : nullptr) +{} + context::context(wasmtime::Engine& engine_ptr, std::string name, instance_cache& cache, uint64_t yield_fuel, uint64_t total_fuel) : engine_ptr(engine_ptr) , function_name(name) diff --git a/lang/wasm.hh b/lang/wasm.hh index b61a5dd35c..6883364eb4 100644 --- a/lang/wasm.hh +++ b/lang/wasm.hh @@ -13,10 +13,15 @@ #include #include "db/functions/function_name.hh" #include "rust/wasmtime_bindings.hh" +#include "lang/wasm_instance_cache.hh" #include "lang/wasm_alien_thread_runner.hh" #include "db/config.hh" #include "replica/database.hh" +namespace cql3 { +class query_processor; +} + namespace wasm { class instance_cache; @@ -44,6 +49,16 @@ struct startup_context { startup_context(db::config& cfg, replica::database_config& dbcfg); }; +class manager { + std::shared_ptr> _engine; + std::optional _instance_cache; + std::shared_ptr _alien_runner; + +public: + manager(const std::optional&); + friend class cql3::query_processor; +}; + struct context { wasmtime::Engine& engine_ptr; std::optional> module; diff --git a/lang/wasm_instance_cache.hh b/lang/wasm_instance_cache.hh index d9e72da330..6e95a8074b 100644 --- a/lang/wasm_instance_cache.hh +++ b/lang/wasm_instance_cache.hh @@ -16,12 +16,15 @@ #include #include #include -#include "lang/wasm.hh" #include "rust/cxx.h" #include "rust/wasmtime_bindings.hh" +#include "types/types.hh" namespace wasm { +class instance_cache; +struct context; + class module_handle { wasmtime::Module& _module; instance_cache& _cache; diff --git a/main.cc b/main.cc index 452ddef0d9..ea707ef0df 100644 --- a/main.cc +++ b/main.cc @@ -1034,7 +1034,11 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl wasm_ctx.emplace(*cfg, dbcfg); } - qp.start(std::ref(proxy), std::move(local_data_dict), std::ref(mm_notifier), qp_mcfg, std::ref(cql_config), std::move(auth_prep_cache_config), std::move(wasm_ctx)).get(); + static sharded wasm; + wasm.start(std::ref(wasm_ctx)).get(); + // don't stop until query_processor stops + + qp.start(std::ref(proxy), std::move(local_data_dict), std::ref(mm_notifier), qp_mcfg, std::ref(cql_config), std::move(auth_prep_cache_config), std::ref(wasm)).get(); supervisor::notify("starting lifecycle notifier"); lifecycle_notifier.start().get(); diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index d6289327c0..48a62cc188 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -694,7 +694,11 @@ public: wasm_ctx.emplace(*cfg, dbcfg); } - qp.start(std::ref(proxy), std::move(local_data_dict), std::ref(mm_notif), qp_mcfg, std::ref(cql_config), auth_prep_cache_config, wasm_ctx).get(); + sharded wasm; + wasm.start(std::ref(wasm_ctx)).get(); + auto stop_wasm = defer([&wasm] { wasm.stop().get(); }); + + qp.start(std::ref(proxy), std::move(local_data_dict), std::ref(mm_notif), qp_mcfg, std::ref(cql_config), auth_prep_cache_config, std::ref(wasm)).get(); auto stop_qp = defer([&qp] { qp.stop().get(); }); sharded elc_notif;