rpc/dhcp: fix a bug in publishing network changes

  The network setting publisher in roaming profile sends all available
  network addresses not a newly added addresses when there is a network
  change. But server's dhcp loop assumes it received only newly added
  addresses and so append all of them to the listen states causing
  duplication of existing addresses.

  This CL fixes this issue.

Change-Id: If95543d2d9d05df0dba23eaa53ae80341cef41d6
diff --git a/runtime/internal/rpc/roaming_test.go b/runtime/internal/rpc/roaming_test.go
index b2df6e1..d192ec9 100644
--- a/runtime/internal/rpc/roaming_test.go
+++ b/runtime/internal/rpc/roaming_test.go
@@ -89,15 +89,10 @@
 		t.Fatalf("got %d, want %d", got, want)
 	}
 
-	n1 := netstate.NewNetAddr("ip", "1.1.1.1")
-	n2 := netstate.NewNetAddr("ip", "2.2.2.2")
-
 	watcher := make(chan rpc.NetworkChange, 10)
 	server.WatchNetwork(watcher)
 	defer close(watcher)
 
-	roaming <- NewNewAddrsSetting([]net.Addr{n1, n2})
-
 	waitForChange := func() *rpc.NetworkChange {
 		ctx.Infof("Waiting on %p", watcher)
 		select {
@@ -109,8 +104,17 @@
 		return nil
 	}
 
-	// We expect 4 changes, one for each IP per usable listen spec addr.
+	n1 := netstate.NewNetAddr("ip", "1.1.1.1")
+	n2 := netstate.NewNetAddr("ip", "2.2.2.2")
+	n3 := netstate.NewNetAddr("ip", "3.3.3.3")
+
+	roaming <- NewNewAddrsSetting([]net.Addr{n1, n2})
+
+	// We expect 2 added addrs and 4 changes, one for each IP per usable listen spec addr.
 	change := waitForChange()
+	if got, want := len(change.AddedAddrs), 2; got != want {
+		t.Fatalf("got %d, want %d", got, want)
+	}
 	if got, want := len(change.Changed), 4; got != want {
 		t.Fatalf("got %d, want %d", got, want)
 	}
@@ -135,10 +139,16 @@
 		t.Fatalf("got %d, want %d", got, want)
 	}
 
-	roaming <- NewRmAddrsSetting([]net.Addr{n1})
+	// Mimic that n2 has been changed to n3. The network monitor will publish
+	// two AddrsSettings - RmAddrsSetting(n2) and NewNewAddrsSetting(n1, n3).
 
-	// We expect 2 changes, one for each usable listen spec addr.
+	roaming <- NewRmAddrsSetting([]net.Addr{n2})
+
+	// We expect 1 removed addr and 2 changes, one for each usable listen spec addr.
 	change = waitForChange()
+	if got, want := len(change.RemovedAddrs), 1; got != want {
+		t.Fatalf("got %d, want %d", got, want)
+	}
 	if got, want := len(change.Changed), 2; got != want {
 		t.Fatalf("got %d, want %d", got, want)
 	}
@@ -146,7 +156,7 @@
 	nepsR := make([]naming.Endpoint, len(eps))
 	copy(nepsR, eps)
 	for _, p := range getUniqPorts(eps) {
-		nep2 := updateHost(eps[0], net.JoinHostPort("2.2.2.2", p))
+		nep2 := updateHost(eps[0], net.JoinHostPort("1.1.1.1", p))
 		nepsR = append(nepsR, nep2)
 	}
 
@@ -155,12 +165,43 @@
 		t.Fatalf("got %v, want %v [%d, %d]", got, want, len(got), len(want))
 	}
 
