Skip to content

Commit

Permalink
Prepare resume transfers from directory in Conn, implements PeerInfo …
Browse files Browse the repository at this point in the history
…result, correct closable resources usage in Conn
  • Loading branch information
a-pavlov committed Aug 26, 2016
1 parent f05400e commit 2ef256b
Show file tree
Hide file tree
Showing 10 changed files with 112 additions and 38 deletions.
84 changes: 54 additions & 30 deletions src/main/java/org/jed2k/Conn.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.jed2k.alert.SearchResultAlert;
import org.jed2k.alert.ServerMessageAlert;
import org.jed2k.alert.ServerStatusAlert;
import org.jed2k.exception.ErrorCode;
import org.jed2k.exception.JED2KException;
import org.jed2k.protocol.Hash;
import org.jed2k.protocol.NetworkIdentifier;
Expand Down Expand Up @@ -37,6 +38,8 @@ public class Conn {
private static final boolean trial = "true".equals(System.getProperty("session.trial"));
private static final boolean compression = "true".equals(System.getProperty("session.compression"));
private static Set<TransferHandle> handles = new HashSet<>();
private static Path incomingDirectory;
private static Path resumeDataDirectory;

private static void printGlobalSearchResult() {
if (globalSearchRes == null) return;
Expand All @@ -63,38 +66,48 @@ static TransferHandle addTransfer(final Session s, final Hash hash, final long s
}

static void saveTransferParameters(final AddTransferParams params) {
File f = new File(params.filepath.toString() + ".resumedata");
FileChannel channel = null;
File transferFile = new File(params.filepath.toString());
File resumeDataFile = new File(resumeDataDirectory.resolve(transferFile.getName()).toString());

try(FileOutputStream stream = new FileOutputStream(f, false)) {
channel = stream.getChannel();
try(FileOutputStream stream = new FileOutputStream(resumeDataFile, false); FileChannel channel = stream.getChannel();) {
ByteBuffer bb = ByteBuffer.allocate(params.bytesCount());
bb.order(ByteOrder.LITTLE_ENDIAN);
params.put(bb);
channel = stream.getChannel();
while(bb.hasRemaining()) channel.write(bb);
} catch(IOException e) {
System.out.println("I/O exception on load " + e);
} catch(JED2KException e) {
System.out.println("Unable to load search results " + e);
} finally {
try {
if (channel != null) channel.close();
} catch(IOException e) {
log.error("unable to close channel {}", e.toString());
}
}
}

public static void main(String[] args) throws IOException {
public static void main(String[] args) throws IOException, JED2KException {

if (args.length < 1) {
System.out.println("Specify incoming directory");
return;
}

Path incomingDir = FileSystems.getDefault().getPath(args[0]);
System.out.println("Incoming directory set to: " + incomingDir);
incomingDirectory = FileSystems.getDefault().getPath(args[0]);
System.out.println("Incoming directory set to: " + incomingDirectory);
File incomingFile = incomingDirectory.toFile();
boolean dirCreated = incomingFile.mkdirs();

if (!dirCreated) {
throw new JED2KException(ErrorCode.INCOMING_DIR_INACCESSIBLE);
}

resumeDataDirectory = incomingDirectory.resolve(".resumedata");
File resumeFile = resumeDataDirectory.toFile();

dirCreated = resumeFile.mkdirs();

if (!dirCreated) {
throw new JED2KException(ErrorCode.INCOMING_DIR_INACCESSIBLE);
}

assert incomingDirectory != null;
assert resumeDataDirectory != null;

System.out.println("Conn started");
final Settings startSettings = new Settings();
Expand Down Expand Up @@ -269,11 +282,9 @@ else if (parts[0].compareTo("save") == 0) {
if (globalSearchRes != null && !globalSearchRes.files.isEmpty()) {
ByteBuffer bb = ByteBuffer.allocate(globalSearchRes.bytesCount());
bb.order(ByteOrder.LITTLE_ENDIAN);
File f = new File(Paths.get(args[0], "search_results.txt").toString());
FileOutputStream stream = new FileOutputStream(f, false);
FileChannel channel = stream.getChannel();
File f = new File(incomingDirectory.resolve("search_results.txt").toString());

try {
try(FileOutputStream stream = new FileOutputStream(f, false);FileChannel channel = stream.getChannel()) {
globalSearchRes.put(bb);
bb.flip();
channel.write(bb);
Expand All @@ -283,22 +294,15 @@ else if (parts[0].compareTo("save") == 0) {
} catch(JED2KException e) {
System.out.println("Unable to save search result: " + e);
}
finally {
channel.close();
stream.close();
}
} else {
System.out.println("Won't save empty search result");
}
}
else if (parts[0].compareTo("restore") == 0) {
File f = new File(Paths.get(args[0], "search_results.txt").toString());
FileInputStream stream = new FileInputStream(f);
FileChannel channel = stream.getChannel();
try {
File f = new File(incomingDirectory.resolve("search_results.txt").toString());
try(FileInputStream stream = new FileInputStream(f); FileChannel channel = stream.getChannel()) {
ByteBuffer bb = ByteBuffer.allocate((int)f.length());
bb.order(ByteOrder.LITTLE_ENDIAN);
channel = stream.getChannel();
channel.read(bb);
bb.flip();
globalSearchRes = new SearchResult();
Expand All @@ -307,9 +311,6 @@ else if (parts[0].compareTo("restore") == 0) {
System.out.println("I/O exception on load " + e);
} catch(JED2KException e) {
System.out.println("Unable to load search results " + e);
} finally {
channel.close();
stream.close();
}
}
else if (parts[0].compareTo("print") == 0) {
Expand All @@ -319,6 +320,29 @@ else if ((parts[0].compareTo("delete") == 0) && parts.length == 2) {
log.debug("delete transfer {}", parts[1]);
s.removeTransfer(Hash.fromString(parts[1]), true);
}
else if (parts[0].compareTo("resume") == 0) {
File resumeDataFile = resumeDataDirectory.toFile();
File[] files = resumeDataFile.listFiles();
ByteBuffer buff = ByteBuffer.allocate(1024);
buff.order(ByteOrder.LITTLE_ENDIAN);
for (final File f: files) {
try(FileInputStream stream = new FileInputStream(f); FileChannel channel = stream.getChannel()) {
channel.read(buff);
buff.flip();
AddTransferParams atp = new AddTransferParams();
atp.get(buff);
handles.add(s.addTransfer(atp));
}
catch(IOException e) {
log.error("i/o exception on restore transfer {}", e);
}
catch(JED2KException e) {
log.error("transfer creation error {}", e);
} finally {
buff.clear();
}
}
}
}

for(TransferHandle handle: handles) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/jed2k/Peer.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public enum SourceFlag {

long lastConnected = 0;
long nextConnection = 0;
long failCount = 0;
int failCount = 0;
boolean connectable = false;
int source = 0;
NetworkIdentifier endpoint;
Expand Down
11 changes: 9 additions & 2 deletions src/main/java/org/jed2k/PeerConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -1060,14 +1060,21 @@ Future<AsyncOperationResult> asyncHash(int pieceIndex, final Transfer t) {
return session.submitDiskTask(new AsyncHash(t, pieceIndex));
}

/**
*
* @return information about remote peer
*/
public final PeerInfo getInfo() {
PeerInfo i = new PeerInfo();
i.downloadPayload = statistics().totalPayloadDownload();
i.downloadProtocol = statistics().totalProtocolDownload();
i.downloadSpeed = (int)statistics().downloadRate();
i.payloadDownloadSpeed = (int)statistics().downloadPayloadRate();


i.remotePieces = remotePieces;
i.failCount = ((getPeer()!=null)?getPeer().failCount:0);
i.modName = remotePeerInfo.modName;
i.version = remotePeerInfo.version;
i.modVersion = remotePeerInfo.modNumber;
return i;
}
}
6 changes: 4 additions & 2 deletions src/main/java/org/jed2k/PeerInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ public class PeerInfo {
int payloadDownloadSpeed;
long downloadPayload;
long downloadProtocol;
NetworkIdentifier endpoint;
BitField remotePieces;
String client;
int failCount;
NetworkIdentifier endpoint;
String modName;
int version;
int modVersion;
}
18 changes: 18 additions & 0 deletions src/main/java/org/jed2k/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,24 @@ public final synchronized TransferHandle addTransfer(Hash h, long size, String f
return new TransferHandle(this, t);
}

/**
* create new transfer in session or return previously created transfer
* using add transfer parameters structure with or without resume data block
* @param atp transfer parameters with or without resume data
* @return transfer handle
* @throws JED2KException
*/
public final synchronized TransferHandle addTransfer(final AddTransferParams atp) throws JED2KException {
Transfer t = transfers.get(atp.hash);

if (t == null) {
t = new Transfer(this, atp);
transfers.put(atp.hash, t);
}

return new TransferHandle(this, t);
}

public final synchronized TransferHandle findTransfer(final Hash h) {
return new TransferHandle(this, transfers.get(h));
}
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/org/jed2k/Transfer.java
Original file line number Diff line number Diff line change
Expand Up @@ -539,4 +539,13 @@ public TransferStatus getStatus() {
public void asyncRestoreBlock(final PieceBlock b, final ByteBuffer buffer) {
aioFutures.addLast(session.submitDiskTask(new AsyncRestore(this, b, size, buffer)));
}

public final List<PeerInfo> getPeersInfo() {
List<PeerInfo> res = new LinkedList<>();
for(final PeerConnection c: connections) {
res.add(c.getInfo());
}

return res;
}
}
13 changes: 13 additions & 0 deletions src/main/java/org/jed2k/TransferHandle.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import org.jed2k.protocol.TransferResumeData;

import java.lang.ref.WeakReference;
import java.util.LinkedList;
import java.util.List;

/**
* transfer handle for manipulation of transfer outside of session
Expand Down Expand Up @@ -124,6 +126,17 @@ public TransferStatus getStatus() {
return null;
}

public List<PeerInfo> getPeersInfo() {
Transfer t = transfer.get();
if (t != null) {
synchronized (ses) {
return t.getPeersInfo();
}
}

return new LinkedList<PeerInfo>();
}

@Override
public boolean equals(Object o) {
return (o instanceof TransferHandle && ((TransferHandle)o).getHash().equals(getHash()));
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/jed2k/exception/ErrorCode.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ public enum ErrorCode implements BaseErrorCode {
LINK_MAILFORMED(32, "Incorrect link format"),
NO_MEMORY(33, "No memory available"),
SESSION_STOPPING(34, "Session stopping"),
FAIL(35, "Fail");
INCOMING_DIR_INACCESSIBLE(35, "Incoming directory is inaccessible"),
FAIL(36, "Fail");

private final int code;
private final String description;
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/org/jed2k/protocol/BitField.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ private int bitsToBytes(int count) {
return Utils.divCeil(count, 8);
}

public BitField() {
}
public BitField() {}

public BitField(int bits) {
resize(bits);
Expand Down
1 change: 1 addition & 0 deletions src/test/java/org/jed2k/test/SessionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public void testTransferHandle() throws JED2KException, InterruptedException {
assertEquals(handle, handle2);
assertEquals(1000L, handle2.getSize());
assertEquals("xxx", handle2.getFilepath());
assertTrue(handle2.getPeersInfo().isEmpty());
session.removeTransfer(handle.getHash(), true);
session.interrupt();
session.join();
Expand Down

0 comments on commit 2ef256b

Please sign in to comment.