Merge branch 'master' into advertiser
diff --git a/Makefile b/Makefile
index a53b715..ed28a23 100644
--- a/Makefile
+++ b/Makefile
@@ -20,6 +20,9 @@
BUILD_DIR := $(PWD)/gen/mojo/linux_amd64
endif
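+# Point GOPATH at the Go bindings generated for the discovery mojom.
+# NOTE(review): this assignment replaces any inherited GOPATH rather than appending to it; confirm that is intended.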
+export GOPATH := $(PWD)/../discovery/gen/go
+
# If this is not the first mojo shell, then you must reuse the devservers
# to avoid a "port in use" error.
ifneq ($(shell fuser 31841/tcp),)
@@ -120,6 +123,8 @@
# See https://github.com/domokit/mojo/issues/386
rm -f lib/gen/mojom/$(notdir $@)
+# Dependency-only rule (no recipe): v23proxy.mojom depends on mojom_types.mojom, with mojo-env-check as an order-only prerequisite.
+mojom/mojom/v23proxy.mojom: mojom/mojo/public/interfaces/bindings/mojom_types.mojom | mojo-env-check
+
gen/go/src/mojom/v23proxy/v23proxy.mojom.go: mojom/mojom/v23proxy.mojom | mojo-env-check
$(call MOJOM_GEN,$<,mojom,gen,go)
gofmt -w $@
diff --git a/go/src/v.io/x/mojo/proxy/proxy.go b/go/src/v.io/x/mojo/proxy/proxy.go
index f9df689..bcf27f1 100644
--- a/go/src/v.io/x/mojo/proxy/proxy.go
+++ b/go/src/v.io/x/mojo/proxy/proxy.go
@@ -15,6 +15,7 @@
"v.io/v23/vdl"
"mojom/v23proxy"
+ "mojom/vanadium/discovery"
"mojo/public/go/application"
"mojo/public/go/bindings"
@@ -219,11 +220,92 @@
}, security.AllowEveryone(), nil
}
+// advertiser implements the v23proxy Advertiser service by forwarding calls
+// to the discovery application's Advertiser and remembering which discovery
+// handle belongs to each advertised instance.
+type advertiser struct {
+	// advProxy is the connection to the discovery application's Advertiser.
+	advProxy *discovery.Advertiser_Proxy
+	// endpoints holds the endpoint names used to build advertised addresses.
+	endpoints []string
+
+	// mu guards advertisementMap.
+	mu sync.Mutex
+	// advertisementMap maps an instance id to the handle returned by the
+	// discovery Advertiser, so that the advertisement can be stopped later.
+	advertisementMap map[string]uint32
+}
+
+func (d *advertiser) Advertise(s Service, visibility []string) (*string, *string, error) {
+ mService := discovery.Service{
+ InstanceId: s.InstanceId,
+ InstanceName: s.InstanceName,
+ InterfaceName: s.InterfaceName,
+ Attrs: s.Attrs,
+ }
+ mService.Addrs = make([]string, len(d.endpoints))
+ for i, ep := range d.endpoints {
+ mService.Addrs[i] = ep + "//" + s.url
+ }
+ handle, instance, mErr, err := d.advProxy.Advertise(mService, visibility)
+ if err != nil {
+ return nil, nil, err
+ }
+ var errMessg *string
+ if mErr == nil {
+ d.mu.Lock()
+ d.advertisementMap[instance] = handle
+ d.mu.Unlock()
+ } else {
+
+ errMessg = &mErr.Msg
+ }
+ return &instance, errMessg, nil
+}
+
+// StopAdvertising stops the advertisement recorded for instance and forgets
+// its handle. Instance ids that were never recorded are ignored. It always
+// returns nil.
+func (d *advertiser) StopAdvertising(instance string) error {
+	d.mu.Lock()
+	id, found := d.advertisementMap[instance]
+	delete(d.advertisementMap, instance)
+	d.mu.Unlock()
+	if found {
+		// The proxy call is made outside the lock.
+		// NOTE(review): any result of Stop is discarded, as before; confirm its signature and propagate its error if it has one.
+		d.advProxy.Stop(id)
+	}
+	return nil
+}
+
+// discoveryFactory creates an advertiser for every incoming Advertiser
+// request and keeps the resulting stubs so that they can be closed later.
+type discoveryFactory struct {
+	// appContext is used to connect to the discovery application.
+	appContext application.Context
+	// stubs holds every advertiser stub created so far.
+	stubs []*bindings.Stub
+	// eps holds the endpoint names handed to each advertiser.
+	eps []string
+}
+
+// Create handles a new Advertiser request. It connects to the discovery
+// application, wraps that connection in an advertiser, and serves the
+// request pipe on a new goroutine until the pipe fails or is closed.
+func (df *discoveryFactory) Create(request v23proxy.Advertiser_Request) {
+	req, ptr := discovery.CreateMessagePipeForAdvertiser()
+	df.appContext.ConnectToApplication("https://mojo.v.io/discovery.mojo").ConnectToService(&req)
+	d := &advertiser{
+		endpoints:        df.eps,
+		advProxy:         discovery.NewAdvertiserProxy(ptr, bindings.GetAsyncWaiter()),
+		advertisementMap: map[string]uint32{},
+	}
+	advertiserStub := v23proxy.NewAdvertiserStub(request, d, bindings.GetAsyncWaiter())
+	df.stubs = append(df.stubs, advertiserStub)
+
+	go func() {
+		// Keep serving so that calls after the first one are handled too.
+		for {
+			err := advertiserStub.ServeRequest()
+			if err == nil {
+				continue
+			}
+			// A closed connection is the normal way for this loop to end;
+			// any other error is reported before giving up.
+			if connectionError, ok := err.(*bindings.ConnectionError); !ok || !connectionError.Closed() {
+				fmt.Println("advertiser stub error:", err)
+			}
+			return
+		}
+	}()
+}
+
+// quit closes every advertiser stub this factory has created.
+func (df *discoveryFactory) quit() {
+	for i := range df.stubs {
+		df.stubs[i].Close()
+	}
+}
+
type delegate struct {
ctx *context.T
stubs []*bindings.Stub
shutdown v23.Shutdown
v23Server rpc.Server
+ df *discoveryFactory // factory for Advertiser requests; passed to Advertiser_ServiceFactory in AcceptConnection
}
func (delegate *delegate) Initialize(context application.Context) {
@@ -245,7 +327,15 @@
ctx.Fatal("Error serving service: ", err)
}
delegate.v23Server = s
- fmt.Println("Listening at:", s.Status().Endpoints[0].Name())
+ status := s.Status()
+ fmt.Println("Listening at:", status.Endpoints[0].Name())
+ delegate.df = &discoveryFactory{
+ appContext: context,
+ }
+ delegate.df.eps = make([]string, len(status.Endpoints))
+ for i, e := range status.Endpoints {
+ delegate.df.eps[i] = e.Name()
+ }
}
func (delegate *delegate) Create(request v23proxy.V23_Request) {
headerReceiver := &v23HeaderReceiver{delegate: delegate}
@@ -266,7 +356,7 @@
func (delegate *delegate) AcceptConnection(connection *application.Connection) {
delegate.ctx.Infof("delegate.AcceptConnection...")
- connection.ProvideServices(&v23proxy.V23_ServiceFactory{delegate})
+ connection.ProvideServices(&v23proxy.V23_ServiceFactory{delegate}, &v23proxy.Advertiser_ServiceFactory{delegate.df}) // provide the Advertiser service alongside V23
}
func (delegate *delegate) Quit() {
diff --git a/mojom/mojom/v23proxy.mojom b/mojom/mojom/v23proxy.mojom
index 53272b3..91ff8e0 100644
--- a/mojom/mojom/v23proxy.mojom
+++ b/mojom/mojom/v23proxy.mojom
@@ -30,3 +30,28 @@
// Endpoints gets the endpoints that the v23proxy serves at.
Endpoints() => (array<string> endpoints);
};
+
+// Advertiser lets a mojo application advertise services through the
+// v23proxy and stop those advertisements later.
+interface Advertiser {
+ // Advertises the existence of a service.
+ // On success InstanceId identifies the advertisement and Error is null;
+ // if the advertisement is rejected, Error holds the error message.
+ Advertise(Service service, array<string> visibility) => (string? InstanceId, string? Error);
+
+ // Stops advertising the service instance identified by InstanceId,
+ // i.e. the id returned by a previous call to Advertise.
+ StopAdvertising(string InstanceId);
+};
+
+// Service describes a service instance to be advertised.
+// Mostly copied from v.io/v23/discovery/types.vdl
+struct Service {
+ // The universal unique identifier of a service instance.
+ // If this is not specified, a random 128 bit (16 byte) UUID will be used.
+ string? InstanceId;
+ // Optional name of the service instance.
+ string? InstanceName;
+ // The interface that the service implements.
+ // E.g., 'v.io/v23/services/vtrace.Store'.
+ string InterfaceName;
+ // The service attributes.
+ // E.g., {'resolution': '1024x768'}.
+ map<string, string>? Attrs;
+ // The url of the advertising mojo service. The proxy appends it to each
+ // of its own endpoints to form the addresses that are advertised.
+ string url;
+};