Merge "store: implementing the trie and replacing the memstore impl."
diff --git a/cmd/mounttable/impl.go b/cmd/mounttable/impl.go
index 3653e16..4aa0dbc 100644
--- a/cmd/mounttable/impl.go
+++ b/cmd/mounttable/impl.go
@@ -61,7 +61,7 @@
name, pattern := args[0], args[1]
client := v23.GetClient(ctx)
- call, err := client.StartCall(ctx, name, rpc.GlobMethod, []interface{}{pattern}, options.NoResolve{})
+ call, err := client.StartCall(ctx, name, rpc.GlobMethod, []interface{}{pattern}, options.Preresolved{})
if err != nil {
return err
}
@@ -134,7 +134,7 @@
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
client := v23.GetClient(ctx)
- if err := client.Call(ctx, name, "Mount", []interface{}{server, seconds, flags}, nil, options.NoResolve{}); err != nil {
+ if err := client.Call(ctx, name, "Mount", []interface{}{server, seconds, flags}, nil, options.Preresolved{}); err != nil {
return err
}
fmt.Fprintln(env.Stdout, "Name mounted successfully.")
@@ -160,7 +160,7 @@
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
client := v23.GetClient(ctx)
- if err := client.Call(ctx, args[0], "Unmount", []interface{}{args[1]}, nil, options.NoResolve{}); err != nil {
+ if err := client.Call(ctx, args[0], "Unmount", []interface{}{args[1]}, nil, options.Preresolved{}); err != nil {
return err
}
fmt.Fprintln(env.Stdout, "Unmount successful or name not mounted.")
@@ -186,7 +186,7 @@
defer cancel()
client := v23.GetClient(ctx)
var entry naming.MountEntry
- if err := client.Call(ctx, args[0], "ResolveStep", nil, []interface{}{&entry}, options.NoResolve{}); err != nil {
+ if err := client.Call(ctx, args[0], "ResolveStep", nil, []interface{}{&entry}, options.Preresolved{}); err != nil {
return err
}
fmt.Fprintf(env.Stdout, "Servers: %v Suffix: %q MT: %v\n", entry.Servers, entry.Name, entry.ServesMountTable)
diff --git a/cmd/mounttable/impl_test.go b/cmd/mounttable/impl_test.go
index fad262e..8786897 100644
--- a/cmd/mounttable/impl_test.go
+++ b/cmd/mounttable/impl_test.go
@@ -11,8 +11,6 @@
"testing"
"time"
- "v.io/x/lib/cmdline"
-
"v.io/v23"
"v.io/v23/context"
"v.io/v23/glob"
@@ -22,9 +20,8 @@
"v.io/v23/security/access"
"v.io/v23/services/mounttable"
vdltime "v.io/v23/vdlroot/time"
-
+ "v.io/x/lib/cmdline"
"v.io/x/ref/lib/v23cmd"
- "v.io/x/ref/lib/xrpc"
_ "v.io/x/ref/runtime/factories/generic"
"v.io/x/ref/test"
)
@@ -109,7 +106,7 @@
ctx, shutdown := test.V23Init()
defer shutdown()
- server, err := xrpc.NewDispatchingServer(ctx, "", new(dispatcher))
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, "", new(dispatcher))
if err != nil {
t.Fatalf("NewServer failed: %v", err)
}
diff --git a/cmd/principal/main.go b/cmd/principal/main.go
index e1eaadf..0c259be 100644
--- a/cmd/principal/main.go
+++ b/cmd/principal/main.go
@@ -31,7 +31,6 @@
"v.io/x/ref"
vsecurity "v.io/x/ref/lib/security"
"v.io/x/ref/lib/v23cmd"
- "v.io/x/ref/lib/xrpc"
_ "v.io/x/ref/runtime/factories/static"
)
@@ -505,11 +504,7 @@
if err != nil {
return fmt.Errorf("invalid base64 encoding of public key: %v", err)
}
- key, err := security.UnmarshalPublicKey(der)
- if err != nil {
- return fmt.Errorf("invalid DER encoding of public key: %v", err)
- }
- return p.Roots().Add(key, security.BlessingPattern(args[0]))
+ return p.Roots().Add(der, security.BlessingPattern(args[0]))
}),
}
@@ -769,7 +764,7 @@
token: base64.URLEncoding.EncodeToString(token[:]),
notify: make(chan error),
}
- server, err := xrpc.NewServer(ctx, "", service, security.AllowEveryone())
+ ctx, server, err := v23.WithNewServer(ctx, "", service, security.AllowEveryone())
if err != nil {
return fmt.Errorf("failed to create server to listen for blessings: %v", err)
}
diff --git a/cmd/sb51/internal/demodb/db.go b/cmd/sb51/internal/demodb/db.go
index 9c073c1..ba1ce45 100644
--- a/cmd/sb51/internal/demodb/db.go
+++ b/cmd/sb51/internal/demodb/db.go
@@ -111,15 +111,15 @@
},
}
-// Creates demo tables in the provided database. Tables are deleted and
+// Creates demo tables in the provided database. Tables are destroyed and
// recreated if they already exist.
func PopulateDemoDB(ctx *context.T, db nosql.Database) error {
for i, t := range demoTables {
tn := demoPrefix + t.name
- if err := db.DeleteTable(ctx, tn); err != nil {
- return fmt.Errorf("failed deleting table %s (%d/%d): %v", tn, i+1, len(demoTables), err)
+ if err := db.Table(tn).Destroy(ctx); err != nil {
+ return fmt.Errorf("failed destroying table %s (%d/%d): %v", tn, i+1, len(demoTables), err)
}
- if err := db.CreateTable(ctx, tn, nil); err != nil {
+ if err := db.Table(tn).Create(ctx, nil); err != nil {
return fmt.Errorf("failed creating table %s (%d/%d): %v", tn, i+1, len(demoTables), err)
}
if err := nosql.RunInBatch(ctx, db, wire.BatchOptions{}, func(db nosql.BatchDatabase) error {
diff --git a/cmd/servicerunner/main.go b/cmd/servicerunner/main.go
index 76e12e8..a761f1a 100644
--- a/cmd/servicerunner/main.go
+++ b/cmd/servicerunner/main.go
@@ -29,7 +29,6 @@
"v.io/x/lib/set"
"v.io/x/ref"
"v.io/x/ref/lib/signals"
- "v.io/x/ref/lib/xrpc"
"v.io/x/ref/runtime/factories/generic"
"v.io/x/ref/services/identity/identitylib"
"v.io/x/ref/services/mounttable/mounttablelib"
@@ -73,7 +72,7 @@
if err != nil {
return fmt.Errorf("mounttablelib.NewMountTableDispatcher failed: %s", err)
}
- server, err := xrpc.NewDispatchingServer(ctx, "", mt, options.ServesMountTable(true))
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, "", mt, options.ServesMountTable(true))
if err != nil {
return fmt.Errorf("root failed: %v", err)
}
diff --git a/cmd/vdl/arith_test.go b/cmd/vdl/arith_test.go
index 8cb78b7..ae84453 100644
--- a/cmd/vdl/arith_test.go
+++ b/cmd/vdl/arith_test.go
@@ -15,12 +15,12 @@
"reflect"
"testing"
+ "v.io/v23"
"v.io/v23/context"
"v.io/v23/rpc"
"v.io/v23/vdl"
"v.io/x/ref/lib/vdl/testdata/arith"
"v.io/x/ref/lib/vdl/testdata/base"
- "v.io/x/ref/lib/xrpc"
"v.io/x/ref/test"
_ "v.io/x/ref/runtime/factories/generic"
@@ -103,7 +103,7 @@
ctx, shutdown := test.V23Init()
defer shutdown()
- server, err := xrpc.NewServer(ctx, "", arith.CalculatorServer(&serverCalculator{}), nil)
+ ctx, server, err := v23.WithNewServer(ctx, "", arith.CalculatorServer(&serverCalculator{}), nil)
if err != nil {
t.Fatal(err)
}
@@ -287,7 +287,7 @@
}
for i, obj := range objects {
- server, err := xrpc.NewServer(ctx, "", obj, nil)
+ ctx, server, err := v23.WithNewServer(ctx, "", obj, nil)
if err != nil {
t.Fatalf("%d: %v", i, err)
}
diff --git a/cmd/vrpc/vrpc_test.go b/cmd/vrpc/vrpc_test.go
index d4ffcac..60fa5aa 100644
--- a/cmd/vrpc/vrpc_test.go
+++ b/cmd/vrpc/vrpc_test.go
@@ -17,7 +17,6 @@
"v.io/x/lib/vlog"
"v.io/x/ref/cmd/vrpc/internal"
"v.io/x/ref/lib/v23cmd"
- "v.io/x/ref/lib/xrpc"
_ "v.io/x/ref/runtime/factories/generic"
"v.io/x/ref/test"
)
@@ -120,7 +119,7 @@
func initTest(t *testing.T) (ctx *context.T, name string, shutdown v23.Shutdown) {
ctx, shutdown = test.V23Init()
obj := internal.TypeTesterServer(&server{})
- server, err := xrpc.NewServer(ctx, "", obj, nil)
+ ctx, server, err := v23.WithNewServer(ctx, "", obj, nil)
if err != nil {
t.Fatalf("NewServer failed: %v", err)
return
diff --git a/examples/fortune/fortuned/impl_test.go b/examples/fortune/fortuned/impl_test.go
index 248bb03..0c6048d 100644
--- a/examples/fortune/fortuned/impl_test.go
+++ b/examples/fortune/fortuned/impl_test.go
@@ -11,7 +11,6 @@
"v.io/v23/context"
"v.io/v23/security"
"v.io/x/ref/examples/fortune"
- "v.io/x/ref/lib/xrpc"
)
func TestGet(t *testing.T) {
@@ -56,7 +55,7 @@
service := fortune.FortuneServer(impl)
name := ""
- server, err := xrpc.NewServer(ctx, name, service, authorizer)
+ ctx, server, err := v23.WithNewServer(ctx, name, service, authorizer)
if err != nil {
t.Errorf("Failure creating server: %v", err)
}
diff --git a/examples/fortune/fortuned/main.go b/examples/fortune/fortuned/main.go
index deb214e..fc21e49 100644
--- a/examples/fortune/fortuned/main.go
+++ b/examples/fortune/fortuned/main.go
@@ -13,7 +13,7 @@
"v.io/v23/security"
"v.io/x/ref/examples/fortune"
"v.io/x/ref/lib/signals"
- "v.io/x/ref/lib/xrpc"
+
// The v23.Init call below will use the generic runtime factory.
_ "v.io/x/ref/runtime/factories/generic"
)
@@ -30,7 +30,7 @@
impl := newImpl()
service := fortune.FortuneServer(impl)
- server, err := xrpc.NewServer(ctx, *name, service, authorizer)
+ ctx, server, err := v23.WithNewServer(ctx, *name, service, authorizer)
if err != nil {
log.Panic("Failure creating server: ", err)
}
diff --git a/examples/rps/rpsbot/impl_test.go b/examples/rps/rpsbot/impl_test.go
index 2e93780..a43e95b 100644
--- a/examples/rps/rpsbot/impl_test.go
+++ b/examples/rps/rpsbot/impl_test.go
@@ -21,7 +21,6 @@
"v.io/v23/context"
"v.io/v23/options"
"v.io/x/ref/examples/rps"
- "v.io/x/ref/lib/xrpc"
"v.io/x/ref/services/mounttable/mounttablelib"
"v.io/x/ref/test"
"v.io/x/ref/test/modules"
@@ -37,7 +36,7 @@
if err != nil {
return fmt.Errorf("mounttablelib.NewMountTableDispatcher failed: %s", err)
}
- server, err := xrpc.NewDispatchingServer(ctx, "", mt, options.ServesMountTable(true))
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, "", mt, options.ServesMountTable(true))
if err != nil {
return fmt.Errorf("root failed: %v", err)
}
@@ -54,7 +53,7 @@
ns.SetRoots(mtAddress)
rpsService := NewRPS(ctx)
names := []string{"rps/judge/test", "rps/player/test", "rps/scorekeeper/test"}
- server, err := xrpc.NewServer(ctx, names[0], rps.RockPaperScissorsServer(rpsService), nil)
+ ctx, server, err := v23.WithNewServer(ctx, names[0], rps.RockPaperScissorsServer(rpsService), nil)
if err != nil {
t.Fatalf("NewServer failed: %v", err)
}
diff --git a/examples/rps/rpsbot/main.go b/examples/rps/rpsbot/main.go
index 86bcef5..547f0bc 100644
--- a/examples/rps/rpsbot/main.go
+++ b/examples/rps/rpsbot/main.go
@@ -16,11 +16,11 @@
"v.io/v23/context"
+ "v.io/v23"
"v.io/x/ref/examples/rps"
"v.io/x/ref/examples/rps/internal"
"v.io/x/ref/lib/signals"
"v.io/x/ref/lib/v23cmd"
- "v.io/x/ref/lib/xrpc"
_ "v.io/x/ref/runtime/factories/roaming"
)
@@ -61,7 +61,7 @@
fmt.Sprintf("rps/player/%s", name),
fmt.Sprintf("rps/scorekeeper/%s", name),
}
- server, err := xrpc.NewServer(ctx, names[0], rps.RockPaperScissorsServer(rpsService), auth)
+ ctx, server, err := v23.WithNewServer(ctx, names[0], rps.RockPaperScissorsServer(rpsService), auth)
if err != nil {
return fmt.Errorf("NewServer failed: %v", err)
}
diff --git a/examples/rps/rpsplayer/main.go b/examples/rps/rpsplayer/main.go
index ad57b4f..eaf033d 100644
--- a/examples/rps/rpsplayer/main.go
+++ b/examples/rps/rpsplayer/main.go
@@ -15,21 +15,17 @@
"sync"
"time"
- "v.io/x/lib/cmdline"
-
"v.io/v23"
"v.io/v23/context"
"v.io/v23/naming"
"v.io/v23/rpc"
"v.io/v23/security"
"v.io/v23/vtrace"
-
+ "v.io/x/lib/cmdline"
"v.io/x/ref/examples/rps"
"v.io/x/ref/examples/rps/internal"
"v.io/x/ref/internal/logger"
"v.io/x/ref/lib/v23cmd"
- "v.io/x/ref/lib/xrpc"
-
_ "v.io/x/ref/runtime/factories/roaming"
)
@@ -132,7 +128,7 @@
fullname := fmt.Sprintf("rps/player/%s", name)
service := rps.PlayerServer(&impl{ch: ch})
auth := internal.NewAuthorizer(aclFile)
- server, err := xrpc.NewServer(ctx, fullname, service, auth)
+ ctx, server, err := v23.WithNewServer(ctx, fullname, service, auth)
if err != nil {
ctx.Fatalf("NewServer failed: %v", err)
}
diff --git a/examples/rps/rpsscorekeeper/main.go b/examples/rps/rpsscorekeeper/main.go
index 6c8fd39..1a03bad 100644
--- a/examples/rps/rpsscorekeeper/main.go
+++ b/examples/rps/rpsscorekeeper/main.go
@@ -17,10 +17,10 @@
"v.io/v23/rpc"
"v.io/v23/security"
+ "v.io/v23"
"v.io/x/ref/examples/rps"
"v.io/x/ref/examples/rps/internal"
"v.io/x/ref/lib/v23cmd"
- "v.io/x/ref/lib/xrpc"
_ "v.io/x/ref/runtime/factories/roaming"
)
@@ -65,7 +65,7 @@
name := fmt.Sprintf("rps/scorekeeper/%s", hostname)
service := rps.ScoreKeeperServer(rpsService)
authorizer := internal.NewAuthorizer(aclFile)
- server, err := xrpc.NewServer(ctx, name, service, authorizer)
+ ctx, server, err := v23.WithNewServer(ctx, name, service, authorizer)
if err != nil {
return fmt.Errorf("NewServer failed: %v", err)
}
diff --git a/examples/tunnel/tunneld/main.go b/examples/tunnel/tunneld/main.go
index 5291e67..c33de73 100644
--- a/examples/tunnel/tunneld/main.go
+++ b/examples/tunnel/tunneld/main.go
@@ -14,11 +14,11 @@
"v.io/v23/context"
+ "v.io/v23"
"v.io/x/ref/examples/tunnel"
"v.io/x/ref/lib/security/securityflag"
"v.io/x/ref/lib/signals"
"v.io/x/ref/lib/v23cmd"
- "v.io/x/ref/lib/xrpc"
_ "v.io/x/ref/runtime/factories/roaming"
)
@@ -42,7 +42,7 @@
func runTunnelD(ctx *context.T, env *cmdline.Env, args []string) error {
auth := securityflag.NewAuthorizerOrDie()
- server, err := xrpc.NewServer(ctx, name, tunnel.TunnelServer(&T{}), auth)
+ ctx, server, err := v23.WithNewServer(ctx, name, tunnel.TunnelServer(&T{}), auth)
if err != nil {
return fmt.Errorf("NewServer failed: %v", err)
}
diff --git a/lib/security/blessingroots.go b/lib/security/blessingroots.go
index 5c0a7b2..a0b60ee 100644
--- a/lib/security/blessingroots.go
+++ b/lib/security/blessingroots.go
@@ -26,23 +26,15 @@
state blessingRootsState // GUARDED_BY(mu)
}
-func stateMapKey(root security.PublicKey) (string, error) {
- rootBytes, err := root.MarshalBinary()
- if err != nil {
- return "", err
- }
- return string(rootBytes), nil
-}
-
-func (br *blessingRoots) Add(root security.PublicKey, pattern security.BlessingPattern) error {
+func (br *blessingRoots) Add(root []byte, pattern security.BlessingPattern) error {
if pattern == security.AllPrincipals {
return verror.New(errRootsAddPattern, nil)
}
- key, err := stateMapKey(root)
- if err != nil {
+ // Sanity check to avoid invalid keys being added.
+ if _, err := security.UnmarshalPublicKey(root); err != nil {
return err
}
-
+ key := string(root)
br.mu.Lock()
defer br.mu.Unlock()
patterns := br.state[key]
@@ -61,20 +53,23 @@
return nil
}
-func (br *blessingRoots) Recognized(root security.PublicKey, blessing string) error {
- key, err := stateMapKey(root)
- if err != nil {
- return err
- }
-
+func (br *blessingRoots) Recognized(root []byte, blessing string) error {
+ key := string(root)
br.mu.RLock()
- defer br.mu.RUnlock()
for _, p := range br.state[key] {
if p.MatchedBy(blessing) {
+ br.mu.RUnlock()
return nil
}
}
- return security.NewErrUnrecognizedRoot(nil, root.String(), nil)
+ br.mu.RUnlock()
+ // Silly to have to unmarshal the public key on an error.
+ // Change the error message to not require that?
+ obj, err := security.UnmarshalPublicKey(root)
+ if err != nil {
+ return err
+ }
+ return security.NewErrUnrecognizedRoot(nil, obj.String(), nil)
}
func (br *blessingRoots) Dump() map[security.BlessingPattern][]security.PublicKey {
diff --git a/lib/security/blessingroots_test.go b/lib/security/blessingroots_test.go
index ad6afd5..47492e5 100644
--- a/lib/security/blessingroots_test.go
+++ b/lib/security/blessingroots_test.go
@@ -16,15 +16,20 @@
"v.io/v23/verror"
)
-type rootsTester [4]security.PublicKey
+type rootsTester [4][]byte
func newRootsTester() *rootsTester {
var tester rootsTester
- var err error
for idx := range tester {
- if tester[idx], _, err = NewPrincipalKey(); err != nil {
+ key, _, err := NewPrincipalKey()
+ if err != nil {
panic(err)
}
+ keybytes, err := key.MarshalBinary()
+ if err != nil {
+ panic(err)
+ }
+ tester[idx] = keybytes
}
return &tester
}
@@ -34,7 +39,7 @@
return fmt.Errorf("Add( , %v) succeeded, expected it to fail", security.AllPrincipals)
}
testdata := []struct {
- root security.PublicKey
+ root []byte
pattern security.BlessingPattern
}{
{t[0], "vanadium"},
@@ -52,7 +57,7 @@
func (t *rootsTester) testRecognized(br security.BlessingRoots) error {
testdata := []struct {
- root security.PublicKey
+ root []byte
recognized []string
notRecognized []string
}{
@@ -99,10 +104,17 @@
func (s pubKeySorter) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (t *rootsTester) testDump(br security.BlessingRoots) error {
+ object := func(data []byte) security.PublicKey {
+ ret, err := security.UnmarshalPublicKey(data)
+ if err != nil {
+ panic(err)
+ }
+ return ret
+ }
want := map[security.BlessingPattern][]security.PublicKey{
- "google/foo": []security.PublicKey{t[1], t[2]},
- "google/$": []security.PublicKey{t[0]},
- "vanadium": []security.PublicKey{t[0]},
+ "google/foo": []security.PublicKey{object(t[1]), object(t[2])},
+ "google/$": []security.PublicKey{object(t[0])},
+ "vanadium": []security.PublicKey{object(t[0])},
}
got := br.Dump()
sort.Sort(pubKeySorter(want["google/foo"]))
diff --git a/lib/security/prepare_discharges_test.go b/lib/security/prepare_discharges_test.go
index 292c58e..b7059e2 100644
--- a/lib/security/prepare_discharges_test.go
+++ b/lib/security/prepare_discharges_test.go
@@ -14,7 +14,6 @@
"v.io/v23/rpc"
"v.io/v23/security"
securitylib "v.io/x/ref/lib/security"
- "v.io/x/ref/lib/xrpc"
_ "v.io/x/ref/runtime/factories/generic"
"v.io/x/ref/test"
"v.io/x/ref/test/testutil"
@@ -91,7 +90,7 @@
tpid := tpcav.ThirdPartyDetails().ID()
v23.GetPrincipal(dctx)
- _, err = xrpc.NewServer(dctx,
+ dctx, _, err = v23.WithNewServer(dctx,
"discharger",
&expiryDischarger{},
security.AllowEveryone())
diff --git a/lib/signals/signals_test.go b/lib/signals/signals_test.go
index 46bbefb..ae779e6 100644
--- a/lib/signals/signals_test.go
+++ b/lib/signals/signals_test.go
@@ -21,7 +21,6 @@
"v.io/v23/vtrace"
"v.io/x/ref/lib/mgmt"
"v.io/x/ref/lib/security/securityflag"
- "v.io/x/ref/lib/xrpc"
"v.io/x/ref/services/device"
"v.io/x/ref/test"
"v.io/x/ref/test/modules"
@@ -349,7 +348,7 @@
defer sh.Cleanup(os.Stderr, os.Stderr)
ch := make(chan string)
- server, err := xrpc.NewServer(ctx, "", device.ConfigServer(&configServer{ch}), securityflag.NewAuthorizerOrDie())
+ ctx, server, err := v23.WithNewServer(ctx, "", device.ConfigServer(&configServer{ch}), securityflag.NewAuthorizerOrDie())
if err != nil {
t.Fatalf("Got error: %v", err)
}
diff --git a/lib/xrpc/xserver.go b/lib/xrpc/xserver.go
index 971d1aa..b06f7e5 100644
--- a/lib/xrpc/xserver.go
+++ b/lib/xrpc/xserver.go
@@ -14,7 +14,7 @@
)
type server struct {
- s rpc.Server
+ s rpc.DeprecatedServer
}
// NewServer creates a new Server instance to serve a service object.
@@ -35,7 +35,7 @@
// Invoker interface, the Invoker is used to invoke methods directly,
// without reflection. If name is an empty string, no attempt will
// made to publish.
-func NewServer(ctx *context.T, name string, object interface{}, auth security.Authorizer, opts ...rpc.ServerOpt) (rpc.XServer, error) {
+func NewServer(ctx *context.T, name string, object interface{}, auth security.Authorizer, opts ...rpc.ServerOpt) (rpc.Server, error) {
s, err := v23.NewServer(ctx, opts...)
if err != nil {
return nil, err
@@ -68,7 +68,7 @@
// method which will in turn return the object and security.Authorizer
// used to serve the actual RPC call. If name is an empty string, no
// attempt will made to publish that name to a mount table.
-func NewDispatchingServer(ctx *context.T, name string, disp rpc.Dispatcher, opts ...rpc.ServerOpt) (rpc.XServer, error) {
+func NewDispatchingServer(ctx *context.T, name string, disp rpc.Dispatcher, opts ...rpc.ServerOpt) (rpc.Server, error) {
s, err := v23.NewServer(ctx, opts...)
if err != nil {
return nil, err
diff --git a/lib/xrpc/xserver_test.go b/lib/xrpc/xserver_test.go
index 95b5e4b..b799fb4 100644
--- a/lib/xrpc/xserver_test.go
+++ b/lib/xrpc/xserver_test.go
@@ -11,7 +11,6 @@
"v.io/v23/context"
"v.io/v23/rpc"
"v.io/v23/security"
- "v.io/x/ref/lib/xrpc"
_ "v.io/x/ref/runtime/factories/generic"
"v.io/x/ref/test"
)
@@ -30,7 +29,7 @@
ctx, shutdown := test.V23Init()
defer shutdown()
- server, err := xrpc.NewServer(ctx, "", &service{}, nil)
+ ctx, server, err := v23.WithNewServer(ctx, "", &service{}, nil)
if err != nil {
t.Fatalf("Error creating server: %v", err)
}
@@ -55,7 +54,7 @@
ctx, shutdown := test.V23Init()
defer shutdown()
- server, err := xrpc.NewDispatchingServer(ctx, "", &dispatcher{})
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, "", &dispatcher{})
if err != nil {
t.Fatalf("Error creating server: %v", err)
}
diff --git a/runtime/factories/fake/rpc.go b/runtime/factories/fake/rpc.go
index 903348f..786b021 100644
--- a/runtime/factories/fake/rpc.go
+++ b/runtime/factories/fake/rpc.go
@@ -26,7 +26,7 @@
return c
}
-func (r *Runtime) NewServer(ctx *context.T, opts ...rpc.ServerOpt) (rpc.Server, error) {
+func (r *Runtime) NewServer(ctx *context.T, opts ...rpc.ServerOpt) (rpc.DeprecatedServer, error) {
defer apilog.LogCallf(ctx, "opts...=%v", opts)(ctx, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
panic("unimplemented")
}
@@ -62,12 +62,12 @@
panic("unimplemented")
}
-func (r *Runtime) XWithNewServer(ctx *context.T, name string, object interface{}, auth security.Authorizer, opts ...rpc.ServerOpt) (*context.T, rpc.XServer, error) {
+func (r *Runtime) WithNewServer(ctx *context.T, name string, object interface{}, auth security.Authorizer, opts ...rpc.ServerOpt) (*context.T, rpc.Server, error) {
defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
panic("unimplemented")
}
-func (r *Runtime) XWithNewDispatchingServer(ctx *context.T, name string, disp rpc.Dispatcher, opts ...rpc.ServerOpt) (*context.T, rpc.XServer, error) {
+func (r *Runtime) WithNewDispatchingServer(ctx *context.T, name string, disp rpc.Dispatcher, opts ...rpc.ServerOpt) (*context.T, rpc.Server, error) {
defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
panic("unimplemented")
}
diff --git a/runtime/factories/roaming/roaming_server.go b/runtime/factories/roaming/roaming_server.go
index c87d37a..b8e5170 100644
--- a/runtime/factories/roaming/roaming_server.go
+++ b/runtime/factories/roaming/roaming_server.go
@@ -11,7 +11,6 @@
"v.io/v23"
"v.io/v23/rpc"
- "v.io/x/ref/lib/xrpc"
_ "v.io/x/ref/runtime/factories/roaming"
)
@@ -19,7 +18,7 @@
ctx, shutdown := v23.Init()
defer shutdown()
- server, err := xrpc.NewServer(ctx, "roamer", &dummy{}, nil)
+ ctx, server, err := v23.WithNewServer(ctx, "roamer", &dummy{}, nil)
if err != nil {
ctx.Fatalf("unexpected error: %q", err)
}
diff --git a/runtime/internal/discovery/advertise.go b/runtime/internal/discovery/advertise.go
index 1422760..f8f156b 100644
--- a/runtime/internal/discovery/advertise.go
+++ b/runtime/internal/discovery/advertise.go
@@ -11,6 +11,23 @@
)
// Advertise implements discovery.Advertiser.
+//
+// TODO(jhahn): Handle ACL.
func (ds *ds) Advertise(ctx *context.T, service discovery.Service, perms access.Permissions) error {
+ if len(service.InstanceUuid) == 0 {
+ service.InstanceUuid = NewInstanceUUID()
+ }
+ ad := &Advertisement{
+ ServiceUuid: NewServiceUUID(service.InterfaceName),
+ Service: service,
+ }
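+ // Each plugin keeps advertising until ctx is canceled; if any plugin fails to start, cancel so the others stop too.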
+ ctx, cancel := context.WithCancel(ctx)
+ for _, plugin := range ds.plugins {
+ err := plugin.Advertise(ctx, ad)
+ if err != nil {
+ cancel()
+ return err
+ }
+ }
return nil
}
diff --git a/runtime/internal/discovery/discovery.go b/runtime/internal/discovery/discovery.go
index 53501d7..79329ec 100644
--- a/runtime/internal/discovery/discovery.go
+++ b/runtime/internal/discovery/discovery.go
@@ -4,6 +4,30 @@
package discovery
+import (
+ "github.com/pborman/uuid"
+
+ "v.io/v23/discovery"
+)
+
// ds is an implementation of discovery.T.
type ds struct {
+ plugins []Plugin
+}
+
+// Advertisement holds a set of service properties to advertise.
+type Advertisement struct {
+ discovery.Service
+
+ // The service UUID to advertise.
+ ServiceUuid uuid.UUID
+
+ // TODO(jhahn): Add proximity.
+}
+
+// TODO(jhahn): Need a better API.
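+// New returns a discovery.T implementation that advertises and scans through the given plugins.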
+func New(plugins []Plugin) discovery.T {
+ ds := &ds{plugins: make([]Plugin, len(plugins))}
+ copy(ds.plugins, plugins)
+ return ds
}
diff --git a/runtime/internal/discovery/discovery_test.go b/runtime/internal/discovery/discovery_test.go
new file mode 100644
index 0000000..6a4b248
--- /dev/null
+++ b/runtime/internal/discovery/discovery_test.go
@@ -0,0 +1,145 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package discovery_test
+
+import (
+ "reflect"
+ "testing"
+ "time"
+
+ "v.io/v23/context"
+ "v.io/v23/discovery"
+
+ idiscovery "v.io/x/ref/runtime/internal/discovery"
+ "v.io/x/ref/runtime/internal/discovery/plugins/mock"
+)
+
+func TestBasic(t *testing.T) {
+ ds := idiscovery.New([]idiscovery.Plugin{mock.New()})
+ services := []discovery.Service{
+ {
+ InstanceUuid: idiscovery.NewInstanceUUID(),
+ InterfaceName: "v.io/v23/a",
+ Addrs: []string{"/h1:123/x", "/h2:123/y"},
+ },
+ {
+ InstanceUuid: idiscovery.NewInstanceUUID(),
+ InterfaceName: "v.io/v23/b",
+ Addrs: []string{"/h1:123/x", "/h2:123/z"},
+ },
+ }
+ var stops []func()
+ for _, service := range services {
+ stop, err := advertise(ds, service)
+ if err != nil {
+ t.Fatalf("Advertise failed: %v\n", err)
+ }
+ stops = append(stops, stop)
+ }
+
+ updates, err := scan(ds, "v.io/v23/a")
+ if err != nil {
+ t.Fatalf("Scan failed: %v\n", err)
+ }
+ if !match(updates, services[0]) {
+ t.Errorf("Scan failed; got %v, but wanted %v\n", updates, services[0])
+ }
+ updates, err = scan(ds, "v.io/v23/b")
+ if err != nil {
+ t.Fatalf("Scan failed: %v\n", err)
+ }
+ if !match(updates, services[1]) {
+ t.Errorf("Scan failed; got %v, but wanted %v\n", updates, services[1])
+ }
+ updates, err = scan(ds, "")
+ if err != nil {
+ t.Fatalf("Scan failed: %v\n", err)
+ }
+ if !match(updates, services...) {
+ t.Errorf("Scan failed; got %v, but wanted %v\n", updates, services)
+ }
+ updates, err = scan(ds, "v.io/v23/c")
+ if err != nil {
+ t.Fatalf("Scan failed: %v\n", err)
+ }
+ if !match(updates) {
+ t.Errorf("Scan failed; got %v, but wanted %v\n", updates, nil)
+ }
+
+ // Stop advertising the first service. Shouldn't affect the other.
+ stops[0]()
+ updates, err = scan(ds, "v.io/v23/a")
+ if err != nil {
+ t.Fatalf("Scan failed: %v\n", err)
+ }
+ if !match(updates) {
+ t.Errorf("Scan failed; got %v, but wanted %v\n", updates, nil)
+ }
+ updates, err = scan(ds, "v.io/v23/b")
+ if err != nil {
+ t.Fatalf("Scan failed: %v\n", err)
+ }
+ if !match(updates, services[1]) {
+ t.Errorf("Scan failed; got %v, but wanted %v\n", updates, services[1])
+ }
+ // Stop advertising the other. Now shouldn't discover any service.
+ stops[1]()
+ updates, err = scan(ds, "")
+ if err != nil {
+ t.Fatalf("Scan failed: %v\n", err)
+ }
+ if !match(updates) {
+ t.Errorf("Scan failed; got %v, but wanted %v\n", updates, nil)
+ }
+}
+
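+// advertise advertises each service and returns a func that stops the advertisements by canceling their context.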
+func advertise(ds discovery.Advertiser, services ...discovery.Service) (func(), error) {
+ ctx, cancel := context.RootContext()
+ for _, service := range services {
+ if err := ds.Advertise(ctx, service, nil); err != nil {
+ return nil, err
+ }
+ }
+ return cancel, nil
+}
+
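+// scan issues a scan with the given query and collects updates until none arrive for 10ms.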
+func scan(ds discovery.Scanner, query string) ([]discovery.Update, error) {
+ ctx, cancel := context.RootContext()
+ defer cancel()
+ updateCh, err := ds.Scan(ctx, query)
+ if err != nil {
+ return nil, err
+ }
+ var updates []discovery.Update
+ for {
+ select {
+ case update := <-updateCh:
+ updates = append(updates, update)
+ case <-time.After(10 * time.Millisecond):
+ return updates, nil
+ }
+ }
+}
+
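+// match reports whether updates contains exactly one UpdateFound for each wanted service and nothing else.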
+func match(updates []discovery.Update, wants ...discovery.Service) bool {
+ for _, want := range wants {
+ matched := false
+ for i, update := range updates {
+ found, ok := update.(discovery.UpdateFound)
+ if !ok {
+ continue
+ }
+ matched = reflect.DeepEqual(found.Value.Service, want)
+ if matched {
+ updates = append(updates[:i], updates[i+1:]...)
+ break
+ }
+ }
+ if !matched {
+ return false
+ }
+ }
+ return len(updates) == 0
+}
diff --git a/runtime/internal/discovery/plugin.go b/runtime/internal/discovery/plugin.go
index c275d89..57a74c1 100644
--- a/runtime/internal/discovery/plugin.go
+++ b/runtime/internal/discovery/plugin.go
@@ -4,6 +4,24 @@
package discovery
+import (
+ "github.com/pborman/uuid"
+
+ "v.io/v23/context"
+)
+
// Plugin is the basic interface for a plugin to discovery service.
+// All implementations should be goroutine-safe.
type Plugin interface {
+ // Advertise advertises the advertisement. Advertising will continue until
+ // the context is canceled or exceeds its deadline.
+ Advertise(ctx *context.T, ad *Advertisement) error
+
+ // Scan scans services that match the service uuid and returns scanned
+ // advertisements to the channel. A zero-value service uuid means any service.
+ // Scanning will continue until the context is canceled or exceeds its
+ // deadline.
+ //
+ // TODO(jhahn): Pass a filter on service attributes.
+ Scan(ctx *context.T, serviceUuid uuid.UUID, scanCh chan<- *Advertisement) error
}
diff --git a/runtime/internal/discovery/plugins/mock/mock.go b/runtime/internal/discovery/plugins/mock/mock.go
new file mode 100644
index 0000000..30423da
--- /dev/null
+++ b/runtime/internal/discovery/plugins/mock/mock.go
@@ -0,0 +1,70 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
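+// Package mock provides an in-memory discovery plugin for use in tests.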
+package mock
+
+import (
+ "sync"
+
+ "github.com/pborman/uuid"
+
+ "v.io/v23/context"
+
+ "v.io/x/ref/runtime/internal/discovery"
+)
+
+type plugin struct {
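+// plugin is an in-memory Plugin implementation for tests; advertisements are kept in a map keyed by service UUID.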
+ mu sync.Mutex
+ services map[string][]*discovery.Advertisement // GUARDED_BY(mu)
+}
+
+func (p *plugin) Advertise(ctx *context.T, ad *discovery.Advertisement) error {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ key := string(ad.ServiceUuid)
+ ads := p.services[key]
+ p.services[key] = append(ads, ad)
+ go func() {
+ <-ctx.Done()
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ ads := p.services[key]
+ for i, a := range ads {
+ if uuid.Equal(a.InstanceUuid, ad.InstanceUuid) {
+ ads = append(ads[:i], ads[i+1:]...)
+ break
+ }
+ }
+ if len(ads) > 0 {
+ p.services[key] = ads
+ } else {
+ delete(p.services, key)
+ }
+ }()
+ return nil
+}
+
+func (p *plugin) Scan(ctx *context.T, serviceUuid uuid.UUID, scanCh chan<- *discovery.Advertisement) error {
+ go func() {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ for key, service := range p.services {
+ if len(serviceUuid) > 0 && key != string(serviceUuid) {
+ continue
+ }
+ for _, ad := range service {
+ select {
+ case scanCh <- ad:
+ case <-ctx.Done():
+ return
+ }
+ }
+ }
+ }()
+ return nil
+}
+
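+// New returns a new mock plugin with no advertisements.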
+func New() discovery.Plugin {
+ return &plugin{services: make(map[string][]*discovery.Advertisement)}
+}
diff --git a/runtime/internal/discovery/scan.go b/runtime/internal/discovery/scan.go
index f955326..9425201 100644
--- a/runtime/internal/discovery/scan.go
+++ b/runtime/internal/discovery/scan.go
@@ -5,11 +5,48 @@
package discovery
import (
+ "github.com/pborman/uuid"
+
"v.io/v23/context"
"v.io/v23/discovery"
)
// Scan implements discovery.Scanner.
func (ds *ds) Scan(ctx *context.T, query string) (<-chan discovery.Update, error) {
- return nil, nil
+ // TODO(jhahn): Implement a simple query processor.
+ var serviceUuid uuid.UUID
+ if len(query) > 0 {
+ serviceUuid = NewServiceUUID(query)
+ }
+ // TODO(jhahn): Revisit the buffer size.
+ scanCh := make(chan *Advertisement, 10)
+ ctx, cancel := context.WithCancel(ctx)
+ for _, plugin := range ds.plugins {
+ err := plugin.Scan(ctx, serviceUuid, scanCh)
+ if err != nil {
+ cancel()
+ return nil, err
+ }
+ }
+ // TODO(jhahn): Revisit the buffer size.
+ updateCh := make(chan discovery.Update, 10)
+ go doScan(ctx, scanCh, updateCh)
+ return updateCh, nil
+}
+
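+// doScan forwards advertisements from the plugins to updateCh as Found updates until ctx is canceled.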
+func doScan(ctx *context.T, scanCh <-chan *Advertisement, updateCh chan<- discovery.Update) {
+ defer close(updateCh)
+ for {
+ select {
+ case ad := <-scanCh:
+ // TODO(jhahn): Merge scanData based on InstanceUuid.
+ // TODO(jhahn): Handle "Lost" case.
+ update := discovery.UpdateFound{
+ Value: discovery.Found{Service: ad.Service},
+ }
+ updateCh <- update
+ case <-ctx.Done():
+ return
+ }
+ }
}
diff --git a/runtime/internal/flow/conn/auth.go b/runtime/internal/flow/conn/auth.go
index 3b2f121..cf2cb9b 100644
--- a/runtime/internal/flow/conn/auth.go
+++ b/runtime/internal/flow/conn/auth.go
@@ -305,7 +305,9 @@
b.mu.Lock()
if err != nil {
if err != io.EOF {
- ctx.Errorf("Blessings flow closed: %v", err)
+ // TODO(mattr): In practice this is very spammy,
+ // figure out how to log it more effectively.
+ ctx.VI(3).Infof("Blessings flow closed: %v", err)
}
b.closed = true
b.mu.Unlock()
diff --git a/runtime/internal/flow/conn/message.go b/runtime/internal/flow/conn/message.go
index fa8fabf..6d44609 100644
--- a/runtime/internal/flow/conn/message.go
+++ b/runtime/internal/flow/conn/message.go
@@ -80,5 +80,5 @@
}
m, err := message.Read(ctx, msg[:len(msg)-p.cipher.MACSize()])
ctx.VI(2).Infof("Read low-level message: %#v", m)
- return m, nil
+ return m, err
}
diff --git a/runtime/internal/flow/manager/conncache.go b/runtime/internal/flow/manager/conncache.go
index dc97290..cdb81a7 100644
--- a/runtime/internal/flow/manager/conncache.go
+++ b/runtime/internal/flow/manager/conncache.go
@@ -106,6 +106,21 @@
return nil
}
+// InsertWithRoutingID adds conn to the cache keyed only by conn's RoutingID.
+func (c *ConnCache) InsertWithRoutingID(conn *conn.Conn) error {
+ defer c.mu.Unlock()
+ c.mu.Lock()
+ if c.addrCache == nil {
+ return NewErrCacheClosed(nil)
+ }
+ entry := &connEntry{
+ conn: conn,
+ rid: conn.RemoteEndpoint().RoutingID(),
+ }
+ c.ridCache[entry.rid] = entry
+ return nil
+}
+
// Close marks the ConnCache as closed and closes all Conns in the cache.
func (c *ConnCache) Close(ctx *context.T) {
defer c.mu.Unlock()
@@ -185,13 +200,6 @@
return entry.conn, nil
}
-// Size returns the number of Conns stored in the ConnCache.
-func (c *ConnCache) Size() int {
- defer c.mu.Unlock()
- c.mu.Lock()
- return len(c.addrCache)
-}
-
func key(protocol, address string, blessingNames []string) string {
// TODO(suharshs): We may be able to do something more inclusive with our
// blessingNames.
diff --git a/runtime/internal/flow/manager/conncache_test.go b/runtime/internal/flow/manager/conncache_test.go
index 20c1537..ace0f89 100644
--- a/runtime/internal/flow/manager/conncache_test.go
+++ b/runtime/internal/flow/manager/conncache_test.go
@@ -64,6 +64,25 @@
t.Errorf("got %v, want <nil>, err: %v", got, err)
}
+ // Conns inserted with InsertWithRoutingID should be cached only by RoutingID, not by network/address.
+ ridEP := &inaming.Endpoint{
+ Protocol: "ridonly",
+ Address: "ridonly",
+ RID: naming.FixedRoutingID(0x1111),
+ Blessings: []string{"ridonly"},
+ }
+ ridConn := makeConnAndFlow(t, ctx, ridEP).c
+ if err := c.InsertWithRoutingID(ridConn); err != nil {
+ t.Fatal(err)
+ }
+ if got, err := c.ReservedFind(ridEP.Protocol, ridEP.Address, ridEP.Blessings); err != nil || got != nil {
+ t.Errorf("got %v, want <nil>, err: %v", got, err)
+ }
+ c.Unreserve(ridEP.Protocol, ridEP.Address, ridEP.Blessings)
+ if got, err := c.FindWithRoutingID(ridEP.RID); err != nil || got != ridConn {
+ t.Errorf("got %v, want %v, err: %v", got, ridConn, err)
+ }
+
otherEP := &inaming.Endpoint{
Protocol: "other",
Address: "other",
@@ -217,12 +236,12 @@
rep := conn.RemoteEndpoint()
rfconn, err := c.ReservedFind(rep.Addr().Network(), rep.Addr().String(), rep.BlessingNames())
if err != nil {
- t.Errorf("got %v, want %v, err: %v", rfconn, conn, err)
+ t.Error(err)
}
c.Unreserve(rep.Addr().Network(), rep.Addr().String(), rep.BlessingNames())
ridconn, err := c.FindWithRoutingID(rep.RoutingID())
if err != nil {
- t.Errorf("got %v, want %v, err: %v", ridconn, conn, err)
+ t.Error(err)
}
return rfconn != nil || ridconn != nil
}
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
index af90572..d067aa1 100644
--- a/runtime/internal/flow/manager/manager.go
+++ b/runtime/internal/flow/manager/manager.go
@@ -26,6 +26,11 @@
"v.io/x/ref/runtime/internal/rpc/version"
)
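+// clientByte and serverByte are the first byte written on a flow to a proxy: the listening side writes serverByte when it sets up its conn, and a dialing client writes clientByte to have the proxy route the flow onward.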
+const (
+ clientByte = 'c'
+ serverByte = 's'
+)
+
type manager struct {
rid naming.RoutingID
closed <-chan struct{}
@@ -95,7 +100,7 @@
return nil, flow.NewErrNetwork(ctx, err)
}
// Write to ensure we send an openFlow message.
- if _, err := f.Write([]byte{0}); err != nil {
+ if _, err := f.Write([]byte{serverByte}); err != nil {
return nil, flow.NewErrNetwork(ctx, err)
}
var lep string
@@ -145,7 +150,7 @@
ctx.Errorf("failed to accept flow.Conn on localEP %v failed: %v", local, err)
continue
}
- if err := m.cache.Insert(c); err != nil {
+ if err := m.cache.InsertWithRoutingID(c); err != nil {
ctx.VI(2).Infof("failed to cache conn %v: %v", c, err)
}
}
@@ -189,7 +194,7 @@
h.ctx.Errorf("failed to create accepted conn: %v", err)
return
}
- if err := h.m.cache.Insert(c); err != nil {
+ if err := h.m.cache.InsertWithRoutingID(c); err != nil {
h.ctx.Errorf("failed to create accepted conn: %v", err)
return
}
@@ -312,6 +317,10 @@
// If we are dialing out to a Proxy, we need to dial a conn on this flow, and
// return a flow on that corresponding conn.
if remote.RoutingID() != c.RemoteEndpoint().RoutingID() {
+ // Write to tell the proxy that this should be routed.
+ if _, err := f.Write([]byte{clientByte}); err != nil {
+ return nil, flow.NewErrNetwork(ctx, err)
+ }
c, err = conn.NewDialed(
ctx,
f,
@@ -326,6 +335,9 @@
}
return nil, flow.NewErrDialFailed(ctx, err)
}
+ if err := m.cache.InsertWithRoutingID(c); err != nil {
+ return nil, flow.NewErrBadState(ctx, err)
+ }
f, err = c.Dial(ctx, fn)
if err != nil {
return nil, flow.NewErrDialFailed(ctx, err)
diff --git a/runtime/internal/flow/manager/manager_test.go b/runtime/internal/flow/manager/manager_test.go
index 0c45db3..7af0053 100644
--- a/runtime/internal/flow/manager/manager_test.go
+++ b/runtime/internal/flow/manager/manager_test.go
@@ -73,18 +73,18 @@
}
dm := New(ctx, naming.FixedRoutingID(0x1111))
// At first the cache should be empty.
- if got, want := dm.(*manager).cache.Size(), 0; got != want {
+ if got, want := len(dm.(*manager).cache.addrCache), 0; got != want {
t.Fatalf("got cache size %v, want %v", got, want)
}
// After dialing a connection the cache should hold one connection.
dialAndAccept(t, ctx, dm, am, eps[0], flowtest.BlessingsForPeer)
- if got, want := dm.(*manager).cache.Size(), 1; got != want {
+ if got, want := len(dm.(*manager).cache.addrCache), 1; got != want {
t.Fatalf("got cache size %v, want %v", got, want)
}
// After dialing another connection the cache should still hold one connection
// because the connections should be reused.
dialAndAccept(t, ctx, dm, am, eps[0], flowtest.BlessingsForPeer)
- if got, want := dm.(*manager).cache.Size(), 1; got != want {
+ if got, want := len(dm.(*manager).cache.addrCache), 1; got != want {
t.Fatalf("got cache size %v, want %v", got, want)
}
}
diff --git a/runtime/internal/naming/namespace/all_test.go b/runtime/internal/naming/namespace/all_test.go
index 04febd1..3934b14 100644
--- a/runtime/internal/naming/namespace/all_test.go
+++ b/runtime/internal/naming/namespace/all_test.go
@@ -21,8 +21,6 @@
"v.io/v23/rpc"
"v.io/v23/security"
"v.io/v23/verror"
-
- "v.io/x/ref/lib/xrpc"
_ "v.io/x/ref/runtime/factories/generic"
inamespace "v.io/x/ref/runtime/internal/naming/namespace"
"v.io/x/ref/services/mounttable/mounttablelib"
@@ -222,7 +220,7 @@
}
func run(t *testing.T, ctx *context.T, disp rpc.Dispatcher, mountPoint string, mt bool) *serverEntry {
- s, err := xrpc.NewDispatchingServer(ctx, mountPoint, disp, options.ServesMountTable(mt))
+ ctx, s, err := v23.WithNewDispatchingServer(ctx, mountPoint, disp, options.ServesMountTable(mt))
if err != nil {
boom(t, "r.NewServer: %s", err)
}
@@ -775,9 +773,9 @@
ns := v23.GetNamespace(ctx)
ns.SetRoots(root.name)
- server, err := xrpc.NewServer(ctx, "leaf", &leafObject{}, nil)
+ ctx, server, err := v23.WithNewServer(ctx, "leaf", &leafObject{}, nil)
if err != nil {
- boom(t, "xrpc.NewServer: %s", err)
+ boom(t, "v23.WithNewServer: %s", err)
}
defer server.Stop()
diff --git a/runtime/internal/naming/namespace/glob.go b/runtime/internal/naming/namespace/glob.go
index ae24caf..5106b02 100644
--- a/runtime/internal/naming/namespace/glob.go
+++ b/runtime/internal/naming/namespace/glob.go
@@ -13,6 +13,7 @@
"v.io/v23/context"
"v.io/v23/glob"
"v.io/v23/naming"
+ "v.io/v23/options"
"v.io/v23/rpc"
"v.io/v23/verror"
"v.io/x/ref/lib/apilog"
@@ -23,12 +24,12 @@
places map[string]struct{}
}
-func (tr *tracks) beenThereDoneThat(servers []string, pstr string) bool {
+func (tr *tracks) beenThereDoneThat(servers []naming.MountedServer, pstr string) bool {
tr.m.Lock()
defer tr.m.Unlock()
found := false
for _, s := range servers {
- x := s + "!" + pstr
+ x := naming.JoinAddressName(s.Server, "") + "!" + pstr
if _, ok := tr.places[x]; ok {
found = true
}
@@ -61,25 +62,25 @@
pstr := t.pattern.String()
ctx.VI(2).Infof("globAtServer(%v, %v)", *t.me, pstr)
- servers := []string{}
- for _, s := range t.me.Servers {
- servers = append(servers, naming.JoinAddressName(s.Server, ""))
- }
-
// If there are no servers to call, this isn't a mount point. No sense
// trying to call servers that aren't there.
- if len(servers) == 0 {
+ if len(t.me.Servers) == 0 {
t.error = nil
return
}
// If we've been there before with the same request, give up.
- if tr.beenThereDoneThat(servers, pstr) {
+ if tr.beenThereDoneThat(t.me.Servers, pstr) {
t.error = nil
return
}
- call, err := ns.parallelStartCall(ctx, client, servers, rpc.GlobMethod, []interface{}{pstr}, opts)
+ // t.me.Name has already been matched at this point, so don't pass it to the Call. Kind of sleazy to do this
+ // but it avoids making yet another copy of the MountEntry.
+ on := t.me.Name
+ t.me.Name = ""
+ call, err := client.StartCall(withTimeout(ctx), "", rpc.GlobMethod, []interface{}{pstr}, append(opts, options.Preresolved{t.me})...)
+ t.me.Name = on
if err != nil {
t.error = err
return
diff --git a/runtime/internal/naming/namespace/mount.go b/runtime/internal/naming/namespace/mount.go
index 884cec1..c431560 100644
--- a/runtime/internal/naming/namespace/mount.go
+++ b/runtime/internal/naming/namespace/mount.go
@@ -11,17 +11,16 @@
"v.io/v23/context"
"v.io/v23/naming"
"v.io/v23/options"
- "v.io/v23/rpc"
"v.io/v23/security"
"v.io/x/ref/lib/apilog"
)
-// mountIntoMountTable mounts a single server into a single mount table.
-func mountIntoMountTable(ctx *context.T, client rpc.Client, name, server string, ttl time.Duration, flags naming.MountFlag, id string, opts ...rpc.CallOpt) (s status) {
- s.id = id
- ctx = withTimeout(ctx)
- s.err = client.Call(ctx, name, "Mount", []interface{}{server, uint32(ttl.Seconds()), flags}, nil, append(opts, options.NoResolve{})...)
- return
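+// forget drops the cached resolutions for the names in me.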
+func (ns *namespace) forget(ctx *context.T, me *naming.MountEntry) {
+ var names []string
+ for _, s := range me.Servers {
+ names = append(names, naming.JoinAddressName(s.Server, me.Name))
+ }
+ ns.resolutionCache.forget(ctx, names)
}
// Mount implements Namespace.Mount.
@@ -45,56 +44,39 @@
}
}
}
-
- client := v23.GetClient(ctx)
- // Mount the server in all the returned mount tables.
- f := func(ctx *context.T, mt, id string) status {
- return mountIntoMountTable(ctx, client, mt, server, ttl, flags, id, getCallOpts(opts)...)
+
+ me, err := ns.ResolveToMountTable(ctx, name, opts...)
+ if err == nil {
+ copts := append(getCallOpts(opts), options.Preresolved{me})
+ err = v23.GetClient(ctx).Call(withTimeout(ctx), name, "Mount", []interface{}{server, uint32(ttl.Seconds()), flags}, nil, copts...)
+ ns.forget(ctx, me)
}
- err := ns.dispatch(ctx, name, f, opts)
ctx.VI(1).Infof("Mount(%s, %q) -> %v", name, server, err)
return err
}
-// unmountFromMountTable removes a single mounted server from a single mount table.
-func unmountFromMountTable(ctx *context.T, client rpc.Client, name, server string, id string, opts ...rpc.CallOpt) (s status) {
- s.id = id
- ctx = withTimeout(ctx)
- s.err = client.Call(ctx, name, "Unmount", []interface{}{server}, nil, append(opts, options.NoResolve{})...)
- return
-}
-
// Unmount implements Namespace.Unmount.
func (ns *namespace) Unmount(ctx *context.T, name, server string, opts ...naming.NamespaceOpt) error {
defer apilog.LogCallf(ctx, "name=%.10s...,server=%.10s...,opts...=%v", name, server, opts)(ctx, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
- // Unmount the server from all the mount tables.
- client := v23.GetClient(ctx)
- f := func(ctx *context.T, mt, id string) status {
- return unmountFromMountTable(ctx, client, mt, server, id, getCallOpts(opts)...)
+ me, err := ns.ResolveToMountTable(ctx, name, opts...)
+ if err == nil {
+ copts := append(getCallOpts(opts), options.Preresolved{me})
+ err = v23.GetClient(ctx).Call(withTimeout(ctx), name, "Unmount", []interface{}{server}, nil, copts...)
+ ns.forget(ctx, me)
}
- err := ns.dispatch(ctx, name, f, opts)
ctx.VI(1).Infof("Unmount(%s, %s) -> %v", name, server, err)
return err
}
-// deleteFromMountTable deletes a name from a single mount table. If there are any children
-// and deleteSubtree isn't true, nothing is deleted.
-func deleteFromMountTable(ctx *context.T, client rpc.Client, name string, deleteSubtree bool, id string, opts ...rpc.CallOpt) (s status) {
- s.id = id
- ctx = withTimeout(ctx)
- s.err = client.Call(ctx, name, "Delete", []interface{}{deleteSubtree}, nil, append(opts, options.NoResolve{})...)
- return
-}
-
-// RDeleteemove implements Namespace.Delete.
+// Delete implements Namespace.Delete.
func (ns *namespace) Delete(ctx *context.T, name string, deleteSubtree bool, opts ...naming.NamespaceOpt) error {
defer apilog.LogCallf(ctx, "name=%.10s...,deleteSubtree=%v,opts...=%v", name, deleteSubtree, opts)(ctx, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
- // Remove from all the mount tables.
- client := v23.GetClient(ctx)
- f := func(ctx *context.T, mt, id string) status {
- return deleteFromMountTable(ctx, client, mt, deleteSubtree, id, getCallOpts(opts)...)
+ me, err := ns.ResolveToMountTable(ctx, name, opts...)
+ if err == nil {
+ copts := append(getCallOpts(opts), options.Preresolved{me})
+ err = v23.GetClient(ctx).Call(withTimeout(ctx), name, "Delete", []interface{}{deleteSubtree}, nil, copts...)
+ ns.forget(ctx, me)
}
- err := ns.dispatch(ctx, name, f, opts)
ctx.VI(1).Infof("Remove(%s, %v) -> %v", name, deleteSubtree, err)
return err
}
diff --git a/runtime/internal/naming/namespace/parallelstartcall.go b/runtime/internal/naming/namespace/parallelstartcall.go
deleted file mode 100644
index 77525a2..0000000
--- a/runtime/internal/naming/namespace/parallelstartcall.go
+++ /dev/null
@@ -1,122 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-package namespace
-
-import (
- "v.io/v23/context"
- "v.io/v23/naming"
- "v.io/v23/options"
- "v.io/v23/rpc"
- "v.io/v23/verror"
- inaming "v.io/x/ref/runtime/internal/naming"
-)
-
-type startStatus struct {
- index int
- err error
- call rpc.ClientCall
-}
-
-func tryStartCall(ctx *context.T, client rpc.Client, target, method string, args []interface{}, c chan startStatus, index int, opts ...rpc.CallOpt) {
- call, err := client.StartCall(ctx, target, method, args, append(opts, options.NoResolve{})...)
- c <- startStatus{index: index, err: err, call: call}
-}
-
-// parallelStartCall returns the first succeeding StartCall.
-func (ns *namespace) parallelStartCall(ctx *context.T, client rpc.Client, servers []string, method string, args []interface{}, opts []rpc.CallOpt) (rpc.ClientCall, error) {
- if len(servers) == 0 {
- return nil, verror.New(verror.ErrNoExist, ctx, "no servers to resolve query")
- }
-
- // StartCall to each of the servers.
- c := make(chan startStatus, len(servers))
- cancelFuncs := make([]context.CancelFunc, len(servers))
- for index, server := range servers {
- callCtx, cancel := withTimeoutAndCancel(ctx)
- cancelFuncs[index] = cancel
- go tryStartCall(callCtx, client, server, method, args, c, index, opts...)
- }
-
- // First positive response wins. Cancel the rest. The cancellation
- // will prevent any RPCs from starting or progressing. We do not close
- // the channel since some go routines may still be in flight and want to
- // write status to it. The channel will be garbage collected when all
- // references to it disappear.
- var final startStatus
- for range servers {
- final = <-c
- if final.err == nil {
- cancelFuncs[final.index] = nil
- break
- }
- }
- // Cancel the rest.
- for _, cancel := range cancelFuncs {
- if cancel != nil {
- cancel()
- }
- }
- return final.call, final.err
-}
-
-type status struct {
- id string
- err error
-}
-
-// nameToRID converts a name to a routing ID string. If a routing ID can't be obtained,
-// it just returns the name.
-func nameToRID(name string) string {
- address, _ := naming.SplitAddressName(name)
- if ep, err := inaming.NewEndpoint(address); err == nil {
- return ep.RID.String()
- }
- return name
-}
-
-// collectStati collects n status messages from channel c and returns an error if, for
-// any id, there is no successful reply.
-func collectStati(c chan status, n int) error {
- // Make a map indexed by the routing id (or address if routing id not found) of
- // each mount table. A mount table may be reachable via multiple addresses but
- // each address should have the same routing id. We should only return an error
- // if any of the ids had no successful mounts.
- statusByID := make(map[string]error)
- // Get the status of each request.
- for i := 0; i < n; i++ {
- s := <-c
- if _, ok := statusByID[s.id]; !ok || s.err == nil {
- statusByID[s.id] = s.err
- }
- }
- // Return any error.
- for _, s := range statusByID {
- if s != nil {
- return s
- }
- }
- return nil
-}
-
-// dispatch executes f in parallel for each mount table implementing mTName.
-func (ns *namespace) dispatch(ctx *context.T, mTName string, f func(*context.T, string, string) status, opts []naming.NamespaceOpt) error {
- // Resolve to all the mount tables implementing name.
- me, err := ns.ResolveToMountTable(ctx, mTName, opts...)
- if err != nil {
- return err
- }
- mts := me.Names()
- // Apply f to each of the returned mount tables.
- c := make(chan status, len(mts))
- for _, mt := range mts {
- go func(mt string) {
- c <- f(ctx, mt, nameToRID(mt))
- }(mt)
- }
- finalerr := collectStati(c, len(mts))
- // Forget any previous cached information about these names.
- ns.resolutionCache.forget(ctx, mts)
- return finalerr
-}
diff --git a/runtime/internal/naming/namespace/perms.go b/runtime/internal/naming/namespace/perms.go
index df7302c..aa3ce6c 100644
--- a/runtime/internal/naming/namespace/perms.go
+++ b/runtime/internal/naming/namespace/perms.go
@@ -9,50 +9,34 @@
"v.io/v23/context"
"v.io/v23/naming"
"v.io/v23/options"
- "v.io/v23/rpc"
"v.io/v23/security/access"
"v.io/x/ref/lib/apilog"
)
-// setPermsInMountTable sets the Permissions in a single server.
-func setPermsInMountTable(ctx *context.T, client rpc.Client, name string, perms access.Permissions, version, id string, opts []rpc.CallOpt) (s status) {
- s.id = id
- ctx = withTimeout(ctx)
- s.err = client.Call(ctx, name, "SetPermissions", []interface{}{perms, version}, nil, append(opts, options.NoResolve{})...)
- return
-}
-
+// SetPermissions implements Namespace.SetPermissions.
func (ns *namespace) SetPermissions(ctx *context.T, name string, perms access.Permissions, version string, opts ...naming.NamespaceOpt) error {
defer apilog.LogCallf(ctx, "name=%.10s...,perms=,version=%.10s...,opts...=%v", name, version, opts)(ctx, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
- client := v23.GetClient(ctx)
- // Apply to all mount tables implementing the name.
- f := func(ctx *context.T, mt, id string) status {
- return setPermsInMountTable(ctx, client, mt, perms, version, id, getCallOpts(opts))
+ me, err := ns.ResolveToMountTable(ctx, name, opts...)
+ if err == nil {
+ copts := append(getCallOpts(opts), options.Preresolved{me})
+ err = v23.GetClient(ctx).Call(withTimeout(ctx), name, "SetPermissions", []interface{}{perms, version}, nil, copts...)
+ ns.forget(ctx, me)
}
- err := ns.dispatch(ctx, name, f, opts)
ctx.VI(1).Infof("SetPermissions(%s, %v, %s) -> %v", name, perms, version, err)
return err
}
-// GetPermissions gets Permissions from a mount table.
+// GetPermissions implements Namespace.GetPermissions.
func (ns *namespace) GetPermissions(ctx *context.T, name string, opts ...naming.NamespaceOpt) (perms access.Permissions, version string, err error) {
defer apilog.LogCallf(ctx, "name=%.10s...,opts...=%v", name, opts)(ctx, "perms=,version=%.10s...,err=%v", &version, &err) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
- client := v23.GetClient(ctx)
- // Resolve to all the mount tables implementing name.
- me, rerr := ns.ResolveToMountTable(ctx, name, opts...)
- if rerr != nil {
- err = rerr
- return
+ me, err := ns.ResolveToMountTable(ctx, name, opts...)
+ if err == nil {
+ copts := append(getCallOpts(opts), options.Preresolved{me})
+ err = v23.GetClient(ctx).Call(withTimeout(ctx), name, "GetPermissions", []interface{}{}, []interface{}{&perms, &version}, copts...)
+ ns.forget(ctx, me)
}
- mts := me.Names()
-
- call, serr := ns.parallelStartCall(ctx, client, mts, "GetPermissions", []interface{}{}, getCallOpts(opts))
- if serr != nil {
- err = serr
- return
- }
- err = call.Finish(&perms, &version)
+ ctx.VI(1).Infof("GetPermissions(%s) -> (%v, %v, %v)", name, perms, version, err)
return
}
diff --git a/runtime/internal/naming/namespace/perms_test.go b/runtime/internal/naming/namespace/perms_test.go
index bf2dba4..b66d0ef 100644
--- a/runtime/internal/naming/namespace/perms_test.go
+++ b/runtime/internal/naming/namespace/perms_test.go
@@ -15,8 +15,6 @@
"v.io/v23/rpc"
"v.io/v23/security"
"v.io/v23/security/access"
-
- "v.io/x/ref/lib/xrpc"
_ "v.io/x/ref/runtime/factories/generic"
"v.io/x/ref/services/mounttable/mounttablelib"
"v.io/x/ref/test"
@@ -138,24 +136,18 @@
ns := v23.GetNamespace(rootCtx)
ns.SetRoots("/" + rmtAddr)
- // Create two parallel mount tables.
+ // Create the lower mount table.
stop1, mt1Addr := newMT(t, rootCtx)
fmt.Printf("mt1 at %s\n", mt1Addr)
defer stop1()
- stop2, mt2Addr := newMT(t, rootCtx)
- fmt.Printf("mt2 at %s\n", mt2Addr)
- defer stop2()
// Mount them into the root.
if err := ns.Mount(rootCtx, "a/b/c", mt1Addr, 0, naming.ServesMountTable(true)); err != nil {
t.Fatalf("Failed to Mount %s onto a/b/c: %s", "/"+mt1Addr, err)
}
- if err := ns.Mount(rootCtx, "a/b/c", mt2Addr, 0, naming.ServesMountTable(true)); err != nil {
- t.Fatalf("Failed to Mount %s onto a/b/c: %s", "/"+mt2Addr, err)
- }
// Set/Get the mount point's Permissions.
- perms, version, err := ns.GetPermissions(rootCtx, "a/b/c")
+ _, version, err := ns.GetPermissions(rootCtx, "a/b/c")
if err != nil {
t.Fatalf("GetPermissions a/b/c: %s", err)
}
@@ -170,7 +162,7 @@
t.Fatalf("want %v, got %v", openPerms, nacl)
}
- // Now Set/Get the parallel mount point's Permissions.
+ // Now Set/Get the Permissions in the lower mount table.
name := "a/b/c/d/e"
version = "" // Parallel setperms with any other value is dangerous
if err := ns.SetPermissions(rootCtx, name, openPerms, version); err != nil {
@@ -184,24 +176,6 @@
t.Fatalf("want %v, got %v", openPerms, nacl)
}
- // Get from each server individually to make sure both are set.
- name = naming.Join(mt1Addr, "d/e")
- nacl, _, err = ns.GetPermissions(rootCtx, name)
- if err != nil {
- t.Fatalf("GetPermissions %s: %s", name, err)
- }
- if !reflect.DeepEqual(openPerms, nacl) {
- t.Fatalf("want %v, got %v", openPerms, nacl)
- }
- name = naming.Join(mt2Addr, "d/e")
- nacl, _, err = ns.GetPermissions(rootCtx, name)
- if err != nil {
- t.Fatalf("GetPermissions %s: %s", name, err)
- }
- if !reflect.DeepEqual(openPerms, nacl) {
- t.Fatalf("want %v, got %v", perms, nacl)
- }
-
// Create mount points accessible only by root's key and owner.
name = "a/b/c/d/f"
deadbody := "/the:8888/rain"
@@ -236,8 +210,8 @@
if err := ns.SetPermissions(rootCtx, name, closedPerms, version); err != nil {
t.Fatalf("SetPermissions %s: %s", name, err)
}
- if _, err := xrpc.NewServer(rootCtx, name, &nopServer{1}, nil); err != nil {
- t.Fatalf("v23.NewServer failed: %v", err)
+ if rootCtx, _, err = v23.WithNewServer(rootCtx, name, &nopServer{1}, nil); err != nil {
+ t.Fatalf("v23.WithNewServer failed: %v", err)
}
// Alice shouldn't be able to resolve it.
_, err = v23.GetNamespace(aliceCtx).Resolve(aliceCtx, name)
diff --git a/runtime/internal/naming/namespace/resolve.go b/runtime/internal/naming/namespace/resolve.go
index 7f96c8d..b872000 100644
--- a/runtime/internal/naming/namespace/resolve.go
+++ b/runtime/internal/naming/namespace/resolve.go
@@ -5,7 +5,6 @@
package namespace
import (
- "errors"
"runtime"
"strings"
@@ -18,58 +17,76 @@
"v.io/x/ref/lib/apilog"
)
+var (
+ errNoServers = verror.Register(pkgPath+".errNoServers", verror.NoRetry, "{1} {2} No servers found to resolve query {_}")
+)
+
+// resolveAgainstMountTable asks each server in e.Servers that might be a mounttable to resolve e.Name. The requests
+// are parallelized by the client rpc code.
func (ns *namespace) resolveAgainstMountTable(ctx *context.T, client rpc.Client, e *naming.MountEntry, opts ...rpc.CallOpt) (*naming.MountEntry, error) {
- // Try each server till one answers.
- finalErr := errors.New("no servers to resolve query")
- opts = append(opts, options.NoResolve{})
+ // Run through the server list looking for answers in the cache or servers that aren't mounttables.
+ change := false
for _, s := range e.Servers {
// If the server was not specified as an endpoint (perhaps as host:port)
// then we really don't know if this is a mounttable or not. Check the
// cache to see if we've tried in the recent past and it came back as not
// a mounttable.
if ns.resolutionCache.isNotMT(s.Server) {
- finalErr = verror.New(verror.ErrUnknownMethod, nil, "ResolveStep")
+ change = true
continue
}
- // Assume a mount table and make the call.
- name := naming.JoinAddressName(s.Server, e.Name)
- // First check the cache.
- if ne, err := ns.resolutionCache.lookup(ctx, name); err == nil {
- ctx.VI(2).Infof("resolveAMT %s from cache -> %v", name, convertServersToStrings(ne.Servers, ne.Name))
+
+ // Check the cache. If it's there, we're done.
+ n := naming.JoinAddressName(s.Server, e.Name)
+ if ne, err := ns.resolutionCache.lookup(ctx, n); err == nil {
+ ctx.VI(2).Infof("resolveAMT %s from cache -> %v", n, convertServersToStrings(ne.Servers, ne.Name))
return &ne, nil
}
- // Not in cache, call the real server.
- callCtx := ctx
- if _, hasDeadline := ctx.Deadline(); !hasDeadline {
- // Only set a per-call timeout if a deadline has not already
- // been set.
- callCtx = withTimeout(ctx)
- }
- entry := new(naming.MountEntry)
- if err := client.Call(callCtx, name, "ResolveStep", nil, []interface{}{entry}, opts...); err != nil {
- // If any replica says the name doesn't exist, return that fact.
- if verror.ErrorID(err) == naming.ErrNoSuchName.ID || verror.ErrorID(err) == naming.ErrNoSuchNameRoot.ID {
- return nil, err
+ }
+ // We had at least one server that wasn't a mount table. Create a new mount entry without those servers.
+ if change {
+ ne := *e
+ ne.Servers = nil
+ for _, s := range e.Servers {
+ if !ns.resolutionCache.isNotMT(s.Server) {
+ ne.Servers = append(ne.Servers, s)
}
- // If it wasn't a mounttable remember that fact. The check for the __ is for
- // the debugging hack in the local namespace of every server. That part never
- // answers mounttable RPCs and shouldn't make us think this isn't a mounttable
- // server.
- if notAnMT(err) && !strings.HasPrefix(e.Name, "__") {
+ }
+ e = &ne
+ }
+ // If we have no servers to query, give up.
+ if len(e.Servers) == 0 {
+ ctx.VI(2).Infof("resolveAMT %s -> No servers", e.Name)
+ return nil, verror.New(errNoServers, ctx)
+ }
+ // We have preresolved the servers. Pass the mount entry to the call.
+ opts = append(opts, options.Preresolved{e})
+ callCtx := ctx
+ if _, hasDeadline := ctx.Deadline(); !hasDeadline {
+ // Only set a per-call timeout if a deadline has not already
+ // been set.
+ callCtx = withTimeout(ctx)
+ }
+ entry := new(naming.MountEntry)
+ if err := client.Call(callCtx, e.Name, "ResolveStep", nil, []interface{}{entry}, opts...); err != nil {
+ // If it wasn't a mounttable remember that fact. The check for the __ is for
+ // the debugging hack in the local namespace of every server. That part never
+ // answers mounttable RPCs and shouldn't make us think this isn't a mounttable
+ // server.
+ if notAnMT(err) && !strings.HasPrefix(e.Name, "__") {
+ for _, s := range e.Servers {
ns.resolutionCache.setNotMT(s.Server)
}
- // Keep track of the final error and continue with next server.
- finalErr = err
- ctx.VI(2).Infof("resolveAMT: Finish %s failed: %s", name, err)
- continue
}
- // Add result to cache.
- ns.resolutionCache.remember(ctx, name, entry)
- ctx.VI(2).Infof("resolveAMT %s -> %v", name, entry)
- return entry, nil
+ return nil, err
}
- ctx.VI(2).Infof("resolveAMT %v -> %v", e.Servers, finalErr)
- return nil, finalErr
+ // Add result to cache for each server that may have returned it.
+ for _, s := range e.Servers {
+ n := naming.JoinAddressName(s.Server, e.Name)
+ ns.resolutionCache.remember(ctx, n, entry)
+ }
+ ctx.VI(2).Infof("resolveAMT %s -> %v", e.Name, entry)
+ return entry, nil
}
func terminal(e *naming.MountEntry) bool {
@@ -79,13 +96,20 @@
// Resolve implements v.io/v23/naming.Namespace.
func (ns *namespace) Resolve(ctx *context.T, name string, opts ...naming.NamespaceOpt) (*naming.MountEntry, error) {
defer apilog.LogCallf(ctx, "name=%.10s...,opts...=%v", name, opts)(ctx, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
- e, _ := ns.rootMountEntry(name, opts...)
+ // If caller supplied a mount entry, use it.
+ e, skipResolution := preresolved(opts)
+ if e != nil {
+ return e, nil
+ }
+ // Expand any relative name.
+ e, _ = ns.rootMountEntry(name, opts...)
if ctx.V(2) {
_, file, line, _ := runtime.Caller(1)
ctx.Infof("Resolve(%s) called from %s:%d", name, file, line)
- ctx.Infof("Resolve(%s) -> rootMountEntry %v", name, *e)
+ ctx.Infof("Resolve(%s) -> rootEntry %v", name, *e)
}
- if skipResolve(opts) {
+ // If caller didn't want resolution, use expanded name.
+ if skipResolution {
return e, nil
}
if len(e.Servers) == 0 {
@@ -194,11 +218,11 @@
return flushed
}
-func skipResolve(opts []naming.NamespaceOpt) bool {
+func preresolved(opts []naming.NamespaceOpt) (*naming.MountEntry, bool) {
for _, o := range opts {
- if _, ok := o.(options.NoResolve); ok {
- return true
+ if v, ok := o.(options.Preresolved); ok {
+ return v.Resolution, true
}
}
- return false
+ return nil, false
}
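
The rewrite above turns resolveAgainstMountTable from a sequential try-each-server loop into a single call: servers the cache already knows are not mounttables are dropped, a cached entry short-circuits the RPC, and the remaining servers ride along in options.Preresolved so the client RPC layer can contact them in parallel. The same option with no entry is what call sites elsewhere in this change use to bypass namespace resolution for a name that already embeds the address. A minimal caller-side sketch (the function name and suffix are placeholders):

package example

import (
	"v.io/v23"
	"v.io/v23/context"
	"v.io/v23/naming"
	"v.io/v23/options"
)

// resolveStepAt asks the mount table at mtAddr to resolve suffix one step,
// skipping namespace resolution of the name itself.
func resolveStepAt(ctx *context.T, mtAddr, suffix string) (*naming.MountEntry, error) {
	var entry naming.MountEntry
	name := naming.JoinAddressName(mtAddr, suffix)
	// An empty Preresolved tells the client to use the address embedded in the
	// name as-is (see the getNoNamespaceOpt change in options.go below).
	err := v23.GetClient(ctx).Call(ctx, name, "ResolveStep", nil,
		[]interface{}{&entry}, options.Preresolved{})
	return &entry, err
}
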
diff --git a/runtime/internal/rpc/benchmark/benchmark_test.go b/runtime/internal/rpc/benchmark/benchmark_test.go
index 22f63b9..3107ce6 100644
--- a/runtime/internal/rpc/benchmark/benchmark_test.go
+++ b/runtime/internal/rpc/benchmark/benchmark_test.go
@@ -11,7 +11,6 @@
"v.io/v23"
"v.io/v23/context"
"v.io/x/ref/lib/security/securityflag"
- "v.io/x/ref/lib/xrpc"
_ "v.io/x/ref/runtime/factories/static"
"v.io/x/ref/runtime/internal/rpc/benchmark/internal"
"v.io/x/ref/test"
@@ -113,7 +112,7 @@
var shutdown v23.Shutdown
ctx, shutdown = test.V23Init()
- server, err := xrpc.NewServer(ctx, "", internal.NewService(), securityflag.NewAuthorizerOrDie())
+ ctx, server, err := v23.WithNewServer(ctx, "", internal.NewService(), securityflag.NewAuthorizerOrDie())
if err != nil {
ctx.Fatalf("NewServer failed: %v", err)
}
diff --git a/runtime/internal/rpc/benchmark/benchmarkd/main.go b/runtime/internal/rpc/benchmark/benchmarkd/main.go
index f20bb3c..69679af 100644
--- a/runtime/internal/rpc/benchmark/benchmarkd/main.go
+++ b/runtime/internal/rpc/benchmark/benchmarkd/main.go
@@ -12,10 +12,10 @@
"v.io/v23/context"
+ "v.io/v23"
"v.io/x/ref/lib/security/securityflag"
"v.io/x/ref/lib/signals"
"v.io/x/ref/lib/v23cmd"
- "v.io/x/ref/lib/xrpc"
_ "v.io/x/ref/runtime/factories/roaming"
"v.io/x/ref/runtime/internal/rpc/benchmark/internal"
)
@@ -33,7 +33,7 @@
}
func runBenchmarkD(ctx *context.T, env *cmdline.Env, args []string) error {
- server, err := xrpc.NewServer(ctx, "", internal.NewService(), securityflag.NewAuthorizerOrDie())
+ ctx, server, err := v23.WithNewServer(ctx, "", internal.NewService(), securityflag.NewAuthorizerOrDie())
if err != nil {
ctx.Fatalf("NewServer failed: %v", err)
}
diff --git a/runtime/internal/rpc/benchmark/simple/main.go b/runtime/internal/rpc/benchmark/simple/main.go
index 8173e82..c904a7d 100644
--- a/runtime/internal/rpc/benchmark/simple/main.go
+++ b/runtime/internal/rpc/benchmark/simple/main.go
@@ -14,13 +14,13 @@
"v.io/v23"
"v.io/v23/context"
"v.io/v23/naming"
-
"v.io/x/ref/lib/security/securityflag"
- "v.io/x/ref/lib/xrpc"
-
_ "v.io/x/ref/runtime/factories/static"
+ "v.io/x/ref/runtime/internal/flow/flowtest"
+ fmanager "v.io/x/ref/runtime/internal/flow/manager"
"v.io/x/ref/runtime/internal/rpc/benchmark/internal"
"v.io/x/ref/runtime/internal/rpc/stream/manager"
+ "v.io/x/ref/runtime/internal/rt"
"v.io/x/ref/test"
"v.io/x/ref/test/benchmark"
"v.io/x/ref/test/testutil"
@@ -52,19 +52,28 @@
principal := testutil.NewPrincipal("test")
nctx, _ := v23.WithPrincipal(ctx, principal)
+ b.StopTimer()
b.ResetTimer()
for i := 0; i < b.N; i++ {
- client := manager.InternalNew(ctx, naming.FixedRoutingID(0xc))
-
- b.StartTimer()
- _, err := client.Dial(nctx, serverEP)
- if err != nil {
- ctx.Fatalf("Dial failed: %v", err)
+ if rt.TransitionState >= rt.XServers {
+ m := fmanager.New(nctx, naming.FixedRoutingID(0xc))
+ b.StartTimer()
+ _, err := m.Dial(nctx, serverEP, flowtest.BlessingsForPeer)
+ if err != nil {
+ ctx.Fatalf("Dial failed: %v", err)
+ }
+ b.StopTimer()
+ // TODO(mattr): close m.
+ } else {
+ client := manager.InternalNew(ctx, naming.FixedRoutingID(0xc))
+ b.StartTimer()
+ _, err := client.Dial(nctx, serverEP)
+ if err != nil {
+ ctx.Fatalf("Dial failed: %v", err)
+ }
+ b.StopTimer()
+ client.Shutdown()
}
-
- b.StopTimer()
-
- client.Shutdown()
}
}
@@ -126,7 +135,7 @@
ctx, shutdown = test.V23Init()
defer shutdown()
- server, err := xrpc.NewServer(ctx, "", internal.NewService(), securityflag.NewAuthorizerOrDie())
+ ctx, server, err := v23.WithNewServer(ctx, "", internal.NewService(), securityflag.NewAuthorizerOrDie())
if err != nil {
ctx.Fatalf("NewServer failed: %v", err)
}
diff --git a/runtime/internal/rpc/debug_test.go b/runtime/internal/rpc/debug_test.go
index 26aabf6..24873ad 100644
--- a/runtime/internal/rpc/debug_test.go
+++ b/runtime/internal/rpc/debug_test.go
@@ -79,7 +79,7 @@
foo.Set("The quick brown fox jumps over the lazy dog")
addr := naming.JoinAddressName(ep.String(), "__debug/stats/testing/foo")
var value string
- if err := client.Call(cctx, addr, "Value", nil, []interface{}{&value}, options.NoResolve{}); err != nil {
+ if err := client.Call(cctx, addr, "Value", nil, []interface{}{&value}, options.Preresolved{}); err != nil {
t.Fatalf("client.Call failed: %v", err)
}
if want := foo.Value(); value != want {
@@ -99,7 +99,7 @@
}
for _, tc := range testcases {
addr := naming.JoinAddressName(ep.String(), tc.name)
- call, err := client.StartCall(cctx, addr, rpc.GlobMethod, []interface{}{tc.pattern}, options.NoResolve{})
+ call, err := client.StartCall(cctx, addr, rpc.GlobMethod, []interface{}{tc.pattern}, options.Preresolved{})
if err != nil {
t.Fatalf("client.StartCall failed for %q: %v", tc.name, err)
}
diff --git a/runtime/internal/rpc/full_test.go b/runtime/internal/rpc/full_test.go
index 2da28c4..d42c249 100644
--- a/runtime/internal/rpc/full_test.go
+++ b/runtime/internal/rpc/full_test.go
@@ -78,7 +78,7 @@
c.Unlock()
}
-func testInternalNewServerWithPubsub(ctx *context.T, streamMgr stream.Manager, ns namespace.T, settingsPublisher *pubsub.Publisher, settingsStreamName string, opts ...rpc.ServerOpt) (rpc.Server, error) {
+func testInternalNewServerWithPubsub(ctx *context.T, streamMgr stream.Manager, ns namespace.T, settingsPublisher *pubsub.Publisher, settingsStreamName string, opts ...rpc.ServerOpt) (rpc.DeprecatedServer, error) {
client, err := InternalNewClient(streamMgr, ns)
if err != nil {
return nil, err
@@ -86,7 +86,7 @@
return InternalNewServer(ctx, streamMgr, ns, settingsPublisher, settingsStreamName, client, opts...)
}
-func testInternalNewServer(ctx *context.T, streamMgr stream.Manager, ns namespace.T, opts ...rpc.ServerOpt) (rpc.Server, error) {
+func testInternalNewServer(ctx *context.T, streamMgr stream.Manager, ns namespace.T, opts ...rpc.ServerOpt) (rpc.DeprecatedServer, error) {
return testInternalNewServerWithPubsub(ctx, streamMgr, ns, nil, "", opts...)
}
diff --git a/runtime/internal/rpc/options.go b/runtime/internal/rpc/options.go
index 4971e1c..09da261 100644
--- a/runtime/internal/rpc/options.go
+++ b/runtime/internal/rpc/options.go
@@ -53,7 +53,8 @@
func getNoNamespaceOpt(opts []rpc.CallOpt) bool {
for _, o := range opts {
- if _, ok := o.(options.NoResolve); ok {
+ switch o.(type) {
+ case options.Preresolved:
return true
}
}
diff --git a/runtime/internal/rpc/resolve_test.go b/runtime/internal/rpc/resolve_test.go
index da3649f..8f325c7 100644
--- a/runtime/internal/rpc/resolve_test.go
+++ b/runtime/internal/rpc/resolve_test.go
@@ -16,9 +16,7 @@
"v.io/v23/naming"
"v.io/v23/options"
"v.io/v23/rpc"
-
"v.io/x/ref/lib/flags"
- "v.io/x/ref/lib/xrpc"
"v.io/x/ref/runtime/factories/fake"
"v.io/x/ref/runtime/internal"
"v.io/x/ref/runtime/internal/lib/appcycle"
@@ -76,7 +74,7 @@
if err != nil {
return fmt.Errorf("mounttablelib.NewMountTableDispatcher failed: %s", err)
}
- server, err := xrpc.NewDispatchingServer(ctx, mp, mt, options.ServesMountTable(true))
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, mp, mt, options.ServesMountTable(true))
if err != nil {
return fmt.Errorf("root failed: %v", err)
}
diff --git a/runtime/internal/rpc/server.go b/runtime/internal/rpc/server.go
index 52be186..f93c7f4 100644
--- a/runtime/internal/rpc/server.go
+++ b/runtime/internal/rpc/server.go
@@ -172,7 +172,7 @@
return s.state == stopping || s.state == stopped
}
-var _ rpc.Server = (*server)(nil)
+var _ rpc.DeprecatedServer = (*server)(nil)
func InternalNewServer(
ctx *context.T,
@@ -181,7 +181,7 @@
settingsPublisher *pubsub.Publisher,
settingsName string,
client rpc.Client,
- opts ...rpc.ServerOpt) (rpc.Server, error) {
+ opts ...rpc.ServerOpt) (rpc.DeprecatedServer, error) {
ctx, cancel := context.WithRootCancel(ctx)
ctx, _ = vtrace.WithNewSpan(ctx, "NewServer")
statsPrefix := naming.Join("rpc", "server", "routing-id", streamMgr.RoutingID().String())
@@ -1338,7 +1338,7 @@
//nologcall
return fs.discharges
}
-func (fs *flowServer) Server() rpc.XServer {
+func (fs *flowServer) Server() rpc.Server {
//nologcall
return fs.server
}
diff --git a/runtime/internal/rpc/stress/mtstress/main.go b/runtime/internal/rpc/stress/mtstress/main.go
index b00d77e..e2d3b3a 100644
--- a/runtime/internal/rpc/stress/mtstress/main.go
+++ b/runtime/internal/rpc/stress/mtstress/main.go
@@ -62,7 +62,7 @@
// point should generate random test data -
// mountpoints at different depths and the like
start := time.Now()
- if err := v23.GetClient(ctx).Call(ctx, mountpoint, "Mount", []interface{}{ep, uint32(ttl / time.Second), 0}, nil, options.NoResolve{}); err != nil {
+ if err := v23.GetClient(ctx).Call(ctx, mountpoint, "Mount", []interface{}{ep, uint32(ttl / time.Second), 0}, nil, options.Preresolved{}); err != nil {
return 0, err
}
return time.Since(start), nil
@@ -93,7 +93,7 @@
resolve := func(ctx *context.T) (time.Duration, error) {
var entry naming.MountEntry
start := time.Now()
- if err := v23.GetClient(ctx).Call(ctx, name, "ResolveStep", nil, []interface{}{&entry}, options.NoResolve{}); err != nil && verror.ErrorID(err) != naming.ErrNoSuchName.ID {
+ if err := v23.GetClient(ctx).Call(ctx, name, "ResolveStep", nil, []interface{}{&entry}, options.Preresolved{}); err != nil && verror.ErrorID(err) != naming.ErrNoSuchName.ID {
// ErrNoSuchName is fine, it just means
// that the mounttable server did not
// find an entry in its tables.
diff --git a/runtime/internal/rpc/stress/stressd/main.go b/runtime/internal/rpc/stress/stressd/main.go
index 1adbceb..e6c21fa 100644
--- a/runtime/internal/rpc/stress/stressd/main.go
+++ b/runtime/internal/rpc/stress/stressd/main.go
@@ -11,12 +11,12 @@
"runtime"
"time"
+ "v.io/v23"
"v.io/v23/context"
"v.io/v23/security"
"v.io/x/lib/cmdline"
"v.io/x/ref/lib/signals"
"v.io/x/ref/lib/v23cmd"
- "v.io/x/ref/lib/xrpc"
_ "v.io/x/ref/runtime/factories/static"
"v.io/x/ref/runtime/internal/rpc/stress/internal"
)
@@ -40,7 +40,7 @@
runtime.GOMAXPROCS(runtime.NumCPU())
service, stop := internal.NewService()
- server, err := xrpc.NewServer(ctx, "", service, security.AllowEveryone())
+ ctx, server, err := v23.WithNewServer(ctx, "", service, security.AllowEveryone())
if err != nil {
ctx.Fatalf("NewServer failed: %v", err)
}
diff --git a/runtime/internal/rpc/test/client_test.go b/runtime/internal/rpc/test/client_test.go
index ecb9138..b963d93 100644
--- a/runtime/internal/rpc/test/client_test.go
+++ b/runtime/internal/rpc/test/client_test.go
@@ -25,7 +25,6 @@
"v.io/x/ref"
"v.io/x/ref/internal/logger"
- "v.io/x/ref/lib/xrpc"
_ "v.io/x/ref/runtime/factories/generic"
inaming "v.io/x/ref/runtime/internal/naming"
irpc "v.io/x/ref/runtime/internal/rpc"
@@ -55,7 +54,7 @@
if err != nil {
return fmt.Errorf("mounttablelib.NewMountTableDispatcher failed: %s", err)
}
- server, err := xrpc.NewDispatchingServer(ctx, "", mt, options.ServesMountTable(true), seclevel)
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, "", mt, options.ServesMountTable(true), seclevel)
if err != nil {
return fmt.Errorf("root failed: %v", err)
}
@@ -99,7 +98,7 @@
id, mp := args[0], args[1]
disp := &treeDispatcher{id: id}
- server, err := xrpc.NewDispatchingServer(ctx, mp, disp)
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, mp, disp)
if err != nil {
return err
}
@@ -514,7 +513,7 @@
func initServer(t *testing.T, ctx *context.T, opts ...rpc.ServerOpt) (string, func()) {
done := make(chan struct{})
- server, err := xrpc.NewServer(ctx, "", &simple{done}, nil, opts...)
+ ctx, server, err := v23.WithNewServer(ctx, "", &simple{done}, nil, opts...)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
diff --git a/runtime/internal/rpc/test/glob_test.go b/runtime/internal/rpc/test/glob_test.go
index d881b67..da4d6ba 100644
--- a/runtime/internal/rpc/test/glob_test.go
+++ b/runtime/internal/rpc/test/glob_test.go
@@ -11,6 +11,7 @@
"strings"
"testing"
+ "v.io/v23"
"v.io/v23/context"
"v.io/v23/glob"
"v.io/v23/i18n"
@@ -19,7 +20,6 @@
"v.io/v23/rpc/reserved"
"v.io/v23/security"
"v.io/v23/verror"
- "v.io/x/ref/lib/xrpc"
_ "v.io/x/ref/runtime/factories/generic"
"v.io/x/ref/test"
"v.io/x/ref/test/testutil"
@@ -42,7 +42,7 @@
tree.find(strings.Split(p, "/"), true)
}
- server, err := xrpc.NewDispatchingServer(ctx, "", &disp{tree})
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, "", &disp{tree})
if err != nil {
t.Fatalf("failed to start debug server: %v", err)
}
@@ -198,7 +198,7 @@
tree.find([]string{"a", "b"}, true)
tree.find([]string{"a", "deny", "x"}, true)
- server, err := xrpc.NewDispatchingServer(ctx, "", &disp{tree})
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, "", &disp{tree})
if err != nil {
t.Fatalf("failed to start debug server: %v", err)
}
diff --git a/runtime/internal/rpc/test/retry_test.go b/runtime/internal/rpc/test/retry_test.go
index 736f45e..bb4d533 100644
--- a/runtime/internal/rpc/test/retry_test.go
+++ b/runtime/internal/rpc/test/retry_test.go
@@ -13,7 +13,6 @@
"v.io/v23/rpc"
"v.io/v23/security"
"v.io/v23/verror"
- "v.io/x/ref/lib/xrpc"
)
var errRetryThis = verror.Register("retry_test.retryThis", verror.RetryBackoff, "retryable error")
@@ -39,7 +38,7 @@
// Start the server.
rs := retryServer{}
- server, err := xrpc.NewServer(ctx, "", &rs, security.AllowEveryone())
+ ctx, server, err := v23.WithNewServer(ctx, "", &rs, security.AllowEveryone())
if err != nil {
t.Fatal(err)
}
diff --git a/runtime/internal/rpc/test/signature_test.go b/runtime/internal/rpc/test/signature_test.go
index 8c3c492..7ffe035 100644
--- a/runtime/internal/rpc/test/signature_test.go
+++ b/runtime/internal/rpc/test/signature_test.go
@@ -8,13 +8,12 @@
"reflect"
"testing"
+ "v.io/v23"
"v.io/v23/context"
"v.io/v23/rpc"
"v.io/v23/rpc/reserved"
"v.io/v23/vdl"
"v.io/v23/vdlroot/signature"
-
- "v.io/x/ref/lib/xrpc"
_ "v.io/x/ref/runtime/factories/generic"
"v.io/x/ref/test"
)
@@ -45,7 +44,7 @@
func TestMethodSignature(t *testing.T) {
ctx, shutdown := test.V23Init()
defer shutdown()
- server, err := xrpc.NewServer(ctx, "", sigImpl{}, nil)
+ ctx, server, err := v23.WithNewServer(ctx, "", sigImpl{}, nil)
if err != nil {
t.Fatalf("failed to start sig server: %v", err)
}
@@ -90,7 +89,7 @@
func TestSignature(t *testing.T) {
ctx, shutdown := test.V23Init()
defer shutdown()
- server, err := xrpc.NewServer(ctx, "", sigImpl{}, nil)
+ ctx, server, err := v23.WithNewServer(ctx, "", sigImpl{}, nil)
if err != nil {
t.Fatalf("failed to start sig server: %v", err)
}
diff --git a/runtime/internal/rpc/transitionclient.go b/runtime/internal/rpc/transitionclient.go
index 41f675e..0c41746 100644
--- a/runtime/internal/rpc/transitionclient.go
+++ b/runtime/internal/rpc/transitionclient.go
@@ -5,9 +5,8 @@
package rpc
import (
- "strings"
-
"v.io/v23/context"
+ "v.io/v23/flow"
"v.io/v23/flow/message"
"v.io/v23/namespace"
"v.io/v23/rpc"
@@ -21,14 +20,12 @@
var _ = rpc.Client((*transitionClient)(nil))
-func NewTransitionClient(ctx *context.T, streamMgr stream.Manager, ns namespace.T, opts ...rpc.ClientOpt) (rpc.Client, error) {
+func NewTransitionClient(ctx *context.T, streamMgr stream.Manager, flowMgr flow.Manager, ns namespace.T, opts ...rpc.ClientOpt) (rpc.Client, error) {
var err error
ret := &transitionClient{}
- // TODO(mattr): Un-comment this once servers are sending setups before closing
- // connections in error cases.
- // if ret.xc, err = InternalNewXClient(ctx, opts...); err != nil {
- // return nil, err
- // }
+ if ret.xc, err = NewXClient(ctx, flowMgr, ns, opts...); err != nil {
+ return nil, err
+ }
if ret.c, err = InternalNewClient(streamMgr, ns, opts...); err != nil {
ret.xc.Close()
return nil, err
@@ -37,12 +34,6 @@
}
func (t *transitionClient) StartCall(ctx *context.T, name, method string, args []interface{}, opts ...rpc.CallOpt) (rpc.ClientCall, error) {
- // The agent cannot reconnect, and it's never going to transition to the new
- // rpc system. Instead it's moving off of rpc entirely. For now we detect
- // and send it to the old rpc system.
- if t.xc == nil || strings.HasPrefix(name, "/@5@unixfd@") || strings.HasPrefix(name, "/@6@unixfd@") {
- return t.c.StartCall(ctx, name, method, args, opts...)
- }
call, err := t.xc.StartCall(ctx, name, method, args, opts...)
if verror.ErrorID(err) == message.ErrWrongProtocol.ID {
call, err = t.c.StartCall(ctx, name, method, args, opts...)
@@ -51,12 +42,6 @@
}
func (t *transitionClient) Call(ctx *context.T, name, method string, in, out []interface{}, opts ...rpc.CallOpt) error {
- // The agent cannot reconnect, and it's never going to transition to the new
- // rpc system. Instead it's moving off of rpc entirely. For now we detect
- // and send it to the old rpc system.
- if t.xc == nil || strings.HasPrefix(name, "/@5@unixfd@") || strings.HasPrefix(name, "/@6@unixfd@") {
- return t.c.Call(ctx, name, method, in, out, opts...)
- }
err := t.xc.Call(ctx, name, method, in, out, opts...)
if verror.ErrorID(err) == message.ErrWrongProtocol.ID {
err = t.c.Call(ctx, name, method, in, out, opts...)
@@ -65,8 +50,6 @@
}
func (t *transitionClient) Close() {
- if t.xc != nil {
- t.xc.Close()
- }
+ t.xc.Close()
t.c.Close()
}
diff --git a/runtime/internal/rpc/x_test.go b/runtime/internal/rpc/x_test.go
index 7284c05..e36c3f9 100644
--- a/runtime/internal/rpc/x_test.go
+++ b/runtime/internal/rpc/x_test.go
@@ -34,7 +34,7 @@
if err != nil {
t.Fatal(verror.DebugString(err))
}
- client, err := NewXClient(ctx)
+ client, err := NewXClient(ctx, v23.ExperimentalGetFlowManager(ctx), v23.GetNamespace(ctx))
if err != nil {
t.Fatal(verror.DebugString(err))
}
@@ -64,7 +64,7 @@
if err != nil {
t.Fatal(verror.DebugString(err))
}
- client, err := NewXClient(ctx)
+ client, err := NewXClient(ctx, v23.ExperimentalGetFlowManager(ctx), v23.GetNamespace(ctx))
if err != nil {
t.Fatal(verror.DebugString(err))
}
diff --git a/runtime/internal/rpc/xclient.go b/runtime/internal/rpc/xclient.go
index 1cd90dd..afde0e4 100644
--- a/runtime/internal/rpc/xclient.go
+++ b/runtime/internal/rpc/xclient.go
@@ -46,10 +46,10 @@
var _ rpc.Client = (*xclient)(nil)
-func NewXClient(ctx *context.T, opts ...rpc.ClientOpt) (rpc.Client, error) {
+func NewXClient(ctx *context.T, fm flow.Manager, ns namespace.T, opts ...rpc.ClientOpt) (rpc.Client, error) {
c := &xclient{
- flowMgr: v23.ExperimentalGetFlowManager(ctx),
- ns: v23.GetNamespace(ctx),
+ flowMgr: fm,
+ ns: ns,
}
ipNets, err := ipNetworks()
if err != nil {
diff --git a/runtime/internal/rpc/xserver.go b/runtime/internal/rpc/xserver.go
index d466475..1bdf4a4 100644
--- a/runtime/internal/rpc/xserver.go
+++ b/runtime/internal/rpc/xserver.go
@@ -74,7 +74,7 @@
stats *rpcStats // stats for this server.
}
-func NewServer(ctx *context.T, name string, object interface{}, authorizer security.Authorizer, settingsPublisher *pubsub.Publisher, settingsName string, opts ...rpc.ServerOpt) (rpc.XServer, error) {
+func NewServer(ctx *context.T, name string, object interface{}, authorizer security.Authorizer, settingsPublisher *pubsub.Publisher, settingsName string, opts ...rpc.ServerOpt) (rpc.Server, error) {
if object == nil {
return nil, verror.New(verror.ErrBadArg, ctx, "nil object")
}
@@ -87,7 +87,7 @@
return NewDispatchingServer(ctx, name, d, settingsPublisher, settingsName, opts...)
}
-func NewDispatchingServer(ctx *context.T, name string, dispatcher rpc.Dispatcher, settingsPublisher *pubsub.Publisher, settingsName string, opts ...rpc.ServerOpt) (rpc.XServer, error) {
+func NewDispatchingServer(ctx *context.T, name string, dispatcher rpc.Dispatcher, settingsPublisher *pubsub.Publisher, settingsName string, opts ...rpc.ServerOpt) (rpc.Server, error) {
if dispatcher == nil {
return nil, verror.New(verror.ErrBadArg, ctx, "nil dispatcher")
}
@@ -151,7 +151,11 @@
}
func (s *xserver) Status() rpc.ServerStatus {
- return rpc.ServerStatus{}
+ ret := rpc.ServerStatus{}
+ for _, e := range s.chosenEndpoints {
+ ret.Endpoints = append(ret.Endpoints, e)
+ }
+ return ret
}
func (s *xserver) WatchNetwork(ch chan<- rpc.NetworkChange) {
@@ -764,9 +768,9 @@
//nologcall
return fs.flow.RemoteDischarges()
}
-func (fs *xflowServer) Server() rpc.XServer {
+func (fs *xflowServer) Server() rpc.Server {
//nologcall
- return nil // TODO(toddw): Change return to rpc.XServer
+ return nil // TODO(toddw): Change return to rpc.Server
}
func (fs *xflowServer) Timestamp() time.Time {
//nologcall
diff --git a/runtime/internal/rt/ipc_test.go b/runtime/internal/rt/ipc_test.go
index 23d320b..1c14c4e 100644
--- a/runtime/internal/rt/ipc_test.go
+++ b/runtime/internal/rt/ipc_test.go
@@ -18,7 +18,6 @@
"v.io/v23/rpc"
"v.io/v23/security"
"v.io/v23/verror"
- "v.io/x/ref/lib/xrpc"
_ "v.io/x/ref/runtime/factories/generic"
"v.io/x/ref/runtime/internal/rpc/stream/vc"
"v.io/x/ref/test"
@@ -137,7 +136,7 @@
t.Errorf("pserver.SetDefault(%v) failed: %v", test.server, err)
continue
}
- server, err := xrpc.NewServer(serverCtx, "", testService{}, security.AllowEveryone())
+ _, server, err := v23.WithNewServer(serverCtx, "", testService{}, security.AllowEveryone())
if err != nil {
t.Fatal(err)
}
@@ -186,7 +185,7 @@
t.Fatal(err)
}
for idx, test := range tests {
- server, err := xrpc.NewServer(ctx, "", testService{}, nil, test.opts...)
+ _, server, err := v23.WithNewServer(ctx, "", testService{}, nil, test.opts...)
if err != nil {
t.Errorf("test #%d: %v", idx, err)
continue
@@ -258,7 +257,7 @@
t.Fatal(err)
}
ds := &dischargeService{}
- server, err := xrpc.NewServer(dischargerCtx, "", ds, security.AllowEveryone())
+ dischargerCtx, server, err := v23.WithNewServer(dischargerCtx, "", ds, security.AllowEveryone())
if err != nil {
t.Fatal(err)
}
@@ -267,7 +266,7 @@
if err := root.Bless(pserver, "server", mkThirdPartyCaveat(pdischarger.PublicKey(), dischargeServerName)); err != nil {
t.Fatal(err)
}
- server, err = xrpc.NewServer(serverCtx, "", testService{}, security.AllowEveryone(), vc.DischargeExpiryBuffer(10*time.Millisecond))
+ serverCtx, server, err = v23.WithNewServer(serverCtx, "", testService{}, security.AllowEveryone(), vc.DischargeExpiryBuffer(10*time.Millisecond))
if err != nil {
t.Fatal(err)
}
diff --git a/runtime/internal/rt/mgmt_test.go b/runtime/internal/rt/mgmt_test.go
index ad1f650..0c48e4a 100644
--- a/runtime/internal/rt/mgmt_test.go
+++ b/runtime/internal/rt/mgmt_test.go
@@ -18,7 +18,6 @@
"v.io/v23/services/appcycle"
"v.io/x/ref/lib/mgmt"
"v.io/x/ref/lib/security/securityflag"
- "v.io/x/ref/lib/xrpc"
"v.io/x/ref/services/device"
"v.io/x/ref/test"
"v.io/x/ref/test/expect"
@@ -278,7 +277,7 @@
ch := make(chan string)
service := device.ConfigServer(&configServer{ch})
authorizer := securityflag.NewAuthorizerOrDie()
- configServer, err := xrpc.NewServer(ctx, "", service, authorizer)
+ ctx, configServer, err := v23.WithNewServer(ctx, "", service, authorizer)
if err != nil {
t.Fatalf("Got error: %v", err)
}
diff --git a/runtime/internal/rt/runtime.go b/runtime/internal/rt/runtime.go
index 07a83af..f3452b0 100644
--- a/runtime/internal/rt/runtime.go
+++ b/runtime/internal/rt/runtime.go
@@ -43,6 +43,27 @@
ivtrace "v.io/x/ref/runtime/internal/vtrace"
)
+const (
+ None = iota
+ XClients
+ XServers
+)
+
+var TransitionState = None
+
+func init() {
+ switch ts := os.Getenv("V23_RPC_TRANSITION_STATE"); ts {
+ case "xclients":
+ TransitionState = XClients
+ case "xservers":
+ TransitionState = XServers
+ case "":
+ TransitionState = None
+ default:
+ panic("Unknown transition state: " + ts)
+ }
+}
+
type contextKey int
const (
@@ -233,7 +254,7 @@
return inaming.NewEndpoint(ep)
}
-func (r *Runtime) NewServer(ctx *context.T, opts ...rpc.ServerOpt) (rpc.Server, error) {
+func (r *Runtime) NewServer(ctx *context.T, opts ...rpc.ServerOpt) (rpc.DeprecatedServer, error) {
defer apilog.LogCallf(ctx, "opts...=%v", opts)(ctx, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
// Create a new RoutingID (and StreamManager) for each server.
sm, err := newStreamManager(ctx)
@@ -410,17 +431,15 @@
var client rpc.Client
var err error
deps := []interface{}{vtraceDependency{}}
- switch {
- case fm != nil && sm != nil:
- client, err = irpc.NewTransitionClient(ctx, sm, ns, otherOpts...)
+
+ if fm != nil && TransitionState >= XClients {
+ client, err = irpc.NewTransitionClient(ctx, sm, fm, ns, otherOpts...)
deps = append(deps, fm, sm)
- case fm != nil:
- client, err = irpc.NewXClient(ctx, otherOpts...)
- deps = append(deps, fm)
- case sm != nil:
+ } else {
client, err = irpc.InternalNewClient(sm, ns, otherOpts...)
deps = append(deps, sm)
}
+
if err != nil {
return ctx, nil, err
}
@@ -570,32 +589,62 @@
return newctx, id.settingsPublisher, id.settingsName, otherOpts, nil
}
-func (r *Runtime) XWithNewServer(ctx *context.T, name string, object interface{}, auth security.Authorizer, opts ...rpc.ServerOpt) (*context.T, rpc.XServer, error) {
+func (r *Runtime) WithNewServer(ctx *context.T, name string, object interface{}, auth security.Authorizer, opts ...rpc.ServerOpt) (*context.T, rpc.Server, error) {
defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
- // TODO(mattr): Deal with shutdown deps.
- newctx, spub, sname, opts, err := r.commonServerInit(ctx, opts...)
+ if TransitionState >= XServers {
+ // TODO(mattr): Deal with shutdown deps.
+ newctx, spub, sname, opts, err := r.commonServerInit(ctx, opts...)
+ if err != nil {
+ return ctx, nil, err
+ }
+ s, err := irpc.NewServer(newctx, name, object, auth, spub, sname, opts...)
+ if err != nil {
+ // TODO(mattr): Stop the flow manager.
+ return ctx, nil, err
+ }
+ return newctx, s, nil
+ }
+ s, err := r.NewServer(ctx, opts...)
if err != nil {
return ctx, nil, err
}
- s, err := irpc.NewServer(newctx, name, object, auth, spub, sname, opts...)
- if err != nil {
- // TODO(mattr): Stop the flow manager.
+ if _, err = s.Listen(r.GetListenSpec(ctx)); err != nil {
+ s.Stop()
return ctx, nil, err
}
- return newctx, s, err
+ if err = s.Serve(name, object, auth); err != nil {
+ s.Stop()
+ return ctx, nil, err
+ }
+ return ctx, s, nil
}
-func (r *Runtime) XWithNewDispatchingServer(ctx *context.T, name string, disp rpc.Dispatcher, opts ...rpc.ServerOpt) (*context.T, rpc.XServer, error) {
+func (r *Runtime) WithNewDispatchingServer(ctx *context.T, name string, disp rpc.Dispatcher, opts ...rpc.ServerOpt) (*context.T, rpc.Server, error) {
defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
- // TODO(mattr): Deal with shutdown deps.
- newctx, spub, sname, opts, err := r.commonServerInit(ctx, opts...)
+ if TransitionState >= XServers {
+ // TODO(mattr): Deal with shutdown deps.
+ newctx, spub, sname, opts, err := r.commonServerInit(ctx, opts...)
+ if err != nil {
+ return ctx, nil, err
+ }
+ s, err := irpc.NewDispatchingServer(newctx, name, disp, spub, sname, opts...)
+ if err != nil {
+ // TODO(mattr): Stop the flow manager.
+ return ctx, nil, err
+ }
+ return newctx, s, nil
+ }
+
+ s, err := r.NewServer(ctx, opts...)
if err != nil {
return ctx, nil, err
}
- s, err := irpc.NewDispatchingServer(newctx, name, disp, spub, sname, opts...)
- if err != nil {
- // TODO(mattr): Stop the flow manager.
+ if _, err = s.Listen(r.GetListenSpec(ctx)); err != nil {
return ctx, nil, err
}
- return newctx, s, err
+ if err = s.ServeDispatcher(name, disp); err != nil {
+ s.Stop()
+ return ctx, nil, err
+ }
+ return ctx, s, nil
}
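
With the runtime changes above, v23.WithNewServer and v23.WithNewDispatchingServer become the single entry points for both stacks; which implementation they build is decided once at process start from the V23_RPC_TRANSITION_STATE environment variable (unset, "xclients", or "xservers"). The call sites updated throughout this change all follow the same shape and keep the returned context. A caller-side sketch, with the service name "echo" and the implementation object as placeholders:

package example

import (
	"v.io/v23"
	"v.io/v23/context"
	"v.io/v23/security"
)

// startEcho publishes impl under "echo" and reports where it is listening.
func startEcho(ctx *context.T, impl interface{}) (*context.T, error) {
	// Keep the returned context, as the updated call sites above do.
	ctx, server, err := v23.WithNewServer(ctx, "echo", impl, security.AllowEveryone())
	if err != nil {
		return ctx, err
	}
	// In the xservers state, Status now carries the chosen endpoints.
	ctx.Infof("listening on %v", server.Status().Endpoints)
	return ctx, nil
}
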
diff --git a/runtime/internal/rt/shutdown_servers_test.go b/runtime/internal/rt/shutdown_servers_test.go
index b6612e4..8133615 100644
--- a/runtime/internal/rt/shutdown_servers_test.go
+++ b/runtime/internal/rt/shutdown_servers_test.go
@@ -16,9 +16,7 @@
"v.io/v23"
"v.io/v23/context"
"v.io/v23/rpc"
-
"v.io/x/ref/lib/signals"
- "v.io/x/ref/lib/xrpc"
_ "v.io/x/ref/runtime/factories/generic"
"v.io/x/ref/test"
"v.io/x/ref/test/modules"
@@ -68,11 +66,11 @@
defer remoteCmdLoop(ctx, env.Stdin)()
// Create a couple servers, and start serving.
- server1, err := xrpc.NewServer(ctx, "", &dummy{}, nil)
+ ctx, server1, err := v23.WithNewServer(ctx, "", &dummy{}, nil)
if err != nil {
ctx.Fatalf("r.NewServer error: %s", err)
}
- server2, err := xrpc.NewServer(ctx, "", &dummy{}, nil)
+ ctx, server2, err := v23.WithNewServer(ctx, "", &dummy{}, nil)
if err != nil {
ctx.Fatalf("r.NewServer error: %s", err)
}
@@ -220,7 +218,7 @@
defer remoteCmdLoop(ctx, env.Stdin)()
// Create a server, and start serving.
- server, err := xrpc.NewServer(ctx, "", &dummy{}, nil)
+ ctx, server, err := v23.WithNewServer(ctx, "", &dummy{}, nil)
if err != nil {
ctx.Fatalf("r.NewServer error: %s", err)
}
diff --git a/runtime/internal/testing/mocks/mocknet/mocknet_test.go b/runtime/internal/testing/mocks/mocknet/mocknet_test.go
index f545b56..86afbcd 100644
--- a/runtime/internal/testing/mocks/mocknet/mocknet_test.go
+++ b/runtime/internal/testing/mocks/mocknet/mocknet_test.go
@@ -21,8 +21,6 @@
"v.io/v23/options"
"v.io/v23/rpc"
"v.io/v23/verror"
-
- "v.io/x/ref/lib/xrpc"
_ "v.io/x/ref/runtime/factories/generic"
"v.io/x/ref/runtime/internal/rpc/stream/crypto"
"v.io/x/ref/runtime/internal/rpc/stream/message"
@@ -333,7 +331,7 @@
}
func initServer(t *testing.T, ctx *context.T) (string, func()) {
- server, err := xrpc.NewServer(ctx, "", &simple{}, nil, options.SecurityNone)
+ ctx, server, err := v23.WithNewServer(ctx, "", &simple{}, nil, options.SecurityNone)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
diff --git a/runtime/internal/vtrace/vtrace_test.go b/runtime/internal/vtrace/vtrace_test.go
index 4bb4525..13921e4 100644
--- a/runtime/internal/vtrace/vtrace_test.go
+++ b/runtime/internal/vtrace/vtrace_test.go
@@ -19,10 +19,8 @@
"v.io/v23/security/access"
"v.io/v23/uniqueid"
"v.io/v23/vtrace"
-
"v.io/x/ref/lib/flags"
_ "v.io/x/ref/lib/security/securityflag"
- "v.io/x/ref/lib/xrpc"
_ "v.io/x/ref/runtime/factories/generic"
ivtrace "v.io/x/ref/runtime/internal/vtrace"
"v.io/x/ref/services/mounttable/mounttablelib"
@@ -47,7 +45,7 @@
if err != nil {
t.Fatalf("Could not create mt dispatcher %v", err)
}
- s, err := xrpc.NewDispatchingServer(ctx, "", disp, options.ServesMountTable(true))
+ ctx, s, err := v23.WithNewDispatchingServer(ctx, "", disp, options.ServesMountTable(true))
if err != nil {
t.Fatalf("Could not create mt server %v", err)
}
@@ -176,7 +174,7 @@
c := &testServer{
name: name,
}
- s, err := xrpc.NewServer(ctx, name, c, security.AllowEveryone())
+ ctx, s, err := v23.WithNewServer(ctx, name, c, security.AllowEveryone())
if err != nil {
return nil, err
}
diff --git a/services/agent/agentlib/agent_test.go b/services/agent/agentlib/agent_test.go
index 03d07fa..0aeb5e2 100644
--- a/services/agent/agentlib/agent_test.go
+++ b/services/agent/agentlib/agent_test.go
@@ -219,7 +219,10 @@
}
func runRecognizedNegativeBenchmark(b *testing.B, p security.Principal) {
- key := p.PublicKey()
+ key, err := p.PublicKey().MarshalBinary()
+ if err != nil {
+ b.Fatal(err)
+ }
b.ResetTimer()
for i := 0; i < b.N; i++ {
if d := p.Roots().Recognized(key, "foobar"); d == nil {
@@ -229,7 +232,10 @@
}
func runRecognizedBenchmark(b *testing.B, p security.Principal) {
- key := p.PublicKey()
+ key, err := p.PublicKey().MarshalBinary()
+ if err != nil {
+ b.Fatal(err)
+ }
blessing, err := p.BlessSelf("foobar")
if err != nil {
b.Fatal(err)
diff --git a/services/agent/agentlib/client.go b/services/agent/agentlib/client.go
index c5a68c8..d920f4b 100644
--- a/services/agent/agentlib/client.go
+++ b/services/agent/agentlib/client.go
@@ -99,7 +99,7 @@
func (c *vrpcCaller) startCall(name string, args ...interface{}) (rpc.ClientCall, error) {
ctx, _ := vtrace.WithNewTrace(c.ctx)
// SecurityNone is safe here since we're using anonymous unix sockets.
- return c.client.StartCall(ctx, c.name, name, args, options.SecurityNone, options.NoResolve{})
+ return c.client.StartCall(ctx, c.name, name, args, options.SecurityNone, options.Preresolved{})
}
func results(inputs ...interface{}) []interface{} {
@@ -362,20 +362,12 @@
caller caller
}
-func (b *blessingRoots) Add(root security.PublicKey, pattern security.BlessingPattern) error {
- marshalledKey, err := root.MarshalBinary()
- if err != nil {
- return err
- }
- return b.caller.call("BlessingRootsAdd", results(), marshalledKey, pattern)
+func (b *blessingRoots) Add(root []byte, pattern security.BlessingPattern) error {
+ return b.caller.call("BlessingRootsAdd", results(), root, pattern)
}
-func (b *blessingRoots) Recognized(root security.PublicKey, blessing string) error {
- marshalledKey, err := root.MarshalBinary()
- if err != nil {
- return err
- }
- return b.caller.call("BlessingRootsRecognized", results(), marshalledKey, blessing)
+func (b *blessingRoots) Recognized(root []byte, blessing string) error {
+ return b.caller.call("BlessingRootsRecognized", results(), root, blessing)
}
func (b *blessingRoots) Dump() map[security.BlessingPattern][]security.PublicKey {
diff --git a/services/agent/internal/cache/cache.go b/services/agent/internal/cache/cache.go
index f4500a8..a3d3418 100644
--- a/services/agent/internal/cache/cache.go
+++ b/services/agent/internal/cache/cache.go
@@ -49,11 +49,8 @@
return roots, nil
}
-func (r *cachedRoots) Add(root security.PublicKey, pattern security.BlessingPattern) error {
- cacheKey, err := keyToString(root)
- if err != nil {
- return err
- }
+func (r *cachedRoots) Add(root []byte, pattern security.BlessingPattern) error {
+ cachekey := string(root)
defer r.mu.Unlock()
r.mu.Lock()
@@ -63,22 +60,19 @@
}
if r.cache != nil {
-
- r.cache[cacheKey] = append(r.cache[cacheKey], pattern)
+ r.cache[cachekey] = append(r.cache[cachekey], pattern)
}
return nil
}
-func (r *cachedRoots) Recognized(root security.PublicKey, blessing string) (result error) {
- key, err := keyToString(root)
- if err != nil {
- return err
- }
+func (r *cachedRoots) Recognized(root []byte, blessing string) (result error) {
+ cachekey := string(root)
- r.mu.RLock()
var cacheExists bool
+ var err error
+ r.mu.RLock()
if r.cache != nil {
- err = r.recognizeFromCache(key, root, blessing)
+ err = r.recognizeFromCache(cachekey, root, blessing)
cacheExists = true
}
r.mu.RUnlock()
@@ -89,7 +83,7 @@
r.mu.Unlock()
return err
}
- err = r.recognizeFromCache(key, root, blessing)
+ err = r.recognizeFromCache(cachekey, root, blessing)
r.mu.Unlock()
}
@@ -97,11 +91,11 @@
// to support the 'Dump' method.
r.mu.RLock()
if !r.dumpExists && err != nil {
- negKey := key + blessing
+ negKey := cachekey + blessing
negErr, ok := r.negative.Get(negKey)
if !ok {
r.mu.RUnlock()
- return r.recognizeFromImpl(key, root, blessing)
+ return r.recognizeFromImpl(root, blessing)
}
r.negative.Put(negKey, err)
err = negErr.(error)
@@ -150,13 +144,14 @@
return nil
}
- for p, keys := range dump {
- for _, key := range keys {
- cacheKey, err := keyToString(key)
+ for p, pubkeys := range dump {
+ for _, pubkey := range pubkeys {
+ keybytes, err := pubkey.MarshalBinary()
if err != nil {
return err
}
- r.cache[cacheKey] = append(r.cache[cacheKey], p)
+ cachekey := string(keybytes)
+ r.cache[cachekey] = append(r.cache[cachekey], p)
}
}
r.dumpExists = true
@@ -176,38 +171,44 @@
}
dump := make(map[security.BlessingPattern][]security.PublicKey)
for keyStr, patterns := range r.cache {
- key, err := security.UnmarshalPublicKey([]byte(keyStr))
+ pubkey, err := security.UnmarshalPublicKey([]byte(keyStr))
if err != nil {
logger.Global().Errorf("security.UnmarshalPublicKey(%v) returned error: %v", []byte(keyStr), err)
return nil
}
for _, p := range patterns {
- dump[p] = append(dump[p], key)
+ dump[p] = append(dump[p], pubkey)
}
}
return dump
}
// Must be called while holding mu.
-func (r *cachedRoots) recognizeFromCache(key string, root security.PublicKey, blessing string) error {
- for _, p := range r.cache[key] {
+func (r *cachedRoots) recognizeFromCache(cachekey string, root []byte, blessing string) error {
+ for _, p := range r.cache[cachekey] {
if p.MatchedBy(blessing) {
return nil
}
}
- return security.NewErrUnrecognizedRoot(nil, root.String(), nil)
+ // It is wasteful to unmarshal the key just to build this error; consider changing the error string.
+ object, err := security.UnmarshalPublicKey(root)
+ if err != nil {
+ return err
+ }
+ return security.NewErrUnrecognizedRoot(nil, object.String(), nil)
}
// TODO(ataly): Get rid of this method once all agents have been updated
// to support the 'Dump' method.
-func (r *cachedRoots) recognizeFromImpl(key string, root security.PublicKey, blessing string) error {
- negKey := key + blessing
+func (r *cachedRoots) recognizeFromImpl(root []byte, blessing string) error {
+ cachekey := string(root)
err := r.impl.Recognized(root, blessing)
r.mu.Lock()
if err == nil {
- r.cache[key] = append(r.cache[key], security.BlessingPattern(blessing))
+ r.cache[cachekey] = append(r.cache[cachekey], security.BlessingPattern(blessing))
} else {
+ negKey := cachekey + blessing
r.negative.Put(negKey, err)
}
r.mu.Unlock()
@@ -218,7 +219,7 @@
// wraps over another implementation and adds caching.
type cachedStore struct {
mu *sync.RWMutex
- key security.PublicKey
+ pubkey security.PublicKey
def security.Blessings
hasDef bool
peers map[security.BlessingPattern]security.Blessings
@@ -302,7 +303,7 @@
}
func (s *cachedStore) PublicKey() security.PublicKey {
- return s.key
+ return s.pubkey
}
func (s *cachedStore) DebugString() string {
@@ -369,7 +370,7 @@
}
type dummySigner struct {
- key security.PublicKey
+ pubkey security.PublicKey
}
func (s dummySigner) Sign(purpose, message []byte) (security.Signature, error) {
@@ -378,7 +379,7 @@
}
func (s dummySigner) PublicKey() security.PublicKey {
- return s.key
+ return s.pubkey
}
func NewCachedPrincipal(ctx *context.T, impl agent.Principal, call rpc.ClientCall) (p agent.Principal, err error) {
@@ -411,9 +412,9 @@
return
}
cachedStore := &cachedStore{
- mu: &mu,
- key: impl.PublicKey(),
- impl: impl.BlessingStore(),
+ mu: &mu,
+ pubkey: impl.PublicKey(),
+ impl: impl.BlessingStore(),
}
flush = func() {
defer mu.Unlock()
@@ -429,11 +430,3 @@
p = &cachedPrincipal{sp, impl}
return
}
-
-func keyToString(key security.PublicKey) (string, error) {
- bytes, err := key.MarshalBinary()
- if err != nil {
- return "", err
- }
- return string(bytes), nil
-}
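
BlessingRoots.Add and Recognized now take the DER-encoded public key directly, which lets the agent key its cache on the raw bytes (string(root)) and avoids marshaling on every call; callers marshal the key once, as the updated tests below do. A minimal sketch of the new call shape, with the pattern and blessing name as placeholders:

package example

import "v.io/v23/security"

// trustRoot registers root as a recognized authority for the "example" pattern
// and then checks that a blessing from it is recognized.
func trustRoot(p security.Principal, root security.PublicKey) error {
	der, err := root.MarshalBinary()
	if err != nil {
		return err
	}
	if err := p.Roots().Add(der, security.BlessingPattern("example")); err != nil {
		return err
	}
	// Recognized takes the same DER bytes.
	return p.Roots().Recognized(der, "example")
}
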
diff --git a/services/agent/internal/cache/cache_test.go b/services/agent/internal/cache/cache_test.go
index e0c2193..3f669ad 100644
--- a/services/agent/internal/cache/cache_test.go
+++ b/services/agent/internal/cache/cache_test.go
@@ -16,7 +16,7 @@
"v.io/x/ref/test/testutil"
)
-func createRoots() (security.PublicKey, security.BlessingRoots, *cachedRoots) {
+func createRoots() ([]byte, security.BlessingRoots, *cachedRoots) {
var mu sync.RWMutex
ctx, _ := context.RootContext()
ctx = context.WithLogger(ctx, logger.Global())
@@ -26,7 +26,11 @@
if err != nil {
panic(err)
}
- return p.PublicKey(), impl, roots
+ keybytes, err := p.PublicKey().MarshalBinary()
+ if err != nil {
+ panic(err)
+ }
+ return keybytes, impl, roots
}
func TestCreateRoots(t *testing.T) {
@@ -39,17 +43,15 @@
}
}
-func expectRecognized(roots security.BlessingRoots, key security.PublicKey, blessing string) string {
- err := roots.Recognized(key, blessing)
- if err != nil {
+func expectRecognized(roots security.BlessingRoots, key []byte, blessing string) string {
+ if err := roots.Recognized(key, blessing); err != nil {
return fmt.Sprintf("Key (%s, %v) not matched by roots:\n%s, Recognized returns error: %v", key, blessing, roots.DebugString(), err)
}
return ""
}
-func expectNotRecognized(roots security.BlessingRoots, key security.PublicKey, blessing string) string {
- err := roots.Recognized(key, blessing)
- if err == nil {
+func expectNotRecognized(roots security.BlessingRoots, key []byte, blessing string) string {
+ if err := roots.Recognized(key, blessing); err == nil {
return fmt.Sprintf("Key (%s, %s) should not match roots:\n%s", key, blessing, roots.DebugString())
}
return ""
@@ -164,7 +166,7 @@
func createStore(p security.Principal) (security.BlessingStore, *cachedStore) {
var mu sync.RWMutex
impl := p.BlessingStore()
- return impl, &cachedStore{mu: &mu, key: p.PublicKey(), impl: impl}
+ return impl, &cachedStore{mu: &mu, pubkey: p.PublicKey(), impl: impl}
}
func TestDefaultBlessing(t *testing.T) {
diff --git a/services/agent/internal/pingpong/main.go b/services/agent/internal/pingpong/main.go
index 2c27383..d233602 100644
--- a/services/agent/internal/pingpong/main.go
+++ b/services/agent/internal/pingpong/main.go
@@ -10,6 +10,7 @@
import (
"fmt"
+ "v.io/v23"
"v.io/v23/context"
"v.io/v23/rpc"
"v.io/v23/security"
@@ -17,7 +18,6 @@
"v.io/x/ref/lib/flags"
"v.io/x/ref/lib/signals"
"v.io/x/ref/lib/v23cmd"
- "v.io/x/ref/lib/xrpc"
_ "v.io/x/ref/runtime/factories/generic"
)
@@ -67,7 +67,7 @@
// Use the default authorization policy (nil authorizer), which will
// only authorize clients if the blessings of the client is a prefix of
// that of the server or vice-versa.
- s, err := xrpc.NewServer(ctx, "", PingPongServer(&pongd{}), nil)
+ ctx, s, err := v23.WithNewServer(ctx, "", PingPongServer(&pongd{}), nil)
if err != nil {
return fmt.Errorf("failure creating server: %v", err)
}
diff --git a/services/agent/internal/server/server.go b/services/agent/internal/server/server.go
index ba9c579..a9ccc5c 100644
--- a/services/agent/internal/server/server.go
+++ b/services/agent/internal/server/server.go
@@ -305,23 +305,15 @@
}
func (a *agentd) BlessingRootsAdd(root []byte, pattern security.BlessingPattern) error {
- pkey, err := security.UnmarshalPublicKey(root)
- if err != nil {
- return err
- }
defer a.unlock()
a.mu.Lock()
- return a.principal.Roots().Add(pkey, pattern)
+ return a.principal.Roots().Add(root, pattern)
}
func (a *agentd) BlessingRootsRecognized(root []byte, blessing string) error {
- pkey, err := security.UnmarshalPublicKey(root)
- if err != nil {
- return err
- }
defer a.mu.RUnlock()
a.mu.RLock()
- return a.principal.Roots().Recognized(pkey, blessing)
+ return a.principal.Roots().Recognized(root, blessing)
}
func (a *agentd) BlessingRootsDump() (map[security.BlessingPattern][][]byte, error) {
diff --git a/services/agent/internal/test_principal/main.go b/services/agent/internal/test_principal/main.go
index 40c4f5c..09e03d8 100644
--- a/services/agent/internal/test_principal/main.go
+++ b/services/agent/internal/test_principal/main.go
@@ -99,13 +99,17 @@
errorf("MintDischarge: %v", err)
}
// BlessingRoots
- if err := p.Roots().Recognized(p.PublicKey(), "batman"); err == nil {
+ keybytes, err := p.PublicKey().MarshalBinary()
+ if err != nil {
+ errorf("Failed to marshal public key: %v", err)
+ }
+ if err := p.Roots().Recognized(keybytes, "batman"); err == nil {
errorf("Roots().Recognized returned nil")
}
if err := p.AddToRoots(b); err != nil {
errorf("AddToRoots: %v", err)
}
- if err := p.Roots().Recognized(p.PublicKey(), "batman"); err != nil {
+ if err := p.Roots().Recognized(keybytes, "batman"); err != nil {
errorf("Roots().Recognized: %v", err)
}
// BlessingStore: Defaults
diff --git a/services/application/application/impl.go b/services/application/application/impl.go
index d244b5e..b420b3a 100644
--- a/services/application/application/impl.go
+++ b/services/application/application/impl.go
@@ -61,7 +61,7 @@
results <- struct {
profile string
err error
- }{profile, app.PutX(ctx, profile, envelope, overwrite)}
+ }{profile, app.Put(ctx, profile, envelope, overwrite)}
}(profile)
}
resultsMap := make(map[string]error, len(profiles))
diff --git a/services/application/application/impl_test.go b/services/application/application/impl_test.go
index 2f02939..007d507 100644
--- a/services/application/application/impl_test.go
+++ b/services/application/application/impl_test.go
@@ -12,6 +12,7 @@
"strings"
"testing"
+ "v.io/v23"
"v.io/v23/context"
"v.io/v23/naming"
"v.io/v23/rpc"
@@ -20,7 +21,6 @@
"v.io/v23/services/application"
"v.io/x/lib/cmdline"
"v.io/x/ref/lib/v23cmd"
- "v.io/x/ref/lib/xrpc"
_ "v.io/x/ref/runtime/factories/generic"
"v.io/x/ref/services/repository"
"v.io/x/ref/test"
@@ -91,8 +91,9 @@
return envelope, nil
}
-func (s *server) Put(ctx *context.T, _ rpc.ServerCall, profiles []string, env application.Envelope) error {
- ctx.VI(2).Infof("%v.Put(%v, %v) was called", s.suffix, profiles, env)
+func (s *server) Put(ctx *context.T, _ rpc.ServerCall, profile string, env application.Envelope, overwrite bool) error {
+ ctx.VI(2).Infof("%v.Put(%v, %v, %t) was called", s.suffix, profile, env, overwrite)
+ fmt.Fprintf(&serverOut, "Put(%s, ..., %t)\n", profile, overwrite)
return nil
}
@@ -137,7 +138,7 @@
ctx, shutdown := test.V23Init()
defer shutdown()
- server, err := xrpc.NewDispatchingServer(ctx, "", &dispatcher{})
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, "", &dispatcher{})
if err != nil {
t.Errorf("NewServer failed: %v", err)
return
@@ -184,7 +185,7 @@
if expected, got := "Application envelope added for profile myprofile1.\nApplication envelope added for profile myprofile2.", strings.TrimSpace(stdout.String()); got != expected {
t.Errorf("Unexpected output from put. Got %q, expected %q", got, expected)
}
- if expected1, expected2, got := "PutX(myprofile1, ..., false)\nPutX(myprofile2, ..., false)", "PutX(myprofile2, ..., false)\nPutX(myprofile1, ..., false)", strings.TrimSpace(serverOut.String()); got != expected1 && got != expected2 {
+ if expected1, expected2, got := "Put(myprofile1, ..., false)\nPut(myprofile2, ..., false)", "Put(myprofile2, ..., false)\nPut(myprofile1, ..., false)", strings.TrimSpace(serverOut.String()); got != expected1 && got != expected2 {
t.Errorf("Unexpected output from mock server. Got %q, expected %q or %q", got, expected1, expected2)
}
resetOut()
@@ -196,7 +197,7 @@
if expected, got := "Application envelope added for profile myprofile.", strings.TrimSpace(stdout.String()); got != expected {
t.Errorf("Unexpected output from put. Got %q, expected %q", got, expected)
}
- if expected, got := "PutX(myprofile, ..., true)", strings.TrimSpace(serverOut.String()); got != expected {
+ if expected, got := "Put(myprofile, ..., true)", strings.TrimSpace(serverOut.String()); got != expected {
t.Errorf("Unexpected output from mock server. Got %q, expected %q", got, expected)
}
resetOut()
diff --git a/services/application/applicationd/impl_test.go b/services/application/applicationd/impl_test.go
index 45b925e..4f49d7f 100644
--- a/services/application/applicationd/impl_test.go
+++ b/services/application/applicationd/impl_test.go
@@ -17,8 +17,6 @@
"v.io/v23/security"
"v.io/v23/services/application"
"v.io/v23/verror"
-
- "v.io/x/ref/lib/xrpc"
appd "v.io/x/ref/services/application/applicationd"
"v.io/x/ref/services/repository"
"v.io/x/ref/test"
@@ -84,7 +82,7 @@
t.Fatalf("NewDispatcher() failed: %v", err)
}
- server, err := xrpc.NewDispatchingServer(ctx, "", dispatcher)
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, "", dispatcher)
if err != nil {
t.Fatalf("NewServer(%v) failed: %v", dispatcher, err)
}
@@ -129,17 +127,17 @@
}
checkNoProfile(t, ctx, stub)
- // Test PutX(), adding a number of application envelopes.
- if err := stubV1.PutX(ctx, "base", envelopeV1, false); err != nil {
- t.Fatalf("PutX() failed: %v", err)
+ // Test Put(), adding a number of application envelopes.
+ if err := stubV1.Put(ctx, "base", envelopeV1, false); err != nil {
+ t.Fatalf("Put() failed: %v", err)
}
- if err := stubV1.PutX(ctx, "media", envelopeV1, false); err != nil {
- t.Fatalf("PutX() failed: %v", err)
+ if err := stubV1.Put(ctx, "media", envelopeV1, false); err != nil {
+ t.Fatalf("Put() failed: %v", err)
}
- if err := stubV2.PutX(ctx, "base", envelopeV2, false); err != nil {
- t.Fatalf("PutX() failed: %v", err)
+ if err := stubV2.Put(ctx, "base", envelopeV2, false); err != nil {
+ t.Fatalf("Put() failed: %v", err)
}
- if err := stub.PutX(ctx, "base", envelopeV1, false); err == nil || verror.ErrorID(err) != appd.ErrInvalidSuffix.ID {
+ if err := stub.Put(ctx, "base", envelopeV1, false); err == nil || verror.ErrorID(err) != appd.ErrInvalidSuffix.ID {
t.Fatalf("Unexpected error: expected %v, got %v", appd.ErrInvalidSuffix, err)
}
@@ -163,8 +161,8 @@
// Test that if we add another envelope for a version that's the highest
// in sort order, the new envelope becomes the latest.
- if err := stubV3.PutX(ctx, "base", envelopeV3, false); err != nil {
- t.Fatalf("PutX() failed: %v", err)
+ if err := stubV3.Put(ctx, "base", envelopeV3, false); err != nil {
+ t.Fatalf("Put() failed: %v", err)
}
checkEnvelope(t, ctx, envelopeV3, stub, "base", "media")
checkProfiles(t, ctx, stubV3, "base")
@@ -179,8 +177,8 @@
},
Publisher: blessings,
}
- if err := stubV0.PutX(ctx, "base", envelopeV0, false); err != nil {
- t.Fatalf("PutX() failed: %v", err)
+ if err := stubV0.Put(ctx, "base", envelopeV0, false); err != nil {
+ t.Fatalf("Put() failed: %v", err)
}
checkEnvelope(t, ctx, envelopeV3, stub, "base", "media")
@@ -201,14 +199,14 @@
t.Errorf("unexpected Glob results. Got %q, want %q", matches, expected)
}
- // PutX cannot replace the envelope for v0-base when overwrite is false.
- if err := stubV0.PutX(ctx, "base", envelopeV2, false); err == nil || verror.ErrorID(err) != verror.ErrExist.ID {
+ // Put cannot replace the envelope for v0-base when overwrite is false.
+ if err := stubV0.Put(ctx, "base", envelopeV2, false); err == nil || verror.ErrorID(err) != verror.ErrExist.ID {
t.Fatalf("Unexpected error: expected %v, got %v", appd.ErrInvalidSuffix, err)
}
checkEnvelope(t, ctx, envelopeV0, stubV0, "base")
- // PutX can replace the envelope for v0-base when overwrite is true.
- if err := stubV0.PutX(ctx, "base", envelopeV2, true); err != nil {
- t.Fatalf("PutX() failed: %v", err)
+ // Put can replace the envelope for v0-base when overwrite is true.
+ if err := stubV0.Put(ctx, "base", envelopeV2, true); err != nil {
+ t.Fatalf("Put() failed: %v", err)
}
checkEnvelope(t, ctx, envelopeV2, stubV0, "base")
@@ -248,20 +246,20 @@
checkNoEnvelope(t, ctx, stubV1, "media")
checkNoEnvelope(t, ctx, stubV2, "base")
- if err := stubV0.PutX(ctx, "base", envelopeV0, false); err != nil {
- t.Fatalf("PutX() failed: %v", err)
+ if err := stubV0.Put(ctx, "base", envelopeV0, false); err != nil {
+ t.Fatalf("Put() failed: %v", err)
}
- if err := stubV1.PutX(ctx, "base", envelopeV1, false); err != nil {
- t.Fatalf("PutX() failed: %v", err)
+ if err := stubV1.Put(ctx, "base", envelopeV1, false); err != nil {
+ t.Fatalf("Put() failed: %v", err)
}
- if err := stubV1.PutX(ctx, "media", envelopeV1, false); err != nil {
- t.Fatalf("PutX() failed: %v", err)
+ if err := stubV1.Put(ctx, "media", envelopeV1, false); err != nil {
+ t.Fatalf("Put() failed: %v", err)
}
- if err := stubV2.PutX(ctx, "base", envelopeV2, false); err != nil {
- t.Fatalf("PutX() failed: %v", err)
+ if err := stubV2.Put(ctx, "base", envelopeV2, false); err != nil {
+ t.Fatalf("Put() failed: %v", err)
}
- if err := stubV3.PutX(ctx, "base", envelopeV3, false); err != nil {
- t.Fatalf("PutX() failed: %v", err)
+ if err := stubV3.Put(ctx, "base", envelopeV3, false); err != nil {
+ t.Fatalf("Put() failed: %v", err)
}
if err := stub.Remove(ctx, "*"); err != nil {
t.Fatalf("Remove() failed: %v", err)
@@ -293,7 +291,7 @@
t.Fatalf("NewDispatcher() failed: %v", err)
}
- server, err := xrpc.NewDispatchingServer(ctx, "", dispatcher)
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, "", dispatcher)
if err != nil {
t.Fatalf("Serve(%v) failed: %v", dispatcher, err)
}
@@ -315,8 +313,8 @@
Publisher: blessings,
}
- if err := stubV1.PutX(ctx, "media", envelopeV1, false); err != nil {
- t.Fatalf("PutX() failed: %v", err)
+ if err := stubV1.Put(ctx, "media", envelopeV1, false); err != nil {
+ t.Fatalf("Put() failed: %v", err)
}
// There is content here now.
@@ -330,7 +328,7 @@
t.Fatalf("NewDispatcher() failed: %v", err)
}
- server, err = xrpc.NewDispatchingServer(ctx, "", dispatcher)
+ ctx, server, err = v23.WithNewDispatchingServer(ctx, "", dispatcher)
if err != nil {
t.Fatalf("NewServer(%v) failed: %v", dispatcher, err)
}
@@ -357,7 +355,7 @@
t.Fatalf("NewDispatcher() failed: %v", err)
}
- server, err := xrpc.NewDispatchingServer(ctx, "", dispatcher)
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, "", dispatcher)
if err != nil {
t.Fatalf("NewServer(%v) failed: %v", dispatcher, err)
}
@@ -470,8 +468,8 @@
// Test that we can add an envelope for v3 with profile media and after calling
// TidyNow(), there will be all versions still in glob but v0 will only match profile
// base and not have an envelope for profile media.
- if err := stubs[3].PutX(ctx, "media", envelopeV3, false); err != nil {
- t.Fatalf("PutX() failed: %v", err)
+ if err := stubs[3].Put(ctx, "media", envelopeV3, false); err != nil {
+ t.Fatalf("Put() failed: %v", err)
}
if err := stubs[0].TidyNow(ctx); err != nil {
@@ -540,8 +538,8 @@
func stuffEnvelopes(t *testing.T, ctx *context.T, stubs []repository.ApplicationClientStub, pets []profEnvTuple) {
for i, pet := range pets {
for _, profile := range pet.p {
- if err := stubs[i].PutX(ctx, profile, *pet.e, true); err != nil {
- t.Fatalf("%d: PutX(%v) failed: %v", i, pet, err)
+ if err := stubs[i].Put(ctx, profile, *pet.e, true); err != nil {
+ t.Fatalf("%d: Put(%v) failed: %v", i, pet, err)
}
}
}
diff --git a/services/application/applicationd/main.go b/services/application/applicationd/main.go
index e1b4e6d..0471c12 100644
--- a/services/application/applicationd/main.go
+++ b/services/application/applicationd/main.go
@@ -10,11 +10,11 @@
import (
"fmt"
+ "v.io/v23"
"v.io/v23/context"
"v.io/x/lib/cmdline"
"v.io/x/ref/lib/signals"
"v.io/x/ref/lib/v23cmd"
- "v.io/x/ref/lib/xrpc"
_ "v.io/x/ref/runtime/factories/roaming"
)
@@ -48,7 +48,7 @@
return fmt.Errorf("NewDispatcher() failed: %v", err)
}
- server, err := xrpc.NewDispatchingServer(ctx, name, dispatcher)
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, name, dispatcher)
if err != nil {
return fmt.Errorf("NewServer() failed: %v", err)
}
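
Editorial note, not part of the patch: the xrpc constructors replaced throughout this change returned only a server, whereas v23.WithNewServer / v23.WithNewDispatchingServer return an updated context alongside the server, and callers are expected to keep using that returned context. A minimal sketch of the new daemon idiom follows; the package name, runServer, and the log message are illustrative only.

package sketch // illustrative only

import (
	"fmt"

	"v.io/v23"
	"v.io/v23/context"
	"v.io/v23/rpc"
	"v.io/x/ref/lib/signals"
)

func runServer(ctx *context.T, name string, disp rpc.Dispatcher) error {
	// The returned ctx carries the new server; reuse it for all later calls.
	ctx, server, err := v23.WithNewDispatchingServer(ctx, name, disp)
	if err != nil {
		return fmt.Errorf("NewDispatchingServer() failed: %v", err)
	}
	ctx.VI(1).Infof("Listening at: %v", server.Status().Endpoints)
	// Block until shutdown is requested.
	<-signals.ShutdownOnSignals(ctx)
	return nil
}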
diff --git a/services/application/applicationd/perms_test.go b/services/application/applicationd/perms_test.go
index f796367..64c1f5f 100644
--- a/services/application/applicationd/perms_test.go
+++ b/services/application/applicationd/perms_test.go
@@ -17,7 +17,6 @@
"v.io/v23/services/application"
"v.io/v23/verror"
"v.io/x/ref/lib/signals"
- "v.io/x/ref/lib/xrpc"
appd "v.io/x/ref/services/application/applicationd"
"v.io/x/ref/services/internal/servicetest"
"v.io/x/ref/services/repository"
@@ -45,7 +44,7 @@
if err != nil {
ctx.Fatalf("Failed to create repository dispatcher: %v", err)
}
- server, err := xrpc.NewDispatchingServer(ctx, publishName, dispatcher)
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, publishName, dispatcher)
if err != nil {
ctx.Fatalf("NewDispatchingServer(%v) failed: %v", publishName, err)
}
@@ -101,13 +100,13 @@
}
// Envelope putting as other should fail.
- if err := v1stub.PutX(otherCtx, "base", envelopeV1, false); verror.ErrorID(err) != verror.ErrNoAccess.ID {
- t.Fatalf("PutX() returned errorid=%v wanted errorid=%v [%v]", verror.ErrorID(err), verror.ErrNoAccess.ID, err)
+ if err := v1stub.Put(otherCtx, "base", envelopeV1, false); verror.ErrorID(err) != verror.ErrNoAccess.ID {
+ t.Fatalf("Put() returned errorid=%v wanted errorid=%v [%v]", verror.ErrorID(err), verror.ErrNoAccess.ID, err)
}
// Envelope putting as global should succeed.
- if err := v1stub.PutX(ctx, "base", envelopeV1, false); err != nil {
- t.Fatalf("PutX() failed: %v", err)
+ if err := v1stub.Put(ctx, "base", envelopeV1, false); err != nil {
+ t.Fatalf("Put() failed: %v", err)
}
ctx.VI(2).Infof("Accessing the Permission Lists of the root returns a (simulated) list providing default authorization.")
@@ -158,8 +157,8 @@
}
// Envelope putting as other should now succeed.
- if err := v1stub.PutX(otherCtx, "base", envelopeV1, true); err != nil {
- t.Fatalf("PutX() wrongly failed: %v", err)
+ if err := v1stub.Put(otherCtx, "base", envelopeV1, true); err != nil {
+ t.Fatalf("Put() wrongly failed: %v", err)
}
// Other takes control.
@@ -243,16 +242,16 @@
ctx.VI(2).Info("Upload an envelope")
v1stub := repository.ApplicationClient("repo/search/v1")
- if err := v1stub.PutX(ctx, "base", envelopeV1, false); err != nil {
- t.Fatalf("PutX() failed: %v", err)
+ if err := v1stub.Put(ctx, "base", envelopeV1, false); err != nil {
+ t.Fatalf("Put() failed: %v", err)
}
v2stub := repository.ApplicationClient("repo/search/v2")
- if err := v2stub.PutX(ctx, "base", envelopeV1, false); err != nil {
- t.Fatalf("PutX() failed: %v", err)
+ if err := v2stub.Put(ctx, "base", envelopeV1, false); err != nil {
+ t.Fatalf("Put() failed: %v", err)
}
v3stub := repository.ApplicationClient("repo/naps/v1")
- if err := v3stub.PutX(ctx, "base", envelopeV1, false); err != nil {
- t.Fatalf("PutX() failed: %v", err)
+ if err := v3stub.Put(ctx, "base", envelopeV1, false); err != nil {
+ t.Fatalf("Put() failed: %v", err)
}
ctx.VI(2).Info("Self can access Permissions but other can't.")
@@ -377,8 +376,8 @@
// Other can now upload an envelope at both locations.
for _, stub := range []repository.ApplicationClientStub{v1stub, v2stub} {
- if err := stub.PutX(otherCtx, "base", envelopeV1, true); err != nil {
- t.Fatalf("PutX() failed: %v", err)
+ if err := stub.Put(otherCtx, "base", envelopeV1, true); err != nil {
+ t.Fatalf("Put() failed: %v", err)
}
}
diff --git a/services/application/applicationd/service.go b/services/application/applicationd/service.go
index 0869f13..7d12040 100644
--- a/services/application/applicationd/service.go
+++ b/services/application/applicationd/service.go
@@ -135,55 +135,12 @@
return empty, verror.New(verror.ErrNoExist, ctx)
}
-func (i *appRepoService) Put(ctx *context.T, call rpc.ServerCall, profiles []string, envelope application.Envelope) error {
- ctx.VI(0).Infof("%v.Put(%v, %v)", i.suffix, profiles, envelope)
- name, version, err := parse(ctx, i.suffix)
- if err != nil {
- return err
- }
- if version == "" {
- return verror.New(ErrInvalidSuffix, ctx)
- }
- i.store.Lock()
- defer i.store.Unlock()
- // Transaction is rooted at "", so tname == tid.
- tname, err := i.store.BindTransactionRoot("").CreateTransaction(call)
- if err != nil {
- return err
- }
-
- // Only add a Permissions value if there is not already one present.
- apath := naming.Join("/acls", name, "data")
- aobj := i.store.BindObject(apath)
- if _, err := aobj.Get(call); verror.ErrorID(err) == fs.ErrNotInMemStore.ID {
- rb, _ := security.RemoteBlessingNames(ctx, call.Security())
- if len(rb) == 0 {
- // None of the client's blessings are valid.
- return verror.New(ErrNotAuthorized, ctx)
- }
- newperms := pathperms.PermissionsForBlessings(rb)
- if _, err := aobj.Put(nil, newperms); err != nil {
- return err
- }
- }
-
- for _, profile := range profiles {
- path := naming.Join(tname, "/applications", name, profile, version)
-
- object := i.store.BindObject(path)
- _, err := object.Put(call, envelope)
- if err != nil {
- return verror.New(ErrOperationFailed, ctx)
- }
- }
- if err := i.store.BindTransaction(tname).Commit(call); err != nil {
- return verror.New(ErrOperationFailed, ctx)
- }
- return nil
+func (i *appRepoService) PutX(ctx *context.T, call rpc.ServerCall, profile string, envelope application.Envelope, overwrite bool) error {
+ return i.Put(ctx, call, profile, envelope, overwrite)
}
-func (i *appRepoService) PutX(ctx *context.T, call rpc.ServerCall, profile string, envelope application.Envelope, overwrite bool) error {
- ctx.VI(0).Infof("%v.PutX(%v, %v, %t)", i.suffix, profile, envelope, overwrite)
+func (i *appRepoService) Put(ctx *context.T, call rpc.ServerCall, profile string, envelope application.Envelope, overwrite bool) error {
+ ctx.VI(0).Infof("%v.Put(%v, %v, %t)", i.suffix, profile, envelope, overwrite)
name, version, err := parse(ctx, i.suffix)
if err != nil {
return err
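
Editorial note, not part of the patch: the net effect for callers, as exercised by the tests above, is that Put now takes a single profile plus an overwrite flag, and PutX survives only as a shim that forwards to Put. A minimal client-side sketch; the application name and profile are placeholders.

package sketch // illustrative only

import (
	"v.io/v23/context"
	"v.io/v23/services/application"
	"v.io/x/ref/services/repository"
)

func publishEnvelope(ctx *context.T, env application.Envelope) error {
	stub := repository.ApplicationClient("applications/example/v1")
	// With overwrite=false, Put fails with verror.ErrExist if an envelope is
	// already present for the profile; overwrite=true replaces it.
	return stub.Put(ctx, "base", env, true)
}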
diff --git a/services/binary/binary/impl_test.go b/services/binary/binary/impl_test.go
index 1036c8d..cce1553 100644
--- a/services/binary/binary/impl_test.go
+++ b/services/binary/binary/impl_test.go
@@ -15,6 +15,7 @@
"strings"
"testing"
+ "v.io/v23"
"v.io/v23/context"
"v.io/v23/naming"
"v.io/v23/rpc"
@@ -24,7 +25,6 @@
"v.io/v23/services/repository"
"v.io/x/lib/cmdline"
"v.io/x/ref/lib/v23cmd"
- "v.io/x/ref/lib/xrpc"
_ "v.io/x/ref/runtime/factories/generic"
"v.io/x/ref/test"
)
@@ -104,7 +104,7 @@
ctx, shutdown := test.V23Init()
defer shutdown()
- server, err := xrpc.NewDispatchingServer(ctx, "", NewDispatcher())
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, "", NewDispatcher())
if err != nil {
t.Fatalf("NewServer failed: %v", err)
}
diff --git a/services/binary/binaryd/main.go b/services/binary/binaryd/main.go
index d7fc5f1..8c5cae0 100644
--- a/services/binary/binaryd/main.go
+++ b/services/binary/binaryd/main.go
@@ -19,7 +19,6 @@
"v.io/x/lib/netstate"
"v.io/x/ref/lib/signals"
"v.io/x/ref/lib/v23cmd"
- "v.io/x/ref/lib/xrpc"
_ "v.io/x/ref/runtime/factories/roaming"
"v.io/x/ref/services/internal/binarylib"
)
@@ -98,7 +97,7 @@
if err != nil {
return fmt.Errorf("NewDispatcher() failed: %v\n", err)
}
- server, err := xrpc.NewDispatchingServer(ctx, name, dis)
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, name, dis)
if err != nil {
return fmt.Errorf("NewServer() failed: %v", err)
}
diff --git a/services/binary/tidy/impl_test.go b/services/binary/tidy/impl_test.go
index 8b05eab..bc88203 100644
--- a/services/binary/tidy/impl_test.go
+++ b/services/binary/tidy/impl_test.go
@@ -11,10 +11,10 @@
"strings"
"testing"
+ "v.io/v23"
"v.io/v23/services/application"
"v.io/x/lib/cmdline"
"v.io/x/ref/lib/v23cmd"
- "v.io/x/ref/lib/xrpc"
_ "v.io/x/ref/runtime/factories/generic"
"v.io/x/ref/services/binary/tidy/appd"
"v.io/x/ref/services/binary/tidy/binaryd"
@@ -29,7 +29,7 @@
defer shutdown()
apptape := servicetest.NewTape()
- appserver, err := xrpc.NewDispatchingServer(ctx, "", appd.NewDispatcher(t, apptape))
+ ctx, appserver, err := v23.WithNewDispatchingServer(ctx, "", appd.NewDispatcher(t, apptape))
if err != nil {
t.Fatalf("applicationd NewDispatchingServer failed: %v", err)
}
@@ -69,13 +69,13 @@
defer shutdown()
binarytape := servicetest.NewTape()
- binserver, err := xrpc.NewDispatchingServer(ctx, "", binaryd.NewDispatcher(t, binarytape))
+ ctx, binserver, err := v23.WithNewDispatchingServer(ctx, "", binaryd.NewDispatcher(t, binarytape))
if err != nil {
t.Fatalf("binaryd NewDispatchingServer failed: %v", err)
}
apptape := servicetest.NewTape()
- appserver, err := xrpc.NewDispatchingServer(ctx, "", appd.NewDispatcher(t, apptape))
+ ctx, appserver, err := v23.WithNewDispatchingServer(ctx, "", appd.NewDispatcher(t, apptape))
if err != nil {
t.Fatalf("applicationd NewDispatchingServer failed: %v", err)
}
diff --git a/services/build/build/impl_test.go b/services/build/build/impl_test.go
index 2c7ab70..54b96d1 100644
--- a/services/build/build/impl_test.go
+++ b/services/build/build/impl_test.go
@@ -9,6 +9,7 @@
"strings"
"testing"
+ "v.io/v23"
"v.io/v23/context"
"v.io/v23/naming"
"v.io/v23/rpc"
@@ -17,7 +18,6 @@
"v.io/v23/verror"
"v.io/x/lib/cmdline"
"v.io/x/ref/lib/v23cmd"
- "v.io/x/ref/lib/xrpc"
_ "v.io/x/ref/runtime/factories/generic"
"v.io/x/ref/test"
)
@@ -47,7 +47,7 @@
func startServer(ctx *context.T, t *testing.T) naming.Endpoint {
unpublished := ""
- server, err := xrpc.NewServer(ctx, unpublished, build.BuilderServer(&mock{}), nil)
+ ctx, server, err := v23.WithNewServer(ctx, unpublished, build.BuilderServer(&mock{}), nil)
if err != nil {
t.Fatalf("NewServer failed: %v", err)
}
diff --git a/services/build/buildd/impl_test.go b/services/build/buildd/impl_test.go
index 147b165..5cf973d 100644
--- a/services/build/buildd/impl_test.go
+++ b/services/build/buildd/impl_test.go
@@ -15,7 +15,7 @@
"v.io/v23/context"
"v.io/v23/services/build"
- "v.io/x/ref/lib/xrpc"
+ "v.io/v23"
"v.io/x/ref/test"
)
@@ -68,7 +68,7 @@
gobin, goroot := findGoBinary(t, "go")
service := build.BuilderServer(NewBuilderService(gobin, goroot))
unpublished := ""
- server, err := xrpc.NewServer(ctx, unpublished, service, nil)
+ ctx, server, err := v23.WithNewServer(ctx, unpublished, service, nil)
if err != nil {
t.Fatalf("NewServer() failed: %v", err)
}
diff --git a/services/build/buildd/main.go b/services/build/buildd/main.go
index cf5cc2d..d6bfd92 100644
--- a/services/build/buildd/main.go
+++ b/services/build/buildd/main.go
@@ -11,13 +11,13 @@
"fmt"
"os"
+ "v.io/v23"
"v.io/v23/context"
"v.io/v23/services/build"
"v.io/x/lib/cmdline"
"v.io/x/ref/lib/security/securityflag"
"v.io/x/ref/lib/signals"
"v.io/x/ref/lib/v23cmd"
- "v.io/x/ref/lib/xrpc"
_ "v.io/x/ref/runtime/factories/roaming"
)
@@ -44,7 +44,7 @@
}
func runBuildD(ctx *context.T, env *cmdline.Env, args []string) error {
- server, err := xrpc.NewServer(ctx, name, build.BuilderServer(NewBuilderService(gobin, goroot)), securityflag.NewAuthorizerOrDie())
+ ctx, server, err := v23.WithNewServer(ctx, name, build.BuilderServer(NewBuilderService(gobin, goroot)), securityflag.NewAuthorizerOrDie())
if err != nil {
return fmt.Errorf("NewServer() failed: %v", err)
}
diff --git a/services/debug/debuglib/dispatcher_test.go b/services/debug/debuglib/dispatcher_test.go
index 1dcd3b6..d24878f 100644
--- a/services/debug/debuglib/dispatcher_test.go
+++ b/services/debug/debuglib/dispatcher_test.go
@@ -26,7 +26,6 @@
"v.io/v23/vtrace"
"v.io/x/lib/vlog"
libstats "v.io/x/ref/lib/stats"
- "v.io/x/ref/lib/xrpc"
_ "v.io/x/ref/runtime/factories/generic"
"v.io/x/ref/test"
"v.io/x/ref/test/testutil"
@@ -60,7 +59,7 @@
ctx = context.WithLogger(ctx, testLogger)
disp := NewDispatcher(nil)
- server, err := xrpc.NewDispatchingServer(ctx, "", disp)
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, "", disp)
if err != nil {
t.Fatalf("failed to start debug server: %v", err)
}
diff --git a/services/device/claimable/main.go b/services/device/claimable/main.go
index 1c92bbf..e0d4b4e 100644
--- a/services/device/claimable/main.go
+++ b/services/device/claimable/main.go
@@ -20,7 +20,6 @@
"v.io/x/ref/lib/security/securityflag"
"v.io/x/ref/lib/signals"
"v.io/x/ref/lib/v23cmd"
- "v.io/x/ref/lib/xrpc"
_ "v.io/x/ref/runtime/factories/roaming"
"v.io/x/ref/services/device/internal/claim"
"v.io/x/ref/services/identity"
@@ -42,7 +41,7 @@
return errors.New("device is already claimed")
}
- server, err := xrpc.NewDispatchingServer(ctx, "", claimable)
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, "", claimable)
if err != nil {
return err
}
@@ -65,14 +64,10 @@
if err := json.Unmarshal([]byte(jRoot), &bRoot); err != nil {
ctx.Fatalf("unable to unmarshal the json blessing root: %v", err)
}
- decodedKey, err := base64.URLEncoding.DecodeString(bRoot.PublicKey)
+ key, err := base64.URLEncoding.DecodeString(bRoot.PublicKey)
if err != nil {
ctx.Fatalf("unable to decode public key: %v", err)
}
- key, err := security.UnmarshalPublicKey(decodedKey)
- if err != nil {
- ctx.Fatalf("unable to unmarshal the public key: %v", err)
- }
roots := v23.GetPrincipal(ctx).Roots()
for _, name := range bRoot.Names {
if err := roots.Add(key, security.BlessingPattern(name)); err != nil {
diff --git a/services/device/device/acl_test.go b/services/device/device/acl_test.go
index 0deb899..6b19ba2 100644
--- a/services/device/device/acl_test.go
+++ b/services/device/device/acl_test.go
@@ -11,12 +11,12 @@
"strings"
"testing"
+ "v.io/v23"
"v.io/v23/security"
"v.io/v23/security/access"
"v.io/v23/verror"
"v.io/x/lib/cmdline"
"v.io/x/ref/lib/v23cmd"
- "v.io/x/ref/lib/xrpc"
"v.io/x/ref/test"
cmd_device "v.io/x/ref/services/device/device"
@@ -34,7 +34,7 @@
defer shutdown()
tapes := servicetest.NewTapeMap()
- server, err := xrpc.NewDispatchingServer(ctx, "", newDispatcher(t, tapes))
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, "", newDispatcher(t, tapes))
if err != nil {
t.Fatalf("NewServer failed: %v", err)
}
@@ -81,7 +81,7 @@
defer shutdown()
tapes := servicetest.NewTapeMap()
- server, err := xrpc.NewDispatchingServer(ctx, "", newDispatcher(t, tapes))
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, "", newDispatcher(t, tapes))
if err != nil {
t.Fatalf("NewServer failed: %v", err)
}
diff --git a/services/device/device/associate_test.go b/services/device/device/associate_test.go
index 099d424..25b0003 100644
--- a/services/device/device/associate_test.go
+++ b/services/device/device/associate_test.go
@@ -10,10 +10,10 @@
"strings"
"testing"
+ "v.io/v23"
"v.io/v23/services/device"
"v.io/x/lib/cmdline"
"v.io/x/ref/lib/v23cmd"
- "v.io/x/ref/lib/xrpc"
"v.io/x/ref/test"
cmd_device "v.io/x/ref/services/device/device"
@@ -25,7 +25,7 @@
defer shutdown()
tapes := servicetest.NewTapeMap()
- server, err := xrpc.NewDispatchingServer(ctx, "", newDispatcher(t, tapes))
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, "", newDispatcher(t, tapes))
if err != nil {
t.Fatalf("NewServer failed: %v", err)
}
@@ -78,7 +78,7 @@
defer shutdown()
tapes := servicetest.NewTapeMap()
- server, err := xrpc.NewDispatchingServer(ctx, "", newDispatcher(t, tapes))
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, "", newDispatcher(t, tapes))
if err != nil {
t.Fatalf("NewServer failed: %v", err)
}
@@ -129,7 +129,7 @@
defer shutdown()
tapes := servicetest.NewTapeMap()
- server, err := xrpc.NewDispatchingServer(ctx, "", newDispatcher(t, tapes))
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, "", newDispatcher(t, tapes))
if err != nil {
t.Fatalf("NewServer failed: %v", err)
}
diff --git a/services/device/device/claim_test.go b/services/device/device/claim_test.go
index 0ac45aa..d8ce330 100644
--- a/services/device/device/claim_test.go
+++ b/services/device/device/claim_test.go
@@ -16,7 +16,6 @@
"v.io/x/lib/cmdline"
"v.io/x/ref/lib/security"
"v.io/x/ref/lib/v23cmd"
- "v.io/x/ref/lib/xrpc"
"v.io/x/ref/test"
cmd_device "v.io/x/ref/services/device/device"
@@ -28,7 +27,7 @@
defer shutdown()
tapes := servicetest.NewTapeMap()
- server, err := xrpc.NewDispatchingServer(ctx, "", newDispatcher(t, tapes))
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, "", newDispatcher(t, tapes))
if err != nil {
t.Fatalf("NewServer failed: %v", err)
}
diff --git a/services/device/device/debug_test.go b/services/device/device/debug_test.go
index cc8f528..cd1d5d6 100644
--- a/services/device/device/debug_test.go
+++ b/services/device/device/debug_test.go
@@ -11,10 +11,10 @@
"strings"
"testing"
+ "v.io/v23"
"v.io/v23/naming"
"v.io/x/lib/cmdline"
"v.io/x/ref/lib/v23cmd"
- "v.io/x/ref/lib/xrpc"
"v.io/x/ref/test"
cmd_device "v.io/x/ref/services/device/device"
@@ -25,7 +25,7 @@
ctx, shutdown := test.V23Init()
defer shutdown()
tapes := servicetest.NewTapeMap()
- server, err := xrpc.NewDispatchingServer(ctx, "", newDispatcher(t, tapes))
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, "", newDispatcher(t, tapes))
if err != nil {
t.Fatalf("NewServer failed: %v", err)
}
diff --git a/services/device/device/glob_test.go b/services/device/device/glob_test.go
index 25ac821..4c3bb25 100644
--- a/services/device/device/glob_test.go
+++ b/services/device/device/glob_test.go
@@ -15,16 +15,14 @@
"testing"
"time"
+ "v.io/v23"
"v.io/v23/context"
"v.io/v23/naming"
"v.io/v23/services/device"
-
"v.io/x/lib/cmdline"
- "v.io/x/ref/lib/xrpc"
- "v.io/x/ref/test"
-
cmd_device "v.io/x/ref/services/device/device"
"v.io/x/ref/services/internal/servicetest"
+ "v.io/x/ref/test"
)
func simplePrintHandler(entry cmd_device.GlobResult, _ *context.T, stdout, _ io.Writer) error {
@@ -158,7 +156,7 @@
defer shutdown()
tapes := servicetest.NewTapeMap()
rootTape := tapes.ForSuffix("")
- server, err := xrpc.NewDispatchingServer(ctx, "", newDispatcher(t, tapes))
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, "", newDispatcher(t, tapes))
if err != nil {
t.Fatalf("NewServer failed: %v", err)
}
diff --git a/services/device/device/install_test.go b/services/device/device/install_test.go
index 61e766d..941b935 100644
--- a/services/device/device/install_test.go
+++ b/services/device/device/install_test.go
@@ -12,12 +12,12 @@
"strings"
"testing"
+ "v.io/v23"
"v.io/v23/naming"
"v.io/v23/services/application"
"v.io/v23/services/device"
"v.io/x/lib/cmdline"
"v.io/x/ref/lib/v23cmd"
- "v.io/x/ref/lib/xrpc"
"v.io/x/ref/test"
cmd_device "v.io/x/ref/services/device/device"
@@ -29,7 +29,7 @@
defer shutdown()
tapes := servicetest.NewTapeMap()
- server, err := xrpc.NewDispatchingServer(ctx, "", newDispatcher(t, tapes))
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, "", newDispatcher(t, tapes))
if err != nil {
t.Fatalf("NewServer failed: %v", err)
}
diff --git a/services/device/device/instantiate_test.go b/services/device/device/instantiate_test.go
index 53ff9ac..3acfcae 100644
--- a/services/device/device/instantiate_test.go
+++ b/services/device/device/instantiate_test.go
@@ -11,10 +11,10 @@
"strings"
"testing"
+ "v.io/v23"
"v.io/v23/verror"
"v.io/x/lib/cmdline"
"v.io/x/ref/lib/v23cmd"
- "v.io/x/ref/lib/xrpc"
"v.io/x/ref/test"
cmd_device "v.io/x/ref/services/device/device"
@@ -26,7 +26,7 @@
defer shutdown()
tapes := servicetest.NewTapeMap()
- server, err := xrpc.NewDispatchingServer(ctx, "", newDispatcher(t, tapes))
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, "", newDispatcher(t, tapes))
if err != nil {
t.Fatalf("NewServer failed: %v", err)
}
diff --git a/services/device/device/kill_test.go b/services/device/device/kill_test.go
index 3b37881..0e5d38d 100644
--- a/services/device/device/kill_test.go
+++ b/services/device/device/kill_test.go
@@ -11,11 +11,11 @@
"testing"
"time"
+ "v.io/v23"
"v.io/v23/naming"
"v.io/v23/verror"
"v.io/x/lib/cmdline"
"v.io/x/ref/lib/v23cmd"
- "v.io/x/ref/lib/xrpc"
"v.io/x/ref/test"
cmd_device "v.io/x/ref/services/device/device"
@@ -27,7 +27,7 @@
defer shutdown()
tapes := servicetest.NewTapeMap()
- server, err := xrpc.NewDispatchingServer(ctx, "", newDispatcher(t, tapes))
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, "", newDispatcher(t, tapes))
if err != nil {
t.Fatalf("NewServer failed: %v", err)
}
diff --git a/services/device/device/local_install.go b/services/device/device/local_install.go
index aba34b2..75fc476 100644
--- a/services/device/device/local_install.go
+++ b/services/device/device/local_install.go
@@ -25,10 +25,8 @@
"v.io/v23/services/device"
"v.io/v23/services/repository"
"v.io/v23/uniqueid"
-
"v.io/x/lib/cmdline"
"v.io/x/ref/lib/v23cmd"
- "v.io/x/ref/lib/xrpc"
"v.io/x/ref/services/internal/packages"
)
@@ -93,7 +91,7 @@
spec.Addrs = nil
ctx = v23.WithListenSpec(ctx, spec)
}
- server, err := xrpc.NewDispatchingServer(ctx, name, dispatcher)
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, name, dispatcher)
if err != nil {
return nil, nil, err
}
diff --git a/services/device/device/local_install_test.go b/services/device/device/local_install_test.go
index aa0d615..4fb3444 100644
--- a/services/device/device/local_install_test.go
+++ b/services/device/device/local_install_test.go
@@ -15,13 +15,13 @@
"strings"
"testing"
+ "v.io/v23"
"v.io/v23/naming"
"v.io/v23/security"
"v.io/v23/services/application"
"v.io/v23/services/device"
"v.io/x/lib/cmdline"
"v.io/x/ref/lib/v23cmd"
- "v.io/x/ref/lib/xrpc"
"v.io/x/ref/test"
cmd_device "v.io/x/ref/services/device/device"
@@ -39,7 +39,7 @@
defer shutdown()
tapes := servicetest.NewTapeMap()
- server, err := xrpc.NewDispatchingServer(ctx, "", newDispatcher(t, tapes))
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, "", newDispatcher(t, tapes))
if err != nil {
t.Fatalf("NewServer failed: %v", err)
}
diff --git a/services/device/device/ls_test.go b/services/device/device/ls_test.go
index 79974b1..5a74820 100644
--- a/services/device/device/ls_test.go
+++ b/services/device/device/ls_test.go
@@ -9,10 +9,10 @@
"strings"
"testing"
+ "v.io/v23"
"v.io/v23/naming"
"v.io/x/lib/cmdline"
"v.io/x/ref/lib/v23cmd"
- "v.io/x/ref/lib/xrpc"
"v.io/x/ref/test"
cmd_device "v.io/x/ref/services/device/device"
@@ -26,7 +26,7 @@
ctx, shutdown := test.V23Init()
defer shutdown()
tapes := servicetest.NewTapeMap()
- server, err := xrpc.NewDispatchingServer(ctx, "", newDispatcher(t, tapes))
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, "", newDispatcher(t, tapes))
if err != nil {
t.Fatalf("NewServer failed: %v", err)
}
diff --git a/services/device/device/publish.go b/services/device/device/publish.go
index 4dc2cdb..5862d86 100644
--- a/services/device/device/publish.go
+++ b/services/device/device/publish.go
@@ -174,7 +174,7 @@
envelope.Publisher = security.Blessings{}
}
- if err := appClient.PutX(ctx, profile, envelope, true); err != nil {
+ if err := appClient.Put(ctx, profile, envelope, true); err != nil {
return err
}
fmt.Fprintf(env.Stdout, "Published %q\n", appVON)
diff --git a/services/device/device/status_test.go b/services/device/device/status_test.go
index 81a1410..29a56c3 100644
--- a/services/device/device/status_test.go
+++ b/services/device/device/status_test.go
@@ -11,11 +11,11 @@
"strings"
"testing"
+ "v.io/v23"
"v.io/v23/naming"
"v.io/v23/services/device"
"v.io/x/lib/cmdline"
"v.io/x/ref/lib/v23cmd"
- "v.io/x/ref/lib/xrpc"
"v.io/x/ref/test"
cmd_device "v.io/x/ref/services/device/device"
@@ -26,7 +26,7 @@
ctx, shutdown := test.V23Init()
defer shutdown()
tapes := servicetest.NewTapeMap()
- server, err := xrpc.NewDispatchingServer(ctx, "", newDispatcher(t, tapes))
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, "", newDispatcher(t, tapes))
if err != nil {
t.Fatalf("NewServer failed: %v", err)
}
diff --git a/services/device/device/update_test.go b/services/device/device/update_test.go
index 338c358..ca32285 100644
--- a/services/device/device/update_test.go
+++ b/services/device/device/update_test.go
@@ -16,10 +16,10 @@
"unicode"
"unicode/utf8"
+ "v.io/v23"
"v.io/v23/naming"
"v.io/x/lib/cmdline"
"v.io/x/ref/lib/v23cmd"
- "v.io/x/ref/lib/xrpc"
"v.io/x/ref/test"
cmd_device "v.io/x/ref/services/device/device"
@@ -39,7 +39,7 @@
ctx, shutdown := test.V23Init()
defer shutdown()
tapes := servicetest.NewTapeMap()
- server, err := xrpc.NewDispatchingServer(ctx, "", newDispatcher(t, tapes))
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, "", newDispatcher(t, tapes))
if err != nil {
t.Fatalf("NewServer failed: %v", err)
}
diff --git a/services/device/device/util_test.go b/services/device/device/util_test.go
index 29c79eb..9ba5b3e 100644
--- a/services/device/device/util_test.go
+++ b/services/device/device/util_test.go
@@ -10,12 +10,12 @@
"strings"
"testing"
+ "v.io/v23"
"v.io/v23/naming"
"v.io/v23/services/device"
"v.io/v23/verror"
"v.io/x/lib/cmdline"
"v.io/x/ref/lib/v23cmd"
- "v.io/x/ref/lib/xrpc"
"v.io/x/ref/test"
cmd_device "v.io/x/ref/services/device/device"
@@ -64,7 +64,7 @@
defer shutdown()
tapes := servicetest.NewTapeMap()
- server, err := xrpc.NewDispatchingServer(ctx, "", newDispatcher(t, tapes))
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, "", newDispatcher(t, tapes))
if err != nil {
t.Fatalf("NewServer failed: %v", err)
}
diff --git a/services/device/deviced/doc.go b/services/device/deviced/doc.go
index b89a60b..efea027 100644
--- a/services/device/deviced/doc.go
+++ b/services/device/deviced/doc.go
@@ -41,6 +41,8 @@
generate a pairing token for the device manager that will need to be provided
when a device is claimed
+ -agentsock=
+ Path to the application's security agent socket.
-alsologtostderr=true
log to standard error as well as files
-chown=false
diff --git a/services/device/deviced/internal/impl/app_service.go b/services/device/deviced/internal/impl/app_service.go
index dca7113..56dd7bc 100644
--- a/services/device/deviced/internal/impl/app_service.go
+++ b/services/device/deviced/internal/impl/app_service.go
@@ -834,7 +834,7 @@
return instanceDir, instanceID, nil
}
-func genCmd(ctx *context.T, instanceDir string, nsRoot string) (*exec.Cmd, error) {
+func genCmd(ctx *context.T, instanceDir, nsRoot string, usingSocketAgent bool) (*exec.Cmd, error) {
systemName, err := readSystemNameForInstance(instanceDir)
if err != nil {
return nil, err
@@ -855,6 +855,11 @@
}
saArgs := suidAppCmdArgs{targetUser: systemName, binpath: binPath}
+ if usingSocketAgent {
+ if saArgs.sockPath, err = sockPath(instanceDir); err != nil {
+ return nil, verror.New(errors.ErrOperationFailed, ctx, fmt.Sprintf("failed to obtain agent socket path: %v", err))
+ }
+ }
// Pass the displayed name of the program (argv0 as seen in ps output)
// Envelope data comes from the user so we sanitize it for safety
@@ -1053,13 +1058,17 @@
return pid, nil
}
+func (i *appRunner) usingSocketAgent() bool {
+ return i.securityAgent != nil && i.securityAgent.keyMgr != nil
+}
+
func (i *appRunner) run(ctx *context.T, instanceDir string) error {
if err := transitionInstance(instanceDir, device.InstanceStateNotRunning, device.InstanceStateLaunching); err != nil {
return err
}
var pid int
- cmd, err := genCmd(ctx, instanceDir, i.mtAddress)
+ cmd, err := genCmd(ctx, instanceDir, i.mtAddress, i.usingSocketAgent())
if err == nil {
pid, err = i.startCmd(ctx, instanceDir, cmd)
}
@@ -1736,7 +1745,7 @@
} else {
debugInfo.Info = info
}
- if cmd, err := genCmd(ctx, instanceDir, i.runner.mtAddress); err != nil {
+ if cmd, err := genCmd(ctx, instanceDir, i.runner.mtAddress, i.runner.usingSocketAgent()); err != nil {
return "", err
} else {
debugInfo.Cmd = cmd
diff --git a/services/device/deviced/internal/impl/helper_manager.go b/services/device/deviced/internal/impl/helper_manager.go
index 45829d0..2d65694 100644
--- a/services/device/deviced/internal/impl/helper_manager.go
+++ b/services/device/deviced/internal/impl/helper_manager.go
@@ -76,7 +76,7 @@
type suidAppCmdArgs struct {
// args to helper
- targetUser, progname, workspace, logdir, binpath string
+ targetUser, progname, workspace, logdir, binpath, sockPath string
// fields in exec.Cmd
env []string
stdout, stderr io.Writer
@@ -105,6 +105,9 @@
cmd.Args = append(cmd.Args, "--progname", a.progname)
cmd.Args = append(cmd.Args, "--workspace", a.workspace)
cmd.Args = append(cmd.Args, "--logdir", a.logdir)
+ if a.sockPath != "" {
+ cmd.Args = append(cmd.Args, "--agentsock", a.sockPath)
+ }
cmd.Args = append(cmd.Args, "--run", a.binpath)
cmd.Args = append(cmd.Args, "--")
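
Editorial note, not part of the patch: together with the genCmd change above, the device manager now threads the per-instance agent socket into the suid helper invocation via the new --agentsock flag. A sketch of the resulting command-line construction; the helper location and all paths are hypothetical.

package sketch // illustrative only

import "os/exec"

func helperCmd(sockPath string) *exec.Cmd {
	cmd := exec.Command("/usr/local/bin/suidhelper",
		"--username", "vanadium-app",
		"--progname", "binaryd/vanadium/app",
		"--workspace", "/data/app/workspace",
		"--logdir", "/data/app/logs",
	)
	// Only emitted when a socket-based security agent is in use.
	if sockPath != "" {
		cmd.Args = append(cmd.Args, "--agentsock", sockPath)
	}
	cmd.Args = append(cmd.Args, "--run", "/data/app/bin/app", "--")
	return cmd
}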
diff --git a/services/device/deviced/internal/impl/proxy_invoker_test.go b/services/device/deviced/internal/impl/proxy_invoker_test.go
index 9c46e21..225027e 100644
--- a/services/device/deviced/internal/impl/proxy_invoker_test.go
+++ b/services/device/deviced/internal/impl/proxy_invoker_test.go
@@ -8,13 +8,13 @@
"reflect"
"testing"
+ "v.io/v23"
"v.io/v23/context"
"v.io/v23/naming"
"v.io/v23/rpc"
"v.io/v23/security"
"v.io/v23/security/access"
libstats "v.io/v23/services/stats"
- "v.io/x/ref/lib/xrpc"
"v.io/x/ref/test"
"v.io/x/ref/test/testutil"
)
@@ -26,7 +26,7 @@
defer shutdown()
// server1 is a normal server
- server1, err := xrpc.NewServer(ctx, "", &dummy{}, nil)
+ ctx, server1, err := v23.WithNewServer(ctx, "", &dummy{}, nil)
if err != nil {
t.Fatalf("NewServer: %v", err)
}
@@ -36,7 +36,7 @@
remote: naming.JoinAddressName(server1.Status().Endpoints[0].String(), "__debug/stats"),
desc: libstats.StatsServer(nil).Describe__(),
}
- server2, err := xrpc.NewDispatchingServer(ctx, "", disp)
+ ctx, server2, err := v23.WithNewDispatchingServer(ctx, "", disp)
if err != nil {
t.Fatalf("NewServer: %v", err)
}
diff --git a/services/device/deviced/internal/impl/utiltest/app.go b/services/device/deviced/internal/impl/utiltest/app.go
index 3d36f15..ab31297 100644
--- a/services/device/deviced/internal/impl/utiltest/app.go
+++ b/services/device/deviced/internal/impl/utiltest/app.go
@@ -23,7 +23,6 @@
"v.io/x/ref/lib/exec"
"v.io/x/ref/lib/mgmt"
"v.io/x/ref/lib/signals"
- "v.io/x/ref/lib/xrpc"
"v.io/x/ref/services/device/internal/suid"
"v.io/x/ref/test"
"v.io/x/ref/test/modules"
@@ -133,7 +132,7 @@
}
publishName := args[0]
- _, err := xrpc.NewServer(ctx, publishName, new(appService), nil)
+ ctx, _, err := v23.WithNewServer(ctx, publishName, new(appService), nil)
if err != nil {
ctx.Fatalf("NewServer(%v) failed: %v", publishName, err)
}
@@ -166,7 +165,7 @@
// function.
func SetupPingServer(t *testing.T, ctx *context.T) (PingServer, func()) {
pingCh := make(chan PingArgs, 1)
- server, err := xrpc.NewServer(ctx, "pingserver", PingServer{pingCh}, security.AllowEveryone())
+ ctx, server, err := v23.WithNewServer(ctx, "pingserver", PingServer{pingCh}, security.AllowEveryone())
if err != nil {
t.Fatalf("NewServer(%q, <dispatcher>) failed: %v", "pingserver", err)
}
diff --git a/services/device/deviced/internal/impl/utiltest/helpers.go b/services/device/deviced/internal/impl/utiltest/helpers.go
index 3c1a163..b8e855e 100644
--- a/services/device/deviced/internal/impl/utiltest/helpers.go
+++ b/services/device/deviced/internal/impl/utiltest/helpers.go
@@ -32,7 +32,6 @@
"v.io/v23/services/stats"
"v.io/v23/verror"
"v.io/x/ref/internal/logger"
- "v.io/x/ref/lib/xrpc"
_ "v.io/x/ref/runtime/factories/roaming"
"v.io/x/ref/services/device/deviced/internal/impl"
"v.io/x/ref/services/device/deviced/internal/versioning"
@@ -763,7 +762,7 @@
if err != nil {
t.Fatalf("server.NewDispatcher failed: %v", err)
}
- server, err := xrpc.NewDispatchingServer(ctx, von, d)
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, von, d)
if err != nil {
t.Fatalf("server.ServeDispatcher failed: %v", err)
}
diff --git a/services/device/deviced/internal/impl/utiltest/mock_repo.go b/services/device/deviced/internal/impl/utiltest/mock_repo.go
index c096d6d..a25ecad 100644
--- a/services/device/deviced/internal/impl/utiltest/mock_repo.go
+++ b/services/device/deviced/internal/impl/utiltest/mock_repo.go
@@ -14,6 +14,7 @@
"reflect"
"testing"
+ "v.io/v23"
"v.io/v23/context"
"v.io/v23/rpc"
"v.io/v23/security"
@@ -22,7 +23,6 @@
"v.io/v23/services/binary"
"v.io/v23/services/repository"
"v.io/v23/verror"
- "v.io/x/ref/lib/xrpc"
)
const MockBinaryRepoName = "br"
@@ -44,7 +44,7 @@
func StartApplicationRepository(ctx *context.T) (*application.Envelope, func()) {
invoker := new(arInvoker)
name := MockApplicationRepoName
- server, err := xrpc.NewServer(ctx, name, repository.ApplicationServer(invoker), security.AllowEveryone())
+ ctx, server, err := v23.WithNewServer(ctx, name, repository.ApplicationServer(invoker), security.AllowEveryone())
if err != nil {
ctx.Fatalf("NewServer(%v) failed: %v", name, err)
}
@@ -91,7 +91,7 @@
// returns a cleanup function.
func StartBinaryRepository(ctx *context.T) func() {
name := MockBinaryRepoName
- server, err := xrpc.NewServer(ctx, name, repository.BinaryServer(new(brInvoker)), security.AllowEveryone())
+ ctx, server, err := v23.WithNewServer(ctx, name, repository.BinaryServer(new(brInvoker)), security.AllowEveryone())
if err != nil {
ctx.Fatalf("Serve(%q) failed: %v", name, err)
}
diff --git a/services/device/deviced/internal/starter/starter.go b/services/device/deviced/internal/starter/starter.go
index f6cdcbf..3ad0a74 100644
--- a/services/device/deviced/internal/starter/starter.go
+++ b/services/device/deviced/internal/starter/starter.go
@@ -21,7 +21,6 @@
"v.io/v23/security"
"v.io/v23/verror"
displib "v.io/x/ref/lib/dispatcher"
- "v.io/x/ref/lib/xrpc"
"v.io/x/ref/runtime/factories/roaming"
"v.io/x/ref/services/debug/debuglib"
"v.io/x/ref/services/device/deviced/internal/impl"
@@ -155,7 +154,7 @@
return "", nil, err
}
ctx = v23.WithListenSpec(ctx, args.Device.ListenSpec)
- server, err := xrpc.NewDispatchingServer(ctx, "", dispatcher)
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, "", dispatcher)
if err != nil {
cancel()
return "", nil, err
@@ -334,7 +333,7 @@
func startDeviceServer(ctx *context.T, args DeviceArgs, mt string, permStore *pathperms.PathStore) (shutdown func(), err error) {
ctx = v23.WithListenSpec(ctx, args.ListenSpec)
wrapper := displib.NewDispatcherWrapper()
- server, err := xrpc.NewDispatchingServer(ctx, args.name(mt), wrapper)
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, args.name(mt), wrapper)
if err != nil {
return nil, err
}
diff --git a/services/device/internal/suid/args.go b/services/device/internal/suid/args.go
index 1473654..d9ab2fb 100644
--- a/services/device/internal/suid/args.go
+++ b/services/device/internal/suid/args.go
@@ -33,6 +33,7 @@
uid int
gid int
workspace string
+ agentsock string
logDir string
argv0 string
argv []string
@@ -54,9 +55,9 @@
const SavedArgs = "V23_SAVED_ARGS"
var (
- flagUsername, flagWorkspace, flagLogDir, flagRun, flagProgName *string
- flagMinimumUid *int64
- flagRemove, flagKill, flagChown, flagDryrun *bool
+ flagUsername, flagWorkspace, flagLogDir, flagRun, flagProgName, flagAgentSock *string
+ flagMinimumUid *int64
+ flagRemove, flagKill, flagChown, flagDryrun *bool
)
func init() {
@@ -67,6 +68,7 @@
const uidThreshold = 501
flagUsername = fs.String("username", "", "The UNIX user name used for the other functions of this tool.")
flagWorkspace = fs.String("workspace", "", "Path to the application's workspace directory.")
+ flagAgentSock = fs.String("agentsock", "", "Path to the application's security agent socket.")
flagLogDir = fs.String("logdir", "", "Path to the log directory.")
flagRun = fs.String("run", "", "Path to the application to exec.")
flagProgName = fs.String("progname", "unnamed_app", "Visible name of the application, used in argv[0]")
@@ -218,6 +220,7 @@
}
wp.workspace = *flagWorkspace
+ wp.agentsock = *flagAgentSock
wp.argv0 = *flagRun
wp.logDir = *flagLogDir
wp.argv = append([]string{*flagProgName}, fs.Args()...)

diff --git a/services/device/internal/suid/args_test.go b/services/device/internal/suid/args_test.go
index 491a171..d910b51 100644
--- a/services/device/internal/suid/args_test.go
+++ b/services/device/internal/suid/args_test.go
@@ -35,6 +35,7 @@
uid: testUid,
gid: testGid,
workspace: "",
+ agentsock: "",
logDir: "",
argv0: "",
argv: []string{"unnamed_app"},
@@ -49,13 +50,14 @@
{
[]string{"setuidhelper", "--minuid", "1", "--username", testUserName, "--workspace", "/hello",
- "--logdir", "/logging", "--run", "/bin/v23", "--", "one", "two"},
+ "--logdir", "/logging", "--agentsock", "/tmp/sXXXX", "--run", "/bin/v23", "--", "one", "two"},
[]string{"A=B"},
"",
WorkParameters{
uid: testUid,
gid: testGid,
workspace: "/hello",
+ agentsock: "/tmp/sXXXX",
logDir: "/logging",
argv0: "/bin/v23",
argv: []string{"unnamed_app", "one", "two"},
@@ -83,6 +85,7 @@
uid: 0,
gid: 0,
workspace: "",
+ agentsock: "",
logDir: "",
argv0: "",
argv: []string{"hello", "vanadium"},
@@ -103,6 +106,7 @@
uid: testUid,
gid: testGid,
workspace: "",
+ agentsock: "",
logDir: "",
argv0: "",
argv: []string{"/tmp/foo", "/tmp/bar"},
@@ -123,6 +127,7 @@
uid: 0,
gid: 0,
workspace: "",
+ agentsock: "",
logDir: "",
argv0: "",
argv: nil,
@@ -143,6 +148,7 @@
uid: 0,
gid: 0,
workspace: "",
+ agentsock: "",
logDir: "",
argv0: "",
argv: nil,
@@ -157,13 +163,14 @@
{
[]string{"setuidhelper", "--minuid", "1", "--username", testUserName, "--workspace", "/hello", "--progname", "binaryd/vanadium/app/testapp",
- "--logdir", "/logging", "--run", "/bin/v23", "--dryrun", "--", "one", "two"},
+ "--logdir", "/logging", "--agentsock", "/tmp/2981298123/s", "--run", "/bin/v23", "--dryrun", "--", "one", "two"},
[]string{"A=B"},
"",
WorkParameters{
uid: testUid,
gid: testGid,
workspace: "/hello",
+ agentsock: "/tmp/2981298123/s",
logDir: "/logging",
argv0: "/bin/v23",
argv: []string{"binaryd/vanadium/app/testapp", "one", "two"},
diff --git a/services/device/internal/suid/system.go b/services/device/internal/suid/system.go
index c647558..0b80e4d 100644
--- a/services/device/internal/suid/system.go
+++ b/services/device/internal/suid/system.go
@@ -42,9 +42,13 @@
chownPaths := hw.argv
if !hw.chown {
// Chown was invoked as part of regular suid execution, rather than directly
- // via --chown. In that case, we chown the workspace and log directory
+ // via --chown. In that case, we chown the workspace, log directory, and,
+ // if specified, the agent socket path.
// TODO(rjkroege): Ensure that the device manager can read log entries.
chownPaths = []string{hw.workspace, hw.logDir}
+ if hw.agentsock != "" {
+ chownPaths = append(chownPaths, hw.agentsock)
+ }
}
for _, p := range chownPaths {
diff --git a/services/groups/groups/main_test.go b/services/groups/groups/main_test.go
index a4c6fed..77ee096 100644
--- a/services/groups/groups/main_test.go
+++ b/services/groups/groups/main_test.go
@@ -14,6 +14,7 @@
"unicode"
"unicode/utf8"
+ "v.io/v23"
"v.io/v23/context"
"v.io/v23/naming"
"v.io/v23/rpc"
@@ -21,7 +22,6 @@
"v.io/v23/services/groups"
"v.io/x/lib/cmdline"
"v.io/x/ref/lib/v23cmd"
- "v.io/x/ref/lib/xrpc"
"v.io/x/ref/test"
)
@@ -82,16 +82,16 @@
return string(unicode.ToUpper(rune)) + s[size:]
}
-func startServer(ctx *context.T, t *testing.T) (rpc.XServer, naming.Endpoint) {
+func startServer(ctx *context.T, t *testing.T) (rpc.Server, naming.Endpoint) {
unpublished := ""
- s, err := xrpc.NewServer(ctx, unpublished, groups.GroupServer(&mock{}), nil)
+ ctx, s, err := v23.WithNewServer(ctx, unpublished, groups.GroupServer(&mock{}), nil)
if err != nil {
t.Fatalf("NewServer(%v) failed: %v", unpublished, err)
}
return s, s.Status().Endpoints[0]
}
-func stopServer(t *testing.T, server rpc.XServer) {
+func stopServer(t *testing.T, server rpc.Server) {
if err := server.Stop(); err != nil {
t.Errorf("Stop() failed: %v", err)
}
diff --git a/services/groups/groupsd/groupsd_v23_test.go b/services/groups/groupsd/groupsd_v23_test.go
index fb27e4c..9cf1ecd 100644
--- a/services/groups/groupsd/groupsd_v23_test.go
+++ b/services/groups/groupsd/groupsd_v23_test.go
@@ -22,7 +22,6 @@
"v.io/v23/services/groups"
"v.io/v23/verror"
"v.io/x/lib/set"
- "v.io/x/ref/lib/xrpc"
"v.io/x/ref/services/groups/groupsd/testdata/kvstore"
"v.io/x/ref/test/modules"
"v.io/x/ref/test/v23tests"
@@ -174,7 +173,7 @@
if err != nil {
return err
}
- if _, err := xrpc.NewServer(ctx, kvServerName, kvstore.StoreServer(&store{}), authorizer); err != nil {
+ if _, _, err := v23.WithNewServer(ctx, kvServerName, kvstore.StoreServer(&store{}), authorizer); err != nil {
return err
}
modules.WaitForEOF(env.Stdin)
diff --git a/services/groups/groupsd/main.go b/services/groups/groupsd/main.go
index c785272..38d04bd 100644
--- a/services/groups/groupsd/main.go
+++ b/services/groups/groupsd/main.go
@@ -11,7 +11,6 @@
import (
"fmt"
- "path/filepath"
"v.io/v23"
"v.io/v23/context"
@@ -22,10 +21,8 @@
"v.io/x/ref/lib/security/securityflag"
"v.io/x/ref/lib/signals"
"v.io/x/ref/lib/v23cmd"
- "v.io/x/ref/lib/xrpc"
_ "v.io/x/ref/runtime/factories/roaming"
"v.io/x/ref/services/groups/internal/server"
- "v.io/x/ref/services/groups/internal/store/gkv"
"v.io/x/ref/services/groups/internal/store/leveldb"
"v.io/x/ref/services/groups/internal/store/mem"
)
@@ -39,7 +36,7 @@
func main() {
cmdGroupsD.Flags.StringVar(&flagName, "name", "", "Name to mount the groups server as.")
- cmdGroupsD.Flags.StringVar(&flagEngine, "engine", "memstore", "Storage engine to use. Currently supported: gkv, leveldb, and memstore.")
+ cmdGroupsD.Flags.StringVar(&flagEngine, "engine", "memstore", "Storage engine to use. Currently supported: leveldb and memstore.")
cmdGroupsD.Flags.StringVar(&flagRootDir, "root-dir", "/var/lib/groupsd", "Root dir for storage engines and other data.")
cmdline.HideGlobalFlagsExcept()
@@ -81,13 +78,6 @@
}
var dispatcher rpc.Dispatcher
switch flagEngine {
- case "gkv":
- file := filepath.Join(flagRootDir, ".gkvstore")
- store, err := gkv.New(file)
- if err != nil {
- ctx.Fatalf("gkv.New(%v) failed: %v", file, err)
- }
- dispatcher = server.NewManager(store, perms)
case "leveldb":
store, err := leveldb.Open(flagRootDir)
if err != nil {
@@ -99,7 +89,7 @@
default:
return fmt.Errorf("unknown storage engine %v", flagEngine)
}
- server, err := xrpc.NewDispatchingServer(ctx, flagName, dispatcher)
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, flagName, dispatcher)
if err != nil {
return fmt.Errorf("NewDispatchingServer(%v) failed: %v", flagName, err)
}
diff --git a/services/groups/internal/server/server_test.go b/services/groups/internal/server/server_test.go
index d5b0e6e..b4393b0 100644
--- a/services/groups/internal/server/server_test.go
+++ b/services/groups/internal/server/server_test.go
@@ -18,11 +18,9 @@
"v.io/v23/security/access"
"v.io/v23/services/groups"
"v.io/v23/verror"
- "v.io/x/ref/lib/xrpc"
_ "v.io/x/ref/runtime/factories/generic"
"v.io/x/ref/services/groups/internal/server"
"v.io/x/ref/services/groups/internal/store"
- "v.io/x/ref/services/groups/internal/store/gkv"
"v.io/x/ref/services/groups/internal/store/leveldb"
"v.io/x/ref/services/groups/internal/store/mem"
"v.io/x/ref/test/testutil"
@@ -31,8 +29,7 @@
type backend int
const (
- gkvstore backend = iota
- leveldbstore
+ leveldbstore backend = iota
memstore
)
@@ -108,16 +105,6 @@
switch be {
case memstore:
st = mem.New()
- case gkvstore:
- file, err := ioutil.TempFile("", "")
- if err != nil {
- ctx.Fatal("ioutil.TempFile() failed: ", err)
- }
- st, err = gkv.New(file.Name())
- if err != nil {
- ctx.Fatal("gkv.New() failed: ", err)
- }
- path = file.Name()
case leveldbstore:
path, err = ioutil.TempDir("", "")
if err != nil {
@@ -133,7 +120,7 @@
m := server.NewManager(st, perms)
- server, err := xrpc.NewDispatchingServer(ctx, "", m)
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, "", m)
if err != nil {
ctx.Fatal("NewDispatchingServer() failed: ", err)
}
@@ -186,10 +173,6 @@
////////////////////////////////////////
// Test cases
-func TestCreateGkvStore(t *testing.T) {
- testCreateHelper(t, gkvstore)
-}
-
func TestCreateMemStore(t *testing.T) {
testCreateHelper(t, memstore)
}
@@ -246,10 +229,6 @@
}
}
-func TestDeleteGkvStore(t *testing.T) {
- testDeleteHelper(t, gkvstore)
-}
-
func TestDeleteMemStore(t *testing.T) {
testDeleteHelper(t, memstore)
}
@@ -314,10 +293,6 @@
}
}
-func TestPermsGkvStore(t *testing.T) {
- testPermsHelper(t, gkvstore)
-}
-
func TestPermsMemStore(t *testing.T) {
testPermsHelper(t, memstore)
}
@@ -427,10 +402,6 @@
}
}
-func TestAddGkvStore(t *testing.T) {
- testAddHelper(t, gkvstore)
-}
-
func TestAddMemStore(t *testing.T) {
testAddHelper(t, memstore)
}
@@ -520,10 +491,6 @@
}
}
-func TestRemoveGkvStore(t *testing.T) {
- testRemoveHelper(t, gkvstore)
-}
-
func TestRemoveMemStore(t *testing.T) {
testRemoveHelper(t, memstore)
}
diff --git a/services/groups/internal/store/gkv/store.go b/services/groups/internal/store/gkv/store.go
deleted file mode 100644
index 107aa33..0000000
--- a/services/groups/internal/store/gkv/store.go
+++ /dev/null
@@ -1,196 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-// Package gkv provides a simple implementation of server.Store that uses
-// gkvlite for persistence. It's meant as a stopgap solution until the Syncbase
-// storage engine is ready for consumption by other Vanadium modules. Since it's
-// a stopgap, it doesn't bother with entry-level locking.
-package gkv
-
-import (
- "os"
- "strconv"
- "sync"
-
- "github.com/steveyen/gkvlite"
-
- "v.io/v23/vdl"
- "v.io/v23/verror"
- "v.io/v23/vom"
- "v.io/x/ref/services/groups/internal/store"
-)
-
-const collectionName string = "c"
-
-type entry struct {
- Value interface{}
- Version uint64
-}
-
-// TODO(sadovsky): Compaction.
-type gkv struct {
- mu sync.Mutex
- err error
- file *os.File
- kvst *gkvlite.Store
- coll *gkvlite.Collection
-}
-
-var _ store.Store = (*gkv)(nil)
-
-func New(filename string) (store.Store, error) {
- file, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0600)
- if err != nil {
- return nil, convertError(err)
- }
- kvst, err := gkvlite.NewStore(file)
- if err != nil {
- file.Close()
- return nil, convertError(err)
- }
- res := &gkv{file: file, kvst: kvst}
- coll := kvst.GetCollection(collectionName)
- // Create collection if needed.
- if coll == nil {
- coll = kvst.SetCollection(collectionName, nil)
- if err := res.flush(); err != nil {
- res.Close()
- return nil, convertError(err)
- }
- }
- res.coll = coll
- return res, nil
-}
-
-func (st *gkv) Get(k string, v interface{}) (version string, err error) {
- st.mu.Lock()
- defer st.mu.Unlock()
- if st.err != nil {
- return "", convertError(st.err)
- }
- e, err := st.get(k)
- if err != nil {
- return "", err
- }
- if err := vdl.Convert(v, e.Value); err != nil {
- return "", convertError(err)
- }
- return strconv.FormatUint(e.Version, 10), nil
-}
-
-func (st *gkv) Insert(k string, v interface{}) error {
- st.mu.Lock()
- defer st.mu.Unlock()
- if st.err != nil {
- return convertError(st.err)
- }
- if _, err := st.get(k); verror.ErrorID(err) != store.ErrUnknownKey.ID {
- if err != nil {
- return err
- }
- return verror.New(store.ErrKeyExists, nil, k)
- }
- return st.put(k, &entry{Value: v})
-}
-
-func (st *gkv) Update(k string, v interface{}, version string) error {
- st.mu.Lock()
- defer st.mu.Unlock()
- if st.err != nil {
- return convertError(st.err)
- }
- e, err := st.get(k)
- if err != nil {
- return err
- }
- if err := e.checkVersion(version); err != nil {
- return err
- }
- return st.put(k, &entry{Value: v, Version: e.Version + 1})
-}
-
-func (st *gkv) Delete(k string, version string) error {
- st.mu.Lock()
- defer st.mu.Unlock()
- if st.err != nil {
- return convertError(st.err)
- }
- e, err := st.get(k)
- if err != nil {
- return err
- }
- if err := e.checkVersion(version); err != nil {
- return err
- }
- return st.delete(k)
-}
-
-func (st *gkv) Close() error {
- st.mu.Lock()
- defer st.mu.Unlock()
- if st.err != nil {
- return convertError(st.err)
- }
- st.err = verror.New(verror.ErrCanceled, nil, "closed store")
- st.kvst.Close()
- return convertError(st.file.Close())
-}
-
-////////////////////////////////////////
-// Internal helpers
-
-// get, put, delete, and flush all assume st.mu is held.
-func (st *gkv) get(k string) (*entry, error) {
- bytes, err := st.coll.Get([]byte(k))
- if err != nil {
- return nil, convertError(err)
- }
- if bytes == nil {
- return nil, verror.New(store.ErrUnknownKey, nil, k)
- }
- e := &entry{}
- if err := vom.Decode(bytes, e); err != nil {
- return nil, convertError(err)
- }
- return e, nil
-}
-
-func (st *gkv) put(k string, e *entry) error {
- bytes, err := vom.Encode(e)
- if err != nil {
- return convertError(err)
- }
- if err := st.coll.Set([]byte(k), bytes); err != nil {
- return convertError(err)
- }
- return convertError(st.flush())
-}
-
-func (st *gkv) delete(k string) error {
- if _, err := st.coll.Delete([]byte(k)); err != nil {
- return convertError(err)
- }
- return convertError(st.flush())
-}
-
-func (st *gkv) flush() error {
- if err := st.kvst.Flush(); err != nil {
- return convertError(err)
- }
- // TODO(sadovsky): Better handling for the case where kvst.Flush() succeeds
- // but file.Sync() fails. See discussion in v.io/c/11829.
- return convertError(st.file.Sync())
-}
-
-func (e *entry) checkVersion(version string) error {
- newVersion := strconv.FormatUint(e.Version, 10)
- if version != newVersion {
- return verror.NewErrBadVersion(nil)
- }
- return nil
-}
-
-func convertError(err error) error {
- return verror.Convert(verror.IDAction{}, nil, err)
-}
diff --git a/services/identity/internal/revocation/revocation_test.go b/services/identity/internal/revocation/revocation_test.go
index 34ad488..de6f2dd 100644
--- a/services/identity/internal/revocation/revocation_test.go
+++ b/services/identity/internal/revocation/revocation_test.go
@@ -7,22 +7,20 @@
import (
"testing"
- "v.io/x/ref/lib/xrpc"
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/security"
_ "v.io/x/ref/runtime/factories/generic"
"v.io/x/ref/services/discharger"
"v.io/x/ref/services/identity/internal/dischargerlib"
"v.io/x/ref/test"
-
- "v.io/v23"
- "v.io/v23/context"
- "v.io/v23/security"
)
//go:generate v23 test generate
func revokerSetup(t *testing.T, ctx *context.T) (dischargerKey security.PublicKey, dischargerEndpoint string, revoker RevocationManager) {
dischargerServiceStub := discharger.DischargerServer(dischargerlib.NewDischarger())
- dischargerServer, err := xrpc.NewServer(ctx, "", dischargerServiceStub, nil)
+ ctx, dischargerServer, err := v23.WithNewServer(ctx, "", dischargerServiceStub, nil)
if err != nil {
t.Fatalf("r.NewServer: %s", err)
}
diff --git a/services/identity/internal/server/identityd.go b/services/identity/internal/server/identityd.go
index 4677015..a388aa4 100644
--- a/services/identity/internal/server/identityd.go
+++ b/services/identity/internal/server/identityd.go
@@ -26,7 +26,6 @@
"v.io/v23/verror"
"v.io/x/ref/lib/security/audit"
"v.io/x/ref/lib/signals"
- "v.io/x/ref/lib/xrpc"
"v.io/x/ref/services/discharger"
"v.io/x/ref/services/identity/internal/auditor"
"v.io/x/ref/services/identity/internal/blesser"
@@ -129,7 +128,7 @@
}
}
-func (s *IdentityServer) Listen(ctx *context.T, externalHttpAddr, httpAddr, tlsConfig string) (rpc.XServer, []string, string) {
+func (s *IdentityServer) Listen(ctx *context.T, externalHttpAddr, httpAddr, tlsConfig string) (rpc.Server, []string, string) {
// json-encoded public key and blessing names of this server
principal := v23.GetPrincipal(ctx)
http.Handle("/auth/blessing-root", handlers.BlessingRoot{principal})
@@ -209,11 +208,11 @@
// Starts the Vanadium and HTTP services for blessing, and the Vanadium service for discharging.
// All Vanadium services are started on the same port.
-func (s *IdentityServer) setupBlessingServices(ctx *context.T, macaroonKey []byte) (rpc.XServer, []string, error) {
+func (s *IdentityServer) setupBlessingServices(ctx *context.T, macaroonKey []byte) (rpc.Server, []string, error) {
disp := newDispatcher(macaroonKey, s.oauthBlesserParams)
principal := v23.GetPrincipal(ctx)
objectAddr := naming.Join(s.mountNamePrefix, fmt.Sprintf("%v", principal.BlessingStore().Default()))
- server, err := xrpc.NewDispatchingServer(ctx, objectAddr, disp)
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, objectAddr, disp)
if err != nil {
return nil, nil, err
}
diff --git a/services/internal/binarylib/client_test.go b/services/internal/binarylib/client_test.go
index 7c83ca2..01c868d 100644
--- a/services/internal/binarylib/client_test.go
+++ b/services/internal/binarylib/client_test.go
@@ -17,8 +17,6 @@
"v.io/v23/context"
"v.io/v23/naming"
"v.io/v23/services/repository"
-
- "v.io/x/ref/lib/xrpc"
"v.io/x/ref/test"
"v.io/x/ref/test/testutil"
)
@@ -49,7 +47,7 @@
if err != nil {
t.Fatalf("NewDispatcher() failed: %v\n", err)
}
- server, err := xrpc.NewDispatchingServer(ctx, "", dispatcher)
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, "", dispatcher)
if err != nil {
t.Fatalf("NewServer() failed: %v", err)
}
diff --git a/services/internal/binarylib/impl_test.go b/services/internal/binarylib/impl_test.go
index 1ffd9a6..a8ea515 100644
--- a/services/internal/binarylib/impl_test.go
+++ b/services/internal/binarylib/impl_test.go
@@ -19,7 +19,7 @@
"v.io/v23/services/repository"
"v.io/v23/verror"
- "v.io/x/ref/lib/xrpc"
+ "v.io/v23"
_ "v.io/x/ref/runtime/factories/static"
"v.io/x/ref/services/internal/binarylib"
"v.io/x/ref/services/internal/servicetest"
@@ -57,7 +57,7 @@
t.Fatalf("NewDispatcher failed: %v", err)
}
dontPublishName := ""
- server, err := xrpc.NewDispatchingServer(ctx, dontPublishName, dispatcher)
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, dontPublishName, dispatcher)
if err != nil {
t.Fatalf("NewServer(%q) failed: %v", dontPublishName, err)
}
diff --git a/services/internal/binarylib/perms_test.go b/services/internal/binarylib/perms_test.go
index 0729b69..89b330e 100644
--- a/services/internal/binarylib/perms_test.go
+++ b/services/internal/binarylib/perms_test.go
@@ -18,7 +18,6 @@
"v.io/v23/services/repository"
"v.io/v23/verror"
"v.io/x/ref/lib/signals"
- "v.io/x/ref/lib/xrpc"
"v.io/x/ref/services/internal/binarylib"
"v.io/x/ref/services/internal/servicetest"
"v.io/x/ref/test"
@@ -49,7 +48,7 @@
if err != nil {
ctx.Fatalf("Failed to create binaryd dispatcher: %v", err)
}
- server, err := xrpc.NewDispatchingServer(ctx, publishName, dispatcher)
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, publishName, dispatcher)
if err != nil {
ctx.Fatalf("NewDispatchingServer(%v) failed: %v", publishName, err)
}
diff --git a/services/internal/logreaderlib/logfile_test.go b/services/internal/logreaderlib/logfile_test.go
index aeda437..c046fdd 100644
--- a/services/internal/logreaderlib/logfile_test.go
+++ b/services/internal/logreaderlib/logfile_test.go
@@ -10,12 +10,12 @@
"path"
"testing"
+ "v.io/v23"
"v.io/v23/context"
"v.io/v23/naming"
"v.io/v23/security"
"v.io/v23/services/logreader"
"v.io/v23/verror"
- "v.io/x/ref/lib/xrpc"
"v.io/x/ref/services/internal/logreaderlib"
"v.io/x/ref/test"
@@ -50,7 +50,7 @@
t.Fatalf("ioutil.TempDir: %v", err)
}
defer os.RemoveAll(workdir)
- server, err := xrpc.NewDispatchingServer(ctx, "", &logFileDispatcher{workdir})
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, "", &logFileDispatcher{workdir})
if err != nil {
t.Fatalf("NewDispatchingServer failed: %v", err)
}
@@ -136,7 +136,7 @@
t.Fatalf("ioutil.TempDir: %v", err)
}
defer os.RemoveAll(workdir)
- server, err := xrpc.NewDispatchingServer(ctx, "", &logFileDispatcher{workdir})
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, "", &logFileDispatcher{workdir})
if err != nil {
t.Fatalf("NewDispatchingServer failed: %v", err)
}
diff --git a/services/internal/pproflib/proxy_test.go b/services/internal/pproflib/proxy_test.go
index 079993a..b9b21b2 100644
--- a/services/internal/pproflib/proxy_test.go
+++ b/services/internal/pproflib/proxy_test.go
@@ -10,9 +10,9 @@
"net/http"
"testing"
+ "v.io/v23"
"v.io/v23/context"
"v.io/v23/security"
- "v.io/x/ref/lib/xrpc"
_ "v.io/x/ref/runtime/factories/generic"
"v.io/x/ref/services/internal/pproflib"
"v.io/x/ref/test"
@@ -32,7 +32,7 @@
ctx, shutdown := test.V23Init()
defer shutdown()
- s, err := xrpc.NewDispatchingServer(ctx, "", &dispatcher{pproflib.NewPProfService()})
+ ctx, s, err := v23.WithNewDispatchingServer(ctx, "", &dispatcher{pproflib.NewPProfService()})
if err != nil {
t.Fatalf("failed to start server: %v", err)
}
diff --git a/services/internal/servicetest/modules.go b/services/internal/servicetest/modules.go
index 56b4264..ae44db1 100644
--- a/services/internal/servicetest/modules.go
+++ b/services/internal/servicetest/modules.go
@@ -18,7 +18,6 @@
"v.io/v23/security"
"v.io/x/ref"
"v.io/x/ref/internal/logger"
- "v.io/x/ref/lib/xrpc"
"v.io/x/ref/services/mounttable/mounttablelib"
"v.io/x/ref/test/modules"
"v.io/x/ref/test/testutil"
@@ -40,7 +39,7 @@
if err != nil {
return fmt.Errorf("mounttablelib.NewMountTableDispatcher failed: %s", err)
}
- server, err := xrpc.NewDispatchingServer(ctx, "", mt, options.ServesMountTable(true))
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, "", mt, options.ServesMountTable(true))
if err != nil {
return fmt.Errorf("root failed: %v", err)
}
diff --git a/services/internal/statslib/stats_test.go b/services/internal/statslib/stats_test.go
index eccd497..0a35a64 100644
--- a/services/internal/statslib/stats_test.go
+++ b/services/internal/statslib/stats_test.go
@@ -17,9 +17,9 @@
"v.io/v23/services/watch"
"v.io/v23/vdl"
+ "v.io/v23"
libstats "v.io/x/ref/lib/stats"
"v.io/x/ref/lib/stats/histogram"
- "v.io/x/ref/lib/xrpc"
"v.io/x/ref/services/internal/statslib"
s_stats "v.io/x/ref/services/stats"
"v.io/x/ref/test"
@@ -41,7 +41,7 @@
ctx, shutdown := test.V23Init()
defer shutdown()
- server, err := xrpc.NewDispatchingServer(ctx, "", &statsDispatcher{})
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, "", &statsDispatcher{})
if err != nil {
t.Fatalf("NewServer failed: %v", err)
return
diff --git a/services/internal/vtracelib/vtrace_test.go b/services/internal/vtracelib/vtrace_test.go
index 3848ad2..536abe6 100644
--- a/services/internal/vtracelib/vtrace_test.go
+++ b/services/internal/vtracelib/vtrace_test.go
@@ -8,9 +8,9 @@
"io"
"testing"
+ "v.io/v23"
s_vtrace "v.io/v23/services/vtrace"
"v.io/v23/vtrace"
- "v.io/x/ref/lib/xrpc"
"v.io/x/ref/services/internal/vtracelib"
"v.io/x/ref/test"
@@ -23,7 +23,7 @@
ctx, shutdown := test.V23Init()
defer shutdown()
- server, err := xrpc.NewServer(ctx, "", vtracelib.NewVtraceService(), nil)
+ ctx, server, err := v23.WithNewServer(ctx, "", vtracelib.NewVtraceService(), nil)
if err != nil {
t.Fatalf("Could not create server: %s", err)
}
diff --git a/services/mounttable/mounttablelib/mounttable_test.go b/services/mounttable/mounttablelib/mounttable_test.go
index 3cca456..f71d5cc 100644
--- a/services/mounttable/mounttablelib/mounttable_test.go
+++ b/services/mounttable/mounttablelib/mounttable_test.go
@@ -27,9 +27,7 @@
"v.io/v23/services/mounttable"
"v.io/v23/services/stats"
"v.io/v23/vdl"
-
libstats "v.io/x/ref/lib/stats"
- "v.io/x/ref/lib/xrpc"
"v.io/x/ref/services/debug/debuglib"
"v.io/x/ref/services/mounttable/mounttablelib"
"v.io/x/ref/test"
@@ -52,7 +50,7 @@
func doMount(t *testing.T, ctx *context.T, ep, suffix, service string, shouldSucceed bool) {
name := naming.JoinAddressName(ep, suffix)
client := v23.GetClient(ctx)
- if err := client.Call(ctx, name, "Mount", []interface{}{service, uint32(ttlSecs), 0}, nil, options.NoResolve{}); err != nil {
+ if err := client.Call(ctx, name, "Mount", []interface{}{service, uint32(ttlSecs), 0}, nil, options.Preresolved{}); err != nil {
if !shouldSucceed {
return
}
@@ -63,7 +61,7 @@
func doUnmount(t *testing.T, ctx *context.T, ep, suffix, service string, shouldSucceed bool) {
name := naming.JoinAddressName(ep, suffix)
client := v23.GetClient(ctx)
- if err := client.Call(ctx, name, "Unmount", []interface{}{service}, nil, options.NoResolve{}); err != nil {
+ if err := client.Call(ctx, name, "Unmount", []interface{}{service}, nil, options.Preresolved{}); err != nil {
if !shouldSucceed {
return
}
@@ -74,7 +72,7 @@
func doGetPermissions(t *testing.T, ctx *context.T, ep, suffix string, shouldSucceed bool) (perms access.Permissions, version string) {
name := naming.JoinAddressName(ep, suffix)
client := v23.GetClient(ctx)
- if err := client.Call(ctx, name, "GetPermissions", nil, []interface{}{&perms, &version}, options.NoResolve{}); err != nil {
+ if err := client.Call(ctx, name, "GetPermissions", nil, []interface{}{&perms, &version}, options.Preresolved{}); err != nil {
if !shouldSucceed {
return
}
@@ -86,7 +84,7 @@
func doSetPermissions(t *testing.T, ctx *context.T, ep, suffix string, perms access.Permissions, version string, shouldSucceed bool) {
name := naming.JoinAddressName(ep, suffix)
client := v23.GetClient(ctx)
- if err := client.Call(ctx, name, "SetPermissions", []interface{}{perms, version}, nil, options.NoResolve{}); err != nil {
+ if err := client.Call(ctx, name, "SetPermissions", []interface{}{perms, version}, nil, options.Preresolved{}); err != nil {
if !shouldSucceed {
return
}
@@ -97,7 +95,7 @@
func doDeleteNode(t *testing.T, ctx *context.T, ep, suffix string, shouldSucceed bool) {
name := naming.JoinAddressName(ep, suffix)
client := v23.GetClient(ctx)
- if err := client.Call(ctx, name, "Delete", []interface{}{false}, nil, options.NoResolve{}); err != nil {
+ if err := client.Call(ctx, name, "Delete", []interface{}{false}, nil, options.Preresolved{}); err != nil {
if !shouldSucceed {
return
}
@@ -108,7 +106,7 @@
func doDeleteSubtree(t *testing.T, ctx *context.T, ep, suffix string, shouldSucceed bool) {
name := naming.JoinAddressName(ep, suffix)
client := v23.GetClient(ctx)
- if err := client.Call(ctx, name, "Delete", []interface{}{true}, nil, options.NoResolve{}); err != nil {
+ if err := client.Call(ctx, name, "Delete", []interface{}{true}, nil, options.Preresolved{}); err != nil {
if !shouldSucceed {
return
}
@@ -132,7 +130,7 @@
// Resolve the name one level.
var entry naming.MountEntry
client := v23.GetClient(ctx)
- if err := client.Call(ctx, name, "ResolveStep", nil, []interface{}{&entry}, options.NoResolve{}); err != nil {
+ if err := client.Call(ctx, name, "ResolveStep", nil, []interface{}{&entry}, options.Preresolved{}); err != nil {
return nil, err
}
if len(entry.Servers) < 1 {
@@ -149,7 +147,7 @@
}
// Export the value.
client := v23.GetClient(ctx)
- if err := client.Call(ctx, mountentry2names(resolved)[0], "Export", []interface{}{contents, true}, nil, options.NoResolve{}); err != nil {
+ if err := client.Call(ctx, mountentry2names(resolved)[0], "Export", []interface{}{contents, true}, nil, options.Preresolved{resolved}); err != nil {
boom(t, "Failed to Export.Call %s to %s: %s", name, contents, err)
}
}
@@ -165,7 +163,7 @@
}
// Look up the value.
client := v23.GetClient(ctx)
- call, err := client.StartCall(ctx, mountentry2names(resolved)[0], "Lookup", nil, options.NoResolve{})
+ call, err := client.StartCall(ctx, mountentry2names(resolved)[0], "Lookup", nil, options.Preresolved{resolved})
if err != nil {
if shouldSucceed {
boom(t, "Failed Lookup.StartCall %s: %s", name, err)
@@ -198,7 +196,7 @@
}
// Start serving on a loopback address.
- server, err := xrpc.NewDispatchingServer(ctx, "", mt, options.ServesMountTable(true))
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, "", mt, options.ServesMountTable(true))
if err != nil {
boom(t, "r.NewServer: %s", err)
}
@@ -211,7 +209,7 @@
func newCollection(t *testing.T, rootCtx *context.T) (func() error, string) {
// Start serving a collection service on a loopback address. This
// is just a service we can mount and test against.
- server, err := xrpc.NewDispatchingServer(rootCtx, "collection", newCollectionServer())
+ _, server, err := v23.WithNewDispatchingServer(rootCtx, "collection", newCollectionServer())
if err != nil {
boom(t, "r.NewServer: %s", err)
}
@@ -331,7 +329,7 @@
func doGlobX(t *testing.T, ctx *context.T, ep, suffix, pattern string, joinServer bool) []string {
name := naming.JoinAddressName(ep, suffix)
client := v23.GetClient(ctx)
- call, err := client.StartCall(ctx, name, rpc.GlobMethod, []interface{}{pattern}, options.NoResolve{})
+ call, err := client.StartCall(ctx, name, rpc.GlobMethod, []interface{}{pattern}, options.Preresolved{})
if err != nil {
boom(t, "Glob.StartCall %s %s: %s", name, pattern, err)
}
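
For reference, a minimal sketch (not part of this change) of the options.NoResolve -> options.Preresolved swap used in the calls above, assuming a name that already embeds the server endpoint; the Export and Lookup calls above instead pass a pre-resolved MountEntry inside the option.

    package example

    import (
        "v.io/v23"
        "v.io/v23/context"
        "v.io/v23/options"
    )

    // callDirect is a hypothetical helper (not part of this change) that issues
    // an RPC against a name which already contains the server endpoint; the
    // zero-value Preresolved option tells the client to skip name resolution,
    // which is what the NoResolve option used to do.
    func callDirect(ctx *context.T, name, method string, args []interface{}) error {
        return v23.GetClient(ctx).Call(ctx, name, method, args, nil, options.Preresolved{})
    }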
@@ -450,7 +448,7 @@
func (fakeServerCall) LocalEndpoint() naming.Endpoint { return nil }
func (fakeServerCall) RemoteEndpoint() naming.Endpoint { return nil }
func (fakeServerCall) GrantedBlessings() security.Blessings { return security.Blessings{} }
-func (fakeServerCall) Server() rpc.XServer { return nil }
+func (fakeServerCall) Server() rpc.Server { return nil }
func (c *fakeServerCall) SendStream() interface {
Send(naming.GlobReply) error
} {
diff --git a/services/mounttable/mounttablelib/neighborhood_test.go b/services/mounttable/mounttablelib/neighborhood_test.go
index 8216c23..089e958 100644
--- a/services/mounttable/mounttablelib/neighborhood_test.go
+++ b/services/mounttable/mounttablelib/neighborhood_test.go
@@ -14,8 +14,6 @@
"v.io/v23"
"v.io/v23/naming"
"v.io/v23/options"
-
- "v.io/x/ref/lib/xrpc"
_ "v.io/x/ref/runtime/factories/generic"
"v.io/x/ref/services/mounttable/mounttablelib"
"v.io/x/ref/test"
@@ -56,7 +54,7 @@
defer nhd.(stopper).Stop()
// Start serving on a loopback address.
- server, err := xrpc.NewDispatchingServer(rootCtx, "", nhd)
+ rootCtx, server, err := v23.WithNewDispatchingServer(rootCtx, "", nhd)
if err != nil {
boom(t, "Failed to create neighborhood: %s", err)
}
@@ -86,7 +84,7 @@
client := v23.GetClient(rootCtx)
name := naming.JoinAddressName(estr, serverName+"/"+expectedSuffix)
- call, cerr := client.StartCall(rootCtx, name, "ResolveStep", nil, options.NoResolve{})
+ call, cerr := client.StartCall(rootCtx, name, "ResolveStep", nil, options.Preresolved{})
if cerr != nil {
boom(t, "ResolveStep.StartCall: %s", cerr)
}
diff --git a/services/mounttable/mounttablelib/servers.go b/services/mounttable/mounttablelib/servers.go
index 2d919d1..b0f6eb0 100644
--- a/services/mounttable/mounttablelib/servers.go
+++ b/services/mounttable/mounttablelib/servers.go
@@ -12,7 +12,6 @@
"v.io/v23/naming"
"v.io/v23/options"
"v.io/v23/rpc"
- "v.io/x/ref/lib/xrpc"
)
func StartServers(ctx *context.T, listenSpec rpc.ListenSpec, mountName, nhName, permsFile, persistDir, debugPrefix string) (string, func(), error) {
@@ -29,10 +28,10 @@
return "", nil, err
}
ctx = v23.WithListenSpec(ctx, listenSpec)
- mtServer, err := xrpc.NewDispatchingServer(ctx, mountName, mt, options.ServesMountTable(true))
+ ctx, mtServer, err := v23.WithNewDispatchingServer(ctx, mountName, mt, options.ServesMountTable(true))
if err != nil {
- ctx.Errorf("v23.NewServer failed: %v", err)
+ ctx.Errorf("v23.WithNewServer failed: %v", err)
return "", nil, err
}
stopFuncs = append(stopFuncs, mtServer.Stop)
@@ -60,9 +59,9 @@
nh, err = NewNeighborhoodDispatcher(nhName, names...)
}
- nhServer, err := xrpc.NewDispatchingServer(ctx, naming.Join(mtName, "nh"), nh, options.ServesMountTable(true))
+ ctx, nhServer, err := v23.WithNewDispatchingServer(ctx, naming.Join(mtName, "nh"), nh, options.ServesMountTable(true))
if err != nil {
- ctx.Errorf("v23.NewServer failed: %v", err)
+ ctx.Errorf("v23.WithNewServer failed: %v", err)
stop()
return "", nil, err
}
diff --git a/services/profile/profile/impl_test.go b/services/profile/profile/impl_test.go
index 3879f0e..7c717ba 100644
--- a/services/profile/profile/impl_test.go
+++ b/services/profile/profile/impl_test.go
@@ -16,9 +16,9 @@
"v.io/v23/security"
"v.io/v23/services/build"
+ "v.io/v23"
"v.io/x/lib/cmdline"
"v.io/x/ref/lib/v23cmd"
- "v.io/x/ref/lib/xrpc"
_ "v.io/x/ref/runtime/factories/generic"
"v.io/x/ref/services/profile"
"v.io/x/ref/services/repository"
@@ -91,7 +91,7 @@
ctx, shutdown := test.V23Init()
defer shutdown()
- server, err := xrpc.NewDispatchingServer(ctx, "", &dispatcher{})
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, "", &dispatcher{})
if err != nil {
return
}
diff --git a/services/profile/profiled/impl_test.go b/services/profile/profiled/impl_test.go
index 89e5667..2319d43 100644
--- a/services/profile/profiled/impl_test.go
+++ b/services/profile/profiled/impl_test.go
@@ -10,10 +10,9 @@
"reflect"
"testing"
+ "v.io/v23"
"v.io/v23/naming"
"v.io/v23/services/build"
-
- "v.io/x/ref/lib/xrpc"
"v.io/x/ref/services/profile"
"v.io/x/ref/services/repository"
"v.io/x/ref/test"
@@ -49,7 +48,7 @@
if err != nil {
t.Fatalf("NewDispatcher() failed: %v", err)
}
- server, err := xrpc.NewDispatchingServer(ctx, "", dispatcher)
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, "", dispatcher)
if err != nil {
t.Fatalf("NewServer() failed: %v", err)
}
@@ -109,7 +108,7 @@
if err != nil {
t.Fatalf("NewDispatcher() failed: %v", err)
}
- server, err := xrpc.NewDispatchingServer(ctx, "", dispatcher)
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, "", dispatcher)
if err != nil {
t.Fatalf("NewServer() failed: %v", err)
}
@@ -138,7 +137,7 @@
if err != nil {
t.Fatalf("NewDispatcher() failed: %v", err)
}
- server, err = xrpc.NewDispatchingServer(ctx, "", dispatcher)
+ ctx, server, err = v23.WithNewDispatchingServer(ctx, "", dispatcher)
if err != nil {
t.Fatalf("NewServer() failed: %v", err)
}
diff --git a/services/profile/profiled/main.go b/services/profile/profiled/main.go
index 93cf635..6b51f64 100644
--- a/services/profile/profiled/main.go
+++ b/services/profile/profiled/main.go
@@ -10,12 +10,12 @@
import (
"fmt"
+ "v.io/v23"
"v.io/v23/context"
"v.io/x/lib/cmdline"
"v.io/x/ref/lib/security/securityflag"
"v.io/x/ref/lib/signals"
"v.io/x/ref/lib/v23cmd"
- "v.io/x/ref/lib/xrpc"
_ "v.io/x/ref/runtime/factories/roaming"
)
@@ -49,7 +49,7 @@
return fmt.Errorf("NewDispatcher() failed: %v", err)
}
- server, err := xrpc.NewDispatchingServer(ctx, name, dispatcher)
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, name, dispatcher)
if err != nil {
return fmt.Errorf("NewServer() failed: %v", err)
}
diff --git a/services/proxy/proxyd/main.go b/services/proxy/proxyd/main.go
index c7e6950..7b5488f 100644
--- a/services/proxy/proxyd/main.go
+++ b/services/proxy/proxyd/main.go
@@ -14,18 +14,15 @@
"net/http"
"time"
- "v.io/x/lib/cmdline"
-
"v.io/v23"
"v.io/v23/context"
"v.io/v23/logging"
"v.io/v23/rpc"
"v.io/v23/security"
"v.io/v23/security/access"
-
+ "v.io/x/lib/cmdline"
"v.io/x/ref/lib/signals"
"v.io/x/ref/lib/v23cmd"
- "v.io/x/ref/lib/xrpc"
"v.io/x/ref/runtime/factories/static"
)
@@ -97,7 +94,7 @@
monitoringName = name + "-mon"
}
ctx = v23.WithListenSpec(ctx, rpc.ListenSpec{Proxy: proxyEndpoint.Name()})
- server, err := xrpc.NewDispatchingServer(ctx, monitoringName, &nilDispatcher{})
+ ctx, server, err := v23.WithNewDispatchingServer(ctx, monitoringName, &nilDispatcher{})
if err != nil {
return fmt.Errorf("NewServer failed: %v", err)
}
diff --git a/services/proxy/proxyd/proxyd_v23_test.go b/services/proxy/proxyd/proxyd_v23_test.go
index 86b7c64..85bf69e 100644
--- a/services/proxy/proxyd/proxyd_v23_test.go
+++ b/services/proxy/proxyd/proxyd_v23_test.go
@@ -11,7 +11,6 @@
"v.io/v23/context"
"v.io/v23/rpc"
"v.io/v23/security"
- "v.io/x/ref/lib/xrpc"
"v.io/x/ref/test/modules"
"v.io/x/ref/test/v23tests"
)
@@ -58,7 +57,7 @@
var runServer = modules.Register(func(env *modules.Env, args ...string) error {
ctx, shutdown := v23.Init()
defer shutdown()
- if _, err := xrpc.NewServer(ctx, serverName, service{}, security.AllowEveryone()); err != nil {
+ if _, _, err := v23.WithNewServer(ctx, serverName, service{}, security.AllowEveryone()); err != nil {
return err
}
modules.WaitForEOF(env.Stdin)
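
The non-dispatching variant follows the same shape; a hedged sketch, assuming an initialized runtime context, of serving a single object with an open authorizer as runServer does above.

    package example

    import (
        "v.io/v23"
        "v.io/v23/context"
        "v.io/v23/rpc"
        "v.io/v23/security"
    )

    // serveObject is a hypothetical helper (not part of this change): it mounts
    // impl under name, allows all callers, and returns the updated context plus
    // the server so its endpoints can be read via server.Status().
    func serveObject(ctx *context.T, name string, impl interface{}) (*context.T, rpc.Server, error) {
        return v23.WithNewServer(ctx, name, impl, security.AllowEveryone())
    }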
diff --git a/services/repository/repository.vdl b/services/repository/repository.vdl
index 3990d3a..2d977c2 100644
--- a/services/repository/repository.vdl
+++ b/services/repository/repository.vdl
@@ -18,17 +18,14 @@
// envelopes, as well as querying for a list of supported profiles.
type Application interface {
public.Application
- // DEPRECATED. Please use PutX for new code.
- // Put adds the given tuple of application version (specified
- // through the object name suffix) and application envelope to all
- // of the given application profiles.
- Put(Profiles []string, Envelope application.Envelope) error {access.Write}
- // PutX adds the given application envelope for the given profile and
+ // Put adds the given application envelope for the given profile and
// application version (required, and specified through the object name
// suffix).
//
// An error is returned if an envelope already exists, unless the
// overwrite option is set.
+ Put(Profile string, Envelope application.Envelope, Overwrite bool) error {access.Write}
+ // DEPRECATED. Please use Put for new code.
PutX(Profile string, Envelope application.Envelope, Overwrite bool) error {access.Write}
// Remove removes the application envelope for the given profile
// name and application version (specified through the object name
diff --git a/services/repository/repository.vdl.go b/services/repository/repository.vdl.go
index 65a3890..07c29d2 100644
--- a/services/repository/repository.vdl.go
+++ b/services/repository/repository.vdl.go
@@ -43,17 +43,14 @@
// and executing the "search" application, version "v1", runnable
// on either the "base" or "media" profile.
repository.ApplicationClientMethods
- // DEPRECATED. Please use PutX for new code.
- // Put adds the given tuple of application version (specified
- // through the object name suffix) and application envelope to all
- // of the given application profiles.
- Put(ctx *context.T, Profiles []string, Envelope application.Envelope, opts ...rpc.CallOpt) error
- // PutX adds the given application envelope for the given profile and
+ // Put adds the given application envelope for the given profile and
// application version (required, and specified through the object name
// suffix).
//
// An error is returned if an envelope already exists, unless the
// overwrite option is set.
+ Put(ctx *context.T, Profile string, Envelope application.Envelope, Overwrite bool, opts ...rpc.CallOpt) error
+ // DEPRECATED. Please use Put for new code.
PutX(ctx *context.T, Profile string, Envelope application.Envelope, Overwrite bool, opts ...rpc.CallOpt) error
// Remove removes the application envelope for the given profile
// name and application version (specified through the object name
@@ -89,8 +86,8 @@
repository.ApplicationClientStub
}
-func (c implApplicationClientStub) Put(ctx *context.T, i0 []string, i1 application.Envelope, opts ...rpc.CallOpt) (err error) {
- err = v23.GetClient(ctx).Call(ctx, c.name, "Put", []interface{}{i0, i1}, nil, opts...)
+func (c implApplicationClientStub) Put(ctx *context.T, i0 string, i1 application.Envelope, i2 bool, opts ...rpc.CallOpt) (err error) {
+ err = v23.GetClient(ctx).Call(ctx, c.name, "Put", []interface{}{i0, i1, i2}, nil, opts...)
return
}
@@ -127,17 +124,14 @@
// and executing the "search" application, version "v1", runnable
// on either the "base" or "media" profile.
repository.ApplicationServerMethods
- // DEPRECATED. Please use PutX for new code.
- // Put adds the given tuple of application version (specified
- // through the object name suffix) and application envelope to all
- // of the given application profiles.
- Put(ctx *context.T, call rpc.ServerCall, Profiles []string, Envelope application.Envelope) error
- // PutX adds the given application envelope for the given profile and
+ // Put adds the given application envelope for the given profile and
// application version (required, and specified through the object name
// suffix).
//
// An error is returned if an envelope already exists, unless the
// overwrite option is set.
+ Put(ctx *context.T, call rpc.ServerCall, Profile string, Envelope application.Envelope, Overwrite bool) error
+ // DEPRECATED. Please use Put for new code.
PutX(ctx *context.T, call rpc.ServerCall, Profile string, Envelope application.Envelope, Overwrite bool) error
// Remove removes the application envelope for the given profile
// name and application version (specified through the object name
@@ -193,8 +187,8 @@
gs *rpc.GlobState
}
-func (s implApplicationServerStub) Put(ctx *context.T, call rpc.ServerCall, i0 []string, i1 application.Envelope) error {
- return s.impl.Put(ctx, call, i0, i1)
+func (s implApplicationServerStub) Put(ctx *context.T, call rpc.ServerCall, i0 string, i1 application.Envelope, i2 bool) error {
+ return s.impl.Put(ctx, call, i0, i1, i2)
}
func (s implApplicationServerStub) PutX(ctx *context.T, call rpc.ServerCall, i0 string, i1 application.Envelope, i2 bool) error {
@@ -231,16 +225,17 @@
Methods: []rpc.MethodDesc{
{
Name: "Put",
- Doc: "// DEPRECATED. Please use PutX for new code.\n// Put adds the given tuple of application version (specified\n// through the object name suffix) and application envelope to all\n// of the given application profiles.",
+ Doc: "// Put adds the given application envelope for the given profile and\n// application version (required, and specified through the object name\n// suffix).\n//\n// An error is returned if an envelope already exists, unless the\n// overwrite option is set.",
InArgs: []rpc.ArgDesc{
- {"Profiles", ``}, // []string
- {"Envelope", ``}, // application.Envelope
+ {"Profile", ``}, // string
+ {"Envelope", ``}, // application.Envelope
+ {"Overwrite", ``}, // bool
},
Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Write"))},
},
{
Name: "PutX",
- Doc: "// PutX adds the given application envelope for the given profile and\n// application version (required, and specified through the object name\n// suffix).\n//\n// An error is returned if an envelope already exists, unless the\n// overwrite option is set.",
+ Doc: "// DEPRECATED. Please use Put for new code.",
InArgs: []rpc.ArgDesc{
{"Profile", ``}, // string
{"Envelope", ``}, // application.Envelope
diff --git a/services/role/roled/internal/role_test.go b/services/role/roled/internal/role_test.go
index e59f6d6..c327825 100644
--- a/services/role/roled/internal/role_test.go
+++ b/services/role/roled/internal/role_test.go
@@ -18,7 +18,6 @@
"v.io/v23/security"
"v.io/v23/verror"
vsecurity "v.io/x/ref/lib/security"
- "v.io/x/ref/lib/xrpc"
"v.io/x/ref/services/role"
irole "v.io/x/ref/services/role/roled/internal"
"v.io/x/ref/test"
@@ -72,7 +71,7 @@
testServerCtx := newPrincipalContext(t, ctx, root, "testserver")
tDisp := &testDispatcher{}
- _, err = xrpc.NewDispatchingServer(testServerCtx, "test", tDisp)
+ testServerCtx, _, err = v23.WithNewDispatchingServer(testServerCtx, "test", tDisp)
if err != nil {
t.Fatalf("NewDispatchingServer failed: %v", err)
}
@@ -156,15 +155,15 @@
roleAddr := newRoleServer(t, newPrincipalContext(t, ctx, root, "roles"), workdir)
tDisp := &testDispatcher{}
- _, err = xrpc.NewDispatchingServer(peer1, "peer1", tDisp)
+ peer1, _, err = v23.WithNewDispatchingServer(peer1, "peer1", tDisp)
if err != nil {
t.Fatalf("NewDispatchingServer failed: %v", err)
}
- _, err = xrpc.NewDispatchingServer(peer2, "peer2", tDisp)
+ peer2, _, err = v23.WithNewDispatchingServer(peer2, "peer2", tDisp)
if err != nil {
t.Fatalf("NewDispatchingServer failed: %v", err)
}
- _, err = xrpc.NewDispatchingServer(peer3, "peer3", tDisp)
+ peer3, _, err = v23.WithNewDispatchingServer(peer3, "peer3", tDisp)
if err != nil {
t.Fatalf("NewDispatchingServer failed: %v", err)
}
@@ -276,7 +275,7 @@
}
func newRoleServer(t *testing.T, ctx *context.T, dir string) string {
- _, err := xrpc.NewDispatchingServer(ctx, "role", irole.NewDispatcher(dir, "role"))
+ ctx, _, err := v23.WithNewDispatchingServer(ctx, "role", irole.NewDispatcher(dir, "role"))
if err != nil {
t.Fatalf("ServeDispatcher failed: %v", err)
}
diff --git a/services/role/roled/main.go b/services/role/roled/main.go
index e1a3857..4f77cc9 100644
--- a/services/role/roled/main.go
+++ b/services/role/roled/main.go
@@ -10,11 +10,11 @@
import (
"fmt"
+ "v.io/v23"
"v.io/v23/context"
"v.io/x/lib/cmdline"
"v.io/x/ref/lib/signals"
"v.io/x/ref/lib/v23cmd"
- "v.io/x/ref/lib/xrpc"
_ "v.io/x/ref/runtime/factories/static"
irole "v.io/x/ref/services/role/roled/internal"
)
@@ -43,7 +43,7 @@
if len(name) == 0 {
return env.UsageErrorf("-name must be specified")
}
- _, err := xrpc.NewDispatchingServer(ctx, name, irole.NewDispatcher(configDir, name))
+ ctx, _, err := v23.WithNewDispatchingServer(ctx, name, irole.NewDispatcher(configDir, name))
if err != nil {
return fmt.Errorf("NewServer failed: %v", err)
}
diff --git a/services/syncbase/server/mojo_call.go b/services/syncbase/server/mojo_call.go
index c825bc0..d0b9dac 100644
--- a/services/syncbase/server/mojo_call.go
+++ b/services/syncbase/server/mojo_call.go
@@ -16,12 +16,12 @@
type mojoServerCall struct {
sec security.Call
- srv rpc.XServer
+ srv rpc.Server
suffix string
}
// TODO(sadovsky): Synthesize endpoints and discharges as needed.
-func newMojoServerCall(ctx *context.T, srv rpc.XServer, suffix string, method rpc.MethodDesc) rpc.ServerCall {
+func newMojoServerCall(ctx *context.T, srv rpc.Server, suffix string, method rpc.MethodDesc) rpc.ServerCall {
p := v23.GetPrincipal(ctx)
// HACK: For now, we set the remote (client, i.e. Mojo app) blessing to be the
// same as the local (server, i.e. Syncbase Mojo service) blessing.
@@ -63,6 +63,6 @@
return security.Blessings{}
}
-func (call *mojoServerCall) Server() rpc.XServer {
+func (call *mojoServerCall) Server() rpc.Server {
return call.srv
}
diff --git a/services/syncbase/server/mojo_impl.go b/services/syncbase/server/mojo_impl.go
index 34c0f37..556350c 100644
--- a/services/syncbase/server/mojo_impl.go
+++ b/services/syncbase/server/mojo_impl.go
@@ -33,11 +33,11 @@
type mojoImpl struct {
ctx *context.T
- srv rpc.XServer
+ srv rpc.Server
disp rpc.Dispatcher
}
-func NewMojoImpl(ctx *context.T, srv rpc.XServer, disp rpc.Dispatcher) *mojoImpl {
+func NewMojoImpl(ctx *context.T, srv rpc.Server, disp rpc.Dispatcher) *mojoImpl {
return &mojoImpl{ctx: ctx, srv: srv, disp: disp}
}
@@ -401,13 +401,13 @@
return toMojoError(err), nil
}
-func (m *mojoImpl) TableDelete(name string) (mojom.Error, error) {
- ctx, call := m.newCtxCall(name, methodDesc(nosqlwire.TableDesc, "Delete"))
+func (m *mojoImpl) TableDestroy(name string) (mojom.Error, error) {
+ ctx, call := m.newCtxCall(name, methodDesc(nosqlwire.TableDesc, "Destroy"))
stub, err := m.getTable(ctx, call, name)
if err != nil {
return toMojoError(err), nil
}
- err = stub.Delete(ctx, call, NoSchema)
+ err = stub.Destroy(ctx, call, NoSchema)
return toMojoError(err), nil
}
@@ -421,8 +421,14 @@
return toMojoError(err), exists, nil
}
-func (m *mojoImpl) TableDeleteRowRange(name string, start, limit []byte) (mojom.Error, error) {
- return mojom.Error{}, nil
+func (m *mojoImpl) TableDeleteRange(name string, start, limit []byte) (mojom.Error, error) {
+ ctx, call := m.newCtxCall(name, methodDesc(nosqlwire.TableDesc, "DeleteRange"))
+ stub, err := m.getTable(ctx, call, name)
+ if err != nil {
+ return toMojoError(err), nil
+ }
+ err = stub.DeleteRange(ctx, call, NoSchema, start, limit)
+ return toMojoError(err), nil
}
type scanStreamImpl struct {
diff --git a/services/syncbase/server/nosql/table.go b/services/syncbase/server/nosql/table.go
index d4d07d4..51b5158 100644
--- a/services/syncbase/server/nosql/table.go
+++ b/services/syncbase/server/nosql/table.go
@@ -64,7 +64,7 @@
})
}
-func (t *tableReq) Delete(ctx *context.T, call rpc.ServerCall, schemaVersion int32) error {
+func (t *tableReq) Destroy(ctx *context.T, call rpc.ServerCall, schemaVersion int32) error {
if t.d.batchId != nil {
return wire.NewErrBoundToBatch(ctx)
}
@@ -91,7 +91,7 @@
return util.ErrorToExists(util.GetWithAuth(ctx, call, t.d.st, t.stKey(), &tableData{}))
}
-func (t *tableReq) DeleteRowRange(ctx *context.T, call rpc.ServerCall, schemaVersion int32, start, limit []byte) error {
+func (t *tableReq) DeleteRange(ctx *context.T, call rpc.ServerCall, schemaVersion int32, start, limit []byte) error {
impl := func(tx store.Transaction) error {
// Check for table-level access before doing a scan.
if err := t.checkAccess(ctx, call, tx, ""); err != nil {
diff --git a/services/syncbase/store/model.go b/services/syncbase/store/model.go
index e0ee44f..5f7042d 100644
--- a/services/syncbase/store/model.go
+++ b/services/syncbase/store/model.go
@@ -6,9 +6,6 @@
// Currently, this API and its implementations are meant to be internal.
package store
-// TODO(sadovsky): Decide whether to defensively copy passed-in []byte's vs.
-// requiring clients not to modify passed-in []byte's.
-
// StoreReader reads data from a CRUD-capable storage engine.
type StoreReader interface {
// Get returns the value for the given key. The returned slice may be a
@@ -17,6 +14,8 @@
// nil valbuf.
// If the given key is unknown, valbuf is returned unchanged and the function
// fails with ErrUnknownKey.
+ //
+ // It is safe to modify the contents of the key after Get returns.
Get(key, valbuf []byte) ([]byte, error)
// Scan returns all rows with keys in range [start, limit). If limit is "",
@@ -24,16 +23,26 @@
// Concurrency semantics: It is legal to perform writes concurrently with
// Scan. The returned stream may or may not reflect subsequent writes to keys
// not yet reached by the stream.
+ //
+ // It is safe to modify the contents of the arguments after Scan returns.
Scan(start, limit []byte) Stream
}
// StoreWriter writes data to a CRUD-capable storage engine.
type StoreWriter interface {
// Put writes the given value for the given key.
+ //
+ // WARNING: For performance reasons, a Put inside a transaction doesn't make
+ // a defensive copy of the value. The client MUST keep the value unchanged
+ // until the transaction commits or aborts.
+ //
+ // It is safe to modify the contents of the key after Put returns.
Put(key, value []byte) error
// Delete deletes the entry for the given key.
// Succeeds (no-op) if the given key is unknown.
+ //
+ // It is safe to modify the contents of the key after Delete returns.
Delete(key []byte) error
}
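
A short sketch of the Put contract documented above, assuming store.Transaction embeds StoreWriter: within a transaction the store holds a reference to the value, so a caller that wants to keep mutating its buffer should hand the store a copy.

    package example

    import "v.io/x/ref/services/syncbase/store"

    // putCopied is a hypothetical helper (not part of this change) honoring the
    // warning above: it gives the transaction its own copy of value, so the
    // caller remains free to modify the original buffer before the commit.
    func putCopied(tx store.Transaction, key, value []byte) error {
        owned := append([]byte(nil), value...) // copy owned by the transaction
        return tx.Put(key, owned)
    }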
diff --git a/services/syncbase/syncbased/main.go b/services/syncbase/syncbased/main.go
index a3bb1a7..5602fa3 100644
--- a/services/syncbase/syncbased/main.go
+++ b/services/syncbase/syncbased/main.go
@@ -14,7 +14,6 @@
"v.io/v23/security/access"
"v.io/x/lib/vlog"
"v.io/x/ref/lib/security/securityflag"
- "v.io/x/ref/lib/xrpc"
_ "v.io/x/ref/runtime/factories/roaming"
"v.io/x/ref/services/syncbase/server"
)
@@ -39,7 +38,7 @@
// TODO(sadovsky): We return rpc.Server and rpc.Dispatcher as a quick hack to
// support Mojo.
-func Serve(ctx *context.T) (rpc.XServer, rpc.Dispatcher) {
+func Serve(ctx *context.T) (rpc.Server, rpc.Dispatcher) {
perms, err := securityflag.PermissionsFromFlag()
if err != nil {
vlog.Fatal("securityflag.PermissionsFromFlag() failed: ", err)
@@ -62,9 +61,9 @@
d := server.NewDispatcher(service)
// Publish the service in the mount table.
- s, err := xrpc.NewDispatchingServer(ctx, *name, d)
+ ctx, s, err := v23.WithNewDispatchingServer(ctx, *name, d)
if err != nil {
- vlog.Fatal("v23.NewDispatchingServer() failed: ", err)
+ vlog.Fatal("v23.WithNewDispatchingServer() failed: ", err)
}
if *name != "" {
vlog.Info("Mounted at: ", *name)
diff --git a/services/syncbase/syncbased/mojo_main.go b/services/syncbase/syncbased/mojo_main.go
index d8ef19b..968ec35 100644
--- a/services/syncbase/syncbased/mojo_main.go
+++ b/services/syncbase/syncbased/mojo_main.go
@@ -8,7 +8,7 @@
// To build:
// cd $V23_ROOT/experimental/projects/ether
-// make build
+// make gen/mojo/syncbased.mojo
import (
"log"
@@ -30,7 +30,7 @@
type delegate struct {
ctx *context.T
- srv rpc.XServer
+ srv rpc.Server
disp rpc.Dispatcher
stubs []*bindings.Stub
}
diff --git a/services/syncbase/testutil/layer.go b/services/syncbase/testutil/layer.go
index 55e49f1..fc568b9 100644
--- a/services/syncbase/testutil/layer.go
+++ b/services/syncbase/testutil/layer.go
@@ -90,8 +90,8 @@
assertExists(t, ctx, self3, "self3", false)
}
-// TestDelete tests that object deletion works as expected.
-func TestDelete(t *testing.T, ctx *context.T, i interface{}) {
+// TestDestroy tests that object destruction works as expected.
+func TestDestroy(t *testing.T, ctx *context.T, i interface{}) {
parent := makeLayer(i)
self := parent.Child("self")
child := self.Child("child")
@@ -110,10 +110,10 @@
assertExists(t, ctx, self, "self", true)
- // By default, self perms are copied from parent, so self.Delete should
+ // By default, self perms are copied from parent, so self.Destroy should
// succeed.
- if err := self.Delete(ctx); err != nil {
- t.Fatalf("self.Delete() failed: %v", err)
+ if err := self.Destroy(ctx); err != nil {
+ t.Fatalf("self.Destroy() failed: %v", err)
}
assertExists(t, ctx, self, "self", false)
@@ -128,7 +128,7 @@
// with an error instead of returning false.
//assertExists(t, ctx, child, "child", false)
- // self.Create should succeed, since self was deleted.
+ // self.Create should succeed, since self was destroyed.
if err := self.Create(ctx, nil); err != nil {
t.Fatalf("self.Create() failed: %v", err)
}
@@ -136,7 +136,7 @@
assertExists(t, ctx, self, "self", true)
assertExists(t, ctx, child, "child", false)
- // Test that delete fails if the perms disallow access.
+ // Test that destroy fails if the perms disallow access.
self2 := parent.Child("self2")
if err := self2.Create(ctx, nil); err != nil {
t.Fatalf("self2.Create() failed: %v", err)
@@ -146,27 +146,27 @@
if err := self2.SetPermissions(ctx, perms, ""); err != nil {
t.Fatalf("self2.SetPermissions() failed: %v", err)
}
- if err := self2.Delete(ctx); verror.ErrorID(err) != verror.ErrNoAccess.ID {
- t.Fatalf("self2.Delete() should have failed: %v", err)
+ if err := self2.Destroy(ctx); verror.ErrorID(err) != verror.ErrNoAccess.ID {
+ t.Fatalf("self2.Destroy() should have failed: %v", err)
}
assertExists(t, ctx, self2, "self2", true)
- // Test that delete succeeds even if the parent perms disallow access.
+ // Test that destroy succeeds even if the parent perms disallow access.
perms = DefaultPerms("root/client")
perms.Blacklist("root/client", string(access.Write))
if err := parent.SetPermissions(ctx, perms, ""); err != nil {
t.Fatalf("parent.SetPermissions() failed: %v", err)
}
- if err := self.Delete(ctx); err != nil {
- t.Fatalf("self.Delete() failed: %v", err)
+ if err := self.Destroy(ctx); err != nil {
+ t.Fatalf("self.Destroy() failed: %v", err)
}
assertExists(t, ctx, self, "self", false)
- // Test that delete is idempotent.
- if err := self.Delete(ctx); err != nil {
- t.Fatalf("self.Delete() failed: %v", err)
+ // Test that destroy is idempotent.
+ if err := self.Destroy(ctx); err != nil {
+ t.Fatalf("self.Destroy() failed: %v", err)
}
assertExists(t, ctx, self, "self", false)
@@ -314,9 +314,7 @@
type layer interface {
util.AccessController
Create(ctx *context.T, perms access.Permissions) error
- // TODO(aghassemi): Rename to Destroy and drop Destroy impls below once
- // Table.Delete is renamed to Destroy.
- Delete(ctx *context.T) error
+ Destroy(ctx *context.T) error
Exists(ctx *context.T) (bool, error)
ListChildren(ctx *context.T) ([]string, error)
Child(childName string) layer
@@ -329,7 +327,7 @@
func (s *service) Create(ctx *context.T, perms access.Permissions) error {
panic(notAvailable)
}
-func (s *service) Delete(ctx *context.T) error {
+func (s *service) Destroy(ctx *context.T) error {
panic(notAvailable)
}
func (s *service) Exists(ctx *context.T) (bool, error) {
@@ -352,9 +350,6 @@
func (a *app) Child(childName string) layer {
return makeLayer(a.NoSQLDatabase(childName, nil))
}
-func (a *app) Delete(ctx *context.T) error {
- return a.Destroy(ctx)
-}
type database struct {
nosql.Database
@@ -364,23 +359,13 @@
return d.ListTables(ctx)
}
func (d *database) Child(childName string) layer {
- return &table{Table: d.Table(childName), d: d}
-}
-func (d *database) Delete(ctx *context.T) error {
- return d.Destroy(ctx)
+ return makeLayer(d.Table(childName))
}
type table struct {
nosql.Table
- d nosql.Database
}
-func (t *table) Create(ctx *context.T, perms access.Permissions) error {
- return t.d.CreateTable(ctx, t.Name(), perms)
-}
-func (t *table) Delete(ctx *context.T) error {
- return t.d.DeleteTable(ctx, t.Name())
-}
func (t *table) SetPermissions(ctx *context.T, perms access.Permissions, version string) error {
return t.Table.SetPermissions(ctx, nosql.Prefix(""), perms)
}
@@ -408,7 +393,7 @@
}
return r.Put(ctx, true)
}
-func (r *row) Delete(ctx *context.T) error {
+func (r *row) Destroy(ctx *context.T) error {
return r.Delete(ctx)
}
func (r *row) SetPermissions(ctx *context.T, perms access.Permissions, version string) error {
@@ -432,6 +417,8 @@
return &app{t}
case nosql.Database:
return &database{t}
+ case nosql.Table:
+ return &table{t}
default:
vlog.Fatalf("unexpected type: %T", t)
}
diff --git a/services/syncbase/testutil/util.go b/services/syncbase/testutil/util.go
index 658794e..5e72e2e 100644
--- a/services/syncbase/testutil/util.go
+++ b/services/syncbase/testutil/util.go
@@ -24,7 +24,6 @@
"v.io/v23/verror"
"v.io/x/lib/vlog"
"v.io/x/ref/lib/flags"
- "v.io/x/ref/lib/xrpc"
"v.io/x/ref/services/syncbase/server"
tsecurity "v.io/x/ref/test/testutil"
)
@@ -56,10 +55,11 @@
}
func CreateTable(t *testing.T, ctx *context.T, d nosql.Database, name string) nosql.Table {
- if err := d.CreateTable(ctx, name, nil); err != nil {
- Fatalf(t, "d.CreateTable() failed: %v", err)
+ tb := d.Table(name)
+ if err := tb.Create(ctx, nil); err != nil {
+ Fatalf(t, "tb.Create() failed: %v", err)
}
- return d.Table(name)
+ return tb
}
// TODO(sadovsky): Drop the 'perms' argument. The only client that passes
@@ -240,9 +240,9 @@
if err != nil {
vlog.Fatal("server.NewService() failed: ", err)
}
- s, err := xrpc.NewDispatchingServer(serverCtx, "", server.NewDispatcher(service))
+ serverCtx, s, err := v23.WithNewDispatchingServer(serverCtx, "", server.NewDispatcher(service))
if err != nil {
- vlog.Fatal("xrpc.NewDispatchingServer() failed: ", err)
+ vlog.Fatal("v23.WithNewDispatchingServer() failed: ", err)
}
name := s.Status().Endpoints[0].Name()
return name, func() {
diff --git a/services/syncbase/vsync/responder_test.go b/services/syncbase/vsync/responder_test.go
index 135830c..dbd0fea 100644
--- a/services/syncbase/vsync/responder_test.go
+++ b/services/syncbase/vsync/responder_test.go
@@ -487,7 +487,7 @@
return security.Blessings{}
}
-func (d *dummyResponder) Server() rpc.XServer {
+func (d *dummyResponder) Server() rpc.Server {
return nil
}
diff --git a/services/wspr/browsprd/main_nacl.go b/services/wspr/browsprd/main_nacl.go
index 08940f7..d8f2561 100644
--- a/services/wspr/browsprd/main_nacl.go
+++ b/services/wspr/browsprd/main_nacl.go
@@ -214,16 +214,16 @@
}
inst.logger.VI(1).Infof("Using blessing roots for identity with key %v and names %v", msg.IdentitydBlessingRoot.PublicKey, msg.IdentitydBlessingRoot.Names)
- key, err := decodeAndUnmarshalPublicKey(msg.IdentitydBlessingRoot.PublicKey)
+ keybytes, err := base64.URLEncoding.DecodeString(msg.IdentitydBlessingRoot.PublicKey)
if err != nil {
- inst.logger.Fatalf("decodeAndUnmarshalPublicKey(%v) failed: %v", msg.IdentitydBlessingRoot.PublicKey, err)
+ inst.logger.Fatalf("failed to decode public key (%v): %v", msg.IdentitydBlessingRoot.PublicKey, err)
}
for _, name := range msg.IdentitydBlessingRoot.Names {
pattern := security.BlessingPattern(name)
// Trust the identity servers blessing root.
- p.Roots().Add(key, pattern)
+ p.Roots().Add(keybytes, pattern)
// Use our blessing to only talk to the identity server.
if _, err := p.BlessingStore().Set(blessing, pattern); err != nil {
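
A minimal sketch of the updated BlessingRoots.Add contract applied above (the helper name is an assumption): the root is added as raw DER bytes rather than an unmarshaled PublicKey.

    package example

    import (
        "encoding/base64"

        "v.io/v23/security"
    )

    // addRoot is a hypothetical helper (not part of this change): it decodes a
    // base64url-encoded DER public key and trusts it for the given pattern,
    // passing the raw DER bytes straight to Roots().Add as the new API expects.
    func addRoot(p security.Principal, b64der string, pattern security.BlessingPattern) error {
        der, err := base64.URLEncoding.DecodeString(b64der)
        if err != nil {
            return err
        }
        return p.Roots().Add(der, pattern)
    }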
diff --git a/services/wspr/internal/app/app.go b/services/wspr/internal/app/app.go
index 0b22d86..6edb6c0 100644
--- a/services/wspr/internal/app/app.go
+++ b/services/wspr/internal/app/app.go
@@ -429,7 +429,7 @@
}
func (l *localCall) Recv(interface{}) error { return nil }
func (l *localCall) GrantedBlessings() security.Blessings { return security.Blessings{} }
-func (l *localCall) Server() rpc.XServer { return nil }
+func (l *localCall) Server() rpc.Server { return nil }
func (l *localCall) Timestamp() (t time.Time) { return }
func (l *localCall) Method() string { return l.vrpc.Method }
func (l *localCall) MethodTags() []*vdl.Value { return l.tags }
diff --git a/services/wspr/internal/app/app_test.go b/services/wspr/internal/app/app_test.go
index 11e1793..8731152 100644
--- a/services/wspr/internal/app/app_test.go
+++ b/services/wspr/internal/app/app_test.go
@@ -25,7 +25,6 @@
"v.io/v23/vom"
"v.io/v23/vtrace"
vsecurity "v.io/x/ref/lib/security"
- "v.io/x/ref/lib/xrpc"
"v.io/x/ref/runtime/factories/generic"
"v.io/x/ref/services/mounttable/mounttablelib"
"v.io/x/ref/services/wspr/internal/lib"
@@ -109,7 +108,7 @@
ctx, shutdown := test.V23Init()
defer shutdown()
- s, err := xrpc.NewServer(ctx, "", simpleAdder{}, nil)
+ ctx, s, err := v23.WithNewServer(ctx, "", simpleAdder{}, nil)
if err != nil {
t.Fatalf("unable to start server: %v", err)
}
@@ -152,7 +151,7 @@
ctx, shutdown := test.V23Init()
defer shutdown()
- s, err := xrpc.NewServer(ctx, "", simpleAdder{}, nil)
+ ctx, s, err := v23.WithNewServer(ctx, "", simpleAdder{}, nil)
if err != nil {
t.Fatalf("unable to start server: %v", err)
}
@@ -320,22 +319,14 @@
if err != nil {
return nil, fmt.Errorf("unable to start mounttable: %v", err)
}
- s, err := xrpc.NewDispatchingServer(ctx, "", mt, options.ServesMountTable(true))
+ ctx, s, err := v23.WithNewDispatchingServer(ctx, "", mt, options.ServesMountTable(true))
if err != nil {
return nil, fmt.Errorf("unable to start mounttable: %v", err)
}
mtName := s.Status().Endpoints[0].Name()
proxySpec := rpc.ListenSpec{
- Addrs: rpc.ListenAddrs{
- // This '0' label is required by go vet.
- // TODO(suharshs): Remove the '0' label once []ListenAddr is used
- // instead of ListenAdders.
- 0: {
- Protocol: "tcp",
- Address: "127.0.0.1:0",
- },
- },
+ Addrs: rpc.ListenAddrs{{Protocol: "tcp", Address: "127.0.0.1:0"}},
}
proxyShutdown, proxyEndpoint, err := generic.NewProxy(ctx, proxySpec, security.AllowEveryone())
if err != nil {
diff --git a/services/wspr/internal/browspr/browspr_test.go b/services/wspr/internal/browspr/browspr_test.go
index 4ca50c8..e70be78 100644
--- a/services/wspr/internal/browspr/browspr_test.go
+++ b/services/wspr/internal/browspr/browspr_test.go
@@ -19,8 +19,6 @@
"v.io/v23/vdl"
vdltime "v.io/v23/vdlroot/time"
"v.io/v23/vom"
-
- "v.io/x/ref/lib/xrpc"
"v.io/x/ref/runtime/factories/generic"
"v.io/x/ref/services/mounttable/mounttablelib"
"v.io/x/ref/services/wspr/internal/app"
@@ -62,15 +60,7 @@
defer shutdown()
proxySpec := rpc.ListenSpec{
- Addrs: rpc.ListenAddrs{
- // This '0' label is required by go vet.
- // TODO(suharshs): Remove the '0' label once []ListenAddr is used
- // instead of ListenAdders.
- 0: {
- Protocol: "tcp",
- Address: "127.0.0.1:0",
- },
- },
+ Addrs: rpc.ListenAddrs{{Protocol: "tcp", Address: "127.0.0.1:0"}},
}
proxyShutdown, proxyEndpoint, err := generic.NewProxy(ctx, proxySpec, security.AllowEveryone())
if err != nil {
@@ -82,7 +72,7 @@
if err != nil {
t.Fatalf("Failed to create mounttable: %v", err)
}
- s, err := xrpc.NewDispatchingServer(ctx, "", mt, options.ServesMountTable(true))
+ ctx, s, err := v23.WithNewDispatchingServer(ctx, "", mt, options.ServesMountTable(true))
if err != nil {
t.Fatalf("Failed to start mounttable server: %v", err)
}
@@ -94,7 +84,7 @@
}
mockServerName := "mock/server"
- mockServer, err := xrpc.NewServer(ctx, mockServerName, mockServer{}, nil)
+ ctx, mockServer, err := v23.WithNewServer(ctx, mockServerName, mockServer{}, nil)
if err != nil {
t.Fatalf("Failed to start mock server: %v", err)
}
diff --git a/services/wspr/internal/principal/cache_test.go b/services/wspr/internal/principal/cache_test.go
index 4bbcebd..73be928 100644
--- a/services/wspr/internal/principal/cache_test.go
+++ b/services/wspr/internal/principal/cache_test.go
@@ -9,7 +9,6 @@
"crypto/elliptic"
"crypto/rand"
"reflect"
- "sync"
"testing"
"time"
@@ -18,50 +17,33 @@
// manualTrigger provides a gc trigger that can be signaled manually
type manualTrigger struct {
- gcShouldBeNext bool
- lock sync.Mutex
- cond *sync.Cond
- gcHasRun bool
+ gcHasRun bool
+ ch chan time.Time
+ nextCh chan bool
}
func newManualTrigger() *manualTrigger {
- mt := &manualTrigger{
- gcShouldBeNext: true,
+ return &manualTrigger{
+ ch: make(chan time.Time),
+ nextCh: make(chan bool),
}
- mt.cond = sync.NewCond(&mt.lock)
- return mt
}
-// policyTrigger is the trigger that should be provided in GC policy config.
-// It waits until it receives a signal and then returns a chan time.Time
-// that resolves immediately.
+// manualTrigger is the trigger that should be provided in GC policy config.
+// It returns a chan time.Time that resolves immediately after next is called.
func (mt *manualTrigger) waitForNextGc() <-chan time.Time {
- mt.lock.Lock()
if !mt.gcHasRun {
mt.gcHasRun = true
} else {
- mt.gcShouldBeNext = false // hand off control
- mt.cond.Broadcast()
- for !mt.gcShouldBeNext {
- mt.cond.Wait()
- }
+ mt.nextCh <- true
}
- mt.lock.Unlock()
-
- trigger := make(chan time.Time, 1)
- trigger <- time.Time{}
- return trigger
+ return mt.ch
}
// next should be called to trigger the next policy trigger event.
func (mt *manualTrigger) next() {
- mt.lock.Lock()
- mt.gcShouldBeNext = true // hand off control
- mt.cond.Broadcast()
- for mt.gcShouldBeNext {
- mt.cond.Wait()
- }
- mt.lock.Unlock()
+ mt.ch <- time.Time{}
+ <-mt.nextCh
}
// Test just to confirm it signals in order as expected.
@@ -70,7 +52,7 @@
countTriggers := 0
go func() {
- for i := 1; i <= 100; i++ {
+ for i := 0; i < 100; i++ {
<-mt.waitForNextGc()
countTriggers++
}
diff --git a/services/wspr/internal/rpc/server/server.go b/services/wspr/internal/rpc/server/server.go
index 6205d91..08661c0 100644
--- a/services/wspr/internal/rpc/server/server.go
+++ b/services/wspr/internal/rpc/server/server.go
@@ -73,7 +73,7 @@
// The server that handles the rpc layer. Listen on this server is
// lazily started.
- server rpc.Server
+ server rpc.DeprecatedServer
// The saved dispatcher to reuse when serve is called multiple times.
dispatcher *dispatcher
diff --git a/services/xproxyd/proxy_test.go b/services/xproxyd/proxy_test.go
index b519dce..4257409 100644
--- a/services/xproxyd/proxy_test.go
+++ b/services/xproxyd/proxy_test.go
@@ -32,46 +32,60 @@
t.Fatal(err)
}
- // Start the proxy.
- addr := struct {
- Protocol, Address string
- }{
- Protocol: "tcp",
- Address: "127.0.0.1:0",
- }
- pctx = v23.WithListenSpec(pctx, rpc.ListenSpec{Addrs: rpc.ListenAddrs{addr}})
- p, err := xproxyd.New(pctx)
- if err != nil {
- t.Fatal(err)
- }
- peps := p.ListeningEndpoints()
- if len(peps) == 0 {
- t.Fatal("Proxy not listening on any endpoints")
- }
- pep := peps[0]
+ pep := startProxy(t, pctx, address{"tcp", "127.0.0.1:0"})
- t.Logf("proxy endpoint: %s", pep.String())
- // Start a accepting flow.Manager and make it listen through the proxy.
if err := am.Listen(actx, "v23", pep.String()); err != nil {
t.Fatal(err)
}
+ testEndToEndConnections(t, dctx, actx, dm, am)
+}
+
+func TestMultipleProxies(t *testing.T) {
+ pctx, shutdown := v23.Init()
+ defer shutdown()
+ p2ctx, _, err := v23.ExperimentalWithNewFlowManager(pctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+ p3ctx, _, err := v23.ExperimentalWithNewFlowManager(pctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+ actx, am, err := v23.ExperimentalWithNewFlowManager(pctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+ dctx, dm, err := v23.ExperimentalWithNewFlowManager(pctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ pep := startProxy(t, pctx, address{"tcp", "127.0.0.1:0"})
+
+ p2ep := startProxy(t, p2ctx, address{"v23", pep.String()}, address{"tcp", "127.0.0.1:0"})
+
+ p3ep := startProxy(t, p3ctx, address{"v23", p2ep.String()}, address{"tcp", "127.0.0.1:0"})
+
+ if err := am.Listen(actx, "v23", p3ep.String()); err != nil {
+ t.Fatal(err)
+ }
+ testEndToEndConnections(t, dctx, actx, dm, am)
+}
+
+func testEndToEndConnections(t *testing.T, dctx, actx *context.T, dm, am flow.Manager) {
aeps := am.ListeningEndpoints()
if len(aeps) == 0 {
- t.Fatal("Acceptor not listening on any endpoints")
+ t.Fatal("acceptor not listening on any endpoints")
}
- aep := aeps[0]
+ for _, aep := range aeps {
+ testEndToEndConnection(t, dctx, actx, dm, am, aep)
+ }
+}
+func testEndToEndConnection(t *testing.T, dctx, actx *context.T, dm, am flow.Manager, aep naming.Endpoint) {
// The dialing flow.Manager dials a flow to the accepting flow.Manager.
want := "Do you read me?"
- bFn := func(
- ctx *context.T,
- localEndpoint, remoteEndpoint naming.Endpoint,
- remoteBlessings security.Blessings,
- remoteDischarges map[string]security.Discharge,
- ) (security.Blessings, map[string]security.Discharge, error) {
- return v23.GetPrincipal(ctx).BlessingStore().Default(), nil, nil
- }
- df, err := dm.Dial(dctx, aep, bFn)
+ df, err := dm.Dial(dctx, aep, bfp)
if err != nil {
t.Fatal(err)
}
@@ -83,11 +97,9 @@
}
got, err := readLine(af)
if err != nil {
- pctx.Errorf("error")
t.Fatal(err)
}
if got != want {
- pctx.Errorf("error")
t.Errorf("got %v, want %v", got, want)
}
@@ -96,17 +108,14 @@
writeLine(af, want)
got, err = readLine(df)
if err != nil {
- pctx.Errorf("error")
t.Fatal(err)
}
if got != want {
- pctx.Errorf("error")
t.Errorf("got %v, want %v", got, want)
}
}
-// TODO(suharshs): Add tests for multiple proxies and bidirectional RPC through
-// a proxy once we have bidirpc working.
+// TODO(suharshs): Add test for bidirectional RPC.
func readLine(f flow.Flow) (string, error) {
s, err := bufio.NewReader(f).ReadString('\n')
@@ -118,3 +127,36 @@
_, err := f.Write([]byte(data))
return err
}
+
+func bfp(
+ ctx *context.T,
+ localEndpoint, remoteEndpoint naming.Endpoint,
+ remoteBlessings security.Blessings,
+ remoteDischarges map[string]security.Discharge,
+) (security.Blessings, map[string]security.Discharge, error) {
+ return v23.GetPrincipal(ctx).BlessingStore().Default(), nil, nil
+}
+
+type address struct {
+ Protocol, Address string
+}
+
+func startProxy(t *testing.T, ctx *context.T, addrs ...address) naming.Endpoint {
+ var ls rpc.ListenSpec
+ for _, addr := range addrs {
+ ls.Addrs = append(ls.Addrs, addr)
+ }
+ ctx = v23.WithListenSpec(ctx, ls)
+ proxy, err := xproxyd.New(ctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+ peps := proxy.ListeningEndpoints()
+ for _, pep := range peps {
+ if pep.Addr().Network() == "tcp" {
+ return pep
+ }
+ }
+ t.Fatal("Proxy not listening on network address.")
+ return nil
+}
diff --git a/services/xproxyd/proxyd.go b/services/xproxyd/proxyd.go
index 6857ec8..239d633 100644
--- a/services/xproxyd/proxyd.go
+++ b/services/xproxyd/proxyd.go
@@ -5,22 +5,26 @@
package xproxyd
import (
- "fmt"
"io"
+ "sync"
"v.io/v23"
"v.io/v23/context"
"v.io/v23/flow"
- "v.io/v23/flow/message"
"v.io/v23/naming"
- "v.io/v23/security"
"v.io/v23/vom"
)
// TODO(suharshs): Make sure that we don't leak any goroutines.
+const proxyByte = byte('p')
+const serverByte = byte('s')
+const clientByte = byte('c')
+
type proxy struct {
- m flow.Manager
+ m flow.Manager
+ mu sync.Mutex
+ proxyEndpoints []naming.Endpoint
}
func New(ctx *context.T) (*proxy, error) {
@@ -28,7 +32,31 @@
m: v23.ExperimentalGetFlowManager(ctx),
}
for _, addr := range v23.GetListenSpec(ctx).Addrs {
- if err := p.m.Listen(ctx, addr.Protocol, addr.Address); err != nil {
+ if addr.Protocol == "v23" {
+ ep, err := v23.NewEndpoint(addr.Address)
+ if err != nil {
+ return nil, err
+ }
+ f, err := p.m.Dial(ctx, ep, proxyBlessingsForPeer{}.run)
+ if err != nil {
+ return nil, err
+ }
+ // Send a byte telling the acceptor that we are a proxy.
+ if _, err := f.Write([]byte{proxyByte}); err != nil {
+ return nil, err
+ }
+ var lep string
+ if err := vom.NewDecoder(f).Decode(&lep); err != nil {
+ return nil, err
+ }
+ proxyEndpoint, err := v23.NewEndpoint(lep)
+ if err != nil {
+ return nil, err
+ }
+ p.mu.Lock()
+ p.proxyEndpoints = append(p.proxyEndpoints, proxyEndpoint)
+ p.mu.Unlock()
+ } else if err := p.m.Listen(ctx, addr.Protocol, addr.Address); err != nil {
return nil, err
}
}
@@ -36,6 +64,10 @@
return p, nil
}
+func (p *proxy) ListeningEndpoints() []naming.Endpoint {
+ return p.m.ListeningEndpoints()
+}
+
func (p *proxy) listenLoop(ctx *context.T) {
for {
f, err := p.m.Accept(ctx)
@@ -43,10 +75,19 @@
ctx.Infof("p.m.Accept failed: %v", err)
break
}
- if p.shouldRoute(f) {
+ msg := make([]byte, 1)
+ if _, err := f.Read(msg); err != nil {
+ ctx.Errorf("reading type byte failed: %v", err)
+ }
+ switch msg[0] {
+ case clientByte:
err = p.startRouting(ctx, f)
- } else {
+ case proxyByte:
+ err = p.replyToProxy(ctx, f)
+ case serverByte:
err = p.replyToServer(ctx, f)
+ default:
+ continue
}
if err != nil {
ctx.Errorf("failed to handle incoming connection: %v", err)
@@ -64,27 +105,6 @@
return nil
}
-func (p *proxy) replyToServer(ctx *context.T, f flow.Flow) error {
- eps := p.ListeningEndpoints()
- if len(eps) == 0 {
- return NewErrNotListening(ctx)
- }
- // TODO(suharshs): handle listening on multiple endpoints.
- ep := eps[0]
- network, address := ep.Addr().Network(), ep.Addr().String()
- // TODO(suharshs): deal with routes and such here, if we are replying to a proxy.
- rid := f.Conn().RemoteEndpoint().RoutingID()
- epString := naming.FormatEndpoint(network, address, rid)
- if err := vom.NewEncoder(f).Encode(epString); err != nil {
- return err
- }
- return nil
-}
-
-func (p *proxy) ListeningEndpoints() []naming.Endpoint {
- return p.m.ListeningEndpoints()
-}
-
func (p *proxy) forwardLoop(ctx *context.T, fin, fout flow.Flow) {
for {
_, err := io.Copy(fin, fout)
@@ -98,54 +118,97 @@
}
func (p *proxy) dialNextHop(ctx *context.T, f flow.Flow) (flow.Flow, error) {
- // TODO(suharshs): Read route information here when implementing multi proxy.
m, err := readSetupMessage(ctx, f)
if err != nil {
return nil, err
}
- fout, err := p.m.Dial(ctx, m.PeerRemoteEndpoint, proxyBlessingsForPeer{}.run)
+ var rid naming.RoutingID
+ var ep naming.Endpoint
+ var shouldWriteClientByte bool
+ if routes := m.PeerRemoteEndpoint.Routes(); len(routes) > 0 {
+ if err := rid.FromString(routes[0]); err != nil {
+ return nil, err
+ }
+ // Make an endpoint with the correct routingID to dial out. All other fields
+ // do not matter.
+ // TODO(suharshs): Make sure that the routingID from the route belongs to a
+ // connection that is stored in the manager's cache. (i.e. a Server has connected
+ // with the routingID before)
+ if ep, err = setEndpointRoutingID(m.PeerRemoteEndpoint, rid); err != nil {
+ return nil, err
+ }
+ // Remove the read route from the setup message endpoint.
+ if m.PeerRemoteEndpoint, err = setEndpointRoutes(m.PeerRemoteEndpoint, routes[1:]); err != nil {
+ return nil, err
+ }
+ shouldWriteClientByte = true
+ } else {
+ ep = m.PeerRemoteEndpoint
+ }
+ fout, err := p.m.Dial(ctx, ep, proxyBlessingsForPeer{}.run)
if err != nil {
return nil, err
}
+ if shouldWriteClientByte {
+ // We only write the clientByte on flows made to proxies. If we are creating
+ // the last hop flow to the end server, we don't need to send the byte.
+ if _, err := fout.Write([]byte{clientByte}); err != nil {
+ return nil, err
+ }
+ }
+
// Write the setup message back onto the flow for the next hop to read.
return fout, writeSetupMessage(ctx, m, fout)
}
-func readSetupMessage(ctx *context.T, f flow.Flow) (*message.Setup, error) {
- b, err := f.ReadMsg()
- if err != nil {
- return nil, err
- }
- m, err := message.Read(ctx, b)
- if err != nil {
- return nil, err
- }
- if m, isSetup := m.(*message.Setup); isSetup {
- return m, nil
- }
- return nil, NewErrUnexpectedMessage(ctx, fmt.Sprintf("%t", m))
-}
-
-func writeSetupMessage(ctx *context.T, m message.Message, f flow.Flow) error {
- // TODO(suharshs): When reading the routes we should remove the read route from
- // the endpoint.
- w, err := message.Append(ctx, m, []byte{})
+func (p *proxy) replyToServer(ctx *context.T, f flow.Flow) error {
+ rid := f.Conn().RemoteEndpoint().RoutingID()
+ epString, err := p.returnEndpoint(ctx, rid, "")
if err != nil {
return err
}
- _, err = f.WriteMsg(w)
- return err
+ // TODO(suharshs): Make a low-level message for this information instead of
+ // VOM-Encoding the endpoint string.
+ return vom.NewEncoder(f).Encode(epString)
}
-func (p *proxy) shouldRoute(f flow.Flow) bool {
- rid := f.Conn().LocalEndpoint().RoutingID()
- return rid != p.m.RoutingID() && rid != naming.NullRoutingID
+func (p *proxy) replyToProxy(ctx *context.T, f flow.Flow) error {
+ // Add the routing id of the incoming proxy to the routes. The routing id of the
+ // returned endpoint doesn't matter because it will eventually be replaced
+ // by a server's rid by some later proxy.
+ // TODO(suharshs): Use a local route instead of this global routingID.
+ rid := f.Conn().RemoteEndpoint().RoutingID()
+ epString, err := p.returnEndpoint(ctx, naming.NullRoutingID, rid.String())
+ if err != nil {
+ return err
+ }
+ return vom.NewEncoder(f).Encode(epString)
}
-type proxyBlessingsForPeer struct{}
-
-// TODO(suharshs): Figure out what blessings to present here. And present discharges.
-func (proxyBlessingsForPeer) run(ctx *context.T, lep, rep naming.Endpoint, rb security.Blessings,
- rd map[string]security.Discharge) (security.Blessings, map[string]security.Discharge, error) {
- return v23.GetPrincipal(ctx).BlessingStore().Default(), nil, nil
+func (p *proxy) returnEndpoint(ctx *context.T, rid naming.RoutingID, route string) (string, error) {
+ p.mu.Lock()
+ eps := append(p.m.ListeningEndpoints(), p.proxyEndpoints...)
+ p.mu.Unlock()
+ if len(eps) == 0 {
+ return "", NewErrNotListening(ctx)
+ }
+ // TODO(suharshs): handle listening on multiple endpoints.
+ ep := eps[len(eps)-1]
+ var err error
+ if rid != naming.NullRoutingID {
+ ep, err = setEndpointRoutingID(ep, rid)
+ if err != nil {
+ return "", err
+ }
+ }
+ if len(route) > 0 {
+ var cp []string
+ cp = append(cp, ep.Routes()...)
+ cp = append(cp, route)
+ ep, err = setEndpointRoutes(ep, cp)
+ if err != nil {
+ return "", err
+ }
+ }
+ return ep.String(), nil
}
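
Each hop in the chain above consumes one route from the setup message's endpoint: the first route names the routing ID of the connection to dial next, and the remaining routes travel onward with the message. Below is a minimal sketch of that step, assuming it sits in package xproxyd so the helpers added in util.go (next file) are in scope; the function name nextHop is hypothetical and not part of this change.

// nextHop derives, from the endpoint carried by an incoming Setup message,
// the endpoint to dial for the next hop and the endpoint to forward onward.
func nextHop(ep naming.Endpoint) (dial, forward naming.Endpoint, err error) {
    routes := ep.Routes()
    if len(routes) == 0 {
        // Last hop: dial the server's endpoint as-is and forward it unchanged.
        return ep, ep, nil
    }
    // The first route names the routing ID of the next hop's connection.
    var rid naming.RoutingID
    if err := rid.FromString(routes[0]); err != nil {
        return nil, nil, err
    }
    // Only the routing ID matters for finding the cached connection to dial.
    if dial, err = setEndpointRoutingID(ep, rid); err != nil {
        return nil, nil, err
    }
    // Strip the consumed route before the message is written to the next hop.
    if forward, err = setEndpointRoutes(ep, routes[1:]); err != nil {
        return nil, nil, err
    }
    return dial, forward, nil
}
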
diff --git a/services/xproxyd/util.go b/services/xproxyd/util.go
new file mode 100644
index 0000000..b1fdd58
--- /dev/null
+++ b/services/xproxyd/util.go
@@ -0,0 +1,98 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package xproxyd
+
+import (
+ "fmt"
+
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/flow"
+ "v.io/v23/flow/message"
+ "v.io/v23/naming"
+ "v.io/v23/security"
+)
+
+// setEndpointRoutingID returns a copy of ep with the RoutingID changed to rid.
+func setEndpointRoutingID(ep naming.Endpoint, rid naming.RoutingID) (naming.Endpoint, error) {
+ network, address, routes, _, bnames, mountable := getEndpointParts(ep)
+ opts := routes
+ opts = append(opts, bnames...)
+ opts = append(opts, rid)
+ opts = append(opts, mountable)
+ epString := naming.FormatEndpoint(network, address, opts...)
+ return v23.NewEndpoint(epString)
+}
+
+// setEndpointRoutes returns a copy of ep with Routes changed to routes.
+func setEndpointRoutes(ep naming.Endpoint, routes []string) (naming.Endpoint, error) {
+ network, address, _, rid, bnames, mountable := getEndpointParts(ep)
+ opts := routeOpts(routes)
+ opts = append(opts, bnames...)
+ opts = append(opts, rid)
+ opts = append(opts, mountable)
+ epString := naming.FormatEndpoint(network, address, opts...)
+ return v23.NewEndpoint(epString)
+}
+
+// getEndpointParts returns all the fields of ep.
+func getEndpointParts(ep naming.Endpoint) (network string, address string,
+ routes []naming.EndpointOpt, rid naming.RoutingID,
+ blessingNames []naming.EndpointOpt, mountable naming.EndpointOpt) {
+ network, address = ep.Addr().Network(), ep.Addr().String()
+ routes = routeOpts(ep.Routes())
+ rid = ep.RoutingID()
+ blessingNames = blessingOpts(ep.BlessingNames())
+ mountable = naming.ServesMountTable(ep.ServesMountTable())
+ return
+}
+
+func routeOpts(routes []string) []naming.EndpointOpt {
+ var routeOpts []naming.EndpointOpt
+ for _, route := range routes {
+ routeOpts = append(routeOpts, naming.RouteOpt(route))
+ }
+ return routeOpts
+}
+
+func blessingOpts(blessingNames []string) []naming.EndpointOpt {
+ var blessingOpts []naming.EndpointOpt
+ for _, b := range blessingNames {
+ blessingOpts = append(blessingOpts, naming.BlessingOpt(b))
+ }
+ return blessingOpts
+}
+
+type proxyBlessingsForPeer struct{}
+
+// TODO(suharshs): Figure out what blessings to present here. And present discharges.
+func (proxyBlessingsForPeer) run(ctx *context.T, lep, rep naming.Endpoint, rb security.Blessings,
+ rd map[string]security.Discharge) (security.Blessings, map[string]security.Discharge, error) {
+ return v23.GetPrincipal(ctx).BlessingStore().Default(), nil, nil
+}
+
+func readSetupMessage(ctx *context.T, f flow.Flow) (*message.Setup, error) {
+ b, err := f.ReadMsg()
+ if err != nil {
+ return nil, err
+ }
+ m, err := message.Read(ctx, b)
+ if err != nil {
+ return nil, err
+ }
+ if m, isSetup := m.(*message.Setup); isSetup {
+ return m, nil
+ }
+ return nil, NewErrUnexpectedMessage(ctx, fmt.Sprintf("%T", m))
+}
+
+func writeSetupMessage(ctx *context.T, m message.Message, f flow.Flow) error {
+ w, err := message.Append(ctx, m, []byte{})
+ if err != nil {
+ return err
+ }
+ _, err = f.WriteMsg(w)
+ return err
+}
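
A brief, hypothetical usage sketch of the helpers above (not part of the change). It assumes package xproxyd, imports of fmt, v.io/v23, and v.io/v23/naming, and a linked runtime factory (e.g. v.io/x/ref/runtime/factories/generic), since v23.Init and v23.NewEndpoint go through the runtime; naming.FixedRoutingID is used only to get deterministic IDs for illustration.

func exampleEndpointRewrite() {
    _, shutdown := v23.Init()
    defer shutdown()
    // Build an endpoint with a known routing ID; the address and IDs are made up.
    from := naming.FixedRoutingID(0x1111)
    ep, err := v23.NewEndpoint(naming.FormatEndpoint("tcp", "127.0.0.1:0", from))
    if err != nil {
        panic(err)
    }
    // Point the endpoint at a different routing ID, then record the old one as a route.
    to := naming.FixedRoutingID(0x2222)
    if ep, err = setEndpointRoutingID(ep, to); err != nil {
        panic(err)
    }
    if ep, err = setEndpointRoutes(ep, []string{from.String()}); err != nil {
        panic(err)
    }
    fmt.Println(ep.RoutingID(), ep.Routes()) // the new routing ID and the single route
}
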
diff --git a/test/hello/helloserver/helloserver.go b/test/hello/helloserver/helloserver.go
index 00d65fd..f9977a1 100644
--- a/test/hello/helloserver/helloserver.go
+++ b/test/hello/helloserver/helloserver.go
@@ -10,13 +10,13 @@
import (
"fmt"
+ "v.io/v23"
"v.io/v23/context"
"v.io/v23/rpc"
"v.io/v23/security"
"v.io/x/lib/cmdline"
"v.io/x/ref/lib/signals"
"v.io/x/ref/lib/v23cmd"
- "v.io/x/ref/lib/xrpc"
_ "v.io/x/ref/runtime/factories/generic"
)
@@ -44,7 +44,7 @@
}
func runHelloServer(ctx *context.T, env *cmdline.Env, args []string) error {
- server, err := xrpc.NewServer(ctx, name, &helloServer{}, security.AllowEveryone())
+ ctx, server, err := v23.WithNewServer(ctx, name, &helloServer{}, security.AllowEveryone())
if err != nil {
return fmt.Errorf("NewServer: %v", err)
}
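
For reference, a sketch of the pattern after this migration, with everything else in runHelloServer elided: v23.WithNewServer returns a derived context along with the server, and that returned context is what later calls (for example signals.ShutdownOnSignals, which this file already imports) would typically be given.

    ctx, server, err := v23.WithNewServer(ctx, name, &helloServer{}, security.AllowEveryone())
    if err != nil {
        return fmt.Errorf("NewServer: %v", err)
    }
    _ = server // endpoint details, if needed, can be read from the returned server
    <-signals.ShutdownOnSignals(ctx) // wait for shutdown using the derived ctx
    return nil
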
diff --git a/test/init.go b/test/init.go
index c627309..ef625eb 100644
--- a/test/init.go
+++ b/test/init.go
@@ -14,10 +14,8 @@
"v.io/v23/context"
"v.io/v23/naming"
"v.io/v23/options"
-
"v.io/x/ref/internal/logger"
"v.io/x/ref/lib/flags"
- "v.io/x/ref/lib/xrpc"
"v.io/x/ref/services/mounttable/mounttablelib"
"v.io/x/ref/test/testutil"
)
@@ -102,7 +100,7 @@
if err != nil {
panic(err)
}
- s, err := xrpc.NewDispatchingServer(ctx, "", disp, options.ServesMountTable(true))
+ _, s, err := v23.WithNewDispatchingServer(ctx, "", disp, options.ServesMountTable(true))
if err != nil {
panic(err)
}
diff --git a/test/testutil/security_test.go b/test/testutil/security_test.go
index b13eb6b..11df3c2 100644
--- a/test/testutil/security_test.go
+++ b/test/testutil/security_test.go
@@ -17,10 +17,14 @@
if err := idp.Bless(p, "bar"); err != nil {
t.Fatal(err)
}
- if err := p.Roots().Recognized(idp.PublicKey(), "foo"); err != nil {
+ idpkey, err := idp.PublicKey().MarshalBinary()
+ if err != nil {
+ t.Fatal(err)
+ }
+ if err := p.Roots().Recognized(idpkey, "foo"); err != nil {
t.Error(err)
}
- if err := p.Roots().Recognized(idp.PublicKey(), "foo/bar"); err != nil {
+ if err := p.Roots().Recognized(idpkey, "foo/bar"); err != nil {
t.Error(err)
}
def := p.BlessingStore().Default()