v.io/syncbase/x/ref/services/syncbase/localblobstore/chunkmap: Introduce chunkmap

This CL introduces chunkmap, which keeps a non-volatile mapping from chunk
content hashes to locations within blobs, and from blobs to lists of chunks
within them.

It will ultimately be integrated with fs_cablobstore, and use chunk data from
chunker.

chunkmap is not intended to be called directly by the client.  Rather, the
client will call on the localblobstore interface, and fs_cablobstore will call
chunkmap on the client's behalf.

Expected uses:

- When a blob is added:
        blobstore will use chunker to find the content hashes of the chunks,
        and record them with chunkmap.AssociateChunkWithLocation().

- When a blob is deleted:
        blobstore will call chunkmap.DeleteBlob()

- When the blobstore is garbage collected:
        blobstore will enumerate all blobs in the chunkmap,
        and delete any that no longer appear in the store.

- When a device X wishes to transfer blob B to device Y:
        X will get the list of chunk hashes for B using
        chunkmap.NewBlobStream() and its Advance()/Value() calls, and send
        them to device Y (see the sketch after this list).

        Y will call chunkmap.LookupChunk() on each chunk hash.  For those
        that can be found locally, Y remembers the blob, offset, and size
        data.  For those that cannot, Y tells X it needs those chunks
        (perhaps by giving their indices in the list).

        X will get each chunk that Y needs, either by remembering the
        location information from its earlier scan, or by retrieving it with
        chunkmap.LookupChunk().  X transfers those chunks to Y.

        Y writes the blob using the chunks it already has, plus those just
        received from X.  chunkmap.AssociateChunkWithLocation() is used to
        add the association between the chunks of the new blob and their
        locations within the blob.
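
A sketch of the sender-side enumeration on device X, using the API in this
CL (sendToY() is illustrative, not part of the API):

        bs := cm.NewBlobStream(ctx, blobB)
        for bs.Advance() {
                sendToY(bs.Value(nil)) // send each chunk hash to Y
        }
        if bs.Err() != nil {
                // ...handle the error...
        }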

Change-Id: I59e57c2db14a86c75500b984a0477ce5f2cd80e2
diff --git a/x/ref/services/syncbase/localblobstore/chunkmap/chunkmap.go b/x/ref/services/syncbase/localblobstore/chunkmap/chunkmap.go
new file mode 100644
index 0000000..830a1c9
--- /dev/null
+++ b/x/ref/services/syncbase/localblobstore/chunkmap/chunkmap.go
@@ -0,0 +1,355 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package chunkmap implements a map from chunk checksums to chunk locations
+// and vice versa, using a store.Store (currently, one implemented with
+// leveldb).
+package chunkmap
+
+import "encoding/binary"
+
+import "v.io/syncbase/x/ref/services/syncbase/store"
+import "v.io/syncbase/x/ref/services/syncbase/store/leveldb"
+import "v.io/v23/context"
+import "v.io/v23/verror"
+
+const pkgPath = "v.io/syncbase/x/ref/services/syncbase/localblobstore/chunkmap"
+
+var (
+	errBadBlobIDLen        = verror.Register(pkgPath+".errBadBlobIDLen", verror.NoRetry, "{1:}{2:} chunkmap {3}: bad blob length {4} should be {5}{:_}")
+	errBadChunkHashLen     = verror.Register(pkgPath+".errBadChunkHashLen", verror.NoRetry, "{1:}{2:} chunkmap {3}: bad chunk hash length {4} should be {5}{:_}")
+	errNoSuchBlob          = verror.Register(pkgPath+".errNoSuchBlob", verror.NoRetry, "{1:}{2:} chunkmap {3}: no such blob{:_}")
+	errMalformedChunkEntry = verror.Register(pkgPath+".errMalformedChunkEntry", verror.NoRetry, "{1:}{2:} chunkmap {3}: malformed chunk entry{:_}")
+	errNoSuchChunk         = verror.Register(pkgPath+".errNoSuchChunk", verror.NoRetry, "{1:}{2:} chunkmap {3}: no such chunk{:_}")
+	errMalformedBlobEntry  = verror.Register(pkgPath+".errMalformedBlobEntry", verror.NoRetry, "{1:}{2:} chunkmap {3}: malformed blob entry{:_}")
+)
+
+// There are two tables: chunk-to-location, and blob-to-chunk.
+// Each chunk is represented by one entry in each table.
+// On deletion, the latter is used to find the former, so the latter is added
+// first, and deleted last.
+//
+// chunk-to-location:
+//    Key:    1-byte containing chunkPrefix, 16-byte chunk hash, 16-byte blob ID
+//    Value:  Varint offset, Varint length.
+// The chunk with the specified 16-byte hash had the specified length, and is
+// (or was) found at the specified offset in the blob.
+//
+// blob-to-chunk:
+//    Key:    1-byte containing blobPrefix, 16-byte blob ID, 8-byte bigendian offset
+//    Value:  16-byte chunk hash, Varint length.
+//
+// The varint encoded fields are written/read with
+// encoding/binary.{Put,}Varint.  The blob-to-chunk keys encode the offset
+// as raw big-endian (encoding/binary.{Put,}Uint64) so that it will sort in
+// increasing offset order.
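+//
+// A worked example with illustrative values: a 10-byte chunk stored at
+// offset 300 in some blob would be recorded roughly as:
+//
+//    chunk-to-location:
+//      Key:    0x00, <16-byte chunk hash>, <16-byte blob ID>
+//      Value:  0xd8 0x04 (Varint 300), 0x14 (Varint 10)
+//
+//    blob-to-chunk:
+//      Key:    0x01, <16-byte blob ID>, 0x00...0x01 0x2c (big-endian 300)
+//      Value:  <16-byte chunk hash>, 0x14 (Varint 10)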
+
+const chunkHashLen = 16 // length of chunk hash
+const blobIDLen = 16    // length of blob ID
+const offsetLen = 8     // length of offset in blob-to-chunk key
+
+const maxKeyLen = 64 // conservative maximum key length
+const maxValLen = 64 // conservative maximum value length
+
+var chunkPrefix []byte = []byte{0} // key prefix for chunk-to-location
+var blobPrefix []byte = []byte{1}  // key prefix for blob-to-chunk
+
+// offsetLimit is an offset that's greater than, and one byte longer than, any
+// real offset.
+var offsetLimit []byte = []byte{
+	0xff, 0xff, 0xff, 0xff,
+	0xff, 0xff, 0xff, 0xff,
+	0xff,
+}
+
+// blobLimit is a blobID that's greater than, and one byte longer than, any
+// real blob ID.
+var blobLimit []byte = []byte{
+	0xff, 0xff, 0xff, 0xff,
+	0xff, 0xff, 0xff, 0xff,
+	0xff, 0xff, 0xff, 0xff,
+	0xff, 0xff, 0xff, 0xff,
+	0xff,
+}
+
+// A Location describes a chunk's location within a blob.
+type Location struct {
+	Blob   []byte // ID of blob
+	Offset int64  // byte offset of chunk within blob
+	Size   int64  // size of chunk
+}
+
+// A ChunkMap maps chunk checksums to Locations, and vice versa.
+type ChunkMap struct {
+	dir string      // the directory where the store is held
+	st  store.Store // private store that holds the mapping.
+}
+
+// New() returns a pointer to a ChunkMap, backed by storage in directory dir.
+func New(ctx *context.T, dir string) (cm *ChunkMap, err error) {
+	cm = new(ChunkMap)
+	cm.dir = dir
+	cm.st, err = leveldb.Open(dir)
+	return cm, err
+}
+
+// Close() closes any files or other resources associated with *cm.
+// No other methods on cm may be called after Close().
+func (cm *ChunkMap) Close() error {
+	return cm.st.Close()
+}
+
+// AssociateChunkWithLocation() remembers that the specified chunk hash is
+// associated with the specified Location.
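+//
+// For example (a sketch; hash and blobID stand for 16-byte values obtained
+// from the chunker and the blob store):
+//	err := cm.AssociateChunkWithLocation(ctx, hash,
+//		Location{Blob: blobID, Offset: 0, Size: 1024})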
+func (cm *ChunkMap) AssociateChunkWithLocation(ctx *context.T, chunk []byte, loc Location) (err error) {
+	// Check expected lengths explicitly in routines that modify the database.
+	if len(loc.Blob) != blobIDLen {
+		err = verror.New(errBadBlobIDLen, ctx, cm.dir, len(loc.Blob), blobIDLen)
+	} else if len(chunk) != chunkHashLen {
+		err = verror.New(errBadChunkHashLen, ctx, cm.dir, len(chunk), chunkHashLen)
+	} else {
+		var key [maxKeyLen]byte
+		var val [maxValLen]byte
+
+		// Put the blob-to-chunk entry first, since it's used
+		// to garbage collect the other.
+		keyLen := copy(key[:], blobPrefix)
+		keyLen += copy(key[keyLen:], loc.Blob)
+		binary.BigEndian.PutUint64(key[keyLen:], uint64(loc.Offset))
+		keyLen += offsetLen
+
+		valLen := copy(val[:], chunk)
+		valLen += binary.PutVarint(val[valLen:], loc.Size)
+		err = cm.st.Put(key[:keyLen], val[:valLen])
+
+		if err == nil {
+			keyLen = copy(key[:], chunkPrefix)
+			keyLen += copy(key[keyLen:], chunk)
+			keyLen += copy(key[keyLen:], loc.Blob)
+
+			valLen = binary.PutVarint(val[:], loc.Offset)
+			valLen += binary.PutVarint(val[valLen:], loc.Size)
+
+			err = cm.st.Put(key[:keyLen], val[:valLen])
+		}
+	}
+
+	return err
+}
+
+// DeleteBlob() deletes any chunk associations previously added with
+// AssociateChunkWithLocation(..., Location{Blob: blob, ...}).
+func (cm *ChunkMap) DeleteBlob(ctx *context.T, blob []byte) (err error) {
+	// Check expected lengths explicitly in routines that modify the database.
+	if len(blob) != blobIDLen {
+		err = verror.New(errBadBlobIDLen, ctx, cm.dir, len(blob), blobIDLen)
+	} else {
+		var start [maxKeyLen]byte
+		var limit [maxKeyLen]byte
+
+		startLen := copy(start[:], blobPrefix)
+		startLen += copy(start[startLen:], blob)
+
+		limitLen := copy(limit[:], start[:startLen])
+		limitLen += copy(limit[limitLen:], offsetLimit)
+
+		var keyBuf [maxKeyLen]byte    // buffer for keys returned by stream
+		var valBuf [maxValLen]byte    // buffer for values returned by stream
+		var deleteKey [maxKeyLen]byte // buffer to construct chunk-to-location keys to delete
+
+		deletePrefixLen := copy(deleteKey[:], chunkPrefix)
+
+		seenAValue := false
+
+		s := cm.st.Scan(start[:startLen], limit[:limitLen])
+		for s.Advance() && err == nil {
+			seenAValue = true
+
+			key := s.Key(keyBuf[:])
+			value := s.Value(valBuf[:])
+
+			if len(value) >= chunkHashLen {
+				deleteKeyLen := deletePrefixLen
+				deleteKeyLen += copy(deleteKey[deleteKeyLen:], value[:chunkHashLen])
+				deleteKeyLen += copy(deleteKey[deleteKeyLen:], blob)
+				err = cm.st.Delete(deleteKey[:deleteKeyLen])
+			}
+
+			if err == nil {
+				// Delete the blob-to-chunk entry last, as it's
+				// used to find the chunk-to-location entry.
+				err = cm.st.Delete(key)
+			}
+		}
+
+		if err != nil {
+			s.Cancel()
+		} else {
+			err = s.Err()
+			if err == nil && !seenAValue {
+				err = verror.New(errNoSuchBlob, ctx, cm.dir, blob)
+			}
+		}
+	}
+
+	return err
+}
+
+// LookupChunk() returns a Location for the specified chunk.  Only one Location
+// is returned, even if several are available in the database.  If the client
+// finds that the Location is not available, perhaps because its blob has
+// been deleted, the client should remove the blob from the ChunkMap using
+// DeleteBlob(loc.Blob), and try again.  (The client may also wish to
+// arrange at some point to call GC() on the blob store.)
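+//
+// A sketch of that recovery pattern (blobExists() is a hypothetical check
+// against the blob store, not part of this package):
+//	loc, err := cm.LookupChunk(ctx, chunk)
+//	for err == nil && !blobExists(loc.Blob) {
+//		if err = cm.DeleteBlob(ctx, loc.Blob); err == nil {
+//			loc, err = cm.LookupChunk(ctx, chunk)
+//		}
+//	}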
+func (cm *ChunkMap) LookupChunk(ctx *context.T, chunk []byte) (loc Location, err error) {
+	var start [maxKeyLen]byte
+	var limit [maxKeyLen]byte
+
+	startLen := copy(start[:], chunkPrefix)
+	startLen += copy(start[startLen:], chunk)
+
+	limitLen := copy(limit[:], start[:startLen])
+	limitLen += copy(limit[limitLen:], blobLimit)
+
+	var keyBuf [maxKeyLen]byte // buffer for keys returned by stream
+	var valBuf [maxValLen]byte // buffer for values returned by stream
+
+	s := cm.st.Scan(start[:startLen], limit[:limitLen])
+	if s.Advance() {
+		var n int
+		key := s.Key(keyBuf[:])
+		value := s.Value(valBuf[:])
+		loc.Blob = key[len(chunkPrefix)+chunkHashLen:]
+		loc.Offset, n = binary.Varint(value)
+		if n > 0 {
+			loc.Size, n = binary.Varint(value[n:])
+		}
+		if n <= 0 {
+			err = verror.New(errMalformedChunkEntry, ctx, cm.dir, chunk, key, value)
+		}
+		s.Cancel()
+	} else {
+		if err == nil {
+			err = s.Err()
+		}
+		if err == nil {
+			err = verror.New(errNoSuchChunk, ctx, cm.dir, chunk)
+		}
+	}
+
+	return loc, err
+}
+
+// A BlobStream allows the client to iterate over the chunks in a blob:
+//	bs := cm.NewBlobStream(ctx, blob)
+//	for bs.Advance() {
+//		chunkHash := bs.Value()
+//		...process chunkHash...
+//	}
+//	if bs.Err() != nil {
+//		...there was an error...
+//	}
+type BlobStream struct {
+	cm     *ChunkMap
+	ctx    *context.T
+	stream store.Stream
+
+	keyBuf [maxKeyLen]byte // buffer for keys
+	valBuf [maxValLen]byte // buffer for values
+	key    []byte          // key for current element
+	value  []byte          // value of current element
+	loc    Location        // location of current element
+	err    error           // error encountered.
+	more   bool            // whether stream may be consulted again
+}
+
+// NewBlobStream() returns a pointer to a new BlobStream that allows the client
+// to enumerate the chunk hashes in a blob, in order.
+func (cm *ChunkMap) NewBlobStream(ctx *context.T, blob []byte) *BlobStream {
+	var start [maxKeyLen]byte
+	var limit [maxKeyLen]byte
+
+	startLen := copy(start[:], blobPrefix)
+	startLen += copy(start[startLen:], blob)
+
+	limitLen := copy(limit[:], start[:startLen])
+	limitLen += copy(limit[limitLen:], offsetLimit)
+
+	bs := new(BlobStream)
+	bs.cm = cm
+	bs.ctx = ctx
+	bs.stream = cm.st.Scan(start[:startLen], limit[:limitLen])
+	bs.more = true
+
+	return bs
+}
+
+// Advance() stages an element so the client can retrieve the chunk hash with
+// Value(), or its Location with Location().  Advance() returns true iff there
+// is an element to retrieve.  The client must call Advance() before calling
+// Value() or Location().  The client must call Cancel() if it does not iterate
+// through all elements (i.e. until Advance() returns false).  Advance() may
+// block if an element is not immediately available.
+func (bs *BlobStream) Advance() (ok bool) {
+	if bs.more && bs.err == nil {
+		if !bs.stream.Advance() {
+			bs.err = bs.stream.Err()
+			bs.more = false // no more stream, even if no error
+		} else {
+			bs.key = bs.stream.Key(bs.keyBuf[:])
+			bs.value = bs.stream.Value(bs.valBuf[:])
+			ok = (len(bs.value) >= chunkHashLen) &&
+				(len(bs.key) == len(blobPrefix)+blobIDLen+offsetLen)
+			if ok {
+				var n int
+				bs.loc.Blob = make([]byte, blobIDLen)
+				copy(bs.loc.Blob, bs.key[len(blobPrefix):len(blobPrefix)+blobIDLen])
+				bs.loc.Offset = int64(binary.BigEndian.Uint64(bs.key[len(blobPrefix)+blobIDLen:]))
+				bs.loc.Size, n = binary.Varint(bs.value[chunkHashLen:])
+				ok = (n > 0)
+			}
+			if !ok {
+				bs.err = verror.New(errMalformedBlobEntry, bs.ctx, bs.cm.dir, bs.key, bs.value)
+				bs.stream.Cancel()
+			}
+		}
+	}
+	return ok
+}
+
+// Value() returns the content hash of the chunk staged by
+// Advance().  The returned slice may be a sub-slice of buf if buf is large
+// enough to hold the entire value.  Otherwise, a newly allocated slice will be
+// returned.  It is valid to pass a nil buf.  Value() may panic if Advance()
+// returned false or was not called at all.  Value() does not block.
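+//
+// A sketch of buffer reuse (chunk hashes are 16 bytes in this package):
+//	var hashBuf [16]byte
+//	hash := bs.Value(hashBuf[:])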
+func (bs *BlobStream) Value(buf []byte) (result []byte) {
+	if len(buf) < chunkHashLen {
+		buf = make([]byte, chunkHashLen)
+	}
+	copy(buf, bs.value[:chunkHashLen])
+	return buf[:chunkHashLen]
+}
+
+// Location() returns the Location associated with the chunk staged by
+// Advance().  Location() may panic if Advance() returned false or was not
+// called at all.  Location() does not block.
+func (bs *BlobStream) Location() Location {
+	return bs.loc
+}
+
+// Err() returns a non-nil error iff the stream encountered any errors.  Err()
+// does not block.
+func (bs *BlobStream) Err() error {
+	return bs.err
+}
+
+// Cancel() notifies the stream provider that it can stop producing elements.
+// The client must call Cancel() if it does not iterate through all elements
+// (i.e. until Advance() returns false).  Cancel() is idempotent and can be
+// called concurrently with a goroutine that is iterating via Advance() and
+// Value().  Cancel() causes Advance() to subsequently return false.
+// Cancel() does not block.
+func (bs *BlobStream) Cancel() {
+	bs.stream.Cancel()
+}
diff --git a/x/ref/services/syncbase/localblobstore/chunkmap/chunkmap_test.go b/x/ref/services/syncbase/localblobstore/chunkmap/chunkmap_test.go
new file mode 100644
index 0000000..ab961a7
--- /dev/null
+++ b/x/ref/services/syncbase/localblobstore/chunkmap/chunkmap_test.go
@@ -0,0 +1,227 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// A test for chunkmap.
+package chunkmap_test
+
+import "bytes"
+import "io/ioutil"
+import "math/rand"
+import "os"
+import "testing"
+
+import "v.io/syncbase/x/ref/services/syncbase/localblobstore/chunkmap"
+import "v.io/v23/context"
+
+import "v.io/x/ref/test"
+import _ "v.io/x/ref/runtime/factories/generic"
+
+// id() returns a new random 16-byte vector.
+func id() []byte {
+	v := make([]byte, 16)
+	for i := 0; i != len(v); i++ {
+		v[i] = byte(rand.Int31n(256))
+	}
+	return v
+}
+
+// verifyNoBlob() tests that blob b[blobi] is not present in *cm.
+// callSite is a callsite identifier, output in all error messages.
+func verifyNoBlob(t *testing.T, ctx *context.T, cm *chunkmap.ChunkMap, blobi int, b [][]byte, callSite int) {
+	bs := cm.NewBlobStream(ctx, b[blobi])
+	for i := 0; bs.Advance(); i++ {
+		t.Errorf("chunkmap_test: callsite %d: blob %d: chunk %d: %v",
+			callSite, blobi, i, bs.Value(nil))
+	}
+	if bs.Err() != nil {
+		t.Errorf("chunkmap_test: callsite %d: blob %d: BlobStream.Advance: unexpected error %v",
+			callSite, blobi, bs.Err())
+	}
+}
+
+// verifyBlob() tests that blob b[blobi] in *cm contains the expected chunks from c[].
+// Each blob is expected to have 8 chunks, 0...7, except that b[1] has c[8] instead of c[4] for chunk 4.
+// callSite is a callsite identifier, output in all error messages.
+func verifyBlob(t *testing.T, ctx *context.T, cm *chunkmap.ChunkMap, blobi int, b [][]byte, c [][]byte, callSite int) {
+	var err error
+	var i int
+	bs := cm.NewBlobStream(ctx, b[blobi])
+	for i = 0; bs.Advance(); i++ {
+		chunk := bs.Value(nil)
+		chunki := i
+		if blobi == 1 && i == 4 { // In blob 1, c[4] is replaced by c[8]
+			chunki = 8
+		}
+		if bytes.Compare(c[chunki], chunk) != 0 {
+			t.Errorf("chunkmap_test: callsite %d: blob %d: chunk %d: got %v, expected %v",
+				callSite, blobi, i, chunk, c[chunki])
+		}
+
+		var loc chunkmap.Location
+		loc, err = cm.LookupChunk(ctx, chunk)
+		if err != nil {
+			t.Errorf("chunkmap_test: callsite %d: blob %d: chunk %d: LookupChunk got unexpected error: %v",
+				callSite, blobi, i, err)
+		} else {
+			if i == 4 {
+				if bytes.Compare(loc.Blob, b[blobi]) != 0 {
+					t.Errorf("chunkmap_test: callsite %d: blob %d: chunk %d: Location.Blob got %v, expected %v",
+						callSite, blobi, i, loc.Blob, b[blobi])
+				}
+			} else {
+				if bytes.Compare(loc.Blob, b[0]) != 0 && bytes.Compare(loc.Blob, b[1]) != 0 {
+					t.Errorf("chunkmap_test: callsite %d: blob %d: chunk %d: Location.Blob got %v, expected %v or %v",
+						callSite, blobi, i, loc.Blob, b[0], b[1])
+				}
+			}
+			if loc.Offset != int64(i) {
+				t.Errorf("chunkmap_test: callsite %d: blob %d: chunk %d: Location.Offset got %d, expected %d",
+					callSite, blobi, i, loc.Offset, i)
+			}
+			if loc.Size != 1 {
+				t.Errorf("chunkmap_test: callsite %d: blob %d: chunk %d: Location.Size got %d, expected 1",
+					callSite, blobi, i, loc.Size)
+			}
+
+			loc2 := bs.Location()
+			if bytes.Compare(loc.Blob, loc2.Blob) != 0 || loc.Offset != loc2.Offset || loc.Size != loc2.Size {
+				t.Errorf("chunkmap_test: callsite %d: blob %d: chunk %d: disagreement about location: LookupChunk %v vs BlobStream %v",
+					callSite, blobi, i, loc, loc2)
+			}
+		}
+	}
+	if bs.Err() != nil {
+		t.Errorf("chunkmap_test: callsite %d: blob %d: BlobStream.Err() unexpected error %v",
+			callSite, blobi, bs.Err())
+	}
+	if i != 8 {
+		t.Errorf("chunkmap_test: callsite %d: blob %d: BlobStream.Advance unexpectedly saw %d chunks, expected 8",
+			callSite, blobi, i)
+	}
+}
+
+// TestAddRetrieveAndDelete() tests insertion, retrieval, and deletion of blobs
+// from a ChunkMap.  It's all done in one test case, because one cannot retrieve
+// or delete blobs that have not been inserted.
+func TestAddRetrieveAndDelete(t *testing.T) {
+	ctx, shutdown := test.V23Init()
+	defer shutdown()
+
+	// Make a temporary directory.
+	var err error
+	var testDirName string
+	testDirName, err = ioutil.TempDir("", "chunkmap_test")
+	if err != nil {
+		t.Fatalf("chunkmap_test: can't make tmp directory: %v", err)
+	}
+	defer os.RemoveAll(testDirName)
+
+	// Create a chunkmap.
+	var cm *chunkmap.ChunkMap
+	cm, err = chunkmap.New(ctx, testDirName)
+	if err != nil {
+		t.Fatalf("chunkmap_test: chunkmap.New failed: %v", err)
+	}
+
+	// Two blobs: b[0] and b[1].
+	b := [][]byte{id(), id()}
+
+	// Nine chunks: c[0 .. 8]
+	c := [][]byte{id(), id(), id(), id(), id(), id(), id(), id(), id()}
+
+	// Verify that there are no chunks in blobs initially.
+	verifyNoBlob(t, ctx, cm, 0, b, 0)
+	verifyNoBlob(t, ctx, cm, 1, b, 1)
+
+	// Verify that all chunks have no locations initially.
+	for chunki := range c {
+		_, err = cm.LookupChunk(ctx, c[chunki])
+		if err == nil {
+			t.Errorf("chunkmap_test: chunk %d: LookupChunk: unexpected lack of error", chunki)
+		}
+	}
+
+	// Put chunks 0..7 into blob 0, and chunks 0..3, 8, 5..7 into blob 1.
+	// Each chunk is treated as having size 1.
+	for blobi := 0; blobi != 2; blobi++ {
+		for i := 0; i != 8; i++ {
+			chunki := i
+			if blobi == 1 && i == 4 { // In blob 1, c[4] is replaced by c[8]
+				chunki = 8
+			}
+			err = cm.AssociateChunkWithLocation(ctx, c[chunki],
+				chunkmap.Location{Blob: b[blobi], Offset: int64(i), Size: 1})
+			if err != nil {
+				t.Errorf("chunkmap_test: blob %d: AssociateChunkWithLocation: unexpected error: %v",
+					blobi, err)
+			}
+		}
+	}
+
+	// Verify that the blobs contain the chunks specified.
+	verifyBlob(t, ctx, cm, 0, b, c, 2)
+	verifyBlob(t, ctx, cm, 1, b, c, 3)
+
+	// Verify that all chunks now have locations.
+	for chunki := range c {
+		_, err = cm.LookupChunk(ctx, c[chunki])
+		if err != nil {
+			t.Errorf("chunkmap_test: chunk %d: LookupChunk: unexpected error: %v",
+				chunki, err)
+		}
+	}
+
+	// Delete b[0].
+	err = cm.DeleteBlob(ctx, b[0])
+	if err != nil {
+		t.Errorf("chunkmap_test: blob 0: DeleteBlob: unexpected error: %v", err)
+	}
+
+	// Verify that all chunks except chunk 4 (which was in only blob 0)
+	// still have locations.
+	for chunki := range c {
+		_, err = cm.LookupChunk(ctx, c[chunki])
+		if chunki == 4 {
+			if err == nil {
+				t.Errorf("chunkmap_test: chunk %d: LookupChunk: unexpected lack of error",
+					chunki)
+			}
+		} else {
+			if err != nil {
+				t.Errorf("chunkmap_test: chunk %d: LookupChunk: unexpected error: %v",
+					chunki, err)
+			}
+		}
+	}
+
+	// Verify that blob 0 is gone, but blob 1 remains.
+	verifyNoBlob(t, ctx, cm, 0, b, 4)
+	verifyBlob(t, ctx, cm, 1, b, c, 5)
+
+	// Delete b[1].
+	err = cm.DeleteBlob(ctx, b[1])
+	if err != nil {
+		t.Errorf("chunkmap_test: blob 1: DeleteBlob: unexpected error: %v",
+			err)
+	}
+
+	// Verify that there are no chunks in either blob any more.
+	verifyNoBlob(t, ctx, cm, 0, b, 6)
+	verifyNoBlob(t, ctx, cm, 1, b, 7)
+
+	// Verify that all chunks have no locations once more.
+	for chunki := range c {
+		_, err = cm.LookupChunk(ctx, c[chunki])
+		if err == nil {
+			t.Errorf("chunkmap_test: chunk %d: LookupChunk: unexpected lack of error",
+				chunki)
+		}
+	}
+
+	err = cm.Close()
+	if err != nil {
+		t.Errorf("chunkmap_test: unexpected error closing ChunkMap: %v", err)
+	}
+}