streaming: stream_blob: hold table for streaming
When initializing streaming sources in tablet_stream_files_handler we use a reference to the table. We should hold the table while doing so, because otherwise the table may be dropped and destroyed when we yield. Use the table.stream_in_progress() phaser to hold the table while we access it. For sstable file streaming we can release the table after the snapshot is initialized, and the table may be dropped safely because the files are held by the snapshot and we don't access the table anymore. There was a single access to the table for logging but it is replaced by a pre-calculated variable. For logstor segment streaming, currently it doesn't support discarding the segments while they are streamed - when the table is dropped it discard the segments by overwriting and freeing them, so they shouldn't be accessed after that. Therefore, in that case continue to hold the table until streaming is completed. Fixes SCYLLADB-1533
This commit is contained in:
@@ -742,10 +742,12 @@ tablet_stream_files(netw::messaging_service& ms, std::list<stream_blob_info> sou
|
||||
future<stream_files_response> tablet_stream_files_handler(replica::database& db, netw::messaging_service& ms, streaming::stream_files_request req) {
|
||||
stream_files_response resp;
|
||||
auto& table = db.find_column_family(req.table);
|
||||
auto table_stream_op = table.stream_in_progress();
|
||||
auto files = std::list<stream_blob_info>();
|
||||
auto reader = co_await db.obtain_reader_permit(table, "tablet_file_streaming", db::no_timeout, {});
|
||||
bool is_logstor_table = table.uses_logstor();
|
||||
|
||||
if (table.uses_logstor()) {
|
||||
if (is_logstor_table) {
|
||||
auto segments = co_await table.take_logstor_snapshot(req.range);
|
||||
for (auto& seg : segments) {
|
||||
auto& info = files.emplace_back();
|
||||
@@ -807,6 +809,9 @@ future<stream_files_response> tablet_stream_files_handler(replica::database& db,
|
||||
// that sstable's content has been fully streamed.
|
||||
sstables.clear();
|
||||
|
||||
// Release the table - we don't need to access it anymore and the files are held by the snapshot.
|
||||
table_stream_op = {};
|
||||
|
||||
blogger.debug("stream_sstables[{}] Started sending sstable_nr={} files_nr={} files={} range={}",
|
||||
req.ops_id, sstable_nr, files.size(), files, req.range);
|
||||
}
|
||||
@@ -820,7 +825,7 @@ future<stream_files_response> tablet_stream_files_handler(replica::database& db,
|
||||
resp.stream_bytes = stream_bytes;
|
||||
auto duration = std::chrono::steady_clock::now() - ops_start_time;
|
||||
blogger.info("stream_{}[{}] Finished sending files_nr={} range={} stream_bytes={} stream_time={} stream_bw={}",
|
||||
table.uses_logstor() ? "logstor_segments" : "sstables",
|
||||
is_logstor_table ? "logstor_segments" : "sstables",
|
||||
req.ops_id, files_nr, req.range, stream_bytes, duration, get_bw(stream_bytes, ops_start_time));
|
||||
co_return resp;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user