locator::production_snitch_base: unify property file parsing facilities
- Move property file parsing code into production_snitch_base class.
- Make a parsing code more general:
- Save the parsed keys in the hash table.
- Check only two types of errors:
- Repeating keys.
- Add a set of all supported keys and add the check for a key
being supported.
- Added production_snitch_base.cc file.
Signed-off-by: Vlad Zolotarov <vladz@cloudius-systems.com>
This commit is contained in:
@@ -354,6 +354,7 @@ urchin_core = (['database.cc',
|
||||
'locator/simple_snitch.cc',
|
||||
'locator/rack_inferring_snitch.cc',
|
||||
'locator/gossiping_property_file_snitch.cc',
|
||||
'locator/production_snitch_base.cc',
|
||||
'message/messaging_service.cc',
|
||||
'service/migration_task.cc',
|
||||
'service/storage_service.cc',
|
||||
|
||||
@@ -40,11 +40,9 @@
|
||||
|
||||
namespace locator {
|
||||
future<bool> gossiping_property_file_snitch::property_file_was_modified() {
|
||||
return engine().open_file_dma(_fname, open_flags::ro)
|
||||
return engine().open_file_dma(_prop_file_name, open_flags::ro)
|
||||
.then([this](file f) {
|
||||
_sf = make_lw_shared(std::move(f));
|
||||
|
||||
return _sf->stat();
|
||||
return f.stat();
|
||||
}).then_wrapped([this] (auto&& f) {
|
||||
try {
|
||||
auto st = std::get<0>(f.get());
|
||||
@@ -57,7 +55,7 @@ future<bool> gossiping_property_file_snitch::property_file_was_modified() {
|
||||
return false;
|
||||
}
|
||||
} catch (...) {
|
||||
logger().error("Failed to open {} for read or to get stats", _fname);
|
||||
logger().error("Failed to open {} for read or to get stats", _prop_file_name);
|
||||
throw;
|
||||
}
|
||||
});
|
||||
@@ -65,7 +63,7 @@ future<bool> gossiping_property_file_snitch::property_file_was_modified() {
|
||||
|
||||
gossiping_property_file_snitch::gossiping_property_file_snitch(
|
||||
const sstring& fname, unsigned io_cpu_id)
|
||||
: _fname(fname), _file_reader_cpu_id(io_cpu_id) {}
|
||||
: production_snitch_base(fname), _file_reader_cpu_id(io_cpu_id) {}
|
||||
|
||||
future<> gossiping_property_file_snitch::start() {
|
||||
using namespace std::chrono_literals;
|
||||
@@ -142,18 +140,7 @@ void gossiping_property_file_snitch::gossiper_starting() {
|
||||
future<> gossiping_property_file_snitch::read_property_file() {
|
||||
using namespace exceptions;
|
||||
|
||||
return engine().open_file_dma(_fname, open_flags::ro).then([this](file f) {
|
||||
_sf = make_lw_shared(std::move(f));
|
||||
|
||||
return _sf->size();
|
||||
}).then([this](size_t s) {
|
||||
_fsize = s;
|
||||
|
||||
return _sf->dma_read_exactly<char>(0, _fsize);
|
||||
}).then([this](temporary_buffer<char> tb) {
|
||||
|
||||
_srting_buf = std::move(std::string(tb.get(), _fsize));
|
||||
|
||||
return load_property_file().then([this] {
|
||||
return reload_configuration();
|
||||
}).then_wrapped([this] (auto&& f) {
|
||||
try {
|
||||
@@ -166,11 +153,10 @@ future<> gossiping_property_file_snitch::read_property_file() {
|
||||
// - Print an error when reloading.
|
||||
//
|
||||
if (_state == snitch_state::initializing) {
|
||||
logger().error("Failed to parse a properties file ({}). Halting...", _fname);
|
||||
logger().error("Failed to parse a properties file ({}). Halting...", _prop_file_name);
|
||||
throw;
|
||||
} else {
|
||||
logger().warn("Failed to reload a properties file ({}). "
|
||||
"Using previous values.", _fname);
|
||||
logger().warn("Failed to reload a properties file ({}). Using previous values.", _prop_file_name);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
}
|
||||
@@ -178,89 +164,34 @@ future<> gossiping_property_file_snitch::read_property_file() {
|
||||
}
|
||||
|
||||
future<> gossiping_property_file_snitch::reload_configuration() {
|
||||
using namespace boost::algorithm;
|
||||
|
||||
std::string line;
|
||||
//
|
||||
// Using two bool variables instead of std::experimental::optional<bool>
|
||||
// since there is a bug in gcc causing it to report "'new_prefer_local' may
|
||||
// be used uninitialized in this function" if we do.
|
||||
//
|
||||
bool new_prefer_local;
|
||||
bool read_prefer_local = false;
|
||||
std::experimental::optional<sstring> new_dc;
|
||||
std::experimental::optional<sstring> new_rack;
|
||||
std::istringstream istrm(_srting_buf);
|
||||
std::vector<std::string> split_line;
|
||||
|
||||
while (std::getline(istrm, line)) {
|
||||
trim(line);
|
||||
|
||||
// Skip comments or empty lines
|
||||
if (!line.size() || line.at(0) == '#') {
|
||||
continue;
|
||||
}
|
||||
|
||||
split_line.clear();
|
||||
split(split_line, line, is_any_of("="));
|
||||
|
||||
if (split_line.size() != 2) {
|
||||
throw_bad_format(line);
|
||||
}
|
||||
|
||||
auto key = split_line[0]; trim(key);
|
||||
auto val = split_line[1]; trim(val);
|
||||
|
||||
if (!val.size()) {
|
||||
throw_bad_format(line);
|
||||
}
|
||||
|
||||
if (!key.compare("dc")) {
|
||||
if (new_dc) {
|
||||
throw_double_declaration("dc");
|
||||
}
|
||||
|
||||
new_dc = sstring(val);
|
||||
} else if (!key.compare("rack")) {
|
||||
if (new_rack) {
|
||||
throw_double_declaration("rack");
|
||||
}
|
||||
|
||||
new_rack = sstring(val);
|
||||
} else if (!key.compare("prefer_local")) {
|
||||
if (read_prefer_local) {
|
||||
throw_double_declaration("prefer_local");
|
||||
}
|
||||
|
||||
if (!val.compare("false")) {
|
||||
new_prefer_local = false;
|
||||
} else if (!val.compare("true")) {
|
||||
new_prefer_local = true;
|
||||
} else {
|
||||
throw_bad_format(line);
|
||||
}
|
||||
|
||||
read_prefer_local = true;
|
||||
} else {
|
||||
throw_bad_format(line);
|
||||
}
|
||||
}
|
||||
// "prefer_local" is FALSE by default
|
||||
bool new_prefer_local = false;
|
||||
sstring new_dc;
|
||||
sstring new_rack;
|
||||
|
||||
// Rack and Data Center have to be defined in the properties file!
|
||||
if (!new_dc || !new_rack) {
|
||||
if (!_prop_values.count(dc_property_key) || !_prop_values.count(rack_property_key)) {
|
||||
throw_incomplete_file();
|
||||
}
|
||||
|
||||
// "prefer_local" is FALSE by default
|
||||
if (!read_prefer_local) {
|
||||
new_prefer_local = false;
|
||||
new_dc = _prop_values[dc_property_key];
|
||||
new_rack = _prop_values[rack_property_key];
|
||||
|
||||
if (_prop_values.count(prefer_local_property_key)) {
|
||||
if (_prop_values[prefer_local_property_key] == "false") {
|
||||
new_prefer_local = false;
|
||||
} else if (_prop_values[prefer_local_property_key] == "true") {
|
||||
new_prefer_local = true;
|
||||
} else {
|
||||
throw_bad_format("prefer_local configuration is malformed");
|
||||
}
|
||||
}
|
||||
|
||||
if (_state == snitch_state::initializing || _my_dc != *new_dc ||
|
||||
_my_rack != *new_rack || _prefer_local != new_prefer_local) {
|
||||
if (_state == snitch_state::initializing || _my_dc != new_dc ||
|
||||
_my_rack != new_rack || _prefer_local != new_prefer_local) {
|
||||
|
||||
_my_dc = *new_dc;
|
||||
_my_rack = *new_rack;
|
||||
_my_dc = new_dc;
|
||||
_my_rack = new_rack;
|
||||
_prefer_local = new_prefer_local;
|
||||
|
||||
assert(_my_distributed);
|
||||
|
||||
@@ -51,8 +51,6 @@
|
||||
|
||||
namespace locator {
|
||||
|
||||
class bad_property_file_error : public std::exception {};
|
||||
|
||||
/**
|
||||
* cassandra-rackdc.properties file has the following format:
|
||||
*
|
||||
@@ -62,7 +60,6 @@ class bad_property_file_error : public std::exception {};
|
||||
*/
|
||||
class gossiping_property_file_snitch : public production_snitch_base {
|
||||
public:
|
||||
static constexpr const char* snitch_properties_filename = "cassandra-rackdc.properties";
|
||||
// Check the property file for changes every 60s.
|
||||
static constexpr timer<>::duration reload_property_file_period() {
|
||||
return std::chrono::seconds(60);
|
||||
@@ -82,21 +79,6 @@ public:
|
||||
unsigned io_cpu_id = 0);
|
||||
|
||||
private:
|
||||
void throw_double_declaration(const sstring& key) const {
|
||||
logger().error("double \"{}\" declaration in {}", key, _fname);
|
||||
throw bad_property_file_error();
|
||||
}
|
||||
|
||||
void throw_bad_format(const sstring& line) const {
|
||||
logger().error("Bad format in properties file {}: {}", _fname, line);
|
||||
throw bad_property_file_error();
|
||||
}
|
||||
|
||||
void throw_incomplete_file() const {
|
||||
logger().error("Property file {} is incomplete. Both \"dc\" and \"rack\" labels have to be defined.", _fname);
|
||||
throw bad_property_file_error();
|
||||
}
|
||||
|
||||
void periodic_reader_callback();
|
||||
|
||||
/**
|
||||
@@ -138,12 +120,8 @@ private:
|
||||
void start_io();
|
||||
|
||||
private:
|
||||
sstring _fname;
|
||||
timer<> _file_reader;
|
||||
lw_shared_ptr<file> _sf;
|
||||
std::experimental::optional<timespec> _last_file_mod;
|
||||
size_t _fsize;
|
||||
std::string _srting_buf;
|
||||
std::istringstream _istrm;
|
||||
bool _gossip_started = false;
|
||||
bool _prefer_local = false;
|
||||
|
||||
59
locator/production_snitch_base.cc
Normal file
59
locator/production_snitch_base.cc
Normal file
@@ -0,0 +1,59 @@
|
||||
#include "locator/production_snitch_base.hh"
|
||||
|
||||
namespace locator {
|
||||
future<> production_snitch_base::load_property_file() {
|
||||
return engine().open_file_dma(_prop_file_name, open_flags::ro)
|
||||
.then([this] (file f) {
|
||||
_sf = make_lw_shared(std::move(f));
|
||||
|
||||
return _sf->size();
|
||||
}).then([this] (size_t s) {
|
||||
_prop_file_size = s;
|
||||
|
||||
return _sf->dma_read_exactly<char>(0, s);
|
||||
}).then([this] (temporary_buffer<char> tb) {
|
||||
_prop_file_contents = std::move(std::string(tb.get(), _prop_file_size));
|
||||
parse_property_file();
|
||||
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
|
||||
void production_snitch_base::parse_property_file() {
|
||||
using namespace boost::algorithm;
|
||||
|
||||
std::string line;
|
||||
std::istringstream istrm(_prop_file_contents);
|
||||
std::vector<std::string> split_line;
|
||||
_prop_values.clear();
|
||||
|
||||
while (std::getline(istrm, line)) {
|
||||
trim(line);
|
||||
|
||||
// Skip comments or empty lines
|
||||
if (!line.size() || line.at(0) == '#') {
|
||||
continue;
|
||||
}
|
||||
|
||||
split_line.clear();
|
||||
split(split_line, line, is_any_of("="));
|
||||
|
||||
if (split_line.size() != 2) {
|
||||
throw_bad_format(line);
|
||||
}
|
||||
|
||||
auto key = split_line[0]; trim(key);
|
||||
auto val = split_line[1]; trim(val);
|
||||
|
||||
if (val.empty() || !allowed_property_keys.count(key)) {
|
||||
throw_bad_format(line);
|
||||
}
|
||||
|
||||
if (_prop_values.count(key)) {
|
||||
throw_double_declaration(key);
|
||||
}
|
||||
|
||||
_prop_values[key] = val;
|
||||
}
|
||||
}
|
||||
} // namespace locator
|
||||
@@ -41,17 +41,22 @@
|
||||
#include <unordered_map>
|
||||
#include <utility>
|
||||
#include <experimental/optional>
|
||||
#include <unordered_set>
|
||||
#include <boost/filesystem.hpp>
|
||||
|
||||
#include "gms/endpoint_state.hh"
|
||||
#include "gms/gossiper.hh"
|
||||
#include "utils/fb_utilities.hh"
|
||||
#include "locator/token_metadata.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "db/config.hh"
|
||||
#include "core/sstring.hh"
|
||||
#include "snitch_base.hh"
|
||||
|
||||
namespace locator {
|
||||
|
||||
class bad_property_file_error : public std::exception {};
|
||||
|
||||
class production_snitch_base : public snitch_base {
|
||||
public:
|
||||
// map of inet address to (datacenter, rack) pair
|
||||
@@ -59,6 +64,31 @@ public:
|
||||
|
||||
static constexpr const char* default_dc = "UNKNOWN_DC";
|
||||
static constexpr const char* default_rack = "UNKNOWN_RACK";
|
||||
static constexpr const char* snitch_properties_filename = "cassandra-rackdc.properties";
|
||||
|
||||
// only these property values are supported
|
||||
static constexpr const char* dc_property_key = "dc";
|
||||
static constexpr const char* rack_property_key = "rack";
|
||||
static constexpr const char* prefer_local_property_key = "prefer_local";
|
||||
static constexpr const char* dc_suffix_property_key = "dc_suffix";
|
||||
const std::unordered_set<sstring> allowed_property_keys;
|
||||
|
||||
production_snitch_base(const sstring prop_file_name = "")
|
||||
: allowed_property_keys({ dc_property_key,
|
||||
rack_property_key,
|
||||
prefer_local_property_key,
|
||||
dc_suffix_property_key }){
|
||||
if (!prop_file_name.empty()) {
|
||||
_prop_file_name = prop_file_name;
|
||||
} else {
|
||||
using namespace boost::filesystem;
|
||||
|
||||
path def_prop_file(db::config::get_conf_dir());
|
||||
def_prop_file /= path(snitch_properties_filename);
|
||||
|
||||
_prop_file_name = def_prop_file.string();
|
||||
}
|
||||
}
|
||||
|
||||
virtual sstring get_rack(inet_address endpoint) {
|
||||
if (endpoint == utils::fb_utilities::get_broadcast_address()) {
|
||||
@@ -133,9 +163,41 @@ private:
|
||||
_my_rack = new_rack;
|
||||
}
|
||||
|
||||
void parse_property_file();
|
||||
|
||||
protected:
|
||||
/**
|
||||
* Loads the contents of the property file into the map
|
||||
*
|
||||
* @return ready future when the file contents has been loaded.
|
||||
*/
|
||||
future<> load_property_file();
|
||||
|
||||
void throw_double_declaration(const sstring& key) const {
|
||||
logger().error("double \"{}\" declaration in {}", key, _prop_file_name);
|
||||
throw bad_property_file_error();
|
||||
}
|
||||
|
||||
void throw_bad_format(const sstring& line) const {
|
||||
logger().error("Bad format in properties file {}: {}", _prop_file_name, line);
|
||||
throw bad_property_file_error();
|
||||
}
|
||||
|
||||
void throw_incomplete_file() const {
|
||||
logger().error("Property file {} is incomplete. Some obligatory fields are missing.", _prop_file_name);
|
||||
throw bad_property_file_error();
|
||||
}
|
||||
|
||||
protected:
|
||||
promise<> _io_is_stopped;
|
||||
std::experimental::optional<addr2dc_rack_map> _saved_endpoints;
|
||||
distributed<snitch_ptr>* _my_distributed = nullptr;
|
||||
std::string _prop_file_contents;
|
||||
sstring _prop_file_name;
|
||||
std::unordered_map<sstring, sstring> _prop_values;
|
||||
|
||||
private:
|
||||
lw_shared_ptr<file> _sf;
|
||||
size_t _prop_file_size;
|
||||
};
|
||||
} // namespace locator
|
||||
|
||||
Reference in New Issue
Block a user