diff --git a/db/view/node_view_update_backlog.hh b/db/view/node_view_update_backlog.hh new file mode 100644 index 0000000000..a02f75b7eb --- /dev/null +++ b/db/view/node_view_update_backlog.hh @@ -0,0 +1,70 @@ +/* + * Copyright (C) 2018 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include "db/view/view_update_backlog.hh" + +#include +#include + +#include +#include +#include + +namespace db::view { + +/** + * An atomic view update backlog representation, safe to update from multiple shards. + * It is legal for a stale current max value to be returned. + */ +class node_update_backlog { + using clock = seastar::lowres_clock; + struct per_shard_backlog { + // Multiply by 2 to defeat the prefetcher + alignas(seastar::cache_line_size * 2) std::atomic backlog = update_backlog::no_backlog(); + + update_backlog load() const { + return backlog.load(std::memory_order_relaxed); + } + }; + std::vector _backlogs; + std::chrono::milliseconds _interval; + std::atomic _last_update; + std::atomic _max; + +public: + explicit node_update_backlog(size_t shards, std::chrono::milliseconds interval) + : _backlogs(shards) + , _interval(interval) + , _last_update(clock::now() - _interval) + , _max(update_backlog::no_backlog()) { + } + + update_backlog add_fetch(unsigned shard, update_backlog backlog); + + // Exposed for testing only. + update_backlog load() const { + return _max.load(std::memory_order_relaxed); + } +}; + +} diff --git a/db/view/view.cc b/db/view/view.cc index 1fe1b66679..ddf296ba8b 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1643,5 +1643,22 @@ future<> view_builder::wait_until_built(const sstring& ks_name, const sstring& v }); } +update_backlog node_update_backlog::add_fetch(unsigned shard, update_backlog backlog) { + _backlogs[shard].backlog.store(backlog, std::memory_order_relaxed); + auto now = clock::now(); + if (now >= _last_update.load(std::memory_order_relaxed) + _interval) { + _last_update.store(now, std::memory_order_relaxed); + auto new_max = boost::accumulate( + _backlogs, + update_backlog::no_backlog(), + [] (const update_backlog& lhs, const per_shard_backlog& rhs) { + return std::max(lhs, rhs.load()); + }); + _max.store(new_max, std::memory_order_relaxed); + return new_max; + } + return std::max(backlog, _max.load(std::memory_order_relaxed)); +} + } // namespace view } // namespace db