From fb23cd1ff642ee3347c849a5e1fe477640ad56cc Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Sat, 6 Apr 2019 21:35:53 +0300 Subject: [PATCH] Introduce updatable_value The updateable_value and updateable_value_source classes allow broadcasting configuration changes across the application. The updateable_value_source class represents a value that can be updated, and updateable_value tracks its source and reflects changes. A typical use replaces "uint64_t config_item" with "updateable_value config_item", and from now on changes to the source will be reflected in config_item. For more complicated uses, which must run some callback when configuration changes, you can also call config_item.observe(callback) to be actively notified of changes. --- configure.py | 1 + tests/config_test.cc | 26 +++++ tests/observable_test.cc | 2 + utils/updateable_value.cc | 113 ++++++++++++++++++++ utils/updateable_value.hh | 212 ++++++++++++++++++++++++++++++++++++++ 5 files changed, 354 insertions(+) create mode 100644 utils/updateable_value.cc create mode 100644 utils/updateable_value.hh diff --git a/configure.py b/configure.py index d97339c22f..cd5997554c 100755 --- a/configure.py +++ b/configure.py @@ -484,6 +484,7 @@ scylla_core = (['database.cc', 'utils/large_bitset.cc', 'utils/buffer_input_stream.cc', 'utils/limiting_data_source.cc', + 'utils/updateable_value.cc', 'mutation_partition.cc', 'mutation_partition_view.cc', 'mutation_partition_serializer.cc', diff --git a/tests/config_test.cc b/tests/config_test.cc index b477788f37..52007a58f9 100644 --- a/tests/config_test.cc +++ b/tests/config_test.cc @@ -25,11 +25,37 @@ #include #include +#include #include #include "db/config.hh" +#include "utils/updateable_value.hh" +#include using namespace db; +SEASTAR_THREAD_TEST_CASE(test_updateable_value_basics) { + using namespace utils; + updateable_value_source source; + source.set(3); + updateable_value u1(source); + updateable_value u2(u1); + updateable_value u3(std::move(u2)); + u2 = u3; + BOOST_REQUIRE_EQUAL(u1.get(), 3); + BOOST_REQUIRE_EQUAL(u2.get(), 3); + BOOST_REQUIRE_EQUAL(u3.get(), 3); + unsigned called = 0; + auto u3observer = u3.observe([&] (int v) { + ++called; + BOOST_REQUIRE_EQUAL(v, 4); + }); + source.set(4); + BOOST_REQUIRE_EQUAL(u1.get(), 4); + BOOST_REQUIRE_EQUAL(u2.get(), 4); + BOOST_REQUIRE_EQUAL(u3.get(), 4); + BOOST_REQUIRE_EQUAL(called, 1); +} + // stock, default cassandra.yaml const char* cassandra_conf = R"apa( # Cassandra storage config YAML diff --git a/tests/observable_test.cc b/tests/observable_test.cc index 628cff9534..3acbcdc1a9 100644 --- a/tests/observable_test.cc +++ b/tests/observable_test.cc @@ -26,6 +26,7 @@ #include #include "../utils/observable.hh" +#include "../utils/updateable_value.hh" using namespace utils; @@ -84,3 +85,4 @@ BOOST_AUTO_TEST_CASE(test_disconnect_fully_disconnects) { // Would have accessed the overwritten observable before the bug fix. sub.disconnect(); } + diff --git a/utils/updateable_value.cc b/utils/updateable_value.cc new file mode 100644 index 0000000000..640d642dc0 --- /dev/null +++ b/utils/updateable_value.cc @@ -0,0 +1,113 @@ +/* + * Copyright (C) 2019 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + + + +#include "updateable_value.hh" +#include + +namespace utils { + +updateable_value_base::updateable_value_base(const updateable_value_source_base& source) { + source.add_ref(this); + _source = &source; +} + +updateable_value_base::~updateable_value_base() { + if (_source) { + _source->del_ref(this); + } +} + +updateable_value_base::updateable_value_base(const updateable_value_base& v) { + if (v._source) { + v._source->add_ref(this); + _source = v._source; + } +} + +updateable_value_base& +updateable_value_base::updateable_value_base::operator=(const updateable_value_base& v) { + if (this != &v) { + // If both sources are null, or non-null and equal, nothing needs to be done + if (_source != v._source) { + if (v._source) { + v._source->add_ref(this); + } + if (_source) { + _source->del_ref(this); + } + _source = v._source; + } + } + return *this; +} + +updateable_value_base::updateable_value_base(updateable_value_base&& v) noexcept + : _source(std::exchange(v._source, nullptr)) { + if (_source) { + _source->update_ref(&v, this); + } +} + +updateable_value_base& +updateable_value_base::operator=(updateable_value_base&& v) noexcept { + if (this != &v) { + if (_source) { + _source->del_ref(this); + } + _source = std::exchange(v._source, nullptr); + if (_source) { + _source->update_ref(&v, this); + } + } + return *this; +} + +void +updateable_value_source_base::for_each_ref(std::function func) { + for (auto ref : _refs) { + func(ref); + } +} + +updateable_value_source_base::~updateable_value_source_base() { + for (auto ref : _refs) { + ref->_source = nullptr; + } +} + +void +updateable_value_source_base::add_ref(updateable_value_base* ref) const { + _refs.push_back(ref); +} + +void +updateable_value_source_base::del_ref(updateable_value_base* ref) const { + _refs.erase(std::remove(_refs.begin(), _refs.end(), ref), _refs.end()); +} + +void +updateable_value_source_base::update_ref(updateable_value_base* old_ref, updateable_value_base* new_ref) const { + std::replace(_refs.begin(), _refs.end(), old_ref, new_ref); +} + +} diff --git a/utils/updateable_value.hh b/utils/updateable_value.hh new file mode 100644 index 0000000000..c3297fd790 --- /dev/null +++ b/utils/updateable_value.hh @@ -0,0 +1,212 @@ +/* + * Copyright (C) 2019 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + + + +#pragma once + +#include +#include +#include +#include "observable.hh" +#include "seastarx.hh" + +namespace utils { + +// This file contains two templates, updateable_value_source and updateable_value. +// +// The two are analogous to T and const T& respectively, with the following additional +// functionality: +// +// - updateable_value contains a copy of T, so it can be accessed without indirection +// - updateable_value and updateable_value_source track each other, so if they move, +// the references are updated +// - an observe() function is provided (to both) that can be used to attach a callback +// that is called whenever the value changes + +template +class updateable_value_source; + +class updateable_value_source_base; + +// Base class for updateable_value, containing functionality for tracking +// the update source. Used to reduce template bloat and not meant to be used +// directly. +class updateable_value_base { +protected: + const updateable_value_source_base* _source = nullptr; +public: + updateable_value_base() = default; + explicit updateable_value_base(const updateable_value_source_base& source); + ~updateable_value_base(); + updateable_value_base(const updateable_value_base&); + updateable_value_base& operator=(const updateable_value_base&); + updateable_value_base(updateable_value_base&&) noexcept; + updateable_value_base& operator=(updateable_value_base&&) noexcept; + + friend class updateable_value_source_base; +}; + + +// A T that can be updated at runtime; uses updateable_value_base to track +// the source as the object is moved or copied. Copying across shards is supported. +template +class updateable_value : public updateable_value_base { + T _value = {}; +private: + const updateable_value_source* source() const; +public: + updateable_value() = default; + explicit updateable_value(T value) : _value(std::move(value)) {} + explicit updateable_value(const updateable_value_source& source); + updateable_value(const updateable_value& v); + updateable_value& operator=(const updateable_value&); + updateable_value(updateable_value&&) noexcept; + updateable_value& operator=(updateable_value&&) noexcept; + const T& operator()() const { return _value; } + operator const T& () const { return _value; } + const T& get() const { return _value; } + observer observe(std::function callback) const; + + friend class updateable_value_source_base; + template + friend class updateable_value_source; +}; + +// Contains the mechanisms to track updateable_value_base. Used to reduce template +// bloat and not meant to be used directly. +class updateable_value_source_base { +protected: + // This class contains two different types of state: values and + // references to updateable_value_base. We consider adding and removing + // such references const operations since they don't change the logical + // state of the object (they don't allow changing the carried value). + mutable std::vector _refs; // all connected updateable_values on this shard + void for_each_ref(std::function func); +protected: + ~updateable_value_source_base(); + void add_ref(updateable_value_base* ref) const; + void del_ref(updateable_value_base* ref) const; + void update_ref(updateable_value_base* old_ref, updateable_value_base* new_ref) const; + + friend class updateable_value_base; +}; + +template +class updateable_value_source : public updateable_value_source_base { + T _value; + mutable observable _updater; + void for_each_ref(std::function*)> func) { + updateable_value_source_base::for_each_ref([func = std::move(func)] (updateable_value_base* ref) { + func(static_cast*>(ref)); + }); + }; +private: + void add_ref(updateable_value* ref) const { + updateable_value_source_base::add_ref(ref); + } + void del_ref(updateable_value* ref) const { + updateable_value_source_base::del_ref(ref); + } + void update_ref(updateable_value* old_ref, updateable_value* new_ref) const { + updateable_value_source_base::update_ref(old_ref, new_ref); + } +public: + explicit updateable_value_source(T value = T{}) + : _value(std::move(value)) {} + updateable_value_source(const updateable_value_source& x) : updateable_value_source(x.get()) { + // We can't copy x's _refs and therefore also _updater. So this is an imperfect copy. + // This copy constructor therefore breaks updates made to the original copy; it only + // exists because unit tests copy configs like mad. + } + void set(T value) { + if (value == _value) { + return; + } + _value = std::move(value); + for_each_ref([&] (updateable_value* ref) { + ref->_value = _value; + }); + _updater(_value); + } + const T& get() const { + return _value; + } + const T& operator()() const { + return _value; + } + observer observe(std::function callback) const { + return _updater.observe(std::move(callback)); + } + + friend class updateable_value_base; +}; + +template +updateable_value::updateable_value(const updateable_value_source& source) + : updateable_value_base(source) + , _value(source.get()) { +} + +template +updateable_value::updateable_value(const updateable_value& v) : updateable_value_base(v), _value(v._value) { +} + +template +updateable_value& updateable_value::operator=(const updateable_value& v) { + if (this != &v) { + // Copy early to trigger exceptions, later move + auto new_val = v._value; + updateable_value_base::operator=(v); + _value = std::move(new_val); + } + return *this; +} + +template +updateable_value::updateable_value(updateable_value&& v) noexcept + : updateable_value_base(v) + , _value(std::move(v._value)) { +} + +template +updateable_value& updateable_value::operator=(updateable_value&& v) noexcept { + if (this != &v) { + updateable_value_base::operator=(std::move(v)); + _value = std::move(v._value); + } + return *this; +} + +template +inline +const updateable_value_source* +updateable_value::source() const { + return static_cast*>(_source); +} + +template +observer +updateable_value::observe(std::function callback) const { + return source()->observe(std::move(callback)); +} + +}