Merge "cmd/vdl: use v23tests library in vdl_test"
diff --git a/.gitignore b/.gitignore
index ebef272..cd59ac6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,4 @@
+/.jiri
+#TODO(nlacasse): Get rid of .v23 below once v23->jiri transition is complete.
/.v23
**/.DS_Store
diff --git a/lib/xrpc/xserver.go b/lib/xrpc/xserver.go
deleted file mode 100644
index b06f7e5..0000000
--- a/lib/xrpc/xserver.go
+++ /dev/null
@@ -1,123 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-// Package xserver provides an alternate RPC server API with the goal of
-// being simpler to use and understand.
-package xrpc
-
-import (
- "v.io/v23"
- "v.io/v23/context"
- "v.io/v23/rpc"
- "v.io/v23/security"
-)
-
-type server struct {
- s rpc.DeprecatedServer
-}
-
-// NewServer creates a new Server instance to serve a service object.
-//
-// The server will listen for network connections as specified by the
-// ListenSpec attached to ctx. Depending on your RuntimeFactory, 'roaming'
-// support may be enabled. In this mode the server will listen for
-// changes in the network configuration using a Stream created on the
-// supplied Publisher and change the set of Endpoints it publishes to
-// the mount table accordingly.
-//
-// The server associates object with name by publishing the address of
-// this server in the namespace under the supplied name and using
-// authorizer to authorize access to it. RPCs invoked on the supplied
-// name will be delivered to methods implemented by the supplied
-// object. Reflection is used to match requests to the object's
-// method set. As a special-case, if the object implements the
-// Invoker interface, the Invoker is used to invoke methods directly,
-// without reflection. If name is an empty string, no attempt will
-// made to publish.
-func NewServer(ctx *context.T, name string, object interface{}, auth security.Authorizer, opts ...rpc.ServerOpt) (rpc.Server, error) {
- s, err := v23.NewServer(ctx, opts...)
- if err != nil {
- return nil, err
- }
- if _, err = s.Listen(v23.GetListenSpec(ctx)); err != nil {
- s.Stop()
- return nil, err
- }
- if err = s.Serve(name, object, auth); err != nil {
- s.Stop()
- return nil, err
- }
- return &server{s: s}, nil
-}
-
-// NewDispatchingServer creates a new Server instance to serve a given dispatcher.
-//
-// The server will listen for network connections as specified by the
-// ListenSpec attached to ctx. Depending on your RuntimeFactory, 'roaming'
-// support may be enabled. In this mode the server will listen for
-// changes in the network configuration using a Stream created on the
-// supplied Publisher and change the set of Endpoints it publishes to
-// the mount table accordingly.
-//
-// The server associates dispatcher with the portion of the namespace
-// for which name is a prefix by publishing the address of this server
-// to the namespace under the supplied name. If name is an empty
-// string, no attempt will made to publish. RPCs invoked on the
-// supplied name will be delivered to the supplied Dispatcher's Lookup
-// method which will in turn return the object and security.Authorizer
-// used to serve the actual RPC call. If name is an empty string, no
-// attempt will made to publish that name to a mount table.
-func NewDispatchingServer(ctx *context.T, name string, disp rpc.Dispatcher, opts ...rpc.ServerOpt) (rpc.Server, error) {
- s, err := v23.NewServer(ctx, opts...)
- if err != nil {
- return nil, err
- }
- if _, err = s.Listen(v23.GetListenSpec(ctx)); err != nil {
- return nil, err
- }
- if err = s.ServeDispatcher(name, disp); err != nil {
- return nil, err
- }
- return &server{s: s}, nil
-}
-
-// AddName adds the specified name to the mount table for this server.
-// AddName may be called multiple times.
-func (s *server) AddName(name string) error {
- return s.s.AddName(name)
-}
-
-// RemoveName removes the specified name from the mount table.
-// RemoveName may be called multiple times.
-func (s *server) RemoveName(name string) {
- s.s.RemoveName(name)
-}
-
-// Status returns the current status of the server, see ServerStatus
-// for details.
-func (s *server) Status() rpc.ServerStatus {
- return s.s.Status()
-}
-
-// WatchNetwork registers a channel over which NetworkChange's will
-// be sent. The Server will not block sending data over this channel
-// and hence change events may be lost if the caller doesn't ensure
-// there is sufficient buffering in the channel.
-func (s *server) WatchNetwork(ch chan<- rpc.NetworkChange) {
- s.s.WatchNetwork(ch)
-}
-
-// UnwatchNetwork unregisters a channel previously registered using
-// WatchNetwork.
-func (s *server) UnwatchNetwork(ch chan<- rpc.NetworkChange) {
- s.s.UnwatchNetwork(ch)
-}
-
-// Stop gracefully stops all services on this Server. New calls are
-// rejected, but any in-flight calls are allowed to complete. All
-// published mountpoints are unmounted. This call waits for this
-// process to complete, and returns once the server has been shut down.
-func (s *server) Stop() error {
- return s.s.Stop()
-}
diff --git a/lib/xrpc/xserver_test.go b/lib/xrpc/xserver_test.go
deleted file mode 100644
index b799fb4..0000000
--- a/lib/xrpc/xserver_test.go
+++ /dev/null
@@ -1,70 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-package xrpc_test
-
-import (
- "testing"
-
- "v.io/v23"
- "v.io/v23/context"
- "v.io/v23/rpc"
- "v.io/v23/security"
- _ "v.io/x/ref/runtime/factories/generic"
- "v.io/x/ref/test"
-)
-
-func init() {
- test.Init()
-}
-
-type service struct{}
-
-func (s *service) Yo(ctx *context.T, call rpc.ServerCall) (string, error) {
- return "yo", nil
-}
-
-func TestXServer(t *testing.T) {
- ctx, shutdown := test.V23Init()
- defer shutdown()
-
- ctx, server, err := v23.WithNewServer(ctx, "", &service{}, nil)
- if err != nil {
- t.Fatalf("Error creating server: %v", err)
- }
- ep := server.Status().Endpoints[0]
-
- var out string
- if err := v23.GetClient(ctx).Call(ctx, ep.Name(), "Yo", nil, []interface{}{&out}); err != nil {
- t.Fatalf("Call failed: %v", err)
- }
- if out != "yo" {
- t.Fatalf("Wanted yo, got %s", out)
- }
-}
-
-type dispatcher struct{}
-
-func (d *dispatcher) Lookup(_ *context.T, suffix string) (interface{}, security.Authorizer, error) {
- return &service{}, nil, nil
-}
-
-func TestXDispatchingServer(t *testing.T) {
- ctx, shutdown := test.V23Init()
- defer shutdown()
-
- ctx, server, err := v23.WithNewDispatchingServer(ctx, "", &dispatcher{})
- if err != nil {
- t.Fatalf("Error creating server: %v", err)
- }
- ep := server.Status().Endpoints[0]
-
- var out string
- if err := v23.GetClient(ctx).Call(ctx, ep.Name(), "Yo", nil, []interface{}{&out}); err != nil {
- t.Fatalf("Call failed: %v", err)
- }
- if out != "yo" {
- t.Fatalf("Wanted yo, got %s", out)
- }
-}
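
[Editor's note] The deleted xrpc package is superseded by the v23 server API that the deleted tests above already use. A minimal migration sketch (names taken from the deleted test code):

    // Before (deleted xrpc API):
    //   server, err := xrpc.NewServer(ctx, name, obj, authorizer)
    //   server, err := xrpc.NewDispatchingServer(ctx, name, disp)
    // After (v23 API, as exercised by the deleted tests above):
    ctx, server, err := v23.WithNewServer(ctx, name, obj, authorizer)
    ctx, server, err = v23.WithNewDispatchingServer(ctx, name, disp)
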
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index 76a5206..a7991c7 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -159,6 +159,12 @@
// RemoteEndpoint returns the remote vanadium Endpoint
func (c *Conn) RemoteEndpoint() naming.Endpoint { return c.remote }
+// LocalBlessings returns the local blessings.
+func (c *Conn) LocalBlessings() security.Blessings { return c.lBlessings }
+
+// RemoteBlessings returns the remote blessings.
+func (c *Conn) RemoteBlessings() security.Blessings { return c.rBlessings }
+
// CommonVersion returns the RPCVersion negotiated between the local and remote endpoints.
func (c *Conn) CommonVersion() version.RPCVersion { return c.version }
diff --git a/runtime/internal/flow/manager/conncache.go b/runtime/internal/flow/manager/conncache.go
index cdb81a7..92f8348 100644
--- a/runtime/internal/flow/manager/conncache.go
+++ b/runtime/internal/flow/manager/conncache.go
@@ -183,6 +183,9 @@
// is identified by the provided rid. nil is returned if there is no such Conn.
// FindWithRoutingID will return an error iff the cache is closed.
func (c *ConnCache) FindWithRoutingID(rid naming.RoutingID) (*conn.Conn, error) {
+ if rid == naming.NullRoutingID {
+ return nil, nil
+ }
defer c.mu.Unlock()
c.mu.Lock()
if c.addrCache == nil {
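
[Editor's note] NullRoutingID identifies client-only flow managers, which are never entered into the cache under a routing ID; the guard added above turns such lookups into cheap misses instead of taking the lock. A small sketch ("cache" is a hypothetical *ConnCache):

    c, err := cache.FindWithRoutingID(naming.NullRoutingID)
    // c == nil && err == nil: a null routing ID never matches a cached conn.
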
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
index 6ec43fb..d39b1ad 100644
--- a/runtime/internal/flow/manager/manager.go
+++ b/runtime/internal/flow/manager/manager.go
@@ -266,7 +266,11 @@
// The flow.Manager associated with ctx must be the receiver of the method,
// otherwise an error is returned.
func (m *manager) Dial(ctx *context.T, remote naming.Endpoint, fn flow.BlessingsForPeer) (flow.Flow, error) {
- return m.internalDial(ctx, remote, fn, &flowHandler{q: m.q, closed: m.closed})
+ var fh conn.FlowHandler
+ if m.rid != naming.NullRoutingID {
+ fh = &flowHandler{q: m.q, closed: m.closed}
+ }
+ return m.internalDial(ctx, remote, fn, fh)
}
func (m *manager) internalDial(ctx *context.T, remote naming.Endpoint, fn flow.BlessingsForPeer, fh conn.FlowHandler) (flow.Flow, error) {
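
[Editor's note] With this change, a manager created with NullRoutingID dials without a flow handler, so the resulting conn cannot accept peer-initiated flows; it is a pure client. As the new test below verifies, such a dialer also presents zero-value blessings to the acceptor. A sketch, assuming the package API shown in this diff (remoteEP is a hypothetical naming.Endpoint):

    dm := manager.New(ctx, naming.NullRoutingID) // client-only manager
    f, err := dm.Dial(ctx, remoteEP, flowtest.BlessingsForPeer)
    // f carries outgoing traffic only; the remote end cannot open flows back
    // to dm, since no flow handler was installed at dial time.
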
diff --git a/runtime/internal/flow/manager/manager_test.go b/runtime/internal/flow/manager/manager_test.go
index 41c470f..62403da 100644
--- a/runtime/internal/flow/manager/manager_test.go
+++ b/runtime/internal/flow/manager/manager_test.go
@@ -15,6 +15,7 @@
"v.io/v23/naming"
_ "v.io/x/ref/runtime/factories/fake"
+ "v.io/x/ref/runtime/internal/flow/conn"
"v.io/x/ref/runtime/internal/flow/flowtest"
"v.io/x/ref/test"
)
@@ -82,6 +83,29 @@
testFlows(t, ctx, am, dm, flowtest.BlessingsForPeer)
}
+func TestNullClientBlessings(t *testing.T) {
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+
+ am := New(ctx, naming.FixedRoutingID(0x5555))
+ if err := am.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
+ t.Fatal(err)
+ }
+ dm := New(ctx, naming.NullRoutingID)
+ _, af := testFlows(t, ctx, dm, am, flowtest.BlessingsForPeer)
+ // Ensure that the remote blessings of the underlying conn of the accepted flow are zero.
+ if rBlessings := af.Conn().(*conn.Conn).RemoteBlessings(); !rBlessings.IsZero() {
+ t.Errorf("got %v, want zero-value blessings", rBlessings)
+ }
+ dm = New(ctx, naming.FixedRoutingID(0x1111))
+ _, af = testFlows(t, ctx, dm, am, flowtest.BlessingsForPeer)
+ // Ensure that the remote blessings of the underlying conn of the accepted flow are
+ // non-zero if we did specify a RoutingID.
+ if rBlessings := af.Conn().(*conn.Conn).RemoteBlessings(); rBlessings.IsZero() {
+ t.Errorf("got %v, want non-zero blessings", rBlessings)
+ }
+}
+
func testFlows(t *testing.T, ctx *context.T, dm, am flow.Manager, bFn flow.BlessingsForPeer) (df, af flow.Flow) {
eps := am.ListeningEndpoints()
if len(eps) == 0 {
diff --git a/runtime/internal/rt/runtime.go b/runtime/internal/rt/runtime.go
index f3452b0..c4f358e 100644
--- a/runtime/internal/rt/runtime.go
+++ b/runtime/internal/rt/runtime.go
@@ -190,7 +190,13 @@
}
// Add the flow.Manager to the context.
- ctx, _, err = r.ExperimentalWithNewFlowManager(ctx)
+	// This initial flow.Manager can only be used as a client.
+ ctx, _, err = r.setNewClientFlowManager(ctx)
+ if err != nil {
+ return nil, nil, nil, err
+ }
+ // Add the Client to the context.
+ ctx, _, err = r.WithNewClient(ctx)
if err != nil {
return nil, nil, nil, err
}
@@ -321,21 +327,22 @@
return sm, nil
}
-func newFlowManager(ctx *context.T) (flow.Manager, error) {
- rid, err := naming.NewRoutingID()
- if err != nil {
- return nil, err
- }
- return manager.New(ctx, rid), nil
+func (r *Runtime) setNewClientFlowManager(ctx *context.T) (*context.T, flow.Manager, error) {
+ return r.setNewFlowManager(ctx, naming.NullRoutingID)
}
-func (r *Runtime) setNewFlowManager(ctx *context.T) (*context.T, flow.Manager, error) {
- fm, err := newFlowManager(ctx)
+func (r *Runtime) setNewBidiFlowManager(ctx *context.T) (*context.T, flow.Manager, error) {
+ rid, err := naming.NewRoutingID()
if err != nil {
return nil, nil, err
}
+ return r.setNewFlowManager(ctx, rid)
+}
+
+func (r *Runtime) setNewFlowManager(ctx *context.T, rid naming.RoutingID) (*context.T, flow.Manager, error) {
+ fm := manager.New(ctx, rid)
// TODO(mattr): How can we close a flow manager.
- if err = r.addChild(ctx, fm, func() {}); err != nil {
+ if err := r.addChild(ctx, fm, func() {}); err != nil {
return ctx, nil, err
}
newctx := context.WithValue(ctx, flowManagerKey, fm)
@@ -396,7 +403,12 @@
if newctx, err = r.setNewStreamManager(newctx); err != nil {
return ctx, err
}
- if newctx, _, err = r.setNewFlowManager(newctx); err != nil {
+ if rid := r.ExperimentalGetFlowManager(newctx).RoutingID(); rid == naming.NullRoutingID {
+ newctx, _, err = r.setNewClientFlowManager(newctx)
+ } else {
+ newctx, _, err = r.setNewBidiFlowManager(newctx)
+ }
+ if err != nil {
return ctx, err
}
if newctx, _, err = r.setNewNamespace(newctx, r.GetNamespace(ctx).Roots()...); err != nil {
@@ -559,7 +571,7 @@
func (r *Runtime) ExperimentalWithNewFlowManager(ctx *context.T) (*context.T, flow.Manager, error) {
defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
- newctx, m, err := r.setNewFlowManager(ctx)
+ newctx, m, err := r.setNewBidiFlowManager(ctx)
if err != nil {
return ctx, nil, err
}
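
[Editor's note] The runtime now starts with a client-only flow manager (NullRoutingID) and only mints a routing ID when a bidirectional manager is explicitly requested; the @@ -396 hunk makes derived contexts preserve whichever flavor they already had. A sketch using the API in this diff (RoutingID accessor as used in the tests below):

    fm := r.ExperimentalGetFlowManager(ctx)
    // fm.RoutingID() == naming.NullRoutingID for the initial, client-only manager.
    ctx, fm2, err := r.ExperimentalWithNewFlowManager(ctx)
    // fm2.RoutingID() != naming.NullRoutingID: upgrading to a bidirectional
    // manager allocates a fresh routing ID.
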
diff --git a/runtime/internal/rt/runtime_test.go b/runtime/internal/rt/runtime_test.go
index 9b7f6cd..fc70a76 100644
--- a/runtime/internal/rt/runtime_test.go
+++ b/runtime/internal/rt/runtime_test.go
@@ -161,6 +161,9 @@
if oldman == nil {
t.Error("ExperimentalGetFlowManager should have returned a non-nil value")
}
+ if rid := oldman.RoutingID(); rid != naming.NullRoutingID {
+ t.Errorf("Initial flow.Manager should have NullRoutingID, got %v", rid)
+ }
newctx, newman, err := r.ExperimentalWithNewFlowManager(ctx)
if err != nil || newman == nil || newman == oldman {
t.Fatalf("Could not create flow manager: %v", err)
@@ -172,4 +175,7 @@
if man != newman || man == oldman {
t.Error("ExperimentalWithNewFlowManager didn't update the context properly")
}
+ if man.RoutingID() == naming.NullRoutingID {
+ t.Error("Newly created flow.Manager should not have NullRoutingID")
+ }
}
diff --git a/services/agent/internal/server/server.go b/services/agent/internal/server/server.go
index a9ccc5c..cce84a9 100644
--- a/services/agent/internal/server/server.go
+++ b/services/agent/internal/server/server.go
@@ -339,6 +339,9 @@
}
func (m *keymgr) ServePrincipal(handle [agent.PrincipalHandleByteSize]byte, path string) error {
+ if maxLen := GetMaxSockPathLen(); len(path) > maxLen {
+ return fmt.Errorf("socket path (%s) exceeds maximum allowed socket path length (%d)", path, maxLen)
+ }
if _, err := m.readKey(handle); err != nil {
return err
}
diff --git a/services/agent/internal/server/sock_len.go b/services/agent/internal/server/sock_len.go
new file mode 100644
index 0000000..7706d58
--- /dev/null
+++ b/services/agent/internal/server/sock_len.go
@@ -0,0 +1,9 @@
+package server
+
+// #include <sys/un.h>
+import "C"
+
+func GetMaxSockPathLen() int {
+ var t C.struct_sockaddr_un
+ return len(t.sun_path)
+}
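
[Editor's note] GetMaxSockPathLen uses cgo to read the size of sockaddr_un.sun_path for the host OS (104 on Darwin, 108 on Linux, per the tests below). ServePrincipal uses it to reject over-long socket paths up front rather than failing at bind time; a caller-side sketch mirroring that check (sockPath is hypothetical):

    if maxLen := server.GetMaxSockPathLen(); len(sockPath) > maxLen {
        return fmt.Errorf("socket path (%s) exceeds maximum allowed socket path length (%d)", sockPath, maxLen)
    }
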
diff --git a/services/agent/internal/server/sock_len_darwin_test.go b/services/agent/internal/server/sock_len_darwin_test.go
new file mode 100644
index 0000000..ca8fc74
--- /dev/null
+++ b/services/agent/internal/server/sock_len_darwin_test.go
@@ -0,0 +1,11 @@
+package server
+
+import (
+ "testing"
+)
+
+func TestMaxSockPathLen(t *testing.T) {
+ if length := GetMaxSockPathLen(); length != 104 {
+ t.Errorf("Expected max socket path length to be 104 on darwin, got %d instead", length)
+ }
+}
diff --git a/services/agent/internal/server/sock_len_linux_test.go b/services/agent/internal/server/sock_len_linux_test.go
new file mode 100644
index 0000000..e592f12
--- /dev/null
+++ b/services/agent/internal/server/sock_len_linux_test.go
@@ -0,0 +1,11 @@
+package server
+
+import (
+ "testing"
+)
+
+func TestMaxSockPathLen(t *testing.T) {
+ if length := GetMaxSockPathLen(); length != 108 {
+ t.Errorf("Expected max socket path length to be 108 on linux, got %d instead", length)
+ }
+}
diff --git a/services/application/application/impl_test.go b/services/application/application/impl_test.go
index 007d507..1bdf115 100644
--- a/services/application/application/impl_test.go
+++ b/services/application/application/impl_test.go
@@ -97,12 +97,6 @@
return nil
}
-func (s *server) PutX(ctx *context.T, _ rpc.ServerCall, profile string, env application.Envelope, overwrite bool) error {
- ctx.VI(2).Infof("%v.PutX(%v, %v, %t) was called", s.suffix, profile, env, overwrite)
- fmt.Fprintf(&serverOut, "PutX(%s, ..., %t)\n", profile, overwrite)
- return nil
-}
-
func (s *server) Profiles(ctx *context.T, _ rpc.ServerCall) ([]string, error) {
ctx.VI(2).Infof("%v.Profiles() was called", s.suffix)
return strings.Split(profiles, ","), nil
diff --git a/services/application/applicationd/dispatcher.go b/services/application/applicationd/dispatcher.go
index 8f081ac..506d3ae 100644
--- a/services/application/applicationd/dispatcher.go
+++ b/services/application/applicationd/dispatcher.go
@@ -44,7 +44,7 @@
naming.Join("/acls", "data"),
naming.Join("/acls", name, "data"),
(*applicationPermsStore)(d.store),
- []string{"Put", "PutX", "__Glob"})
+ []string{"Put", "__Glob"})
if err != nil {
return nil, nil, err
}
diff --git a/services/application/applicationd/service.go b/services/application/applicationd/service.go
index 7d12040..fca06f8 100644
--- a/services/application/applicationd/service.go
+++ b/services/application/applicationd/service.go
@@ -135,10 +135,6 @@
return empty, verror.New(verror.ErrNoExist, ctx)
}
-func (i *appRepoService) PutX(ctx *context.T, call rpc.ServerCall, profile string, envelope application.Envelope, overwrite bool) error {
- return i.Put(ctx, call, profile, envelope, overwrite)
-}
-
func (i *appRepoService) Put(ctx *context.T, call rpc.ServerCall, profile string, envelope application.Envelope, overwrite bool) error {
ctx.VI(0).Infof("%v.Put(%v, %v, %t)", i.suffix, profile, envelope, overwrite)
name, version, err := parse(ctx, i.suffix)
diff --git a/services/device/device/publish.go b/services/device/device/publish.go
index 5862d86..f7d84a0 100644
--- a/services/device/device/publish.go
+++ b/services/device/device/publish.go
@@ -140,8 +140,7 @@
// TODO(caprita): use the profile detection machinery and/or let user
// specify the profile by hand.
profile := fmt.Sprintf("%s-%s", goosFlag, goarchFlag)
- // TODO(caprita): use a label e.g. "prod" instead of "0".
- appVON := naming.Join(applicationService, envelopeName, "0")
+ appVON := naming.Join(applicationService, envelopeName)
appClient := repository.ApplicationClient(appVON)
envelope, err := appClient.Match(ctx, []string{profile})
if verror.ErrorID(err) == verror.ErrNoExist.ID {
@@ -173,8 +172,16 @@
envelope.Binary.Signature = security.Signature{}
envelope.Publisher = security.Blessings{}
}
-
- if err := appClient.Put(ctx, profile, envelope, true); err != nil {
+ appVON = naming.Join(appVON, timestamp)
+ appClient = repository.ApplicationClient(appVON)
+ if err := appClient.Put(ctx, profile, envelope, false); err != nil {
+ // NOTE(caprita): We don't retry if an envelope already exists
+ // at the versioned name, as we do when uploading binaries. In
+ // the case of binaries, it's likely that the same binary is
+ // uploaded more than once in a given second, due to apps
+ // sharing the same binary. The scenarios where the same app is
+ // published repeatedly in a short time-frame are expected to be
+ // rare, and the operator can retry manually in such cases.
return err
}
fmt.Fprintf(env.Stdout, "Published %q\n", appVON)
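
[Editor's note] Envelopes are now Put under a per-publish versioned name instead of the fixed "0" version, with Overwrite=false so that a concurrent publish of the same version fails loudly rather than silently clobbering. A sketch of the name shapes involved (timestamp is computed in surrounding code not shown in this hunk):

    appVON := naming.Join(applicationService, envelopeName) // e.g. ".../applications/myapp"
    versioned := naming.Join(appVON, timestamp)             // e.g. ".../applications/myapp/<timestamp>"
    err := repository.ApplicationClient(versioned).Put(ctx, profile, envelope, false)
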
diff --git a/services/device/dmrun/dmrun.go b/services/device/dmrun/dmrun.go
index c829566..d816542 100644
--- a/services/device/dmrun/dmrun.go
+++ b/services/device/dmrun/dmrun.go
@@ -234,6 +234,10 @@
cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", ref.EnvCredentials, creds))
} else if agentCreds := os.Getenv(ref.EnvAgentEndpoint); len(agentCreds) > 0 {
cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", ref.EnvAgentEndpoint, agentCreds))
+ } else if agentCreds := os.Getenv(ref.EnvAgentPath); len(agentCreds) > 0 {
+ cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", ref.EnvAgentPath, agentCreds))
+ } else {
+ fmt.Fprintf(os.Stderr, "WARNING: no credentials found. You'll probably have authorization issues later on.\n")
}
}
diff --git a/services/device/mgmt_v23_test.go b/services/device/mgmt_v23_test.go
index 5940bcc..78a7795 100644
--- a/services/device/mgmt_v23_test.go
+++ b/services/device/mgmt_v23_test.go
@@ -350,14 +350,14 @@
// Allow publishers to create and update envelopes
deviceBin.Run("acl", "set", appDName, "root/a", "Read,Write,Resolve")
- sampleAppName := appDName + "/testapp/0"
+ sampleAppName := appDName + "/testapp"
appPubName := "testbinaryd"
appEnvelopeFilename := filepath.Join(workDir, "app.envelope")
appEnvelope := fmt.Sprintf("{\"Title\":\"BINARYD\", \"Args\":[\"--name=%s\", \"--root-dir=./binstore\", \"--v23.tcp.address=127.0.0.1:0\", \"--http=127.0.0.1:0\"], \"Binary\":{\"File\":%q}, \"Env\":[]}", appPubName, sampleAppBinName)
ioutil.WriteFile(appEnvelopeFilename, []byte(appEnvelope), 0666)
defer os.Remove(appEnvelopeFilename)
- output := applicationBin.Run("put", sampleAppName, deviceProfile, appEnvelopeFilename)
+ output := applicationBin.Run("put", sampleAppName+"/0", deviceProfile, appEnvelopeFilename)
if got, want := output, fmt.Sprintf("Application envelope added for profile %s.", deviceProfile); got != want {
i.Fatalf("got %q, want %q", got, want)
}
diff --git a/services/repository/repository.vdl b/services/repository/repository.vdl
index 2d977c2..a9f6287 100644
--- a/services/repository/repository.vdl
+++ b/services/repository/repository.vdl
@@ -25,8 +25,6 @@
// An error is returned if an envelope already exists, unless the
// overwrite option is set.
Put(Profile string, Envelope application.Envelope, Overwrite bool) error {access.Write}
- // DEPRECATED. Please use Put for new code.
- PutX(Profile string, Envelope application.Envelope, Overwrite bool) error {access.Write}
// Remove removes the application envelope for the given profile
// name and application version (specified through the object name
// suffix).
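
[Editor's note] With PutX gone, Put is the single mutation entry point and carries the Overwrite flag itself, so call sites migrate mechanically:

    // Before: err := appClient.PutX(ctx, profile, envelope, overwrite)
    // After:
    err := appClient.Put(ctx, profile, envelope, overwrite)
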
diff --git a/services/repository/repository.vdl.go b/services/repository/repository.vdl.go
index 07c29d2..6d62aa4 100644
--- a/services/repository/repository.vdl.go
+++ b/services/repository/repository.vdl.go
@@ -50,8 +50,6 @@
// An error is returned if an envelope already exists, unless the
// overwrite option is set.
Put(ctx *context.T, Profile string, Envelope application.Envelope, Overwrite bool, opts ...rpc.CallOpt) error
- // DEPRECATED. Please use Put for new code.
- PutX(ctx *context.T, Profile string, Envelope application.Envelope, Overwrite bool, opts ...rpc.CallOpt) error
// Remove removes the application envelope for the given profile
// name and application version (specified through the object name
// suffix).
@@ -91,11 +89,6 @@
return
}
-func (c implApplicationClientStub) PutX(ctx *context.T, i0 string, i1 application.Envelope, i2 bool, opts ...rpc.CallOpt) (err error) {
- err = v23.GetClient(ctx).Call(ctx, c.name, "PutX", []interface{}{i0, i1, i2}, nil, opts...)
- return
-}
-
func (c implApplicationClientStub) Remove(ctx *context.T, i0 string, opts ...rpc.CallOpt) (err error) {
err = v23.GetClient(ctx).Call(ctx, c.name, "Remove", []interface{}{i0}, nil, opts...)
return
@@ -131,8 +124,6 @@
// An error is returned if an envelope already exists, unless the
// overwrite option is set.
Put(ctx *context.T, call rpc.ServerCall, Profile string, Envelope application.Envelope, Overwrite bool) error
- // DEPRECATED. Please use Put for new code.
- PutX(ctx *context.T, call rpc.ServerCall, Profile string, Envelope application.Envelope, Overwrite bool) error
// Remove removes the application envelope for the given profile
// name and application version (specified through the object name
// suffix).
@@ -191,10 +182,6 @@
return s.impl.Put(ctx, call, i0, i1, i2)
}
-func (s implApplicationServerStub) PutX(ctx *context.T, call rpc.ServerCall, i0 string, i1 application.Envelope, i2 bool) error {
- return s.impl.PutX(ctx, call, i0, i1, i2)
-}
-
func (s implApplicationServerStub) Remove(ctx *context.T, call rpc.ServerCall, i0 string) error {
return s.impl.Remove(ctx, call, i0)
}
@@ -234,16 +221,6 @@
Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Write"))},
},
{
- Name: "PutX",
- Doc: "// DEPRECATED. Please use Put for new code.",
- InArgs: []rpc.ArgDesc{
- {"Profile", ``}, // string
- {"Envelope", ``}, // application.Envelope
- {"Overwrite", ``}, // bool
- },
- Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Write"))},
- },
- {
Name: "Remove",
Doc: "// Remove removes the application envelope for the given profile\n// name and application version (specified through the object name\n// suffix).\n//\n// If no version is specified as part of the suffix, the method removes\n// all versions for the given profile.\n//\n// If the profile is the string \"*\", all profiles are removed for the\n// given version (or for all versions if the version is not specified).",
InArgs: []rpc.ArgDesc{
diff --git a/services/syncbase/server/app.go b/services/syncbase/server/app.go
index e07094c..9bcec0d 100644
--- a/services/syncbase/server/app.go
+++ b/services/syncbase/server/app.go
@@ -9,7 +9,6 @@
"sync"
"v.io/v23/context"
- "v.io/v23/glob"
"v.io/v23/rpc"
"v.io/v23/security/access"
wire "v.io/v23/services/syncbase"
@@ -42,6 +41,8 @@
////////////////////////////////////////
// RPC methods
+// TODO(sadovsky): Implement Glob__ or GlobChildren__.
+
// TODO(sadovsky): Require the app name to match the client's blessing name.
// I.e. reserve names at the app level of the hierarchy.
func (a *app) Create(ctx *context.T, call rpc.ServerCall, perms access.Permissions) error {
@@ -83,17 +84,17 @@
return data.Perms, util.FormatVersion(data.Version), nil
}
-func (a *app) GlobChildren__(ctx *context.T, call rpc.GlobChildrenServerCall, matcher *glob.Element) error {
+func (a *app) ListDatabases(ctx *context.T, call rpc.ServerCall) ([]string, error) {
if !a.exists {
- return verror.New(verror.ErrNoExist, ctx, a.name)
+ return nil, verror.New(verror.ErrNoExist, ctx, a.name)
}
// Check perms.
sn := a.s.st.NewSnapshot()
if err := util.GetWithAuth(ctx, call, sn, a.stKey(), &appData{}); err != nil {
sn.Abort()
- return err
+ return nil, err
}
- return util.Glob(ctx, call, matcher, sn, sn.Abort, util.JoinKeyParts(util.DbInfoPrefix, a.name))
+ return util.ListChildren(ctx, call, sn, util.JoinKeyParts(util.DbInfoPrefix, a.name))
}
////////////////////////////////////////
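
[Editor's note] Enumerating an app's databases moves from GlobChildren__ to an explicit ListDatabases RPC. A hedged client-side sketch (the stub method name is assumed to mirror the server method added here):

    // Before: glob "<app>/*" to discover databases. After:
    dbs, err := appClient.ListDatabases(ctx)
    for _, db := range dbs {
        // ...
    }
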
diff --git a/services/syncbase/server/dispatcher.go b/services/syncbase/server/dispatcher.go
index dac3d34..06a6757 100644
--- a/services/syncbase/server/dispatcher.go
+++ b/services/syncbase/server/dispatcher.go
@@ -34,21 +34,26 @@
func (disp *dispatcher) Lookup(ctx *context.T, suffix string) (interface{}, security.Authorizer, error) {
suffix = strings.TrimPrefix(suffix, "/")
- parts := strings.SplitN(suffix, "/", 2)
if len(suffix) == 0 {
return wire.ServiceServer(disp.s), auth, nil
}
+ // If the first slash-separated component of suffix is SyncbaseSuffix,
+ // dispatch to the sync module.
+ parts := strings.SplitN(suffix, "/", 2)
if parts[0] == util.SyncbaseSuffix {
return interfaces.SyncServer(disp.s.sync), auth, nil
}
+ // Otherwise, split on NameSepWithSlashes to get hierarchy component names.
+ parts = strings.SplitN(suffix, pubutil.NameSepWithSlashes, 2)
+
// Validate all key atoms up front, so that we can avoid doing so in all our
// method implementations.
appName := parts[0]
if !pubutil.ValidName(appName) {
- return nil, nil, wire.NewErrInvalidName(nil, suffix)
+ return nil, nil, wire.NewErrInvalidName(ctx, suffix)
}
aExists := false
@@ -74,7 +79,7 @@
// All database, table, and row methods require the app to exist. If it
// doesn't, abort early.
if !aExists {
- return nil, nil, verror.New(verror.ErrNoExist, nil, a.name)
+ return nil, nil, verror.New(verror.ErrNoExist, ctx, a.name)
}
// Note, it's possible for the app to be deleted concurrently with downstream
diff --git a/services/syncbase/server/interfaces/sync.vdl b/services/syncbase/server/interfaces/sync.vdl
index 6dd6f25..80b46a0 100644
--- a/services/syncbase/server/interfaces/sync.vdl
+++ b/services/syncbase/server/interfaces/sync.vdl
@@ -19,15 +19,36 @@
// SyncGroup-related methods.
- // PublishSyncGroup is typically invoked on a "central" peer to publish
- // the SyncGroup.
- PublishSyncGroup(sg SyncGroup) error {access.Write}
+ // PublishSyncGroup is invoked on the SyncGroup name (typically served
+ // by a "central" peer) to publish the SyncGroup. It takes the name of
+ // Syncbase doing the publishing (the publisher) and returns the name
+ // of the Syncbase where the SyncGroup is published (the publishee).
+ // This allows the publisher and the publishee to learn of each other.
+ // When a SyncGroup is published, the publishee is given the SyncGroup
+ // metadata, its current version at the publisher, and the current
+ // SyncGroup generation vector. The generation vector serves as a
+ // checkpoint at the time of publishing. The publishing proceeds
+ // asynchronously, and the publishee learns the SyncGroup history
+ // through the routine p2p sync process and determines when it has
+ // caught up to the level of knowledge at the time of publishing using
+ // the checkpointed generation vector. Until that point, the publishee
+ // locally deems the SyncGroup to be in a pending state and does not
+ // mutate it. Thus it locally rejects SyncGroup joins or updates to
+ // its spec until it is caught up on the SyncGroup history.
+ PublishSyncGroup(publisher string, sg SyncGroup, version string, genvec PrefixGenVector) (string | error) {access.Write}
// JoinSyncGroupAtAdmin is invoked by a prospective SyncGroup member's
// Syncbase on a SyncGroup admin. It checks whether the requestor is
// allowed to join the named SyncGroup, and if so, adds the requestor to
- // the SyncGroup.
- JoinSyncGroupAtAdmin(sgName, joinerName string, myInfo wire.SyncGroupMemberInfo) (SyncGroup | error) {access.Read}
+ // the SyncGroup. It returns a copy of the updated SyncGroup metadata,
+ // its version, and the SyncGroup generation vector at the time of the
+ // join. Similar to the PublishSyncGroup scenario, the joiner at that
+ // point does not have the SyncGroup history and locally deems it to be
+ // in a pending state and does not mutate it. This means it rejects
+ // local updates to the SyncGroup spec or, if it were also an admin on
+ // the SyncGroup, it would reject SyncGroup joins until it is caught up
+ // on the SyncGroup history through p2p sync.
+ JoinSyncGroupAtAdmin(sgName, joinerName string, myInfo wire.SyncGroupMemberInfo) (sg SyncGroup, version string, genvec PrefixGenVector | error) {access.Read}
// BlobSync methods.
@@ -50,3 +71,7 @@
FetchBlobRecipe(br wire.BlobRef) stream<_, ChunkHash> error
FetchChunks() stream<ChunkHash, ChunkData> error
}
+
+error (
+ DupSyncGroupPublish(name string) {"en": "duplicate publish on SyncGroup: {name}"}
+)
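
[Editor's note] The expanded signatures carry the version and generation vector that the pending-state protocol described above needs, and the new DupSyncGroupPublish error lets a publisher detect a repeat publish. A sketch against the regenerated stubs (see sync.vdl.go below; c, publisherName, sg, version, genvec, sgName, joinerName, and myInfo are hypothetical locals):

    publishee, err := c.PublishSyncGroup(ctx, publisherName, sg, version, genvec)
    if verror.ErrorID(err) == interfaces.ErrDupSyncGroupPublish.ID {
        // The SyncGroup was already published at this name.
    }
    joinedSG, joinedVersion, joinedGenVec, err := c.JoinSyncGroupAtAdmin(ctx, sgName, joinerName, myInfo)
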
diff --git a/services/syncbase/server/interfaces/sync.vdl.go b/services/syncbase/server/interfaces/sync.vdl.go
index 3006f3c..102699b 100644
--- a/services/syncbase/server/interfaces/sync.vdl.go
+++ b/services/syncbase/server/interfaces/sync.vdl.go
@@ -12,14 +12,29 @@
"io"
"v.io/v23"
"v.io/v23/context"
+ "v.io/v23/i18n"
"v.io/v23/rpc"
"v.io/v23/vdl"
+ "v.io/v23/verror"
// VDL user imports
"v.io/v23/security/access"
"v.io/v23/services/syncbase/nosql"
)
+var (
+ ErrDupSyncGroupPublish = verror.Register("v.io/x/ref/services/syncbase/server/interfaces.DupSyncGroupPublish", verror.NoRetry, "{1:}{2:} duplicate publish on SyncGroup: {3}")
+)
+
+func init() {
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrDupSyncGroupPublish.ID), "{1:}{2:} duplicate publish on SyncGroup: {3}")
+}
+
+// NewErrDupSyncGroupPublish returns an error with the ErrDupSyncGroupPublish ID.
+func NewErrDupSyncGroupPublish(ctx *context.T, name string) error {
+ return verror.New(ErrDupSyncGroupPublish, ctx, name)
+}
+
// SyncClientMethods is the client interface
// containing Sync methods.
//
@@ -30,14 +45,35 @@
// the missing log records when compared to the initiator's generation
// vector for one Database for either SyncGroup metadata or data.
GetDeltas(ctx *context.T, req DeltaReq, initiator string, opts ...rpc.CallOpt) (SyncGetDeltasClientCall, error)
- // PublishSyncGroup is typically invoked on a "central" peer to publish
- // the SyncGroup.
- PublishSyncGroup(ctx *context.T, sg SyncGroup, opts ...rpc.CallOpt) error
+ // PublishSyncGroup is invoked on the SyncGroup name (typically served
+ // by a "central" peer) to publish the SyncGroup. It takes the name of
+ // Syncbase doing the publishing (the publisher) and returns the name
+ // of the Syncbase where the SyncGroup is published (the publishee).
+ // This allows the publisher and the publishee to learn of each other.
+ // When a SyncGroup is published, the publishee is given the SyncGroup
+ // metadata, its current version at the publisher, and the current
+ // SyncGroup generation vector. The generation vector serves as a
+ // checkpoint at the time of publishing. The publishing proceeds
+ // asynchronously, and the publishee learns the SyncGroup history
+ // through the routine p2p sync process and determines when it has
+ // caught up to the level of knowledge at the time of publishing using
+ // the checkpointed generation vector. Until that point, the publishee
+ // locally deems the SyncGroup to be in a pending state and does not
+ // mutate it. Thus it locally rejects SyncGroup joins or updates to
+ // its spec until it is caught up on the SyncGroup history.
+ PublishSyncGroup(ctx *context.T, publisher string, sg SyncGroup, version string, genvec PrefixGenVector, opts ...rpc.CallOpt) (string, error)
// JoinSyncGroupAtAdmin is invoked by a prospective SyncGroup member's
// Syncbase on a SyncGroup admin. It checks whether the requestor is
// allowed to join the named SyncGroup, and if so, adds the requestor to
- // the SyncGroup.
- JoinSyncGroupAtAdmin(ctx *context.T, sgName string, joinerName string, myInfo nosql.SyncGroupMemberInfo, opts ...rpc.CallOpt) (SyncGroup, error)
+ // the SyncGroup. It returns a copy of the updated SyncGroup metadata,
+ // its version, and the SyncGroup generation vector at the time of the
+ // join. Similar to the PublishSyncGroup scenario, the joiner at that
+ // point does not have the SyncGroup history and locally deems it to be
+ // in a pending state and does not mutate it. This means it rejects
+ // local updates to the SyncGroup spec or, if it were also an admin on
+ // the SyncGroup, it would reject SyncGroup joins until it is caught up
+ // on the SyncGroup history through p2p sync.
+ JoinSyncGroupAtAdmin(ctx *context.T, sgName string, joinerName string, myInfo nosql.SyncGroupMemberInfo, opts ...rpc.CallOpt) (sg SyncGroup, version string, genvec PrefixGenVector, err error)
// HaveBlob verifies that the peer has the requested blob, and if
// present, returns its size.
HaveBlob(ctx *context.T, br nosql.BlobRef, opts ...rpc.CallOpt) (int64, error)
@@ -80,13 +116,13 @@
return
}
-func (c implSyncClientStub) PublishSyncGroup(ctx *context.T, i0 SyncGroup, opts ...rpc.CallOpt) (err error) {
- err = v23.GetClient(ctx).Call(ctx, c.name, "PublishSyncGroup", []interface{}{i0}, nil, opts...)
+func (c implSyncClientStub) PublishSyncGroup(ctx *context.T, i0 string, i1 SyncGroup, i2 string, i3 PrefixGenVector, opts ...rpc.CallOpt) (o0 string, err error) {
+ err = v23.GetClient(ctx).Call(ctx, c.name, "PublishSyncGroup", []interface{}{i0, i1, i2, i3}, []interface{}{&o0}, opts...)
return
}
-func (c implSyncClientStub) JoinSyncGroupAtAdmin(ctx *context.T, i0 string, i1 string, i2 nosql.SyncGroupMemberInfo, opts ...rpc.CallOpt) (o0 SyncGroup, err error) {
- err = v23.GetClient(ctx).Call(ctx, c.name, "JoinSyncGroupAtAdmin", []interface{}{i0, i1, i2}, []interface{}{&o0}, opts...)
+func (c implSyncClientStub) JoinSyncGroupAtAdmin(ctx *context.T, i0 string, i1 string, i2 nosql.SyncGroupMemberInfo, opts ...rpc.CallOpt) (o0 SyncGroup, o1 string, o2 PrefixGenVector, err error) {
+ err = v23.GetClient(ctx).Call(ctx, c.name, "JoinSyncGroupAtAdmin", []interface{}{i0, i1, i2}, []interface{}{&o0, &o1, &o2}, opts...)
return
}
@@ -440,14 +476,35 @@
// the missing log records when compared to the initiator's generation
// vector for one Database for either SyncGroup metadata or data.
GetDeltas(ctx *context.T, call SyncGetDeltasServerCall, req DeltaReq, initiator string) error
- // PublishSyncGroup is typically invoked on a "central" peer to publish
- // the SyncGroup.
- PublishSyncGroup(ctx *context.T, call rpc.ServerCall, sg SyncGroup) error
+ // PublishSyncGroup is invoked on the SyncGroup name (typically served
+ // by a "central" peer) to publish the SyncGroup. It takes the name of
+ // Syncbase doing the publishing (the publisher) and returns the name
+ // of the Syncbase where the SyncGroup is published (the publishee).
+ // This allows the publisher and the publishee to learn of each other.
+ // When a SyncGroup is published, the publishee is given the SyncGroup
+ // metadata, its current version at the publisher, and the current
+ // SyncGroup generation vector. The generation vector serves as a
+ // checkpoint at the time of publishing. The publishing proceeds
+ // asynchronously, and the publishee learns the SyncGroup history
+ // through the routine p2p sync process and determines when it has
+ // caught up to the level of knowledge at the time of publishing using
+ // the checkpointed generation vector. Until that point, the publishee
+ // locally deems the SyncGroup to be in a pending state and does not
+ // mutate it. Thus it locally rejects SyncGroup joins or updates to
+ // its spec until it is caught up on the SyncGroup history.
+ PublishSyncGroup(ctx *context.T, call rpc.ServerCall, publisher string, sg SyncGroup, version string, genvec PrefixGenVector) (string, error)
// JoinSyncGroupAtAdmin is invoked by a prospective SyncGroup member's
// Syncbase on a SyncGroup admin. It checks whether the requestor is
// allowed to join the named SyncGroup, and if so, adds the requestor to
- // the SyncGroup.
- JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName string, joinerName string, myInfo nosql.SyncGroupMemberInfo) (SyncGroup, error)
+ // the SyncGroup. It returns a copy of the updated SyncGroup metadata,
+ // its version, and the SyncGroup generation vector at the time of the
+ // join. Similar to the PublishSyncGroup scenario, the joiner at that
+ // point does not have the SyncGroup history and locally deems it to be
+ // in a pending state and does not mutate it. This means it rejects
+ // local updates to the SyncGroup spec or, if it were also an admin on
+ // the SyncGroup, it would reject SyncGroup joins until it is caught up
+ // on the SyncGroup history through p2p sync.
+ JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName string, joinerName string, myInfo nosql.SyncGroupMemberInfo) (sg SyncGroup, version string, genvec PrefixGenVector, err error)
// HaveBlob verifies that the peer has the requested blob, and if
// present, returns its size.
HaveBlob(ctx *context.T, call rpc.ServerCall, br nosql.BlobRef) (int64, error)
@@ -475,14 +532,35 @@
// the missing log records when compared to the initiator's generation
// vector for one Database for either SyncGroup metadata or data.
GetDeltas(ctx *context.T, call *SyncGetDeltasServerCallStub, req DeltaReq, initiator string) error
- // PublishSyncGroup is typically invoked on a "central" peer to publish
- // the SyncGroup.
- PublishSyncGroup(ctx *context.T, call rpc.ServerCall, sg SyncGroup) error
+ // PublishSyncGroup is invoked on the SyncGroup name (typically served
+ // by a "central" peer) to publish the SyncGroup. It takes the name of
+ // Syncbase doing the publishing (the publisher) and returns the name
+ // of the Syncbase where the SyncGroup is published (the publishee).
+ // This allows the publisher and the publishee to learn of each other.
+ // When a SyncGroup is published, the publishee is given the SyncGroup
+ // metadata, its current version at the publisher, and the current
+ // SyncGroup generation vector. The generation vector serves as a
+ // checkpoint at the time of publishing. The publishing proceeds
+ // asynchronously, and the publishee learns the SyncGroup history
+ // through the routine p2p sync process and determines when it has
+ // caught up to the level of knowledge at the time of publishing using
+ // the checkpointed generation vector. Until that point, the publishee
+ // locally deems the SyncGroup to be in a pending state and does not
+ // mutate it. Thus it locally rejects SyncGroup joins or updates to
+ // its spec until it is caught up on the SyncGroup history.
+ PublishSyncGroup(ctx *context.T, call rpc.ServerCall, publisher string, sg SyncGroup, version string, genvec PrefixGenVector) (string, error)
// JoinSyncGroupAtAdmin is invoked by a prospective SyncGroup member's
// Syncbase on a SyncGroup admin. It checks whether the requestor is
// allowed to join the named SyncGroup, and if so, adds the requestor to
- // the SyncGroup.
- JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName string, joinerName string, myInfo nosql.SyncGroupMemberInfo) (SyncGroup, error)
+ // the SyncGroup. It returns a copy of the updated SyncGroup metadata,
+ // its version, and the SyncGroup generation vector at the time of the
+ // join. Similar to the PublishSyncGroup scenario, the joiner at that
+ // point does not have the SyncGroup history and locally deems it to be
+ // in a pending state and does not mutate it. This means it rejects
+ // local updates to the SyncGroup spec or, if it were also an admin on
+ // the SyncGroup, it would reject SyncGroup joins until it is caught up
+ // on the SyncGroup history through p2p sync.
+ JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName string, joinerName string, myInfo nosql.SyncGroupMemberInfo) (sg SyncGroup, version string, genvec PrefixGenVector, err error)
// HaveBlob verifies that the peer has the requested blob, and if
// present, returns its size.
HaveBlob(ctx *context.T, call rpc.ServerCall, br nosql.BlobRef) (int64, error)
@@ -534,11 +612,11 @@
return s.impl.GetDeltas(ctx, call, i0, i1)
}
-func (s implSyncServerStub) PublishSyncGroup(ctx *context.T, call rpc.ServerCall, i0 SyncGroup) error {
- return s.impl.PublishSyncGroup(ctx, call, i0)
+func (s implSyncServerStub) PublishSyncGroup(ctx *context.T, call rpc.ServerCall, i0 string, i1 SyncGroup, i2 string, i3 PrefixGenVector) (string, error) {
+ return s.impl.PublishSyncGroup(ctx, call, i0, i1, i2, i3)
}
-func (s implSyncServerStub) JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, i0 string, i1 string, i2 nosql.SyncGroupMemberInfo) (SyncGroup, error) {
+func (s implSyncServerStub) JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, i0 string, i1 string, i2 nosql.SyncGroupMemberInfo) (SyncGroup, string, PrefixGenVector, error) {
return s.impl.JoinSyncGroupAtAdmin(ctx, call, i0, i1, i2)
}
@@ -586,22 +664,30 @@
},
{
Name: "PublishSyncGroup",
- Doc: "// PublishSyncGroup is typically invoked on a \"central\" peer to publish\n// the SyncGroup.",
+ Doc: "// PublishSyncGroup is invoked on the SyncGroup name (typically served\n// by a \"central\" peer) to publish the SyncGroup. It takes the name of\n// Syncbase doing the publishing (the publisher) and returns the name\n// of the Syncbase where the SyncGroup is published (the publishee).\n// This allows the publisher and the publishee to learn of each other.\n// When a SyncGroup is published, the publishee is given the SyncGroup\n// metadata, its current version at the publisher, and the current\n// SyncGroup generation vector. The generation vector serves as a\n// checkpoint at the time of publishing. The publishing proceeds\n// asynchronously, and the publishee learns the SyncGroup history\n// through the routine p2p sync process and determines when it has\n// caught up to the level of knowledge at the time of publishing using\n// the checkpointed generation vector. Until that point, the publishee\n// locally deems the SyncGroup to be in a pending state and does not\n// mutate it. Thus it locally rejects SyncGroup joins or updates to\n// its spec until it is caught up on the SyncGroup history.",
InArgs: []rpc.ArgDesc{
- {"sg", ``}, // SyncGroup
+ {"publisher", ``}, // string
+ {"sg", ``}, // SyncGroup
+ {"version", ``}, // string
+ {"genvec", ``}, // PrefixGenVector
+ },
+ OutArgs: []rpc.ArgDesc{
+ {"", ``}, // string
},
Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Write"))},
},
{
Name: "JoinSyncGroupAtAdmin",
- Doc: "// JoinSyncGroupAtAdmin is invoked by a prospective SyncGroup member's\n// Syncbase on a SyncGroup admin. It checks whether the requestor is\n// allowed to join the named SyncGroup, and if so, adds the requestor to\n// the SyncGroup.",
+ Doc: "// JoinSyncGroupAtAdmin is invoked by a prospective SyncGroup member's\n// Syncbase on a SyncGroup admin. It checks whether the requestor is\n// allowed to join the named SyncGroup, and if so, adds the requestor to\n// the SyncGroup. It returns a copy of the updated SyncGroup metadata,\n// its version, and the SyncGroup generation vector at the time of the\n// join. Similar to the PublishSyncGroup scenario, the joiner at that\n// point does not have the SyncGroup history and locally deems it to be\n// in a pending state and does not mutate it. This means it rejects\n// local updates to the SyncGroup spec or, if it were also an admin on\n// the SyncGroup, it would reject SyncGroup joins until it is caught up\n// on the SyncGroup history through p2p sync.",
InArgs: []rpc.ArgDesc{
{"sgName", ``}, // string
{"joinerName", ``}, // string
{"myInfo", ``}, // nosql.SyncGroupMemberInfo
},
OutArgs: []rpc.ArgDesc{
- {"", ``}, // SyncGroup
+ {"sg", ``}, // SyncGroup
+ {"version", ``}, // string
+ {"genvec", ``}, // PrefixGenVector
},
Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
},
diff --git a/services/syncbase/server/nosql/database.go b/services/syncbase/server/nosql/database.go
index 1755076..f6e14e1 100644
--- a/services/syncbase/server/nosql/database.go
+++ b/services/syncbase/server/nosql/database.go
@@ -7,13 +7,11 @@
import (
"math/rand"
"path"
- "strconv"
"strings"
"sync"
"time"
"v.io/v23/context"
- "v.io/v23/glob"
"v.io/v23/rpc"
"v.io/v23/security/access"
wire "v.io/v23/services/syncbase/nosql"
@@ -129,6 +127,8 @@
////////////////////////////////////////
// RPC methods
+// TODO(sadovsky): Implement Glob__ or GlobChildren__.
+
func (d *databaseReq) Create(ctx *context.T, call rpc.ServerCall, metadata *wire.SchemaMetadata, perms access.Permissions) error {
if d.exists {
return verror.New(verror.ErrExist, ctx, d.name)
@@ -178,24 +178,24 @@
d.mu.Lock()
defer d.mu.Unlock()
var id uint64
- var batchType string
+ var batchType util.BatchType
for {
id = uint64(rng.Int63())
if bo.ReadOnly {
if _, ok := d.sns[id]; !ok {
d.sns[id] = d.st.NewSnapshot()
- batchType = "sn"
+ batchType = util.BatchTypeSn
break
}
} else {
if _, ok := d.txs[id]; !ok {
d.txs[id] = d.st.NewTransaction()
- batchType = "tx"
+ batchType = util.BatchTypeTx
break
}
}
}
- return strings.Join([]string{d.name, batchType, strconv.FormatUint(id, 10)}, util.BatchSep), nil
+ return strings.Join([]string{d.name, util.JoinBatchInfo(batchType, id)}, util.BatchSep), nil
}
func (d *databaseReq) Commit(ctx *context.T, call rpc.ServerCall, schemaVersion int32) error {
@@ -251,10 +251,20 @@
}
func (d *databaseReq) Exec(ctx *context.T, call wire.DatabaseExecServerCall, schemaVersion int32, q string) error {
+ if !d.exists {
+ return verror.New(verror.ErrNoExist, ctx, d.name)
+ }
if err := d.checkSchemaVersion(ctx, schemaVersion); err != nil {
return err
}
- impl := func(headers []string, rs query.ResultStream, err error) error {
+ impl := func(sntx store.SnapshotOrTransaction) error {
+ db := &queryDb{
+ ctx: ctx,
+ call: call,
+ req: d,
+ sntx: sntx,
+ }
+ headers, rs, err := exec.Exec(db, q)
if err != nil {
return err
}
@@ -275,23 +285,13 @@
}
return rs.Err()
}
- var sntx store.SnapshotOrTransaction
if d.batchId != nil {
- sntx = d.batchReader()
+ return impl(d.batchReader())
} else {
- sntx = d.st.NewSnapshot()
+ sntx := d.st.NewSnapshot()
defer sntx.Abort()
+ return impl(sntx)
}
- // queryDb implements the query.Database interface, which is needed by the
- // exec.Exec function.
- db := &queryDb{
- ctx: ctx,
- call: call,
- req: d,
- sntx: sntx,
- }
-
- return impl(exec.Exec(db, q))
}
func (d *databaseReq) SetPermissions(ctx *context.T, call rpc.ServerCall, perms access.Permissions, version string) error {
@@ -318,20 +318,25 @@
return data.Perms, util.FormatVersion(data.Version), nil
}
-func (d *databaseReq) GlobChildren__(ctx *context.T, call rpc.GlobChildrenServerCall, matcher *glob.Element) error {
+func (d *databaseReq) ListTables(ctx *context.T, call rpc.ServerCall) ([]string, error) {
if !d.exists {
- return verror.New(verror.ErrNoExist, ctx, d.name)
+ return nil, verror.New(verror.ErrNoExist, ctx, d.name)
+ }
+ impl := func(sntx store.SnapshotOrTransaction) ([]string, error) {
+ // Check perms.
+ if err := util.GetWithAuth(ctx, call, sntx, d.stKey(), &databaseData{}); err != nil {
+ sntx.Abort()
+ return nil, err
+ }
+ return util.ListChildren(ctx, call, sntx, util.TablePrefix)
}
if d.batchId != nil {
- return wire.NewErrBoundToBatch(ctx)
+ return impl(d.batchReader())
+ } else {
+ sntx := d.st.NewSnapshot()
+ defer sntx.Abort()
+ return impl(sntx)
}
- // Check perms.
- sn := d.st.NewSnapshot()
- if err := util.GetWithAuth(ctx, call, sn, d.stKey(), &databaseData{}); err != nil {
- sn.Abort()
- return err
- }
- return util.Glob(ctx, call, matcher, sn, sn.Abort, util.TablePrefix)
}
////////////////////////////////////////
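
[Editor's note] Batch-bound database names now embed a typed batch descriptor after BatchSep instead of the old ":sn:<id>" / ":tx:<id>" form. Under the constants defined later in this diff, the name returned by BeginBatch has the shape:

    // <database>@@<batchInfo>
    // i.e. d.name + util.BatchSep + util.JoinBatchInfo(batchType, id)
    // (the encoding of <batchInfo> is delegated to util.JoinBatchInfo; see the
    // sketch after the nosql dispatcher below)
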
diff --git a/services/syncbase/server/nosql/database_watch.go b/services/syncbase/server/nosql/database_watch.go
index e201987..374cab4 100644
--- a/services/syncbase/server/nosql/database_watch.go
+++ b/services/syncbase/server/nosql/database_watch.go
@@ -172,7 +172,7 @@
}
parts := util.SplitKeyParts(opKey)
// TODO(rogulenko): Currently we process only rows, i.e. keys of the form
- // $row:xxx:yyy. Consider processing other keys.
+ // <RowPrefix>:xxx:yyy. Consider processing other keys.
if len(parts) != 3 || parts[0] != util.RowPrefix {
continue
}
@@ -188,7 +188,7 @@
continue
}
change := watch.Change{
- Name: naming.Join(table, row),
+ Name: naming.Join(table, pubutil.NameSep, row),
Continued: true,
}
switch op := logEntry.Op.(type) {
diff --git a/services/syncbase/server/nosql/dispatcher.go b/services/syncbase/server/nosql/dispatcher.go
index 269a7cd..4ec16c0 100644
--- a/services/syncbase/server/nosql/dispatcher.go
+++ b/services/syncbase/server/nosql/dispatcher.go
@@ -5,7 +5,6 @@
package nosql
import (
- "strconv"
"strings"
"v.io/v23/context"
@@ -34,25 +33,25 @@
// RPC method implementations to perform proper authorization.
var auth security.Authorizer = security.AllowEveryone()
-func (disp *dispatcher) Lookup(_ *context.T, suffix string) (interface{}, security.Authorizer, error) {
+func (disp *dispatcher) Lookup(ctx *context.T, suffix string) (interface{}, security.Authorizer, error) {
suffix = strings.TrimPrefix(suffix, "/")
- parts := strings.Split(suffix, "/")
+ parts := strings.Split(suffix, pubutil.NameSepWithSlashes)
if len(parts) == 0 {
vlog.Fatal("invalid nosql.dispatcher Lookup")
}
- dParts := strings.Split(parts[0], util.BatchSep)
+ dParts := strings.SplitN(parts[0], util.BatchSep, 2)
dName := dParts[0]
// Validate all key atoms up front, so that we can avoid doing so in all our
// method implementations.
if !pubutil.ValidName(dName) {
- return nil, nil, wire.NewErrInvalidName(nil, suffix)
+ return nil, nil, wire.NewErrInvalidName(ctx, suffix)
}
for _, s := range parts[1:] {
if !pubutil.ValidName(s) {
- return nil, nil, wire.NewErrInvalidName(nil, suffix)
+ return nil, nil, wire.NewErrInvalidName(ctx, suffix)
}
}
@@ -75,8 +74,10 @@
}
dReq := &databaseReq{database: d}
- if !setBatchFields(dReq, dParts) {
- return nil, nil, wire.NewErrInvalidName(nil, suffix)
+ if len(dParts) == 2 {
+ if !setBatchFields(dReq, dParts[1]) {
+ return nil, nil, wire.NewErrInvalidName(ctx, suffix)
+ }
}
if len(parts) == 1 {
return nosqlWire.DatabaseServer(dReq), auth, nil
@@ -85,7 +86,7 @@
// All table and row methods require the database to exist. If it doesn't,
// abort early.
if !dExists {
- return nil, nil, verror.New(verror.ErrNoExist, nil, d.name)
+ return nil, nil, verror.New(verror.ErrNoExist, ctx, d.name)
}
// Note, it's possible for the database to be deleted concurrently with
@@ -108,20 +109,14 @@
return nosqlWire.RowServer(rReq), auth, nil
}
- return nil, nil, verror.NewErrNoExist(nil)
+ return nil, nil, verror.NewErrNoExist(ctx)
}
// setBatchFields sets the batch-related fields in databaseReq based on the
-// value of dParts, the parts of the database name component. It returns false
-// if dParts is malformed.
-func setBatchFields(d *databaseReq, dParts []string) bool {
- if len(dParts) == 1 {
- return true
- }
- if len(dParts) != 3 {
- return false
- }
- batchId, err := strconv.ParseUint(dParts[2], 0, 64)
+// value of batchInfo (suffix of the database name component). It returns false
+// if batchInfo is malformed.
+func setBatchFields(d *databaseReq, batchInfo string) bool {
+ batchType, batchId, err := util.SplitBatchInfo(batchInfo)
if err != nil {
return false
}
@@ -129,13 +124,11 @@
d.mu.Lock()
defer d.mu.Unlock()
var ok bool
- switch dParts[1] {
- case "sn":
+ switch batchType {
+ case util.BatchTypeSn:
d.sn, ok = d.sns[batchId]
- case "tx":
+ case util.BatchTypeTx:
d.tx, ok = d.txs[batchId]
- default:
- return false
}
return ok
}
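
[Editor's note] A minimal sketch of the JoinBatchInfo/SplitBatchInfo helpers used above. Their real implementations live in the util package and are not part of this diff, so the "<type>-<id>" encoding here is an assumption; only the "sn"/"tx" values are grounded in the deleted code. Note the switch in setBatchFields no longer needs a default case, because SplitBatchInfo validates the type first. (Imports: fmt, strconv, strings.)

    type BatchType string

    const (
        BatchTypeSn BatchType = "sn"
        BatchTypeTx BatchType = "tx"
    )

    // JoinBatchInfo encodes a batch type and id into a single name component.
    func JoinBatchInfo(t BatchType, id uint64) string {
        return string(t) + "-" + strconv.FormatUint(id, 10)
    }

    // SplitBatchInfo is the inverse of JoinBatchInfo; it rejects malformed
    // input so that dispatch can treat any error as an invalid name.
    func SplitBatchInfo(info string) (BatchType, uint64, error) {
        parts := strings.SplitN(info, "-", 2)
        if len(parts) != 2 {
            return "", 0, fmt.Errorf("malformed batch info %q", info)
        }
        t := BatchType(parts[0])
        if t != BatchTypeSn && t != BatchTypeTx {
            return "", 0, fmt.Errorf("unknown batch type %q", parts[0])
        }
        id, err := strconv.ParseUint(parts[1], 10, 64)
        if err != nil {
            return "", 0, err
        }
        return t, id, nil
    }
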
diff --git a/services/syncbase/server/nosql/row.go b/services/syncbase/server/nosql/row.go
index 758c58a..1efe739 100644
--- a/services/syncbase/server/nosql/row.go
+++ b/services/syncbase/server/nosql/row.go
@@ -26,6 +26,8 @@
////////////////////////////////////////
// RPC methods
+// TODO(sadovsky): Implement Glob__ or GlobChildren__.
+
func (r *rowReq) Exists(ctx *context.T, call rpc.ServerCall, schemaVersion int32) (bool, error) {
_, err := r.Get(ctx, call, schemaVersion)
return util.ErrorToExists(err)
diff --git a/services/syncbase/server/nosql/table.go b/services/syncbase/server/nosql/table.go
index 27097b6..8d68d7a 100644
--- a/services/syncbase/server/nosql/table.go
+++ b/services/syncbase/server/nosql/table.go
@@ -8,7 +8,6 @@
"strings"
"v.io/v23/context"
- "v.io/v23/glob"
"v.io/v23/rpc"
"v.io/v23/security/access"
wire "v.io/v23/services/syncbase/nosql"
@@ -31,6 +30,8 @@
////////////////////////////////////////
// RPC methods
+// TODO(sadovsky): Implement Glob__ or GlobChildren__.
+
func (t *tableReq) Create(ctx *context.T, call rpc.ServerCall, schemaVersion int32, perms access.Permissions) error {
if t.d.batchId != nil {
return wire.NewErrBoundToBatch(ctx)
@@ -316,26 +317,6 @@
}
}
-func (t *tableReq) GlobChildren__(ctx *context.T, call rpc.GlobChildrenServerCall, matcher *glob.Element) error {
- impl := func(sntx store.SnapshotOrTransaction, closeSntx func() error) error {
- // Check perms.
- if err := t.checkAccess(ctx, call, sntx, ""); err != nil {
- closeSntx()
- return err
- }
- // TODO(rogulenko): Check prefix permissions for children.
- return util.Glob(ctx, call, matcher, sntx, closeSntx, util.JoinKeyParts(util.RowPrefix, t.name))
- }
- if t.d.batchId != nil {
- return impl(t.d.batchReader(), func() error {
- return nil
- })
- } else {
- sn := t.d.st.NewSnapshot()
- return impl(sn, sn.Abort)
- }
-}
-
////////////////////////////////////////
// Internal helpers
diff --git a/services/syncbase/server/service.go b/services/syncbase/server/service.go
index 6ca96f1..f2ec671 100644
--- a/services/syncbase/server/service.go
+++ b/services/syncbase/server/service.go
@@ -13,7 +13,6 @@
"sync"
"v.io/v23/context"
- "v.io/v23/glob"
"v.io/v23/rpc"
"v.io/v23/security/access"
wire "v.io/v23/services/syncbase"
@@ -136,6 +135,8 @@
////////////////////////////////////////
// RPC methods
+// TODO(sadovsky): Implement Glob__ or GlobChildren__.
+
func (s *service) SetPermissions(ctx *context.T, call rpc.ServerCall, perms access.Permissions, version string) error {
return store.RunInTransaction(s.st, func(tx store.Transaction) error {
data := &serviceData{}
@@ -158,14 +159,14 @@
return data.Perms, util.FormatVersion(data.Version), nil
}
-func (s *service) GlobChildren__(ctx *context.T, call rpc.GlobChildrenServerCall, matcher *glob.Element) error {
+func (s *service) ListApps(ctx *context.T, call rpc.ServerCall) ([]string, error) {
// Check perms.
sn := s.st.NewSnapshot()
if err := util.GetWithAuth(ctx, call, sn, s.stKey(), &serviceData{}); err != nil {
sn.Abort()
- return err
+ return nil, err
}
- return util.Glob(ctx, call, matcher, sn, sn.Abort, util.AppPrefix)
+ return util.ListChildren(ctx, call, sn, util.AppPrefix)
}
////////////////////////////////////////
diff --git a/services/syncbase/server/util/constants.go b/services/syncbase/server/util/constants.go
index ab2e401..a14d006 100644
--- a/services/syncbase/server/util/constants.go
+++ b/services/syncbase/server/util/constants.go
@@ -8,9 +8,9 @@
"time"
)
-// TODO(sadovsky): Consider using shorter strings.
-
// Constants related to storage engine keys.
+// Note, these are persisted and therefore must not be modified.
+// Below, they are ordered lexicographically by value.
const (
AppPrefix = "$app"
ClockPrefix = "$clock"
@@ -23,26 +23,48 @@
SyncPrefix = "$sync"
TablePrefix = "$table"
VersionPrefix = "$version"
+
+ // TODO(sadovsky): Changing these prefixes breaks various tests. Tests
+ // generally shouldn't depend on the values of these constants.
+ /*
+ AppPrefix = "a"
+ ClockPrefix = "c"
+ DatabasePrefix = "d"
+ DbInfoPrefix = "i"
+ LogPrefix = "l"
+ PermsPrefix = "p"
+ RowPrefix = "r"
+ ServicePrefix = "s"
+ TablePrefix = "t"
+ VersionPrefix = "v"
+ SyncPrefix = "y"
+ */
+
+ // Separator for parts of storage engine keys.
+ // TODO(sadovsky): Allow ":" in names and use a different separator here.
+ KeyPartSep = ":"
+
+ // PrefixRangeLimitSuffix is a key suffix that indicates the end of a prefix
+ // range. Must be greater than any character allowed in client-provided keys.
+ PrefixRangeLimitSuffix = "\xff"
)
// Constants related to object names.
const (
- // Service object name suffix for Syncbase-to-Syncbase RPCs.
- SyncbaseSuffix = "$sync"
+ // Object name component for Syncbase-to-Syncbase (sync) RPCs.
+ // Sync object names have the form:
+ // <syncbase>/@@sync/...
+ SyncbaseSuffix = "@@sync"
// Separator for batch info in database names.
- BatchSep = ":"
- // Separator for parts of storage engine keys.
- KeyPartSep = ":"
- // PrefixRangeLimitSuffix is the suffix of a key which indicates the end of
- // a prefix range. Should be more than any regular key in the store.
- // TODO(rogulenko): Change this constant to something out of the UTF8 space.
- PrefixRangeLimitSuffix = "~"
+ // Batch object names have the form:
+ // <syncbase>/<app>/$/<database>@@<batchInfo>/...
+ BatchSep = "@@"
)
// Constants related to syncbase clock.
const (
- // The pool.ntp.org project is a big virtual cluster of timeservers
- // providing reliable easy to use NTP service for millions of clients.
+ // The pool.ntp.org project is a big virtual cluster of timeservers,
+ // providing reliable, easy-to-use NTP service for millions of clients.
// See more at http://www.pool.ntp.org/en/
NtpServerPool = "pool.ntp.org"
NtpSampleCount = 15
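
A minimal sketch of how these name components compose, assuming this package is importable as v.io/x/ref/services/syncbase/server/util and using naming.Join from v.io/v23/naming; the "syncbase-host" and "mydb" names are illustrative only:

    package main

    import (
        "fmt"

        "v.io/v23/naming"
        "v.io/x/ref/services/syncbase/server/util"
    )

    func main() {
        // Sync object name: <syncbase>/@@sync/...
        fmt.Println(naming.Join("syncbase-host", util.SyncbaseSuffix)) // syncbase-host/@@sync

        // Batch database name component: <database>@@<batchInfo>.
        batchInfo := util.JoinBatchInfo(util.BatchTypeTx, 42) // "1@@42"
        fmt.Println("mydb" + util.BatchSep + batchInfo)       // mydb@@1@@42
    }
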
diff --git a/services/syncbase/server/util/glob.go b/services/syncbase/server/util/glob.go
deleted file mode 100644
index c483347..0000000
--- a/services/syncbase/server/util/glob.go
+++ /dev/null
@@ -1,40 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-package util
-
-import (
- "v.io/v23/context"
- "v.io/v23/glob"
- "v.io/v23/naming"
- "v.io/v23/rpc"
- "v.io/x/lib/vlog"
- "v.io/x/ref/services/syncbase/store"
-)
-
-// NOTE(nlacasse): Syncbase handles Glob requests by implementing
-// GlobChildren__ at each level (service, app, database, table).
-
-// Glob performs a glob. It calls closeSntx to close sntx.
-func Glob(ctx *context.T, call rpc.GlobChildrenServerCall, matcher *glob.Element, sntx store.SnapshotOrTransaction, closeSntx func() error, stKeyPrefix string) error {
- prefix, _ := matcher.FixedPrefix()
- it := sntx.Scan(ScanPrefixArgs(stKeyPrefix, prefix))
- defer closeSntx()
- key := []byte{}
- for it.Advance() {
- key = it.Key(key)
- parts := SplitKeyParts(string(key))
- name := parts[len(parts)-1]
- if matcher.Match(name) {
- if err := call.SendStream().Send(naming.GlobChildrenReplyName{Value: name}); err != nil {
- return err
- }
- }
- }
- if err := it.Err(); err != nil {
- vlog.VI(1).Infof("Glob() failed: %v", err)
- call.SendStream().Send(naming.GlobChildrenReplyError{Value: naming.GlobError{Error: err}})
- }
- return nil
-}
diff --git a/services/syncbase/server/util/key_util.go b/services/syncbase/server/util/key_util.go
index 0ac6686..c0d79ad 100644
--- a/services/syncbase/server/util/key_util.go
+++ b/services/syncbase/server/util/key_util.go
@@ -5,18 +5,21 @@
package util
import (
+ "strconv"
"strings"
"v.io/v23/syncbase/util"
+ "v.io/v23/verror"
)
// JoinKeyParts builds keys for accessing data in the storage engine.
+// TODO(sadovsky): Allow ":" in names and use a different separator here.
func JoinKeyParts(parts ...string) string {
- // TODO(sadovsky): Figure out which delimiter makes the most sense.
return strings.Join(parts, KeyPartSep)
}
// SplitKeyParts is the inverse of JoinKeyParts.
+// TODO(sadovsky): Allow ":" in names and use a different separator here.
func SplitKeyParts(key string) []string {
return strings.Split(key, KeyPartSep)
}
@@ -35,3 +38,36 @@
}
return []byte(fullStart), []byte(fullLimit)
}
+
+type BatchType int
+
+const (
+ BatchTypeSn BatchType = iota // snapshot
+ BatchTypeTx // transaction
+)
+
+// JoinBatchInfo encodes batch type and id into a single "info" string.
+func JoinBatchInfo(batchType BatchType, batchId uint64) string {
+ return strings.Join([]string{strconv.Itoa(int(batchType)), strconv.FormatUint(batchId, 10)}, BatchSep)
+}
+
+// SplitBatchInfo is the inverse of JoinBatchInfo.
+func SplitBatchInfo(batchInfo string) (BatchType, uint64, error) {
+ parts := strings.Split(batchInfo, BatchSep)
+ if len(parts) != 2 {
+ return BatchTypeSn, 0, verror.New(verror.ErrBadArg, nil, batchInfo)
+ }
+ batchTypeInt, err := strconv.Atoi(parts[0])
+ if err != nil {
+ return BatchTypeSn, 0, err
+ }
+ batchType := BatchType(batchTypeInt)
+ if batchType != BatchTypeSn && batchType != BatchTypeTx {
+ return BatchTypeSn, 0, verror.New(verror.ErrBadArg, nil, batchInfo)
+ }
+ batchId, err := strconv.ParseUint(parts[1], 10, 64)
+ if err != nil {
+ return BatchTypeSn, 0, err
+ }
+ return batchType, batchId, nil
+}
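
The encoding round-trips, and malformed input is rejected; a small sketch (assuming the same import path as above):

    package main

    import (
        "fmt"

        "v.io/x/ref/services/syncbase/server/util"
    )

    func main() {
        info := util.JoinBatchInfo(util.BatchTypeSn, 7) // "0@@7"
        bt, id, err := util.SplitBatchInfo(info)
        fmt.Println(bt == util.BatchTypeSn, id, err) // true 7 <nil>

        // Malformed input yields ErrBadArg rather than a bogus batch.
        if _, _, err := util.SplitBatchInfo("junk"); err == nil {
            panic("expected ErrBadArg")
        }
    }
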
diff --git a/services/syncbase/server/util/list_children.go b/services/syncbase/server/util/list_children.go
new file mode 100644
index 0000000..eaa68bd
--- /dev/null
+++ b/services/syncbase/server/util/list_children.go
@@ -0,0 +1,29 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package util
+
+import (
+ "v.io/v23/context"
+ "v.io/v23/rpc"
+ "v.io/x/ref/services/syncbase/store"
+)
+
+// ListChildren returns the names of all apps, databases, or tables with the
+// given key prefix. Designed for use by Service.ListApps, App.ListDatabases,
+// and Database.ListTables.
+func ListChildren(ctx *context.T, call rpc.ServerCall, sntx store.SnapshotOrTransaction, stKeyPrefix string) ([]string, error) {
+ it := sntx.Scan(ScanPrefixArgs(stKeyPrefix, ""))
+ key := []byte{}
+ res := []string{}
+ for it.Advance() {
+ key = it.Key(key)
+ parts := SplitKeyParts(string(key))
+ res = append(res, parts[len(parts)-1])
+ }
+ if err := it.Err(); err != nil {
+ return nil, err
+ }
+ return res, nil
+}
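
The child name is simply the last storage-key part; for example (the key shown is illustrative):

    package main

    import (
        "fmt"

        "v.io/x/ref/services/syncbase/server/util"
    )

    func main() {
        // An app entry lives under "$app:<appName>"; ListChildren returns
        // the part after the last separator.
        key := util.JoinKeyParts(util.AppPrefix, "todos") // "$app:todos"
        parts := util.SplitKeyParts(key)
        fmt.Println(parts[len(parts)-1]) // todos
    }
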
diff --git a/services/syncbase/server/util/store_util.go b/services/syncbase/server/util/store_util.go
index b3d96d4..15fa69a 100644
--- a/services/syncbase/server/util/store_util.go
+++ b/services/syncbase/server/util/store_util.go
@@ -52,6 +52,21 @@
return nil
}
+// Exists returns true if the key exists in the store.
+// TODO(rdaoud): for now it only bypasses Get's VOM decode step. It should be
+// optimized further by adding a st.Exists(k) API, letting each implementation
+// do its best to reduce data fetching in its key lookup.
+func Exists(ctx *context.T, st store.StoreReader, k string) (bool, error) {
+ _, err := st.Get([]byte(k), nil)
+ if err != nil {
+ if verror.ErrorID(err) == store.ErrUnknownKey.ID {
+ return false, nil
+ }
+ return false, verror.New(verror.ErrInternal, ctx, err)
+ }
+ return true, nil
+}
+
// GetWithAuth does Get followed by an auth check.
func GetWithAuth(ctx *context.T, call rpc.ServerCall, st store.StoreReader, k string, v Permser) error {
if err := Get(ctx, st, k, v); err != nil {
diff --git a/services/syncbase/server/watchable/transaction.go b/services/syncbase/server/watchable/transaction.go
index 22b53f4..3178cb7 100644
--- a/services/syncbase/server/watchable/transaction.go
+++ b/services/syncbase/server/watchable/transaction.go
@@ -8,6 +8,7 @@
"fmt"
"math"
"sync"
+ "time"
"v.io/v23/context"
"v.io/v23/verror"
@@ -173,6 +174,12 @@
return tx.itx.Abort()
}
+// GetStoreTime returns the current time from the given transaction store.
+func GetStoreTime(ctx *context.T, tx store.Transaction) time.Time {
+ wtx := tx.(*transaction)
+ return wtx.st.clock.Now(ctx)
+}
+
// AddSyncGroupOp injects a SyncGroup operation notification in the log entries
// that the transaction writes when it is committed. It allows the SyncGroup
// operations (create, join, leave, destroy) to notify the sync watcher of the
diff --git a/services/syncbase/testutil/layer.go b/services/syncbase/testutil/layer.go
index fc568b9..3f6cc54 100644
--- a/services/syncbase/testutil/layer.go
+++ b/services/syncbase/testutil/layer.go
@@ -12,6 +12,7 @@
"v.io/v23/context"
"v.io/v23/security"
"v.io/v23/security/access"
+ wire "v.io/v23/services/syncbase"
"v.io/v23/syncbase"
"v.io/v23/syncbase/nosql"
"v.io/v23/syncbase/util"
@@ -72,7 +73,8 @@
t.Errorf("Perms do not match: got %v, want %v", gotPerms, wantPerms)
}
- // Even though self2 exists, Exists returns false because Read access is needed.
+ // Even though self2 exists, Exists returns false because Read access is
+ // needed.
assertExists(t, ctx, self2, "self2", false)
// Test that create fails if the parent perms disallow access.
@@ -90,6 +92,33 @@
assertExists(t, ctx, self3, "self3", false)
}
+// Tests that non-ASCII UTF-8 chars are supported at all layers as long as they
+// satisfy util.ValidName.
+// This test only requires layer.Create to be implemented and thus works for
+// rows.
+func TestCreateNameValidation(t *testing.T, ctx *context.T, i interface{}) {
+ parent := makeLayer(i)
+
+ // Invalid names.
+ // TODO(sadovsky): Add names with slashes to this list once we implement
+ // client-side name validation. As it stands, some names with slashes result
+ // in RPCs against objects at the next layer of hierarchy, and naming.Join
+ // drops some leading and trailing slashes before they reach the server-side
+ // name-checking code.
+ for _, name := range []string{"a\x00", "\x00a", "@@", "a@@", "@@a", "$", "a/$", "$/a"} {
+ if err := parent.Child(name).Create(ctx, nil); verror.ErrorID(err) != wire.ErrInvalidName.ID {
+ t.Fatalf("Create(%q) should have failed: %v", name, err)
+ }
+ }
+
+ // Valid names.
+ for _, name := range []string{"a", "aa", "*", "a*", "*a", "a*b", "a/b", "a/$$", "$$/a", "a/$$/b", "dev.v.io/a/admin@myapp.com", "alice/bob", "안녕하세요"} {
+ if err := parent.Child(name).Create(ctx, nil); err != nil {
+ t.Fatalf("Create(%q) failed: %v", name, err)
+ }
+ }
+}
+
// TestDestroy tests that object destruction works as expected.
func TestDestroy(t *testing.T, ctx *context.T, i interface{}) {
parent := makeLayer(i)
@@ -179,7 +208,7 @@
var err error
got, err = self.ListChildren(ctx)
- want = []string{}
+ want = []string(nil)
if err != nil {
t.Fatalf("self.ListChildren() failed: %v", err)
}
@@ -210,6 +239,19 @@
if !reflect.DeepEqual(got, want) {
t.Fatalf("Lists do not match: got %v, want %v", got, want)
}
+
+ hello := "안녕하세요"
+ if err := self.Child(hello).Create(ctx, nil); err != nil {
+ t.Fatalf("hello.Create() failed: %v", err)
+ }
+ got, err = self.ListChildren(ctx)
+ want = []string{"x", "y", hello}
+ if err != nil {
+ t.Fatalf("self.ListChildren() failed: %v", err)
+ }
+ if !reflect.DeepEqual(got, want) {
+ t.Fatalf("Lists do not match: got %v, want %v", got, want)
+ }
}
// TestPerms tests that {Set,Get}Permissions work as expected.
diff --git a/services/syncbase/testutil/v23util.go b/services/syncbase/testutil/v23util.go
index ef2c3f6..8e370a8 100644
--- a/services/syncbase/testutil/v23util.go
+++ b/services/syncbase/testutil/v23util.go
@@ -55,7 +55,7 @@
}
}
-// RunClient runs modules.Program and waits until it terminates.
+// RunClient runs the given program and waits until it terminates.
func RunClient(t *v23tests.T, creds *modules.CustomCredentials, program modules.Program, args ...string) {
client, err := t.Shell().StartWithOpts(
t.Shell().DefaultStartOpts().WithCustomCredentials(creds),
diff --git a/services/syncbase/vsync/dag.go b/services/syncbase/vsync/dag.go
index 40ef3b3..086296c 100644
--- a/services/syncbase/vsync/dag.go
+++ b/services/syncbase/vsync/dag.go
@@ -279,13 +279,6 @@
}
}
- // The new node must not exist.
- if ok, err := hasNode(ctx, tx, oid, version); err != nil {
- return err
- } else if ok {
- return verror.New(verror.ErrInternal, ctx, "DAG node already exists", oid, version)
- }
-
// Verify the parents, determine the node level. Also save the levels
// of the parent nodes for later in this function in graft updates.
parentLevels := make(map[string]uint64)
@@ -620,10 +613,14 @@
// prune trims the DAG of an object at a given version (node) by deleting all
// its ancestor nodes, making it the new root node. For each deleted node it
-// calls the given callback function to delete its log record.
+// calls the given callback function to delete its log record. If NoVersion
+// is given instead, then all object nodes are deleted, including the head node.
//
-// Note: this function should only be used when sync determines that all devices
-// that know about this object have gotten past this version.
+// Note: this function is typically used when sync determines that all devices
+// that know about this object have gotten past this version, as part of its
+// GC operations. It can also be used when an object's history is obliterated,
+// for example when destroying a SyncGroup, which is also versioned and tracked
+// in the DAG.
//
// The batch set passed is used to track batches affected by the deletion of DAG
// objects across multiple calls to prune(). It is later given to pruneDone()
@@ -633,21 +630,33 @@
return verror.New(verror.ErrInternal, ctx, "missing batch set")
}
- // Get the node at the pruning point and set its parents to nil.
- // It will become the oldest DAG node (root) for the object.
- node, err := getNode(ctx, tx, oid, version)
- if err != nil {
- return err
- }
- if node.Parents == nil {
- // Nothing to do, this node is already the root.
- return nil
- }
-
- parents := node.Parents
- node.Parents = nil
- if err = setNode(ctx, tx, oid, version, node); err != nil {
- return err
+ var parents []string
+ if version == NoVersion {
+ // Delete all object versions including its head version.
+ head, err := getHead(ctx, tx, oid)
+ if err != nil {
+ return err
+ }
+ if err := delHead(ctx, tx, oid); err != nil {
+ return err
+ }
+ parents = []string{head}
+ } else {
+ // Get the node at the pruning point and set its parents to nil.
+ // It will become the oldest DAG node (root) for the object.
+ node, err := getNode(ctx, tx, oid, version)
+ if err != nil {
+ return err
+ }
+ if node.Parents == nil {
+ // Nothing to do, this node is already the root.
+ return nil
+ }
+ parents = node.Parents
+ node.Parents = nil
+ if err = setNode(ctx, tx, oid, version, node); err != nil {
+ return err
+ }
}
// Delete all ancestor nodes and their log records. Delete as many as
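
For reference, a sketch of the new NoVersion path as a caller would use it (in-package pseudo-usage; ctx, tx, and oid are assumed to be in scope), mirroring the delSyncGroupByName change later in this diff:

    // Obliterate the object's entire DAG history, head included.
    bset := newBatchPruning()
    err := prune(ctx, tx, oid, NoVersion, bset,
        func(ctx *context.T, tx store.Transaction, logKey string) error {
            if logKey != "" {
                return util.Delete(ctx, tx, logKey) // drop the log record
            }
            return nil
        })
    if err == nil {
        err = pruneDone(ctx, tx, bset)
    }
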
@@ -724,7 +733,7 @@
// setNode stores the DAG node entry.
func setNode(ctx *context.T, tx store.Transaction, oid, version string, node *dagNode) error {
if version == NoVersion {
- return verror.New(verror.ErrInternal, ctx, "invalid version", version)
+ vlog.Fatalf("sync: setNode: invalid version: %s", version)
}
return util.Put(ctx, tx, nodeKey(oid, version), node)
@@ -733,7 +742,7 @@
// getNode retrieves the DAG node entry for the given (oid, version).
func getNode(ctx *context.T, st store.StoreReader, oid, version string) (*dagNode, error) {
if version == NoVersion {
- return nil, verror.New(verror.ErrInternal, ctx, "invalid version", version)
+ vlog.Fatalf("sync: getNode: invalid version: %s", version)
}
var node dagNode
@@ -747,7 +756,7 @@
// delNode deletes the DAG node entry.
func delNode(ctx *context.T, tx store.Transaction, oid, version string) error {
if version == NoVersion {
- return verror.New(verror.ErrInternal, ctx, "invalid version", version)
+ vlog.Fatalf("sync: delNode: invalid version: %s", version)
}
return util.Delete(ctx, tx, nodeKey(oid, version))
@@ -755,14 +764,11 @@
// hasNode returns true if the node (oid, version) exists in the DAG.
func hasNode(ctx *context.T, st store.StoreReader, oid, version string) (bool, error) {
- // TODO(rdaoud): optimize to avoid the unneeded fetch/decode of the data.
- if _, err := getNode(ctx, st, oid, version); err != nil {
- if verror.ErrorID(err) == verror.ErrNoExist.ID {
- err = nil
- }
- return false, err
+ if version == NoVersion {
+ vlog.Fatalf("sync: hasNode: invalid version: %s", version)
}
- return true, nil
+
+ return util.Exists(ctx, st, nodeKey(oid, version))
}
// headKey returns the key used to access the DAG object head.
@@ -773,7 +779,7 @@
// setHead stores version as the DAG object head.
func setHead(ctx *context.T, tx store.Transaction, oid, version string) error {
if version == NoVersion {
- return verror.New(verror.ErrInternal, ctx, fmt.Errorf("invalid version: %s", version))
+ vlog.Fatalf("sync: setHead: invalid version: %s", version)
}
return util.Put(ctx, tx, headKey(oid), version)
diff --git a/services/syncbase/vsync/dag_test.go b/services/syncbase/vsync/dag_test.go
index c72b3a7..57291a4 100644
--- a/services/syncbase/vsync/dag_test.go
+++ b/services/syncbase/vsync/dag_test.go
@@ -255,11 +255,7 @@
t.Errorf("invalid object %s parent map: (%v) instead of (%v)", oid, pmap, exp)
}
- // Make sure an existing node cannot be added again.
tx := st.NewTransaction()
- if err := s.addNode(nil, tx, oid, "2", "foo", false, []string{"1", "3"}, NoBatchId, nil); err == nil {
- t.Errorf("addNode() did not fail when given an existing node")
- }
// Make sure a new node cannot have more than 2 parents.
if err := s.addNode(nil, tx, oid, "4", "foo", false, []string{"1", "2", "3"}, NoBatchId, nil); err == nil {
@@ -780,10 +776,10 @@
exp := map[string][]string{"1": nil, "2": {"1"}, "3": {"2"}, "4": {"2"}, "5": {"3", "4"}, "6": {"5"}, "7": {"2"}, "8": {"6", "7"}, "9": {"8"}}
// Loop pruning at an invalid version (333) then at different valid versions.
- testVersions := []string{"333", "1", "2", "6", "8", "9", "9"}
- delCounts := []int{0, 0, 1, 4, 2, 1, 0}
- which := "prune-snip-"
- remain := 9
+ // The last version used (NoVersion) deletes all remaining nodes for the
+ // object, including the head node.
+ testVersions := []string{"333", "1", "2", "6", "8", "9", "9", NoVersion}
+ delCounts := []int{0, 0, 1, 4, 2, 1, 0, 1}
for i, version := range testVersions {
batches := newBatchPruning()
@@ -807,11 +803,13 @@
oid, version, del, delCounts[i])
}
- which += "*"
- remain -= del
-
- if head, err := getHead(nil, st, oid); err != nil || head != "9" {
- t.Errorf("object %s has wrong head: %s", oid, head)
+ head, err := getHead(nil, st, oid)
+ if version != NoVersion {
+ if err != nil || head != "9" {
+ t.Errorf("object %s has wrong head: %s", oid, head)
+ }
+ } else if err == nil {
+ t.Errorf("found head %s for object %s after pruning all versions", head, oid)
}
tx = st.NewTransaction()
@@ -823,16 +821,20 @@
// Remove pruned nodes from the expected parent map used to validate
// and set the parents of the pruned node to nil.
- intVersion, err := strconv.ParseInt(version, 10, 32)
- if err != nil {
- t.Errorf("invalid version: %s", version)
- }
-
- if intVersion < 10 {
- for j := int64(0); j < intVersion; j++ {
- delete(exp, fmt.Sprintf("%d", j))
+ if version == NoVersion {
+ exp = make(map[string][]string)
+ } else {
+ intVersion, err := strconv.ParseInt(version, 10, 32)
+ if err != nil {
+ t.Errorf("invalid version: %s", version)
}
- exp[version] = nil
+
+ if intVersion < 10 {
+ for j := int64(0); j < intVersion; j++ {
+ delete(exp, fmt.Sprintf("%d", j))
+ }
+ exp[version] = nil
+ }
}
pmap := getParentMap(nil, st, oid, nil)
@@ -903,6 +905,16 @@
if !reflect.DeepEqual(pmap, exp) {
t.Errorf("invalid object %s parent map: (%v) instead of (%v)", oid, pmap, exp)
}
+
+ // Invalid pruning without a batch set.
+ tx = st.NewTransaction()
+ err = prune(nil, tx, oid, version, nil, func(ctx *context.T, tx store.Transaction, lr string) error {
+ return nil
+ })
+ if err == nil {
+ t.Errorf("pruning object %s:%s without a batch set did not fail", oid, version)
+ }
+ tx.Abort()
}
// TestRemoteLinkedNoConflictSameHead tests sync of remote updates that contain
diff --git a/services/syncbase/vsync/initiator_test.go b/services/syncbase/vsync/initiator_test.go
index 935539b..7c1a045 100644
--- a/services/syncbase/vsync/initiator_test.go
+++ b/services/syncbase/vsync/initiator_test.go
@@ -457,7 +457,7 @@
}
tx := svc.St().NewTransaction()
- if err := addSyncGroup(nil, tx, sg1); err != nil {
+ if err := s.addSyncGroup(nil, tx, NoVersion, true, "", nil, s.id, 1, 1, sg1); err != nil {
t.Fatalf("cannot add SyncGroup ID %d, err %v", sg1.Id, err)
}
if err := tx.Commit(); err != nil {
diff --git a/services/syncbase/vsync/responder.go b/services/syncbase/vsync/responder.go
index 46d03a1..0c78f52 100644
--- a/services/syncbase/vsync/responder.go
+++ b/services/syncbase/vsync/responder.go
@@ -226,7 +226,11 @@
}
err := store.RunInTransaction(rSt.st, func(tx store.Transaction) error {
- sg, err := getSyncGroupById(ctx, tx, gid)
+ version, err := getSyncGroupVersion(ctx, tx, gid)
+ if err != nil {
+ return err
+ }
+ sg, err := getSGDataEntry(ctx, tx, gid, version)
if err != nil {
return err
}
@@ -239,7 +243,7 @@
vlog.VI(4).Infof("sync: addInitiatorToSyncGroup: add %s to sgid %d", rSt.initiator, gid)
sg.Joiners[rSt.initiator] = wire.SyncGroupMemberInfo{SyncPriority: 1}
- return setSGDataEntry(ctx, tx, gid, sg)
+ return setSGDataEntry(ctx, tx, gid, version, sg)
})
if err != nil && verror.ErrorID(err) != verror.ErrExist.ID {
diff --git a/services/syncbase/vsync/sync.go b/services/syncbase/vsync/sync.go
index f2e72ae..0b73aad 100644
--- a/services/syncbase/vsync/sync.go
+++ b/services/syncbase/vsync/sync.go
@@ -12,6 +12,7 @@
// records in response to a GetDeltas request, it replays those log
// records to get in sync with the sender.
import (
+ "container/list"
"fmt"
"math/rand"
"path"
@@ -73,6 +74,11 @@
syncState map[string]*dbSyncStateInMem
syncStateLock sync.Mutex // lock to protect access to the sync state.
+ // In-memory queue of SyncGroups to be published. It is reconstructed
+ // at startup from SyncGroup info, so it does not need to be persisted.
+ sgPublishQueue *list.List
+ sgPublishQueueLock sync.Mutex
+
// In-memory tracking of batches during their construction.
// The sync Initiator and Watcher build batches incrementally here
// and then persist them in DAG batch entries. The mutex guards
@@ -124,8 +130,9 @@
// sync module responds to incoming RPCs from remote sync modules.
func New(ctx *context.T, call rpc.ServerCall, sv interfaces.Service, rootDir string) (*syncService, error) {
s := &syncService{
- sv: sv,
- batches: make(batchSet),
+ sv: sv,
+ batches: make(batchSet),
+ sgPublishQueue: list.New(),
}
data := &syncData{}
diff --git a/services/syncbase/vsync/sync_state.go b/services/syncbase/vsync/sync_state.go
index 1eebf88..d820d8f 100644
--- a/services/syncbase/vsync/sync_state.go
+++ b/services/syncbase/vsync/sync_state.go
@@ -53,6 +53,7 @@
import (
"fmt"
"strconv"
+ "time"
"v.io/v23/context"
"v.io/v23/verror"
@@ -104,12 +105,23 @@
return out
}
+// sgPublishInfo holds information on a SyncGroup waiting to be published to a
+// remote peer. It is an in-memory entry in a queue of pending SyncGroups.
+type sgPublishInfo struct {
+ sgName string
+ appName string
+ dbName string
+ queued time.Time
+ lastTry time.Time
+}
+
// initSync initializes the sync module during startup. It scans all the
// databases across all apps to initialize the following:
// a) in-memory sync state of a Database and all its SyncGroups consisting of
// the current generation number, log position and generation vector.
// b) watcher map of prefixes currently being synced.
// c) republish names in mount tables for all syncgroups.
+// d) in-memory queue of SyncGroups to be published.
//
// TODO(hpucha): This is incomplete. Flesh this out further.
func (s *syncService) initSync(ctx *context.T) error {
@@ -133,6 +145,10 @@
for _, prefix := range sg.Spec.Prefixes {
incrWatchPrefix(appName, dbName, prefix)
}
+
+ if sg.Status == interfaces.SyncGroupStatusPublishPending {
+ s.enqueuePublishSyncGroup(sg.Name, appName, dbName, false)
+ }
return false
})
@@ -164,6 +180,23 @@
return errFinal
}
+// enqueuePublishSyncGroup appends the given SyncGroup to the publish queue.
+func (s *syncService) enqueuePublishSyncGroup(sgName, appName, dbName string, attempted bool) {
+ s.sgPublishQueueLock.Lock()
+ defer s.sgPublishQueueLock.Unlock()
+
+ entry := &sgPublishInfo{
+ sgName: sgName,
+ appName: appName,
+ dbName: dbName,
+ queued: time.Now(),
+ }
+ if attempted {
+ entry.lastTry = entry.queued
+ }
+ s.sgPublishQueue.PushBack(entry)
+}
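
This change only adds the producer side of the queue; a hypothetical sketch of a consumer (the method name and tryPublish callback are assumptions, not part of this change) could look like:

    // publishPending drains the queue, retrying each pending SyncGroup and
    // removing it on success. A real implementation should not hold the lock
    // across RPCs; this sketch keeps it simple.
    func (s *syncService) publishPending(tryPublish func(*sgPublishInfo) error) {
        s.sgPublishQueueLock.Lock()
        defer s.sgPublishQueueLock.Unlock()

        for e := s.sgPublishQueue.Front(); e != nil; {
            next := e.Next()
            info := e.Value.(*sgPublishInfo)
            info.lastTry = time.Now()
            if tryPublish(info) == nil {
                s.sgPublishQueue.Remove(e)
            }
            e = next
        }
    }
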
+
// Note: For all the utilities below, if the sgid parameter is non-nil, the
// operation is performed in the SyncGroup space. If nil, it is performed in the
// data space for the Database.
@@ -420,15 +453,7 @@
// hasLogRec returns true if the log record for (devid, gen) exists.
func hasLogRec(st store.StoreReader, pfx string, id, gen uint64) (bool, error) {
- // TODO(hpucha): optimize to avoid the unneeded fetch/decode of the data.
- var rec localLogRec
- if err := util.Get(nil, st, logRecKey(pfx, id, gen), &rec); err != nil {
- if verror.ErrorID(err) == verror.ErrNoExist.ID {
- err = nil
- }
- return false, err
- }
- return true, nil
+ return util.Exists(nil, st, logRecKey(pfx, id, gen))
}
// putLogRec stores the log record.
diff --git a/services/syncbase/vsync/syncgroup.go b/services/syncbase/vsync/syncgroup.go
index 91ba499..2fb5479 100644
--- a/services/syncbase/vsync/syncgroup.go
+++ b/services/syncbase/vsync/syncgroup.go
@@ -81,6 +81,7 @@
}
// verifySyncGroup verifies that a SyncGroup struct is well-formed.
+// TODO(rdaoud): define verrors for all ErrBadArg cases.
func verifySyncGroup(ctx *context.T, sg *interfaces.SyncGroup) error {
if sg == nil {
return verror.New(verror.ErrBadArg, ctx, "group information not specified")
@@ -106,40 +107,157 @@
if len(sg.Joiners) == 0 {
return verror.New(verror.ErrBadArg, ctx, "group has no joiners")
}
- if len(sg.Spec.Prefixes) == 0 {
+ return verifySyncGroupSpec(ctx, &sg.Spec)
+}
+
+// verifySyncGroupSpec verifies that a SyncGroupSpec is well-formed.
+func verifySyncGroupSpec(ctx *context.T, spec *wire.SyncGroupSpec) error {
+ if spec == nil {
+ return verror.New(verror.ErrBadArg, ctx, "group spec not specified")
+ }
+ if len(spec.Prefixes) == 0 {
return verror.New(verror.ErrBadArg, ctx, "group has no prefixes specified")
}
+
+ // Duplicate prefixes are not allowed.
+ prefixes := make(map[string]bool, len(spec.Prefixes))
+ for _, pfx := range spec.Prefixes {
+ prefixes[pfx] = true
+ }
+ if len(prefixes) != len(spec.Prefixes) {
+ return verror.New(verror.ErrBadArg, ctx, "group has duplicate prefixes specified")
+ }
return nil
}
-// addSyncGroup adds a new SyncGroup given its information.
-func addSyncGroup(ctx *context.T, tx store.Transaction, sg *interfaces.SyncGroup) error {
- // Verify SyncGroup before storing it since it may have been received
- // from a remote peer.
+// samePrefixes returns true if the two sets of prefixes are the same.
+func samePrefixes(pfx1, pfx2 []string) bool {
+ pfxMap := make(map[string]uint8)
+ for _, p := range pfx1 {
+ pfxMap[p] |= 0x01
+ }
+ for _, p := range pfx2 {
+ pfxMap[p] |= 0x02
+ }
+ for _, mask := range pfxMap {
+ if mask != 0x03 {
+ return false
+ }
+ }
+ return true
+}
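
A quick worked example of the bitmask check, as an in-package test sketch (imports "testing"):

    func TestSamePrefixes(t *testing.T) {
        // Each prefix collects bit 0x01 from the first set and 0x02 from the
        // second; the sets match iff every mask ends up 0x03.
        if !samePrefixes([]string{"a", "b"}, []string{"b", "a"}) {
            t.Error("order should not matter")
        }
        if samePrefixes([]string{"a"}, []string{"a", "b"}) {
            t.Error("sets of different prefixes should not match")
        }
    }
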
+
+// addSyncGroup adds a new SyncGroup given its version and information. This
+// also includes creating a DAG node entry and updating the DAG head. If the
+// caller is the creator of the SyncGroup, a local log record is also created
+// using the given server ID and gen and pos counters to index the log record.
+// Otherwise, it's a joiner case and the SyncGroup is put in a pending state
+// (waiting for its full metadata to be synchronized) and the log record is
+// skipped, delaying its creation until the Initiator does p2p sync.
+func (s *syncService) addSyncGroup(ctx *context.T, tx store.Transaction, version string, creator bool, remotePublisher string, genvec interfaces.PrefixGenVector, servId, gen, pos uint64, sg *interfaces.SyncGroup) error {
+ // Verify the SyncGroup information before storing it since it may have
+ // been received from a remote peer.
if err := verifySyncGroup(ctx, sg); err != nil {
return err
}
- if ok, err := hasSGDataEntry(tx, sg.Id); err != nil {
- return err
- } else if ok {
- return verror.New(verror.ErrExist, ctx, "group id already exists")
- }
+ // Add the group name and ID entries.
if ok, err := hasSGNameEntry(tx, sg.Name); err != nil {
return err
} else if ok {
return verror.New(verror.ErrExist, ctx, "group name already exists")
}
+ if ok, err := hasSGIdEntry(tx, sg.Id); err != nil {
+ return err
+ } else if ok {
+ return verror.New(verror.ErrExist, ctx, "group id already exists")
+ }
- // Add the group name and data entries.
+ state := sgLocalState{
+ RemotePublisher: remotePublisher,
+ SyncPending: !creator,
+ PendingGenVec: genvec,
+ }
+ if remotePublisher == "" {
+ state.NumLocalJoiners = 1
+ }
+
if err := setSGNameEntry(ctx, tx, sg.Name, sg.Id); err != nil {
return err
}
- if err := setSGDataEntry(ctx, tx, sg.Id, sg); err != nil {
+ if err := setSGIdEntry(ctx, tx, sg.Id, &state); err != nil {
return err
}
- return nil
+ // Add the SyncGroup versioned data entry.
+ if ok, err := hasSGDataEntry(tx, sg.Id, version); err != nil {
+ return err
+ } else if ok {
+ return verror.New(verror.ErrExist, ctx, "group id version already exists")
+ }
+
+ return s.updateSyncGroupVersioning(ctx, tx, version, creator, servId, gen, pos, sg)
+}
+
+// updateSyncGroupVersioning updates the per-version information of a SyncGroup.
+// It writes a new versioned copy of the SyncGroup data entry, a new DAG node,
+// and updates the DAG head. Optionally, it also writes a new local log record
+// using the given server ID and gen and pos counters to index it. The caller
+// can provide the version number to use; otherwise, if NoVersion is given, a
+// new version is generated by the function.
+// TODO(rdaoud): hook SyncGroup mutations (and deletions) to the watch log so
+// apps can monitor SG changes as well.
+func (s *syncService) updateSyncGroupVersioning(ctx *context.T, tx store.Transaction, version string, withLog bool, servId, gen, pos uint64, sg *interfaces.SyncGroup) error {
+ if version == NoVersion {
+ version = newSyncGroupVersion()
+ }
+
+ // Add the SyncGroup versioned data entry.
+ if err := setSGDataEntry(ctx, tx, sg.Id, version, sg); err != nil {
+ return err
+ }
+
+ // Add a sync log record for the SyncGroup if needed.
+ oid := sgIdKey(sg.Id)
+ logKey := ""
+ if withLog {
+ if err := addSyncGroupLogRec(ctx, tx, sg.Id, version, servId, gen, pos); err != nil {
+ return err
+ }
+ logKey = logRecKey(oid, servId, gen)
+ }
+
+ // Add the SyncGroup to the DAG.
+ var parents []string
+ if head, err := getHead(ctx, tx, oid); err == nil {
+ parents = []string{head}
+ } else if verror.ErrorID(err) != verror.ErrNoExist.ID {
+ return err
+ }
+ if err := s.addNode(ctx, tx, oid, version, logKey, false, parents, NoBatchId, nil); err != nil {
+ return err
+ }
+ return setHead(ctx, tx, oid, version)
+}
+
+// addSyncGroupLogRec adds a new local log record for a SyncGroup.
+func addSyncGroupLogRec(ctx *context.T, tx store.Transaction, gid interfaces.GroupId, version string, servId, gen, pos uint64) error {
+ oid := sgIdKey(gid)
+ rec := &localLogRec{
+ Metadata: interfaces.LogRecMetadata{
+ ObjId: oid,
+ CurVers: version,
+ Delete: false,
+ UpdTime: watchable.GetStoreTime(ctx, tx),
+ Id: servId,
+ Gen: gen,
+ RecType: interfaces.NodeRec,
+ BatchId: NoBatchId,
+ },
+ Pos: pos,
+ }
+
+ return putLogRec(ctx, tx, oid, rec)
}
// getSyncGroupId retrieves the SyncGroup ID given its name.
@@ -147,18 +265,18 @@
return getSGNameEntry(ctx, st, name)
}
-// getSyncGroupName retrieves the SyncGroup name given its ID.
-func getSyncGroupName(ctx *context.T, st store.StoreReader, gid interfaces.GroupId) (string, error) {
- sg, err := getSyncGroupById(ctx, st, gid)
- if err != nil {
- return "", err
- }
- return sg.Name, nil
+// getSyncGroupVersion retrieves the current version of the SyncGroup.
+func getSyncGroupVersion(ctx *context.T, st store.StoreReader, gid interfaces.GroupId) (string, error) {
+ return getHead(ctx, st, sgIdKey(gid))
}
// getSyncGroupById retrieves the SyncGroup given its ID.
func getSyncGroupById(ctx *context.T, st store.StoreReader, gid interfaces.GroupId) (*interfaces.SyncGroup, error) {
- return getSGDataEntry(ctx, st, gid)
+ version, err := getSyncGroupVersion(ctx, st, gid)
+ if err != nil {
+ return nil, err
+ }
+ return getSGDataEntry(ctx, st, gid, version)
}
// getSyncGroupByName retrieves the SyncGroup given its name.
@@ -176,19 +294,53 @@
if err != nil {
return err
}
- if err = delSGNameEntry(ctx, tx, sg.Name); err != nil {
- return err
- }
- return delSGDataEntry(ctx, tx, sg.Id)
+ return delSyncGroupByName(ctx, tx, sg.Name)
}
// delSyncGroupByName deletes the SyncGroup given its name.
func delSyncGroupByName(ctx *context.T, tx store.Transaction, name string) error {
+ // Get the SyncGroup ID and current version.
gid, err := getSyncGroupId(ctx, tx, name)
if err != nil {
return err
}
- return delSyncGroupById(ctx, tx, gid)
+ version, err := getSyncGroupVersion(ctx, tx, gid)
+ if err != nil {
+ return err
+ }
+
+ // Delete the name and ID entries.
+ if err := delSGNameEntry(ctx, tx, name); err != nil {
+ return err
+ }
+ if err := delSGIdEntry(ctx, tx, gid); err != nil {
+ return err
+ }
+
+ // Delete all versioned SyncGroup data entries (same versions as DAG
+ // nodes). This is done separately from pruning the DAG nodes because
+ // some nodes may have no log record pointing back to the SyncGroup data
+ // entries (loose coupling to support the pending SyncGroup state).
+ oid := sgIdKey(gid)
+ err = forEachAncestor(ctx, tx, oid, []string{version}, func(v string, nd *dagNode) error {
+ return delSGDataEntry(ctx, tx, gid, v)
+ })
+ if err != nil {
+ return err
+ }
+
+ // Delete all DAG nodes and log records.
+ bset := newBatchPruning()
+ err = prune(ctx, tx, oid, NoVersion, bset, func(ctx *context.T, tx store.Transaction, lr string) error {
+ if lr != "" {
+ return util.Delete(ctx, tx, lr)
+ }
+ return nil
+ })
+ if err != nil {
+ return err
+ }
+ return pruneDone(ctx, tx, bset)
}
// refreshMembersIfExpired updates the aggregate view of SyncGroup members
@@ -252,16 +404,23 @@
// make forEachSyncGroup() stop the iteration earlier; otherwise the function
// loops across all SyncGroups in the Database.
func forEachSyncGroup(st store.StoreReader, callback func(*interfaces.SyncGroup) bool) {
- scanStart, scanLimit := util.ScanPrefixArgs(sgDataKeyScanPrefix, "")
- stream := st.Scan(scanStart, scanLimit)
+ stream := st.Scan(util.ScanPrefixArgs(sgNameKeyPrefix, ""))
+ defer stream.Cancel()
+
for stream.Advance() {
- var sg interfaces.SyncGroup
- if vom.Decode(stream.Value(nil), &sg) != nil {
- vlog.Errorf("sync: forEachSyncGroup: invalid SyncGroup value for key %s", string(stream.Key(nil)))
+ var gid interfaces.GroupId
+ if vom.Decode(stream.Value(nil), &gid) != nil {
+ vlog.Errorf("sync: forEachSyncGroup: invalid SyncGroup ID for key %s", string(stream.Key(nil)))
continue
}
- if callback(&sg) {
+ sg, err := getSyncGroupById(nil, st, gid)
+ if err != nil {
+ vlog.Errorf("sync: forEachSyncGroup: cannot get SyncGroup %d: %v", gid, err)
+ continue
+ }
+
+ if callback(sg) {
break // done, early exit
}
}
@@ -324,63 +483,69 @@
// Use the functions above to manipulate SyncGroups.
var (
- // sgDataKeyScanPrefix is the prefix used to scan SyncGroup data entries.
- sgDataKeyScanPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "d")
-
- // sgNameKeyScanPrefix is the prefix used to scan SyncGroup name entries.
- sgNameKeyScanPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n")
+ // Prefixes used to store the different mappings of a SyncGroup:
+ // sgNameKeyPrefix: name --> ID
+ // sgIdKeyPrefix: ID --> SyncGroup local state
+ // sgDataKeyPrefix: (ID, version) --> SyncGroup data (synchronized)
+ //
+ // Note: as with other syncable objects, the DAG "heads" table contains
+ // a reference to the current SyncGroup version, and the DAG "nodes"
+ // table tracks its history of mutations.
+ // TODO(rdaoud): change the data key prefix to use the SG OID instead
+ // of its ID, to be similar to the versioned user data keys. The OID
+ // would use another SG-data prefix: "$sync:sgd:<gid>" and the data
+ // entry: "$sync:sgd:<gid>:<version>" (i.e. <oid>:<version>).
+ sgNameKeyPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n")
+ sgIdKeyPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "i")
+ sgDataKeyPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "d")
)
-// sgDataKey returns the key used to access the SyncGroup data entry.
-func sgDataKey(gid interfaces.GroupId) string {
- return util.JoinKeyParts(util.SyncPrefix, sgPrefix, "d", fmt.Sprintf("%d", gid))
+// sgIdStr returns the SyncGroup ID in string format.
+// TODO(rdaoud): delete when the SG ID becomes a string throughout.
+func sgIdStr(gid interfaces.GroupId) string {
+ return fmt.Sprintf("%d", uint64(gid))
}
// sgNameKey returns the key used to access the SyncGroup name entry.
func sgNameKey(name string) string {
- return util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n", name)
+ return util.JoinKeyParts(sgNameKeyPrefix, name)
+}
+
+// sgIdKey returns the key used to access the SyncGroup ID entry.
+func sgIdKey(gid interfaces.GroupId) string {
+ return util.JoinKeyParts(sgIdKeyPrefix, fmt.Sprintf("%d", gid))
+}
+
+// sgDataKey returns the key used to access a version of the SyncGroup data.
+func sgDataKey(gid interfaces.GroupId, version string) string {
+ return util.JoinKeyParts(sgDataKeyPrefix, fmt.Sprintf("%d", gid), version)
}
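
Assuming sgPrefix (defined elsewhere in this file) is "sg", and given that KeyPartSep is ":", the three mappings land under keys like the following; the group ID and version are illustrative:

    sgNameKey("grp")      -> "$sync:sg:n:grp"       // name --> ID
    sgIdKey(1234)         -> "$sync:sg:i:1234"      // ID --> local state
    sgDataKey(1234, "v5") -> "$sync:sg:d:1234:v5"   // (ID, version) --> data
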
// splitSgNameKey is the inverse of sgNameKey and returns the SyncGroup name.
func splitSgNameKey(ctx *context.T, key string) (string, error) {
- prefix := util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n", "")
-
// Note that the actual SyncGroup name may contain ":" as a separator.
- if !strings.HasPrefix(key, prefix) {
+ // So don't split the key on the separator; instead, trim its prefix.
+ prefix := util.JoinKeyParts(sgNameKeyPrefix, "")
+ name := strings.TrimPrefix(key, prefix)
+ if name == key {
return "", verror.New(verror.ErrInternal, ctx, "invalid sgNamekey", key)
}
- return strings.TrimPrefix(key, prefix), nil
-}
-
-// hasSGDataEntry returns true if the SyncGroup data entry exists.
-func hasSGDataEntry(sntx store.SnapshotOrTransaction, gid interfaces.GroupId) (bool, error) {
- // TODO(rdaoud): optimize to avoid the unneeded fetch/decode of the data.
- var sg interfaces.SyncGroup
- if err := util.Get(nil, sntx, sgDataKey(gid), &sg); err != nil {
- if verror.ErrorID(err) == verror.ErrNoExist.ID {
- err = nil
- }
- return false, err
- }
- return true, nil
+ return name, nil
}
// hasSGNameEntry returns true if the SyncGroup name entry exists.
func hasSGNameEntry(sntx store.SnapshotOrTransaction, name string) (bool, error) {
- // TODO(rdaoud): optimize to avoid the unneeded fetch/decode of the data.
- var gid interfaces.GroupId
- if err := util.Get(nil, sntx, sgNameKey(name), &gid); err != nil {
- if verror.ErrorID(err) == verror.ErrNoExist.ID {
- err = nil
- }
- return false, err
- }
- return true, nil
+ return util.Exists(nil, sntx, sgNameKey(name))
}
-// setSGDataEntry stores the SyncGroup data entry.
-func setSGDataEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId, sg *interfaces.SyncGroup) error {
- return util.Put(ctx, tx, sgDataKey(gid), sg)
+// hasSGIdEntry returns true if the SyncGroup ID entry exists.
+func hasSGIdEntry(sntx store.SnapshotOrTransaction, gid interfaces.GroupId) (bool, error) {
+ return util.Exists(nil, sntx, sgIdKey(gid))
+}
+
+// hasSGDataEntry returns true if the SyncGroup versioned data entry exists.
+func hasSGDataEntry(sntx store.SnapshotOrTransaction, gid interfaces.GroupId, version string) (bool, error) {
+ return util.Exists(nil, sntx, sgDataKey(gid, version))
}
// setSGNameEntry stores the SyncGroup name entry.
@@ -388,34 +553,58 @@
return util.Put(ctx, tx, sgNameKey(name), gid)
}
-// getSGDataEntry retrieves the SyncGroup data for a given group ID.
-func getSGDataEntry(ctx *context.T, st store.StoreReader, gid interfaces.GroupId) (*interfaces.SyncGroup, error) {
+// setSGIdEntry stores the SyncGroup ID entry.
+func setSGIdEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId, state *sgLocalState) error {
+ return util.Put(ctx, tx, sgIdKey(gid), state)
+}
+
+// setSGDataEntry stores the SyncGroup versioned data entry.
+func setSGDataEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId, version string, sg *interfaces.SyncGroup) error {
+ return util.Put(ctx, tx, sgDataKey(gid, version), sg)
+}
+
+// getSGNameEntry retrieves the SyncGroup ID for a given name.
+func getSGNameEntry(ctx *context.T, st store.StoreReader, name string) (interfaces.GroupId, error) {
+ var gid interfaces.GroupId
+ if err := util.Get(ctx, st, sgNameKey(name), &gid); err != nil {
+ return interfaces.NoGroupId, err
+ }
+ return gid, nil
+}
+
+// getSGIdEntry retrieves the SyncGroup local state for a given group ID.
+func getSGIdEntry(ctx *context.T, st store.StoreReader, gid interfaces.GroupId) (*sgLocalState, error) {
+ var state sgLocalState
+ if err := util.Get(ctx, st, sgIdKey(gid), &state); err != nil {
+ return nil, err
+ }
+ return &state, nil
+}
+
+// getSGDataEntry retrieves the SyncGroup data for a given group ID and version.
+func getSGDataEntry(ctx *context.T, st store.StoreReader, gid interfaces.GroupId, version string) (*interfaces.SyncGroup, error) {
var sg interfaces.SyncGroup
- if err := util.Get(ctx, st, sgDataKey(gid), &sg); err != nil {
+ if err := util.Get(ctx, st, sgDataKey(gid, version), &sg); err != nil {
return nil, err
}
return &sg, nil
}
-// getSGNameEntry retrieves the SyncGroup name to ID mapping.
-func getSGNameEntry(ctx *context.T, st store.StoreReader, name string) (interfaces.GroupId, error) {
- var gid interfaces.GroupId
- if err := util.Get(ctx, st, sgNameKey(name), &gid); err != nil {
- return gid, err
- }
- return gid, nil
-}
-
-// delSGDataEntry deletes the SyncGroup data entry.
-func delSGDataEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId) error {
- return util.Delete(ctx, tx, sgDataKey(gid))
-}
-
-// delSGNameEntry deletes the SyncGroup name to ID mapping.
+// delSGNameEntry deletes the SyncGroup name entry.
func delSGNameEntry(ctx *context.T, tx store.Transaction, name string) error {
return util.Delete(ctx, tx, sgNameKey(name))
}
+// delSGIdEntry deletes the SyncGroup ID entry.
+func delSGIdEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId) error {
+ return util.Delete(ctx, tx, sgIdKey(gid))
+}
+
+// delSGDataEntry deletes the SyncGroup versioned data entry.
+func delSGDataEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId, version string) error {
+ return util.Delete(ctx, tx, sgDataKey(gid, version))
+}
+
////////////////////////////////////////////////////////////
// SyncGroup methods between Client and Syncbase.
@@ -424,9 +613,22 @@
vlog.VI(2).Infof("sync: CreateSyncGroup: begin: %s", sgName)
defer vlog.VI(2).Infof("sync: CreateSyncGroup: end: %s", sgName)
- // Get this Syncbase's sync module handle.
ss := sd.sync.(*syncService)
- var sg *interfaces.SyncGroup
+ appName, dbName := sd.db.App().Name(), sd.db.Name()
+
+ // Instantiate sg. Add self as joiner.
+ gid, version := newSyncGroupId(), newSyncGroupVersion()
+ sg := &interfaces.SyncGroup{
+ Id: gid,
+ Name: sgName,
+ SpecVersion: version,
+ Spec: spec,
+ Creator: ss.name,
+ AppName: appName,
+ DbName: dbName,
+ Status: interfaces.SyncGroupStatusPublishPending,
+ Joiners: map[string]wire.SyncGroupMemberInfo{ss.name: myInfo},
+ }
err := store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
// Check permissions on Database.
@@ -436,29 +638,17 @@
// TODO(hpucha): Check prefix ACLs on all SG prefixes.
// This may need another method on util.Database interface.
-
// TODO(hpucha): Do some SG ACL checking. Check creator
// has Admin privilege.
- // Instantiate sg. Add self as joiner.
- sg = &interfaces.SyncGroup{
- Id: newSyncGroupId(),
- Name: sgName,
- SpecVersion: newSyncGroupVersion(),
- Spec: spec,
- Creator: ss.name,
- AppName: sd.db.App().Name(),
- DbName: sd.db.Name(),
- Status: interfaces.SyncGroupStatusPublishPending,
- Joiners: map[string]wire.SyncGroupMemberInfo{ss.name: myInfo},
- }
+ // Reserve log generation and position counts for the new SyncGroup.
+ //gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgIdStr(gid), 1)
+ gen, pos := uint64(1), uint64(1)
- if err := addSyncGroup(ctx, tx, sg); err != nil {
+ if err := ss.addSyncGroup(ctx, tx, version, true, "", nil, ss.id, gen, pos, sg); err != nil {
return err
}
- // TODO(hpucha): Bootstrap DAG/Genvector etc for syncing the SG metadata.
-
// Take a snapshot of the data to bootstrap the SyncGroup.
return sd.bootstrapSyncGroup(ctx, tx, spec.Prefixes)
})
@@ -467,9 +657,13 @@
return err
}
- ss.initSyncStateInMem(ctx, sg.AppName, sg.DbName, sg.Id)
- // Local SG create succeeded. Publish the SG at the chosen server.
- sd.publishSyncGroup(ctx, call, sgName)
+ ss.initSyncStateInMem(ctx, appName, dbName, gid)
+
+ // Local SG create succeeded. Publish the SG at the chosen server, or if
+ // that fails, enqueue it for later publish retries.
+ if err := sd.publishSyncGroup(ctx, call, sgName); err != nil {
+ ss.enqueuePublishSyncGroup(sgName, appName, dbName, true)
+ }
// Publish at the chosen mount table and in the neighborhood.
sd.publishInMountTables(ctx, call, spec)
@@ -492,24 +686,54 @@
return err
}
- // Check if SyncGroup already exists.
- sg, sgErr = getSyncGroupByName(ctx, tx, sgName)
+ // Check if SyncGroup already exists and get its info.
+ var gid interfaces.GroupId
+ gid, sgErr = getSyncGroupId(ctx, tx, sgName)
if sgErr != nil {
return sgErr
}
- // SyncGroup already exists. Possibilities include created
- // locally, already joined locally or published at the device as
- // a result of SyncGroup creation on a different device.
- //
- // TODO(hpucha): Handle the above cases. If the SG was published
- // locally, but not joined, we need to bootstrap the DAG and
- // watcher. If multiple joins are done locally, we may want to
- // ref count the SG state and track the leaves accordingly. So
- // we may need to add some local state for each SyncGroup.
+ sg, sgErr = getSyncGroupById(ctx, tx, gid)
+ if sgErr != nil {
+ return sgErr
+ }
// Check SG ACL.
- return authorize(ctx, call.Security(), sg)
+ if err := authorize(ctx, call.Security(), sg); err != nil {
+ return err
+ }
+
+ // SyncGroup already exists; increment the number of local
+ // joiners in its local state information. This presents
+ // different scenarios:
+ // 1- An additional local joiner: the current number of local
+ // joiners is > 0 and the SyncGroup was already bootstrapped
+ // to the Watcher, so there is nothing else to do.
+ // 2- A new local joiner after all previous local joiners had
+ // left: the number of local joiners is 0, the Watcher must
+ // be re-notified via a SyncGroup bootstrap because the last
+ // previous joiner to leave had un-notified the Watcher. In
+ // this scenario the SyncGroup was not destroyed after the
+ // last joiner left because the SyncGroup was also published
+ // here by a remote peer and thus cannot be destroyed only
+ // based on the local joiners.
+ // 3- A first local joiner for a SyncGroup that was published
+ // here from a remote Syncbase: the number of local joiners
+ // is also 0 (and the remote publish flag is set), and the
+ // Watcher must be notified via a SyncGroup bootstrap.
+ // Conclusion: bootstrap if the number of local joiners is 0.
+ sgState, err := getSGIdEntry(ctx, tx, gid)
+ if err != nil {
+ return err
+ }
+
+ if sgState.NumLocalJoiners == 0 {
+ if err := sd.bootstrapSyncGroup(ctx, tx, sg.Spec.Prefixes); err != nil {
+ return err
+ }
+ }
+ sgState.NumLocalJoiners++
+ return setSGIdEntry(ctx, tx, gid, sgState)
})
// The presented blessing is allowed to make this Syncbase instance join
@@ -532,48 +756,41 @@
ss := sd.sync.(*syncService)
// Contact a SyncGroup Admin to join the SyncGroup.
- sg = &interfaces.SyncGroup{}
- *sg, err = sd.joinSyncGroupAtAdmin(ctx, call, sgName, ss.name, myInfo)
+ sg2, version, genvec, err := sd.joinSyncGroupAtAdmin(ctx, call, sgName, ss.name, myInfo)
if err != nil {
return nullSpec, err
}
// Verify that the app/db combination is valid for this SyncGroup.
- if sg.AppName != sd.db.App().Name() || sg.DbName != sd.db.Name() {
+ appName, dbName := sd.db.App().Name(), sd.db.Name()
+ if sg2.AppName != appName || sg2.DbName != dbName {
return nullSpec, verror.New(verror.ErrBadArg, ctx, "bad app/db with syncgroup")
}
err = store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
-
- // TODO(hpucha): Bootstrap DAG/Genvector etc for syncing the SG metadata.
-
- // TODO(hpucha): Get SG Deltas from Admin device.
-
- if err := addSyncGroup(ctx, tx, sg); err != nil {
+ if err := ss.addSyncGroup(ctx, tx, version, false, "", genvec, 0, 0, 0, &sg2); err != nil {
return err
}
// Take a snapshot of the data to bootstrap the SyncGroup.
- return sd.bootstrapSyncGroup(ctx, tx, sg.Spec.Prefixes)
+ return sd.bootstrapSyncGroup(ctx, tx, sg2.Spec.Prefixes)
})
if err != nil {
return nullSpec, err
}
- ss.initSyncStateInMem(ctx, sg.AppName, sg.DbName, sg.Id)
+ ss.initSyncStateInMem(ctx, sg2.AppName, sg2.DbName, sg2.Id)
// Publish at the chosen mount table and in the neighborhood.
- sd.publishInMountTables(ctx, call, sg.Spec)
+ sd.publishInMountTables(ctx, call, sg2.Spec)
- return sg.Spec, nil
+ return sg2.Spec, nil
}
func (sd *syncDatabase) GetSyncGroupNames(ctx *context.T, call rpc.ServerCall) ([]string, error) {
- var sgNames []string
-
vlog.VI(2).Infof("sync: GetSyncGroupNames: begin")
- defer vlog.VI(2).Infof("sync: GetSyncGroupNames: end: %v", sgNames)
+ defer vlog.VI(2).Infof("sync: GetSyncGroupNames: end")
sn := sd.db.St().NewSnapshot()
defer sn.Abort()
@@ -584,8 +801,8 @@
}
// Scan all the SyncGroup names found in the Database.
- scanStart, scanLimit := util.ScanPrefixArgs(sgNameKeyScanPrefix, "")
- stream := sn.Scan(scanStart, scanLimit)
+ stream := sn.Scan(util.ScanPrefixArgs(sgNameKeyPrefix, ""))
+ var sgNames []string
var key []byte
for stream.Advance() {
sgName, err := splitSgNameKey(ctx, string(stream.Key(key)))
@@ -599,18 +816,19 @@
return nil, err
}
+ vlog.VI(2).Infof("sync: GetSyncGroupNames: %v", sgNames)
return sgNames, nil
}
func (sd *syncDatabase) GetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string) (wire.SyncGroupSpec, string, error) {
- var spec wire.SyncGroupSpec
-
vlog.VI(2).Infof("sync: GetSyncGroupSpec: begin %s", sgName)
- defer vlog.VI(2).Infof("sync: GetSyncGroupSpec: end: %s spec %v", sgName, spec)
+ defer vlog.VI(2).Infof("sync: GetSyncGroupSpec: end: %s", sgName)
sn := sd.db.St().NewSnapshot()
defer sn.Abort()
+ var spec wire.SyncGroupSpec
+
// Check permissions on Database.
if err := sd.db.CheckPermsInternal(ctx, call, sn); err != nil {
return spec, "", err
@@ -623,41 +841,45 @@
}
// TODO(hpucha): Check SyncGroup ACL.
- spec = sg.Spec
- return spec, sg.SpecVersion, nil
+ vlog.VI(2).Infof("sync: GetSyncGroupSpec: %s spec %v", sgName, sg.Spec)
+ return sg.Spec, sg.SpecVersion, nil
}
func (sd *syncDatabase) GetSyncGroupMembers(ctx *context.T, call rpc.ServerCall, sgName string) (map[string]wire.SyncGroupMemberInfo, error) {
- var members map[string]wire.SyncGroupMemberInfo
-
vlog.VI(2).Infof("sync: GetSyncGroupMembers: begin %s", sgName)
- defer vlog.VI(2).Infof("sync: GetSyncGroupMembers: end: %s members %v", sgName, members)
+ defer vlog.VI(2).Infof("sync: GetSyncGroupMembers: end: %s", sgName)
sn := sd.db.St().NewSnapshot()
defer sn.Abort()
// Check permissions on Database.
if err := sd.db.CheckPermsInternal(ctx, call, sn); err != nil {
- return members, err
+ return nil, err
}
// Get the SyncGroup information.
sg, err := getSyncGroupByName(ctx, sn, sgName)
if err != nil {
- return members, err
+ return nil, err
}
// TODO(hpucha): Check SyncGroup ACL.
- members = sg.Joiners
- return members, nil
+ vlog.VI(2).Infof("sync: GetSyncGroupMembers: %s members %v", sgName, sg.Joiners)
+ return sg.Joiners, nil
}
-// TODO(hpucha): Enable syncing syncgroup metadata.
func (sd *syncDatabase) SetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, version string) error {
vlog.VI(2).Infof("sync: SetSyncGroupSpec: begin %s %v %s", sgName, spec, version)
defer vlog.VI(2).Infof("sync: SetSyncGroupSpec: end: %s", sgName)
+ if err := verifySyncGroupSpec(ctx, &spec); err != nil {
+ return err
+ }
+
+ ss := sd.sync.(*syncService)
+ //appName, dbName := sd.db.App().Name(), sd.db.Name()
+
err := store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
// Check permissions on Database.
if err := sd.db.CheckPermsInternal(ctx, call, tx); err != nil {
@@ -669,10 +891,33 @@
return err
}
- // TODO(hpucha): Check SyncGroup ACL. Perform version checking.
+ if version != NoVersion && sg.SpecVersion != version {
+ return verror.NewErrBadVersion(ctx)
+ }
+ // Must not change the SyncGroup prefixes.
+ if !samePrefixes(spec.Prefixes, sg.Spec.Prefixes) {
+ return verror.New(verror.ErrBadArg, ctx, "cannot modify prefixes")
+ }
+
+ sgState, err := getSGIdEntry(ctx, tx, sg.Id)
+ if err != nil {
+ return err
+ }
+ if sgState.SyncPending {
+ return verror.NewErrBadState(ctx)
+ }
+
+ // Reserve log generation and position counts for the new SyncGroup.
+ //gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgIdStr(sg.Id), 1)
+ gen, pos := uint64(1), uint64(1)
+
+ // TODO(hpucha): Check SyncGroup ACL.
+
+ newVersion := newSyncGroupVersion()
sg.Spec = spec
- return setSGDataEntry(ctx, tx, sg.Id, sg)
+ sg.SpecVersion = newVersion
+ return ss.updateSyncGroupVersioning(ctx, tx, newVersion, true, ss.id, gen, pos, sg)
})
return err
}
@@ -680,9 +925,27 @@
//////////////////////////////
// Helper functions
-// TODO(hpucha): Call this periodically until we are able to contact the remote peer.
+// publishSyncGroup publishes the SyncGroup at the remote peer and updates its
+// status. If the publish operation is either successful or rejected by the
+// peer, the status is updated to "running" or "rejected" respectively and the
+// function returns "nil" to indicate to the caller that there is no need to
+// make further attempts. Otherwise an error (typically an RPC error, but it
+// could also be a store error) is returned to the caller.
+// TODO(rdaoud): make all SG admins try to publish after they join.
func (sd *syncDatabase) publishSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string) error {
- sg, err := getSyncGroupByName(ctx, sd.db.St(), sgName)
+ st := sd.db.St()
+ ss := sd.sync.(*syncService)
+ //appName, dbName := sd.db.App().Name(), sd.db.Name()
+
+ gid, err := getSyncGroupId(ctx, st, sgName)
+ if err != nil {
+ return err
+ }
+ version, err := getSyncGroupVersion(ctx, st, gid)
+ if err != nil {
+ return err
+ }
+ sg, err := getSGDataEntry(ctx, st, gid, version)
if err != nil {
return err
}
@@ -691,34 +954,74 @@
return nil
}
+ // Note: the remote peer is given the SyncGroup version and genvec at
+ // the point before the post-publish update, at which time the status
+ // and joiner list of the SyncGroup get updated. This is functionally
+ // correct, just not symmetrical with what happens at the joiner, which
+ // receives the SyncGroup state post-join.
+ // TODO(rdaoud): send the SyncGroup genvec to the remote peer.
+ status := interfaces.SyncGroupStatusPublishRejected
+
c := interfaces.SyncClient(sgName)
- err = c.PublishSyncGroup(ctx, *sg)
+ peer, err := c.PublishSyncGroup(ctx, ss.name, *sg, version, nil)
- // Publish failed temporarily. Retry later.
- // TODO(hpucha): Is there an RPC error that we can check here?
- if err != nil && verror.ErrorID(err) != verror.ErrExist.ID {
- return err
- }
-
- // Publish succeeded.
if err == nil {
- // TODO(hpucha): Get SG Deltas from publisher. Obtaining the
- // new version from the publisher prevents SG conflicts.
- return err
+ status = interfaces.SyncGroupStatusRunning
+ } else {
+ errId := verror.ErrorID(err)
+ if errId == interfaces.ErrDupSyncGroupPublish.ID {
+ // Duplicate publish: another admin already published
+ // the SyncGroup, nothing else needs to happen because
+ // that other admin would have updated the SyncGroup
+ // status and p2p SG sync will propagate the change.
+ // TODO(rdaoud): what if that other admin crashes and
+ // never updates the SyncGroup status (dies permanently
+ // or is ejected before the status update)? Eventually
+ // some admin must decide to update the SG status anyway
+ // even if that causes extra SG mutations and conflicts.
+ vlog.VI(3).Infof("sync: publishSyncGroup: %s: duplicate publish", sgName)
+ return nil
+ }
+
+ if errId != verror.ErrExist.ID {
+ // The publish operation failed with an error other
+ // than ErrExist, so it must be retried later on.
+ // TODO(hpucha): Is there an RPC error that we can check here?
+ vlog.VI(3).Infof("sync: publishSyncGroup: %s: failed, retry later: %v", sgName, err)
+ return err
+ }
}
- // Publish rejected. Persist that to avoid retrying in the
- // future and to remember the split universe scenario.
- err = store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
+ // The publish operation is done: it either succeeded or failed with
+ // the ErrExist error. Update the SyncGroup status and, if the publish
+ // was successful, add the remote peer to the SyncGroup.
+ vlog.VI(3).Infof("sync: publishSyncGroup: %s: peer %s: done: status %s: %v",
+ sgName, peer, status.String(), err)
+
+ err = store.RunInTransaction(st, func(tx store.Transaction) error {
// Ensure SG still exists.
- sg, err := getSyncGroupByName(ctx, tx, sgName)
+ sg, err := getSyncGroupById(ctx, tx, gid)
if err != nil {
return err
}
- sg.Status = interfaces.SyncGroupStatusPublishRejected
- return setSGDataEntry(ctx, tx, sg.Id, sg)
+ // Reserve log generation and position counts for the new
+ // SyncGroup version.
+ //gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgIdStr(gid), 1)
+ gen, pos := uint64(1), uint64(1)
+
+ sg.Status = status
+ if status == interfaces.SyncGroupStatusRunning {
+ // TODO(hpucha): Default priority?
+ sg.Joiners[peer] = wire.SyncGroupMemberInfo{}
+ }
+
+ return ss.updateSyncGroupVersioning(ctx, tx, NoVersion, true, ss.id, gen, pos, sg)
})
+ if err != nil {
+ vlog.Errorf("sync: publishSyncGroup: cannot update SyncGroup %s status to %s: %v",
+ sgName, status.String(), err)
+ }
return err
}
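Given the contract above (nil means the publish settled as "running" or "rejected", an error means try again), a caller can drive retries with a simple timer loop, in the spirit of the removed TODO about calling this periodically. A hedged sketch; the function name and interval are illustrative, not part of this change.

// Assumed imports: time, v.io/v23/context, v.io/v23/rpc.
func publishWithRetry(ctx *context.T, call rpc.ServerCall, sd *syncDatabase, sgName string) {
	ticker := time.NewTicker(time.Minute) // illustrative retry interval.
	defer ticker.Stop()
	for {
		if err := sd.publishSyncGroup(ctx, call, sgName); err == nil {
			return // status persisted as "running" or "rejected"; done.
		}
		select {
		case <-ctx.Done(): // stop retrying once the context is cancelled.
			return
		case <-ticker.C: // otherwise try again on the next tick.
		}
	}
}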
@@ -801,7 +1104,7 @@
return nil
}
-func (sd *syncDatabase) joinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName, name string, myInfo wire.SyncGroupMemberInfo) (interfaces.SyncGroup, error) {
+func (sd *syncDatabase) joinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName, name string, myInfo wire.SyncGroupMemberInfo) (interfaces.SyncGroup, string, interfaces.PrefixGenVector, error) {
c := interfaces.SyncClient(sgName)
return c.JoinSyncGroupAtAdmin(ctx, sgName, name, myInfo)
@@ -819,36 +1122,49 @@
////////////////////////////////////////////////////////////
// Methods for SyncGroup create/join between Syncbases.
-func (s *syncService) PublishSyncGroup(ctx *context.T, call rpc.ServerCall, sg interfaces.SyncGroup) error {
+func (s *syncService) PublishSyncGroup(ctx *context.T, call rpc.ServerCall, publisher string, sg interfaces.SyncGroup, version string, genvec interfaces.PrefixGenVector) (string, error) {
st, err := s.getDbStore(ctx, call, sg.AppName, sg.DbName)
if err != nil {
- return err
+ return s.name, err
}
err = store.RunInTransaction(st, func(tx store.Transaction) error {
- localSG, err := getSyncGroupByName(ctx, tx, sg.Name)
-
+ gid, err := getSyncGroupId(ctx, tx, sg.Name)
if err != nil && verror.ErrorID(err) != verror.ErrNoExist.ID {
return err
}
- // SG name already claimed.
- if err == nil && localSG.Id != sg.Id {
- return verror.New(verror.ErrExist, ctx, sg.Name)
- }
-
- // TODO(hpucha): Bootstrap DAG/Genvector etc for syncing the SG
- // metadata if needed.
- //
- // TODO(hpucha): Catch up on SG versions so far.
-
- // SG already published. Update if needed.
- if err == nil && localSG.Id == sg.Id {
- if localSG.Status == interfaces.SyncGroupStatusPublishPending {
- localSG.Status = interfaces.SyncGroupStatusRunning
- return setSGDataEntry(ctx, tx, localSG.Id, localSG)
+ if err == nil {
+ // SG name already claimed. Note that in this case of
+ // split-brain (same SG name, different IDs), the device
+ // whose SG ID is rejected here does not benefit from the
+ // de-duping optimization below and will end up making
+ // duplicate SG mutations to set the status, yielding
+ // more SG conflicts. This is functionally correct; it
+ // merely bypasses the de-dup optimization for the
+ // rejected SG.
+ if gid != sg.Id {
+ return verror.New(verror.ErrExist, ctx, sg.Name)
}
- return nil
+
+ // The SG exists locally, either created/joined locally
+ // or previously published. Make the operation idempotent
+ // for the same publisher; otherwise it is a duplicate.
+ state, err := getSGIdEntry(ctx, tx, gid)
+ if err != nil {
+ return err
+ }
+ if state.RemotePublisher == "" {
+ // Locally created/joined SyncGroup: update its
+ // state to include the publisher.
+ state.RemotePublisher = publisher
+ return setSGIdEntry(ctx, tx, gid, state)
+ }
+ if publisher == state.RemotePublisher {
+ // Same previous publisher: nothing to change,
+ // the old genvec and version info is valid.
+ return nil
+ }
+ return interfaces.NewErrDupSyncGroupPublish(ctx, sg.Name)
}
// Publish the SyncGroup.
@@ -856,23 +1172,21 @@
// TODO(hpucha): Use some ACL check to allow/deny publishing.
// TODO(hpucha): Ensure node is on Admin ACL.
- // TODO(hpucha): Default priority?
- sg.Joiners[s.name] = wire.SyncGroupMemberInfo{}
- sg.Status = interfaces.SyncGroupStatusRunning
- return addSyncGroup(ctx, tx, &sg)
+ return s.addSyncGroup(ctx, tx, version, false, publisher, genvec, 0, 0, 0, &sg)
})
- if err != nil {
- return err
+ if err == nil {
+ s.initSyncStateInMem(ctx, sg.AppName, sg.DbName, sg.Id)
}
- s.initSyncStateInMem(ctx, sg.AppName, sg.DbName, sg.Id)
- return nil
+ return s.name, err
}
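The branching above amounts to a small decision table for an incoming publish. Restated as a hedged, self-contained sketch (the outcome strings merely summarize the actions the real code takes; the function is illustrative):

func publishOutcome(sgExists, sameID bool, recordedPublisher, publisher string) string {
	switch {
	case !sgExists:
		return "add SG locally, record publisher" // first publish at this peer.
	case !sameID:
		return "ErrExist" // split-brain: name already claimed by another SG.
	case recordedPublisher == "":
		return "record publisher" // SG was created/joined locally.
	case recordedPublisher == publisher:
		return "no-op" // idempotent retry by the same publisher.
	default:
		return "ErrDupSyncGroupPublish" // a different admin published first.
	}
}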
-func (s *syncService) JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName, joinerName string, joinerInfo wire.SyncGroupMemberInfo) (interfaces.SyncGroup, error) {
+func (s *syncService) JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName, joinerName string, joinerInfo wire.SyncGroupMemberInfo) (interfaces.SyncGroup, string, interfaces.PrefixGenVector, error) {
var dbSt store.Store
var gid interfaces.GroupId
var err error
+ var stAppName, stDbName string
+ nullSG, nullGV := interfaces.SyncGroup{}, interfaces.PrefixGenVector{}
// Find the database store for this SyncGroup.
//
@@ -885,6 +1199,7 @@
if gid, err = getSyncGroupId(ctx, st, sgName); err == nil {
// Found the SyncGroup being looked for.
dbSt = st
+ stAppName, stDbName = appName, dbName
return true
}
return false
@@ -892,10 +1207,12 @@
// SyncGroup not found.
if err != nil {
- return interfaces.SyncGroup{}, verror.New(verror.ErrNoExist, ctx, "SyncGroup not found", sgName)
+ return nullSG, "", nullGV, verror.New(verror.ErrNoExist, ctx, "SyncGroup not found", sgName)
}
+ version := newSyncGroupVersion()
var sg *interfaces.SyncGroup
+
err = store.RunInTransaction(dbSt, func(tx store.Transaction) error {
var err error
sg, err = getSyncGroupById(ctx, tx, gid)
@@ -908,13 +1225,27 @@
return err
}
+ // Check that the SG is not in pending state.
+ state, err := getSGIdEntry(ctx, tx, gid)
+ if err != nil {
+ return err
+ }
+ if state.SyncPending {
+ return verror.NewErrBadState(ctx)
+ }
+
+ // Reserve log generation and position counts for the new SyncGroup.
+ //gen, pos := s.reserveGenAndPosInDbLog(ctx, stAppName, stDbName, sgIdStr(gid), 1)
+ gen, pos := uint64(1), uint64(1)
+
// Add to joiner list.
sg.Joiners[joinerName] = joinerInfo
- return setSGDataEntry(ctx, tx, sg.Id, sg)
+ return s.updateSyncGroupVersioning(ctx, tx, version, true, s.id, gen, pos, sg)
})
if err != nil {
- return interfaces.SyncGroup{}, err
+ return nullSG, "", nullGV, err
}
- return *sg, nil
+ // TODO(rdaoud): return the SyncGroup genvec
+ return *sg, version, nullGV, nil
}
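On the joiner side, the returned snapshot, version, and (once the TODO above is done) genvec seed the pending state described by sgLocalState in types.vdl below. A hedged sketch of that consumption; whether addSyncGroup records the pending state itself is not shown in this diff, so the explicit marking here is an assumption, and seedJoinedSG is an illustrative name.

func seedJoinedSG(ctx *context.T, tx store.Transaction, s *syncService,
	sg interfaces.SyncGroup, version string, genvec interfaces.PrefixGenVector) error {
	// Persist the snapshot at the admin-assigned version (remote-style call,
	// mirroring the PublishSyncGroup path above).
	if err := s.addSyncGroup(ctx, tx, version, false, "", genvec, 0, 0, 0, &sg); err != nil {
		return err
	}
	// Mark the SG pending until p2p sync delivers the DAG/log history.
	state, err := getSGIdEntry(ctx, tx, sg.Id)
	if err != nil {
		return err
	}
	state.SyncPending = true
	state.PendingGenVec = genvec
	return setSGIdEntry(ctx, tx, sg.Id, state)
}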
diff --git a/services/syncbase/vsync/syncgroup_test.go b/services/syncbase/vsync/syncgroup_test.go
index 4dc5845..af44cfb 100644
--- a/services/syncbase/vsync/syncgroup_test.go
+++ b/services/syncbase/vsync/syncgroup_test.go
@@ -50,6 +50,7 @@
svc := createService(t)
defer destroyService(t, svc)
st := svc.St()
+ s := svc.sync
checkSGStats(t, svc, "add-1", 0, 0)
@@ -57,6 +58,7 @@
sgName := "foobar"
sgId := interfaces.GroupId(1234)
+ version := "v111"
sg := &interfaces.SyncGroup{
Name: sgName,
@@ -76,7 +78,7 @@
}
tx := st.NewTransaction()
- if err := addSyncGroup(nil, tx, sg); err != nil {
+ if err := s.addSyncGroup(nil, tx, version, true, "", nil, s.id, 1, 1, sg); err != nil {
t.Errorf("cannot add SyncGroup ID %d: %v", sg.Id, err)
}
if err := tx.Commit(); err != nil {
@@ -88,10 +90,6 @@
if id, err := getSyncGroupId(nil, st, sgName); err != nil || id != sgId {
t.Errorf("cannot get ID of SyncGroup %s: got %d instead of %d; err: %v", sgName, id, sgId, err)
}
- if name, err := getSyncGroupName(nil, st, sgId); err != nil || name != sgName {
- t.Errorf("cannot get name of SyncGroup %d: got %s instead of %s; err: %v",
- sgId, name, sgName, err)
- }
sgOut, err := getSyncGroupById(nil, st, sgId)
if err != nil {
@@ -150,7 +148,7 @@
sg.Name = "another-name"
tx = st.NewTransaction()
- if err = addSyncGroup(nil, tx, sg); err == nil {
+ if err = s.addSyncGroup(nil, tx, NoVersion, true, "", nil, s.id, 2, 2, sg); err == nil {
t.Errorf("re-adding SyncGroup %d did not fail", sgId)
}
tx.Abort()
@@ -159,7 +157,7 @@
sg.Id = interfaces.GroupId(5555)
tx = st.NewTransaction()
- if err = addSyncGroup(nil, tx, sg); err == nil {
+ if err = s.addSyncGroup(nil, tx, NoVersion, true, "", nil, s.id, 3, 3, sg); err == nil {
t.Errorf("adding SyncGroup %s with a different ID did not fail", sgName)
}
tx.Abort()
@@ -173,9 +171,6 @@
if id, err := getSyncGroupId(nil, st, badName); err == nil {
t.Errorf("found non-existing SyncGroup %s: got ID %d", badName, id)
}
- if name, err := getSyncGroupName(nil, st, badId); err == nil {
- t.Errorf("found non-existing SyncGroup %d: got name %s", badId, name)
- }
if sg, err := getSyncGroupByName(nil, st, badName); err == nil {
t.Errorf("found non-existing SyncGroup %s: got %v", badName, sg)
}
@@ -191,10 +186,11 @@
svc := createService(t)
defer destroyService(t, svc)
st := svc.St()
+ s := svc.sync
checkBadAddSyncGroup := func(t *testing.T, st store.Store, sg *interfaces.SyncGroup, msg string) {
tx := st.NewTransaction()
- if err := addSyncGroup(nil, tx, sg); err == nil {
+ if err := s.addSyncGroup(nil, tx, NoVersion, true, "", nil, s.id, 1, 1, sg); err == nil {
t.Errorf("checkBadAddSyncGroup: adding bad SyncGroup (%s) did not fail", msg)
}
tx.Abort()
@@ -202,13 +198,22 @@
checkBadAddSyncGroup(t, st, nil, "nil SG")
- sg := &interfaces.SyncGroup{Id: 1234}
+ sg := &interfaces.SyncGroup{}
checkBadAddSyncGroup(t, st, sg, "SG w/o name")
- sg = &interfaces.SyncGroup{Name: "foobar"}
- checkBadAddSyncGroup(t, st, sg, "SG w/o Id")
+ sg.Name = "foobar"
+ checkBadAddSyncGroup(t, st, sg, "SG w/o AppName")
- sg.Id = 1234
+ sg.AppName = "mockApp"
+ checkBadAddSyncGroup(t, st, sg, "SG w/o DbName")
+
+ sg.DbName = "mockDb"
+ checkBadAddSyncGroup(t, st, sg, "SG w/o creator")
+
+ sg.Creator = "haha"
+ checkBadAddSyncGroup(t, st, sg, "SG w/o ID")
+
+ sg.Id = newSyncGroupId()
checkBadAddSyncGroup(t, st, sg, "SG w/o Version")
sg.SpecVersion = "v1"
@@ -218,6 +223,9 @@
"phone": nosql.SyncGroupMemberInfo{SyncPriority: 10},
}
checkBadAddSyncGroup(t, st, sg, "SG w/o Prefixes")
+
+ sg.Spec.Prefixes = []string{"foo", "bar", "foo"}
+ checkBadAddSyncGroup(t, st, sg, "SG with duplicate Prefixes")
}
// TestDeleteSyncGroup tests deleting a SyncGroup.
@@ -227,6 +235,7 @@
svc := createService(t)
defer destroyService(t, svc)
st := svc.St()
+ s := svc.sync
sgName := "foobar"
sgId := interfaces.GroupId(1234)
@@ -264,7 +273,7 @@
}
tx = st.NewTransaction()
- if err := addSyncGroup(nil, tx, sg); err != nil {
+ if err := s.addSyncGroup(nil, tx, NoVersion, true, "", nil, s.id, 1, 1, sg); err != nil {
t.Errorf("creating SyncGroup ID %d failed: %v", sgId, err)
}
if err := tx.Commit(); err != nil {
@@ -285,16 +294,24 @@
checkSGStats(t, svc, "del-3", 0, 0)
- // Create it again then delete it by name.
+ // Create it again, update it, then delete it by name.
tx = st.NewTransaction()
- if err := addSyncGroup(nil, tx, sg); err != nil {
+ if err := s.addSyncGroup(nil, tx, NoVersion, true, "", nil, s.id, 2, 2, sg); err != nil {
t.Errorf("creating SyncGroup ID %d after delete failed: %v", sgId, err)
}
if err := tx.Commit(); err != nil {
t.Errorf("cannot commit adding SyncGroup ID %d after delete: %v", sgId, err)
}
+ tx = st.NewTransaction()
+ if err := s.updateSyncGroupVersioning(nil, tx, NoVersion, true, s.id, 3, 3, sg); err != nil {
+ t.Errorf("updating SyncGroup ID %d version: %v", sgId, err)
+ }
+ if err := tx.Commit(); err != nil {
+ t.Errorf("cannot commit updating SyncGroup ID %d version: %v", sgId, err)
+ }
+
checkSGStats(t, svc, "del-4", 1, 3)
tx = st.NewTransaction()
@@ -315,6 +332,7 @@
svc := createService(t)
defer destroyService(t, svc)
st := svc.St()
+ s := svc.sync
sgName1, sgName2 := "foo", "bar"
sgId1, sgId2 := interfaces.GroupId(1234), interfaces.GroupId(8888)
@@ -357,7 +375,7 @@
}
tx := st.NewTransaction()
- if err := addSyncGroup(nil, tx, sg1); err != nil {
+ if err := s.addSyncGroup(nil, tx, NoVersion, true, "", nil, s.id, 1, 1, sg1); err != nil {
t.Errorf("creating SyncGroup ID %d failed: %v", sgId1, err)
}
if err := tx.Commit(); err != nil {
@@ -367,7 +385,7 @@
checkSGStats(t, svc, "multi-1", 1, 3)
tx = st.NewTransaction()
- if err := addSyncGroup(nil, tx, sg2); err != nil {
+ if err := s.addSyncGroup(nil, tx, NoVersion, true, "", nil, s.id, 2, 2, sg2); err != nil {
t.Errorf("creating SyncGroup ID %d failed: %v", sgId2, err)
}
if err := tx.Commit(); err != nil {
@@ -510,3 +528,21 @@
}
}
}
+
+// TestPrefixCompare tests the prefix comparison utility.
+func TestPrefixCompare(t *testing.T) {
+ check := func(t *testing.T, pfx1, pfx2 []string, want bool, msg string) {
+ if got := samePrefixes(pfx1, pfx2); got != want {
+ t.Errorf("samePrefixes: %s: got %t instead of %t", msg, got, want)
+ }
+ }
+
+ check(t, nil, nil, true, "both nil")
+ check(t, []string{}, nil, true, "empty vs nil")
+ check(t, []string{"a", "b"}, []string{"b", "a"}, true, "different ordering")
+ check(t, []string{"a", "b", "c"}, []string{"b", "a"}, false, "p1 superset of p2")
+ check(t, []string{"a", "b"}, []string{"b", "a", "c"}, false, "p2 superset of p1")
+ check(t, []string{"a", "b", "c"}, []string{"b", "d", "a"}, false, "overlap")
+ check(t, []string{"a", "b", "c"}, []string{"x", "y"}, false, "no overlap")
+ check(t, []string{"a", "b"}, []string{"B", "a"}, false, "upper/lowercases")
+}
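For reference, a minimal sketch consistent with the cases above: set equality over the prefix strings, order-insensitive, case-sensitive, with nil and empty slices equivalent. The real samePrefixes helper may differ in details such as duplicate handling.

func samePrefixesSketch(p1, p2 []string) bool {
	if len(p1) != len(p2) {
		return false
	}
	set := make(map[string]bool, len(p1))
	for _, p := range p1 {
		set[p] = true
	}
	for _, p := range p2 {
		if !set[p] {
			return false
		}
	}
	return true
}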
diff --git a/services/syncbase/vsync/types.vdl b/services/syncbase/vsync/types.vdl
index 9cf53d3..a0c8731 100644
--- a/services/syncbase/vsync/types.vdl
+++ b/services/syncbase/vsync/types.vdl
@@ -43,3 +43,44 @@
Metadata interfaces.LogRecMetadata
Pos uint64 // position in the Database log.
}
+
+// sgLocalState holds the SyncGroup local state, only relevant to this member
+// (i.e. the local Syncbase). This is needed for crash recovery of the internal
+// state transitions of the SyncGroup.
+type sgLocalState struct {
+ // The count of local joiners to the same SyncGroup.
+ NumLocalJoiners uint32
+
+ // The SyncGroup is watched when the sync Watcher starts processing the
+ // SyncGroup data. When a SyncGroup is created or joined, an entry is
+ // added to the Watcher queue (log) to inform it from which point to
+ // start accepting store mutations, an asynchronous notification similar
+ // to regular store mutations. When the Watcher processes that queue
+ // entry, it sets this bit to true. When Syncbase restarts, the value
+ // of this bit allows the new sync Watcher to recreate its in-memory
+ // state by resuming to watch only the prefixes of SyncGroups that were
+ // previously being watched.
+ Watched bool
+
+ // If non-empty, the name of the remote peer (typically the SyncGroup
+ // creator) that published the SyncGroup here. In this case the
+ // SyncGroup cannot be GCed locally even if it has no local joiners.
+ RemotePublisher string
+
+ // The SyncGroup is in pending state on a device that learns the current
+ // state of the SyncGroup from another device but has not yet received,
+ // through peer-to-peer sync, the history of the changes (DAG and logs).
+ // This happens in two cases:
+ // 1- A joiner was accepted into a SyncGroup by a SyncGroup admin and
+ // only given the current SyncGroup info synchronously and will
+ // receive the full history later via p2p sync.
+ // 2- A remote server where the SyncGroup is published was told by the
+ // SyncGroup publisher the current SyncGroup info synchronously and
+ // will receive the full history later via p2p sync.
+ // The pending state is over when the device reaches or exceeds the
+ // knowledge level indicated in the pending genvec. While SyncPending
+ // is true, no local SyncGroup mutations are allowed (i.e. no join or
+ // set-spec requests).
+ SyncPending bool
+ PendingGenVec interfaces.PrefixGenVector
+}
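The SyncPending bit is exactly what SetSyncGroupSpec and JoinSyncGroupAtAdmin consult before allowing local mutations. That recurring guard could be factored as below, a sketch built only from the accessors already present in this diff; the helper name is illustrative.

func checkSGNotPending(ctx *context.T, tx store.Transaction, gid interfaces.GroupId) error {
	state, err := getSGIdEntry(ctx, tx, gid)
	if err != nil {
		return err
	}
	if state.SyncPending {
		// No join or set-spec is allowed until the knowledge level in
		// PendingGenVec has been reached via p2p sync.
		return verror.NewErrBadState(ctx)
	}
	return nil
}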
diff --git a/services/syncbase/vsync/types.vdl.go b/services/syncbase/vsync/types.vdl.go
index c02d2d4..34f425f 100644
--- a/services/syncbase/vsync/types.vdl.go
+++ b/services/syncbase/vsync/types.vdl.go
@@ -61,11 +61,55 @@
}) {
}
+// sgLocalState holds the SyncGroup local state, only relevant to this member
+// (i.e. the local Syncbase). This is needed for crash recovery of the internal
+// state transitions of the SyncGroup.
+type sgLocalState struct {
+ // The count of local joiners to the same SyncGroup.
+ NumLocalJoiners uint32
+ // The SyncGroup is watched when the sync Watcher starts processing the
+ // SyncGroup data. When a SyncGroup is created or joined, an entry is
+ // added to the Watcher queue (log) to inform it from which point to
+ // start accepting store mutations, an asynchronous notification similar
+ // to regular store mutations. When the Watcher processes that queue
+ // entry, it sets this bit to true. When Syncbase restarts, the value
+ // of this bit allows the new sync Watcher to recreate its in-memory
+ // state by resuming to watch only the prefixes of SyncGroups that were
+ // previously being watched.
+ Watched bool
+ // If non-empty, the name of the remote peer (typically the SyncGroup
+ // creator) that published the SyncGroup here. In this case the
+ // SyncGroup cannot be GCed locally even if it has no local joiners.
+ RemotePublisher string
+ // The SyncGroup is in pending state on a device that learns the current
+ // state of the SyncGroup from another device but has not yet received,
+ // through peer-to-peer sync, the history of the changes (DAG and logs).
+ // This happens in two cases:
+ // 1- A joiner was accepted into a SyncGroup by a SyncGroup admin and
+ // only given the current SyncGroup info synchronously and will
+ // receive the full history later via p2p sync.
+ // 2- A remote server where the SyncGroup is published was told by the
+ // SyncGroup publisher the current SyncGroup info synchronously and
+ // will receive the full history later via p2p sync.
+ // The pending state is over when the device reaches or exceeds the
+ // knowledge level indicated in the pending genvec. While SyncPending
+ // is true, no local SyncGroup mutations are allowed (i.e. no join or
+ // set-spec requests).
+ SyncPending bool
+ PendingGenVec interfaces.PrefixGenVector
+}
+
+func (sgLocalState) __VDLReflect(struct {
+ Name string `vdl:"v.io/x/ref/services/syncbase/vsync.sgLocalState"`
+}) {
+}
+
func init() {
vdl.Register((*syncData)(nil))
vdl.Register((*localGenInfo)(nil))
vdl.Register((*dbSyncState)(nil))
vdl.Register((*localLogRec)(nil))
+ vdl.Register((*sgLocalState)(nil))
}
const logPrefix = "log" // log state.
diff --git a/services/wspr/internal/app/app.go b/services/wspr/internal/app/app.go
index 6edb6c0..3e67250 100644
--- a/services/wspr/internal/app/app.go
+++ b/services/wspr/internal/app/app.go
@@ -580,20 +580,6 @@
ctx.Errorf("close called on non-existent call: %v", id)
}
-func (c *Controller) maybeCreateServer(serverId uint32, opts ...rpc.ServerOpt) (*server.Server, error) {
- c.Lock()
- defer c.Unlock()
- if server, ok := c.servers[serverId]; ok {
- return server, nil
- }
- server, err := server.NewServer(serverId, c.listenSpec, c, opts...)
- if err != nil {
- return nil, err
- }
- c.servers[serverId] = server
- return server, nil
-}
-
// HandleLookupResponse handles the result of a Dispatcher.Lookup call that was
// run by the Javascript server.
func (c *Controller) HandleLookupResponse(ctx *context.T, id int32, data string) {
@@ -624,22 +610,25 @@
server.HandleAuthResponse(ctx, id, data)
}
-// Serve instructs WSPR to start listening for calls on behalf
-// of a javascript server.
-func (c *Controller) Serve(ctx *context.T, _ rpc.ServerCall, name string, serverId uint32, rpcServerOpts []RpcServerOption) error {
-
+// NewServer instructs WSPR to create a server and start listening for calls on
+// behalf of a JavaScript server.
+func (c *Controller) NewServer(ctx *context.T, _ rpc.ServerCall, name string, serverId uint32, rpcServerOpts []RpcServerOption) error {
opts, err := c.serverOpts(rpcServerOpts)
if err != nil {
return verror.Convert(verror.ErrInternal, nil, err)
}
- server, err := c.maybeCreateServer(serverId, opts...)
+
+ s, err := server.NewServer(serverId, name, c.listenSpec, c, opts...)
if err != nil {
return verror.Convert(verror.ErrInternal, nil, err)
}
- ctx.VI(2).Infof("serving under name: %q", name)
- if err := server.Serve(name); err != nil {
- return verror.Convert(verror.ErrInternal, nil, err)
- }
+
+ c.Lock()
+ c.servers[serverId] = s
+ c.Unlock()
+
+ ctx.VI(2).Infof("server created under name: %q", name)
+
return nil
}
@@ -661,11 +650,11 @@
// AddName adds a published name to an existing server.
func (c *Controller) AddName(_ *context.T, _ rpc.ServerCall, serverId uint32, name string) error {
- // Create a server for the pipe, if it does not exist already
- server, err := c.maybeCreateServer(serverId)
+ server, err := c.getServerById(serverId)
if err != nil {
- return verror.Convert(verror.ErrInternal, nil, err)
+ return err
}
+
// Add name
if err := server.AddName(name); err != nil {
return verror.Convert(verror.ErrInternal, nil, err)
@@ -675,11 +664,11 @@
// RemoveName removes a published name from an existing server.
func (c *Controller) RemoveName(_ *context.T, _ rpc.ServerCall, serverId uint32, name string) error {
- // Create a server for the pipe, if it does not exist already
- server, err := c.maybeCreateServer(serverId)
+ server, err := c.getServerById(serverId)
if err != nil {
- return verror.Convert(verror.ErrInternal, nil, err)
+ return err
}
+
// Remove name
server.RemoveName(name)
// Remove name from signature cache as well
@@ -687,6 +676,18 @@
return nil
}
+// getServerById returns the server for the given id, or an error if the id does not match any server.
+func (c *Controller) getServerById(serverId uint32) (*server.Server, error) {
+ c.Lock()
+ defer c.Unlock()
+ server, ok := c.servers[serverId]
+ if !ok {
+ return nil, verror.Convert(verror.ErrInternal, nil, fmt.Errorf("getServerById: id %d does not match any server", serverId))
+ }
+
+ return server, nil
+}
+
// HandleServerResponse handles the completion of outstanding calls to JavaScript services
// by filling the corresponding channel with the result from JavaScript.
func (c *Controller) HandleServerResponse(ctx *context.T, id int32, data string) {
diff --git a/services/wspr/internal/app/app_test.go b/services/wspr/internal/app/app_test.go
index 8731152..567ec85 100644
--- a/services/wspr/internal/app/app_test.go
+++ b/services/wspr/internal/app/app_test.go
@@ -352,7 +352,7 @@
typeEncoder := vom.NewTypeEncoder(typeStream)
req, err := makeRequest(typeEncoder, RpcRequest{
Name: "__controller",
- Method: "Serve",
+ Method: "NewServer",
NumInArgs: 3,
NumOutArgs: 1,
Deadline: vdltime.Deadline{},
diff --git a/services/wspr/internal/app/controller.vdl b/services/wspr/internal/app/controller.vdl
index e464467..33f1bef 100644
--- a/services/wspr/internal/app/controller.vdl
+++ b/services/wspr/internal/app/controller.vdl
@@ -12,9 +12,9 @@
)
type Controller interface {
- // Serve instructs WSPR to start listening for calls on behalf
- // of a javascript server.
- Serve(name string, serverId uint32, serverOpts []RpcServerOption) error
+ // NewServer instructs WSPR to create a server and start listening for calls on
+ // behalf of a JavaScript server.
+ NewServer(name string, serverId uint32, serverOpts []RpcServerOption) error
// Stop instructs WSPR to stop listening for calls for the
// given javascript server.
Stop(serverId uint32) error
diff --git a/services/wspr/internal/app/controller.vdl.go b/services/wspr/internal/app/controller.vdl.go
index 439110e..0f73a4c 100644
--- a/services/wspr/internal/app/controller.vdl.go
+++ b/services/wspr/internal/app/controller.vdl.go
@@ -22,9 +22,9 @@
// ControllerClientMethods is the client interface
// containing Controller methods.
type ControllerClientMethods interface {
- // Serve instructs WSPR to start listening for calls on behalf
- // of a javascript server.
- Serve(ctx *context.T, name string, serverId uint32, serverOpts []RpcServerOption, opts ...rpc.CallOpt) error
+ // NewServer instructs WSPR to create a server and start listening for calls on
+ // behalf of a JavaScript server.
+ NewServer(ctx *context.T, name string, serverId uint32, serverOpts []RpcServerOption, opts ...rpc.CallOpt) error
// Stop instructs WSPR to stop listening for calls for the
// given javascript server.
Stop(ctx *context.T, serverId uint32, opts ...rpc.CallOpt) error
@@ -74,8 +74,8 @@
name string
}
-func (c implControllerClientStub) Serve(ctx *context.T, i0 string, i1 uint32, i2 []RpcServerOption, opts ...rpc.CallOpt) (err error) {
- err = v23.GetClient(ctx).Call(ctx, c.name, "Serve", []interface{}{i0, i1, i2}, nil, opts...)
+func (c implControllerClientStub) NewServer(ctx *context.T, i0 string, i1 uint32, i2 []RpcServerOption, opts ...rpc.CallOpt) (err error) {
+ err = v23.GetClient(ctx).Call(ctx, c.name, "NewServer", []interface{}{i0, i1, i2}, nil, opts...)
return
}
@@ -157,9 +157,9 @@
// ControllerServerMethods is the interface a server writer
// implements for Controller.
type ControllerServerMethods interface {
- // Serve instructs WSPR to start listening for calls on behalf
- // of a javascript server.
- Serve(ctx *context.T, call rpc.ServerCall, name string, serverId uint32, serverOpts []RpcServerOption) error
+ // NewServer instructs WSPR to create a server and start listening for calls on
+ // behalf of a JavaScript server.
+ NewServer(ctx *context.T, call rpc.ServerCall, name string, serverId uint32, serverOpts []RpcServerOption) error
// Stop instructs WSPR to stop listening for calls for the
// given javascript server.
Stop(ctx *context.T, call rpc.ServerCall, serverId uint32) error
@@ -229,8 +229,8 @@
gs *rpc.GlobState
}
-func (s implControllerServerStub) Serve(ctx *context.T, call rpc.ServerCall, i0 string, i1 uint32, i2 []RpcServerOption) error {
- return s.impl.Serve(ctx, call, i0, i1, i2)
+func (s implControllerServerStub) NewServer(ctx *context.T, call rpc.ServerCall, i0 string, i1 uint32, i2 []RpcServerOption) error {
+ return s.impl.NewServer(ctx, call, i0, i1, i2)
}
func (s implControllerServerStub) Stop(ctx *context.T, call rpc.ServerCall, i0 uint32) error {
@@ -310,8 +310,8 @@
PkgPath: "v.io/x/ref/services/wspr/internal/app",
Methods: []rpc.MethodDesc{
{
- Name: "Serve",
- Doc: "// Serve instructs WSPR to start listening for calls on behalf\n// of a javascript server.",
+ Name: "NewServer",
+ Doc: "// NewServer instructs WSPR to create a server and start listening for calls on\n// behalf of a JavaScript server.",
InArgs: []rpc.ArgDesc{
{"name", ``}, // string
{"serverId", ``}, // uint32
diff --git a/services/wspr/internal/rpc/server/server.go b/services/wspr/internal/rpc/server/server.go
index 08661c0..5e693f3 100644
--- a/services/wspr/internal/rpc/server/server.go
+++ b/services/wspr/internal/rpc/server/server.go
@@ -68,19 +68,12 @@
// This should be locked before outstandingRequestLock.
serverStateLock sync.Mutex
- // The rpc.ListenSpec to use with server.Listen
- listenSpec *rpc.ListenSpec
+ // The server that handles the rpc layer.
+ server rpc.Server
- // The server that handles the rpc layer. Listen on this server is
- // lazily started.
- server rpc.DeprecatedServer
-
- // The saved dispatcher to reuse when serve is called multiple times.
+ // The saved dispatcher.
dispatcher *dispatcher
- // Whether the server is listening.
- isListening bool
-
// The server id.
id uint32
helper ServerHelper
@@ -100,11 +93,10 @@
type serverContextKey struct{}
-func NewServer(id uint32, listenSpec *rpc.ListenSpec, helper ServerHelper, opts ...rpc.ServerOpt) (*Server, error) {
+func NewServer(id uint32, name string, listenSpec *rpc.ListenSpec, helper ServerHelper, opts ...rpc.ServerOpt) (*Server, error) {
server := &Server{
- id: id,
- helper: helper,
- listenSpec: listenSpec,
+ id: id,
+ helper: helper,
outstandingServerRequests: make(map[int32]chan *lib.ServerRpcReply),
outstandingAuthRequests: make(map[int32]chan error),
outstandingValidationRequests: make(map[int32]chan []error),
@@ -112,10 +104,20 @@
var err error
ctx := helper.Context()
ctx = context.WithValue(ctx, serverContextKey{}, server)
- if server.server, err = v23.NewServer(ctx, opts...); err != nil {
+
+ server.serverStateLock.Lock()
+ defer server.serverStateLock.Unlock()
+
+ server.dispatcher = newDispatcher(server.id, server, server, server, server.helper)
+ ctx = v23.WithListenSpec(ctx, *listenSpec)
+
+ if ctx, server.server, err = v23.WithNewDispatchingServer(ctx, name, server.dispatcher, opts...); err != nil {
return nil, err
}
server.ctx = ctx
+ server.statusClose = make(chan struct{}, 1)
+ go server.readStatus()
+
return server, nil
}
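With the lazy Listen/ServeDispatcher path removed (see the deleted Serve method below), construction is now the single lifecycle step. A hedged in-package caller sketch, where the id, name, spec, and helper are assumed to come from the surrounding WSPR controller:

// startJSServer is illustrative, not part of this change.
func startJSServer(id uint32, name string, spec *rpc.ListenSpec, helper ServerHelper) (*Server, error) {
	s, err := NewServer(id, name, spec, helper)
	if err != nil {
		return nil, err
	}
	// No follow-up Serve call: s is already listening, dispatching, and
	// published under name, with the status reader running.
	return s, nil
}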
@@ -602,29 +604,6 @@
}
}
-func (s *Server) Serve(name string) error {
- s.serverStateLock.Lock()
- defer s.serverStateLock.Unlock()
-
- if s.dispatcher == nil {
- s.dispatcher = newDispatcher(s.id, s, s, s, s.helper)
- }
-
- if !s.isListening {
- _, err := s.server.Listen(*s.listenSpec)
- if err != nil {
- return err
- }
- s.isListening = true
- }
- if err := s.server.ServeDispatcher(name, s.dispatcher); err != nil {
- return err
- }
- s.statusClose = make(chan struct{}, 1)
- go s.readStatus()
- return nil
-}
-
func (s *Server) popServerRequest(id int32) chan *lib.ServerRpcReply {
s.outstandingRequestLock.Lock()
defer s.outstandingRequestLock.Unlock()
diff --git a/services/xproxyd/proxy_test.go b/services/xproxyd/proxy_test.go
index 4257409..43f613a 100644
--- a/services/xproxyd/proxy_test.go
+++ b/services/xproxyd/proxy_test.go
@@ -43,14 +43,6 @@
func TestMultipleProxies(t *testing.T) {
pctx, shutdown := v23.Init()
defer shutdown()
- p2ctx, _, err := v23.ExperimentalWithNewFlowManager(pctx)
- if err != nil {
- t.Fatal(err)
- }
- p3ctx, _, err := v23.ExperimentalWithNewFlowManager(pctx)
- if err != nil {
- t.Fatal(err)
- }
actx, am, err := v23.ExperimentalWithNewFlowManager(pctx)
if err != nil {
t.Fatal(err)
@@ -62,9 +54,9 @@
pep := startProxy(t, pctx, address{"tcp", "127.0.0.1:0"})
- p2ep := startProxy(t, p2ctx, address{"v23", pep.String()}, address{"tcp", "127.0.0.1:0"})
+ p2ep := startProxy(t, pctx, address{"v23", pep.String()}, address{"tcp", "127.0.0.1:0"})
- p3ep := startProxy(t, p3ctx, address{"v23", p2ep.String()}, address{"tcp", "127.0.0.1:0"})
+ p3ep := startProxy(t, pctx, address{"v23", p2ep.String()}, address{"tcp", "127.0.0.1:0"})
if err := am.Listen(actx, "v23", p3ep.String()); err != nil {
t.Fatal(err)
@@ -147,7 +139,7 @@
ls.Addrs = append(ls.Addrs, addr)
}
ctx = v23.WithListenSpec(ctx, ls)
- proxy, err := xproxyd.New(ctx)
+ proxy, _, err := xproxyd.New(ctx)
if err != nil {
t.Fatal(err)
}
diff --git a/services/xproxyd/proxyd.go b/services/xproxyd/proxyd.go
index fc97ceb..d05a561 100644
--- a/services/xproxyd/proxyd.go
+++ b/services/xproxyd/proxyd.go
@@ -24,41 +24,45 @@
proxyEndpoints []naming.Endpoint
}
-func New(ctx *context.T) (*proxy, error) {
+func New(ctx *context.T) (*proxy, *context.T, error) {
+ ctx, mgr, err := v23.ExperimentalWithNewFlowManager(ctx)
+ if err != nil {
+ return nil, nil, err
+ }
p := &proxy{
- m: v23.ExperimentalGetFlowManager(ctx),
+ m: mgr,
}
for _, addr := range v23.GetListenSpec(ctx).Addrs {
if addr.Protocol == "v23" {
ep, err := v23.NewEndpoint(addr.Address)
if err != nil {
- return nil, err
+ return nil, nil, err
}
f, err := p.m.Dial(ctx, ep, proxyBlessingsForPeer{}.run)
if err != nil {
- return nil, err
+ return nil, nil, err
}
// Send a byte telling the acceptor that we are a proxy.
if err := writeMessage(ctx, &message.MultiProxyRequest{}, f); err != nil {
- return nil, err
+ return nil, nil, err
}
msg, err := readMessage(ctx, f)
if err != nil {
- return nil, err
+ return nil, nil, err
}
m, ok := msg.(*message.ProxyResponse)
if !ok {
- return nil, NewErrUnexpectedMessage(ctx, fmt.Sprintf("%t", m))
+ return nil, nil, NewErrUnexpectedMessage(ctx, fmt.Sprintf("%t", m))
}
p.mu.Lock()
p.proxyEndpoints = append(p.proxyEndpoints, m.Endpoints...)
p.mu.Unlock()
} else if err := p.m.Listen(ctx, addr.Protocol, addr.Address); err != nil {
- return nil, err
+ return nil, nil, err
}
}
go p.listenLoop(ctx)
- return p, nil
+ return p, ctx, nil
}
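Because New now derives and returns the context that carries the proxy's flow manager, callers keep that context for subsequent operations. A hedged in-package usage sketch matching the updated proxy_test.go above (fmt is already imported by this file; the function name is illustrative):

func startAndLog(ctx *context.T) (*proxy, *context.T, error) {
	p, pctx, err := New(ctx)
	if err != nil {
		return nil, nil, err
	}
	for _, ep := range p.ListeningEndpoints() {
		fmt.Println("proxy listening on:", ep.Name())
	}
	return p, pctx, nil
}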
func (p *proxy) ListeningEndpoints() []naming.Endpoint {
diff --git a/test/modules/shell.go b/test/modules/shell.go
index 4b3400e..c4dae02 100644
--- a/test/modules/shell.go
+++ b/test/modules/shell.go
@@ -229,7 +229,7 @@
sh.ctx = ctx
sh.logger = ctx
- if sh.tempCredDir, err = ioutil.TempDir("", "shell_credentials-"); err != nil {
+ if sh.tempCredDir, err = ioutil.TempDir("", "sh_creds"); err != nil {
return nil, err
}
if sh.agent, err = keymgr.NewLocalAgent(sh.tempCredDir, nil); err != nil {
@@ -282,7 +282,7 @@
return fd, nil
}
-// NewCustomCredentials creates a new Principal for StartWithOpts..
+// NewCustomCredentials creates a new Principal for StartWithOpts.
// Returns nil if the shell is not managing principals.
func (sh *Shell) NewCustomCredentials() (cred *CustomCredentials, err error) {
// Create child principal.
@@ -293,7 +293,7 @@
if err != nil {
return nil, err
}
- dir, err := ioutil.TempDir(sh.tempCredDir, "agent")
+ dir, err := ioutil.TempDir(sh.tempCredDir, "a")
if err != nil {
return nil, err
}