| // Copyright 2015 The Vanadium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| // A test for the chunker package. |
| package chunker_test |
| |
| import "bytes" |
| import "crypto/md5" |
| import "fmt" |
| import "io" |
| import "math/rand" |
| import "testing" |
| |
| import "v.io/syncbase/x/ref/services/syncbase/localblobstore/chunker" |
| |
// A RandReader is a deterministic pseudo-random byte stream of bounded
// length. A *RandReader implements the Read() method from io.Reader,
// yielding bytes obtained from a seeded random number generator,
// optionally with a zero byte substituted every insertInterval bytes.
type RandReader struct {
	rand           *rand.Rand // Source of random bytes.
	pos            int        // Number of bytes read so far.
	limit          int        // Max number of bytes that may be read.
	insertInterval int        // If non-zero, a zero byte is emitted at every position that is a multiple of this value.
	eofErr         error      // Error to be returned once pos reaches limit.
}
| |
| // NewRandReader() returns a new RandReader with the specified seed and size limit. |
| // It yields eofErr when the end of the stream is reached. |
| // If insertInterval is non-zero, a zero byte is inserted into the stream every |
| // insertInterval bytes, before resuming getting bytes from the random number |
| // generator. |
| func NewRandReader(seed int64, limit int, insertInterval int, eofErr error) *RandReader { |
| r := new(RandReader) |
| r.rand = rand.New(rand.NewSource(seed)) |
| r.limit = limit |
| r.insertInterval = insertInterval |
| r.eofErr = eofErr |
| return r |
| } |
| |
| // Read() implements the io.Reader Read() method for *RandReader. |
| func (r *RandReader) Read(buf []byte) (n int, err error) { |
| // Generate bytes up to the end of the stream, or the end of the buffer. |
| max := r.limit - r.pos |
| if len(buf) < max { |
| max = len(buf) |
| } |
| for ; n != max; n++ { |
| if r.insertInterval == 0 || (r.pos%r.insertInterval) != 0 { |
| buf[n] = byte(r.rand.Int31n(256)) |
| } else { |
| buf[n] = 0 |
| } |
| r.pos++ |
| } |
| if r.pos == r.limit { |
| err = r.eofErr |
| } |
| return n, err |
| } |
| |
| // TestChunksPartitionStream() tests that the chunker partitions its input |
| // stream into reasonable sized chunks, which when concatenated form the |
| // original stream. |
| func TestChunksPartitionStream(t *testing.T) { |
| var err error |
| totalLength := 1024 * 1024 |
| |
| // Compute the md5 of an arbiotrary stream. We will later compare this |
| // with the md5 of the concanenation of chunks from an equivalent |
| // stream. |
| r := NewRandReader(1, totalLength, 0, io.EOF) |
| hStream := md5.New() |
| buf := make([]byte, 8192) |
| for err == nil { |
| var n int |
| n, err = r.Read(buf) |
| hStream.Write(buf[0:n]) |
| } |
| checksumStream := hStream.Sum(nil) |
| |
| // Using an equivalent stream, break it into chunks. |
| r = NewRandReader(1, totalLength, 0, io.EOF) |
| param := &chunker.DefaultParam |
| hChunked := md5.New() |
| |
| length := 0 |
| s := chunker.NewStream(param, r) |
| for s.Advance() { |
| chunk := s.Value() |
| length += len(chunk) |
| // The last chunk is permitted to be short, hence the second |
| // conjunct in the following predicate. |
| if int64(len(chunk)) < param.MinChunk && length != totalLength { |
| t.Errorf("chunker_test: chunk length %d below minimum %d", len(chunk), param.MinChunk) |
| } |
| if int64(len(chunk)) > param.MaxChunk { |
| t.Errorf("chunker_test: chunk length %d above maximum %d", len(chunk), param.MaxChunk) |
| } |
| hChunked.Write(chunk) |
| } |
| if s.Err() != nil { |
| t.Errorf("chunker_test: got error from chunker: %v\n", err) |
| } |
| |
| if length != totalLength { |
| t.Errorf("chunker_test: chunk lengths summed to %d, expected %d", length, totalLength) |
| } |
| |
| checksumChunked := hChunked.Sum(nil) |
| if bytes.Compare(checksumStream, checksumChunked) != 0 { |
| t.Errorf("chunker_test: md5 of stream is %v, but md5 of chunks is %v", checksumStream, checksumChunked) |
| } |
| } |
| |
| // TestPosStream() tests that a PosStream leads to the same chunks as an Stream. |
| func TestPosStream(t *testing.T) { |
| totalLength := 1024 * 1024 |
| |
| s := chunker.NewStream(&chunker.DefaultParam, NewRandReader(1, totalLength, 0, io.EOF)) |
| ps := chunker.NewPosStream(&chunker.DefaultParam, NewRandReader(1, totalLength, 0, io.EOF)) |
| |
| itReady := s.Advance() |
| pitReady := ps.Advance() |
| it_pos := 0 |
| chunk_count := 0 |
| for itReady && pitReady { |
| it_pos += len(s.Value()) |
| if int64(it_pos) != ps.Value() { |
| t.Fatalf("chunker_test: Stream and PosStream positions diverged at chunk %d: %d vs %d", chunk_count, it_pos, ps.Value()) |
| } |
| chunk_count++ |
| itReady = s.Advance() |
| pitReady = ps.Advance() |
| } |
| if itReady { |
| t.Error("chunker_test: Stream ended before PosStream") |
| } |
| if pitReady { |
| t.Error("chunker_test: PosStream ended before Stream") |
| } |
| if s.Err() != nil { |
| t.Errorf("chunker_test: Stream got unexpected error: %v", s.Err()) |
| } |
| if ps.Err() != nil { |
| t.Errorf("chunker_test: PosStream got unexpected error: %v", ps.Err()) |
| } |
| } |
| |
| // chunkSums() returns a vector of md5 checksums for the chunks of the |
| // specified Reader, using the default chunking parameters. |
| func chunkSums(r io.Reader) (sums [][md5.Size]byte) { |
| s := chunker.NewStream(&chunker.DefaultParam, r) |
| for s.Advance() { |
| sums = append(sums, md5.Sum(s.Value())) |
| } |
| return sums |
| } |
| |
| // TestInsertions() tests the how chunk sequences differ when bytes are |
| // periodically inserted into a stream. |
| func TestInsertions(t *testing.T) { |
| totalLength := 1024 * 1024 |
| insertionInterval := 20 * 1024 |
| bytesInserted := totalLength / insertionInterval |
| |
| // Get the md5 sums of the chunks of two similar streams, where the |
| // second has an extra bytes every 20k bytes. |
| sums0 := chunkSums(NewRandReader(1, totalLength, 0, io.EOF)) |
| sums1 := chunkSums(NewRandReader(1, totalLength, insertionInterval, io.EOF)) |
| |
| // Iterate over chunks of second stream, counting which are in common |
| // with first stream. We expect to find common chunks within 10 of the |
| // last chunk in common, since insertions are single bytes, widely |
| // separated. |
| same := 0 // Number of chunks in sums1 that are the same as chunks in sums0. |
| i0 := 0 // Where to search for a match in sums0. |
| for i1 := 0; i1 != len(sums1); i1++ { |
| // Be prepared to search up to the next 10 elements of sums0 from the most recent match. |
| limit := len(sums0) - i0 |
| if limit > 10 { |
| limit = 10 |
| } |
| var d int |
| for d = 0; d != limit && bytes.Compare(sums0[i0+d][:], sums1[i1][:]) != 0; d++ { |
| } |
| if d != limit { // found |
| same++ |
| i0 += d // Advance i0 to the most recent match. |
| } |
| } |
| // The number of chunks that aren't the same as one in the original stream should be at least as large |
| // as the number of bytes inserted, and not too many more. |
| different := len(sums1) - same |
| if different < bytesInserted { |
| t.Errorf("chunker_test: saw %d different chunks, but expected at least %d", different, bytesInserted) |
| } |
| if bytesInserted+(bytesInserted/2) < different { |
| t.Errorf("chunker_test: saw %d different chunks, but expected at most %d", different, bytesInserted+(bytesInserted/2)) |
| } |
| // Require that most chunks are the same, by a substantial margin. |
| if same < 5*different { |
| t.Errorf("chunker_test: saw %d different chunks, and %d same, but expected at least a factor of 5 more same than different", different, same) |
| } |
| } |
| |
| // TestError() tests the behaviour of a chunker when given an error by its |
| // reader. |
| func TestError(t *testing.T) { |
| notEOF := fmt.Errorf("not EOF") |
| totalLength := 50 * 1024 |
| r := NewRandReader(1, totalLength, 0, notEOF) |
| s := chunker.NewStream(&chunker.DefaultParam, r) |
| length := 0 |
| for s.Advance() { |
| chunk := s.Value() |
| length += len(chunk) |
| } |
| if s.Err() != notEOF { |
| t.Errorf("chunker_test: chunk stream ended with error %v, expected %v", s.Err(), notEOF) |
| } |
| if length != totalLength { |
| t.Errorf("chunker_test: chunk lengths summed to %d, expected %d", length, totalLength) |
| } |
| } |