repair, streaming: Generalize consumer lambdas

Both streaming and repair call the distributed sstables writing with
equal lambdas each being ~30 lines of code. The only difference between
them is repair might request offstrategy compaction for new sstable.

Generalization of these two pieces save lines of codes and speeds the
release/repair/row_level.o compilation by half a minute (out of twelve).

tests: unit(dev)

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
Message-Id: <20210531133113.23003-1-xemul@scylladb.com>
This commit is contained in:
Pavel Emelyanov
2021-05-31 16:31:13 +03:00
committed by Avi Kivity
parent 777771df34
commit 0944d69475
5 changed files with 123 additions and 67 deletions

View File

@@ -938,6 +938,7 @@ scylla_core = (['database.cc',
'streaming/stream_result_future.cc',
'streaming/stream_session_state.cc',
'streaming/stream_reason.cc',
'streaming/consumer.cc',
'clocks-impl.cc',
'partition_slice_builder.cc',
'init.cc',

View File

@@ -23,7 +23,6 @@
#include "message/messaging_service.hh"
#include "sstables/sstables.hh"
#include "sstables/sstables_manager.hh"
#include "sstables/sstable_set.hh"
#include "mutation_fragment.hh"
#include "mutation_writer/multishard_writer.hh"
#include "dht/i_partitioner.hh"
@@ -33,7 +32,6 @@
#include "utils/UUID.hh"
#include "utils/hash.hh"
#include "service/priority_manager.hh"
#include "db/view/view_update_checks.hh"
#include "database.hh"
#include <seastar/util/bool_class.hh>
#include <seastar/core/metrics_registration.hh>
@@ -44,13 +42,13 @@
#include <optional>
#include <boost/range/adaptors.hpp>
#include <boost/intrusive/list.hpp>
#include "../db/view/view_update_generator.hh"
#include "gms/i_endpoint_state_change_subscriber.hh"
#include "gms/gossiper.hh"
#include "repair/row_level.hh"
#include "mutation_source_metadata.hh"
#include "utils/stall_free.hh"
#include "service/migration_manager.hh"
#include "streaming/consumer.hh"
extern logging::logger rlogger;
@@ -586,41 +584,7 @@ public:
_mq = std::move(queue_handle);
auto writer = shared_from_this();
_writer_done = mutation_writer::distribute_reader_and_consume_on_shards(_schema, std::move(queue_reader),
[&db, &sys_dist_ks, &view_update_gen, reason = this->_reason, estimated_partitions = this->_estimated_partitions] (flat_mutation_reader reader) {
auto& t = db.local().find_column_family(reader.schema());
return db::view::check_needs_view_update_path(sys_dist_ks.local(), t, reason).then([t = t.shared_from_this(), estimated_partitions, reader = std::move(reader), reason, &view_update_gen] (bool use_view_update_path) mutable {
//FIXME: for better estimations this should be transmitted from remote
auto metadata = mutation_source_metadata{};
auto& cs = t->get_compaction_strategy();
const auto adjusted_estimated_partitions = cs.adjust_partition_estimate(metadata, estimated_partitions);
sstables::offstrategy offstrategy = is_offstrategy_supported(reason);
bool auto_offstrategy_trigger = offstrategy && (reason == streaming::stream_reason::repair);
auto consumer = cs.make_interposer_consumer(metadata,
[t = std::move(t), &view_update_gen, use_view_update_path, adjusted_estimated_partitions, offstrategy, auto_offstrategy_trigger] (flat_mutation_reader reader) {
sstables::shared_sstable sst = use_view_update_path ? t->make_streaming_staging_sstable() : t->make_streaming_sstable_for_write();
schema_ptr s = reader.schema();
auto& pc = service::get_local_streaming_priority();
return sst->write_components(std::move(reader), adjusted_estimated_partitions, s,
t->get_sstables_manager().configure_writer("repair"),
encoding_stats{}, pc).then([sst] {
return sst->open_data();
}).then([t, sst, offstrategy, auto_offstrategy_trigger] {
if (auto_offstrategy_trigger) {
rlogger.debug("Enabled automatic off-strategy trigger for table {}.{}",
t->schema()->ks_name(), t->schema()->cf_name());
t->enable_off_strategy_trigger();
}
return t->add_sstable_and_update_cache(sst, offstrategy);
}).then([t, s, sst, use_view_update_path, &view_update_gen]() mutable -> future<> {
if (!use_view_update_path) {
return make_ready_future<>();
}
return view_update_gen.local().register_staging_sstable(sst, std::move(t));
});
});
return consumer(std::move(reader));
});
},
streaming::make_streaming_consumer("repair", db, sys_dist_ks, view_update_gen, _estimated_partitions, _reason, is_offstrategy_supported(_reason)),
t.stream_in_progress()).then([writer] (uint64_t partitions) {
rlogger.debug("repair_writer: keyspace={}, table={}, managed to write partitions={} to sstable",
writer->_schema->ks_name(), writer->_schema->cf_name(), partitions);

75
streaming/consumer.cc Normal file
View File

@@ -0,0 +1,75 @@
/*
* Copyright (C) 2021 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "consumer.hh"
#include "mutation_source_metadata.hh"
#include "service/priority_manager.hh"
#include "db/view/view_update_generator.hh"
#include "db/view/view_update_checks.hh"
#include "sstables/sstables.hh"
#include "sstables/sstables_manager.hh"
namespace streaming {
std::function<future<> (flat_mutation_reader)> make_streaming_consumer(sstring origin,
sharded<database>& db,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<db::view::view_update_generator>& vug,
uint64_t estimated_partitions,
stream_reason reason,
sstables::offstrategy offstrategy) {
return [&db, &sys_dist_ks, &vug, estimated_partitions, reason, offstrategy, origin = std::move(origin)] (flat_mutation_reader reader) {
auto& cf = db.local().find_column_family(reader.schema());
return db::view::check_needs_view_update_path(sys_dist_ks.local(), cf, reason).then([cf = cf.shared_from_this(), &vug, estimated_partitions, offstrategy, reason, reader = std::move(reader), origin = std::move(origin)] (bool use_view_update_path) mutable {
//FIXME: for better estimations this should be transmitted from remote
auto metadata = mutation_source_metadata{};
auto& cs = cf->get_compaction_strategy();
const auto adjusted_estimated_partitions = cs.adjust_partition_estimate(metadata, estimated_partitions);
auto consumer = cs.make_interposer_consumer(metadata,
[cf = std::move(cf), adjusted_estimated_partitions, use_view_update_path, &vug, origin = std::move(origin), offstrategy, reason] (flat_mutation_reader reader) {
sstables::shared_sstable sst = use_view_update_path ? cf->make_streaming_staging_sstable() : cf->make_streaming_sstable_for_write();
schema_ptr s = reader.schema();
auto& pc = service::get_local_streaming_priority();
return sst->write_components(std::move(reader), adjusted_estimated_partitions, s,
cf->get_sstables_manager().configure_writer(origin),
encoding_stats{}, pc).then([sst] {
return sst->open_data();
}).then([cf, sst, offstrategy, reason] {
if (offstrategy && (reason == stream_reason::repair)) {
sstables::sstlog.debug("Enabled automatic off-strategy trigger for table {}.{}",
cf->schema()->ks_name(), cf->schema()->cf_name());
cf->enable_off_strategy_trigger();
}
return cf->add_sstable_and_update_cache(sst, offstrategy);
}).then([cf, s, sst, use_view_update_path, &vug]() mutable -> future<> {
if (!use_view_update_path) {
return make_ready_future<>();
}
return vug.local().register_staging_sstable(sst, std::move(cf));
});
});
return consumer(std::move(reader));
});
};
}
}

43
streaming/consumer.hh Normal file
View File

@@ -0,0 +1,43 @@
/*
* Copyright (C) 2021 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "sstables/sstable_set.hh"
#include "streaming/stream_reason.hh"
class database;
namespace db {
class system_distributed_keyspace;
namespace view {
class view_update_generator;
}
}
namespace streaming {
std::function<future<>(flat_mutation_reader)> make_streaming_consumer(sstring origin,
sharded<database>& db,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<db::view::view_update_generator>& vug,
uint64_t estimated_partitions,
stream_reason reason,
sstables::offstrategy offstrategy);
}

View File

@@ -68,6 +68,7 @@
#include "../db/view/view_update_generator.hh"
#include "mutation_source_metadata.hh"
#include "streaming/stream_mutation_fragments_cmd.hh"
#include "consumer.hh"
namespace streaming {
@@ -181,35 +182,7 @@ void stream_session::init_messaging_service_handler(netw::messaging_service& ms,
//FIXME: discarded future.
(void)mutation_writer::distribute_reader_and_consume_on_shards(s,
make_generating_reader(s, permit, std::move(get_next_mutation_fragment)),
[plan_id, estimated_partitions, reason] (flat_mutation_reader reader) {
auto& cf = get_local_db().find_column_family(reader.schema());
return db::view::check_needs_view_update_path(_sys_dist_ks->local(), cf, reason).then([cf = cf.shared_from_this(), estimated_partitions, reader = std::move(reader)] (bool use_view_update_path) mutable {
//FIXME: for better estimations this should be transmitted from remote
auto metadata = mutation_source_metadata{};
auto& cs = cf->get_compaction_strategy();
const auto adjusted_estimated_partitions = cs.adjust_partition_estimate(metadata, estimated_partitions);
auto consumer = cf->get_compaction_strategy().make_interposer_consumer(metadata,
[cf = std::move(cf), adjusted_estimated_partitions, use_view_update_path] (flat_mutation_reader reader) {
sstables::shared_sstable sst = use_view_update_path ? cf->make_streaming_staging_sstable() : cf->make_streaming_sstable_for_write();
schema_ptr s = reader.schema();
auto& pc = service::get_local_streaming_priority();
return sst->write_components(std::move(reader), adjusted_estimated_partitions, s,
cf->get_sstables_manager().configure_writer("streaming"),
encoding_stats{}, pc).then([sst] {
return sst->open_data();
}).then([cf, sst] {
return cf->add_sstable_and_update_cache(sst);
}).then([cf, s, sst, use_view_update_path]() mutable -> future<> {
if (!use_view_update_path) {
return make_ready_future<>();
}
return _view_update_generator->local().register_staging_sstable(sst, std::move(cf));
});
});
return consumer(std::move(reader));
});
},
make_streaming_consumer("streaming", *_db, *_sys_dist_ks, *_view_update_generator, estimated_partitions, reason, sstables::offstrategy::no),
cf.stream_in_progress()
).then_wrapped([s, plan_id, from, sink, estimated_partitions] (future<uint64_t> f) mutable {
int32_t status = 0;