blob: d7863f9ef918925ace12f666cef1b5966a39399f [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
4
Benjamin Prosnitz809246c2014-10-29 14:24:11 -07005package channel
6
7import (
8 "fmt"
9 "sync"
Todd Wang5ab03662015-02-19 21:03:01 -080010
Jiri Simsa1f1302c2015-02-23 16:18:34 -080011 "v.io/v23/vdl"
Benjamin Prosnitz809246c2014-10-29 14:24:11 -070012)
13
Todd Wang5ab03662015-02-19 21:03:01 -080014type RequestHandler func(*vdl.Value) (*vdl.Value, error)
Benjamin Prosnitz809246c2014-10-29 14:24:11 -070015
16type MessageSender func(Message)
17
18type Channel struct {
19 messageHandler MessageSender
20
Benjamin Prosnitz86d52282014-12-19 15:48:38 -080021 lastSeq uint32
Benjamin Prosnitz809246c2014-10-29 14:24:11 -070022 handlers map[string]RequestHandler
Benjamin Prosnitz86d52282014-12-19 15:48:38 -080023 pendingResponses map[uint32]chan Response
Benjamin Prosnitz809246c2014-10-29 14:24:11 -070024 lock sync.Mutex
25}
26
27func NewChannel(messageHandler MessageSender) *Channel {
28 return &Channel{
29 messageHandler: messageHandler,
30 handlers: map[string]RequestHandler{},
Benjamin Prosnitz86d52282014-12-19 15:48:38 -080031 pendingResponses: map[uint32]chan Response{},
Benjamin Prosnitz809246c2014-10-29 14:24:11 -070032 }
33}
34
Todd Wang5ab03662015-02-19 21:03:01 -080035func (c *Channel) PerformRpc(typ string, body *vdl.Value) (*vdl.Value, error) {
Benjamin Prosnitz809246c2014-10-29 14:24:11 -070036 c.lock.Lock()
37 c.lastSeq++
38 lastSeq := c.lastSeq
Todd Wangbb6afc02014-11-21 11:24:20 -080039 m := MessageRequest{Request{
Benjamin Prosnitz809246c2014-10-29 14:24:11 -070040 Type: typ,
41 Seq: lastSeq,
42 Body: body,
Todd Wangbb6afc02014-11-21 11:24:20 -080043 }}
Benjamin Prosnitz809246c2014-10-29 14:24:11 -070044 pending := make(chan Response, 1)
45 c.pendingResponses[lastSeq] = pending
46 c.lock.Unlock()
47
Benjamin Prosnitz809246c2014-10-29 14:24:11 -070048 go c.messageHandler(m)
49 response := <-pending
50
51 c.lock.Lock()
52 delete(c.pendingResponses, lastSeq)
53 c.lock.Unlock()
54
55 if response.Err == "" {
56 return response.Body, nil
57 }
58 return response.Body, fmt.Errorf(response.Err)
59}
60
61func (c *Channel) RegisterRequestHandler(typ string, handler RequestHandler) {
62 c.lock.Lock()
63 c.handlers[typ] = handler
64 c.lock.Unlock()
65}
66
67func (c *Channel) handleRequest(req Request) {
68 // Call handler.
69 c.lock.Lock()
70 handler, ok := c.handlers[req.Type]
71 c.lock.Unlock()
72 if !ok {
73 panic(fmt.Errorf("Unknown handler: %s", req.Type))
74 }
75
76 result, err := handler(req.Body)
77 errMsg := ""
78 if err != nil {
79 errMsg = err.Error()
80 }
Todd Wangbb6afc02014-11-21 11:24:20 -080081 m := MessageResponse{Response{
Benjamin Prosnitz809246c2014-10-29 14:24:11 -070082 ReqSeq: req.Seq,
83 Err: errMsg,
84 Body: result,
Todd Wangbb6afc02014-11-21 11:24:20 -080085 }}
Benjamin Prosnitz809246c2014-10-29 14:24:11 -070086 c.messageHandler(m)
87}
88
89func (c *Channel) handleResponse(resp Response) {
90 seq := resp.ReqSeq
91 c.lock.Lock()
92 pendingResponse, ok := c.pendingResponses[seq]
93 c.lock.Unlock()
94 if !ok {
95 panic("Received invalid response code")
96 }
97
98 pendingResponse <- resp
99}
100
Todd Wangbb6afc02014-11-21 11:24:20 -0800101func (c *Channel) HandleMessage(m Message) {
102 switch r := m.(type) {
Nicolas LaCassef74ec6a2014-12-22 10:22:52 -0800103 // Run the handlers in goroutines so we don't block the main thread.
104 // This is particularly important for the request handler, since it can
105 // potentially do a lot of work.
Todd Wangbb6afc02014-11-21 11:24:20 -0800106 case MessageRequest:
Nicolas LaCassef74ec6a2014-12-22 10:22:52 -0800107 go c.handleRequest(r.Value)
Todd Wangbb6afc02014-11-21 11:24:20 -0800108 case MessageResponse:
Nicolas LaCassef74ec6a2014-12-22 10:22:52 -0800109 go c.handleResponse(r.Value)
Benjamin Prosnitz809246c2014-10-29 14:24:11 -0700110 default:
Todd Wangbb6afc02014-11-21 11:24:20 -0800111 panic(fmt.Sprintf("Unknown message type: %T", m))
Benjamin Prosnitz809246c2014-10-29 14:24:11 -0700112 }
113}