Introduce updatable_value
The updateable_value and updateable_value_source classes allow broadcasting configuration changes across the application. The updateable_value_source class represents a value that can be updated, and updateable_value tracks its source and reflects changes. A typical use replaces "uint64_t config_item" with "updateable_value<uint64_t> config_item", and from now on changes to the source will be reflected in config_item. For more complicated uses, which must run some callback when configuration changes, you can also call config_item.observe(callback) to be actively notified of changes.
This commit is contained in:
@@ -484,6 +484,7 @@ scylla_core = (['database.cc',
|
||||
'utils/large_bitset.cc',
|
||||
'utils/buffer_input_stream.cc',
|
||||
'utils/limiting_data_source.cc',
|
||||
'utils/updateable_value.cc',
|
||||
'mutation_partition.cc',
|
||||
'mutation_partition_view.cc',
|
||||
'mutation_partition_serializer.cc',
|
||||
|
||||
@@ -25,11 +25,37 @@
|
||||
#include <iostream>
|
||||
|
||||
#include <seastar/testing/test_case.hh>
|
||||
#include <seastar/testing/thread_test_case.hh>
|
||||
#include <seastar/core/future-util.hh>
|
||||
#include "db/config.hh"
|
||||
#include "utils/updateable_value.hh"
|
||||
#include <algorithm>
|
||||
|
||||
using namespace db;
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_updateable_value_basics) {
|
||||
using namespace utils;
|
||||
updateable_value_source<int> source;
|
||||
source.set(3);
|
||||
updateable_value<int> u1(source);
|
||||
updateable_value<int> u2(u1);
|
||||
updateable_value<int> u3(std::move(u2));
|
||||
u2 = u3;
|
||||
BOOST_REQUIRE_EQUAL(u1.get(), 3);
|
||||
BOOST_REQUIRE_EQUAL(u2.get(), 3);
|
||||
BOOST_REQUIRE_EQUAL(u3.get(), 3);
|
||||
unsigned called = 0;
|
||||
auto u3observer = u3.observe([&] (int v) {
|
||||
++called;
|
||||
BOOST_REQUIRE_EQUAL(v, 4);
|
||||
});
|
||||
source.set(4);
|
||||
BOOST_REQUIRE_EQUAL(u1.get(), 4);
|
||||
BOOST_REQUIRE_EQUAL(u2.get(), 4);
|
||||
BOOST_REQUIRE_EQUAL(u3.get(), 4);
|
||||
BOOST_REQUIRE_EQUAL(called, 1);
|
||||
}
|
||||
|
||||
// stock, default cassandra.yaml
|
||||
const char* cassandra_conf = R"apa(
|
||||
# Cassandra storage config YAML
|
||||
|
||||
@@ -26,6 +26,7 @@
|
||||
#include <numeric>
|
||||
|
||||
#include "../utils/observable.hh"
|
||||
#include "../utils/updateable_value.hh"
|
||||
|
||||
using namespace utils;
|
||||
|
||||
@@ -84,3 +85,4 @@ BOOST_AUTO_TEST_CASE(test_disconnect_fully_disconnects) {
|
||||
// Would have accessed the overwritten observable before the bug fix.
|
||||
sub.disconnect();
|
||||
}
|
||||
|
||||
|
||||
113
utils/updateable_value.cc
Normal file
113
utils/updateable_value.cc
Normal file
@@ -0,0 +1,113 @@
|
||||
/*
|
||||
* Copyright (C) 2019 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
|
||||
|
||||
#include "updateable_value.hh"
|
||||
#include <seastar/core/reactor.hh>
|
||||
|
||||
namespace utils {
|
||||
|
||||
updateable_value_base::updateable_value_base(const updateable_value_source_base& source) {
|
||||
source.add_ref(this);
|
||||
_source = &source;
|
||||
}
|
||||
|
||||
updateable_value_base::~updateable_value_base() {
|
||||
if (_source) {
|
||||
_source->del_ref(this);
|
||||
}
|
||||
}
|
||||
|
||||
updateable_value_base::updateable_value_base(const updateable_value_base& v) {
|
||||
if (v._source) {
|
||||
v._source->add_ref(this);
|
||||
_source = v._source;
|
||||
}
|
||||
}
|
||||
|
||||
updateable_value_base&
|
||||
updateable_value_base::updateable_value_base::operator=(const updateable_value_base& v) {
|
||||
if (this != &v) {
|
||||
// If both sources are null, or non-null and equal, nothing needs to be done
|
||||
if (_source != v._source) {
|
||||
if (v._source) {
|
||||
v._source->add_ref(this);
|
||||
}
|
||||
if (_source) {
|
||||
_source->del_ref(this);
|
||||
}
|
||||
_source = v._source;
|
||||
}
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
||||
updateable_value_base::updateable_value_base(updateable_value_base&& v) noexcept
|
||||
: _source(std::exchange(v._source, nullptr)) {
|
||||
if (_source) {
|
||||
_source->update_ref(&v, this);
|
||||
}
|
||||
}
|
||||
|
||||
updateable_value_base&
|
||||
updateable_value_base::operator=(updateable_value_base&& v) noexcept {
|
||||
if (this != &v) {
|
||||
if (_source) {
|
||||
_source->del_ref(this);
|
||||
}
|
||||
_source = std::exchange(v._source, nullptr);
|
||||
if (_source) {
|
||||
_source->update_ref(&v, this);
|
||||
}
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
||||
void
|
||||
updateable_value_source_base::for_each_ref(std::function<void (updateable_value_base* ref)> func) {
|
||||
for (auto ref : _refs) {
|
||||
func(ref);
|
||||
}
|
||||
}
|
||||
|
||||
updateable_value_source_base::~updateable_value_source_base() {
|
||||
for (auto ref : _refs) {
|
||||
ref->_source = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
updateable_value_source_base::add_ref(updateable_value_base* ref) const {
|
||||
_refs.push_back(ref);
|
||||
}
|
||||
|
||||
void
|
||||
updateable_value_source_base::del_ref(updateable_value_base* ref) const {
|
||||
_refs.erase(std::remove(_refs.begin(), _refs.end(), ref), _refs.end());
|
||||
}
|
||||
|
||||
void
|
||||
updateable_value_source_base::update_ref(updateable_value_base* old_ref, updateable_value_base* new_ref) const {
|
||||
std::replace(_refs.begin(), _refs.end(), old_ref, new_ref);
|
||||
}
|
||||
|
||||
}
|
||||
212
utils/updateable_value.hh
Normal file
212
utils/updateable_value.hh
Normal file
@@ -0,0 +1,212 @@
|
||||
/*
|
||||
* Copyright (C) 2019 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <seastar/core/future.hh>
|
||||
#include <vector>
|
||||
#include <functional>
|
||||
#include "observable.hh"
|
||||
#include "seastarx.hh"
|
||||
|
||||
namespace utils {
|
||||
|
||||
// This file contains two templates, updateable_value_source<T> and updateable_value<T>.
|
||||
//
|
||||
// The two are analogous to T and const T& respectively, with the following additional
|
||||
// functionality:
|
||||
//
|
||||
// - updateable_value contains a copy of T, so it can be accessed without indirection
|
||||
// - updateable_value and updateable_value_source track each other, so if they move,
|
||||
// the references are updated
|
||||
// - an observe() function is provided (to both) that can be used to attach a callback
|
||||
// that is called whenever the value changes
|
||||
|
||||
template <typename T>
|
||||
class updateable_value_source;
|
||||
|
||||
class updateable_value_source_base;
|
||||
|
||||
// Base class for updateable_value<T>, containing functionality for tracking
|
||||
// the update source. Used to reduce template bloat and not meant to be used
|
||||
// directly.
|
||||
class updateable_value_base {
|
||||
protected:
|
||||
const updateable_value_source_base* _source = nullptr;
|
||||
public:
|
||||
updateable_value_base() = default;
|
||||
explicit updateable_value_base(const updateable_value_source_base& source);
|
||||
~updateable_value_base();
|
||||
updateable_value_base(const updateable_value_base&);
|
||||
updateable_value_base& operator=(const updateable_value_base&);
|
||||
updateable_value_base(updateable_value_base&&) noexcept;
|
||||
updateable_value_base& operator=(updateable_value_base&&) noexcept;
|
||||
|
||||
friend class updateable_value_source_base;
|
||||
};
|
||||
|
||||
|
||||
// A T that can be updated at runtime; uses updateable_value_base to track
|
||||
// the source as the object is moved or copied. Copying across shards is supported.
|
||||
template <typename T>
|
||||
class updateable_value : public updateable_value_base {
|
||||
T _value = {};
|
||||
private:
|
||||
const updateable_value_source<T>* source() const;
|
||||
public:
|
||||
updateable_value() = default;
|
||||
explicit updateable_value(T value) : _value(std::move(value)) {}
|
||||
explicit updateable_value(const updateable_value_source<T>& source);
|
||||
updateable_value(const updateable_value& v);
|
||||
updateable_value& operator=(const updateable_value&);
|
||||
updateable_value(updateable_value&&) noexcept;
|
||||
updateable_value& operator=(updateable_value&&) noexcept;
|
||||
const T& operator()() const { return _value; }
|
||||
operator const T& () const { return _value; }
|
||||
const T& get() const { return _value; }
|
||||
observer<T> observe(std::function<void (const T&)> callback) const;
|
||||
|
||||
friend class updateable_value_source_base;
|
||||
template <typename U>
|
||||
friend class updateable_value_source;
|
||||
};
|
||||
|
||||
// Contains the mechanisms to track updateable_value_base. Used to reduce template
|
||||
// bloat and not meant to be used directly.
|
||||
class updateable_value_source_base {
|
||||
protected:
|
||||
// This class contains two different types of state: values and
|
||||
// references to updateable_value_base. We consider adding and removing
|
||||
// such references const operations since they don't change the logical
|
||||
// state of the object (they don't allow changing the carried value).
|
||||
mutable std::vector<updateable_value_base*> _refs; // all connected updateable_values on this shard
|
||||
void for_each_ref(std::function<void (updateable_value_base* ref)> func);
|
||||
protected:
|
||||
~updateable_value_source_base();
|
||||
void add_ref(updateable_value_base* ref) const;
|
||||
void del_ref(updateable_value_base* ref) const;
|
||||
void update_ref(updateable_value_base* old_ref, updateable_value_base* new_ref) const;
|
||||
|
||||
friend class updateable_value_base;
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
class updateable_value_source : public updateable_value_source_base {
|
||||
T _value;
|
||||
mutable observable<T> _updater;
|
||||
void for_each_ref(std::function<void (updateable_value<T>*)> func) {
|
||||
updateable_value_source_base::for_each_ref([func = std::move(func)] (updateable_value_base* ref) {
|
||||
func(static_cast<updateable_value<T>*>(ref));
|
||||
});
|
||||
};
|
||||
private:
|
||||
void add_ref(updateable_value<T>* ref) const {
|
||||
updateable_value_source_base::add_ref(ref);
|
||||
}
|
||||
void del_ref(updateable_value<T>* ref) const {
|
||||
updateable_value_source_base::del_ref(ref);
|
||||
}
|
||||
void update_ref(updateable_value<T>* old_ref, updateable_value<T>* new_ref) const {
|
||||
updateable_value_source_base::update_ref(old_ref, new_ref);
|
||||
}
|
||||
public:
|
||||
explicit updateable_value_source(T value = T{})
|
||||
: _value(std::move(value)) {}
|
||||
updateable_value_source(const updateable_value_source& x) : updateable_value_source(x.get()) {
|
||||
// We can't copy x's _refs and therefore also _updater. So this is an imperfect copy.
|
||||
// This copy constructor therefore breaks updates made to the original copy; it only
|
||||
// exists because unit tests copy configs like mad.
|
||||
}
|
||||
void set(T value) {
|
||||
if (value == _value) {
|
||||
return;
|
||||
}
|
||||
_value = std::move(value);
|
||||
for_each_ref([&] (updateable_value<T>* ref) {
|
||||
ref->_value = _value;
|
||||
});
|
||||
_updater(_value);
|
||||
}
|
||||
const T& get() const {
|
||||
return _value;
|
||||
}
|
||||
const T& operator()() const {
|
||||
return _value;
|
||||
}
|
||||
observer<T> observe(std::function<void (const T&)> callback) const {
|
||||
return _updater.observe(std::move(callback));
|
||||
}
|
||||
|
||||
friend class updateable_value_base;
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
updateable_value<T>::updateable_value(const updateable_value_source<T>& source)
|
||||
: updateable_value_base(source)
|
||||
, _value(source.get()) {
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
updateable_value<T>::updateable_value(const updateable_value& v) : updateable_value_base(v), _value(v._value) {
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
updateable_value<T>& updateable_value<T>::operator=(const updateable_value& v) {
|
||||
if (this != &v) {
|
||||
// Copy early to trigger exceptions, later move
|
||||
auto new_val = v._value;
|
||||
updateable_value_base::operator=(v);
|
||||
_value = std::move(new_val);
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
updateable_value<T>::updateable_value(updateable_value&& v) noexcept
|
||||
: updateable_value_base(v)
|
||||
, _value(std::move(v._value)) {
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
updateable_value<T>& updateable_value<T>::operator=(updateable_value&& v) noexcept {
|
||||
if (this != &v) {
|
||||
updateable_value_base::operator=(std::move(v));
|
||||
_value = std::move(v._value);
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
inline
|
||||
const updateable_value_source<T>*
|
||||
updateable_value<T>::source() const {
|
||||
return static_cast<const updateable_value_source<T>*>(_source);
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
observer<T>
|
||||
updateable_value<T>::observe(std::function<void (const T&)> callback) const {
|
||||
return source()->observe(std::move(callback));
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user