blob: 84976fa988c742f0daf8c73626416a90829a7def [file] [log] [blame]
// The implementation of the binary repository interface stores
// objects identified by veyron name suffixes using the local file
// system. Given a veyron name suffix, the implementation computes an
// MD5 hash of the suffix and generates the following path in the
// local filesystem: /<root>/<dir_1>/.../<dir_n>/<hash>. The root and
// the directory depth are parameters of the implementation. The
// contents of the directory include the checksum and data for the
// object and each of its individual parts:
//
// checksum
// data
// <part_1>/checksum
// <part_1>/data
// ...
// <part_n>/checksum
// <part_n>/data
//
// TODO(jsimsa): Add an "fsck" method that cleans up existing on-disk
// repository and provide a command-line flag that identifies whether
// fsck should run when new repository server process starts up.
package impl
import (
"crypto/md5"
"encoding/hex"
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"strconv"
"syscall"
"veyron2/ipc"
"veyron2/services/mgmt/binary"
"veyron2/services/mgmt/repository"
"veyron2/vlog"
)
// state holds the state shared across different binary repository
// invocations: the depth of the on-disk directory hierarchy and the
// directory in which that hierarchy is rooted.
type state struct {
// depth determines the depth of the directory hierarchy that the
// binary repository uses to organize binaries in the local file
// system. There is a trade-off here: smaller values lead to faster
// access, while higher values allow the performance to scale to
// larger collections of binaries. The number should be a value
// between 0 and (md5.Size - 1).
//
// Each level of the hierarchy is named after one byte (two hex
// digits) of the MD5 hash of the veyron name suffix, so the
// cardinality of each level (except the leaf level) is at most 256.
// If you expect to have X total binary items, and you want to limit
// directories to at most Y entries (because of filesystem
// limitations), then you should set depth to at least log_256(X/Y).
// For example, using depth = 3 with a local filesystem that can
// handle up to 1,000 entries per directory before its performance
// degrades allows the binary repository to store roughly 16 billion
// objects (256^3 * 1,000).
depth int
// root identifies the local filesystem directory in which the
// binary repository stores its objects.
root string
}
// invoker holds the state of a binary repository invocation. It is
// created by newInvoker for a single veyron name suffix.
type invoker struct {
// path is the local filesystem path to the object identified by the
// veyron name suffix. It is the directory that holds the part
// directories as well as the aggregate checksum and data files.
path string
// state holds the state shared across different binary repository
// invocations.
state *state
// suffix is the suffix of the current invocation that is assumed to
// be used as a relative veyron name to identify a binary. It is the
// input of the hash from which path is derived and is also used to
// prefix log messages.
suffix string
}
// Names of the files that the implementation keeps inside the
// directory of a binary or of one of its parts.
const (
// checksum is the name of the file that stores the hex-encoded MD5
// hash of the data; its existence marks the binary (part) as present.
checksum = "checksum"
// data is the name of the file that stores the content itself.
data = "data"
// lock is the name of the file that Upload creates exclusively in a
// part directory to detect a concurrent upload of the same part.
lock = "lock"
)
// Errors returned by the binary repository implementation.
var (
// errExist is returned by Create when the binary already exists and
// by Upload when the part already has a checksum.
errExist = errors.New("binary already exists")
// errNotExist is returned when the binary (part) has no checksum
// file or, for Delete, when the binary directory does not exist.
errNotExist = errors.New("binary does not exist")
// errInProgress is returned by Upload when the lock file of the part
// already exists.
errInProgress = errors.New("identical upload already in progress")
// errInvalidParts is returned by Create when fewer than one part is
// requested.
errInvalidParts = errors.New("invalid number of parts")
// errOperationFailed is returned when a local filesystem or stream
// operation fails; the underlying error is logged, not returned.
errOperationFailed = errors.New("operation failed")
)
// MissingPart is the PartInfo value that Stat reports for a part whose
// checksum or data file is not present on disk.
//
// TODO(jsimsa): When VDL supports composite literal constants, remove
// this definition.
var MissingPart = binary.PartInfo{
Checksum: binary.MissingChecksum,
Size: binary.MissingSize,
}
// newInvoker is the invoker factory. It derives the local filesystem
// path of the object identified by the veyron name suffix from the
// hex-encoded MD5 hash of the suffix: the first state.depth bytes of
// the hash (two hex digits each) name the nested directories under
// state.root, and the complete hash names the leaf.
func newInvoker(state *state, suffix string) *invoker {
	sum := md5.Sum([]byte(suffix))
	digest := hex.EncodeToString(sum[:])

	// Collect the path elements: root, one element per hierarchy
	// level, and finally the full hash.
	elems := []string{state.root}
	for level := 0; level < state.depth; level++ {
		elems = append(elems, digest[2*level:2*level+2])
	}
	elems = append(elems, digest)

	inv := new(invoker)
	inv.path = filepath.Join(elems...)
	inv.state = state
	inv.suffix = suffix
	return inv
}
// BINARY INTERFACE IMPLEMENTATION