+	roaming <- NewNewAddrsSetting([]net.Addr{n1, n3})
+
+	// We expect 1 added addr and 2 changes, one for the new IP per usable listen spec addr.
+	change = waitForChange()
+	if got, want := len(change.AddedAddrs), 1; got != want {
+		t.Fatalf("got %d, want %d", got, want)
+	}
+	if got, want := len(change.Changed), 2; got != want {
+		t.Fatalf("got %d, want %d", got, want)
+	}
+
+	nepsC := make([]naming.Endpoint, len(eps))
+	copy(nepsC, eps)
+	for _, p := range getUniqPorts(eps) {
+		nep1 := updateHost(eps[0], net.JoinHostPort("1.1.1.1", p))
+		nep2 := updateHost(eps[0], net.JoinHostPort("3.3.3.3", p))
+		nepsC = append(nepsC, nep1, nep2)
+	}
+
+	status = server.Status()
+	if got, want := status.Endpoints, nepsC; !cmpEndpoints(got, want) {
+		t.Fatalf("got %v, want %v [%d, %d]", got, want, len(got), len(want))
+	}
+
+	if got, want := len(status.Mounts), len(nepsC)*2; got != want {
+		t.Fatalf("got %d, want %d", got, want)
+	}
+	if got, want := len(status.Mounts.Servers()), len(nepsC); got != want {
+		t.Fatalf("got %d, want %d", got, want)
+	}
+
 	// Remove all addresses to mimic losing all connectivity.
-	roaming <- NewRmAddrsSetting(getIPAddrs(nepsR))
+	roaming <- NewRmAddrsSetting(getIPAddrs(nepsC))
 
 	// We expect changes for all of the current endpoints
 	change = waitForChange()
-	if got, want := len(change.Changed), len(nepsR); got != want {
+	if got, want := len(change.Changed), len(nepsC); got != want {
 		t.Fatalf("got %d, want %d", got, want)
 	}
 
@@ -175,7 +216,6 @@
 	if got, want := len(change.Changed), 2; got != want {
 		t.Fatalf("got %d, want %d", got, want)
 	}
-
 }
 
 func TestWatcherDeadlock(t *testing.T) {
diff --git a/runtime/internal/rpc/server.go b/runtime/internal/rpc/server.go
index ed23e5c..b58a3cd 100644
--- a/runtime/internal/rpc/server.go
+++ b/runtime/internal/rpc/server.go
@@ -731,11 +731,9 @@
 			}
 			switch setting.Name() {
 			case NewAddrsSetting:
-				change.Changed = s.addAddresses(v)
-				change.AddedAddrs = v
+				change.AddedAddrs, change.Changed = s.updateAddresses(v)
 			case RmAddrsSetting:
-				change.Changed, change.Error = s.removeAddresses(v)
-				change.RemovedAddrs = v
+				change.RemovedAddrs, change.Changed = s.removeAddresses(v)
 			}
 			s.ctx.VI(2).Infof("rpc: dhcp: change %v", change)
 			for ch, _ := range s.dhcpState.watchers {
@@ -760,66 +758,86 @@
 
 }
 
