wasm: Make wasm sharded<manager>

The wasm::manager is just cql3::wasm_context renamed. It now sits in
lang/wasm* and is started as a sharded service in main (and cql test
env). This move also needs some headers shuffling, but it's not severe

This change is required to make it possible for the wasm::manager to be
shared (by reference) between q.p. and replica::database further

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
Pavel Emelyanov
2023-08-02 21:09:39 +03:00
parent dde285e7e9
commit 243f2217dd
7 changed files with 40 additions and 26 deletions

View File

@@ -58,13 +58,7 @@ static service::query_state query_state_for_internal_call() {
return {service::client_state::for_internal_calls(), empty_service_permit()};
}
wasm_context::wasm_context(std::optional<wasm::startup_context>& ctx)
: _engine(ctx ? std::move(ctx->engine) : nullptr)
, _instance_cache(ctx ? std::make_optional<wasm::instance_cache>(ctx->cache_size, ctx->instance_size, ctx->timer_period) : std::nullopt)
, _alien_runner(ctx ? std::move(ctx->alien_runner) : nullptr)
{}
query_processor::query_processor(service::storage_proxy& proxy, data_dictionary::database db, service::migration_notifier& mn, query_processor::memory_config mcfg, cql_config& cql_cfg, utils::loading_cache_config auth_prep_cache_cfg, std::optional<wasm::startup_context> wasm_ctx)
query_processor::query_processor(service::storage_proxy& proxy, data_dictionary::database db, service::migration_notifier& mn, query_processor::memory_config mcfg, cql_config& cql_cfg, utils::loading_cache_config auth_prep_cache_cfg, wasm::manager& wasm)
: _migration_subscriber{std::make_unique<migration_subscriber>(this)}
, _proxy(proxy)
, _db(db)
@@ -77,7 +71,7 @@ query_processor::query_processor(service::storage_proxy& proxy, data_dictionary:
, _authorized_prepared_cache_config_action([this] { update_authorized_prepared_cache_config(); return make_ready_future<>(); })
, _authorized_prepared_cache_update_interval_in_ms_observer(_db.get_config().permissions_update_interval_in_ms.observe(_auth_prepared_cache_cfg_cb))
, _authorized_prepared_cache_validity_in_ms_observer(_db.get_config().permissions_validity_in_ms.observe(_auth_prepared_cache_cfg_cb))
, _wasm(wasm_ctx)
, _wasm(wasm)
{
namespace sm = seastar::metrics;
namespace stm = statements;

View File

@@ -21,14 +21,13 @@
#include "cql3/authorized_prepared_statements_cache.hh"
#include "cql3/statements/prepared_statement.hh"
#include "exceptions/exceptions.hh"
#include "lang/wasm_instance_cache.hh"
#include "service/migration_listener.hh"
#include "transport/messages/result_message.hh"
#include "service/qos/service_level_controller.hh"
#include "service/client_state.hh"
#include "service/broadcast_tables/experimental/query_result.hh"
#include "utils/observable.hh"
#include "lang/wasm_alien_thread_runner.hh"
#include "lang/wasm.hh"
namespace service {
@@ -88,17 +87,6 @@ public:
class cql_config;
class query_options;
class cql_statement;
class query_processor;
class wasm_context {
std::shared_ptr<rust::Box<wasmtime::Engine>> _engine;
std::optional<wasm::instance_cache> _instance_cache;
std::shared_ptr<wasm::alien_thread_runner> _alien_runner;
public:
wasm_context(std::optional<wasm::startup_context>&);
friend class query_processor;
};
class query_processor : public seastar::peering_sharded_service<query_processor> {
public:
@@ -140,7 +128,7 @@ private:
// don't bother with expiration on those.
std::unordered_map<sstring, std::unique_ptr<statements::prepared_statement>> _internal_statements;
wasm_context _wasm;
wasm::manager& _wasm;
public:
static const sstring CQL_VERSION;
@@ -155,7 +143,7 @@ public:
static std::unique_ptr<statements::raw::parsed_statement> parse_statement(const std::string_view& query);
static std::vector<std::unique_ptr<statements::raw::parsed_statement>> parse_statements(std::string_view queries);
query_processor(service::storage_proxy& proxy, data_dictionary::database db, service::migration_notifier& mn, memory_config mcfg, cql_config& cql_cfg, utils::loading_cache_config auth_prep_cache_cfg, std::optional<wasm::startup_context> wasm_ctx);
query_processor(service::storage_proxy& proxy, data_dictionary::database db, service::migration_notifier& mn, memory_config mcfg, cql_config& cql_cfg, utils::loading_cache_config auth_prep_cache_cfg, wasm::manager& wasm);
~query_processor();

View File

@@ -35,6 +35,12 @@ startup_context::startup_context(db::config& cfg, replica::database_config& dbcf
, timer_period(std::chrono::milliseconds(cfg.wasm_cache_timeout_in_ms())) {
}
manager::manager(const std::optional<wasm::startup_context>& ctx)
: _engine(ctx ? ctx->engine : nullptr)
, _instance_cache(ctx ? std::make_optional<wasm::instance_cache>(ctx->cache_size, ctx->instance_size, ctx->timer_period) : std::nullopt)
, _alien_runner(ctx ? ctx->alien_runner : nullptr)
{}
context::context(wasmtime::Engine& engine_ptr, std::string name, instance_cache& cache, uint64_t yield_fuel, uint64_t total_fuel)
: engine_ptr(engine_ptr)
, function_name(name)

View File

@@ -13,10 +13,15 @@
#include <seastar/core/future.hh>
#include "db/functions/function_name.hh"
#include "rust/wasmtime_bindings.hh"
#include "lang/wasm_instance_cache.hh"
#include "lang/wasm_alien_thread_runner.hh"
#include "db/config.hh"
#include "replica/database.hh"
namespace cql3 {
class query_processor;
}
namespace wasm {
class instance_cache;
@@ -44,6 +49,16 @@ struct startup_context {
startup_context(db::config& cfg, replica::database_config& dbcfg);
};
class manager {
std::shared_ptr<rust::Box<wasmtime::Engine>> _engine;
std::optional<wasm::instance_cache> _instance_cache;
std::shared_ptr<wasm::alien_thread_runner> _alien_runner;
public:
manager(const std::optional<wasm::startup_context>&);
friend class cql3::query_processor;
};
struct context {
wasmtime::Engine& engine_ptr;
std::optional<rust::Box<wasmtime::Module>> module;

View File

@@ -16,12 +16,15 @@
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/timer.hh>
#include <unordered_map>
#include "lang/wasm.hh"
#include "rust/cxx.h"
#include "rust/wasmtime_bindings.hh"
#include "types/types.hh"
namespace wasm {
class instance_cache;
struct context;
class module_handle {
wasmtime::Module& _module;
instance_cache& _cache;

View File

@@ -1034,7 +1034,11 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
wasm_ctx.emplace(*cfg, dbcfg);
}
qp.start(std::ref(proxy), std::move(local_data_dict), std::ref(mm_notifier), qp_mcfg, std::ref(cql_config), std::move(auth_prep_cache_config), std::move(wasm_ctx)).get();
static sharded<wasm::manager> wasm;
wasm.start(std::ref(wasm_ctx)).get();
// don't stop until query_processor stops
qp.start(std::ref(proxy), std::move(local_data_dict), std::ref(mm_notifier), qp_mcfg, std::ref(cql_config), std::move(auth_prep_cache_config), std::ref(wasm)).get();
supervisor::notify("starting lifecycle notifier");
lifecycle_notifier.start().get();

View File

@@ -694,7 +694,11 @@ public:
wasm_ctx.emplace(*cfg, dbcfg);
}
qp.start(std::ref(proxy), std::move(local_data_dict), std::ref(mm_notif), qp_mcfg, std::ref(cql_config), auth_prep_cache_config, wasm_ctx).get();
sharded<wasm::manager> wasm;
wasm.start(std::ref(wasm_ctx)).get();
auto stop_wasm = defer([&wasm] { wasm.stop().get(); });
qp.start(std::ref(proxy), std::move(local_data_dict), std::ref(mm_notif), qp_mcfg, std::ref(cql_config), auth_prep_cache_config, std::ref(wasm)).get();
auto stop_qp = defer([&qp] { qp.stop().get(); });
sharded<service::endpoint_lifecycle_notifier> elc_notif;