v.io/syncbase/x/ref/services/syncbase/localblobstore: Integrate chunk handling into main blob store code.
This change integrates into the main blob store interface the existing code
that breaks byte streams into chunks, and the code that maintains a map
between chunks and their locations in blobs.
It also fixes a few annoyances with the blob store interface:
- Iter is now called Stream
- Streams now have Cancel calls
- Various calls now take a *context.T
chunker/chunker.go
- add a Cancel() method to the streams so that they have the standard
Vanadium stream signature.
- add a context.T argument to the stream creations
(I should have had one before. It's now needed for verror.New()
when generating the "stream cancelled" error.)
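For concreteness, the standard Vanadium stream shape that the chunker
streams now follow looks roughly like this (the element type []byte is
illustrative):

    type Stream interface {
        // Advance stages the next item, returning false when the
        // stream ends or has been cancelled.
        Advance() bool
        // Value returns the item staged by the last successful Advance.
        Value() []byte
        // Err returns a non-nil error if the stream ended abnormally.
        Err() error
        // Cancel terminates the stream early; subsequent calls to
        // Advance return false.
        Cancel()
    }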
chunker/chunker_test.go
- move the NewRandReader code to localblobstore_testlib/randreader.go
so that it may be used by localblobstore_testlib/localblobstore_testlib.go.
chunkmap/chunkmap.go
- rename the Blob field of chunkmap.Location to BlobID to reduce
confusion with blob names.
(Blob names are strings seen by clients, while blob IDs are more
compact byte vectors for internal use.)
- rename the chunk parameter of ChunkMap.LookupChunk to chunkHash,
to make it clear that it's the hash of a chunk, rather than the chunk
contents. (Both are byte vectors, so it's unclear from the type.)
- rename BlobStream to ChunkStream, to distinguish it from the new
BlobStream (see below). Now a BlobStream generates a sequence of
blobs in a ChunkMap, while a ChunkStream generates a sequence of
chunks in a blob.
- add new BlobStream, which allows a client to generate the list of
blobs in the ChunkMap. This is used primarily by the blob store
garbage collector, to eliminate blobs that have been deleted from the
blob store but left in the ChunkMap.
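To illustrate how the renamed streams compose, here is a sketch; the
constructor names and import paths are assumptions, not necessarily the
real chunkmap API:

    import (
        "v.io/syncbase/x/ref/services/syncbase/localblobstore/chunkmap"
        "v.io/v23/context"
    )

    // listAllChunks walks every blob recorded in a ChunkMap, and within
    // each blob walks its chunks in order.
    func listAllChunks(ctx *context.T, cm *chunkmap.ChunkMap) error {
        bs := cm.NewBlobStream(ctx) // assumed constructor name
        defer bs.Cancel()
        for bs.Advance() {
            blobID := bs.Value()
            cs := cm.NewChunkStream(ctx, blobID) // assumed constructor name
            for cs.Advance() {
                _ = cs.Value() // the hash of one chunk within blobID
            }
            if err := cs.Err(); err != nil {
                return err
            }
        }
        return bs.Err()
    }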
chunkmap/chunkmap_test.go
- add test code for the new BlobStream
- rename the old BlobStream uses to ChunkStream.
- fix a bug in verifyChunksInBlob(), which was testing that loc.Blob equalled itself.
It should not have been checking this at all, since (after the renaming) loc.BlobID
and loc2.BlobID can legitimately differ. The comment now explains this.
fs_cablobstore/fs_cablobstore.go
- add chunk support to the blob store (see model.go for the public API changes)
A ChunkMap is added to the store in a new "chunk" subdirectory.
Chunks are written to the ChunkMap when a blob is written
by forking a thread that reads the blob as it is being written;
this reuses the BlobReader code for reading the data.
Some synchronization in the blob descriptor keeps the BlobReader from trying
to read parts of the blob that have not yet been written.
See the comments on the fields of blobDesc for the way the mutex is used.
- NewBlobWriter() now takes an extra argument that allows the caller
to specify the name of the blob. This is used when a blob is transferred from
one store to another---the name of the blob is unchanged on the other side.
For new blobs, created on the current device, an empty name can be specified
and the system will pick a random one. (A sketch of both uses follows.)
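A hypothetical snippet of the two uses; "store", "ctx", and
"nameFromSender" are illustrative, and the exact signature is assumed
from the description above:

    // New blob created on this device: pass an empty name and the
    // store picks a random one.
    writer, err := store.NewBlobWriter(ctx, "")

    // Blob arriving from another device: pass the sender's blob name
    // so the name is unchanged on this side.
    writer, err = store.NewBlobWriter(ctx, nameFromSender)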
--------------
- add comments to distinguish fragments (the on-disk unit of storage)
from chunks (the on-network unit of transfer)
- add a "chunk" subdirectory to the blob store, which keeps the chunkmap.
- add code to maintain the ChunkMap as blobs are written and deleted.
- fix format of comments on FsCaBlobStore fields.
- add fileNameToHash() as the inverse of hashToFileName(),
and make both general by allowing them to use an arbitrary directory prefix.
These are used to convert between IDs and names.
(The reason for using names where possible is that they are human
readable. The reason for using IDs for chunks in the ChunkStream is
that when transferring a large blob, the amount of data transmitted
over the wire is smaller if raw bytes are used.)
fileNameToHash() now checks that the input string has the specified directory prefix,
which subsumes some of the checking previously done in (say) DeleteBlob().
(A sketch of this pair appears after this list.)
- NewBlobWriter() and ResumeBlobWriter() now fork the thread that
writes the chunks to the ChunkMap.
- fix ResumeBlobWriter(), which would crash if given the name of a non-existent blob.
- the new internal calls forkWriteChunkMap(), joinWriteChunkMap()
create and then wait for a thread that runs writeChunkMap() to write
the chunks into the ChunkMap. Each insertion is performed by insertChunk().
- BlobWriter.Close() and BlobWriter.CloseWithoutFinalize() now tell the
chunk-writer thread that writing has finished, and call joinWriteChunkMap().
- AppendBlob() notifies the chunk-writing thread when there is more
data to look at.
- BlobReader has a new field waitForWriter that tells the reader
whether it is synchronized with a concurrent BlobWriter. This is
set by the blob-writing thread.
- waitUntilAvailable() encapsulates the condition that allows
the reader or the chunk-writer thread to continue. (A sketch appears after this list.)
- ReadAt() has some reorganization to avoid holding the lock over I/O operations.
The main thing is that a copy of fragment[i] is put in fragmenti;
accesses to fragmenti don't need the lock.
- Seek() needs to use the lock, and waitUntilAvailable() if the offset
is relative to the end of the file.
- The FsCasIter gets a context.T field.
- Add Cancel() to FsCasIter.
- Add BlobChunkStream() to allow the client to iterate over the chunks
in a blob. Unless the client passes a bad parameter, this is
implemented by returning a localblobstore.ChunkStream.
- GC() now garbage collects the ChunkMap also.
It removes from the ChunkMap any blob present there but not in the blob store.
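The name/ID conversion pair might look roughly like this self-contained
sketch; the real encoding may differ (URL-safe base64 here is a guess):

    import (
        "encoding/base64"
        "fmt"
        "path/filepath"
        "strings"
    )

    // hashToFileName encodes a hash as a human-readable file name under dir.
    func hashToFileName(dir string, hash []byte) string {
        return filepath.Join(dir, base64.URLEncoding.EncodeToString(hash))
    }

    // fileNameToHash reverses hashToFileName, insisting that name really
    // lies under dir; this subsumes the prefix checks callers used to do
    // themselves.
    func fileNameToHash(dir string, name string) ([]byte, error) {
        prefix := dir + string(filepath.Separator)
        if !strings.HasPrefix(name, prefix) {
            return nil, fmt.Errorf("%q lacks directory prefix %q", name, prefix)
        }
        return base64.URLEncoding.DecodeString(name[len(prefix):])
    }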
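And the reader/writer handshake around blobDesc might be shaped like
this; the field and method names below are illustrative, not the real
ones:

    import "sync"

    type blobDesc struct {
        mu        sync.Mutex
        cv        *sync.Cond // sync.NewCond(&mu); signalled on append and finalize
        size      int64      // bytes written to the blob so far
        finalized bool       // set once the writer has finished
    }

    // waitUntilAvailable blocks until at least "offset" bytes have been
    // written or the writer has finished; the caller must hold d.mu.
    func (d *blobDesc) waitUntilAvailable(offset int64) {
        for d.size < offset && !d.finalized {
            d.cv.Wait()
        }
    }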
fs_cablobstore/fs_cablobstore_test.go
- TestWritingViaChunks() mainly calls localblobstore_testlib.WriteViaChunks()
which tests that blobs can be written by incremental file transfer.
localblobstore_test.go
- TestWritingViaChunks() mainly calls localblobstore_testlib.WriteViaChunks()
which tests that blobs can be written by incremental file transfer.
localblobstore_testlib/localblobstore_testlib.go
- accommodate extra argument to NewBlobWriter()
- add a test that creates two blob stores, stores a blob in one,
stores a somewhat similar blob (sharing many chunks) in the other,
and then simulates transferring the second blob to the first store.
The moments when data is expected to be sent over the wire are called out in the code.
In the test, I use Fatalf() instead of Errorf(), because a failure anywhere
leads to many later failures.
The test currently assumes that no blobs involved in the transfer are
deleted in the stores.
localblobstore_testlib/randreader.go
- This is copied from chunker/chunker_test.go so that it may be used by
localblobstore_testlib.go
localblobstore_transfer_test.go
- A full example of transferring a blob incrementally from a sender to
a receiver.
model.go
- Comments at the top to explain expected use.
- NewBlobWriter() has an extra argument to allow a blob to be written
with a specified name. This is expected to happen when a blob is
transferred between devices, to preserve the original name.
- Expose
BlobChunkStream(), which allows a sending device to list
the chunks in a blob;
RecipeStreamFromBlobStream(), which allows a receiving device
to determine which chunks it needs and which it has locally;
LookupChunk(), which allows a sending device to find where
within its store a chunk appears, so that it may be read and sent
to a receiving device.
The chunks and recipes are treated as streams rather than arrays in
case a large blob must be transferred---one so large that it would be
inconvenient to hold all the chunk hashes or recipe steps in memory
at one time.
- The Iter type is now called Stream, and now has a Cancel() method.
- Expose the ChunkStream interface, which is implemented by chunkmap,
and passed through by the blob store.
- Expose the RecipeStream interface which allows a client to
find the RecipeStep values needed to reconstruct a blob from
local and remote data.
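Putting the three calls together, a transfer might flow roughly like the
sketch below; the inter-device plumbing is elided, names like
senderStore and receiverStore are illustrative, and the RecipeStep field
is a guess:

    // Sender: enumerate the chunk hashes of the blob to be sent.
    cs := senderStore.BlobChunkStream(ctx, blobName)

    // Receiver: fold the sender's chunk stream into a recipe stream that
    // says, step by step, which data it already holds and which chunks
    // it still needs. (Exact signature assumed.)
    rs := receiverStore.RecipeStreamFromBlobStream(ctx, cs)
    for rs.Advance() {
        step := rs.Value()
        if step.Chunk != nil { // assumed field: hash of a chunk the receiver lacks
            // Over the wire: request this chunk from the sender. The
            // sender locates it with LookupChunk(ctx, step.Chunk), reads
            // that byte range with a BlobReader, and sends the bytes.
        } else {
            // The step describes data already present locally.
        }
    }
    // rs.Err() reports whether the recipe stream ended abnormally.

The receiver writes the reconstructed blob with NewBlobWriter(), passing
the sender's blob name so the name is preserved across devices.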
Change-Id: I79f9ceefd0c03989949ffc580d0f1277bb30190c