Merge "Update log messages to {fmt} rules" from Pavel E
"
Before seastar is updated with the {fmt} engine under the
logging hood, some changes are to be made in scylla to
conform to {fmt} standards.
Compilation and tests checked against both -- old (current)
and new seastar-s.
tests: unit(dev), manual
"
* 'br-logging-update' of https://github.com/xemul/scylla:
code: Force formatting of pointer in .debug and .trace
code: Format { and } as {fmt} needs
streaming: Do not reveal raw pointer in info message
mp_row_consumer: Provide hex-formatting wrapper for bytes_view
heat_load_balance: Include fmt/ranges.h
This commit is contained in:
4
bytes.cc
4
bytes.cc
@@ -100,3 +100,7 @@ std::ostream& operator<<(std::ostream& os, const bytes_view& b) {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const fmt_hex& b) {
|
||||
return os << to_hex(b.v);
|
||||
}
|
||||
|
||||
6
bytes.hh
6
bytes.hh
@@ -50,6 +50,12 @@ struct hash<bytes_view> {
|
||||
|
||||
}
|
||||
|
||||
struct fmt_hex {
|
||||
bytes_view& v;
|
||||
};
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const fmt_hex& hex);
|
||||
|
||||
bytes from_hex(sstring_view s);
|
||||
sstring to_hex(bytes_view b);
|
||||
sstring to_hex(const bytes& b);
|
||||
|
||||
@@ -157,7 +157,7 @@ public:
|
||||
, _read_context(std::move(ctx))
|
||||
, _next_row(*_schema, *_snp)
|
||||
{
|
||||
clogger.trace("csm {}: table={}.{}", this, _schema->ks_name(), _schema->cf_name());
|
||||
clogger.trace("csm {}: table={}.{}", fmt::ptr(this), _schema->ks_name(), _schema->cf_name());
|
||||
push_mutation_fragment(partition_start(std::move(dk), _snp->partition_tombstone()));
|
||||
}
|
||||
cache_flat_mutation_reader(const cache_flat_mutation_reader&) = delete;
|
||||
@@ -231,7 +231,7 @@ future<> cache_flat_mutation_reader::fill_buffer(db::timeout_clock::time_point t
|
||||
return after_static_row();
|
||||
}
|
||||
}
|
||||
clogger.trace("csm {}: fill_buffer(), range={}, lb={}", this, *_ck_ranges_curr, _lower_bound);
|
||||
clogger.trace("csm {}: fill_buffer(), range={}, lb={}", fmt::ptr(this), *_ck_ranges_curr, _lower_bound);
|
||||
return do_until([this] { return _end_of_stream || is_buffer_full(); }, [this, timeout] {
|
||||
return do_fill_buffer(timeout);
|
||||
});
|
||||
@@ -276,7 +276,7 @@ future<> cache_flat_mutation_reader::do_fill_buffer(db::timeout_clock::time_poin
|
||||
// assert(_state == state::reading_from_cache)
|
||||
return _lsa_manager.run_in_read_section([this] {
|
||||
auto next_valid = _next_row.iterators_valid();
|
||||
clogger.trace("csm {}: reading_from_cache, range=[{}, {}), next={}, valid={}", this, _lower_bound,
|
||||
clogger.trace("csm {}: reading_from_cache, range=[{}, {}), next={}, valid={}", fmt::ptr(this), _lower_bound,
|
||||
_upper_bound, _next_row.position(), next_valid);
|
||||
// We assume that if there was eviction, and thus the range may
|
||||
// no longer be continuous, the cursor was invalidated.
|
||||
@@ -290,7 +290,7 @@ future<> cache_flat_mutation_reader::do_fill_buffer(db::timeout_clock::time_poin
|
||||
}
|
||||
}
|
||||
_next_row.maybe_refresh();
|
||||
clogger.trace("csm {}: next={}, cont={}", this, _next_row.position(), _next_row.continuous());
|
||||
clogger.trace("csm {}: next={}, cont={}", fmt::ptr(this), _next_row.position(), _next_row.continuous());
|
||||
_lower_bound_changed = false;
|
||||
while (_state == state::reading_from_cache) {
|
||||
copy_from_cache_to_buffer();
|
||||
@@ -356,7 +356,7 @@ future<> cache_flat_mutation_reader::read_from_underlying(db::timeout_clock::tim
|
||||
e.release();
|
||||
auto next = std::next(it);
|
||||
it->set_continuous(next->continuous());
|
||||
clogger.trace("csm {}: inserted dummy at {}, cont={}", this, it->position(), it->continuous());
|
||||
clogger.trace("csm {}: inserted dummy at {}, cont={}", fmt::ptr(this), it->position(), it->continuous());
|
||||
}
|
||||
});
|
||||
} else if (ensure_population_lower_bound()) {
|
||||
@@ -367,11 +367,11 @@ future<> cache_flat_mutation_reader::read_from_underlying(db::timeout_clock::tim
|
||||
auto insert_result = rows.insert_check(_next_row.get_iterator_in_latest_version(), *e, less);
|
||||
auto inserted = insert_result.second;
|
||||
if (inserted) {
|
||||
clogger.trace("csm {}: inserted dummy at {}", this, _upper_bound);
|
||||
clogger.trace("csm {}: inserted dummy at {}", fmt::ptr(this), _upper_bound);
|
||||
_snp->tracker()->insert(*e);
|
||||
e.release();
|
||||
} else {
|
||||
clogger.trace("csm {}: mark {} as continuous", this, insert_result.first->position());
|
||||
clogger.trace("csm {}: mark {} as continuous", fmt::ptr(this), insert_result.first->position());
|
||||
insert_result.first->set_continuous(true);
|
||||
}
|
||||
});
|
||||
@@ -412,7 +412,7 @@ bool cache_flat_mutation_reader::ensure_population_lower_bound() {
|
||||
auto insert_result = rows.insert_check(rows.end(), *e, less);
|
||||
auto inserted = insert_result.second;
|
||||
if (inserted) {
|
||||
clogger.trace("csm {}: inserted lower bound dummy at {}", this, e->position());
|
||||
clogger.trace("csm {}: inserted lower bound dummy at {}", fmt::ptr(this), e->position());
|
||||
_snp->tracker()->insert(*e);
|
||||
e.release();
|
||||
}
|
||||
@@ -452,7 +452,7 @@ void cache_flat_mutation_reader::maybe_add_to_cache(const clustering_row& cr) {
|
||||
_read_context->cache().on_mispopulate();
|
||||
return;
|
||||
}
|
||||
clogger.trace("csm {}: populate({})", this, clustering_row::printer(*_schema, cr));
|
||||
clogger.trace("csm {}: populate({})", fmt::ptr(this), clustering_row::printer(*_schema, cr));
|
||||
_lsa_manager.run_in_update_section_with_allocator([this, &cr] {
|
||||
mutation_partition& mp = _snp->version()->partition();
|
||||
rows_entry::compare less(*_schema);
|
||||
@@ -474,7 +474,7 @@ void cache_flat_mutation_reader::maybe_add_to_cache(const clustering_row& cr) {
|
||||
|
||||
rows_entry& e = *it;
|
||||
if (ensure_population_lower_bound()) {
|
||||
clogger.trace("csm {}: set_continuous({})", this, e.position());
|
||||
clogger.trace("csm {}: set_continuous({})", fmt::ptr(this), e.position());
|
||||
e.set_continuous(true);
|
||||
} else {
|
||||
_read_context->cache().on_mispopulate();
|
||||
@@ -493,14 +493,14 @@ bool cache_flat_mutation_reader::after_current_range(position_in_partition_view
|
||||
|
||||
inline
|
||||
void cache_flat_mutation_reader::start_reading_from_underlying() {
|
||||
clogger.trace("csm {}: start_reading_from_underlying(), range=[{}, {})", this, _lower_bound, _next_row_in_range ? _next_row.position() : _upper_bound);
|
||||
clogger.trace("csm {}: start_reading_from_underlying(), range=[{}, {})", fmt::ptr(this), _lower_bound, _next_row_in_range ? _next_row.position() : _upper_bound);
|
||||
_state = state::move_to_underlying;
|
||||
_next_row.touch();
|
||||
}
|
||||
|
||||
inline
|
||||
void cache_flat_mutation_reader::copy_from_cache_to_buffer() {
|
||||
clogger.trace("csm {}: copy_from_cache, next={}, next_row_in_range={}", this, _next_row.position(), _next_row_in_range);
|
||||
clogger.trace("csm {}: copy_from_cache, next={}, next_row_in_range={}", fmt::ptr(this), _next_row.position(), _next_row_in_range);
|
||||
_next_row.touch();
|
||||
position_in_partition_view next_lower_bound = _next_row.dummy() ? _next_row.position() : position_in_partition_view::after_key(_next_row.key());
|
||||
for (auto &&rts : _snp->range_tombstones(_lower_bound, _next_row_in_range ? next_lower_bound : _upper_bound)) {
|
||||
@@ -532,7 +532,7 @@ void cache_flat_mutation_reader::copy_from_cache_to_buffer() {
|
||||
inline
|
||||
void cache_flat_mutation_reader::move_to_end() {
|
||||
finish_reader();
|
||||
clogger.trace("csm {}: eos", this);
|
||||
clogger.trace("csm {}: eos", fmt::ptr(this));
|
||||
}
|
||||
|
||||
inline
|
||||
@@ -557,7 +557,7 @@ void cache_flat_mutation_reader::move_to_range(query::clustering_row_ranges::con
|
||||
_ck_ranges_curr = next_it;
|
||||
auto adjacent = _next_row.advance_to(_lower_bound);
|
||||
_next_row_in_range = !after_current_range(_next_row.position());
|
||||
clogger.trace("csm {}: move_to_range(), range={}, lb={}, ub={}, next={}", this, *_ck_ranges_curr, _lower_bound, _upper_bound, _next_row.position());
|
||||
clogger.trace("csm {}: move_to_range(), range={}, lb={}, ub={}, next={}", fmt::ptr(this), *_ck_ranges_curr, _lower_bound, _upper_bound, _next_row.position());
|
||||
if (!adjacent && !_next_row.continuous()) {
|
||||
// FIXME: We don't insert a dummy for singular range to avoid allocating 3 entries
|
||||
// for a hit (before, at and after). If we supported the concept of an incomplete row,
|
||||
@@ -567,7 +567,7 @@ void cache_flat_mutation_reader::move_to_range(query::clustering_row_ranges::con
|
||||
// Insert dummy for lower bound
|
||||
if (can_populate()) {
|
||||
// FIXME: _lower_bound could be adjacent to the previous row, in which case we could skip this
|
||||
clogger.trace("csm {}: insert dummy at {}", this, _lower_bound);
|
||||
clogger.trace("csm {}: insert dummy at {}", fmt::ptr(this), _lower_bound);
|
||||
auto it = with_allocator(_lsa_manager.region().allocator(), [&] {
|
||||
auto& rows = _snp->version()->partition().clustered_rows();
|
||||
auto new_entry = current_allocator().construct<rows_entry>(*_schema, _lower_bound, is_dummy::yes, is_continuous::no);
|
||||
@@ -586,7 +586,7 @@ void cache_flat_mutation_reader::move_to_range(query::clustering_row_ranges::con
|
||||
// _next_row must be inside the range.
|
||||
inline
|
||||
void cache_flat_mutation_reader::move_to_next_entry() {
|
||||
clogger.trace("csm {}: move_to_next_entry(), curr={}", this, _next_row.position());
|
||||
clogger.trace("csm {}: move_to_next_entry(), curr={}", fmt::ptr(this), _next_row.position());
|
||||
if (no_clustering_row_between(*_schema, _next_row.position(), _upper_bound)) {
|
||||
move_to_next_range();
|
||||
} else {
|
||||
@@ -595,7 +595,7 @@ void cache_flat_mutation_reader::move_to_next_entry() {
|
||||
return;
|
||||
}
|
||||
_next_row_in_range = !after_current_range(_next_row.position());
|
||||
clogger.trace("csm {}: next={}, cont={}, in_range={}", this, _next_row.position(), _next_row.continuous(), _next_row_in_range);
|
||||
clogger.trace("csm {}: next={}, cont={}, in_range={}", fmt::ptr(this), _next_row.position(), _next_row.continuous(), _next_row_in_range);
|
||||
if (!_next_row.continuous()) {
|
||||
start_reading_from_underlying();
|
||||
}
|
||||
@@ -604,7 +604,7 @@ void cache_flat_mutation_reader::move_to_next_entry() {
|
||||
|
||||
inline
|
||||
void cache_flat_mutation_reader::add_to_buffer(mutation_fragment&& mf) {
|
||||
clogger.trace("csm {}: add_to_buffer({})", this, mutation_fragment::printer(*_schema, mf));
|
||||
clogger.trace("csm {}: add_to_buffer({})", fmt::ptr(this), mutation_fragment::printer(*_schema, mf));
|
||||
if (mf.is_clustering_row()) {
|
||||
add_clustering_row_to_buffer(std::move(mf));
|
||||
} else {
|
||||
@@ -626,7 +626,7 @@ void cache_flat_mutation_reader::add_to_buffer(const partition_snapshot_row_curs
|
||||
// (2) If _lower_bound > mf.position(), mf was emitted
|
||||
inline
|
||||
void cache_flat_mutation_reader::add_clustering_row_to_buffer(mutation_fragment&& mf) {
|
||||
clogger.trace("csm {}: add_clustering_row_to_buffer({})", this, mutation_fragment::printer(*_schema, mf));
|
||||
clogger.trace("csm {}: add_clustering_row_to_buffer({})", fmt::ptr(this), mutation_fragment::printer(*_schema, mf));
|
||||
auto& row = mf.as_clustering_row();
|
||||
auto new_lower_bound = position_in_partition::after_key(row.key());
|
||||
push_mutation_fragment(std::move(mf));
|
||||
@@ -636,7 +636,7 @@ void cache_flat_mutation_reader::add_clustering_row_to_buffer(mutation_fragment&
|
||||
|
||||
inline
|
||||
void cache_flat_mutation_reader::add_to_buffer(range_tombstone&& rt) {
|
||||
clogger.trace("csm {}: add_to_buffer({})", this, rt);
|
||||
clogger.trace("csm {}: add_to_buffer({})", fmt::ptr(this), rt);
|
||||
// This guarantees that rt starts after any emitted clustering_row
|
||||
// and not before any emitted range tombstone.
|
||||
position_in_partition::less_compare less(*_schema);
|
||||
@@ -655,7 +655,7 @@ void cache_flat_mutation_reader::add_to_buffer(range_tombstone&& rt) {
|
||||
inline
|
||||
void cache_flat_mutation_reader::maybe_add_to_cache(const range_tombstone& rt) {
|
||||
if (can_populate()) {
|
||||
clogger.trace("csm {}: maybe_add_to_cache({})", this, rt);
|
||||
clogger.trace("csm {}: maybe_add_to_cache({})", fmt::ptr(this), rt);
|
||||
_lsa_manager.run_in_update_section_with_allocator([&] {
|
||||
_snp->version()->partition().row_tombstones().apply_monotonically(*_schema, rt);
|
||||
});
|
||||
@@ -667,7 +667,7 @@ void cache_flat_mutation_reader::maybe_add_to_cache(const range_tombstone& rt) {
|
||||
inline
|
||||
void cache_flat_mutation_reader::maybe_add_to_cache(const static_row& sr) {
|
||||
if (can_populate()) {
|
||||
clogger.trace("csm {}: populate({})", this, static_row::printer(*_schema, sr));
|
||||
clogger.trace("csm {}: populate({})", fmt::ptr(this), static_row::printer(*_schema, sr));
|
||||
_read_context->cache().on_static_row_insert();
|
||||
_lsa_manager.run_in_update_section_with_allocator([&] {
|
||||
if (_read_context->digest_requested()) {
|
||||
@@ -683,7 +683,7 @@ void cache_flat_mutation_reader::maybe_add_to_cache(const static_row& sr) {
|
||||
inline
|
||||
void cache_flat_mutation_reader::maybe_set_static_row_continuous() {
|
||||
if (can_populate()) {
|
||||
clogger.trace("csm {}: set static row continuous", this);
|
||||
clogger.trace("csm {}: set static row continuous", fmt::ptr(this));
|
||||
_snp->version()->partition().set_static_row_continuous(true);
|
||||
} else {
|
||||
_read_context->cache().on_mispopulate();
|
||||
|
||||
@@ -31,11 +31,11 @@ namespace db {
|
||||
|
||||
void data_listeners::install(data_listener* listener) {
|
||||
_listeners.emplace(listener);
|
||||
dblog.debug("data_listeners: install listener {}", listener);
|
||||
dblog.debug("data_listeners: install listener {}", fmt::ptr(listener));
|
||||
}
|
||||
|
||||
void data_listeners::uninstall(data_listener* listener) {
|
||||
dblog.debug("data_listeners: uninstall listener {}", listener);
|
||||
dblog.debug("data_listeners: uninstall listener {}", fmt::ptr(listener));
|
||||
_listeners.erase(listener);
|
||||
}
|
||||
|
||||
@@ -64,17 +64,17 @@ toppartitions_item_key::operator sstring() const {
|
||||
}
|
||||
|
||||
toppartitions_data_listener::toppartitions_data_listener(database& db, sstring ks, sstring cf) : _db(db), _ks(ks), _cf(cf) {
|
||||
dblog.debug("toppartitions_data_listener: installing {}", this);
|
||||
dblog.debug("toppartitions_data_listener: installing {}", fmt::ptr(this));
|
||||
_db.data_listeners().install(this);
|
||||
}
|
||||
|
||||
toppartitions_data_listener::~toppartitions_data_listener() {
|
||||
dblog.debug("toppartitions_data_listener: uninstalling {}", this);
|
||||
dblog.debug("toppartitions_data_listener: uninstalling {}", fmt::ptr(this));
|
||||
_db.data_listeners().uninstall(this);
|
||||
}
|
||||
|
||||
future<> toppartitions_data_listener::stop() {
|
||||
dblog.debug("toppartitions_data_listener: stopping {}", this);
|
||||
dblog.debug("toppartitions_data_listener: stopping {}", fmt::ptr(this));
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
@@ -138,7 +138,7 @@ future<toppartitions_query::results> toppartitions_query::gather(unsigned res_si
|
||||
dblog.debug("toppartitions_query::gather");
|
||||
|
||||
auto map = [res_size, this] (toppartitions_data_listener& listener) {
|
||||
dblog.trace("toppartitions_query::map_reduce with listener {}", &listener);
|
||||
dblog.trace("toppartitions_query::map_reduce with listener {}", fmt::ptr(&listener));
|
||||
top_t rd = toppartitions_data_listener::globalize(listener._top_k_read.top(res_size));
|
||||
top_t wr = toppartitions_data_listener::globalize(listener._top_k_write.top(res_size));
|
||||
return make_foreign(std::make_unique<std::tuple<top_t, top_t>>(std::move(rd), std::move(wr)));
|
||||
|
||||
@@ -21,6 +21,7 @@
|
||||
#include <vector>
|
||||
#include <list>
|
||||
#include <random>
|
||||
#include <fmt/ranges.h>
|
||||
#include "heat_load_balance.hh"
|
||||
|
||||
logging::logger hr_logger("heat_load_balance");
|
||||
@@ -316,7 +317,7 @@ redistribute(const std::vector<float>& p, unsigned me, unsigned k) {
|
||||
float s = 0;
|
||||
int count = mixed_count;
|
||||
for (auto& d : sorted_deficits) {
|
||||
hr_logger.trace("next sorted deficit={{}, {}}", d.first, d.second);
|
||||
hr_logger.trace("next sorted deficit={{{}, {}}}", d.first, d.second);
|
||||
// What "diff" to distribute
|
||||
auto diff = d.second - s;
|
||||
s = d.second;
|
||||
|
||||
@@ -34,7 +34,7 @@ void row_locker::upgrade(schema_ptr new_schema) {
|
||||
if (new_schema == _schema) {
|
||||
return;
|
||||
}
|
||||
mylog.debug("row_locker::upgrade from {} to {}", _schema.get(), new_schema.get());
|
||||
mylog.debug("row_locker::upgrade from {} to {}", fmt::ptr(_schema.get()), fmt::ptr(new_schema.get()));
|
||||
_schema = new_schema;
|
||||
}
|
||||
|
||||
|
||||
@@ -1476,7 +1476,7 @@ void gossiper::real_mark_alive(inet_address addr, endpoint_state& local_state) {
|
||||
|
||||
_subscribers.for_each([addr, local_state] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
|
||||
subscriber->on_alive(addr, local_state);
|
||||
logger.trace("Notified {}", subscriber.get());
|
||||
logger.trace("Notified {}", fmt::ptr(subscriber.get()));
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1489,7 +1489,7 @@ void gossiper::mark_dead(inet_address addr, endpoint_state& local_state) {
|
||||
logger.info("InetAddress {} is now DOWN, status = {}", addr, get_gossip_status(local_state));
|
||||
_subscribers.for_each([addr, local_state] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
|
||||
subscriber->on_dead(addr, local_state);
|
||||
logger.trace("Notified {}", subscriber.get());
|
||||
logger.trace("Notified {}", fmt::ptr(subscriber.get()));
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -174,7 +174,7 @@ void compression::segmented_offsets::init(uint32_t chunk_size) {
|
||||
|
||||
sstlog.trace(
|
||||
"{} {}(): chunk size {} (log2)",
|
||||
this,
|
||||
fmt::ptr(this),
|
||||
__FUNCTION__,
|
||||
static_cast<int>(params.first.chunk_size_log2));
|
||||
|
||||
|
||||
@@ -105,7 +105,7 @@ public:
|
||||
}
|
||||
|
||||
future<> skip_to(indexable_element el, uint64_t begin) {
|
||||
sstlog.trace("data_consume_rows_context {}: skip_to({} -> {}, el={})", _ctx.get(), _ctx->position(), begin, static_cast<int>(el));
|
||||
sstlog.trace("data_consume_rows_context {}: skip_to({} -> {}, el={})", fmt::ptr(_ctx.get()), _ctx->position(), begin, static_cast<int>(el));
|
||||
if (begin <= _ctx->position()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
@@ -149,37 +149,37 @@ public:
|
||||
switch (_state) {
|
||||
// START comes first, to make the handling of the 0-quantity case simpler
|
||||
case state::START:
|
||||
sstlog.trace("{}: pos {} state {}", this, current_pos(), state::START);
|
||||
sstlog.trace("{}: pos {} state {}", fmt::ptr(this), current_pos(), state::START);
|
||||
_state = state::KEY_SIZE;
|
||||
break;
|
||||
case state::KEY_SIZE:
|
||||
sstlog.trace("{}: pos {} state {}", this, current_pos(), state::KEY_SIZE);
|
||||
sstlog.trace("{}: pos {} state {}", fmt::ptr(this), current_pos(), state::KEY_SIZE);
|
||||
_entry_offset = current_pos();
|
||||
if (this->read_16(data) != continuous_data_consumer::read_status::ready) {
|
||||
_state = state::KEY_BYTES;
|
||||
break;
|
||||
}
|
||||
case state::KEY_BYTES:
|
||||
sstlog.trace("{}: pos {} state {}", this, current_pos(), state::KEY_BYTES);
|
||||
sstlog.trace("{}: pos {} state {}", fmt::ptr(this), current_pos(), state::KEY_BYTES);
|
||||
if (this->read_bytes(data, this->_u16, _key) != continuous_data_consumer::read_status::ready) {
|
||||
_state = state::POSITION;
|
||||
break;
|
||||
}
|
||||
case state::POSITION:
|
||||
sstlog.trace("{}: pos {} state {}", this, current_pos(), state::POSITION);
|
||||
sstlog.trace("{}: pos {} state {}", fmt::ptr(this), current_pos(), state::POSITION);
|
||||
if (read_vint_or_uint64(data) != continuous_data_consumer::read_status::ready) {
|
||||
_state = state::PROMOTED_SIZE;
|
||||
break;
|
||||
}
|
||||
case state::PROMOTED_SIZE:
|
||||
sstlog.trace("{}: pos {} state {}", this, current_pos(), state::PROMOTED_SIZE);
|
||||
sstlog.trace("{}: pos {} state {}", fmt::ptr(this), current_pos(), state::PROMOTED_SIZE);
|
||||
_position = this->_u64;
|
||||
if (read_vint_or_uint32(data) != continuous_data_consumer::read_status::ready) {
|
||||
_state = state::PARTITION_HEADER_LENGTH_1;
|
||||
break;
|
||||
}
|
||||
case state::PARTITION_HEADER_LENGTH_1: {
|
||||
sstlog.trace("{}: pos {} state {}", this, current_pos(), state::PARTITION_HEADER_LENGTH_1);
|
||||
sstlog.trace("{}: pos {} state {}", fmt::ptr(this), current_pos(), state::PARTITION_HEADER_LENGTH_1);
|
||||
auto promoted_index_size_with_header = get_uint32();
|
||||
_promoted_index_end = current_pos() + promoted_index_size_with_header;
|
||||
if (promoted_index_size_with_header == 0) {
|
||||
@@ -197,25 +197,25 @@ public:
|
||||
}
|
||||
}
|
||||
case state::PARTITION_HEADER_LENGTH_2:
|
||||
sstlog.trace("{}: pos {} state {} {}", this, current_pos(), state::PARTITION_HEADER_LENGTH_2, this->_u64);
|
||||
sstlog.trace("{}: pos {} state {} {}", fmt::ptr(this), current_pos(), state::PARTITION_HEADER_LENGTH_2, this->_u64);
|
||||
_partition_header_length = this->_u64;
|
||||
state_LOCAL_DELETION_TIME:
|
||||
case state::LOCAL_DELETION_TIME:
|
||||
sstlog.trace("{}: pos {} state {}", this, current_pos(), state::LOCAL_DELETION_TIME);
|
||||
sstlog.trace("{}: pos {} state {}", fmt::ptr(this), current_pos(), state::LOCAL_DELETION_TIME);
|
||||
_deletion_time.emplace();
|
||||
if (this->read_32(data) != continuous_data_consumer::read_status::ready) {
|
||||
_state = state::MARKED_FOR_DELETE_AT;
|
||||
break;
|
||||
}
|
||||
case state::MARKED_FOR_DELETE_AT:
|
||||
sstlog.trace("{}: pos {} state {}", this, current_pos(), state::MARKED_FOR_DELETE_AT);
|
||||
sstlog.trace("{}: pos {} state {}", fmt::ptr(this), current_pos(), state::MARKED_FOR_DELETE_AT);
|
||||
_deletion_time->local_deletion_time = this->_u32;
|
||||
if (this->read_64(data) != continuous_data_consumer::read_status::ready) {
|
||||
_state = state::NUM_PROMOTED_INDEX_BLOCKS;
|
||||
break;
|
||||
}
|
||||
case state::NUM_PROMOTED_INDEX_BLOCKS:
|
||||
sstlog.trace("{}: pos {} state {}", this, current_pos(), state::NUM_PROMOTED_INDEX_BLOCKS);
|
||||
sstlog.trace("{}: pos {} state {}", fmt::ptr(this), current_pos(), state::NUM_PROMOTED_INDEX_BLOCKS);
|
||||
_deletion_time->marked_for_delete_at = this->_u64;
|
||||
if (read_vint_or_uint32(data) != continuous_data_consumer::read_status::ready) {
|
||||
_state = state::CONSUME_ENTRY;
|
||||
@@ -225,7 +225,7 @@ public:
|
||||
case state::CONSUME_ENTRY: {
|
||||
auto promoted_index_start = current_pos();
|
||||
auto promoted_index_size = _promoted_index_end - promoted_index_start;
|
||||
sstlog.trace("{}: pos {} state {} size {}", this, current_pos(), state::CONSUME_ENTRY, promoted_index_size);
|
||||
sstlog.trace("{}: pos {} state {} size {}", fmt::ptr(this), current_pos(), state::CONSUME_ENTRY, promoted_index_size);
|
||||
if (_deletion_time) {
|
||||
_num_pi_blocks = get_uint32();
|
||||
}
|
||||
@@ -270,13 +270,13 @@ public:
|
||||
data.trim_front(promoted_index_size);
|
||||
} else {
|
||||
data.trim(0);
|
||||
sstlog.trace("{}: skip {} pos {} state {}", this, promoted_index_size - data_size, current_pos(), _state);
|
||||
sstlog.trace("{}: skip {} pos {} state {}", fmt::ptr(this), promoted_index_size - data_size, current_pos(), _state);
|
||||
return skip_bytes{promoted_index_size - data_size};
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
sstlog.trace("{}: exit pos {} state {}", this, current_pos(), _state);
|
||||
sstlog.trace("{}: exit pos {} state {}", fmt::ptr(this), current_pos(), _state);
|
||||
return proceed::yes;
|
||||
}
|
||||
|
||||
@@ -408,7 +408,7 @@ class index_reader {
|
||||
|
||||
private:
|
||||
void advance_to_end(index_bound& bound) {
|
||||
sstlog.trace("index {}: advance_to_end() bound {}", this, &bound);
|
||||
sstlog.trace("index {}: advance_to_end() bound {}", fmt::ptr(this), fmt::ptr(&bound));
|
||||
bound.data_file_position = data_file_end();
|
||||
bound.element = indexable_element::partition;
|
||||
bound.current_list = {};
|
||||
@@ -417,16 +417,16 @@ private:
|
||||
|
||||
// Must be called for non-decreasing summary_idx.
|
||||
future<> advance_to_page(index_bound& bound, uint64_t summary_idx) {
|
||||
sstlog.trace("index {}: advance_to_page({}), bound {}", this, summary_idx, &bound);
|
||||
sstlog.trace("index {}: advance_to_page({}), bound {}", fmt::ptr(this), summary_idx, fmt::ptr(&bound));
|
||||
assert(!bound.current_list || bound.current_summary_idx <= summary_idx);
|
||||
if (bound.current_list && bound.current_summary_idx == summary_idx) {
|
||||
sstlog.trace("index {}: same page", this);
|
||||
sstlog.trace("index {}: same page", fmt::ptr(this));
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
auto& summary = _sstable->get_summary();
|
||||
if (summary_idx >= summary.header.size) {
|
||||
sstlog.trace("index {}: eof", this);
|
||||
sstlog.trace("index {}: eof", fmt::ptr(this));
|
||||
advance_to_end(bound);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
@@ -482,7 +482,7 @@ private:
|
||||
bound.end_open_marker.reset();
|
||||
|
||||
if (sstlog.is_enabled(seastar::log_level::trace)) {
|
||||
sstlog.trace("index {} bound {}: page:", this, &bound);
|
||||
sstlog.trace("index {} bound {}: page:", fmt::ptr(this), fmt::ptr(&bound));
|
||||
for (const index_entry& e : *bound.current_list) {
|
||||
auto dk = dht::decorate_key(*_sstable->_schema,
|
||||
e.get_key().to_partition_key(*_sstable->_schema));
|
||||
@@ -533,7 +533,7 @@ private:
|
||||
}
|
||||
|
||||
future<> advance_to_next_partition(index_bound& bound) {
|
||||
sstlog.trace("index {} bound {}: advance_to_next_partition()", &bound, this);
|
||||
sstlog.trace("index {} bound {}: advance_to_next_partition()", fmt::ptr(&bound), fmt::ptr(this));
|
||||
if (!partition_data_ready(bound)) {
|
||||
return advance_to_page(bound, 0).then([this, &bound] {
|
||||
return advance_to_next_partition(bound);
|
||||
@@ -557,10 +557,10 @@ private:
|
||||
|
||||
future<> advance_to(index_bound& bound, dht::ring_position_view pos) {
|
||||
sstlog.trace("index {} bound {}: advance_to({}), _previous_summary_idx={}, _current_summary_idx={}",
|
||||
this, &bound, pos, bound.previous_summary_idx, bound.current_summary_idx);
|
||||
fmt::ptr(this), fmt::ptr(&bound), pos, bound.previous_summary_idx, bound.current_summary_idx);
|
||||
|
||||
if (pos.is_min()) {
|
||||
sstlog.trace("index {}: first entry", this);
|
||||
sstlog.trace("index {}: first entry", fmt::ptr(this));
|
||||
return make_ready_future<>();
|
||||
} else if (pos.is_max()) {
|
||||
advance_to_end(bound);
|
||||
@@ -572,13 +572,13 @@ private:
|
||||
std::lower_bound(summary.entries.begin() + bound.previous_summary_idx, summary.entries.end(), pos, index_comparator(*_sstable->_schema)));
|
||||
|
||||
if (bound.previous_summary_idx == 0) {
|
||||
sstlog.trace("index {}: first entry", this);
|
||||
sstlog.trace("index {}: first entry", fmt::ptr(this));
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
auto summary_idx = bound.previous_summary_idx - 1;
|
||||
|
||||
sstlog.trace("index {}: summary_idx={}", this, summary_idx);
|
||||
sstlog.trace("index {}: summary_idx={}", fmt::ptr(this), summary_idx);
|
||||
|
||||
// Despite the requirement that the values of 'pos' in subsequent calls
|
||||
// are increasing we still may encounter a situation when we try to read
|
||||
@@ -601,11 +601,11 @@ private:
|
||||
}
|
||||
|
||||
return advance_to_page(bound, summary_idx).then([this, &bound, pos, summary_idx] {
|
||||
sstlog.trace("index {}: old page index = {}", this, bound.current_index_idx);
|
||||
sstlog.trace("index {}: old page index = {}", fmt::ptr(this), bound.current_index_idx);
|
||||
auto& entries = *bound.current_list;
|
||||
auto i = std::lower_bound(std::begin(entries) + bound.current_index_idx, std::end(entries), pos, index_comparator(*_sstable->_schema));
|
||||
if (i == std::end(entries)) {
|
||||
sstlog.trace("index {}: not found", this);
|
||||
sstlog.trace("index {}: not found", fmt::ptr(this));
|
||||
return advance_to_page(bound, summary_idx + 1);
|
||||
}
|
||||
bound.current_index_idx = std::distance(std::begin(entries), i);
|
||||
@@ -613,7 +613,7 @@ private:
|
||||
bound.data_file_position = i->position();
|
||||
bound.element = indexable_element::partition;
|
||||
bound.end_open_marker.reset();
|
||||
sstlog.trace("index {}: new page index = {}, pos={}", this, bound.current_index_idx, bound.data_file_position);
|
||||
sstlog.trace("index {}: new page index = {}, pos={}", fmt::ptr(this), bound.current_index_idx, bound.data_file_position);
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
@@ -628,7 +628,7 @@ private:
|
||||
//
|
||||
// Must be called only when !eof().
|
||||
future<> advance_upper_past(position_in_partition_view pos) {
|
||||
sstlog.trace("index {}: advance_upper_past({})", this, pos);
|
||||
sstlog.trace("index {}: advance_upper_past({})", fmt::ptr(this), pos);
|
||||
|
||||
// We advance cursor within the current lower bound partition
|
||||
// So need to make sure first that it is read
|
||||
@@ -646,7 +646,7 @@ private:
|
||||
index_entry& e = current_partition_entry(*_upper_bound);
|
||||
|
||||
if (!e.get_promoted_index()) {
|
||||
sstlog.trace("index {}: no promoted index", this);
|
||||
sstlog.trace("index {}: no promoted index", fmt::ptr(this));
|
||||
return advance_to_next_partition(*_upper_bound);
|
||||
}
|
||||
|
||||
@@ -657,7 +657,7 @@ private:
|
||||
}
|
||||
_upper_bound->data_file_position = e.position() + *off;
|
||||
_upper_bound->element = indexable_element::cell;
|
||||
sstlog.trace("index {} upper bound: skipped to cell, _data_file_position={}", this, _upper_bound->data_file_position);
|
||||
sstlog.trace("index {} upper bound: skipped to cell, _data_file_position={}", fmt::ptr(this), _upper_bound->data_file_position);
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
@@ -674,7 +674,7 @@ public:
|
||||
, _pc(pc)
|
||||
, _trace_state(std::move(trace_state))
|
||||
{
|
||||
sstlog.trace("index {}: index_reader for {}", this, _sstable->get_filename());
|
||||
sstlog.trace("index {}: index_reader for {}", fmt::ptr(this), _sstable->get_filename());
|
||||
}
|
||||
|
||||
// Ensures that partition_data_ready() returns true.
|
||||
@@ -730,7 +730,7 @@ public:
|
||||
// Must be called only after advanced to some partition and !eof().
|
||||
future<> advance_to(position_in_partition_view pos) {
|
||||
sstlog.trace("index {}: advance_to({}), current data_file_pos={}",
|
||||
this, pos, _lower_bound.data_file_position);
|
||||
fmt::ptr(this), pos, _lower_bound.data_file_position);
|
||||
|
||||
const schema& s = *_sstable->_schema;
|
||||
if (pos.is_before_all_fragments(s)) {
|
||||
@@ -739,7 +739,7 @@ public:
|
||||
|
||||
if (!partition_data_ready()) {
|
||||
return read_partition_data().then([this, pos] {
|
||||
sstlog.trace("index {}: page done", this);
|
||||
sstlog.trace("index {}: page done", fmt::ptr(this));
|
||||
assert(partition_data_ready(_lower_bound));
|
||||
return advance_to(pos);
|
||||
});
|
||||
@@ -747,14 +747,14 @@ public:
|
||||
|
||||
index_entry& e = current_partition_entry();
|
||||
if (!e.get_promoted_index()) {
|
||||
sstlog.trace("index {}: no promoted index", this);
|
||||
sstlog.trace("index {}: no promoted index", fmt::ptr(this));
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
promoted_index& pi = *e.get_promoted_index();
|
||||
return pi.cursor().advance_to(pos).then([this, &e] (std::optional<clustered_index_cursor::skip_info> si) {
|
||||
if (!si) {
|
||||
sstlog.trace("index {}: position in the same block", this);
|
||||
sstlog.trace("index {}: position in the same block", fmt::ptr(this));
|
||||
return;
|
||||
}
|
||||
if (!si->active_tombstone) {
|
||||
@@ -765,7 +765,7 @@ public:
|
||||
}
|
||||
_lower_bound.data_file_position = e.position() + si->offset;
|
||||
_lower_bound.element = indexable_element::cell;
|
||||
sstlog.trace("index {}: skipped to cell, _data_file_position={}", this, _lower_bound.data_file_position);
|
||||
sstlog.trace("index {}: skipped to cell, _data_file_position={}", fmt::ptr(this), _lower_bound.data_file_position);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -338,7 +338,7 @@ private:
|
||||
_out_of_range |= _ck_ranges_walker->out_of_range();
|
||||
}
|
||||
|
||||
sstlog.trace("mp_row_consumer_k_l {}: advance_to({}) => out_of_range={}, skip_in_progress={}", this, pos, _out_of_range, _skip_in_progress);
|
||||
sstlog.trace("mp_row_consumer_k_l {}: advance_to({}) => out_of_range={}, skip_in_progress={}", fmt::ptr(this), pos, _out_of_range, _skip_in_progress);
|
||||
}
|
||||
|
||||
// Assumes that this and other advance_to() overloads are called with monotonic positions.
|
||||
@@ -355,7 +355,7 @@ private:
|
||||
_out_of_range |= _ck_ranges_walker->out_of_range();
|
||||
}
|
||||
|
||||
sstlog.trace("mp_row_consumer_k_l {}: advance_to({}) => out_of_range={}, skip_in_progress={}", this, rt, _out_of_range, _skip_in_progress);
|
||||
sstlog.trace("mp_row_consumer_k_l {}: advance_to({}) => out_of_range={}, skip_in_progress={}", fmt::ptr(this), rt, _out_of_range, _skip_in_progress);
|
||||
}
|
||||
|
||||
void advance_to(const mutation_fragment& mf) {
|
||||
@@ -367,7 +367,7 @@ private:
|
||||
}
|
||||
|
||||
void set_up_ck_ranges(const partition_key& pk) {
|
||||
sstlog.trace("mp_row_consumer_k_l {}: set_up_ck_ranges({})", this, pk);
|
||||
sstlog.trace("mp_row_consumer_k_l {}: set_up_ck_ranges({})", fmt::ptr(this), pk);
|
||||
_ck_ranges = query::clustering_key_filter_ranges::get_ranges(*_schema, _slice, pk);
|
||||
_ck_ranges_walker.emplace(*_schema, _ck_ranges->ranges(), _schema->has_static_columns());
|
||||
_last_lower_bound_counter = 0;
|
||||
@@ -423,7 +423,7 @@ public:
|
||||
}
|
||||
|
||||
proceed flush() {
|
||||
sstlog.trace("mp_row_consumer_k_l {}: flush(in_progress={}, ready={}, skip={})", this,
|
||||
sstlog.trace("mp_row_consumer_k_l {}: flush(in_progress={}, ready={}, skip={})", fmt::ptr(this),
|
||||
_in_progress ? std::optional<mutation_fragment::printer>(std::in_place, *_schema, *_in_progress) : std::optional<mutation_fragment::printer>(),
|
||||
_ready ? std::optional<mutation_fragment::printer>(std::in_place, *_schema, *_ready) : std::optional<mutation_fragment::printer>(),
|
||||
_skip_in_progress);
|
||||
@@ -443,7 +443,7 @@ public:
|
||||
}
|
||||
|
||||
proceed flush_if_needed(range_tombstone&& rt) {
|
||||
sstlog.trace("mp_row_consumer_k_l {}: flush_if_needed(in_progress={}, ready={}, skip={})", this,
|
||||
sstlog.trace("mp_row_consumer_k_l {}: flush_if_needed(in_progress={}, ready={}, skip={})", fmt::ptr(this),
|
||||
_in_progress ? std::optional<mutation_fragment::printer>(std::in_place, *_schema, *_in_progress) : std::optional<mutation_fragment::printer>(),
|
||||
_ready ? std::optional<mutation_fragment::printer>(std::in_place, *_schema, *_ready) : std::optional<mutation_fragment::printer>(),
|
||||
_skip_in_progress);
|
||||
@@ -463,7 +463,7 @@ public:
|
||||
}
|
||||
|
||||
proceed flush_if_needed(bool is_static, position_in_partition&& pos) {
|
||||
sstlog.trace("mp_row_consumer_k_l {}: flush_if_needed({})", this, pos);
|
||||
sstlog.trace("mp_row_consumer_k_l {}: flush_if_needed({})", fmt::ptr(this), pos);
|
||||
|
||||
// Part of workaround for #1203
|
||||
_first_row_encountered = !is_static;
|
||||
@@ -760,7 +760,7 @@ public:
|
||||
}
|
||||
|
||||
virtual void reset(indexable_element el) override {
|
||||
sstlog.trace("mp_row_consumer_k_l {}: reset({})", this, static_cast<int>(el));
|
||||
sstlog.trace("mp_row_consumer_k_l {}: reset({})", fmt::ptr(this), static_cast<int>(el));
|
||||
_ready = {};
|
||||
if (el == indexable_element::partition) {
|
||||
_pending_collection = {};
|
||||
@@ -795,7 +795,7 @@ public:
|
||||
// must be after it.
|
||||
//
|
||||
std::optional<position_in_partition_view> fast_forward_to(position_range r, db::timeout_clock::time_point timeout) {
|
||||
sstlog.trace("mp_row_consumer_k_l {}: fast_forward_to({})", this, r);
|
||||
sstlog.trace("mp_row_consumer_k_l {}: fast_forward_to({})", fmt::ptr(this), r);
|
||||
_out_of_range = _is_mutation_end;
|
||||
_fwd_end = std::move(r).end();
|
||||
|
||||
@@ -805,7 +805,7 @@ public:
|
||||
if (_ck_ranges_walker->out_of_range()) {
|
||||
_out_of_range = true;
|
||||
_ready = {};
|
||||
sstlog.trace("mp_row_consumer_k_l {}: no more ranges", this);
|
||||
sstlog.trace("mp_row_consumer_k_l {}: no more ranges", fmt::ptr(this));
|
||||
return { };
|
||||
}
|
||||
|
||||
@@ -818,24 +818,24 @@ public:
|
||||
if (_in_progress) {
|
||||
advance_to(*_in_progress);
|
||||
if (!_skip_in_progress) {
|
||||
sstlog.trace("mp_row_consumer_k_l {}: _in_progress in range", this);
|
||||
sstlog.trace("mp_row_consumer_k_l {}: _in_progress in range", fmt::ptr(this));
|
||||
return { };
|
||||
}
|
||||
}
|
||||
|
||||
if (_out_of_range) {
|
||||
sstlog.trace("mp_row_consumer_k_l {}: _out_of_range=true", this);
|
||||
sstlog.trace("mp_row_consumer_k_l {}: _out_of_range=true", fmt::ptr(this));
|
||||
return { };
|
||||
}
|
||||
|
||||
position_in_partition::less_compare less(*_schema);
|
||||
if (!less(start, _fwd_end)) {
|
||||
_out_of_range = true;
|
||||
sstlog.trace("mp_row_consumer_k_l {}: no overlap with restrictions", this);
|
||||
sstlog.trace("mp_row_consumer_k_l {}: no overlap with restrictions", fmt::ptr(this));
|
||||
return { };
|
||||
}
|
||||
|
||||
sstlog.trace("mp_row_consumer_k_l {}: advance_context({})", this, start);
|
||||
sstlog.trace("mp_row_consumer_k_l {}: advance_context({})", fmt::ptr(this), start);
|
||||
_last_lower_bound_counter = _ck_ranges_walker->lower_bound_change_counter();
|
||||
return start;
|
||||
}
|
||||
@@ -852,7 +852,7 @@ public:
|
||||
return { };
|
||||
}
|
||||
_last_lower_bound_counter = _ck_ranges_walker->lower_bound_change_counter();
|
||||
sstlog.trace("mp_row_consumer_k_l {}: advance_context({})", this, _ck_ranges_walker->lower_bound());
|
||||
sstlog.trace("mp_row_consumer_k_l {}: advance_context({})", fmt::ptr(this), _ck_ranges_walker->lower_bound());
|
||||
return _ck_ranges_walker->lower_bound();
|
||||
}
|
||||
};
|
||||
@@ -900,7 +900,7 @@ class mp_row_consumer_m : public consumer_m {
|
||||
std::optional<range_tombstone_start> _opened_range_tombstone;
|
||||
|
||||
void consume_range_tombstone_start(clustering_key_prefix ck, bound_kind k, tombstone t) {
|
||||
sstlog.trace("mp_row_consumer_m {}: consume_range_tombstone_start(ck={}, k={}, t={})", this, ck, k, t);
|
||||
sstlog.trace("mp_row_consumer_m {}: consume_range_tombstone_start(ck={}, k={}, t={})", fmt::ptr(this), ck, k, t);
|
||||
if (_opened_range_tombstone) {
|
||||
throw sstables::malformed_sstable_exception(
|
||||
format("Range tombstones have to be disjoint: current opened range tombstone {}, new tombstone {}",
|
||||
@@ -910,7 +910,7 @@ class mp_row_consumer_m : public consumer_m {
|
||||
}
|
||||
|
||||
proceed consume_range_tombstone_end(clustering_key_prefix ck, bound_kind k, tombstone t) {
|
||||
sstlog.trace("mp_row_consumer_m {}: consume_range_tombstone_end(ck={}, k={}, t={})", this, ck, k, t);
|
||||
sstlog.trace("mp_row_consumer_m {}: consume_range_tombstone_end(ck={}, k={}, t={})", fmt::ptr(this), ck, k, t);
|
||||
if (!_opened_range_tombstone) {
|
||||
throw sstables::malformed_sstable_exception(
|
||||
format("Closing range tombstone that wasn't opened: clustering {}, kind {}, tombstone {}",
|
||||
@@ -1067,7 +1067,7 @@ public:
|
||||
}
|
||||
|
||||
void setup_for_partition(const partition_key& pk) {
|
||||
sstlog.trace("mp_row_consumer_m {}: setup_for_partition({})", this, pk);
|
||||
sstlog.trace("mp_row_consumer_m {}: setup_for_partition({})", fmt::ptr(this), pk);
|
||||
_is_mutation_end = false;
|
||||
_mf_filter.emplace(*_schema, _slice, pk, _fwd);
|
||||
}
|
||||
@@ -1111,7 +1111,7 @@ public:
|
||||
}
|
||||
|
||||
virtual proceed consume_partition_start(sstables::key_view key, sstables::deletion_time deltime) override {
|
||||
sstlog.trace("mp_row_consumer_m {}: consume_partition_start(deltime=({}, {})), _is_mutation_end={}", this,
|
||||
sstlog.trace("mp_row_consumer_m {}: consume_partition_start(deltime=({}, {})), _is_mutation_end={}", fmt::ptr(this),
|
||||
deltime.local_deletion_time, deltime.marked_for_delete_at, _is_mutation_end);
|
||||
if (!_is_mutation_end) {
|
||||
return proceed::yes;
|
||||
@@ -1127,7 +1127,7 @@ public:
|
||||
auto key = clustering_key_prefix::from_range(ecp | boost::adaptors::transformed(
|
||||
[] (const temporary_buffer<char>& b) { return to_bytes_view(b); }));
|
||||
|
||||
sstlog.trace("mp_row_consumer_m {}: consume_row_start({})", this, key);
|
||||
sstlog.trace("mp_row_consumer_m {}: consume_row_start({})", fmt::ptr(this), key);
|
||||
|
||||
// enagaged _in_progress_row means we have already split around this key.
|
||||
if (_opened_range_tombstone && !_in_progress_row) {
|
||||
@@ -1141,7 +1141,7 @@ public:
|
||||
ck,
|
||||
end_kind,
|
||||
_opened_range_tombstone->tomb);
|
||||
sstlog.trace("mp_row_consumer_m {}: push({})", this, rt);
|
||||
sstlog.trace("mp_row_consumer_m {}: push({})", fmt::ptr(this), rt);
|
||||
_opened_range_tombstone->ck = std::move(ck);
|
||||
_opened_range_tombstone->kind = was_non_full_key ? bound_kind::incl_start : bound_kind::excl_start;
|
||||
|
||||
@@ -1155,10 +1155,10 @@ public:
|
||||
|
||||
switch (_mf_filter->apply(_in_progress_row->position())) {
|
||||
case mutation_fragment_filter::result::emit:
|
||||
sstlog.trace("mp_row_consumer_m {}: emit", this);
|
||||
sstlog.trace("mp_row_consumer_m {}: emit", fmt::ptr(this));
|
||||
return consumer_m::row_processing_result::do_proceed;
|
||||
case mutation_fragment_filter::result::ignore:
|
||||
sstlog.trace("mp_row_consumer_m {}: ignore", this);
|
||||
sstlog.trace("mp_row_consumer_m {}: ignore", fmt::ptr(this));
|
||||
if (_mf_filter->out_of_range()) {
|
||||
_reader->on_out_of_clustering_range();
|
||||
// We actually want skip_later, which doesn't exist, but retry_later
|
||||
@@ -1174,7 +1174,7 @@ public:
|
||||
return consumer_m::row_processing_result::skip_row;
|
||||
}
|
||||
case mutation_fragment_filter::result::store_and_finish:
|
||||
sstlog.trace("mp_row_consumer_m {}: store_and_finish", this);
|
||||
sstlog.trace("mp_row_consumer_m {}: store_and_finish", fmt::ptr(this));
|
||||
_reader->on_out_of_clustering_range();
|
||||
return consumer_m::row_processing_result::retry_later;
|
||||
}
|
||||
@@ -1184,7 +1184,7 @@ public:
|
||||
virtual proceed consume_row_marker_and_tombstone(
|
||||
const liveness_info& info, tombstone tomb, tombstone shadowable_tomb) override {
|
||||
sstlog.trace("mp_row_consumer_m {}: consume_row_marker_and_tombstone({}, {}, {}), key={}",
|
||||
this, info.to_row_marker(), tomb, shadowable_tomb, _in_progress_row->position());
|
||||
fmt::ptr(this), info.to_row_marker(), tomb, shadowable_tomb, _in_progress_row->position());
|
||||
_in_progress_row->apply(info.to_row_marker());
|
||||
_in_progress_row->apply(tomb);
|
||||
if (shadowable_tomb) {
|
||||
@@ -1194,7 +1194,7 @@ public:
|
||||
}
|
||||
|
||||
virtual consumer_m::row_processing_result consume_static_row_start() override {
|
||||
sstlog.trace("mp_row_consumer_m {}: consume_static_row_start()", this);
|
||||
sstlog.trace("mp_row_consumer_m {}: consume_static_row_start()", fmt::ptr(this));
|
||||
if (_treat_static_row_as_regular) {
|
||||
return consume_row_start({});
|
||||
}
|
||||
@@ -1211,8 +1211,8 @@ public:
|
||||
gc_clock::time_point local_deletion_time,
|
||||
bool is_deleted) override {
|
||||
const std::optional<column_id>& column_id = column_info.id;
|
||||
sstlog.trace("mp_row_consumer_m {}: consume_column(id={}, path={}, value={}, ts={}, ttl={}, del_time={}, deleted={})", this,
|
||||
column_id, cell_path, value, timestamp, ttl.count(), local_deletion_time.time_since_epoch().count(), is_deleted);
|
||||
sstlog.trace("mp_row_consumer_m {}: consume_column(id={}, path={}, value={}, ts={}, ttl={}, del_time={}, deleted={})", fmt::ptr(this),
|
||||
column_id, fmt_hex(cell_path), fmt_hex(value), timestamp, ttl.count(), local_deletion_time.time_since_epoch().count(), is_deleted);
|
||||
check_column_missing_in_current_schema(column_info, timestamp);
|
||||
if (!column_id) {
|
||||
return proceed::yes;
|
||||
@@ -1262,7 +1262,7 @@ public:
|
||||
|
||||
virtual proceed consume_complex_column_start(const sstables::column_translation::column_info& column_info,
|
||||
tombstone tomb) override {
|
||||
sstlog.trace("mp_row_consumer_m {}: consume_complex_column_start({}, {})", this, column_info.id, tomb);
|
||||
sstlog.trace("mp_row_consumer_m {}: consume_complex_column_start({}, {})", fmt::ptr(this), column_info.id, tomb);
|
||||
_cm.tomb = tomb;
|
||||
_cm.cells.clear();
|
||||
return proceed::yes;
|
||||
@@ -1270,7 +1270,7 @@ public:
|
||||
|
||||
virtual proceed consume_complex_column_end(const sstables::column_translation::column_info& column_info) override {
|
||||
const std::optional<column_id>& column_id = column_info.id;
|
||||
sstlog.trace("mp_row_consumer_m {}: consume_complex_column_end({})", this, column_id);
|
||||
sstlog.trace("mp_row_consumer_m {}: consume_complex_column_end({})", fmt::ptr(this), column_id);
|
||||
if (_cm.tomb) {
|
||||
check_column_missing_in_current_schema(column_info, _cm.tomb.timestamp);
|
||||
}
|
||||
@@ -1290,7 +1290,7 @@ public:
|
||||
bytes_view value,
|
||||
api::timestamp_type timestamp) override {
|
||||
const std::optional<column_id>& column_id = column_info.id;
|
||||
sstlog.trace("mp_row_consumer_m {}: consume_counter_column({}, {}, {})", this, column_id, value, timestamp);
|
||||
sstlog.trace("mp_row_consumer_m {}: consume_counter_column({}, {}, {})", fmt::ptr(this), column_id, fmt_hex(value), timestamp);
|
||||
check_column_missing_in_current_schema(column_info, timestamp);
|
||||
if (!column_id) {
|
||||
return proceed::yes;
|
||||
@@ -1355,7 +1355,7 @@ public:
|
||||
|
||||
if (_inside_static_row) {
|
||||
fill_cells(column_kind::static_column, _in_progress_static_row.cells());
|
||||
sstlog.trace("mp_row_consumer_m {}: consume_row_end(_in_progress_static_row={})", this, static_row::printer(*_schema, _in_progress_static_row));
|
||||
sstlog.trace("mp_row_consumer_m {}: consume_row_end(_in_progress_static_row={})", fmt::ptr(this), static_row::printer(*_schema, _in_progress_static_row));
|
||||
_inside_static_row = false;
|
||||
if (!_in_progress_static_row.empty()) {
|
||||
auto action = _mf_filter->apply(_in_progress_static_row);
|
||||
@@ -1381,7 +1381,7 @@ public:
|
||||
}
|
||||
|
||||
virtual void on_end_of_stream() override {
|
||||
sstlog.trace("mp_row_consumer_m {}: on_end_of_stream()", this);
|
||||
sstlog.trace("mp_row_consumer_m {}: on_end_of_stream()", fmt::ptr(this));
|
||||
if (_opened_range_tombstone) {
|
||||
if (!_mf_filter || _mf_filter->out_of_range()) {
|
||||
throw sstables::malformed_sstable_exception("Unclosed range tombstone.");
|
||||
@@ -1399,7 +1399,7 @@ public:
|
||||
end_bound.prefix(),
|
||||
end_bound.kind(),
|
||||
_opened_range_tombstone->tomb};
|
||||
sstlog.trace("mp_row_consumer_m {}: on_end_of_stream(), emitting last tombstone: {}", this, rt);
|
||||
sstlog.trace("mp_row_consumer_m {}: on_end_of_stream(), emitting last tombstone: {}", fmt::ptr(this), rt);
|
||||
_opened_range_tombstone.reset();
|
||||
_reader->push_mutation_fragment(std::move(rt));
|
||||
}
|
||||
@@ -1411,7 +1411,7 @@ public:
|
||||
}
|
||||
|
||||
virtual proceed consume_partition_end() override {
|
||||
sstlog.trace("mp_row_consumer_m {}: consume_partition_end()", this);
|
||||
sstlog.trace("mp_row_consumer_m {}: consume_partition_end()", fmt::ptr(this));
|
||||
reset_for_new_partition();
|
||||
|
||||
if (_fwd == streamed_mutation::forwarding::yes) {
|
||||
@@ -1427,7 +1427,7 @@ public:
|
||||
}
|
||||
|
||||
virtual void reset(sstables::indexable_element el) override {
|
||||
sstlog.trace("mp_row_consumer_m {}: reset({})", this, static_cast<int>(el));
|
||||
sstlog.trace("mp_row_consumer_m {}: reset({})", fmt::ptr(this), static_cast<int>(el));
|
||||
if (el == indexable_element::partition) {
|
||||
reset_for_new_partition();
|
||||
} else {
|
||||
|
||||
@@ -406,16 +406,16 @@ private:
|
||||
_current_pos = position_in_partition::after_all_clustered_rows();
|
||||
}
|
||||
tracing::trace(_trace_state, "mc_bsearch_clustered_cursor: bisecting done, current=[{}] .start={}", _current_idx, _current_pos);
|
||||
sstlog.trace("mc_bsearch_clustered_cursor {}: bisecting done, current=[{}] .start={}", this, _current_idx, _current_pos);
|
||||
sstlog.trace("mc_bsearch_clustered_cursor {}: bisecting done, current=[{}] .start={}", fmt::ptr(this), _current_idx, _current_pos);
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
|
||||
auto mid = _current_idx + (_upper_idx - _current_idx) / 2;
|
||||
tracing::trace(_trace_state, "mc_bsearch_clustered_cursor: bisecting range [{}, {}], mid={}", _current_idx, _upper_idx, mid);
|
||||
sstlog.trace("mc_bsearch_clustered_cursor {}: bisecting range [{}, {}], mid={}", this, _current_idx, _upper_idx, mid);
|
||||
sstlog.trace("mc_bsearch_clustered_cursor {}: bisecting range [{}, {}], mid={}", fmt::ptr(this), _current_idx, _upper_idx, mid);
|
||||
return _promoted_index.get_block_with_start(mid, _trace_state).then([this, mid, pos] (promoted_index_block* block) {
|
||||
position_in_partition::less_compare less(_s);
|
||||
sstlog.trace("mc_bsearch_clustered_cursor {}: compare with [{}] .start={}", this, mid, block->start);
|
||||
sstlog.trace("mc_bsearch_clustered_cursor {}: compare with [{}] .start={}", fmt::ptr(this), mid, block->start);
|
||||
if (less(pos, *block->start)) {
|
||||
// Eventually _current_idx will reach _upper_idx, so _current_pos only needs to be
|
||||
// updated whenever _upper_idx changes.
|
||||
@@ -447,11 +447,11 @@ public:
|
||||
position_in_partition::less_compare less(_s);
|
||||
|
||||
sstlog.trace("mc_bsearch_clustered_cursor {}: advance_to({}), _current_pos={}, _current_idx={}, cached={}",
|
||||
this, pos, _current_pos, _current_idx, _promoted_index.file().cached_bytes());
|
||||
fmt::ptr(this), pos, _current_pos, _current_idx, _promoted_index.file().cached_bytes());
|
||||
|
||||
if (_current_pos) {
|
||||
if (less(pos, *_current_pos)) {
|
||||
sstlog.trace("mc_bsearch_clustered_cursor {}: same block", this);
|
||||
sstlog.trace("mc_bsearch_clustered_cursor {}: same block", fmt::ptr(this));
|
||||
return make_ready_future<std::optional<skip_info>>(std::nullopt);
|
||||
}
|
||||
++_current_idx;
|
||||
@@ -459,13 +459,13 @@ public:
|
||||
|
||||
return advance_to_upper_bound(pos).then([this] {
|
||||
if (_current_idx == 0) {
|
||||
sstlog.trace("mc_bsearch_clustered_cursor {}: same block", this);
|
||||
sstlog.trace("mc_bsearch_clustered_cursor {}: same block", fmt::ptr(this));
|
||||
return make_ready_future<std::optional<skip_info>>(std::nullopt);
|
||||
}
|
||||
return _promoted_index.get_block(_current_idx - 1, _trace_state).then([this] (promoted_index_block* block) {
|
||||
sstlog.trace("mc_bsearch_clustered_cursor {}: [{}] = {}", this, _current_idx - 1, *block);
|
||||
sstlog.trace("mc_bsearch_clustered_cursor {}: [{}] = {}", fmt::ptr(this), _current_idx - 1, *block);
|
||||
offset_in_partition datafile_offset = block->data_file_offset;
|
||||
sstlog.trace("mc_bsearch_clustered_cursor {}: datafile_offset={}", this, datafile_offset);
|
||||
sstlog.trace("mc_bsearch_clustered_cursor {}: datafile_offset={}", fmt::ptr(this), datafile_offset);
|
||||
if (_current_idx < 2) {
|
||||
return make_ready_future<std::optional<skip_info>>(
|
||||
skip_info{datafile_offset, tombstone(), position_in_partition::before_all_clustered_rows()});
|
||||
@@ -477,7 +477,7 @@ public:
|
||||
// to read the active tombstone from the preceding block, _current_idx - 2.
|
||||
return _promoted_index.get_block(_current_idx - 2, _trace_state)
|
||||
.then([this, datafile_offset] (promoted_index_block* block) -> std::optional<skip_info> {
|
||||
sstlog.trace("mc_bsearch_clustered_cursor {}: [{}] = {}", this, _current_idx - 2, *block);
|
||||
sstlog.trace("mc_bsearch_clustered_cursor {}: [{}] = {}", fmt::ptr(this), _current_idx - 2, *block);
|
||||
// XXX: Until we have automatic eviction, we need to invalidate cached index blocks
|
||||
// as we walk so that memory footprint is not O(N) but O(log(N)).
|
||||
_promoted_index.invalidate_prior(block, _trace_state);
|
||||
@@ -485,7 +485,7 @@ public:
|
||||
return skip_info{datafile_offset, tombstone(), position_in_partition::before_all_clustered_rows()};
|
||||
}
|
||||
auto tomb = tombstone(*block->end_open_marker);
|
||||
sstlog.trace("mc_bsearch_clustered_cursor {}: tombstone={}, pos={}", this, tomb, *block->end);
|
||||
sstlog.trace("mc_bsearch_clustered_cursor {}: tombstone={}, pos={}", fmt::ptr(this), tomb, *block->end);
|
||||
return skip_info{datafile_offset, tomb, *block->end};
|
||||
});
|
||||
});
|
||||
@@ -502,7 +502,7 @@ public:
|
||||
}
|
||||
return _promoted_index.get_block(_current_idx, _trace_state)
|
||||
.then([this] (promoted_index_block* block) -> std::optional<entry_info> {
|
||||
sstlog.trace("mc_bsearch_clustered_cursor {}: block {}: start={}, end={}, offset={}", this, _current_idx,
|
||||
sstlog.trace("mc_bsearch_clustered_cursor {}: block {}: start={}, end={}, offset={}", fmt::ptr(this), _current_idx,
|
||||
*block->start, *block->end, block->data_file_offset);
|
||||
++_current_idx;
|
||||
return entry_info{*block->start, *block->end, block->data_file_offset};
|
||||
|
||||
@@ -243,11 +243,11 @@ private:
|
||||
return *_index_reader;
|
||||
}
|
||||
future<> advance_to_next_partition() {
|
||||
sstlog.trace("reader {}: advance_to_next_partition()", this);
|
||||
sstlog.trace("reader {}: advance_to_next_partition()", fmt::ptr(this));
|
||||
_before_partition = true;
|
||||
auto& consumer = _consumer;
|
||||
if (consumer.is_mutation_end()) {
|
||||
sstlog.trace("reader {}: already at partition boundary", this);
|
||||
sstlog.trace("reader {}: already at partition boundary", fmt::ptr(this));
|
||||
_index_in_current_partition = false;
|
||||
return make_ready_future<>();
|
||||
}
|
||||
@@ -267,10 +267,10 @@ private:
|
||||
});
|
||||
}
|
||||
future<> read_from_index() {
|
||||
sstlog.trace("reader {}: read from index", this);
|
||||
sstlog.trace("reader {}: read from index", fmt::ptr(this));
|
||||
auto tomb = _index_reader->partition_tombstone();
|
||||
if (!tomb) {
|
||||
sstlog.trace("reader {}: no tombstone", this);
|
||||
sstlog.trace("reader {}: no tombstone", fmt::ptr(this));
|
||||
return read_from_datafile();
|
||||
}
|
||||
auto pk = _index_reader->partition_key().to_partition_key(*_schema);
|
||||
@@ -280,16 +280,16 @@ private:
|
||||
return make_ready_future<>();
|
||||
}
|
||||
future<> read_from_datafile() {
|
||||
sstlog.trace("reader {}: read from data file", this);
|
||||
sstlog.trace("reader {}: read from data file", fmt::ptr(this));
|
||||
return _context->read();
|
||||
}
|
||||
// Assumes that we're currently positioned at partition boundary.
|
||||
future<> read_partition() {
|
||||
sstlog.trace("reader {}: reading partition", this);
|
||||
sstlog.trace("reader {}: reading partition", fmt::ptr(this));
|
||||
|
||||
_end_of_stream = true; // on_next_partition() will set it to true
|
||||
if (!_read_enabled) {
|
||||
sstlog.trace("reader {}: eof", this);
|
||||
sstlog.trace("reader {}: eof", fmt::ptr(this));
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
@@ -306,7 +306,7 @@ private:
|
||||
//
|
||||
if (_index_in_current_partition) {
|
||||
if (_context->eof()) {
|
||||
sstlog.trace("reader {}: eof", this);
|
||||
sstlog.trace("reader {}: eof", fmt::ptr(this));
|
||||
return make_ready_future<>();
|
||||
}
|
||||
if (_index_reader->partition_data_ready()) {
|
||||
@@ -324,12 +324,12 @@ private:
|
||||
}
|
||||
// Can be called from any position.
|
||||
future<> read_next_partition() {
|
||||
sstlog.trace("reader {}: read next partition", this);
|
||||
sstlog.trace("reader {}: read next partition", fmt::ptr(this));
|
||||
// If next partition exists then on_next_partition will be called
|
||||
// and _end_of_stream will be set to false again.
|
||||
_end_of_stream = true;
|
||||
if (!_read_enabled || _single_partition_read) {
|
||||
sstlog.trace("reader {}: eof", this);
|
||||
sstlog.trace("reader {}: eof", fmt::ptr(this));
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return advance_to_next_partition().then([this] {
|
||||
|
||||
@@ -328,7 +328,7 @@ private:
|
||||
return row_consumer::proceed::yes;
|
||||
}
|
||||
#endif
|
||||
sstlog.trace("data_consume_row_context {}: state={}, size={}", this, static_cast<int>(_state), data.size());
|
||||
sstlog.trace("data_consume_row_context {}: state={}, size={}", fmt::ptr(this), static_cast<int>(_state), data.size());
|
||||
switch (_state) {
|
||||
case state::ROW_START:
|
||||
if (read_short_length_bytes(data, _key) != read_status::ready) {
|
||||
|
||||
@@ -71,7 +71,7 @@ private:
|
||||
|
||||
// Unconditionally reads the promoted index blocks from the next data buffer
|
||||
future<> get_next_pi_blocks() {
|
||||
sstlog.trace("scanning_clustered_index_cursor {}: parsing more blocks", this);
|
||||
sstlog.trace("scanning_clustered_index_cursor {}: parsing more blocks", fmt::ptr(this));
|
||||
promoted_index_blocks& blocks = _reader.get_pi_blocks();
|
||||
blocks = promoted_index_blocks{};
|
||||
_reader.switch_to_consume_next_mode();
|
||||
@@ -123,7 +123,7 @@ public:
|
||||
const promoted_index_blocks* pi_blocks = &get_pi_blocks();
|
||||
|
||||
if (all_blocks_read() && _current_pi_idx >= pi_blocks->size() - 1) {
|
||||
sstlog.trace("scanning_clustered_index_cursor {}: position in current block (all blocks are read)", this);
|
||||
sstlog.trace("scanning_clustered_index_cursor {}: position in current block (all blocks are read)", fmt::ptr(this));
|
||||
return make_ready_future<std::optional<skip_info>>(std::nullopt);
|
||||
}
|
||||
|
||||
@@ -133,7 +133,7 @@ public:
|
||||
};
|
||||
|
||||
if (!pi_blocks->empty() && cmp_with_start(pos, (*pi_blocks)[_current_pi_idx])) {
|
||||
sstlog.trace("scanning_clustered_index_cursor {}: position in current block (exact match)", this);
|
||||
sstlog.trace("scanning_clustered_index_cursor {}: position in current block (exact match)", fmt::ptr(this));
|
||||
return make_ready_future<std::optional<skip_info>>(std::nullopt);
|
||||
}
|
||||
|
||||
@@ -146,7 +146,7 @@ public:
|
||||
|
||||
auto info = get_info_from_promoted_block(i, *pi_blocks);
|
||||
sstlog.trace("scanning_clustered_index_cursor {}: lower bound skipped to cell, _current_pi_idx={}, offset={}",
|
||||
this, _current_pi_idx, info.offset);
|
||||
fmt::ptr(this), _current_pi_idx, info.offset);
|
||||
return make_ready_future<std::optional<skip_info>>(std::move(info));
|
||||
}
|
||||
|
||||
@@ -158,14 +158,14 @@ public:
|
||||
}
|
||||
auto info = get_info_from_promoted_block(i, *pi_blocks);
|
||||
sstlog.trace("scanning_clustered_index_cursor {}: skipped to cell, _current_pi_idx={}, offset={}",
|
||||
this, _current_pi_idx, info.offset);
|
||||
fmt::ptr(this), _current_pi_idx, info.offset);
|
||||
return std::make_optional(std::move(info));
|
||||
});
|
||||
}
|
||||
|
||||
future<std::optional<offset_in_partition>> probe_upper_bound(position_in_partition_view pos) override {
|
||||
if (get_total_pi_blocks_count() == 0) {
|
||||
sstlog.trace("scanning_clustered_index_cursor {}: no promoted index", this);
|
||||
sstlog.trace("scanning_clustered_index_cursor {}: no promoted index", fmt::ptr(this));
|
||||
return make_ready_future<std::optional<offset_in_partition>>(std::nullopt);
|
||||
}
|
||||
|
||||
@@ -192,7 +192,7 @@ public:
|
||||
}
|
||||
|
||||
auto pi_index = std::distance(pi_blocks->begin(), i);
|
||||
sstlog.trace("scanning_clustered_index_cursor {} upper bound: skipped to cell, pi_idx={}, offset={}", this, pi_index, i->offset());
|
||||
sstlog.trace("scanning_clustered_index_cursor {} upper bound: skipped to cell, pi_idx={}, offset={}", fmt::ptr(this), pi_index, i->offset());
|
||||
return make_ready_future<std::optional<offset_in_partition>>(offset_in_partition(i->offset()));
|
||||
}
|
||||
|
||||
@@ -208,7 +208,7 @@ public:
|
||||
});
|
||||
}
|
||||
|
||||
sstlog.trace("scanning_clustered_index_cursor {}: next_entry(), pi_idx={}", this, _current_pi_idx);
|
||||
sstlog.trace("scanning_clustered_index_cursor {}: next_entry(), pi_idx={}", fmt::ptr(this), _current_pi_idx);
|
||||
promoted_index_block& block = pi_blocks[_current_pi_idx];
|
||||
auto ei = entry_info{block.start(_s), block.end(_s), block.offset()};
|
||||
++_current_pi_idx;
|
||||
|
||||
@@ -373,7 +373,11 @@ void stream_session::received_failed_complete_message() {
|
||||
}
|
||||
|
||||
void stream_session::abort() {
|
||||
sslog.info("[Stream #{}] Aborted stream session={}, peer={}, is_initialized={}", plan_id(), this, peer, is_initialized());
|
||||
if (sslog.is_enabled(logging::log_level::debug)) {
|
||||
sslog.debug("[Stream #{}] Aborted stream session={}, peer={}, is_initialized={}", plan_id(), fmt::ptr(this), peer, is_initialized());
|
||||
} else {
|
||||
sslog.info("[Stream #{}] Aborted stream session, peer={}, is_initialized={}", plan_id(), peer, is_initialized());
|
||||
}
|
||||
close_session(stream_session_state::FAILED);
|
||||
}
|
||||
|
||||
@@ -501,7 +505,7 @@ void stream_session::send_failed_complete_message() {
|
||||
bool stream_session::maybe_completed() {
|
||||
bool completed = _receivers.empty() && _transfers.empty();
|
||||
if (completed) {
|
||||
sslog.debug("[Stream #{}] maybe_completed: {} -> COMPLETE: session={}, peer={}", plan_id(), _state, this, peer);
|
||||
sslog.debug("[Stream #{}] maybe_completed: {} -> COMPLETE: session={}, peer={}", plan_id(), _state, fmt::ptr(this), peer);
|
||||
close_session(stream_session_state::COMPLETE);
|
||||
}
|
||||
return completed;
|
||||
@@ -581,7 +585,7 @@ future<> stream_session::receiving_failed(UUID cf_id)
|
||||
}
|
||||
|
||||
void stream_session::close_session(stream_session_state final_state) {
|
||||
sslog.debug("[Stream #{}] close_session session={}, state={}, is_aborted={}", plan_id(), this, final_state, _is_aborted);
|
||||
sslog.debug("[Stream #{}] close_session session={}, state={}, is_aborted={}", plan_id(), fmt::ptr(this), final_state, _is_aborted);
|
||||
if (!_is_aborted) {
|
||||
_is_aborted = true;
|
||||
set_state(final_state);
|
||||
@@ -589,12 +593,12 @@ void stream_session::close_session(stream_session_state final_state) {
|
||||
if (final_state == stream_session_state::FAILED) {
|
||||
for (auto& x : _transfers) {
|
||||
stream_transfer_task& task = x.second;
|
||||
sslog.debug("[Stream #{}] close_session session={}, state={}, abort stream_transfer_task cf_id={}", plan_id(), this, final_state, task.cf_id);
|
||||
sslog.debug("[Stream #{}] close_session session={}, state={}, abort stream_transfer_task cf_id={}", plan_id(), fmt::ptr(this), final_state, task.cf_id);
|
||||
task.abort();
|
||||
}
|
||||
for (auto& x : _receivers) {
|
||||
stream_receive_task& task = x.second;
|
||||
sslog.debug("[Stream #{}] close_session session={}, state={}, abort stream_receive_task cf_id={}", plan_id(), this, final_state, task.cf_id);
|
||||
sslog.debug("[Stream #{}] close_session session={}, state={}, abort stream_receive_task cf_id={}", plan_id(), fmt::ptr(this), final_state, task.cf_id);
|
||||
//FIXME: discarded future.
|
||||
(void)receiving_failed(x.first);
|
||||
task.abort();
|
||||
@@ -609,7 +613,7 @@ void stream_session::close_session(stream_session_state final_state) {
|
||||
_stream_result->handle_session_complete(shared_from_this());
|
||||
}
|
||||
|
||||
sslog.debug("[Stream #{}] close_session session={}, state={}", plan_id(), this, final_state);
|
||||
sslog.debug("[Stream #{}] close_session session={}, state={}", plan_id(), fmt::ptr(this), final_state);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
8
table.cc
8
table.cc
@@ -130,7 +130,7 @@ public:
|
||||
, _fn(std::move(fn)) {
|
||||
|
||||
tlogger.trace("incremental_reader_selector {}: created for range: {} with {} sstables",
|
||||
this,
|
||||
fmt::ptr(this),
|
||||
*_pr,
|
||||
_sstables->all()->size());
|
||||
}
|
||||
@@ -142,7 +142,7 @@ public:
|
||||
incremental_reader_selector& operator=(incremental_reader_selector&&) = delete;
|
||||
|
||||
virtual std::vector<flat_mutation_reader> create_new_readers(const std::optional<dht::ring_position_view>& pos) override {
|
||||
tlogger.trace("incremental_reader_selector {}: {}({})", this, __FUNCTION__, seastar::lazy_deref(pos));
|
||||
tlogger.trace("incremental_reader_selector {}: {}({})", fmt::ptr(this), __FUNCTION__, seastar::lazy_deref(pos));
|
||||
|
||||
auto readers = std::vector<flat_mutation_reader>();
|
||||
|
||||
@@ -150,7 +150,7 @@ public:
|
||||
auto selection = _selector->select(_selector_position);
|
||||
_selector_position = selection.next_position;
|
||||
|
||||
tlogger.trace("incremental_reader_selector {}: {} sstables to consider, advancing selector to {}", this, selection.sstables.size(),
|
||||
tlogger.trace("incremental_reader_selector {}: {} sstables to consider, advancing selector to {}", fmt::ptr(this), selection.sstables.size(),
|
||||
_selector_position);
|
||||
|
||||
readers = boost::copy_range<std::vector<flat_mutation_reader>>(selection.sstables
|
||||
@@ -158,7 +158,7 @@ public:
|
||||
| boost::adaptors::transformed([this] (auto& sst) { return this->create_reader(sst); }));
|
||||
} while (!_selector_position.is_max() && readers.empty() && (!pos || dht::ring_position_tri_compare(*_s, *pos, _selector_position) >= 0));
|
||||
|
||||
tlogger.trace("incremental_reader_selector {}: created {} new readers", this, readers.size());
|
||||
tlogger.trace("incremental_reader_selector {}: created {} new readers", fmt::ptr(this), readers.size());
|
||||
|
||||
// prevents sstable_set::incremental_selector::_current_sstables from holding reference to
|
||||
// sstables when done selecting.
|
||||
|
||||
@@ -42,7 +42,7 @@ public:
|
||||
const query::partition_slice& slice, flat_mutation_reader&& rd) {
|
||||
if (s->cf_name() == _cf_name) {
|
||||
return make_filtering_reader(std::move(rd), [this, &range, &slice, s = std::move(s)] (const dht::decorated_key& dk) {
|
||||
testlog.info("listener {}: read {}", this, dk);
|
||||
testlog.info("listener {}: read {}", fmt::ptr(this), dk);
|
||||
++read;
|
||||
return true;
|
||||
});
|
||||
@@ -75,7 +75,7 @@ results test_data_listeners(cql_test_env& e, sstring cf_name) {
|
||||
e.db().invoke_on_all([&listeners, &cf_name] (database& db) {
|
||||
auto listener = std::make_unique<table_listener>(cf_name);
|
||||
db.data_listeners().install(&*listener);
|
||||
testlog.info("installed listener {}", &*listener);
|
||||
testlog.info("installed listener {}", fmt::ptr(&*listener));
|
||||
listeners.push_back(std::move(listener));
|
||||
}).get();
|
||||
|
||||
@@ -93,7 +93,7 @@ results test_data_listeners(cql_test_env& e, sstring cf_name) {
|
||||
continue;
|
||||
}
|
||||
results res{li->read, li->write};
|
||||
testlog.info("uninstalled listener {}: rd={} wr={}", li, li->read, li->write);
|
||||
testlog.info("uninstalled listener {}: rd={} wr={}", fmt::ptr(li), li->read, li->write);
|
||||
db.data_listeners().uninstall(li);
|
||||
return res;
|
||||
}
|
||||
|
||||
@@ -2723,7 +2723,7 @@ SEASTAR_TEST_CASE(test_no_misses_when_read_is_repeated) {
|
||||
|
||||
for (auto n_ranges : {1, 2, 4}) {
|
||||
auto ranges = gen.make_random_ranges(n_ranges);
|
||||
testlog.info("Reading {{}}", ranges);
|
||||
testlog.info("Reading {{{}}}", ranges);
|
||||
|
||||
populate_range(cache, pr, ranges);
|
||||
check_continuous(cache, pr, ranges);
|
||||
|
||||
@@ -890,7 +890,7 @@ void segment_pool::free_segment(segment* seg) noexcept {
|
||||
}
|
||||
|
||||
void segment_pool::free_segment(segment* seg, segment_descriptor& desc) noexcept {
|
||||
llogger.trace("Releasing segment {}", seg);
|
||||
llogger.trace("Releasing segment {}", fmt::ptr(seg));
|
||||
desc._region = nullptr;
|
||||
deallocate_segment(seg);
|
||||
--_segments_in_use;
|
||||
@@ -1230,7 +1230,7 @@ private:
|
||||
auto pos =_active->at<char>(_active_offset);
|
||||
desc.encode(pos);
|
||||
}
|
||||
llogger.trace("Closing segment {}, used={}, waste={} [B]", _active, _active->occupancy(), segment::size - _active_offset);
|
||||
llogger.trace("Closing segment {}, used={}, waste={} [B]", fmt::ptr(_active), _active->occupancy(), segment::size - _active_offset);
|
||||
_closed_occupancy += _active->occupancy();
|
||||
|
||||
_segment_descs.push(shard_segment_pool.descriptor(_active));
|
||||
@@ -1264,7 +1264,7 @@ private:
|
||||
|
||||
void compact_segment_locked(segment* seg, segment_descriptor& desc) {
|
||||
auto seg_occupancy = desc.occupancy();
|
||||
llogger.debug("Compacting segment {} from region {}, {}", seg, id(), seg_occupancy);
|
||||
llogger.debug("Compacting segment {} from region {}, {}", fmt::ptr(seg), id(), seg_occupancy);
|
||||
|
||||
++_invalidate_counter;
|
||||
|
||||
@@ -2052,7 +2052,7 @@ void tracker::impl::register_region(region::impl* r) {
|
||||
}
|
||||
reclaiming_lock _(*this);
|
||||
_regions.push_back(r);
|
||||
llogger.debug("Registered region @{} with id={}", r, r->id());
|
||||
llogger.debug("Registered region @{} with id={}", fmt::ptr(r), r->id());
|
||||
}
|
||||
|
||||
void tracker::impl::unregister_region(region::impl* r) noexcept {
|
||||
@@ -2288,10 +2288,10 @@ void allocating_section::on_alloc_failure(logalloc::region& r) {
|
||||
r.allocator().invalidate_references();
|
||||
if (shard_segment_pool.allocation_failure_flag()) {
|
||||
_lsa_reserve *= 2; // FIXME: decay?
|
||||
llogger.debug("LSA allocation failure, increasing reserve in section {} to {} segments", this, _lsa_reserve);
|
||||
llogger.debug("LSA allocation failure, increasing reserve in section {} to {} segments", fmt::ptr(this), _lsa_reserve);
|
||||
} else {
|
||||
_std_reserve *= 2; // FIXME: decay?
|
||||
llogger.debug("Standard allocator failure, increasing head-room in section {} to {} [B]", this, _std_reserve);
|
||||
llogger.debug("Standard allocator failure, increasing head-room in section {} to {} [B]", fmt::ptr(this), _std_reserve);
|
||||
}
|
||||
reserve();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user