-// Remove all endpoints that have the same host address as the supplied
-// address parameter.
-func (s *server) removeAddresses(addrs []net.Addr) ([]naming.Endpoint, error) {
-	var removed []naming.Endpoint
-	for _, address := range addrs {
-		host := getHost(address)
-		for ls, _ := range s.listenState {
-			if ls != nil && ls.roaming && len(ls.ieps) > 0 {
-				remaining := make([]*inaming.Endpoint, 0, len(ls.ieps))
-				for _, iep := range ls.ieps {
-					lnHost, _, err := net.SplitHostPort(iep.Address)
-					if err != nil {
-						lnHost = iep.Address
-					}
-					if lnHost == host {
-						s.ctx.VI(2).Infof("rpc: dhcp removing: %s", iep)
-						removed = append(removed, iep)
-						s.publisher.RemoveServer(iep.String())
-						continue
-					}
-					remaining = append(remaining, iep)
-				}
-				ls.ieps = remaining
-			}
+// findEndpoint returns the index of the first endpoint in ieps with the given network address.
+func findEndpoint(ieps []*inaming.Endpoint, host string) int {
+	for i, iep := range ieps {
+		if getHost(iep.Addr()) == host {
+			return i
 		}
 	}
-	return removed, nil
+	return -1
 }
 
-// Add new endpoints for the new address. There is no way to know with
-// 100% confidence which new endpoints to publish without shutting down
-// all network connections and reinitializing everything from scratch.
-// Instead, we find all roaming listeners with at least one endpoint
-// and create a new endpoint with the same port as the existing ones
-// but with the new address supplied to us to by the dhcp code. As
-// an additional safeguard we reject the new address if it is not
-// externally accessible.
-// This places the onus on the dhcp/roaming code that sends us addresses
-// to ensure that those addresses are externally reachable.
-func (s *server) addAddresses(addrs []net.Addr) []naming.Endpoint {
-	var added []naming.Endpoint
-	for _, address := range netstate.ConvertToAddresses(addrs) {
-		if !netstate.IsAccessibleIP(address) {
-			return added
-		}
-		host := getHost(address)
+// removeAddresses removes all endpoints that have the same host address as
+// the supplied address parameter.
+func (s *server) removeAddresses(addrs []net.Addr) ([]net.Addr, []naming.Endpoint) {
+	var removedAddrs []net.Addr
+	var removedEps []naming.Endpoint
+	for _, addr := range addrs {
+		host := getHost(addr)
+		removed := false
 		for ls, _ := range s.listenState {
-			if ls != nil && ls.roaming {
+			if ls == nil || !ls.roaming {
+				continue
+			}
+			if i := findEndpoint(ls.ieps, host); i >= 0 {
+				oiep := ls.ieps[i]
+				s.ctx.VI(2).Infof("rpc: dhcp removing: %s", oiep)
+				n := len(ls.ieps) - 1
+				ls.ieps[i], ls.ieps[n] = ls.ieps[n], nil
+				ls.ieps = ls.ieps[:n]
+				s.publisher.RemoveServer(oiep.String())
+				removedEps = append(removedEps, oiep)
+				removed = true
+			}
+		}
+		if removed {
+			removedAddrs = append(removedAddrs, addr)
+		}
+	}
+	return removedAddrs, removedEps
+}
+
+// updateAddresses updates endpoints with the given addresses. There is no way to
+// know with 100% confidence which new endpoints to publish without shutting down
+// all network connections and reinitializing everything from scratch. Instead, we
+// find all roaming listeners with at least one endpoint and create a new endpoint
+// with the same port as the existing ones but with the new address supplied to us
+// to by the dhcp code. As an additional safeguard we reject the new address if it
+// is not externally accessible.
+//
+// This places the onus on the dhcp/roaming code that sends us addresses to ensure
+// that those addresses are externally reachable.
+func (s *server) updateAddresses(addrs []net.Addr) ([]net.Addr, []naming.Endpoint) {
+	var addedAddrs []net.Addr
+	var addedEPs []naming.Endpoint
+	for _, addr := range netstate.ConvertToAddresses(addrs) {
+		if !netstate.IsAccessibleIP(addr) {
+			continue
+		}
+		host := getHost(addr)
+		added := false
+		for ls, _ := range s.listenState {
+			if ls == nil || !ls.roaming {
+				continue
+			}
+			if i := findEndpoint(ls.ieps, host); i < 0 {
 				niep := ls.protoIEP
 				niep.Address = net.JoinHostPort(host, ls.port)
 				niep.IsMountTable = s.servesMountTable
 				niep.IsLeaf = s.isLeaf
-				ls.ieps = append(ls.ieps, &niep)
 				s.ctx.VI(2).Infof("rpc: dhcp adding: %s", niep)
+				ls.ieps = append(ls.ieps, &niep)
 				s.publisher.AddServer(niep.String())
-				added = append(added, &niep)
+				addedEPs = append(addedEPs, &niep)
+				added = true
 			}
 		}
+		if added {
+			addedAddrs = append(addedAddrs, addr)
+		}
 	}
-	return added
+	return addedAddrs, addedEPs
 }
 
 type leafDispatcher struct {