Compare commits

..

1 Commits

Author SHA1 Message Date
Zlatin Balevsky
7718dc0821 force UTF8 for json serialization 2019-06-16 12:13:00 +01:00
18 changed files with 101 additions and 212 deletions

View File

@@ -34,7 +34,7 @@ class Cli {
Core core
try {
core = new Core(props, home, "0.2.4")
core = new Core(props, home, "0.2.1")
} catch (Exception bad) {
bad.printStackTrace(System.out)
println "Failed to initialize core, exiting"

View File

@@ -53,7 +53,7 @@ class CliDownloader {
Core core
try {
core = new Core(props, home, "0.2.4")
core = new Core(props, home, "0.2.1")
} catch (Exception bad) {
bad.printStackTrace(System.out)
println "Failed to initialize core, exiting"

View File

@@ -268,7 +268,7 @@ public class Core {
}
}
Core core = new Core(props, home, "0.2.4")
Core core = new Core(props, home, "0.2.1")
core.startServices()
// ... at the end, sleep or execute script

View File

@@ -17,8 +17,6 @@ import com.muwire.core.upload.UploadManager
import com.muwire.core.search.InvalidSearchResultException
import com.muwire.core.search.ResultsParser
import com.muwire.core.search.SearchManager
import com.muwire.core.search.UIResultBatchEvent
import com.muwire.core.search.UIResultEvent
import com.muwire.core.search.UnexpectedResultsException
import groovy.json.JsonOutput
@@ -227,15 +225,13 @@ class ConnectionAcceptor {
if (sender.destination != e.getDestination())
throw new IOException("Sender destination mismatch expected $e.getDestination(), got $sender.destination")
int nResults = dis.readUnsignedShort()
UIResultEvent[] results = new UIResultEvent[nResults]
for (int i = 0; i < nResults; i++) {
int jsonSize = dis.readUnsignedShort()
byte [] payload = new byte[jsonSize]
dis.readFully(payload)
def json = slurper.parse(payload)
results[i] = ResultsParser.parse(sender, resultsUUID, json)
eventBus.publish(ResultsParser.parse(sender, resultsUUID, json))
}
eventBus.publish(new UIResultBatchEvent(uuid: resultsUUID, results: results))
} catch (IOException | UnexpectedResultsException | InvalidSearchResultException bad) {
log.log(Level.WARNING, "failed to process POST", bad)
} finally {

View File

@@ -2,6 +2,7 @@ package com.muwire.core.connection
import java.io.InputStream
import java.io.OutputStream
import java.nio.charset.StandardCharsets
import com.muwire.core.EventBus
import com.muwire.core.MuWireSettings
@@ -66,7 +67,7 @@ class PeerConnection extends Connection {
protected void write(Object message) {
byte[] payload
if (message instanceof Map) {
payload = JsonOutput.toJson(message).bytes
payload = JsonOutput.toJson(message).getBytes(StandardCharsets.UTF_8)
DataUtil.packHeader(payload.length, writeHeader)
log.fine "$name writing message type ${message.type} length $payload.length"
writeHeader[0] &= (byte)0x7F

View File

@@ -16,7 +16,6 @@ import java.nio.file.Files
import java.nio.file.StandardOpenOption
import java.security.MessageDigest
import java.security.NoSuchAlgorithmException
import java.util.logging.Level
@Log
class DownloadSession {
@@ -24,7 +23,7 @@ class DownloadSession {
private static int SAMPLES = 10
private final String meB64
private final Pieces pieces
private final Pieces downloaded, claimed
private final InfoHash infoHash
private final Endpoint endpoint
private final File file
@@ -37,10 +36,11 @@ class DownloadSession {
private ByteBuffer mapped
DownloadSession(String meB64, Pieces pieces, InfoHash infoHash, Endpoint endpoint, File file,
DownloadSession(String meB64, Pieces downloaded, Pieces claimed, InfoHash infoHash, Endpoint endpoint, File file,
int pieceSize, long fileLength) {
this.meB64 = meB64
this.pieces = pieces
this.downloaded = downloaded
this.claimed = claimed
this.endpoint = endpoint
this.infoHash = infoHash
this.file = file
@@ -63,11 +63,20 @@ class DownloadSession {
OutputStream os = endpoint.getOutputStream()
InputStream is = endpoint.getInputStream()
int piece = pieces.claim()
if (piece == -1)
return false
boolean unclaim = true
int piece
while(true) {
piece = downloaded.getRandomPiece()
if (claimed.isMarked(piece)) {
if (downloaded.donePieces() + claimed.donePieces() == downloaded.nPieces) {
log.info("all pieces claimed")
return false
}
continue
}
break
}
claimed.markDownloaded(piece)
log.info("will download piece $piece")
long start = piece * pieceSize
@@ -76,6 +85,7 @@ class DownloadSession {
String root = Base64.encode(infoHash.getRoot())
FileChannel channel
try {
os.write("GET $root\r\n".getBytes(StandardCharsets.US_ASCII))
os.write("Range: $start-$end\r\n".getBytes(StandardCharsets.US_ASCII))
@@ -125,46 +135,41 @@ class DownloadSession {
}
// start the download
FileChannel channel
try {
channel = Files.newByteChannel(file.toPath(), EnumSet.of(StandardOpenOption.READ, StandardOpenOption.WRITE,
StandardOpenOption.SPARSE, StandardOpenOption.CREATE)) // TODO: double-check, maybe CREATE_NEW
mapped = channel.map(FileChannel.MapMode.READ_WRITE, start, end - start + 1)
byte[] tmp = new byte[0x1 << 13]
while(mapped.hasRemaining()) {
if (mapped.remaining() < tmp.length)
tmp = new byte[mapped.remaining()]
int read = is.read(tmp)
if (read == -1)
throw new IOException()
synchronized(this) {
mapped.put(tmp, 0, read)
if (timestamps.size() == SAMPLES) {
timestamps.removeFirst()
reads.removeFirst()
}
timestamps.addLast(System.currentTimeMillis())
reads.addLast(read)
channel = Files.newByteChannel(file.toPath(), EnumSet.of(StandardOpenOption.READ, StandardOpenOption.WRITE,
StandardOpenOption.SPARSE, StandardOpenOption.CREATE)) // TODO: double-check, maybe CREATE_NEW
mapped = channel.map(FileChannel.MapMode.READ_WRITE, start, end - start + 1)
byte[] tmp = new byte[0x1 << 13]
while(mapped.hasRemaining()) {
if (mapped.remaining() < tmp.length)
tmp = new byte[mapped.remaining()]
int read = is.read(tmp)
if (read == -1)
throw new IOException()
synchronized(this) {
mapped.put(tmp, 0, read)
if (timestamps.size() == SAMPLES) {
timestamps.removeFirst()
reads.removeFirst()
}
timestamps.addLast(System.currentTimeMillis())
reads.addLast(read)
}
mapped.clear()
digest.update(mapped)
byte [] hash = digest.digest()
byte [] expected = new byte[32]
System.arraycopy(infoHash.getHashList(), piece * 32, expected, 0, 32)
if (hash != expected)
throw new BadHashException()
} finally {
try { channel?.close() } catch (IOException ignore) {}
}
pieces.markDownloaded(piece)
unclaim = false
mapped.clear()
digest.update(mapped)
byte [] hash = digest.digest()
byte [] expected = new byte[32]
System.arraycopy(infoHash.getHashList(), piece * 32, expected, 0, 32)
if (hash != expected)
throw new BadHashException()
downloaded.markDownloaded(piece)
} finally {
if (unclaim)
pieces.unclaim(piece)
claimed.clear(piece)
try { channel?.close() } catch (IOException ignore) {}
}
return true
}

View File

@@ -4,13 +4,9 @@ import com.muwire.core.InfoHash
import com.muwire.core.Persona
import com.muwire.core.connection.Endpoint
import java.nio.file.AtomicMoveNotSupportedException
import java.nio.file.Files
import java.nio.file.StandardCopyOption
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicBoolean
import java.util.logging.Level
import com.muwire.core.Constants
@@ -38,7 +34,7 @@ public class Downloader {
private final DownloadManager downloadManager
private final Persona me
private final File file
private final Pieces pieces
private final Pieces downloaded, claimed
private final long length
private InfoHash infoHash
private final int pieceSize
@@ -46,14 +42,12 @@ public class Downloader {
private final Set<Destination> destinations
private final int nPieces
private final File piecesFile
private final File incompleteFile
final int pieceSizePow2
private final Map<Destination, DownloadWorker> activeWorkers = new ConcurrentHashMap<>()
private volatile boolean cancelled
private final AtomicBoolean eventFired = new AtomicBoolean()
private boolean piecesFileClosed
private volatile boolean eventFired
public Downloader(EventBus eventBus, DownloadManager downloadManager,
Persona me, File file, long length, InfoHash infoHash,
@@ -68,7 +62,6 @@ public class Downloader {
this.connector = connector
this.destinations = destinations
this.piecesFile = new File(incompletes, file.getName()+".pieces")
this.incompleteFile = new File(incompletes, file.getName()+".part")
this.pieceSizePow2 = pieceSizePow2
this.pieceSize = 1 << pieceSizePow2
@@ -79,7 +72,8 @@ public class Downloader {
nPieces = length / pieceSize + 1
this.nPieces = nPieces
pieces = new Pieces(nPieces, Constants.DOWNLOAD_SEQUENTIAL_RATIO)
downloaded = new Pieces(nPieces, Constants.DOWNLOAD_SEQUENTIAL_RATIO)
claimed = new Pieces(nPieces)
}
public synchronized InfoHash getInfoHash() {
@@ -106,24 +100,20 @@ public class Downloader {
return
piecesFile.eachLine {
int piece = Integer.parseInt(it)
pieces.markDownloaded(piece)
downloaded.markDownloaded(piece)
}
}
void writePieces() {
synchronized(piecesFile) {
if (piecesFileClosed)
return
piecesFile.withPrintWriter { writer ->
pieces.getDownloaded().each { piece ->
writer.println(piece)
}
piecesFile.withPrintWriter { writer ->
downloaded.getDownloaded().each { piece ->
writer.println(piece)
}
}
}
public long donePieces() {
pieces.donePieces()
downloaded.donePieces()
}
@@ -146,7 +136,7 @@ public class Downloader {
allFinished &= it.currentState == WorkerState.FINISHED
}
if (allFinished) {
if (pieces.isComplete())
if (downloaded.isComplete())
return DownloadState.FINISHED
return DownloadState.FAILED
}
@@ -180,11 +170,8 @@ public class Downloader {
public void cancel() {
cancelled = true
stop()
synchronized(piecesFile) {
piecesFileClosed = true
piecesFile.delete()
}
incompleteFile.delete()
file.delete()
piecesFile.delete()
}
void stop() {
@@ -244,8 +231,8 @@ public class Downloader {
}
currentState = WorkerState.DOWNLOADING
boolean requestPerformed
while(!pieces.isComplete()) {
currentSession = new DownloadSession(me.toBase64(), pieces, getInfoHash(), endpoint, incompleteFile, pieceSize, length)
while(!downloaded.isComplete()) {
currentSession = new DownloadSession(me.toBase64(), downloaded, claimed, getInfoHash(), endpoint, file, pieceSize, length)
requestPerformed = currentSession.request()
if (!requestPerformed)
break
@@ -255,17 +242,9 @@ public class Downloader {
log.log(Level.WARNING,"Exception while downloading",bad)
} finally {
currentState = WorkerState.FINISHED
if (pieces.isComplete() && eventFired.compareAndSet(false, true)) {
synchronized(piecesFile) {
piecesFileClosed = true
piecesFile.delete()
}
try {
Files.move(incompleteFile.toPath(), file.toPath(), StandardCopyOption.ATOMIC_MOVE)
} catch (AtomicMoveNotSupportedException e) {
Files.copy(incompleteFile.toPath(), file.toPath(), StandardCopyOption.REPLACE_EXISTING)
incompleteFile.delete()
}
if (downloaded.isComplete() && !eventFired) {
piecesFile.delete()
eventFired = true
eventBus.publish(
new FileDownloadedEvent(
downloadedFile : new DownloadedFile(file, getInfoHash(), pieceSizePow2, Collections.emptySet()),

View File

@@ -1,7 +1,7 @@
package com.muwire.core.download
class Pieces {
private final BitSet done, claimed
private final BitSet bitSet
private final int nPieces
private final float ratio
private final Random random = new Random()
@@ -13,53 +13,52 @@ class Pieces {
Pieces(int nPieces, float ratio) {
this.nPieces = nPieces
this.ratio = ratio
done = new BitSet(nPieces)
claimed = new BitSet(nPieces)
bitSet = new BitSet(nPieces)
}
synchronized int claim() {
int claimedCardinality = claimed.cardinality()
if (claimedCardinality == nPieces)
synchronized int getRandomPiece() {
int cardinality = bitSet.cardinality()
if (cardinality == nPieces)
return -1
// if fuller than ratio just do sequential
if ( (1.0f * claimedCardinality) / nPieces > ratio) {
int rv = claimed.nextClearBit(0)
claimed.set(rv)
return rv
if ( (1.0f * cardinality) / nPieces > ratio) {
return bitSet.nextClearBit(0)
}
while(true) {
int start = random.nextInt(nPieces)
if (claimed.get(start))
if (bitSet.get(start))
continue
claimed.set(start)
return start
}
}
synchronized def getDownloaded() {
def getDownloaded() {
def rv = []
for (int i = done.nextSetBit(0); i >= 0; i = done.nextSetBit(i+1)) {
for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i+1)) {
rv << i
}
rv
}
synchronized void markDownloaded(int piece) {
done.set(piece)
claimed.set(piece)
bitSet.set(piece)
}
synchronized void unclaim(int piece) {
claimed.clear(piece)
synchronized void clear(int piece) {
bitSet.clear(piece)
}
synchronized boolean isComplete() {
done.cardinality() == nPieces
bitSet.cardinality() == nPieces
}
synchronized boolean isMarked(int piece) {
bitSet.get(piece)
}
synchronized int donePieces() {
done.cardinality()
bitSet.cardinality()
}
}

View File

@@ -1,8 +0,0 @@
package com.muwire.core.search
import com.muwire.core.Event
class UIResultBatchEvent extends Event {
UUID uuid
UIResultEvent[] results
}

View File

@@ -51,23 +51,4 @@ class ContentUploader extends Uploader {
}
}
@Override
public String getName() {
return file.getName();
}
@Override
public synchronized int getProgress() {
if (mapped == null)
return 0
int position = mapped.position()
int total = request.getRange().end - request.getRange().start
(int)(position * 100.0 / total)
}
@Override
public String getDownloader() {
request.downloader.getHumanReadableName()
}
}

View File

@@ -6,8 +6,6 @@ import java.nio.charset.StandardCharsets
import com.muwire.core.InfoHash
import com.muwire.core.connection.Endpoint
import net.i2p.data.Base64
class HashListUploader extends Uploader {
private final InfoHash infoHash
private final HashListRequest request
@@ -16,7 +14,6 @@ class HashListUploader extends Uploader {
super(endpoint)
this.infoHash = infoHash
mapped = ByteBuffer.wrap(infoHash.getHashList())
this.request = request
}
void respond() {
@@ -35,21 +32,4 @@ class HashListUploader extends Uploader {
}
endpoint.getOutputStream().flush()
}
@Override
public String getName() {
return "Hash list for " + Base64.encode(infoHash.getRoot());
}
@Override
public synchronized int getProgress() {
(int)(mapped.position() * 100.0 / mapped.capacity())
}
@Override
public String getDownloader() {
request.downloader.getHumanReadableName()
}
}

View File

@@ -23,13 +23,4 @@ abstract class Uploader {
return -1
mapped.position()
}
abstract String getName();
/**
* @return an integer between 0 and 100
*/
abstract int getProgress();
abstract String getDownloader();
}

View File

@@ -25,17 +25,4 @@ public class SharedFile {
public int getPieceSize() {
return pieceSize;
}
@Override
public int hashCode() {
return file.hashCode() ^ infoHash.hashCode();
}
@Override
public boolean equals(Object o) {
if (!(o instanceof SharedFile))
return false;
SharedFile other = (SharedFile)o;
return file.equals(other.file) && infoHash.equals(other.infoHash);
}
}

View File

@@ -1,5 +1,5 @@
group = com.muwire
version = 0.2.4
version = 0.2.1
groovyVersion = 2.4.15
slf4jVersion = 1.7.25
spockVersion = 1.1-groovy-2.4

View File

@@ -62,7 +62,7 @@ class MainFrameController {
def searchEvent
if (hashSearch) {
searchEvent = new SearchEvent(searchHash : root, uuid : uuid, oobInfohash: true)
searchEvent = new SearchEvent(searchHash : root, uuid : uuid)
} else {
// this can be improved a lot
def replaced = search.toLowerCase().trim().replaceAll(Constants.SPLIT_PATTERN, " ")

View File

@@ -20,7 +20,6 @@ import com.muwire.core.files.FileHashedEvent
import com.muwire.core.files.FileLoadedEvent
import com.muwire.core.files.FileSharedEvent
import com.muwire.core.search.QueryEvent
import com.muwire.core.search.UIResultBatchEvent
import com.muwire.core.search.UIResultEvent
import com.muwire.core.trust.TrustEvent
import com.muwire.core.trust.TrustService
@@ -116,7 +115,6 @@ class MainFrameModel {
core = e.getNewValue()
me = core.me.getHumanReadableName()
core.eventBus.register(UIResultEvent.class, this)
core.eventBus.register(UIResultBatchEvent.class, this)
core.eventBus.register(DownloadStartedEvent.class, this)
core.eventBus.register(ConnectionEvent.class, this)
core.eventBus.register(DisconnectionEvent.class, this)
@@ -163,11 +161,6 @@ class MainFrameModel {
resultsGroup?.model.handleResult(e)
}
void onUIResultBatchEvent(UIResultBatchEvent e) {
MVCGroup resultsGroup = results.get(e.uuid)
resultsGroup?.model.handleResultBatch(e.results)
}
void onDownloadStartedEvent(DownloadStartedEvent e) {
runInsideUIAsync {
downloads << e

View File

@@ -52,22 +52,4 @@ class SearchTabModel {
table.model.fireTableDataChanged()
}
}
void handleResultBatch(UIResultEvent[] batch) {
runInsideUIAsync {
batch.each {
if (uiSettings.excludeLocalResult && it.sender == core.me)
return
def bucket = hashBucket.get(it.infohash)
if (bucket == null) {
bucket = []
hashBucket[it.infohash] = bucket
}
bucket << it
results << it
}
JTable table = builder.getVariable("results-table")
table.model.fireTableDataChanged()
}
}
}

View File

@@ -120,7 +120,7 @@ class MainFrameView {
closureColumn(header: "Progress", preferredWidth: 20, type: String, read: { row ->
int pieces = row.downloader.nPieces
int done = row.downloader.donePieces()
"$done/$pieces pieces".toString()
"$done/$pieces pieces"
})
closureColumn(header: "Sources", preferredWidth : 10, type: Integer, read : {row -> row.downloader.activeWorkers()})
closureColumn(header: "Speed", preferredWidth: 50, type:String, read :{row ->
@@ -160,13 +160,16 @@ class MainFrameView {
scrollPane (constraints : BorderLayout.CENTER) {
table(id : "uploads-table") {
tableModel(list : model.uploads) {
closureColumn(header : "Name", type : String, read : {row -> row.getName() })
closureColumn(header : "Name", type : String, read : {row -> row.file.getName() })
closureColumn(header : "Progress", type : String, read : { row ->
int percent = row.getProgress()
int position = row.getPosition()
def range = row.request.getRange()
int total = range.end - range.start
int percent = (int)((position * 100.0) / total)
"$percent%"
})
closureColumn(header : "Downloader", type : String, read : { row ->
row.getDownloader()
row.request.downloader?.getHumanReadableName()
})
}
}