v.io/syncbase/x/ref/services/syncbase/localblobstore/chunker: Introduce data chunker.
Introduce localblobstore/chunker.
Package chunker breaks a stream of bytes into context-defined chunks whose
boundaries are chosen based on content checksums of a window that slides over
the data.
The intent is that when a sequence of bytes is to be transmitted to a recipient
that may have much of the data, the sequnece can be broken down into chunks
based on local context. The checksums of the resulting chunks may then be
transmitted to the recipient, which can then discover which of the chunks it
has, and which it needs.
Change-Id: I0b1fd8e1a8e5cf31348b9e4120ea46fce9303942
diff --git a/services/syncbase/localblobstore/chunker/chunker.go b/services/syncbase/localblobstore/chunker/chunker.go
new file mode 100644
index 0000000..84abf6f
--- /dev/null
+++ b/services/syncbase/localblobstore/chunker/chunker.go
@@ -0,0 +1,239 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package chunker breaks a stream of bytes into context-defined chunks whose
+// boundaries are chosen based on content checksums of a window that slides
+// over the data. An edited sequence with insertions and removals can share
+// many chunks with the original sequence.
+//
+// The intent is that when a sequence of bytes is to be transmitted to a
+// recipient that may have much of the data, the sequence can be broken down
+// into chunks. The checksums of the resulting chunks may then be transmitted
+// to the recipient, which can then discover which of the chunks it has, and
+// which it needs.
+//
+// Example:
+// var s *chunker.Stream = chunker.New(&chunker.DefaultParam, anIOReader)
+// for s.Advance() {
+// var chunk []byte := s.Value()
+// // process chunk
+// }
+// if s.Err() != nil {
+// // anIOReader generated an error.
+// }
+package chunker
+
+// The design is from:
+// "A Framework for Analyzing and Improving Content-Based Chunking Algorithms";
+// Kave Eshghi, Hsiu Khuern Tang; HPL-2005-30(R.1); Sep, 2005;
+// http://www.hpl.hp.com/techreports/2005/HPL-2005-30R1.pdf
+
+import "io"
+
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore/crc64window"
+
+// A Param contains the parameters for chunking.
+//
+// Chunks are broken based on a hash of a sliding window of width WindowWidth
+// bytes.
+// Each chunk is at most MaxChunk bytes long, and, unless end-of-file or an
+// error is reached, at least MinChunk bytes long.
+//
+// Subject to those constaints, a chunk boundary introduced at the first point
+// where the hash of the sliding window is 1 mod Primary, or if that doesn't
+// occur before MaxChunk bytes, at the last position where the hash is 1 mod
+// Secondary, or if that does not occur, after MaxChunk bytes.
+// Normally, MinChunk < Primary < MaxChunk.
+// Primary is the expected chunk size.
+// The Secondary divisor exists to make it more likely that a chunk boundary is
+// selected based on the local data when the Primary divisor by chance does not
+// find a match for a long distance. It should be a few times smaller than
+// Primary.
+//
+// Using primes for Primary and Secondary is not essential, but recommended
+// because it guarantees mixing of the checksum bits should their distribution
+// be non-uniform.
+type Param struct {
+ WindowWidth int // the window size to use when looking for chunk boundaries
+ MinChunk int64 // minimum chunk size
+ MaxChunk int64 // maximum chunk size
+ Primary uint64 // primary divisor; the expected chunk size
+ Secondary uint64 // secondary divisor
+}
+
+// DefaultParam contains default chunking parameters.
+var DefaultParam Param = Param{WindowWidth: 48, MinChunk: 512, MaxChunk: 3072, Primary: 601, Secondary: 307}
+
+// A Stream allows a client to iterate over the chunks within an io.Reader byte
+// stream.
+type Stream struct {
+ param Param // chunking parameters
+ window *crc64window.Window // sliding window for computing the hash
+ buf []byte // buffer of data
+ rd io.Reader // source of data
+ err error // error from rd
+ bufferChunks bool // whether to buffer entire chunks
+ // Invariant: bufStart <= chunkStart <= chunkEnd <= bufEnd
+ bufStart int64 // offset in rd of first byte in buf[]
+ bufEnd int64 // offset in rd of next byte after those in buf[]
+ chunkStart int64 // offset in rd of first byte of current chunk
+ chunkEnd int64 // offset in rd of next byte after current chunk
+ windowEnd int64 // offset in rd of next byte to be given to window
+ hash uint64 // hash of sliding window
+}
+
+// newStream() returns a pointer to a new Stream instance, with the
+// parameters in *param. This internal version of NewStream() allows the caller
+// to specify via bufferChunks whether entire chunks should be buffered.
+func newStream(param *Param, rd io.Reader, bufferChunks bool) *Stream {
+ s := new(Stream)
+ s.param = *param
+ s.window = crc64window.New(crc64window.ECMA, s.param.WindowWidth)
+ bufSize := int64(8192)
+ if bufferChunks {
+ // If we must buffer entire chunks, arrange that the buffer
+ // size is considerably larger than the max chunk size to avoid
+ // copying data repeatedly.
+ for bufSize < 4*s.param.MaxChunk {
+ bufSize *= 2
+ }
+ }
+ s.buf = make([]byte, bufSize)
+ s.rd = rd
+ s.bufferChunks = bufferChunks
+ return s
+}
+
+// NewStream() returns a pointer to a new Stream instance, with the
+// parameters in *param.
+func NewStream(param *Param, rd io.Reader) *Stream {
+ return newStream(param, rd, true)
+}
+
+// Advance() stages the next chunk so that it may be retrieved via Value().
+// Returns true iff there is an item to retrieve. Advance() must be called
+// before Value() is called.
+func (s *Stream) Advance() bool {
+ // Remember that s.{bufStart,bufEnd,chunkStart,chunkEnd,windowEnd}
+ // are all relative to the offset in it.rd, not it.buf.
+ // Therefore, these starts and ends can easily be compared
+ // with each other, but we must subtract bufStart when
+ // indexing into buf. (Other schemes were considered, but
+ // nothing seems uniformly better.)
+
+ // If buffering entire chunks, ensure there's enough data in the buffer
+ // for the next chunk.
+ if s.bufferChunks && s.bufEnd < s.chunkEnd+s.param.MaxChunk && s.err == nil {
+ // Next chunk might need more data.
+ if s.bufStart < s.chunkEnd {
+ // Move any remaining buffered data to start of buffer.
+ copy(s.buf, s.buf[s.chunkEnd-s.bufStart:s.bufEnd-s.bufStart])
+ s.bufStart = s.chunkEnd
+ }
+ // Fill buffer with data, unless error/EOF.
+ for s.err == nil && s.bufEnd < s.bufStart+int64(len(s.buf)) {
+ var n int
+ n, s.err = s.rd.Read(s.buf[s.bufEnd-s.bufStart:])
+ s.bufEnd += int64(n)
+ }
+ }
+
+ // Make the next chunk current.
+ s.chunkStart = s.chunkEnd
+ minChunk := s.chunkStart + s.param.MinChunk
+ maxChunk := s.chunkStart + s.param.MaxChunk
+ lastSecondaryBreak := maxChunk
+
+ // While not end of chunk...
+ for s.windowEnd != maxChunk &&
+ (s.windowEnd < minChunk || (s.hash%s.param.Primary) != 1) &&
+ (s.windowEnd != s.bufEnd || s.err == nil) {
+
+ // Fill the buffer if empty, and there's more data to read.
+ if s.windowEnd == s.bufEnd && s.err == nil {
+ if s.bufferChunks {
+ panic("chunker.Advance had to fill buffer in bufferChunks mode")
+ }
+ s.bufStart = s.bufEnd
+ var n int
+ n, s.err = s.rd.Read(s.buf)
+ s.bufEnd += int64(n)
+ }
+
+ // bufLimit is the minimum of the maximum possible chunk size and the buffer length.
+ bufLimit := maxChunk
+ if s.bufEnd < bufLimit {
+ bufLimit = s.bufEnd
+ }
+ // Advance window until both MinChunk reached and primary boundary found.
+ for s.windowEnd != bufLimit && (s.windowEnd < minChunk || (s.hash%s.param.Primary) != 1) {
+ // Advance the window by one byte.
+ s.hash = s.window.Advance(s.buf[s.windowEnd-s.bufStart])
+ s.windowEnd++
+ if (s.hash % s.param.Secondary) == 1 {
+ lastSecondaryBreak = s.windowEnd
+ }
+ }
+ }
+
+ if s.windowEnd == maxChunk && (s.hash%s.param.Primary) != 1 && lastSecondaryBreak != maxChunk {
+ // The primary break point was not found in the maximum chunk
+ // size, and a secondary break point was found; use it.
+ s.chunkEnd = lastSecondaryBreak
+ } else {
+ s.chunkEnd = s.windowEnd
+ }
+
+ return s.chunkStart != s.chunkEnd // We have a non-empty chunk to return.
+}
+
+// Value() returns the chunk that was staged by Advance(). May panic if
+// Advance() returned false or was not called. Never blocks.
+func (s *Stream) Value() []byte {
+ return s.buf[s.chunkStart-s.bufStart : s.chunkEnd-s.bufStart]
+}
+
+// Err() returns any error encountered by Advance(). Never blocks.
+func (s *Stream) Err() (err error) {
+ if s.err != io.EOF { // Do not consider EOF to be an error.
+ err = s.err
+ }
+ return err
+}
+
+// ----------------------------------
+
+// A PosStream is just like a Stream, except that the Value() method returns only
+// the byte offsets of the ends of chunks, rather than the chunks themselves.
+// It can be used when chunks are too large to buffer a small number
+// comfortably in memory.
+type PosStream struct {
+ s *Stream
+}
+
+// NewPosStream() returns a pointer to a new PosStream instance, with the
+// parameters in *param.
+func NewPosStream(param *Param, rd io.Reader) *PosStream {
+ ps := new(PosStream)
+ ps.s = newStream(param, rd, false)
+ return ps
+}
+
+// Advance() stages the offset of the end of the next chunk so that it may be
+// retrieved via Value(). Returns true iff there is an item to retrieve.
+// Advance() must be called before Value() is called.
+func (ps *PosStream) Advance() bool {
+ return ps.s.Advance()
+}
+
+// Value() returns the chunk that was staged by Advance(). May panic if
+// Advance() returned false or was not called. Never blocks.
+func (ps *PosStream) Value() int64 {
+ return ps.s.chunkEnd
+}
+
+// Err() returns any error encountered by Advance(). Never blocks.
+func (ps *PosStream) Err() error {
+ return ps.s.Err()
+}
diff --git a/services/syncbase/localblobstore/chunker/chunker_test.go b/services/syncbase/localblobstore/chunker/chunker_test.go
new file mode 100644
index 0000000..6de3304
--- /dev/null
+++ b/services/syncbase/localblobstore/chunker/chunker_test.go
@@ -0,0 +1,226 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// A test for the chunker package.
+package chunker_test
+
+import "bytes"
+import "crypto/md5"
+import "fmt"
+import "io"
+import "math/rand"
+import "testing"
+
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore/chunker"
+
+// A RandReader contains a pointer to a rand.Read, and a size limit. Its
+// pointers implement the Read() method from io.Reader, which yields bytes
+// obtained from the random number generator.
+type RandReader struct {
+ rand *rand.Rand // Source of random bytes.
+ pos int // Number of bytes read.
+ limit int // Max number of bytes that may be read.
+ insertInterval int // If non-zero, number of bytes between insertions of zero bytes.
+ eofErr error // error to be returned at the end of the stream
+}
+
+// NewRandReader() returns a new RandReader with the specified seed and size limit.
+// It yields eofErr when the end of the stream is reached.
+// If insertInterval is non-zero, a zero byte is inserted into the stream every
+// insertInterval bytes, before resuming getting bytes from the random number
+// generator.
+func NewRandReader(seed int64, limit int, insertInterval int, eofErr error) *RandReader {
+ r := new(RandReader)
+ r.rand = rand.New(rand.NewSource(seed))
+ r.limit = limit
+ r.insertInterval = insertInterval
+ r.eofErr = eofErr
+ return r
+}
+
+// Read() implements the io.Reader Read() method for *RandReader.
+func (r *RandReader) Read(buf []byte) (n int, err error) {
+ // Generate bytes up to the end of the stream, or the end of the buffer.
+ max := r.limit - r.pos
+ if len(buf) < max {
+ max = len(buf)
+ }
+ for ; n != max; n++ {
+ if r.insertInterval == 0 || (r.pos%r.insertInterval) != 0 {
+ buf[n] = byte(r.rand.Int31n(256))
+ } else {
+ buf[n] = 0
+ }
+ r.pos++
+ }
+ if r.pos == r.limit {
+ err = r.eofErr
+ }
+ return n, err
+}
+
+// TestChunksPartitionStream() tests that the chunker partitions its input
+// stream into reasonable sized chunks, which when concatenated form the
+// original stream.
+func TestChunksPartitionStream(t *testing.T) {
+ var err error
+ totalLength := 1024 * 1024
+
+ // Compute the md5 of an arbiotrary stream. We will later compare this
+ // with the md5 of the concanenation of chunks from an equivalent
+ // stream.
+ r := NewRandReader(1, totalLength, 0, io.EOF)
+ hStream := md5.New()
+ buf := make([]byte, 8192)
+ for err == nil {
+ var n int
+ n, err = r.Read(buf)
+ hStream.Write(buf[0:n])
+ }
+ checksumStream := hStream.Sum(nil)
+
+ // Using an equivalent stream, break it into chunks.
+ r = NewRandReader(1, totalLength, 0, io.EOF)
+ param := &chunker.DefaultParam
+ hChunked := md5.New()
+
+ length := 0
+ s := chunker.NewStream(param, r)
+ for s.Advance() {
+ chunk := s.Value()
+ length += len(chunk)
+ // The last chunk is permitted to be short, hence the second
+ // conjunct in the following predicate.
+ if int64(len(chunk)) < param.MinChunk && length != totalLength {
+ t.Errorf("chunker_test: chunk length %d below minimum %d", len(chunk), param.MinChunk)
+ }
+ if int64(len(chunk)) > param.MaxChunk {
+ t.Errorf("chunker_test: chunk length %d above maximum %d", len(chunk), param.MaxChunk)
+ }
+ hChunked.Write(chunk)
+ }
+ if s.Err() != nil {
+ t.Errorf("chunker_test: got error from chunker: %v\n", err)
+ }
+
+ if length != totalLength {
+ t.Errorf("chunker_test: chunk lengths summed to %d, expected %d", length, totalLength)
+ }
+
+ checksumChunked := hChunked.Sum(nil)
+ if bytes.Compare(checksumStream, checksumChunked) != 0 {
+ t.Errorf("chunker_test: md5 of stream is %v, but md5 of chunks is %v", checksumStream, checksumChunked)
+ }
+}
+
+// TestPosStream() tests that a PosStream leads to the same chunks as an Stream.
+func TestPosStream(t *testing.T) {
+ totalLength := 1024 * 1024
+
+ s := chunker.NewStream(&chunker.DefaultParam, NewRandReader(1, totalLength, 0, io.EOF))
+ ps := chunker.NewPosStream(&chunker.DefaultParam, NewRandReader(1, totalLength, 0, io.EOF))
+
+ itReady := s.Advance()
+ pitReady := ps.Advance()
+ it_pos := 0
+ chunk_count := 0
+ for itReady && pitReady {
+ it_pos += len(s.Value())
+ if int64(it_pos) != ps.Value() {
+ t.Fatalf("chunker_test: Stream and PosStream positions diverged at chunk %d: %d vs %d", chunk_count, it_pos, ps.Value())
+ }
+ chunk_count++
+ itReady = s.Advance()
+ pitReady = ps.Advance()
+ }
+ if itReady {
+ t.Error("chunker_test: Stream ended before PosStream")
+ }
+ if pitReady {
+ t.Error("chunker_test: PosStream ended before Stream")
+ }
+ if s.Err() != nil {
+ t.Errorf("chunker_test: Stream got unexpected error: %v", s.Err())
+ }
+ if ps.Err() != nil {
+ t.Errorf("chunker_test: PosStream got unexpected error: %v", ps.Err())
+ }
+}
+
+// chunkSums() returns a vector of md5 checksums for the chunks of the
+// specified Reader, using the default chunking parameters.
+func chunkSums(r io.Reader) (sums [][md5.Size]byte) {
+ s := chunker.NewStream(&chunker.DefaultParam, r)
+ for s.Advance() {
+ sums = append(sums, md5.Sum(s.Value()))
+ }
+ return sums
+}
+
+// TestInsertions() tests the how chunk sequences differ when bytes are
+// periodically inserted into a stream.
+func TestInsertions(t *testing.T) {
+ totalLength := 1024 * 1024
+ insertionInterval := 20 * 1024
+ bytesInserted := totalLength / insertionInterval
+
+ // Get the md5 sums of the chunks of two similar streams, where the
+ // second has an extra bytes every 20k bytes.
+ sums0 := chunkSums(NewRandReader(1, totalLength, 0, io.EOF))
+ sums1 := chunkSums(NewRandReader(1, totalLength, insertionInterval, io.EOF))
+
+ // Iterate over chunks of second stream, counting which are in common
+ // with first stream. We expect to find common chunks within 10 of the
+ // last chunk in common, since insertions are single bytes, widely
+ // separated.
+ same := 0 // Number of chunks in sums1 that are the same as chunks in sums0.
+ i0 := 0 // Where to search for a match in sums0.
+ for i1 := 0; i1 != len(sums1); i1++ {
+ // Be prepared to search up to the next 10 elements of sums0 from the most recent match.
+ limit := len(sums0) - i0
+ if limit > 10 {
+ limit = 10
+ }
+ var d int
+ for d = 0; d != limit && bytes.Compare(sums0[i0+d][:], sums1[i1][:]) != 0; d++ {
+ }
+ if d != limit { // found
+ same++
+ i0 += d // Advance i0 to the most recent match.
+ }
+ }
+ // The number of chunks that aren't the same as one in the original stream should be at least as large
+ // as the number of bytes inserted, and not too many more.
+ different := len(sums1) - same
+ if different < bytesInserted {
+ t.Errorf("chunker_test: saw %d different chunks, but expected at least %d", different, bytesInserted)
+ }
+ if bytesInserted+(bytesInserted/2) < different {
+ t.Errorf("chunker_test: saw %d different chunks, but expected at most %d", different, bytesInserted+(bytesInserted/2))
+ }
+ // Require that most chunks are the same, by a substantial margin.
+ if same < 5*different {
+ t.Errorf("chunker_test: saw %d different chunks, and %d same, but expected at least a factor of 5 more same than different", different, same)
+ }
+}
+
+// TestError() tests the behaviour of a chunker when given an error by its
+// reader.
+func TestError(t *testing.T) {
+ notEOF := fmt.Errorf("not EOF")
+ totalLength := 50 * 1024
+ r := NewRandReader(1, totalLength, 0, notEOF)
+ s := chunker.NewStream(&chunker.DefaultParam, r)
+ length := 0
+ for s.Advance() {
+ chunk := s.Value()
+ length += len(chunk)
+ }
+ if s.Err() != notEOF {
+ t.Errorf("chunker_test: chunk stream ended with error %v, expected %v", s.Err(), notEOF)
+ }
+ if length != totalLength {
+ t.Errorf("chunker_test: chunk lengths summed to %d, expected %d", length, totalLength)
+ }
+}