Merge "veyron/services/mounttable: Fix Glob bug."
diff --git a/examples/bank/pbankd/main.go b/examples/bank/pbankd/main.go
index acf9382..46caabf 100644
--- a/examples/bank/pbankd/main.go
+++ b/examples/bank/pbankd/main.go
@@ -442,10 +442,10 @@
// bankAccountServer := bank.NewServerBankAccount(pbankd)
// // Setup bank and account authorizers.
- // bankAuth := vsecurity.NewACLAuthorizer(security.NewWhitelistACL(
+ // bankAuth := vsecurity.NewACLAuthorizer(security.ACL{In:
// map[security.BlessingPattern]security.LabelSet{
// security.AllPrincipals: security.LabelSet(security.ReadLabel | security.WriteLabel),
- // }))
+ // }})
// bankAccountAuth := AccountAuthorizer(runtime.Identity().PublicID().Names()[0] + SUFFIX_REGEXP)
// dispatcher := newBankDispatcher(bankServer, bankAccountServer, bankAuth, bankAccountAuth)
diff --git a/examples/boxes/android/src/boxesp2p/main.go b/examples/boxes/android/src/boxesp2p/main.go
index c2dee7a..9e39a2e 100644
--- a/examples/boxes/android/src/boxesp2p/main.go
+++ b/examples/boxes/android/src/boxesp2p/main.go
@@ -207,12 +207,10 @@
}
rStream := stream.RecvStream()
for rStream.Advance() {
- cb := rStream.Value()
- for _, change := range cb.Changes {
- if entry, ok := change.Value.(*storage.Entry); ok {
- if box, ok := entry.Value.(boxes.Box); ok && box.DeviceId != gs.myIPAddr {
- nativeJava.addBox(&box)
- }
+ change := rStream.Value()
+ if entry, ok := change.Value.(*storage.Entry); ok {
+ if box, ok := entry.Value.(boxes.Box); ok && box.DeviceId != gs.myIPAddr {
+ nativeJava.addBox(&box)
}
}
}
@@ -234,10 +232,9 @@
}
func (gs *goState) registerAsPeer(ctx context.T) {
- auth := vsecurity.NewACLAuthorizer(vsecurity.NewWhitelistACL(
- map[security.BlessingPattern]security.LabelSet{
- security.AllPrincipals: security.LabelSet(security.AdminLabel),
- }))
+ auth := vsecurity.NewACLAuthorizer(security.ACL{In: map[security.BlessingPattern]security.LabelSet{
+ security.AllPrincipals: security.LabelSet(security.AdminLabel),
+ }})
gs.disp.drawAuth = auth
gs.disp.drawServer = ipc.ReflectInvoker(boxes.NewServerDrawInterface(gs))
endPt, err := gs.ipc.Listen("tcp", gs.myIPAddr+drawServicePort)
diff --git a/examples/stfortune/stfortune/main.go b/examples/stfortune/stfortune/main.go
index b48462c..dc7c48d 100644
--- a/examples/stfortune/stfortune/main.go
+++ b/examples/stfortune/stfortune/main.go
@@ -97,23 +97,21 @@
rStream := stream.RecvStream()
for rStream.Advance() {
- batch := rStream.Value()
+ change := rStream.Value()
- for _, change := range batch.Changes {
- entry, ok := change.Value.(*storage.Entry)
- if !ok {
- log.Printf("watcher change Value not a storage Entry: %#v", change.Value)
- continue
- }
-
- fortune, ok := entry.Value.(schema.FortuneData)
- if !ok {
- log.Printf("watcher data not a FortuneData Entry: %#v", entry.Value)
- continue
- }
-
- fmt.Printf("watcher: new fortune: %s\n", fortune.Fortune)
+ entry, ok := change.Value.(*storage.Entry)
+ if !ok {
+ log.Printf("watcher change Value not a storage Entry: %#v", change.Value)
+ continue
}
+
+ fortune, ok := entry.Value.(schema.FortuneData)
+ if !ok {
+ log.Printf("watcher data not a FortuneData Entry: %#v", entry.Value)
+ continue
+ }
+
+ fmt.Printf("watcher: new fortune: %s\n", fortune.Fortune)
}
err = rStream.Err()
if err == nil {
diff --git a/lib/testutil/blackbox/subprocess.go b/lib/testutil/blackbox/subprocess.go
index 0cb8092..984062b 100644
--- a/lib/testutil/blackbox/subprocess.go
+++ b/lib/testutil/blackbox/subprocess.go
@@ -53,6 +53,31 @@
return "--test.timeout=1m"
}
+// VeyronEnvironment returns only the environment variables needed for
+// Veyron execution from the provided list.
+func VeyronEnvironment(env []string) []string {
+ veyronEnvironmentPrefixes := []string{
+ "VEYRON_",
+ "NAMESPACE_ROOT",
+ "PAUSE_BEFORE_STOP",
+ "TMPDIR",
+ }
+
+ var ret []string
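+ // Keep only variables whose key begins with one of the prefixes above; these
+ // are prefix matches, so e.g. NAMESPACE_ROOT2 qualifies as well.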
+ for _, e := range env {
+ if eqIdx := strings.Index(e, "="); eqIdx > 0 {
+ key := e[:eqIdx]
+ for _, prefix := range veyronEnvironmentPrefixes {
+ if strings.HasPrefix(key, prefix) {
+ ret = append(ret, fmt.Sprintf("%s=%s", key, e[eqIdx+1:]))
+ break
+ }
+ }
+ }
+ }
+ return ret
+}
+
// HelperCommand() takes an argument list and starts a helper subprocess.
// t may be nil to allow use from outside of tests.
func HelperCommand(t *testing.T, command string, args ...string) *Child {
@@ -77,7 +102,8 @@
} else {
cmd.Stderr = stderr
}
- cmd.Env = append([]string{"VEYRON_BLACKBOX_TEST=1"}, os.Environ()...)
+
+ cmd.Env = append([]string{"VEYRON_BLACKBOX_TEST=1"}, VeyronEnvironment(os.Environ())...)
stdout, _ := cmd.StdoutPipe()
stdin, _ := cmd.StdinPipe()
return &Child{
diff --git a/lib/testutil/blackbox/subprocess_test.go b/lib/testutil/blackbox/subprocess_test.go
index 5105346..d3f8bf3 100644
--- a/lib/testutil/blackbox/subprocess_test.go
+++ b/lib/testutil/blackbox/subprocess_test.go
@@ -7,6 +7,7 @@
"fmt"
"io"
"os"
+ "reflect"
"strconv"
"strings"
"syscall"
@@ -251,3 +252,30 @@
waitForNonExistence(t, []int{child})
}
+
+func TestVeyronEnvironment(t *testing.T) {
+ in := [][]string{
+ []string{},
+ []string{"Hello", "NO=1"},
+ []string{"VEYRON_X=1", "Hello"},
+ []string{"NAMESPACE_ROOT=1", "VEYRON_IDENTITY=me"},
+ []string{"PAUSE_BEFORE_STOP=1", "NAMESPACE_ROOT2=2", "BOB=ALICE", "Hello"},
+ }
+
+ expected := [][]string{
+ []string{},
+ []string{},
+ []string{"VEYRON_X=1"},
+ []string{"NAMESPACE_ROOT=1", "VEYRON_IDENTITY=me"},
+ []string{"PAUSE_BEFORE_STOP=1", "NAMESPACE_ROOT2=2"},
+ }
+
+ for i, _ := range in {
+ s := blackbox.VeyronEnvironment(in[i])
+ if len(s) == 0 && len(expected[i]) == 0 {
+ continue
+ } else if !reflect.DeepEqual(expected[i], s) {
+ t.Fatalf("subtest %d: got %v, expected %v", i, s, expected[i])
+ }
+ }
+}
diff --git a/lib/testutil/security/util_test.go b/lib/testutil/security/util_test.go
index 3a2fd3a..9586f57 100644
--- a/lib/testutil/security/util_test.go
+++ b/lib/testutil/security/util_test.go
@@ -51,12 +51,12 @@
}
defer r.Cleanup()
acl := security.ACL{}
- acl.In.Principals = map[security.BlessingPattern]security.LabelSet{
- "veyron/*": security.LabelSet(security.ReadLabel),
+ acl.In = map[security.BlessingPattern]security.LabelSet{
+ "veyron/...": security.LabelSet(security.ReadLabel),
"veyron/alice": security.LabelSet(security.ReadLabel | security.WriteLabel),
"veyron/bob": security.LabelSet(security.AdminLabel),
}
- acl.NotIn.Principals = map[security.BlessingPattern]security.LabelSet{
+ acl.NotIn = map[string]security.LabelSet{
"veyron/che": security.LabelSet(security.ReadLabel),
}
diff --git a/runtimes/google/ipc/full_test.go b/runtimes/google/ipc/full_test.go
index 9224834..3135149 100644
--- a/runtimes/google/ipc/full_test.go
+++ b/runtimes/google/ipc/full_test.go
@@ -160,12 +160,11 @@
case "nilAuth":
authorizer = nil
case "aclAuth":
- // Only authorize clients matching patterns "client" or "server/*".
- authorizer = vsecurity.NewACLAuthorizer(vsecurity.NewWhitelistACL(
- map[security.BlessingPattern]security.LabelSet{
- "server/*": security.LabelSet(security.AdminLabel),
- "client": security.LabelSet(security.AdminLabel),
- }))
+ // Only authorize clients matching patterns "client" or "server/...".
+ authorizer = vsecurity.NewACLAuthorizer(security.ACL{In: map[security.BlessingPattern]security.LabelSet{
+ "server/...": security.LabelSet(security.AdminLabel),
+ "client": security.LabelSet(security.AdminLabel),
+ }})
default:
authorizer = testServerAuthorizer{}
}
@@ -1056,70 +1055,6 @@
}
}
-// TestPublishOptions verifies that the options that are relevant to how
-// a server publishes its endpoints have the right effect.
-func TestPublishOptions(t *testing.T) {
- sm := imanager.InternalNew(naming.FixedRoutingID(0x555555555))
- ns := newNamespace()
- cases := []struct {
- opts []ipc.ServerOpt
- expect []string
- }{
- {[]ipc.ServerOpt{}, []string{"127.0.0.1", "127.0.0.1"}},
- {[]ipc.ServerOpt{veyron2.PublishAll}, []string{"127.0.0.1", "127.0.0.1"}},
- {[]ipc.ServerOpt{veyron2.PublishFirst}, []string{"127.0.0.1"}},
- {[]ipc.ServerOpt{veyron2.EndpointRewriteOpt("example1.com"), veyron2.EndpointRewriteOpt("example2.com")}, []string{"example2.com", "example2.com"}},
- {[]ipc.ServerOpt{veyron2.PublishFirst, veyron2.EndpointRewriteOpt("example.com")}, []string{"example.com"}},
- }
- for i, c := range cases {
- server, err := InternalNewServer(testContext(), sm, ns, append(c.opts, vc.FixedLocalID(serverID))...)
- if err != nil {
- t.Errorf("InternalNewServer failed: %v", err)
- continue
- }
- if _, err := server.Listen("tcp", "127.0.0.1:0"); err != nil {
- t.Errorf("server.Listen failed: %v", err)
- server.Stop()
- continue
- }
- if _, err := server.Listen("tcp", "127.0.0.1:0"); err != nil {
- t.Errorf("server.Listen failed: %v", err)
- server.Stop()
- continue
- }
- if err := server.Serve("mountpoint", &testServerDisp{}); err != nil {
- t.Errorf("server.Publish failed: %v", err)
- server.Stop()
- continue
- }
- servers, err := ns.Resolve(testContext(), "mountpoint")
- if err != nil {
- t.Errorf("mountpoint not found in mounttable")
- server.Stop()
- continue
- }
- var got []string
- for _, s := range servers {
- address, _ := naming.SplitAddressName(s)
- ep, err := inaming.NewEndpoint(address)
- if err != nil {
- t.Errorf("case #%d: server with invalid endpoint %q: %v", i, address, err)
- continue
- }
- host, _, err := net.SplitHostPort(ep.Addr().String())
- if err != nil {
- t.Errorf("case #%d: server endpoint with invalid address %q: %v", i, ep.Addr(), err)
- continue
- }
- got = append(got, host)
- }
- if want := c.expect; !reflect.DeepEqual(want, got) {
- t.Errorf("case #%d: expected mounted servers with addresses %q, got %q instead", i, want, got)
- }
- server.Stop()
- }
-}
-
// TestReconnect verifies that the client transparently re-establishes the
// connection to the server if the server dies and comes back (on the same
// endpoint).
@@ -1171,6 +1106,64 @@
}
}
+func TestPreferredAddress(t *testing.T) {
+ sm := imanager.InternalNew(naming.FixedRoutingID(0x555555555))
+ defer sm.Shutdown()
+ ns := newNamespace()
+ pa := func(string) (net.Addr, error) {
+ a := &net.IPAddr{}
+ a.IP = net.ParseIP("1.1.1.1")
+ return a, nil
+ }
+ server, err := InternalNewServer(testContext(), sm, ns, vc.FixedLocalID(serverID), veyron2.PreferredAddressOpt(pa))
+ if err != nil {
+ t.Errorf("InternalNewServer failed: %v", err)
+ }
+ defer server.Stop()
+ ep, err := server.Listen("tcp4", ":0")
+ iep := ep.(*inaming.Endpoint)
+ host, _, err := net.SplitHostPort(iep.Address)
+ if err != nil {
+ t.Errorf("unexpected error: %s", err)
+ }
+ if got, want := host, "1.1.1.1"; got != want {
+ t.Errorf("got %q, want %q", got, want)
+ }
+ // Won't override the specified address.
+ ep, err = server.Listen("tcp4", "127.0.0.1:0")
+ iep = ep.(*inaming.Endpoint)
+ host, _, err = net.SplitHostPort(iep.Address)
+ if err != nil {
+ t.Errorf("unexpected error: %s", err)
+ }
+ if got, want := host, "127.0.0.1"; got != want {
+ t.Errorf("got %q, want %q", got, want)
+ }
+}
+
+func TestPreferredAddressErrors(t *testing.T) {
+ sm := imanager.InternalNew(naming.FixedRoutingID(0x555555555))
+ defer sm.Shutdown()
+ ns := newNamespace()
+ paerr := func(string) (net.Addr, error) {
+ return nil, fmt.Errorf("oops")
+ }
+ server, err := InternalNewServer(testContext(), sm, ns, vc.FixedLocalID(serverID), veyron2.PreferredAddressOpt(paerr))
+ if err != nil {
+ t.Errorf("InternalNewServer failed: %v", err)
+ }
+ defer server.Stop()
+ ep, err := server.Listen("tcp4", ":0")
+ iep := ep.(*inaming.Endpoint)
+ host, _, err := net.SplitHostPort(iep.Address)
+ if err != nil {
+ t.Errorf("unexpected error: %s", err)
+ }
+ if got, want := host, "0.0.0.0"; got != want {
+ t.Errorf("got %q, want %q", got, want)
+ }
+}
+
type proxyHandle struct {
ns naming.Namespace
process *blackbox.Child
diff --git a/runtimes/google/ipc/server.go b/runtimes/google/ipc/server.go
index dec40be..5b5b36d 100644
--- a/runtimes/google/ipc/server.go
+++ b/runtimes/google/ipc/server.go
@@ -3,11 +3,13 @@
import (
"fmt"
"io"
+ "net"
"reflect"
"strings"
"sync"
"time"
+ "veyron/runtimes/google/lib/netconfig"
"veyron/runtimes/google/lib/publisher"
inaming "veyron/runtimes/google/naming"
isecurity "veyron/runtimes/google/security"
@@ -44,27 +46,27 @@
stopped bool // whether the server has been stopped.
stoppedChan chan struct{} // closed when the server has been stopped.
ns naming.Namespace
- publishOpt veyron2.ServerPublishOpt // which endpoints to publish
- publishing bool // is some name being published?
+ preferredAddress func(network string) (net.Addr, error)
servesMountTable bool
}
func InternalNewServer(ctx context.T, streamMgr stream.Manager, ns naming.Namespace, opts ...ipc.ServerOpt) (ipc.Server, error) {
s := &server{
- ctx: ctx,
- streamMgr: streamMgr,
- publisher: publisher.New(ctx, ns, publishPeriod),
- listeners: make(map[stream.Listener]bool),
- stoppedChan: make(chan struct{}),
- ns: ns,
+ ctx: ctx,
+ streamMgr: streamMgr,
+ publisher: publisher.New(ctx, ns, publishPeriod),
+ listeners: make(map[stream.Listener]bool),
+ stoppedChan: make(chan struct{}),
+ preferredAddress: preferredIPAddress,
+ ns: ns,
}
for _, opt := range opts {
switch opt := opt.(type) {
+ case veyron2.PreferredAddressOpt:
+ s.preferredAddress = opt
case stream.ListenerOpt:
// Collect all ServerOpts that are also ListenerOpts.
s.listenerOpts = append(s.listenerOpts, opt)
- case veyron2.ServerPublishOpt:
- s.publishOpt = opt
case veyron2.ServesMountTableOpt:
s.servesMountTable = bool(opt)
}
@@ -108,6 +110,64 @@
return "", fmt.Errorf("unable to resolve %q to an endpoint", address)
}
+// preferredIPAddress returns the preferred IP address: a public IPv4
+// address, then any non-loopback IPv4, then a public IPv6 address, and
+// finally any non-loopback, non-link-local IPv6 address.
+func preferredIPAddress(network string) (net.Addr, error) {
+ interfaces, err := net.Interfaces()
+ if err != nil {
+ return nil, err
+ }
+ var any_ip4, any_ip6, pub_ip4, pub_ip6 net.Addr
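+ // Remember the first usable and the first globally routable address for each
+ // IP version; the switch below picks among them in preference order.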
+ for _, ifc := range interfaces {
+ addrs, err := ifc.Addrs()
+ if err != nil {
+ continue
+ }
+ for _, addr := range addrs {
+ ipn, ok := addr.(*net.IPNet)
+ if !ok {
+ continue
+ }
+ ip := ipn.IP
+ if ip == nil || ip.IsUnspecified() || ip.IsLoopback() || ip.IsMulticast() || ip.IsLinkLocalUnicast() || ip.IsLinkLocalMulticast() {
+ continue
+ }
+ if network == "tcp" || network == "tcp4" {
+ if t := ip.To4(); t != nil {
+ if any_ip4 == nil {
+ any_ip4 = addr
+ }
+ if pub_ip4 == nil && netconfig.IsGloballyRoutable(t) {
+ pub_ip4 = addr
+ }
+ }
+ }
+ if network == "tcp" || network == "tcp6" {
+ if t := ip.To16(); t != nil {
+ if any_ip6 == nil {
+ any_ip6 = addr
+ }
+ if pub_ip6 == nil && netconfig.IsGloballyRoutable(t) {
+ pub_ip6 = addr
+ }
+ }
+ }
+ }
+ }
+ switch {
+ case pub_ip4 != nil:
+ return pub_ip4, nil
+ case any_ip4 != nil:
+ return any_ip4, nil
+ case pub_ip6 != nil:
+ return pub_ip6, nil
+ case any_ip6 != nil:
+ return any_ip6, nil
+ }
+ return nil, fmt.Errorf("failed to find any usable address for %q", network)
+}
+
func (s *server) Listen(protocol, address string) (naming.Endpoint, error) {
s.Lock()
// Shortcut if the server is stopped, to avoid needlessly creating a
@@ -130,6 +190,30 @@
vlog.Errorf("ipc: Listen on %v %v failed: %v", protocol, address, err)
return nil, err
}
+ iep, ok := ep.(*inaming.Endpoint)
+ if !ok {
+ return nil, fmt.Errorf("ipc: Listen on %v %v failed translating internal endpoint data types", protocol, address)
+ }
+
+ // We know the endpoint format, so we crack it open...
+ switch iep.Protocol {
+ case "tcp", "tcp4", "tcp6":
+ host, port, err := net.SplitHostPort(iep.Address)
+ if err != nil {
+ return nil, err
+ }
+ ip := net.ParseIP(host)
+ if ip == nil {
+ return nil, fmt.Errorf("ipc: Listen(%q, %q) failed to parse IP address from address", protocol, address)
+ }
+ if ip.IsUnspecified() && s.preferredAddress != nil {
+ // Need to find a usable IP address.
+ if a, err := s.preferredAddress(protocol); err == nil {
+ iep.Address = net.JoinHostPort(a.String(), port)
+ }
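+ // If no preferred address could be determined, the unspecified
+ // address is left in place.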
+ }
+ }
+
s.Lock()
if s.stopped {
s.Unlock()
@@ -137,17 +221,15 @@
ln.Close()
return nil, errServerStopped
}
- publish := s.publishOpt == veyron2.PublishAll || !s.publishing
- s.publishing = true
s.listeners[ln] = true
- // We have a single goroutine per listener to accept new flows. Each flow is
- // served from its own goroutine.
+ // We have a single goroutine per listener to accept new flows.
+ // Each flow is served from its own goroutine.
s.active.Add(1)
if protocol == inaming.Network {
- go func(ln stream.Listener, ep naming.Endpoint, proxy string, publish bool) {
- s.proxyListenLoop(ln, ep, proxy, publish)
+ go func(ln stream.Listener, ep naming.Endpoint, proxy string) {
+ s.proxyListenLoop(ln, ep, proxy)
s.active.Done()
- }(ln, ep, proxyName, publish)
+ }(ln, ep, proxyName)
} else {
go func(ln stream.Listener, ep naming.Endpoint) {
s.listenLoop(ln, ep)
@@ -155,9 +237,7 @@
}(ln, ep)
}
s.Unlock()
- if publish {
- s.publisher.AddServer(s.publishEP(ep))
- }
+ s.publisher.AddServer(s.publishEP(ep))
return ep, nil
}
@@ -171,7 +251,7 @@
return naming.JoinAddressName(ep.String(), name)
}
-func (s *server) proxyListenLoop(ln stream.Listener, ep naming.Endpoint, proxy string, publish bool) {
+func (s *server) proxyListenLoop(ln stream.Listener, ep naming.Endpoint, proxy string) {
const (
min = 5 * time.Millisecond
max = 5 * time.Minute
@@ -180,9 +260,7 @@
s.listenLoop(ln, ep)
// The listener is done, so:
// (1) Unpublish its name
- if publish {
- s.publisher.RemoveServer(s.publishEP(ep))
- }
+ s.publisher.RemoveServer(s.publishEP(ep))
// (2) Reconnect to the proxy unless the server has been stopped
backoff := min
ln = nil
@@ -208,9 +286,7 @@
}
}
// (3) reconnected, publish new address
- if publish {
- s.publisher.AddServer(s.publishEP(ep))
- }
+ s.publisher.AddServer(s.publishEP(ep))
s.Lock()
s.listeners[ln] = true
s.Unlock()
@@ -386,7 +462,7 @@
for _, n := range id.Names() {
in[security.BlessingPattern(n+security.ChainSeparator+string(security.AllPrincipals))] = security.AllLabels
}
- return vsecurity.NewWhitelistACL(in)
+ return security.ACL{In: in}
}
func (fs *flowServer) serve() error {
diff --git a/runtimes/google/ipc/stream/manager/manager.go b/runtimes/google/ipc/stream/manager/manager.go
index 1921def..47646d5 100644
--- a/runtimes/google/ipc/stream/manager/manager.go
+++ b/runtimes/google/ipc/stream/manager/manager.go
@@ -13,7 +13,6 @@
"veyron/runtimes/google/ipc/version"
inaming "veyron/runtimes/google/naming"
- "veyron2"
"veyron2/ipc/stream"
"veyron2/naming"
"veyron2/verror"
@@ -122,17 +121,6 @@
}
func (m *manager) Listen(protocol, address string, opts ...stream.ListenerOpt) (stream.Listener, naming.Endpoint, error) {
- var rewriteEP string
- var filteredOpts []stream.ListenerOpt
- for _, o := range opts {
- if rewriteOpt, ok := o.(veyron2.EndpointRewriteOpt); ok {
- // Last one 'wins'.
- rewriteEP = string(rewriteOpt)
- } else {
- filteredOpts = append(filteredOpts, o)
- }
- }
- opts = filteredOpts
m.muListeners.Lock()
if m.shutdown {
m.muListeners.Unlock()
@@ -162,19 +150,7 @@
ln := newNetListener(m, netln, opts)
m.listeners[ln] = true
m.muListeners.Unlock()
-
- network, address := netln.Addr().Network(), netln.Addr().String()
- if network == "tcp" && len(rewriteEP) > 0 {
- if _, port, err := net.SplitHostPort(address); err != nil {
- return nil, nil, fmt.Errorf("%q not a valid address: %v", address, err)
- } else {
- address = net.JoinHostPort(rewriteEP, port)
- }
- }
- // We use protocol rather than network when creating the endpoint to
- // honour the original request to Listen even if tcp is used under the
- // covers.
- ep := version.Endpoint(protocol, address, m.rid)
+ ep := version.Endpoint(protocol, netln.Addr().String(), m.rid)
return ln, ep, nil
}
diff --git a/runtimes/google/rt/ipc_test.go b/runtimes/google/rt/ipc_test.go
index f1499c5..d02406b 100644
--- a/runtimes/google/rt/ipc_test.go
+++ b/runtimes/google/rt/ipc_test.go
@@ -98,7 +98,7 @@
add(serverR.PublicIDStore(), googleYoutubeService, "")
// Add PublicIDs for communicating the "google/gmail" and "google/youtube" services
// to the clientR's PublicIDStore.
- add(clientR.PublicIDStore(), googleGmailClient, "google/*")
+ add(clientR.PublicIDStore(), googleGmailClient, "google/...")
add(clientR.PublicIDStore(), googleYoutubeClient, "google/youtube")
type testcase struct {
@@ -162,10 +162,9 @@
}
defer stopServer(server)
if err := server.Serve("", ipc.LeafDispatcher(&testService{},
- vsecurity.NewACLAuthorizer(vsecurity.NewWhitelistACL(
- map[security.BlessingPattern]security.LabelSet{
- security.AllPrincipals: security.AllLabels,
- })))); err != nil {
+ vsecurity.NewACLAuthorizer(security.ACL{In: map[security.BlessingPattern]security.LabelSet{
+ security.AllPrincipals: security.AllLabels,
+ }}))); err != nil {
t.Errorf("error serving service: ", err)
continue
}
diff --git a/runtimes/google/rt/rt_test.go b/runtimes/google/rt/rt_test.go
index 0fc425f..06e5fa4 100644
--- a/runtimes/google/rt/rt_test.go
+++ b/runtimes/google/rt/rt_test.go
@@ -66,7 +66,7 @@
os.TempDir())
if str != expected {
- t.Fatalf("unexpected output from child: %s", str)
+ t.Fatalf("incorrect child output: got %s, expected %s", str, expected)
}
c.CloseStdin()
c.Expect("done")
diff --git a/runtimes/google/security/identity_test.go b/runtimes/google/security/identity_test.go
index eef3e12..95184c4 100644
--- a/runtimes/google/security/identity_test.go
+++ b/runtimes/google/security/identity_test.go
@@ -86,22 +86,22 @@
matchData []matchInstance
}{
{
- // self-signed alice chain, not a trusted identity provider so should only match "*"
+ // self-signed alice chain, not a trusted identity provider so should only match "..."
id: alice.PublicID(),
matchData: []matchInstance{
- {pattern: "*", want: true},
+ {pattern: "...", want: true},
{pattern: "alice", want: false},
- {pattern: "alice/*", want: false},
+ {pattern: "alice/...", want: false},
},
},
{
// veyron/alice: rooted in the trusted "veyron" identity provider
id: bless(newChain("immaterial").PublicID(), veyronChain, "alice", nil),
matchData: []matchInstance{
- {pattern: "*", want: true},
- {pattern: "veyron/*", want: true},
+ {pattern: "...", want: true},
+ {pattern: "veyron/...", want: true},
{pattern: "veyron/alice", want: true},
- {pattern: "veyron/alice/*", want: true},
+ {pattern: "veyron/alice/...", want: true},
{pattern: "veyron/alice/TV", want: true},
{pattern: "veyron", want: false},
{pattern: "veyron/ali", want: false},
@@ -114,15 +114,15 @@
// alice#veyron/alice#google/alice: two trusted identity providers
id: newSetPublicID(alice.PublicID(), bless(alice.PublicID(), veyronChain, "alice", nil), bless(alice.PublicID(), googleChain, "alice", nil)),
matchData: []matchInstance{
- {pattern: "*", want: true},
+ {pattern: "...", want: true},
// Since alice is not a trusted identity provider, the self-blessed identity
- // should not match "alice/*"
+ // should not match "alice/..."
{pattern: "alice", want: false},
- {pattern: "alice/*", want: false},
- {pattern: "veyron/*", want: true},
+ {pattern: "alice/...", want: false},
+ {pattern: "veyron/...", want: true},
{pattern: "veyron/alice", want: true},
{pattern: "veyron/alice/TV", want: true},
- {pattern: "veyron/alice/*", want: true},
+ {pattern: "veyron/alice/...", want: true},
{pattern: "ali", want: false},
{pattern: "aliced", want: false},
{pattern: "veyron", want: false},
@@ -131,7 +131,7 @@
{pattern: "veyron/bob", want: false},
{pattern: "google/alice", want: true},
{pattern: "google/alice/TV", want: true},
- {pattern: "google/alice/*", want: true},
+ {pattern: "google/alice/...", want: true},
},
},
}
@@ -308,8 +308,8 @@
cavOnlyPlayAtGoogle = methodRestrictionCaveat("google", S{"Play"})
// Can only talk to the "Google" service
cavOnlyGoogle = peerIdentityCaveat("google")
- // Can only call the PublicProfile method on veyron/alice/*
- cavOnlyPublicProfile = methodRestrictionCaveat("veyron/alice/*", S{"PublicProfile"})
+ // Can only call the PublicProfile method on veyron/alice/...
+ cavOnlyPublicProfile = methodRestrictionCaveat("veyron/alice/...", S{"PublicProfile"})
)
type rpc struct {
diff --git a/runtimes/google/security/publicid_store_test.go b/runtimes/google/security/publicid_store_test.go
index 394cb49..a60ada0 100644
--- a/runtimes/google/security/publicid_store_test.go
+++ b/runtimes/google/security/publicid_store_test.go
@@ -41,17 +41,17 @@
t.Fatalf("NewPublicIDStore failed: %s", err)
}
// First Add should succeed for any PublicID (cAlice.PublicID() below)
- if err := s.Add(cAlice.PublicID(), "alice/*"); err != nil {
+ if err := s.Add(cAlice.PublicID(), "alice/..."); err != nil {
t.Fatalf("%s.Add(%q, ...) failed unexpectedly: %s", s, cAlice.PublicID(), err)
}
// Subsequent Adds must succeed only for PublicIDs with cAlice's public key.
- if err := s.Add(cVeyronAlice.PublicID(), "*"); err != nil {
+ if err := s.Add(cVeyronAlice.PublicID(), "..."); err != nil {
t.Fatalf("%s.Add(%q, ...) failed unexpectedly: %s", s, cVeyronAlice.PublicID(), err)
}
- if err := s.Add(sAlice, "alice/*"); err != nil {
+ if err := s.Add(sAlice, "alice/..."); err != nil {
t.Fatalf("%s.Add(%q, ...) failed unexpectedly: %s", s, sAlice, err)
}
- if got, want := s.Add(cBob.PublicID(), "bob/*"), errStoreAddMismatch; got != want {
+ if got, want := s.Add(cBob.PublicID(), "bob/..."), errStoreAddMismatch; got != want {
t.Fatalf("%s.Add(%q, ...): got: %s, want: %s", s, cBob, got, want)
}
}
@@ -68,13 +68,13 @@
{"veyron", true},
{"veyron/alice@google", true},
{"veyron/alice@google/bob", true},
- {"veyron/alice@google/*", true},
+ {"veyron/alice@google/...", true},
{"", false},
- {"veyron*", false},
- {"*veyron", false},
+ {"veyron...", false},
+ {"...veyron", false},
{"/veyron", false},
{"veyron/", false},
- {"veyron/*/alice", false},
+ {"veyron/.../alice", false},
}
for _, d := range defaultPatterns {
if got := s.SetDefaultBlessingPattern(d.pattern); d.success != (got == nil) {
@@ -115,14 +115,14 @@
if err != nil {
t.Fatalf("NewPublicIDStore failed: %s", err)
}
- add(s, cGoogleAlice, "google") // use cGoogleAlice against all peers matching "google/*"
- add(s, cGoogleAlice, "veyron") // use cGoogleAlice against all peers matching "veyron/*" as well
- add(s, cVeyronAlice, "veyron/*") // use cVeyronAlice against peers matching "veyron/*"
- add(s, cVeyronAlice, "google") // use cVeyronAlice against peers matching "veyron/*"
- add(s, cVeyronServiceAlice, "veyron/service/*") // use cVeyronAlice against peers matching "veyron/service*"
- add(s, cGoogleServiceAlice, "google/service/*") // use cGoogleServiceAlice against peers matching "google/service/*"
- add(s, sGoogleAlice, "google/service") // use any PublicID from sGoogleAlice against peers matching "google/service"
- add(s, sAllAlice, "veyron") // use any PublicID from sAllAlice against peers matching "veyron"
+ add(s, cGoogleAlice, "google") // use cGoogleAlice against all peers matching "google/..."
+ add(s, cGoogleAlice, "veyron") // use cGoogleAlice against all peers matching "veyron/..." as well
+ add(s, cVeyronAlice, "veyron/...") // use cVeyronAlice against peers matching "veyron/..."
+ add(s, cVeyronAlice, "google") // use cVeyronAlice against peers matching "google/..." as well
+ add(s, cVeyronServiceAlice, "veyron/service/...") // use cVeyronServiceAlice against peers matching "veyron/service/..."
+ add(s, cGoogleServiceAlice, "google/service/...") // use cGoogleServiceAlice against peers matching "google/service/..."
+ add(s, sGoogleAlice, "google/service") // use any PublicID from sGoogleAlice against peers matching "google/service"
+ add(s, sAllAlice, "veyron") // use any PublicID from sAllAlice against peers matching "veyron"
pkey := cAlice.PublicID().PublicKey()
@@ -157,13 +157,13 @@
defaultNames []string
}{
{"veyron", nil},
- {"veyron/*", []string{"veyron/alice", "veyron/service/user-24"}},
+ {"veyron/...", []string{"veyron/alice", "veyron/service/user-24"}},
{"veyron/alice", []string{"veyron/alice"}},
- {"veyron/service/*", []string{"veyron/service/user-24"}},
+ {"veyron/service/...", []string{"veyron/service/user-24"}},
{"google", nil},
- {"google/*", []string{"google/alice", "google/service/user-42"}},
+ {"google/...", []string{"google/alice", "google/service/user-42"}},
{"google/alice", []string{"google/alice"}},
- {"google/service/*", []string{"google/service/user-42"}},
+ {"google/service/...", []string{"google/service/user-42"}},
{"bob", nil},
}
for _, d := range testDataByBlessingPattern {
@@ -203,10 +203,10 @@
if err != nil {
t.Fatalf("NewPublicIDStore failed: %s", err)
}
- if err := s.Add(sAllAlice, "google/*"); err != nil {
+ if err := s.Add(sAllAlice, "google/..."); err != nil {
t.Fatalf("%s.Add(%q, ...) failed unexpectedly: %s", s, sAllAlice, err)
}
- if err := s.SetDefaultBlessingPattern("veyron/*"); err != nil {
+ if err := s.SetDefaultBlessingPattern("veyron/..."); err != nil {
t.Fatalf("%s.SetDefaultBlessingPattern failed: %s", s, err)
}
diff --git a/runtimes/google/vsync/watcher.go b/runtimes/google/vsync/watcher.go
index 58fa467..8d3f461 100644
--- a/runtimes/google/vsync/watcher.go
+++ b/runtimes/google/vsync/watcher.go
@@ -131,12 +131,12 @@
w.curTx = NoTxID
stream := call.RecvStream()
for stream.Advance() {
- changes := stream.Value()
+ change := stream.Value()
- // Timestamp of these changes arriving at the Sync server.
+ // Timestamp of the change arriving at the Sync server.
syncTime := time.Now().UnixNano()
- if err := w.processChanges(changes, syncTime); err != nil {
+ if err := w.processChange(change, syncTime); err != nil {
// TODO(rdaoud): don't crash, instead add retry policies to attempt some degree of
// self-healing from a data corruption where feasible, otherwise quarantine this device
// from the cluster and stop Syncd to avoid propagating data corruptions.
@@ -155,55 +155,47 @@
}
}
-// processChanges applies the batch of changes (object mutations) received from the Watch API.
+// processChange applies a change (object mutation) received from the Watch API.
// The function grabs the write-lock to access the Log and DAG DBs.
-func (w *syncWatcher) processChanges(changes types.ChangeBatch, syncTime int64) error {
+func (w *syncWatcher) processChange(ch types.Change, syncTime int64) error {
w.syncd.lock.Lock()
defer w.syncd.lock.Unlock()
- vlog.VI(1).Infof("processChanges:: ready to process changes")
+ vlog.VI(1).Infof("processChange:: ready to process change")
- var lastResmark []byte
- for i := range changes.Changes {
- ch := &changes.Changes[i]
- mu, ok := ch.Value.(*raw.Mutation)
- if !ok {
- return fmt.Errorf("invalid change value, not a mutation: %#v", ch)
- }
+ mu, ok := ch.Value.(*raw.Mutation)
+ if !ok {
+ return fmt.Errorf("invalid change value, not a mutation: %#v", ch)
+ }
- // Begin a new transaction if needed.
- if w.curTx == NoTxID && ch.Continued {
- w.curTx = w.syncd.dag.addNodeTxStart()
- w.curTxSyncTime = syncTime
- }
+ // Begin a new transaction if needed.
+ if w.curTx == NoTxID && ch.Continued {
+ w.curTx = w.syncd.dag.addNodeTxStart()
+ w.curTxSyncTime = syncTime
+ }
- time := syncTime
- if w.curTx != NoTxID {
- // All LogValues belonging to the same transaction get the same timestamp.
- time = w.curTxSyncTime
- }
- val := &LogValue{Mutation: *mu, SyncTime: time, Delete: ch.State == types.DoesNotExist, Continued: ch.Continued}
- vlog.VI(2).Infof("processChanges:: processing record %v, Tx %v", val, w.curTx)
- if err := w.syncd.log.processWatchRecord(mu.ID, mu.Version, mu.PriorVersion, val, w.curTx); err != nil {
- return fmt.Errorf("cannot process mutation: %#v: %s", ch, err)
- }
+ time := syncTime
+ if w.curTx != NoTxID {
+ // All LogValues belonging to the same transaction get the same timestamp.
+ time = w.curTxSyncTime
+ }
+ val := &LogValue{Mutation: *mu, SyncTime: time, Delete: ch.State == types.DoesNotExist, Continued: ch.Continued}
+ vlog.VI(2).Infof("processChange:: processing record %v, Tx %v", val, w.curTx)
+ if err := w.syncd.log.processWatchRecord(mu.ID, mu.Version, mu.PriorVersion, val, w.curTx); err != nil {
+ return fmt.Errorf("cannot process mutation: %#v: %s", ch, err)
+ }
- if !ch.Continued {
- lastResmark = ch.ResumeMarker
+ // End the previous transaction if any.
+ if w.curTx != NoTxID && !ch.Continued {
+ if err := w.syncd.dag.addNodeTxEnd(w.curTx); err != nil {
+ return err
}
-
- // End the previous transaction if any.
- if w.curTx != NoTxID && !ch.Continued {
- if err := w.syncd.dag.addNodeTxEnd(w.curTx); err != nil {
- return err
- }
- w.curTx = NoTxID
- }
+ w.curTx = NoTxID
}
// If the resume marker changed, update the device table.
- if lastResmark != nil {
- w.syncd.devtab.head.Resmark = lastResmark
+ if !ch.Continued {
+ w.syncd.devtab.head.Resmark = ch.ResumeMarker
}
return nil
diff --git a/runtimes/google/vsync/watcher_test.go b/runtimes/google/vsync/watcher_test.go
index ac23e76..899d58c 100644
--- a/runtimes/google/vsync/watcher_test.go
+++ b/runtimes/google/vsync/watcher_test.go
@@ -72,19 +72,27 @@
// fakeStream is used to simulate the reply stream of the Watch() API.
type fakeStream struct {
+ changes []types.Change
+ i int
+ resmark byte
canceled chan struct{}
err error
}
func newFakeStream() *fakeStream {
s := &fakeStream{}
+ if info.watchResmark != nil {
+ s.resmark = info.watchResmark[0]
+ }
+ s.changes = getChanges()
+ s.i = -1
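+ // i indexes the change returned by Value(); it starts before the first
+ // change and Advance() moves it forward one change at a time.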
s.canceled = make(chan struct{})
return s
}
func (s *fakeStream) RecvStream() interface {
Advance() bool
- Value() types.ChangeBatch
+ Value() types.Change
Err() error
} {
return s
@@ -98,6 +106,11 @@
return false
}
+ if s.i+1 == len(s.changes) {
+ // Make sure the next Recv() call returns EOF on the stream.
+ info.eofRecv = true
+ }
+
// If "eofRecv" is set, simulate a closed stream and make sure the next Recv() call blocks.
if info.eofRecv {
info.eofRecv, info.blockRecv = false, true
@@ -112,30 +125,23 @@
s.err = nil
return false
}
- // Otherwise return a batch of changes, and make sure the next Recv() call returns EOF on the stream.
+
+ // Otherwise return a change.
// Adjust the resume marker of the change records to follow the one given to the Watch request.
- info.eofRecv = true
+ s.i++
return true
}
-func (s *fakeStream) Value() types.ChangeBatch {
- changes := getChangeBatch()
+func (s *fakeStream) Value() types.Change {
+ ch := s.changes[s.i]
- var lastCount byte
- if info.watchResmark != nil {
- lastCount = info.watchResmark[0]
+ if !ch.Continued {
+ s.resmark++
+ resmark := []byte{s.resmark, 0, 0, 0, 0, 0, 0, 0}
+ ch.ResumeMarker = resmark
}
- for i := range changes.Changes {
- ch := &changes.Changes[i]
- if !ch.Continued {
- lastCount++
- resmark := []byte{lastCount, 0, 0, 0, 0, 0, 0, 0}
- changes.Changes[i].ResumeMarker = resmark
- }
- }
-
- return changes
+ return ch
}
func (s *fakeStream) Err() error {
@@ -149,13 +155,11 @@
close(s.canceled)
}
-// getChangeBatch returns a batch of store mutations used to simulate the Watch API.
+// getChanges returns a batch of store mutations used to simulate the Watch API.
// The batch contains two transactions to verify both new-object creation and the
// mutation of an existing object.
-func getChangeBatch() types.ChangeBatch {
- var batch types.ChangeBatch
-
- batch.Changes = []types.Change{
+func getChanges() []types.Change {
+ return []types.Change{
// 1st transaction: create "/" and "/a" and "/a/b" as 3 new objects (prior versions are 0).
types.Change{
Name: "",
@@ -289,8 +293,6 @@
Continued: false,
},
}
-
- return batch
}
// initTestDir creates a per-test directory to store the Sync DB files and returns it.
diff --git a/security/acl_authorizer_test.go b/security/acl_authorizer_test.go
index d71363f..1d04e89 100644
--- a/security/acl_authorizer_test.go
+++ b/security/acl_authorizer_test.go
@@ -187,35 +187,35 @@
// ACL for testing
acl := security.ACL{}
- acl.In.Principals = map[security.BlessingPattern]security.LabelSet{
- "*": LS(R),
- "fake/veyron/alice/*": LS(W, R),
- "fake/veyron/alice": LS(A, D, M),
- "fake/veyron/bob": LS(D, M),
- "fake/veyron/che/*": LS(W, R),
- "fake/veyron/che": LS(W, R),
+ acl.In = map[security.BlessingPattern]security.LabelSet{
+ "...": LS(R),
+ "fake/veyron/alice/...": LS(W, R),
+ "fake/veyron/alice": LS(A, D, M),
+ "fake/veyron/bob": LS(D, M),
+ "fake/veyron/che/...": LS(W, R),
+ "fake/veyron/che": LS(W, R),
}
- acl.NotIn.Principals = map[security.BlessingPattern]security.LabelSet{
+ acl.NotIn = map[string]security.LabelSet{
"fake/veyron/che/friend": LS(W),
}
// Authorizations for the above ACL.
authorizations := authMap{
- // alice and bob have only what "*" has.
+ // alice and bob have only what "..." has.
alice: LS(R),
bob: LS(R),
che: LS(R),
// veyron and veyronAlice have R, W, A, D, M from the "veyron/alice" and
- // "veyron/alice/*" ACL entries.
+ // "veyron/alice/..." ACL entries.
veyron: LS(R, W, A, D, M),
veyronAlice: LS(R, W, A, D, M),
- // veyronBob has R, D, M from "*" and "veyron/bob" ACL entries.
+ // veyronBob has R, D, M from "..." and "veyron/bob" ACL entries.
veyronBob: LS(R, D, M),
- // veyronAliceFriend has W, R from the "veyron/alice/*" ACL entry.
+ // veyronAliceFriend has W, R from the "veyron/alice/..." ACL entry.
veyronAliceFriend: LS(W, R),
// veyronChe has W, R from the "veyron/che" entry.
veyronChe: LS(W, R),
- // veyronCheFriend has W, R from the "veyron/che/*" entry, but loses W
+ // veyronCheFriend has W, R from the "veyron/che/..." entry, but loses W
// from the blacklist entry "veyron/che/friend".
veyronCheFriend: LS(R),
// nil PublicIDs are not authorized.
@@ -236,7 +236,7 @@
// Modify the ACL stored in the file and verify that the authorizations appropriately
// change for the fileACLAuthorizer.
- acl.In.Principals["fake/veyron/bob"] = LS(R, W, A, D, M)
+ acl.In["fake/veyron/bob"] = LS(R, W, A, D, M)
updateACLInFile(fileName, acl)
authorizations[veyronBob] = LS(R, W, A, D, M)
diff --git a/security/flag/flag_test.go b/security/flag/flag_test.go
index 4bd60fe..ffa8d93 100644
--- a/security/flag/flag_test.go
+++ b/security/flag/flag_test.go
@@ -30,10 +30,10 @@
}
var (
acl1 = security.ACL{}
- acl2 = vsecurity.NewWhitelistACL(map[security.BlessingPattern]security.LabelSet{
+ acl2 = security.ACL{In: map[security.BlessingPattern]security.LabelSet{
"veyron/alice": security.LabelSet(security.ReadLabel | security.WriteLabel),
"veyron/bob": security.LabelSet(security.ReadLabel),
- })
+ }}
)
acl2File := tsecurity.SaveACLToFile(acl2)
defer os.Remove(acl2File)
@@ -52,11 +52,11 @@
wantAuth: vsecurity.NewACLAuthorizer(acl1),
},
{
- flags: flagValue{"acl": "{\"In\":{\"Principals\":{\"veyron/alice\":\"RW\", \"veyron/bob\": \"R\"}}}"},
+ flags: flagValue{"acl": `{"In":{"veyron/alice":"RW", "veyron/bob": "R"}}`},
wantAuth: vsecurity.NewACLAuthorizer(acl2),
},
{
- flags: flagValue{"acl": "{\"In\":{\"Principals\":{\"veyron/bob\":\"R\", \"veyron/alice\": \"WR\"}}}"},
+ flags: flagValue{"acl": `{"In":{"veyron/bob":"R", "veyron/alice": "WR"}}`},
wantAuth: vsecurity.NewACLAuthorizer(acl2),
},
{
@@ -64,7 +64,7 @@
wantAuth: vsecurity.NewFileACLAuthorizer(acl2File),
},
{
- flags: flagValue{"acl_file": acl2File, "acl": "{\"In\":{\"Principals\":{\"veyron/alice\":\"RW\", \"veyron/bob\": \"R\"}}}"},
+ flags: flagValue{"acl_file": acl2File, "acl": `{"In":{"veyron/alice":"RW", "veyron/bob": "R"}}`},
wantPanic: true,
},
}
diff --git a/security/util.go b/security/util.go
index 7755797..b709c31 100644
--- a/security/util.go
+++ b/security/util.go
@@ -11,11 +11,10 @@
var nullACL security.ACL
-// NewWhitelistACL creates an ACL that grants access to only the provided
-// principals.
-func NewWhitelistACL(principals map[security.BlessingPattern]security.LabelSet) security.ACL {
+// OpenACL creates an ACL that grants access to all principals.
+func OpenACL() security.ACL {
acl := security.ACL{}
- acl.In.Principals = principals
+ acl.In = map[security.BlessingPattern]security.LabelSet{security.AllPrincipals: security.AllLabels}
return acl
}
diff --git a/security/util_test.go b/security/util_test.go
index 6c97fb6..d535039 100644
--- a/security/util_test.go
+++ b/security/util_test.go
@@ -27,12 +27,12 @@
func TestLoadSaveACL(t *testing.T) {
acl := security.ACL{}
- acl.In.Principals = map[security.BlessingPattern]security.LabelSet{
- "veyron/*": security.LabelSet(security.ReadLabel),
+ acl.In = map[security.BlessingPattern]security.LabelSet{
+ "veyron/...": security.LabelSet(security.ReadLabel),
"veyron/alice": security.LabelSet(security.ReadLabel | security.WriteLabel),
"veyron/bob": security.LabelSet(security.AdminLabel),
}
- acl.NotIn.Principals = map[security.BlessingPattern]security.LabelSet{
+ acl.NotIn = map[string]security.LabelSet{
"veyron/che": security.LabelSet(security.ReadLabel),
}
diff --git a/services/identity/identityd/main.go b/services/identity/identityd/main.go
index 48ba659..68cebb2 100644
--- a/services/identity/identityd/main.go
+++ b/services/identity/identityd/main.go
@@ -138,9 +138,9 @@
func newDispatcher(params blesser.GoogleParams) ipc.Dispatcher {
blessingService := ipc.ReflectInvoker(blesser.NewGoogleOAuthBlesserServer(params))
dischargerService := ipc.ReflectInvoker(services.NewServerDischarger(discharger.NewDischarger(params.R.Identity())))
- allowEveryoneACLAuth := vsecurity.NewACLAuthorizer(vsecurity.NewWhitelistACL(map[security.BlessingPattern]security.LabelSet{
+ allowEveryoneACLAuth := vsecurity.NewACLAuthorizer(security.ACL{In: map[security.BlessingPattern]security.LabelSet{
security.AllPrincipals: security.AllLabels,
- }))
+ }})
return &dispatcher{blessingService, dischargerService, allowEveryoneACLAuth}
}
diff --git a/services/mgmt/node/impl/app_invoker.go b/services/mgmt/node/impl/app_invoker.go
index ef8301a..3571978 100644
--- a/services/mgmt/node/impl/app_invoker.go
+++ b/services/mgmt/node/impl/app_invoker.go
@@ -23,10 +23,9 @@
// logs/ - stderr/stdout and log files generated by instance
// info - app manager name and process id for the instance (if running)
// version - symbolic link to installation version for the instance
+// <status> - one of the values for instanceState enum
// instance-<id b>
// ...
-// stopped-instance-<id c> - stopped instances have their directory name prepended by 'stopped-'
-// ...
// installation-<id 2>
// ...
// app-<hash 2>
@@ -48,13 +47,31 @@
// Concurrency model: installations can be created independently of one another;
// installations can be removed at any time (any running instances will be
// stopped). The first call to Uninstall will rename the installation dir as a
-// first step; subsequent Uninstalls will fail. Instances can be created
+// first step; subsequent Uninstall's will fail. Instances can be created
// independently of one another, as long as the installation exists (if it gets
-// Uninstalled during an instance Start, the Start may fail). When an instance
-// is stopped, the first call to Stop renames the instance dir; subsequent Stop
-// calls will fail. Resume will attempt to create an info file; if one exists
-// already, Resume fails. Suspend will attempt to rename the info file; if none
-// present, Suspend will fail.
+// Uninstall'ed during an instance Start, the Start may fail).
+//
+// The status file present in each instance is used to flag the state of the
+// instance and prevent concurrent operations against the instance:
+//
+// - when an instance is created with Start, it is placed in state 'suspended'.
+// To run the instance, Start transitions 'suspended' to 'starting' and then
+// 'started' (upon success) or the instance is deleted (upon failure).
+//
+// - Suspend attempts to transition from 'started' to 'suspending' (if the
+// instance was not in 'started' state, Suspend fails). From 'suspending', the
+// instance transitions to 'suspended' upon success or back to 'started' upon
+// failure.
+//
+// - Resume attempts to transition from 'suspended' to 'starting' (if the
+// instance was not in 'suspended' state, Resume fails). From 'starting', the
+// instance transitions to 'started' upon success or back to 'suspended' upon
+// failure.
+//
+// - Stop attempts to transition from 'started' to 'stopping' and then to
+// 'stopped' (upon success) or back to 'started' (upon failure); or from
+// 'suspended' to 'stopped'. If the initial state is neither 'started' nor
+// 'suspended', Stop fails.
//
// TODO(caprita): There is room for synergy between how node manager organizes
// its own workspace and that for the applications it runs. In particular,
@@ -88,6 +105,61 @@
"veyron2/vlog"
)
+// instanceState describes the states that an instance can be in at any time.
+type instanceState int
+
+const (
+ starting instanceState = iota
+ started
+ suspending
+ suspended
+ stopping
+ stopped
+)
+
+// String returns the name that will be used to encode the state as a file name
+// in the instance's dir.
+func (s instanceState) String() string {
+ switch s {
+ case starting:
+ return "starting"
+ case started:
+ return "started"
+ case suspending:
+ return "suspending"
+ case suspended:
+ return "suspended"
+ case stopping:
+ return "stopping"
+ case stopped:
+ return "stopped"
+ default:
+ return "unknown"
+ }
+}
+
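+// transition moves an instance from the initial to the target state by
+// renaming its status file. A missing source file means the instance was not
+// in the expected initial state.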
+func transition(instanceDir string, initial, target instanceState) error {
+ initialState := filepath.Join(instanceDir, initial.String())
+ targetState := filepath.Join(instanceDir, target.String())
+ if err := os.Rename(initialState, targetState); err != nil {
+ if os.IsNotExist(err) {
+ return errInvalidOperation
+ }
+ vlog.Errorf("Rename(%v, %v) failed: %v", initialState, targetState, err) // Something went really wrong.
+ return errOperationFailed
+ }
+ return nil
+}
+
+func initializeState(instanceDir string, initial instanceState) error {
+ initialStatus := filepath.Join(instanceDir, initial.String())
+ if err := ioutil.WriteFile(initialStatus, []byte("status"), 0600); err != nil {
+ vlog.Errorf("WriteFile(%v) failed: %v", initialStatus, err)
+ return errOperationFailed
+ }
+ return nil
+}
+
// instanceInfo holds state about a running instance.
type instanceInfo struct {
AppCycleMgrName string
@@ -200,8 +272,13 @@
return "instance-" + instanceID
}
-func stoppedInstanceDirName(instanceID string) string {
- return "stopped-instance-" + instanceID
+func mkdir(dir string) error {
+ perm := os.FileMode(0700)
+ if err := os.MkdirAll(dir, perm); err != nil {
+ vlog.Errorf("MkdirAll(%v, %v) failed: %v", dir, perm, err)
+ return err
+ }
+ return nil
}
func (i *appInvoker) Install(call ipc.ServerContext, applicationVON string) (string, error) {
@@ -222,9 +299,7 @@
installationID := generateID()
installationDir := filepath.Join(i.config.Root, applicationDirName(envelope.Title), installationDirName(installationID))
versionDir := filepath.Join(installationDir, generateVersionDirName())
- perm := os.FileMode(0700)
- if err := os.MkdirAll(versionDir, perm); err != nil {
- vlog.Errorf("MkdirAll(%v, %v) failed: %v", versionDir, perm, err)
+ if err := mkdir(versionDir); err != nil {
return "", errOperationFailed
}
deferrer := func() {
@@ -266,11 +341,6 @@
return nil
}
-func (*appInvoker) Resume(ipc.ServerContext) error {
- // TODO(jsimsa): Implement.
- return nil
-}
-
func (*appInvoker) Revert(ipc.ServerContext) error {
// TODO(jsimsa): Implement.
return nil
@@ -285,20 +355,17 @@
// the app (to point to the device mounttable).
cmd.Env = envelope.Env
rootDir := filepath.Join(instanceDir, "root")
- perm := os.FileMode(0700)
- if err := os.MkdirAll(rootDir, perm); err != nil {
- vlog.Errorf("MkdirAll(%v, %v) failed: %v", rootDir, perm, err)
+ if err := mkdir(rootDir); err != nil {
return nil, err
}
cmd.Dir = rootDir
logDir := filepath.Join(instanceDir, "logs")
- if err := os.MkdirAll(logDir, perm); err != nil {
- vlog.Errorf("MkdirAll(%v, %v) failed: %v", logDir, perm, err)
+ if err := mkdir(logDir); err != nil {
return nil, err
}
timestamp := time.Now().UnixNano()
var err error
- perm = os.FileMode(0600)
+ perm := os.FileMode(0600)
cmd.Stdout, err = os.OpenFile(filepath.Join(logDir, fmt.Sprintf("STDOUT-%d", timestamp)), os.O_WRONLY|os.O_CREATE, perm)
if err != nil {
return nil, err
@@ -335,29 +402,60 @@
return installationDir, nil
}
-func (i *appInvoker) Start(ipc.ServerContext) ([]string, error) {
+// newInstance sets up the directory for a new application instance.
+func (i *appInvoker) newInstance() (string, string, error) {
installationDir, err := i.installationDir()
if err != nil {
- return nil, err
+ return "", "", err
+ }
+ instanceID := generateID()
+ instanceDir := filepath.Join(installationDir, "instances", instanceDirName(instanceID))
+ if mkdir(instanceDir) != nil {
+ return "", instanceID, errOperationFailed
}
currLink := filepath.Join(installationDir, "current")
- envelope, err := loadEnvelope(currLink)
+ versionDir, err := filepath.EvalSymlinks(currLink)
+ if err != nil {
+ vlog.Errorf("EvalSymlinks(%v) failed: %v", currLink, err)
+ return instanceDir, instanceID, err
+ }
+ versionLink := filepath.Join(instanceDir, "version")
+ if err := os.Symlink(versionDir, versionLink); err != nil {
+ vlog.Errorf("Symlink(%v, %v) failed: %v", versionDir, versionLink, err)
+ return instanceDir, instanceID, errOperationFailed
+ }
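+ // A new instance starts out 'suspended'; run() moves it through 'starting'
+ // to 'started'.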
+ if err := initializeState(instanceDir, suspended); err != nil {
+ return instanceDir, instanceID, err
+ }
+ return instanceDir, instanceID, nil
+}
+
+func genCmd(instanceDir string) (*exec.Cmd, error) {
+ versionLink := filepath.Join(instanceDir, "version")
+ versionDir, err := filepath.EvalSymlinks(versionLink)
+ if err != nil {
+ vlog.Errorf("EvalSymlinks(%v) failed: %v", versionLink, err)
+ return nil, errOperationFailed
+ }
+ envelope, err := loadEnvelope(versionDir)
if err != nil {
return nil, err
}
- binPath := filepath.Join(currLink, "bin")
+ binPath := filepath.Join(versionDir, "bin")
if _, err := os.Stat(binPath); err != nil {
vlog.Errorf("Stat(%v) failed: %v", binPath, err)
return nil, errOperationFailed
}
- instanceID := generateID()
- // TODO(caprita): Clean up instanceDir upon failure.
- instanceDir := filepath.Join(installationDir, "instances", instanceDirName(instanceID))
+ // TODO(caprita): Fold generateCommand inline here.
cmd, err := generateCommand(envelope, binPath, instanceDir)
if err != nil {
vlog.Errorf("generateCommand(%v, %v, %v) failed: %v", envelope, binPath, instanceDir, err)
return nil, errOperationFailed
}
+ return cmd, nil
+}
+
+func (i *appInvoker) startCmd(instanceDir string, cmd *exec.Cmd) error {
// Setup up the child process callback.
callbackState := i.callback
listener := callbackState.listenFor(mgmt.AppCycleManagerConfigKey)
@@ -365,55 +463,92 @@
cfg := config.New()
cfg.Set(mgmt.ParentNodeManagerConfigKey, listener.name())
handle := vexec.NewParentHandle(cmd, vexec.ConfigOpt{cfg})
+ defer func() {
+ if handle != nil {
+ if err := handle.Clean(); err != nil {
+ vlog.Errorf("Clean() failed: %v", err)
+ }
+ }
+ }()
// Start the child process.
if err := handle.Start(); err != nil {
vlog.Errorf("Start() failed: %v", err)
- return nil, errOperationFailed
+ return errOperationFailed
}
// Wait for the child process to start.
timeout := 10 * time.Second
if err := handle.WaitForReady(timeout); err != nil {
vlog.Errorf("WaitForReady(%v) failed: %v", timeout, err)
- if err := handle.Clean(); err != nil {
- vlog.Errorf("Clean() failed: %v", err)
- }
- return nil, errOperationFailed
+ return errOperationFailed
}
childName, err := listener.waitForValue(timeout)
if err != nil {
- if err := handle.Clean(); err != nil {
- vlog.Errorf("Clean() failed: %v", err)
- }
- return nil, errOperationFailed
+ return errOperationFailed
}
instanceInfo := &instanceInfo{
AppCycleMgrName: childName,
Pid: handle.Pid(),
}
if err := saveInstanceInfo(instanceDir, instanceInfo); err != nil {
- if err := handle.Clean(); err != nil {
- vlog.Errorf("Clean() failed: %v", err)
- }
- return nil, err
+ return err
}
// TODO(caprita): Spin up a goroutine to reap child status upon exit and
// transition it to suspended state if it exits on its own.
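+ // The child is now running; clear handle so the deferred Clean() above
+ // leaves it alone.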
+ handle = nil
+ return nil
+}
+
+func (i *appInvoker) run(instanceDir string) error {
+ if err := transition(instanceDir, suspended, starting); err != nil {
+ return err
+ }
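+ // Launch the instance; on any failure, roll back from 'starting' to
+ // 'suspended'.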
+ cmd, err := genCmd(instanceDir)
+ if err == nil {
+ err = i.startCmd(instanceDir, cmd)
+ }
+ if err != nil {
+ transition(instanceDir, starting, suspended)
+ return err
+ }
+ return transition(instanceDir, starting, started)
+}
+
+func (i *appInvoker) Start(ipc.ServerContext) ([]string, error) {
+ instanceDir, instanceID, err := i.newInstance()
+ if err == nil {
+ err = i.run(instanceDir)
+ }
+ if err != nil {
+ if instanceDir != "" {
+ if err := os.RemoveAll(instanceDir); err != nil {
+ vlog.Errorf("RemoveAll(%v) failed: %v", instanceDir, err)
+ }
+ }
+ return nil, err
+ }
return []string{instanceID}, nil
}
-// instanceDir returns the path to the directory containing the app instance
-// referred to by the invoker's suffix, as well as the corresponding stopped
-// instance dir. Returns an error if the suffix does not name an instance.
+// instanceDir returns the path to the directory containing the app instance
+// referred to by the invoker's suffix. Returns an error if the suffix does
+// not name an instance.
-func (i *appInvoker) instanceDir() (string, string, error) {
+func (i *appInvoker) instanceDir() (string, error) {
components := i.suffix
if nComponents := len(components); nComponents != 3 {
- return "", "", errInvalidSuffix
+ return "", errInvalidSuffix
}
app, installation, instance := components[0], components[1], components[2]
instancesDir := filepath.Join(i.config.Root, applicationDirName(app), installationDirName(installation), "instances")
instanceDir := filepath.Join(instancesDir, instanceDirName(instance))
- stoppedInstanceDir := filepath.Join(instancesDir, stoppedInstanceDirName(instance))
- return instanceDir, stoppedInstanceDir, nil
+ return instanceDir, nil
+}
+
+func (i *appInvoker) Resume(ipc.ServerContext) error {
+ instanceDir, err := i.instanceDir()
+ if err != nil {
+ return err
+ }
+ return i.run(instanceDir)
}
func stopAppRemotely(appVON string) error {
@@ -444,32 +579,47 @@
return nil
}
-func (i *appInvoker) Stop(_ ipc.ServerContext, deadline uint32) error {
- // TODO(caprita): implement deadline.
- instanceDir, stoppedInstanceDir, err := i.instanceDir()
+func stop(instanceDir string) error {
+ info, err := loadInstanceInfo(instanceDir)
if err != nil {
return err
}
- if err := os.Rename(instanceDir, stoppedInstanceDir); err != nil {
- vlog.Errorf("Rename(%v, %v) failed: %v", instanceDir, stoppedInstanceDir, err)
- if os.IsNotExist(err) {
- return errNotExist
- }
- vlog.Errorf("Rename(%v, %v) failed: %v", instanceDir, stoppedInstanceDir, err)
- return errOperationFailed
- }
- // TODO(caprita): restore the instance to unstopped upon failure?
-
- info, err := loadInstanceInfo(stoppedInstanceDir)
- if err != nil {
- return errOperationFailed
- }
return stopAppRemotely(info.AppCycleMgrName)
}
-func (*appInvoker) Suspend(ipc.ServerContext) error {
- // TODO(jsimsa): Implement.
- return nil
+// TODO(caprita): implement deadline for Stop.
+
+func (i *appInvoker) Stop(_ ipc.ServerContext, deadline uint32) error {
+ instanceDir, err := i.instanceDir()
+ if err != nil {
+ return err
+ }
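+ // A suspended instance can be stopped directly. errOperationFailed means the
+ // rename itself failed, so give up; any other error means the instance was
+ // not suspended, so fall through to the 'started' path.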
+ if err := transition(instanceDir, suspended, stopped); err == errOperationFailed || err == nil {
+ return err
+ }
+ if err := transition(instanceDir, started, stopping); err != nil {
+ return err
+ }
+ if err := stop(instanceDir); err != nil {
+ transition(instanceDir, stopping, started)
+ return err
+ }
+ return transition(instanceDir, stopping, stopped)
+}
+
+func (i *appInvoker) Suspend(ipc.ServerContext) error {
+ instanceDir, err := i.instanceDir()
+ if err != nil {
+ return err
+ }
+ if err := transition(instanceDir, started, suspending); err != nil {
+ return err
+ }
+ if err := stop(instanceDir); err != nil {
+ transition(instanceDir, suspending, started)
+ return err
+ }
+ return transition(instanceDir, suspending, suspended)
}
func (*appInvoker) Uninstall(ipc.ServerContext) error {
diff --git a/services/mgmt/node/impl/dispatcher.go b/services/mgmt/node/impl/dispatcher.go
index 9d00df3..bf01b56 100644
--- a/services/mgmt/node/impl/dispatcher.go
+++ b/services/mgmt/node/impl/dispatcher.go
@@ -2,15 +2,22 @@
import (
"fmt"
+ "os"
+ "path/filepath"
"strings"
+ vsecurity "veyron/security"
+ vflag "veyron/security/flag"
+ "veyron/security/serialization"
inode "veyron/services/mgmt/node"
"veyron/services/mgmt/node/config"
"veyron2/ipc"
+ "veyron2/rt"
"veyron2/security"
"veyron2/services/mgmt/node"
"veyron2/verror"
+ "veyron2/vlog"
)
// internalState wraps state shared between different node manager
@@ -43,21 +50,92 @@
errUpdateNoOp = verror.NotFoundf("no different version available")
errNotExist = verror.NotFoundf("object does not exist")
errInvalidOperation = verror.BadArgf("invalid operation")
+ errInvalidBlessing = verror.BadArgf("invalid claim blessing")
)
// NewDispatcher is the node manager dispatcher factory.
-func NewDispatcher(auth security.Authorizer, config *config.State) (*dispatcher, error) {
+func NewDispatcher(config *config.State) (*dispatcher, error) {
if err := config.Validate(); err != nil {
return nil, fmt.Errorf("Invalid config %v: %v", config, err)
}
- return &dispatcher{
- auth: auth,
+ d := &dispatcher{
internal: &internalState{
callback: newCallbackState(config.Name),
updating: newUpdatingState(),
},
config: config,
- }, nil
+ }
+ // Prefer ACLs in the nodemanager data directory if they exist.
+ if data, sig, err := d.getACLFiles(os.O_RDONLY); err != nil {
+ if d.auth = vflag.NewAuthorizerOrDie(); d.auth == nil {
+ // If there are no specified ACLs we grant nodemanager access to all
+ // principals until it is claimed.
+ d.auth = vsecurity.NewACLAuthorizer(vsecurity.OpenACL())
+ }
+ } else {
+ defer data.Close()
+ defer sig.Close()
+ reader, err := serialization.NewVerifyingReader(data, sig, rt.R().Identity().PublicKey())
+ if err != nil {
+ return nil, fmt.Errorf("Failed to read nodemanager ACL file:%v", err)
+ }
+ acl, err := vsecurity.LoadACL(reader)
+ if err != nil {
+ return nil, fmt.Errorf("Failed to load nodemanager ACL:%v", err)
+ }
+ d.auth = vsecurity.NewACLAuthorizer(acl)
+ }
+ return d, nil
+}
+
+func (d *dispatcher) getACLFiles(flag int) (aclData *os.File, aclSig *os.File, err error) {
+ nodedata := filepath.Join(d.config.Root, "node-manager", "node-data")
+ perm := os.FileMode(0700)
+ if err = os.MkdirAll(nodedata, perm); err != nil {
+ return
+ }
+ if aclData, err = os.OpenFile(filepath.Join(nodedata, "acl.nodemanager"), flag, perm); err != nil {
+ return
+ }
+ if aclSig, err = os.OpenFile(filepath.Join(nodedata, "acl.signature"), flag, perm); err != nil {
+ return
+ }
+ return
+}
+
+func (d *dispatcher) claimNodeManager(id security.PublicID) error {
+ // TODO(gauthamt): Should we start trusting these identity providers?
+ if id.Names() == nil {
+ vlog.Errorf("Identity provider for device claimer is not trusted")
+ return errOperationFailed
+ }
+ rt.R().PublicIDStore().Add(id, security.AllPrincipals)
+ // Create ACLs to transfer nodemanager permissions to the provided identity.
+ acl := security.ACL{In: make(map[security.BlessingPattern]security.LabelSet)}
+ for _, name := range id.Names() {
+ acl.In[security.BlessingPattern(name)] = security.AllLabels
+ }
+ d.auth = vsecurity.NewACLAuthorizer(acl)
+ // Write out the ACL so that it persists across restarts.
+ data, sig, err := d.getACLFiles(os.O_CREATE | os.O_RDWR)
+ if err != nil {
+ vlog.Errorf("Failed to create ACL files:%v", err)
+ return errOperationFailed
+ }
+ writer, err := serialization.NewSigningWriteCloser(data, sig, rt.R().Identity(), nil)
+ if err != nil {
+ vlog.Errorf("Failed to create NewSigningWriteCloser:%v", err)
+ return errOperationFailed
+ }
+ if err = vsecurity.SaveACL(writer, acl); err != nil {
+ vlog.Errorf("Failed to SaveACL:%v", err)
+ return errOperationFailed
+ }
+ if err = writer.Close(); err != nil {
+ vlog.Errorf("Failed to Close() SigningWriteCloser:%v", err)
+ return errOperationFailed
+ }
+ return nil
}
// DISPATCHER INTERFACE IMPLEMENTATION
@@ -83,6 +161,7 @@
callback: d.internal.callback,
updating: d.internal.updating,
config: d.config,
+ disp: d,
})
case appsSuffix:
receiver = node.NewServerApplication(&appInvoker{
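The dispatcher above persists its ACL as two files under node-data, acl.nodemanager and acl.signature: claimNodeManager writes them through a SigningWriteCloser keyed on the runtime identity, and NewDispatcher only accepts them back after a VerifyingReader checks the signature against that identity's public key, so a tampered ACL file is rejected on restart. A condensed sketch of that round-trip, using only the calls and imports that already appear in this diff (file modes match the code above; the rest is illustrative, with error handling kept minimal):

// Sketch: persist and restore a signed ACL the way the dispatcher does.
func persistACL(dataPath, sigPath string, acl security.ACL) error {
	data, err := os.OpenFile(dataPath, os.O_CREATE|os.O_RDWR, os.FileMode(0700))
	if err != nil {
		return err
	}
	sig, err := os.OpenFile(sigPath, os.O_CREATE|os.O_RDWR, os.FileMode(0700))
	if err != nil {
		return err
	}
	// The writer signs the ACL bytes with the runtime identity as they are written.
	w, err := serialization.NewSigningWriteCloser(data, sig, rt.R().Identity(), nil)
	if err != nil {
		return err
	}
	if err := vsecurity.SaveACL(w, acl); err != nil {
		return err
	}
	return w.Close()
}

func restoreACL(dataPath, sigPath string) (security.ACL, error) {
	data, err := os.Open(dataPath)
	if err != nil {
		return security.ACL{}, err
	}
	defer data.Close()
	sig, err := os.Open(sigPath)
	if err != nil {
		return security.ACL{}, err
	}
	defer sig.Close()
	// Wrap the data in a reader that verifies it against the signature and
	// the node manager's own public key before the ACL is loaded.
	r, err := serialization.NewVerifyingReader(data, sig, rt.R().Identity().PublicKey())
	if err != nil {
		return security.ACL{}, err
	}
	return vsecurity.LoadACL(r)
}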
diff --git a/services/mgmt/node/impl/impl_test.go b/services/mgmt/node/impl/impl_test.go
index a7aa441..2dd443f 100644
--- a/services/mgmt/node/impl/impl_test.go
+++ b/services/mgmt/node/impl/impl_test.go
@@ -12,16 +12,20 @@
"strings"
"syscall"
"testing"
+ "time"
"veyron/lib/signals"
"veyron/lib/testutil/blackbox"
+ tsecurity "veyron/lib/testutil/security"
"veyron/services/mgmt/lib/exec"
"veyron/services/mgmt/node/config"
"veyron/services/mgmt/node/impl"
+ "veyron2"
"veyron2/ipc"
"veyron2/naming"
"veyron2/rt"
+ "veyron2/security"
"veyron2/services/mgmt/application"
"veyron2/services/mgmt/node"
"veyron2/verror"
@@ -105,7 +109,7 @@
configState.Root, configState.Origin, configState.CurrentLink = args[0], args[1], args[2]
}
- dispatcher, err := impl.NewDispatcher(nil, configState)
+ dispatcher, err := impl.NewDispatcher(configState)
if err != nil {
vlog.Fatalf("Failed to create node manager dispatcher: %v", err)
}
@@ -480,6 +484,32 @@
}
}
+func suspendApp(t *testing.T, appID, instanceID string) {
+ appsName := "nm//apps"
+ appName := naming.Join(appsName, appID)
+ instanceName := naming.Join(appName, instanceID)
+ stub, err := node.BindApplication(instanceName)
+ if err != nil {
+ t.Fatalf("BindApplication(%v) failed: %v", instanceName, err)
+ }
+ if err := stub.Suspend(rt.R().NewContext()); err != nil {
+ t.Fatalf("Suspend failed: %v", err)
+ }
+}
+
+func resumeApp(t *testing.T, appID, instanceID string) {
+ appsName := "nm//apps"
+ appName := naming.Join(appsName, appID)
+ instanceName := naming.Join(appName, instanceID)
+ stub, err := node.BindApplication(instanceName)
+ if err != nil {
+ t.Fatalf("BindApplication(%v) failed: %v", instanceName, err)
+ }
+ if err := stub.Resume(rt.R().NewContext()); err != nil {
+ t.Fatalf("Resume failed: %v", err)
+ }
+}
+
func verifyAppWorkspace(t *testing.T, root, appID, instanceID string) {
// HACK ALERT: for now, we peek inside the node manager's directory
// structure (which ought to be opaque) to check for what the app has
@@ -495,7 +525,7 @@
}
components := strings.Split(appID, "/")
appTitle, installationID := components[0], components[1]
- instanceDir := filepath.Join(root, applicationDirName(appTitle), "installation-"+installationID, "instances", "stopped-instance-"+instanceID)
+ instanceDir := filepath.Join(root, applicationDirName(appTitle), "installation-"+installationID, "instances", "instance-"+instanceID)
rootDir := filepath.Join(instanceDir, "root")
testFile := filepath.Join(rootDir, "testfile")
if read, err := ioutil.ReadFile(testFile); err != nil {
@@ -549,6 +579,13 @@
instanceID := startApp(t, appID)
<-pingCh // Wait until the app pings us that it's ready.
+ // Suspend the app.
+ suspendApp(t, appID, instanceID)
+ <-pingCh // App should have pinged us before it terminated.
+
+ resumeApp(t, appID, instanceID)
+ <-pingCh
+
// TODO(caprita): test Suspend and Resume, and verify various
// non-standard combinations (suspend when stopped; resume while still
// running; stop while suspended).
@@ -564,3 +601,105 @@
nm.Expect("nm terminating")
nm.ExpectEOFAndWait()
}
+
+type granter struct {
+ ipc.CallOpt
+ self security.PrivateID
+}
+
+func (g granter) Grant(id security.PublicID) (security.PublicID, error) {
+ return g.self.Bless(id, "claimernode", 10*time.Minute, nil)
+}
+
+func newRuntimeClient(t *testing.T, id security.PrivateID) (veyron2.Runtime, ipc.Client) {
+ runtime, err := rt.New(veyron2.RuntimeID(id))
+ if err != nil {
+ t.Fatalf("rt.New() failed: %v", err)
+ }
+ runtime.Namespace().SetRoots(rt.R().Namespace().Roots()[0])
+ nodeClient, err := runtime.NewClient()
+ if err != nil {
+ t.Fatalf("rt.NewClient() failed %v", err)
+ }
+ return runtime, nodeClient
+}
+
+func tryInstall(rt veyron2.Runtime, c ipc.Client) error {
+ appsName := "nm//apps"
+ stub, err := node.BindApplication(appsName, c)
+ if err != nil {
+ return fmt.Errorf("BindApplication(%v) failed: %v", appsName, err)
+ }
+ if _, err = stub.Install(rt.NewContext(), "ar"); err != nil {
+ return fmt.Errorf("Install failed: %v", err)
+ }
+ return nil
+}
+
+// TestNodeManagerClaim claims a nodemanager and tests ACL permissions on its methods.
+func TestNodeManagerClaim(t *testing.T) {
+ // Set up mount table, application, and binary repositories.
+ defer setupLocalNamespace(t)()
+ envelope, cleanup := startApplicationRepository()
+ defer cleanup()
+ defer startBinaryRepository()()
+
+ root, cleanup := setupRootDir()
+ defer cleanup()
+
+ // Set up the node manager. Since we won't do node manager updates,
+ // don't worry about its application envelope and current link.
+ nm := blackbox.HelperCommand(t, "nodeManager", "nm", root, "unused app repo name", "unused curr link")
+ defer setupChildCommand(nm)()
+ if err := nm.Cmd.Start(); err != nil {
+ t.Fatalf("Start() failed: %v", err)
+ }
+ defer nm.Cleanup()
+ readPID(t, nm)
+
+ // Create an envelope for an app.
+ app := blackbox.HelperCommand(t, "app", "app1")
+ defer setupChildCommand(app)()
+ appTitle := "google naps"
+ *envelope = *envelopeFromCmd(appTitle, app.Cmd)
+
+ nodeStub, err := node.BindNode("nm//nm")
+ if err != nil {
+ t.Fatalf("BindNode failed: %v", err)
+ }
+
+ // Create a new identity and runtime.
+ newIdentity := tsecurity.NewBlessedIdentity(rt.R().Identity(), "claimer")
+ newRT, nodeClient := newRuntimeClient(t, newIdentity)
+ defer newRT.Cleanup()
+
+ // Nodemanager should have open ACLs before we claim it and so an Install
+ // should succeed.
+ if err = tryInstall(newRT, nodeClient); err != nil {
+ t.Fatalf("%v", err)
+ }
+ // Claim the nodemanager with this identity.
+ if err = nodeStub.Claim(rt.R().NewContext(), granter{self: newIdentity}); err != nil {
+ t.Fatalf("Claim failed: %v", err)
+ }
+ if err = tryInstall(newRT, nodeClient); err != nil {
+ t.Fatalf("%v", err)
+ }
+ // Try to install with a new identity. This should fail.
+ newIdentity = tsecurity.NewBlessedIdentity(rt.R().Identity(), "random")
+ newRT, nodeClient = newRuntimeClient(t, newIdentity)
+ defer newRT.Cleanup()
+ if err = tryInstall(newRT, nodeClient); err == nil {
+ t.Fatalf("Install should have failed with random identity")
+ }
+ // Try to install with the original identity. This should still work as the
+ // original identity's name is a prefix of the name used by newRT.
+ nodeClient, err = rt.R().NewClient()
+ if err != nil {
+ t.Fatalf("rt.NewClient() failed %v", err)
+ }
+ if err = tryInstall(rt.R(), nodeClient); err != nil {
+ t.Fatalf("%v", err)
+ }
+ // TODO(gauthamt): Test that ACLs persist across nodemanager restarts
+}
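For reference, here is roughly the authorizer that TestNodeManagerClaim ends up installing: Grant blesses the node manager with the extension "claimernode", and claimNodeManager then adds one AllLabels entry per resulting blessing name. With a hypothetical runtime identity named "test", the effect is equivalent to the fragment below.

// Illustrative only; "test/claimer/claimernode" stands in for whatever names
// the blessing produced by granter.Grant actually carries.
auth := vsecurity.NewACLAuthorizer(security.ACL{In: map[security.BlessingPattern]security.LabelSet{
	"test/claimer/claimernode": security.AllLabels,
}})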
diff --git a/services/mgmt/node/impl/node_invoker.go b/services/mgmt/node/impl/node_invoker.go
index 02d87a6..0557d7e 100644
--- a/services/mgmt/node/impl/node_invoker.go
+++ b/services/mgmt/node/impl/node_invoker.go
@@ -11,6 +11,9 @@
// noded.sh - a shell script to start the binary
// <version 2 timestamp>
// ...
+// node-data/
+// acl.nodemanager
+// acl.signature
//
// The node manager is always expected to be started through the symbolic link
// passed in as config.CurrentLink, which is monitored by an init daemon. This
@@ -81,6 +84,16 @@
updating *updatingState
callback *callbackState
config *iconfig.State
+ disp *dispatcher
+}
+
+func (i *nodeInvoker) Claim(call ipc.ServerContext) error {
+ // Get the blessing to be used by the claimant
+ blessing := call.Blessing()
+ if blessing == nil {
+ return errInvalidBlessing
+ }
+ return i.disp.claimNodeManager(blessing)
}
func (*nodeInvoker) Describe(ipc.ServerContext) (node.Description, error) {
diff --git a/services/mgmt/node/impl/util_test.go b/services/mgmt/node/impl/util_test.go
index caaad5b..d2fed3d 100644
--- a/services/mgmt/node/impl/util_test.go
+++ b/services/mgmt/node/impl/util_test.go
@@ -114,13 +114,13 @@
func updateExpectError(t *testing.T, name string, errID verror.ID) {
if err := invokeUpdate(t, name); !verror.Is(err, errID) {
- t.Fatalf("Unexpected update error %v, expected error ID %v", err, errID)
+ t.Fatalf("Unexpected update on %s error %v, expected error ID %v", name, err, errID)
}
}
func update(t *testing.T, name string) {
if err := invokeUpdate(t, name); err != nil {
- t.Fatalf("Update() failed: %v", err)
+ t.Fatalf("Update() on %s failed: %v", name, err)
}
}
diff --git a/services/mgmt/node/noded/main.go b/services/mgmt/node/noded/main.go
index 12e6267..4e2175e 100644
--- a/services/mgmt/node/noded/main.go
+++ b/services/mgmt/node/noded/main.go
@@ -4,7 +4,6 @@
"flag"
"veyron/lib/signals"
- vflag "veyron/security/flag"
"veyron/services/mgmt/node/config"
"veyron/services/mgmt/node/impl"
@@ -46,7 +45,7 @@
// TODO(caprita): We need a way to set config fields outside of the
// update mechanism (since that should ideally be an opaque
// implementation detail).
- dispatcher, err := impl.NewDispatcher(vflag.NewAuthorizerOrDie(), configState)
+ dispatcher, err := impl.NewDispatcher(configState)
if err != nil {
vlog.Fatalf("Failed to create dispatcher: %v", err)
}
diff --git a/services/mgmt/stats/impl/stats_invoker.go b/services/mgmt/stats/impl/stats_invoker.go
index 99ef64d..f1f9dac 100644
--- a/services/mgmt/stats/impl/stats_invoker.go
+++ b/services/mgmt/stats/impl/stats_invoker.go
@@ -77,8 +77,10 @@
}
return errOperationFailed
}
- if len(changes) > 0 {
- call.Send(watchtypes.ChangeBatch{Changes: changes})
+ for _, change := range changes {
+ if err := call.Send(change); err != nil {
+ return err
+ }
}
select {
case <-call.Done():
diff --git a/services/mgmt/stats/impl/stats_invoker_test.go b/services/mgmt/stats/impl/stats_invoker_test.go
index c2089ef..c06e0ee 100644
--- a/services/mgmt/stats/impl/stats_invoker_test.go
+++ b/services/mgmt/stats/impl/stats_invoker_test.go
@@ -118,11 +118,7 @@
t.Fatalf("expected more stream values")
}
got := iterator.Value()
- expected := types.ChangeBatch{
- Changes: []types.Change{
- types.Change{Name: "testing/foo/bar", Value: int64(10)},
- },
- }
+ expected := types.Change{Name: "testing/foo/bar", Value: int64(10)}
if !reflect.DeepEqual(got, expected) {
t.Errorf("unexpected result. Got %#v, want %#v", got, expected)
}
@@ -133,11 +129,7 @@
t.Fatalf("expected more stream values")
}
got = iterator.Value()
- expected = types.ChangeBatch{
- Changes: []types.Change{
- types.Change{Name: "testing/foo/bar", Value: int64(15)},
- },
- }
+ expected = types.Change{Name: "testing/foo/bar", Value: int64(15)}
if !reflect.DeepEqual(got, expected) {
t.Errorf("unexpected result. Got %#v, want %#v", got, expected)
}
@@ -148,11 +140,7 @@
t.Fatalf("expected more stream values")
}
got = iterator.Value()
- expected = types.ChangeBatch{
- Changes: []types.Change{
- types.Change{Name: "testing/foo/bar", Value: int64(17)},
- },
- }
+ expected = types.Change{Name: "testing/foo/bar", Value: int64(17)}
if !reflect.DeepEqual(got, expected) {
t.Errorf("unexpected result. Got %#v, want %#v", got, expected)
}
diff --git a/services/mounttable/lib/testdata/test.acl b/services/mounttable/lib/testdata/test.acl
index b65c35b..9ffe006 100644
--- a/services/mounttable/lib/testdata/test.acl
+++ b/services/mounttable/lib/testdata/test.acl
@@ -1,5 +1,5 @@
{
-"/": {"In": {"Principals": {"fake/root": "RW", "*": "R"}}},
-"/stuff": {"In": {"Principals": {"fake/root": "RW", "fake/bob": "R"}}},
-"/a": {"In": {"Principals": {"fake/root": "RW", "fake/alice": "R"}}}
+"/": {"In": {"fake/root": "RW", "...": "R"}},
+"/stuff": {"In": {"fake/root": "RW", "fake/bob": "R"}},
+"/a": {"In": {"fake/root": "RW", "fake/alice": "R"}}
}
\ No newline at end of file
diff --git a/services/store/memstore/blackbox/sync_integration_test.go b/services/store/memstore/blackbox/sync_integration_test.go
index 98a4f6d..98b23e9 100644
--- a/services/store/memstore/blackbox/sync_integration_test.go
+++ b/services/store/memstore/blackbox/sync_integration_test.go
@@ -8,8 +8,21 @@
"veyron/services/store/raw"
"veyron2/rt"
+ "veyron2/services/watch/types"
)
+func recv(t *testing.T, call raw.StoreWatchCall, n int) []types.Change {
+ rStream := call.RecvStream()
+ changes := make([]types.Change, n)
+ for i := 0; i < n; i++ {
+ if !rStream.Advance() {
+ t.Error("Advance() failed: %v", rStream.Err())
+ }
+ changes[i] = rStream.Value()
+ }
+ return changes
+}
+
func TestSyncState(t *testing.T) {
rt.Init()
@@ -49,13 +62,9 @@
// Create a sync request
stream := watchtesting.WatchRaw(rootPublicID, w.WatchRaw, raw.Request{})
- rStream := stream.RecvStream()
- if !rStream.Advance() {
- t.Fatalf("Advance() failed: %v", rStream.Err())
- }
- cb := rStream.Value()
// Update target
- PutMutations(t, target, Mutations(cb.Changes))
+ changes := recv(t, stream, 2)
+ PutMutations(t, target, Mutations(changes))
GC(t, target)
// Expect that the target contains id1 and id2 but not id3
@@ -86,13 +95,9 @@
id3 := Put(t, st, tr, "/a/b", "val3")
Commit(t, tr)
- rStream := stream.RecvStream()
- if !rStream.Advance() {
- t.Fatalf("Advance() failed: %v", rStream.Err())
- }
- cb := rStream.Value()
// Update target
- PutMutations(t, target, Mutations(cb.Changes))
+ changes := recv(t, stream, 3)
+ PutMutations(t, target, Mutations(changes))
GC(t, target)
// Expect that the target contains id1, id2, id3
@@ -105,13 +110,9 @@
Remove(t, st, tr, "/a/b")
Commit(t, tr)
- if !rStream.Advance() {
- t.Fatalf("Advance() failed: %v", rStream.Err())
- }
-
- cb = rStream.Value()
// Update target
- PutMutations(t, target, Mutations(cb.Changes))
+ changes = recv(t, stream, 1)
+ PutMutations(t, target, Mutations(changes))
GC(t, target)
// Expect that the target contains id1, id2, but not id3
diff --git a/services/store/memstore/testing/util.go b/services/store/memstore/testing/util.go
index 8c60190..14f0c00 100644
--- a/services/store/memstore/testing/util.go
+++ b/services/store/memstore/testing/util.go
@@ -154,10 +154,10 @@
type watcherServiceWatchStreamSender struct {
mu *sync.Mutex
ctx ipc.ServerContext
- output chan<- types.ChangeBatch
+ output chan<- types.Change
}
-func (s *watcherServiceWatchStreamSender) Send(cb types.ChangeBatch) error {
+func (s *watcherServiceWatchStreamSender) Send(cb types.Change) error {
s.mu.Lock()
defer s.mu.Unlock()
select {
@@ -174,7 +174,7 @@
}
func (s *watcherServiceWatchStream) SendStream() interface {
- Send(cb types.ChangeBatch) error
+ Send(cb types.Change) error
} {
return s
}
@@ -183,8 +183,8 @@
// watcherWatchStream implements watch.WatcherWatchStream.
type watcherWatchStream struct {
ctx *FakeServerContext
- value types.ChangeBatch
- input <-chan types.ChangeBatch
+ value types.Change
+ input <-chan types.Change
err <-chan error
}
@@ -194,7 +194,7 @@
return ok
}
-func (s *watcherWatchStream) Value() types.ChangeBatch {
+func (s *watcherWatchStream) Value() types.Change {
return s.value
}
@@ -213,7 +213,7 @@
func (s *watcherWatchStream) RecvStream() interface {
Advance() bool
- Value() types.ChangeBatch
+ Value() types.Change
Err() error
} {
return s
@@ -222,7 +222,7 @@
func watchImpl(id security.PublicID, watchFn func(ipc.ServerContext, *watcherServiceWatchStream) error) *watcherWatchStream {
mu := &sync.Mutex{}
ctx := NewFakeServerContext(id)
- c := make(chan types.ChangeBatch, 1)
+ c := make(chan types.Change, 1)
errc := make(chan error, 1)
go func() {
stream := &watcherServiceWatchStream{
diff --git a/services/store/memstore/watch/watcher.go b/services/store/memstore/watch/watcher.go
index 437f1cd..609c292 100644
--- a/services/store/memstore/watch/watcher.go
+++ b/services/store/memstore/watch/watcher.go
@@ -86,7 +86,7 @@
type WatchStream interface {
// Send places the item onto the output stream, blocking if there is no
// buffer space available.
- Send(item types.ChangeBatch) error
+ Send(item types.Change) error
}
// Watch handles the specified request, processing records in the store log and
@@ -291,11 +291,12 @@
}
func sendChanges(stream WatchStream, changes []types.Change) error {
- if len(changes) == 0 {
- return nil
+ for _, change := range changes {
+ if err := stream.Send(change); err != nil {
+ return err
+ }
}
- // TODO(tilaks): batch more aggressively.
- return stream.Send(types.ChangeBatch{Changes: changes})
+ return nil
}
func addContinued(changes []types.Change) {
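Much of the test churn below follows from this change: with ChangeBatch gone, a transaction now arrives as a run of individual Change values, and Continued == false marks the last one. A small helper in the style of these tests could recover the old per-transaction grouping; this is a sketch, not code added by this change, and it assumes the veyron2/services/watch/types package used throughout this diff.

// Sketch: collect one transaction's worth of changes from a receive stream
// whose interface matches the RecvStream() shape used in these tests.
func recvTransaction(rStream interface {
	Advance() bool
	Value() types.Change
	Err() error
}) ([]types.Change, error) {
	var changes []types.Change
	for rStream.Advance() {
		change := rStream.Value()
		changes = append(changes, change)
		if !change.Continued {
			return changes, nil
		}
	}
	return nil, rStream.Err()
}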
diff --git a/services/store/memstore/watch/watcher_test.go b/services/store/memstore/watch/watcher_test.go
index 6861033..2588060 100644
--- a/services/store/memstore/watch/watcher_test.go
+++ b/services/store/memstore/watch/watcher_test.go
@@ -39,14 +39,15 @@
req := raw.Request{}
ws := watchtesting.WatchRaw(rootPublicID, w.WatchRaw, req)
- // Check that watch detects the changes in the first transaction.
rStream := ws.RecvStream()
+
+ // Check that watch detects the changes in the first transaction.
+ changes := []types.Change{}
if !rStream.Advance() {
t.Error("Advance() failed: %v", rStream.Err())
}
- cb := rStream.Value()
- changes := cb.Changes
- change := changes[0]
+ change := rStream.Value()
+ changes = append(changes, change)
if change.Continued {
t.Error("Expected change to be the last in this transaction")
}
@@ -62,17 +63,20 @@
post2 := st.Snapshot().Find(id2).Version
// Check that watch detects the changes in the second transaction.
-
+ changes = []types.Change{}
if !rStream.Advance() {
t.Error("Advance() failed: %v", rStream.Err())
}
- cb = rStream.Value()
- changes = cb.Changes
- change = changes[0]
+ change = rStream.Value()
+ changes = append(changes, change)
if !change.Continued {
t.Error("Expected change to continue the transaction")
}
- change = changes[1]
+ if !rStream.Advance() {
+ t.Error("Advance() failed: %v", rStream.Err())
+ }
+ change = rStream.Value()
+ changes = append(changes, change)
if change.Continued {
t.Error("Expected change to be the last in this transaction")
}
@@ -102,13 +106,14 @@
ws := watchtesting.WatchGlobOnPath(rootPublicID, w.WatchGlob, path, req)
rStream := ws.RecvStream()
+
// Check that watch detects the changes in the first transaction.
+ changes := []types.Change{}
if !rStream.Advance() {
t.Error("Advance() failed: %v", rStream.Err())
}
- cb := rStream.Value()
- changes := cb.Changes
- change := changes[0]
+ change := rStream.Value()
+ changes = append(changes, change)
if change.Continued {
t.Error("Expected change to be the last in this transaction")
}
@@ -120,17 +125,20 @@
commit(t, tr)
// Check that watch detects the changes in the second transaction.
+ changes = []types.Change{}
if !rStream.Advance() {
t.Error("Advance() failed: %v", rStream.Err())
}
- cb = rStream.Value()
-
- changes = cb.Changes
- change = changes[0]
+ change = rStream.Value()
+ changes = append(changes, change)
if !change.Continued {
t.Error("Expected change to continue the transaction")
}
- change = changes[1]
+ if !rStream.Advance() {
+ t.Error("Advance() failed: %v", rStream.Err())
+ }
+ change = rStream.Value()
+ changes = append(changes, change)
if change.Continued {
t.Error("Expected change to be the last in this transaction")
}
@@ -260,13 +268,11 @@
// Retrieve the resume marker for the initial state.
rStream := ws.RecvStream()
+
if !rStream.Advance() {
t.Error("Advance() failed: %v", rStream.Err())
}
- cb := rStream.Value()
-
- changes := cb.Changes
- change := changes[0]
+ change := rStream.Value()
resumeMarker1 := change.ResumeMarker
// Cancel the watch request.
@@ -278,31 +284,34 @@
ws = watchtesting.WatchRaw(rootPublicID, w.WatchRaw, req)
rStream = ws.RecvStream()
+
// Check that watch detects the changes in the state and the transaction.
+ changes := []types.Change{}
if !rStream.Advance() {
t.Error("Advance() failed: %v", rStream.Err())
}
- cb = rStream.Value()
-
- changes = cb.Changes
- change = changes[0]
+ change = rStream.Value()
+ changes = append(changes, change)
if change.Continued {
t.Error("Expected change to be the last in this transaction")
}
watchtesting.ExpectMutationExists(t, changes, id1, raw.NoVersion, post11, true, "val1", watchtesting.EmptyDir)
// Check that watch detects the changes in the state and the transaction.
+ changes = []types.Change{}
if !rStream.Advance() {
t.Error("Advance() failed: %v", rStream.Err())
}
- cb = rStream.Value()
-
- changes = cb.Changes
- change = changes[0]
+ change = rStream.Value()
+ changes = append(changes, change)
if !change.Continued {
t.Error("Expected change to continue the transaction")
}
- change = changes[1]
+ if !rStream.Advance() {
+ t.Error("Advance() failed: %v", rStream.Err())
+ }
+ change = rStream.Value()
+ changes = append(changes, change)
if change.Continued {
t.Error("Expected change to be the last in this transaction")
}
@@ -346,10 +355,7 @@
if !rStream.Advance() {
t.Error("Advance() failed: %v", rStream.Err())
}
- cb := rStream.Value()
-
- changes := cb.Changes
- change := changes[0]
+ change := rStream.Value()
resumeMarker1 := change.ResumeMarker
// Cancel the watch request.
@@ -360,31 +366,35 @@
req = raw.Request{ResumeMarker: resumeMarker1}
ws = watchtesting.WatchRaw(rootPublicID, w.WatchRaw, req)
- // Check that watch detects the changes in the first and second transaction.
rStream = ws.RecvStream()
+
+ // Check that watch detects the changes in the first transaction.
+ changes := []types.Change{}
if !rStream.Advance() {
t.Error("Advance() failed: %v", rStream.Err())
}
- cb = rStream.Value()
-
- changes = cb.Changes
- change = changes[0]
+ change = rStream.Value()
+ changes = append(changes, change)
if change.Continued {
t.Error("Expected change to be the last in this transaction")
}
watchtesting.ExpectMutationExists(t, changes, id1, raw.NoVersion, post11, true, "val1", watchtesting.EmptyDir)
+ // Check that watch detects the changes in the second transaction.
+ changes = []types.Change{}
if !rStream.Advance() {
t.Error("Advance() failed: %v", rStream.Err())
}
- cb = rStream.Value()
-
- changes = cb.Changes
- change = changes[0]
+ change = rStream.Value()
+ changes = append(changes, change)
if !change.Continued {
t.Error("Expected change to continue the transaction")
}
- change = changes[1]
+ if !rStream.Advance() {
+ t.Error("Advance() failed: %v", rStream.Err())
+ }
+ change = rStream.Value()
+ changes = append(changes, change)
if change.Continued {
t.Error("Expected change to be the last in this transaction")
}
@@ -400,19 +410,23 @@
req = raw.Request{ResumeMarker: resumeMarker2}
ws = watchtesting.WatchRaw(rootPublicID, w.WatchRaw, req)
- // Check that watch detects the changes in the second transaction.
rStream = ws.RecvStream()
+
+ // Check that watch detects the changes in the second transaction.
+ changes = []types.Change{}
if !rStream.Advance() {
t.Error("Advance() failed: %v", rStream.Err())
}
- cb = rStream.Value()
-
- changes = cb.Changes
- change = changes[0]
+ change = rStream.Value()
+ changes = append(changes, change)
if !change.Continued {
t.Error("Expected change to continue the transaction")
}
- change = changes[1]
+ if !rStream.Advance() {
+ t.Error("Advance() failed: %v", rStream.Err())
+ }
+ change = rStream.Value()
+ changes = append(changes, change)
if change.Continued {
t.Error("Expected change to be the last in this transaction")
}
@@ -464,24 +478,24 @@
if !rStream.Advance() {
t.Error("Advance() failed: %v", rStream.Err())
}
- cb := rStream.Value()
-
- changes := cb.Changes
- change := changes[0]
+ change := rStream.Value()
watchtesting.ExpectInitialStateSkipped(t, change)
// Check that watch detects the changes in the third transaction.
+ changes := []types.Change{}
if !rStream.Advance() {
t.Error("Advance() failed: %v", rStream.Err())
}
- cb = rStream.Value()
-
- changes = cb.Changes
- change = changes[0]
+ change = rStream.Value()
+ changes = append(changes, change)
if !change.Continued {
t.Error("Expected change to continue the transaction")
}
- change = changes[1]
+ if !rStream.Advance() {
+ t.Error("Advance() failed: %v", rStream.Err())
+ }
+ change = rStream.Value()
+ changes = append(changes, change)
if change.Continued {
t.Error("Expected change to be the last in this transaction")
}
@@ -551,10 +565,7 @@
if !rStream.Advance() {
t.Error("Advance() failed: %v", rStream.Err())
}
- cb := rStream.Value()
-
- changes := cb.Changes
- change := changes[0]
+ change := rStream.Value()
// Save the ResumeMarker of the change.
r := change.ResumeMarker
@@ -565,10 +576,7 @@
if !rStream.Advance() {
t.Error("Advance() failed: %v", rStream.Err())
}
- cb = rStream.Value()
-
- changes = cb.Changes
- change = changes[0]
+ change = rStream.Value()
// Expect the same ResumeMarker.
if !bytes.Equal(r, change.ResumeMarker) {
t.Error("Inconsistent ResumeMarker.")
diff --git a/services/store/raw/service.vdl b/services/store/raw/service.vdl
index 00b6eaa..4d18639 100644
--- a/services/store/raw/service.vdl
+++ b/services/store/raw/service.vdl
@@ -70,7 +70,7 @@
// via the Watcher interface, and committed via PutMutation.
type Store interface {
// Watch returns a stream of all changes.
- Watch(Req Request) stream<_, types.ChangeBatch> error
+ Watch(Req Request) stream<_, types.Change> error
// PutMutations atomically commits a stream of Mutations when the stream is
// closed. Mutations are not committed if the request is cancelled before
diff --git a/services/store/raw/service.vdl.go b/services/store/raw/service.vdl.go
index 213f083..5e75600 100644
--- a/services/store/raw/service.vdl.go
+++ b/services/store/raw/service.vdl.go
@@ -115,7 +115,7 @@
// Value returns the element that was staged by Advance.
// Value may panic if Advance returned false or was not
// called at all. Value does not block.
- Value() types.ChangeBatch
+ Value() types.Change
// Err returns a non-nil error iff the stream encountered
// any errors. Err does not block.
@@ -143,17 +143,17 @@
type implStoreWatchStreamIterator struct {
clientCall _gen_ipc.Call
- val types.ChangeBatch
+ val types.Change
err error
}
func (c *implStoreWatchStreamIterator) Advance() bool {
- c.val = types.ChangeBatch{}
+ c.val = types.Change{}
c.err = c.clientCall.Recv(&c.val)
return c.err == nil
}
-func (c *implStoreWatchStreamIterator) Value() types.ChangeBatch {
+func (c *implStoreWatchStreamIterator) Value() types.Change {
return c.val
}
@@ -172,7 +172,7 @@
func (c *implStoreWatchCall) RecvStream() interface {
Advance() bool
- Value() types.ChangeBatch
+ Value() types.Change
Err() error
} {
return &c.readStream
@@ -193,7 +193,7 @@
serverCall _gen_ipc.ServerCall
}
-func (s *implStoreServiceWatchStreamSender) Send(item types.ChangeBatch) error {
+func (s *implStoreServiceWatchStreamSender) Send(item types.Change) error {
return s.serverCall.Send(item)
}
@@ -204,7 +204,7 @@
SendStream() interface {
// Send places the item onto the output stream, blocking if there is no buffer
// space available. If the client has canceled, an error is returned.
- Send(item types.ChangeBatch) error
+ Send(item types.Change) error
}
}
@@ -216,7 +216,7 @@
func (s *implStoreServiceWatchStream) SendStream() interface {
// Send places the item onto the output stream, blocking if there is no buffer
// space available. If the client has canceled, an error is returned.
- Send(item types.ChangeBatch) error
+ Send(item types.Change) error
} {
return &s.writer
}
@@ -497,7 +497,7 @@
OutArgs: []_gen_ipc.MethodArgument{
{Name: "", Type: 68},
},
- InStream: 77,
+ InStream: 75,
}
result.Methods["Watch"] = _gen_ipc.MethodSignature{
InArgs: []_gen_ipc.MethodArgument{
@@ -507,7 +507,7 @@
{Name: "", Type: 68},
},
- OutStream: 72,
+ OutStream: 70,
}
result.TypeDefs = []_gen_vdlutil.Any{
@@ -525,25 +525,20 @@
_gen_wiretype.FieldType{Type: 0x2, Name: "Continued"},
},
"veyron2/services/watch/types.Change", []string(nil)},
- _gen_wiretype.SliceType{Elem: 0x46, Name: "", Tags: []string(nil)}, _gen_wiretype.StructType{
- []_gen_wiretype.FieldType{
- _gen_wiretype.FieldType{Type: 0x47, Name: "Changes"},
- },
- "veyron2/services/watch/types.ChangeBatch", []string(nil)},
_gen_wiretype.ArrayType{Elem: 0x41, Len: 0x10, Name: "veyron2/storage.ID", Tags: []string(nil)}, _gen_wiretype.NamedPrimitiveType{Type: 0x35, Name: "veyron/services/store/raw.Version", Tags: []string(nil)}, _gen_wiretype.StructType{
[]_gen_wiretype.FieldType{
_gen_wiretype.FieldType{Type: 0x3, Name: "Name"},
- _gen_wiretype.FieldType{Type: 0x49, Name: "ID"},
+ _gen_wiretype.FieldType{Type: 0x47, Name: "ID"},
},
"veyron/services/store/raw.DEntry", []string(nil)},
- _gen_wiretype.SliceType{Elem: 0x4b, Name: "", Tags: []string(nil)}, _gen_wiretype.StructType{
+ _gen_wiretype.SliceType{Elem: 0x49, Name: "", Tags: []string(nil)}, _gen_wiretype.StructType{
[]_gen_wiretype.FieldType{
- _gen_wiretype.FieldType{Type: 0x49, Name: "ID"},
- _gen_wiretype.FieldType{Type: 0x4a, Name: "PriorVersion"},
- _gen_wiretype.FieldType{Type: 0x4a, Name: "Version"},
+ _gen_wiretype.FieldType{Type: 0x47, Name: "ID"},
+ _gen_wiretype.FieldType{Type: 0x48, Name: "PriorVersion"},
+ _gen_wiretype.FieldType{Type: 0x48, Name: "Version"},
_gen_wiretype.FieldType{Type: 0x2, Name: "IsRoot"},
_gen_wiretype.FieldType{Type: 0x45, Name: "Value"},
- _gen_wiretype.FieldType{Type: 0x4c, Name: "Dir"},
+ _gen_wiretype.FieldType{Type: 0x4a, Name: "Dir"},
},
"veyron/services/store/raw.Mutation", []string(nil)},
}
diff --git a/services/store/server/server_test.go b/services/store/server/server_test.go
index f873c9f..b933b66 100644
--- a/services/store/server/server_test.go
+++ b/services/store/server/server_test.go
@@ -298,12 +298,12 @@
rStream := ws.RecvStream()
// Check that watch detects the changes in the first transaction.
{
+ changes := []types.Change{}
if !rStream.Advance() {
t.Error("Advance() failed: %v", rStream.Err())
}
- cb := rStream.Value()
- changes := cb.Changes
- change := changes[0]
+ change := rStream.Value()
+ changes = append(changes, change)
if change.Continued {
t.Error("Expected change to be the last in this transaction")
}
@@ -329,16 +329,20 @@
// Check that watch detects the changes in the second transaction.
{
+ changes := []types.Change{}
if !rStream.Advance() {
t.Error("Advance() failed: %v", rStream.Err())
}
- cb := rStream.Value()
- changes := cb.Changes
- change := changes[0]
+ change := rStream.Value()
+ changes = append(changes, change)
if !change.Continued {
t.Error("Expected change to continue the transaction")
}
- change = changes[1]
+ if !rStream.Advance() {
+ t.Error("Advance() failed: %v", rStream.Err())
+ }
+ change = rStream.Value()
+ changes = append(changes, change)
if change.Continued {
t.Error("Expected change to be the last in this transaction")
}
@@ -382,12 +386,12 @@
rStream2 := ws2.RecvStream()
// The watch on / should send a change on /.
{
+ changes := []types.Change{}
if !rStream1.Advance() {
t.Error("Advance() failed: %v", rStream1.Err())
}
- cb := rStream1.Value()
- changes := cb.Changes
- change := changes[0]
+ change := rStream1.Value()
+ changes = append(changes, change)
if change.Continued {
t.Error("Expected change to be the last in this transaction")
}
@@ -414,16 +418,20 @@
// The watch on / should send changes on / and /a.
{
+ changes := []types.Change{}
if !rStream1.Advance() {
t.Error("Advance() failed: %v", rStream1.Err())
}
- cb := rStream1.Value()
- changes := cb.Changes
- change := changes[0]
+ change := rStream1.Value()
+ changes = append(changes, change)
if !change.Continued {
t.Error("Expected change to continue the transaction")
}
- change = changes[1]
+ if !rStream1.Advance() {
+ t.Error("Advance() failed: %v", rStream1.Err())
+ }
+ change = rStream1.Value()
+ changes = append(changes, change)
if change.Continued {
t.Error("Expected change to be the last in this transaction")
}
@@ -432,12 +440,12 @@
}
// The watch on /a should send a change on /a.
{
+ changes := []types.Change{}
if !rStream2.Advance() {
t.Error("Advance() failed: %v", rStream2.Err())
}
- cb := rStream2.Value()
- changes := cb.Changes
- change := changes[0]
+ change := rStream2.Value()
+ changes = append(changes, change)
if change.Continued {
t.Error("Expected change to be the last in this transaction")
}
@@ -476,12 +484,12 @@
rStream := ws.RecvStream()
// Check that watch detects the changes in the first transaction.
{
+ changes := []types.Change{}
if !rStream.Advance() {
t.Error("Advance() failed: %v", rStream.Err())
}
- cb := rStream.Value()
- changes := cb.Changes
- change := changes[0]
+ change := rStream.Value()
+ changes = append(changes, change)
if change.Continued {
t.Error("Expected change to be the last in this transaction")
}
@@ -507,16 +515,20 @@
// Check that watch detects the changes in the second transaction.
{
+ changes := []types.Change{}
if !rStream.Advance() {
t.Error("Advance() failed: %v", rStream.Err())
}
- cb := rStream.Value()
- changes := cb.Changes
- change := changes[0]
+ change := rStream.Value()
+ changes = append(changes, change)
if !change.Continued {
t.Error("Expected change to continue the transaction")
}
- change = changes[1]
+ if !rStream.Advance() {
+ t.Error("Advance() failed: %v", rStream.Err())
+ }
+ change = rStream.Value()
+ changes = append(changes, change)
if change.Continued {
t.Error("Expected change to be the last in this transaction")
}
@@ -537,12 +549,12 @@
// Check that watch detects the changes in the third transaction.
{
+ changes := []types.Change{}
if !rStream.Advance() {
t.Error("Advance() failed: %v", rStream.Err())
}
- cb := rStream.Value()
- changes := cb.Changes
- change := changes[0]
+ change := rStream.Value()
+ changes = append(changes, change)
if change.Continued {
t.Error("Expected change to be the last in this transaction")
}
@@ -551,12 +563,12 @@
// Check that watch detects the garbage collection of /a.
{
+ changes := []types.Change{}
if !rStream.Advance() {
t.Error("Advance() failed: %v", rStream.Err())
}
- cb := rStream.Value()
- changes := cb.Changes
- change := changes[0]
+ change := rStream.Value()
+ changes = append(changes, change)
if change.Continued {
t.Error("Expected change to be the last in this transaction")
}
diff --git a/services/syncgroup/syncgroup.vdl.go b/services/syncgroup/syncgroup.vdl.go
index 962584c..78c38c5 100644
--- a/services/syncgroup/syncgroup.vdl.go
+++ b/services/syncgroup/syncgroup.vdl.go
@@ -496,14 +496,9 @@
}
result.TypeDefs = []_gen_vdlutil.Any{
- _gen_wiretype.NamedPrimitiveType{Type: 0x1, Name: "anydata", Tags: []string(nil)}, _gen_wiretype.MapType{Key: 0x3, Elem: 0x41, Name: "", Tags: []string(nil)}, _gen_wiretype.NamedPrimitiveType{Type: 0x3, Name: "veyron2/security.BlessingPattern", Tags: []string(nil)}, _gen_wiretype.NamedPrimitiveType{Type: 0x34, Name: "veyron2/security.LabelSet", Tags: []string(nil)}, _gen_wiretype.MapType{Key: 0x43, Elem: 0x44, Name: "", Tags: []string(nil)}, _gen_wiretype.StructType{
+ _gen_wiretype.NamedPrimitiveType{Type: 0x1, Name: "anydata", Tags: []string(nil)}, _gen_wiretype.MapType{Key: 0x3, Elem: 0x41, Name: "", Tags: []string(nil)}, _gen_wiretype.NamedPrimitiveType{Type: 0x3, Name: "veyron2/security.BlessingPattern", Tags: []string(nil)}, _gen_wiretype.NamedPrimitiveType{Type: 0x34, Name: "veyron2/security.LabelSet", Tags: []string(nil)}, _gen_wiretype.MapType{Key: 0x43, Elem: 0x44, Name: "", Tags: []string(nil)}, _gen_wiretype.MapType{Key: 0x3, Elem: 0x44, Name: "", Tags: []string(nil)}, _gen_wiretype.StructType{
[]_gen_wiretype.FieldType{
- _gen_wiretype.FieldType{Type: 0x45, Name: "Principals"},
- },
- "veyron2/security.Entries", []string(nil)},
- _gen_wiretype.StructType{
- []_gen_wiretype.FieldType{
- _gen_wiretype.FieldType{Type: 0x46, Name: "In"},
+ _gen_wiretype.FieldType{Type: 0x45, Name: "In"},
_gen_wiretype.FieldType{Type: 0x46, Name: "NotIn"},
},
"veyron2/security.ACL", []string(nil)},
diff --git a/services/wsprd/identity/identity_test.go b/services/wsprd/identity/identity_test.go
index 0ea8f0e..f1205a7 100644
--- a/services/wsprd/identity/identity_test.go
+++ b/services/wsprd/identity/identity_test.go
@@ -79,7 +79,7 @@
manager.AddAccount(googleAccount2, createChain(r, googleAccount2))
manager.AddAccount(facebookAccount, createChain(r, facebookAccount))
- result := manager.AccountsMatching(security.BlessingPattern(topLevelName + "/google/*"))
+ result := manager.AccountsMatching(security.BlessingPattern(topLevelName + "/google/..."))
sort.StringSlice(result).Sort()
expected := []string{googleAccount1, googleAccount2}
if !reflect.DeepEqual(result, expected) {
diff --git a/services/wsprd/ipc/server/server.go b/services/wsprd/ipc/server/server.go
index f0e78b6..b63f5e7 100644
--- a/services/wsprd/ipc/server/server.go
+++ b/services/wsprd/ipc/server/server.go
@@ -177,10 +177,9 @@
if s.dispatcher == nil {
s.dispatcher = newDispatcher(invoker,
- vsecurity.NewACLAuthorizer(vsecurity.NewWhitelistACL(
- map[security.BlessingPattern]security.LabelSet{
- security.AllPrincipals: security.AllLabels,
- })))
+ vsecurity.NewACLAuthorizer(security.ACL{In: map[security.BlessingPattern]security.LabelSet{
+ security.AllPrincipals: security.AllLabels,
+ }}))
}
if s.endpoint == "" {
diff --git a/services/wsprd/lib/remove_this.go b/services/wsprd/lib/remove_this.go
index 7625014..32be65d 100644
--- a/services/wsprd/lib/remove_this.go
+++ b/services/wsprd/lib/remove_this.go
@@ -17,7 +17,6 @@
vom.Register(store.QueryResult{})
vom.Register(watchtypes.GlobRequest{})
vom.Register(watchtypes.QueryRequest{})
- vom.Register(watchtypes.ChangeBatch{})
vom.Register(watchtypes.Change{})
vom.Register(rps.GameOptions{})
vom.Register(rps.GameID{})
diff --git a/services/wsprd/wspr/wspr_test.go b/services/wsprd/wspr/wspr_test.go
index 9c3af33..e84b1f6 100644
--- a/services/wsprd/wspr/wspr_test.go
+++ b/services/wsprd/wspr/wspr_test.go
@@ -124,7 +124,7 @@
}
// Verify that idManager has both accounts
- gotAccounts = wspr.idManager.AccountsMatching(security.BlessingPattern(topLevelName + "/*"))
+ gotAccounts = wspr.idManager.AccountsMatching(security.BlessingPattern(fmt.Sprintf("%s%s%v", topLevelName, security.ChainSeparator, security.AllPrincipals)))
if len(gotAccounts) != 2 {
t.Fatalf("Expected to have 2 accounts, but got %v: %v", len(gotAccounts), gotAccounts)
}
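Assuming security.ChainSeparator is "/" and security.AllPrincipals is the "..." pattern used elsewhere in this change, the Sprintf form above builds the same pattern as the literal used in identity_test.go:

// Both forms yield the pattern topLevelName + "/..." under the assumptions above.
p1 := security.BlessingPattern(fmt.Sprintf("%s%s%v", topLevelName, security.ChainSeparator, security.AllPrincipals))
p2 := security.BlessingPattern(topLevelName + "/...")
// p1 == p2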