blob: 15ce7168845a9075eb4a731748f25a4fcf373509 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package vango
import (
"bytes"
"fmt"
"io"
"time"
"v.io/v23"
"v.io/v23/context"
"v.io/v23/discovery"
"v.io/v23/naming"
"v.io/v23/options"
"v.io/v23/rpc"
"v.io/v23/security"
libdiscovery "v.io/x/ref/lib/discovery"
"v.io/x/ref/lib/stats"
)
const (
rpcTimeout = 10 * time.Second
tcpServerName = "tmp/vango/tcp"
btServerName = "tmp/vango/bt"
interfaceName = "v.io/x/jni/impl/google/services/vango.EchoServer"
vangoStat = "vango"
)
var (
bleServerName = naming.Endpoint{Protocol: "ble"}.Name()
// vangoFuncs is a map containing go functions keys by unique strings
// intended to be run by java/android applications using Vango.run(key).
// Users must add function entries to this map and rebuild lib/android-lib in
// the vanadium java repository.
vangoFuncs = map[string]func(*context.T, io.Writer) error{
"tcp-client": tcpClientFunc,
"tcp-server": tcpServerFunc,
"bt-client": btClientFunc,
"bt-server": btServerFunc,
"ble-client": bleClientFunc,
"ble-server": bleServerFunc,
"all": AllFunc,
"btdiscovery": btAndDiscoveryFunc,
}
)
func tcpServerFunc(ctx *context.T, _ io.Writer) error {
ctx = v23.WithListenSpec(ctx, rpc.ListenSpec{Proxy: "proxy"})
return runServer(ctx, tcpServerName)
}
func tcpClientFunc(ctx *context.T, _ io.Writer) error {
return runClient(ctx, tcpServerName)
}
func btServerFunc(ctx *context.T, _ io.Writer) error {
ctx = v23.WithListenSpec(ctx, rpc.ListenSpec{Addrs: rpc.ListenAddrs{{Protocol: "bt", Address: "/0"}}})
return runServer(ctx, btServerName)
}
func btClientFunc(ctx *context.T, _ io.Writer) error {
return runClient(ctx, btServerName)
}
func bleServerFunc(ctx *context.T, _ io.Writer) error {
ctx = v23.WithListenSpec(ctx, rpc.ListenSpec{Addrs: rpc.ListenAddrs{{Protocol: "ble", Address: "na"}}})
return runServer(ctx, "")
}
func bleClientFunc(ctx *context.T, _ io.Writer) error {
return runClient(ctx, bleServerName)
}
func btAndDiscoveryFunc(ctx *context.T, w io.Writer) error {
bothf := func(ctx *context.T, w io.Writer, format string, args ...interface{}) {
fmt.Fprintf(w, format, args...)
ctx.Infof(format, args...)
}
defer bothf(ctx, w, "finishing!")
dis, err := v23.NewDiscovery(ctx)
if err != nil {
bothf(ctx, w, "Can't create discovery %v", err)
return err
}
ctx = v23.WithListenSpec(ctx, rpc.ListenSpec{Addrs: rpc.ListenAddrs{{Protocol: "bt", Address: "/0"}}})
_, server, err := v23.WithNewServer(ctx, "", &echoServer{}, security.AllowEveryone())
if err != nil {
bothf(ctx, w, "Can't create server %v", err)
return err
}
ctx.Infof("Server listening on %v", server.Status().Endpoints)
ctx.Infof("Server listen errors: %v", server.Status().ListenErrors)
interfaces := []string{
"v.io/x/jni/impl/google/services/vango/Echo",
"v.io/x/jni/impl/google/services/vango/Echo2",
"v.io/x/jni/impl/google/services/vango/Echo3",
"v.io/x/jni/impl/google/services/vango/Echo4",
}
type adstate struct {
ad *discovery.Advertisement
stop func()
}
ads := []adstate{}
for _, name := range interfaces {
ad := &discovery.Advertisement{
InterfaceName: name,
Attributes: discovery.Attributes{
"one": "A value of some kind",
"two": "Yet another value",
"three": "More and more",
"four": "This is insane",
},
}
nctx, ncancel := context.WithCancel(ctx)
ch, err := libdiscovery.AdvertiseServer(nctx, dis, server, "", ad, nil)
if err != nil {
bothf(nctx, w, "Can't advertise server %v", err)
return err
}
stop := func() {
ncancel()
<-ch
}
ads = append(ads, adstate{ad, stop})
}
type updateState struct {
ch <-chan discovery.Update
stop func()
}
var updates []updateState
for _, name := range interfaces {
nctx, ncancel := context.WithCancel(ctx)
u, err := dis.Scan(nctx, `v.InterfaceName="`+name+`"`)
if err != nil {
bothf(nctx, w, "Can't scan %v", err)
return err
}
stop := func() {
ncancel()
}
updates = append(updates, updateState{u, stop})
}
for _, u := range updates[1:] {
go func(up updateState) {
for _ = range up.ch {
}
}(u)
}
makeopt := func(ad discovery.Advertisement) options.Preresolved {
me := &naming.MountEntry{
IsLeaf: true,
}
for _, a := range ad.Addresses {
addr, _ := naming.SplitAddressName(a)
me.Servers = append(me.Servers, naming.MountedServer{
Server: addr,
})
}
return options.Preresolved{Resolution: me}
}
alive := map[discovery.AdId]options.Preresolved{}
ticker := time.NewTicker(time.Second)
for {
select {
case <-ticker.C:
if len(alive) == 0 {
bothf(ctx, w, "No live connections to dial.")
}
for _, opt := range alive {
dialtime := options.ConnectionTimeout(5 * time.Second)
channeltime := options.ChannelTimeout(2 * time.Second)
data := make([]byte, 1024)
summary, err := runTimedCall(ctx, "A timed call.", string(data), opt, dialtime, channeltime)
if err != nil {
bothf(ctx, w, "failed call %s, %v, %v", summary, err, opt.Resolution.Servers)
} else {
bothf(ctx, w, "succeeded call: %s, %v", summary, opt.Resolution.Servers)
}
}
case u := <-updates[0].ch:
if u.IsLost() {
bothf(ctx, w, "lost %v", u.Addresses())
delete(alive, u.Id())
} else {
bothf(ctx, w, "found %v", u.Addresses())
alive[u.Id()] = makeopt(u.Advertisement())
}
}
}
}
// AllFunc runs a server, advertises it, scans for other servers and makes an
// Echo RPC to every advertised remote server.
func AllFunc(ctx *context.T, output io.Writer) error {
ls := rpc.ListenSpec{Proxy: "proxy"}
addRegisteredProto(&ls, "tcp", ":0")
addRegisteredProto(&ls, "bt", "/0")
fmt.Fprintf(output, "Listening on: %+v (and proxy)\n", ls.Addrs)
ctx, server, err := v23.WithNewServer(
v23.WithListenSpec(ctx, ls),
mountName(ctx, "all"),
&echoServer{},
security.AllowEveryone())
if err != nil {
return err
}
ad := &discovery.Advertisement{
InterfaceName: interfaceName,
Attributes: discovery.Attributes{
"Hello": "There",
},
}
d, err := v23.NewDiscovery(ctx)
if err != nil {
return err
}
stoppedAd, err := libdiscovery.AdvertiseServer(ctx, d, server, "", ad, nil)
if err != nil {
return err
}
updates, err := d.Scan(ctx, "v.InterfaceName=\""+interfaceName+"\"")
if err != nil {
return err
}
var (
status = server.Status()
counter = 0
peerByAdId = make(map[discovery.AdId]*peer)
lastCall = make(map[discovery.AdId]time.Time)
callResults = make(chan string)
activeCalls = 0
quit = false
myaddrs = serverAddrs(status)
ticker = time.NewTicker(time.Second)
call = func(p *peer) {
counter++
activeCalls++
lastCall[p.adId] = time.Now()
go func(msg string) {
summary, err := p.call(ctx, msg)
if err != nil {
ctx.Infof("Failed to call [%v]: %v", p.description, err)
callResults <- ""
return
}
callResults <- summary
}(fmt.Sprintf("Hello #%d", counter))
}
statRequest = make(chan chan<- string)
)
defer ticker.Stop()
stats.NewStringFunc(vangoStat, func() string {
r := make(chan string)
statRequest <- r
return <-r
})
defer stats.Delete(vangoStat)
fmt.Fprintln(output, "My AdID:", ad.Id)
fmt.Fprintln(output, "My addrs:", myaddrs)
ctx.Infof("SERVER STATUS: %+v", status)
for !quit {
select {
case <-ctx.Done():
quit = true
case <-status.Dirty:
status = server.Status()
newaddrs := serverAddrs(status)
changed := len(newaddrs) != len(myaddrs)
if !changed {
for i := range newaddrs {
if newaddrs[i] != myaddrs[i] {
changed = true
break
}
}
}
if changed {
myaddrs = newaddrs
fmt.Fprintln(output, "My addrs:", myaddrs)
}
ctx.Infof("SERVER STATUS: %+v", status)
case u, scanning := <-updates:
if !scanning {
fmt.Fprintln(output, "SCANNING STOPPED")
quit = true
break
}
if u.IsLost() {
if p, ok := peerByAdId[u.Id()]; ok {
fmt.Fprintln(output, "LOST:", p.description)
}
delete(peerByAdId, u.Id())
delete(lastCall, u.Id())
break
}
p, err := newPeer(ctx, u)
if err != nil {
ctx.Info(err)
break
}
peerByAdId[p.adId] = p
fmt.Fprintln(output, "FOUND:", p.description)
call(p)
case r := <-callResults:
activeCalls--
if len(r) > 0 {
fmt.Fprintln(output, r)
}
case <-stoppedAd:
fmt.Fprintln(output, "STOPPED ADVERTISING")
stoppedAd = nil
case <-ticker.C:
// Call all peers that haven't been called in a while
now := time.Now()
for id, t := range lastCall {
if now.Sub(t) > rpcTimeout {
call(peerByAdId[id])
}
}
case s := <-statRequest:
idx := 1
ret := new(bytes.Buffer)
fmt.Fprintln(ret, "ACTIVE CALLS:", activeCalls)
fmt.Fprintln(ret, "PEERS")
for id, p := range peerByAdId {
fmt.Fprintf(ret, "%2d) %s -- %v\n", idx, p.description, lastCall[id])
}
s <- ret.String()
}
}
fmt.Println(output, "EXITING: Cleaning up")
for activeCalls > 0 {
<-callResults
activeCalls--
}
// Exhaust the scanned updates queue.
// (The channel will be closed as a by-product of the context being Done).
for range updates {
}
fmt.Fprintln(output, "EXITING: Done")
return nil
}