treewide: add const qualifiers throughout the code base
This commit is contained in:
@@ -55,12 +55,12 @@ class error_collector : public error_listener<RecognizerType, ExceptionBaseType>
|
||||
/**
|
||||
* The offset of the first token of the snippet.
|
||||
*/
|
||||
static const int32_t FIRST_TOKEN_OFFSET = 10;
|
||||
static constexpr int32_t FIRST_TOKEN_OFFSET = 10;
|
||||
|
||||
/**
|
||||
* The offset of the last token of the snippet.
|
||||
*/
|
||||
static const int32_t LAST_TOKEN_OFFSET = 2;
|
||||
static constexpr int32_t LAST_TOKEN_OFFSET = 2;
|
||||
|
||||
/**
|
||||
* The CQL query.
|
||||
|
||||
@@ -244,7 +244,7 @@ lists::marker::bind(const query_options& options) {
|
||||
}
|
||||
}
|
||||
|
||||
constexpr const db_clock::time_point lists::precision_time::REFERENCE_TIME;
|
||||
constexpr db_clock::time_point lists::precision_time::REFERENCE_TIME;
|
||||
thread_local lists::precision_time lists::precision_time::_last = {db_clock::time_point::max(), 0};
|
||||
|
||||
lists::precision_time
|
||||
|
||||
@@ -623,7 +623,7 @@ query_options query_processor::make_internal_options(
|
||||
const std::initializer_list<data_value>& values,
|
||||
db::consistency_level cl,
|
||||
const timeout_config& timeout_config,
|
||||
int32_t page_size) {
|
||||
int32_t page_size) const {
|
||||
if (p->bound_names.size() != values.size()) {
|
||||
throw std::invalid_argument(
|
||||
format("Invalid number of values. Expecting {:d} but got {:d}", p->bound_names.size(), values.size()));
|
||||
|
||||
@@ -325,7 +325,7 @@ private:
|
||||
const std::initializer_list<data_value>&,
|
||||
db::consistency_level,
|
||||
const timeout_config& timeout_config,
|
||||
int32_t page_size = -1);
|
||||
int32_t page_size = -1) const;
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
process_authorized_statement(const ::shared_ptr<cql_statement> statement, service::query_state& query_state, const query_options& options);
|
||||
|
||||
@@ -1222,14 +1222,14 @@ public:
|
||||
|
||||
private:
|
||||
::cf_stats _cf_stats;
|
||||
static const size_t max_count_concurrent_reads{100};
|
||||
static constexpr size_t max_count_concurrent_reads{100};
|
||||
size_t max_memory_concurrent_reads() { return _dbcfg.available_memory * 0.02; }
|
||||
// Assume a queued read takes up 10kB of memory, and allow 2% of memory to be filled up with such reads.
|
||||
size_t max_inactive_queue_length() { return _dbcfg.available_memory * 0.02 / 10000; }
|
||||
// They're rather heavyweight, so limit more
|
||||
static const size_t max_count_streaming_concurrent_reads{10};
|
||||
static constexpr size_t max_count_streaming_concurrent_reads{10};
|
||||
size_t max_memory_streaming_concurrent_reads() { return _dbcfg.available_memory * 0.02; }
|
||||
static const size_t max_count_system_concurrent_reads{10};
|
||||
static constexpr size_t max_count_system_concurrent_reads{10};
|
||||
size_t max_memory_system_concurrent_reads() { return _dbcfg.available_memory * 0.02; };
|
||||
static constexpr size_t max_concurrent_sstable_loads() { return 3; }
|
||||
size_t max_memory_pending_view_updates() const { return _dbcfg.available_memory * 0.1; }
|
||||
|
||||
@@ -52,10 +52,10 @@ using segment_id_type = uint64_t;
|
||||
using position_type = uint32_t;
|
||||
|
||||
struct replay_position {
|
||||
static const constexpr size_t max_cpu_bits = 10; // 1024 cpus. should be enough for anyone
|
||||
static const constexpr size_t max_ts_bits = 8 * sizeof(segment_id_type) - max_cpu_bits;
|
||||
static const constexpr segment_id_type ts_mask = (segment_id_type(1) << max_ts_bits) - 1;
|
||||
static const constexpr segment_id_type cpu_mask = ~ts_mask;
|
||||
static constexpr size_t max_cpu_bits = 10; // 1024 cpus. should be enough for anyone
|
||||
static constexpr size_t max_ts_bits = 8 * sizeof(segment_id_type) - max_cpu_bits;
|
||||
static constexpr segment_id_type ts_mask = (segment_id_type(1) << max_ts_bits) - 1;
|
||||
static constexpr segment_id_type cpu_mask = ~ts_mask;
|
||||
|
||||
segment_id_type id;
|
||||
position_type pos;
|
||||
|
||||
@@ -46,7 +46,7 @@ private:
|
||||
// We need a concurrency of:
|
||||
// C = (1GB/s / 1MB) * 4ms = 1k/s * 4ms = 4
|
||||
// 16 should be enough for everybody.
|
||||
static const size_t max_concurrency = 16;
|
||||
static constexpr size_t max_concurrency = 16;
|
||||
semaphore _sem{max_concurrency};
|
||||
|
||||
// A convenience function for using the above semaphore. Unlike the global with_semaphore, this will not wait on the
|
||||
|
||||
@@ -233,7 +233,7 @@ private:
|
||||
// ranges that can be repaired in parallel. Each element will be accessed
|
||||
// by one shared.
|
||||
std::vector<semaphore> _range_parallelism_semaphores;
|
||||
static const size_t _max_repair_memory_per_range = 32 * 1024 * 1024;
|
||||
static constexpr size_t _max_repair_memory_per_range = 32 * 1024 * 1024;
|
||||
void start(int id);
|
||||
void done(int id, bool succeeded);
|
||||
public:
|
||||
|
||||
@@ -37,32 +37,32 @@ class priority_manager {
|
||||
|
||||
public:
|
||||
const ::io_priority_class&
|
||||
commitlog_priority() {
|
||||
commitlog_priority() const {
|
||||
return _commitlog_priority;
|
||||
}
|
||||
|
||||
const ::io_priority_class&
|
||||
memtable_flush_priority() {
|
||||
memtable_flush_priority() const {
|
||||
return _mt_flush_priority;
|
||||
}
|
||||
|
||||
const ::io_priority_class&
|
||||
streaming_read_priority() {
|
||||
streaming_read_priority() const {
|
||||
return _stream_read_priority;
|
||||
}
|
||||
|
||||
const ::io_priority_class&
|
||||
streaming_write_priority() {
|
||||
streaming_write_priority() const {
|
||||
return _stream_write_priority;
|
||||
}
|
||||
|
||||
const ::io_priority_class&
|
||||
sstable_query_read_priority() {
|
||||
sstable_query_read_priority() const {
|
||||
return _sstable_query_read;
|
||||
}
|
||||
|
||||
const ::io_priority_class&
|
||||
compaction_priority() {
|
||||
compaction_priority() const {
|
||||
return _compaction_priority;
|
||||
}
|
||||
|
||||
|
||||
@@ -2143,7 +2143,7 @@ storage_proxy::mutate_atomically(std::vector<mutation> mutations, db::consistenc
|
||||
}
|
||||
|
||||
template<typename Range>
|
||||
bool storage_proxy::cannot_hint(const Range& targets, db::write_type type) {
|
||||
bool storage_proxy::cannot_hint(const Range& targets, db::write_type type) const {
|
||||
// if hints are disabled we "can always hint" since there's going to be no hint generated in this case
|
||||
return hints_enabled(type) && boost::algorithm::any_of(targets, std::bind(&db::hints::manager::too_many_in_flight_hints_for, &*_hints_manager, std::placeholders::_1));
|
||||
}
|
||||
@@ -4119,7 +4119,7 @@ future<bool> storage_proxy::cas(schema_ptr schema, shared_ptr<cas_request> reque
|
||||
});
|
||||
}
|
||||
|
||||
std::vector<gms::inet_address> storage_proxy::get_live_endpoints(keyspace& ks, const dht::token& token) {
|
||||
std::vector<gms::inet_address> storage_proxy::get_live_endpoints(keyspace& ks, const dht::token& token) const {
|
||||
auto& rs = ks.get_replication_strategy();
|
||||
std::vector<gms::inet_address> eps = rs.get_natural_endpoints(token);
|
||||
auto itend = boost::range::remove_if(eps, std::not1(std::bind1st(std::mem_fn(&gms::gossiper::is_alive), &gms::get_local_gossiper())));
|
||||
@@ -4136,7 +4136,7 @@ void storage_proxy::sort_endpoints_by_proximity(std::vector<gms::inet_address>&
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<gms::inet_address> storage_proxy::get_live_sorted_endpoints(keyspace& ks, const dht::token& token) {
|
||||
std::vector<gms::inet_address> storage_proxy::get_live_sorted_endpoints(keyspace& ks, const dht::token& token) const {
|
||||
auto eps = get_live_endpoints(ks, token);
|
||||
sort_endpoints_by_proximity(eps);
|
||||
return eps;
|
||||
@@ -4245,7 +4245,7 @@ void query_ranges_to_vnodes_generator::process_one_range(size_t n, dht::partitio
|
||||
}
|
||||
}
|
||||
|
||||
bool storage_proxy::hints_enabled(db::write_type type) noexcept {
|
||||
bool storage_proxy::hints_enabled(db::write_type type) const noexcept {
|
||||
return (bool(_hints_manager) && type != db::write_type::CAS) || type == db::write_type::VIEW;
|
||||
}
|
||||
|
||||
|
||||
@@ -297,12 +297,12 @@ private:
|
||||
size_t hint_to_dead_endpoints(std::unique_ptr<mutation_holder>& mh, const Range& targets, db::write_type type, tracing::trace_state_ptr tr_state) noexcept;
|
||||
void hint_to_dead_endpoints(response_id_type, db::consistency_level);
|
||||
template<typename Range>
|
||||
bool cannot_hint(const Range& targets, db::write_type type);
|
||||
bool hints_enabled(db::write_type type) noexcept;
|
||||
bool cannot_hint(const Range& targets, db::write_type type) const;
|
||||
bool hints_enabled(db::write_type type) const noexcept;
|
||||
db::hints::manager& hints_manager_for(db::write_type type);
|
||||
std::vector<gms::inet_address> get_live_endpoints(keyspace& ks, const dht::token& token);
|
||||
std::vector<gms::inet_address> get_live_endpoints(keyspace& ks, const dht::token& token) const;
|
||||
static void sort_endpoints_by_proximity(std::vector<gms::inet_address>& eps);
|
||||
std::vector<gms::inet_address> get_live_sorted_endpoints(keyspace& ks, const dht::token& token);
|
||||
std::vector<gms::inet_address> get_live_sorted_endpoints(keyspace& ks, const dht::token& token) const;
|
||||
db::read_repair_decision new_read_repair_decision(const schema& s);
|
||||
::shared_ptr<abstract_read_executor> get_read_executor(lw_shared_ptr<query::read_command> cmd,
|
||||
schema_ptr schema,
|
||||
@@ -327,7 +327,6 @@ private:
|
||||
dht::partition_range_vector partition_ranges,
|
||||
db::consistency_level cl,
|
||||
coordinator_query_options optional_params);
|
||||
float estimate_result_rows_per_range(lw_shared_ptr<query::read_command> cmd, keyspace& ks);
|
||||
static std::vector<gms::inet_address> intersection(const std::vector<gms::inet_address>& l1, const std::vector<gms::inet_address>& l2);
|
||||
future<query_partition_key_range_concurrent_result> query_partition_key_range_concurrent(clock_type::time_point timeout,
|
||||
std::vector<foreign_ptr<lw_shared_ptr<query::result>>>&& results,
|
||||
@@ -622,7 +621,7 @@ public:
|
||||
future<paxos::prepare_summary> prepare_ballot(utils::UUID ballot);
|
||||
future<bool> accept_proposal(const paxos::proposal& proposal, bool timeout_if_partially_accepted = true);
|
||||
future<> learn_decision(paxos::proposal decision, bool allow_hints = false);
|
||||
uint64_t id() {
|
||||
uint64_t id() const {
|
||||
return _id;
|
||||
}
|
||||
};
|
||||
|
||||
@@ -294,7 +294,7 @@ storage_service::isolate_on_commit_error() {
|
||||
do_isolate_on_error(disk_error::commit);
|
||||
}
|
||||
|
||||
bool storage_service::is_auto_bootstrap() {
|
||||
bool storage_service::is_auto_bootstrap() const {
|
||||
return _db.local().get_config().auto_bootstrap();
|
||||
}
|
||||
sstring storage_service::get_known_features() {
|
||||
@@ -417,7 +417,7 @@ bool get_property_load_ring_state() {
|
||||
return get_local_storage_service().db().local().get_config().load_ring_state();
|
||||
}
|
||||
|
||||
bool storage_service::should_bootstrap() {
|
||||
bool storage_service::should_bootstrap() const {
|
||||
return is_auto_bootstrap() && !db::system_keyspace::bootstrap_complete() && !_gossiper.get_seeds().count(get_broadcast_address());
|
||||
}
|
||||
|
||||
@@ -816,7 +816,7 @@ future<> storage_service::join_ring() {
|
||||
});
|
||||
}
|
||||
|
||||
bool storage_service::is_joined() {
|
||||
bool storage_service::is_joined() const {
|
||||
// Every time we set _joined, we do it on all shards, so we can read its
|
||||
// value locally.
|
||||
return _joined;
|
||||
@@ -2601,7 +2601,7 @@ future<> storage_service::drain() {
|
||||
});
|
||||
}
|
||||
|
||||
double storage_service::get_load() {
|
||||
double storage_service::get_load() const {
|
||||
double bytes = 0;
|
||||
#if 0
|
||||
for (String keyspaceName : Schema.instance.getKeyspaces())
|
||||
@@ -2616,7 +2616,7 @@ double storage_service::get_load() {
|
||||
return bytes;
|
||||
}
|
||||
|
||||
sstring storage_service::get_load_string() {
|
||||
sstring storage_service::get_load_string() const {
|
||||
return format("{:f}", get_load());
|
||||
}
|
||||
|
||||
|
||||
@@ -214,7 +214,7 @@ public:
|
||||
}
|
||||
|
||||
private:
|
||||
bool is_auto_bootstrap();
|
||||
bool is_auto_bootstrap() const;
|
||||
inet_address get_broadcast_address() const {
|
||||
return utils::fb_utilities::get_broadcast_address();
|
||||
}
|
||||
@@ -237,7 +237,7 @@ public:
|
||||
}
|
||||
#endif
|
||||
public:
|
||||
dht::token_range_vector get_local_ranges(const sstring& keyspace_name) {
|
||||
dht::token_range_vector get_local_ranges(const sstring& keyspace_name) const {
|
||||
return get_ranges_for_endpoint(keyspace_name, get_broadcast_address());
|
||||
}
|
||||
#if 0
|
||||
@@ -501,14 +501,14 @@ public:
|
||||
}
|
||||
#endif
|
||||
private:
|
||||
bool should_bootstrap();
|
||||
bool should_bootstrap() const;
|
||||
void prepare_to_join(std::vector<inet_address> loaded_endpoints, const std::unordered_map<gms::inet_address, sstring>& loaded_peer_features, bind_messaging_port do_bind = bind_messaging_port::yes);
|
||||
void join_token_ring(int delay);
|
||||
void wait_for_feature_listeners_to_finish();
|
||||
void maybe_start_sys_dist_ks();
|
||||
public:
|
||||
future<> join_ring();
|
||||
bool is_joined();
|
||||
bool is_joined() const;
|
||||
|
||||
future<> rebuild(sstring source_dc);
|
||||
|
||||
@@ -556,7 +556,7 @@ private:
|
||||
void bootstrap();
|
||||
|
||||
public:
|
||||
bool is_bootstrap_mode() {
|
||||
bool is_bootstrap_mode() const {
|
||||
return _is_bootstrap_mode;
|
||||
}
|
||||
|
||||
@@ -855,7 +855,7 @@ private:
|
||||
|
||||
void add_expire_time_if_found(inet_address endpoint, int64_t expire_time);
|
||||
|
||||
int64_t extract_expire_time(const std::vector<sstring>& pieces) {
|
||||
int64_t extract_expire_time(const std::vector<sstring>& pieces) const {
|
||||
return std::stoll(pieces[2]);
|
||||
}
|
||||
|
||||
@@ -895,9 +895,9 @@ private:
|
||||
std::unordered_multimap<dht::token_range, inet_address> get_changed_ranges_for_leaving(sstring keyspace_name, inet_address endpoint);
|
||||
public:
|
||||
/** raw load value */
|
||||
double get_load();
|
||||
double get_load() const;
|
||||
|
||||
sstring get_load_string();
|
||||
sstring get_load_string() const;
|
||||
|
||||
future<std::map<sstring, double>> get_load_map();
|
||||
|
||||
@@ -2275,14 +2275,14 @@ private:
|
||||
void do_isolate_on_error(disk_error type);
|
||||
utils::UUID _local_host_id;
|
||||
public:
|
||||
utils::UUID get_local_id() { return _local_host_id; }
|
||||
utils::UUID get_local_id() const { return _local_host_id; }
|
||||
|
||||
sstring get_config_supported_features();
|
||||
std::set<sstring> get_config_supported_features_set();
|
||||
sstring get_known_features();
|
||||
std::set<sstring> get_known_features_set();
|
||||
|
||||
bool cluster_supports_range_tombstones() {
|
||||
bool cluster_supports_range_tombstones() const {
|
||||
return bool(_range_tombstones_feature);
|
||||
}
|
||||
|
||||
|
||||
@@ -175,7 +175,7 @@ int compaction_manager::trim_to_compact(column_family* cf, sstables::compaction_
|
||||
return weight;
|
||||
}
|
||||
|
||||
bool compaction_manager::can_register_weight(column_family* cf, int weight) {
|
||||
bool compaction_manager::can_register_weight(column_family* cf, int weight) const {
|
||||
auto has_cf_ongoing_compaction = [&] () -> bool {
|
||||
return boost::range::count_if(_tasks, [&] (const lw_shared_ptr<task>& task) {
|
||||
return task->compacting_cf == cf && task->compaction_running;
|
||||
|
||||
@@ -104,7 +104,7 @@ private:
|
||||
future<> task_stop(lw_shared_ptr<task> task);
|
||||
|
||||
// Return true if weight is not registered.
|
||||
bool can_register_weight(column_family* cf, int weight);
|
||||
bool can_register_weight(column_family* cf, int weight) const;
|
||||
// Register weight for a column family. Do that only if can_register_weight()
|
||||
// returned true.
|
||||
void register_weight(int weight);
|
||||
|
||||
@@ -71,8 +71,8 @@
|
||||
|
||||
namespace hll {
|
||||
|
||||
static const double pow_2_32 = 4294967296.0; ///< 2^32
|
||||
static const double neg_pow_2_32 = -4294967296.0; ///< -(2^32)
|
||||
static constexpr double pow_2_32 = 4294967296.0; ///< 2^32
|
||||
static constexpr double neg_pow_2_32 = -4294967296.0; ///< -(2^32)
|
||||
|
||||
|
||||
static inline size_t size_unsigned_var_int(unsigned int value) {
|
||||
|
||||
@@ -202,15 +202,15 @@ private:
|
||||
ADD_BLOCK,
|
||||
} state = state::CLUSTERING_START;
|
||||
|
||||
bool is_block_empty() {
|
||||
bool is_block_empty() const {
|
||||
return (ck_blocks_header & (uint64_t(1) << (2 * ck_blocks_header_offset))) != 0;
|
||||
}
|
||||
|
||||
bool is_block_null() {
|
||||
bool is_block_null() const {
|
||||
return (ck_blocks_header & (uint64_t(1) << (2 * ck_blocks_header_offset + 1))) != 0;
|
||||
}
|
||||
|
||||
bool no_more_ck_blocks() { return ck_range.empty(); }
|
||||
bool no_more_ck_blocks() const { return ck_range.empty(); }
|
||||
|
||||
void move_to_next_ck_block() {
|
||||
ck_range.advance_begin(1);
|
||||
@@ -220,10 +220,10 @@ private:
|
||||
}
|
||||
}
|
||||
|
||||
bool should_read_block_header() {
|
||||
bool should_read_block_header() const {
|
||||
return ck_blocks_header_offset == 0u;
|
||||
}
|
||||
std::optional<uint32_t> get_ck_block_value_length() {
|
||||
std::optional<uint32_t> get_ck_block_value_length() const {
|
||||
return ck_range.front();
|
||||
}
|
||||
|
||||
@@ -428,7 +428,7 @@ private:
|
||||
}
|
||||
|
||||
public:
|
||||
void verify_end_state() {
|
||||
void verify_end_state() const {
|
||||
if (_num_blocks_left != 0) {
|
||||
throw std::runtime_error("promoted_index_blocks_reader - no more data but parsing is incomplete");
|
||||
}
|
||||
|
||||
@@ -98,7 +98,7 @@ private:
|
||||
inline bool is_mc_format() const { return static_cast<bool>(_ck_values_fixed_lengths); }
|
||||
|
||||
public:
|
||||
void verify_end_state() {
|
||||
void verify_end_state() const {
|
||||
if (this->_remain > 0) {
|
||||
throw std::runtime_error("index_consume_entry_context - no more data but parsing is incomplete");
|
||||
}
|
||||
|
||||
@@ -47,7 +47,7 @@ public:
|
||||
bool operator==(const key_view& k) const { return k._bytes == _bytes; }
|
||||
bool operator!=(const key_view& k) const { return !(k == *this); }
|
||||
|
||||
bool empty() { return _bytes.empty(); }
|
||||
bool empty() const { return _bytes.empty(); }
|
||||
|
||||
explicit operator bytes_view() const {
|
||||
return _bytes;
|
||||
|
||||
@@ -173,7 +173,7 @@ public:
|
||||
// column_name as seen by the row consumer. This means that if our
|
||||
// exploded clustering keys has more rows than expected, we are dealing
|
||||
// with a collection.
|
||||
bool is_collection(const schema& s) {
|
||||
bool is_collection(const schema& s) const {
|
||||
auto expected_normal = s.clustering_key_size() + 1;
|
||||
// Note that we can have less than the expected. That is the case for
|
||||
// incomplete prefixes, for instance.
|
||||
@@ -247,7 +247,7 @@ private:
|
||||
|
||||
collection_mutation() : _cdef(nullptr) {}
|
||||
|
||||
bool is_new_collection(const column_definition *c) {
|
||||
bool is_new_collection(const column_definition *c) const {
|
||||
if (!_cdef || ((_cdef->id != c->id) || (_cdef->kind != c->kind))) {
|
||||
return true;
|
||||
}
|
||||
@@ -872,7 +872,7 @@ class mp_row_consumer_m : public consumer_m {
|
||||
bound_kind kind;
|
||||
tombstone tomb;
|
||||
|
||||
position_in_partition_view position() {
|
||||
position_in_partition_view position() const {
|
||||
return position_in_partition_view(position_in_partition_view::range_tag_t{}, bound_view(ck, kind));
|
||||
}
|
||||
};
|
||||
@@ -919,7 +919,7 @@ class mp_row_consumer_m : public consumer_m {
|
||||
return maybe_push_range_tombstone(std::move(rt));
|
||||
}
|
||||
|
||||
const column_definition& get_column_definition(std::optional<column_id> column_id) {
|
||||
const column_definition& get_column_definition(std::optional<column_id> column_id) const {
|
||||
auto column_type = _inside_static_row ? column_kind::static_column : column_kind::regular_column;
|
||||
return _schema->column_at(column_type, *column_id);
|
||||
}
|
||||
@@ -956,7 +956,7 @@ class mp_row_consumer_m : public consumer_m {
|
||||
_opened_range_tombstone.reset();
|
||||
}
|
||||
|
||||
void check_schema_mismatch(const column_translation::column_info& column_info, const column_definition& column_def) {
|
||||
void check_schema_mismatch(const column_translation::column_info& column_info, const column_definition& column_def) const {
|
||||
if (column_info.schema_mismatch) {
|
||||
throw malformed_sstable_exception(
|
||||
format("{} definition in serialization header does not match schema. Expected {} but got {}",
|
||||
@@ -967,7 +967,7 @@ class mp_row_consumer_m : public consumer_m {
|
||||
}
|
||||
|
||||
void check_column_missing_in_current_schema(const column_translation::column_info& column_info,
|
||||
api::timestamp_type timestamp) {
|
||||
api::timestamp_type timestamp) const {
|
||||
if (!column_info.id) {
|
||||
sstring name = sstring(to_sstring_view(*column_info.name));
|
||||
auto it = _schema->dropped_columns().find(name);
|
||||
|
||||
@@ -56,7 +56,7 @@ public:
|
||||
_in = std::make_unique < input_stream < char >> (open_at(pos));
|
||||
}
|
||||
|
||||
bool eof() { return _in->eof(); }
|
||||
bool eof() const { return _in->eof(); }
|
||||
|
||||
virtual future<> close() {
|
||||
return _close_gate.close().then([this] {
|
||||
@@ -97,4 +97,4 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -700,7 +700,7 @@ private:
|
||||
}
|
||||
_row->_columns.advance_begin(pos);
|
||||
}
|
||||
bool no_more_columns() { return _row->_columns.empty(); }
|
||||
bool no_more_columns() const { return _row->_columns.empty(); }
|
||||
void move_to_next_column() {
|
||||
size_t current_pos = _row->_columns_selector.size() - _row->_columns.size();
|
||||
size_t next_pos = _row->_columns_selector.find_next(current_pos);
|
||||
@@ -708,12 +708,12 @@ private:
|
||||
: next_pos - current_pos;
|
||||
_row->_columns.advance_begin(jump_to_next);
|
||||
}
|
||||
bool is_column_simple() { return !_row->_columns.front().is_collection; }
|
||||
bool is_column_counter() { return _row->_columns.front().is_counter; }
|
||||
const column_translation::column_info& get_column_info() {
|
||||
bool is_column_simple() const { return !_row->_columns.front().is_collection; }
|
||||
bool is_column_counter() const { return _row->_columns.front().is_counter; }
|
||||
const column_translation::column_info& get_column_info() const {
|
||||
return _row->_columns.front();
|
||||
}
|
||||
std::optional<uint32_t> get_column_value_length() {
|
||||
std::optional<uint32_t> get_column_value_length() const {
|
||||
return _row->_columns.front().value_length;
|
||||
}
|
||||
void setup_ck(const std::vector<std::optional<uint32_t>>& column_value_fix_lengths) {
|
||||
@@ -727,7 +727,7 @@ private:
|
||||
}
|
||||
_ck_blocks_header_offset = 0u;
|
||||
}
|
||||
bool no_more_ck_blocks() { return _ck_column_value_fix_lengths.empty(); }
|
||||
bool no_more_ck_blocks() const { return _ck_column_value_fix_lengths.empty(); }
|
||||
void move_to_next_ck_block() {
|
||||
_ck_column_value_fix_lengths.advance_begin(1);
|
||||
++_ck_blocks_header_offset;
|
||||
@@ -735,16 +735,16 @@ private:
|
||||
_ck_blocks_header_offset = 0u;
|
||||
}
|
||||
}
|
||||
std::optional<uint32_t> get_ck_block_value_length() {
|
||||
std::optional<uint32_t> get_ck_block_value_length() const {
|
||||
return _ck_column_value_fix_lengths.front();
|
||||
}
|
||||
bool is_block_empty() {
|
||||
bool is_block_empty() const {
|
||||
return (_ck_blocks_header & (uint64_t(1) << (2 * _ck_blocks_header_offset))) != 0;
|
||||
}
|
||||
bool is_block_null() {
|
||||
bool is_block_null() const {
|
||||
return (_ck_blocks_header & (uint64_t(1) << (2 * _ck_blocks_header_offset + 1))) != 0;
|
||||
}
|
||||
bool should_read_block_header() {
|
||||
bool should_read_block_header() const {
|
||||
return _ck_blocks_header_offset == 0u;
|
||||
}
|
||||
public:
|
||||
|
||||
@@ -86,7 +86,7 @@ class size_tiered_backlog_tracker final : public compaction_backlog_tracker::imp
|
||||
|
||||
inflight_component partial_backlog(const compaction_backlog_tracker::ongoing_writes& ongoing_writes) const {
|
||||
inflight_component in;
|
||||
for (auto& swp : ongoing_writes) {
|
||||
for (auto const& swp : ongoing_writes) {
|
||||
auto written = swp.second->written();
|
||||
if (written > 0) {
|
||||
in.total_bytes += written;
|
||||
@@ -98,7 +98,7 @@ class size_tiered_backlog_tracker final : public compaction_backlog_tracker::imp
|
||||
|
||||
inflight_component compacted_backlog(const compaction_backlog_tracker::ongoing_compactions& ongoing_compactions) const {
|
||||
inflight_component in;
|
||||
for (auto& crp : ongoing_compactions) {
|
||||
for (auto const& crp : ongoing_compactions) {
|
||||
auto compacted = crp.second->compacted();
|
||||
in.total_bytes += compacted;
|
||||
in.contribution += compacted * log4((crp.first->data_size()));
|
||||
|
||||
@@ -129,11 +129,11 @@ class size_tiered_compaction_strategy : public compaction_strategy_impl {
|
||||
most_interesting_bucket(std::vector<std::vector<sstables::shared_sstable>> buckets, unsigned min_threshold, unsigned max_threshold);
|
||||
|
||||
// Return the average size of a given list of sstables.
|
||||
uint64_t avg_size(std::vector<sstables::shared_sstable>& sstables) {
|
||||
uint64_t avg_size(std::vector<sstables::shared_sstable> const& sstables) const {
|
||||
assert(sstables.size() > 0); // this should never fail
|
||||
uint64_t n = 0;
|
||||
|
||||
for (auto& sstable : sstables) {
|
||||
for (auto const& sstable : sstables) {
|
||||
// FIXME: Switch to sstable->bytes_on_disk() afterwards. That's what C* uses.
|
||||
n += sstable->data_size();
|
||||
}
|
||||
|
||||
@@ -2538,7 +2538,7 @@ uint64_t sstable::ondisk_data_size() const {
|
||||
return _data_file_size;
|
||||
}
|
||||
|
||||
uint64_t sstable::bytes_on_disk() {
|
||||
uint64_t sstable::bytes_on_disk() const {
|
||||
assert(_bytes_on_disk > 0);
|
||||
return _bytes_on_disk;
|
||||
}
|
||||
|
||||
@@ -340,7 +340,7 @@ public:
|
||||
}
|
||||
|
||||
// Returns the total bytes of all components.
|
||||
uint64_t bytes_on_disk();
|
||||
uint64_t bytes_on_disk() const;
|
||||
|
||||
const partition_key& get_first_partition_key() const;
|
||||
const partition_key& get_last_partition_key() const;
|
||||
@@ -712,7 +712,7 @@ public:
|
||||
return (_version == sstable_version_types::mc) || has_scylla_component();
|
||||
}
|
||||
|
||||
bool filter_has_key(const key& key) {
|
||||
bool filter_has_key(const key& key) const {
|
||||
return _components->filter->is_present(bytes_view(key));
|
||||
}
|
||||
|
||||
@@ -724,11 +724,11 @@ public:
|
||||
*/
|
||||
future<bool> has_partition_key(const utils::hashed_key& hk, const dht::decorated_key& dk);
|
||||
|
||||
bool filter_has_key(utils::hashed_key key) {
|
||||
bool filter_has_key(utils::hashed_key key) const {
|
||||
return _components->filter->is_present(key);
|
||||
}
|
||||
|
||||
bool filter_has_key(const schema& s, partition_key_view key) {
|
||||
bool filter_has_key(const schema& s, partition_key_view key) const {
|
||||
return filter_has_key(key::from_partition_key(s, key));
|
||||
}
|
||||
|
||||
@@ -736,10 +736,10 @@ public:
|
||||
|
||||
filter_tracker& get_filter_tracker() { return _filter_tracker; }
|
||||
|
||||
uint64_t filter_get_false_positive() {
|
||||
uint64_t filter_get_false_positive() const {
|
||||
return _filter_tracker.false_positive;
|
||||
}
|
||||
uint64_t filter_get_true_positive() {
|
||||
uint64_t filter_get_true_positive() const {
|
||||
return _filter_tracker.true_positive;
|
||||
}
|
||||
uint64_t filter_get_recent_false_positive() {
|
||||
|
||||
@@ -146,7 +146,7 @@ public:
|
||||
// The maximum amount of buckets we segregate data into when writing into sstables.
|
||||
// To prevent an explosion in the number of sstables we cap it.
|
||||
// Better co-locate some windows into the same sstables than OOM.
|
||||
static const uint64_t max_data_segregation_window_count = 100;
|
||||
static constexpr uint64_t max_data_segregation_window_count = 100;
|
||||
|
||||
public:
|
||||
time_window_compaction_strategy(const std::map<sstring, sstring>& options);
|
||||
|
||||
@@ -622,14 +622,14 @@ inline column_mask operator|(column_mask m1, column_mask m2) {
|
||||
}
|
||||
|
||||
class unfiltered_flags_m final {
|
||||
static const uint8_t END_OF_PARTITION = 0x01u;
|
||||
static const uint8_t IS_MARKER = 0x02u;
|
||||
static const uint8_t HAS_TIMESTAMP = 0x04u;
|
||||
static const uint8_t HAS_TTL = 0x08u;
|
||||
static const uint8_t HAS_DELETION = 0x10u;
|
||||
static const uint8_t HAS_ALL_COLUMNS = 0x20u;
|
||||
static const uint8_t HAS_COMPLEX_DELETION = 0x40u;
|
||||
static const uint8_t HAS_EXTENDED_FLAGS = 0x80u;
|
||||
static constexpr uint8_t END_OF_PARTITION = 0x01u;
|
||||
static constexpr uint8_t IS_MARKER = 0x02u;
|
||||
static constexpr uint8_t HAS_TIMESTAMP = 0x04u;
|
||||
static constexpr uint8_t HAS_TTL = 0x08u;
|
||||
static constexpr uint8_t HAS_DELETION = 0x10u;
|
||||
static constexpr uint8_t HAS_ALL_COLUMNS = 0x20u;
|
||||
static constexpr uint8_t HAS_COMPLEX_DELETION = 0x40u;
|
||||
static constexpr uint8_t HAS_EXTENDED_FLAGS = 0x80u;
|
||||
uint8_t _flags;
|
||||
bool check_flag(const uint8_t flag) const {
|
||||
return (_flags & flag) != 0u;
|
||||
|
||||
@@ -72,7 +72,7 @@ public:
|
||||
/**
|
||||
* @return true if file transfer is completed
|
||||
*/
|
||||
bool is_completed() {
|
||||
bool is_completed() const {
|
||||
return current_bytes >= total_bytes;
|
||||
}
|
||||
|
||||
|
||||
@@ -47,48 +47,48 @@ void session_info::update_progress(progress_info new_progress) {
|
||||
current_files[new_progress.file_name] = new_progress;
|
||||
}
|
||||
|
||||
std::vector<progress_info> session_info::get_receiving_files() {
|
||||
std::vector<progress_info> session_info::get_receiving_files() const {
|
||||
std::vector<progress_info> ret;
|
||||
for (auto& x : receiving_files) {
|
||||
for (auto const& x : receiving_files) {
|
||||
ret.push_back(x.second);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
std::vector<progress_info> session_info::get_sending_files() {
|
||||
std::vector<progress_info> session_info::get_sending_files() const {
|
||||
std::vector<progress_info> ret;
|
||||
for (auto& x : sending_files) {
|
||||
for (auto const& x : sending_files) {
|
||||
ret.push_back(x.second);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
long session_info::get_total_size_in_progress(std::vector<progress_info> files) {
|
||||
long session_info::get_total_size_in_progress(std::vector<progress_info> files) const {
|
||||
long total = 0;
|
||||
for (auto& file : files) {
|
||||
for (auto const& file : files) {
|
||||
total += file.current_bytes;
|
||||
}
|
||||
return total;
|
||||
}
|
||||
|
||||
long session_info::get_total_files(std::vector<stream_summary>& summaries) {
|
||||
long session_info::get_total_files(std::vector<stream_summary> const& summaries) const {
|
||||
long total = 0;
|
||||
for (auto& summary : summaries) {
|
||||
for (auto const& summary : summaries) {
|
||||
total += summary.files;
|
||||
}
|
||||
return total;
|
||||
}
|
||||
|
||||
long session_info::get_total_sizes(std::vector<stream_summary>& summaries) {
|
||||
long session_info::get_total_sizes(std::vector<stream_summary> const& summaries) const {
|
||||
long total = 0;
|
||||
for (auto& summary : summaries)
|
||||
for (auto const& summary : summaries)
|
||||
total += summary.total_size;
|
||||
return total;
|
||||
}
|
||||
|
||||
long session_info::get_total_files_completed(std::vector<progress_info> files) {
|
||||
long session_info::get_total_files_completed(std::vector<progress_info> files) const {
|
||||
long size = 0;
|
||||
for (auto& x : files) {
|
||||
for (auto const& x : files) {
|
||||
if (x.is_completed()) {
|
||||
size++;
|
||||
}
|
||||
|
||||
@@ -86,74 +86,74 @@ public:
|
||||
*/
|
||||
void update_progress(progress_info new_progress);
|
||||
|
||||
std::vector<progress_info> get_receiving_files();
|
||||
std::vector<progress_info> get_receiving_files() const;
|
||||
|
||||
std::vector<progress_info> get_sending_files();
|
||||
std::vector<progress_info> get_sending_files() const;
|
||||
|
||||
/**
|
||||
* @return total number of files already received.
|
||||
*/
|
||||
long get_total_files_received() {
|
||||
long get_total_files_received() const {
|
||||
return get_total_files_completed(get_receiving_files());
|
||||
}
|
||||
|
||||
/**
|
||||
* @return total number of files already sent.
|
||||
*/
|
||||
long get_total_files_sent() {
|
||||
long get_total_files_sent() const {
|
||||
return get_total_files_completed(get_sending_files());
|
||||
}
|
||||
|
||||
/**
|
||||
* @return total size(in bytes) already received.
|
||||
*/
|
||||
long get_total_size_received() {
|
||||
long get_total_size_received() const {
|
||||
return get_total_size_in_progress(get_receiving_files());
|
||||
}
|
||||
|
||||
/**
|
||||
* @return total size(in bytes) already sent.
|
||||
*/
|
||||
long get_total_size_sent() {
|
||||
long get_total_size_sent() const {
|
||||
return get_total_size_in_progress(get_sending_files());
|
||||
}
|
||||
|
||||
/**
|
||||
* @return total number of files to receive in the session
|
||||
*/
|
||||
long get_total_files_to_receive() {
|
||||
long get_total_files_to_receive() const {
|
||||
return get_total_files(receiving_summaries);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return total number of files to send in the session
|
||||
*/
|
||||
long get_total_files_to_send() {
|
||||
long get_total_files_to_send() const {
|
||||
return get_total_files(sending_summaries);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return total size(in bytes) to receive in the session
|
||||
*/
|
||||
long get_total_size_to_receive() {
|
||||
long get_total_size_to_receive() const {
|
||||
return get_total_sizes(receiving_summaries);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return total size(in bytes) to send in the session
|
||||
*/
|
||||
long get_total_size_to_send() {
|
||||
long get_total_size_to_send() const {
|
||||
return get_total_sizes(sending_summaries);
|
||||
}
|
||||
|
||||
private:
|
||||
long get_total_size_in_progress(std::vector<progress_info> files);
|
||||
long get_total_size_in_progress(std::vector<progress_info> files) const;
|
||||
|
||||
long get_total_files(std::vector<stream_summary>& summaries);
|
||||
long get_total_files(std::vector<stream_summary> const& summaries) const;
|
||||
|
||||
long get_total_sizes(std::vector<stream_summary>& summaries);
|
||||
long get_total_sizes(std::vector<stream_summary> const& summaries) const;
|
||||
|
||||
long get_total_files_completed(std::vector<progress_info> files);
|
||||
long get_total_files_completed(std::vector<progress_info> files) const;
|
||||
};
|
||||
|
||||
} // namespace streaming
|
||||
|
||||
@@ -47,8 +47,8 @@ extern logging::logger sslog;
|
||||
|
||||
using gms::inet_address;
|
||||
|
||||
bool stream_coordinator::has_active_sessions() {
|
||||
for (auto& x : _peer_sessions) {
|
||||
bool stream_coordinator::has_active_sessions() const {
|
||||
for (auto const& x : _peer_sessions) {
|
||||
auto state = x.second->get_state();
|
||||
if (state != stream_session_state::COMPLETE && state != stream_session_state::FAILED) {
|
||||
return true;
|
||||
@@ -57,40 +57,40 @@ bool stream_coordinator::has_active_sessions() {
|
||||
return false;
|
||||
}
|
||||
|
||||
std::vector<shared_ptr<stream_session>> stream_coordinator::get_all_stream_sessions() {
|
||||
std::vector<shared_ptr<stream_session>> stream_coordinator::get_all_stream_sessions() const {
|
||||
std::vector<shared_ptr<stream_session>> results;
|
||||
for (auto& x : _peer_sessions) {
|
||||
for (auto const& x : _peer_sessions) {
|
||||
results.push_back(x.second);
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
std::vector<session_info> stream_coordinator::get_all_session_info() {
|
||||
std::vector<session_info> stream_coordinator::get_all_session_info() const {
|
||||
std::vector<session_info> results;
|
||||
for (auto& x : _peer_sessions) {
|
||||
for (auto const& x : _peer_sessions) {
|
||||
auto& session = x.second;
|
||||
results.push_back(session->get_session_info());
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
std::vector<session_info> stream_coordinator::get_peer_session_info(inet_address peer) {
|
||||
std::vector<session_info> stream_coordinator::get_peer_session_info(inet_address peer) const {
|
||||
std::vector<session_info> results;
|
||||
auto it = _peer_sessions.find(peer);
|
||||
if (it != _peer_sessions.end()) {
|
||||
auto& session = it->second;
|
||||
auto const& session = it->second;
|
||||
results.push_back(session->get_session_info());
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
bool stream_coordinator::is_receiving() {
|
||||
bool stream_coordinator::is_receiving() const {
|
||||
return _is_receiving;
|
||||
}
|
||||
|
||||
std::set<inet_address> stream_coordinator::get_peers() {
|
||||
std::set<inet_address> stream_coordinator::get_peers() const {
|
||||
std::set<inet_address> results;
|
||||
for (auto& x : _peer_sessions) {
|
||||
for (auto const& x : _peer_sessions) {
|
||||
results.insert(x.first);
|
||||
}
|
||||
return results;
|
||||
|
||||
@@ -70,14 +70,14 @@ public:
|
||||
/**
|
||||
* @return true if any stream session is active
|
||||
*/
|
||||
bool has_active_sessions();
|
||||
bool has_active_sessions() const;
|
||||
|
||||
std::vector<shared_ptr<stream_session>> get_all_stream_sessions();
|
||||
std::vector<shared_ptr<stream_session>> get_all_stream_sessions() const;
|
||||
|
||||
bool is_receiving();
|
||||
bool is_receiving() const;
|
||||
|
||||
void connect_all_stream_sessions();
|
||||
std::set<inet_address> get_peers();
|
||||
std::set<inet_address> get_peers() const;
|
||||
|
||||
public:
|
||||
shared_ptr<stream_session> get_or_create_session(inet_address peer) {
|
||||
@@ -88,8 +88,8 @@ public:
|
||||
return session;
|
||||
}
|
||||
|
||||
std::vector<session_info> get_all_session_info();
|
||||
std::vector<session_info> get_peer_session_info(inet_address peer);
|
||||
std::vector<session_info> get_all_session_info() const;
|
||||
std::vector<session_info> get_peer_session_info(inet_address peer) const;
|
||||
|
||||
void abort_all_stream_sessions();
|
||||
};
|
||||
|
||||
@@ -92,7 +92,7 @@ void stream_manager::register_receiving(shared_ptr<stream_result_future> result)
|
||||
_receiving_streams[result->plan_id] = std::move(result);
|
||||
}
|
||||
|
||||
shared_ptr<stream_result_future> stream_manager::get_sending_stream(UUID plan_id) {
|
||||
shared_ptr<stream_result_future> stream_manager::get_sending_stream(UUID plan_id) const {
|
||||
auto it = _initiated_streams.find(plan_id);
|
||||
if (it != _initiated_streams.end()) {
|
||||
return it->second;
|
||||
@@ -100,7 +100,7 @@ shared_ptr<stream_result_future> stream_manager::get_sending_stream(UUID plan_id
|
||||
return {};
|
||||
}
|
||||
|
||||
shared_ptr<stream_result_future> stream_manager::get_receiving_stream(UUID plan_id) {
|
||||
shared_ptr<stream_result_future> stream_manager::get_receiving_stream(UUID plan_id) const {
|
||||
auto it = _receiving_streams.find(plan_id);
|
||||
if (it != _receiving_streams.end()) {
|
||||
return it->second;
|
||||
@@ -118,11 +118,11 @@ void stream_manager::remove_stream(UUID plan_id) {
|
||||
});
|
||||
}
|
||||
|
||||
void stream_manager::show_streams() {
|
||||
for (auto& x : _initiated_streams) {
|
||||
void stream_manager::show_streams() const {
|
||||
for (auto const& x : _initiated_streams) {
|
||||
sslog.debug("stream_manager:initiated_stream: plan_id={}", x.first);
|
||||
}
|
||||
for (auto& x : _receiving_streams) {
|
||||
for (auto const& x : _receiving_streams) {
|
||||
sslog.debug("stream_manager:receiving_stream: plan_id={}", x.first);
|
||||
}
|
||||
}
|
||||
@@ -163,14 +163,14 @@ void stream_manager::remove_progress(UUID plan_id) {
|
||||
_stream_bytes.erase(plan_id);
|
||||
}
|
||||
|
||||
stream_bytes stream_manager::get_progress(UUID plan_id, gms::inet_address peer) {
|
||||
auto& sbytes = _stream_bytes[plan_id];
|
||||
return sbytes[peer];
|
||||
stream_bytes stream_manager::get_progress(UUID plan_id, gms::inet_address peer) const {
|
||||
auto const& sbytes = _stream_bytes.at(plan_id);
|
||||
return sbytes.at(peer);
|
||||
}
|
||||
|
||||
stream_bytes stream_manager::get_progress(UUID plan_id) {
|
||||
stream_bytes stream_manager::get_progress(UUID plan_id) const {
|
||||
stream_bytes ret;
|
||||
for (auto& x : _stream_bytes[plan_id]) {
|
||||
for (auto const& x : _stream_bytes.at(plan_id)) {
|
||||
ret += x.second;
|
||||
}
|
||||
return ret;
|
||||
@@ -182,7 +182,7 @@ future<> stream_manager::remove_progress_on_all_shards(UUID plan_id) {
|
||||
});
|
||||
}
|
||||
|
||||
future<stream_bytes> stream_manager::get_progress_on_all_shards(UUID plan_id, gms::inet_address peer) {
|
||||
future<stream_bytes> stream_manager::get_progress_on_all_shards(UUID plan_id, gms::inet_address peer) const {
|
||||
return get_stream_manager().map_reduce0(
|
||||
[plan_id, peer] (auto& sm) {
|
||||
return sm.get_progress(plan_id, peer);
|
||||
@@ -192,7 +192,7 @@ future<stream_bytes> stream_manager::get_progress_on_all_shards(UUID plan_id, gm
|
||||
);
|
||||
}
|
||||
|
||||
future<stream_bytes> stream_manager::get_progress_on_all_shards(UUID plan_id) {
|
||||
future<stream_bytes> stream_manager::get_progress_on_all_shards(UUID plan_id) const {
|
||||
return get_stream_manager().map_reduce0(
|
||||
[plan_id] (auto& sm) {
|
||||
return sm.get_progress(plan_id);
|
||||
@@ -202,7 +202,7 @@ future<stream_bytes> stream_manager::get_progress_on_all_shards(UUID plan_id) {
|
||||
);
|
||||
}
|
||||
|
||||
future<stream_bytes> stream_manager::get_progress_on_all_shards(gms::inet_address peer) {
|
||||
future<stream_bytes> stream_manager::get_progress_on_all_shards(gms::inet_address peer) const {
|
||||
return get_stream_manager().map_reduce0(
|
||||
[peer] (auto& sm) {
|
||||
stream_bytes ret;
|
||||
@@ -216,7 +216,7 @@ future<stream_bytes> stream_manager::get_progress_on_all_shards(gms::inet_addres
|
||||
);
|
||||
}
|
||||
|
||||
future<stream_bytes> stream_manager::get_progress_on_all_shards() {
|
||||
future<stream_bytes> stream_manager::get_progress_on_all_shards() const {
|
||||
return get_stream_manager().map_reduce0(
|
||||
[] (auto& sm) {
|
||||
stream_bytes ret;
|
||||
@@ -232,17 +232,17 @@ future<stream_bytes> stream_manager::get_progress_on_all_shards() {
|
||||
);
|
||||
}
|
||||
|
||||
stream_bytes stream_manager::get_progress_on_local_shard() {
|
||||
stream_bytes stream_manager::get_progress_on_local_shard() const {
|
||||
stream_bytes ret;
|
||||
for (auto& sbytes : _stream_bytes) {
|
||||
for (auto& sb : sbytes.second) {
|
||||
for (auto const& sbytes : _stream_bytes) {
|
||||
for (auto const& sb : sbytes.second) {
|
||||
ret += sb.second;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool stream_manager::has_peer(inet_address endpoint) {
|
||||
bool stream_manager::has_peer(inet_address endpoint) const {
|
||||
for (auto sr : get_all_streams()) {
|
||||
for (auto session : sr->get_coordinator()->get_all_stream_sessions()) {
|
||||
if (session->peer == endpoint) {
|
||||
|
||||
@@ -108,11 +108,11 @@ public:
|
||||
|
||||
void register_receiving(shared_ptr<stream_result_future> result);
|
||||
|
||||
shared_ptr<stream_result_future> get_sending_stream(UUID plan_id);
|
||||
shared_ptr<stream_result_future> get_sending_stream(UUID plan_id) const;
|
||||
|
||||
shared_ptr<stream_result_future> get_receiving_stream(UUID plan_id);
|
||||
shared_ptr<stream_result_future> get_receiving_stream(UUID plan_id) const;
|
||||
|
||||
std::vector<shared_ptr<stream_result_future>> get_all_streams() const ;
|
||||
std::vector<shared_ptr<stream_result_future>> get_all_streams() const;
|
||||
|
||||
|
||||
const std::unordered_map<UUID, shared_ptr<stream_result_future>>& get_initiated_streams() const {
|
||||
@@ -125,7 +125,7 @@ public:
|
||||
|
||||
void remove_stream(UUID plan_id);
|
||||
|
||||
void show_streams();
|
||||
void show_streams() const;
|
||||
|
||||
future<> stop() {
|
||||
fail_all_sessions();
|
||||
@@ -137,21 +137,21 @@ public:
|
||||
|
||||
void remove_progress(UUID plan_id);
|
||||
|
||||
stream_bytes get_progress(UUID plan_id, gms::inet_address peer);
|
||||
stream_bytes get_progress(UUID plan_id, gms::inet_address peer) const;
|
||||
|
||||
stream_bytes get_progress(UUID plan_id);
|
||||
stream_bytes get_progress(UUID plan_id) const;
|
||||
|
||||
future<> remove_progress_on_all_shards(UUID plan_id);
|
||||
|
||||
future<stream_bytes> get_progress_on_all_shards(UUID plan_id, gms::inet_address peer);
|
||||
future<stream_bytes> get_progress_on_all_shards(UUID plan_id, gms::inet_address peer) const;
|
||||
|
||||
future<stream_bytes> get_progress_on_all_shards(UUID plan_id);
|
||||
future<stream_bytes> get_progress_on_all_shards(UUID plan_id) const;
|
||||
|
||||
future<stream_bytes> get_progress_on_all_shards(gms::inet_address peer);
|
||||
future<stream_bytes> get_progress_on_all_shards(gms::inet_address peer) const;
|
||||
|
||||
future<stream_bytes> get_progress_on_all_shards();
|
||||
future<stream_bytes> get_progress_on_all_shards() const;
|
||||
|
||||
stream_bytes get_progress_on_local_shard();
|
||||
stream_bytes get_progress_on_local_shard() const;
|
||||
|
||||
public:
|
||||
virtual void on_join(inet_address endpoint, endpoint_state ep_state) override {}
|
||||
@@ -165,7 +165,7 @@ public:
|
||||
private:
|
||||
void fail_all_sessions();
|
||||
void fail_sessions(inet_address endpoint);
|
||||
bool has_peer(inet_address endpoint);
|
||||
bool has_peer(inet_address endpoint) const;
|
||||
};
|
||||
|
||||
extern distributed<stream_manager> _the_stream_manager;
|
||||
|
||||
@@ -136,7 +136,7 @@ public:
|
||||
/**
|
||||
* @return true if this plan has no plan to execute
|
||||
*/
|
||||
bool is_empty() {
|
||||
bool is_empty() const {
|
||||
return !_coordinator->has_active_sessions();
|
||||
}
|
||||
|
||||
|
||||
@@ -61,11 +61,11 @@ public:
|
||||
stream_receive_task(shared_ptr<stream_session> _session, UUID _cf_id, int _total_files, long _total_size);
|
||||
~stream_receive_task();
|
||||
|
||||
virtual int get_total_number_of_files() override {
|
||||
virtual int get_total_number_of_files() const override {
|
||||
return total_files;
|
||||
}
|
||||
|
||||
virtual long get_total_size() override {
|
||||
virtual long get_total_size() const override {
|
||||
return total_size;
|
||||
}
|
||||
|
||||
|
||||
@@ -667,11 +667,11 @@ void stream_session::init(shared_ptr<stream_result_future> stream_result_) {
|
||||
_stream_result = stream_result_;
|
||||
}
|
||||
|
||||
utils::UUID stream_session::plan_id() {
|
||||
utils::UUID stream_session::plan_id() const {
|
||||
return _stream_result ? _stream_result->plan_id : UUID();
|
||||
}
|
||||
|
||||
sstring stream_session::description() {
|
||||
sstring stream_session::description() const {
|
||||
return _stream_result ? _stream_result->description : "";
|
||||
}
|
||||
|
||||
|
||||
@@ -223,9 +223,9 @@ public:
|
||||
stream_session(inet_address peer_);
|
||||
~stream_session();
|
||||
|
||||
UUID plan_id();
|
||||
UUID plan_id() const;
|
||||
|
||||
sstring description();
|
||||
sstring description() const;
|
||||
|
||||
public:
|
||||
/**
|
||||
@@ -281,7 +281,7 @@ public:
|
||||
/**
|
||||
* @return current state
|
||||
*/
|
||||
stream_session_state get_state() {
|
||||
stream_session_state get_state() const {
|
||||
return _state;
|
||||
}
|
||||
|
||||
@@ -290,7 +290,7 @@ public:
|
||||
*
|
||||
* @return true if session completed successfully.
|
||||
*/
|
||||
bool is_success() {
|
||||
bool is_success() const {
|
||||
return _state == stream_session_state::COMPLETE;
|
||||
}
|
||||
|
||||
|
||||
@@ -60,8 +60,8 @@ public:
|
||||
, sessions(std::move(sessions_)) {
|
||||
}
|
||||
|
||||
bool has_failed_session() {
|
||||
for (auto& x : sessions) {
|
||||
bool has_failed_session() const {
|
||||
for (auto const& x : sessions) {
|
||||
if (x.is_failed()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -65,12 +65,12 @@ public:
|
||||
/**
|
||||
* @return total number of files this task receives/streams.
|
||||
*/
|
||||
virtual int get_total_number_of_files() = 0;
|
||||
virtual int get_total_number_of_files() const = 0;
|
||||
|
||||
/**
|
||||
* @return total bytes expected to receive
|
||||
*/
|
||||
virtual long get_total_size() = 0;
|
||||
virtual long get_total_size() const = 0;
|
||||
|
||||
/**
|
||||
* Abort the task.
|
||||
@@ -81,7 +81,7 @@ public:
|
||||
/**
|
||||
* @return StreamSummary that describes this task
|
||||
*/
|
||||
virtual stream_summary get_summary() {
|
||||
virtual stream_summary get_summary() const {
|
||||
return stream_summary(this->cf_id, this->get_total_number_of_files(), this->get_total_size());
|
||||
}
|
||||
};
|
||||
|
||||
@@ -69,11 +69,11 @@ public:
|
||||
virtual void abort() override {
|
||||
}
|
||||
|
||||
virtual int get_total_number_of_files() override {
|
||||
virtual int get_total_number_of_files() const override {
|
||||
return 1;
|
||||
}
|
||||
|
||||
virtual long get_total_size() override {
|
||||
virtual long get_total_size() const override {
|
||||
return _total_size;
|
||||
}
|
||||
|
||||
|
||||
@@ -27,9 +27,9 @@
|
||||
|
||||
namespace tests::data_model {
|
||||
|
||||
static constexpr const api::timestamp_type previously_removed_column_timestamp = 100;
|
||||
static constexpr const api::timestamp_type data_timestamp = 200;
|
||||
static constexpr const api::timestamp_type column_removal_timestamp = 300;
|
||||
static constexpr api::timestamp_type previously_removed_column_timestamp = 100;
|
||||
static constexpr api::timestamp_type data_timestamp = 200;
|
||||
static constexpr api::timestamp_type column_removal_timestamp = 300;
|
||||
|
||||
class mutation_description {
|
||||
public:
|
||||
|
||||
@@ -265,7 +265,7 @@ cql_server::connection::frame_size() const {
|
||||
}
|
||||
|
||||
cql_binary_frame_v3
|
||||
cql_server::connection::parse_frame(temporary_buffer<char> buf) {
|
||||
cql_server::connection::parse_frame(temporary_buffer<char> buf) const {
|
||||
if (buf.size() != frame_size()) {
|
||||
throw cql_frame_error();
|
||||
}
|
||||
@@ -941,7 +941,7 @@ cql_server::connection::process_register(uint16_t stream, request_reader in, ser
|
||||
return make_ready_future<std::unique_ptr<cql_server::response>>(make_ready(stream, client_state.get_trace_state()));
|
||||
}
|
||||
|
||||
std::unique_ptr<cql_server::response> cql_server::connection::make_unavailable_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t required, int32_t alive, const tracing::trace_state_ptr& tr_state)
|
||||
std::unique_ptr<cql_server::response> cql_server::connection::make_unavailable_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t required, int32_t alive, const tracing::trace_state_ptr& tr_state) const
|
||||
{
|
||||
auto response = std::make_unique<cql_server::response>(stream, cql_binary_opcode::ERROR, tr_state);
|
||||
response->write_int(static_cast<int32_t>(err));
|
||||
@@ -952,7 +952,7 @@ std::unique_ptr<cql_server::response> cql_server::connection::make_unavailable_e
|
||||
return response;
|
||||
}
|
||||
|
||||
std::unique_ptr<cql_server::response> cql_server::connection::make_read_timeout_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t blockfor, bool data_present, const tracing::trace_state_ptr& tr_state)
|
||||
std::unique_ptr<cql_server::response> cql_server::connection::make_read_timeout_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t blockfor, bool data_present, const tracing::trace_state_ptr& tr_state) const
|
||||
{
|
||||
auto response = std::make_unique<cql_server::response>(stream, cql_binary_opcode::ERROR, tr_state);
|
||||
response->write_int(static_cast<int32_t>(err));
|
||||
@@ -964,7 +964,7 @@ std::unique_ptr<cql_server::response> cql_server::connection::make_read_timeout_
|
||||
return response;
|
||||
}
|
||||
|
||||
std::unique_ptr<cql_server::response> cql_server::connection::make_read_failure_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t numfailures, int32_t blockfor, bool data_present, const tracing::trace_state_ptr& tr_state)
|
||||
std::unique_ptr<cql_server::response> cql_server::connection::make_read_failure_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t numfailures, int32_t blockfor, bool data_present, const tracing::trace_state_ptr& tr_state) const
|
||||
{
|
||||
if (_version < 4) {
|
||||
return make_read_timeout_error(stream, err, std::move(msg), cl, received, blockfor, data_present, tr_state);
|
||||
@@ -980,7 +980,7 @@ std::unique_ptr<cql_server::response> cql_server::connection::make_read_failure_
|
||||
return response;
|
||||
}
|
||||
|
||||
std::unique_ptr<cql_server::response> cql_server::connection::make_mutation_write_timeout_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t blockfor, db::write_type type, const tracing::trace_state_ptr& tr_state)
|
||||
std::unique_ptr<cql_server::response> cql_server::connection::make_mutation_write_timeout_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t blockfor, db::write_type type, const tracing::trace_state_ptr& tr_state) const
|
||||
{
|
||||
auto response = std::make_unique<cql_server::response>(stream, cql_binary_opcode::ERROR, tr_state);
|
||||
response->write_int(static_cast<int32_t>(err));
|
||||
@@ -992,7 +992,7 @@ std::unique_ptr<cql_server::response> cql_server::connection::make_mutation_writ
|
||||
return response;
|
||||
}
|
||||
|
||||
std::unique_ptr<cql_server::response> cql_server::connection::make_mutation_write_failure_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t numfailures, int32_t blockfor, db::write_type type, const tracing::trace_state_ptr& tr_state)
|
||||
std::unique_ptr<cql_server::response> cql_server::connection::make_mutation_write_failure_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t numfailures, int32_t blockfor, db::write_type type, const tracing::trace_state_ptr& tr_state) const
|
||||
{
|
||||
if (_version < 4) {
|
||||
return make_mutation_write_timeout_error(stream, err, std::move(msg), cl, received, blockfor, type, tr_state);
|
||||
@@ -1008,7 +1008,7 @@ std::unique_ptr<cql_server::response> cql_server::connection::make_mutation_writ
|
||||
return response;
|
||||
}
|
||||
|
||||
std::unique_ptr<cql_server::response> cql_server::connection::make_already_exists_error(int16_t stream, exceptions::exception_code err, sstring msg, sstring ks_name, sstring cf_name, const tracing::trace_state_ptr& tr_state)
|
||||
std::unique_ptr<cql_server::response> cql_server::connection::make_already_exists_error(int16_t stream, exceptions::exception_code err, sstring msg, sstring ks_name, sstring cf_name, const tracing::trace_state_ptr& tr_state) const
|
||||
{
|
||||
auto response = std::make_unique<cql_server::response>(stream, cql_binary_opcode::ERROR, tr_state);
|
||||
response->write_int(static_cast<int32_t>(err));
|
||||
@@ -1018,7 +1018,7 @@ std::unique_ptr<cql_server::response> cql_server::connection::make_already_exist
|
||||
return response;
|
||||
}
|
||||
|
||||
std::unique_ptr<cql_server::response> cql_server::connection::make_unprepared_error(int16_t stream, exceptions::exception_code err, sstring msg, bytes id, const tracing::trace_state_ptr& tr_state)
|
||||
std::unique_ptr<cql_server::response> cql_server::connection::make_unprepared_error(int16_t stream, exceptions::exception_code err, sstring msg, bytes id, const tracing::trace_state_ptr& tr_state) const
|
||||
{
|
||||
auto response = std::make_unique<cql_server::response>(stream, cql_binary_opcode::ERROR, tr_state);
|
||||
response->write_int(static_cast<int32_t>(err));
|
||||
@@ -1027,7 +1027,7 @@ std::unique_ptr<cql_server::response> cql_server::connection::make_unprepared_er
|
||||
return response;
|
||||
}
|
||||
|
||||
std::unique_ptr<cql_server::response> cql_server::connection::make_error(int16_t stream, exceptions::exception_code err, sstring msg, const tracing::trace_state_ptr& tr_state)
|
||||
std::unique_ptr<cql_server::response> cql_server::connection::make_error(int16_t stream, exceptions::exception_code err, sstring msg, const tracing::trace_state_ptr& tr_state) const
|
||||
{
|
||||
auto response = std::make_unique<cql_server::response>(stream, cql_binary_opcode::ERROR, tr_state);
|
||||
response->write_int(static_cast<int32_t>(err));
|
||||
@@ -1035,38 +1035,38 @@ std::unique_ptr<cql_server::response> cql_server::connection::make_error(int16_t
|
||||
return response;
|
||||
}
|
||||
|
||||
std::unique_ptr<cql_server::response> cql_server::connection::make_ready(int16_t stream, const tracing::trace_state_ptr& tr_state)
|
||||
std::unique_ptr<cql_server::response> cql_server::connection::make_ready(int16_t stream, const tracing::trace_state_ptr& tr_state) const
|
||||
{
|
||||
return std::make_unique<cql_server::response>(stream, cql_binary_opcode::READY, tr_state);
|
||||
}
|
||||
|
||||
std::unique_ptr<cql_server::response> cql_server::connection::make_autheticate(int16_t stream, const sstring& clz, const tracing::trace_state_ptr& tr_state)
|
||||
std::unique_ptr<cql_server::response> cql_server::connection::make_autheticate(int16_t stream, const sstring& clz, const tracing::trace_state_ptr& tr_state) const
|
||||
{
|
||||
auto response = std::make_unique<cql_server::response>(stream, cql_binary_opcode::AUTHENTICATE, tr_state);
|
||||
response->write_string(clz);
|
||||
return response;
|
||||
}
|
||||
|
||||
std::unique_ptr<cql_server::response> cql_server::connection::make_auth_success(int16_t stream, bytes b, const tracing::trace_state_ptr& tr_state) {
|
||||
std::unique_ptr<cql_server::response> cql_server::connection::make_auth_success(int16_t stream, bytes b, const tracing::trace_state_ptr& tr_state) const {
|
||||
auto response = std::make_unique<cql_server::response>(stream, cql_binary_opcode::AUTH_SUCCESS, tr_state);
|
||||
response->write_bytes(std::move(b));
|
||||
return response;
|
||||
}
|
||||
|
||||
std::unique_ptr<cql_server::response> cql_server::connection::make_auth_challenge(int16_t stream, bytes b, const tracing::trace_state_ptr& tr_state) {
|
||||
std::unique_ptr<cql_server::response> cql_server::connection::make_auth_challenge(int16_t stream, bytes b, const tracing::trace_state_ptr& tr_state) const {
|
||||
auto response = std::make_unique<cql_server::response>(stream, cql_binary_opcode::AUTH_CHALLENGE, tr_state);
|
||||
response->write_bytes(std::move(b));
|
||||
return response;
|
||||
}
|
||||
|
||||
std::unique_ptr<cql_server::response> cql_server::connection::make_supported(int16_t stream, const tracing::trace_state_ptr& tr_state)
|
||||
std::unique_ptr<cql_server::response> cql_server::connection::make_supported(int16_t stream, const tracing::trace_state_ptr& tr_state) const
|
||||
{
|
||||
std::multimap<sstring, sstring> opts;
|
||||
opts.insert({"CQL_VERSION", cql3::query_processor::CQL_VERSION});
|
||||
opts.insert({"COMPRESSION", "lz4"});
|
||||
opts.insert({"COMPRESSION", "snappy"});
|
||||
if (_server._config.allow_shard_aware_drivers) {
|
||||
auto& part = dht::global_partitioner();
|
||||
auto const& part = dht::global_partitioner();
|
||||
opts.insert({"SCYLLA_SHARD", format("{:d}", engine().cpu_id())});
|
||||
opts.insert({"SCYLLA_NR_SHARDS", format("{:d}", smp::count)});
|
||||
opts.insert({"SCYLLA_SHARDING_ALGORITHM", part.cpu_sharding_algorithm_name()});
|
||||
@@ -1152,7 +1152,7 @@ public:
|
||||
};
|
||||
|
||||
std::unique_ptr<cql_server::response>
|
||||
cql_server::connection::make_result(int16_t stream, shared_ptr<messages::result_message> msg, const tracing::trace_state_ptr& tr_state, bool skip_metadata)
|
||||
cql_server::connection::make_result(int16_t stream, shared_ptr<messages::result_message> msg, const tracing::trace_state_ptr& tr_state, bool skip_metadata) const
|
||||
{
|
||||
auto response = std::make_unique<cql_server::response>(stream, cql_binary_opcode::RESULT, tr_state);
|
||||
if (__builtin_expect(!msg->warnings().empty() && _version > 3, false)) {
|
||||
@@ -1165,7 +1165,7 @@ cql_server::connection::make_result(int16_t stream, shared_ptr<messages::result_
|
||||
}
|
||||
|
||||
std::unique_ptr<cql_server::response>
|
||||
cql_server::connection::make_topology_change_event(const event::topology_change& event)
|
||||
cql_server::connection::make_topology_change_event(const event::topology_change& event) const
|
||||
{
|
||||
auto response = std::make_unique<cql_server::response>(-1, cql_binary_opcode::EVENT, tracing::trace_state_ptr());
|
||||
response->write_string("TOPOLOGY_CHANGE");
|
||||
@@ -1175,7 +1175,7 @@ cql_server::connection::make_topology_change_event(const event::topology_change&
|
||||
}
|
||||
|
||||
std::unique_ptr<cql_server::response>
|
||||
cql_server::connection::make_status_change_event(const event::status_change& event)
|
||||
cql_server::connection::make_status_change_event(const event::status_change& event) const
|
||||
{
|
||||
auto response = std::make_unique<cql_server::response>(-1, cql_binary_opcode::EVENT, tracing::trace_state_ptr());
|
||||
response->write_string("STATUS_CHANGE");
|
||||
@@ -1185,7 +1185,7 @@ cql_server::connection::make_status_change_event(const event::status_change& eve
|
||||
}
|
||||
|
||||
std::unique_ptr<cql_server::response>
|
||||
cql_server::connection::make_schema_change_event(const event::schema_change& event)
|
||||
cql_server::connection::make_schema_change_event(const event::schema_change& event) const
|
||||
{
|
||||
auto response = std::make_unique<cql_server::response>(-1, cql_binary_opcode::EVENT, tracing::trace_state_ptr());
|
||||
response->write_string("SCHEMA_CHANGE");
|
||||
|
||||
@@ -178,12 +178,12 @@ private:
|
||||
future<> process_request();
|
||||
future<> shutdown();
|
||||
private:
|
||||
const ::timeout_config& timeout_config() { return _server.timeout_config(); }
|
||||
const ::timeout_config& timeout_config() const { return _server.timeout_config(); }
|
||||
friend class process_request_executor;
|
||||
future<std::unique_ptr<cql_server::response>> process_request_one(fragmented_temporary_buffer::istream buf, uint8_t op, uint16_t stream, service::client_state& client_state, tracing_request_type tracing_request, service_permit permit);
|
||||
unsigned frame_size() const;
|
||||
unsigned pick_request_cpu();
|
||||
cql_binary_frame_v3 parse_frame(temporary_buffer<char> buf);
|
||||
cql_binary_frame_v3 parse_frame(temporary_buffer<char> buf) const;
|
||||
future<fragmented_temporary_buffer> read_and_decompress_frame(size_t length, uint8_t flags);
|
||||
future<std::optional<cql_binary_frame_v3>> read_frame();
|
||||
future<std::unique_ptr<cql_server::response>> process_startup(uint16_t stream, request_reader in, service::client_state& client_state);
|
||||
@@ -195,23 +195,23 @@ private:
|
||||
future<std::unique_ptr<cql_server::response>> process_batch(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit);
|
||||
future<std::unique_ptr<cql_server::response>> process_register(uint16_t stream, request_reader in, service::client_state& client_state);
|
||||
|
||||
std::unique_ptr<cql_server::response> make_unavailable_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t required, int32_t alive, const tracing::trace_state_ptr& tr_state);
|
||||
std::unique_ptr<cql_server::response> make_read_timeout_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t blockfor, bool data_present, const tracing::trace_state_ptr& tr_state);
|
||||
std::unique_ptr<cql_server::response> make_read_failure_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t numfailures, int32_t blockfor, bool data_present, const tracing::trace_state_ptr& tr_state);
|
||||
std::unique_ptr<cql_server::response> make_mutation_write_timeout_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t blockfor, db::write_type type, const tracing::trace_state_ptr& tr_state);
|
||||
std::unique_ptr<cql_server::response> make_mutation_write_failure_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t numfailures, int32_t blockfor, db::write_type type, const tracing::trace_state_ptr& tr_state);
|
||||
std::unique_ptr<cql_server::response> make_already_exists_error(int16_t stream, exceptions::exception_code err, sstring msg, sstring ks_name, sstring cf_name, const tracing::trace_state_ptr& tr_state);
|
||||
std::unique_ptr<cql_server::response> make_unprepared_error(int16_t stream, exceptions::exception_code err, sstring msg, bytes id, const tracing::trace_state_ptr& tr_state);
|
||||
std::unique_ptr<cql_server::response> make_error(int16_t stream, exceptions::exception_code err, sstring msg, const tracing::trace_state_ptr& tr_state);
|
||||
std::unique_ptr<cql_server::response> make_ready(int16_t stream, const tracing::trace_state_ptr& tr_state);
|
||||
std::unique_ptr<cql_server::response> make_supported(int16_t stream, const tracing::trace_state_ptr& tr_state);
|
||||
std::unique_ptr<cql_server::response> make_result(int16_t stream, shared_ptr<cql_transport::messages::result_message> msg, const tracing::trace_state_ptr& tr_state, bool skip_metadata = false);
|
||||
std::unique_ptr<cql_server::response> make_topology_change_event(const cql_transport::event::topology_change& event);
|
||||
std::unique_ptr<cql_server::response> make_status_change_event(const cql_transport::event::status_change& event);
|
||||
std::unique_ptr<cql_server::response> make_schema_change_event(const cql_transport::event::schema_change& event);
|
||||
std::unique_ptr<cql_server::response> make_autheticate(int16_t, const sstring&, const tracing::trace_state_ptr& tr_state);
|
||||
std::unique_ptr<cql_server::response> make_auth_success(int16_t, bytes, const tracing::trace_state_ptr& tr_state);
|
||||
std::unique_ptr<cql_server::response> make_auth_challenge(int16_t, bytes, const tracing::trace_state_ptr& tr_state);
|
||||
std::unique_ptr<cql_server::response> make_unavailable_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t required, int32_t alive, const tracing::trace_state_ptr& tr_state) const;
|
||||
std::unique_ptr<cql_server::response> make_read_timeout_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t blockfor, bool data_present, const tracing::trace_state_ptr& tr_state) const;
|
||||
std::unique_ptr<cql_server::response> make_read_failure_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t numfailures, int32_t blockfor, bool data_present, const tracing::trace_state_ptr& tr_state) const;
|
||||
std::unique_ptr<cql_server::response> make_mutation_write_timeout_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t blockfor, db::write_type type, const tracing::trace_state_ptr& tr_state) const;
|
||||
std::unique_ptr<cql_server::response> make_mutation_write_failure_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t numfailures, int32_t blockfor, db::write_type type, const tracing::trace_state_ptr& tr_state) const;
|
||||
std::unique_ptr<cql_server::response> make_already_exists_error(int16_t stream, exceptions::exception_code err, sstring msg, sstring ks_name, sstring cf_name, const tracing::trace_state_ptr& tr_state) const;
|
||||
std::unique_ptr<cql_server::response> make_unprepared_error(int16_t stream, exceptions::exception_code err, sstring msg, bytes id, const tracing::trace_state_ptr& tr_state) const;
|
||||
std::unique_ptr<cql_server::response> make_error(int16_t stream, exceptions::exception_code err, sstring msg, const tracing::trace_state_ptr& tr_state) const;
|
||||
std::unique_ptr<cql_server::response> make_ready(int16_t stream, const tracing::trace_state_ptr& tr_state) const;
|
||||
std::unique_ptr<cql_server::response> make_supported(int16_t stream, const tracing::trace_state_ptr& tr_state) const;
|
||||
std::unique_ptr<cql_server::response> make_result(int16_t stream, shared_ptr<cql_transport::messages::result_message> msg, const tracing::trace_state_ptr& tr_state, bool skip_metadata = false) const;
|
||||
std::unique_ptr<cql_server::response> make_topology_change_event(const cql_transport::event::topology_change& event) const;
|
||||
std::unique_ptr<cql_server::response> make_status_change_event(const cql_transport::event::status_change& event) const;
|
||||
std::unique_ptr<cql_server::response> make_schema_change_event(const cql_transport::event::schema_change& event) const;
|
||||
std::unique_ptr<cql_server::response> make_autheticate(int16_t, const sstring&, const tracing::trace_state_ptr& tr_state) const;
|
||||
std::unique_ptr<cql_server::response> make_auth_success(int16_t, bytes, const tracing::trace_state_ptr& tr_state) const;
|
||||
std::unique_ptr<cql_server::response> make_auth_challenge(int16_t, bytes, const tracing::trace_state_ptr& tr_state) const;
|
||||
|
||||
void write_response(foreign_ptr<std::unique_ptr<cql_server::response>>&& response, service_permit permit = empty_service_permit(), cql_compression compression = cql_compression::none);
|
||||
|
||||
|
||||
@@ -61,7 +61,7 @@ private:
|
||||
return _broadcast_rpc_address;
|
||||
}
|
||||
public:
|
||||
static const int32_t MAX_UNSIGNED_SHORT = 0xFFFF;
|
||||
static constexpr int32_t MAX_UNSIGNED_SHORT = 0xFFFF;
|
||||
|
||||
static void set_broadcast_address(inet_address addr) {
|
||||
broadcast_address() = addr;
|
||||
|
||||
@@ -39,13 +39,13 @@ namespace utils {
|
||||
|
||||
template <typename T> class in;
|
||||
template <typename T> struct is_in {
|
||||
static const bool value = false;
|
||||
static constexpr bool value = false;
|
||||
};
|
||||
template <typename T> struct is_in<in<T>> {
|
||||
static const bool value = true;
|
||||
static constexpr bool value = true;
|
||||
};
|
||||
template <typename T> struct is_in<const in<T>> {
|
||||
static const bool value = true;
|
||||
static constexpr bool value = true;
|
||||
};
|
||||
|
||||
//
|
||||
|
||||
@@ -165,6 +165,6 @@ template<typename... Ts>
|
||||
constexpr size_t size = internal::get_size<Ts...>::value;
|
||||
|
||||
template<template <class> typename Predicate, typename... Ts>
|
||||
static constexpr const bool all_of = std::conjunction_v<Predicate<Ts>...>;
|
||||
static constexpr bool all_of = std::conjunction_v<Predicate<Ts>...>;
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user