Merge "storage_service: Move load_broadcaster away" from Pavel E.

The storage_service struct is a collection of diverse things,
most of them requiring only on start and on stop and/or runing
on shard 0 (but is nonetheless sharded).

As a part of clearing this structure and generated by it inter-
-componenes dependencies, here's the sanitation of load_broadcaster.
This commit is contained in:
Tomasz Grabiec
2020-01-14 19:24:53 +01:00
7 changed files with 116 additions and 73 deletions

View File

@@ -23,6 +23,8 @@
#include "service/storage_proxy.hh"
#include <seastar/http/httpd.hh>
namespace service { class load_meter; }
namespace api {
struct http_context {
@@ -31,9 +33,11 @@ struct http_context {
httpd::http_server_control http_server;
distributed<database>& db;
distributed<service::storage_proxy>& sp;
service::load_meter& lmeter;
http_context(distributed<database>& _db,
distributed<service::storage_proxy>& _sp)
: db(_db), sp(_sp) {
distributed<service::storage_proxy>& _sp,
service::load_meter& _lm)
: db(_db), sp(_sp), lmeter(_lm) {
}
};

View File

@@ -27,6 +27,7 @@
#include <boost/range/adaptor/map.hpp>
#include <boost/range/adaptor/filtered.hpp>
#include "service/storage_service.hh"
#include "service/load_meter.hh"
#include "db/commitlog/commitlog.hh"
#include "gms/gossiper.hh"
#include "db/system_keyspace.hh"
@@ -195,8 +196,8 @@ void set_storage_service(http_context& ctx, routes& r) {
return get_cf_stats(ctx, &column_family_stats::live_disk_space_used);
});
ss::get_load_map.set(r, [] (std::unique_ptr<request> req) {
return service::get_local_storage_service().get_load_map().then([] (auto&& load_map) {
ss::get_load_map.set(r, [&ctx] (std::unique_ptr<request> req) {
return ctx.lmeter.get_load_map().then([] (auto&& load_map) {
std::vector<ss::map_string_double> res;
for (auto i : load_map) {
ss::map_string_double val;

21
main.cc
View File

@@ -32,7 +32,7 @@
#include "db/legacy_schema_migrator.hh"
#include "service/storage_service.hh"
#include "service/migration_manager.hh"
#include "service/load_broadcaster.hh"
#include "service/load_meter.hh"
#include "service/view_update_backlog_broker.hh"
#include "streaming/stream_session.hh"
#include "db/system_keyspace.hh"
@@ -462,11 +462,12 @@ int main(int ac, char** av) {
distributed<database> db;
seastar::sharded<service::cache_hitrate_calculator> cf_cache_hitrate_calculator;
service::load_meter load_meter;
debug::db = &db;
auto& qp = cql3::get_query_processor();
auto& proxy = service::get_storage_proxy();
auto& mm = service::get_migration_manager();
api::http_context ctx(db, proxy);
api::http_context ctx(db, proxy, load_meter);
httpd::http_server_control prometheus_server;
utils::directories dirs;
sharded<gms::feature_service> feature_service;
@@ -497,7 +498,7 @@ int main(int ac, char** av) {
tcp_syncookies_sanity();
return seastar::async([cfg, ext, &db, &qp, &proxy, &mm, &ctx, &opts, &dirs,
&prometheus_server, &cf_cache_hitrate_calculator, &feature_service] {
&prometheus_server, &cf_cache_hitrate_calculator, &load_meter, &feature_service] {
try {
::stop_signal stop_signal; // we can move this earlier to support SIGINT during initialization
read_config(opts, *cfg).get();
@@ -944,15 +945,13 @@ int main(int ac, char** av) {
db::get_batchlog_manager().invoke_on_all([] (db::batchlog_manager& b) {
return b.start();
}).get();
supervisor::notify("starting load broadcaster");
// should be unique_ptr, but then lambda passed to at_exit will be non copieable and
// casting to std::function<> will fail to compile
auto lb = make_shared<service::load_broadcaster>(db, gms::get_local_gossiper());
lb->start_broadcasting();
service::get_local_storage_service().set_load_broadcaster(lb);
auto stop_load_broadcater = defer_verbose_shutdown("broadcaster", [lb = std::move(lb)] () {
lb->stop_broadcasting().get();
supervisor::notify("starting load meter");
load_meter.init(db, gms::get_local_gossiper()).get();
auto stop_load_meter = defer_verbose_shutdown("load meter", [&load_meter] {
load_meter.exit().get();
});
supervisor::notify("starting cf cache hit rate calculator");
cf_cache_hitrate_calculator.start(std::ref(db)).get();
auto stop_cache_hitrate_calculator = defer_verbose_shutdown("cf cache hit rate calculator",

50
service/load_meter.hh Normal file
View File

@@ -0,0 +1,50 @@
/*
* Copyright (C) 2020 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/distributed.hh>
#include "service/load_broadcaster.hh"
using namespace seastar;
class database;
namespace gms { class gossiper; }
namespace service {
class load_meter {
private:
std::unique_ptr<load_broadcaster> _lb;
/** raw load value */
double get_load() const;
sstring get_load_string() const;
public:
future<std::map<sstring, double>> get_load_map();
future<> init(distributed<database>& db, gms::gossiper& gossiper);
future<> exit();
};
}

View File

@@ -38,6 +38,7 @@
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "load_meter.hh"
#include "load_broadcaster.hh"
#include "cache_hitrate_calculator.hh"
#include "db/system_keyspace.hh"
@@ -54,6 +55,52 @@ constexpr std::chrono::milliseconds load_broadcaster::BROADCAST_INTERVAL;
logging::logger llogger("load_broadcaster");
future<> load_meter::init(distributed<database>& db, gms::gossiper& gms) {
_lb = std::make_unique<load_broadcaster>(db, gms);
_lb->start_broadcasting();
return make_ready_future<>();
}
future<> load_meter::exit() {
return _lb->stop_broadcasting();
}
future<std::map<sstring, double>> load_meter::get_load_map() {
return smp::submit_to(0, [this] () {
std::map<sstring, double> load_map;
if (_lb) {
for (auto& x : _lb->get_load_info()) {
load_map.emplace(format("{}", x.first), x.second);
llogger.debug("get_load_map endpoint={}, load={}", x.first, x.second);
}
} else {
llogger.debug("load_broadcaster is not set yet!");
}
load_map.emplace(format("{}",
utils::fb_utilities::get_broadcast_address()), get_load());
return load_map;
});
}
double load_meter::get_load() const {
double bytes = 0;
#if 0
for (String keyspaceName : Schema.instance.getKeyspaces())
{
Keyspace keyspace = Schema.instance.getKeyspaceInstance(keyspaceName);
if (keyspace == null)
continue;
for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
bytes += cfs.getLiveDiskSpaceUsed();
}
#endif
return bytes;
}
sstring load_meter::get_load_string() const {
return format("{:f}", get_load());
}
void load_broadcaster::start_broadcasting() {
_done = make_ready_future<>();

View File

@@ -2562,43 +2562,6 @@ future<> storage_service::drain() {
});
}
double storage_service::get_load() const {
double bytes = 0;
#if 0
for (String keyspaceName : Schema.instance.getKeyspaces())
{
Keyspace keyspace = Schema.instance.getKeyspaceInstance(keyspaceName);
if (keyspace == null)
continue;
for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
bytes += cfs.getLiveDiskSpaceUsed();
}
#endif
return bytes;
}
sstring storage_service::get_load_string() const {
return format("{:f}", get_load());
}
future<std::map<sstring, double>> storage_service::get_load_map() {
return run_with_no_api_lock([] (storage_service& ss) {
std::map<sstring, double> load_map;
auto& lb = ss.get_load_broadcaster();
if (lb) {
for (auto& x : lb->get_load_info()) {
load_map.emplace(format("{}", x.first), x.second);
slogger.debug("get_load_map endpoint={}, load={}", x.first, x.second);
}
} else {
slogger.debug("load_broadcaster is not set yet!");
}
load_map.emplace(format("{}", ss.get_broadcast_address()), ss.get_load());
return load_map;
});
}
future<> storage_service::rebuild(sstring source_dc) {
return run_with_api_lock(sstring("rebuild"), [source_dc] (storage_service& ss) {
slogger.info("rebuild from dc: {}", source_dc == "" ? "(any dc)" : source_dc);
@@ -3011,14 +2974,6 @@ future<> storage_service::load_new_sstables(sstring ks_name, sstring cf_name) {
});
}
void storage_service::set_load_broadcaster(shared_ptr<load_broadcaster> lb) {
_lb = lb;
}
shared_ptr<load_broadcaster>& storage_service::get_load_broadcaster() {
return _lb;
}
future<> storage_service::shutdown_client_servers() {
return do_stop_rpc_server().then([this] { return do_stop_native_transport(); });
}

View File

@@ -81,7 +81,6 @@ class gossiper;
namespace service {
class load_broadcaster;
class storage_service;
extern distributed<storage_service> _the_storage_service;
@@ -158,7 +157,6 @@ private:
// It shouldn't be impossible to actively serialize two callers if the need
// ever arise.
bool _loading_new_sstables = false;
shared_ptr<load_broadcaster> _lb;
std::optional<distributed<cql_transport::cql_server>> _cql_server;
std::optional<distributed<thrift_server>> _thrift_server;
sstring _operation_in_progress;
@@ -201,9 +199,6 @@ public:
future<> gossip_snitch_info();
void set_load_broadcaster(shared_ptr<load_broadcaster> lb);
shared_ptr<load_broadcaster>& get_load_broadcaster();
distributed<database>& db() {
return _db;
}
@@ -891,14 +886,6 @@ private:
// needs to be modified to accept either a keyspace or ARS.
std::unordered_multimap<dht::token_range, inet_address> get_changed_ranges_for_leaving(sstring keyspace_name, inet_address endpoint);
public:
/** raw load value */
double get_load() const;
sstring get_load_string() const;
future<std::map<sstring, double>> get_load_map();
#if 0
public final void deliverHints(String host) throws UnknownHostException
{