// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package chunker breaks a stream of bytes into content-defined chunks whose
// boundaries are chosen based on content checksums of a window that slides
// over the data. An edited sequence with insertions and removals can share
// many chunks with the original sequence.
//
// The intent is that when a sequence of bytes is to be transmitted to a
// recipient that may have much of the data, the sequence can be broken down
// into chunks. The checksums of the resulting chunks may then be transmitted
// to the recipient, which can then discover which of the chunks it has, and
// which it needs.
//
// Example:
//	s := chunker.NewStream(ctx, &chunker.DefaultParam, anIOReader)
//	for s.Advance() {
//		chunk := s.Value()
//		// process chunk
//	}
//	if s.Err() != nil {
//		// anIOReader generated an error.
//	}
package chunker
// The design is from:
// "A Framework for Analyzing and Improving Content-Based Chunking Algorithms";
// Kave Eshghi, Hsiu Khuern Tang; HPL-2005-30(R.1); Sep, 2005;
// http://www.hpl.hp.com/techreports/2005/HPL-2005-30R1.pdf
import "io"
import "sync"
import "v.io/x/ref/services/syncbase/localblobstore/crc64window"
import "v.io/v23/context"
import "v.io/v23/verror"
const pkgPath = "v.io/x/ref/services/syncbase/localblobstore/chunker"
var (
errStreamCancelled = verror.Register(pkgPath+".errStreamCancelled", verror.NoRetry, "{1:}{2:} Advance() called on cancelled stream{:_}")
)
// A Param contains the parameters for chunking.
//
// Chunks are broken based on a hash of a sliding window of width WindowWidth
// bytes.
// Each chunk is at most MaxChunk bytes long, and, unless end-of-file or an
// error is reached, at least MinChunk bytes long.
//
// Subject to those constraints, a chunk boundary is introduced at the first
// point where the hash of the sliding window is 1 mod Primary, or if that
// doesn't occur before MaxChunk bytes, at the last position where the hash is
// 1 mod Secondary, or if neither occurs, after MaxChunk bytes.
// Normally, MinChunk < Primary < MaxChunk.
// Primary is the expected chunk size.
// The Secondary divisor exists to make it more likely that a chunk boundary is
// selected based on the local data when the Primary divisor by chance does not
// find a match for a long distance. It should be a few times smaller than
// Primary.
//
// Using primes for Primary and Secondary is not essential, but recommended
// because it guarantees mixing of the checksum bits should their distribution
// be non-uniform.
type Param struct {
WindowWidth int // the window size to use when looking for chunk boundaries
MinChunk int64 // minimum chunk size
MaxChunk int64 // maximum chunk size
Primary uint64 // primary divisor; the expected chunk size
Secondary uint64 // secondary divisor
}
// DefaultParam contains default chunking parameters.
var DefaultParam Param = Param{WindowWidth: 48, MinChunk: 512, MaxChunk: 3072, Primary: 601, Secondary: 307}
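
// As an illustration (hypothetical values, not part of the package API), a
// caller wanting larger chunks might use a Param such as the following; the
// values merely follow the guidelines above: MinChunk < Primary < MaxChunk,
// Secondary a few times smaller than Primary, and both divisors prime.
//
//	var bigParam = chunker.Param{
//		WindowWidth: 48,    // same window width as DefaultParam
//		MinChunk:    4096,  // minimum chunk size, in bytes
//		MaxChunk:    24576, // maximum chunk size, in bytes
//		Primary:     4801,  // prime; the expected chunk size
//		Secondary:   2399,  // prime; fallback divisor
//	}
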
// A Stream allows a client to iterate over the chunks within an io.Reader byte
// stream.
type Stream struct {
param Param // chunking parameters
ctx *context.T // context of creator
window *crc64window.Window // sliding window for computing the hash
buf []byte // buffer of data
rd io.Reader // source of data
err error // error from rd
mu sync.Mutex // protects cancelled
cancelled bool // whether the stream has been cancelled
bufferChunks bool // whether to buffer entire chunks
// Invariant: bufStart <= chunkStart <= chunkEnd <= bufEnd
bufStart int64 // offset in rd of first byte in buf[]
bufEnd int64 // offset in rd of next byte after those in buf[]
chunkStart int64 // offset in rd of first byte of current chunk
chunkEnd int64 // offset in rd of next byte after current chunk
windowEnd int64 // offset in rd of next byte to be given to window
hash uint64 // hash of sliding window
}
// newStream() returns a pointer to a new Stream instance, with the
// parameters in *param. This internal version of NewStream() allows the caller
// to specify via bufferChunks whether entire chunks should be buffered.
func newStream(ctx *context.T, param *Param, rd io.Reader, bufferChunks bool) *Stream {
s := new(Stream)
s.param = *param
s.ctx = ctx
s.window = crc64window.New(crc64window.ECMA, s.param.WindowWidth)
bufSize := int64(8192)
if bufferChunks {
// If we must buffer entire chunks, arrange that the buffer
// size is considerably larger than the max chunk size to avoid
// copying data repeatedly.
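// (For example, with DefaultParam, MaxChunk == 3072, so bufSize
// grows from 8192 to 16384.)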
for bufSize < 4*s.param.MaxChunk {
bufSize *= 2
}
}
s.buf = make([]byte, bufSize)
s.rd = rd
s.bufferChunks = bufferChunks
return s
}
// NewStream() returns a pointer to a new Stream instance, with the
// parameters in *param.
func NewStream(ctx *context.T, param *Param, rd io.Reader) *Stream {
return newStream(ctx, param, rd, true)
}
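
// A minimal usage sketch (chunkSizes is illustrative, not part of the API;
// the caller is assumed to supply a context, e.g. from v23.Init(), and any
// io.Reader):
//
//	func chunkSizes(ctx *context.T, rd io.Reader) ([]int, error) {
//		var sizes []int
//		s := chunker.NewStream(ctx, &chunker.DefaultParam, rd)
//		for s.Advance() {
//			sizes = append(sizes, len(s.Value()))
//		}
//		return sizes, s.Err() // nil unless rd failed or s was cancelled
//	}
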
// isCancelled() returns whether s.Cancel() has been called.
func (s *Stream) isCancelled() (cancelled bool) {
s.mu.Lock()
cancelled = s.cancelled
s.mu.Unlock()
return cancelled
}
// Advance() stages the next chunk so that it may be retrieved via Value().
// Returns true iff there is an item to retrieve. Advance() must be called
// before Value() is called.
func (s *Stream) Advance() bool {
// Remember that s.{bufStart,bufEnd,chunkStart,chunkEnd,windowEnd}
// are all offsets within s.rd, not indices into s.buf.
// Therefore, these starts and ends can easily be compared
// with each other, but we must subtract bufStart when
// indexing into buf. (Other schemes were considered, but
// nothing seemed uniformly better.)
// If buffering entire chunks, ensure there's enough data in the buffer
// for the next chunk.
if s.bufferChunks && s.bufEnd < s.chunkEnd+s.param.MaxChunk && s.err == nil {
// Next chunk might need more data.
if s.bufStart < s.chunkEnd {
// Move any remaining buffered data to start of buffer.
copy(s.buf, s.buf[s.chunkEnd-s.bufStart:s.bufEnd-s.bufStart])
s.bufStart = s.chunkEnd
}
// Fill buffer with data, unless error/EOF.
for s.err == nil && s.bufEnd < s.bufStart+int64(len(s.buf)) && !s.isCancelled() {
var n int
n, s.err = s.rd.Read(s.buf[s.bufEnd-s.bufStart:])
s.bufEnd += int64(n)
}
}
// Make the next chunk current.
s.chunkStart = s.chunkEnd
minChunk := s.chunkStart + s.param.MinChunk
maxChunk := s.chunkStart + s.param.MaxChunk
lastSecondaryBreak := maxChunk
// While not end of chunk...
for s.windowEnd != maxChunk &&
(s.windowEnd < minChunk || (s.hash%s.param.Primary) != 1) &&
(s.windowEnd != s.bufEnd || s.err == nil) && !s.isCancelled() {
// Fill the buffer if empty, and there's more data to read.
if s.windowEnd == s.bufEnd && s.err == nil {
if s.bufferChunks {
panic("chunker.Advance had to fill buffer in bufferChunks mode")
}
s.bufStart = s.bufEnd
var n int
n, s.err = s.rd.Read(s.buf)
s.bufEnd += int64(n)
}
// bufLimit is the lesser of the maximum possible chunk end and the end of the buffered data.
bufLimit := maxChunk
if s.bufEnd < bufLimit {
bufLimit = s.bufEnd
}
// Advance window until both MinChunk reached and primary boundary found.
for s.windowEnd != bufLimit &&
(s.windowEnd < minChunk || (s.hash%s.param.Primary) != 1) &&
!s.isCancelled() {
// Advance the window by one byte.
s.hash = s.window.Advance(s.buf[s.windowEnd-s.bufStart])
s.windowEnd++
if (s.hash % s.param.Secondary) == 1 {
lastSecondaryBreak = s.windowEnd
}
}
}
if s.windowEnd == maxChunk && (s.hash%s.param.Primary) != 1 && lastSecondaryBreak != maxChunk {
// The primary break point was not found in the maximum chunk
// size, and a secondary break point was found; use it.
s.chunkEnd = lastSecondaryBreak
} else {
s.chunkEnd = s.windowEnd
}
return !s.isCancelled() && s.chunkStart != s.chunkEnd // We have a non-empty chunk to return.
}
// Value() returns the chunk that was staged by Advance(). May panic if
// Advance() returned false or was not called. Never blocks.
func (s *Stream) Value() []byte {
return s.buf[s.chunkStart-s.bufStart : s.chunkEnd-s.bufStart]
}
// Err() returns any error encountered by Advance(). Never blocks.
func (s *Stream) Err() (err error) {
s.mu.Lock()
if s.cancelled && (s.err == nil || s.err == io.EOF) {
s.err = verror.New(errStreamCancelled, s.ctx)
}
s.mu.Unlock()
if s.err != io.EOF { // Do not consider EOF to be an error.
err = s.err
}
return err
}
// Cancel() causes the next call to Advance() to return false.
// It should be used when the client does not wish to iterate to the end of the stream.
// Never blocks. May be called concurrently with other method calls on s.
func (s *Stream) Cancel() {
s.mu.Lock()
s.cancelled = true
s.mu.Unlock()
}
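
// For example, a client that needs only the first chunk can cancel the stream
// once it has a value (an illustrative sketch, not part of the API; the chunk
// is copied because Value() returns a slice into the stream's internal
// buffer):
//
//	func firstChunk(ctx *context.T, rd io.Reader) []byte {
//		s := chunker.NewStream(ctx, &chunker.DefaultParam, rd)
//		defer s.Cancel()
//		if !s.Advance() {
//			return nil
//		}
//		return append([]byte{}, s.Value()...)
//	}
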
// ----------------------------------
// A PosStream is just like a Stream, except that its Value() method returns only
// the byte offsets of the ends of chunks, rather than the chunks themselves.
// It can be used when chunks are too large to buffer comfortably in memory.
type PosStream struct {
s *Stream
}
// NewPosStream() returns a pointer to a new PosStream instance, with the
// parameters in *param.
func NewPosStream(ctx *context.T, param *Param, rd io.Reader) *PosStream {
ps := new(PosStream)
ps.s = newStream(ctx, param, rd, false)
return ps
}
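
// A sketch of typical use (chunkBoundaries is illustrative, not part of the
// API): record the chunk boundaries of a stream without buffering the chunks
// themselves.
//
//	func chunkBoundaries(ctx *context.T, rd io.Reader) ([]int64, error) {
//		var boundaries []int64
//		ps := chunker.NewPosStream(ctx, &chunker.DefaultParam, rd)
//		for ps.Advance() {
//			boundaries = append(boundaries, ps.Value())
//		}
//		return boundaries, ps.Err()
//	}
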
// Advance() stages the offset of the end of the next chunk so that it may be
// retrieved via Value(). Returns true iff there is an item to retrieve.
// Advance() must be called before Value() is called.
func (ps *PosStream) Advance() bool {
return ps.s.Advance()
}
// Value() returns the offset of the end of the chunk that was staged by
// Advance(). May panic if Advance() returned false or was not called.
// Never blocks.
func (ps *PosStream) Value() int64 {
return ps.s.chunkEnd
}
// Err() returns any error encountered by Advance(). Never blocks.
func (ps *PosStream) Err() error {
return ps.s.Err()
}
// Cancel() causes the next call to Advance() to return false.
// It should be used when the client does not wish to iterate to the end of the stream.
// Never blocks. May be called concurrently with other method calls on ps.
func (ps *PosStream) Cancel() {
ps.s.Cancel()
}