utils: add disk_space_monitor

Instantiated only on shard 0.
Currently, only subscribe from unit test

Manual unit test using loop mount was added.
Note that the test requires sudo access
and root access to /dev/loop, so it cannot
run in rootless podman instance, and it'd
fail with Permission denied.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>

Closes scylladb/scylladb#21523
This commit is contained in:
Benny Halevy
2024-11-10 15:35:45 +02:00
committed by Avi Kivity
parent 288f9b2b15
commit 8d2ff8a915
8 changed files with 218 additions and 2 deletions

View File

@@ -1173,6 +1173,7 @@ scylla_core = (['message/messaging_service.cc',
'node_ops/node_ops_ctl.cc',
'node_ops/task_manager_module.cc',
'reader_concurrency_semaphore_group.cc',
'utils/disk_space_monitor.cc',
] + [Antlr3Grammar('cql3/Cql.g')] \
+ scylla_raft_core
)

View File

@@ -1268,6 +1268,9 @@ db::config::config(std::shared_ptr<db::extensions> exts)
"to try to slow down the client and prevent buildup of unfinished view updates. "
"To be effective, this maximal delay should be larger than the typical latencies. "
"Setting view_flow_control_delay_limit_in_ms to 0 disables view-update flow control.")
, disk_space_monitor_normal_polling_interval_in_seconds(this, "disk_space_monitor_normal_polling_interval_in_seconds", value_status::Used, 10, "Disk-space polling interval while below polling threshold")
, disk_space_monitor_high_polling_interval_in_seconds(this, "disk_space_monitor_high_polling_interval_in_seconds", value_status::Used, 1, "Disk-space polling interval at or above polling threshold")
, disk_space_monitor_polling_interval_threshold(this, "disk_space_monitor_polling_interval_threshold", value_status::Used, 0.9, "Disk-space polling threshold. Polling interval is increased when disk utilization is greater than or equal to this threshold")
, default_log_level(this, "default_log_level", value_status::Used, seastar::log_level::info, "Default log level for log messages")
, logger_log_level(this, "logger_log_level", value_status::Used, {}, "Map of logger name to log level. Valid log levels are 'error', 'warn', 'info', 'debug' and 'trace'")
, log_to_stdout(this, "log_to_stdout", value_status::Used, true, "Send log output to stdout")

View File

@@ -521,6 +521,10 @@ public:
named_value<bool> enable_tablets;
named_value<uint32_t> view_flow_control_delay_limit_in_ms;
named_value<int> disk_space_monitor_normal_polling_interval_in_seconds;
named_value<int> disk_space_monitor_high_polling_interval_in_seconds;
named_value<float> disk_space_monitor_polling_interval_threshold;
static const sstring default_tls_priority;
private:
template<typename T>

26
main.cc
View File

