storage_proxy: drop make_local_reader()
This code was used only by its unit test. Signed-off-by: Paweł Dziepak <pdziepak@scylladb.com>
This commit is contained in:
@@ -3306,90 +3306,4 @@ storage_proxy::stop() {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
class shard_reader final : public mutation_reader::impl {
|
||||
distributed<database>& _db;
|
||||
unsigned _shard;
|
||||
const query::partition_range _range;
|
||||
global_schema_ptr _schema;
|
||||
schema_ptr _local_schema;
|
||||
const io_priority_class *_pc;
|
||||
struct remote_state {
|
||||
mutation_reader reader;
|
||||
std::experimental::optional<frozen_mutation> _m;
|
||||
};
|
||||
foreign_ptr<std::unique_ptr<remote_state>> _remote;
|
||||
private:
|
||||
future<> init() {
|
||||
return _db.invoke_on(_shard, [this] (database& db) {
|
||||
schema_ptr s = _schema;
|
||||
column_family& cf = db.find_column_family(s->id());
|
||||
return make_foreign(std::make_unique<remote_state>(
|
||||
remote_state{cf.make_reader(std::move(s), _range, query::no_clustering_key_filtering, *_pc)}));
|
||||
}).then([this] (auto&& ptr) {
|
||||
_remote = std::move(ptr);
|
||||
});
|
||||
}
|
||||
public:
|
||||
shard_reader(schema_ptr s,
|
||||
distributed<database>& db,
|
||||
unsigned shard,
|
||||
const query::partition_range& range,
|
||||
const io_priority_class& pc)
|
||||
: _db(db)
|
||||
, _shard(shard)
|
||||
, _range(range)
|
||||
, _schema(s)
|
||||
, _local_schema(std::move(s))
|
||||
, _pc(&pc)
|
||||
{ }
|
||||
|
||||
virtual future<mutation_opt> operator()() override {
|
||||
if (!_remote) {
|
||||
return init().then([this] {
|
||||
return (*this)();
|
||||
});
|
||||
}
|
||||
|
||||
// FIXME: batching
|
||||
return _db.invoke_on(_shard, [this] (database&) {
|
||||
return _remote->reader().then([this] (mutation_opt&& m) {
|
||||
if (!m) {
|
||||
_remote->_m = {};
|
||||
} else {
|
||||
_remote->_m = freeze(*m);
|
||||
}
|
||||
});
|
||||
}).then([this] () -> mutation_opt {
|
||||
if (!_remote->_m) {
|
||||
return {};
|
||||
}
|
||||
return { _remote->_m->unfreeze(_local_schema) };
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
mutation_reader
|
||||
storage_proxy::make_local_reader(utils::UUID cf_id, const query::partition_range& range,
|
||||
const io_priority_class& pc) {
|
||||
// Split ranges which wrap around, because the individual readers created
|
||||
// by shard_reader do not support them:
|
||||
auto schema = _db.local().find_column_family(cf_id).schema();
|
||||
if (range.is_wrap_around(dht::ring_position_comparator(*schema))) {
|
||||
auto unwrapped = range.unwrap();
|
||||
std::vector<mutation_reader> both;
|
||||
both.reserve(2);
|
||||
both.push_back(make_local_reader(cf_id, unwrapped.first, pc));
|
||||
both.push_back(make_local_reader(cf_id, unwrapped.second, pc));
|
||||
return make_joining_reader(std::move(both));
|
||||
}
|
||||
|
||||
unsigned first_shard = range.start() ? dht::shard_of(range.start()->value().token()) : 0;
|
||||
unsigned last_shard = range.end() ? dht::shard_of(range.end()->value().token()) : smp::count - 1;
|
||||
std::vector<mutation_reader> readers;
|
||||
for (auto cpu = first_shard; cpu <= last_shard; ++cpu) {
|
||||
readers.emplace_back(make_mutation_reader<shard_reader>(schema, _db, cpu, range, pc));
|
||||
}
|
||||
return make_joining_reader(std::move(readers));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -303,14 +303,6 @@ public:
|
||||
future<foreign_ptr<lw_shared_ptr<reconcilable_result>>> query_mutations_locally(
|
||||
schema_ptr, lw_shared_ptr<query::read_command> cmd, const query::partition_range&);
|
||||
|
||||
/*
|
||||
* Returns mutation_reader for given column family
|
||||
* which combines data from all shards.
|
||||
* Uses schema current at the time of invocation.
|
||||
*/
|
||||
mutation_reader make_local_reader(utils::UUID cf_id, const query::partition_range&,
|
||||
const io_priority_class& pc = default_priority_class());
|
||||
|
||||
future<> stop();
|
||||
|
||||
const stats& get_stats() const {
|
||||
|
||||
@@ -37,84 +37,6 @@
|
||||
thread_local disk_error_signal_type commit_error;
|
||||
thread_local disk_error_signal_type general_disk_error;
|
||||
|
||||
static query::result to_data_query_result(mutation_reader& reader, const query::partition_slice& slice) {
|
||||
query::result::builder builder(slice, query::result_request::only_result);
|
||||
auto now = gc_clock::now();
|
||||
while (true) {
|
||||
mutation_opt mo = reader().get0();
|
||||
if (!mo) {
|
||||
break;
|
||||
}
|
||||
std::move(*mo).query(builder, slice, now);
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
static query::result_set to_result_set(schema_ptr s, mutation_reader& reader) {
|
||||
auto slice = partition_slice_builder(*s).build();
|
||||
return query::result_set::from_raw_result(s, slice, to_data_query_result(reader, slice));
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_make_local_reader) {
|
||||
return do_with_cql_env([](cql_test_env& e) {
|
||||
return seastar::async([&] {
|
||||
e.execute_cql("create keyspace ks2 with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };").get();
|
||||
e.execute_cql("create table ks2.cf (k blob, v int, primary key (k));").get();
|
||||
e.execute_cql(
|
||||
"begin unlogged batch \n"
|
||||
" insert into ks2.cf (k, v) values (0x01, 0); \n"
|
||||
" insert into ks2.cf (k, v) values (0x02, 0); \n"
|
||||
" insert into ks2.cf (k, v) values (0x03, 0); \n"
|
||||
" insert into ks2.cf (k, v) values (0x04, 0); \n"
|
||||
" insert into ks2.cf (k, v) values (0x05, 0); \n"
|
||||
"apply batch;").get();
|
||||
|
||||
auto s = e.local_db().find_schema("ks2", "cf");
|
||||
|
||||
{
|
||||
auto reader = service::get_storage_proxy().local().make_local_reader(s->id(), query::full_partition_range);
|
||||
assert_that(to_result_set(s, reader))
|
||||
.has_size(5)
|
||||
.has(a_row().with_column(bytes("k"), data_value(bytes("\01"))))
|
||||
.has(a_row().with_column(bytes("k"), data_value(bytes("\02"))))
|
||||
.has(a_row().with_column(bytes("k"), data_value(bytes("\03"))))
|
||||
.has(a_row().with_column(bytes("k"), data_value(bytes("\04"))))
|
||||
.has(a_row().with_column(bytes("k"), data_value(bytes("\05"))));
|
||||
}
|
||||
|
||||
{
|
||||
auto reader = service::get_storage_proxy().local().make_local_reader(s->id(),
|
||||
query::partition_range(
|
||||
{dht::ring_position(dht::minimum_token(), dht::ring_position::token_bound::start)},
|
||||
{dht::ring_position(dht::maximum_token(), dht::ring_position::token_bound::end)}));
|
||||
assert_that(to_result_set(s, reader))
|
||||
.has_size(5)
|
||||
.has(a_row().with_column(bytes("k"), data_value(bytes("\01"))))
|
||||
.has(a_row().with_column(bytes("k"), data_value(bytes("\02"))))
|
||||
.has(a_row().with_column(bytes("k"), data_value(bytes("\03"))))
|
||||
.has(a_row().with_column(bytes("k"), data_value(bytes("\04"))))
|
||||
.has(a_row().with_column(bytes("k"), data_value(bytes("\05"))));
|
||||
}
|
||||
|
||||
{
|
||||
auto reader = service::get_storage_proxy().local().make_local_reader(s->id(),
|
||||
query::partition_range(
|
||||
{dht::ring_position(dht::minimum_token(), dht::ring_position::token_bound::start)},
|
||||
{dht::ring_position(dht::minimum_token(), dht::ring_position::token_bound::start)}));
|
||||
assert_that(to_result_set(s, reader)).is_empty();
|
||||
}
|
||||
|
||||
{
|
||||
auto reader = service::get_storage_proxy().local().make_local_reader(s->id(),
|
||||
query::partition_range(
|
||||
{dht::ring_position(dht::maximum_token(), dht::ring_position::token_bound::start)},
|
||||
{dht::ring_position(dht::maximum_token(), dht::ring_position::token_bound::start)}));
|
||||
assert_that(to_result_set(s, reader)).is_empty();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// Returns random keys sorted in ring order.
|
||||
// The schema must have a single bytes_type partition key column.
|
||||
static std::vector<dht::ring_position> make_ring(schema_ptr s, int n_keys) {
|
||||
|
||||
Reference in New Issue
Block a user