blob: f39f64485a335303dfd5b9a9ba022ebc7a23c1c5 [file] [log] [blame]
package impl
import (
"crypto/md5"
"encoding/hex"
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"regexp"
"sync"
"syscall"
"veyron2/ipc"
"veyron2/services/mgmt/content"
"veyron2/vlog"
)
// invoker holds the state of a content manager invocation.
type invoker struct {
// root is the directory at which the content manager namespace is
// mounted.
root string
// depth determines the depth of the directory hierarchy that the
// content manager uses to organize the content in the local file
// system. There is a trade-off here: smaller values lead to faster
// access, while higher values allow the performance to scale to
// large content collections. The number should be a value
// between 0 and (md5.Size - 1), inclusive.
//
// Note that the cardinality of each level (except the leaf level)
// is at most 256. If you expect to have X total content items, and
// you want to limit directories to at most Y entries (because of
// filesystem limitations), then you should set depth to at least
// log_256(X/Y). For example, using depth = 3 with a local
// filesystem that can handle up to 1,000 entries per directory
// before its performance degrades allows the content manager to
// store roughly 16 billion (256^3 * 1,000) objects.
depth int
// fs is a lock used to atomically modify the contents of the
// local filesystem used for storing the content.
fs *sync.Mutex
// suffix is the suffix of the current invocation that is assumed to
// be used as a relative veyron name to identify content.
suffix string
}
// Errors returned by the content manager methods. They intentionally carry
// no detail about the underlying file system or stream failure.
var (
// errContentExists is the error Upload reports for content that is
// already present.
errContentExists = errors.New("content already exists")
// errMissingChecksum is not referenced by the code visible in this file.
errMissingChecksum = errors.New("missing checksum")
// errInvalidChecksum is returned by Download when the MD5 checksum of the
// data read from disk does not match the invocation suffix.
errInvalidChecksum = errors.New("invalid checksum")
// errInvalidSuffix is returned when the invocation suffix is not
// acceptable for the requested method.
errInvalidSuffix = errors.New("invalid suffix")
// errOperationFailed is returned when a file system or stream operation
// fails.
errOperationFailed = errors.New("operation failed")
)
// newInvoker builds the invoker that serves a single invocation.
//
// root is the directory holding the content, depth is the number of
// directory levels used to shard it, fs is the lock guarding file system
// modifications, and suffix is the name the invocation was addressed to.
//
// It panics when depth lies outside the range [0, md5.Size-1].
func newInvoker(root string, depth int, fs *sync.Mutex, suffix string) *invoker {
	const minDepth, maxDepth = 0, md5.Size - 1
	if depth < minDepth || depth > maxDepth {
		panic(fmt.Sprintf("Depth should be a value between %v and %v, got %v.", minDepth, maxDepth, depth))
	}
	inv := new(invoker)
	inv.root = root
	inv.depth = depth
	inv.fs = fs
	inv.suffix = suffix
	return inv
}
var (
	// reg matches strings that consist solely of lowercase hexadecimal
	// digits. The pattern is anchored at both ends: an unanchored pattern
	// would accept any string that merely contains one hex digit, including
	// names with path separators or ".." components.
	reg = regexp.MustCompile("^[a-f0-9]+$")
)

// isValid reports whether suffix is a well-formed content name, that is, a
// lowercase hex-encoded MD5 checksum of exactly hex.EncodedLen(md5.Size)
// characters. Enforcing the exact length keeps malformed names away from the
// file system and guarantees that the suffix is long enough to be split into
// directory levels.
func isValid(suffix string) bool {
	return len(suffix) == hex.EncodedLen(md5.Size) && reg.MatchString(suffix)
}
// CONTENT INTERFACE IMPLEMENTATION

