streaming: Implement transfer_files

This commit is contained in:
Asias He
2015-06-25 15:53:36 +08:00
parent 89c3cab526
commit a4235ebc13
4 changed files with 81 additions and 72 deletions

View File

@@ -85,15 +85,16 @@ public:
{
return new HashSet<>(peerSessions.keySet());
}
public synchronized StreamSession getOrCreateNextSession(InetAddress peer, InetAddress connecting)
{
return getOrCreateHostData(peer).getOrCreateNextSession(peer, connecting);
#endif
public:
stream_session& get_or_create_next_session(inet_address peer, inet_address connecting) {
return get_or_create_host_data(peer).get_or_create_next_session(peer, connecting);
}
#if 0
public synchronized StreamSession getOrCreateSessionById(InetAddress peer, int id, InetAddress connecting)
{
return getOrCreateHostData(peer).getOrCreateSessionById(peer, id, connecting);
return get_or_create_host_data(peer).getOrCreateSessionById(peer, id, connecting);
}
public synchronized void updateProgress(ProgressInfo info)
@@ -103,7 +104,7 @@ public:
public synchronized void addSessionInfo(SessionInfo session)
{
HostStreamingData data = getOrCreateHostData(session.peer);
HostStreamingData data = get_or_create_host_data(session.peer);
data.addSessionInfo(session);
}
@@ -116,28 +117,29 @@ public:
}
return result;
}
public synchronized void transferFiles(InetAddress to, Collection<StreamSession.SSTableStreamingSections> sstableDetails)
{
HostStreamingData sessionList = getOrCreateHostData(to);
if (connectionsPerHost > 1)
{
#endif
public:
void transfer_files(inet_address to, std::vector<stream_session::ss_table_streaming_sections> sstable_details) {
host_streaming_data& session_list = get_or_create_host_data(to);
if (_connections_per_host > 1) {
abort();
#if 0
List<List<StreamSession.SSTableStreamingSections>> buckets = sliceSSTableDetails(sstableDetails);
for (List<StreamSession.SSTableStreamingSections> subList : buckets)
{
StreamSession session = sessionList.getOrCreateNextSession(to, to);
StreamSession session = sessionList.get_or_create_next_session(to, to);
session.addTransferFiles(subList);
}
}
else
{
StreamSession session = sessionList.getOrCreateNextSession(to, to);
session.addTransferFiles(sstableDetails);
#endif
} else {
auto& session = session_list.get_or_create_next_session(to, to);
session.add_transfer_files(sstable_details);
}
}
#if 0
private List<List<StreamSession.SSTableStreamingSections>> sliceSSTableDetails(Collection<StreamSession.SSTableStreamingSections> sstableDetails)
{
// There's no point in divvying things up into more buckets than we have sstableDetails
@@ -172,18 +174,15 @@ public:
throw new IllegalArgumentException("Unknown peer requested: " + peer);
return data;
}
#endif
private HostStreamingData getOrCreateHostData(InetAddress peer)
{
HostStreamingData data = peerSessions.get(peer);
if (data == null)
{
data = new HostStreamingData();
peerSessions.put(peer, data);
}
return data;
private:
host_streaming_data& get_or_create_host_data(inet_address peer) {
_peer_sessions[peer] = host_streaming_data(_connections_per_host, _keep_ss_table_level);
return _peer_sessions[peer];
}
#if 0
private static class StreamSessionConnector implements Runnable
{
private final StreamSession session;
@@ -203,31 +202,40 @@ public:
private:
class host_streaming_data {
using inet_address = gms::inet_address;
private:
std::map<int, stream_session> _stream_sessions;
std::map<int, session_info> _session_infos;
int last_returned = -1;
public:
bool has_active_sessions();
#if 0
public StreamSession getOrCreateNextSession(InetAddress peer, InetAddress connecting)
{
// create
if (streamSessions.size() < connectionsPerHost)
{
StreamSession session = new StreamSession(peer, connecting, factory, streamSessions.size(), keepSSTableLevel);
streamSessions.put(++lastReturned, session);
return session;
}
// get
else
{
if (lastReturned >= streamSessions.size() - 1)
lastReturned = 0;
int _last_returned = -1;
int _connections_per_host;
bool _keep_ss_table_level;
return streamSessions.get(lastReturned++);
public:
host_streaming_data() = default;
host_streaming_data(int connections_per_host, bool keep_ss_table_level)
: _connections_per_host(connections_per_host)
, _keep_ss_table_level(keep_ss_table_level) {
}
bool has_active_sessions();
stream_session& get_or_create_next_session(inet_address peer, inet_address connecting) {
// create
int size = _stream_sessions.size();
if (size < _connections_per_host) {
auto session = stream_session(peer, connecting, size, _keep_ss_table_level);
_stream_sessions.emplace(++_last_returned, std::move(session));
return _stream_sessions[_last_returned];
// get
} else {
if (_last_returned >= (size - 1)) {
_last_returned = 0;
}
return _stream_sessions[_last_returned++];
}
}
#if 0
public void connectAllStreamSessions()
{

View File

@@ -47,4 +47,9 @@ stream_plan& stream_plan::transfer_ranges(inet_address to, inet_address connecti
return *this;
}
stream_plan& stream_plan::transfer_files(inet_address to, std::vector<stream_session::ss_table_streaming_sections> sstable_details) {
_coordinator.transfer_files(to, std::move(sstable_details));
return *this;
}
}

View File

@@ -127,7 +127,6 @@ public:
*/
stream_plan& transfer_ranges(inet_address to, inet_address connecting, sstring keyspace, std::vector<query::range<token>> ranges, std::initializer_list<sstring> column_families);
#if 0
/**
* Add transfer task to send given SSTable files.
*
@@ -136,12 +135,8 @@ public:
* this collection will be modified to remove those files that are successfully handed off
* @return this object for chaining
*/
public StreamPlan transferFiles(InetAddress to, Collection<StreamSession.SSTableStreamingSections> sstableDetails)
{
coordinator.transferFiles(to, sstableDetails);
return this;
}
stream_plan& transfer_files(inet_address to, std::vector<stream_session::ss_table_streaming_sections> sstable_details);
#if 0
public StreamPlan listeners(StreamEventHandler handler, StreamEventHandler... handlers)
{

View File

@@ -126,6 +126,20 @@ private:
distributed<handler> _handlers;
void init_messaging_service_handler();
future<> start();
public:
struct ss_table_streaming_sections {
sstables::sstable& sstable;
std::map<int64_t, int64_t> sections;
int64_t estimated_keys;
int64_t repaired_at;
ss_table_streaming_sections(sstables::sstable& sstable_, std::map<int64_t, int64_t> sections_,
long estimated_keys_, long repaired_at_)
: sstable(sstable_)
, sections(std::move(sections_))
, estimated_keys(estimated_keys_)
, repaired_at(repaired_at_) {
}
};
public:
/**
* Streaming endpoint.
@@ -158,6 +172,7 @@ private:
stream_session_state _state = stream_session_state::INITIALIZED;
bool _complete_sent = false;
public:
stream_session() : conn_handler(*this) { }
/**
* Create new streaming session with the peer.
*
@@ -330,9 +345,10 @@ public:
throw t;
}
}
#endif
public void addTransferFiles(Collection<SSTableStreamingSections> sstableDetails)
{
void add_transfer_files(std::vector<ss_table_streaming_sections> sstable_details) {
#if 0
Iterator<SSTableStreamingSections> iter = sstableDetails.iterator();
while (iter.hasNext())
{
@@ -355,23 +371,8 @@ public:
task.addTransferFile(details.sstable, details.estimatedKeys, details.sections, details.repairedAt);
iter.remove();
}
}
#endif
public:
struct ss_table_streaming_sections {
sstables::sstable& sstable;
std::map<int64_t, int64_t> sections;
int64_t estimated_keys;
int64_t repaired_at;
ss_table_streaming_sections(sstables::sstable& sstable_, std::map<int64_t, int64_t> sections_,
long estimated_keys_, long repaired_at_)
: sstable(sstable_)
, sections(std::move(sections_))
, estimated_keys(estimated_keys_)
, repaired_at(repaired_at_) {
}
};
}
private:
void close_session(stream_session_state final_state);