utils: introduce alien_worker
Introduces a util which launches a new OS thread and accepts callables for concurrent execution. Meant to be created once at startup and used until shutdown, for running nonpreemptible, 3rd party, non-interactive code. Note: this new utility is almost identical to wasm::alien_thread_runner. Maybe we should unify them.
This commit is contained in:
@@ -761,6 +761,7 @@ scylla_core = (['message/messaging_service.cc',
|
||||
'row_cache.cc',
|
||||
'schema_mutations.cc',
|
||||
'generic_server.cc',
|
||||
'utils/alien_worker.cc',
|
||||
'utils/array-search.cc',
|
||||
'utils/base64.cc',
|
||||
'utils/logalloc.cc',
|
||||
|
||||
@@ -5,6 +5,7 @@ add_library(utils STATIC)
|
||||
target_sources(utils
|
||||
PRIVATE
|
||||
UUID_gen.cc
|
||||
alien_worker.cc
|
||||
arch/powerpc/crc32-vpmsum/crc32_wrapper.cc
|
||||
arch/powerpc/crc32-vpmsum/crc32.S
|
||||
array-search.cc
|
||||
|
||||
60
utils/alien_worker.cc
Normal file
60
utils/alien_worker.cc
Normal file
@@ -0,0 +1,60 @@
|
||||
/*
|
||||
* Copyright (C) 2024-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include "utils/alien_worker.hh"
|
||||
#include <seastar/util/log.hh>
|
||||
|
||||
using namespace seastar;
|
||||
|
||||
namespace utils {
|
||||
|
||||
std::thread alien_worker::spawn(seastar::logger& log, int niceness) {
|
||||
sigset_t newset;
|
||||
sigset_t oldset;
|
||||
sigfillset(&newset);
|
||||
auto r = ::pthread_sigmask(SIG_SETMASK, &newset, &oldset);
|
||||
assert(r == 0);
|
||||
auto thread = std::thread([this, &log, niceness] () noexcept {
|
||||
errno = 0;
|
||||
int nice_value = nice(niceness);
|
||||
if (nice_value == -1 && errno != 0) {
|
||||
log.warn("Unable to renice worker thread (system error number {}); the thread will compete with reactor, which can cause latency spikes. Try adding CAP_SYS_NICE", errno);
|
||||
}
|
||||
|
||||
while (true) {
|
||||
std::unique_lock lk(_mut);
|
||||
_cv.wait(lk, [this] { return !_pending.empty() || !_running; });
|
||||
if (!_running) {
|
||||
return;
|
||||
}
|
||||
auto f = std::move(_pending.front());
|
||||
_pending.pop();
|
||||
lk.unlock();
|
||||
f();
|
||||
}
|
||||
});
|
||||
r = ::pthread_sigmask(SIG_SETMASK, &oldset, nullptr);
|
||||
assert(r == 0);
|
||||
return thread;
|
||||
}
|
||||
|
||||
alien_worker::alien_worker(seastar::logger& log, int niceness)
|
||||
: _thread(spawn(log, niceness))
|
||||
{}
|
||||
|
||||
alien_worker::~alien_worker() {
|
||||
{
|
||||
std::unique_lock lk(_mut);
|
||||
_running = false;
|
||||
}
|
||||
_cv.notify_one();
|
||||
_thread.join();
|
||||
}
|
||||
|
||||
} // namespace utils
|
||||
|
||||
70
utils/alien_worker.hh
Normal file
70
utils/alien_worker.hh
Normal file
@@ -0,0 +1,70 @@
|
||||
/*
|
||||
* Copyright (C) 2024-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <seastar/core/alien.hh>
|
||||
#include <seastar/core/reactor.hh>
|
||||
|
||||
#include <queue>
|
||||
|
||||
namespace seastar {
|
||||
class logger;
|
||||
} // namespace seastar
|
||||
|
||||
namespace utils {
|
||||
|
||||
// Spawns a new OS thread, which can be used as a worker for running nonpreemptible 3rd party code.
|
||||
// Callables can be sent to the thread for execution via submit().
|
||||
class alien_worker {
|
||||
bool _running = true;
|
||||
std::mutex _mut;
|
||||
std::condition_variable _cv;
|
||||
std::queue<seastar::noncopyable_function<void() noexcept>> _pending;
|
||||
// Note: initialization of _thread uses other fields, so it must be performed last.
|
||||
std::thread _thread;
|
||||
|
||||
std::thread spawn(seastar::logger&, int niceness);
|
||||
public:
|
||||
alien_worker(seastar::logger&, int niceness);
|
||||
~alien_worker();
|
||||
// The worker captures `this`, so `this` must have a stable address.
|
||||
alien_worker(const alien_worker&) = delete;
|
||||
alien_worker(alien_worker&&) = delete;
|
||||
|
||||
// Submits a new callable to the thread for execution.
|
||||
// This callable will run on a different OS thread,
|
||||
// concurrently with the current thread, so be careful not to cause a data race.
|
||||
// Avoid capturing references in the callable if possible, and if you do,
|
||||
// be extremely careful about their concurrent uses.
|
||||
template <typename T>
|
||||
seastar::future<T> submit(seastar::noncopyable_function<T()> f) {
|
||||
auto p = seastar::promise<T>();
|
||||
auto fut = p.get_future();
|
||||
auto wrapper = [p = std::move(p), f = std::move(f), shard = seastar::this_shard_id(), &alien = seastar::engine().alien()] () mutable noexcept {
|
||||
try {
|
||||
auto v = f();
|
||||
seastar::alien::run_on(alien, shard, [v = std::move(v), p = std::move(p)] () mutable noexcept {
|
||||
p.set_value(std::move(v));
|
||||
});
|
||||
} catch (...) {
|
||||
seastar::alien::run_on(alien, shard, [p = std::move(p), ep = std::current_exception()] () mutable noexcept {
|
||||
p.set_exception(ep);
|
||||
});
|
||||
}
|
||||
};
|
||||
{
|
||||
std::unique_lock lk(_mut);
|
||||
_pending.push(std::move(wrapper));
|
||||
}
|
||||
_cv.notify_one();
|
||||
return fut;
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace utils
|
||||
Reference in New Issue
Block a user