{go/src/veyron2/storage,go/src/veyron2/services/store,go/src/veyron/services/store}: new transaction API

Primary description of new API:
https://docs.google.com/a/google.com/document/d/1Zob450G5rQDHmWXmg6UvVy3IH-hSiS8eE5L9IY62sW0/edit

Change-Id: I7272d389414ec27db8d599f0d562938a55e43a97
diff --git a/services/store/server/server.go b/services/store/server/server.go
index 8eb4478..8b72fc4 100644
--- a/services/store/server/server.go
+++ b/services/store/server/server.go
@@ -1,8 +1,15 @@
 // Package server implements a storage service.
+
 package server
 
+// This file defines Server, which implements the server-side Store API from
+// veyron2/services/store/service.vdl.
+
 import (
+	"fmt"
+	"math/rand"
 	"reflect"
+	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -15,12 +22,11 @@
 	"veyron2/ipc"
 	"veyron2/security"
 	"veyron2/services/store"
-	"veyron2/vdl/vdlutil"
 	"veyron2/verror"
 )
 
 const (
-	// transactionMAxLifetime is the maximum duration before a transaction will
+	// transactionMaxLifetime is the maximum duration before a transaction will
 	// be garbage collected.
 	//
 	// TODO(jyh): This should probably be a configuration parameter.
@@ -28,28 +34,28 @@
 )
 
 var (
-	// Server implements the StoreService interface.
 	_ store.StoreService = (*Server)(nil)
 
-	nullTransactionID store.TransactionID
-
-	errTransactionAlreadyExists = verror.Existsf("transaction already exists")
-	errTransactionDoesNotExist  = verror.NotFoundf("transaction does not exist")
+	errNestedTransaction = verror.BadArgf("cannot create a nested Transaction")
+	// Note, this can happen e.g. due to expiration.
+	errTransactionDoesNotExist = verror.NotFoundf("transaction does not exist")
 	// Transaction exists, but may not be used by the caller.
 	errPermissionDenied = verror.NotAuthorizedf("permission denied")
+
+	rng = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
+
+	nullTransactionID transactionID
 )
 
-// Server stores the dictionary of all media items.  It has a scanner.Scanner
-// for collecting files from the filesystem.  For each file, a FileService is
-// registered to serve the file.
+// Server implements Store and uses memstore internally.
 type Server struct {
 	mutex sync.RWMutex
 
 	// store is the actual store implementation.
 	store service.Store
 
-	// transactions are the set of active transactions.
-	transactions map[store.TransactionID]*transaction
+	// transactions is the set of active transactions.
+	transactions map[transactionID]*transaction
 
 	// Transaction garbage collection.
 	pending sync.WaitGroup
@@ -60,6 +66,13 @@
 	watcher service.Watcher
 }
 
+// transactionID is an internal transaction identifier chosen by the server.
+//
+// TODO(jyh): Consider using a larger identifier space to reduce chance of
+// collisions. (Note, createTransaction handles collisions when generating
+// transactionIDs.)
+type transactionID uint64
+
 // transactionContext defines the context in which a transaction is used. A
 // transaction may be used only in the context that created it.
 // transactionContext weakly identifies a session by the local and remote
@@ -67,9 +80,11 @@
 // TODO(tilaks): Use the local and remote addresses to identify the session.
 // Does a session with a mobile device break if the remote address changes?
 type transactionContext interface {
-	// LocalID returns the PublicID of the principal at the local end of the request.
+	// LocalID returns the PublicID of the principal at the local end of the
+	// request.
 	LocalID() security.PublicID
-	// RemoteID returns the PublicID of the principal at the remote end of the request.
+	// RemoteID returns the PublicID of the principal at the remote end of the
+	// request.
 	RemoteID() security.PublicID
 }
 
@@ -97,7 +112,7 @@
 	}
 	s := &Server{
 		store:        mstore,
-		transactions: make(map[store.TransactionID]*transaction),
+		transactions: make(map[transactionID]*transaction),
 		ticker:       time.NewTicker(time.Second),
 		closed:       make(chan struct{}),
 		watcher:      mwatcher,
@@ -127,14 +142,82 @@
 	}
 }
 
