Merge "Counterpart of https://vanadium-review.googlesource.com/16644"
diff --git a/runtime/internal/flow/conn/auth.go b/runtime/internal/flow/conn/auth.go
index 4f75755..7f95652 100644
--- a/runtime/internal/flow/conn/auth.go
+++ b/runtime/internal/flow/conn/auth.go
@@ -387,6 +387,9 @@
}
func (b *blessingsFlow) send(ctx *context.T, blessings security.Blessings, discharges map[string]security.Discharge) (bkey, dkey uint64, err error) {
+ if blessings.IsZero() {
+ return 0, 0, nil
+ }
defer b.mu.Unlock()
b.mu.Lock()
buid := string(blessings.UniqueID())
diff --git a/runtime/internal/rpc/roaming_test.go b/runtime/internal/rpc/roaming_test.go
index b2df6e1..d192ec9 100644
--- a/runtime/internal/rpc/roaming_test.go
+++ b/runtime/internal/rpc/roaming_test.go
@@ -89,15 +89,10 @@
t.Fatalf("got %d, want %d", got, want)
}
- n1 := netstate.NewNetAddr("ip", "1.1.1.1")
- n2 := netstate.NewNetAddr("ip", "2.2.2.2")
-
watcher := make(chan rpc.NetworkChange, 10)
server.WatchNetwork(watcher)
defer close(watcher)
- roaming <- NewNewAddrsSetting([]net.Addr{n1, n2})
-
waitForChange := func() *rpc.NetworkChange {
ctx.Infof("Waiting on %p", watcher)
select {
@@ -109,8 +104,17 @@
return nil
}
- // We expect 4 changes, one for each IP per usable listen spec addr.
+ n1 := netstate.NewNetAddr("ip", "1.1.1.1")
+ n2 := netstate.NewNetAddr("ip", "2.2.2.2")
+ n3 := netstate.NewNetAddr("ip", "3.3.3.3")
+
+ roaming <- NewNewAddrsSetting([]net.Addr{n1, n2})
+
+ // We expect 2 added addrs and 4 changes, one for each IP per usable listen spec addr.
change := waitForChange()
+ if got, want := len(change.AddedAddrs), 2; got != want {
+ t.Fatalf("got %d, want %d", got, want)
+ }
if got, want := len(change.Changed), 4; got != want {
t.Fatalf("got %d, want %d", got, want)
}
@@ -135,10 +139,16 @@
t.Fatalf("got %d, want %d", got, want)
}
- roaming <- NewRmAddrsSetting([]net.Addr{n1})
+ // Mimic n2 being replaced by n3. The network monitor will publish two
+ // settings: NewRmAddrsSetting(n2) followed by NewNewAddrsSetting(n1, n3).
- // We expect 2 changes, one for each usable listen spec addr.
+ roaming <- NewRmAddrsSetting([]net.Addr{n2})
+
+ // We expect 1 removed addr and 2 changes, one for each usable listen spec addr.
change = waitForChange()
+ if got, want := len(change.RemovedAddrs), 1; got != want {
+ t.Fatalf("got %d, want %d", got, want)
+ }
if got, want := len(change.Changed), 2; got != want {
t.Fatalf("got %d, want %d", got, want)
}
@@ -146,7 +156,7 @@
nepsR := make([]naming.Endpoint, len(eps))
copy(nepsR, eps)
for _, p := range getUniqPorts(eps) {
- nep2 := updateHost(eps[0], net.JoinHostPort("2.2.2.2", p))
+ nep2 := updateHost(eps[0], net.JoinHostPort("1.1.1.1", p))
nepsR = append(nepsR, nep2)
}
@@ -155,12 +165,43 @@
t.Fatalf("got %v, want %v [%d, %d]", got, want, len(got), len(want))
}
+ roaming <- NewNewAddrsSetting([]net.Addr{n1, n3})
+
+ // We expect 1 added addr and 2 changes, one for the new IP per usable listen spec addr.
+ change = waitForChange()
+ if got, want := len(change.AddedAddrs), 1; got != want {
+ t.Fatalf("got %d, want %d", got, want)
+ }
+ if got, want := len(change.Changed), 2; got != want {
+ t.Fatalf("got %d, want %d", got, want)
+ }
+
+ nepsC := make([]naming.Endpoint, len(eps))
+ copy(nepsC, eps)
+ for _, p := range getUniqPorts(eps) {
+ nep1 := updateHost(eps[0], net.JoinHostPort("1.1.1.1", p))
+ nep2 := updateHost(eps[0], net.JoinHostPort("3.3.3.3", p))
+ nepsC = append(nepsC, nep1, nep2)
+ }
+
+ status = server.Status()
+ if got, want := status.Endpoints, nepsC; !cmpEndpoints(got, want) {
+ t.Fatalf("got %v, want %v [%d, %d]", got, want, len(got), len(want))
+ }
+
+ if got, want := len(status.Mounts), len(nepsC)*2; got != want {
+ t.Fatalf("got %d, want %d", got, want)
+ }
+ if got, want := len(status.Mounts.Servers()), len(nepsC); got != want {
+ t.Fatalf("got %d, want %d", got, want)
+ }
+
// Remove all addresses to mimic losing all connectivity.
- roaming <- NewRmAddrsSetting(getIPAddrs(nepsR))
+ roaming <- NewRmAddrsSetting(getIPAddrs(nepsC))
// We expect changes for all of the current endpoints
change = waitForChange()
- if got, want := len(change.Changed), len(nepsR); got != want {
+ if got, want := len(change.Changed), len(nepsC); got != want {
t.Fatalf("got %d, want %d", got, want)
}
@@ -175,7 +216,6 @@
if got, want := len(change.Changed), 2; got != want {
t.Fatalf("got %d, want %d", got, want)
}
-
}
func TestWatcherDeadlock(t *testing.T) {
diff --git a/runtime/internal/rpc/server.go b/runtime/internal/rpc/server.go
index ed23e5c..b58a3cd 100644
--- a/runtime/internal/rpc/server.go
+++ b/runtime/internal/rpc/server.go
@@ -731,11 +731,9 @@
}
switch setting.Name() {
case NewAddrsSetting:
- change.Changed = s.addAddresses(v)
- change.AddedAddrs = v
+ change.AddedAddrs, change.Changed = s.updateAddresses(v)
case RmAddrsSetting:
- change.Changed, change.Error = s.removeAddresses(v)
- change.RemovedAddrs = v
+ change.RemovedAddrs, change.Changed = s.removeAddresses(v)
}
s.ctx.VI(2).Infof("rpc: dhcp: change %v", change)
for ch, _ := range s.dhcpState.watchers {
@@ -760,66 +758,86 @@
}
-// Remove all endpoints that have the same host address as the supplied
-// address parameter.
-func (s *server) removeAddresses(addrs []net.Addr) ([]naming.Endpoint, error) {
- var removed []naming.Endpoint
- for _, address := range addrs {
- host := getHost(address)
- for ls, _ := range s.listenState {
- if ls != nil && ls.roaming && len(ls.ieps) > 0 {
- remaining := make([]*inaming.Endpoint, 0, len(ls.ieps))
- for _, iep := range ls.ieps {
- lnHost, _, err := net.SplitHostPort(iep.Address)
- if err != nil {
- lnHost = iep.Address
- }
- if lnHost == host {
- s.ctx.VI(2).Infof("rpc: dhcp removing: %s", iep)
- removed = append(removed, iep)
- s.publisher.RemoveServer(iep.String())
- continue
- }
- remaining = append(remaining, iep)
- }
- ls.ieps = remaining
- }
+// findEndpoint returns the index of the first endpoint in ieps whose host is host, or -1 if none matches.
+func findEndpoint(ieps []*inaming.Endpoint, host string) int {
+ for i, iep := range ieps {
+ if getHost(iep.Addr()) == host {
+ return i
}
}
- return removed, nil
+ return -1
}
-// Add new endpoints for the new address. There is no way to know with
-// 100% confidence which new endpoints to publish without shutting down
-// all network connections and reinitializing everything from scratch.
-// Instead, we find all roaming listeners with at least one endpoint
-// and create a new endpoint with the same port as the existing ones
-// but with the new address supplied to us to by the dhcp code. As
-// an additional safeguard we reject the new address if it is not
-// externally accessible.
-// This places the onus on the dhcp/roaming code that sends us addresses
-// to ensure that those addresses are externally reachable.
-func (s *server) addAddresses(addrs []net.Addr) []naming.Endpoint {
- var added []naming.Endpoint
- for _, address := range netstate.ConvertToAddresses(addrs) {
- if !netstate.IsAccessibleIP(address) {
- return added
- }
- host := getHost(address)
+// removeAddresses removes, from each roaming listener, the endpoint whose host
+// matches one of addrs, and returns the addrs and endpoints actually removed.
+func (s *server) removeAddresses(addrs []net.Addr) ([]net.Addr, []naming.Endpoint) {
+ var removedAddrs []net.Addr
+ var removedEps []naming.Endpoint
+ for _, addr := range addrs {
+ host := getHost(addr)
+ removed := false
for ls, _ := range s.listenState {
- if ls != nil && ls.roaming {
+ if ls == nil || !ls.roaming {
+ continue
+ }
+ if i := findEndpoint(ls.ieps, host); i >= 0 {
+ oiep := ls.ieps[i]
+ s.ctx.VI(2).Infof("rpc: dhcp removing: %s", oiep)
+ n := len(ls.ieps) - 1
+ ls.ieps[i], ls.ieps[n] = ls.ieps[n], nil
+ ls.ieps = ls.ieps[:n]
+ s.publisher.RemoveServer(oiep.String())
+ removedEps = append(removedEps, oiep)
+ removed = true
+ }
+ }
+ if removed {
+ removedAddrs = append(removedAddrs, addr)
+ }
+ }
+ return removedAddrs, removedEps
+}
+
+// updateAddresses updates endpoints with the given addresses. There is no way to
+// know with 100% confidence which new endpoints to publish without shutting down
+// all network connections and reinitializing everything from scratch. Instead, we
+// find all roaming listeners with at least one endpoint and create a new endpoint
+// with the same port as the existing ones but with the new address supplied to us
+// by the dhcp code. As an additional safeguard we reject the new address if it
+// is not externally accessible.
+//
+// This places the onus on the dhcp/roaming code that sends us addresses to ensure
+// that those addresses are externally reachable.
+func (s *server) updateAddresses(addrs []net.Addr) ([]net.Addr, []naming.Endpoint) {
+ var addedAddrs []net.Addr
+ var addedEPs []naming.Endpoint
+ for _, addr := range netstate.ConvertToAddresses(addrs) {
+ if !netstate.IsAccessibleIP(addr) {
+ continue
+ }
+ host := getHost(addr)
+ added := false
+ for ls, _ := range s.listenState {
+ if ls == nil || !ls.roaming {
+ continue
+ }
+ if i := findEndpoint(ls.ieps, host); i < 0 {
niep := ls.protoIEP
niep.Address = net.JoinHostPort(host, ls.port)
niep.IsMountTable = s.servesMountTable
niep.IsLeaf = s.isLeaf
- ls.ieps = append(ls.ieps, &niep)
s.ctx.VI(2).Infof("rpc: dhcp adding: %s", niep)
+ ls.ieps = append(ls.ieps, &niep)
s.publisher.AddServer(niep.String())
- added = append(added, &niep)
+ addedEPs = append(addedEPs, &niep)
+ added = true
}
}
+ if added {
+ addedAddrs = append(addedAddrs, addr)
+ }
}
- return added
+ return addedAddrs, addedEPs
}
type leafDispatcher struct {
diff --git a/runtime/internal/rpc/xclient.go b/runtime/internal/rpc/xclient.go
index 9e26aa9..b9d19e2 100644
--- a/runtime/internal/rpc/xclient.go
+++ b/runtime/internal/rpc/xclient.go
@@ -204,7 +204,7 @@
// This flow must outlive the flow we're currently creating.
// It lives as long as the connection to which it is bound.
tctx, tcancel := context.WithRootCancel(ctx)
- tflow, err := c.flowMgr.Dial(tctx, ep, peerAuth)
+ tflow, err := c.flowMgr.Dial(tctx, ep, typeFlowAuthorizer{})
if err != nil {
status.serverErr = suberr(newErrTypeFlowFailure(tctx, err))
flow.Close()
@@ -217,7 +217,7 @@
return
}
if _, err = tflow.Write([]byte{typeFlow}); err != nil {
- status.serverErr = suberr(newErrTypeFlowFailure(tctx, nil))
+ status.serverErr = suberr(newErrTypeFlowFailure(tctx, err))
flow.Close()
tflow.Close()
return
@@ -233,6 +233,21 @@
status.flow = flow
}
+type typeFlowAuthorizer struct{}
+
+func (a typeFlowAuthorizer) AuthorizePeer(
+ ctx *context.T,
+ localEP, remoteEP naming.Endpoint,
+ remoteBlessings security.Blessings,
+ remoteDischarges map[string]security.Discharge) ([]string, []security.RejectedBlessing, error) {
+ return nil, nil, nil
+}
+
+func (a typeFlowAuthorizer) BlessingsForPeer(ctx *context.T, peerNames []string) (
+ security.Blessings, map[string]security.Discharge, error) {
+ return security.Blessings{}, nil, nil
+}
+
type peerAuthorizer struct {
auth security.Authorizer
method string
diff --git a/services/syncbase/server/mojo_impl.go b/services/syncbase/server/mojo_impl.go
index 407395b..f9b8bb2 100644
--- a/services/syncbase/server/mojo_impl.go
+++ b/services/syncbase/server/mojo_impl.go
@@ -28,7 +28,9 @@
nosqlwire "v.io/v23/services/syncbase/nosql"
watchwire "v.io/v23/services/watch"
"v.io/v23/syncbase/nosql"
+ "v.io/v23/vdl"
"v.io/v23/verror"
+ "v.io/v23/vom"
"v.io/v23/vtrace"
)
@@ -357,13 +359,29 @@
}
func (s *execStreamImpl) Send(item interface{}) error {
- v, ok := item.([][]byte)
+ v, ok := item.([]*vdl.Value)
if !ok {
return verror.NewErrInternal(s.ctx)
}
+ // TODO(aghassemi): Switch everything to 'any' from '[]byte'.
+ // For now, ResultStream is the only place using 'any' instead of '[]byte', so
+ // VOM bytes are already decoded.
+ // https://github.com/vanadium/issues/issues/766
+ var values [][]byte
+ for _, vdlValue := range v {
+ var bytes []byte
+ // The value can either be a string (column headers) or bytes (value).
+ if vdlValue.Kind() == vdl.String {
+ bytes = []byte(vdlValue.String())
+ } else {
+ bytes = vdlValue.Bytes()
+ }
+ values = append(values, bytes)
+ }
+
r := mojom.Result{
- Values: v,
+ Values: values,
}
// proxy.OnResult() blocks until the client acks the previous invocation,
@@ -462,13 +480,19 @@
if !ok {
return verror.NewErrInternal(s.ctx)
}
-
vc := nosql.ToWatchChange(c)
+
+ var value []byte
+ if vc.ValueBytes != nil {
+ if err := vom.Decode(vc.ValueBytes, &value); err != nil {
+ return err
+ }
+ }
mc := mojom.WatchChange{
TableName: vc.Table,
RowKey: vc.Row,
ChangeType: uint32(vc.ChangeType),
- ValueBytes: vc.ValueBytes,
+ ValueBytes: value,
ResumeMarker: vc.ResumeMarker,
FromSync: vc.FromSync,
Continued: vc.Continued,
@@ -533,6 +557,16 @@
return toMojoError(err), marker, nil
}
+func (m *mojoImpl) DbListTables(name string) (mojom.Error, []string, error) {
+ ctx, call := m.newCtxCall(name, methodDesc(nosqlwire.DatabaseDesc, "ListTables"))
+ stub, err := m.getDb(ctx, call, name)
+ if err != nil {
+ return toMojoError(err), nil, nil
+ }
+ tables, err := stub.ListTables(ctx, call)
+ return toMojoError(err), tables, nil
+}
+
////////////////////////////////////////
// nosql.Database:SyncgroupManager
@@ -712,11 +746,16 @@
if !ok {
return verror.NewErrInternal(s.ctx)
}
+
+ var value []byte
+ if err := vom.Decode(kv.Value, &value); err != nil {
+ return err
+ }
// proxy.OnKeyValue() blocks until the client acks the previous invocation,
// thus providing flow control.
return s.proxy.OnKeyValue(mojom.KeyValue{
Key: kv.Key,
- Value: kv.Value,
+ Value: value,
})
}
@@ -791,7 +830,12 @@
if err != nil {
return toMojoError(err), nil, nil
}
- value, err := stub.Get(ctx, call, NoSchema)
+ vomBytes, err := stub.Get(ctx, call, NoSchema)
+
+ var value []byte
+ if err := vom.Decode(vomBytes, &value); err != nil {
+ return toMojoError(err), nil, nil
+ }
return toMojoError(err), value, nil
}
@@ -801,7 +845,14 @@
if err != nil {
return toMojoError(err), nil
}
- err = stub.Put(ctx, call, NoSchema, value)
+ // TODO(aghassemi): Encode from []byte to VOM encoded []byte until we have
+ // support for types other than byte.
+ // https://github.com/vanadium/issues/issues/766
+ vomBytes, err := vom.Encode(value)
+ if err != nil {
+ return toMojoError(err), nil
+ }
+ err = stub.Put(ctx, call, NoSchema, vomBytes)
return toMojoError(err), nil
}