Merge "Counterpart of https://vanadium-review.googlesource.com/16644"
diff --git a/runtime/internal/flow/conn/auth.go b/runtime/internal/flow/conn/auth.go
index 4f75755..7f95652 100644
--- a/runtime/internal/flow/conn/auth.go
+++ b/runtime/internal/flow/conn/auth.go
@@ -387,6 +387,9 @@
 }
 
 func (b *blessingsFlow) send(ctx *context.T, blessings security.Blessings, discharges map[string]security.Discharge) (bkey, dkey uint64, err error) {
+	if blessings.IsZero() {
+		return 0, 0, nil
+	}
 	defer b.mu.Unlock()
 	b.mu.Lock()
 	buid := string(blessings.UniqueID())
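
With the guard above, blessingsFlow.send is a no-op for the zero
security.Blessings value: it reports bkey == dkey == 0 and writes nothing to
the flow. A minimal sketch of the zero-value check it relies on (only the
public v.io/v23/security API used in the hunk is assumed):

    package example

    import "v.io/v23/security"

    // hasBlessings reports whether b carries anything worth sending; the zero
    // Blessings value has no certificate chains, so IsZero() returns true.
    func hasBlessings(b security.Blessings) bool {
        return !b.IsZero()
    }
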
diff --git a/runtime/internal/rpc/roaming_test.go b/runtime/internal/rpc/roaming_test.go
index b2df6e1..d192ec9 100644
--- a/runtime/internal/rpc/roaming_test.go
+++ b/runtime/internal/rpc/roaming_test.go
@@ -89,15 +89,10 @@
 		t.Fatalf("got %d, want %d", got, want)
 	}
 
-	n1 := netstate.NewNetAddr("ip", "1.1.1.1")
-	n2 := netstate.NewNetAddr("ip", "2.2.2.2")
-
 	watcher := make(chan rpc.NetworkChange, 10)
 	server.WatchNetwork(watcher)
 	defer close(watcher)
 
-	roaming <- NewNewAddrsSetting([]net.Addr{n1, n2})
-
 	waitForChange := func() *rpc.NetworkChange {
 		ctx.Infof("Waiting on %p", watcher)
 		select {
@@ -109,8 +104,17 @@
 		return nil
 	}
 
-	// We expect 4 changes, one for each IP per usable listen spec addr.
+	n1 := netstate.NewNetAddr("ip", "1.1.1.1")
+	n2 := netstate.NewNetAddr("ip", "2.2.2.2")
+	n3 := netstate.NewNetAddr("ip", "3.3.3.3")
+
+	roaming <- NewNewAddrsSetting([]net.Addr{n1, n2})
+
+	// We expect 2 added addrs and 4 changes, one for each IP per usable listen spec addr.
 	change := waitForChange()
+	if got, want := len(change.AddedAddrs), 2; got != want {
+		t.Fatalf("got %d, want %d", got, want)
+	}
 	if got, want := len(change.Changed), 4; got != want {
 		t.Fatalf("got %d, want %d", got, want)
 	}
@@ -135,10 +139,16 @@
 		t.Fatalf("got %d, want %d", got, want)
 	}
 
-	roaming <- NewRmAddrsSetting([]net.Addr{n1})
+	// Mimic n2 being replaced by n3. The network monitor publishes this as two
+	// settings: RmAddrsSetting(n2) followed by NewNewAddrsSetting(n1, n3).
 
-	// We expect 2 changes, one for each usable listen spec addr.
+	roaming <- NewRmAddrsSetting([]net.Addr{n2})
+
+	// We expect 1 removed addr and 2 changes, one for each usable listen spec addr.
 	change = waitForChange()
+	if got, want := len(change.RemovedAddrs), 1; got != want {
+		t.Fatalf("got %d, want %d", got, want)
+	}
 	if got, want := len(change.Changed), 2; got != want {
 		t.Fatalf("got %d, want %d", got, want)
 	}
@@ -146,7 +156,7 @@
 	nepsR := make([]naming.Endpoint, len(eps))
 	copy(nepsR, eps)
 	for _, p := range getUniqPorts(eps) {
-		nep2 := updateHost(eps[0], net.JoinHostPort("2.2.2.2", p))
+		nep2 := updateHost(eps[0], net.JoinHostPort("1.1.1.1", p))
 		nepsR = append(nepsR, nep2)
 	}
 
@@ -155,12 +165,43 @@
 		t.Fatalf("got %v, want %v [%d, %d]", got, want, len(got), len(want))
 	}
 
+	roaming <- NewNewAddrsSetting([]net.Addr{n1, n3})
+
+	// We expect 1 added addr and 2 changes, one for the new IP per usable listen spec addr.
+	change = waitForChange()
+	if got, want := len(change.AddedAddrs), 1; got != want {
+		t.Fatalf("got %d, want %d", got, want)
+	}
+	if got, want := len(change.Changed), 2; got != want {
+		t.Fatalf("got %d, want %d", got, want)
+	}
+
+	nepsC := make([]naming.Endpoint, len(eps))
+	copy(nepsC, eps)
+	for _, p := range getUniqPorts(eps) {
+		nep1 := updateHost(eps[0], net.JoinHostPort("1.1.1.1", p))
+		nep2 := updateHost(eps[0], net.JoinHostPort("3.3.3.3", p))
+		nepsC = append(nepsC, nep1, nep2)
+	}
+
+	status = server.Status()
+	if got, want := status.Endpoints, nepsC; !cmpEndpoints(got, want) {
+		t.Fatalf("got %v, want %v [%d, %d]", got, want, len(got), len(want))
+	}
+
+	if got, want := len(status.Mounts), len(nepsC)*2; got != want {
+		t.Fatalf("got %d, want %d", got, want)
+	}
+	if got, want := len(status.Mounts.Servers()), len(nepsC); got != want {
+		t.Fatalf("got %d, want %d", got, want)
+	}
+
 	// Remove all addresses to mimic losing all connectivity.
-	roaming <- NewRmAddrsSetting(getIPAddrs(nepsR))
+	roaming <- NewRmAddrsSetting(getIPAddrs(nepsC))
 
 	// We expect changes for all of the current endpoints
 	change = waitForChange()
-	if got, want := len(change.Changed), len(nepsR); got != want {
+	if got, want := len(change.Changed), len(nepsC); got != want {
 		t.Fatalf("got %d, want %d", got, want)
 	}
 
@@ -175,7 +216,6 @@
 	if got, want := len(change.Changed), 2; got != want {
 		t.Fatalf("got %d, want %d", got, want)
 	}
-
 }
 
 func TestWatcherDeadlock(t *testing.T) {
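
The reworked test drives the roaming stream by hand, pushing NewNewAddrsSetting
and NewRmAddrsSetting values and then asserting on the AddedAddrs, RemovedAddrs
and Changed fields of each rpc.NetworkChange it receives. A sketch of draining
a watcher channel registered via server.WatchNetwork outside of a test
(illustrative only; it assumes nothing beyond the fields asserted on above):

    package example

    import (
        "log"

        "v.io/v23/rpc"
    )

    // logNetworkChanges drains a channel previously passed to WatchNetwork.
    func logNetworkChanges(watcher <-chan rpc.NetworkChange) {
        for change := range watcher {
            // AddedAddrs/RemovedAddrs carry the addresses that actually changed;
            // Changed lists the affected endpoints, one per usable listen spec addr.
            log.Printf("network change: added=%v removed=%v changed=%v",
                change.AddedAddrs, change.RemovedAddrs, change.Changed)
        }
    }
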
diff --git a/runtime/internal/rpc/server.go b/runtime/internal/rpc/server.go
index ed23e5c..b58a3cd 100644
--- a/runtime/internal/rpc/server.go
+++ b/runtime/internal/rpc/server.go
@@ -731,11 +731,9 @@
 			}
 			switch setting.Name() {
 			case NewAddrsSetting:
-				change.Changed = s.addAddresses(v)
-				change.AddedAddrs = v
+				change.AddedAddrs, change.Changed = s.updateAddresses(v)
 			case RmAddrsSetting:
-				change.Changed, change.Error = s.removeAddresses(v)
-				change.RemovedAddrs = v
+				change.RemovedAddrs, change.Changed = s.removeAddresses(v)
 			}
 			s.ctx.VI(2).Infof("rpc: dhcp: change %v", change)
 			for ch, _ := range s.dhcpState.watchers {
@@ -760,66 +758,86 @@
 
 }
 
-// Remove all endpoints that have the same host address as the supplied
-// address parameter.
-func (s *server) removeAddresses(addrs []net.Addr) ([]naming.Endpoint, error) {
-	var removed []naming.Endpoint
-	for _, address := range addrs {
-		host := getHost(address)
-		for ls, _ := range s.listenState {
-			if ls != nil && ls.roaming && len(ls.ieps) > 0 {
-				remaining := make([]*inaming.Endpoint, 0, len(ls.ieps))
-				for _, iep := range ls.ieps {
-					lnHost, _, err := net.SplitHostPort(iep.Address)
-					if err != nil {
-						lnHost = iep.Address
-					}
-					if lnHost == host {
-						s.ctx.VI(2).Infof("rpc: dhcp removing: %s", iep)
-						removed = append(removed, iep)
-						s.publisher.RemoveServer(iep.String())
-						continue
-					}
-					remaining = append(remaining, iep)
-				}
-				ls.ieps = remaining
-			}
+// findEndpoint returns the index of the first endpoint in ieps with the given host, or -1 if none matches.
+func findEndpoint(ieps []*inaming.Endpoint, host string) int {
+	for i, iep := range ieps {
+		if getHost(iep.Addr()) == host {
+			return i
 		}
 	}
-	return removed, nil
+	return -1
 }
 
-// Add new endpoints for the new address. There is no way to know with
-// 100% confidence which new endpoints to publish without shutting down
-// all network connections and reinitializing everything from scratch.
-// Instead, we find all roaming listeners with at least one endpoint
-// and create a new endpoint with the same port as the existing ones
-// but with the new address supplied to us to by the dhcp code. As
-// an additional safeguard we reject the new address if it is not
-// externally accessible.
-// This places the onus on the dhcp/roaming code that sends us addresses
-// to ensure that those addresses are externally reachable.
-func (s *server) addAddresses(addrs []net.Addr) []naming.Endpoint {
-	var added []naming.Endpoint
-	for _, address := range netstate.ConvertToAddresses(addrs) {
-		if !netstate.IsAccessibleIP(address) {
-			return added
-		}
-		host := getHost(address)
+// removeAddresses removes all endpoints whose host matches any of the supplied
+// addresses, returning the addresses that matched and the endpoints removed.
+func (s *server) removeAddresses(addrs []net.Addr) ([]net.Addr, []naming.Endpoint) {
+	var removedAddrs []net.Addr
+	var removedEps []naming.Endpoint
+	for _, addr := range addrs {
+		host := getHost(addr)
+		removed := false
 		for ls, _ := range s.listenState {
-			if ls != nil && ls.roaming {
+			if ls == nil || !ls.roaming {
+				continue
+			}
+			if i := findEndpoint(ls.ieps, host); i >= 0 {
+				oiep := ls.ieps[i]
+				s.ctx.VI(2).Infof("rpc: dhcp removing: %s", oiep)
+				n := len(ls.ieps) - 1
+				ls.ieps[i], ls.ieps[n] = ls.ieps[n], nil
+				ls.ieps = ls.ieps[:n]
+				s.publisher.RemoveServer(oiep.String())
+				removedEps = append(removedEps, oiep)
+				removed = true
+			}
+		}
+		if removed {
+			removedAddrs = append(removedAddrs, addr)
+		}
+	}
+	return removedAddrs, removedEps
+}
+
+// updateAddresses adds endpoints for any of the given addresses that are not
+// already being listened on. There is no way to know with 100% confidence which
+// new endpoints to publish without shutting down all network connections and
+// reinitializing everything from scratch. Instead, we find all roaming listeners
+// with at least one endpoint and create a new endpoint with the same port as the
+// existing ones but with the new address supplied to us by the dhcp code. As an
+// additional safeguard we reject the new address if it is not externally accessible.
+//
+// This places the onus on the dhcp/roaming code that sends us addresses to ensure
+// that those addresses are externally reachable.
+func (s *server) updateAddresses(addrs []net.Addr) ([]net.Addr, []naming.Endpoint) {
+	var addedAddrs []net.Addr
+	var addedEPs []naming.Endpoint
+	for _, addr := range netstate.ConvertToAddresses(addrs) {
+		if !netstate.IsAccessibleIP(addr) {
+			continue
+		}
+		host := getHost(addr)
+		added := false
+		for ls, _ := range s.listenState {
+			if ls == nil || !ls.roaming {
+				continue
+			}
+			if i := findEndpoint(ls.ieps, host); i < 0 {
 				niep := ls.protoIEP
 				niep.Address = net.JoinHostPort(host, ls.port)
 				niep.IsMountTable = s.servesMountTable
 				niep.IsLeaf = s.isLeaf
-				ls.ieps = append(ls.ieps, &niep)
 				s.ctx.VI(2).Infof("rpc: dhcp adding: %s", niep)
+				ls.ieps = append(ls.ieps, &niep)
 				s.publisher.AddServer(niep.String())
-				added = append(added, &niep)
+				addedEPs = append(addedEPs, &niep)
+				added = true
 			}
 		}
+		if added {
+			addedAddrs = append(addedAddrs, addr)
+		}
 	}
-	return added
+	return addedAddrs, addedEPs
 }
 
 type leafDispatcher struct {
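
removeAddresses and updateAddresses now return both the addresses that actually
took effect and the endpoints they touched, which is what lets the roaming test
above assert on AddedAddrs and RemovedAddrs. A standalone illustration of the
swap-delete idiom removeAddresses applies to ls.ieps (toy types only, not the
real inaming.Endpoint):

    package example

    type endpoint struct{ addr string }

    // removeAt deletes eps[i] without preserving order: the last element is
    // moved into the hole, the vacated slot is nilled so the pointer can be
    // collected, and the slice is shrunk by one.
    func removeAt(eps []*endpoint, i int) []*endpoint {
        n := len(eps) - 1
        eps[i], eps[n] = eps[n], nil
        return eps[:n]
    }
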
diff --git a/runtime/internal/rpc/xclient.go b/runtime/internal/rpc/xclient.go
index 9e26aa9..b9d19e2 100644
--- a/runtime/internal/rpc/xclient.go
+++ b/runtime/internal/rpc/xclient.go
@@ -204,7 +204,7 @@
 		// This flow must outlive the flow we're currently creating.
 		// It lives as long as the connection to which it is bound.
 		tctx, tcancel := context.WithRootCancel(ctx)
-		tflow, err := c.flowMgr.Dial(tctx, ep, peerAuth)
+		tflow, err := c.flowMgr.Dial(tctx, ep, typeFlowAuthorizer{})
 		if err != nil {
 			status.serverErr = suberr(newErrTypeFlowFailure(tctx, err))
 			flow.Close()
@@ -217,7 +217,7 @@
 			return
 		}
 		if _, err = tflow.Write([]byte{typeFlow}); err != nil {
-			status.serverErr = suberr(newErrTypeFlowFailure(tctx, nil))
+			status.serverErr = suberr(newErrTypeFlowFailure(tctx, err))
 			flow.Close()
 			tflow.Close()
 			return
@@ -233,6 +233,21 @@
 	status.flow = flow
 }
 
+type typeFlowAuthorizer struct{}
+
+func (a typeFlowAuthorizer) AuthorizePeer(
+	ctx *context.T,
+	localEP, remoteEP naming.Endpoint,
+	remoteBlessings security.Blessings,
+	remoteDischarges map[string]security.Discharge) ([]string, []security.RejectedBlessing, error) {
+	return nil, nil, nil
+}
+
+func (a typeFlowAuthorizer) BlessingsForPeer(ctx *context.T, peerNames []string) (
+	security.Blessings, map[string]security.Discharge, error) {
+	return security.Blessings{}, nil, nil
+}
+
 type peerAuthorizer struct {
 	auth   security.Authorizer
 	method string
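
typeFlowAuthorizer is a deliberately permissive peer authorizer for the shared
type flow: AuthorizePeer accepts any peer and BlessingsForPeer presents no
blessings, so the type flow is no longer dialed with the per-call peerAuth. A
compile-time sketch of the contract it has to satisfy, assuming the manager's
Dial expects a PeerAuthorizer from v.io/v23/flow:

    // Assumption: flow.PeerAuthorizer (v.io/v23/flow) is the interface that
    // c.flowMgr.Dial expects; if so, this compile-time check would hold.
    var _ flow.PeerAuthorizer = typeFlowAuthorizer{}
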
diff --git a/services/syncbase/server/mojo_impl.go b/services/syncbase/server/mojo_impl.go
index 407395b..f9b8bb2 100644
--- a/services/syncbase/server/mojo_impl.go
+++ b/services/syncbase/server/mojo_impl.go
@@ -28,7 +28,9 @@
 	nosqlwire "v.io/v23/services/syncbase/nosql"
 	watchwire "v.io/v23/services/watch"
 	"v.io/v23/syncbase/nosql"
+	"v.io/v23/vdl"
 	"v.io/v23/verror"
+	"v.io/v23/vom"
 	"v.io/v23/vtrace"
 )
 
@@ -357,13 +359,29 @@
 }
 
 func (s *execStreamImpl) Send(item interface{}) error {
-	v, ok := item.([][]byte)
+	v, ok := item.([]*vdl.Value)
 	if !ok {
 		return verror.NewErrInternal(s.ctx)
 	}
 
+	// TODO(aghassemi): Switch everything to 'any' from '[]byte'.
+	// For now, ResultStream is the only place using 'any' instead of '[]byte',
+	// so the values arrive here already decoded from VOM into vdl.Values.
+	// https://github.com/vanadium/issues/issues/766
+	var values [][]byte
+	for _, vdlValue := range v {
+		var bytes []byte
+		// The value can either be a string (column headers) or bytes (value).
+		if vdlValue.Kind() == vdl.String {
+			bytes = []byte(vdlValue.String())
+		} else {
+			bytes = vdlValue.Bytes()
+		}
+		values = append(values, bytes)
+	}
+
 	r := mojom.Result{
-		Values: v,
+		Values: values,
 	}
 
 	// proxy.OnResult() blocks until the client acks the previous invocation,
@@ -462,13 +480,19 @@
 	if !ok {
 		return verror.NewErrInternal(s.ctx)
 	}
-
 	vc := nosql.ToWatchChange(c)
+
+	var value []byte
+	if vc.ValueBytes != nil {
+		if err := vom.Decode(vc.ValueBytes, &value); err != nil {
+			return err
+		}
+	}
 	mc := mojom.WatchChange{
 		TableName:    vc.Table,
 		RowKey:       vc.Row,
 		ChangeType:   uint32(vc.ChangeType),
-		ValueBytes:   vc.ValueBytes,
+		ValueBytes:   value,
 		ResumeMarker: vc.ResumeMarker,
 		FromSync:     vc.FromSync,
 		Continued:    vc.Continued,
@@ -533,6 +557,16 @@
 	return toMojoError(err), marker, nil
 }
 
+func (m *mojoImpl) DbListTables(name string) (mojom.Error, []string, error) {
+	ctx, call := m.newCtxCall(name, methodDesc(nosqlwire.DatabaseDesc, "ListTables"))
+	stub, err := m.getDb(ctx, call, name)
+	if err != nil {
+		return toMojoError(err), nil, nil
+	}
+	tables, err := stub.ListTables(ctx, call)
+	return toMojoError(err), tables, nil
+}
+
 ////////////////////////////////////////
 // nosql.Database:SyncgroupManager
 
@@ -712,11 +746,16 @@
 	if !ok {
 		return verror.NewErrInternal(s.ctx)
 	}
+
+	var value []byte
+	if err := vom.Decode(kv.Value, &value); err != nil {
+		return err
+	}
 	// proxy.OnKeyValue() blocks until the client acks the previous invocation,
 	// thus providing flow control.
 	return s.proxy.OnKeyValue(mojom.KeyValue{
 		Key:   kv.Key,
-		Value: kv.Value,
+		Value: value,
 	})
 }
 
@@ -791,7 +830,12 @@
 	if err != nil {
 		return toMojoError(err), nil, nil
 	}
-	value, err := stub.Get(ctx, call, NoSchema)
+	vomBytes, err := stub.Get(ctx, call, NoSchema)
+
+	var value []byte
+	if err == nil {
+		err = vom.Decode(vomBytes, &value)
+	}
 	return toMojoError(err), value, nil
 }
 
@@ -801,7 +845,14 @@
 	if err != nil {
 		return toMojoError(err), nil
 	}
-	err = stub.Put(ctx, call, NoSchema, value)
+	// TODO(aghassemi): VOM-encode the raw []byte value until we have support
+	// for value types other than []byte.
+	// https://github.com/vanadium/issues/issues/766
+	vomBytes, err := vom.Encode(value)
+	if err != nil {
+		return toMojoError(err), nil
+	}
+	err = stub.Put(ctx, call, NoSchema, vomBytes)
 	return toMojoError(err), nil
 }
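
The mojo bridge now speaks VOM at the syncbase boundary: RowPut wraps the raw
Mojo bytes with vom.Encode before calling the stub, while RowGet, the scan
stream and the watch stream vom.Decode the stored bytes back into a []byte for
the Mojo client. A minimal round-trip sketch of that convention (names are
illustrative; only vom.Encode/vom.Decode from v.io/v23/vom are assumed):

    package example

    import "v.io/v23/vom"

    // roundTrip mirrors what RowPut and RowGet now do on either side of the
    // stub: the value crosses syncbase as the VOM encoding of a []byte.
    func roundTrip(raw []byte) ([]byte, error) {
        vomBytes, err := vom.Encode(raw) // RowPut side
        if err != nil {
            return nil, err
        }
        var out []byte
        if err := vom.Decode(vomBytes, &out); err != nil { // RowGet side
            return nil, err
        }
        return out, nil
    }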