blob: 215489ff07e0c23de5ce90907f9d738b2563481c [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package io.v.v23.syncbase;
import com.google.common.util.concurrent.ListenableFuture;
import io.v.v23.InputChannel;
import io.v.v23.context.VContext;
import io.v.v23.services.syncbase.BlobFetchStatus;
import io.v.v23.services.syncbase.BlobRef;
import io.v.v23.services.syncbase.DatabaseClient;
import io.v.v23.vdl.ClientRecvStream;
import io.v.v23.vdl.VdlUint64;
import io.v.v23.verror.EndOfFileException;
import io.v.v23.verror.VException;
import java.io.IOException;
import java.io.InputStream;
import static io.v.v23.VFutures.sync;
class BlobReaderImpl implements BlobReader {
private final DatabaseClient client;
private final BlobRef ref;
BlobReaderImpl(DatabaseClient client, BlobRef ref) {
this.client = client;
this.ref = ref;
}
@Override
public BlobRef getRef() {
return ref;
}
@Override
public InputStream stream(VContext ctx, long offset) {
return new BlobInputStream(client.getBlob(ctx, ref, offset));
}
@Override
public InputChannel<BlobFetchStatus> prefetch(VContext ctx, long priority) {
return client.fetchBlob(ctx, ref, new VdlUint64(priority));
}
@Override
public ListenableFuture<Long> size(VContext ctx) {
return client.getBlobSize(ctx, ref);
}
@Override
public ListenableFuture<Void> delete(VContext ctx) {
return client.deleteBlob(ctx, ref);
}
@Override
public ListenableFuture<Void> pin(VContext ctx) {
return client.pinBlob(ctx, ref);
}
@Override
public ListenableFuture<Void> unpin(VContext ctx) {
return client.unpinBlob(ctx, ref);
}
@Override
public ListenableFuture<Void> keep(VContext ctx, long rank) {
return client.keepBlob(ctx, ref, new VdlUint64(rank));
}
private static class BlobInputStream extends InputStream {
private final ClientRecvStream<byte[], Void> stream;
private boolean closed = false;
private byte[] lastRecv = null;
private int lastRecvRemaining = 0;
private boolean eof = false;
BlobInputStream(ClientRecvStream<byte[], Void> stream) {
this.stream = stream;
}
@Override
public int available() {
return 0;
}
@Override
public synchronized void close() throws IOException {
if (closed) {
return;
}
try {
sync(stream.finish());
closed = true;
} catch (VException e) {
throw new IOException(e);
}
}
@Override
public boolean markSupported() {
return false;
}
@Override
public synchronized int read() throws IOException {
byte[] b = new byte[1];
if (read(b) == -1) {
return -1;
}
return b[0];
}
@Override
public synchronized int read(byte[] b, int offset, int len) throws IOException {
if (closed) {
throw new IOException("Stream closed");
}
if (eof) {
return -1;
}
if (b == null) {
throw new NullPointerException();
}
if (offset < 0 || len < 0 || len > (b.length - offset)) {
throw new IndexOutOfBoundsException();
}
if (len == 0) {
return 0;
}
int need = len;
while (need > 0) {
if (lastRecvRemaining <= 0) {
try {
lastRecv = sync(stream.recv());
lastRecvRemaining = lastRecv.length;
} catch (EndOfFileException e) {
eof = true;
break;
} catch (VException e) {
throw new IOException(e);
}
}
int copyLen = len > lastRecvRemaining ? lastRecvRemaining : len;
System.arraycopy(lastRecv,
lastRecv.length - lastRecvRemaining, b, offset + len - need, copyLen);
lastRecvRemaining -= copyLen;
need -= copyLen;
}
if (need == len) {
return -1;
}
return len - need;
}
@Override
public synchronized int read(byte[] b) throws IOException {
return read(b, 0, b.length);
}
}
}