| // Copyright 2015 The Vanadium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| // Package fs_cablobstore implements a content addressable blob store |
| // on top of a file system. It assumes that either os.Link() or |
| // os.Rename() is available. |
| package fs_cablobstore |
| |
| // Internals: |
| // Blobs are partitioned into two types of unit: "fragments" and "chunks". |
| // A fragment is stored in a single file on disc. A chunk is a unit of network |
| // transmission. |
| // |
| // The blobstore consists of a directory with "blob", "cas", "chunk", and |
| // "tmp" subdirectories. |
| // - "tmp" is used for temporary files that are moved into place via |
| // link()/unlink() or rename(), depending on what's available. |
| // - "cas" contains files whose names are content hashes of the files being |
| // named. A few slashes are thrown into the name near the front so that no |
| // single directory gets too large. These files are called "fragments". |
| // - "blob" contains files whose names are random numbers. These names are |
| // visible externally as "blob names". Again, a few slashes are thrown |
| // into the name near the front so that no single directory gets too large. |
| // Each of these files contains a series of lines of the form: |
| // d <size> <offset> <cas-fragment> |
| // followed optionally by a line of the form: |
| // f <md5-hash> |
| // Each "d" line indicates that the next <size> bytes of the blob appear at |
| // <offset> bytes into <cas-fragment>, which is in the "cas" subtree. The |
| // "f" line indicates that the blob is "finalized" and gives its complete |
| // md5 hash. No fragments may be appended to a finalized blob. |
| // - "chunk" contains a store (currently implemented with leveldb) that |
| // maps chunks of blobs to content hashes and vice versa. |
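//
// For example, a finalized two-fragment blob's description file might contain
// (with hypothetical sizes and hashes):
//   d 262144 0 cas/d4/1d/8c/d98f00b204e9800998ecf8427e
//   d 4096 0 cas/9e/10/7d/9d372bb6826bd81d3542a419d6
//   f 0cc175b9c0f1b6a831c399e269772661
// i.e. the blob is the first 262144 bytes of the first fragment, followed by
// the first 4096 bytes of the second, and its complete md5 hash is the value
// on the "f" line.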
| |
| import "bufio" |
| import "bytes" |
| import "crypto/md5" |
| import "fmt" |
| import "hash" |
| import "io" |
| import "io/ioutil" |
| import "math" |
| import "math/rand" |
| import "os" |
| import "path/filepath" |
| import "strconv" |
| import "strings" |
| import "sync" |
| import "time" |
| |
| import "v.io/x/ref/services/syncbase/localblobstore" |
| import "v.io/x/ref/services/syncbase/localblobstore/chunker" |
| import "v.io/x/ref/services/syncbase/localblobstore/blobmap" |
| import "v.io/v23/context" |
| import "v.io/v23/verror" |
| |
| const pkgPath = "v.io/x/ref/services/syncbase/localblobstore/fs_cablobstore" |
| |
| var ( |
| errNotADir = verror.Register(pkgPath+".errNotADir", verror.NoRetry, "{1:}{2:} Not a directory{:_}") |
| errAppendFailed = verror.Register(pkgPath+".errAppendFailed", verror.NoRetry, "{1:}{2:} fs_cablobstore.Append failed{:_}") |
| errMalformedField = verror.Register(pkgPath+".errMalformedField", verror.NoRetry, "{1:}{2:} Malformed field in blob specification{:_}") |
| errAlreadyClosed = verror.Register(pkgPath+".errAlreadyClosed", verror.NoRetry, "{1:}{2:} BlobWriter is already closed{:_}") |
| errBlobAlreadyFinalized = verror.Register(pkgPath+".errBlobAlreadyFinalized", verror.NoRetry, "{1:}{2:} Blob is already finalized{:_}") |
| errIllegalPositionForRead = verror.Register(pkgPath+".errIllegalPositionForRead", verror.NoRetry, "{1:}{2:} BlobReader: illegal position {3} on Blob of size {4}{:_}") |
| errBadSeekWhence = verror.Register(pkgPath+".errBadSeekWhence", verror.NoRetry, "{1:}{2:} BlobReader: Bad value for 'whence' in Seek{:_}") |
| errNegativeSeekPosition = verror.Register(pkgPath+".errNegativeSeekPosition", verror.NoRetry, "{1:}{2:} BlobReader: negative position for Seek: offset {3}, whence {4}{:_}") |
| errBadSizeOrOffset = verror.Register(pkgPath+".errBadSizeOrOffset", verror.NoRetry, "{1:}{2:} Bad size ({3}) or offset ({4}) in blob {5} (size {6}){:_}") |
	errMalformedBlobHash = verror.Register(pkgPath+".errMalformedBlobHash", verror.NoRetry, "{1:}{2:} Blob {3} has a malformed hash{:_}")
| errInvalidBlobName = verror.Register(pkgPath+".errInvalidBlobName", verror.NoRetry, "{1:}{2:} Invalid blob name {3}{:_}") |
| errCantDeleteBlob = verror.Register(pkgPath+".errCantDeleteBlob", verror.NoRetry, "{1:}{2:} Can't delete blob {3}{:_}") |
| errBlobDeleted = verror.Register(pkgPath+".errBlobDeleted", verror.NoRetry, "{1:}{2:} Blob is deleted{:_}") |
	errSizeTooBigForFragment = verror.Register(pkgPath+".errSizeTooBigForFragment", verror.NoRetry, "{1:}{2:} writing blob {3}, size too big for fragment{:_}")
| errStreamCancelled = verror.Register(pkgPath+".errStreamCancelled", verror.NoRetry, "{1:}{2:} Advance() called on cancelled stream{:_}") |
| ) |
| |
| // For the moment, we disallow others from accessing the tree where blobs are |
| // stored. We could in the future relax this to 0711/0755, and 0644. |
| const dirPermissions = 0700 |
| const filePermissions = 0600 |
| |
| // Subdirectories of the blobstore's tree |
| const ( |
| blobDir = "blob" // Subdirectory where blobs are indexed by blob id. |
| casDir = "cas" // Subdirectory where fragments are indexed by content hash. |
| chunkDir = "chunk" // Subdirectory where chunks are indexed by content hash. |
| tmpDir = "tmp" // Subdirectory where temporary files are created. |
| ) |
| |
| // An FsCaBlobStore represents a simple, content-addressable store. |
| type FsCaBlobStore struct { |
| rootName string // The name of the root of the store. |
| bm *blobmap.BlobMap // Mapping from chunks to blob locations and vice versa. |
| |
| // mu protects fields below, plus most fields in each blobDesc when used from a BlobWriter. |
| mu sync.Mutex |
| activeDesc []*blobDesc // The blob descriptors in use by active BlobReaders and BlobWriters. |
| toDelete []*map[string]bool // Sets of items that active GC threads are about to delete. (Pointers to maps, to allow pointer comparison.) |
| } |
| |
// hashToFileName() returns the file name for the given binary hash, placed
// under the specified prefix. Requires len(hash)==16. An md5 hash is suitable.
| func hashToFileName(prefix string, hash []byte) string { |
| return filepath.Join(prefix, |
| fmt.Sprintf("%02x", hash[0]), |
| fmt.Sprintf("%02x", hash[1]), |
| fmt.Sprintf("%02x", hash[2]), |
| fmt.Sprintf("%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", |
| hash[3], |
| hash[4], hash[5], hash[6], hash[7], |
| hash[8], hash[9], hash[10], hash[11], |
| hash[12], hash[13], hash[14], hash[15])) |
| } |
| |
| // fileNameToHash() converts a file name in the format generated by |
| // hashToFileName(prefix, ...) to a vector of 16 bytes. If the string is |
| // malformed, the nil slice is returned. |
| func fileNameToHash(prefix string, s string) []byte { |
| idStr := strings.TrimPrefix(filepath.ToSlash(s), prefix+"/") |
| hash := make([]byte, 16, 16) |
| n, err := fmt.Sscanf(idStr, "%02x/%02x/%02x/%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", |
| &hash[0], &hash[1], &hash[2], &hash[3], |
| &hash[4], &hash[5], &hash[6], &hash[7], |
| &hash[8], &hash[9], &hash[10], &hash[11], |
| &hash[12], &hash[13], &hash[14], &hash[15]) |
| if n != 16 || err != nil { |
| hash = nil |
| } |
| return hash |
| } |
| |
| // newBlobName() returns a new random name for a blob. |
| func newBlobName() string { |
| return filepath.Join(blobDir, |
| fmt.Sprintf("%02x", rand.Int31n(256)), |
| fmt.Sprintf("%02x", rand.Int31n(256)), |
| fmt.Sprintf("%02x", rand.Int31n(256)), |
| fmt.Sprintf("%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", |
| rand.Int31n(256), |
| rand.Int31n(256), rand.Int31n(256), rand.Int31n(256), rand.Int31n(256), |
| rand.Int31n(256), rand.Int31n(256), rand.Int31n(256), rand.Int31n(256), |
| rand.Int31n(256), rand.Int31n(256), rand.Int31n(256), rand.Int31n(256))) |
| } |
| |
| // hashToString() returns a string representation of the hash. |
| // Requires len(hash)==16. An md5 hash is suitable. |
| func hashToString(hash []byte) string { |
| return fmt.Sprintf("%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", |
| hash[0], hash[1], hash[2], hash[3], |
| hash[4], hash[5], hash[6], hash[7], |
| hash[8], hash[9], hash[10], hash[11], |
| hash[12], hash[13], hash[14], hash[15]) |
| } |
| |
| // stringToHash() converts a string in the format generated by hashToString() |
| // to a vector of 16 bytes. If the string is malformed, the nil slice is |
| // returned. |
| func stringToHash(s string) []byte { |
| hash := make([]byte, 16, 16) |
| n, err := fmt.Sscanf(s, "%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", |
| &hash[0], &hash[1], &hash[2], &hash[3], |
| &hash[4], &hash[5], &hash[6], &hash[7], |
| &hash[8], &hash[9], &hash[10], &hash[11], |
| &hash[12], &hash[13], &hash[14], &hash[15]) |
| if n != 16 || err != nil { |
| hash = nil |
| } |
| return hash |
| } |
| |
| // Create() returns a pointer to an FsCaBlobStore stored in the file system at |
| // "rootName". If the directory rootName does not exist, it is created. |
| func Create(ctx *context.T, stEngine, rootName string) (fscabs *FsCaBlobStore, err error) { |
| dir := []string{tmpDir, casDir, chunkDir, blobDir} |
| for i := 0; i != len(dir) && err == nil; i++ { |
| fullName := filepath.Join(rootName, dir[i]) |
| os.MkdirAll(fullName, dirPermissions) |
| var fi os.FileInfo |
| fi, err = os.Stat(fullName) |
| if err == nil && !fi.IsDir() { |
| err = verror.New(errNotADir, ctx, fullName) |
| } |
| } |
| var bm *blobmap.BlobMap |
| if err == nil { |
| bm, err = blobmap.New(ctx, stEngine, filepath.Join(rootName, chunkDir)) |
| } |
| if err == nil { |
| fscabs = new(FsCaBlobStore) |
| fscabs.rootName = rootName |
| fscabs.bm = bm |
| } |
| return fscabs, err |
| } |
| |
// Close() closes the FsCaBlobStore.
| func (fscabs *FsCaBlobStore) Close() error { |
| return fscabs.bm.Close() |
| } |
| |
| // Root() returns the name of the root directory where *fscabs is stored. |
| func (fscabs *FsCaBlobStore) Root() string { |
| return fscabs.rootName |
| } |
| |
| // DeleteBlob() deletes the named blob from *fscabs. |
| func (fscabs *FsCaBlobStore) DeleteBlob(ctx *context.T, blobName string) (err error) { |
| // Disallow deletions of things outside the blob tree, or that may contain "..". |
| // For simplicity, the code currently disallows '.'. |
| blobID := fileNameToHash(blobDir, blobName) |
| if blobID == nil || strings.IndexByte(blobName, '.') != -1 { |
| err = verror.New(errInvalidBlobName, ctx, blobName) |
| } else { |
| err = os.Remove(filepath.Join(fscabs.rootName, blobName)) |
| if err != nil { |
| err = verror.New(errCantDeleteBlob, ctx, blobName, err) |
| } else { |
| err = fscabs.bm.DeleteBlob(ctx, blobID) |
| } |
| } |
| return err |
| } |
| |
| // ----------------------------------------------------------- |
| |
| // A file encapsulates both an os.File and a bufio.Writer on that file. |
| type file struct { |
| fh *os.File |
| writer *bufio.Writer |
| } |
| |
| // newFile() returns a *file containing fh and a bufio.Writer on that file, if |
| // err is nil. |
| func newFile(fh *os.File, err error) (*file, error) { |
| var f *file |
| if err == nil { |
| f = new(file) |
| f.fh = fh |
| f.writer = bufio.NewWriter(f.fh) |
| } |
| return f, err |
| } |
| |
| // newTempFile() returns a *file on a new temporary file created in the |
| // directory dir. |
| func newTempFile(ctx *context.T, dir string) (*file, error) { |
| return newFile(ioutil.TempFile(dir, "newfile")) |
| } |
| |
| // close() flushes buffers (if err==nil initially) and closes the file, |
| // returning its name. |
| func (f *file) close(ctx *context.T, err error) (string, error) { |
| name := f.fh.Name() |
| // Flush the data out to disc and close the file. |
| if err == nil { |
| err = f.writer.Flush() |
| } |
| if err == nil { |
| err = f.fh.Sync() |
| } |
| err2 := f.fh.Close() |
| if err == nil { |
| err = err2 |
| } |
| return name, err |
| } |
| |
| // closeAndRename() calls f.close(), and if err==nil initially and no new |
| // errors are seen, renames the file to newName. |
| func (f *file) closeAndRename(ctx *context.T, newName string, err error) error { |
| var oldName string |
| oldName, err = f.close(ctx, err) |
| if err == nil { // if temp file written successfully... |
| // Link or rename the file into place, hoping at least one is |
| // supported on this file system. |
| os.MkdirAll(filepath.Dir(newName), dirPermissions) |
| err = os.Link(oldName, newName) |
| if err == nil { |
| os.Remove(oldName) |
| } else { |
| err = os.Rename(oldName, newName) |
| } |
| } |
| if err != nil { |
| os.Remove(oldName) |
| } |
| return err |
| } |
| |
| // ----------------------------------------------------------- |
| |
| // addFragment() ensures that the store *fscabs contains a fragment comprising |
| // the catenation of the byte vectors named by item[..].block and the contents |
| // of the files named by item[..].fileName. The block field is ignored if |
| // fileName!="". The fragment is not physically added if already present. |
| // The fragment is added to the fragment list of the descriptor *desc. |
| func (fscabs *FsCaBlobStore) addFragment(ctx *context.T, extHasher hash.Hash, |
| desc *blobDesc, item ...localblobstore.BlockOrFile) (fileName string, size int64, err error) { |
| |
| hasher := md5.New() |
| var buf []byte |
| var fileHandleList []*os.File |
| |
| // Hash the inputs. |
| for i := 0; i != len(item) && err == nil; i++ { |
| if len(item[i].FileName) != 0 { |
| if buf == nil { |
| buf = make([]byte, 8192, 8192) |
| fileHandleList = make([]*os.File, 0, len(item)) |
| } |
| var fileHandle *os.File |
| fileHandle, err = os.Open(filepath.Join(fscabs.rootName, item[i].FileName)) |
| if err == nil { |
| fileHandleList = append(fileHandleList, fileHandle) |
| at := item[i].Offset |
| toRead := item[i].Size |
| var haveRead int64 |
| for err == nil && (toRead == -1 || haveRead < toRead) { |
| var n int |
| n, err = fileHandle.ReadAt(buf, at) |
| if err == nil { |
| if toRead != -1 && int64(n)+haveRead > toRead { |
| n = int(toRead - haveRead) |
| } |
| haveRead += int64(n) |
| at += int64(n) |
| size += int64(n) |
| hasher.Write(buf[0:n]) // Cannot fail; see Hash interface. |
| extHasher.Write(buf[0:n]) |
| } |
| } |
| if err == io.EOF { |
| if toRead == -1 || haveRead == toRead { |
| err = nil // The loop read all that was asked; EOF is a possible outcome. |
| } else { // The loop read less than was asked; request must have been too big. |
| err = verror.New(errSizeTooBigForFragment, ctx, desc.name, item[i].FileName) |
| } |
| } |
| } |
| } else { |
| hasher.Write(item[i].Block) // Cannot fail; see Hash interface. |
| extHasher.Write(item[i].Block) |
| size += int64(len(item[i].Block)) |
| } |
| } |
| |
	// Compute the hash, and form the file name in the repository.
| hash := hasher.Sum(nil) |
| relFileName := hashToFileName(casDir, hash) |
| absFileName := filepath.Join(fscabs.rootName, relFileName) |
| |
| // Add the fragment's name to *desc's fragments so the garbage |
| // collector will not delete it. |
| fscabs.mu.Lock() |
| desc.fragment = append(desc.fragment, blobFragment{ |
| pos: desc.size, |
| size: size, |
| offset: 0, |
| fileName: relFileName}) |
| fscabs.mu.Unlock() |
| |
| // If the file does not already exist, ... |
| if _, statErr := os.Stat(absFileName); err == nil && os.IsNotExist(statErr) { |
| // ... try to create it by writing to a temp file and renaming. |
| var t *file |
| t, err = newTempFile(ctx, filepath.Join(fscabs.rootName, tmpDir)) |
| if err == nil { |
| // Copy the byte-sequences and input files to the temp file. |
| j := 0 |
| for i := 0; i != len(item) && err == nil; i++ { |
| if len(item[i].FileName) != 0 { |
| at := item[i].Offset |
| toRead := item[i].Size |
| var haveRead int64 |
| for err == nil && (toRead == -1 || haveRead < toRead) { |
| var n int |
| n, err = fileHandleList[j].ReadAt(buf, at) |
| if err == nil { |
| if toRead != -1 && int64(n)+haveRead > toRead { |
| n = int(toRead - haveRead) |
| } |
| haveRead += int64(n) |
| at += int64(n) |
| _, err = t.writer.Write(buf[0:n]) |
| } |
| } |
| if err == io.EOF { // EOF is the expected outcome. |
| err = nil |
| } |
| j++ |
| } else { |
| _, err = t.writer.Write(item[i].Block) |
| } |
| } |
| err = t.closeAndRename(ctx, absFileName, err) |
| } |
| } // else file already exists, nothing more to do. |
| |
| for i := 0; i != len(fileHandleList); i++ { |
| fileHandleList[i].Close() |
| } |
| |
| if err != nil { |
| err = verror.New(errAppendFailed, ctx, fscabs.rootName, err) |
| // Remove the entry added to fragment list above. |
| fscabs.mu.Lock() |
| desc.fragment = desc.fragment[0 : len(desc.fragment)-1] |
| fscabs.mu.Unlock() |
| } else { // commit the change by updating the size |
| fscabs.mu.Lock() |
| desc.size += size |
| desc.cv.Broadcast() // Tell blobmap BlobReader there's more to read. |
| fscabs.mu.Unlock() |
| } |
| |
| return relFileName, size, err |
| } |
| |
| // A blobFragment represents a vector of bytes and its position within a blob. |
| type blobFragment struct { |
| pos int64 // position of this fragment within its containing blob. |
| size int64 // size of this fragment. |
| offset int64 // offset within fileName. |
| fileName string // name of file describing this fragment. |
| } |
| |
| // A blobDesc is the in-memory representation of a blob. |
| type blobDesc struct { |
| activeDescIndex int // Index into fscabs.activeDesc if refCount>0; under fscabs.mu. |
| refCount int // Reference count; under fscabs.mu. |
| |
| name string // Name of the blob. |
| |
| // The following fields are modified under fscabs.mu and in BlobWriter |
| // owner's thread; they may be read by GC (when obtained from |
| // fscabs.activeDesc) and the chunk writer under fscabs.mu. In the |
| // BlobWriter owner's thread, reading does not require a lock, but |
| // writing does. In other contexts (BlobReader, or a desc that has |
| // just been allocated by getBlob()), no locking is needed. |
| |
| fragment []blobFragment // All the fragments in this blob |
| size int64 // Total size of the blob. |
| finalized bool // Whether the blob has been finalized. |
| // A finalized blob has a valid hash field, and no new bytes may be added |
| // to it. A well-formed hash has 16 bytes. |
| hash []byte |
| |
| openWriter bool // Whether this descriptor is being written by an open BlobWriter. |
| cv *sync.Cond // signalled when a BlobWriter writes or closes. |
| } |
| |
| // isBeingDeleted() returns whether fragment fragName is about to be deleted |
| // by the garbage collector. Requires fscabs.mu held. |
| func (fscabs *FsCaBlobStore) isBeingDeleted(fragName string) (beingDeleted bool) { |
| for i := 0; i != len(fscabs.toDelete) && !beingDeleted; i++ { |
| _, beingDeleted = (*(fscabs.toDelete[i]))[fragName] |
| } |
| return beingDeleted |
| } |
| |
| // descRef() increments the reference count of *desc and returns whether |
| // successful. It may fail if the fragments referenced by the descriptor are |
| // being deleted by the garbage collector. |
| func (fscabs *FsCaBlobStore) descRef(desc *blobDesc) bool { |
| beingDeleted := false |
| fscabs.mu.Lock() |
| if desc.refCount == 0 { |
| // On the first reference, check whether the fragments are |
| // being deleted, and if not, add *desc to the |
| // fscabs.activeDesc vector. |
| for i := 0; i != len(desc.fragment) && !beingDeleted; i++ { |
| beingDeleted = fscabs.isBeingDeleted(desc.fragment[i].fileName) |
| } |
| if !beingDeleted { |
| desc.activeDescIndex = len(fscabs.activeDesc) |
| fscabs.activeDesc = append(fscabs.activeDesc, desc) |
| } |
| } |
| if !beingDeleted { |
| desc.refCount++ |
| } |
| fscabs.mu.Unlock() |
| return !beingDeleted |
| } |
| |
| // descUnref() decrements the reference count of *desc if desc!=nil; if that |
| // removes the last reference, *desc is removed from the fscabs.activeDesc |
| // vector. |
| func (fscabs *FsCaBlobStore) descUnref(desc *blobDesc) { |
| if desc != nil { |
| fscabs.mu.Lock() |
| desc.refCount-- |
| if desc.refCount < 0 { |
| panic("negative reference count") |
| } else if desc.refCount == 0 { |
| // Remove desc from fscabs.activeDesc by moving the |
| // last entry in fscabs.activeDesc to desc's slot. |
| n := len(fscabs.activeDesc) |
| lastDesc := fscabs.activeDesc[n-1] |
| lastDesc.activeDescIndex = desc.activeDescIndex |
| fscabs.activeDesc[desc.activeDescIndex] = lastDesc |
| fscabs.activeDesc = fscabs.activeDesc[0 : n-1] |
| desc.activeDescIndex = -1 |
| } |
| fscabs.mu.Unlock() |
| } |
| } |
| |
| // getBlob() returns the in-memory blob descriptor for the named blob. |
| func (fscabs *FsCaBlobStore) getBlob(ctx *context.T, blobName string) (desc *blobDesc, err error) { |
| slashBlobName := filepath.ToSlash(blobName) |
| if !strings.HasPrefix(slashBlobName, blobDir+"/") || strings.IndexByte(blobName, '.') != -1 { |
| err = verror.New(errInvalidBlobName, ctx, blobName) |
| } else { |
| absBlobName := filepath.Join(fscabs.rootName, blobName) |
| var fh *os.File |
| fh, err = os.Open(absBlobName) |
| if err == nil { |
| desc = new(blobDesc) |
| desc.activeDescIndex = -1 |
| desc.name = blobName |
| desc.cv = sync.NewCond(&fscabs.mu) |
| scanner := bufio.NewScanner(fh) |
			for err == nil && scanner.Scan() {
| field := strings.Split(scanner.Text(), " ") |
| if len(field) == 4 && field[0] == "d" { |
| var fragSize int64 |
| var fragOffset int64 |
| fragSize, err = strconv.ParseInt(field[1], 0, 64) |
| if err == nil { |
| fragOffset, err = strconv.ParseInt(field[2], 0, 64) |
| } |
| if err == nil { |
| // No locking needed here because desc |
| // is newly allocated and not yet passed to descRef(). |
| desc.fragment = append(desc.fragment, |
| blobFragment{ |
| fileName: field[3], |
| pos: desc.size, |
| size: fragSize, |
| offset: fragOffset}) |
| } |
| desc.size += fragSize |
| } else if len(field) == 2 && field[0] == "f" { |
| desc.hash = stringToHash(field[1]) |
| desc.finalized = true |
| if desc.hash == nil { |
| err = verror.New(errMalformedBlobHash, ctx, blobName, field[1]) |
| } |
| } else if len(field) > 0 && len(field[0]) == 1 && "a" <= field[0] && field[0] <= "z" { |
| // unrecognized line, reserved for extensions: ignore. |
| } else { |
					err = verror.New(errMalformedField, ctx, scanner.Text())
| } |
| } |
			if err == nil {
				err = scanner.Err()
			}
| fh.Close() |
| } |
| } |
| // Ensure that we return either a properly referenced desc, or nil. |
| if err != nil { |
| desc = nil |
| } else if !fscabs.descRef(desc) { |
| err = verror.New(errBlobDeleted, ctx, blobName) |
| desc = nil |
| } |
| return desc, err |
| } |
| |
| // ----------------------------------------------------------- |
| |
| // A BlobWriter allows a blob to be written. If a blob has not yet been |
| // finalized, it also allows that blob to be extended. A BlobWriter may be |
| // created with NewBlobWriter(), and should be closed with Close() or |
| // CloseWithoutFinalize(). |
| type BlobWriter struct { |
| // The BlobWriter exists within a particular FsCaBlobStore and context.T |
| fscabs *FsCaBlobStore |
| ctx *context.T |
| |
| desc *blobDesc // Description of the blob being written. |
| f *file // The file being written. |
| hasher hash.Hash // Running hash of blob. |
| |
| // Fields to allow the BlobMap to be written. |
| csBr *BlobReader // Reader over the blob that's currently being written. |
| cs *chunker.Stream // Stream of chunks derived from csBr |
| csErr chan error // writeBlobMap() sends its result here; Close/CloseWithoutFinalize receives it. |
| } |
| |
| // NewBlobWriter() returns a pointer to a newly allocated BlobWriter on |
| // a newly created blob. If "name" is non-empty, it is used to name |
| // the blob, and it must be in the format of a name returned by this |
| // interface (probably by another instance on another device). |
| // Otherwise, a new name is created, which can be found using |
| // the Name() method. It is an error to attempt to overwrite a blob |
| // that already exists in this blob store. BlobWriters should not be |
| // used concurrently by multiple threads. The returned handle should |
| // be closed with either the Close() or CloseWithoutFinalize() method |
| // to avoid leaking file handles. |
| func (fscabs *FsCaBlobStore) NewBlobWriter(ctx *context.T, name string) (localblobstore.BlobWriter, error) { |
| var bw *BlobWriter |
| if name == "" { |
| name = newBlobName() |
| } |
| fileName := filepath.Join(fscabs.rootName, name) |
| os.MkdirAll(filepath.Dir(fileName), dirPermissions) |
| f, err := newFile(os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_EXCL, filePermissions)) |
| if err == nil { |
| bw = new(BlobWriter) |
| bw.fscabs = fscabs |
| bw.ctx = ctx |
| bw.desc = new(blobDesc) |
| bw.desc.activeDescIndex = -1 |
| bw.desc.name = name |
| bw.desc.cv = sync.NewCond(&fscabs.mu) |
| bw.desc.openWriter = true |
| bw.f = f |
| bw.hasher = md5.New() |
| if !fscabs.descRef(bw.desc) { |
| // Can't happen; descriptor refers to no fragments. |
| panic(verror.New(errBlobDeleted, ctx, bw.desc.name)) |
| } |
| // Write the chunks of this blob into the BlobMap, as they are |
| // written by this writer. |
| bw.forkWriteBlobMap() |
| } |
| return bw, err |
| } |
| |
// ResumeBlobWriter() returns a pointer to a newly allocated BlobWriter on an
// existing, but not yet finalized, blob name.
| func (fscabs *FsCaBlobStore) ResumeBlobWriter(ctx *context.T, blobName string) (localblobstore.BlobWriter, error) { |
| var err error |
| var bw *BlobWriter |
| var desc *blobDesc |
| desc, err = fscabs.getBlob(ctx, blobName) |
| if err == nil && desc.finalized { |
| err = verror.New(errBlobAlreadyFinalized, ctx, blobName) |
| } else if err == nil { |
| bw = new(BlobWriter) |
| bw.fscabs = fscabs |
| bw.ctx = ctx |
| bw.desc = desc |
| bw.desc.openWriter = true |
| fileName := filepath.Join(fscabs.rootName, bw.desc.name) |
		bw.f, err = newFile(os.OpenFile(fileName, os.O_WRONLY|os.O_APPEND, filePermissions))
| bw.hasher = md5.New() |
| // Add the existing fragments to the running hash. |
| // The descRef's ref count is incremented here to compensate |
| // for the decrement it will receive in br.Close(), below. |
| if !fscabs.descRef(bw.desc) { |
| // Can't happen; descriptor's ref count was already |
| // non-zero. |
| panic(verror.New(errBlobDeleted, ctx, fileName)) |
| } |
| br := fscabs.blobReaderFromDesc(ctx, bw.desc, dontWaitForWriter) |
| buf := make([]byte, 8192, 8192) |
| for err == nil { |
| var n int |
| n, err = br.Read(buf) |
| bw.hasher.Write(buf[0:n]) |
| } |
| br.Close() |
| if err == io.EOF { // EOF is expected. |
| err = nil |
| } |
| if err == nil { |
| // Write the chunks of this blob into the BlobMap, as |
| // they are written by this writer. |
| bw.forkWriteBlobMap() |
| } |
| } |
| return bw, err |
| } |
| |
| // forkWriteBlobMap() creates a new thread to run writeBlobMap(). It adds |
| // the chunks written to *bw to the blob store's BlobMap. The caller is |
| // expected to call joinWriteBlobMap() at some later point. |
| func (bw *BlobWriter) forkWriteBlobMap() { |
| // The descRef's ref count is incremented here to compensate |
| // for the decrement it will receive in br.Close() in joinWriteBlobMap. |
| if !bw.fscabs.descRef(bw.desc) { |
| // Can't happen; descriptor's ref count was already non-zero. |
| panic(verror.New(errBlobDeleted, bw.ctx, bw.desc.name)) |
| } |
| bw.csBr = bw.fscabs.blobReaderFromDesc(bw.ctx, bw.desc, waitForWriter) |
| bw.cs = chunker.NewStream(bw.ctx, &chunker.DefaultParam, bw.csBr) |
| bw.csErr = make(chan error) |
| go bw.writeBlobMap() |
| } |
| |
// insertChunk() inserts chunk into the blob store's BlobMap, associating it
// with the specified byte offset in the blob blobID that *bw is writing. The
// byte offset of the next chunk is returned.
| func (bw *BlobWriter) insertChunk(blobID []byte, chunkHash []byte, offset int64, size int64) (int64, error) { |
| err := bw.fscabs.bm.AssociateChunkWithLocation(bw.ctx, chunkHash[:], |
| blobmap.Location{BlobID: blobID, Offset: offset, Size: size}) |
| if err != nil { |
| bw.cs.Cancel() |
| } |
| return offset + size, err |
| } |
| |
// writeBlobMap() iterates over the chunks in stream bw.cs, and associates each
// one with the blob being written.
| func (bw *BlobWriter) writeBlobMap() { |
| var err error |
| var offset int64 |
| blobID := fileNameToHash(blobDir, bw.desc.name) |
| // Associate each chunk only after the next chunk has been seen (or |
| // the blob finalized), to avoid recording an artificially short chunk |
| // at the end of a partial transfer. |
| var chunkHash [md5.Size]byte |
| var chunkLen int64 |
| if bw.cs.Advance() { |
| chunk := bw.cs.Value() |
| // Record the hash and size, since chunk's underlying buffer |
| // may be reused by the next call to Advance(). |
| chunkHash = md5.Sum(chunk) |
| chunkLen = int64(len(chunk)) |
| for bw.cs.Advance() { |
| offset, err = bw.insertChunk(blobID, chunkHash[:], offset, chunkLen) |
| chunk = bw.cs.Value() |
| chunkHash = md5.Sum(chunk) |
| chunkLen = int64(len(chunk)) |
| } |
| } |
| if err == nil { |
| err = bw.cs.Err() |
| } |
| bw.fscabs.mu.Lock() |
| if err == nil && chunkLen != 0 && bw.desc.finalized { |
| offset, err = bw.insertChunk(blobID, chunkHash[:], offset, chunkLen) |
| } |
| bw.fscabs.mu.Unlock() |
| bw.csErr <- err // wake joinWriteBlobMap() |
| } |
| |
| // joinWriteBlobMap waits for the completion of the thread forked by forkWriteBlobMap(). |
| // It returns when the chunks in the blob have been written to the blob store's BlobMap. |
| func (bw *BlobWriter) joinWriteBlobMap(err error) error { |
| err2 := <-bw.csErr // read error from end of writeBlobMap() |
| if err == nil { |
| err = err2 |
| } |
| bw.csBr.Close() |
| return err |
| } |
| |
| // Close() finalizes *bw, and indicates that the client will perform no further |
| // append operations on *bw. Any internal open file handles are closed. |
| func (bw *BlobWriter) Close() (err error) { |
| if bw.f == nil { |
| err = verror.New(errAlreadyClosed, bw.ctx, bw.desc.name) |
| } else if bw.desc.finalized { |
| err = verror.New(errBlobAlreadyFinalized, bw.ctx, bw.desc.name) |
| } else { |
| h := bw.hasher.Sum(nil) |
| _, err = fmt.Fprintf(bw.f.writer, "f %s\n", hashToString(h)) // finalize |
| _, err = bw.f.close(bw.ctx, err) |
| bw.f = nil |
| bw.fscabs.mu.Lock() |
| bw.desc.finalized = true |
| bw.desc.openWriter = false |
| bw.desc.cv.Broadcast() // Tell blobmap BlobReader that writing has ceased. |
| bw.fscabs.mu.Unlock() |
| err = bw.joinWriteBlobMap(err) |
| bw.fscabs.descUnref(bw.desc) |
| } |
| return err |
| } |
| |
| // CloseWithoutFinalize() indicates that the client will perform no further |
| // append operations on *bw, but does not finalize the blob. Any internal open |
| // file handles are closed. Clients are expected to need this operation |
| // infrequently. |
| func (bw *BlobWriter) CloseWithoutFinalize() (err error) { |
| if bw.f == nil { |
| err = verror.New(errAlreadyClosed, bw.ctx, bw.desc.name) |
| } else { |
| bw.fscabs.mu.Lock() |
| bw.desc.openWriter = false |
| bw.desc.cv.Broadcast() // Tell blobmap BlobReader that writing has ceased. |
| bw.fscabs.mu.Unlock() |
| _, err = bw.f.close(bw.ctx, err) |
| bw.f = nil |
| err = bw.joinWriteBlobMap(err) |
| bw.fscabs.descUnref(bw.desc) |
| } |
| return err |
| } |
| |
| // AppendFragment() appends a fragment to the blob being written by *bw, where |
| // the fragment is composed of the byte vectors described by the elements of |
| // item[]. The fragment is copied into the blob store. |
| func (bw *BlobWriter) AppendFragment(item ...localblobstore.BlockOrFile) (err error) { |
| if bw.f == nil { |
| panic("fs_cablobstore.BlobWriter programming error: AppendFragment() after Close()") |
| } |
| var fragmentName string |
| var size int64 |
| fragmentName, size, err = bw.fscabs.addFragment(bw.ctx, bw.hasher, bw.desc, item...) |
| if err == nil { |
| _, err = fmt.Fprintf(bw.f.writer, "d %d %d %s\n", size, 0 /*offset*/, fragmentName) |
| } |
| if err == nil { |
| err = bw.f.writer.Flush() |
| } |
| return err |
| } |
| |
| // AppendBlob() adds a (substring of a) pre-existing blob to the blob being |
| // written by *bw. The fragments of the pre-existing blob are not physically |
| // copied; they are referenced by both blobs. |
| func (bw *BlobWriter) AppendBlob(blobName string, size int64, offset int64) (err error) { |
| if bw.f == nil { |
| panic("fs_cablobstore.BlobWriter programming error: AppendBlob() after Close()") |
| } |
| var desc *blobDesc |
| desc, err = bw.fscabs.getBlob(bw.ctx, blobName) |
| origSize := bw.desc.size |
| if err == nil { |
| if size == -1 { |
| size = desc.size - offset |
| } |
| if offset < 0 || desc.size < offset+size { |
| err = verror.New(errBadSizeOrOffset, bw.ctx, size, offset, blobName, desc.size) |
| } |
| for i := 0; i != len(desc.fragment) && err == nil && size > 0; i++ { |
| if desc.fragment[i].size <= offset { |
| offset -= desc.fragment[i].size |
| } else { |
| consume := desc.fragment[i].size - offset |
| if size < consume { |
| consume = size |
| } |
| _, err = fmt.Fprintf(bw.f.writer, "d %d %d %s\n", |
| consume, offset+desc.fragment[i].offset, desc.fragment[i].fileName) |
| if err == nil { |
| // Add fragment so garbage collector can see it. |
| // The garbage collector cannot be |
| // about to delete the fragment, because |
| // getBlob() already checked for that |
| // above, and kept a reference. |
| bw.fscabs.mu.Lock() |
| bw.desc.fragment = append(bw.desc.fragment, blobFragment{ |
| pos: bw.desc.size, |
| size: consume, |
| offset: offset + desc.fragment[i].offset, |
| fileName: desc.fragment[i].fileName}) |
| bw.desc.size += consume |
| bw.desc.cv.Broadcast() // Tell blobmap BlobReader there's more to read. |
| bw.fscabs.mu.Unlock() |
| } |
| offset = 0 |
| size -= consume |
| } |
| } |
| bw.fscabs.descUnref(desc) |
| // Add the new fragments to the running hash. |
| if !bw.fscabs.descRef(bw.desc) { |
| // Can't happen; descriptor's ref count was already |
| // non-zero. |
| panic(verror.New(errBlobDeleted, bw.ctx, blobName)) |
| } |
| br := bw.fscabs.blobReaderFromDesc(bw.ctx, bw.desc, dontWaitForWriter) |
| if err == nil { |
| _, err = br.Seek(origSize, 0) |
| } |
| buf := make([]byte, 8192, 8192) |
| for err == nil { |
| var n int |
| n, err = br.Read(buf) |
| bw.hasher.Write(buf[0:n]) // Cannot fail; see Hash interface. |
| } |
| br.Close() |
| if err == io.EOF { // EOF is expected. |
| err = nil |
| } |
| if err == nil { |
| err = bw.f.writer.Flush() |
| } |
| } |
| return err |
| } |
| |
| // IsFinalized() returns whether *bw has been finalized. |
| func (bw *BlobWriter) IsFinalized() bool { |
| return bw.desc.finalized |
| } |
| |
| // Size() returns *bw's size. |
| func (bw *BlobWriter) Size() int64 { |
| return bw.desc.size |
| } |
| |
| // Name() returns *bw's name. |
| func (bw *BlobWriter) Name() string { |
| return bw.desc.name |
| } |
| |
| // Hash() returns *bw's hash, reflecting the bytes written so far. |
| func (bw *BlobWriter) Hash() []byte { |
| return bw.hasher.Sum(nil) |
| } |
| |
| // ----------------------------------------------------------- |
| |
| // A BlobReader allows a blob to be read using the standard ReadAt(), Read(), |
| // and Seek() calls. A BlobReader can be created with NewBlobReader(), and |
| // should be closed with the Close() method to avoid leaking file handles. |
| type BlobReader struct { |
| // The BlobReader exists within a particular FsCaBlobStore and context.T. |
| fscabs *FsCaBlobStore |
| ctx *context.T |
| |
| desc *blobDesc // A description of the blob being read. |
| waitForWriter bool // whether this reader should wait for a concurrent BlobWriter |
| |
| pos int64 // The next position we will read from (used by Read/Seek, not ReadAt). |
| |
| // The fields below represent a cached open fragment desc.fragment[fragmentIndex]. |
| fragmentIndex int // -1 or 0 <= fragmentIndex < len(desc.fragment). |
| fh *os.File // non-nil iff fragmentIndex != -1. |
| } |
| |
// Constants to make blobReaderFromDesc() invocations more readable.
| const ( |
| dontWaitForWriter = false |
| waitForWriter = true |
| ) |
| |
| // blobReaderFromDesc() returns a pointer to a newly allocated BlobReader given |
| // a pre-existing blobDesc. If waitForWriter is true, the reader will wait for |
| // any BlobWriter to finish writing the part of the blob the reader is trying |
| // to read. |
| func (fscabs *FsCaBlobStore) blobReaderFromDesc(ctx *context.T, desc *blobDesc, waitForWriter bool) *BlobReader { |
| br := new(BlobReader) |
| br.fscabs = fscabs |
| br.ctx = ctx |
| br.fragmentIndex = -1 |
| br.desc = desc |
| br.waitForWriter = waitForWriter |
| return br |
| } |
| |
| // NewBlobReader() returns a pointer to a newly allocated BlobReader on the |
| // specified blobName. BlobReaders should not be used concurrently by multiple |
| // threads. Returned handles should be closed with Close(). |
| func (fscabs *FsCaBlobStore) NewBlobReader(ctx *context.T, blobName string) (br localblobstore.BlobReader, err error) { |
| var desc *blobDesc |
| desc, err = fscabs.getBlob(ctx, blobName) |
| if err == nil { |
| br = fscabs.blobReaderFromDesc(ctx, desc, dontWaitForWriter) |
| } |
| return br, err |
| } |
| |
| // closeInternal() closes any open file handles within *br. |
| func (br *BlobReader) closeInternal() { |
| if br.fh != nil { |
| br.fh.Close() |
| br.fh = nil |
| } |
| br.fragmentIndex = -1 |
| } |
| |
| // Close() indicates that the client will perform no further operations on *br. |
| // It closes any open file handles within a BlobReader. |
| func (br *BlobReader) Close() error { |
| br.closeInternal() |
| br.fscabs.descUnref(br.desc) |
| return nil |
| } |
| |
| // findFragment() returns the index of the first element of fragment[] that may |
| // contain "offset", based on the "pos" fields of each element. |
| // Requires that fragment[] be sorted on the "pos" fields of the elements. |
| func findFragment(fragment []blobFragment, offset int64) int { |
| lo := 0 |
| hi := len(fragment) |
| for lo < hi { |
| mid := (lo + hi) >> 1 |
| if offset < fragment[mid].pos { |
| hi = mid |
| } else { |
| lo = mid + 1 |
| } |
| } |
| if lo > 0 { |
| lo-- |
| } |
| return lo |
| } |
| |
// waitUntilAvailable() waits until position pos within *br is available for
// reading, if this reader waits for writers. The position is available when:
// - *br is on an already written blob, or
// - *br is on a blob being written whose writer has been closed, or whose
//   writes have passed position pos.
| // The value pos==math.MaxInt64 can be used to mean "until the writer is closed". |
| // Requires br.fscabs.mu held. |
| func (br *BlobReader) waitUntilAvailable(pos int64) { |
| for br.waitForWriter && br.desc.openWriter && br.desc.size < pos { |
| br.desc.cv.Wait() |
| } |
| } |
| |
| // ReadAt() fills b[] with up to len(b) bytes of data starting at position "at" |
| // within the blob that *br indicates, and returns the number of bytes read. |
| func (br *BlobReader) ReadAt(b []byte, at int64) (n int, err error) { |
| br.fscabs.mu.Lock() |
| br.waitUntilAvailable(at + int64(len(b))) |
| i := findFragment(br.desc.fragment, at) |
| if i < len(br.desc.fragment) && at <= br.desc.size { |
| fragmenti := br.desc.fragment[i] // copy fragment data to allow releasing lock |
| br.fscabs.mu.Unlock() |
| if i != br.fragmentIndex { |
| br.closeInternal() |
| } |
| if br.fragmentIndex == -1 { |
| br.fh, err = os.Open(filepath.Join(br.fscabs.rootName, fragmenti.fileName)) |
| if err == nil { |
| br.fragmentIndex = i |
| } else { |
| br.closeInternal() |
| } |
| } |
| var offset int64 = at - fragmenti.pos + fragmenti.offset |
| consume := fragmenti.size - (at - fragmenti.pos) |
| if int64(len(b)) < consume { |
| consume = int64(len(b)) |
| } |
| if br.fh != nil { |
| n, err = br.fh.ReadAt(b[0:consume], offset) |
| } else if err == nil { |
| panic("failed to open blob fragment") |
| } |
| br.fscabs.mu.Lock() |
| // Return io.EOF if the Read reached the end of the last |
| // fragment, but not if it's merely the end of some interior |
| // fragment or the blob is still being extended. |
| if int64(n)+at >= br.desc.size && !(br.waitForWriter && br.desc.openWriter) { |
| if err == nil { |
| err = io.EOF |
| } |
| } else if err == io.EOF { |
| err = nil |
| } |
| } else if at == br.desc.size { // Reading at the end of the file, past the last fragment. |
| err = io.EOF |
| } else { |
		err = verror.New(errIllegalPositionForRead, br.ctx, at, br.desc.size)
| } |
| br.fscabs.mu.Unlock() |
| return n, err |
| } |
| |
| // Read() fills b[] with up to len(b) bytes of data starting at the current |
| // seek position of *br within the blob that *br indicates, and then both |
| // returns the number of bytes read and advances *br's seek position by that |
| // amount. |
| func (br *BlobReader) Read(b []byte) (n int, err error) { |
| n, err = br.ReadAt(b, br.pos) |
| if err == nil { |
| br.pos += int64(n) |
| } |
| return n, err |
| } |
| |
| // Seek() sets the seek position of *br to offset if whence==0, |
| // offset+current_seek_position if whence==1, and offset+end_of_blob if |
| // whence==2, and then returns the current seek position. |
| func (br *BlobReader) Seek(offset int64, whence int) (result int64, err error) { |
| br.fscabs.mu.Lock() |
| if whence == 0 { |
| result = offset |
| } else if whence == 1 { |
| result = offset + br.pos |
| } else if whence == 2 { |
| br.waitUntilAvailable(math.MaxInt64) |
| result = offset + br.desc.size |
| } else { |
| err = verror.New(errBadSeekWhence, br.ctx, whence) |
| result = br.pos |
| } |
| if result < 0 { |
| err = verror.New(errNegativeSeekPosition, br.ctx, offset, whence) |
| result = br.pos |
| } else if result > br.desc.size { |
| err = verror.New(errIllegalPositionForRead, br.ctx, result, br.desc.size) |
| result = br.pos |
| } else if err == nil { |
| br.pos = result |
| } |
| br.fscabs.mu.Unlock() |
| return result, err |
| } |
| |
| // IsFinalized() returns whether *br has been finalized. |
| func (br *BlobReader) IsFinalized() bool { |
| br.fscabs.mu.Lock() |
| br.waitUntilAvailable(math.MaxInt64) |
| finalized := br.desc.finalized |
| br.fscabs.mu.Unlock() |
| return finalized |
| } |
| |
| // Size() returns *br's size. |
| func (br *BlobReader) Size() int64 { |
| br.fscabs.mu.Lock() |
| br.waitUntilAvailable(math.MaxInt64) |
| size := br.desc.size |
| br.fscabs.mu.Unlock() |
| return size |
| } |
| |
| // Name() returns *br's name. |
| func (br *BlobReader) Name() string { |
| return br.desc.name |
| } |
| |
| // Hash() returns *br's hash. It may be nil if the blob is not finalized. |
| func (br *BlobReader) Hash() []byte { |
| br.fscabs.mu.Lock() |
| br.waitUntilAvailable(math.MaxInt64) |
| hash := br.desc.hash |
| br.fscabs.mu.Unlock() |
| return hash |
| } |
| |
| // ----------------------------------------------------------- |
| |
| // A dirListing is a list of names in a directory, plus a position, which |
| // indexes the last item in nameList that has been processed. |
| type dirListing struct { |
| pos int // Current position in nameList; may be -1 at the start of iteration. |
| nameList []string // List of directory entries. |
| } |
| |
| // An FsCasIter represents an iterator that allows the client to enumerate all |
// the blobs or fragments in an FsCaBlobStore.
| type FsCasIter struct { |
| fscabs *FsCaBlobStore // The parent FsCaBlobStore. |
| err error // If non-nil, the error that terminated iteration. |
| stack []dirListing // The stack of dirListings leading to the current entry. |
| ctx *context.T // context passed to ListBlobIds() or ListCAIds() |
| |
| mu sync.Mutex // Protects cancelled. |
| cancelled bool // Whether Cancel() has been called. |
| } |
| |
| // ListBlobIds() returns an iterator that can be used to enumerate the blobs in |
| // an FsCaBlobStore. Expected use is: |
| // fscabsi := fscabs.ListBlobIds(ctx) |
| // for fscabsi.Advance() { |
| // // Process fscabsi.Value() here. |
| // } |
| // if fscabsi.Err() != nil { |
| // // The loop terminated early due to an error. |
| // } |
| func (fscabs *FsCaBlobStore) ListBlobIds(ctx *context.T) localblobstore.Stream { |
| stack := make([]dirListing, 1) |
| stack[0] = dirListing{pos: -1, nameList: []string{blobDir}} |
| return &FsCasIter{fscabs: fscabs, stack: stack, ctx: ctx} |
| } |
| |
| // ListCAIds() returns an iterator that can be used to enumerate the |
| // content-addressable fragments in an FsCaBlobStore. |
| // Expected use is: |
| // fscabsi := fscabs.ListCAIds(ctx) |
| // for fscabsi.Advance() { |
| // // Process fscabsi.Value() here. |
| // } |
| // if fscabsi.Err() != nil { |
| // // The loop terminated early due to an error. |
| // } |
| func (fscabs *FsCaBlobStore) ListCAIds(ctx *context.T) localblobstore.Stream { |
| stack := make([]dirListing, 1) |
| stack[0] = dirListing{pos: -1, nameList: []string{casDir}} |
| return &FsCasIter{fscabs: fscabs, stack: stack, ctx: ctx} |
| } |
| |
| // isCancelled() returns whether Cancel() has been called. |
| func (fscabsi *FsCasIter) isCancelled() bool { |
| fscabsi.mu.Lock() |
| cancelled := fscabsi.cancelled |
| fscabsi.mu.Unlock() |
| return cancelled |
| } |
| |
| // Advance() stages an item so that it may be retrieved via Value. Returns |
| // true iff there is an item to retrieve. Advance must be called before Value |
| // is called. |
| func (fscabsi *FsCasIter) Advance() (advanced bool) { |
| stack := fscabsi.stack |
| err := fscabsi.err |
| |
| for err == nil && !advanced && len(stack) != 0 && !fscabsi.isCancelled() { |
| last := len(stack) - 1 |
| stack[last].pos++ |
| if stack[last].pos == len(stack[last].nameList) { |
| stack = stack[0:last] |
| fscabsi.stack = stack |
| } else { |
| fullName := filepath.Join(fscabsi.fscabs.rootName, fscabsi.Value()) |
| var fi os.FileInfo |
| fi, err = os.Lstat(fullName) |
| if err != nil { |
| // error: nothing to do |
| } else if fi.IsDir() { |
| var dirHandle *os.File |
| dirHandle, err = os.Open(fullName) |
| if err == nil { |
| var nameList []string |
| nameList, err = dirHandle.Readdirnames(0) |
| dirHandle.Close() |
| stack = append(stack, dirListing{pos: -1, nameList: nameList}) |
| fscabsi.stack = stack |
| last = len(stack) - 1 |
| } |
| } else { |
| advanced = true |
| } |
| } |
| } |
| |
| if fscabsi.isCancelled() { |
| if err == nil { |
| fscabsi.err = verror.New(errStreamCancelled, fscabsi.ctx) |
| } |
| advanced = false |
| } |
| |
| fscabsi.err = err |
| return advanced |
| } |
| |
| // Value() returns the item that was staged by Advance. May panic if Advance |
| // returned false or was not called. Never blocks. |
| func (fscabsi *FsCasIter) Value() (name string) { |
| stack := fscabsi.stack |
| if fscabsi.err == nil && len(stack) != 0 && stack[0].pos >= 0 { |
| name = stack[0].nameList[stack[0].pos] |
| for i := 1; i != len(stack); i++ { |
| name = filepath.Join(name, stack[i].nameList[stack[i].pos]) |
| } |
| } |
| return name |
| } |
| |
| // Err() returns any error encountered by Advance. Never blocks. |
| func (fscabsi *FsCasIter) Err() error { |
| return fscabsi.err |
| } |
| |
| // Cancel() indicates that the iteration stream should terminate early. |
| // Never blocks. May be called concurrently with other methods on fscabsi. |
| func (fscabsi *FsCasIter) Cancel() { |
| fscabsi.mu.Lock() |
| fscabsi.cancelled = true |
| fscabsi.mu.Unlock() |
| } |
| |
| // ----------------------------------------------------------- |
| |
| // An errorChunkStream is a localblobstore.ChunkStream that yields an error. |
| type errorChunkStream struct { |
| err error |
| } |
| |
| func (*errorChunkStream) Advance() bool { return false } |
| func (*errorChunkStream) Value([]byte) []byte { return nil } |
| func (ecs *errorChunkStream) Err() error { return ecs.err } |
| func (*errorChunkStream) Cancel() {} |
| |
| // BlobChunkStream() returns a ChunkStream that can be used to read the ordered |
| // list of content hashes of chunks in blob blobName. It is expected that this |
| // list will be presented to RecipeFromChunks() on another device, to create a |
| // recipe for transmitting the blob efficiently to that other device. |
| func (fscabs *FsCaBlobStore) BlobChunkStream(ctx *context.T, blobName string) (cs localblobstore.ChunkStream) { |
| blobID := fileNameToHash(blobDir, blobName) |
| if blobID == nil { |
| cs = &errorChunkStream{err: verror.New(errInvalidBlobName, ctx, blobName)} |
| } else { |
| cs = fscabs.bm.NewChunkStream(ctx, blobID) |
| } |
| return cs |
| } |
| |
| // ----------------------------------------------------------- |
| |
| // LookupChunk returns the location of a chunk with the specified chunk hash |
| // within the store. |
| func (fscabs *FsCaBlobStore) LookupChunk(ctx *context.T, chunkHash []byte) (loc localblobstore.Location, err error) { |
| var chunkMapLoc blobmap.Location |
| chunkMapLoc, err = fscabs.bm.LookupChunk(ctx, chunkHash) |
| if err == nil { |
| loc.BlobName = hashToFileName(blobDir, chunkMapLoc.BlobID) |
| loc.Size = chunkMapLoc.Size |
| loc.Offset = chunkMapLoc.Offset |
| } |
| return loc, err |
| } |
| |
| // ----------------------------------------------------------- |
| |
| // A RecipeStream implements localblobstore.RecipeStream. It allows the client |
| // to iterate over the recipe steps to recreate a blob identified by a stream |
| // of chunk hashes (from chunkStream), but using parts of blobs in the current |
| // blob store where possible. |
| type RecipeStream struct { |
| fscabs *FsCaBlobStore |
| ctx *context.T |
| |
| chunkStream localblobstore.ChunkStream // the stream of chunks in the blob |
| pendingChunkBuf [16]byte // a buffer for pendingChunk |
	pendingChunk []byte // the last unprocessed chunk hash read from chunkStream, or nil if none
| step localblobstore.RecipeStep // the recipe step to be returned by Value() |
| mu sync.Mutex // protects cancelled |
| cancelled bool // whether Cancel() has been called |
| } |
| |
| // RecipeStreamFromChunkStream() returns a pointer to a RecipeStream that allows |
| // the client to iterate over each RecipeStep needed to create the blob formed |
| // by the chunks in chunkStream. |
| func (fscabs *FsCaBlobStore) RecipeStreamFromChunkStream(ctx *context.T, chunkStream localblobstore.ChunkStream) localblobstore.RecipeStream { |
| rs := new(RecipeStream) |
| rs.fscabs = fscabs |
| rs.ctx = ctx |
| rs.chunkStream = chunkStream |
| return rs |
| } |
| |
| // isCancelled() returns whether rs.Cancel() has been called. |
| func (rs *RecipeStream) isCancelled() bool { |
| rs.mu.Lock() |
| cancelled := rs.cancelled |
| rs.mu.Unlock() |
| return cancelled |
| } |
| |
| // Advance() stages an item so that it may be retrieved via Value(). |
| // Returns true iff there is an item to retrieve. Advance() must be |
| // called before Value() is called. The caller is expected to read |
| // until Advance() returns false, or to call Cancel(). |
| func (rs *RecipeStream) Advance() (ok bool) { |
| if rs.pendingChunk == nil && rs.chunkStream.Advance() { |
| rs.pendingChunk = rs.chunkStream.Value(rs.pendingChunkBuf[:]) |
| } |
| for !ok && rs.pendingChunk != nil && !rs.isCancelled() { |
| var err error |
| var loc0 blobmap.Location |
| loc0, err = rs.fscabs.bm.LookupChunk(rs.ctx, rs.pendingChunk) |
| if err == nil { |
| blobName := hashToFileName(blobDir, loc0.BlobID) |
| var blobDesc *blobDesc |
| if blobDesc, err = rs.fscabs.getBlob(rs.ctx, blobName); err != nil { |
| // The BlobMap contained a reference to a |
| // deleted blob. Delete the reference in the |
| // BlobMap; the next loop iteration will |
| // consider the chunk again. |
| rs.fscabs.bm.DeleteBlob(rs.ctx, loc0.BlobID) |
| } else { |
| rs.fscabs.descUnref(blobDesc) |
| // The chunk is in a known blob. Combine |
| // contiguous chunks into a single recipe |
| // entry. |
| rs.pendingChunk = nil // consumed |
| for rs.pendingChunk == nil && rs.chunkStream.Advance() { |
| rs.pendingChunk = rs.chunkStream.Value(rs.pendingChunkBuf[:]) |
| var loc blobmap.Location |
| loc, err = rs.fscabs.bm.LookupChunk(rs.ctx, rs.pendingChunk) |
| if err == nil && bytes.Compare(loc0.BlobID, loc.BlobID) == 0 && loc.Offset == loc0.Offset+loc0.Size { |
| loc0.Size += loc.Size |
| rs.pendingChunk = nil // consumed |
| } |
| } |
| rs.step = localblobstore.RecipeStep{Blob: blobName, Offset: loc0.Offset, Size: loc0.Size} |
| ok = true |
| } |
| } else { // The chunk is not in the BlobMap; yield a single chunk hash. |
| rs.step = localblobstore.RecipeStep{Chunk: rs.pendingChunk} |
| rs.pendingChunk = nil // consumed |
| ok = true |
| } |
| } |
| return ok && !rs.isCancelled() |
| } |
| |
| // Value() returns the item that was staged by Advance(). May panic if |
| // Advance() returned false or was not called. Never blocks. |
| func (rs *RecipeStream) Value() localblobstore.RecipeStep { |
| return rs.step |
| } |
| |
| // Err() returns any error encountered by Advance. Never blocks. |
| func (rs *RecipeStream) Err() error { |
| // There are no errors to return here. The errors encountered in |
| // Advance() are expected and recoverable. |
| return nil |
| } |
| |
| // Cancel() indicates that the client wishes to cease reading from the stream. |
| // It causes the next call to Advance() to return false. Never blocks. |
| // It may be called concurrently with other calls on the stream. |
| func (rs *RecipeStream) Cancel() { |
| rs.mu.Lock() |
| rs.cancelled = true |
| rs.mu.Unlock() |
| rs.chunkStream.Cancel() |
| } |
| |
| // ----------------------------------------------------------- |
| |
| // gcTemp() attempts to delete files in dirName older than threshold. |
| // Errors are ignored. |
| func gcTemp(dirName string, threshold time.Time) { |
| fh, err := os.Open(dirName) |
| if err == nil { |
| fi, _ := fh.Readdir(0) |
| fh.Close() |
| for i := 0; i < len(fi); i++ { |
| if fi[i].ModTime().Before(threshold) { |
| os.Remove(filepath.Join(dirName, fi[i].Name())) |
| } |
| } |
| } |
| } |
| |
| // GC() removes old temp files and content-addressed blocks that are no longer |
| // referenced by any blob. It may be called concurrently with other calls to |
| // GC(), and with uses of BlobReaders and BlobWriters. |
| func (fscabs *FsCaBlobStore) GC(ctx *context.T) (err error) { |
| // Remove old temporary files. |
| gcTemp(filepath.Join(fscabs.rootName, tmpDir), time.Now().Add(-10*time.Hour)) |
| |
	// Add a key to caSet for each content-addressed fragment in *fscabs.
| caSet := make(map[string]bool) |
| caIter := fscabs.ListCAIds(ctx) |
| for caIter.Advance() { |
| caSet[caIter.Value()] = true |
| } |
| err = caIter.Err() |
| |
| // cmBlobs maps the names of blobs found in the BlobMap to their IDs. |
| // (The IDs can be derived from the names; the map is really being used |
| // to record which blobs exist, and the value merely avoids repeated |
| // conversions.) |
| cmBlobs := make(map[string][]byte) |
| if err == nil { |
		// Record all the blobs known to the BlobMap.
| bs := fscabs.bm.NewBlobStream(ctx) |
| for bs.Advance() { |
| blobID := bs.Value(nil) |
| cmBlobs[hashToFileName(blobDir, blobID)] = blobID |
| } |
| } |
| |
| if err == nil { |
| // Remove from cmBlobs all extant blobs, and remove from |
| // caSet all their fragments. |
| blobIter := fscabs.ListBlobIds(ctx) |
| for blobIter.Advance() { |
| var blobDesc *blobDesc |
| if blobDesc, err = fscabs.getBlob(ctx, blobIter.Value()); err == nil { |
| delete(cmBlobs, blobDesc.name) |
| for i := range blobDesc.fragment { |
| delete(caSet, blobDesc.fragment[i].fileName) |
| } |
| fscabs.descUnref(blobDesc) |
| } |
| } |
| } |
| |
| if err == nil { |
| // Remove all blobs still mentioned in cmBlobs from the BlobMap; |
| // these are the ones that no longer exist in the blobs directory. |
| for _, blobID := range cmBlobs { |
| err = fscabs.bm.DeleteBlob(ctx, blobID) |
| if err != nil { |
| break |
| } |
| } |
| } |
| |
| if err == nil { |
| // Remove from caSet all fragments referenced by open BlobReaders and |
| // BlobWriters. Advertise to new readers and writers which blobs are |
| // about to be deleted. |
| fscabs.mu.Lock() |
| for _, desc := range fscabs.activeDesc { |
| for i := range desc.fragment { |
| delete(caSet, desc.fragment[i].fileName) |
| } |
| } |
| fscabs.toDelete = append(fscabs.toDelete, &caSet) |
| fscabs.mu.Unlock() |
| |
| // Delete the things that still remain in caSet; they are no longer |
| // referenced. |
| for caName := range caSet { |
| os.Remove(filepath.Join(fscabs.rootName, caName)) |
| } |
| |
| // Stop advertising what's been deleted. |
| fscabs.mu.Lock() |
| n := len(fscabs.toDelete) |
| var i int |
| // We require that &caSet still be in the list. |
| for i = 0; fscabs.toDelete[i] != &caSet; i++ { |
| } |
| fscabs.toDelete[i] = fscabs.toDelete[n-1] |
| fscabs.toDelete = fscabs.toDelete[0 : n-1] |
| fscabs.mu.Unlock() |
| } |
| return err |
| } |