blob: e7d88cebf4d616d9a4f238955e608e57de0d22f1 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// The implementation of the binary repository interface stores objects
// identified by object name suffixes using the local file system. Given an
// object name suffix, the implementation computes an MD5 hash of the suffix and
// generates the following path in the local filesystem:
// /<root-dir>/<dir_1>/.../<dir_n>/<hash>. The root directory and the directory
// depth are parameters of the implementation. <root-dir> also contains
// __acls/data and __acls/sig files storing the Permissions for the root level.
// The contents of the directory include the checksum and data for each of the
// individual parts of the binary, the name of the object and a directory
// containing the perms for this particular object:
//
// name
// acls/data
// acls/sig
// mediainfo
// name
// <part_1>/checksum
// <part_1>/data
// ...
// <part_n>/checksum
// <part_n>/data
//
// TODO(jsimsa): Add an "fsck" method that cleans up existing on-disk
// repository and provide a command-line flag that identifies whether
// fsck should run when new repository server process starts up.
package binarylib
import (
"crypto/md5"
"encoding/hex"
"encoding/json"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
"syscall"
"v.io/v23/context"
"v.io/v23/glob"
"v.io/v23/naming"
"v.io/v23/rpc"
"v.io/v23/security"
"v.io/v23/security/access"
"v.io/v23/services/binary"
"v.io/v23/services/repository"
"v.io/v23/verror"
"v.io/x/ref/services/internal/pathperms"
)
// binaryService implements the Binary server interface.
type binaryService struct {
// path is the local filesystem path to the object identified by the
// object name suffix.
path string
// state holds the state shared across different binary repository
// invocations.
state *state
// suffix is the name of the binary object.
suffix string
permsStore *pathperms.PathStore
}
const pkgPath = "v.io/x/ref/services/internal/binarylib"
var (
ErrInProgress = verror.Register(pkgPath+".errInProgress", verror.NoRetry, "{1:}{2:} identical upload already in progress{:_}")
ErrInvalidParts = verror.Register(pkgPath+".errInvalidParts", verror.NoRetry, "{1:}{2:} invalid number of binary parts{:_}")
ErrInvalidPart = verror.Register(pkgPath+".errInvalidPart", verror.NoRetry, "{1:}{2:} invalid binary part number{:_}")
ErrOperationFailed = verror.Register(pkgPath+".errOperationFailed", verror.NoRetry, "{1:}{2:} operation failed{:_}")
ErrNotAuthorized = verror.Register(pkgPath+".errNotAuthorized", verror.NoRetry, "{1:}{2:} none of the client's blessings are valid {:_}")
ErrInvalidSuffix = verror.Register(pkgPath+".errInvalidSuffix", verror.NoRetry, "{1:}{2:} invalid suffix{:_}")
)
// TODO(jsimsa): When VDL supports composite literal constants, remove
// this definition.
var MissingPart = binary.PartInfo{
Checksum: binary.MissingChecksum,
Size: binary.MissingSize,
}
// newBinaryService returns a new Binary service implementation.
func newBinaryService(state *state, suffix string, permsStore *pathperms.PathStore) *binaryService {
return &binaryService{
path: state.dir(suffix),
state: state,
suffix: suffix,
permsStore: permsStore,
}
}
const BufferLength = 4096
func (i *binaryService) createFileTree(ctx *context.T, nparts int32, mediaInfo repository.MediaInfo) (string, error) {
parent, dirPerm := filepath.Dir(i.path), os.FileMode(0700)
if err := os.MkdirAll(parent, dirPerm); err != nil {
ctx.Errorf("MkdirAll(%v, %v) failed: %v", parent, dirPerm, err)
return "", verror.New(ErrOperationFailed, ctx)
}
prefix := "creating-"
tmpDir, err := ioutil.TempDir(parent, prefix)
if err != nil {
ctx.Errorf("TempDir(%v, %v) failed: %v", parent, prefix, err)
return "", verror.New(ErrOperationFailed, ctx)
}
nameFile, filePerm := filepath.Join(tmpDir, nameFileName), os.FileMode(0600)
if err := ioutil.WriteFile(nameFile, []byte(i.suffix), filePerm); err != nil {
ctx.Errorf("WriteFile(%q) failed: %v", nameFile, err)
return "", verror.New(ErrOperationFailed, ctx)
}
infoFile := filepath.Join(tmpDir, mediaInfoFileName)
jInfo, err := json.Marshal(mediaInfo)
if err != nil {
ctx.Errorf("json.Marshal(%v) failed: %v", mediaInfo, err)
return "", verror.New(ErrOperationFailed, ctx)
}
if err := ioutil.WriteFile(infoFile, jInfo, filePerm); err != nil {
ctx.Errorf("WriteFile(%q) failed: %v", infoFile, err)
return "", verror.New(ErrOperationFailed, ctx)
}
for j := 0; j < int(nparts); j++ {
partPath := generatePartPath(tmpDir, j)
if err := os.MkdirAll(partPath, dirPerm); err != nil {
ctx.Errorf("MkdirAll(%v, %v) failed: %v", partPath, dirPerm, err)
if err := os.RemoveAll(tmpDir); err != nil {
ctx.Errorf("RemoveAll(%v) failed: %v", tmpDir, err)
}
return "", verror.New(ErrOperationFailed, ctx)
}
}
return tmpDir, nil
}
func (i *binaryService) deleteACLs(ctx *context.T) error {
permsDir := permsPath(i.state.rootDir, i.suffix)
if err := i.permsStore.Delete(permsDir); err != nil {
return err
}
// HACK: we need to also clean up the parent directory (corresponding to
// the suffix) that holds the "acls" directory for regular binary
// objects. See the implementation of permsPath.
if base := filepath.Base(permsDir); base == "acls" {
if err := os.Remove(filepath.Dir(permsDir)); err != nil {
return err
}
}
return nil
}
// setInitialPermissions sets the acls for the binary if they don't exist (if
// they do, it's a sign that the binary already exists, and then an error is
// returned). Upon success, it returns a function that removes the permissions
// just set here (to be called if something fails downstream and we need to undo
// setting the initial permissions).
func (i *binaryService) setInitialPermissions(ctx *context.T, call rpc.ServerCall) (func(), error) {
rb, _ := security.RemoteBlessingNames(ctx, call.Security())
if len(rb) == 0 {
// None of the client's blessings are valid.
return nil, verror.New(ErrNotAuthorized, ctx)
}
permsDir := permsPath(i.state.rootDir, i.suffix)
created, err := i.permsStore.SetIfAbsent(permsDir, pathperms.PermissionsForBlessings(rb))
if err != nil {
ctx.Errorf("permsStore.SetIfAbsent(%v, %v) failed: %v", permsDir, rb, err)
return nil, verror.New(ErrOperationFailed, ctx)
}
if !created {
return nil, verror.New(verror.ErrExist, ctx, i.suffix)
}
return func() {
if err := i.deleteACLs(ctx); err != nil {
ctx.Errorf("deleteACLs() failed: %v", err)
}
}, nil
}
func (i *binaryService) Create(ctx *context.T, call rpc.ServerCall, nparts int32, mediaInfo repository.MediaInfo) error {
ctx.Infof("%v.Create(%v, %v)", i.suffix, nparts, mediaInfo)
// Disallow creating binaries on the root of the server. The
// permissions on the root have special meaning (see
// hierarchical_authorizer).
if i.suffix == "" {
return verror.New(ErrInvalidSuffix, ctx, "")
}
if nparts < 1 {
return verror.New(ErrInvalidParts, ctx)
}
removePerms, err := i.setInitialPermissions(ctx, call)
if err != nil {
return err
}
tmpDir, err := i.createFileTree(ctx, nparts, mediaInfo)
if err != nil {
removePerms()
return err
}
// Use os.Rename() to atomically create the binary directory
// structure.
if err := os.Rename(tmpDir, i.path); err != nil {
ctx.Errorf("Rename(%v, %v) failed: %v", tmpDir, i.path, err)
if err := os.RemoveAll(tmpDir); err != nil {
ctx.Errorf("RemoveAll(%v) failed: %v", tmpDir, err)
}
removePerms()
return verror.New(ErrOperationFailed, ctx, i.path)
}
return nil
}
func (i *binaryService) Delete(ctx *context.T, _ rpc.ServerCall) error {
ctx.Infof("%v.Delete()", i.suffix)
if _, err := os.Stat(i.path); err != nil {
if os.IsNotExist(err) {
return verror.New(verror.ErrNoExist, ctx, i.path)
}
ctx.Errorf("Stat(%v) failed: %v", i.path, err)
return verror.New(ErrOperationFailed, ctx)
}
// Use os.Rename() to atomically remove the binary directory
// structure.
path := filepath.Join(filepath.Dir(i.path), "removing-"+filepath.Base(i.path))
if err := os.Rename(i.path, path); err != nil {
ctx.Errorf("Rename(%v, %v) failed: %v", i.path, path, err)
return verror.New(ErrOperationFailed, ctx, i.path)
}
if err := os.RemoveAll(path); err != nil {
ctx.Errorf("Remove(%v) failed: %v", path, err)
return verror.New(ErrOperationFailed, ctx)
}
for {
// Remove the binary and all directories on the path back to the
// root directory that are left empty after the binary is removed.
path = filepath.Dir(path)
if i.state.rootDir == path {
break
}
if err := os.Remove(path); err != nil {
if err.(*os.PathError).Err.Error() == syscall.ENOTEMPTY.Error() {
break
}
ctx.Errorf("Remove(%v) failed: %v", path, err)
return verror.New(ErrOperationFailed, ctx)
}
}
if err := i.deleteACLs(ctx); err != nil {
ctx.Errorf("deleteACLs() failed: %v", err)
return verror.New(ErrOperationFailed, ctx)
}
return nil
}
func (i *binaryService) Download(ctx *context.T, call repository.BinaryDownloadServerCall, part int32) error {
ctx.Infof("%v.Download(%v)", i.suffix, part)
path := i.generatePartPath(int(part))
if err := checksumExists(ctx, path); err != nil {
return err
}
dataPath := filepath.Join(path, dataFileName)
file, err := os.Open(dataPath)
if err != nil {
ctx.Errorf("Open(%v) failed: %v", dataPath, err)
return verror.New(ErrOperationFailed, ctx)
}
defer file.Close()
buffer := make([]byte, BufferLength)
sender := call.SendStream()
for {
n, err := file.Read(buffer)
if err != nil && err != io.EOF {
ctx.Errorf("Read() failed: %v", err)
return verror.New(ErrOperationFailed, ctx)
}
if n == 0 {
break
}
if err := sender.Send(buffer[:n]); err != nil {
ctx.Errorf("Send() failed: %v", err)
return verror.New(ErrOperationFailed, ctx)
}
}
return nil
}
// TODO(jsimsa): Design and implement an access control mechanism for
// the URL-based downloads.
func (i *binaryService) DownloadUrl(ctx *context.T, _ rpc.ServerCall) (string, int64, error) {
ctx.Infof("%v.DownloadUrl()", i.suffix)
return i.state.rootURL + "/" + i.suffix, 0, nil
}
func (i *binaryService) Stat(ctx *context.T, _ rpc.ServerCall) ([]binary.PartInfo, repository.MediaInfo, error) {
ctx.Infof("%v.Stat()", i.suffix)
result := make([]binary.PartInfo, 0)
parts, err := getParts(ctx, i.path)
if err != nil {
return []binary.PartInfo{}, repository.MediaInfo{}, err
}
for _, part := range parts {
checksumFile := filepath.Join(part, checksumFileName)
bytes, err := ioutil.ReadFile(checksumFile)
if err != nil {
if os.IsNotExist(err) {
result = append(result, MissingPart)
continue
}
ctx.Errorf("ReadFile(%v) failed: %v", checksumFile, err)
return []binary.PartInfo{}, repository.MediaInfo{}, verror.New(ErrOperationFailed, ctx)
}
dataFile := filepath.Join(part, dataFileName)
fi, err := os.Stat(dataFile)
if err != nil {
if os.IsNotExist(err) {
result = append(result, MissingPart)
continue
}
ctx.Errorf("Stat(%v) failed: %v", dataFile, err)
return []binary.PartInfo{}, repository.MediaInfo{}, verror.New(ErrOperationFailed, ctx)
}
result = append(result, binary.PartInfo{Checksum: string(bytes), Size: fi.Size()})
}
infoFile := filepath.Join(i.path, mediaInfoFileName)
jInfo, err := ioutil.ReadFile(infoFile)
if err != nil {
ctx.Errorf("ReadFile(%q) failed: %v", infoFile, err)
return []binary.PartInfo{}, repository.MediaInfo{}, verror.New(ErrOperationFailed, ctx)
}
var mediaInfo repository.MediaInfo
if err := json.Unmarshal(jInfo, &mediaInfo); err != nil {
ctx.Errorf("json.Unmarshal(%v) failed: %v", jInfo, err)
return []binary.PartInfo{}, repository.MediaInfo{}, verror.New(ErrOperationFailed, ctx)
}
return result, mediaInfo, nil
}
func (i *binaryService) Upload(ctx *context.T, call repository.BinaryUploadServerCall, part int32) error {
ctx.Infof("%v.Upload(%v)", i.suffix, part)
path, suffix := i.generatePartPath(int(part)), ""
err := checksumExists(ctx, path)
if err == nil {
return verror.New(verror.ErrExist, ctx, path)
} else if verror.ErrorID(err) != verror.ErrNoExist.ID {
return err
}
// Use os.OpenFile() to resolve races.
lockPath, flags, perm := filepath.Join(path, lockFileName), os.O_CREATE|os.O_WRONLY|os.O_EXCL, os.FileMode(0600)
lockFile, err := os.OpenFile(lockPath, flags, perm)
if err != nil {
if os.IsExist(err) {
return verror.New(ErrInProgress, ctx, path)
}
ctx.Errorf("OpenFile(%v, %v, %v) failed: %v", lockPath, flags, suffix, err)
return verror.New(ErrOperationFailed, ctx)
}
defer os.Remove(lockFile.Name())
defer lockFile.Close()
file, err := ioutil.TempFile(path, suffix)
if err != nil {
ctx.Errorf("TempFile(%v, %v) failed: %v", path, suffix, err)
return verror.New(ErrOperationFailed, ctx)
}
defer file.Close()
h := md5.New()
rStream := call.RecvStream()
for rStream.Advance() {
bytes := rStream.Value()
if _, err := file.Write(bytes); err != nil {
ctx.Errorf("Write() failed: %v", err)
if err := os.Remove(file.Name()); err != nil {
ctx.Errorf("Remove(%v) failed: %v", file.Name(), err)
}
return verror.New(ErrOperationFailed, ctx)
}
h.Write(bytes)
}
if err := rStream.Err(); err != nil {
ctx.Errorf("Advance() failed: %v", err)
if err := os.Remove(file.Name()); err != nil {
ctx.Errorf("Remove(%v) failed: %v", file.Name(), err)
}
return verror.New(ErrOperationFailed, ctx)
}
hash := hex.EncodeToString(h.Sum(nil))
checksumFile, perm := filepath.Join(path, checksumFileName), os.FileMode(0600)
if err := ioutil.WriteFile(checksumFile, []byte(hash), perm); err != nil {
ctx.Errorf("WriteFile(%v, %v, %v) failed: %v", checksumFile, hash, perm, err)
if err := os.Remove(file.Name()); err != nil {
ctx.Errorf("Remove(%v) failed: %v", file.Name(), err)
}
return verror.New(ErrOperationFailed, ctx)
}
dataFile := filepath.Join(path, dataFileName)
if err := os.Rename(file.Name(), dataFile); err != nil {
ctx.Errorf("Rename(%v, %v) failed: %v", file.Name(), dataFile, err)
if err := os.Remove(file.Name()); err != nil {
ctx.Errorf("Remove(%v) failed: %v", file.Name(), err)
}
return verror.New(ErrOperationFailed, ctx)
}
return nil
}
func (i *binaryService) GlobChildren__(ctx *context.T, call rpc.GlobChildrenServerCall, m *glob.Element) error {
elems := strings.Split(i.suffix, "/")
if len(elems) == 1 && elems[0] == "" {
elems = nil
}
n := i.createObjectNameTree().find(elems, false)
if n == nil {
return verror.New(ErrOperationFailed, ctx)
}
for k, _ := range n.children {
if m.Match(k) {
call.SendStream().Send(naming.GlobChildrenReplyName{Value: k})
}
}
return nil
}
func (i *binaryService) GetPermissions(ctx *context.T, call rpc.ServerCall) (perms access.Permissions, version string, err error) {
perms, version, err = i.permsStore.Get(permsPath(i.state.rootDir, i.suffix))
if os.IsNotExist(err) {
// No Permissions file found which implies a nil authorizer. This results in
// default authorization.
return pathperms.NilAuthPermissions(ctx, call.Security()), "", nil
}
return perms, version, err
}
func (i *binaryService) SetPermissions(_ *context.T, _ rpc.ServerCall, perms access.Permissions, version string) error {
return i.permsStore.Set(permsPath(i.state.rootDir, i.suffix), perms, version)
}