// bufferLength is the size, in bytes, of the buffer that consolidate
// and Download use when reading data files.
const bufferLength = 4096
// consolidate checks if all parts of a binary have been uploaded and
// if so, creates the aggregate binary and checksum.
//
// It returns nil without doing any work when the aggregate checksum
// already exists or when at least one part has no checksum yet. When
// all parts are present, their data files are concatenated in part
// order into a temporary file inside i.path, which is then renamed to
// the aggregate data file; the hex-encoded MD5 hash of the
// concatenation is written to the aggregate checksum file last, so
// that the checksum never exists without its data. Filesystem failures
// are logged and reported as errOperationFailed.
func (i *invoker) consolidate() error {
	// Nothing to do if the aggregate binary has already been created.
	switch err := i.checksumExists(i.path); err {
	case nil:
		return nil
	case errNotExist:
	default:
		return err
	}
	// Check if all parts have been uploaded.
	parts, err := i.getParts()
	if err != nil {
		return err
	}
	for _, part := range parts {
		switch err := i.checksumExists(part); err {
		case nil:
		case errNotExist:
			return nil
		default:
			return err
		}
	}
	// Create the aggregate binary and its checksum.
	suffix := ""
	output, err := ioutil.TempFile(i.path, suffix)
	if err != nil {
		vlog.Errorf("TempFile(%v, %v) failed: %v", i.path, suffix, err)
		return errOperationFailed
	}
	// Safety net for the error paths; on the success path the file is
	// closed explicitly below and this second Close is a harmless no-op.
	defer output.Close()
	removeOutput := func() {
		if err := os.Remove(output.Name()); err != nil {
			vlog.Errorf("Remove(%v) failed: %v", output.Name(), err)
		}
	}
	buffer, h := make([]byte, bufferLength), md5.New()
	// appendPart copies the data of one part to the output and the hash.
	// It is a closure so that the deferred Close releases each input
	// file as soon as that part is copied, not when consolidate returns.
	appendPart := func(part string) error {
		partData := filepath.Join(part, data)
		input, err := os.Open(partData)
		if err != nil {
			vlog.Errorf("Open(%v) failed: %v", partData, err)
			return errOperationFailed
		}
		defer input.Close()
		for {
			n, err := input.Read(buffer)
			if err != nil && err != io.EOF {
				vlog.Errorf("Read() failed: %v", err)
				return errOperationFailed
			}
			if n == 0 {
				return nil
			}
			if _, err := output.Write(buffer[:n]); err != nil {
				vlog.Errorf("Write() failed: %v", err)
				return errOperationFailed
			}
			h.Write(buffer[:n])
		}
	}
	for _, part := range parts {
		if err := appendPart(part); err != nil {
			removeOutput()
			return err
		}
	}
	// Close before renaming so that a failure to flush the data is
	// detected instead of silently publishing a truncated file.
	if err := output.Close(); err != nil {
		vlog.Errorf("Close(%v) failed: %v", output.Name(), err)
		removeOutput()
		return errOperationFailed
	}
	dataFile := filepath.Join(i.path, data)
	if err := os.Rename(output.Name(), dataFile); err != nil {
		vlog.Errorf("Rename(%v, %v) failed: %v", output.Name(), dataFile, err)
		removeOutput()
		return errOperationFailed
	}
	// The checksum marks the aggregate as existing, so it is written only
	// after the data is in place. If writing it fails, the data file is
	// deliberately left behind: without a checksum it is treated as
	// absent and a later consolidation simply renames over it, whereas
	// removing it could race with a concurrent, successful consolidation.
	hash := hex.EncodeToString(h.Sum(nil))
	checksumFile, perm := filepath.Join(i.path, checksum), os.FileMode(0600)
	if err := ioutil.WriteFile(checksumFile, []byte(hash), perm); err != nil {
		vlog.Errorf("WriteFile(%v, %v, %v) failed: %v", checksumFile, hash, perm, err)
		return errOperationFailed
	}
	return nil
}
// checksumExists checks whether the given path contains a
// checksum. The implementation uses the existence of checksum to
// determine whether the binary (part) identified by the given path
// exists.
//
// It returns nil if the checksum file exists, errNotExist if it does
// not, and errOperationFailed (after logging the cause) if the file
// could not be examined.
func (i *invoker) checksumExists(path string) error {
	checksumFile := filepath.Join(path, checksum)
	if _, err := os.Stat(checksumFile); err != nil {
		if os.IsNotExist(err) {
			return errNotExist
		}
		// Report the file that was actually examined, not its directory.
		vlog.Errorf("Stat(%v) failed: %v", checksumFile, err)
		return errOperationFailed
	}
	return nil
}
// generatePartPath returns the directory of the given binary part: a
// child of the binary's directory named after the decimal part index.
func (i *invoker) generatePartPath(part int) string {
	name := fmt.Sprintf("%d", part)
	return filepath.Join(i.path, name)
}
// getParts returns a collection of paths to the parts of the binary,
// ordered by part index.
//
// Every subdirectory of i.path is expected to be a part directory
// named after its decimal index; entries that are not directories are
// ignored. The indices must form the contiguous range 0..n-1. If the
// directory cannot be read, or a subdirectory name is not a number, is
// out of range, duplicates another index, or leaves a gap, the problem
// is logged and errOperationFailed is returned.
func (i *invoker) getParts() ([]string, error) {
	infos, err := ioutil.ReadDir(i.path)
	if err != nil {
		vlog.Errorf("ReadDir(%v) failed: %v", i.path, err)
		return nil, errOperationFailed
	}
	n := 0
	// There cannot be more parts than directory entries, so a valid
	// index is always smaller than len(infos).
	result := make([]string, len(infos))
	for _, info := range infos {
		if !info.IsDir() {
			continue
		}
		idx, err := strconv.Atoi(info.Name())
		if err != nil {
			vlog.Errorf("Atoi(%v) failed: %v", info.Name(), err)
			return nil, errOperationFailed
		}
		// Guard the slice access: a stray or corrupted directory name must
		// result in an error, not in a panic.
		if idx < 0 || idx >= len(result) {
			vlog.Errorf("part index %v of %v is out of range", idx, filepath.Join(i.path, info.Name()))
			return nil, errOperationFailed
		}
		// Different names can parse to the same index (e.g. "1" and "01").
		if result[idx] != "" {
			vlog.Errorf("part index %v of %v duplicates %v", idx, filepath.Join(i.path, info.Name()), result[idx])
			return nil, errOperationFailed
		}
		result[idx] = filepath.Join(i.path, info.Name())
		n++
	}
	// With n distinct in-range indices, any empty slot among the first n
	// means that some part directory is missing.
	for idx := 0; idx < n; idx++ {
		if result[idx] == "" {
			vlog.Errorf("part %v is missing from %v", idx, i.path)
			return nil, errOperationFailed
		}
	}
	return result[:n], nil
}
// Create sets up the on-disk directory structure for a binary that
// consists of nparts parts.
//
// The part directories (named 0 .. nparts-1) are first created inside
// a temporary "creating-" directory next to the final location, and
// the temporary directory is then renamed to i.path so that the
// structure appears atomically. It returns errInvalidParts if nparts
// is smaller than one, errExist if the rename fails because the target
// already exists, and errOperationFailed for any other filesystem
// failure.
func (i *invoker) Create(_ ipc.ServerContext, nparts int32) error {
	vlog.Infof("%v.Create(%v)", i.suffix, nparts)
	if nparts < 1 {
		return errInvalidParts
	}
	const (
		dirPerm = os.FileMode(0700)
		prefix  = "creating-"
	)
	parent := filepath.Dir(i.path)
	if err := os.MkdirAll(parent, dirPerm); err != nil {
		vlog.Errorf("MkdirAll(%v, %v) failed: %v", parent, dirPerm, err)
		return errOperationFailed
	}
	staging, err := ioutil.TempDir(parent, prefix)
	if err != nil {
		vlog.Errorf("TempDir(%v, %v) failed: %v", parent, prefix, err)
		return errOperationFailed
	}
	// discard removes the staging directory after a failure.
	discard := func() {
		if err := os.RemoveAll(staging); err != nil {
			vlog.Errorf("RemoveAll(%v) failed: %v", staging, err)
		}
	}
	for j := 0; j < int(nparts); j++ {
		partPath := filepath.Join(staging, strconv.Itoa(j))
		if err := os.MkdirAll(partPath, dirPerm); err != nil {
			vlog.Errorf("MkdirAll(%v, %v) failed: %v", partPath, dirPerm, err)
			discard()
			return errOperationFailed
		}
	}
	// Use os.Rename() to atomically create the binary directory
	// structure.
	err = os.Rename(staging, i.path)
	if err == nil {
		return nil
	}
	result := errExist
	if !os.IsExist(err) {
		vlog.Errorf("Rename(%v, %v) failed: %v", staging, i.path, err)
		result = errOperationFailed
	}
	discard()
	return result
}
// Delete removes the binary together with all of its parts.
//
// The binary directory is first renamed to a sibling with the
// "removing-" prefix, so that the binary disappears atomically, and the
// renamed directory is then removed recursively. Afterwards the
// directories on the path back to the repository root that were left
// empty are removed as well; the walk stops at the root or at the first
// directory that still has other entries. It returns errNotExist if the
// binary directory does not exist and errOperationFailed (after logging
// the cause) for any other filesystem failure.
func (i *invoker) Delete(context ipc.ServerContext) error {
	vlog.Infof("%v.Delete()", i.suffix)
	if _, err := os.Stat(i.path); err != nil {
		if os.IsNotExist(err) {
			return errNotExist
		}
		vlog.Errorf("Stat(%v) failed: %v", i.path, err)
		return errOperationFailed
	}
	// Use os.Rename() to atomically remove the binary directory
	// structure.
	path := filepath.Join(filepath.Dir(i.path), "removing-"+filepath.Base(i.path))
	if err := os.Rename(i.path, path); err != nil {
		vlog.Errorf("Rename(%v, %v) failed: %v", i.path, path, err)
		return errOperationFailed
	}
	if err := os.RemoveAll(path); err != nil {
		vlog.Errorf("RemoveAll(%v) failed: %v", path, err)
		return errOperationFailed
	}
	// i.path is built with filepath.Join and is therefore in cleaned
	// form; the root must be cleaned too (e.g. to drop a trailing
	// separator), otherwise the walk below would never recognize it and
	// could remove the root itself and continue above it.
	root := filepath.Clean(i.state.root)
	for {
		// Remove all directories on the path back to the root that are
		// left empty after the binary is removed.
		parent := filepath.Dir(path)
		if parent == root || parent == path {
			// Reached the repository root or the top of the filesystem.
			break
		}
		path = parent
		if err := os.Remove(path); err != nil {
			// A directory that still holds other binaries ends the walk.
			// Compare error values, not their messages, and do not assume
			// the concrete error type. Some systems report EEXIST rather than
			// ENOTEMPTY for a non-empty directory.
			if pe, ok := err.(*os.PathError); ok && (pe.Err == syscall.ENOTEMPTY || pe.Err == syscall.EEXIST) {
				break
			}
			vlog.Errorf("Remove(%v) failed: %v", path, err)
			return errOperationFailed
		}
	}
	return nil
}
// Download streams the data of the given part to the caller.
//
// The part must have a checksum file, otherwise the error of
// checksumExists (errNotExist when the checksum is missing) is
// returned. The data file is read in chunks of at most bufferLength
// bytes and each chunk is sent on stream. Failures to open or read the
// file or to send a chunk are logged and reported as
// errOperationFailed.
func (i *invoker) Download(context ipc.ServerContext, part int32, stream repository.BinaryServiceDownloadStream) error {
	vlog.Infof("%v.Download(%v)", i.suffix, part)
	path := i.generatePartPath(int(part))
	if err := i.checksumExists(path); err != nil {
		return err
	}
	dataPath := filepath.Join(path, data)
	file, err := os.Open(dataPath)
	if err != nil {
		// Report the file that was actually opened, not its directory.
		vlog.Errorf("Open(%v) failed: %v", dataPath, err)
		return errOperationFailed
	}
	defer file.Close()
	buffer := make([]byte, bufferLength)
	for {
		n, err := file.Read(buffer)
		if err != nil && err != io.EOF {
			vlog.Errorf("Read() failed: %v", err)
			return errOperationFailed
		}
		// A read that yields no bytes marks the end of the data.
		if n == 0 {
			break
		}
		if err := stream.Send(buffer[:n]); err != nil {
			vlog.Errorf("Send() failed: %v", err)
			return errOperationFailed
		}
	}
	return nil
}
// DownloadURL is not implemented yet: it logs the call and returns an
// empty string, zero, and a nil error.
func (i *invoker) DownloadURL(ipc.ServerContext) (string, int64, error) {
	vlog.Infof("%v.DownloadURL()", i.suffix)
	// TODO(jsimsa): Implement.
	var (
		url string
		num int64
	)
	return url, num, nil
}
// Stat reports the checksum and size of every part of the binary, in
// part order.
//
// A part whose checksum file or data file does not exist is reported
// as MissingPart. If the parts cannot be listed, the error of getParts
// is returned; any other filesystem failure is logged and reported as
// errOperationFailed. In both error cases the returned slice is empty.
func (i *invoker) Stat(ipc.ServerContext) ([]binary.PartInfo, error) {
	vlog.Infof("%v.Stat()", i.suffix)
	parts, err := i.getParts()
	if err != nil {
		return []binary.PartInfo{}, err
	}
	infos := make([]binary.PartInfo, 0, len(parts))
	for _, part := range parts {
		// The checksum file holds the checksum reported for the part.
		checksumFile := filepath.Join(part, checksum)
		sum, err := ioutil.ReadFile(checksumFile)
		switch {
		case os.IsNotExist(err):
			infos = append(infos, MissingPart)
			continue
		case err != nil:
			vlog.Errorf("ReadFile(%v) failed: %v", checksumFile, err)
			return []binary.PartInfo{}, errOperationFailed
		}
		// The size of the data file is the size reported for the part.
		dataFile := filepath.Join(part, data)
		fi, err := os.Stat(dataFile)
		switch {
		case os.IsNotExist(err):
			infos = append(infos, MissingPart)
			continue
		case err != nil:
			vlog.Errorf("Stat(%v) failed: %v", dataFile, err)
			return []binary.PartInfo{}, errOperationFailed
		}
		infos = append(infos, binary.PartInfo{
			Checksum: string(sum),
			Size:     fi.Size(),
		})
	}
	return infos, nil
}
// Upload stores the bytes received on stream as the data of the given
// part and records their hex-encoded MD5 hash as the part's checksum.
//
// It returns errExist if the part already has a checksum and
// errInProgress if the part's lock file already exists, i.e. another
// upload of the same part is running. The received bytes are written
// to a temporary file inside the part directory, which is renamed to
// the data file once the stream ends; the checksum, whose existence
// marks the part as uploaded, is written last. Any stream or filesystem
// failure is logged and reported as errOperationFailed, and the
// temporary file is removed.
func (i *invoker) Upload(context ipc.ServerContext, part int32, stream repository.BinaryServiceUploadStream) error {
	vlog.Infof("%v.Upload(%v)", i.suffix, part)
	path, suffix := i.generatePartPath(int(part)), ""
	switch err := i.checksumExists(path); err {
	case nil:
		return errExist
	case errNotExist:
	default:
		return err
	}
	// Use os.OpenFile() with O_EXCL to resolve races: only one uploader
	// can create the lock file.
	lockPath, flags, perm := filepath.Join(path, lock), os.O_CREATE|os.O_WRONLY|os.O_EXCL, os.FileMode(0600)
	lockFile, err := os.OpenFile(lockPath, flags, perm)
	if err != nil {
		if os.IsExist(err) {
			return errInProgress
		}
		vlog.Errorf("OpenFile(%v, %v, %v) failed: %v", lockPath, flags, perm, err)
		return errOperationFailed
	}
	// Deferred calls run in reverse order: the lock file is closed first
	// and removed afterwards.
	defer os.Remove(lockFile.Name())
	defer lockFile.Close()
	// Check again now that the lock is held: another uploader may have
	// completed the part and released the lock between the first check
	// and the creation of the lock file.
	switch err := i.checksumExists(path); err {
	case nil:
		return errExist
	case errNotExist:
	default:
		return err
	}
	file, err := ioutil.TempFile(path, suffix)
	if err != nil {
		vlog.Errorf("TempFile(%v, %v) failed: %v", path, suffix, err)
		return errOperationFailed
	}
	// Safety net for the error paths; on the success path the file is
	// closed explicitly below and this second Close is a harmless no-op.
	defer file.Close()
	removeTemp := func() {
		if err := os.Remove(file.Name()); err != nil {
			vlog.Errorf("Remove(%v) failed: %v", file.Name(), err)
		}
	}
	h := md5.New()
	for {
		bytes, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			vlog.Errorf("Recv() failed: %v", err)
			removeTemp()
			return errOperationFailed
		}
		if _, err := file.Write(bytes); err != nil {
			vlog.Errorf("Write() failed: %v", err)
			removeTemp()
			return errOperationFailed
		}
		h.Write(bytes)
	}
	// Close before renaming so that a failure to flush the data is
	// detected instead of silently publishing a truncated file.
	if err := file.Close(); err != nil {
		vlog.Errorf("Close(%v) failed: %v", file.Name(), err)
		removeTemp()
		return errOperationFailed
	}
	dataFile := filepath.Join(path, data)
	if err := os.Rename(file.Name(), dataFile); err != nil {
		vlog.Errorf("Rename(%v, %v) failed: %v", file.Name(), dataFile, err)
		removeTemp()
		return errOperationFailed
	}
	// The checksum marks the part as uploaded, so it is written only
	// after the data is in place.
	hash := hex.EncodeToString(h.Sum(nil))
	checksumFile := filepath.Join(path, checksum)
	if err := ioutil.WriteFile(checksumFile, []byte(hash), perm); err != nil {
		vlog.Errorf("WriteFile(%v, %v, %v) failed: %v", checksumFile, hash, perm, err)
		// Leave the part in the "not uploaded" state. The lock is still
		// held, so no other uploader can be working on these files.
		if err := os.Remove(checksumFile); err != nil && !os.IsNotExist(err) {
			vlog.Errorf("Remove(%v) failed: %v", checksumFile, err)
		}
		if err := os.Remove(dataFile); err != nil {
			vlog.Errorf("Remove(%v) failed: %v", dataFile, err)
		}
		return errOperationFailed
	}
	return nil
}