db/view: Introduce view_builder
This patch introduces the view_builder class, a sharded service
responsible for building all defined materialized views. This process
entails walking over the existing data in a given base table, and using
it to calculate and insert the respective entries for one or more views.
This patch introduces only the bootstrap functionality, which is
responsible for loading the data stored in the system tables and
filling the in-memory data structures with the relevant information,
to be used in subsequent patches for the actual view building. The
interaction with the system tables is as follows.
Interaction with the tables in system_keyspace:
- When we start building a view, we add an entry to the
scylla_views_builds_in_progress system table. If the node restarts
at this point, we'll consider these newly inserted views as having
made no progress, and we'll treat them as new views;
- When we finish a build step, we update the progress of the views
that we built during this step by writing the next token to the
scylla_views_builds_in_progress table. If the node restarts here,
we'll start building the views at the token in the next_token
column.
- When we finish building a view, we mark it as completed in the
built views system table, and remove it from the in-progress system
table. Under failure, the following can happen:
* When we fail to mark the view as built, we'll redo the last
step upon node reboot;
* When we fail to delete the in-progress record, upon reboot
we'll remove this record.
A view is marked as completed only when all shards have finished
their share of the work, that is, if a view is not built, then all
shards will still have an entry in the in-progress system table;
- A view that a shard finished building, but not all other shards,
remains in the in-progress system table, with first_token ==
next_token.
Interaction with the distributed system table (view_build_status):
- When we start building a view, we mark the view build as being
in-progress;
- When we finish building a view, we mark the view as being built.
Upon failure, we ensure that if the view is in the in-progress
system table, then it may not have been written to this table. We
don't load the built views from this table when starting. When
starting, the following happens:
* If the view is in the system.built_views table and not the
in-progress system table, then it will be in view_build_status;
* If the view is in the system.built_views table and not in
this one, it will still be in the in-progress system table -
we detect this and mark it as built in this table too,
keeping the invariant;
* If the view is in this table but not in system.built_views,
then it will also be in the in-progress system table - we
don't detect this and will redo the missing step, for
simplicity.
View building is necessarily a sharded process. That means that on
restart, if the number of shards has changed, we need to calculate
the most conservative token range that has been built, and build
the remainder.
Signed-off-by: Duarte Nunes <duarte@scylladb.com>
This commit is contained in:
268
db/view/view.cc
268
db/view/view.cc
@@ -39,22 +39,33 @@
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <vector>
|
||||
#include <functional>
|
||||
#include <optional>
|
||||
#include <unordered_set>
|
||||
#include <vector>
|
||||
|
||||
#include <boost/range/algorithm/find_if.hpp>
|
||||
#include <boost/range/algorithm/remove_if.hpp>
|
||||
#include <boost/range/algorithm/transform.hpp>
|
||||
#include <boost/range/adaptors.hpp>
|
||||
|
||||
#include <seastar/core/future-util.hh>
|
||||
|
||||
#include "database.hh"
|
||||
#include "clustering_bounds_comparator.hh"
|
||||
#include "cql3/statements/select_statement.hh"
|
||||
#include "cql3/util.hh"
|
||||
#include "db/view/view.hh"
|
||||
#include "db/view/view_builder.hh"
|
||||
#include "gms/inet_address.hh"
|
||||
#include "keys.hh"
|
||||
#include "locator/network_topology_strategy.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "view_info.hh"
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
static logging::logger vlogger("view");
|
||||
|
||||
view_info::view_info(const schema& schema, const raw_view_info& raw_view_info)
|
||||
@@ -904,6 +915,261 @@ future<> mutate_MV(const dht::token& base_token,
|
||||
return f.finally([fs = std::move(fs)] { });
|
||||
}
|
||||
|
||||
view_builder::view_builder(database& db, db::system_distributed_keyspace& sys_dist_ks)
|
||||
: _db(db)
|
||||
, _sys_dist_ks(sys_dist_ks) {
|
||||
}
|
||||
|
||||
future<> view_builder::start() {
|
||||
return seastar::async([this] {
|
||||
auto built = system_keyspace::load_built_views().get0();
|
||||
auto in_progress = system_keyspace::load_view_build_progress().get0();
|
||||
calculate_shard_build_step(std::move(built), std::move(in_progress)).get();
|
||||
_current_step = _base_to_build_step.begin();
|
||||
_build_step.trigger();
|
||||
});
|
||||
}
|
||||
|
||||
future<> view_builder::stop() {
|
||||
vlogger.info("Stopping view builder");
|
||||
return _build_step.join();
|
||||
}
|
||||
|
||||
static query::partition_slice make_partition_slice(const schema& s) {
|
||||
query::partition_slice::option_set opts;
|
||||
opts.set(query::partition_slice::option::send_partition_key);
|
||||
opts.set(query::partition_slice::option::send_clustering_key);
|
||||
opts.set(query::partition_slice::option::send_timestamp);
|
||||
opts.set(query::partition_slice::option::send_ttl);
|
||||
return query::partition_slice(
|
||||
{query::full_clustering_range},
|
||||
{ },
|
||||
boost::copy_range<std::vector<column_id>>(s.regular_columns()
|
||||
| boost::adaptors::transformed(std::mem_fn(&column_definition::id))),
|
||||
std::move(opts));
|
||||
}
|
||||
|
||||
view_builder::build_step& view_builder::get_or_create_build_step(utils::UUID base_id) {
|
||||
auto it = _base_to_build_step.find(base_id);
|
||||
if (it == _base_to_build_step.end()) {
|
||||
auto base = _db.find_column_family(base_id).shared_from_this();
|
||||
auto p = _base_to_build_step.emplace(base_id, build_step{base, make_partition_slice(*base->schema())});
|
||||
// Iterators could have been invalidated if there was rehashing, so just reset the cursor.
|
||||
_current_step = p.first;
|
||||
it = p.first;
|
||||
}
|
||||
return it->second;
|
||||
}
|
||||
|
||||
void view_builder::initialize_reader_at_current_token(build_step& step) {
|
||||
step.pslice = make_partition_slice(*step.base->schema());
|
||||
step.prange = dht::partition_range(dht::ring_position::starting_at(step.current_token()), dht::ring_position::max());
|
||||
step.reader = make_local_shard_sstable_reader(
|
||||
step.base->schema(),
|
||||
make_lw_shared(sstables::sstable_set(step.base->get_sstable_set())),
|
||||
step.prange,
|
||||
step.pslice,
|
||||
default_priority_class(),
|
||||
no_resource_tracking(),
|
||||
nullptr,
|
||||
streamed_mutation::forwarding::no,
|
||||
mutation_reader::forwarding::no);
|
||||
}
|
||||
|
||||
void view_builder::load_view_status(view_builder::view_build_status status, std::unordered_set<utils::UUID>& loaded_views) {
|
||||
if (!status.next_token) {
|
||||
// No progress was made on this view, so we'll treat it as new.
|
||||
return;
|
||||
}
|
||||
vlogger.info0("Resuming to build view {}.{} at {}", status.view->ks_name(), status.view->cf_name(), *status.next_token);
|
||||
loaded_views.insert(status.view->id());
|
||||
if (status.first_token == *status.next_token) {
|
||||
// Completed, so nothing to do for this shard. Consider the view
|
||||
// as loaded and not as a new view.
|
||||
_built_views.emplace(status.view->id());
|
||||
return;
|
||||
}
|
||||
get_or_create_build_step(status.view->view_info()->base_id()).build_status.emplace_back(std::move(status));
|
||||
}
|
||||
|
||||
void view_builder::reshard(
|
||||
std::vector<std::vector<view_builder::view_build_status>> view_build_status_per_shard,
|
||||
std::unordered_set<utils::UUID>& loaded_views) {
|
||||
// We must reshard. We aim for a simple algorithm, a step above not starting from scratch.
|
||||
// Shards build entries at different paces, so both first and last tokens will differ. We
|
||||
// want to be conservative when selecting the range that has been built. To do that, we
|
||||
// select the intersection of all the previous shard's ranges for each view.
|
||||
struct view_ptr_hash {
|
||||
std::size_t operator()(const view_ptr& v) const noexcept {
|
||||
return std::hash<utils::UUID>()(v->id());
|
||||
}
|
||||
};
|
||||
struct view_ptr_equals {
|
||||
bool operator()(const view_ptr& v1, const view_ptr& v2) const noexcept {
|
||||
return v1->id() == v2->id();
|
||||
}
|
||||
};
|
||||
std::unordered_map<view_ptr, stdx::optional<nonwrapping_range<dht::token>>, view_ptr_hash, view_ptr_equals> my_status;
|
||||
for (auto& shard_status : view_build_status_per_shard) {
|
||||
for (auto& [view, first_token, next_token] : shard_status ) {
|
||||
// We start from an open-ended range, which we'll try to restrict.
|
||||
auto& my_range = my_status.emplace(
|
||||
std::move(view),
|
||||
nonwrapping_range<dht::token>::make_open_ended_both_sides()).first->second;
|
||||
if (!next_token || !my_range) {
|
||||
// A previous shard made no progress, so for this view we'll start over.
|
||||
my_range = stdx::nullopt;
|
||||
continue;
|
||||
}
|
||||
if (first_token == *next_token) {
|
||||
// Completed, so don't consider this shard's progress. We know that if the view
|
||||
// is marked as in-progress, then at least one shard will have a non-full range.
|
||||
continue;
|
||||
}
|
||||
wrapping_range<dht::token> other_range(first_token, *next_token);
|
||||
if (other_range.is_wrap_around(dht::token_comparator())) {
|
||||
// The intersection of a wrapping range with a non-wrapping range may yield more
|
||||
// multiple non-contiguous ranges. To avoid the complexity of dealing with more
|
||||
// than one range, we'll just take one of the intersections.
|
||||
auto [bottom_range, top_range] = other_range.unwrap();
|
||||
if (auto bottom_int = my_range->intersection(nonwrapping_range(std::move(bottom_range)), dht::token_comparator())) {
|
||||
my_range = std::move(bottom_int);
|
||||
} else {
|
||||
my_range = my_range->intersection(nonwrapping_range(std::move(top_range)), dht::token_comparator());
|
||||
}
|
||||
} else {
|
||||
my_range = my_range->intersection(nonwrapping_range(std::move(other_range)), dht::token_comparator());
|
||||
}
|
||||
}
|
||||
}
|
||||
view_builder::base_to_build_step_type build_step;
|
||||
for (auto& [view, opt_range] : my_status) {
|
||||
if (!opt_range) {
|
||||
continue; // Treat it as a new table.
|
||||
}
|
||||
auto start_bound = opt_range->start() ? std::move(opt_range->start()->value()) : dht::minimum_token();
|
||||
auto end_bound = opt_range->end() ? std::move(opt_range->end()->value()) : dht::minimum_token();
|
||||
auto s = view_build_status{std::move(view), std::move(start_bound), std::move(end_bound)};
|
||||
load_view_status(std::move(s), loaded_views);
|
||||
}
|
||||
}
|
||||
|
||||
future<> view_builder::calculate_shard_build_step(
|
||||
std::vector<system_keyspace::view_name> built,
|
||||
std::vector<system_keyspace::view_build_progress> in_progress) {
|
||||
// Shard 0 makes cleanup changes to the system tables, but none that could conflict
|
||||
// with the other shards; everyone is thus able to proceed independently.
|
||||
auto bookkeeping_ops = std::make_unique<std::vector<future<>>>();
|
||||
auto base_table_exists = [&, this] (const view_ptr& view) {
|
||||
// This is a safety check in case this node missed a create MV statement
|
||||
// but got a drop table for the base, and another node didn't get the
|
||||
// drop notification and sent us the view schema.
|
||||
try {
|
||||
_db.find_schema(view->view_info()->base_id());
|
||||
return true;
|
||||
} catch (const no_such_column_family&) {
|
||||
return false;
|
||||
}
|
||||
};
|
||||
auto maybe_fetch_view = [&, this] (system_keyspace::view_name& name) {
|
||||
try {
|
||||
auto s = _db.find_schema(name.first, name.second);
|
||||
if (s->is_view()) {
|
||||
auto view = view_ptr(std::move(s));
|
||||
if (base_table_exists(view)) {
|
||||
return view;
|
||||
}
|
||||
}
|
||||
// The view was dropped and a table was re-created with the same name,
|
||||
// but the write to the view-related system tables didn't make it.
|
||||
} catch (const no_such_column_family&) {
|
||||
// Fall-through
|
||||
}
|
||||
if (engine().cpu_id() == 0) {
|
||||
bookkeeping_ops->push_back(_sys_dist_ks.remove_view(name.first, name.second));
|
||||
bookkeeping_ops->push_back(system_keyspace::remove_built_view(name.first, name.second));
|
||||
bookkeeping_ops->push_back(
|
||||
system_keyspace::remove_view_build_progress_across_all_shards(
|
||||
std::move(name.first),
|
||||
std::move(name.second)));
|
||||
}
|
||||
return view_ptr(nullptr);
|
||||
};
|
||||
|
||||
auto built_views = boost::copy_range<std::unordered_set<utils::UUID>>(built
|
||||
| boost::adaptors::transformed(maybe_fetch_view)
|
||||
| boost::adaptors::filtered([] (const view_ptr& v) { return bool(v); })
|
||||
| boost::adaptors::transformed([] (const view_ptr& v) { return v->id(); }));
|
||||
|
||||
std::vector<std::vector<view_build_status>> view_build_status_per_shard;
|
||||
for (auto& [view_name, first_token, next_token_opt, cpu_id] : in_progress) {
|
||||
if (auto view = maybe_fetch_view(view_name)) {
|
||||
if (built_views.find(view->id()) != built_views.end()) {
|
||||
if (engine().cpu_id() == 0) {
|
||||
auto f = _sys_dist_ks.finish_view_build(std::move(view_name.first), std::move(view_name.second)).then([view = std::move(view)] {
|
||||
system_keyspace::remove_view_build_progress_across_all_shards(view->cf_name(), view->ks_name());
|
||||
});
|
||||
bookkeeping_ops->push_back(std::move(f));
|
||||
}
|
||||
continue;
|
||||
}
|
||||
view_build_status_per_shard.resize(std::max(view_build_status_per_shard.size(), size_t(cpu_id + 1)));
|
||||
view_build_status_per_shard[cpu_id].emplace_back(view_build_status{
|
||||
std::move(view),
|
||||
std::move(first_token),
|
||||
std::move(next_token_opt)});
|
||||
}
|
||||
}
|
||||
|
||||
std::unordered_set<utils::UUID> loaded_views;
|
||||
if (view_build_status_per_shard.size() != smp::count) {
|
||||
reshard(std::move(view_build_status_per_shard), loaded_views);
|
||||
} else if (!view_build_status_per_shard.empty()) {
|
||||
for (auto& status : view_build_status_per_shard[engine().cpu_id()]) {
|
||||
load_view_status(std::move(status), loaded_views);
|
||||
}
|
||||
}
|
||||
|
||||
for (auto& [_, build_step] : _base_to_build_step) {
|
||||
boost::sort(build_step.build_status, [] (view_build_status s1, view_build_status s2) {
|
||||
return *s1.next_token < *s2.next_token;
|
||||
});
|
||||
if (!build_step.build_status.empty()) {
|
||||
build_step.current_key = dht::decorated_key{*build_step.build_status.front().next_token, partition_key::make_empty()};
|
||||
}
|
||||
}
|
||||
|
||||
auto all_views = _db.get_views();
|
||||
auto is_new = [&] (const view_ptr& v) {
|
||||
return base_table_exists(v) && loaded_views.find(v->id()) == loaded_views.end()
|
||||
&& built_views.find(v->id()) == built_views.end();
|
||||
};
|
||||
for (auto&& view : all_views | boost::adaptors::filtered(is_new)) {
|
||||
bookkeeping_ops->push_back(add_new_view(view, get_or_create_build_step(view->view_info()->base_id())));
|
||||
}
|
||||
|
||||
for (auto& [_, build_step] : _base_to_build_step) {
|
||||
initialize_reader_at_current_token(build_step);
|
||||
}
|
||||
|
||||
auto f = seastar::when_all_succeed(bookkeeping_ops->begin(), bookkeeping_ops->end());
|
||||
return f.handle_exception([bookkeeping_ops = std::move(bookkeeping_ops)] (std::exception_ptr ep) {
|
||||
vlogger.error("Failed to update materialized view bookkeeping ({}), continuing anyway.", ep);
|
||||
});
|
||||
}
|
||||
|
||||
future<> view_builder::add_new_view(view_ptr view, build_step& step) {
|
||||
vlogger.info0("Building view {}.{}, starting at token {}", view->ks_name(), view->cf_name(), step.current_token());
|
||||
step.build_status.emplace(step.build_status.begin(), view_build_status{view, step.current_token(), std::nullopt});
|
||||
return when_all_succeed(
|
||||
system_keyspace::register_view_for_building(view->ks_name(), view->cf_name(), step.current_token()),
|
||||
_sys_dist_ks.start_view_build(view->ks_name(), view->cf_name()));
|
||||
}
|
||||
|
||||
future<> view_builder::do_build_step() {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
} // namespace view
|
||||
} // namespace db
|
||||
|
||||
|
||||
162
db/view/view_builder.hh
Normal file
162
db/view/view_builder.hh
Normal file
@@ -0,0 +1,162 @@
|
||||
/*
|
||||
* Copyright (C) 2018 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "database_fwd.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "db/system_distributed_keyspace.hh"
|
||||
#include "dht/i_partitioner.hh"
|
||||
#include "keys.hh"
|
||||
#include "query-request.hh"
|
||||
#include "service/migration_listener.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
#include "sstables/sstable_set.hh"
|
||||
#include "utils/exponential_backoff_retry.hh"
|
||||
#include "utils/serialized_action.hh"
|
||||
#include "utils/UUID.hh"
|
||||
|
||||
#include <seastar/core/abort_source.hh>
|
||||
#include <seastar/core/future.hh>
|
||||
#include <seastar/core/semaphore.hh>
|
||||
#include <seastar/core/sharded.hh>
|
||||
#include <seastar/core/shared_ptr.hh>
|
||||
|
||||
#include <optional>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
||||
namespace db::view {
|
||||
|
||||
/**
|
||||
* The view_builder is a sharded service responsible for building all defined materialized views.
|
||||
* This process entails walking over the existing data in a given base table, and using it to
|
||||
* calculate and insert the respective entries for one or more views.
|
||||
*
|
||||
* We employ a flat_mutation_reader for each base table for which we're building views.
|
||||
*
|
||||
* View building is necessarily a sharded process. That means that on restart, if the number of shards
|
||||
* has changed, we need to calculate the most conservative token range that has been built, and build
|
||||
* the remainder.
|
||||
*
|
||||
* Interaction with the system tables:
|
||||
* - When we start building a view, we add an entry to the scylla_views_builds_in_progress
|
||||
* system table. If the node restarts at this point, we'll consider these newly inserted
|
||||
* views as having made no progress, and we'll treat them as new views;
|
||||
* - When we finish a build step, we update the progress of the views that we built during
|
||||
* this step by writing the next token to the scylla_views_builds_in_progress table. If
|
||||
* the node restarts here, we'll start building the views at the token in the next_token column.
|
||||
* - When we finish building a view, we mark it as completed in the built views system table, and
|
||||
* remove it from the in-progress system table. Under failure, the following can happen:
|
||||
* * When we fail to mark the view as built, we'll redo the last step upon node reboot;
|
||||
* * When we fail to delete the in-progress record, upon reboot we'll remove this record.
|
||||
* A view is marked as completed only when all shards have finished their share of the work, that is,
|
||||
* if a view is not built, then all shards will still have an entry in the in-progress system table,
|
||||
* - A view that a shard finished building, but not all other shards, remains in the in-progress system
|
||||
* table, with first_token == next_token.
|
||||
* Interaction with the distributed system table (view_build_status):
|
||||
* - When we start building a view, we mark the view build as being in-progress;
|
||||
* - When we finish building a view, we mark the view as being built. Upon failure,
|
||||
* we ensure that if the view is in the in-progress system table, then it may not
|
||||
* have been written to this table. We don't load the built views from this table
|
||||
* when starting. When starting, the following happens:
|
||||
* * If the view is in the system.built_views table and not the in-progress
|
||||
* system table, then it will be in view_build_status;
|
||||
* * If the view is in the system.built_views table and not in this one, it
|
||||
* will still be in the in-progress system table - we detect this and mark
|
||||
* it as built in this table too, keeping the invariant;
|
||||
* * If the view is in this table but not in system.built_views, then it will
|
||||
* also be in the in-progress system table - we don't detect this and will
|
||||
* redo the missing step, for simplicity.
|
||||
*/
|
||||
class view_builder final {
|
||||
/**
|
||||
* Keeps track of the build progress for a particular view.
|
||||
* When the view is built, next_token == first_token.
|
||||
*/
|
||||
struct view_build_status final {
|
||||
view_ptr view;
|
||||
dht::token first_token;
|
||||
std::optional<dht::token> next_token;
|
||||
};
|
||||
|
||||
/**
|
||||
* Keeps track of the build progress for all the views of a particular
|
||||
* base table. Each execution of the build step comprises a query of
|
||||
* the base table for the selected range.
|
||||
*
|
||||
* We pin the set of sstables that potentially contain data that should be added to a
|
||||
* view (they are pinned by the flat_mutation_reader). Adding a view v' overwrites the
|
||||
* set of pinned sstables, regardless of there being another view v'' being built. The
|
||||
* new set will potentially contain new data already in v'', written as part of the write
|
||||
* path. We assume this case is rare and optimize for fewer disk space in detriment of
|
||||
* network bandwidth.
|
||||
*/
|
||||
struct build_step final {
|
||||
// Ensure we pin the column_family. It may happen that all views are removed,
|
||||
// and that the base table is too before we can detect it.
|
||||
lw_shared_ptr<column_family> base;
|
||||
query::partition_slice pslice;
|
||||
dht::partition_range prange;
|
||||
flat_mutation_reader reader{nullptr};
|
||||
dht::decorated_key current_key{dht::minimum_token(), partition_key::make_empty()};
|
||||
std::vector<view_build_status> build_status;
|
||||
|
||||
const dht::token& current_token() const {
|
||||
return current_key.token();
|
||||
}
|
||||
};
|
||||
|
||||
using base_to_build_step_type = std::unordered_map<utils::UUID, build_step>;
|
||||
|
||||
database& _db;
|
||||
db::system_distributed_keyspace& _sys_dist_ks;
|
||||
base_to_build_step_type _base_to_build_step;
|
||||
base_to_build_step_type::iterator _current_step = _base_to_build_step.end();
|
||||
serialized_action _build_step{std::bind(&view_builder::do_build_step, this)};
|
||||
|
||||
public:
|
||||
view_builder(database&, db::system_distributed_keyspace&);
|
||||
view_builder(view_builder&&) = delete;
|
||||
|
||||
/**
|
||||
* Loads the state stored in the system tables to resume building the existing views.
|
||||
* Requires that all views have been loaded from the system tables and are accessible
|
||||
* through the database, and that the commitlog has been replayed.
|
||||
*/
|
||||
future<> start();
|
||||
|
||||
/**
|
||||
* Stops the view building process.
|
||||
*/
|
||||
future<> stop();
|
||||
|
||||
private:
|
||||
build_step& get_or_create_build_step(utils::UUID);
|
||||
void initialize_reader_at_current_token(build_step&);
|
||||
void load_view_status(view_build_status, std::unordered_set<utils::UUID>&);
|
||||
void reshard(std::vector<std::vector<view_build_status>>, std::unordered_set<utils::UUID>&);
|
||||
future<> calculate_shard_build_step(std::vector<system_keyspace::view_name>, std::vector<system_keyspace::view_build_progress>);
|
||||
future<> add_new_view(view_ptr, build_step&);
|
||||
future<> do_build_step();
|
||||
};
|
||||
|
||||
}
|
||||
10
main.cc
10
main.cc
@@ -40,6 +40,7 @@
|
||||
#include "db/commitlog/commitlog.hh"
|
||||
#include "db/hints/manager.hh"
|
||||
#include "db/commitlog/commitlog_replayer.hh"
|
||||
#include "db/view/view_builder.hh"
|
||||
#include "utils/runtime.hh"
|
||||
#include "utils/file_lock.hh"
|
||||
#include "log.hh"
|
||||
@@ -691,6 +692,11 @@ int main(int ac, char** av) {
|
||||
proxy.invoke_on_all([] (service::storage_proxy& local_proxy) { local_proxy.start_hints_manager(gms::get_local_gossiper().shared_from_this()); }).get();
|
||||
}
|
||||
|
||||
static sharded<db::view::view_builder> view_builder;
|
||||
supervisor::notify("starting the view builder");
|
||||
view_builder.start(std::ref(db), std::ref(sys_dist_ks)).get();
|
||||
view_builder.invoke_on_all(&db::view::view_builder::start).get();
|
||||
|
||||
supervisor::notify("starting native transport");
|
||||
service::get_local_storage_service().start_native_transport().get();
|
||||
if (start_thrift) {
|
||||
@@ -721,6 +727,10 @@ int main(int ac, char** av) {
|
||||
return service::get_local_storage_service().drain_on_shutdown();
|
||||
});
|
||||
|
||||
engine().at_exit([] {
|
||||
return view_builder.stop();
|
||||
});
|
||||
|
||||
engine().at_exit([&db] {
|
||||
return db.invoke_on_all([](auto& db) {
|
||||
return db.get_compaction_manager().stop();
|
||||
|
||||
Reference in New Issue
Block a user