blob: 357c5d0cccc382c1e758bec855167cec27defa70 [file] [log] [blame]
// Package binary provides a client-side library for the binary
// repository.
//
// TODO(jsimsa): Implement parallel download and upload.
package binary
import (
"bytes"
"crypto/md5"
"encoding/hex"
"io"
"io/ioutil"
"os"
"veyron2/rt"
"veyron2/services/mgmt/binary"
"veyron2/services/mgmt/repository"
"veyron2/verror"
"veyron2/vlog"
)
var (
errOperationFailed = verror.Internalf("operation failed")
errNotExist = verror.NotFoundf("binary does not exist")
)
const (
nAttempts = 2
partSize = 1 << 22
subpartSize = 1 << 12
)
func Delete(name string) error {
client, err := repository.BindBinary(name)
if err != nil {
vlog.Errorf("BindBinary(%v) failed: %v", name, err)
return err
}
if err := client.Delete(rt.R().NewContext()); err != nil {
vlog.Errorf("Delete() failed: %v", err)
return err
}
return nil
}
func download(w io.WriteSeeker, von string) error {
client, err := repository.BindBinary(von)
if err != nil {
vlog.Errorf("BindBinary(%v) failed: %v", von, err)
return err
}
parts, err := client.Stat(rt.R().NewContext())
if err != nil {
vlog.Errorf("Stat() failed: %v", err)
return err
}
for _, part := range parts {
if part.Checksum == binary.MissingChecksum {
return errNotExist
}
}
offset, whence := int64(0), 0
for i, part := range parts {
success := false
download:
for j := 0; !success && j < nAttempts; j++ {
if _, err := w.Seek(offset, whence); err != nil {
vlog.Errorf("Seek(%v, %v) failed: %v", offset, whence, err)
continue
}
stream, err := client.Download(rt.R().NewContext(), int32(i))
if err != nil {
vlog.Errorf("Download(%v) failed: %v", i, err)
continue
}
h, nreceived := md5.New(), 0
for {
bytes, err := stream.Recv()
if err != nil {
if err != io.EOF {
vlog.Errorf("Recv() failed: %v", err)
stream.Cancel()
continue download
}
break
}
if _, err := w.Write(bytes); err != nil {
vlog.Errorf("Write() failed: %v", err)
stream.Cancel()
continue download
}
h.Write(bytes)
nreceived += len(bytes)
}
if err := stream.Finish(); err != nil {
vlog.Errorf("Finish() failed: %v", err)
continue
}
if expected, got := part.Checksum, hex.EncodeToString(h.Sum(nil)); expected != got {
vlog.Errorf("Unexpected checksum: expected %v, got %v", expected, got)
continue
}
if expected, got := part.Size, int64(nreceived); expected != got {
vlog.Errorf("Unexpected size: expected %v, got %v", expected, got)
continue
}
success = true
}
if !success {
return errOperationFailed
}
offset += part.Size
}
return nil
}
func Download(von string) ([]byte, error) {
dir, prefix := "", ""
file, err := ioutil.TempFile(dir, prefix)
if err != nil {
vlog.Errorf("TempFile(%v, %v) failed: %v", dir, prefix, err)
return nil, errOperationFailed
}
defer os.Remove(file.Name())
defer file.Close()
if err := download(file, von); err != nil {
return nil, errOperationFailed
}
bytes, err := ioutil.ReadFile(file.Name())
if err != nil {
vlog.Errorf("ReadFile(%v) failed: %v", file.Name(), err)
return nil, errOperationFailed
}
return bytes, nil
}
func DownloadToFile(von, path string) error {
dir, prefix := "", ""
file, err := ioutil.TempFile(dir, prefix)
if err != nil {
vlog.Errorf("TempFile(%v, %v) failed: %v", dir, prefix, err)
return errOperationFailed
}
defer file.Close()
if err := download(file, von); err != nil {
if err := os.Remove(file.Name()); err != nil {
vlog.Errorf("Remove(%v) failed: %v", file.Name(), err)
}
return errOperationFailed
}
perm := os.FileMode(0700)
if err := file.Chmod(perm); err != nil {
vlog.Errorf("Chmod(%v) failed: %v", perm, err)
if err := os.Remove(file.Name()); err != nil {
vlog.Errorf("Remove(%v) failed: %v", file.Name(), err)
}
return errOperationFailed
}
if err := os.Rename(file.Name(), path); err != nil {
vlog.Errorf("Rename(%v, %v) failed: %v", file.Name(), path, err)
if err := os.Remove(file.Name()); err != nil {
vlog.Errorf("Remove(%v) failed: %v", file.Name(), err)
}
return errOperationFailed
}
return nil
}
func upload(r io.ReadSeeker, von string) error {
client, err := repository.BindBinary(von)
if err != nil {
vlog.Errorf("BindBinary(%v) failed: %v", von, err)
return err
}
offset, whence := int64(0), 2
size, err := r.Seek(offset, whence)
if err != nil {
vlog.Errorf("Seek(%v, %v) failed: %v", offset, whence, err)
return errOperationFailed
}
nparts := (size-1)/partSize + 1
if err := client.Create(rt.R().NewContext(), int32(nparts)); err != nil {
vlog.Errorf("Create() failed: %v", err)
return err
}
for i := 0; int64(i) < nparts; i++ {
success := false
upload:
for j := 0; !success && j < nAttempts; j++ {
offset, whence := int64(i*partSize), 0
if _, err := r.Seek(offset, whence); err != nil {
vlog.Errorf("Seek(%v, %v) failed: %v", offset, whence, err)
continue
}
stream, err := client.Upload(rt.R().NewContext(), int32(i))
if err != nil {
vlog.Errorf("Upload(%v) failed: %v", i, err)
continue
}
buffer := make([]byte, partSize)
if int64(i+1) == nparts {
buffer = buffer[:(size % partSize)]
}
nread := 0
for nread < len(buffer) {
n, err := r.Read(buffer[nread:])
nread += n
if err != nil && (err != io.EOF || nread < len(buffer)) {
vlog.Errorf("Read() failed: %v", err)
stream.Cancel()
continue upload
}
}
for from := 0; from < len(buffer); from += subpartSize {
to := from + subpartSize
if to > len(buffer) {
to = len(buffer)
}
if err := stream.Send(buffer[from:to]); err != nil {
vlog.Errorf("Send() failed: %v", err)
stream.Cancel()
continue upload
}
}
if err := stream.CloseSend(); err != nil {
vlog.Errorf("CloseSend() failed: %v", err)
parts, statErr := client.Stat(rt.R().NewContext())
if statErr != nil {
vlog.Errorf("Stat() failed: %v", statErr)
if deleteErr := client.Delete(rt.R().NewContext()); err != nil {
vlog.Errorf("Delete() failed: %v", deleteErr)
}
return err
}
if parts[i].Checksum == binary.MissingChecksum {
stream.Cancel()
continue
}
}
if err := stream.Finish(); err != nil {
vlog.Errorf("Finish() failed: %v", err)
parts, statErr := client.Stat(rt.R().NewContext())
if statErr != nil {
vlog.Errorf("Stat() failed: %v", statErr)
if deleteErr := client.Delete(rt.R().NewContext()); err != nil {
vlog.Errorf("Delete() failed: %v", deleteErr)
}
return err
}
if parts[i].Checksum == binary.MissingChecksum {
continue
}
}
success = true
}
if !success {
return errOperationFailed
}
}
return nil
}
func Upload(von string, data []byte) error {
buffer := bytes.NewReader(data)
return upload(buffer, von)
}
func UploadFromFile(von, path string) error {
file, err := os.Open(path)
defer file.Close()
if err != nil {
vlog.Errorf("Open(%v) failed: %v", err)
return errOperationFailed
}
return upload(file, von)
}