-// findTransaction returns the transaction for the TransactionID.
-func (s *Server) findTransaction(ctx transactionContext, id store.TransactionID) (service.Transaction, error) {
+// findTransactionComponent returns the (begin, end) offsets of the "$tid.*"
+// component in the given object name, or (-1, -1) if oname does not contain a
+// transaction component.
+func findTransactionComponent(oname string) (int, int) {
+	begin := 0
+	if !strings.HasPrefix(oname, "$tid") {
+		begin = strings.Index(oname, "/$tid")
+	}
+	if begin == -1 {
+		return -1, -1
+	}
+	end := strings.Index(oname[begin+1:], "/")
+	if end == -1 {
+		end = len(oname)
+	} else {
+		end += begin + 1
+	}
+	return begin, end
+}
+
+// TODO(sadovsky): One of the following:
+// - Reserve prefix string "$tid." for internal use.
+// - Reserve prefix char "$" for internal use.
+// - Require users to escape prefix char "$" when they are referring to their
+//   own data, e.g. "\$foo".
+func makeTransactionComponent(id transactionID) string {
+	return fmt.Sprintf("$tid.%d", id)
+}
+
+// stripTransactionComponent returns the given object name with its "$tid.*"
+// component removed, and also returns the stripped transactionID.
+// Examples:
+//   "/foo/$tid.123/bar" => {"/foo/bar", transactionID(123)}
+//   "/foo/bar"          => {"/foo/bar", nullTransactionID}
+func stripTransactionComponent(oname string) (string, transactionID, error) {
+	begin, end := findTransactionComponent(oname)
+	if begin == -1 {
+		return oname, nullTransactionID, nil
+	}
+	tc := oname[begin:end]
+	id, err := strconv.ParseInt(tc[strings.LastIndex(tc, ".")+1:], 10, 64)
+	if err != nil {
+		return "", nullTransactionID, fmt.Errorf("Failed to extract id from %q", tc)
+	}
+	return oname[:begin] + oname[end:], transactionID(id), nil
+}
+
+func (s *Server) createTransaction(ctx transactionContext, oname string) (string, error) {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+
+	var id transactionID
+	for {
+		id = transactionID(rng.Int63())
+		_, ok := s.transactions[id]
+		if !ok {
+			break
+		}
+	}
+	info := &transaction{
+		trans:      memstore.NewTransaction(),
+		expires:    time.Now().Add(transactionMaxLifetime),
+		creatorCtx: ctx,
+	}
+	s.transactions[id] = info
+	return makeTransactionComponent(id), nil
+}
+
+// findTransaction returns the transaction for the given transaction ID.
+func (s *Server) findTransaction(ctx transactionContext, id transactionID) (service.Transaction, error) {
 	s.mutex.RLock()
 	defer s.mutex.RUnlock()
 	return s.findTransactionLocked(ctx, id)
 }
 
-func (s *Server) findTransactionLocked(ctx transactionContext, id store.TransactionID) (service.Transaction, error) {
+func (s *Server) findTransactionLocked(ctx transactionContext, id transactionID) (service.Transaction, error) {
 	if id == nullTransactionID {
 		return nil, nil
 	}
@@ -150,6 +233,43 @@
 	return info.trans, nil
 }
 
+// Commit commits the changes in the transaction to the store.  The
+// operation is atomic, so all mutations are performed, or none.  Returns an
+// error if the transaction aborted.
+func (s *Server) commitTransaction(ctx transactionContext, id transactionID) error {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	t, err := s.findTransactionLocked(ctx, id)
+	if err != nil {
+		return err
+	}
+	if t == nil {
+		return errTransactionDoesNotExist
+	}
+	err = t.Commit()
+	delete(s.transactions, id)
+	return err
+}
+
+// Abort discards a transaction.  This is an optimization; transactions
+// eventually time out and get discarded.  However, live transactions
+// consume resources, so if you know that you won't be using a transaction
+// anymore, you should discard it explicitly.
+func (s *Server) abortTransaction(ctx transactionContext, id transactionID) error {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	t, err := s.findTransactionLocked(ctx, id)
+	if err != nil {
+		return err
+	}
+	if t == nil {
+		return errTransactionDoesNotExist
+	}
+	err = t.Abort()
+	delete(s.transactions, id)
+	return err
+}
+
 func (t *transaction) matchesContext(ctx transactionContext) bool {
 	creatorCtx := t.creatorCtx
 	return membersEqual(creatorCtx.LocalID().Names(), ctx.LocalID().Names()) &&
@@ -193,60 +313,6 @@
 	}
 }
 
-// CreateTransaction creates a transaction.
-func (s *Server) CreateTransaction(ctx ipc.ServerContext, id store.TransactionID, opts []vdlutil.Any) error {
-	s.mutex.Lock()
-	defer s.mutex.Unlock()
-
-	info, ok := s.transactions[id]
-	if ok {
-		return errTransactionAlreadyExists
-	}
-	info = &transaction{
-		trans:      memstore.NewTransaction(),
-		expires:    time.Now().Add(transactionMaxLifetime),
-		creatorCtx: ctx,
-	}
-	s.transactions[id] = info
-	return nil
-}
-
-// Commit commits the changes in the transaction to the store.  The
-// operation is atomic, so all mutations are performed, or none.  Returns an
-// error if the transaction aborted.
-func (s *Server) Commit(ctx ipc.ServerContext, id store.TransactionID) error {
-	s.mutex.Lock()
-	defer s.mutex.Unlock()
-
-	t, err := s.findTransactionLocked(ctx, id)
-	if err != nil {
-		return err
-	}
-	if t == nil {
-		return errTransactionDoesNotExist
-	}
-	err = t.Commit()
-	delete(s.transactions, id)
-	return err
-}
-
-// Abort discards a transaction.
-func (s *Server) Abort(ctx ipc.ServerContext, id store.TransactionID) error {
-	s.mutex.Lock()
-	defer s.mutex.Unlock()
-
-	t, err := s.findTransactionLocked(ctx, id)
-	if err != nil {
-		return err
-	}
-	if t == nil {
-		return errTransactionDoesNotExist
-	}
-	err = t.Abort()
-	delete(s.transactions, id)
-	return err
-}
-
 // Watch returns a stream of all changes.
 func (s *Server) Watch(ctx ipc.ServerContext, req raw.Request, stream raw.StoreServiceWatchStream) error {
 	return s.watcher.WatchRaw(ctx, req, stream)
@@ -276,20 +342,37 @@
 }
 
 func (d *storeDispatcher) Lookup(suffix string) (ipc.Invoker, security.Authorizer, error) {
-	return ipc.ReflectInvoker(d.lookupServer(suffix)), d.auth, nil
+	serv, err := d.lookupServer(suffix)
+	if err != nil {
+		return nil, nil, err
+	}
+	return ipc.ReflectInvoker(serv), d.auth, nil
 }
 
-func (d *storeDispatcher) lookupServer(suffix string) interface{} {
+func (d *storeDispatcher) lookupServer(suffix string) (interface{}, error) {
+	// Strip leading "/" if present so that server internals can reliably use
+	// naming.Join(suffix, "foo").
+	suffix = strings.TrimPrefix(suffix, "/")
 	if strings.HasSuffix(suffix, store.StoreSuffix) {
-		return store.NewServerStore(d.s)
+		return store.NewServerStore(d.s), nil
 	} else if strings.HasSuffix(suffix, raw.RawStoreSuffix) {
-		return raw.NewServerStore(d.s)
+		return raw.NewServerStore(d.s), nil
 	} else {
-		o := d.s.lookupObject(suffix)
-		return store.NewServerObject(o)
+		// TODO(sadovsky): Create Object, Transaction, and TransactionRoot stubs,
+		// merge them, and return the result. See TODO in
+		// veyron2/services/store/service.vdl.
+		o, err := d.s.lookupObject(suffix)
+		if err != nil {
+			return nil, err
+		}
+		return store.NewServerObject(o), nil
 	}
 }
 
-func (s *Server) lookupObject(name string) *object {
-	return &object{name: name, obj: s.store.Bind(name), server: s}
+func (s *Server) lookupObject(name string) (*object, error) {
+	oname, tid, err := stripTransactionComponent(name)
+	if err != nil {
+		return nil, err
+	}
+	return &object{name: oname, obj: s.store.Bind(oname), tid: tid, server: s}, nil
 }