Merge 'Introduce SELECT MUTATION FRAGMENTS statement' from Botond Dénes

SELECT MUTATION FRAGMENTS is a new select statement sub-type, which allows dumping the underling mutations making up the data of a given table. The output of this statement is mutation-fragments presented as CQL rows. Each row corresponds to a mutation-fragment. Subsequently, the output of this statement has a schema that is different than that of the underlying table.  The output schema is derived from the table's schema, as following:
* The table's partition key is copied over as-is
* The clustering key is formed from the following columns:
    - mutation_source (text): the kind of the mutation source, one of: memtable, row-cache or sstable; and the identifier of the individual mutation source.
    - partition_region (int): represents the enum with the same name.
    - the copy of the table's clustering columns
    - position_weight (int): -1, 0 or 1, has the same meaning as that in position_in_partition, used to disambiguate range tombstone changes with the same clustering key, from rows and from each other.
* The following regular columns:
    - metadata (text): the JSON representation of the mutation-fragment's metadata.
    - value (text): the JSON representation of the mutation-fragment's value.

Data is always read from the local replica, on which the query is executed. Migrating queries between coordinators is frobidden.

More details in the documentation commit (last commit).

Example:
```cql
cqlsh> CREATE TABLE ks.tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck));

cqlsh> DELETE FROM ks.tbl WHERE pk = 0;
cqlsh> DELETE FROM ks.tbl WHERE pk = 0 AND ck > 0 AND ck < 2;
cqlsh> INSERT INTO ks.tbl (pk, ck, v) VALUES (0, 0, 0);
cqlsh> INSERT INTO ks.tbl (pk, ck, v) VALUES (0, 1, 0);
cqlsh> INSERT INTO ks.tbl (pk, ck, v) VALUES (0, 2, 0);
cqlsh> INSERT INTO ks.tbl (pk, ck, v) VALUES (1, 0, 0);
cqlsh> SELECT * FROM ks.tbl;

 pk | ck | v
----+----+---
  1 |  0 | 0
  0 |  0 | 0
  0 |  1 | 0
  0 |  2 | 0

(4 rows)
cqlsh> SELECT * FROM MUTATION_FRAGMENTS(ks.tbl);

 pk | mutation_source | partition_region | ck | position_weight | metadata                                                                                                                 | mutation_fragment_kind | value
----+-----------------+------------------+----+-----------------+--------------------------------------------------------------------------------------------------------------------------+------------------------+-----------
  1 |      memtable:0 |                0 |    |                 |                                                                                                         {"tombstone":{}} |        partition start |      null
  1 |      memtable:0 |                2 |  0 |               0 | {"marker":{"timestamp":1688122873341627},"columns":{"v":{"is_live":true,"type":"regular","timestamp":1688122873341627}}} |         clustering row | {"v":"0"}
  1 |      memtable:0 |                3 |    |                 |                                                                                                                     null |          partition end |      null
  0 |      memtable:0 |                0 |    |                 |                                      {"tombstone":{"timestamp":1688122848686316,"deletion_time":"2023-06-30 11:00:48z"}} |        partition start |      null
  0 |      memtable:0 |                2 |  0 |               0 | {"marker":{"timestamp":1688122860037077},"columns":{"v":{"is_live":true,"type":"regular","timestamp":1688122860037077}}} |         clustering row | {"v":"0"}
  0 |      memtable:0 |                2 |  0 |               1 |                                      {"tombstone":{"timestamp":1688122853571709,"deletion_time":"2023-06-30 11:00:53z"}} | range tombstone change |      null
  0 |      memtable:0 |                2 |  1 |               0 | {"marker":{"timestamp":1688122864641920},"columns":{"v":{"is_live":true,"type":"regular","timestamp":1688122864641920}}} |         clustering row | {"v":"0"}
  0 |      memtable:0 |                2 |  2 |              -1 |                                                                                                         {"tombstone":{}} | range tombstone change |      null
  0 |      memtable:0 |                2 |  2 |               0 | {"marker":{"timestamp":1688122868706989},"columns":{"v":{"is_live":true,"type":"regular","timestamp":1688122868706989}}} |         clustering row | {"v":"0"}
  0 |      memtable:0 |                3 |    |                 |                                                                                                                     null |          partition end |      null

(10 rows)
```

Perf simple query:
```
/build/release/scylla perf-simple-query -c1 -m2G --duration=60
```

Before:
```
median 141596.39 tps ( 62.1 allocs/op,  13.1 tasks/op,   43688 insns/op,        0 errors)
median absolute deviation: 137.15
maximum: 142173.32
minimum: 140492.37
```
After:
```
median 141889.95 tps ( 62.1 allocs/op,  13.1 tasks/op,   43692 insns/op,        0 errors)
median absolute deviation: 167.04
maximum: 142380.26
minimum: 141025.51
```

Fixes: https://github.com/scylladb/scylladb/issues/11130

Closes #14347

* github.com:scylladb/scylladb:
  docs/operating-scylla/admin-tools: add documentation for the SELECT * FROM MUTATION_FRAGMENTS() statement
  test/topology_custom: add test_select_from_mutation_fragments.py
  test/boost/database_test: add test for mutation_dump/generate_output_schema_from_underlying_schema
  test/cql-pytest: add test_select_mutation_fragments.py
  test/cql-pytest: move scylla_data_dir fixture to conftest.py
  cql3/statements: wire-in mutation_fragments_select_statement
  cql3/restrictions/statement_restrictions: fix indentation
  cql3/restrictions/statement_restrictions: add check_indexes flag
  cql3/statments/select_statement: add mutation_fragments_select_statement
  cql3: add SELECT MUTATION FRAGMENTS select statement sub-type
  service/pager: allow passing a query functor override
  service/storage_proxy: un-embed coordinator_query_options
  replica: add mutation_dump
  replica: extract query_state into own header
  replica/table: add make_nonpopulating_cache_reader()
  replica/table: add select_memtables_as_mutation_sources()
  tools,mutation: extract the low-level json utilities into mutation/json.hh
  tools/json_writer: fold SstableKey() overloads into callers
  tools/json_writer: allow writing metadata and value separately
  tools/json_writer: split mutation_fragment_json_writer in two classes
  tools/json_writer: allow passing custom std::ostream to json_writer
This commit is contained in:
Avi Kivity
2023-07-19 11:54:10 +03:00
33 changed files with 2340 additions and 416 deletions

View File

@@ -724,6 +724,7 @@ scylla_core = (['message/messaging_service.cc',
'replica/memtable.cc',
'replica/exceptions.cc',
'replica/dirty_memory_manager.cc',
'replica/mutation_dump.cc',
'mutation/atomic_cell.cc',
'mutation/canonical_mutation.cc',
'mutation/frozen_mutation.cc',

View File

@@ -391,7 +391,10 @@ selectStatement returns [std::unique_ptr<raw::select_statement> expr]
( K_DISTINCT { is_distinct = true; } )?
sclause=selectClause
)
K_FROM cf=columnFamilyName
K_FROM (
cf=columnFamilyName
| K_MUTATION_FRAGMENTS '(' cf=columnFamilyName ')' { statement_subtype = raw::select_statement::parameters::statement_subtype::MUTATION_FRAGMENTS; }
)
( K_WHERE w=whereClause { wclause = std::move(w); } )?
( K_GROUP K_BY gbcolumns=listOfIdentifiers)?
( K_ORDER K_BY orderByClause[orderings] ( ',' orderByClause[orderings] )* )?
@@ -2067,6 +2070,7 @@ basic_unreserved_keyword returns [sstring str]
| K_DESCRIBE
| K_DESC
| K_EXECUTE
| K_MUTATION_FRAGMENTS
) { $str = $k.text; }
;
@@ -2275,6 +2279,8 @@ K_PRUNE: P R U N E;
K_EXECUTE: E X E C U T E;
K_MUTATION_FRAGMENTS: M U T A T I O N '_' F R A G M E N T S;
// Case-insensitive alpha characters
fragment A: ('a'|'A');
fragment B: ('b'|'B');

View File