@@ -117,6 +117,7 @@
#include "utils/advanced_rpc_compressor.hh"
#include "utils/shared_dict.hh"
#include "message/dictionary_service.hh"
#include "utils/disk_space_monitor.hh"
seastar::metrics::metric_groups app_metrics;
@@ -751,6 +752,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
sharded<locator::effective_replication_map_factory> erm_factory;
sharded<service::migration_notifier> mm_notifier;
sharded<service::endpoint_lifecycle_notifier> lifecycle_notifier;
std::optional<utils::disk_space_monitor> disk_space_monitor_shard0;
sharded<compaction_manager> cm;
sharded<sstables::storage_manager> sstm;
distributed<replica::database> db;
@@ -813,7 +815,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
tcp_syncookies_sanity();
tcp_timestamps_sanity();
return seastar::async([&app, cfg, ext, &cm, &sstm, &db, &qp, &bm, &proxy, &mapreduce_service, &mm, &mm_notifier, &ctx, &opts, &dirs,
return seastar::async([&app, cfg, ext, &disk_space_monitor_shard0, &cm, &sstm, &db, &qp, &bm, &proxy, &mapreduce_service, &mm, &mm_notifier, &ctx, &opts, &dirs,
&prometheus_server, &cf_cache_hitrate_calculator, &load_meter, &feature_service, &gossiper, &snitch,
&token_metadata, &erm_factory, &snapshot_ctl, &messaging, &sst_dir_semaphore, &raft_gr, &service_memory_limiter,
&repair, &sst_loader, &ss, &lifecycle_notifier, &stream_manager, &task_manager, &rpc_dict_training_worker] {
@@ -1130,7 +1132,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
utils::directories::set data_dir_set;
data_dir_set.add(cfg->data_file_directories());
dirs->create_and_verify(data_dir_set, utils::directories::recursive::no).get();
utils::directories::verify_owner_and_mode_of_data_dir(std::move(data_dir_set)).get();
utils::directories::verify_owner_and_mode_of_data_dir(data_dir_set).get();
auto hints_dir_initializer = db::hints::directory_initializer::make(*dirs, cfg->hints_directory()).get();
auto view_hints_dir_initializer = db::hints::directory_initializer::make(*dirs, cfg->view_hints_directory()).get();
@@ -1185,6 +1187,26 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
scheduling_group_key_config cql_sg_stats_cfg = make_scheduling_group_key_config<cql_transport::cql_sg_stats>(maintenance_socket_enabled::no);
auto cql_sg_stats_key = scheduling_group_key_create(cql_sg_stats_cfg).get();
supervisor::notify("starting disk space monitor");
auto dsm_cfg = utils::disk_space_monitor::config{
.sched_group = dbcfg.streaming_scheduling_group,
.normal_polling_interval = cfg->disk_space_monitor_normal_polling_interval_in_seconds,
.high_polling_interval = cfg->disk_space_monitor_high_polling_interval_in_seconds,
.polling_interval_threshold = cfg->disk_space_monitor_polling_interval_threshold,
};
if (data_dir_set.get_paths().empty()) {
throw std::runtime_error("data_dir_set must be non-empty");
}
fs::path data_dir = *data_dir_set.get_paths().begin();
if (data_dir_set.get_paths().size() > 1) {
startlog.warn("Multiple data directories aren't supported. Will monitor only {}", data_dir);
}
disk_space_monitor_shard0.emplace(stop_signal.as_local_abort_source(), data_dir, dsm_cfg);
disk_space_monitor_shard0->start().get();
auto stop_dsm = defer_verbose_shutdown("disk space monitor", [&disk_space_monitor_shard0] {
disk_space_monitor_shard0->stop().get();
});
supervisor::notify("starting compaction_manager");
// get_cm_cfg is called on each shard when starting a sharded<compaction_manager>
// we need the getter since updateable_value is not shard-safe (#7316)

View File

@@ -68,6 +68,7 @@
#include "sstables/sstables_manager.hh"
#include "init.hh"
#include "lang/manager.hh"
#include "utils/disk_space_monitor.hh"
#include <sys/time.h>
#include <sys/resource.h>
@@ -154,6 +155,7 @@ private:
sharded<locator::shared_token_metadata> _token_metadata;
sharded<locator::effective_replication_map_factory> _erm_factory;
sharded<sstables::directory_semaphore> _sst_dir_semaphore;
std::optional<utils::disk_space_monitor> _disk_space_monitor_shard0;
sharded<lang::manager> _lang_manager;
sharded<cql3::cql_config> _cql_config;
sharded<service::endpoint_lifecycle_notifier> _elc_notif;
@@ -575,6 +577,16 @@ private:
_task_manager.stop().get();
});
utils::disk_space_monitor::config dsm_cfg = {
.sched_group = scheduling_groups.streaming_scheduling_group,
.normal_polling_interval = cfg->disk_space_monitor_normal_polling_interval_in_seconds,
.high_polling_interval = cfg->disk_space_monitor_high_polling_interval_in_seconds,
.polling_interval_threshold = cfg->disk_space_monitor_polling_interval_threshold,
};
_disk_space_monitor_shard0.emplace(abort_sources.local(), data_dir_path, dsm_cfg);
_disk_space_monitor_shard0->start().get();
auto stop_dsm = defer([this] { _disk_space_monitor_shard0->stop().get(); });
// get_cm_cfg is called on each shard when starting a sharded<compaction_manager>
// we need the getter since updateable_value is not shard-safe (#7316)
auto get_cm_cfg = sharded_parameter([&] {

View File

@@ -21,6 +21,7 @@ target_sources(utils
dict_trainer.cc
directories.cc
disk-error-handler.cc
disk_space_monitor.cc
dynamic_bitset.cc
error_injection.cc
exceptions.cc

View File

@@ -0,0 +1,87 @@
/*
* Copyright (C) 2024-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <filesystem>
#include <seastar/core/reactor.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/thread.hh>
#include "utils/disk_space_monitor.hh"
#include "utils/assert.hh"
#include "utils/log.hh"
using namespace std::chrono_literals;
namespace utils {
seastar::logger dsmlog("disk_space_monitor");
disk_space_monitor::disk_space_monitor(abort_source& as, std::filesystem::path data_dir, config cfg)
: _as_sub(as.subscribe([this] () noexcept { _as.request_abort(); }))
, _data_dir(std::move(data_dir))
, _cfg(std::move(cfg))
{}
disk_space_monitor::~disk_space_monitor() {
SCYLLA_ASSERT(_poller_fut.available());
}
future<> disk_space_monitor::start() {
_space_info = co_await get_filesystem_space();
_poller_fut = poll();
}
future<> disk_space_monitor::stop() noexcept {
_as.request_abort();
co_await _signal_barrier.advance_and_await();
co_await std::exchange(_poller_fut, make_ready_future());
}
disk_space_monitor::signal_connection_type disk_space_monitor::listen(signal_callback_type callback) {
return _signal_source.connect([this, callback = std::move(callback)] () mutable -> future<> {
auto op = _signal_barrier.start();
co_await callback(*this);
});
}
future<> disk_space_monitor::poll() {
try {
while (!_as.abort_requested()) {
auto now = clock_type::now();
_space_info = co_await get_filesystem_space();
if (_as.abort_requested()) {
co_return;
}
co_await _signal_barrier.advance_and_await();
_signal_source();
auto passed = clock_type::now() - now;
auto interval = get_polling_interval();
if (interval > passed) {
co_await sleep_abortable<clock_type>(interval - passed, _as);
}
}
} catch (const sleep_aborted&) {
} catch (const abort_requested_exception&) {
} catch (...) {
dsmlog.error("poll loop exited with error: {}", std::current_exception());
}
}
future<std::filesystem::space_info> disk_space_monitor::get_filesystem_space() {
return engine().file_system_space(_data_dir.native());
}
disk_space_monitor::clock_type::duration disk_space_monitor::get_polling_interval() const noexcept {
auto du = disk_utilization();
return std::chrono::seconds(du < _cfg.polling_interval_threshold.get() ? _cfg.normal_polling_interval.get() : _cfg.high_polling_interval.get());
}
} // namespace utils

View File

@@ -0,0 +1,86 @@
/*
* Copyright (C) 2024-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include <chrono>
#include <filesystem>
#include <boost/signals2/connection.hpp>
#include <boost/signals2/signal_type.hpp>
#include <boost/signals2/dummy_mutex.hpp>
#include <seastar/core/abort_source.hh>
#include <seastar/core/future.hh>
#include <seastar/core/lowres_clock.hh>
#include <seastar/core/timer.hh>
#include <seastar/util/optimized_optional.hh>
#include "seastarx.hh"
#include "utils/updateable_value.hh"
#include "utils/phased_barrier.hh"
namespace utils {
// Instantiated only on shard 0
class disk_space_monitor {
public:
using clock_type = lowres_clock;
using signal_type = boost::signals2::signal_type<void (), boost::signals2::keywords::mutex_type<boost::signals2::dummy_mutex>>::type;
using signal_callback_type = std::function<future<>(const disk_space_monitor&)>;
using signal_connection_type = boost::signals2::scoped_connection;
struct config {
scheduling_group sched_group;
updateable_value<int> normal_polling_interval;
updateable_value<int> high_polling_interval;
// Use high_polling_interval above this threshold
updateable_value<float> polling_interval_threshold;
};
private:
abort_source _as;
optimized_optional<abort_source::subscription> _as_sub;
future<> _poller_fut = make_ready_future();
utils::phased_barrier _signal_barrier;
signal_type _signal_source;
std::filesystem::space_info _space_info;
std::filesystem::path _data_dir;
config _cfg;
public:
disk_space_monitor(abort_source& as, std::filesystem::path data_dir, config cfg);
~disk_space_monitor();
future<> start();
future<> stop() noexcept;
const std::filesystem::path& data_dir() const noexcept {
return _data_dir;
}
std::filesystem::space_info space() const noexcept {
return _space_info;
}
float disk_utilization() const noexcept {
return _space_info.capacity ? (float)(_space_info.capacity - _space_info.available) / _space_info.capacity : -1;
}
signal_connection_type listen(signal_callback_type callback);
private:
future<> poll();
future<std::filesystem::space_info> get_filesystem_space();
clock_type::duration get_polling_interval() const noexcept;
};
} // namespace utils