veyron/runtimes/google/ipc/stream/proxy: Proxy protocol changes.
(1) "Servers" will be proxied only if they explicitly ask to.
(The previous behavior was that any client connection would
result in its routing id being added to the proxy's routing
table).
(2) An explicit request<->response protocol is defined between
the server wishing to be proxied and the proxy.
These changes pave the way for upcoming changes to the proxy:
(a) Servers will be able to ask the proxy to mount the proxied endpoint
in a mounttable. This is required as the endpoint of the proxy as viewed by
the server can be different from the endpoint of the proxy as accessible
in a different network domain (the "externally accessible" endpoint).
For example, consider a proxy running on the home router.
Servers inside the home network would "export" themselves by
"Listening" on say ("veyron", "@2@tcp@192.168.1.1...)
however, their intent is to mount themselves with the external
IP address of the proxy.
The explicit request/response messages will be used to
enable this.
(b) Multiple connections for the same routing id to the proxy:
It is currently not possible for two network connections
from the same routing id to be proxied. This may be useful
if there are multiple network paths to the proxy (say
multiple network protocols). One of the reasons this has
been disallowed so far is to prevent denial of service (DoS)
attacks, wherein a malicious process registers the routing id
of a good server. However, with the changes in this commit,
it should be easy to allow a server to proxy itself over
multiple network connections as long as the identity of the
server in all the network connections is the same.
Change-Id: Ied6ab99bf26e5a7d0b6bce16d13a006a64a75b5a
diff --git a/runtimes/google/ipc/stream/doc.go b/runtimes/google/ipc/stream/doc.go
index b3401cb..0d6b8c0 100644
--- a/runtimes/google/ipc/stream/doc.go
+++ b/runtimes/google/ipc/stream/doc.go
@@ -8,7 +8,7 @@
// Package contents and dependencies are as follows:
//
// * manager provides a factory for veyron2/ipc/stream.Manager objects.
-// It depends on the vif package.
+// It depends on the vif and proxy packages.
// * vif implements a VIF type that wraps over a net.Conn
// and enables the creation of veyron2/ipc/stream.VC objects
// over the underlying network connection.
diff --git a/runtimes/google/ipc/stream/manager/listener.go b/runtimes/google/ipc/stream/manager/listener.go
index bd5f78c..33c81d7 100644
--- a/runtimes/google/ipc/stream/manager/listener.go
+++ b/runtimes/google/ipc/stream/manager/listener.go
@@ -8,6 +8,7 @@
"sync"
"time"
+ "veyron/runtimes/google/ipc/stream/proxy"
"veyron/runtimes/google/ipc/stream/vif"
"veyron/runtimes/google/lib/upcqueue"
@@ -15,6 +16,7 @@
"veyron2/naming"
"veyron2/verror"
"veyron2/vlog"
+ "veyron2/vom"
)
var errListenerIsClosed = errors.New("Listener has been Closed")
@@ -160,10 +162,9 @@
if err := vf.StartAccepting(ln.opts...); err != nil {
return nil, fmt.Errorf("already connected to proxy and accepting connections? VIF: %v, StartAccepting error: %v", vf, err)
}
- // Proxy protocol:
- // (1) Dial a VC to it (to include this processes' routing id in the proxy's routing table)
- // (2) Open a Flow and wait for it to die (which should happen only when the proxy is down)
- // For (1), need stream.VCOpt (identity etc.)
+ // Proxy protocol: See veyron/runtimes/google/ipc/stream/proxy/protocol.vdl
+ // Requires dialing a VC to the proxy, need to extract options (like the identity)
+ // from ln.opts to do so.
var dialOpts []stream.VCOpt
for _, opt := range ln.opts {
if dopt, ok := opt.(stream.VCOpt); ok {
@@ -183,6 +184,24 @@
vf.StopAccepting()
return nil, fmt.Errorf("unable to create liveness check flow to proxy: %v", err)
}
+ var request proxy.Request
+ var response proxy.Response
+ if err := vom.NewEncoder(flow).Encode(request); err != nil {
+ flow.Close()
+ vf.StopAccepting()
+ return nil, fmt.Errorf("failed to encode request to proxy: %v", err)
+ }
+ if err := vom.NewDecoder(flow).Decode(&response); err != nil {
+ flow.Close()
+ vf.StopAccepting()
+ return nil, fmt.Errorf("failed to decode response from proxy: %v", err)
+ }
+ if response.Error != nil {
+ flow.Close()
+ vf.StopAccepting()
+ return nil, fmt.Errorf("proxy error: %v", response.Error)
+ }
+
go func(vf *vif.VIF, flow stream.Flow) {
<-flow.Closed()
vf.StopAccepting()
diff --git a/runtimes/google/ipc/stream/proxy/http.go b/runtimes/google/ipc/stream/proxy/http.go
index cd5e33d..47f9fe2 100644
--- a/runtimes/google/ipc/stream/proxy/http.go
+++ b/runtimes/google/ipc/stream/proxy/http.go
@@ -17,18 +17,28 @@
func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/plain")
write := func(s string) { w.Write([]byte(s)) }
+ servers := p.servers.List()
p.mu.RLock()
defer p.mu.RUnlock()
- write(fmt.Sprintf("Proxy with endpoint: %q and %d processes\n", p.Endpoint(), len(p.processes)))
+ write(fmt.Sprintf("Proxy with endpoint: %q. #Processes:%d #Servers:%d\n", p.Endpoint(), len(p.processes), len(servers)))
+ write("=========\n")
+ write("PROCESSES\n")
+ write("=========\n")
index := 1
- for rid, process := range p.processes {
- write(fmt.Sprintf("Process #%3d - RoutingID:%v [%v]", index, rid, process))
+ for process, _ := range p.processes {
+ write(fmt.Sprintf("(%d) - %v", index, process))
index++
process.mu.RLock()
- write(fmt.Sprintf(" NextVCI:%d\n", process.nextVCI))
+ write(fmt.Sprintf(" NextVCI:%d #Severs:%d\n", process.nextVCI, len(process.servers)))
for vci, d := range process.routingTable {
write(fmt.Sprintf(" VCI %4d --> VCI %4d @ %s\n", vci, d.VCI, d.Process))
}
process.mu.RUnlock()
}
+ write("=======\n")
+ write("SERVERS\n")
+ write("=======\n")
+ for ix, is := range servers {
+ write(fmt.Sprintf("(%d) %v\n", ix+1, is))
+ }
}
diff --git a/runtimes/google/ipc/stream/proxy/protocol.vdl b/runtimes/google/ipc/stream/proxy/protocol.vdl
new file mode 100644
index 0000000..48a73ab
--- /dev/null
+++ b/runtimes/google/ipc/stream/proxy/protocol.vdl
@@ -0,0 +1,26 @@
+package proxy
+
+// The proxy protocol is:
+// (1) Server establishes a VC to the proxy to register its routing id and authenticate.
+// (2) The server opens a flow and sends a "Request" message and waits for a "Response"
+// message.
+// (3) This flow is then kept alive with no more data read/written.
+// Closure of this flow indicates that proxying has (or should be) stopped.
+// (4) The proxy immediately closes any other flows on the VC.
+
+// Request is the message sent by a server to request that the proxy route
+// traffic intended for the server's RoutingID to the network connection
+// between the server and the proxy.
+type Request struct {
+ // TODO(ashankar): Things that will go in here include the mounttable and the name
+ // the server wants the proxy to use when mounting the server on the mounttable,
+ // possibly a blessing to give the proxy the credentials required to act as the
+ // server as far as the mounttable is concerned.
+}
+
+// Response is sent by the proxy to the server after processing Request.
+type Response struct {
+ // Error is a description of why the proxy refused to proxy the server.
+ // A nil error indicates that the proxy will route traffic to the server.
+ Error error
+}
\ No newline at end of file
diff --git a/runtimes/google/ipc/stream/proxy/protocol.vdl.go b/runtimes/google/ipc/stream/proxy/protocol.vdl.go
new file mode 100644
index 0000000..7bd2229
--- /dev/null
+++ b/runtimes/google/ipc/stream/proxy/protocol.vdl.go
@@ -0,0 +1,17 @@
+// This file was auto-generated by the veyron vdl tool.
+// Source: protocol.vdl
+
+package proxy
+
+// Request is the message sent by a server to request that the proxy route
+// traffic intended for the server's RoutingID to the network connection
+// between the server and the proxy.
+type Request struct {
+}
+
+// Response is sent by the proxy to the server after processing Request.
+type Response struct {
+ // Error is a description of why the proxy refused to proxy the server.
+ // A nil error indicates that the proxy will route traffic to the server.
+ Error error
+}
diff --git a/runtimes/google/ipc/stream/proxy/proxy.go b/runtimes/google/ipc/stream/proxy/proxy.go
index 67d16ee..e9ec40d 100644
--- a/runtimes/google/ipc/stream/proxy/proxy.go
+++ b/runtimes/google/ipc/stream/proxy/proxy.go
@@ -18,7 +18,9 @@
"veyron2"
"veyron2/naming"
"veyron2/security"
+ "veyron2/verror"
"veyron2/vlog"
+ "veyron2/vom"
)
var (
@@ -30,16 +32,16 @@
// Proxy routes virtual circuit (VC) traffic between multiple underlying
// network connections.
type Proxy struct {
- ln net.Listener
- rid naming.RoutingID
- id security.PrivateID
-
+ ln net.Listener
+ rid naming.RoutingID
+ id security.PrivateID
mu sync.RWMutex
- processes map[naming.RoutingID]*process
+ servers *servermap
+ processes map[*process]struct{}
}
// process encapsulates the physical network connection and the routing table
-// associated with a single peer process (identified by a naming.RoutingID).
+// associated with the process at the other end of the network connection.
type process struct {
Conn net.Conn
Queue *upcqueue.T
@@ -47,18 +49,86 @@
mu sync.RWMutex
routingTable map[id.VC]*destination
nextVCI id.VC
+ servers map[id.VC]*vc.VC // servers wishing to be proxied create a VC that terminates at the proxy
- // Some VCs terminate at the proxy process (for example, VCs initiated
- // by servers wishing to be proxied).
- proxyVCs map[id.VC]*vc.VC // VCs that terminate at this proxy
- BQ bqueue.T // Flow control for messages sent on behalf of proxyVCs.
+ BQ bqueue.T // Flow control for messages sent on behalf of servers.
}
+// destination is an entry in the routingtable of a process.
type destination struct {
VCI id.VC
Process *process
}
+// server encapsulates information stored about a server exporting itself via the proxy.
+type server struct {
+ Process *process
+ VC *vc.VC
+}
+
+func (s *server) RoutingID() naming.RoutingID { return s.VC.RemoteAddr().RoutingID() }
+func (s *server) Close(err error) {
+ if vc := s.Process.RemoveServerVC(s.VC.VCI()); vc != nil {
+ if err != nil {
+ vc.Close(err.Error())
+ } else {
+ vc.Close("server closed by proxy")
+ }
+ }
+ s.Process.SendCloseVC(s.VC.VCI(), err)
+}
+func (s *server) String() string {
+ return fmt.Sprintf("RoutingID %v on process %v (VCI:%v ID:%v)", s.RoutingID(), s.Process, s.VC.VCI(), s.VC.RemoteID())
+}
+
+// servermap is a concurrent-access safe map from the RoutingID of a server exporting itself
+// through the proxy to the underlying network connection that the server is found on.
+type servermap struct {
+ mu sync.Mutex
+ m map[naming.RoutingID]*server
+}
+
+func (m *servermap) Add(server *server) error {
+ key := server.RoutingID()
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ if m.m[key] != nil {
+ return fmt.Errorf("server with routing id %v is already being proxied", key)
+ }
+ m.m[key] = server
+ proxyLog().Infof("Started proxying server: %v", server)
+ return nil
+}
+
+func (m *servermap) Remove(server *server) {
+ key := server.RoutingID()
+ m.mu.Lock()
+ if m.m[key] != nil {
+ delete(m.m, key)
+ proxyLog().Infof("Stopped proxying server: %v", server)
+ }
+ m.mu.Unlock()
+}
+
+func (m *servermap) Process(rid naming.RoutingID) *process {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ if s := m.m[rid]; s != nil {
+ return s.Process
+ }
+ return nil
+}
+
+func (m *servermap) List() []string {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ ret := make([]string, 0, len(m.m))
+ for _, s := range m.m {
+ ret = append(ret, s.String())
+ }
+ return ret
+}
+
// New creates a new Proxy that listens for network connections on the provided
// (network, address) pair and routes VC traffic between accepted connections.
func New(rid naming.RoutingID, identity security.PrivateID, network, address string) (*Proxy, error) {
@@ -70,7 +140,8 @@
ln: ln,
rid: rid,
id: identity,
- processes: make(map[naming.RoutingID]*process),
+ servers: &servermap{m: make(map[naming.RoutingID]*server)},
+ processes: make(map[*process]struct{}),
}
go proxy.listenLoop()
return proxy, nil
@@ -88,13 +159,12 @@
Conn: conn,
Queue: upcqueue.New(),
routingTable: make(map[id.VC]*destination),
- proxyVCs: make(map[id.VC]*vc.VC),
+ servers: make(map[id.VC]*vc.VC),
BQ: drrqueue.New(vc.MaxPayloadSizeBytes),
}
go writeLoop(process)
+ go serverVCsLoop(process)
go p.readLoop(process)
- go proxyVCLoop(process)
- processLog().Infof("New network connection: %v", process)
}
}
@@ -116,14 +186,14 @@
}
}
-func proxyVCLoop(process *process) {
+func serverVCsLoop(process *process) {
for {
w, bufs, err := process.BQ.Get(nil)
if err != nil {
return
}
vci, fid := unpackIDs(w.ID())
- if vc := process.ProxyVC(vci); vc != nil {
+ if vc := process.ServerVC(vci); vc != nil {
queueDataMessages(bufs, vc, fid, process.Queue)
if len(bufs) == 0 {
m := &message.Data{VCI: vci, Flow: fid}
@@ -161,9 +231,24 @@
}
}
+func (p *Proxy) startProcess(process *process) {
+ p.mu.Lock()
+ p.processes[process] = struct{}{}
+ p.mu.Unlock()
+ processLog().Infof("Started process %v", process)
+}
+
+func (p *Proxy) stopProcess(process *process) {
+ process.Close()
+ p.mu.Lock()
+ delete(p.processes, process)
+ p.mu.Unlock()
+ processLog().Infof("Stopped process %v", process)
+}
+
func (p *Proxy) readLoop(process *process) {
- defer processLog().Infof("Exited readLoop for %v", process)
- defer process.Close()
+ p.startProcess(process)
+ defer p.stopProcess(process)
reader := iobuf.NewReader(iobuf.NewPool(0), process.Conn)
defer reader.Close()
@@ -177,7 +262,7 @@
msgLog().Infof("Received msg: %T = %v", msg, msg)
switch m := msg.(type) {
case *message.Data:
- if vc := process.ProxyVC(m.VCI); vc != nil {
+ if vc := process.ServerVC(m.VCI); vc != nil {
if err := vc.DispatchPayload(m.Flow, m.Payload); err != nil {
processLog().Infof("Ignoring data message %v from process %v: %v", m, process, err)
}
@@ -197,7 +282,7 @@
}
process.SendCloseVC(srcVCI, errNoRoutingTableEntry)
case *message.OpenFlow:
- if vc := process.ProxyVC(m.VCI); vc != nil {
+ if vc := process.ServerVC(m.VCI); vc != nil {
if err := vc.AcceptFlow(m.Flow); err != nil {
processLog().Infof("OpenFlow %+v on process %v failed: %v", m, process, err)
cm := &message.Data{VCI: m.VCI, Flow: m.Flow}
@@ -218,7 +303,8 @@
}
process.SendCloseVC(srcVCI, errNoRoutingTableEntry)
case *message.CloseVC:
- if process.RemoveProxyVC(m.VCI, m.Error) {
+ if vc := process.RemoveServerVC(m.VCI); vc != nil {
+ vc.Close(m.Error)
break
}
srcVCI := m.VCI
@@ -231,55 +317,42 @@
case *message.AddReceiveBuffers:
p.routeCounters(process, m.Counters)
case *message.OpenVC:
- // Update processes map and routing tables if necessary.
- srcrid := m.SrcEndpoint.RoutingID()
dstrid := m.DstEndpoint.RoutingID()
- seenBefore, err := p.addProcess(srcrid, process)
- if !seenBefore {
- processLog().Infof("RoutingID %v associated with process %v", srcrid, process)
- process.mu.Lock()
- process.nextVCI = m.VCI + 1
- process.mu.Unlock()
- defer p.removeProcess(srcrid)
- }
- switch {
- case err != nil:
- process.SendCloseVC(m.VCI, err)
- case naming.Compare(dstrid, p.rid) || naming.Compare(dstrid, naming.NullRoutingID):
- if !process.AddProxyVC(m, p.id) {
- process.SendCloseVC(m.VCI, errDuplicateOpenVC)
- }
+ if naming.Compare(dstrid, p.rid) || naming.Compare(dstrid, naming.NullRoutingID) {
+ // VC that terminates at the proxy.
+ // See protocol.vdl for details on the protocol between the server and the proxy.
+ vc := process.NewServerVC(m)
+ // route counters after creating the VC so counters to vc are not lost.
p.routeCounters(process, m.Counters)
- default:
- // Find the process corresponding to dstrid
- p.mu.RLock()
- dstProcess := p.processes[dstrid]
- p.mu.RUnlock()
- if dstProcess == nil {
- process.SendCloseVC(m.VCI, fmt.Errorf("routing id %v is not connected to the proxy", dstrid))
- break
+ if vc != nil {
+ server := &server{Process: process, VC: vc}
+ go p.runServer(server, vc.HandshakeAcceptedVC(veyron2.LocalID(p.id)))
}
- // Update the routing tables
- srcVCI := m.VCI
- dstVCI := dstProcess.AllocVCI()
- startRoutingVC(srcVCI, dstVCI, process, dstProcess)
- // Forward the OpenVC message.
- // Typically, an OpenVC message is accompanied with Counters for
- // the new VC. Keep that in the forwarded message and route the
- // remaining counters separately.
- counters := m.Counters
-
- m.Counters = message.NewCounters()
- for cid, bytes := range counters {
- if cid.VCI() == m.VCI {
- m.Counters.Add(dstVCI, cid.Flow(), bytes)
- delete(counters, cid)
- }
- }
- m.VCI = dstVCI
- dstProcess.Queue.Put(m)
- p.routeCounters(process, counters)
+ break
}
+ dstprocess := p.servers.Process(dstrid)
+ if dstprocess == nil {
+ process.SendCloseVC(m.VCI, fmt.Errorf("no server with routing id %v is being proxied", dstrid))
+ p.routeCounters(process, m.Counters)
+ break
+ }
+ srcVCI := m.VCI
+ dstVCI := dstprocess.AllocVCI()
+ startRoutingVC(srcVCI, dstVCI, process, dstprocess)
+ // Forward the OpenVC message.
+ // Typically, an OpenVC message is accompanied with Counters for the new VC.
+ // Keep that in the forwarded message and route the remaining counters separately.
+ counters := m.Counters
+ m.Counters = message.NewCounters()
+ for cid, bytes := range counters {
+ if cid.VCI() == srcVCI {
+ m.Counters.Add(dstVCI, cid.Flow(), bytes)
+ delete(counters, cid)
+ }
+ }
+ m.VCI = dstVCI
+ dstprocess.Queue.Put(m)
+ p.routeCounters(process, counters)
default:
processLog().Infof("Closing %v because of unrecognized message %T", process, m)
return
@@ -287,42 +360,58 @@
}
}
-func (p *Proxy) addProcess(rid naming.RoutingID, process *process) (bool, error) {
- p.mu.Lock()
- defer p.mu.Unlock()
- switch p.processes[rid] {
- case nil:
- // First time this rid was seen.
- p.processes[rid] = process
- return false, nil
- case process:
- // Same (rid, process) pair as before. Nothing to do.
- return true, nil
- default:
- // Conflicting (rid, process) pairs.
- // TODO(ashankar): This means that the first process to
- // advertise a particular srcrid gets into the routing
- // table. What's our security strategy vis-a-vis faking
- // routing ids?
- return true, fmt.Errorf("routing id %v is registered by another process at the proxy", rid)
+func (p *Proxy) runServer(server *server, c <-chan vc.HandshakeResult) {
+ hr := <-c
+ if hr.Error != nil {
+ server.Close(hr.Error)
+ return
}
-}
+ // See comments in protocol.vdl for the protocol between servers and the proxy.
+ conn, err := hr.Listener.Accept()
+ if err != nil {
+ server.Close(errors.New("failed to accept health check flow"))
+ return
+ }
+ defer server.Process.RemoveServerVC(server.VC.VCI())
+ defer server.VC.Close("stopped proxying server")
+ server.Process.InitVCI(server.VC.VCI())
-func (p *Proxy) removeProcess(rid naming.RoutingID) {
- p.mu.Lock()
- delete(p.processes, rid)
- p.mu.Unlock()
- processLog().Infof("Removed routing for %v", rid)
+ var request Request
+ var response Response
+ if err := vom.NewDecoder(conn).Decode(&request); err != nil {
+ response.Error = verror.BadProtocolf("proxy: unable to read Request: %v", err)
+ } else if err := p.servers.Add(server); err != nil {
+ response.Error = verror.Convert(err)
+ } else {
+ defer p.servers.Remove(server)
+ }
+ if err := vom.NewEncoder(conn).Encode(response); err != nil {
+ proxyLog().Infof("Failed to encode response %#v for server %v", response, server)
+ server.Close(err)
+ return
+ }
+ // Reject all other flows
+ go func() {
+ for {
+ flow, err := hr.Listener.Accept()
+ if err != nil {
+ return
+ }
+ flow.Close()
+ }
+ }()
+ // Wait for this flow to be closed.
+ <-conn.Closed()
}
func (p *Proxy) routeCounters(process *process, counters message.Counters) {
// Since each VC can be routed to a different process, split up the
// Counters into one message per VC.
// Ideally, would split into one message per process (rather than per
- // VC). This optimization is left an as excercise to the interested.
+ // flow). This optimization is left an as excercise to the interested.
for cid, bytes := range counters {
srcVCI := cid.VCI()
- if vc := process.proxyVCs[srcVCI]; vc != nil {
+ if vc := process.servers[srcVCI]; vc != nil {
vc.ReleaseCounters(cid.Flow(), bytes)
continue
}
@@ -354,7 +443,7 @@
p.ln.Close()
p.mu.Lock()
defer p.mu.Unlock()
- for _, process := range p.processes {
+ for process, _ := range p.processes {
process.Close()
}
}
@@ -376,6 +465,14 @@
p.mu.Unlock()
}
+func (p *process) InitVCI(vci id.VC) {
+ p.mu.Lock()
+ if p.nextVCI <= vci {
+ p.nextVCI = vci + 1
+ }
+ p.mu.Unlock()
+}
+
func (p *process) AllocVCI() id.VC {
p.mu.Lock()
ret := p.nextVCI
@@ -402,6 +499,9 @@
p.mu.Lock()
rt := p.routingTable
p.routingTable = nil
+ for _, vc := range p.servers {
+ vc.Close("net.Conn is closing")
+ }
p.mu.Unlock()
for _, d := range rt {
@@ -412,18 +512,18 @@
p.Conn.Close()
}
-func (p *process) ProxyVC(vci id.VC) *vc.VC {
+func (p *process) ServerVC(vci id.VC) *vc.VC {
p.mu.Lock()
defer p.mu.Unlock()
- return p.proxyVCs[vci]
+ return p.servers[vci]
}
-func (p *process) AddProxyVC(m *message.OpenVC, id security.PrivateID) bool {
+func (p *process) NewServerVC(m *message.OpenVC) *vc.VC {
p.mu.Lock()
defer p.mu.Unlock()
- if vc := p.proxyVCs[m.VCI]; vc != nil {
+ if vc := p.servers[m.VCI]; vc != nil {
vc.Close("duplicate OpenVC request")
- return false
+ return nil
}
vc := vc.InternalNew(vc.Params{
VCI: m.VCI,
@@ -433,55 +533,20 @@
ReserveBytes: message.HeaderSizeBytes,
Helper: p,
})
- p.proxyVCs[m.VCI] = vc
- proxyLog().Infof("Opening VC %v from process %v", vc, p)
- go p.handleProxyVC(vc, vc.HandshakeAcceptedVC(veyron2.LocalID(id)))
- return true
+ p.servers[m.VCI] = vc
+ proxyLog().Infof("Registered VC %v from server on process %v", vc, p)
+ return vc
}
-func (p *process) handleProxyVC(vc *vc.VC, c <-chan vc.HandshakeResult) {
- hr := <-c
- if hr.Error != nil {
- p.RemoveProxyVC(vc.VCI(), fmt.Sprintf("handshake failed: %v", hr.Error))
- p.SendCloseVC(vc.VCI(), hr.Error)
- return
- }
- // The proxy protocol is:
- // (1) Server establishes a VC to the proxy to setup routing table and
- // authenticate.
- // (2) The server opens a flow and waits for it to be closed. No data
- // is every read/written on the flow, the flow closure is an indication
- // of the connection to the proxy dying.
- //
- // The proxy expects this "health" flow to be the first one and rejects
- // any other flows on the VC.
- //
- // TODO(ashankar): Document this "protocol" once it is stable.
- _, err := hr.Listener.Accept()
- if err != nil {
- vc.Close("failed to accept health check flow")
- return
- }
- // Reject all other flows
- for {
- flow, err := hr.Listener.Accept()
- if err != nil {
- return
- }
- flow.Close()
- }
-}
-
-func (p *process) RemoveProxyVC(vci id.VC, reason string) bool {
+func (p *process) RemoveServerVC(vci id.VC) *vc.VC {
p.mu.Lock()
defer p.mu.Unlock()
- if vc := p.proxyVCs[vci]; vc != nil {
- proxyLog().Infof("Closing VC %v from process %v", vc, p)
- vc.Close(reason)
- delete(p.proxyVCs, vci)
- return true
+ if vc := p.servers[vci]; vc != nil {
+ delete(p.servers, vci)
+ proxyLog().Infof("Unregistered server VC %v from process %v", vc, p)
+ return vc
}
- return false
+ return nil
}
// Make process implement vc.Helper
diff --git a/runtimes/google/ipc/stream/proxy/proxy_test.go b/runtimes/google/ipc/stream/proxy/proxy_test.go
index d86676c..dacf405 100644
--- a/runtimes/google/ipc/stream/proxy/proxy_test.go
+++ b/runtimes/google/ipc/stream/proxy/proxy_test.go
@@ -40,7 +40,7 @@
}
defer ln1.Close()
- // Create the stream.Manager for a second server1.
+ // Create the stream.Manager for a second server.
server2 := manager.InternalNew(naming.FixedRoutingID(0x2222222222222222))
defer server2.Shutdown()
// Setup a stream.Listener that will accept VCs and Flows routed
@@ -104,7 +104,7 @@
defer ln1.Close()
ln2, ep2, err := server2.Listen(proxy.Endpoint().Network(), proxy.Endpoint().String())
- if pattern := "routing id 00000000000000005555555555555555 is registered by another process at the proxy"; err == nil || !strings.Contains(err.Error(), pattern) {
+ if pattern := "routing id 00000000000000005555555555555555 is already being proxied"; err == nil || !strings.Contains(err.Error(), pattern) {
t.Errorf("Got (%v, %v, %v) want error \"...%v\" (ep1:%v)", ln2, ep2, err, pattern, ep1)
}
}
diff --git a/runtimes/google/ipc/stream/vc/vc.go b/runtimes/google/ipc/stream/vc/vc.go
index 19f1ddb..f41ce67 100644
--- a/runtimes/google/ipc/stream/vc/vc.go
+++ b/runtimes/google/ipc/stream/vc/vc.go
@@ -291,7 +291,7 @@
// Close closes the VC and all flows on it, allowing any pending writes in the
// flow to drain.
func (vc *VC) Close(reason string) error {
- vlog.VI(1).Infof("Closing VCI %d. Reason:%q", vc.vci, reason)
+ vlog.VI(1).Infof("Closing VC %v. Reason:%q", vc, reason)
vc.mu.Lock()
flows := vc.flowMap
vc.flowMap = nil
@@ -304,7 +304,7 @@
vc.sharedCounters.Close()
for fid, flow := range flows {
- vlog.VI(2).Infof("Closing flow %d on VC %d as VC is being closed(%q)", fid, vc.vci, reason)
+ vlog.VI(2).Infof("Closing flow %d on VC %v as VC is being closed(%q)", fid, vc, reason)
flow.Close()
}
return nil