// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// A test for the chunker package.
package chunker_test
import "bytes"
import "crypto/md5"
import "fmt"
import "io"
import "math/rand"
import "testing"
import "v.io/syncbase/x/ref/services/syncbase/localblobstore/chunker"

// A RandReader contains a pointer to a rand.Rand and a size limit. A
// *RandReader implements the Read() method from io.Reader, yielding bytes
// obtained from the random number generator.
type RandReader struct {
rand *rand.Rand // Source of random bytes.
pos int // Number of bytes read.
limit int // Max number of bytes that may be read.
insertInterval int // If non-zero, number of bytes between insertions of zero bytes.
eofErr error // error to be returned at the end of the stream
}

// NewRandReader() returns a new RandReader with the specified seed and size limit.
// It yields eofErr when the end of the stream is reached.
// If insertInterval is non-zero, a zero byte is inserted into the stream every
// insertInterval bytes, before resuming getting bytes from the random number
// generator.
func NewRandReader(seed int64, limit int, insertInterval int, eofErr error) *RandReader {
r := new(RandReader)
r.rand = rand.New(rand.NewSource(seed))
r.limit = limit
r.insertInterval = insertInterval
r.eofErr = eofErr
return r
}

// Read() implements the io.Reader Read() method for *RandReader.
func (r *RandReader) Read(buf []byte) (n int, err error) {
// Generate bytes up to the end of the stream, or the end of the buffer.
max := r.limit - r.pos
if len(buf) < max {
max = len(buf)
}
for ; n != max; n++ {
if r.insertInterval == 0 || (r.pos%r.insertInterval) != 0 {
buf[n] = byte(r.rand.Int31n(256))
} else {
buf[n] = 0
}
r.pos++
}
if r.pos == r.limit {
err = r.eofErr
}
return n, err
}

// TestChunksPartitionStream() tests that the chunker partitions its input
// stream into reasonably sized chunks which, when concatenated, form the
// original stream.
func TestChunksPartitionStream(t *testing.T) {
var err error
totalLength := 1024 * 1024
	// Compute the md5 of an arbitrary stream. We will later compare this
	// with the md5 of the concatenation of chunks from an equivalent
	// stream.
r := NewRandReader(1, totalLength, 0, io.EOF)
hStream := md5.New()
buf := make([]byte, 8192)
for err == nil {
var n int
n, err = r.Read(buf)
hStream.Write(buf[0:n])
}
checksumStream := hStream.Sum(nil)
// Using an equivalent stream, break it into chunks.
r = NewRandReader(1, totalLength, 0, io.EOF)
param := &chunker.DefaultParam
hChunked := md5.New()
length := 0
s := chunker.NewStream(param, r)
for s.Advance() {
chunk := s.Value()
length += len(chunk)
// The last chunk is permitted to be short, hence the second
// conjunct in the following predicate.
if int64(len(chunk)) < param.MinChunk && length != totalLength {
t.Errorf("chunker_test: chunk length %d below minimum %d", len(chunk), param.MinChunk)
}
if int64(len(chunk)) > param.MaxChunk {
t.Errorf("chunker_test: chunk length %d above maximum %d", len(chunk), param.MaxChunk)
}
hChunked.Write(chunk)
}
if s.Err() != nil {
t.Errorf("chunker_test: got error from chunker: %v\n", err)
}
if length != totalLength {
t.Errorf("chunker_test: chunk lengths summed to %d, expected %d", length, totalLength)
}
checksumChunked := hChunked.Sum(nil)
	if !bytes.Equal(checksumStream, checksumChunked) {
t.Errorf("chunker_test: md5 of stream is %v, but md5 of chunks is %v", checksumStream, checksumChunked)
}
}
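
// TestDefaultParamPlausible() is an added sketch (not part of the original
// suite): the default parameters must be internally consistent for the bounds
// checks in TestChunksPartitionStream() to be meaningful.
func TestDefaultParamPlausible(t *testing.T) {
	param := &chunker.DefaultParam
	if param.MinChunk <= 0 || param.MaxChunk < param.MinChunk {
		t.Errorf("chunker_test: implausible default parameters: MinChunk=%d MaxChunk=%d",
			param.MinChunk, param.MaxChunk)
	}
}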

// TestPosStream() tests that a PosStream leads to the same chunks as a Stream.
func TestPosStream(t *testing.T) {
totalLength := 1024 * 1024
s := chunker.NewStream(&chunker.DefaultParam, NewRandReader(1, totalLength, 0, io.EOF))
ps := chunker.NewPosStream(&chunker.DefaultParam, NewRandReader(1, totalLength, 0, io.EOF))
itReady := s.Advance()
pitReady := ps.Advance()
	itPos := 0
	chunkCount := 0
	for itReady && pitReady {
		itPos += len(s.Value())
		if int64(itPos) != ps.Value() {
			t.Fatalf("chunker_test: Stream and PosStream positions diverged at chunk %d: %d vs %d", chunkCount, itPos, ps.Value())
		}
		chunkCount++
		itReady = s.Advance()
		pitReady = ps.Advance()
	}
if itReady {
t.Error("chunker_test: Stream ended before PosStream")
}
if pitReady {
t.Error("chunker_test: PosStream ended before Stream")
}
if s.Err() != nil {
t.Errorf("chunker_test: Stream got unexpected error: %v", s.Err())
}
if ps.Err() != nil {
t.Errorf("chunker_test: PosStream got unexpected error: %v", ps.Err())
}
}
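
// TestPosStreamTotalLength() is an added sketch (not part of the original
// suite): since PosStream.Value() yields the stream position of each chunk
// boundary, the final position should equal the total stream length.
func TestPosStreamTotalLength(t *testing.T) {
	totalLength := 256 * 1024
	ps := chunker.NewPosStream(&chunker.DefaultParam, NewRandReader(1, totalLength, 0, io.EOF))
	var lastPos int64
	for ps.Advance() {
		lastPos = ps.Value()
	}
	if ps.Err() != nil {
		t.Errorf("chunker_test: PosStream got unexpected error: %v", ps.Err())
	}
	if lastPos != int64(totalLength) {
		t.Errorf("chunker_test: final PosStream position %d, expected %d", lastPos, totalLength)
	}
}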

// chunkSums() returns a slice of md5 checksums for the chunks of the
// specified Reader, using the default chunking parameters.
func chunkSums(r io.Reader) (sums [][md5.Size]byte) {
s := chunker.NewStream(&chunker.DefaultParam, r)
for s.Advance() {
sums = append(sums, md5.Sum(s.Value()))
}
return sums
}

// TestInsertions() tests how chunk sequences differ when bytes are
// periodically inserted into a stream.
func TestInsertions(t *testing.T) {
totalLength := 1024 * 1024
insertionInterval := 20 * 1024
bytesInserted := totalLength / insertionInterval
	// Get the md5 sums of the chunks of two similar streams, where the
	// second has an extra byte every 20k bytes.
sums0 := chunkSums(NewRandReader(1, totalLength, 0, io.EOF))
sums1 := chunkSums(NewRandReader(1, totalLength, insertionInterval, io.EOF))
// Iterate over chunks of second stream, counting which are in common
// with first stream. We expect to find common chunks within 10 of the
// last chunk in common, since insertions are single bytes, widely
// separated.
same := 0 // Number of chunks in sums1 that are the same as chunks in sums0.
i0 := 0 // Where to search for a match in sums0.
for i1 := 0; i1 != len(sums1); i1++ {
// Be prepared to search up to the next 10 elements of sums0 from the most recent match.
limit := len(sums0) - i0
if limit > 10 {
limit = 10
}
var d int
		for d = 0; d != limit && !bytes.Equal(sums0[i0+d][:], sums1[i1][:]); d++ {
}
if d != limit { // found
same++
i0 += d // Advance i0 to the most recent match.
}
}
// The number of chunks that aren't the same as one in the original stream should be at least as large
// as the number of bytes inserted, and not too many more.
different := len(sums1) - same
if different < bytesInserted {
t.Errorf("chunker_test: saw %d different chunks, but expected at least %d", different, bytesInserted)
}
if bytesInserted+(bytesInserted/2) < different {
t.Errorf("chunker_test: saw %d different chunks, but expected at most %d", different, bytesInserted+(bytesInserted/2))
}
// Require that most chunks are the same, by a substantial margin.
if same < 5*different {
t.Errorf("chunker_test: saw %d different chunks, and %d same, but expected at least a factor of 5 more same than different", different, same)
}
}
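
// TestChunksDeterministic() is an added sketch (not part of the original
// suite): equivalent streams must produce identical chunk sequences; this is
// the baseline that makes the common-chunk count in TestInsertions()
// meaningful.
func TestChunksDeterministic(t *testing.T) {
	sums0 := chunkSums(NewRandReader(1, 256*1024, 0, io.EOF))
	sums1 := chunkSums(NewRandReader(1, 256*1024, 0, io.EOF))
	if len(sums0) != len(sums1) {
		t.Fatalf("chunker_test: equivalent streams yielded %d and %d chunks", len(sums0), len(sums1))
	}
	for i := range sums0 {
		if sums0[i] != sums1[i] {
			t.Errorf("chunker_test: chunk %d differs between equivalent streams", i)
		}
	}
}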

// TestError() tests the behaviour of a chunker when given an error by its
// reader.
func TestError(t *testing.T) {
notEOF := fmt.Errorf("not EOF")
totalLength := 50 * 1024
r := NewRandReader(1, totalLength, 0, notEOF)
s := chunker.NewStream(&chunker.DefaultParam, r)
length := 0
for s.Advance() {
chunk := s.Value()
length += len(chunk)
}
if s.Err() != notEOF {
t.Errorf("chunker_test: chunk stream ended with error %v, expected %v", s.Err(), notEOF)
}
if length != totalLength {
t.Errorf("chunker_test: chunk lengths summed to %d, expected %d", length, totalLength)
}
}
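
// TestShortStreamLength() is an added sketch (not part of the original
// suite): even when the input is shorter than MinChunk, the concatenated
// chunks must still reproduce the whole stream, per the "last chunk is
// permitted to be short" rule noted in TestChunksPartitionStream().
func TestShortStreamLength(t *testing.T) {
	totalLength := 100 // assumed to be well below DefaultParam.MinChunk
	s := chunker.NewStream(&chunker.DefaultParam, NewRandReader(1, totalLength, 0, io.EOF))
	length := 0
	for s.Advance() {
		length += len(s.Value())
	}
	if s.Err() != nil {
		t.Errorf("chunker_test: got error from chunker: %v", s.Err())
	}
	if length != totalLength {
		t.Errorf("chunker_test: chunk lengths summed to %d, expected %d", length, totalLength)
	}
}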