Files
scylla/db/view/view_update_generator.hh
Asias He d3034e0fab view_update_generator: Increase the registration_queue_size
When repair writes a sstable to disk, we check if the sstable needs view
update processing. If yes, the sstable will be placed into the staging
dir for processing, with the _registration_sem semaphore to prevent too
many pending unprocessed sstables.

We have seen multiple cases in the field where view update processing is
inefficient and far too slow, which prevents the base table repair from
finishing on time.

This patch increases the registration_queue_size to a bigger number to
mitigate the problem that slow view update processing blocks repair.

It is better to have a consistent base table + inconsistent view table
than inconsistent base table + inconsistent view table.

Currently, sstables in the staging dir are not compacted, so we cannot
raise the _registration_sem limit too far: doing so would let too many
sstables accumulate.

The view_build_test.cc is updated to make the test pass.

Closes #14241
2023-07-12 15:51:35 +03:00

95 lines
2.6 KiB
C++

/*
* Copyright (C) 2018-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include "sstables/shared_sstable.hh"
#include "db/timeout_clock.hh"
#include "utils/chunked_vector.hh"
#include <seastar/core/sharded.hh>
#include <seastar/core/metrics_registration.hh>
#include <seastar/core/abort_source.hh>
#include <seastar/core/condition-variable.hh>
#include <seastar/core/semaphore.hh>
using namespace seastar;
// Forward declarations. This header only names the types below in member
// and function declarations, so it declares them here rather than including
// the headers that define them.

// Element type of the view-update batch passed to view_update_generator::mutate_MV().
struct frozen_mutation_and_schema;
namespace dht {
// Type of the base_token parameter of mutate_MV().
class token;
}
namespace tracing {
// Type of the tr_state parameter of mutate_MV().
class trace_state_ptr;
}
namespace replica {
// The generator holds a reference to the database and keys its staging
// sstable maps by table; cf_stats is a mutate_MV() parameter.
class database;
class table;
struct cf_stats;
}
namespace service {
// The generator holds a reference to sharded<storage_proxy>.
class storage_proxy;
// Tag type and strongly-typed boolean (bool_class) used as the allow_hints
// parameter of mutate_MV().
struct allow_hints_tag;
using allow_hints = bool_class<allow_hints_tag>;
}
namespace db::view {
class stats;
struct wait_for_all_updates_tag {};
using wait_for_all_updates = bool_class<wait_for_all_updates_tag>;
class view_update_generator : public async_sharded_service<view_update_generator> {
public:
static constexpr size_t registration_queue_size = 100;
private:
replica::database& _db;
sharded<service::storage_proxy>& _proxy;
seastar::abort_source _as;
future<> _started = make_ready_future<>();
seastar::condition_variable _pending_sstables;
named_semaphore _registration_sem{registration_queue_size, named_semaphore_exception_factory{"view update generator"}};
std::unordered_map<lw_shared_ptr<replica::table>, std::vector<sstables::shared_sstable>> _sstables_with_tables;
std::unordered_map<lw_shared_ptr<replica::table>, std::vector<sstables::shared_sstable>> _sstables_to_move;
metrics::metric_groups _metrics;
class progress_tracker;
std::unique_ptr<progress_tracker> _progress_tracker;
public:
view_update_generator(replica::database& db, sharded<service::storage_proxy>& proxy);
~view_update_generator();
future<> start();
future<> stop();
future<> register_staging_sstable(sstables::shared_sstable sst, lw_shared_ptr<replica::table> table);
replica::database& get_db() noexcept { return _db; }
future<> mutate_MV(
dht::token base_token,
utils::chunked_vector<frozen_mutation_and_schema> view_updates,
db::view::stats& stats,
replica::cf_stats& cf_stats,
tracing::trace_state_ptr tr_state,
db::timeout_semaphore_units pending_view_updates,
service::allow_hints allow_hints,
wait_for_all_updates wait_for_all);
ssize_t available_register_units() const { return _registration_sem.available_units(); }
private:
bool should_throttle() const;
void setup_metrics();
void discover_staging_sstables();
};
}