atomic_cell_or_collection: make operator<< show cell content
After the new in-memory representation of cells was introduced there was a regression in atomic_cell_or_collection::operator<< which stopped printing the content of the cell. This makes debugging more incovenient are time-consuming. This patch fixes the problem. Schema is propagated to the atomic_cell_or_collection printer and the full content of the cell is printed. Fixes #3571. Message-Id: <20181024095413.10736-1-pdziepak@scylladb.com>
This commit is contained in:
committed by
Avi Kivity
parent
a9836ad758
commit
637b9a7b3b
@@ -243,16 +243,18 @@ size_t atomic_cell_or_collection::external_memory_usage(const abstract_type& t)
|
||||
+ imr_object_type::size_overhead + external_value_size;
|
||||
}
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const atomic_cell_or_collection& c) {
|
||||
if (!c._data.get()) {
|
||||
std::ostream& operator<<(std::ostream& os, const atomic_cell_or_collection::printer& p) {
|
||||
if (!p._cell._data.get()) {
|
||||
return os << "{ null atomic_cell_or_collection }";
|
||||
}
|
||||
using dc = data::cell;
|
||||
os << "{ ";
|
||||
if (dc::structure::get_member<dc::tags::flags>(c._data.get()).get<dc::tags::collection>()) {
|
||||
os << "collection";
|
||||
if (dc::structure::get_member<dc::tags::flags>(p._cell._data.get()).get<dc::tags::collection>()) {
|
||||
os << "collection ";
|
||||
auto cmv = p._cell.as_collection_mutation();
|
||||
os << to_hex(cmv.data.linearize());
|
||||
} else {
|
||||
os << "atomic cell";
|
||||
os << p._cell.as_atomic_cell(p._cdef);
|
||||
}
|
||||
return os << " @" << static_cast<const void*>(c._data.get()) << " }";
|
||||
return os << " }";
|
||||
}
|
||||
|
||||
@@ -67,7 +67,19 @@ public:
|
||||
bytes_view serialize() const;
|
||||
bool equals(const abstract_type& type, const atomic_cell_or_collection& other) const;
|
||||
size_t external_memory_usage(const abstract_type&) const;
|
||||
friend std::ostream& operator<<(std::ostream&, const atomic_cell_or_collection&);
|
||||
|
||||
class printer {
|
||||
const column_definition& _cdef;
|
||||
const atomic_cell_or_collection& _cell;
|
||||
public:
|
||||
printer(const column_definition& cdef, const atomic_cell_or_collection& cell)
|
||||
: _cdef(cdef), _cell(cell) { }
|
||||
printer(const printer&) = delete;
|
||||
printer(printer&&) = delete;
|
||||
|
||||
friend std::ostream& operator<<(std::ostream&, const printer&);
|
||||
};
|
||||
friend std::ostream& operator<<(std::ostream&, const printer&);
|
||||
};
|
||||
|
||||
namespace std {
|
||||
|
||||
@@ -425,7 +425,7 @@ void cache_flat_mutation_reader::maybe_add_to_cache(const clustering_row& cr) {
|
||||
_read_context->cache().on_mispopulate();
|
||||
return;
|
||||
}
|
||||
clogger.trace("csm {}: populate({})", this, cr);
|
||||
clogger.trace("csm {}: populate({})", this, clustering_row::printer(*_schema, cr));
|
||||
_lsa_manager.run_in_update_section_with_allocator([this, &cr] {
|
||||
mutation_partition& mp = _snp->version()->partition();
|
||||
rows_entry::compare less(*_schema);
|
||||
@@ -577,7 +577,7 @@ void cache_flat_mutation_reader::move_to_next_entry() {
|
||||
|
||||
inline
|
||||
void cache_flat_mutation_reader::add_to_buffer(mutation_fragment&& mf) {
|
||||
clogger.trace("csm {}: add_to_buffer({})", this, mf);
|
||||
clogger.trace("csm {}: add_to_buffer({})", this, mutation_fragment::printer(*_schema, mf));
|
||||
if (mf.is_clustering_row()) {
|
||||
add_clustering_row_to_buffer(std::move(mf));
|
||||
} else {
|
||||
@@ -599,7 +599,7 @@ void cache_flat_mutation_reader::add_to_buffer(const partition_snapshot_row_curs
|
||||
// (2) If _lower_bound > mf.position(), mf was emitted
|
||||
inline
|
||||
void cache_flat_mutation_reader::add_clustering_row_to_buffer(mutation_fragment&& mf) {
|
||||
clogger.trace("csm {}: add_clustering_row_to_buffer({})", this, mf);
|
||||
clogger.trace("csm {}: add_clustering_row_to_buffer({})", this, mutation_fragment::printer(*_schema, mf));
|
||||
auto& row = mf.as_clustering_row();
|
||||
auto new_lower_bound = position_in_partition::after_key(row.key());
|
||||
push_mutation_fragment(std::move(mf));
|
||||
@@ -640,7 +640,7 @@ void cache_flat_mutation_reader::maybe_add_to_cache(const range_tombstone& rt) {
|
||||
inline
|
||||
void cache_flat_mutation_reader::maybe_add_to_cache(const static_row& sr) {
|
||||
if (can_populate()) {
|
||||
clogger.trace("csm {}: populate({})", this, sr);
|
||||
clogger.trace("csm {}: populate({})", this, static_row::printer(*_schema, sr));
|
||||
_read_context->cache().on_static_row_insert();
|
||||
_lsa_manager.run_in_update_section_with_allocator([&] {
|
||||
if (_read_context->digest_requested()) {
|
||||
|
||||
@@ -723,5 +723,5 @@ std::ostream& operator<<(std::ostream& out, memtable& mt) {
|
||||
}
|
||||
|
||||
std::ostream& operator<<(std::ostream& out, const memtable_entry& mt) {
|
||||
return out << "{" << mt.key() << ": " << mt.partition() << "}";
|
||||
return out << "{" << mt.key() << ": " << partition_entry::printer(*mt.schema(), mt.partition()) << "}";
|
||||
}
|
||||
|
||||
@@ -274,6 +274,6 @@ future<mutation_opt> read_mutation_from_flat_mutation_reader(flat_mutation_reade
|
||||
std::ostream& operator<<(std::ostream& os, const mutation& m) {
|
||||
const ::schema& s = *m.schema();
|
||||
fprint(os, "{%s.%s %s ", s.ks_name(), s.cf_name(), m.decorated_key());
|
||||
os << m.partition() << "}";
|
||||
os << mutation_partition::printer(s, m.partition()) << "}";
|
||||
return os;
|
||||
}
|
||||
|
||||
@@ -27,13 +27,15 @@
|
||||
#include "mutation_fragment.hh"
|
||||
|
||||
std::ostream&
|
||||
operator<<(std::ostream& os, const clustering_row& row) {
|
||||
return os << "{clustering_row: ck " << row._ck << " t " << row._t << " row_marker " << row._marker << " cells " << row._cells << "}";
|
||||
operator<<(std::ostream& os, const clustering_row::printer& p) {
|
||||
auto& row = p._clustering_row;
|
||||
return os << "{clustering_row: ck " << row._ck << " t " << row._t << " row_marker " << row._marker << " cells "
|
||||
<< row::printer(p._schema, column_kind::regular_column, row._cells) << "}";
|
||||
}
|
||||
|
||||
std::ostream&
|
||||
operator<<(std::ostream& os, const static_row& row) {
|
||||
return os << "{static_row: "<< row._cells << "}";
|
||||
operator<<(std::ostream& os, const static_row::printer& p) {
|
||||
return os << "{static_row: "<< row::printer(p._schema, column_kind::static_column, p._static_row._cells) << "}";
|
||||
}
|
||||
|
||||
std::ostream&
|
||||
@@ -202,11 +204,14 @@ std::ostream& operator<<(std::ostream& os, mutation_fragment::kind k)
|
||||
abort();
|
||||
}
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const mutation_fragment& mf) {
|
||||
std::ostream& operator<<(std::ostream& os, const mutation_fragment::printer& p) {
|
||||
auto& mf = p._mutation_fragment;
|
||||
os << "{mutation_fragment: " << mf._kind << " " << mf.position() << " ";
|
||||
mf.visit([&os] (const auto& what) -> void {
|
||||
os << what;
|
||||
});
|
||||
mf.visit(make_visitor(
|
||||
[&] (const clustering_row& cr) { os << clustering_row::printer(p._schema, cr); },
|
||||
[&] (const static_row& sr) { os << static_row::printer(p._schema, sr); },
|
||||
[&] (const auto& what) -> void { os << what; }
|
||||
));
|
||||
os << "}";
|
||||
return os;
|
||||
}
|
||||
|
||||
@@ -128,7 +128,17 @@ public:
|
||||
&& _cells.equal(column_kind::regular_column, s, other._cells, s);
|
||||
}
|
||||
|
||||
friend std::ostream& operator<<(std::ostream& os, const clustering_row& row);
|
||||
class printer {
|
||||
const schema& _schema;
|
||||
const clustering_row& _clustering_row;
|
||||
public:
|
||||
printer(const schema& s, const clustering_row& r) : _schema(s), _clustering_row(r) { }
|
||||
printer(const printer&) = delete;
|
||||
printer(printer&&) = delete;
|
||||
|
||||
friend std::ostream& operator<<(std::ostream& os, const printer& p);
|
||||
};
|
||||
friend std::ostream& operator<<(std::ostream& os, const printer& p);
|
||||
};
|
||||
|
||||
class static_row {
|
||||
@@ -174,7 +184,17 @@ public:
|
||||
return _cells.equal(column_kind::static_column, s, other._cells, s);
|
||||
}
|
||||
|
||||
friend std::ostream& operator<<(std::ostream& is, const static_row& row);
|
||||
class printer {
|
||||
const schema& _schema;
|
||||
const static_row& _static_row;
|
||||
public:
|
||||
printer(const schema& s, const static_row& r) : _schema(s), _static_row(r) { }
|
||||
printer(const printer&) = delete;
|
||||
printer(printer&&) = delete;
|
||||
|
||||
friend std::ostream& operator<<(std::ostream& os, const printer& p);
|
||||
};
|
||||
friend std::ostream& operator<<(std::ostream& os, const printer& p);
|
||||
};
|
||||
|
||||
class partition_start final {
|
||||
@@ -316,7 +336,7 @@ private:
|
||||
friend class position_in_partition;
|
||||
public:
|
||||
struct clustering_row_tag_t { };
|
||||
|
||||
|
||||
template<typename... Args>
|
||||
mutation_fragment(clustering_row_tag_t, Args&&... args)
|
||||
: _kind(kind::clustering_row)
|
||||
@@ -501,7 +521,17 @@ public:
|
||||
return _kind == mf._kind && _kind != kind::range_tombstone;
|
||||
}
|
||||
|
||||
friend std::ostream& operator<<(std::ostream&, const mutation_fragment& mf);
|
||||
class printer {
|
||||
const schema& _schema;
|
||||
const mutation_fragment& _mutation_fragment;
|
||||
public:
|
||||
printer(const schema& s, const mutation_fragment& mf) : _schema(s), _mutation_fragment(mf) { }
|
||||
printer(const printer&) = delete;
|
||||
printer(printer&&) = delete;
|
||||
|
||||
friend std::ostream& operator<<(std::ostream& os, const printer& p);
|
||||
};
|
||||
friend std::ostream& operator<<(std::ostream& os, const printer& p);
|
||||
};
|
||||
|
||||
inline position_in_partition_view static_row::position() const
|
||||
@@ -526,8 +556,6 @@ inline position_in_partition_view partition_end::position() const
|
||||
|
||||
std::ostream& operator<<(std::ostream&, mutation_fragment::kind);
|
||||
|
||||
std::ostream& operator<<(std::ostream&, const mutation_fragment& mf);
|
||||
|
||||
using mutation_fragment_opt = optimized_optional<mutation_fragment>;
|
||||
|
||||
namespace streamed_mutation {
|
||||
|
||||
@@ -899,7 +899,7 @@ mutation_partition::query_compacted(query::result::partition_writer& pw, const s
|
||||
}
|
||||
|
||||
std::ostream&
|
||||
operator<<(std::ostream& os, const std::pair<column_id, const atomic_cell_or_collection&>& c) {
|
||||
operator<<(std::ostream& os, const std::pair<column_id, const atomic_cell_or_collection::printer&>& c) {
|
||||
return fprint(os, "{column: %s %s}", c.first, c.second);
|
||||
}
|
||||
|
||||
@@ -911,14 +911,21 @@ static auto prefixed(const sstring& prefix, const RangeOfPrintable& r) {
|
||||
}
|
||||
|
||||
std::ostream&
|
||||
operator<<(std::ostream& os, const row& r) {
|
||||
operator<<(std::ostream& os, const row::printer& p) {
|
||||
auto add_printer = [&] (const auto& c) {
|
||||
return std::pair<column_id, atomic_cell_or_collection::printer>(std::piecewise_construct,
|
||||
std::forward_as_tuple(c.first),
|
||||
std::forward_as_tuple(p._schema.column_at(p._kind, c.first), c.second)
|
||||
);
|
||||
};
|
||||
|
||||
sstring cells;
|
||||
switch (r._type) {
|
||||
switch (p._row._type) {
|
||||
case row::storage_type::set:
|
||||
cells = ::join(",", prefixed("\n ", r.get_range_set()));
|
||||
cells = ::join(",", prefixed("\n ", p._row.get_range_set() | boost::adaptors::transformed(add_printer)));
|
||||
break;
|
||||
case row::storage_type::vector:
|
||||
cells = ::join(",", prefixed("\n ", r.get_range_vector()));
|
||||
cells = ::join(",", prefixed("\n ", p._row.get_range_vector() | boost::adaptors::transformed(add_printer)));
|
||||
break;
|
||||
}
|
||||
return fprint(os, "{row: %s}", cells);
|
||||
@@ -937,7 +944,8 @@ operator<<(std::ostream& os, const row_marker& rm) {
|
||||
}
|
||||
|
||||
std::ostream&
|
||||
operator<<(std::ostream& os, const deletable_row& dr) {
|
||||
operator<<(std::ostream& os, const deletable_row::printer& p) {
|
||||
auto& dr = p._deletable_row;
|
||||
os << "{deletable_row: ";
|
||||
if (!dr._marker.is_missing()) {
|
||||
os << dr._marker << " ";
|
||||
@@ -945,16 +953,19 @@ operator<<(std::ostream& os, const deletable_row& dr) {
|
||||
if (dr._deleted_at) {
|
||||
os << dr._deleted_at << " ";
|
||||
}
|
||||
return os << dr._cells << "}";
|
||||
return os << row::printer(p._schema, column_kind::regular_column, dr._cells) << "}";
|
||||
}
|
||||
|
||||
std::ostream&
|
||||
operator<<(std::ostream& os, const rows_entry& re) {
|
||||
return fprint(os, "{rows_entry: cont=%d dummy=%d %s %s}", re.continuous(), re.dummy(), re.position(), re._row);
|
||||
operator<<(std::ostream& os, const rows_entry::printer& p) {
|
||||
auto& re = p._rows_entry;
|
||||
return fprint(os, "{rows_entry: cont=%d dummy=%d %s %s}", re.continuous(), re.dummy(), re.position(),
|
||||
deletable_row::printer(p._schema, re._row));
|
||||
}
|
||||
|
||||
std::ostream&
|
||||
operator<<(std::ostream& os, const mutation_partition& mp) {
|
||||
operator<<(std::ostream& os, const mutation_partition::printer& p) {
|
||||
auto& mp = p._mutation_partition;
|
||||
os << "{mutation_partition: ";
|
||||
if (mp._tombstone) {
|
||||
os << mp._tombstone << ",";
|
||||
@@ -962,8 +973,11 @@ operator<<(std::ostream& os, const mutation_partition& mp) {
|
||||
if (!mp._row_tombstones.empty()) {
|
||||
os << "\n range_tombstones: {" << ::join(",", prefixed("\n ", mp._row_tombstones)) << "},";
|
||||
}
|
||||
os << "\n static: cont=" << int(mp._static_row_continuous) << " " << mp._static_row << ",";
|
||||
os << "\n clustered: {" << ::join(",", prefixed("\n ", mp._rows)) << "}}";
|
||||
os << "\n static: cont=" << int(mp._static_row_continuous) << " " << row::printer(p._schema, column_kind::static_column, mp._static_row) << ",";
|
||||
auto add_printer = [&] (const auto& re) {
|
||||
return rows_entry::printer(p._schema, re);
|
||||
};
|
||||
os << "\n clustered: {" << ::join(",", prefixed("\n ", mp._rows | boost::adaptors::transformed(add_printer))) << "}}";
|
||||
return os;
|
||||
}
|
||||
|
||||
|
||||
@@ -354,10 +354,19 @@ public:
|
||||
|
||||
bool is_live(const schema&, column_kind kind, tombstone tomb = tombstone(), gc_clock::time_point now = gc_clock::time_point::min()) const;
|
||||
|
||||
friend std::ostream& operator<<(std::ostream& os, const row& r);
|
||||
};
|
||||
class printer {
|
||||
const schema& _schema;
|
||||
column_kind _kind;
|
||||
const row& _row;
|
||||
public:
|
||||
printer(const schema& s, column_kind k, const row& r) : _schema(s), _kind(k), _row(r) { }
|
||||
printer(const printer&) = delete;
|
||||
printer(printer&&) = delete;
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const std::pair<column_id, const atomic_cell_or_collection&>& c);
|
||||
friend std::ostream& operator<<(std::ostream& os, const printer& p);
|
||||
};
|
||||
friend std::ostream& operator<<(std::ostream& os, const printer& p);
|
||||
};
|
||||
|
||||
class row_marker;
|
||||
int compare_row_marker_for_merge(const row_marker& left, const row_marker& right) noexcept;
|
||||
@@ -700,11 +709,22 @@ public:
|
||||
const row_marker& marker() const { return _marker; }
|
||||
const row& cells() const { return _cells; }
|
||||
row& cells() { return _cells; }
|
||||
friend std::ostream& operator<<(std::ostream& os, const deletable_row& dr);
|
||||
bool equal(column_kind, const schema& s, const deletable_row& other, const schema& other_schema) const;
|
||||
bool is_live(const schema& s, tombstone base_tombstone = tombstone(), gc_clock::time_point query_time = gc_clock::time_point::min()) const;
|
||||
bool empty() const { return !_deleted_at && _marker.is_missing() && !_cells.size(); }
|
||||
deletable_row difference(const schema&, column_kind, const deletable_row& other) const;
|
||||
|
||||
class printer {
|
||||
const schema& _schema;
|
||||
const deletable_row& _deletable_row;
|
||||
public:
|
||||
printer(const schema& s, const deletable_row& r) : _schema(s), _deletable_row(r) { }
|
||||
printer(const printer&) = delete;
|
||||
printer(printer&&) = delete;
|
||||
|
||||
friend std::ostream& operator<<(std::ostream& os, const printer& p);
|
||||
};
|
||||
friend std::ostream& operator<<(std::ostream& os, const printer& p);
|
||||
};
|
||||
|
||||
class cache_tracker;
|
||||
@@ -848,12 +868,23 @@ public:
|
||||
return _c(p1, p2) < 0;
|
||||
}
|
||||
};
|
||||
friend std::ostream& operator<<(std::ostream& os, const rows_entry& re);
|
||||
bool equal(const schema& s, const rows_entry& other) const;
|
||||
bool equal(const schema& s, const rows_entry& other, const schema& other_schema) const;
|
||||
|
||||
size_t memory_usage(const schema&) const;
|
||||
void on_evicted(cache_tracker&) noexcept;
|
||||
|
||||
class printer {
|
||||
const schema& _schema;
|
||||
const rows_entry& _rows_entry;
|
||||
public:
|
||||
printer(const schema& s, const rows_entry& r) : _schema(s), _rows_entry(r) { }
|
||||
printer(const printer&) = delete;
|
||||
printer(printer&&) = delete;
|
||||
|
||||
friend std::ostream& operator<<(std::ostream& os, const printer& p);
|
||||
};
|
||||
friend std::ostream& operator<<(std::ostream& os, const printer& p);
|
||||
};
|
||||
|
||||
// Represents a set of writes made to a single partition.
|
||||
@@ -941,7 +972,18 @@ public:
|
||||
hashing_partition_visitor<Hasher> v(h, s);
|
||||
accept(s, v);
|
||||
}
|
||||
friend std::ostream& operator<<(std::ostream& os, const mutation_partition& mp);
|
||||
|
||||
class printer {
|
||||
const schema& _schema;
|
||||
const mutation_partition& _mutation_partition;
|
||||
public:
|
||||
printer(const schema& s, const mutation_partition& mp) : _schema(s), _mutation_partition(mp) { }
|
||||
printer(const printer&) = delete;
|
||||
printer(printer&&) = delete;
|
||||
|
||||
friend std::ostream& operator<<(std::ostream& os, const printer& p);
|
||||
};
|
||||
friend std::ostream& operator<<(std::ostream& os, const printer& p);
|
||||
public:
|
||||
// Makes sure there is a dummy entry after all clustered rows. Doesn't affect continuity.
|
||||
// Doesn't invalidate iterators.
|
||||
|
||||
@@ -661,7 +661,8 @@ partition_snapshot::range_tombstones()
|
||||
position_in_partition_view::after_all_clustered_rows());
|
||||
}
|
||||
|
||||
std::ostream& operator<<(std::ostream& out, const partition_entry& e) {
|
||||
std::ostream& operator<<(std::ostream& out, const partition_entry::printer& p) {
|
||||
auto& e = p._partition_entry;
|
||||
out << "{";
|
||||
bool first = true;
|
||||
if (e._version) {
|
||||
@@ -673,7 +674,7 @@ std::ostream& operator<<(std::ostream& out, const partition_entry& e) {
|
||||
if (v->is_referenced()) {
|
||||
out << "(*) ";
|
||||
}
|
||||
out << v->partition();
|
||||
out << mutation_partition::printer(p._schema, v->partition());
|
||||
v = v->next();
|
||||
first = false;
|
||||
}
|
||||
|
||||
@@ -570,7 +570,17 @@ public:
|
||||
cache_tracker*,
|
||||
partition_snapshot::phase_type phase = partition_snapshot::default_phase);
|
||||
|
||||
friend std::ostream& operator<<(std::ostream& out, const partition_entry& e);
|
||||
class printer {
|
||||
const schema& _schema;
|
||||
const partition_entry& _partition_entry;
|
||||
public:
|
||||
printer(const schema& s, const partition_entry& pe) : _schema(s), _partition_entry(pe) { }
|
||||
printer(const printer&) = delete;
|
||||
printer(printer&&) = delete;
|
||||
|
||||
friend std::ostream& operator<<(std::ostream& os, const printer& p);
|
||||
};
|
||||
friend std::ostream& operator<<(std::ostream& os, const printer& p);
|
||||
};
|
||||
|
||||
// Monotonic exception guarantees
|
||||
|
||||
@@ -1304,6 +1304,6 @@ std::ostream& operator<<(std::ostream& out, cache_entry& e) {
|
||||
return out << "{cache_entry: " << e.position()
|
||||
<< ", cont=" << e.continuous()
|
||||
<< ", dummy=" << e.is_dummy_entry()
|
||||
<< ", " << e.partition()
|
||||
<< ", " << partition_entry::printer(*e.schema(), e.partition())
|
||||
<< "}";
|
||||
}
|
||||
|
||||
@@ -393,7 +393,10 @@ public:
|
||||
}
|
||||
|
||||
proceed flush() {
|
||||
sstlog.trace("mp_row_consumer_k_l {}: flush(in_progress={}, ready={}, skip={})", this, _in_progress, _ready, _skip_in_progress);
|
||||
sstlog.trace("mp_row_consumer_k_l {}: flush(in_progress={}, ready={}, skip={})", this,
|
||||
_in_progress ? std::optional<mutation_fragment::printer>(std::in_place, *_schema, *_in_progress) : std::optional<mutation_fragment::printer>(),
|
||||
_ready ? std::optional<mutation_fragment::printer>(std::in_place, *_schema, *_ready) : std::optional<mutation_fragment::printer>(),
|
||||
_skip_in_progress);
|
||||
flush_pending_collection(*_schema);
|
||||
// If _ready is already set we have a bug: get_mutation_fragment()
|
||||
// was not called, and below we will lose one clustering row!
|
||||
@@ -410,7 +413,10 @@ public:
|
||||
}
|
||||
|
||||
proceed flush_if_needed(range_tombstone&& rt) {
|
||||
sstlog.trace("mp_row_consumer_k_l {}: flush_if_needed(in_progress={}, ready={}, skip={})", this, _in_progress, _ready, _skip_in_progress);
|
||||
sstlog.trace("mp_row_consumer_k_l {}: flush_if_needed(in_progress={}, ready={}, skip={})", this,
|
||||
_in_progress ? std::optional<mutation_fragment::printer>(std::in_place, *_schema, *_in_progress) : std::optional<mutation_fragment::printer>(),
|
||||
_ready ? std::optional<mutation_fragment::printer>(std::in_place, *_schema, *_ready) : std::optional<mutation_fragment::printer>(),
|
||||
_skip_in_progress);
|
||||
proceed ret = proceed::yes;
|
||||
if (_in_progress) {
|
||||
ret = flush();
|
||||
@@ -1183,7 +1189,7 @@ public:
|
||||
|
||||
if (_inside_static_row) {
|
||||
fill_cells(column_kind::static_column, _in_progress_static_row.cells());
|
||||
sstlog.trace("mp_row_consumer_m {}: consume_row_end(_in_progress_static_row={})", this, _in_progress_static_row);
|
||||
sstlog.trace("mp_row_consumer_m {}: consume_row_end(_in_progress_static_row={})", this, static_row::printer(*_schema, _in_progress_static_row));
|
||||
_inside_static_row = false;
|
||||
auto action = _mf_filter->apply(_in_progress_static_row);
|
||||
switch (action) {
|
||||
|
||||
@@ -48,13 +48,13 @@ public:
|
||||
BOOST_FAIL(sprint("Expected: partition start with key %s, got end of stream", dk));
|
||||
}
|
||||
if (!mfopt->is_partition_start()) {
|
||||
BOOST_FAIL(sprint("Expected: partition start with key %s, got: %s", dk, *mfopt));
|
||||
BOOST_FAIL(sprint("Expected: partition start with key %s, got: %s", dk, mutation_fragment::printer(*_reader.schema(), *mfopt)));
|
||||
}
|
||||
if (!mfopt->as_partition_start().key().equal(*_reader.schema(), dk)) {
|
||||
BOOST_FAIL(sprint("Expected: partition start with key %s, got: %s", dk, *mfopt));
|
||||
BOOST_FAIL(sprint("Expected: partition start with key %s, got: %s", dk, mutation_fragment::printer(*_reader.schema(), *mfopt)));
|
||||
}
|
||||
if (tomb && mfopt->as_partition_start().partition_tombstone() != *tomb) {
|
||||
BOOST_FAIL(sprint("Expected: partition start with tombstone %s, got: %s", *tomb, *mfopt));
|
||||
BOOST_FAIL(sprint("Expected: partition start with tombstone %s, got: %s", *tomb, mutation_fragment::printer(*_reader.schema(), *mfopt)));
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
@@ -66,7 +66,7 @@ public:
|
||||
BOOST_FAIL("Expected static row, got end of stream");
|
||||
}
|
||||
if (!mfopt->is_static_row()) {
|
||||
BOOST_FAIL(sprint("Expected static row, got: %s", *mfopt));
|
||||
BOOST_FAIL(sprint("Expected static row, got: %s", mutation_fragment::printer(*_reader.schema(), *mfopt)));
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
@@ -78,7 +78,7 @@ public:
|
||||
BOOST_FAIL(sprint("Expected row with key %s, but got end of stream", ck));
|
||||
}
|
||||
if (!mfopt->is_clustering_row()) {
|
||||
BOOST_FAIL(sprint("Expected row with key %s, but got %s", ck, *mfopt));
|
||||
BOOST_FAIL(sprint("Expected row with key %s, but got %s", ck, mutation_fragment::printer(*_reader.schema(), *mfopt)));
|
||||
}
|
||||
auto& actual = mfopt->as_clustering_row().key();
|
||||
if (!actual.equal(*_reader.schema(), ck)) {
|
||||
@@ -105,7 +105,7 @@ public:
|
||||
BOOST_FAIL("Expected static row, got end of stream");
|
||||
}
|
||||
if (!mfopt->is_static_row()) {
|
||||
BOOST_FAIL(sprint("Expected static row, got: %s", *mfopt));
|
||||
BOOST_FAIL(sprint("Expected static row, got: %s", mutation_fragment::printer(*_reader.schema(), *mfopt)));
|
||||
}
|
||||
auto& cells = mfopt->as_static_row().cells();
|
||||
if (cells.size() != columns.size()) {
|
||||
@@ -135,7 +135,7 @@ public:
|
||||
BOOST_FAIL(sprint("Expected row with key %s, but got end of stream", ck));
|
||||
}
|
||||
if (!mfopt->is_clustering_row()) {
|
||||
BOOST_FAIL(sprint("Expected row with key %s, but got %s", ck, *mfopt));
|
||||
BOOST_FAIL(sprint("Expected row with key %s, but got %s", ck, mutation_fragment::printer(*_reader.schema(), *mfopt)));
|
||||
}
|
||||
auto& actual = mfopt->as_clustering_row().key();
|
||||
if (!actual.equal(*_reader.schema(), ck)) {
|
||||
@@ -174,7 +174,7 @@ public:
|
||||
BOOST_FAIL(sprint("Expected row with key %s, but got end of stream", ck));
|
||||
}
|
||||
if (!mfopt->is_clustering_row()) {
|
||||
BOOST_FAIL(sprint("Expected row with key %s, but got %s", ck, *mfopt));
|
||||
BOOST_FAIL(sprint("Expected row with key %s, but got %s", ck, mutation_fragment::printer(*_reader.schema(), *mfopt)));
|
||||
}
|
||||
auto& actual = mfopt->as_clustering_row().key();
|
||||
if (!actual.equal(*_reader.schema(), ck)) {
|
||||
@@ -203,7 +203,7 @@ public:
|
||||
BOOST_FAIL(sprint("Expected range tombstone %s, but got end of stream", rt));
|
||||
}
|
||||
if (!mfo->is_range_tombstone()) {
|
||||
BOOST_FAIL(sprint("Expected range tombstone %s, but got %s", rt, *mfo));
|
||||
BOOST_FAIL(sprint("Expected range tombstone %s, but got %s", rt, mutation_fragment::printer(*_reader.schema(), *mfo)));
|
||||
}
|
||||
const schema& s = *_reader.schema();
|
||||
range_tombstone_list actual_list(s);
|
||||
@@ -234,7 +234,7 @@ public:
|
||||
BOOST_FAIL(sprint("Expected partition end but got end of stream"));
|
||||
}
|
||||
if (!mfopt->is_end_of_partition()) {
|
||||
BOOST_FAIL(sprint("Expected partition end but got %s", *mfopt));
|
||||
BOOST_FAIL(sprint("Expected partition end but got %s", mutation_fragment::printer(*_reader.schema(), *mfopt)));
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
@@ -242,10 +242,10 @@ public:
|
||||
flat_reader_assertions& produces(const schema& s, const mutation_fragment& mf) {
|
||||
auto mfopt = read_next();
|
||||
if (!mfopt) {
|
||||
BOOST_FAIL(sprint("Expected %s, but got end of stream", mf));
|
||||
BOOST_FAIL(sprint("Expected %s, but got end of stream", mutation_fragment::printer(*_reader.schema(), mf)));
|
||||
}
|
||||
if (!mfopt->equal(s, mf)) {
|
||||
BOOST_FAIL(sprint("Expected %s, but got %s", mf, *mfopt));
|
||||
BOOST_FAIL(sprint("Expected %s, but got %s", mutation_fragment::printer(*_reader.schema(), mf), mutation_fragment::printer(*_reader.schema(), *mfopt)));
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
@@ -254,7 +254,7 @@ public:
|
||||
BOOST_TEST_MESSAGE("Expecting end of stream");
|
||||
auto mfopt = read_next();
|
||||
if (bool(mfopt)) {
|
||||
BOOST_FAIL(sprint("Expected end of stream, got %s", *mfopt));
|
||||
BOOST_FAIL(sprint("Expected end of stream, got %s", mutation_fragment::printer(*_reader.schema(), *mfopt)));
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
@@ -336,7 +336,8 @@ public:
|
||||
BOOST_REQUIRE(!inside_partition);
|
||||
auto& dk = mfo->as_partition_start().key();
|
||||
if (previous_partition && !previous_partition->as_partition_start().key().less_compare(*_reader.schema(), dk)) {
|
||||
BOOST_FAIL(sprint("previous partition had greater key: prev=%s, current=%s", *previous_partition, *mfo));
|
||||
BOOST_FAIL(sprint("previous partition had greater key: prev=%s, current=%s",
|
||||
mutation_fragment::printer(*_reader.schema(), *previous_partition), mutation_fragment::printer(*_reader.schema(), *mfo)));
|
||||
}
|
||||
previous_partition = std::move(mfo);
|
||||
previous_fragment = stdx::nullopt;
|
||||
@@ -348,7 +349,8 @@ public:
|
||||
BOOST_REQUIRE(inside_partition);
|
||||
if (previous_fragment) {
|
||||
if (!less(previous_fragment->position(), mfo->position())) {
|
||||
BOOST_FAIL(sprint("previous fragment has greater position: prev=%s, current=%s", *previous_fragment, *mfo));
|
||||
BOOST_FAIL(sprint("previous fragment has greater position: prev=%s, current=%s",
|
||||
mutation_fragment::printer(*_reader.schema(), *previous_fragment), mutation_fragment::printer(*_reader.schema(), *mfo)));
|
||||
}
|
||||
}
|
||||
previous_fragment = std::move(mfo);
|
||||
|
||||
@@ -114,7 +114,7 @@ SEASTAR_THREAD_TEST_CASE(test_frozen_mutation_fragment) {
|
||||
for (auto&& mf : mfs) {
|
||||
auto refrozen_mf = freeze(s, mf).unfreeze(s);
|
||||
if (!mf.equal(s, refrozen_mf)) {
|
||||
BOOST_FAIL("Expected " << mf << " got " << refrozen_mf);
|
||||
BOOST_FAIL("Expected " << mutation_fragment::printer(s, mf) << " got " << mutation_fragment::printer(s, refrozen_mf));
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
@@ -49,10 +49,12 @@ public:
|
||||
return *this;
|
||||
}
|
||||
if (!_m.equal(*_schema, other, s)) {
|
||||
BOOST_FAIL(sprint("Mutations differ, expected %s\n ...but got: %s", other, _m));
|
||||
BOOST_FAIL(sprint("Mutations differ, expected %s\n ...but got: %s",
|
||||
mutation_partition::printer(s, other), mutation_partition::printer(*_schema, _m)));
|
||||
}
|
||||
if (!other.equal(s, _m, *_schema)) {
|
||||
BOOST_FAIL(sprint("Mutation inequality is not symmetric for %s\n ...and: %s", other, _m));
|
||||
BOOST_FAIL(sprint("Mutation inequality is not symmetric for %s\n ...and: %s",
|
||||
mutation_partition::printer(s, other), mutation_partition::printer(*_schema, _m)));
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
@@ -63,21 +65,22 @@ public:
|
||||
|
||||
mutation_partition_assertion& is_not_equal_to(const schema& s, const mutation_partition& other) {
|
||||
if (_m.equal(*_schema, other, s)) {
|
||||
BOOST_FAIL(sprint("Mutations equal but expected to differ: %s\n ...and: %s", other, _m));
|
||||
BOOST_FAIL(sprint("Mutations equal but expected to differ: %s\n ...and: %s",
|
||||
mutation_partition::printer(s, other), mutation_partition::printer(*_schema, _m)));
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
||||
mutation_partition_assertion& has_same_continuity(const mutation_partition& other) {
|
||||
if (!_m.equal_continuity(*_schema, other)) {
|
||||
BOOST_FAIL(sprint("Continuity doesn't match: %s\n ...and: %s", other, _m));
|
||||
BOOST_FAIL(sprint("Continuity doesn't match: %s\n ...and: %s", mutation_partition::printer(*_schema, other), mutation_partition::printer(*_schema, _m)));
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
||||
mutation_partition_assertion& is_continuous(const position_range& r, is_continuous cont = is_continuous::yes) {
|
||||
if (!_m.check_continuity(*_schema, r, cont)) {
|
||||
BOOST_FAIL(sprint("Expected range %s to be %s in %s", r, cont ? "continuous" : "discontinuous", _m));
|
||||
BOOST_FAIL(sprint("Expected range %s to be %s in %s", r, cont ? "continuous" : "discontinuous", mutation_partition::printer(*_schema, _m)));
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
||||
@@ -1237,7 +1237,7 @@ SEASTAR_TEST_CASE(test_fast_forwarding_combined_reader_is_consistent_with_slicin
|
||||
rd.consume_pausable([&](mutation_fragment&& mf) {
|
||||
position_in_partition::less_compare less(*s);
|
||||
if (!less(mf.position(), position_in_partition_view::before_all_clustered_rows())) {
|
||||
BOOST_FAIL(sprint("Received clustering fragment: %s", mf));
|
||||
BOOST_FAIL(sprint("Received clustering fragment: %s", mutation_fragment::printer(*s, mf)));
|
||||
}
|
||||
result.partition().apply(*s, std::move(mf));
|
||||
return stop_iteration::no;
|
||||
@@ -1248,11 +1248,11 @@ SEASTAR_TEST_CASE(test_fast_forwarding_combined_reader_is_consistent_with_slicin
|
||||
rd.fast_forward_to(prange, db::no_timeout).get();
|
||||
rd.consume_pausable([&](mutation_fragment&& mf) {
|
||||
if (!mf.relevant_for_range(*s, prange.start())) {
|
||||
BOOST_FAIL(sprint("Received fragment which is not relevant for range: %s, range: %s", mf, prange));
|
||||
BOOST_FAIL(sprint("Received fragment which is not relevant for range: %s, range: %s", mutation_fragment::printer(*s, mf), prange));
|
||||
}
|
||||
position_in_partition::less_compare less(*s);
|
||||
if (!less(mf.position(), prange.end())) {
|
||||
BOOST_FAIL(sprint("Received fragment is out of range: %s, range: %s", mf, prange));
|
||||
BOOST_FAIL(sprint("Received fragment is out of range: %s, range: %s", mutation_fragment::printer(*s, mf), prange));
|
||||
}
|
||||
result.partition().apply(*s, std::move(mf));
|
||||
return stop_iteration::no;
|
||||
@@ -1304,7 +1304,7 @@ SEASTAR_TEST_CASE(test_combined_reader_slicing_with_overlapping_range_tombstones
|
||||
|
||||
rd.consume_pausable([&] (mutation_fragment&& mf) {
|
||||
if (mf.position().has_clustering_key() && !mf.range().overlaps(*s, prange.start(), prange.end())) {
|
||||
BOOST_FAIL(sprint("Received fragment which is not relevant for the slice: %s, slice: %s", mf, range));
|
||||
BOOST_FAIL(sprint("Received fragment which is not relevant for the slice: %s, slice: %s", mutation_fragment::printer(*s, mf), range));
|
||||
}
|
||||
result.partition().apply(*s, std::move(mf));
|
||||
return stop_iteration::no;
|
||||
@@ -1339,7 +1339,7 @@ SEASTAR_TEST_CASE(test_combined_reader_slicing_with_overlapping_range_tombstones
|
||||
auto consume_clustered = [&] (mutation_fragment&& mf) {
|
||||
position_in_partition::less_compare less(*s);
|
||||
if (less(mf.position(), last_pos)) {
|
||||
BOOST_FAIL(sprint("Out of order fragment: %s, last pos: %s", mf, last_pos));
|
||||
BOOST_FAIL(sprint("Out of order fragment: %s, last pos: %s", mutation_fragment::printer(*s, mf), last_pos));
|
||||
}
|
||||
last_pos = position_in_partition(mf.position());
|
||||
result.partition().apply(*s, std::move(mf));
|
||||
|
||||
@@ -1046,7 +1046,7 @@ void test_slicing_with_overlapping_range_tombstones(populate_fn populate) {
|
||||
|
||||
rd.consume_pausable([&] (mutation_fragment&& mf) {
|
||||
if (mf.position().has_clustering_key() && !mf.range().overlaps(*s, prange.start(), prange.end())) {
|
||||
BOOST_FAIL(sprint("Received fragment which is not relevant for the slice: %s, slice: %s", mf, range));
|
||||
BOOST_FAIL(sprint("Received fragment which is not relevant for the slice: %s, slice: %s", mutation_fragment::printer(*s, mf), range));
|
||||
}
|
||||
result.partition().apply(*s, std::move(mf));
|
||||
return stop_iteration::no;
|
||||
@@ -1075,7 +1075,7 @@ void test_slicing_with_overlapping_range_tombstones(populate_fn populate) {
|
||||
auto consume_clustered = [&] (mutation_fragment&& mf) {
|
||||
position_in_partition::less_compare less(*s);
|
||||
if (less(mf.position(), last_pos)) {
|
||||
BOOST_FAIL(sprint("Out of order fragment: %s, last pos: %s", mf, last_pos));
|
||||
BOOST_FAIL(sprint("Out of order fragment: %s, last pos: %s", mutation_fragment::printer(*s, mf), last_pos));
|
||||
}
|
||||
last_pos = position_in_partition(mf.position());
|
||||
result.partition().apply(*s, std::move(mf));
|
||||
|
||||
@@ -1540,7 +1540,7 @@ SEASTAR_TEST_CASE(test_mutation_diff_with_random_generator) {
|
||||
return seastar::async([] {
|
||||
auto check_partitions_match = [] (const mutation_partition& mp1, const mutation_partition& mp2, const schema& s) {
|
||||
if (!mp1.equal(s, mp2)) {
|
||||
BOOST_FAIL(sprint("Partitions don't match, got: %s\n...and: %s", mp1, mp2));
|
||||
BOOST_FAIL(sprint("Partitions don't match, got: %s\n...and: %s", mutation_partition::printer(s, mp1), mutation_partition::printer(s, mp2)));
|
||||
}
|
||||
};
|
||||
for_each_mutation_pair([&] (auto&& m1, auto&& m2, are_equal eq) {
|
||||
|
||||
@@ -191,7 +191,7 @@ public:
|
||||
sstring value;
|
||||
api::timestamp_type t;
|
||||
std::tie(value, t) = _t.s.get_value(row);
|
||||
test_log.trace("reader {}: {} @{}, {}", _id, value, t, row);
|
||||
test_log.trace("reader {}: {} @{}, {}", _id, value, t, clustering_row::printer(*_t.s.schema(), row));
|
||||
if (_value && value != _value) {
|
||||
throw std::runtime_error(sprint("Saw values from two different writes in partition %d: %s and %s", _key, _value, value));
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user