@@ -355,9 +355,11 @@ statement_restrictions::statement_restrictions(data_dictionary::database db,
prepare_context& ctx,
bool selects_only_static_columns,
bool for_view,
bool allow_filtering)
bool allow_filtering,
check_indexes do_check_indexes)
: statement_restrictions(schema, allow_filtering)
{
_check_indexes = do_check_indexes;
for (auto&& relation_expr : boolean_factors(where_clause)) {
const expr::binary_operator* relation_binop = expr::as_if<expr::binary_operator>(&relation_expr);
@@ -383,18 +385,25 @@ statement_restrictions::statement_restrictions(data_dictionary::database db,
_clustering_prefix_restrictions = extract_clustering_prefix_restrictions(*_where, _schema);
_partition_range_restrictions = extract_partition_range(*_where, _schema);
}
auto cf = db.find_column_family(schema);
auto& sim = cf.get_index_manager();
const expr::allow_local_index allow_local(
!has_partition_key_unrestricted_components()
&& partition_key_restrictions_is_all_eq());
_has_multi_column = find_binop(_clustering_columns_restrictions, expr::is_multi_column);
_has_queriable_ck_index = clustering_columns_restrictions_have_supporting_index(sim, allow_local)
&& !type.is_delete();
_has_queriable_pk_index = parition_key_restrictions_have_supporting_index(sim, allow_local)
&& !type.is_delete();
_has_queriable_regular_index = expr::index_supports_some_column(_nonprimary_key_restrictions, sim, allow_local)
&& !type.is_delete();
if (_check_indexes) {
auto cf = db.find_column_family(schema);
auto& sim = cf.get_index_manager();
const expr::allow_local_index allow_local(
!has_partition_key_unrestricted_components()
&& partition_key_restrictions_is_all_eq());
_has_multi_column = find_binop(_clustering_columns_restrictions, expr::is_multi_column);
_has_queriable_ck_index = clustering_columns_restrictions_have_supporting_index(sim, allow_local)
&& !type.is_delete();
_has_queriable_pk_index = parition_key_restrictions_have_supporting_index(sim, allow_local)
&& !type.is_delete();
_has_queriable_regular_index = expr::index_supports_some_column(_nonprimary_key_restrictions, sim, allow_local)
&& !type.is_delete();
} else {
_has_queriable_ck_index = false;
_has_queriable_pk_index = false;
_has_queriable_regular_index = false;
}
// At this point, the select statement if fully constructed, but we still have a few things to validate
process_partition_key_restrictions(for_view, allow_filtering);
@@ -571,9 +580,12 @@ bool statement_restrictions::has_eq_restriction_on_column(const column_definitio
std::vector<const column_definition*> statement_restrictions::get_column_defs_for_filtering(data_dictionary::database db) const {
std::vector<const column_definition*> column_defs_for_filtering;
if (need_filtering()) {
auto cf = db.find_column_family(_schema);
auto& sim = cf.get_index_manager();
auto opt_idx = std::get<0>(find_idx(sim));
std::optional<secondary_index::index> opt_idx;
if (_check_indexes) {
auto cf = db.find_column_family(_schema);
auto& sim = cf.get_index_manager();
opt_idx = std::get<0>(find_idx(sim));
}
auto column_uses_indexing = [&opt_idx] (const column_definition* cdef, const expr::expression* single_col_restr) {
return opt_idx && single_col_restr && is_supported_by(*single_col_restr, *opt_idx);
};

View File

@@ -25,6 +25,10 @@ namespace cql3 {
namespace restrictions {
///In some cases checking if columns have indexes is undesired of even
///impossible, because e.g. the query runs on a pseudo-table, which does not
///have an index-manager, or even a table object.
using check_indexes = bool_class<class check_indexes_tag>;
/**
* The restrictions corresponding to the relations specified on the where-clause of CQL query.
@@ -110,6 +114,8 @@ private:
bool _partition_range_is_simple; ///< False iff _partition_range_restrictions imply a Cartesian product.
check_indexes _check_indexes = check_indexes::yes;
public:
/**
* Creates a new empty <code>StatementRestrictions</code>.
@@ -126,7 +132,8 @@ public:
prepare_context& ctx,
bool selects_only_static_columns,
bool for_view = false,
bool allow_filtering = false);
bool allow_filtering = false,
check_indexes do_check_indexes = check_indexes::yes);
const std::vector<expr::expression>& index_restrictions() const;

View File

@@ -12,6 +12,7 @@
#include "cql3/statements/raw/cf_statement.hh"
#include "cql3/statements/prepared_statement.hh"
#include "cql3/restrictions/statement_restrictions.hh"
#include "cql3/attributes.hh"
#include "db/config.hh"
#include <seastar/core/shared_ptr.hh>
@@ -24,10 +25,6 @@ namespace selection {
class prepared_selector;
} // namespace selection
namespace restrictions {
class statement_restrictions;
} // namespace restrictions
namespace statements {
namespace raw {
@@ -49,7 +46,7 @@ public:
class parameters final {
public:
using orderings_type = std::vector<std::pair<shared_ptr<column_identifier::raw>, ordering>>;
enum class statement_subtype { REGULAR, JSON, PRUNE_MATERIALIZED_VIEW };
enum class statement_subtype { REGULAR, JSON, PRUNE_MATERIALIZED_VIEW, MUTATION_FRAGMENTS };
private:
const orderings_type _orderings;
const bool _is_distinct;
@@ -69,6 +66,7 @@ public:
bool is_distinct() const;
bool allow_filtering() const;
bool is_json() const;
bool is_mutation_fragments() const;
bool bypass_cache() const;
bool is_prune_materialized_view() const;
orderings_type const& orderings() const;
@@ -110,13 +108,14 @@ private:
prepare_context& ctx,
::shared_ptr<selection::selection> selection,
bool for_view = false,
bool allow_filtering = false);
bool allow_filtering = false,
restrictions::check_indexes do_check_indexes = restrictions::check_indexes::yes);
/** Returns an expression for the limit or nullopt if no limit is set */
std::optional<expr::expression> prepare_limit(data_dictionary::database db, prepare_context& ctx, const std::optional<expr::expression>& limit);
// Checks whether it is legal to have ORDER BY in this statement
static void verify_ordering_is_allowed(const restrictions::statement_restrictions& restrictions);
static void verify_ordering_is_allowed(const parameters& params, const restrictions::statement_restrictions& restrictions);
void handle_unrecognized_ordering_column(const column_identifier& column) const;

View File

@@ -47,6 +47,7 @@
#include "utils/result_combinators.hh"
#include "utils/result_loop.hh"
#include "replica/database.hh"
#include "replica/mutation_dump.hh"
#include <boost/algorithm/cxx11/all_of.hpp>
@@ -135,6 +136,10 @@ bool select_statement::parameters::is_json() const {
return _statement_subtype == statement_subtype::JSON;
}
bool select_statement::parameters::is_mutation_fragments() const {
return _statement_subtype == statement_subtype::MUTATION_FRAGMENTS;
}
bool select_statement::parameters::allow_filtering() const {
return _allow_filtering;
}
@@ -1617,6 +1622,192 @@ parallelized_select_statement::do_execute(
});
}
mutation_fragments_select_statement::mutation_fragments_select_statement(
schema_ptr output_schema,
schema_ptr underlying_schema,
uint32_t bound_terms,
lw_shared_ptr<const parameters> parameters,
::shared_ptr<selection::selection> selection,
::shared_ptr<const restrictions::statement_restrictions> restrictions,
::shared_ptr<std::vector<size_t>> group_by_cell_indices,
bool is_reversed,
ordering_comparator_type ordering_comparator,
std::optional<expr::expression> limit,
std::optional<expr::expression> per_partition_limit,
cql_stats &stats,
std::unique_ptr<cql3::attributes> attrs)
: select_statement(
std::move(output_schema),
bound_terms,
std::move(parameters),
std::move(selection),
std::move(restrictions),
std::move(group_by_cell_indices),
is_reversed,
std::move(ordering_comparator),
std::move(limit),
std::move(per_partition_limit),
stats,
std::move(attrs))
, _underlying_schema(std::move(underlying_schema))
{ }
schema_ptr mutation_fragments_select_statement::generate_output_schema(schema_ptr underlying_schema) {
return replica::mutation_dump::generate_output_schema_from_underlying_schema(std::move(underlying_schema));
}
future<exceptions::coordinator_result<service::storage_proxy_coordinator_query_result>>
mutation_fragments_select_statement::do_query(
const locator::node* this_node,
service::storage_proxy& sp,
schema_ptr schema,
lw_shared_ptr<query::read_command> cmd,
dht::partition_range_vector partition_ranges,
db::consistency_level cl,
service::storage_proxy_coordinator_query_options optional_params) const {
auto res = co_await replica::mutation_dump::dump_mutations(sp.get_db(), schema, _underlying_schema, partition_ranges, *cmd, optional_params.timeout(sp));
service::replicas_per_token_range last_replicas;
if (this_node) {
last_replicas.emplace(dht::token_range::make_open_ended_both_sides(), std::vector<locator::host_id>{this_node->host_id()});
}
co_return service::storage_proxy_coordinator_query_result{std::move(res), std::move(last_replicas), {}};
}
future<::shared_ptr<cql_transport::messages::result_message>>
mutation_fragments_select_statement::do_execute(query_processor& qp, service::query_state& state, const query_options& options) const {
tracing::add_table_name(state.get_trace_state(), keyspace(), column_family());
auto cl = options.get_consistency();
uint64_t limit = get_limit(options);
auto now = gc_clock::now();
_stats.filtered_reads += _restrictions_need_filtering;
const source_selector src_sel = state.get_client_state().is_internal()
? source_selector::INTERNAL : source_selector::USER;
++_stats.query_cnt(src_sel, _ks_sel, cond_selector::NO_CONDITIONS, statement_type::SELECT);
_stats.select_bypass_caches += _parameters->bypass_cache();
_stats.select_allow_filtering += _parameters->allow_filtering();
_stats.select_partition_range_scan += _range_scan;
_stats.select_partition_range_scan_no_bypass_cache += _range_scan_no_bypass_cache;
auto slice = make_partition_slice(options);
auto max_result_size = qp.proxy().get_max_result_size(slice);
auto command = ::make_lw_shared<query::read_command>(
_schema->id(),
_schema->version(),
std::move(slice),
max_result_size,
query::tombstone_limit(qp.proxy().get_tombstone_limit()),
query::row_limit(limit),
query::partition_limit(query::max_partitions),
now,
tracing::make_trace_info(state.get_trace_state()),
query_id::create_null_id(),
query::is_first_page::no,
options.get_timestamp(state));
command->allow_limit = db::allow_per_partition_rate_limit::yes;
int32_t page_size = options.get_page_size();
_stats.unpaged_select_queries(_ks_sel) += page_size <= 0;
// An aggregation query will never be paged for the user, but we always page it internally to avoid OOM.
// If we user provided a page_size we'll use that to page internally (because why not), otherwise we use our default
// Note that if there are some nodes in the cluster with a version less than 2.0, we can't use paging (CASSANDRA-6707).
// Also note: all GROUP BY queries are considered aggregation.
const bool aggregate = _selection->is_aggregate() || has_group_by();
const bool nonpaged_filtering = _restrictions_need_filtering && page_size <= 0;
if (aggregate || nonpaged_filtering) {
page_size = internal_paging_size;
}
auto key_ranges = _restrictions->get_partition_key_ranges(options);
auto timeout_duration = get_timeout(state.get_client_state(), options);
auto timeout = db::timeout_clock::now() + timeout_duration;
if (!aggregate && !_restrictions_need_filtering && (page_size <= 0
|| !service::pager::query_pagers::may_need_paging(*_schema, page_size,
*command, key_ranges))) {
return do_query({}, qp.proxy(), _schema, command, std::move(key_ranges), cl,
{timeout, state.get_permit(), state.get_client_state(), state.get_trace_state(), {}, {}})
.then(wrap_result_to_error_message([&, this] (service::storage_proxy_coordinator_query_result&& qr) {
cql3::selection::result_set_builder builder(*_selection, now);
query::result_view::consume(*qr.query_result, std::move(slice),
cql3::selection::result_set_builder::visitor(builder, *_schema, *_selection));
auto msg = ::make_shared<cql_transport::messages::result_message::rows>(result(builder.build()));
return ::shared_ptr<cql_transport::messages::result_message>(std::move(msg));
}));
}
const locator::node* this_node = nullptr;
{
auto& tbl = qp.proxy().local_db().find_column_family(_underlying_schema);
auto& erm = tbl.get_effective_replication_map();
auto& topo = erm->get_topology();
this_node = topo.this_node();
auto state = options.get_paging_state();
if (state && !state->get_last_replicas().empty()) {
auto last_host = state->get_last_replicas().begin()->second.front();
if (last_host != this_node->host_id()) {
const auto last_node = topo.find_node(last_host);
throw exceptions::invalid_request_exception(format(
"Moving between coordinators is not allowed in SELECT FROM MUTATION_FRAGMENTS() statements, last page's coordinator was {}{}",
last_host,
last_node ? fmt::format("({})", last_node->endpoint()) : ""));
}
}
}
command->slice.options.set<query::partition_slice::option::allow_short_read>();
auto p = service::pager::query_pagers::pager(
qp.proxy(),
_schema,
_selection,
state,
options,
command,
std::move(key_ranges),
_restrictions_need_filtering ? _restrictions : nullptr,
std::bind_front(&mutation_fragments_select_statement::do_query, this, this_node));
if (_selection->is_trivial() && !_restrictions_need_filtering && !_per_partition_limit) {
return p->fetch_page_generator_result(page_size, now, timeout, _stats).then(wrap_result_to_error_message([this, p = std::move(p)] (result_generator&& generator) {
auto meta = [&] () -> shared_ptr<const cql3::metadata> {
if (!p->is_exhausted()) {
auto meta = make_shared<metadata>(*_selection->get_result_metadata());
meta->set_paging_state(p->state());
return meta;
} else {
return _selection->get_result_metadata();
}
}();
return shared_ptr<cql_transport::messages::result_message>(
make_shared<cql_transport::messages::result_message::rows>(result(std::move(generator), std::move(meta)))
);
}));
}
return p->fetch_page_result(page_size, now, timeout).then(wrap_result_to_error_message(
[this, p = std::move(p)](std::unique_ptr<cql3::result_set>&& rs) {
if (!p->is_exhausted()) {
rs->get_metadata().set_paging_state(p->state());
}
if (_restrictions_need_filtering) {
_stats.filtered_rows_read_total += p->stats().rows_read_total;
_stats.filtered_rows_matched_total += rs->size();
}
update_stats_rows_read(rs->size());
auto msg = ::make_shared<cql_transport::messages::result_message::rows>(result(std::move(rs)));
return make_ready_future<shared_ptr<cql_transport::messages::result_message>>(std::move(msg));
}));
}
namespace raw {
static void validate_attrs(const cql3::attributes::raw& attrs) {
@@ -1687,7 +1878,8 @@ select_statement::maybe_jsonize_select_clause(std::vector<selection::prepared_se
}
std::unique_ptr<prepared_statement> select_statement::prepare(data_dictionary::database db, cql_stats& stats, bool for_view) {
schema_ptr schema = validation::validate_column_family(db, keyspace(), column_family());
schema_ptr underlying_schema = validation::validate_column_family(db, keyspace(), column_family());
schema_ptr schema = _parameters->is_mutation_fragments() ? mutation_fragments_select_statement::generate_output_schema(underlying_schema) : underlying_schema;
prepare_context& ctx = get_prepare_context();
auto prepared_selectors = selection::raw_selector::to_prepared_selectors(_select_clause, *schema, db, keyspace());
@@ -1718,7 +1910,8 @@ std::unique_ptr<prepared_statement> select_statement::prepare(data_dictionary::d
? selection::selection::wildcard(schema)
: selection::selection::from_selectors(db, schema, keyspace(), levellized_prepared_selectors);
auto restrictions = prepare_restrictions(db, schema, ctx, selection, for_view, _parameters->allow_filtering());
auto restrictions = prepare_restrictions(db, schema, ctx, selection, for_view, _parameters->allow_filtering(),
restrictions::check_indexes(!_parameters->is_mutation_fragments()));
if (_parameters->is_distinct()) {
validate_distinct_selection(*schema, *selection, *restrictions);
@@ -1729,7 +1922,7 @@ std::unique_ptr<prepared_statement> select_statement::prepare(data_dictionary::d
if (!_parameters->orderings().empty()) {
assert(!for_view);
verify_ordering_is_allowed(*restrictions);
verify_ordering_is_allowed(*_parameters, *restrictions);
prepared_orderings_type prepared_orderings = prepare_orderings(*schema);
verify_ordering_is_valid(prepared_orderings, *schema, *restrictions);
@@ -1791,6 +1984,21 @@ std::unique_ptr<prepared_statement> select_statement::prepare(data_dictionary::d
prepare_limit(db, ctx, _per_partition_limit),
stats,
std::move(prepared_attrs));
} else if (_parameters->is_mutation_fragments()) {
stmt = ::make_shared<cql3::statements::mutation_fragments_select_statement>(
schema,
underlying_schema,
ctx.bound_variables_size(),
_parameters,
std::move(selection),
std::move(restrictions),
std::move(group_by_cell_indices),
is_reversed_,
std::move(ordering_comparator),
prepare_limit(db, ctx, _limit),
prepare_limit(db, ctx, _per_partition_limit),
stats,
std::move(prepared_attrs));
} else if (restrictions->uses_secondary_indexing()) {
stmt = indexed_table_select_statement::prepare(
db,
@@ -1862,11 +2070,12 @@ select_statement::prepare_restrictions(data_dictionary::database db,
prepare_context& ctx,
::shared_ptr<selection::selection> selection,
bool for_view,
bool allow_filtering)
bool allow_filtering,
restrictions::check_indexes do_check_indexes)
{
try {
return ::make_shared<restrictions::statement_restrictions>(db, schema, statement_type::SELECT, _where_clause, ctx,
selection->contains_only_static_columns(), for_view, allow_filtering);
selection->contains_only_static_columns(), for_view, allow_filtering, do_check_indexes);
} catch (const exceptions::unrecognized_entity_exception& e) {
if (contains_alias(e.entity)) {
throw exceptions::invalid_request_exception(format("Aliases aren't allowed in the WHERE clause (name: '{}')", e.entity));
@@ -1889,7 +2098,7 @@ select_statement::prepare_limit(data_dictionary::database db, prepare_context& c
return prep_limit;
}
void select_statement::verify_ordering_is_allowed(const restrictions::statement_restrictions& restrictions)
void select_statement::verify_ordering_is_allowed(const parameters& params, const restrictions::statement_restrictions& restrictions)
{
if (restrictions.uses_secondary_indexing()) {
throw exceptions::invalid_request_exception("ORDER BY with 2ndary indexes is not supported.");
@@ -1897,6 +2106,9 @@ void select_statement::verify_ordering_is_allowed(const restrictions::statement_
if (restrictions.is_key_range()) {
throw exceptions::invalid_request_exception("ORDER BY is only supported when the partition key is restricted by an EQ or an IN.");
}
if (params.is_mutation_fragments()) {
throw exceptions::invalid_request_exception("ORDER BY is not supported in SELECT FROM MUTATION_FRAGMENTS() statements.");
}
}
void select_statement::handle_unrecognized_ordering_column(const column_identifier& column) const

View File

@@ -20,8 +20,15 @@
#include "exceptions/exceptions.hh"
#include "exceptions/coordinator_result.hh"
namespace locator {
class node;
} // namespace locator
namespace service {
class client_state;
class storage_proxy;
class storage_proxy_coordinator_query_options;
class storage_proxy_coordinator_query_result;
} // namespace service
namespace cql3 {
@@ -310,6 +317,42 @@ private:
bytes compute_idx_token(const partition_key& key) const;
};
class mutation_fragments_select_statement : public select_statement {
schema_ptr _underlying_schema;
public:
mutation_fragments_select_statement(
schema_ptr output_schema,
schema_ptr underlying_schema,
uint32_t bound_terms,
lw_shared_ptr<const parameters> parameters,
::shared_ptr<selection::selection> selection,
::shared_ptr<const restrictions::statement_restrictions> restrictions,
::shared_ptr<std::vector<size_t>> group_by_cell_indices,
bool is_reversed,
ordering_comparator_type ordering_comparator,
std::optional<expr::expression> limit,
std::optional<expr::expression> per_partition_limit,
cql_stats &stats,
std::unique_ptr<cql3::attributes> attrs);
// This statement has a schema that is different from that of the underlying table.
static schema_ptr generate_output_schema(schema_ptr underlying_schema);
private:
future<exceptions::coordinator_result<service::storage_proxy_coordinator_query_result>>
do_query(
const locator::node* this_node,
service::storage_proxy& sp,
schema_ptr schema,
lw_shared_ptr<query::read_command> cmd,
dht::partition_range_vector partition_ranges,
db::consistency_level cl,
service::storage_proxy_coordinator_query_options optional_params) const;
virtual future<::shared_ptr<cql_transport::messages::result_message>> do_execute(query_processor& qp,
service::query_state& state, const query_options& options) const override;
};
}
}

View File

@@ -14,5 +14,7 @@
* `scyllatop <https://www.scylladb.com/2016/03/22/scyllatop/>`_ - A terminal base top-like tool for scylladb collectd/prometheus metrics.
* :doc:`scylla_dev_mode_setup</getting-started/installation-common/dev-mod>` - run Scylla in Developer Mode.
* :doc:`perftune</operating-scylla/admin-tools/perftune>` - performance configuration.
* :doc:`SELECT * FROM MUTATION_FRAGMENTS() Statement </operating-scylla/admin-tools/select-from-mutation-fragments/>` - dump the underlying mutation data from tables.
Run each tool with ``-h``, ``--help`` for full options description.

View File

@@ -18,7 +18,7 @@ Admin Tools
Scylla Logs </getting-started/logging/>
perftune
Virtual Tables </operating-scylla/admin-tools/virtual-tables/>
SELECT * FROM MUTATION_FRAGMENTS() Statement </operating-scylla/admin-tools/select-from-mutation-fragments/>
.. panel-box::
:title: Admin Tools

View File

@@ -153,6 +153,8 @@ The stream is strictly ordered:
Supported Operations
--------------------
.. _scylla-sstable-dump-data-operation:
dump-data
^^^^^^^^^

View File

@@ -0,0 +1,336 @@
============================================
SELECT * FROM MUTATION_FRAGMENTS() Statement
============================================
.. warning:: This statement is not final and is subject to change without notice and in backwards-incompatible ways.
.. note:: The target audience of this statement and therefore that of this document is people who are familiar with the internals of ScyllaDB.
The ``SELECT * FROM MUTATION_FRAGMENTS()`` statement allows for reading the raw underlying mutations (data) from a table.
This is indended to be used as a diagnostics tool to debug performance or correctness issues, where inspecting the raw underlying data, as scylla stores it, is desired.
So far this was only possible with sstables, using a tool like :doc:`Scylla SStable</operating-scylla/admin-tools/scylla-sstable>`.
This statement allows inspecting the content of the row-cache, as well as that of individual memtables, in addition to individual sstables.
The statement has to be used on an existing table, by using a regular ``SELECT`` query, which wraps the table name in ``MUTATION_FRAGMENTS()``. For example, to dump all mutations from ``my_keyspace.my_table``:
.. code-block:: cql
SELECT * FROM MUTATION_FRAGMENTS(my_keyspace.my_table);
Output Schema
-------------
The schema of the statement, and therefore the columns available to select and to restrict, are different from that of the underlying table.
The output schema is computed from the schema of the underlying table, as follows:
* The partition key columns are copied as-is
* The clustering key is computed as follows:
- ``mutation_source text``
- ``partition_region byte``
- The clustering columns of the underlying table
- ``position_weight byte``
* Regular columns:
- ``mutation_fragment_kind text``
- ``metadata text``
- ``value text``
So for a table with the following definition:
.. code-block:: cql
CREATE TABLE my_keyspace.my_table (
pk1 int,
pk2 text,
ck1 byte,
ck2 text,
col1 text,
col2 text,
PRIMARY KEY ((pk1, pk2), ck1, ck2)
);
The transformed schema would look like this:
.. code-block:: cql
CREATE TABLE "my_keyspace.my_table_$mutation_fragments"(
pk1 int,
pk2 text,
mutation_source text,
partition_region byte,
ck1 byte,
ck2 text,
position_weight byte,
mutation_fragment_kind text,
metadata text,
value text,
PRIMARY KEY ((pk1, pk2), mutation_source, partition_region, ck1, ck2, position_weight)
);
Note how the partition-key columns are identical, the clustering key columns are derived from that of the underlying table and the regular columns are completely replaced.
Each row in the output represents a mutation-fragment in the underlying representation, and each partition in the output represents a mutation in the underlying representation.
Columns
^^^^^^^
mutation_source
~~~~~~~~~~~~~~~
The mutation source the mutation originates from. It has the following format: ``${mutation_source_kind}[:${mutation_source_id}]``.
Where ``mutation_source_kind`` is one of:
* ``memtable``
* ``row-cache``
* ``sstable``
And the ``mutation_source_id`` is used to distinguish individual mutation sources of the same kind, where applicable:
* ``memtable`` - a numeric id, starting from ``0``
* ``row-cache`` - N/A, there is only a single cache per table
* ``sstable`` - the path of the sstable
partition_region
~~~~~~~~~~~~~~~~
The numeric representation of the ``enum`` with the same name:
.. code-block:: c++
enum class partition_region : uint8_t {
partition_start, // 0
static_row, // 1
clustered, // 2
partition_end, // 3
};
The reason for using the underlying numeric representation, instead of the name, is to sort mutation-fragments in their natural order.
position_weight
~~~~~~~~~~~~~~~
The position-weight of the underlying mutation-fragment, describing its relation to the clustering key in its position. This is either:
* ``-1`` - before
* ``0`` - at
* ``1`` - after
The reason for using the underlying numeric representation, instead of the human-readeable text, is to sort mutation-fragments in their natural order.
mutation_fragment_kind
~~~~~~~~~~~~~~~~~~~~~~
The kind of the mutation fragment, the row represents. One of:
* ``partition start``
* ``static row``
* ``clustering row``
* ``range tombstone change``
* ``partition end``
This is the text representation of the ``enum class mutation_fragment_v2_kind``. Since this is a regular column, the human readeable name is used.
metadata
~~~~~~~~
The content of the mutation-fragment represented as JSON, without the values, if applicable.
This is uses the same JSON schema as :ref:`scylla sstable dump-data<scylla-sstable-dump-data-operation>`.
Content of ``metadata`` column for various mutation fragment kinds:
+------------------------+-------------------------------------------------------------+
| mutation frament kind | Content |
+========================+=============================================================+
| partition start | ``{"tombstone": $TOMBSTONE}`` |
+------------------------+-------------------------------------------------------------+
| static row | ``$COLUMNS`` |
+------------------------+-------------------------------------------------------------+
| clustering row | ``$CLUSTERING_ROW`` without the ``type`` and ``key`` fields |
+------------------------+-------------------------------------------------------------+
| range tombstone change | ``{"tombstone": $TOMBSTONE}`` |
+------------------------+-------------------------------------------------------------+
| partition end | N/A |
+------------------------+-------------------------------------------------------------+
JSON symbols are represented as ``$SYMBOL_NAME``, the definition of these can be found in :ref:`scylla sstable dump-data<scylla-sstable-dump-data-operation>`.
value
~~~~~
The value of the mutation-fragment, represented as JSON, if applicable.
Only ``static row`` and ``clustering row`` fragments have values.
The JSON schema of both is that of the ``$COLUMNS`` JSON symbol.
See :ref:`scylla sstable dump-data<scylla-sstable-dump-data-operation>` for the definition of these.
Only the ``value`` field is left in cell objects (``$REGULAR_CELL``, ``$COUNTER_SHARDS_CELL``, ``$COUNTER_UPDATE_CELL`` and ``$FROZEN_COLLECTION``) and the ``cells`` field in collection objects (``$COLLECTION``).
The reason for extracting this out into a separate column, is to allow deselecting the potentially large values, de-cluttering the CQL output, and reducing the amount of data that has to transferred.
Limitations and Peculiarities
-----------------------------
Data is read locally, from the node which receives the query, so replica is always the same node as the coordinator.
The query cannot be migrated between nodes. If a query is paged, all its pages have to be served by the same coordinator. This is enforced, and any attempt to migrate the query to another coordinator will result in the query being aborted.
Note that by default, drivers use round robin load balancing policies, and consequently they will attemp to read each page from a different coordinator.
The statement can output rows with a non-full clustering prefix.
Examples
--------
Given a table, with the following definition:
.. code-block:: cql
CREATE TABLE ks.tbl (
pk int,
ck int,
v int,
PRIMARY KEY (pk, ck)
);
And the following content:
.. code-block:: console
cqlsh> DELETE FROM ks.tbl WHERE pk = 0;
cqlsh> DELETE FROM ks.tbl WHERE pk = 0 AND ck > 0 AND ck < 2;
cqlsh> INSERT INTO ks.tbl (pk, ck, v) VALUES (0, 0, 0);
cqlsh> INSERT INTO ks.tbl (pk, ck, v) VALUES (0, 1, 0);
cqlsh> INSERT INTO ks.tbl (pk, ck, v) VALUES (0, 2, 0);
cqlsh> INSERT INTO ks.tbl (pk, ck, v) VALUES (1, 0, 0);
cqlsh> SELECT * FROM ks.tbl;
pk | ck | v
----+----+---
1 | 0 | 0
0 | 0 | 0
0 | 1 | 0
0 | 2 | 0
(4 rows)
Dump the content of the entire table
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. code-block:: console
cqlsh> SELECT * FROM MUTATION_FRAGMENTS(ks.tbl);
pk | mutation_source | partition_region | ck | position_weight | metadata | mutation_fragment_kind | value
----+-----------------+------------------+----+-----------------+--------------------------------------------------------------------------------------------------------------------------+------------------------+-----------
1 | memtable:0 | 0 | | | {"tombstone":{}} | partition start | null
1 | memtable:0 | 2 | 0 | 0 | {"marker":{"timestamp":1688122873341627},"columns":{"v":{"is_live":true,"type":"regular","timestamp":1688122873341627}}} | clustering row | {"v":"0"}
1 | memtable:0 | 3 | | | null | partition end | null
0 | memtable:0 | 0 | | | {"tombstone":{"timestamp":1688122848686316,"deletion_time":"2023-06-30 11:00:48z"}} | partition start | null
0 | memtable:0 | 2 | 0 | 0 | {"marker":{"timestamp":1688122860037077},"columns":{"v":{"is_live":true,"type":"regular","timestamp":1688122860037077}}} | clustering row | {"v":"0"}
0 | memtable:0 | 2 | 0 | 1 | {"tombstone":{"timestamp":1688122853571709,"deletion_time":"2023-06-30 11:00:53z"}} | range tombstone change | null
0 | memtable:0 | 2 | 1 | 0 | {"marker":{"timestamp":1688122864641920},"columns":{"v":{"is_live":true,"type":"regular","timestamp":1688122864641920}}} | clustering row | {"v":"0"}
0 | memtable:0 | 2 | 2 | -1 | {"tombstone":{}} | range tombstone change | null
0 | memtable:0 | 2 | 2 | 0 | {"marker":{"timestamp":1688122868706989},"columns":{"v":{"is_live":true,"type":"regular","timestamp":1688122868706989}}} | clustering row | {"v":"0"}
0 | memtable:0 | 3 | | | null | partition end | null
(10 rows)
Dump the content of a single partition of interest
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. code-block:: console
cqlsh> SELECT * FROM MUTATION_FRAGMENTS(ks.tbl) WHERE pk = 1;
pk | mutation_source | partition_region | ck | position_weight | metadata | mutation_fragment_kind | value
----+-----------------+------------------+----+-----------------+--------------------------------------------------------------------------------------------------------------------------+------------------------+-----------
1 | memtable:0 | 0 | | | {"tombstone":{}} | partition start | null
1 | memtable:0 | 2 | 0 | 0 | {"marker":{"timestamp":1688122873341627},"columns":{"v":{"is_live":true,"type":"regular","timestamp":1688122873341627}}} | clustering row | {"v":"0"}
1 | memtable:0 | 3 | | | null | partition end | null
(3 rows)
This works just like selecting a partition from the base table.
Mutation sources
^^^^^^^^^^^^^^^^
Note how after insertion, all data is in the memtable (see above). After flushing the memtable, this will look like this:
.. code-block:: console
cqlsh> SELECT * FROM MUTATION_FRAGMENTS(ks.tbl) WHERE pk = 1;
pk | mutation_source | partition_region | ck | position_weight | metadata | mutation_fragment_kind | value
----+------------------------------------------------------------------------------------------------------------------+------------------+----+-----------------+--------------------------------------------------------------------------------------------------------------------------+------------------------+-----------
1 | sstable:/var/lib/scylla/data/ks/tbl-259b2520104011ee822ed2e489876007/me-3g79_0ur3_48e402ejkwsvj7viqr-big-Data.db | 0 | | | {"tombstone":{}} | partition start | null
1 | sstable:/var/lib/scylla/data/ks/tbl-259b2520104011ee822ed2e489876007/me-3g79_0ur3_48e402ejkwsvj7viqr-big-Data.db | 2 | 0 | 0 | {"marker":{"timestamp":1688122873341627},"columns":{"v":{"is_live":true,"type":"regular","timestamp":1688122873341627}}} | clustering row | {"v":"0"}
1 | sstable:/var/lib/scylla/data/ks/tbl-259b2520104011ee822ed2e489876007/me-3g79_0ur3_48e402ejkwsvj7viqr-big-Data.db | 3 | | | null | partition end | null
(3 rows)
After executing a read on the queried partition of the underlying table, the data will also be included in the row-cache:
.. code-block:: console
cqlsh> SELECT * FROM MUTATION_FRAGMENTS(ks.tbl) WHERE pk = 1;
pk | mutation_source | partition_region | ck | position_weight | metadata | mutation_fragment_kind | value
----+------------------------------------------------------------------------------------------------------------------+------------------+----+-----------------+--------------------------------------------------------------------------------------------------------------------------+------------------------+-----------
1 | row-cache | 0 | | | {"tombstone":{}} | partition start | null
1 | row-cache | 2 | 0 | 0 | {"marker":{"timestamp":1688122873341627},"columns":{"v":{"is_live":true,"type":"regular","timestamp":1688122873341627}}} | clustering row | {"v":"0"}
1 | row-cache | 3 | | | null | partition end | null
1 | sstable:/var/lib/scylla/data/ks/tbl-259b2520104011ee822ed2e489876007/me-3g79_0ur3_48e402ejkwsvj7viqr-big-Data.db | 0 | | | {"tombstone":{}} | partition start | null
1 | sstable:/var/lib/scylla/data/ks/tbl-259b2520104011ee822ed2e489876007/me-3g79_0ur3_48e402ejkwsvj7viqr-big-Data.db | 2 | 0 | 0 | {"marker":{"timestamp":1688122873341627},"columns":{"v":{"is_live":true,"type":"regular","timestamp":1688122873341627}}} | clustering row | {"v":"0"}
1 | sstable:/var/lib/scylla/data/ks/tbl-259b2520104011ee822ed2e489876007/me-3g79_0ur3_48e402ejkwsvj7viqr-big-Data.db | 3 | | | null | partition end | null
(6 rows)
It is possible to restrict the output to a single mutation source, or mutation source kind:
.. code-block:: console
cqlsh> SELECT * FROM MUTATION_FRAGMENTS(ks.tbl) WHERE pk = 1 AND mutation_source = 'row-cache';
pk | mutation_source | partition_region | ck | position_weight | metadata | mutation_fragment_kind | value
----+-----------------+------------------+----+-----------------+--------------------------------------------------------------------------------------------------------------------------+------------------------+-----------
1 | row-cache | 0 | | | {"tombstone":{}} | partition start | null
1 | row-cache | 2 | 0 | 0 | {"marker":{"timestamp":1688122873341627},"columns":{"v":{"is_live":true,"type":"regular","timestamp":1688122873341627}}} | clustering row | {"v":"0"}
1 | row-cache | 3 | | | null | partition end | null
(3 rows)
Filtering and Aggregation
^^^^^^^^^^^^^^^^^^^^^^^^^
Select only clustering elements:
.. code-block:: console
cqlsh> SELECT * FROM MUTATION_FRAGMENTS(ks.tbl) WHERE pk = 0 AND partition_region = 2 ALLOW FILTERING;
pk | mutation_source | partition_region | ck | position_weight | metadata | mutation_fragment_kind | value
----+------------------------------------------------------------------------------------------------------------------+------------------+----+-----------------+--------------------------------------------------------------------------------------------------------------------------+------------------------+-----------
0 | sstable:/var/lib/scylla/data/ks/tbl-259b2520104011ee822ed2e489876007/me-3g79_0ur3_48e402ejkwsvj7viqr-big-Data.db | 2 | 0 | 0 | {"marker":{"timestamp":1688122860037077},"columns":{"v":{"is_live":true,"type":"regular","timestamp":1688122860037077}}} | clustering row | {"v":"0"}
0 | sstable:/var/lib/scylla/data/ks/tbl-259b2520104011ee822ed2e489876007/me-3g79_0ur3_48e402ejkwsvj7viqr-big-Data.db | 2 | 0 | 1 | {"tombstone":{"timestamp":1688122853571709,"deletion_time":"2023-06-30 11:00:53z"}} | range tombstone change | null
0 | sstable:/var/lib/scylla/data/ks/tbl-259b2520104011ee822ed2e489876007/me-3g79_0ur3_48e402ejkwsvj7viqr-big-Data.db | 2 | 1 | 0 | {"marker":{"timestamp":1688122864641920},"columns":{"v":{"is_live":true,"type":"regular","timestamp":1688122864641920}}} | clustering row | {"v":"0"}
0 | sstable:/var/lib/scylla/data/ks/tbl-259b2520104011ee822ed2e489876007/me-3g79_0ur3_48e402ejkwsvj7viqr-big-Data.db | 2 | 2 | -1 | {"tombstone":{}} | range tombstone change | null
0 | sstable:/var/lib/scylla/data/ks/tbl-259b2520104011ee822ed2e489876007/me-3g79_0ur3_48e402ejkwsvj7viqr-big-Data.db | 2 | 2 | 0 | {"marker":{"timestamp":1688122868706989},"columns":{"v":{"is_live":true,"type":"regular","timestamp":1688122868706989}}} | clustering row | {"v":"0"}
(5 rows)
Count range tombstone changes:
.. code-block:: console
cqlsh> SELECT COUNT(*) FROM MUTATION_FRAGMENTS(ks.tbl) WHERE pk = 0 AND mutation_fragment_kind = 'range tombstone change' ALLOW FILTERING;
count
-------
2
(1 rows)

116
mutation/json.hh Normal file
View File

@@ -0,0 +1,116 @@
/*
* Copyright (C) 2023-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include "dht/i_partitioner.hh"
#include "schema/schema_fwd.hh"
#include "utils/rjson.hh"
// has to be below the utils/rjson.hh include
#include <rapidjson/ostreamwrapper.h>
/*
* Utilities for converting mutations, mutation-fragments and their parts into json.
*/
class atomic_cell_or_collection;
class atomic_cell_view;
class counter_cell_view;
class row;
class row_marker;
class tombstone;
struct collection_mutation_view_description;
namespace mutation_json {
class json_writer {
using stream = rapidjson::BasicOStreamWrapper<std::ostream>;
using writer = rapidjson::Writer<stream, rjson::encoding, rjson::encoding, rjson::allocator>;
stream _stream;
writer _writer;
public:
json_writer(std::ostream& os = std::cout) : _stream(os), _writer(_stream)
{ }
writer& rjson_writer() { return _writer; }
// following the rapidjson method names here
bool Null() { return _writer.Null(); }
bool Bool(bool b) { return _writer.Bool(b); }
bool Int(int i) { return _writer.Int(i); }
bool Uint(unsigned i) { return _writer.Uint(i); }
bool Int64(int64_t i) { return _writer.Int64(i); }
bool Uint64(uint64_t i) { return _writer.Uint64(i); }
bool Double(double d) { return _writer.Double(d); }
bool RawNumber(std::string_view str) { return _writer.RawNumber(str.data(), str.size(), false); }
bool String(std::string_view str) { return _writer.String(str.data(), str.size(), false); }
bool StartObject() { return _writer.StartObject(); }
bool Key(std::string_view str) { return _writer.Key(str.data(), str.size(), false); }
bool EndObject(rapidjson::SizeType memberCount = 0) { return _writer.EndObject(memberCount); }
bool StartArray() { return _writer.StartArray(); }
bool EndArray(rapidjson::SizeType elementCount = 0) { return _writer.EndArray(elementCount); }
// scylla-specific extensions (still following rapidjson naming scheme for consistency)
template <typename T>
void AsString(const T& obj) {
String(fmt::format("{}", obj));
}
// partition or clustering key
template <typename KeyType>
void DataKey(const schema& schema, const KeyType& key, std::optional<dht::token> token = {}) {
StartObject();
if (token) {
Key("token");
AsString(*token);
}
Key("raw");
String(to_hex(key.representation()));
Key("value");
AsString(key.with_schema(schema));
EndObject();
}
void StartStream() {
StartObject();
Key("sstables");
StartObject();
}
void EndStream() {
EndObject();
EndObject();
}
};
class mutation_partition_json_writer {
const schema& _schema;
json_writer _writer;
private:
void write_each_collection_cell(const collection_mutation_view_description& mv, data_type type, std::function<void(atomic_cell_view, data_type)> func);
public:
explicit mutation_partition_json_writer(const schema& s, std::ostream& os = std::cout)
: _schema(s), _writer(os) {}
const schema& schema() const { return _schema; }
json_writer& writer() { return _writer; }
sstring to_string(gc_clock::time_point tp);
void write_atomic_cell_value(const atomic_cell_view& cell, data_type type);
void write_collection_value(const collection_mutation_view_description& mv, data_type type);
void write(gc_clock::duration ttl, gc_clock::time_point expiry);
void write(const tombstone& t);
void write(const row_marker& m);
void write(counter_cell_view cv);
void write(const atomic_cell_view& cell, data_type type, bool include_value = true);
void write(const collection_mutation_view_description& mv, data_type type, bool include_value = true);
void write(const atomic_cell_or_collection& cell, const column_definition& cdef, bool include_value = true);
void write(const row& r, column_kind kind, bool include_value = true);
};
} // namespace mutation_json

View File

@@ -9,6 +9,8 @@
#include "mutation.hh"
#include "query-result-writer.hh"
#include "mutation_rebuilder.hh"
#include "mutation/json.hh"
#include "types/tuple.hh"
mutation::data::data(dht::decorated_key&& key, schema_ptr&& schema)
: _schema(std::move(schema))
@@ -199,3 +201,168 @@ std::ostream& operator<<(std::ostream& os, const mutation& m) {
os << mutation_partition::printer(s, m.partition()) << "\n}";
return os;
}
namespace mutation_json {
void mutation_partition_json_writer::write_each_collection_cell(const collection_mutation_view_description& mv, data_type type,
std::function<void(atomic_cell_view, data_type)> func) {
std::function<void(size_t, bytes_view)> write_key;
std::function<void(size_t, atomic_cell_view)> write_value;
if (auto t = dynamic_cast<const collection_type_impl*>(type.get())) {
write_key = [this, t = t->name_comparator()] (size_t, bytes_view k) { _writer.String(t->to_string(k)); };
write_value = [t = t->value_comparator(), &func] (size_t, atomic_cell_view v) { func(v, t); };
} else if (auto t = dynamic_cast<const tuple_type_impl*>(type.get())) {
write_key = [this] (size_t i, bytes_view) { _writer.String(""); };
write_value = [t, &func] (size_t i, atomic_cell_view v) { func(v, t->type(i)); };
}
if (write_key && write_value) {
_writer.StartArray();
for (size_t i = 0; i < mv.cells.size(); ++i) {
_writer.StartObject();
_writer.Key("key");
write_key(i, mv.cells[i].first);
_writer.Key("value");
write_value(i, mv.cells[i].second);
_writer.EndObject();
}
_writer.EndArray();
} else {
_writer.Null();
}
}
sstring mutation_partition_json_writer::to_string(gc_clock::time_point tp) {
return fmt::format("{:%F %T}z", fmt::gmtime(gc_clock::to_time_t(tp)));
}
void mutation_partition_json_writer::write_atomic_cell_value(const atomic_cell_view& cell, data_type type) {
if (type->is_counter()) {
if (cell.is_counter_update()) {
_writer.Int64(cell.counter_update_value());
} else {
write(counter_cell_view(cell));
}
} else {
_writer.String(type->to_string(cell.value().linearize()));
}
}
void mutation_partition_json_writer::write_collection_value(const collection_mutation_view_description& mv, data_type type) {
write_each_collection_cell(mv, type, [&] (atomic_cell_view v, data_type t) { write_atomic_cell_value(v, t); });
}
void mutation_partition_json_writer::write(gc_clock::duration ttl, gc_clock::time_point expiry) {
_writer.Key("ttl");
_writer.AsString(ttl);
_writer.Key("expiry");
_writer.String(to_string(expiry));
}
void mutation_partition_json_writer::write(const tombstone& t) {
_writer.StartObject();
if (t) {
_writer.Key("timestamp");
_writer.Int64(t.timestamp);
_writer.Key("deletion_time");
_writer.String(to_string(t.deletion_time));
}
_writer.EndObject();
}
void mutation_partition_json_writer::write(const row_marker& m) {
_writer.StartObject();
_writer.Key("timestamp");
_writer.Int64(m.timestamp());
if (m.is_live() && m.is_expiring()) {
write(m.ttl(), m.expiry());
}
_writer.EndObject();
}
void mutation_partition_json_writer::write(counter_cell_view cv) {
_writer.StartArray();
for (const auto& shard : cv.shards()) {
_writer.StartObject();
_writer.Key("id");
_writer.AsString(shard.id());
_writer.Key("value");
_writer.Int64(shard.value());
_writer.Key("clock");
_writer.Int64(shard.logical_clock());
_writer.EndObject();
}
_writer.EndArray();
}
void mutation_partition_json_writer::write(const atomic_cell_view& cell, data_type type, bool include_value) {
_writer.StartObject();
_writer.Key("is_live");
_writer.Bool(cell.is_live());
_writer.Key("type");
if (type->is_counter()) {
if (cell.is_counter_update()) {
_writer.String("counter-update");
} else {
_writer.String("counter-shards");
}
} else if (type->is_collection()) {
_writer.String("frozen-collection");
} else {
_writer.String("regular");
}
_writer.Key("timestamp");
_writer.Int64(cell.timestamp());
if (!type->is_counter()) {
if (cell.is_live_and_has_ttl()) {
write(cell.ttl(), cell.expiry());
}
if (!cell.is_live()) {
_writer.Key("deletion_time");
_writer.String(to_string(cell.deletion_time()));
}
}
if (include_value && (type->is_counter() || cell.is_live())) {
_writer.Key("value");
write_atomic_cell_value(cell, type);
}
_writer.EndObject();
}
void mutation_partition_json_writer::write(const collection_mutation_view_description& mv, data_type type, bool include_value) {
_writer.StartObject();
if (mv.tomb) {
_writer.Key("tombstone");
write(mv.tomb);
}
_writer.Key("cells");
write_each_collection_cell(mv, type, [&] (atomic_cell_view v, data_type t) { write(v, t, include_value); });
_writer.EndObject();
}
void mutation_partition_json_writer::write(const atomic_cell_or_collection& cell, const column_definition& cdef, bool include_value) {
if (cdef.is_atomic()) {
write(cell.as_atomic_cell(cdef), cdef.type, include_value);
} else if (cdef.type->is_collection() || cdef.type->is_user_type()) {
cell.as_collection_mutation().with_deserialized(*cdef.type, [&, this] (collection_mutation_view_description mv) {
write(mv, cdef.type, include_value);
});
} else {
_writer.Null();
}
}
void mutation_partition_json_writer::write(const row& r, column_kind kind, bool include_value) {
_writer.StartObject();
r.for_each_cell([this, kind, include_value] (column_id id, const atomic_cell_or_collection& cell) {
auto cdef = _schema.column_at(kind, id);
_writer.Key(cdef.name_as_text());
write(cell, cdef, include_value);
});
_writer.EndObject();
}
} // namespace mutation_json

View File

@@ -8,7 +8,8 @@ target_sources(replica
distributed_loader.cc
memtable.cc
exceptions.cc
dirty_memory_manager.cc)
dirty_memory_manager.cc
mutation_dump.cc)
target_include_directories(replica
PUBLIC
${CMAKE_SOURCE_DIR})

View File

@@ -703,12 +703,24 @@ public:
flat_mutation_reader_v2 make_streaming_reader(schema_ptr schema, reader_permit permit, const dht::partition_range& range,
lw_shared_ptr<sstables::sstable_set> sstables) const;
// Make a reader which reads only from the row-cache.
// The reader doens't populate the cache, it reads only what is in the cache
// Supports reading only a single partition.
// Does not support reading in reverse.
flat_mutation_reader_v2 make_nonpopulating_cache_reader(schema_ptr schema, reader_permit permit, const dht::partition_range& range,
const query::partition_slice& slice, tracing::trace_state_ptr ts);
sstables::shared_sstable make_streaming_sstable_for_write(std::optional<sstring> subdir = {});
sstables::shared_sstable make_streaming_staging_sstable();
mutation_source as_mutation_source() const;
mutation_source as_mutation_source_excluding_staging() const;
// Select all memtables which contain this token and return them as mutation sources.
// We could return memtables here, but table has no public memtable accessors so far.
// Memtables are mutable objects, so it is best to keep it this way.
std::vector<mutation_source> select_memtables_as_mutation_sources(dht::token) const;
void set_virtual_reader(mutation_source virtual_reader) {
_virtual_reader = std::move(virtual_reader);
}

613
replica/mutation_dump.cc Normal file
View File

@@ -0,0 +1,613 @@
/*
* Copyright (C) 2023-present ScyllaDB
*/
/*
* SPDX-License-Identifier: (AGPL-3.0-or-later)
*/
#include "multishard_mutation_query.hh"
#include "mutation/json.hh"
#include "mutation_query.hh"
#include "partition_slice_builder.hh"
#include "readers/foreign.hh"
#include "replica/database.hh"
#include "replica/mutation_dump.hh"
#include "replica/query_state.hh"
#include "schema/schema_builder.hh"
#include "schema/schema_registry.hh"
#include "sstables/sstables.hh"
namespace replica::mutation_dump {
namespace {
class mutation_dump_reader : public flat_mutation_reader_v2::impl {
struct mutation_source_with_params {
mutation_source ms;
std::vector<interval<partition_region>> region_intervals;
query::partition_slice slice;
};
using exploded_clustering_range = interval<std::vector<bytes>>;
private:
replica::database& _db;
dht::decorated_key _dk;
query::partition_slice _ps;
tracing::trace_state_ptr _ts;
std::vector<position_range> _pos_ranges;
schema_ptr _underlying_schema;
dht::partition_range _underlying_pr;
// has to be sorted, because it is part of the clustering key
std::map<sstring, mutation_source_with_params> _underlying_mutation_sources;
flat_mutation_reader_v2_opt _underlying_reader;
bool _partition_start_emitted = false;
bool _partition_end_emitted = false;
private:
static void set_cell(const ::schema& schema, row& cr, const column_definition& cdef, data_value value) {
auto ts = api::new_timestamp();
if (!value.is_null()) {
cr.apply(cdef, atomic_cell::make_live(*cdef.type, ts, value.serialize_nonnull()));
}
}
static void set_cell(const ::schema& schema, row& cr, const bytes& column_name, data_value value) {
auto cdef = schema.get_column_definition(column_name);
set_cell(schema, cr, *cdef, std::move(value));
}
std::map<sstring, mutation_source> create_all_mutation_sources() {
std::map<sstring, mutation_source> all_mutation_sources;
auto& tbl = _db.find_column_family(_underlying_schema);
{
auto mss = tbl.select_memtables_as_mutation_sources(_dk.token());
for (size_t i = 0; i < mss.size(); ++i) {
auto current_source = format("memtable:{}", i);
all_mutation_sources.emplace(std::move(current_source), mss[i]);
}
}
all_mutation_sources.emplace("row-cache", mutation_source([&tbl] (
schema_ptr schema,
reader_permit permit,
const dht::partition_range& pr,
const query::partition_slice& ps,
tracing::trace_state_ptr ts,
streamed_mutation::forwarding,
mutation_reader::forwarding) {
return tbl.make_nonpopulating_cache_reader(std::move(schema), std::move(permit), pr, ps, ts);
}));
{
auto ssts = tbl.select_sstables(_underlying_pr);
for (size_t i = 0; i < ssts.size(); ++i) {
auto current_source = format("sstable:{}", ssts[i]->get_filename());
all_mutation_sources.emplace(std::move(current_source), ssts[i]->as_mutation_source());
}
}
return all_mutation_sources;
}
template <typename T>
interval<T> transform_range(const exploded_clustering_range& cr, unsigned i) {
const auto& ck_types = _schema->clustering_key_type()->types();
auto transform_bound = [&] (std::optional<exploded_clustering_range::bound> b) -> std::optional<typename interval<T>::bound> {
if (!b) {
return {};
}
const auto& exploded_ck = b->value();
if (exploded_ck.size() <= i) {
return {};
}
const auto force_inclusive = exploded_ck.size() > i + 1;
return typename interval<T>::bound(value_cast<T>(ck_types.at(i)->deserialize_value(exploded_ck[i])), force_inclusive || b->is_inclusive());
};
return interval<T>(transform_bound(cr.start()), transform_bound(cr.end()));
}
query::clustering_range transform_to_underlying_cr(const exploded_clustering_range& cr) {
const auto& ck_types = _underlying_schema->clustering_key_type()->types();
auto transform_range_bound = [&] (std::optional<exploded_clustering_range::bound> b, bool is_end) -> std::optional<query::clustering_range::bound> {
if (!b) {
return {};
}
const auto& exploded_ck = b->value();
if (exploded_ck.size() < 3) {
return {};
}
const auto underlying_ck_begin = exploded_ck.begin() + 2;
const auto underlying_ck_end = underlying_ck_begin + std::min(exploded_ck.size() - 2, ck_types.size());
auto underlying_ck = clustering_key::from_exploded(std::vector<bytes>(underlying_ck_begin, underlying_ck_end));
bool is_inclusive = b->is_inclusive();
// Check if inclusiveness override is needed because of position weight
if (exploded_ck.size() == ck_types.size() + 3) {
const auto pos_weight = value_cast<int8_t>(byte_type->deserialize_value(exploded_ck.back()));
if (pos_weight < 0) {
if (!is_end) {
is_inclusive = true;
}
if (is_end) {
is_inclusive = false;
}
} else if (pos_weight > 0) {
if (!is_end) {
is_inclusive = false;
}
if (is_end) {
is_inclusive = true;
}
}
}
return query::clustering_range::bound(std::move(underlying_ck), is_inclusive);
};
return query::clustering_range(transform_range_bound(cr.start(), false), transform_range_bound(cr.end(), true));
}
void create_underlying_mutation_sources() {
const auto all_mutation_sources = create_all_mutation_sources();
const auto ms_end = all_mutation_sources.end();
const auto& ranges = _ps.row_ranges(*_schema, _dk.key());
struct mutation_source_with_slice_parts {
mutation_source_opt ms;
std::vector<interval<partition_region>> region_intervals;
std::vector<query::clustering_range> underlying_crs;
};
std::map<sstring, mutation_source_with_slice_parts> prepared_mutation_sources;
auto maybe_push = [] (auto& intervals, auto&& new_interval, const auto& cmp) {
if (intervals.empty() || !intervals.back().equal(new_interval, cmp)) {
intervals.push_back(std::move(new_interval));
}
};
for (const auto& cr : ranges) {
const auto exploded_cr = cr.transform([this] (const clustering_key& ck) {
auto elements = ck.explode(*_schema);
while (!elements.empty() && elements.back().empty()) {
elements.pop_back();
}
return elements;
});
auto ms_it = all_mutation_sources.begin();
const auto ms_int = transform_range<sstring>(exploded_cr, 0);
while (ms_it != ms_end && ms_int.before(ms_it->first, std::compare_three_way{})) {
++ms_it;
}
if (ms_it == ms_end) {
continue;
}
const auto region_int = transform_range<int8_t>(exploded_cr, 1).transform([] (int8_t v) { return partition_region(v); });
const auto transformed_cr = transform_to_underlying_cr(exploded_cr);
while (ms_it != ms_end && ms_int.contains(ms_it->first, std::compare_three_way{})) {
auto& e = prepared_mutation_sources[ms_it->first];
e.ms = ms_it->second;
maybe_push(e.region_intervals, region_int, std::compare_three_way{});
maybe_push(e.underlying_crs, std::move(transformed_cr), clustering_key_view::tri_compare(*_underlying_schema));
++ms_it;
}
}
for (auto& [name, ms] : prepared_mutation_sources) {
auto slice = partition_slice_builder(*_underlying_schema).with_ranges(std::move(ms.underlying_crs)).build();
_underlying_mutation_sources.emplace(name,
mutation_source_with_params{std::move(*ms.ms), std::move(ms.region_intervals), std::move(slice)});
}
}
clustering_key transform_clustering_key(position_in_partition_view pos, const sstring& data_source_name) {
const auto& underlying_ck_types = _underlying_schema->clustering_key_type()->types();
const auto underlying_ck_raw_values = pos.has_key() ? pos.key().explode(*_underlying_schema) : std::vector<bytes>{};
std::vector<bytes> output_ck_raw_values;
output_ck_raw_values.push_back(data_value(data_source_name).serialize_nonnull());
output_ck_raw_values.push_back(data_value(static_cast<int8_t>(pos.region())).serialize_nonnull());
for (unsigned i = 0; i < underlying_ck_types.size(); ++i) {
if (i < underlying_ck_raw_values.size()) {
output_ck_raw_values.emplace_back(underlying_ck_raw_values[i]);
} else {
output_ck_raw_values.emplace_back(bytes{});
}
}
if (underlying_ck_raw_values.empty()) {
output_ck_raw_values.push_back(bytes{});
} else {
output_ck_raw_values.push_back(data_value(static_cast<int8_t>(pos.get_bound_weight())).serialize_nonnull());
}
return clustering_key::from_exploded(*_schema, output_ck_raw_values);
}
void add_metadata_column(mutation_fragment_v2& mf, row& r, const column_definition& cdef) {
std::stringstream ss;
mutation_json::mutation_partition_json_writer writer(*_underlying_schema, ss);
switch (mf.mutation_fragment_kind()) {
case mutation_fragment_v2::kind::partition_start:
writer.writer().StartObject();
writer.writer().Key("tombstone");
writer.write(mf.as_partition_start().partition_tombstone());
writer.writer().EndObject();
break;
case mutation_fragment_v2::kind::static_row:
writer.write(mf.as_static_row().cells(), column_kind::static_column, false);
break;
case mutation_fragment_v2::kind::clustering_row:
{
writer.writer().StartObject();
auto& cr = mf.as_clustering_row();
if (cr.tomb()) {
writer.writer().Key("tombstone");
writer.write(cr.tomb().regular());
writer.writer().Key("shadowable_tombstone");
writer.write(cr.tomb().shadowable().tomb());
}
if (!cr.marker().is_missing()) {
writer.writer().Key("marker");
writer.write(cr.marker());
}
writer.writer().Key("columns");
writer.write(cr.cells(), column_kind::regular_column, false);
writer.writer().EndObject();
}
break;
case mutation_fragment_v2::kind::range_tombstone_change:
writer.writer().StartObject();
writer.writer().Key("tombstone");
writer.write(mf.as_range_tombstone_change().tombstone());
writer.writer().EndObject();
break;
case mutation_fragment_v2::kind::partition_end:
// No value set.
break;
}
if (!ss.str().empty()) {
set_cell(*_schema, r, cdef, ss.str());
}
}
void add_value_column(mutation_fragment_v2& mf, row& r, const column_definition& cdef) {
column_kind kind;
const row* value;
if (mf.is_static_row()) {
kind = column_kind::static_column;
value = &mf.as_static_row().cells();
} else if (mf.is_clustering_row()) {
kind = column_kind::regular_column;
value = &mf.as_clustering_row().cells();
} else {
return;
}
std::stringstream ss;
mutation_json::mutation_partition_json_writer writer(*_underlying_schema, ss);
writer.writer().StartObject();
value->for_each_cell([this, kind, &writer] (column_id id, const atomic_cell_or_collection& cell) {
auto& cdef = _underlying_schema->column_at(kind, id);
writer.writer().Key(cdef.name_as_text());
if (cdef.is_atomic()) {
writer.write_atomic_cell_value(cell.as_atomic_cell(cdef), cdef.type);
} else if (cdef.type->is_collection() || cdef.type->is_user_type()) {
cell.as_collection_mutation().with_deserialized(*cdef.type, [&] (collection_mutation_view_description mv) {
writer.write_collection_value(mv, cdef.type);
});
} else {
writer.writer().Null();
}
});
writer.writer().EndObject();
set_cell(*_schema, r, cdef, ss.str());
}
mutation_fragment_v2 transform_mutation_fragment(mutation_fragment_v2& mf, const sstring& data_source_name) {
auto ck = transform_clustering_key(mf.position(), data_source_name);
auto cr_out = clustering_row(ck);
set_cell(*_schema, cr_out.cells(), "mutation_fragment_kind", fmt::to_string(mf.mutation_fragment_kind()));
if (!mf.is_end_of_partition()) {
auto metadata_cdef = *_schema->get_column_definition("metadata");
if (std::ranges::find(_ps.regular_columns, metadata_cdef.id) != _ps.regular_columns.end()) {
add_metadata_column(mf, cr_out.cells(), metadata_cdef);
}
auto value_cdef = *_schema->get_column_definition("value");
if (std::ranges::find(_ps.regular_columns, value_cdef.id) != _ps.regular_columns.end()) {
add_value_column(mf, cr_out.cells(), value_cdef);
}
}
return mutation_fragment_v2(*_schema, _permit, std::move(cr_out));
}
void partition_not_empty() {
if (!_partition_start_emitted) {
push_mutation_fragment(*_schema, _permit, partition_start(_dk, {}));
_partition_start_emitted = true;
}
}
future<> do_fill_buffer() {
auto cmp = clustering_key::prefix_equal_tri_compare(*_schema);
while (!is_buffer_full() && !_underlying_mutation_sources.empty()) {
auto ms_begin = _underlying_mutation_sources.begin();
if (!_underlying_reader) {
_underlying_reader = ms_begin->second.ms.make_reader_v2(_underlying_schema, _permit, _underlying_pr, ms_begin->second.slice,
_ts, streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
}
auto mf_opt = co_await (*_underlying_reader)();
auto contains_mf_region = [&] (const interval<partition_region>& prr) {
return prr.contains(mf_opt->position().region(), std::compare_three_way{});
};
if (mf_opt && std::ranges::any_of(ms_begin->second.region_intervals, contains_mf_region)) {
partition_not_empty();
push_mutation_fragment(transform_mutation_fragment(*mf_opt, ms_begin->first));
} else if (!mf_opt || ms_begin->second.region_intervals.back().after(mf_opt->position().region(), std::compare_three_way{})) {
// The reader is at EOS or provided all interesting fragments already.
co_await _underlying_reader->close();
_underlying_reader = {};
_underlying_mutation_sources.erase(ms_begin);
continue;
}
}
}
public:
mutation_dump_reader(schema_ptr output_schema, schema_ptr underlying_schema, reader_permit permit, replica::database& db, const dht::decorated_key& dk, const query::partition_slice& ps, tracing::trace_state_ptr ts)
: impl(std::move(output_schema), std::move(permit))
, _db(db)
, _dk(dk)
, _ps(ps)
, _ts(std::move(ts))
, _underlying_schema(std::move(underlying_schema))
, _underlying_pr(dht::partition_range::make_singular(dk))
{
create_underlying_mutation_sources();
}
virtual future<> fill_buffer() override {
co_await do_fill_buffer();
_end_of_stream = _underlying_mutation_sources.empty();
if (_end_of_stream && _partition_start_emitted && !_partition_end_emitted) {
push_mutation_fragment(*_schema, _permit, partition_end{});
_partition_end_emitted = true;
}
}
virtual future<> next_partition() override { throw std::bad_function_call(); }
virtual future<> fast_forward_to(const dht::partition_range&) override { throw std::bad_function_call(); }
virtual future<> fast_forward_to(position_range) override { throw std::bad_function_call(); }
virtual future<> close() noexcept override {
if (_underlying_reader) {
return _underlying_reader->close();
}
return make_ready_future<>();
}
};
future<flat_mutation_reader_v2> make_partition_mutation_dump_reader(
schema_ptr output_schema,
schema_ptr underlying_schema,
reader_permit permit,
distributed<replica::database>& db,
const dht::decorated_key& dk,
const query::partition_slice& ps,
tracing::trace_state_ptr ts,
db::timeout_clock::time_point timeout) {
const auto& tbl = db.local().find_column_family(underlying_schema);
const auto shard = tbl.shard_of(dk.token());
if (shard == this_shard_id()) {
co_return make_flat_mutation_reader_v2<mutation_dump_reader>(std::move(output_schema), std::move(underlying_schema), std::move(permit),
db.local(), dk, ps, std::move(ts));
}
auto gos = global_schema_ptr(output_schema);
auto gus = global_schema_ptr(underlying_schema);
auto gts = tracing::global_trace_state_ptr(ts);
auto remote_reader = co_await db.invoke_on(shard,
[gos = std::move(gos), gus = std::move(gus), &dk, &ps, gts = std::move(gts), timeout] (replica::database& local_db) -> future<foreign_ptr<std::unique_ptr<flat_mutation_reader_v2>>> {
auto output_schema = gos.get();
auto underlying_schema = gus.get();
auto ts = gts.get();
auto permit = co_await local_db.obtain_reader_permit(underlying_schema, "mutation-dump-remote-read", timeout, ts);
auto reader = make_flat_mutation_reader_v2<mutation_dump_reader>(std::move(output_schema), std::move(underlying_schema), std::move(permit),
local_db, dk, ps, std::move(ts));
co_return make_foreign(std::make_unique<flat_mutation_reader_v2>(std::move(reader)));
});
co_return make_foreign_reader(std::move(output_schema), std::move(permit), std::move(remote_reader));
}
class multi_range_partition_generator {
distributed<replica::database>& _db;
schema_ptr _schema;
circular_buffer<dht::partition_range> _prs;
tracing::trace_state_ptr _ts;
db::timeout_clock::time_point _timeout;
query::read_command _cmd;
circular_buffer<dht::decorated_key> _dks;
private:
query::partition_slice make_slice() {
return partition_slice_builder(*_schema)
.without_clustering_key_columns()
.with_no_static_columns()
.with_no_regular_columns()
.with_option<query::partition_slice::option::send_partition_key>()
.with_option<query::partition_slice::option::allow_short_read>()
.with_option<query::partition_slice::option::bypass_cache>()
.build();
}
future<> read_next_page() {
const dht::partition_range_vector prs{_prs.front()};
auto res = co_await query_mutations_on_all_shards(_db, _schema, _cmd, prs, _ts, _timeout);
const auto& rr = std::get<0>(res);
for (const auto& p : rr->partitions()) {
auto mut = p.mut().unfreeze(_schema);
_dks.emplace_back(mut.decorated_key());
}
_cmd.is_first_page = query::is_first_page::no;
if (rr->is_short_read() || _dks.size() >= _cmd.partition_limit) {
auto cmp = dht::ring_position_comparator(*_schema);
if (auto r_opt = _prs.front().trim_front(dht::partition_range::bound(_dks.back(), false), cmp); r_opt) {
_prs.front() = std::move(*r_opt);
co_return;
}
}
// fallback: range is exhausted
_prs.pop_front();
_cmd.query_uuid = query_id::create_random_id();
_cmd.is_first_page = query::is_first_page::yes;
}
future<> fill_dk_buffer_from_current_range() {
if (_prs.empty()) {
co_return;
}
if (_prs.front().is_singular()) {
_dks.emplace_back(_prs.front().start()->value().as_decorated_key());
_prs.pop_front();
co_return;
}
co_await read_next_page();
}
public:
multi_range_partition_generator(distributed<replica::database>& db, schema_ptr schema, const dht::partition_range_vector& prs,
tracing::trace_state_ptr ts, db::timeout_clock::time_point timeout)
: _db(db)
, _schema(std::move(schema))
, _ts(std::move(ts))
, _timeout(timeout)
, _cmd(
_schema->id(),
_schema->version(),
make_slice(),
query::max_result_size(query::result_memory_limiter::maximum_result_size),
query::tombstone_limit(1000),
query::row_limit::max,
query::partition_limit(1000),
gc_clock::now(),
tracing::make_trace_info(_ts),
query_id::create_random_id(),
query::is_first_page::yes)
{
_prs.reserve(prs.size());
std::copy(prs.begin(), prs.end(), std::back_inserter(_prs));
}
future<std::optional<dht::decorated_key>> operator()() {
while (_dks.empty() && !_prs.empty()) {
co_await fill_dk_buffer_from_current_range();
}
if (_dks.empty()) {
co_return std::nullopt;
}
auto ret = std::optional(std::move(_dks.front()));
_dks.pop_front();
co_return std::move(ret);
}
};
noncopyable_function<future<std::optional<dht::decorated_key>>()>
make_partition_key_generator(distributed<replica::database>& db, schema_ptr schema, const dht::partition_range_vector& prs,
tracing::trace_state_ptr ts, db::timeout_clock::time_point timeout) {
if (prs.size() == 1 && prs.front().is_singular()) {
auto dk_opt = std::optional(prs.front().start()->value().as_decorated_key());
return [dk_opt = std::move(dk_opt)] () mutable {
return make_ready_future<std::optional<dht::decorated_key>>(std::exchange(dk_opt, std::nullopt));
};
}
return multi_range_partition_generator(db, std::move(schema), prs, std::move(ts), timeout);
}
} // anonymous namespace
schema_ptr generate_output_schema_from_underlying_schema(schema_ptr underlying_schema) {
const auto& ks = underlying_schema->ks_name();
const auto tbl = format("{}_$mutation_fragments", underlying_schema->cf_name());
auto sb = schema_builder(ks, tbl, generate_legacy_id(ks, tbl));
// partition key
for (const auto& pk_col : underlying_schema->partition_key_columns()) {
sb.with_column(pk_col.name(), pk_col.type, column_kind::partition_key);
}
// clustering key
sb.with_column("mutation_source", utf8_type, column_kind::clustering_key);
sb.with_column("partition_region", byte_type, column_kind::clustering_key);
for (const auto& ck_col : underlying_schema->clustering_key_columns()) {
sb.with_column(ck_col.name(), ck_col.type, column_kind::clustering_key);
}
sb.with_column("position_weight", byte_type, column_kind::clustering_key);
// regular columns
sb.with_column("mutation_fragment_kind", utf8_type);
sb.with_column("metadata", utf8_type);
sb.with_column("value", utf8_type);
md5_hasher h;
feed_hash(h, underlying_schema->id());
feed_hash(h, sb.uuid());
feed_hash(h, 0); // bump this on modifications to the schema
sb.with_version(table_schema_version(utils::UUID_gen::get_name_UUID(h.finalize())));
return sb.build();
}
future<foreign_ptr<lw_shared_ptr<query::result>>> dump_mutations(
sharded<database>& db,
schema_ptr output_schema,
schema_ptr underlying_schema,
const dht::partition_range_vector& prs,
const query::read_command& cmd,
db::timeout_clock::time_point timeout) {
// Should be enforced on the CQL level, but double-check here to be sure.
if (cmd.slice.is_reversed()) {
throw std::runtime_error("reverse reads are not supported");
}
tracing::trace_state_ptr ts;
if (cmd.trace_info) {
ts = tracing::tracing::get_local_tracing_instance().create_session(*cmd.trace_info);
tracing::begin(ts);
}
auto permit = co_await db.local().obtain_reader_permit(underlying_schema, "mutation-dump", timeout, ts);
auto max_result_size = cmd.max_result_size ? *cmd.max_result_size : db.local().get_unlimited_query_max_result_size();
permit.set_max_result_size(max_result_size);
const auto opts = query::result_options::only_result();
const auto short_read_allowed = query::short_read(cmd.slice.options.contains<query::partition_slice::option::allow_short_read>());
auto accounter = co_await db.local().get_result_memory_limiter().new_data_read(permit.max_result_size(), short_read_allowed);
query_state qs(output_schema, cmd, opts, prs, std::move(accounter));
auto compaction_state = make_lw_shared<compact_for_query_state_v2>(*output_schema, qs.cmd.timestamp, qs.cmd.slice, qs.remaining_rows(), qs.remaining_partitions());
auto partition_key_generator = make_partition_key_generator(db, underlying_schema, prs, ts, timeout);
auto dk_opt = co_await partition_key_generator();
while (dk_opt) {
auto reader_consumer = compact_for_query_v2<query_result_builder>(compaction_state, query_result_builder(*output_schema, qs.builder));
auto reader = co_await make_partition_mutation_dump_reader(output_schema, underlying_schema, permit, db, *dk_opt, cmd.slice, ts, timeout);
std::exception_ptr ex;
try {
co_await reader.consume(std::move(reader_consumer));
} catch (...) {
ex = std::current_exception();
}
co_await reader.close();
if (ex) {
std::rethrow_exception(std::move(ex));
}
dk_opt = co_await partition_key_generator();
}
co_return make_lw_shared<query::result>(qs.builder.build(compaction_state->current_full_position()));
}
} // namespace replica::mutation_dump

26
replica/mutation_dump.hh Normal file
View File

@@ -0,0 +1,26 @@
/*
* Copyright (C) 2023-present ScyllaDB
*/
/*
* SPDX-License-Identifier: (AGPL-3.0-or-later)
*/
#pragma once
#include "db/timeout_clock.hh"
#include "query-result.hh"
namespace replica::mutation_dump {
schema_ptr generate_output_schema_from_underlying_schema(schema_ptr underlying_schema);
future<foreign_ptr<lw_shared_ptr<query::result>>> dump_mutations(
sharded<database>& db,
schema_ptr output_schema, // must have been generated from `underlying_schema`, with `generate_output_schema_from_underlying_schema()`
schema_ptr underlying_schema,
const dht::partition_range_vector& pr,
const query::read_command& cmd,
db::timeout_clock::time_point timeout);
} // namespace replica::mutation_dump

50
replica/query_state.hh Normal file
View File

@@ -0,0 +1,50 @@
/*
* Copyright (C) 2023-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include "query-request.hh"
#include "query-result.hh"
#include "query-result-writer.hh"
namespace replica {
struct query_state {
explicit query_state(schema_ptr s,
const query::read_command& cmd,
query::result_options opts,
const dht::partition_range_vector& ranges,
query::result_memory_accounter memory_accounter)
: schema(std::move(s))
, cmd(cmd)
, builder(cmd.slice, opts, std::move(memory_accounter), cmd.tombstone_limit)
, limit(cmd.get_row_limit())
, partition_limit(cmd.partition_limit)
, current_partition_range(ranges.begin())
, range_end(ranges.end()){
}
schema_ptr schema;
const query::read_command& cmd;
query::result::builder builder;
uint64_t limit;
uint32_t partition_limit;
bool range_empty = false; // Avoid ubsan false-positive when moving after construction
dht::partition_range_vector::const_iterator current_partition_range;
dht::partition_range_vector::const_iterator range_end;
uint64_t remaining_rows() const {
return limit - builder.row_count();
}
uint32_t remaining_partitions() const {
return partition_limit - builder.partition_count();
}
bool done() const {
return !remaining_rows() || !remaining_partitions() || current_partition_range == range_end || builder.is_short_read();
}
};
} // namespace replica

View File

@@ -18,6 +18,7 @@
#include "replica/database.hh"
#include "replica/data_dictionary_impl.hh"
#include "replica/compaction_group.hh"
#include "replica/query_state.hh"
#include "sstables/sstables.hh"
#include "sstables/sstables_manager.hh"
#include "db/schema_tables.hh"
@@ -332,6 +333,14 @@ flat_mutation_reader_v2 table::make_streaming_reader(schema_ptr schema, reader_p
std::move(trace_state), fwd, fwd_mr);
}
flat_mutation_reader_v2 table::make_nonpopulating_cache_reader(schema_ptr schema, reader_permit permit, const dht::partition_range& range,
const query::partition_slice& slice, tracing::trace_state_ptr ts) {
if (!range.is_singular()) {
throw std::runtime_error("table::make_cache_reader(): only singular ranges are supported");
}
return _cache.make_nonpopulating_reader(std::move(schema), std::move(permit), range, slice, std::move(ts));
}
future<std::vector<locked_cell>> table::lock_counter_cells(const mutation& m, db::timeout_clock::time_point timeout) {
assert(m.schema() == _counter_cell_locks->schema());
return _counter_cell_locks->lock_cells(m.decorated_key(), partition_cells_range(m.partition()), timeout);
@@ -2312,39 +2321,6 @@ write_memtable_to_sstable(memtable& mt, sstables::shared_sstable sst, sstables::
});
}
struct query_state {
explicit query_state(schema_ptr s,
const query::read_command& cmd,
query::result_options opts,
const dht::partition_range_vector& ranges,
query::result_memory_accounter memory_accounter)
: schema(std::move(s))
, cmd(cmd)
, builder(cmd.slice, opts, std::move(memory_accounter), cmd.tombstone_limit)
, limit(cmd.get_row_limit())
, partition_limit(cmd.partition_limit)
, current_partition_range(ranges.begin())
, range_end(ranges.end()){
}
schema_ptr schema;
const query::read_command& cmd;
query::result::builder builder;
uint64_t limit;
uint32_t partition_limit;
bool range_empty = false; // Avoid ubsan false-positive when moving after construction
dht::partition_range_vector::const_iterator current_partition_range;
dht::partition_range_vector::const_iterator range_end;
uint64_t remaining_rows() const {
return limit - builder.row_count();
}
uint32_t remaining_partitions() const {
return partition_limit - builder.partition_count();
}
bool done() const {
return !remaining_rows() || !remaining_partitions() || current_partition_range == range_end || builder.is_short_read();
}
};
future<lw_shared_ptr<query::result>>
table::query(schema_ptr s,
reader_permit permit,
@@ -2723,6 +2699,16 @@ table::as_mutation_source_excluding_staging() const {
});
}
std::vector<mutation_source> table::select_memtables_as_mutation_sources(dht::token token) const {
auto& cg = compaction_group_for_token(token);
std::vector<mutation_source> mss;
mss.reserve(cg.memtables()->size());
for (auto& mt : *cg.memtables()) {
mss.emplace_back(mt->as_data_source());
}
return mss;
}
class compaction_group::table_state : public compaction::table_state {
table& _t;
compaction_group& _cg;

View File

@@ -23,6 +23,7 @@
#include "readers/forwardable_v2.hh"
#include "readers/nonforwardable.hh"
#include "cache_flat_mutation_reader.hh"
#include "partition_snapshot_reader.hh"
#include "clustering_key_filter.hh"
namespace cache {
@@ -777,6 +778,45 @@ row_cache::make_reader_opt(schema_ptr s,
}
}
flat_mutation_reader_v2 row_cache::make_nonpopulating_reader(schema_ptr schema, reader_permit permit, const dht::partition_range& range,
const query::partition_slice& slice, tracing::trace_state_ptr ts) {
if (!range.is_singular()) {
throw std::runtime_error("row_cache::make_nonpopulating_reader(): only singular ranges are supported");
}
struct dummy_accounter {
void operator()(const clustering_row&) {}
void operator()(const static_row&) {}
void operator()(const range_tombstone_change&) {}
void operator()(const partition_start&) {}
void operator()(const partition_end&) {}
};
return _read_section(_tracker.region(), [&] () -> flat_mutation_reader_v2 {
dht::ring_position_comparator cmp(*_schema);
auto&& pos = range.start()->value();
partitions_type::bound_hint hint;
auto i = _partitions.lower_bound(pos, cmp, hint);
if (hint.match) {
cache_entry& e = *i;
upgrade_entry(e);
tracing::trace(ts, "Reading partition {} from cache", pos);
return make_partition_snapshot_flat_reader<false, dummy_accounter>(
schema,
std::move(permit),
e.key(),
query::clustering_key_filter_ranges(slice.row_ranges(*schema, e.key().key())),
e.partition().read(_tracker.region(), _tracker.memtable_cleaner(), nullptr, phase_of(pos)),
false,
_tracker.region(),
_read_section,
{},
streamed_mutation::forwarding::no);
} else {
tracing::trace(ts, "Partition {} is not found in cache", pos);
return make_empty_flat_reader_v2(std::move(schema), std::move(permit));
}
});
}
row_cache::~row_cache() {
with_allocator(_tracker.allocator(), [this] {
_partitions.clear_and_dispose([this] (cache_entry* p) mutable noexcept {

View File

@@ -392,6 +392,12 @@ public:
streamed_mutation::forwarding::no, mutation_reader::forwarding::no, gc_state);
}
// Only reads what is in the cache, doesn't populate.
// Supports reading singular ranges only, for now.
// Does not support reading in reverse.
flat_mutation_reader_v2 make_nonpopulating_reader(schema_ptr s, reader_permit permit, const dht::partition_range& range,
const query::partition_slice& slice, tracing::trace_state_ptr ts);
const stats& stats() const { return _stats; }
public:
// Populate cache from given mutation, which must be fully continuous.

View File

@@ -21,10 +21,19 @@
namespace service {
class storage_proxy;
class storage_proxy_coordinator_query_options;
class storage_proxy_coordinator_query_result;
namespace pager {
using query_function = std::function<future<exceptions::coordinator_result<service::storage_proxy_coordinator_query_result>>(
service::storage_proxy& sp,
schema_ptr schema,
lw_shared_ptr<query::read_command> cmd,
dht::partition_range_vector&& partition_ranges,
db::consistency_level cl,
service::storage_proxy_coordinator_query_options optional_params)>;
/**
* Perform a query, paging it by page of a given size.
*
@@ -76,12 +85,16 @@ protected:
std::optional<db::read_repair_decision> _query_read_repair_decision;
uint64_t _rows_fetched_for_last_partition = 0;
stats _stats;
query_function _query_function;
public:
query_pager(service::storage_proxy& p, schema_ptr s, shared_ptr<const cql3::selection::selection> selection,
service::query_state& state,
const cql3::query_options& options,
lw_shared_ptr<query::read_command> cmd,
dht::partition_range_vector ranges);
dht::partition_range_vector ranges,
query_function query_function_override = {});
virtual ~query_pager() {}
/**

View File

@@ -45,7 +45,8 @@ query_pager::query_pager(service::storage_proxy& p, schema_ptr s,
service::query_state& state,
const cql3::query_options& options,
lw_shared_ptr<query::read_command> cmd,
dht::partition_range_vector ranges)
dht::partition_range_vector ranges,
query_function query_function_override)
: _has_clustering_keys(has_clustering_keys(*s, *cmd))
, _max(cmd->get_row_limit())
, _per_partition_limit(cmd->slice.partition_row_limit())
@@ -57,7 +58,21 @@ query_pager::query_pager(service::storage_proxy& p, schema_ptr s,
, _options(options)
, _cmd(std::move(cmd))
, _ranges(std::move(ranges))
{}
{
if (query_function_override) {
_query_function = std::move(query_function_override);
} else {
_query_function = [] (
service::storage_proxy& sp,
schema_ptr schema,
lw_shared_ptr<query::read_command> cmd,
dht::partition_range_vector&& partition_ranges,
db::consistency_level cl,
service::storage_proxy_coordinator_query_options optional_params) {
return sp.query_result(std::move(schema), std::move(cmd), std::move(partition_ranges), cl, std::move(optional_params));
};
}
}
future<result<service::storage_proxy::coordinator_query_result>> query_pager::do_fetch_page(uint32_t page_size, gc_clock::time_point now, db::timeout_clock::time_point timeout) {
auto state = _options.get_paging_state();
@@ -179,7 +194,9 @@ future<result<service::storage_proxy::coordinator_query_result>> query_pager::do
auto ranges = _ranges;
auto command = ::make_lw_shared<query::read_command>(*_cmd);
return _proxy->query_result(_schema,
return _query_function(
*_proxy,
_schema,
std::move(command),
std::move(ranges),
_options.get_consistency(),
@@ -244,8 +261,9 @@ public:
const cql3::query_options& options,
lw_shared_ptr<query::read_command> cmd,
dht::partition_range_vector ranges,
::shared_ptr<const cql3::restrictions::statement_restrictions> filtering_restrictions)
: query_pager(p, s, selection, state, options, std::move(cmd), std::move(ranges))
::shared_ptr<const cql3::restrictions::statement_restrictions> filtering_restrictions,
query_function query_function_override)
: query_pager(p, s, selection, state, options, std::move(cmd), std::move(ranges), std::move(query_function_override))
, _filtering_restrictions(std::move(filtering_restrictions))
{}
virtual ~filtering_query_pager() {}
@@ -469,7 +487,8 @@ std::unique_ptr<service::pager::query_pager> service::pager::query_pagers::pager
service::query_state& state, const cql3::query_options& options,
lw_shared_ptr<query::read_command> cmd,
dht::partition_range_vector ranges,
::shared_ptr<const cql3::restrictions::statement_restrictions> filtering_restrictions) {
::shared_ptr<const cql3::restrictions::statement_restrictions> filtering_restrictions,
query_function query_function_override) {
// If partition row limit is applied to paging, we still need to fall back
// to filtering the results to avoid extraneous rows on page breaks.
if (!filtering_restrictions && cmd->slice.partition_row_limit() < query::max_rows_if_set) {
@@ -477,10 +496,10 @@ std::unique_ptr<service::pager::query_pager> service::pager::query_pagers::pager
}
if (filtering_restrictions) {
return std::make_unique<filtering_query_pager>(proxy, std::move(s), std::move(selection), state,
options, std::move(cmd), std::move(ranges), std::move(filtering_restrictions));
options, std::move(cmd), std::move(ranges), std::move(filtering_restrictions), std::move(query_function_override));
}
return std::make_unique<query_pager>(proxy, std::move(s), std::move(selection), state,
options, std::move(cmd), std::move(ranges));
options, std::move(cmd), std::move(ranges), std::move(query_function_override));
}
::shared_ptr<service::pager::query_pager> service::pager::query_pagers::ghost_row_deleting_pager(

View File

@@ -37,7 +37,8 @@ public:
const cql3::query_options&,
lw_shared_ptr<query::read_command>,
dht::partition_range_vector,
::shared_ptr<const cql3::restrictions::statement_restrictions> filtering_restrictions = nullptr);
::shared_ptr<const cql3::restrictions::statement_restrictions> filtering_restrictions = nullptr,
query_function query_function_override = {});
static ::shared_ptr<query_pager> ghost_row_deleting_pager(schema_ptr,
shared_ptr<const cql3::selection::selection>,
service::query_state&,

View File

@@ -101,6 +101,37 @@ using allow_hints = bool_class<allow_hints_tag>;
using is_cancellable = bool_class<struct cancellable_tag>;
using storage_proxy_clock_type = lowres_clock;
class storage_proxy_coordinator_query_options {
storage_proxy_clock_type::time_point _timeout;
public:
service_permit permit;
client_state& cstate;
tracing::trace_state_ptr trace_state = nullptr;
replicas_per_token_range preferred_replicas;
std::optional<db::read_repair_decision> read_repair_decision;
storage_proxy_coordinator_query_options(storage_proxy_clock_type::time_point timeout,
service_permit permit_,
client_state& client_state_,
tracing::trace_state_ptr trace_state = nullptr,
replicas_per_token_range preferred_replicas = { },
std::optional<db::read_repair_decision> read_repair_decision = { })
: _timeout(timeout)
, permit(std::move(permit_))
, cstate(client_state_)
, trace_state(std::move(trace_state))
, preferred_replicas(std::move(preferred_replicas))
, read_repair_decision(read_repair_decision) {
}
storage_proxy_clock_type::time_point timeout(storage_proxy& sp) const {
return _timeout;
}
};
struct storage_proxy_coordinator_query_result {
foreign_ptr<lw_shared_ptr<query::result>> query_result;
replicas_per_token_range last_replicas;
@@ -127,7 +158,7 @@ public:
};
template<typename T = void>
using result = exceptions::coordinator_result<T>;
using clock_type = lowres_clock;
using clock_type = storage_proxy_clock_type;
struct config {
db::hints::host_filter hinted_handoff_enabled = {};
db::hints::directory_initializer hints_directory_initializer;
@@ -166,35 +197,7 @@ public:
using global_stats = storage_proxy_stats::global_stats;
using cdc_stats = cdc::stats;
class coordinator_query_options {
clock_type::time_point _timeout;
public:
service_permit permit;
client_state& cstate;
tracing::trace_state_ptr trace_state = nullptr;
replicas_per_token_range preferred_replicas;
std::optional<db::read_repair_decision> read_repair_decision;
coordinator_query_options(clock_type::time_point timeout,
service_permit permit_,
client_state& client_state_,
tracing::trace_state_ptr trace_state = nullptr,
replicas_per_token_range preferred_replicas = { },
std::optional<db::read_repair_decision> read_repair_decision = { })
: _timeout(timeout)
, permit(std::move(permit_))
, cstate(client_state_)
, trace_state(std::move(trace_state))
, preferred_replicas(std::move(preferred_replicas))
, read_repair_decision(read_repair_decision) {
}
clock_type::time_point timeout(storage_proxy& sp) const {
return _timeout;
}
};
using coordinator_query_options = storage_proxy_coordinator_query_options;
using coordinator_query_result = storage_proxy_coordinator_query_result;
// Holds a list of endpoints participating in CAS request, for a given

View File

@@ -42,6 +42,7 @@
#include "transport/messages/result_message.hh"
#include "compaction/compaction_manager.hh"
#include "db/snapshot-ctl.hh"
#include "replica/mutation_dump.hh"
using namespace std::chrono_literals;
using namespace sstables;
@@ -1428,3 +1429,14 @@ SEASTAR_TEST_CASE(drop_table_with_explicit_snapshot) {
co_return;
});
}
SEASTAR_TEST_CASE(mutation_dump_generated_schema_deterministic_id_version) {
simple_schema s;
auto os1 = replica::mutation_dump::generate_output_schema_from_underlying_schema(s.schema());
auto os2 = replica::mutation_dump::generate_output_schema_from_underlying_schema(s.schema());
BOOST_REQUIRE_EQUAL(os1->id(), os2->id());
BOOST_REQUIRE_EQUAL(os1->version(), os2->version());
return make_ready_future<>();
}

View File

@@ -13,6 +13,7 @@ import pytest
from cassandra.cluster import Cluster, NoHostAvailable
from cassandra.connection import DRIVER_NAME, DRIVER_VERSION
import json
import os
import ssl
import subprocess
@@ -201,6 +202,20 @@ def scylla_path(cql):
pytest.skip("Local server isn't Scylla")
return path
# A fixture for finding Scylla's data directory. We get it using the CQL
# interface to Scylla's configuration. Note that if the server is remote,
# the directory retrieved this way may be irrelevant, whether or not it
# exists on the local machine... However, if the same test that uses this
# fixture also uses the scylla_path fixture, the test will anyway be skipped
# if the running Scylla is not on the local machine local.
@pytest.fixture(scope="module")
def scylla_data_dir(cql):
try:
dir = json.loads(cql.execute("SELECT value FROM system.config WHERE name = 'data_file_directories'").one().value)[0]
return dir
except:
pytest.skip("Can't find Scylla sstable directory")
@pytest.fixture(scope="function")
def temp_workdir():
""" Creates a temporary work directory, for the scope of a single test. """

View File

@@ -0,0 +1,424 @@
# -*- coding: utf-8 -*-
# Copyright 2023-present ScyllaDB
#
# SPDX-License-Identifier: AGPL-3.0-or-later
# Tests concerning the SELECT * FROM MUTATION_FRAGMENTS($table) statement, which allows dumping
# the underlying mutation fragment data stream, for a table.
import cassandra.protocol
import cassandra.query
import glob
import json
import os
import nodetool
import pytest
import requests
import subprocess
import util
@pytest.fixture(scope="module")
def test_table(cql, test_keyspace):
""" Prepares a table for the mutation dump tests to work with."""
with util.new_test_table(cql, test_keyspace, 'pk1 int, pk2 int, ck1 int, ck2 int, v text, s text static, PRIMARY KEY ((pk1, pk2), ck1, ck2)') as table:
yield table
def test_smoke(cql, test_table, scylla_only):
""" Simple smoke tests, this should fail first if something is very wrong. """
partitions = {}
for i in range(0, 1):
pk1 = util.unique_key_int()
pk2 = 0
cql.execute(f"DELETE FROM {test_table} WHERE pk1 = {pk1} AND pk2 = {pk2}")
cql.execute(f"UPDATE {test_table} SET s = 'static val' WHERE pk1 = {pk1} AND pk2 = {pk2}")
cql.execute(f"DELETE FROM {test_table} WHERE pk1 = {pk1} AND pk2 = {pk2} AND ck1 = 0 AND ck2 > 0 AND ck2 < 3")
cql.execute(f"INSERT INTO {test_table} (pk1, pk2, ck1, ck2, v) VALUES ({pk1}, {pk2}, 0, 1, 'regular val')")
cql.execute(f"INSERT INTO {test_table} (pk1, pk2, ck1, ck2, v) VALUES ({pk1}, {pk2}, 0, 2, 'regular val')")
partitions[(pk1, pk2)] = [
(pk1, pk2, 'memtable:0', 0, None, None, None, 'partition start'),
(pk1, pk2, 'memtable:0', 1, None, None, None, 'static row'),
(pk1, pk2, 'memtable:0', 2, 0 , 0 , 1 , 'range tombstone change'),
(pk1, pk2, 'memtable:0', 2, 0 , 1 , 0 , 'clustering row'),
(pk1, pk2, 'memtable:0', 2, 0 , 2 , 0 , 'clustering row'),
(pk1, pk2, 'memtable:0', 2, 0 , 3 , -1 , 'range tombstone change'),
(pk1, pk2, 'memtable:0', 3, None, None, None, 'partition end'),
]
col_names = ('pk1', 'pk2', 'mutation_source', 'partition_region', 'ck1', 'ck2', 'position_weight', 'mutation_fragment_kind')
def check_partition_rows(rows, expected_rows):
assert len(rows) == len(expected_rows)
for expected_col_values, row in zip(expected_rows, rows):
for col_name, col_value in zip(col_names, expected_col_values):
assert hasattr(row, col_name)
assert getattr(row, col_name) == col_value
# Point queries
for (pk1, pk2), expected_rows in partitions.items():
rows = list(cql.execute(f"SELECT * FROM MUTATION_FRAGMENTS({test_table}) WHERE pk1 = {pk1} AND pk2 = {pk2}"))
check_partition_rows(rows, expected_rows)
# Range scan
all_rows = list(cql.execute(f"SELECT * FROM MUTATION_FRAGMENTS({test_table})"))
for (pk1, pk2), expected_rows in partitions.items():
rows = [r for r in all_rows if r.pk1 == pk1 and r.pk2 == pk2]
check_partition_rows(rows, expected_rows)
def test_order_by(cql, test_table, scylla_only):
""" ORDER BY is not allowed """
pk1 = util.unique_key_int()
pk2 = 0
cql.execute(f"INSERT INTO {test_table} (pk1, pk2, ck1, ck2, v) VALUES ({pk1}, {pk2}, 0, 0, 'vv')")
with pytest.raises(cassandra.protocol.InvalidRequest, match="ORDER BY is not supported in SELECT FROM MUTATION_FRAGMENTS\\(\\) statements"):
cql.execute(f"SELECT * FROM MUTATION_FRAGMENTS({test_table}) WHERE pk1 = {pk1} AND pk2 = {pk2} ORDER BY mutation_source DESC")
def test_mutation_source(cql, test_table, scylla_only):
""" Manipulate where the data is located in the node, and check that the corred mutation source is reported. """
pk1 = util.unique_key_int()
pk2 = util.unique_key_int()
def expect_sources(*expected_sources):
for src in ('memtable', 'row-cache', 'sstable'):
rows = list(cql.execute(f"SELECT * FROM MUTATION_FRAGMENTS({test_table}) WHERE pk1 = {pk1} AND pk2 = {pk2} AND mutation_source >= '{src}' AND mutation_source < '{src};'"))
if src in expected_sources:
assert len(rows) == 3 # partition-start, clustering-row, partition-end
else:
assert len(rows) == 0
cql.execute(f"INSERT INTO {test_table} (pk1, pk2, ck1, ck2, v) VALUES ({pk1}, {pk2}, 0, 0, 'vv')")
expect_sources('memtable')
nodetool.flush(cql, f"{test_table}")
expect_sources('row-cache', 'sstable')
requests.post(f'{nodetool.rest_api_url(cql)}/system/drop_sstable_caches')
expect_sources('sstable')
assert list(cql.execute(f"SELECT v FROM {test_table} WHERE pk1={pk1} AND pk2={pk2} BYPASS CACHE"))[0].v == 'vv'
expect_sources('sstable')
assert list(cql.execute(f"SELECT v FROM {test_table} WHERE pk1={pk1} AND pk2={pk2}"))[0].v == 'vv'
expect_sources('row-cache', 'sstable')
cql.execute(f"INSERT INTO {test_table} (pk1, pk2, ck1, ck2, v) VALUES ({pk1}, {pk2}, 0, 0, 'vv')")
expect_sources('memtable', 'row-cache', 'sstable')
def test_mutation_dump_range_tombstone_changes(cql, test_table, scylla_only):
"""
Range tombstones can share the same position.
This doesn't seem to happen in practice, but this test still tries to produce
such range tombstone and checks that they are handled correctly.
"""
ks, _ = test_table.split(".")
pk1 = util.unique_key_int()
pk2 = util.unique_key_int()
cql.execute(f"INSERT INTO {test_table} (pk1, pk2, ck1, ck2, v) VALUES ({pk1}, {pk2}, 0, 0, 'vv')")
rts = 4
with nodetool.no_autocompaction_context(cql, ks):
for ck in range(44, 44 - rts, -1):
cql.execute(f"DELETE FROM {test_table} WHERE pk1={pk1} AND pk2={pk2} AND ck1=0 AND ck2>30 AND ck2<{ck}")
nodetool.flush(cql, f"{test_table}")
res = list(cql.execute(f"SELECT * FROM MUTATION_FRAGMENTS({test_table}) WHERE pk1 = {pk1} AND pk2 = {pk2} AND mutation_source > 'sstable' AND mutation_source < 'sstable;' AND partition_region = 2 ALLOW FILTERING"))
# row + 2 * range-tombstone-change
assert len(res) == 2 * rts + 1
def test_count(cql, test_table, scylla_only):
""" Test aggregation (COUNT). """
pk1 = util.unique_key_int()
pk2 = util.unique_key_int()
cql.execute(f"UPDATE {test_table} SET s = 'static val' WHERE pk1 = {pk1} AND pk2 = {pk2}")
for ck in range(0, 10):
cql.execute(f"INSERT INTO {test_table} (pk1, pk2, ck1, ck2, v) VALUES ({pk1}, {pk2}, 0, {ck}, 'vv')")
for ck in range(10, 20):
cql.execute(f"DELETE FROM {test_table} WHERE pk1 = {pk1} AND pk2 = {pk2} AND ck1 = 1 AND ck2 > {ck} AND ck2 < 100")
def check_count(kind, expected_count):
res = list(cql.execute(f"SELECT COUNT(*) FROM MUTATION_FRAGMENTS({test_table}) WHERE pk1 = {pk1} AND pk2 = {pk2} AND mutation_fragment_kind = '{kind}' ALLOW FILTERING"))
assert res[0].count == expected_count
check_count('partition start', 1)
check_count('static row', 1)
check_count('clustering row', 10)
check_count('range tombstone change', 11)
check_count('partition end', 1)
def test_many_partition_scan(cql, test_keyspace, scylla_only):
"""
Full scans work like secondary-index based scans. First, a query is
issued to obtain partition-keys, then each partition is read individually.
The former uses paging, reading 1000 partition keys in a page. Stress this
logic a bit.
"""
with util.new_test_table(cql, test_keyspace, 'pk int PRIMARY KEY, v text') as test_table:
insert_stmt = cql.prepare(f"INSERT INTO {test_table} (pk, v) VALUES (?, ?)")
delete_stmt = cql.prepare(f"DELETE FROM {test_table} WHERE pk = ?")
partitions = []
# the scan algorithm reads 1000 partition / page, so have enough partitions for at least 2 pages
partition_count = 1312
for pk in range(0, partition_count):
cql.execute(insert_stmt, (pk, 'v'))
if pk % 3 == 0:
cql.execute(delete_stmt, (pk,))
partitions.append((pk, False))
else:
partitions.append((pk, True))
nodetool.flush(cql, f"{test_table}")
requests.post(f'{nodetool.rest_api_url(cql)}/system/drop_sstable_caches')
# the real test here is that this scan completes without problems
res_all = list(cql.execute(f"SELECT pk, mutation_source, mutation_fragment_kind, metadata FROM MUTATION_FRAGMENTS({test_table});"))
actual_partitions = []
for r in res_all:
if r.mutation_fragment_kind == "partition start":
actual_partitions.append((r.pk, len(json.loads(r.metadata)["tombstone"]) == 0))
actual_partitions = sorted(actual_partitions)
assert len(actual_partitions) == len(partitions)
assert actual_partitions == partitions
def test_metadata_and_value(cql, test_keyspace, scylla_path, scylla_data_dir, scylla_only):
"""
Test that metadata + value columns allow reconstructing a full sstable dump.
Meaning that their json representation of metadata and value is the same.
"""
with util.new_test_table(cql, test_keyspace, 'pk int, ck int, v1 text, v2 map<int, text>, v3 tuple<int, text, boolean>, s text static, PRIMARY KEY (pk, ck)') as test_table:
insert_stmt = cql.prepare(f"INSERT INTO {test_table} (pk, ck, v1, v2, v3, s) VALUES (?, ?, ?, ?, ?, ?)")
delete_row_stmt = cql.prepare(f"DELETE FROM {test_table} WHERE pk = ? AND ck = ?")
delete_row_range_stmt = cql.prepare(f"DELETE FROM {test_table} WHERE pk = ? AND ck > ? AND CK < ?")
delete_partition_stmt = cql.prepare(f"DELETE FROM {test_table} WHERE pk = ?")
for pk in range(0, 2):
for ck in range(0, 10):
if ck % 4:
cql.execute(delete_row_stmt, (pk, ck))
else:
cql.execute(insert_stmt, (pk, ck, 'v1_val', {0: '0_val', 1: '1_val'}, (4, 'tuple_val', ck % 2), 'static_val'))
cql.execute(delete_row_range_stmt, (pk, 100, 200))
cql.execute(delete_partition_stmt, (100,))
nodetool.flush(cql, f"{test_table}")
nodetool.flush_keyspace(cql, "system_schema")
requests.post(f'{nodetool.rest_api_url(cql)}/system/drop_sstable_caches')
table_name = test_table.split('.')[1]
sstables = glob.glob(os.path.join(scylla_data_dir, test_keyspace, f"{table_name}-*", "*-Data.db"))
with nodetool.no_autocompaction_context(cql, "system", "system_schema"):
res = subprocess.check_output([scylla_path, "sstable", "dump-data", "--merge"] + sstables)
reference_dump = json.loads(res)["sstables"]["anonymous"]
for partition in reference_dump:
del partition["key"]["token"]
del partition["key"]["raw"]
for ce in partition.get("clustering_elements", {}):
del ce["key"]["raw"]
def merged_value_into_metadata(metadata, value):
value = json.loads(value)
for col_name, col_value in metadata.items():
if "cells" in col_value:
for i, cell_value in enumerate(col_value["cells"]):
cell_value["value"]["value"] = value[col_name][int(i)]["value"]
else:
col_value["value"] = value[col_name]
return metadata
res = cql.execute(f"SELECT * FROM MUTATION_FRAGMENTS({test_table})")
reconstructed_dump = []
partition = {}
for row in res:
kind = row.mutation_fragment_kind
if kind == "partition start":
partition = {"key": {"value": str(row.pk)}}
metadata = json.loads(row.metadata)
tombstone = metadata["tombstone"]
if tombstone:
partition["tombstone"] = tombstone
elif kind == "static row":
partition["static_row"] = json.loads(row.metadata)
merged_value_into_metadata(partition["static_row"], row.value)
elif kind == "clustering row":
cr = {"type": "clustering-row", "key": {"value": str(row.ck)}}
cr.update(json.loads(row.metadata))
merged_value_into_metadata(cr["columns"], row.value)
if "clustering_elements" in partition:
partition["clustering_elements"].append(cr)
else:
partition["clustering_elements"] = [cr]
elif kind == "range tombstone change":
rtc = {"type": "range-tombstone-change", "key": {"value": str(row.ck)}, "weight": row.position_weight}
rtc.update(json.loads(row.metadata))
if "clustering_elements" in partition:
partition["clustering_elements"].append(rtc)
else:
partition["clustering_elements"] = [rtc]
else:
assert kind == "partition end"
reconstructed_dump.append(partition)
reference_dump_json = json.dumps(reference_dump, indent=4, sort_keys=True)
reconstructed_dump_json = json.dumps(reconstructed_dump, indent=4, sort_keys=True)
assert reference_dump_json == reconstructed_dump_json
def test_paging(cql, test_table):
""" Test that paging works properly. """
pk1 = util.unique_key_int()
pk2 = util.unique_key_int()
insert_stmt = cql.prepare(f"INSERT INTO {test_table} (pk1, pk2, ck1, ck2, v) VALUES (?, ?, ?, ?, ?)")
num_rows = 43
expected_mutation_fragments = num_rows + 2
page_size = 10
for ck in range(0, num_rows):
cql.execute(insert_stmt, (pk1, pk2, 0, ck, 'asdasd'))
read_stmt = cassandra.query.SimpleStatement(f"SELECT * FROM mutation_fragments({test_table}) WHERE pk1 = {pk1} AND pk2 = {pk2}", fetch_size=page_size)
result = cql.execute(read_stmt)
remaining = expected_mutation_fragments
while remaining:
rows = list(result.current_rows)
print(f"rows({len(rows)}): {rows}")
current_page_size = min(page_size, remaining)
assert len(rows) == current_page_size
remaining -= current_page_size
if remaining:
assert result.has_more_pages
result.fetch_next_page()
def test_slicing_rows(cql, test_table):
""" Test that slicing rows from underlying works. """
pk1 = util.unique_key_int()
pk2 = util.unique_key_int()
cql.execute(f"INSERT INTO {test_table} (pk1, pk2, ck1, ck2, v) VALUES ({pk1}, {pk2}, 0, 0, 'vv')")
cql.execute(f"INSERT INTO {test_table} (pk1, pk2, ck1, ck2, v) VALUES ({pk1}, {pk2}, 0, 20, 'vv')")
cql.execute(f"INSERT INTO {test_table} (pk1, pk2, ck1, ck2, v) VALUES ({pk1}, {pk2}, 1, 20, 'vv')")
all_rows = list(cql.execute(f"SELECT * FROM MUTATION_FRAGMENTS({test_table}) WHERE pk1 = {pk1} AND pk2 = {pk2}"))
def check_slice(ck1, ck2_start_inclusive, ck2_end_exclusive):
res = list(cql.execute(f"""SELECT * FROM MUTATION_FRAGMENTS({test_table})
WHERE
pk1 = {pk1} AND
pk2 = {pk2} AND
mutation_source = 'memtable:0' AND
partition_region = 2 AND
ck1 = {ck1} AND
ck2 >= {ck2_start_inclusive} AND
ck2 < {ck2_end_exclusive}
"""))
expected_rows = [r for r in all_rows if r.ck1 == ck1 and r.ck2 >= ck2_start_inclusive and r.ck2 < ck2_end_exclusive]
assert res == expected_rows
check_slice(0, 0, 1)
check_slice(0, 1, 10)
check_slice(0, 1, 20)
check_slice(0, 1, 21)
check_slice(0, 20, 21)
check_slice(0, 21, 210)
check_slice(1, 0, 2)
check_slice(1, 0, 20)
check_slice(1, 0, 22)
check_slice(2, 0, 100)
def test_slicing_range_tombstone_changes(cql, test_table):
""" Test that slicing range-tombstone-changes from underlying works. """
pk1 = util.unique_key_int()
pk2 = util.unique_key_int()
ck1 = 0
cql.execute(f"DELETE FROM {test_table} WHERE pk1 = {pk1} AND pk2 = {pk2} AND ck1 = {ck1}")
cql.execute(f"DELETE FROM {test_table} WHERE pk1 = {pk1} AND pk2 = {pk2} AND ck1 = {ck1} AND ck2 > 10 AND ck2 < 20")
cql.execute(f"DELETE FROM {test_table} WHERE pk1 = {pk1} AND pk2 = {pk2} AND ck1 = {ck1} AND ck2 > 20 AND ck2 < 30")
def check_slice_ck1_fixed(ck2_start_inclusive, ck2_end_exclusive, expected_rtcs):
res = list(cql.execute(f"""SELECT * FROM MUTATION_FRAGMENTS({test_table})
WHERE
pk1 = {pk1} AND
pk2 = {pk2} AND
mutation_source = 'memtable:0' AND
partition_region = 2 AND
ck1 = {ck1} AND
ck2 >= {ck2_start_inclusive} AND
ck2 < {ck2_end_exclusive}
"""))
assert len(res) == len(expected_rtcs)
for row, rtc in zip(res, expected_rtcs):
assert row.ck1 == ck1
assert row.ck2 == rtc
check_slice_ck1_fixed(0, 8, [0, 8])
check_slice_ck1_fixed(1, 10, [1, 10])
check_slice_ck1_fixed(10, 28, [10, 10, 20, 20, 28])
check_slice_ck1_fixed(30, 38, [30, 30, 38])
def test_ck_in_query(cql, test_table):
pk1 = util.unique_key_int()
pk2 = util.unique_key_int()
cql.execute(f"INSERT INTO {test_table} (pk1, pk2, ck1, ck2, v) VALUES ({pk1}, {pk2}, 0, 0, 'vv')")
cql.execute(f"INSERT INTO {test_table} (pk1, pk2, ck1, ck2, v) VALUES ({pk1}, {pk2}, 1, 1, 'vv')")
nodetool.flush(cql, f"{test_table}")
cql.execute(f"INSERT INTO {test_table} (pk1, pk2, ck1, ck2, v) VALUES ({pk1}, {pk2}, 0, 0, 'vv')")
ks, _ = test_table.split(".")
with nodetool.no_autocompaction_context(cql, ks):
sources_res = list(cql.execute(f"SELECT * FROM MUTATION_FRAGMENTS({test_table}) WHERE pk1 = {pk1} AND pk2 = {pk2}"))
sources = {r.mutation_source.split(":")[0]: r.mutation_source for r in sources_res}
assert len(sources) == 3
assert "memtable" in sources
assert "row-cache" in sources
assert "sstable" in sources
res = list(cql.execute(f"""SELECT * FROM MUTATION_FRAGMENTS({test_table})
WHERE
pk1 = {pk1} AND
pk2 = {pk2} AND
(mutation_source, partition_region, ck1, ck2, position_weight) IN (
('{sources["memtable"]}', 2, 0, 0, 0),
('{sources["row-cache"]}', 2, 0, 0, 0),
('{sources["sstable"]}', 2, 0, 0, 0),
('{sources["sstable"]}', 2, 1, 1, 0))
"""))
columns = ("mutation_source", "partition_region", "ck1", "ck2", "position_weight")
expected_results = [
(sources["memtable"], 2, 0, 0, 0),
(sources["row-cache"], 2, 0, 0, 0),
(sources["sstable"], 2, 0, 0, 0),
(sources["sstable"], 2, 1, 1, 0),
]
assert len(res) == len(expected_results)
for row, expected_row in zip(res, expected_results):
for col_name, expected_value in zip(columns, expected_row):
assert hasattr(row, col_name)
assert getattr(row, col_name) == expected_value

View File

@@ -20,21 +20,6 @@ import shutil
import util
# A fixture for finding Scylla's data directory. We get it using the CQL
# interface to Scylla's configuration. Note that if the server is remote,
# the directory retrieved this way may be irrelevant, whether or not it
# exists on the local machine... However, if the same test that uses this
# fixture also uses the scylla_path fixture, the test will anyway be skipped
# if the running Scylla is not on the local machine local.
@pytest.fixture(scope="module")
def scylla_data_dir(cql):
try:
dir = json.loads(cql.execute("SELECT value FROM system.config WHERE name = 'data_file_directories'").one().value)[0]
return dir
except:
pytest.skip("Can't find Scylla sstable directory")
def simple_no_clustering_table(cql, keyspace):
table = util.unique_name()
schema = f"CREATE TABLE {keyspace}.{table} (pk int PRIMARY KEY, v int) WITH compaction = {{'class': 'NullCompactionStrategy'}}"

View File

@@ -0,0 +1,40 @@
import asyncio
import pytest
import time
from cassandra.protocol import InvalidRequest # type: ignore
from cassandra.query import SimpleStatement # type: ignore
from test.pylib.manager_client import ManagerClient
from test.pylib.util import unique_name
from test.topology.util import wait_for_token_ring_and_group0_consistency
@pytest.mark.asyncio
async def test_sticky_coordinator_enforced(manager: ManagerClient) -> None:
s1 = await manager.server_add(cmdline=['--logger-log-level', 'paging=trace'])
s2 = await manager.server_add(cmdline=['--logger-log-level', 'paging=trace'])
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)
cql = manager.get_cql()
await cql.run_async("create keyspace ks with replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2}")
await cql.run_async("create table ks.tbl (pk int, ck int, v int, primary key (pk, ck))")
num_rows = 43
expected_num_rows = num_rows + 2 # rows + partition-start + partitione-end
for ck in range(0, num_rows):
await cql.run_async(f"INSERT INTO ks.tbl (pk, ck, v) VALUES (0, {ck}, 100)")
unpaged_res = await cql.run_async("SELECT * FROM MUTATION_FRAGMENTS(ks.tbl) WHERE pk = 0")
assert len(unpaged_res) == expected_num_rows
read_stmt = SimpleStatement("SELECT * FROM MUTATION_FRAGMENTS(ks.tbl) WHERE pk = 0", fetch_size=10)
# The default round-robin load-balancing policy will jump between the nodes.
# This should trigger an exception.
with pytest.raises(InvalidRequest, match="Moving between coordinators is not allowed in SELECT FROM MUTATION_FRAGMENTS\\(\\) statements.*"):
# Blocking call until #14451 is solved
res = list(cql.execute(read_stmt))

View File

@@ -6,104 +6,21 @@
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include "dht/i_partitioner.hh"
#include "schema/schema_fwd.hh"
#include "mutation/json.hh"
#include "sstables/sstables.hh"
#include "utils/rjson.hh"
// has to be below the utils/rjson.hh include
#include <rapidjson/ostreamwrapper.h>
namespace tools {
class json_writer {
using stream = rapidjson::BasicOStreamWrapper<std::ostream>;
using writer = rapidjson::Writer<stream, rjson::encoding, rjson::encoding, rjson::allocator>;
stream _stream;
writer _writer;
public:
json_writer() : _stream(std::cout), _writer(_stream)
{ }
writer& rjson_writer() { return _writer; }
// following the rapidjson method names here
bool Null() { return _writer.Null(); }
bool Bool(bool b) { return _writer.Bool(b); }
bool Int(int i) { return _writer.Int(i); }
bool Uint(unsigned i) { return _writer.Uint(i); }
bool Int64(int64_t i) { return _writer.Int64(i); }
bool Uint64(uint64_t i) { return _writer.Uint64(i); }
bool Double(double d) { return _writer.Double(d); }
bool RawNumber(std::string_view str) { return _writer.RawNumber(str.data(), str.size(), false); }
bool String(std::string_view str) { return _writer.String(str.data(), str.size(), false); }
bool StartObject() { return _writer.StartObject(); }
bool Key(std::string_view str) { return _writer.Key(str.data(), str.size(), false); }
bool EndObject(rapidjson::SizeType memberCount = 0) { return _writer.EndObject(memberCount); }
bool StartArray() { return _writer.StartArray(); }
bool EndArray(rapidjson::SizeType elementCount = 0) { return _writer.EndArray(elementCount); }
// scylla-specific extensions (still following rapidjson naming scheme for consistency)
template <typename T>
void AsString(const T& obj) {
String(fmt::format("{}", obj));
}
// partition or clustering key
template <typename KeyType>
void DataKey(const schema& schema, const KeyType& key, std::optional<dht::token> token = {}) {
StartObject();
if (token) {
Key("token");
AsString(*token);
}
Key("raw");
String(to_hex(key.representation()));
Key("value");
AsString(key.with_schema(schema));
EndObject();
}
void StartStream() {
StartObject();
Key("sstables");
StartObject();
}
void EndStream() {
EndObject();
EndObject();
}
void SstableKey(const sstables::sstable& sst) {
Key(sst.get_filename());
}
void SstableKey(const sstables::sstable* const sst) {
if (sst) {
SstableKey(*sst);
} else {
Key("anonymous");
}
}
};
class mutation_fragment_json_writer {
const schema& _schema;
json_writer _writer;
class mutation_fragment_stream_json_writer {
mutation_json::mutation_partition_json_writer _writer;
bool _clustering_array_created;
private:
sstring to_string(gc_clock::time_point tp);
void write(gc_clock::duration ttl, gc_clock::time_point expiry);
void write(const tombstone& t);
void write(const row_marker& m);
void write(counter_cell_view cv);
void write(const atomic_cell_view& cell, data_type type);
void write(const collection_mutation_view_description& mv, data_type type);
void write(const atomic_cell_or_collection& cell, const column_definition& cdef);
void write(const row& r, column_kind kind);
void write(const clustering_row& cr);
void write(const range_tombstone_change& rtc);
public:
explicit mutation_fragment_json_writer(const schema& s) : _schema(s) {}
json_writer& writer() { return _writer; }
explicit mutation_fragment_stream_json_writer(const schema& s, std::ostream& os = std::cout)
: _writer(s, os) {}
mutation_json::json_writer& writer() { return _writer.writer(); }
void start_stream();
void start_sstable(const sstables::sstable* const sst);
void start_partition(const partition_start& ps);

View File

@@ -902,7 +902,7 @@ int range_tombstone_change_index_l(lua_State* l) {
}
class json_writer {
tools::mutation_fragment_json_writer _writer;
tools::mutation_fragment_stream_json_writer _writer;
private:
static json_writer& get_this(lua_State* l) {

View File

@@ -41,7 +41,7 @@
using namespace seastar;
using json_writer = tools::json_writer;
using json_writer = mutation_json::json_writer;
namespace bpo = boost::program_options;
@@ -311,249 +311,107 @@ output_format get_output_format_from_options(const bpo::variables_map& opts, out
namespace tools {
sstring mutation_fragment_json_writer::to_string(gc_clock::time_point tp) {
return fmt::format("{:%F %T}z", fmt::gmtime(gc_clock::to_time_t(tp)));
}
void mutation_fragment_json_writer::write(gc_clock::duration ttl, gc_clock::time_point expiry) {
_writer.Key("ttl");
_writer.AsString(ttl);
_writer.Key("expiry");
_writer.String(to_string(expiry));
}
void mutation_fragment_json_writer::write(const tombstone& t) {
_writer.StartObject();
if (t) {
_writer.Key("timestamp");
_writer.Int64(t.timestamp);
_writer.Key("deletion_time");
_writer.String(to_string(t.deletion_time));
}
_writer.EndObject();
}
void mutation_fragment_json_writer::write(const row_marker& m) {
_writer.StartObject();
_writer.Key("timestamp");
_writer.Int64(m.timestamp());
if (m.is_live() && m.is_expiring()) {
write(m.ttl(), m.expiry());
}
_writer.EndObject();
}
void mutation_fragment_json_writer::write(counter_cell_view cv) {
_writer.StartArray();
for (const auto& shard : cv.shards()) {
_writer.StartObject();
_writer.Key("id");
_writer.AsString(shard.id());
_writer.Key("value");
_writer.Int64(shard.value());
_writer.Key("clock");
_writer.Int64(shard.logical_clock());
_writer.EndObject();
}
_writer.EndArray();
}
void mutation_fragment_json_writer::write(const atomic_cell_view& cell, data_type type) {
_writer.StartObject();
_writer.Key("is_live");
_writer.Bool(cell.is_live());
_writer.Key("type");
if (type->is_counter()) {
if (cell.is_counter_update()) {
_writer.String("counter-update");
} else {
_writer.String("counter-shards");
}
} else if (type->is_collection()) {
_writer.String("frozen-collection");
} else {
_writer.String("regular");
}
_writer.Key("timestamp");
_writer.Int64(cell.timestamp());
if (type->is_counter()) {
_writer.Key("value");
if (cell.is_counter_update()) {
_writer.Int64(cell.counter_update_value());
} else {
write(counter_cell_view(cell));
}
} else {
if (cell.is_live_and_has_ttl()) {
write(cell.ttl(), cell.expiry());
}
if (cell.is_live()) {
_writer.Key("value");
_writer.String(type->to_string(cell.value().linearize()));
} else {
_writer.Key("deletion_time");
_writer.String(to_string(cell.deletion_time()));
}
}
_writer.EndObject();
}
void mutation_fragment_json_writer::write(const collection_mutation_view_description& mv, data_type type) {
_writer.StartObject();
if (mv.tomb) {
_writer.Key("tombstone");
write(mv.tomb);
}
_writer.Key("cells");
std::function<void(size_t, bytes_view)> write_key;
std::function<void(size_t, atomic_cell_view)> write_value;
if (auto t = dynamic_cast<const collection_type_impl*>(type.get())) {
write_key = [this, t = t->name_comparator()] (size_t, bytes_view k) { _writer.String(t->to_string(k)); };
write_value = [this, t = t->value_comparator()] (size_t, atomic_cell_view v) { write(v, t); };
} else if (auto t = dynamic_cast<const tuple_type_impl*>(type.get())) {
write_key = [this] (size_t i, bytes_view) { _writer.String(""); };
write_value = [this, t] (size_t i, atomic_cell_view v) { write(v, t->type(i)); };
}
if (write_key && write_value) {
_writer.StartArray();
for (size_t i = 0; i < mv.cells.size(); ++i) {
_writer.StartObject();
_writer.Key("key");
write_key(i, mv.cells[i].first);
_writer.Key("value");
write_value(i, mv.cells[i].second);
_writer.EndObject();
}
_writer.EndArray();
} else {
_writer.Null();
}
_writer.EndObject();
}
void mutation_fragment_json_writer::write(const atomic_cell_or_collection& cell, const column_definition& cdef) {
if (cdef.is_atomic()) {
write(cell.as_atomic_cell(cdef), cdef.type);
} else if (cdef.type->is_collection() || cdef.type->is_user_type()) {
cell.as_collection_mutation().with_deserialized(*cdef.type, [&, this] (collection_mutation_view_description mv) {
write(mv, cdef.type);
});
} else {
_writer.Null();
}
}
void mutation_fragment_json_writer::write(const row& r, column_kind kind) {
_writer.StartObject();
r.for_each_cell([this, kind] (column_id id, const atomic_cell_or_collection& cell) {
auto cdef = _schema.column_at(kind, id);
_writer.Key(cdef.name_as_text());
write(cell, cdef);
});
_writer.EndObject();
}
void mutation_fragment_json_writer::write(const clustering_row& cr) {
_writer.StartObject();
_writer.Key("type");
_writer.String("clustering-row");
_writer.Key("key");
_writer.DataKey(_schema, cr.key());
void mutation_fragment_stream_json_writer::write(const clustering_row& cr) {
writer().StartObject();
writer().Key("type");
writer().String("clustering-row");
writer().Key("key");
writer().DataKey(_writer.schema(), cr.key());
if (cr.tomb()) {
_writer.Key("tombstone");
write(cr.tomb().regular());
_writer.Key("shadowable_tombstone");
write(cr.tomb().shadowable().tomb());
writer().Key("tombstone");
_writer.write(cr.tomb().regular());
writer().Key("shadowable_tombstone");
_writer.write(cr.tomb().shadowable().tomb());
}
if (!cr.marker().is_missing()) {
_writer.Key("marker");
write(cr.marker());
writer().Key("marker");
_writer.write(cr.marker());
}
_writer.Key("columns");
write(cr.cells(), column_kind::regular_column);
_writer.EndObject();
writer().Key("columns");
_writer.write(cr.cells(), column_kind::regular_column);
writer().EndObject();
}
void mutation_fragment_json_writer::write(const range_tombstone_change& rtc) {
_writer.StartObject();
_writer.Key("type");
_writer.String("range-tombstone-change");
void mutation_fragment_stream_json_writer::write(const range_tombstone_change& rtc) {
writer().StartObject();
writer().Key("type");
writer().String("range-tombstone-change");
const auto pos = rtc.position();
if (pos.has_key()) {
_writer.Key("key");
_writer.DataKey(_schema, pos.key());
writer().Key("key");
writer().DataKey(_writer.schema(), pos.key());
}
_writer.Key("weight");
_writer.Int(static_cast<int>(pos.get_bound_weight()));
_writer.Key("tombstone");
write(rtc.tombstone());
_writer.EndObject();
writer().Key("weight");
writer().Int(static_cast<int>(pos.get_bound_weight()));
writer().Key("tombstone");
_writer.write(rtc.tombstone());
writer().EndObject();
}
void mutation_fragment_json_writer::start_stream() {
_writer.StartStream();
void mutation_fragment_stream_json_writer::start_stream() {
writer().StartStream();
}
void mutation_fragment_json_writer::start_sstable(const sstables::sstable* const sst) {
_writer.SstableKey(sst);
_writer.StartArray();
void mutation_fragment_stream_json_writer::start_sstable(const sstables::sstable* const sst) {
if (sst) {
writer().Key(sst->get_filename());
} else {
writer().Key("anonymous");
}
writer().StartArray();
}
void mutation_fragment_json_writer::start_partition(const partition_start& ps) {
void mutation_fragment_stream_json_writer::start_partition(const partition_start& ps) {
const auto& dk = ps.key();
_clustering_array_created = false;
_writer.StartObject();
writer().StartObject();
_writer.Key("key");
_writer.DataKey(_schema, dk.key(), dk.token());
writer().Key("key");
writer().DataKey(_writer.schema(), dk.key(), dk.token());
if (ps.partition_tombstone()) {
_writer.Key("tombstone");
write(ps.partition_tombstone());
writer().Key("tombstone");
_writer.write(ps.partition_tombstone());
}
}
void mutation_fragment_json_writer::partition_element(const static_row& sr) {
_writer.Key("static_row");
write(sr.cells(), column_kind::static_column);
void mutation_fragment_stream_json_writer::partition_element(const static_row& sr) {
writer().Key("static_row");
_writer.write(sr.cells(), column_kind::static_column);
}
void mutation_fragment_json_writer::partition_element(const clustering_row& cr) {
void mutation_fragment_stream_json_writer::partition_element(const clustering_row& cr) {
if (!_clustering_array_created) {
_writer.Key("clustering_elements");
_writer.StartArray();
writer().Key("clustering_elements");
writer().StartArray();
_clustering_array_created = true;
}
write(cr);
}
void mutation_fragment_json_writer::partition_element(const range_tombstone_change& rtc) {
void mutation_fragment_stream_json_writer::partition_element(const range_tombstone_change& rtc) {
if (!_clustering_array_created) {
_writer.Key("clustering_elements");
_writer.StartArray();
writer().Key("clustering_elements");
writer().StartArray();
_clustering_array_created = true;
}
write(rtc);
}
void mutation_fragment_json_writer::end_partition() {
void mutation_fragment_stream_json_writer::end_partition() {
if (_clustering_array_created) {
_writer.EndArray();
writer().EndArray();
}
_writer.EndObject();
writer().EndObject();
}
void mutation_fragment_json_writer::end_sstable() {
_writer.EndArray();
void mutation_fragment_stream_json_writer::end_sstable() {
writer().EndArray();
}
void mutation_fragment_json_writer::end_stream() {
_writer.EndStream();
void mutation_fragment_stream_json_writer::end_stream() {
writer().EndStream();
}
} // namespace tools
@@ -603,7 +461,7 @@ class dumping_consumer : public sstable_consumer {
}
};
class json_dumper : public sstable_consumer {
tools::mutation_fragment_json_writer _writer;
tools::mutation_fragment_stream_json_writer _writer;
public:
explicit json_dumper(const schema& s) : _writer(s) {}
virtual future<> consume_stream_start() override {
@@ -1104,7 +962,7 @@ void dump_index_operation(schema_ptr schema, reader_permit permit, const std::ve
sstables::index_reader idx_reader(sst, permit);
auto close_idx_reader = deferred_close(idx_reader);
writer.SstableKey(*sst);
writer.Key(sst->get_filename());
writer.StartArray();
while (!idx_reader.eof()) {
@@ -1143,7 +1001,7 @@ void dump_compression_info_operation(schema_ptr schema, reader_permit permit, co
for (auto& sst : sstables) {
const auto& compression = sst->get_compression();
writer.SstableKey(*sst);
writer.Key(sst->get_filename());
writer.StartObject();
writer.Key("name");
writer.String(disk_string_to_string(compression.name));
@@ -1181,7 +1039,7 @@ void dump_summary_operation(schema_ptr schema, reader_permit permit, const std::
for (auto& sst : sstables) {
auto& summary = sst->get_summary();
writer.SstableKey(*sst);
writer.Key(sst->get_filename());
writer.StartObject();
writer.Key("header");
@@ -1461,7 +1319,7 @@ void dump_statistics_operation(schema_ptr schema, reader_permit permit, const st
for (auto& sst : sstables) {
auto& statistics = sst->get_statistics();
writer.SstableKey(*sst);
writer.Key(sst->get_filename());
writer.StartObject();
writer.Key("offsets");
@@ -1621,7 +1479,7 @@ void dump_scylla_metadata_operation(schema_ptr schema, reader_permit permit, con
json_writer writer;
writer.StartStream();
for (auto& sst : sstables) {
writer.SstableKey(*sst);
writer.Key(sst->get_filename());
writer.StartObject();
auto m = sst->get_scylla_metadata();
if (!m) {