| package internal |
| |
| import ( |
| "sync" |
| |
| "v.io/v23/context" |
| "v.io/v23/discovery" |
| "v.io/v23/verror" |
| idiscovery "v.io/x/ref/lib/discovery" |
| |
| "mojo/public/go/bindings" |
| mojom "mojom/vanadium/discovery" |
| ) |
| |
| // DiscoveryService implements the mojom interface mojom/vanadium/discovery.DiscoveryService. It |
| // is basically a thin wrapper around the Vanadium Discovery API. |
| type DiscoveryService struct { |
| ctx *context.T |
| s discovery.T |
| trigger *idiscovery.Trigger |
| |
| // mu protects pending* and next* |
| mu sync.Mutex |
| |
| // The id to assign the next advertisement. |
| nextAdv int32 |
| // A map of advertisement ids to the cancellation channel. When StopAdvertisement is |
| // called, the channel will be closed. |
| pendingAdvs map[int32]chan struct{} |
| // The id to assign to the next scan. |
| nextScan int32 |
| // A map of scan id to the cancellataion channel. When StopScan is called, the channel |
| // for that id will be closed. |
| pendingScans map[int32]chan struct{} |
| } |
| |
| func convertToErrorStruct(err error) *mojom.Error { |
| outErr := &mojom.Error{ |
| Id: "v.io/verror/Unknown", |
| Msg: err.Error(), |
| } |
| if e, ok := err.(verror.E); ok { |
| outErr.Id = string(e.ID) |
| outErr.Action = int32(e.Action) |
| } |
| return outErr |
| } |
| |
| // NewDiscoveryService returns a new DiscoveryService bound to the context and the Vanadium |
| // Discovery implementation passed in. |
| func NewDiscoveryService(ctx *context.T, vDiscovery discovery.T) *DiscoveryService { |
| return &DiscoveryService{ |
| ctx: ctx, |
| s: vDiscovery, |
| trigger: idiscovery.NewTrigger(), |
| pendingAdvs: map[int32]chan struct{}{}, |
| pendingScans: map[int32]chan struct{}{}, |
| } |
| } |
| |
| // Advertise advertises the mojom service passed only to the giveen blessing patterns. Returns the |
| // handle to this Advertise call. |
| func (d *DiscoveryService) Advertise(s mojom.Service, patterns []string) (int32, *mojom.Error, error) { |
| vService := discovery.Service{ |
| InstanceUuid: s.InstanceUuid, |
| InterfaceName: s.InterfaceName, |
| Attrs: discovery.Attributes(s.Attrs), |
| Addrs: s.Addrs, |
| } |
| |
| ctx, c := context.WithCancel(d.ctx) |
| |
| err := d.s.Advertise(ctx, vService, nil) |
| if err != nil { |
| return 0, convertToErrorStruct(err), nil |
| } |
| ch := make(chan struct{}) |
| d.mu.Lock() |
| id := d.nextAdv |
| d.pendingAdvs[id] = ch |
| d.nextAdv++ |
| d.mu.Unlock() |
| d.trigger.Add(c, ch) |
| return id, nil, nil |
| } |
| |
| // StopAdvertising stops advertising for the given advertising id. |
| func (d *DiscoveryService) StopAdvertising(handle int32) error { |
| d.mu.Lock() |
| ch := d.pendingAdvs[handle] |
| delete(d.pendingAdvs, handle) |
| d.mu.Unlock() |
| if ch != nil { |
| close(ch) |
| } |
| return nil |
| } |
| |
| func vServiceTomService(s discovery.Service) mojom.Service { |
| return mojom.Service{ |
| InstanceUuid: s.InstanceUuid, |
| InterfaceName: s.InterfaceName, |
| Attrs: s.Attrs, |
| Addrs: s.Addrs, |
| } |
| } |
| |
| // Scan scans for all services that match the query string passed in and calls scanHandler with updates. |
| // Returns the handle to this Scan. |
| func (d *DiscoveryService) Scan(query string, scanHandler mojom.ScanHandler_Pointer) (int32, *mojom.Error, error) { |
| ctx, c := context.WithCancel(d.ctx) |
| proxy := mojom.NewScanHandlerProxy(scanHandler, bindings.GetAsyncWaiter()) |
| scanCh, err := d.s.Scan(ctx, query) |
| if err != nil { |
| return 0, convertToErrorStruct(err), nil |
| } |
| ch := make(chan struct{}) |
| d.mu.Lock() |
| id := d.nextScan |
| d.pendingScans[id] = ch |
| d.nextScan++ |
| d.mu.Unlock() |
| |
| d.trigger.Add(c, ch) |
| go func() { |
| for v := range scanCh { |
| switch value := v.(type) { |
| case discovery.UpdateFound: |
| proxy.Found(vServiceTomService(value.Value.Service)) |
| case discovery.UpdateLost: |
| proxy.Lost(vServiceTomService(value.Value.Service)) |
| default: |
| } |
| } |
| }() |
| return id, nil, nil |
| } |
| |
| // SopScan Stops the scan. |
| func (d *DiscoveryService) StopScan(handle int32) error { |
| d.mu.Lock() |
| ch := d.pendingScans[handle] |
| delete(d.pendingScans, handle) |
| d.mu.Unlock() |
| if ch != nil { |
| close(ch) |
| } |
| return nil |
| } |
| |
| // Stop Stops all scans and advertisements. |
| func (d *DiscoveryService) Stop() { |
| d.mu.Lock() |
| for _, ch := range d.pendingScans { |
| close(ch) |
| } |
| |
| for _, ch := range d.pendingAdvs { |
| close(ch) |
| } |
| d.mu.Unlock() |
| } |