// bufferLength is the size, in bytes, of the buffer that Download uses to
// read content from the local file system before sending it on the stream.
const bufferLength = 1024
// generateDir returns the relative directory in which content with the given
// checksum is stored. Each of the i.depth directory levels is named after the
// next two characters of the checksum, so with depth 2 the checksum
// "d41d8c..." maps to "d4/1d".
//
// If the checksum is too short to provide two characters for every level,
// only the levels that can be fully named are generated; slicing past the end
// of the string would otherwise panic.
func (i *invoker) generateDir(checksum string) string {
	levels := i.depth
	if available := len(checksum) / 2; levels > available {
		levels = available
	}
	dir := ""
	for j := 0; j < levels; j++ {
		dir = filepath.Join(dir, checksum[j*2:(j+1)*2])
	}
	return dir
}
// Delete removes the content identified by the invocation suffix, together
// with every directory between the content and the root that is left empty
// by the removal. The root directory itself is never removed.
//
// It returns errInvalidSuffix if the suffix is not a valid content name and
// errOperationFailed if the content or an empty parent directory cannot be
// removed.
func (i *invoker) Delete(context ipc.ServerContext) error {
	vlog.VI(0).Infof("%v.Delete()", i.suffix)
	if !isValid(i.suffix) {
		return errInvalidSuffix
	}
	// filepath.Join and filepath.Dir return cleaned paths, so the root has to
	// be cleaned as well. Otherwise a root such as "dir/" or "./dir" would
	// never compare equal below and the loop would remove the root and keep
	// climbing through its ancestors.
	root := filepath.Clean(i.root)
	filename := filepath.Join(root, i.generateDir(i.suffix), i.suffix)
	i.fs.Lock()
	defer i.fs.Unlock()
	if err := os.Remove(filename); err != nil {
		return errOperationFailed
	}
	// Walk back towards the root, pruning directories until one is found
	// that still has entries.
	for dir := filepath.Dir(filename); dir != root; dir = filepath.Dir(dir) {
		if err := os.Remove(dir); err != nil {
			// POSIX allows rmdir to report a non-empty directory as either
			// ENOTEMPTY or EEXIST.
			if pe, ok := err.(*os.PathError); ok && (pe.Err == syscall.ENOTEMPTY || pe.Err == syscall.EEXIST) {
				break
			}
			return errOperationFailed
		}
	}
	return nil
}
// Download streams the content identified by the invocation suffix to the
// client in pieces of at most bufferLength bytes.
//
// It returns errInvalidSuffix if the suffix is not a valid content name,
// errOperationFailed if the content cannot be opened, read or sent, and
// errInvalidChecksum if, once everything has been sent, the MD5 checksum of
// the data read from disk differs from the suffix.
func (i *invoker) Download(context ipc.ServerContext, stream content.ContentServiceDownloadStream) error {
	vlog.VI(0).Infof("%v.Download()", i.suffix)
	if !isValid(i.suffix) {
		return errInvalidSuffix
	}
	digest := md5.New()
	name := filepath.Join(i.root, i.generateDir(i.suffix), i.suffix)
	src, openErr := os.Open(name)
	if openErr != nil {
		return errOperationFailed
	}
	defer src.Close()
	chunk := make([]byte, bufferLength)
readLoop:
	for {
		n, readErr := src.Read(chunk)
		switch {
		case readErr != nil && readErr != io.EOF:
			return errOperationFailed
		case n == 0:
			// Nothing left to read.
			break readLoop
		}
		data := chunk[:n]
		if sendErr := stream.Send(data); sendErr != nil {
			return errOperationFailed
		}
		digest.Write(data)
	}
	// Verify that what was read from disk is what the name promises.
	if actual := hex.EncodeToString(digest.Sum(nil)); actual != i.suffix {
		return errInvalidChecksum
	}
	return nil
}
// Upload receives content from the client stream, stores it under the root
// in a location derived from its MD5 checksum, and returns the hex-encoded
// checksum, which is the name under which the content can later be
// downloaded or deleted.
//
// The data is first written to a temporary file in the root directory and
// only renamed into place once it has been received completely, so partially
// uploaded content is never visible under its final name. The temporary file
// is removed on every failure path.
//
// It returns errInvalidSuffix if the invocation suffix is not empty,
// errContentExists if content with the same checksum is already stored, and
// errOperationFailed if receiving or storing the content fails.
func (i *invoker) Upload(context ipc.ServerContext, stream content.ContentServiceUploadStream) (string, error) {
	vlog.VI(0).Infof("%v.Upload()", i.suffix)
	if i.suffix != "" {
		return "", errInvalidSuffix
	}
	h := md5.New()
	file, err := ioutil.TempFile(i.root, "")
	if err != nil {
		return "", errOperationFailed
	}
	// Until the temporary file has been renamed into place, every way out of
	// this function must close it and remove it from the root directory.
	closed, committed := false, false
	defer func() {
		if !closed {
			file.Close()
		}
		if !committed {
			os.Remove(file.Name())
		}
	}()
	for {
		chunk, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			return "", errOperationFailed
		}
		if _, err := file.Write(chunk); err != nil {
			return "", errOperationFailed
		}
		h.Write(chunk)
	}
	// Close before renaming so that a write error surfacing at close time is
	// not lost and the content is never published with an open handle.
	closed = true
	if err := file.Close(); err != nil {
		return "", errOperationFailed
	}
	checksum := hex.EncodeToString(h.Sum(nil))
	dir := filepath.Join(i.root, i.generateDir(checksum))
	i.fs.Lock()
	defer i.fs.Unlock()
	if err := os.MkdirAll(dir, 0700); err != nil {
		return "", errOperationFailed
	}
	filename := filepath.Join(dir, checksum)
	switch _, err := os.Stat(filename); {
	case err == nil:
		// The destination is already occupied by content with this checksum.
		return "", errContentExists
	case !os.IsNotExist(err):
		// The destination could not be examined at all.
		return "", errOperationFailed
	}
	// Note that this rename operation requires only a metadata update
	// (as opposed to copy and delete) because file.Name() and filename
	// share the same mount point.
	if err := os.Rename(file.Name(), filename); err != nil {
		return "", errOperationFailed
	}
	committed = true
	return checksum, nil
}