wasm: move compilation to an alien thread

The compilation of wasm UDFs is performed by a call to a foreign
function, which cannot be divided with yielding points and, as a
result, causes long reactor stalls for big UDFs.
We avoid them by submitting the compilation task to a non-seastar
std::thread, and retrieving the result using seastar::alien.

The thread is created at the start of the program. It executes
tasks from a queue in an infinite loop.

All seastar shards reference the thread through a std::shared_ptr
to a `alien_thread_runner`.

Considering that the compilation takes a long time anyway, the
alien_thread_runner is implemented with focus on simplicity more
than on performance. The tasks are stored in an std::queue, reading
and writing to it is synchronized using an std::mutex for reading/
writing to the queue, and an std::condition_variable waiting until
the queue has elements.

When the destructor of the alien runner is called, an std::nullopt
sentinel is pushed to the queue, and after all remaining tasks are
finished and the sentinel is read, the thread finishes.
This commit is contained in:
Wojciech Mitros
2023-03-01 17:30:53 +01:00
parent 4609a45ce3
commit 2fd6d495fa
12 changed files with 150 additions and 8 deletions

View File

@@ -1020,6 +1020,7 @@ scylla_core = (['message/messaging_service.cc',
'mutation_writer/feed_writers.cc',
'lang/lua.cc',
'lang/wasm.cc',
'lang/wasm_alien_thread_runner.cc',
'lang/wasm_instance_cache.cc',
'service/raft/group0_state_machine.cc',
'service/raft/raft_sys_table_storage.cc',

View File

@@ -29,6 +29,7 @@
#include "service/client_state.hh"
#include "service/forward_service.hh"
#include "utils/observable.hh"
#include "lang/wasm_alien_thread_runner.hh"
namespace service {
@@ -126,6 +127,7 @@ private:
std::unordered_map<sstring, std::unique_ptr<statements::prepared_statement>> _internal_statements;
wasm::instance_cache* _wasm_instance_cache;
std::shared_ptr<wasm::alien_thread_runner> _alien_runner;
public:
static const sstring CQL_VERSION;
@@ -171,10 +173,18 @@ public:
return _wasm_instance_cache;
}
wasm::alien_thread_runner& alien_runner() {
return *_alien_runner;
}
void set_wasm_instance_cache(wasm::instance_cache* cache) {
_wasm_instance_cache = cache;
}
void set_alien_runner(std::shared_ptr<wasm::alien_thread_runner> alien_runner) {
_alien_runner = std::move(alien_runner);
}
statements::prepared_statement::checked_weak_ptr get_prepared(const std::optional<auth::authenticated_user>& user, const prepared_cache_key_type& key) {
if (user) {
auto vp = _authorized_prepared_cache.find(*user, key);

View File

@@ -50,7 +50,7 @@ seastar::future<shared_ptr<functions::function>> create_function_statement::crea
// FIXME: need better way to test wasm compilation without real_database()
wasm::context ctx{db.real_database().wasm_engine(), _name.name, qp.get_wasm_instance_cache(), db.get_config().wasm_udf_yield_fuel(), db.get_config().wasm_udf_total_fuel()};
try {
co_await wasm::precompile(ctx, arg_names, _body);
co_await wasm::precompile(qp.alien_runner(), ctx, arg_names, _body);
co_return ::make_shared<functions::user_function>(_name, _arg_types, std::move(arg_names), _body, _language,
std::move(return_type), _called_on_null_input, std::move(ctx));
} catch (const wasm::exception& we) {

View File

@@ -1876,7 +1876,7 @@ static seastar::future<shared_ptr<cql3::functions::user_function>> create_func(r
row.get_nonnull<bool>("called_on_null_input"), std::move(ctx));
} else if (language == "xwasm") {
wasm::context ctx{db.wasm_engine(), name.name, qctx->qp().get_wasm_instance_cache(), db.get_config().wasm_udf_yield_fuel(), db.get_config().wasm_udf_total_fuel()};
co_await wasm::precompile(ctx, arg_names, body);
co_await wasm::precompile(qctx->qp().alien_runner(), ctx, arg_names, body);
co_return ::make_shared<cql3::functions::user_function>(std::move(name), std::move(arg_types), std::move(arg_names),
std::move(body), language, std::move(return_type),
row.get_nonnull<bool>("called_on_null_input"), std::move(ctx));

View File

@@ -3,6 +3,7 @@ target_sources(lang
PRIVATE
lua.cc
wasm.cc
wasm_alien_thread_runner.cc
wasm_instance_cache.cc)
target_include_directories(lang
PUBLIC

View File

@@ -21,6 +21,7 @@
#include "rust/wasmtime_bindings.hh"
#include <seastar/coroutine/exception.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include "lang/wasm_alien_thread_runner.hh"
static logging::logger wasm_logger("wasm");
@@ -217,10 +218,15 @@ struct from_val_visitor {
}
};
seastar::future<> precompile(context& ctx, const std::vector<sstring>& arg_names, std::string script) {
seastar::future<> precompile(alien_thread_runner& alien_runner, context& ctx, const std::vector<sstring>& arg_names, std::string script) {
seastar::promise<rust::Box<wasmtime::Module>> done;
alien_runner.submit(done, [&engine_ptr = ctx.engine_ptr, script = std::move(script)] {
return wasmtime::create_module(engine_ptr, rust::Str(script.data(), script.size()));
});
ctx.module = co_await done.get_future();
std::exception_ptr ex;
try {
ctx.module = wasmtime::create_module(ctx.engine_ptr, rust::Str(script.data(), script.size()));
// After precompiling the module, we try creating a store, an instance and a function with it to make sure it's valid.
// If we succeed, we drop them and keep the module, knowing that we will be able to create them again for UDF execution.
ctx.module.value()->compile(ctx.engine_ptr);
@@ -234,7 +240,6 @@ seastar::future<> precompile(context& ctx, const std::vector<sstring>& arg_names
if (ex) {
co_await coroutine::return_exception_ptr(std::move(ex));
}
co_return;
}
seastar::future<bytes_opt> run_script(context& ctx, wasmtime::Store& store, wasmtime::Instance& instance, wasmtime::Func& func, const std::vector<data_type>& arg_types, const std::vector<bytes_opt>& params, data_type return_type, bool allow_null_input) {
wasm_logger.debug("Running function {}", ctx.function_name);

View File

@@ -12,6 +12,7 @@
#include <seastar/core/future.hh>
#include "db/functions/function_name.hh"
#include "rust/wasmtime_bindings.hh"
#include "lang/wasm_alien_thread_runner.hh"
namespace wasm {
@@ -41,7 +42,7 @@ struct context {
context(wasmtime::Engine& engine_ptr, std::string name, instance_cache* cache, uint64_t yield_fuel, uint64_t total_fuel);
};
seastar::future<> precompile(context& ctx, const std::vector<sstring>& arg_names, std::string script);
seastar::future<> precompile(alien_thread_runner& alien_runner, context& ctx, const std::vector<sstring>& arg_names, std::string script);
seastar::future<bytes_opt> run_script(const db::functions::function_name& name, context& ctx, const std::vector<data_type>& arg_types, const std::vector<bytes_opt>& params, data_type return_type, bool allow_null_input);

View File

@@ -0,0 +1,67 @@
/*
* Copyright (C) 2023-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include <exception>
#include <seastar/core/alien.hh>
#include <seastar/core/reactor.hh>
#include "lang/wasm.hh"
#include "lang/wasm_alien_thread_runner.hh"
namespace wasm {
std::optional<wasm_compile_task> task_queue::pop_front() {
std::unique_lock lock(_mut);
_cv.wait(lock, [this] { return !_pending.empty(); });
auto work_item = std::move(_pending.front());
_pending.pop();
return work_item;
}
void task_queue::push_back(std::optional<wasm_compile_task> work_item) {
std::unique_lock lock(_mut);
_pending.emplace(std::move(work_item));
lock.unlock();
_cv.notify_one();
}
alien_thread_runner::alien_thread_runner()
: _thread([this] {
for (;;) {
auto work_item = _pending_queue.pop_front();
if (work_item) {
work_item->func();
} else {
break;
}
}
})
{ }
alien_thread_runner::~alien_thread_runner() {
_pending_queue.push_back(std::nullopt);
_thread.join();
}
void alien_thread_runner::submit(seastar::promise<rust::Box<wasmtime::Module>>& p, std::function<rust::Box<wasmtime::Module>()> f) {
seastar::noncopyable_function<void()> packaged([f = std::move(f), &p, &alien = seastar::engine().alien(), shard = seastar::this_shard_id()] () mutable {
try {
rust::Box<wasmtime::Module> mod = f();
seastar::alien::run_on(alien, shard, [&p, mod = std::move(mod)] () mutable {
p.set_value(std::move(mod));
});
} catch (...) {
seastar::alien::run_on(alien, shard, [&, eptr = std::current_exception()] {
p.set_exception(wasm::exception(format("Compilation failed: {}", eptr)));
});
}
});
_pending_queue.push_back(wasm_compile_task{.func = std::move(packaged), .done = p, .shard = seastar::this_shard_id()});
}
} // namespace wasm

View File

@@ -0,0 +1,51 @@
/*
* Copyright (C) 2023-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include <functional>
#include <mutex>
#include <queue>
#include <condition_variable>
#include <thread>
#include <seastar/core/future.hh>
#include <seastar/util/noncopyable_function.hh>
#include "rust/cxx.h"
#include "rust/wasmtime_bindings.hh"
namespace wasm {
struct wasm_compile_task {
seastar::noncopyable_function<void()> func;
seastar::promise<rust::Box<wasmtime::Module>>& done;
unsigned shard;
};
struct task_queue {
std::mutex _mut;
std::condition_variable _cv;
std::queue<std::optional<wasm_compile_task>> _pending;
public:
std::optional<wasm_compile_task> pop_front();
void push_back(std::optional<wasm_compile_task> work_item);
};
class alien_thread_runner {
task_queue _pending_queue;
std::thread _thread;
public:
alien_thread_runner();
~alien_thread_runner();
alien_thread_runner(const alien_thread_runner&) = delete;
alien_thread_runner& operator=(const alien_thread_runner&) = delete;
void submit(seastar::promise<rust::Box<wasmtime::Module>>& p, std::function<rust::Box<wasmtime::Module>()> f);
};
} // namespace wasm

View File

@@ -90,6 +90,7 @@
#include "test/perf/entry_point.hh"
#include "db/per_partition_rate_limit_extension.hh"
#include "lang/wasm_instance_cache.hh"
#include "lang/wasm_alien_thread_runner.hh"
#include "service/raft/raft_address_map.hh"
#include "service/raft/raft_group_registry.hh"
@@ -1006,6 +1007,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
static sharded<wasm::instance_cache> wasm_instance_cache;
auto udf_enabled = cfg->enable_user_defined_functions() && cfg->check_experimental(db::experimental_features_t::feature::UDF);
std::any stop_udf_cache_handlers;
std::shared_ptr<wasm::alien_thread_runner> alien_runner;
if (udf_enabled) {
supervisor::notify("starting wasm udf cache");
size_t max_cache_size = dbcfg.available_memory * cfg->wasm_cache_memory_fraction();
@@ -1013,6 +1015,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
stop_udf_cache_handlers = defer_verbose_shutdown("udf cache", [] {
wasm_instance_cache.stop().get();
});
alien_runner = std::make_shared<wasm::alien_thread_runner>();
}
auto get_tm_cfg = sharded_parameter([&] {
@@ -1146,6 +1149,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
if (udf_enabled) {
qp.invoke_on_all([&] (cql3::query_processor& qp) {
qp.set_wasm_instance_cache(&wasm_instance_cache.local());
qp.set_alien_runner(alien_runner);
}).get();
}
supervisor::notify("initializing batchlog manager");

View File

@@ -59,13 +59,14 @@ SEASTAR_TEST_CASE(test_allocation_failures) {
engine().handle_signal(SIGILL, [] {});
int errors_during_compilation = 0;
int errors_during_execution = 0;
wasm::alien_thread_runner alien_runner;
for (size_t fail_after = 0;;fail_after++) {
auto wasm_engine = wasmtime::create_test_engine(1024 * 1024, fail_after);
auto wasm_cache = std::make_unique<wasm::instance_cache>(100 * 1024 * 1024, 1024 * 1024, std::chrono::seconds(1));
auto wasm_ctx = wasm::context(*wasm_engine, "grow_return", wasm_cache.get(), 1000, 1000000000);
try {
// Function that ignores the input, grows its memory by 1 page, and returns 10
co_await wasm::precompile(wasm_ctx, {}, grow_return);
co_await wasm::precompile(alien_runner, wasm_ctx, {}, grow_return);
wasm_ctx.module.value()->compile(*wasm_engine);
} catch (const wasm::exception& e) {
errors_during_compilation++;

View File

@@ -17,10 +17,11 @@
SEASTAR_TEST_CASE(test_long_udf_yields) {
auto wasm_engine = wasmtime::create_engine(1024 * 1024);
wasm::alien_thread_runner alien_runner;
auto wasm_cache = std::make_unique<wasm::instance_cache>(100 * 1024 * 1024, 1024 * 1024, std::chrono::seconds(1));
auto wasm_ctx = wasm::context(*wasm_engine, "fib", wasm_cache.get(), 100000, 100000000000);
// Recursive fibonacci function
co_await wasm::precompile(wasm_ctx, {}, R"(
co_await wasm::precompile(alien_runner, wasm_ctx, {}, R"(
(module
(type (;0;) (func (param i64) (result i64)))
(func (;0;) (type 0) (param i64) (result i64)