ref: Change the shutdown mechanism for the new runtime.
Now we properly put servers into lame-duck mode, and avoid error
messages on clean shutdowns.
MultiPart: 2/2
Change-Id: Ifb5e3080ff28a4496819c3ebdf2a8b0f7c1a02db
diff --git a/runtime/factories/fake/rpc.go b/runtime/factories/fake/rpc.go
index 22f0bf0..f020b7b 100644
--- a/runtime/factories/fake/rpc.go
+++ b/runtime/factories/fake/rpc.go
@@ -12,13 +12,22 @@
"v.io/x/ref/lib/apilog"
)
-// SetClient can be used to inject a mock client implementation into the context.
-func SetClient(ctx *context.T, client rpc.Client) *context.T {
+// SetClientFactory can be used to inject a mock Client implementation
+// into the context. When v23.WithNewClient is called, the passed
+// function will be invoked.
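+//
+// A minimal usage sketch (fakeClient is a hypothetical rpc.Client
+// implementation standing in for whatever mock a test needs):
+//
+//   ctx = fake.SetClientFactory(ctx, func(ctx *context.T, opts ...rpc.ClientOpt) rpc.Client {
+//       return &fakeClient{}
+//   })
+//   ctx, client, err := v23.WithNewClient(ctx)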
+func SetClientFactory(ctx *context.T, factory func(ctx *context.T, opts ...rpc.ClientOpt) rpc.Client) *context.T {
+ client := factory(ctx)
+ ctx = context.WithValue(ctx, clientFactoryKey, factory)
return context.WithValue(ctx, clientKey, client)
}
func (r *Runtime) WithNewClient(ctx *context.T, opts ...rpc.ClientOpt) (*context.T, rpc.Client, error) {
defer apilog.LogCallf(ctx, "opts...=%v", opts)(ctx, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
- panic("unimplemented")
+ factory, ok := ctx.Value(clientFactoryKey).(func(ctx *context.T, opts ...rpc.ClientOpt) rpc.Client)
+ if !ok {
+ panic("Calling WithNewClient on the fake runtime, but no factory has been set.")
+ }
+ client := factory(ctx, opts...)
+ return context.WithValue(ctx, clientKey, client), client, nil
}
func (r *Runtime) GetClient(ctx *context.T) rpc.Client {
defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
@@ -26,11 +35,6 @@
return c
}
-func (r *Runtime) WithNewStreamManager(ctx *context.T) (*context.T, error) {
- defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
- panic("unimplemented")
-}
-
func (r *Runtime) GetListenSpec(ctx *context.T) rpc.ListenSpec {
defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
ls, _ := ctx.Value(listenSpecKey).(rpc.ListenSpec)
@@ -43,21 +47,28 @@
return ctx
}
-func SetFlowManager(ctx *context.T, manager flow.Manager) *context.T {
- return context.WithValue(ctx, flowManagerKey, manager)
-}
-
-func (r *Runtime) ExperimentalGetFlowManager(ctx *context.T) flow.Manager {
- defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
- fm, _ := ctx.Value(flowManagerKey).(flow.Manager)
- return fm
-}
-
-func (r *Runtime) ExperimentalWithNewFlowManager(ctx *context.T) (*context.T, flow.Manager, error) {
+func (r *Runtime) WithNewStreamManager(ctx *context.T) (*context.T, error) {
defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
panic("unimplemented")
}
+// SetFlowManagerFactory can be used to inject a mock FlowManager
+// implementation into the context. When v23.NewFlowManager is called,
+// the passed function will be invoked.
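+//
+// A minimal usage sketch (mirroring the tests in this change; manager.New
+// is just one possible flow.Manager constructor):
+//
+//   ctx = fake.SetFlowManagerFactory(ctx, func(ctx *context.T) flow.Manager {
+//       return manager.New(ctx, naming.FixedRoutingID(0x1))
+//   })
+//   m, err := v23.NewFlowManager(ctx)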
+func SetFlowManagerFactory(ctx *context.T, factory func(ctx *context.T) flow.Manager) *context.T {
+ return context.WithValue(ctx, flowFactoryKey, factory)
+}
+
+func (r *Runtime) NewFlowManager(ctx *context.T) (flow.Manager, error) {
+ defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
+
+ factory, ok := ctx.Value(flowFactoryKey).(func(ctx *context.T) flow.Manager)
+ if !ok {
+ panic("Calling NewFlowManager on the fake runtime, but no factory has been set.")
+ }
+ return factory(ctx), nil
+}
+
func (r *Runtime) WithNewServer(ctx *context.T, name string, object interface{}, auth security.Authorizer, opts ...rpc.ServerOpt) (*context.T, rpc.Server, error) {
defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
panic("unimplemented")
diff --git a/runtime/factories/fake/runtime.go b/runtime/factories/fake/runtime.go
index 464ab11..ef7e102 100644
--- a/runtime/factories/fake/runtime.go
+++ b/runtime/factories/fake/runtime.go
@@ -24,7 +24,9 @@
loggerKey
backgroundKey
listenSpecKey
- flowManagerKey
+
+ clientFactoryKey
+ flowFactoryKey
)
type Runtime struct {
diff --git a/runtime/internal/flow/conn/auth.go b/runtime/internal/flow/conn/auth.go
index 16b31d1..40337ec 100644
--- a/runtime/internal/flow/conn/auth.go
+++ b/runtime/internal/flow/conn/auth.go
@@ -14,7 +14,6 @@
"golang.org/x/crypto/nacl/box"
"v.io/v23"
"v.io/v23/context"
- "v.io/v23/flow"
"v.io/v23/flow/message"
"v.io/v23/rpc/version"
"v.io/v23/security"
@@ -33,8 +32,9 @@
if err != nil {
return err
}
+
bflow := c.newFlowLocked(ctx, blessingsFlowID, 0, 0, true, true)
- bflow.worker.Release(ctx, defaultBufferSize)
+ bflow.worker.Release(ctx, DefaultBytesBufferedPerFlow)
c.blessingsFlow = newBlessingsFlow(ctx, &c.loopWG, bflow, true)
if err = c.readRemoteAuth(ctx, authAcceptorTag, binding); err != nil {
@@ -217,6 +217,7 @@
type blessingsFlow struct {
enc *vom.Encoder
dec *vom.Decoder
+ f *flw
mu sync.Mutex
cond *sync.Cond
@@ -226,8 +227,9 @@
byBKey map[uint64]*Blessings
}
-func newBlessingsFlow(ctx *context.T, loopWG *sync.WaitGroup, f flow.Flow, dialed bool) *blessingsFlow {
+func newBlessingsFlow(ctx *context.T, loopWG *sync.WaitGroup, f *flw, dialed bool) *blessingsFlow {
b := &blessingsFlow{
+ f: f,
enc: vom.NewEncoder(f),
dec: vom.NewDecoder(f),
nextKey: 1,
diff --git a/runtime/internal/flow/conn/auth_test.go b/runtime/internal/flow/conn/auth_test.go
index 422eb73..c64bc5b 100644
--- a/runtime/internal/flow/conn/auth_test.go
+++ b/runtime/internal/flow/conn/auth_test.go
@@ -95,14 +95,18 @@
func (fc *fakeDischargeClient) StartCall(*context.T, string, string, []interface{}, ...rpc.CallOpt) (rpc.ClientCall, error) {
return nil, nil
}
-func (fc *fakeDischargeClient) Close() {}
+func (fc *fakeDischargeClient) Close() {}
+func (fc *fakeDischargeClient) Closed() <-chan struct{} { return nil }
func TestUnidirectional(t *testing.T) {
defer goroutines.NoLeaks(t, leakWaitTime)()
ctx, shutdown := v23.Init()
defer shutdown()
- ctx = fake.SetClient(ctx, &fakeDischargeClient{v23.GetPrincipal(ctx)})
+ ctx = fake.SetClientFactory(ctx, func(ctx *context.T, opts ...rpc.ClientOpt) rpc.Client {
+ return &fakeDischargeClient{v23.GetPrincipal(ctx)}
+ })
+ ctx, _, _ = v23.WithNewClient(ctx)
dctx := NewPrincipalWithTPCaveat(t, ctx, "dialer")
actx := NewPrincipalWithTPCaveat(t, ctx, "acceptor")
@@ -136,7 +140,10 @@
ctx, shutdown := v23.Init()
defer shutdown()
- ctx = fake.SetClient(ctx, &fakeDischargeClient{v23.GetPrincipal(ctx)})
+ ctx = fake.SetClientFactory(ctx, func(ctx *context.T, opts ...rpc.ClientOpt) rpc.Client {
+ return &fakeDischargeClient{v23.GetPrincipal(ctx)}
+ })
+ ctx, _, _ = v23.WithNewClient(ctx)
dctx := NewPrincipalWithTPCaveat(t, ctx, "dialer")
actx := NewPrincipalWithTPCaveat(t, ctx, "acceptor")
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index 17475a5..846fce3 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -29,7 +29,7 @@
)
const mtu = 1 << 16
-const defaultBufferSize = 1 << 20
+const DefaultBytesBufferedPerFlow = 1 << 20
const (
expressPriority = iota
@@ -95,10 +95,11 @@
conn flow.MsgReadWriteCloser,
local, remote naming.Endpoint,
versions version.RPCVersionRange,
+ handshakeTimeout time.Duration,
handler FlowHandler,
events chan<- StatusUpdate) (*Conn, error) {
c := &Conn{
- fc: flowcontrol.New(defaultBufferSize, mtu),
+ fc: flowcontrol.New(DefaultBytesBufferedPerFlow, mtu),
mp: newMessagePipe(conn),
handler: handler,
lBlessings: v23.GetPrincipal(ctx).BlessingStore().Default(),
@@ -112,7 +113,14 @@
borrowing: map[uint64]bool{},
events: events,
}
- if err := c.dialHandshake(ctx, versions); err != nil {
+ // TODO(mattr): This scheme for deadlines is nice, but it doesn't
+ // provide for cancellation when ctx is canceled.
+ t := time.AfterFunc(handshakeTimeout, func() { conn.Close() })
+ err := c.dialHandshake(ctx, versions)
+ if stopped := t.Stop(); !stopped {
+ err = verror.NewErrTimeout(ctx)
+ }
+ if err != nil {
c.Close(ctx, err)
return nil, err
}
@@ -127,10 +135,11 @@
conn flow.MsgReadWriteCloser,
local naming.Endpoint,
versions version.RPCVersionRange,
+ handshakeTimeout time.Duration,
handler FlowHandler,
events chan<- StatusUpdate) (*Conn, error) {
c := &Conn{
- fc: flowcontrol.New(defaultBufferSize, mtu),
+ fc: flowcontrol.New(DefaultBytesBufferedPerFlow, mtu),
mp: newMessagePipe(conn),
handler: handler,
lBlessings: v23.GetPrincipal(ctx).BlessingStore().Default(),
@@ -143,7 +152,14 @@
borrowing: map[uint64]bool{},
events: events,
}
- if err := c.acceptHandshake(ctx, versions); err != nil {
+ // FIXME(mattr): This scheme for deadlines is nice, but it doesn't
+ // provide for cancellation when ctx is canceled.
+ t := time.AfterFunc(handshakeTimeout, func() { conn.Close() })
+ err := c.acceptHandshake(ctx, versions)
+ if stopped := t.Stop(); !stopped {
+ err = verror.NewErrTimeout(ctx)
+ }
+ if err != nil {
c.Close(ctx, err)
return nil, err
}
@@ -250,17 +266,28 @@
if err != nil {
msg = err.Error()
}
- cerr := c.fc.Run(ctx, "close", expressPriority, func(_ int) (int, bool, error) {
+ // We use a root context here to ensure that the message gets sent even if
+ // the context is canceled.
+ // TODO(mattr): Consider other options like a special mode in the flow controller
+ // that doesn't care about the context. Or perhaps 'stop' the flow controller
+ // and then just write the message directly.
+ rootctx, cancel := context.WithRootCancel(ctx)
+ cerr := c.fc.Run(rootctx, "close", expressPriority, func(_ int) (int, bool, error) {
return 0, true, c.mp.writeMsg(ctx, &message.TearDown{Message: msg})
})
if cerr != nil {
ctx.Errorf("Error sending tearDown on connection to %s: %v", c.remote, cerr)
}
+ cancel()
}
+ flowErr := NewErrConnectionClosed(ctx)
for _, f := range flows {
- f.close(ctx, NewErrConnectionClosed(ctx))
+ f.close(ctx, flowErr)
}
- if cerr := c.mp.close(); cerr != nil {
+ if c.blessingsFlow != nil {
+ c.blessingsFlow.f.close(ctx, flowErr)
+ }
+ if cerr := c.mp.rw.Close(); cerr != nil {
ctx.Errorf("Error closing underlying connection for %s: %v", c.remote, cerr)
}
c.loopWG.Wait()
@@ -282,9 +309,9 @@
c.toRelease[fid] += count
if c.borrowing[fid] {
c.toRelease[invalidFlowID] += count
- release = c.toRelease[invalidFlowID] > defaultBufferSize/2
+ release = c.toRelease[invalidFlowID] > DefaultBytesBufferedPerFlow/2
} else {
- release = c.toRelease[fid] > defaultBufferSize/2
+ release = c.toRelease[fid] > DefaultBytesBufferedPerFlow/2
}
if release {
toRelease = c.toRelease
@@ -358,7 +385,7 @@
handler := c.handler
f := c.newFlowLocked(ctx, msg.ID, msg.BlessingsKey, msg.DischargeKey, false, true)
f.worker.Release(ctx, int(msg.InitialCounters))
- c.toRelease[msg.ID] = defaultBufferSize
+ c.toRelease[msg.ID] = DefaultBytesBufferedPerFlow
c.borrowing[msg.ID] = true
c.mu.Unlock()
@@ -403,7 +430,8 @@
f := c.flows[msg.ID]
c.mu.Unlock()
if f == nil {
- ctx.Infof("Ignoring data message for unknown flow on connection to %s: %d", c.remote, msg.ID)
+ ctx.Infof("Ignoring data message for unknown flow on connection to %s: %d",
+ c.remote, msg.ID)
return nil
}
if err := f.q.put(ctx, msg.Payload); err != nil {
diff --git a/runtime/internal/flow/conn/conn_test.go b/runtime/internal/flow/conn/conn_test.go
index 6837486..ce1961b 100644
--- a/runtime/internal/flow/conn/conn_test.go
+++ b/runtime/internal/flow/conn/conn_test.go
@@ -27,7 +27,7 @@
func init() {
test.Init()
- randData = make([]byte, 2*defaultBufferSize)
+ randData = make([]byte, 2*DefaultBytesBufferedPerFlow)
if _, err := rand.Read(randData); err != nil {
panic("Could not read random data.")
}
@@ -100,14 +100,14 @@
q1, q2 := make(chan flow.Flow, 1), make(chan flow.Flow, 1)
fh1, fh2 := fh(q1), fh(q2)
go func() {
- d, err := NewDialed(ctx, dmrw, ep, ep, versions, nil, nil)
+ d, err := NewDialed(ctx, dmrw, ep, ep, versions, time.Minute, nil, nil)
if err != nil {
panic(err)
}
dch <- d
}()
go func() {
- a, err := NewAccepted(ctx, amrw, ep, versions, fh1, nil)
+ a, err := NewAccepted(ctx, amrw, ep, versions, time.Minute, fh1, nil)
if err != nil {
panic(err)
}
diff --git a/runtime/internal/flow/conn/flow.go b/runtime/internal/flow/conn/flow.go
index 04fdfcf..72929d1 100644
--- a/runtime/internal/flow/conn/flow.go
+++ b/runtime/internal/flow/conn/flow.go
@@ -6,6 +6,7 @@
import (
"strconv"
+ "time"
"v.io/v23/context"
"v.io/v23/flow"
@@ -42,7 +43,7 @@
dkey: dkey,
opened: preopen,
}
- f.SetContext(ctx)
+ f.ctx, f.cancel = context.WithCancel(ctx)
if !f.opened {
c.unopenedFlows.Add(1)
}
@@ -132,7 +133,7 @@
} else {
err = f.conn.mp.writeMsg(f.ctx, &message.OpenFlow{
ID: f.id,
- InitialCounters: defaultBufferSize,
+ InitialCounters: DefaultBytesBufferedPerFlow,
BlessingsKey: f.bkey,
DischargeKey: f.dkey,
Flags: d.Flags,
@@ -177,12 +178,16 @@
//
// TODO(mattr): update v23/flow documentation.
// SetContext may not be called concurrently with other methods.
-func (f *flw) SetContext(ctx *context.T) error {
+func (f *flw) SetDeadlineContext(ctx *context.T, deadline time.Time) *context.T {
if f.cancel != nil {
f.cancel()
}
- f.ctx, f.cancel = context.WithCancel(ctx)
- return nil
+ if !deadline.IsZero() {
+ f.ctx, f.cancel = context.WithDeadline(ctx, deadline)
+ } else {
+ f.ctx, f.cancel = context.WithCancel(ctx)
+ }
+ return f.ctx
}
// LocalBlessings returns the blessings presented by the local end of the flow
@@ -264,36 +269,38 @@
}
func (f *flw) close(ctx *context.T, err error) {
- f.q.close(ctx)
- f.cancel()
- // We want to try to send this message even if ctx is already canceled.
- ctx, cancel := context.WithRootCancel(ctx)
- serr := f.worker.Run(ctx, func(tokens int) (int, bool, error) {
- f.conn.mu.Lock()
- delete(f.conn.flows, f.id)
- connClosed := f.conn.flows == nil
- f.conn.mu.Unlock()
+ if f.q.close(ctx) {
+ f.cancel()
+ // We want to try to send this message even if ctx is already canceled.
+ ctx, cancel := context.WithRootCancel(ctx)
+ serr := f.worker.Run(ctx, func(tokens int) (int, bool, error) {
+ f.conn.mu.Lock()
+ delete(f.conn.flows, f.id)
+ connClosed := f.conn.flows == nil
+ f.conn.mu.Unlock()
- if !f.opened {
- // Closing a flow that was never opened.
- f.conn.unopenedFlows.Done()
- return 0, true, nil
- } else if eid := verror.ErrorID(err); eid == ErrFlowClosedRemotely.ID || connClosed {
- // Note: If the conn is closed there is no point in trying to send
- // the flow close message as it will fail. This is racy with the connection
- // closing, but there are no ill-effects other than spamming the logs a little
- // so it's OK.
- return 0, true, nil
- }
- return 0, true, f.conn.mp.writeMsg(ctx, &message.Data{
- ID: f.id,
- Flags: message.CloseFlag,
+ eid := verror.ErrorID(err)
+ if !f.opened {
+ // Closing a flow that was never opened.
+ f.conn.unopenedFlows.Done()
+ return 0, true, nil
+ } else if eid == ErrFlowClosedRemotely.ID || connClosed {
+ // Note: If the conn is closed there is no point in trying to send
+ // the flow close message as it will fail. This is racy with the connection
+ // closing, but there are no ill-effects other than spamming the logs a little
+ // so it's OK.
+ return 0, true, nil
+ }
+ return 0, true, f.conn.mp.writeMsg(ctx, &message.Data{
+ ID: f.id,
+ Flags: message.CloseFlag,
+ })
})
- })
- if serr != nil {
- ctx.Errorf("Could not send close flow message: %v", err)
+ if serr != nil {
+ ctx.Errorf("Could not send close flow message: %v", err)
+ }
+ cancel()
}
- cancel()
}
// Close marks the flow as closed. After Close is called, new data cannot be
diff --git a/runtime/internal/flow/conn/message.go b/runtime/internal/flow/conn/message.go
index 9435354..cf51f8f 100644
--- a/runtime/internal/flow/conn/message.go
+++ b/runtime/internal/flow/conn/message.go
@@ -35,10 +35,6 @@
return p.cipher.ChannelBinding()
}
-func (p *messagePipe) close() error {
- return p.rw.Close()
-}
-
func (p *messagePipe) writeMsg(ctx *context.T, m message.Message) (err error) {
// TODO(mattr): Because of the API of the underlying crypto library,
// an enormous amount of copying happens here.
diff --git a/runtime/internal/flow/conn/readq.go b/runtime/internal/flow/conn/readq.go
index 54db134..89abc97 100644
--- a/runtime/internal/flow/conn/readq.go
+++ b/runtime/internal/flow/conn/readq.go
@@ -50,7 +50,7 @@
return nil
}
newSize := l + r.size
- if newSize > defaultBufferSize {
+ if newSize > DefaultBytesBufferedPerFlow {
return NewErrCounterOverflow(ctx)
}
newBufs := r.nbufs + len(bufs)
@@ -124,13 +124,16 @@
return
}
-func (r *readq) close(ctx *context.T) {
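+// close returns true if the queue was open and this call closed it; it
+// returns false if the queue had already been closed.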
+func (r *readq) close(ctx *context.T) bool {
r.mu.Lock()
+ closed := false
if r.e != -1 {
+ closed = true
r.e = -1
close(r.notify)
}
r.mu.Unlock()
+ return closed
}
func (r *readq) reserveLocked(n int) {
diff --git a/runtime/internal/flow/conn/util_test.go b/runtime/internal/flow/conn/util_test.go
index b68247b..eeb7207 100644
--- a/runtime/internal/flow/conn/util_test.go
+++ b/runtime/internal/flow/conn/util_test.go
@@ -51,7 +51,7 @@
if dflows != nil {
handler = fh(dflows)
}
- d, err := NewDialed(dctx, dmrw, ep, ep, versions, handler, events)
+ d, err := NewDialed(dctx, dmrw, ep, ep, versions, time.Minute, handler, events)
if err != nil {
panic(err)
}
@@ -62,7 +62,7 @@
if aflows != nil {
handler = fh(aflows)
}
- a, err := NewAccepted(actx, amrw, ep, versions, handler, events)
+ a, err := NewAccepted(actx, amrw, ep, versions, time.Minute, handler, events)
if err != nil {
panic(err)
}
diff --git a/runtime/internal/flow/manager/conncache_test.go b/runtime/internal/flow/manager/conncache_test.go
index 4c31df8..4dcc990 100644
--- a/runtime/internal/flow/manager/conncache_test.go
+++ b/runtime/internal/flow/manager/conncache_test.go
@@ -7,13 +7,13 @@
import (
"strconv"
"testing"
+ "time"
"v.io/v23"
"v.io/v23/context"
"v.io/v23/flow"
"v.io/v23/naming"
"v.io/v23/rpc/version"
-
connpackage "v.io/x/ref/runtime/internal/flow/conn"
"v.io/x/ref/runtime/internal/flow/flowtest"
inaming "v.io/x/ref/runtime/internal/naming"
@@ -296,7 +296,7 @@
ach := make(chan *connpackage.Conn)
go func() {
d, err := connpackage.NewDialed(ctx, dmrw, ep, ep,
- version.RPCVersionRange{Min: 1, Max: 5}, nil, nil)
+ version.RPCVersionRange{Min: 1, Max: 5}, time.Minute, nil, nil)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@@ -305,7 +305,7 @@
fh := fh{t, make(chan struct{})}
go func() {
a, err := connpackage.NewAccepted(ctx, amrw, ep,
- version.RPCVersionRange{Min: 1, Max: 5}, fh, nil)
+ version.RPCVersionRange{Min: 1, Max: 5}, time.Minute, fh, nil)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
index d8c8fee..0bc1ea2 100644
--- a/runtime/internal/flow/manager/manager.go
+++ b/runtime/internal/flow/manager/manager.go
@@ -20,7 +20,6 @@
"v.io/v23/security"
"v.io/v23/verror"
- iflow "v.io/x/ref/runtime/internal/flow"
"v.io/x/ref/runtime/internal/flow/conn"
"v.io/x/ref/runtime/internal/lib/upcqueue"
inaming "v.io/x/ref/runtime/internal/naming"
@@ -30,6 +29,7 @@
const (
reconnectDelay = 50 * time.Millisecond
reapCacheInterval = 5 * time.Minute
+ handshakeTimeout = time.Minute
)
type manager struct {
@@ -37,6 +37,7 @@
closed chan struct{}
cache *ConnCache
ls *listenState
+ ctx *context.T
}
func New(ctx *context.T, rid naming.RoutingID) flow.Manager {
@@ -44,6 +45,7 @@
rid: rid,
closed: make(chan struct{}),
cache: NewConnCache(),
+ ctx: ctx,
}
var events chan conn.StatusUpdate
if rid != naming.NullRoutingID {
@@ -122,17 +124,13 @@
case <-m.closed:
case <-done:
}
+ // Now nobody should send any more flows, so close the queue.
+ m.ls.q.Close()
}
// Listen causes the Manager to accept flows from the provided protocol and address.
// Listen may be called multiple times.
-//
-// The flow.Manager associated with ctx must be the receiver of the method,
-// otherwise an error is returned.
func (m *manager) Listen(ctx *context.T, protocol, address string) error {
- if err := m.validateContext(ctx); err != nil {
- return err
- }
if m.ls == nil {
return NewErrListeningWithNullRid(ctx)
}
@@ -165,13 +163,7 @@
//
// update gets passed the complete set of endpoints for the proxy every time it
// is called.
-//
-// The flow.Manager associated with ctx must be the receiver of the method,
-// otherwise an error is returned.
func (m *manager) ProxyListen(ctx *context.T, ep naming.Endpoint, update func([]naming.Endpoint)) error {
- if err := m.validateContext(ctx); err != nil {
- return err
- }
if m.ls == nil {
return NewErrListeningWithNullRid(ctx)
}
@@ -263,16 +255,22 @@
flowConn, err = ln.Accept(ctx)
}
if err != nil {
- ctx.Errorf("ln.Accept on localEP %v failed: %v", local, err)
+ m.ls.mu.Lock()
+ closed := m.ls.listeners == nil
+ m.ls.mu.Unlock()
+ if !closed {
+ ctx.Errorf("ln.Accept on localEP %v failed: %v", local, err)
+ }
return
}
fh := &flowHandler{m, make(chan struct{})}
m.ls.activeConns.Add(1)
c, err := conn.NewAccepted(
- ctx,
+ m.ctx,
flowConn,
local,
version.Supported,
+ handshakeTimeout,
fh,
m.ls.events)
if err != nil {
@@ -311,6 +309,7 @@
f,
f.Conn().LocalEndpoint(),
version.Supported,
+ handshakeTimeout,
fh,
h.m.ls.events)
if err != nil {
@@ -355,13 +354,7 @@
// }
//
// can be used to accept Flows initiated by remote processes.
-//
-// The flow.Manager associated with ctx must be the receiver of the method,
-// otherwise an error is returned.
func (m *manager) Accept(ctx *context.T) (flow.Flow, error) {
- if err := m.validateContext(ctx); err != nil {
- return nil, err
- }
if m.ls == nil {
return nil, NewErrListeningWithNullRid(ctx)
}
@@ -381,13 +374,7 @@
//
// To maximize re-use of connections, the Manager will also Listen on Dialed
// connections for the lifetime of the connection.
-//
-// The flow.Manager associated with ctx must be the receiver of the method,
-// otherwise an error is returned.
func (m *manager) Dial(ctx *context.T, remote naming.Endpoint, fn flow.BlessingsForPeer) (flow.Flow, error) {
- if err := m.validateContext(ctx); err != nil {
- return nil, err
- }
f, _, err := m.internalDial(ctx, remote, fn)
return f, err
}
@@ -443,11 +430,12 @@
m.ls.activeConns.Add(1)
}
c, err = conn.NewDialed(
- ctx,
+ m.ctx,
flowConn,
localEndpoint(flowConn, m.rid),
remote,
version.Supported,
+ handshakeTimeout,
fh,
events,
)
@@ -474,11 +462,12 @@
m.ls.activeConns.Add(1)
}
c, err = conn.NewDialed(
- ctx,
+ m.ctx,
f,
proxyConn.LocalEndpoint(),
remote,
version.Supported,
+ handshakeTimeout,
fh,
events)
if err != nil {
@@ -512,13 +501,6 @@
return m.closed
}
-func (m *manager) validateContext(ctx *context.T) error {
- if v23.ExperimentalGetFlowManager(ctx) != m {
- return flow.NewErrBadArg(ctx, iflow.NewErrWrongObjectInContext(ctx, "manager"))
- }
- return nil
-}
-
func dial(ctx *context.T, p flow.Protocol, protocol, address string) (flow.Conn, error) {
if p != nil {
var timeout time.Duration
diff --git a/runtime/internal/flow/manager/manager_test.go b/runtime/internal/flow/manager/manager_test.go
index 4babdd0..c151586 100644
--- a/runtime/internal/flow/manager/manager_test.go
+++ b/runtime/internal/flow/manager/manager_test.go
@@ -14,8 +14,7 @@
"v.io/v23/context"
"v.io/v23/flow"
"v.io/v23/naming"
-
- "v.io/x/ref/runtime/factories/fake"
+ _ "v.io/x/ref/runtime/factories/fake"
"v.io/x/ref/runtime/internal/flow/conn"
"v.io/x/ref/runtime/internal/flow/flowtest"
"v.io/x/ref/test"
@@ -33,14 +32,12 @@
ctx, shutdown := v23.Init()
am := New(ctx, naming.FixedRoutingID(0x5555))
- actx := fake.SetFlowManager(ctx, am)
- if err := am.Listen(actx, "tcp", "127.0.0.1:0"); err != nil {
+ if err := am.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
t.Fatal(err)
}
dm := New(ctx, naming.FixedRoutingID(0x1111))
- dctx := fake.SetFlowManager(ctx, dm)
- testFlows(t, dctx, actx, flowtest.BlessingsForPeer)
+ testFlows(t, ctx, dm, am, flowtest.BlessingsForPeer)
shutdown()
<-am.Closed()
@@ -52,26 +49,24 @@
ctx, shutdown := v23.Init()
am := New(ctx, naming.FixedRoutingID(0x5555))
- actx := fake.SetFlowManager(ctx, am)
- if err := am.Listen(actx, "tcp", "127.0.0.1:0"); err != nil {
+ if err := am.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
t.Fatal(err)
}
dm := New(ctx, naming.FixedRoutingID(0x1111))
- dctx := fake.SetFlowManager(ctx, dm)
// At first the cache should be empty.
if got, want := len(dm.(*manager).cache.addrCache), 0; got != want {
t.Fatalf("got cache size %v, want %v", got, want)
}
// After dialing a connection the cache should hold one connection.
- testFlows(t, dctx, actx, flowtest.BlessingsForPeer)
+ testFlows(t, ctx, dm, am, flowtest.BlessingsForPeer)
if got, want := len(dm.(*manager).cache.addrCache), 1; got != want {
t.Fatalf("got cache size %v, want %v", got, want)
}
old := dm.(*manager).cache.ridCache[am.RoutingID()]
// After dialing another connection the cache should still hold one connection
// because the connections should be reused.
- testFlows(t, dctx, actx, flowtest.BlessingsForPeer)
+ testFlows(t, ctx, dm, am, flowtest.BlessingsForPeer)
if got, want := len(dm.(*manager).cache.addrCache), 1; got != want {
t.Errorf("got cache size %v, want %v", got, want)
}
@@ -89,16 +84,14 @@
ctx, shutdown := v23.Init()
am := New(ctx, naming.FixedRoutingID(0x5555))
- actx := fake.SetFlowManager(ctx, am)
- if err := am.Listen(actx, "tcp", "127.0.0.1:0"); err != nil {
+ if err := am.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
t.Fatal(err)
}
dm := New(ctx, naming.FixedRoutingID(0x1111))
- dctx := fake.SetFlowManager(ctx, dm)
- testFlows(t, dctx, actx, flowtest.BlessingsForPeer)
+ testFlows(t, ctx, dm, am, flowtest.BlessingsForPeer)
// Now am should be able to make a flow to dm even though dm is not listening.
- testFlows(t, actx, dctx, flowtest.BlessingsForPeer)
+ testFlows(t, ctx, am, dm, flowtest.BlessingsForPeer)
shutdown()
<-am.Closed()
@@ -110,20 +103,17 @@
ctx, shutdown := v23.Init()
am := New(ctx, naming.FixedRoutingID(0x5555))
- actx := fake.SetFlowManager(ctx, am)
- if err := am.Listen(actx, "tcp", "127.0.0.1:0"); err != nil {
+ if err := am.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
t.Fatal(err)
}
nulldm := New(ctx, naming.NullRoutingID)
- nctx := fake.SetFlowManager(ctx, nulldm)
- _, af := testFlows(t, nctx, actx, flowtest.BlessingsForPeer)
+ _, af := testFlows(t, ctx, nulldm, am, flowtest.BlessingsForPeer)
// Ensure that the remote blessings of the underlying conn of the accepted flow are zero.
if rBlessings := af.Conn().(*conn.Conn).RemoteBlessings(); !rBlessings.IsZero() {
t.Errorf("got %v, want zero-value blessings", rBlessings)
}
dm := New(ctx, naming.FixedRoutingID(0x1111))
- dctx := fake.SetFlowManager(ctx, dm)
- _, af = testFlows(t, dctx, actx, flowtest.BlessingsForPeer)
+ _, af = testFlows(t, ctx, dm, am, flowtest.BlessingsForPeer)
// Ensure that the remote blessings of the underlying conn of the accepted flow are
// non-zero if we did specify a RoutingID.
if rBlessings := af.Conn().(*conn.Conn).RemoteBlessings(); rBlessings.IsZero() {
@@ -141,19 +131,15 @@
ctx, shutdown := v23.Init()
am := New(ctx, naming.FixedRoutingID(0x5555))
- actx := fake.SetFlowManager(ctx, am)
- if err := am.Listen(actx, "tcp", "127.0.0.1:0"); err != nil {
+ if err := am.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
t.Fatal(err)
}
-
dm := New(ctx, naming.FixedRoutingID(0x1111))
- dctx := fake.SetFlowManager(ctx, dm)
+ testFlows(t, ctx, dm, am, flowtest.BlessingsForPeer)
- testFlows(t, dctx, actx, flowtest.BlessingsForPeer)
+ am.StopListening(ctx)
- am.StopListening(actx)
-
- if f, err := dm.Dial(dctx, am.ListeningEndpoints()[0], flowtest.BlessingsForPeer); err == nil {
+ if f, err := dm.Dial(ctx, am.ListeningEndpoints()[0], flowtest.BlessingsForPeer); err == nil {
t.Errorf("dialing a lame duck should fail, but didn't %#v.", f)
}
@@ -162,17 +148,16 @@
<-dm.Closed()
}
-func testFlows(t *testing.T, dctx, actx *context.T, bFn flow.BlessingsForPeer) (df, af flow.Flow) {
- am := v23.ExperimentalGetFlowManager(actx)
+func testFlows(t *testing.T, ctx *context.T, dm, am flow.Manager, bFn flow.BlessingsForPeer) (df, af flow.Flow) {
ep := am.ListeningEndpoints()[0]
var err error
- df, err = v23.ExperimentalGetFlowManager(dctx).Dial(dctx, ep, bFn)
+ df, err = dm.Dial(ctx, ep, bFn)
if err != nil {
t.Fatal(err)
}
want := "do you read me?"
writeLine(df, want)
- af, err = am.Accept(actx)
+ af, err = am.Accept(ctx)
if err != nil {
t.Fatal(err)
}
diff --git a/runtime/internal/rpc/client.go b/runtime/internal/rpc/client.go
index a7f427f..33db6e7 100644
--- a/runtime/internal/rpc/client.go
+++ b/runtime/internal/rpc/client.go
@@ -75,35 +75,24 @@
ns namespace.T
vcOpts []stream.VCOpt // vc opts passed to dial
preferredProtocols []string
+ vcCache *vc.VCCache
+ wg sync.WaitGroup
+ dc vc.DischargeClient
- // We cache the IP networks on the device since it is not that cheap to read
- // network interfaces through os syscall.
- // TODO(toddw): this can be removed since netstate now implements caching
- // directly.
- ipNets []*net.IPNet
-
- vcCache *vc.VCCache
-
- wg sync.WaitGroup
- mu sync.Mutex
- closed bool
-
- dc vc.DischargeClient
+ mu sync.Mutex
+ closed bool
+ closech chan struct{}
}
var _ rpc.Client = (*client)(nil)
-func DeprecatedNewClient(streamMgr stream.Manager, ns namespace.T, opts ...rpc.ClientOpt) (rpc.Client, error) {
+func DeprecatedNewClient(streamMgr stream.Manager, ns namespace.T, opts ...rpc.ClientOpt) rpc.Client {
c := &client{
streamMgr: streamMgr,
ns: ns,
vcCache: vc.NewVCCache(),
+ closech: make(chan struct{}),
}
- ipNets, err := ipNetworks()
- if err != nil {
- return nil, err
- }
- c.ipNets = ipNets
c.dc = InternalNewDischargeClient(nil, c, 0)
for _, opt := range opts {
// Collect all client opts that are also vc opts.
@@ -114,8 +103,7 @@
c.preferredProtocols = v
}
}
-
- return c, nil
+ return c
}
func (c *client) createFlow(ctx *context.T, principal security.Principal, ep naming.Endpoint, vcOpts []stream.VCOpt) (stream.Flow, *verror.SubErr) {
@@ -446,7 +434,7 @@
return nil, verror.NoRetry, true, verror.New(verror.ErrInternal, ctx, name)
}
// An empty set of protocols means all protocols...
- if resolved.Servers, err = filterAndOrderServers(resolved.Servers, c.preferredProtocols, c.ipNets); err != nil {
+ if resolved.Servers, err = filterAndOrderServers(resolved.Servers, c.preferredProtocols); err != nil {
return nil, verror.RetryRefetch, true, verror.New(verror.ErrNoServers, ctx, name, err)
}
}
@@ -752,7 +740,10 @@
func (c *client) Close() {
defer apilog.LogCall(nil)(nil) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
c.mu.Lock()
- c.closed = true
+ if !c.closed {
+ c.closed = true
+ close(c.closech)
+ }
c.mu.Unlock()
for _, v := range c.vcCache.Close() {
c.streamMgr.ShutdownEndpoint(v.RemoteEndpoint())
@@ -760,6 +751,10 @@
c.wg.Wait()
}
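+// Closed returns a channel that is closed once Close has been called on the
+// client.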
+func (c *client) Closed() <-chan struct{} {
+ return c.closech
+}
+
// flowClient implements the RPC client-side protocol for a single RPC, over a
// flow that's already connected to the server.
type flowClient struct {
diff --git a/runtime/internal/rpc/errors.vdl b/runtime/internal/rpc/errors.vdl
index 8f24707..a57dafc 100644
--- a/runtime/internal/rpc/errors.vdl
+++ b/runtime/internal/rpc/errors.vdl
@@ -27,4 +27,7 @@
badAuth(suffix, method string, err error) {
"en": "not authorized to call {suffix}.{method}: {err}",
}
+ typeFlowFailure(err error) {
+ "en": "type flow could not be constructed{:err}",
+ }
)
diff --git a/runtime/internal/rpc/errors.vdl.go b/runtime/internal/rpc/errors.vdl.go
index 2c543d7..949ba3b 100644
--- a/runtime/internal/rpc/errors.vdl.go
+++ b/runtime/internal/rpc/errors.vdl.go
@@ -23,6 +23,7 @@
errBadBlessingsCache = verror.Register("v.io/x/ref/runtime/internal/rpc.badBlessingsCache", verror.NoRetry, "{1:}{2:} failed to find blessings in cache: {3}")
errBadDischarge = verror.Register("v.io/x/ref/runtime/internal/rpc.badDischarge", verror.NoRetry, "{1:}{2:} failed to decode discharge #{3}: {4}")
errBadAuth = verror.Register("v.io/x/ref/runtime/internal/rpc.badAuth", verror.NoRetry, "{1:}{2:} not authorized to call {3}.{4}: {5}")
+ errTypeFlowFailure = verror.Register("v.io/x/ref/runtime/internal/rpc.typeFlowFailure", verror.NoRetry, "{1:}{2:} type flow could not be constructed{:3}")
)
func init() {
@@ -33,6 +34,7 @@
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(errBadBlessingsCache.ID), "{1:}{2:} failed to find blessings in cache: {3}")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(errBadDischarge.ID), "{1:}{2:} failed to decode discharge #{3}: {4}")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(errBadAuth.ID), "{1:}{2:} not authorized to call {3}.{4}: {5}")
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(errTypeFlowFailure.ID), "{1:}{2:} type flow could not be constructed{:3}")
}
// newErrBadRequest returns an error with the errBadRequest ID.
@@ -69,3 +71,8 @@
func newErrBadAuth(ctx *context.T, suffix string, method string, err error) error {
return verror.New(errBadAuth, ctx, suffix, method, err)
}
+
+// newErrTypeFlowFailure returns an error with the errTypeFlowFailure ID.
+func newErrTypeFlowFailure(ctx *context.T, err error) error {
+ return verror.New(errTypeFlowFailure, ctx, err)
+}
diff --git a/runtime/internal/rpc/full_test.go b/runtime/internal/rpc/full_test.go
index 495c048..6dfe486 100644
--- a/runtime/internal/rpc/full_test.go
+++ b/runtime/internal/rpc/full_test.go
@@ -70,10 +70,7 @@
}
func testInternalNewServerWithPubsub(ctx *context.T, streamMgr stream.Manager, ns namespace.T, settingsPublisher *pubsub.Publisher, settingsStreamName string, opts ...rpc.ServerOpt) (DeprecatedServer, error) {
- client, err := DeprecatedNewClient(streamMgr, ns)
- if err != nil {
- return nil, err
- }
+ client := DeprecatedNewClient(streamMgr, ns)
return DeprecatedNewServer(ctx, streamMgr, ns, settingsPublisher, settingsStreamName, client, opts...)
}
@@ -366,10 +363,7 @@
if server != nil {
b.ep, b.server = startServerWS(t, ctx, server, b.sm, b.ns, b.name, testServerDisp{ts}, shouldUseWebsocket)
}
- var err error
- if b.client, err = DeprecatedNewClient(b.sm, b.ns); err != nil {
- t.Fatalf("DeprecatedNewClient failed: %v", err)
- }
+ b.client = DeprecatedNewClient(b.sm, b.ns)
return
}
@@ -502,10 +496,7 @@
if err := server.ServeDispatcher("mp/server", disp); err != nil {
t.Fatalf("server.Serve failed: %v", err)
}
- client, err := DeprecatedNewClient(sm, ns)
- if err != nil {
- t.Fatalf("DeprecatedNewClient failed: %v", err)
- }
+ client := DeprecatedNewClient(sm, ns)
// When using SecurityNone, all authorization checks should be skipped, so
// unauthorized methods should be callable.
var got string
@@ -535,10 +526,7 @@
if err := server.ServeDispatcher("mp/server", disp); err != nil {
t.Fatalf("server.Serve failed: %v", err)
}
- client, err := DeprecatedNewClient(sm, ns)
- if err != nil {
- t.Fatalf("DeprecatedNewClient failed: %v", err)
- }
+ client := DeprecatedNewClient(sm, ns)
// A call should fail if the principal in the ctx is nil and SecurityNone is not specified.
ctx, err = v23.WithPrincipal(ctx, nil)
@@ -583,12 +571,7 @@
runClient := func(server string) ([]string, error) {
smc := imanager.InternalNew(ctx, naming.FixedRoutingID(0xc))
defer smc.Shutdown()
- client, err := DeprecatedNewClient(
- smc,
- ns)
- if err != nil {
- return nil, err
- }
+ client := DeprecatedNewClient(smc, ns)
defer client.Close()
ctx, _ = v23.WithPrincipal(cctx, pclient)
call, err := client.StartCall(cctx, server, "Closure", nil)
@@ -653,10 +636,7 @@
}
smc := imanager.InternalNew(ctx, rid)
defer smc.Shutdown()
- client, err := DeprecatedNewClient(smc, ns)
- if err != nil {
- t.Fatalf("failed to create client: %v", err)
- }
+ client := DeprecatedNewClient(smc, ns)
defer client.Close()
var opts []rpc.CallOpt
if noDischarges {
@@ -718,10 +698,7 @@
}
sm := imanager.InternalNew(ctx, rid)
- c, err := DeprecatedNewClient(sm, ns)
- if err != nil {
- t.Fatalf("failed to create client: %v", err)
- }
+ c := DeprecatedNewClient(sm, ns)
dc := c.(*client).dc
tpcav2, err := security.NewPublicKeyCaveat(pdischarger2.PublicKey(), "mountpoint/discharger2", security.ThirdPartyRequirements{}, mkCaveat(security.NewExpiryCaveat(time.Now().Add(time.Hour))))
if err != nil {
@@ -768,11 +745,7 @@
}
smc := imanager.InternalNew(sctx, rid)
defer smc.Shutdown()
- client, err := DeprecatedNewClient(smc, ns)
- if err != nil {
- t.Fatalf("failed to create client: %v", err)
- }
- return client
+ return DeprecatedNewClient(smc, ns)
}
runClient := func(client rpc.Client) {
@@ -847,14 +820,12 @@
defer runServer(t, sctx, ns, mountName, &testServer{}).Shutdown()
smc := imanager.InternalNew(sctx, naming.FixedRoutingID(0xc))
- client, err := DeprecatedNewClient(smc, ns)
- if err != nil {
- t.Fatal(err)
- }
+ client := DeprecatedNewClient(smc, ns)
defer smc.Shutdown()
defer client.Close()
// The call should succeed when the server presents the same public as the opt...
+ var err error
if _, err = client.StartCall(cctx, mountName, "Closure", nil, options.SkipServerEndpointAuthorization{}, options.ServerPublicKey{
PublicKey: pserver.PublicKey(),
}); err != nil {
@@ -918,10 +889,7 @@
}
smc := imanager.InternalNew(ctx, rid)
defer smc.Shutdown()
- client, err := DeprecatedNewClient(smc, ns)
- if err != nil {
- t.Fatalf("failed to create client: %v", err)
- }
+ client := DeprecatedNewClient(smc, ns)
defer client.Close()
dc := InternalNewDischargeClient(ctx, client, 0)
diff --git a/runtime/internal/rpc/resolve_internal_test.go b/runtime/internal/rpc/resolve_internal_test.go
index 98e9dba..56ebd6f 100644
--- a/runtime/internal/rpc/resolve_internal_test.go
+++ b/runtime/internal/rpc/resolve_internal_test.go
@@ -5,13 +5,14 @@
package rpc
import (
+ "v.io/v23/context"
"v.io/v23/rpc"
"v.io/x/ref"
)
-func InternalServerResolveToEndpoint(s rpc.Server, name string) (string, error) {
+func InternalServerResolveToEndpoint(ctx *context.T, s rpc.Server, name string) (string, error) {
if ref.RPCTransitionState() == ref.XServers {
- ep, err := s.(*xserver).resolveToEndpoint(name)
+ ep, err := s.(*xserver).resolveToEndpoint(ctx, name)
if err != nil {
return "", err
}
diff --git a/runtime/internal/rpc/resolve_test.go b/runtime/internal/rpc/resolve_test.go
index be38047..455334b 100644
--- a/runtime/internal/rpc/resolve_test.go
+++ b/runtime/internal/rpc/resolve_test.go
@@ -141,7 +141,7 @@
{"unknown", "", notfound},
}
for _, tc := range testcases {
- result, err := irpc.InternalServerResolveToEndpoint(server, tc.address)
+ result, err := irpc.InternalServerResolveToEndpoint(ctx, server, tc.address)
if (err == nil) != (tc.err == nil) {
t.Errorf("Unexpected err for %q. Got %v, expected %v", tc.address, err, tc.err)
}
diff --git a/runtime/internal/rpc/server.go b/runtime/internal/rpc/server.go
index 846ac32..ed23e5c 100644
--- a/runtime/internal/rpc/server.go
+++ b/runtime/internal/rpc/server.go
@@ -185,7 +185,6 @@
// We cache the IP networks on the device since it is not that cheap to read
// network interfaces through os syscall.
// TODO(jhahn): Add monitoring the network interface changes.
- ipNets []*net.IPNet
ns namespace.T
servesMountTable bool
isLeaf bool
@@ -273,11 +272,6 @@
dischargeExpiryBuffer = vc.DefaultServerDischargeExpiryBuffer
securityLevel options.SecurityLevel
)
- ipNets, err := ipNetworks()
- if err != nil {
- return nil, err
- }
- s.ipNets = ipNets
for _, opt := range opts {
switch opt := opt.(type) {
@@ -399,7 +393,7 @@
}}
}
// An empty set of protocols means all protocols...
- if resolved.Servers, err = filterAndOrderServers(resolved.Servers, s.preferredProtocols, s.ipNets); err != nil {
+ if resolved.Servers, err = filterAndOrderServers(resolved.Servers, s.preferredProtocols); err != nil {
return "", err
}
for _, n := range resolved.Names() {
@@ -1023,6 +1017,10 @@
return nil
}
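+// Closed returns a channel that is closed when the server's context is
+// canceled, i.e. once the server has shut down.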
+func (s *server) Closed() <-chan struct{} {
+ return s.ctx.Done()
+}
+
// flowServer implements the RPC server-side protocol for a single RPC, over a
// flow that's already connected to the client.
type flowServer struct {
diff --git a/runtime/internal/rpc/sort_endpoints.go b/runtime/internal/rpc/sort_endpoints.go
index 3b5b896..c587027 100644
--- a/runtime/internal/rpc/sort_endpoints.go
+++ b/runtime/internal/rpc/sort_endpoints.go
@@ -82,7 +82,13 @@
// will be used, but unlike the previous case, any servers that don't support
// these protocols will be returned also, but following the default
// preferences.
-func filterAndOrderServers(servers []naming.MountedServer, protocols []string, ipnets []*net.IPNet) ([]naming.MountedServer, error) {
+func filterAndOrderServers(servers []naming.MountedServer, protocols []string, ipnets ...*net.IPNet) ([]naming.MountedServer, error) {
+ if ipnets == nil {
+ var err error
+ if ipnets, err = ipNetworks(); err != nil {
+ return nil, err
+ }
+ }
var (
errs = verror.SubErrs{}
list = make(sortableServerList, 0, len(servers))
@@ -94,6 +100,7 @@
adderr := func(name string, err error) {
errs = append(errs, verror.SubErr{Name: "server=" + name, Err: err, Options: verror.Print})
}
+
for _, server := range servers {
name := server.Server
ep, err := name2endpoint(name)
diff --git a/runtime/internal/rpc/sort_internal_test.go b/runtime/internal/rpc/sort_internal_test.go
index 7600d2b..59edbcd 100644
--- a/runtime/internal/rpc/sort_internal_test.go
+++ b/runtime/internal/rpc/sort_internal_test.go
@@ -21,7 +21,7 @@
func TestIncompatible(t *testing.T) {
servers := []naming.MountedServer{}
- _, err := filterAndOrderServers(servers, []string{"tcp"}, nil)
+ _, err := filterAndOrderServers(servers, []string{"tcp"})
if err == nil || err.Error() != "failed to find any compatible servers" {
t.Errorf("expected a different error: %v", err)
}
@@ -31,7 +31,7 @@
servers = append(servers, naming.MountedServer{Server: name})
}
- _, err = filterAndOrderServers(servers, []string{"foobar"}, nil)
+ _, err = filterAndOrderServers(servers, []string{"foobar"})
if err == nil || !strings.HasSuffix(err.Error(), "undesired protocol: tcp]") {
t.Errorf("expected a different error to: %v", err)
}
@@ -58,7 +58,7 @@
name := naming.JoinAddressName(naming.FormatEndpoint("tcp6", a), "")
servers = append(servers, naming.MountedServer{Server: name})
}
- if _, err := filterAndOrderServers(servers, []string{"batman"}, ipnets); err == nil {
+ if _, err := filterAndOrderServers(servers, []string{"batman"}, ipnets...); err == nil {
t.Fatalf("expected an error")
}
@@ -77,7 +77,7 @@
"/@6@tcp4@127.0.0.2@@@@@@",
"/127.0.0.12:14141",
}
- result, err := filterAndOrderServers(servers, []string{"foobar", "tcp4"}, ipnets)
+ result, err := filterAndOrderServers(servers, []string{"foobar", "tcp4"}, ipnets...)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
@@ -101,14 +101,14 @@
"/@6@foobar@127.0.0.11@@@@@@",
"/127.0.0.12:14141",
}
- if result, err = filterAndOrderServers(servers, nil, ipnets); err != nil {
+ if result, err = filterAndOrderServers(servers, nil, ipnets...); err != nil {
t.Fatalf("unexpected error: %s", err)
}
if got := servers2names(result); !reflect.DeepEqual(got, want) {
t.Errorf("got: %v, want %v", got, want)
}
- if result, err = filterAndOrderServers(servers, []string{}, ipnets); err != nil {
+ if result, err = filterAndOrderServers(servers, []string{}, ipnets...); err != nil {
t.Fatalf("unexpected error: %s", err)
}
if got := servers2names(result); !reflect.DeepEqual(got, want) {
@@ -125,7 +125,7 @@
"/@6@tcp6@127.0.0.8@@@@@@",
"/127.0.0.12:14141",
}
- if result, err = filterAndOrderServers(servers, []string{"tcp"}, ipnets); err != nil {
+ if result, err = filterAndOrderServers(servers, []string{"tcp"}, ipnets...); err != nil {
t.Fatalf("unexpected error: %s", err)
}
if got := servers2names(result); !reflect.DeepEqual(got, want) {
@@ -154,7 +154,7 @@
name := naming.JoinAddressName(naming.FormatEndpoint("foobar", a), "")
servers = append(servers, naming.MountedServer{Server: name})
}
- if result, err = filterAndOrderServers(servers, []string{}, ipnets); err != nil {
+ if result, err = filterAndOrderServers(servers, []string{}, ipnets...); err != nil {
t.Fatalf("unexpected error: %s", err)
}
if got := servers2names(result); !reflect.DeepEqual(got, want) {
@@ -171,7 +171,7 @@
name := naming.JoinAddressName(naming.FormatEndpoint("tcp", a), "")
servers = append(servers, naming.MountedServer{Server: name})
}
- result, err := filterAndOrderServers(servers, []string{"tcp"}, ipnets)
+ result, err := filterAndOrderServers(servers, []string{"tcp"}, ipnets...)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
@@ -190,7 +190,7 @@
servers = append(servers, naming.MountedServer{Server: name})
}
- if result, err = filterAndOrderServers(servers, []string{"ws", "tcp"}, ipnets); err != nil {
+ if result, err = filterAndOrderServers(servers, []string{"ws", "tcp"}, ipnets...); err != nil {
t.Fatalf("unexpected error: %s", err)
}
want = []string{
diff --git a/runtime/internal/rpc/test/cancel_test.go b/runtime/internal/rpc/test/cancel_test.go
index b363a33..42ef4c2 100644
--- a/runtime/internal/rpc/test/cancel_test.go
+++ b/runtime/internal/rpc/test/cancel_test.go
@@ -13,6 +13,8 @@
"v.io/v23/rpc"
"v.io/v23/security"
"v.io/v23/verror"
+ "v.io/x/ref"
+ "v.io/x/ref/runtime/internal/flow/conn"
"v.io/x/ref/runtime/internal/rpc/stream/vc"
"v.io/x/ref/test"
)
@@ -27,14 +29,18 @@
func (c *canceld) Run(ctx *context.T, _ rpc.ServerCall) error {
close(c.started)
client := v23.GetClient(ctx)
- ctx.Infof("Run: %s", c.child)
+ var done chan struct{}
if c.child != "" {
- if _, err := client.StartCall(ctx, c.child, "Run", []interface{}{}); err != nil {
- ctx.Error(err)
- return err
- }
+ done = make(chan struct{})
+ go func() {
+ client.Call(ctx, c.child, "Run", nil, nil)
+ close(done)
+ }()
}
<-ctx.Done()
+ if done != nil {
+ <-done
+ }
close(c.canceled)
return nil
}
@@ -50,7 +56,6 @@
if err != nil {
return nil, err
}
- ctx.Infof("Serving: %q", name)
return c, nil
}
@@ -70,20 +75,18 @@
}
ctx, cancel := context.WithCancel(ctx)
- _, err = v23.GetClient(ctx).StartCall(ctx, "c1", "Run", []interface{}{})
- if err != nil {
- t.Fatalf("can't call: ", err)
- }
+ done := make(chan struct{})
+ go func() {
+ v23.GetClient(ctx).Call(ctx, "c1", "Run", nil, nil)
+ close(done)
+ }()
<-c1.started
<-c2.started
-
- ctx.Info("cancelling initial call")
cancel()
-
- ctx.Info("waiting for children to be canceled")
<-c1.canceled
<-c2.canceled
+ <-done
}
type cancelTestServer struct {
@@ -141,11 +144,13 @@
t.Fatal(err)
}
cctx, cancel := context.WithCancel(cctx)
- _, err = v23.GetClient(cctx).StartCall(cctx, "cancel", "CancelStreamReader", []interface{}{})
- if err != nil {
- t.Fatalf("Start call failed: %v", err)
- }
+ done := make(chan struct{})
+ go func() {
+ v23.GetClient(cctx).Call(cctx, "cancel", "CancelStreamReader", nil, nil)
+ close(done)
+ }()
waitForCancel(t, ts, cancel)
+ <-done
}
// TestCancelWithFullBuffers tests that even if the writer has filled the buffers and
@@ -163,16 +168,25 @@
t.Fatal(err)
}
cctx, cancel := context.WithCancel(cctx)
- call, err := v23.GetClient(cctx).StartCall(cctx, "cancel", "CancelStreamIgnorer", []interface{}{})
+ call, err := v23.GetClient(cctx).StartCall(cctx, "cancel", "CancelStreamIgnorer", nil)
if err != nil {
t.Fatalf("Start call failed: %v", err)
}
// Fill up all the write buffers to ensure that cancelling works even when the stream
// is blocked.
- // TODO(mattr): Update for new RPC system.
- call.Send(make([]byte, vc.MaxSharedBytes))
- call.Send(make([]byte, vc.DefaultBytesBufferedPerFlow))
+ if ref.RPCTransitionState() >= ref.XServers {
+ call.Send(make([]byte, conn.DefaultBytesBufferedPerFlow))
+ } else {
+ call.Send(make([]byte, vc.MaxSharedBytes))
+ call.Send(make([]byte, vc.DefaultBytesBufferedPerFlow))
+ }
+ done := make(chan struct{})
+ go func() {
+ call.Finish()
+ close(done)
+ }()
waitForCancel(t, ts, cancel)
+ <-done
}
diff --git a/runtime/internal/rpc/transitionclient.go b/runtime/internal/rpc/transitionclient.go
index ade2e3a..2516a43 100644
--- a/runtime/internal/rpc/transitionclient.go
+++ b/runtime/internal/rpc/transitionclient.go
@@ -6,7 +6,6 @@
import (
"v.io/v23/context"
- "v.io/v23/flow"
"v.io/v23/flow/message"
"v.io/v23/namespace"
"v.io/v23/rpc"
@@ -15,22 +14,24 @@
)
type transitionClient struct {
- c, xc rpc.Client
+ c, xc rpc.Client
+ closech chan struct{}
}
var _ = rpc.Client((*transitionClient)(nil))
-func NewTransitionClient(ctx *context.T, streamMgr stream.Manager, flowMgr flow.Manager, ns namespace.T, opts ...rpc.ClientOpt) (rpc.Client, error) {
- var err error
- ret := &transitionClient{}
- if ret.xc, err = NewXClient(ctx, flowMgr, ns, opts...); err != nil {
- return nil, err
+func NewTransitionClient(ctx *context.T, streamMgr stream.Manager, ns namespace.T, opts ...rpc.ClientOpt) rpc.Client {
+ tc := &transitionClient{
+ xc: NewXClient(ctx, ns, opts...),
+ c: DeprecatedNewClient(streamMgr, ns, opts...),
+ closech: make(chan struct{}),
}
- if ret.c, err = DeprecatedNewClient(streamMgr, ns, opts...); err != nil {
- ret.xc.Close()
- return nil, err
- }
- return ret, nil
+ go func() {
+ <-tc.xc.Closed()
+ <-tc.c.Closed()
+ close(tc.closech)
+ }()
+ return tc
}
func (t *transitionClient) StartCall(ctx *context.T, name, method string, args []interface{}, opts ...rpc.CallOpt) (rpc.ClientCall, error) {
@@ -50,6 +51,10 @@
}
func (t *transitionClient) Close() {
- t.xc.Close()
t.c.Close()
+ <-t.xc.Closed()
+}
+
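+// Closed returns a channel that is closed once both underlying clients have
+// been closed.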
+func (t *transitionClient) Closed() <-chan struct{} {
+ return t.closech
}
diff --git a/runtime/internal/rpc/typecache.go b/runtime/internal/rpc/typecache.go
index c734913..2fcb6bc 100644
--- a/runtime/internal/rpc/typecache.go
+++ b/runtime/internal/rpc/typecache.go
@@ -7,6 +7,7 @@
import (
"sync"
+ "v.io/v23/context"
"v.io/v23/flow"
"v.io/v23/vom"
)
@@ -54,7 +55,7 @@
return
}
-func (tc *typeCache) get(c flow.ManagedConn) (*vom.TypeEncoder, *vom.TypeDecoder) {
+func (tc *typeCache) get(ctx *context.T, c flow.ManagedConn) (*vom.TypeEncoder, *vom.TypeDecoder, error) {
tc.mu.Lock()
tce := tc.flows[c]
if tce == nil {
@@ -62,8 +63,14 @@
tc.flows[c] = tce
}
tc.mu.Unlock()
- <-tce.ready
- return tce.enc, tce.dec
+ select {
+ case <-c.Closed():
+ return nil, nil, newErrTypeFlowFailure(ctx, nil)
+ case <-ctx.Done():
+ return nil, nil, ctx.Err()
+ case <-tce.ready:
+ }
+ return tce.enc, tce.dec, nil
}
func (tc *typeCache) collect() {
diff --git a/runtime/internal/rpc/x_test.go b/runtime/internal/rpc/x_test.go
index 0c4bf18..5568371 100644
--- a/runtime/internal/rpc/x_test.go
+++ b/runtime/internal/rpc/x_test.go
@@ -9,6 +9,7 @@
"v.io/v23"
"v.io/v23/context"
+ "v.io/v23/flow"
"v.io/v23/naming"
"v.io/v23/rpc"
"v.io/v23/security"
@@ -26,21 +27,24 @@
func TestXClientServer(t *testing.T) {
ctx, shutdown := v23.Init()
defer shutdown()
- ctx = fake.SetFlowManager(ctx, manager.New(ctx, naming.FixedRoutingID(0x1)))
+ var i uint64 = 1
+ ctx = fake.SetFlowManagerFactory(ctx, func(ctx *context.T) flow.Manager {
+ i++
+ return manager.New(ctx, naming.FixedRoutingID(i))
+ })
+ ctx = fake.SetClientFactory(ctx, func(ctx *context.T, o ...rpc.ClientOpt) rpc.Client {
+ return NewXClient(ctx, v23.GetNamespace(ctx), o...)
+ })
+
ctx = v23.WithListenSpec(ctx, rpc.ListenSpec{
Addrs: rpc.ListenAddrs{{Protocol: "tcp", Address: "127.0.0.1:0"}},
})
- _, err := NewServer(ctx, "server", &testService{}, nil, nil, "")
- if err != nil {
- t.Fatal(verror.DebugString(err))
- }
- ctx = fake.SetFlowManager(ctx, manager.New(ctx, naming.FixedRoutingID(0x2)))
- client, err := NewXClient(ctx, v23.ExperimentalGetFlowManager(ctx), v23.GetNamespace(ctx))
+ _, _, err := WithNewServer(ctx, "server", &testService{}, nil, nil, "")
if err != nil {
t.Fatal(verror.DebugString(err))
}
var result string
- if err = client.Call(ctx, "server", "Echo", []interface{}{"hello"}, []interface{}{&result}); err != nil {
+ if err = v23.GetClient(ctx).Call(ctx, "server", "Echo", []interface{}{"hello"}, []interface{}{&result}); err != nil {
t.Fatal(verror.DebugString(err))
}
if want := "response:hello"; result != want {
@@ -57,21 +61,24 @@
func TestXClientDispatchingServer(t *testing.T) {
ctx, shutdown := v23.Init()
defer shutdown()
- ctx = fake.SetFlowManager(ctx, manager.New(ctx, naming.FixedRoutingID(0x1)))
+ var i uint64 = 1
+ ctx = fake.SetFlowManagerFactory(ctx, func(ctx *context.T) flow.Manager {
+ i++
+ return manager.New(ctx, naming.FixedRoutingID(i))
+ })
+ ctx = fake.SetClientFactory(ctx, func(ctx *context.T, o ...rpc.ClientOpt) rpc.Client {
+ return NewXClient(ctx, v23.GetNamespace(ctx), o...)
+ })
+
ctx = v23.WithListenSpec(ctx, rpc.ListenSpec{
Addrs: rpc.ListenAddrs{{Protocol: "tcp", Address: "127.0.0.1:0"}},
})
- _, err := NewDispatchingServer(ctx, "server", &testDispatcher{}, nil, "")
- if err != nil {
- t.Fatal(verror.DebugString(err))
- }
- ctx = fake.SetFlowManager(ctx, manager.New(ctx, naming.FixedRoutingID(0x2)))
- client, err := NewXClient(ctx, v23.ExperimentalGetFlowManager(ctx), v23.GetNamespace(ctx))
+ _, _, err := WithNewDispatchingServer(ctx, "server", &testDispatcher{}, nil, "")
if err != nil {
t.Fatal(verror.DebugString(err))
}
var result string
- if err = client.Call(ctx, "server", "Echo", []interface{}{"hello"}, []interface{}{&result}); err != nil {
+ if err = v23.GetClient(ctx).Call(ctx, "server", "Echo", []interface{}{"hello"}, []interface{}{&result}); err != nil {
t.Fatal(verror.DebugString(err))
}
if want := "response:hello"; result != want {
diff --git a/runtime/internal/rpc/xclient.go b/runtime/internal/rpc/xclient.go
index 7e6c618..c96d14a 100644
--- a/runtime/internal/rpc/xclient.go
+++ b/runtime/internal/rpc/xclient.go
@@ -7,7 +7,6 @@
import (
"fmt"
"io"
- "net"
"sync"
"time"
@@ -27,6 +26,7 @@
"v.io/x/ref/lib/apilog"
slib "v.io/x/ref/lib/security"
+ "v.io/x/ref/runtime/internal/flow/manager"
inaming "v.io/x/ref/runtime/internal/naming"
)
@@ -37,16 +37,17 @@
dischargeBuffer = time.Minute
)
+type clientFlowManagerOpt struct {
+ mgr flow.Manager
+}
+
+func (clientFlowManagerOpt) RPCClientOpt() {}
+
type xclient struct {
flowMgr flow.Manager
ns namespace.T
preferredProtocols []string
-
- // We cache the IP networks on the device since it is not that cheap to read
- // network interfaces through os syscall.
- // TODO(toddw): this can be removed since netstate now implements caching
- // directly.
- ipNets []*net.IPNet
+ ctx *context.T
// typeCache maintains a cache of type encoders and decoders.
typeCache *typeCache
@@ -58,24 +59,37 @@
var _ rpc.Client = (*xclient)(nil)
-func NewXClient(ctx *context.T, fm flow.Manager, ns namespace.T, opts ...rpc.ClientOpt) (rpc.Client, error) {
+func NewXClient(ctx *context.T, ns namespace.T, opts ...rpc.ClientOpt) rpc.Client {
c := &xclient{
- flowMgr: fm,
+ ctx: ctx,
ns: ns,
typeCache: newTypeCache(),
}
- ipNets, err := ipNetworks()
- if err != nil {
- return nil, err
- }
- c.ipNets = ipNets
+
for _, opt := range opts {
switch v := opt.(type) {
case PreferredProtocols:
c.preferredProtocols = v
+ case clientFlowManagerOpt:
+ c.flowMgr = v.mgr
}
}
- return c, nil
+ if c.flowMgr == nil {
+ c.flowMgr = manager.New(ctx, naming.NullRoutingID)
+ }
+
+ go func() {
+ <-ctx.Done()
+ c.mu.Lock()
+ // TODO(mattr): Do we really need c.closed?
+ c.closed = true
+ c.mu.Unlock()
+
+ <-c.flowMgr.Closed()
+ c.wg.Wait()
+ }()
+
+ return c
}
func (c *xclient) StartCall(ctx *context.T, name, method string, args []interface{}, opts ...rpc.CallOpt) (rpc.ClientCall, error) {
@@ -173,26 +187,43 @@
return
}
bfp := blessingsForPeer{auth, method, suffix, args}.run
- if status.flow, err = c.flowMgr.Dial(ctx, ep, bfp); err != nil {
+ flow, err := c.flowMgr.Dial(ctx, ep, bfp)
+ if err != nil {
ctx.VI(2).Infof("rpc: failed to create Flow with %v: %v", server, err)
status.serverErr = suberr(err)
return
}
- if write := c.typeCache.writer(status.flow.Conn()); write != nil {
- if tflow, err := c.flowMgr.Dial(ctx, ep, bfp); err != nil {
- ctx.VI(2).Infof("rpc: failed to create type Flow with %v: %v", server, err)
- status.serverErr = suberr(err)
+ if write := c.typeCache.writer(flow.Conn()); write != nil {
+ // Create a type flow. Note that we use c.ctx instead of ctx.
+ // This is because type flows have a longer lifetime than the
+ // main flow being constructed.
+ tflow, err := c.flowMgr.Dial(c.ctx, ep, bfp)
+ if err != nil {
+ status.serverErr = suberr(newErrTypeFlowFailure(ctx, err))
+ flow.Close()
return
- } else if _, err = tflow.Write([]byte{typeFlow}); err != nil {
- ctx.VI(2).Infof("rpc: Failed to write type byte. %v: %v", server, err)
- tflow.Close()
- status.serverErr = suberr(err)
- return
- } else {
- write(tflow)
}
+ if tflow.Conn() != flow.Conn() {
+ status.serverErr = suberr(newErrTypeFlowFailure(ctx, nil))
+ flow.Close()
+ tflow.Close()
+ return
+ }
+ if _, err = tflow.Write([]byte{typeFlow}); err != nil {
+ status.serverErr = suberr(newErrTypeFlowFailure(ctx, err))
+ flow.Close()
+ tflow.Close()
+ return
+ }
+ write(tflow)
}
- status.typeEnc, status.typeDec = c.typeCache.get(status.flow.Conn())
+ status.typeEnc, status.typeDec, err = c.typeCache.get(ctx, flow.Conn())
+ if err != nil {
+ status.serverErr = suberr(newErrTypeFlowFailure(ctx, err))
+ flow.Close()
+ return
+ }
+ status.flow = flow
}
type blessingsForPeer struct {
@@ -259,7 +290,7 @@
// This should never happen.
return nil, verror.NoRetry, true, verror.New(verror.ErrInternal, ctx, name)
}
- if resolved.Servers, err = filterAndOrderServers(resolved.Servers, c.preferredProtocols, c.ipNets); err != nil {
+ if resolved.Servers, err = filterAndOrderServers(resolved.Servers, c.preferredProtocols); err != nil {
return nil, verror.RetryRefetch, true, verror.New(verror.ErrNoServers, ctx, name, err)
}
@@ -306,7 +337,6 @@
}
}
case <-ctx.Done():
- ctx.VI(2).Infof("rpc: timeout on connection to server %v ", name)
_, _, _, err := c.failedTryCall(ctx, name, method, responses, ch)
if verror.ErrorID(err) != verror.ErrTimeout.ID {
return nil, verror.NoRetry, false, verror.New(verror.ErrTimeout, ctx, err)
@@ -446,12 +476,11 @@
}
func (c *xclient) Close() {
- defer apilog.LogCall(nil)(nil) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
- c.mu.Lock()
- c.closed = true
- c.mu.Unlock()
- // TODO(toddw): Implement this!
- c.wg.Wait()
+ panic("this method is deprecated.")
+}
+
+func (c *xclient) Closed() <-chan struct{} {
+ return c.flowMgr.Closed()
}
// flowXClient implements the RPC client-side protocol for a single RPC, over a
@@ -705,6 +734,7 @@
berr := verror.New(id, fc.ctx, verror.New(errResponseDecoding, fc.ctx, verr))
return fc.close(berr)
}
+
// The response header must indicate the streaming results have ended.
if fc.response.Error == nil && !fc.response.EndStreamResults {
berr := verror.New(errRemainingStreamResults, fc.ctx)
diff --git a/runtime/internal/rpc/xserver.go b/runtime/internal/rpc/xserver.go
index 7266ab0..f2bdbac 100644
--- a/runtime/internal/rpc/xserver.go
+++ b/runtime/internal/rpc/xserver.go
@@ -19,7 +19,6 @@
"v.io/v23/context"
"v.io/v23/flow"
"v.io/v23/i18n"
- "v.io/v23/namespace"
"v.io/v23/naming"
"v.io/v23/options"
"v.io/v23/rpc"
@@ -63,50 +62,46 @@
disp rpc.Dispatcher // dispatcher to serve RPCs
dispReserved rpc.Dispatcher // dispatcher for reserved methods
active sync.WaitGroup // active goroutines we've spawned.
- stoppedChan chan struct{} // closed when the server has been stopped.
preferredProtocols []string // protocols to use when resolving proxy name to endpoint.
- // We cache the IP networks on the device since it is not that cheap to read
- // network interfaces through os syscall.
- // TODO(jhahn): Add monitoring the network interface changes.
- ipNets []*net.IPNet
- ns namespace.T
- servesMountTable bool
- isLeaf bool
+ servesMountTable bool
+ isLeaf bool
// TODO(cnicolaou): add roaming stats to rpcStats
stats *rpcStats // stats for this server.
}
-func NewServer(ctx *context.T, name string, object interface{}, authorizer security.Authorizer, settingsPublisher *pubsub.Publisher, settingsName string, opts ...rpc.ServerOpt) (rpc.Server, error) {
+func WithNewServer(ctx *context.T,
+ name string, object interface{}, authorizer security.Authorizer,
+ settingsPublisher *pubsub.Publisher, settingsName string,
+ opts ...rpc.ServerOpt) (*context.T, rpc.Server, error) {
if object == nil {
- return nil, verror.New(verror.ErrBadArg, ctx, "nil object")
+ return ctx, nil, verror.New(verror.ErrBadArg, ctx, "nil object")
}
invoker, err := objectToInvoker(object)
if err != nil {
- return nil, verror.New(verror.ErrBadArg, ctx, fmt.Sprintf("bad object: %v", err))
+ return ctx, nil, verror.New(verror.ErrBadArg, ctx, fmt.Sprintf("bad object: %v", err))
}
d := &leafDispatcher{invoker, authorizer}
opts = append([]rpc.ServerOpt{options.IsLeaf(true)}, opts...)
- return NewDispatchingServer(ctx, name, d, settingsPublisher, settingsName, opts...)
+ return WithNewDispatchingServer(ctx, name, d, settingsPublisher, settingsName, opts...)
}
-func NewDispatchingServer(ctx *context.T, name string, dispatcher rpc.Dispatcher, settingsPublisher *pubsub.Publisher, settingsName string, opts ...rpc.ServerOpt) (rpc.Server, error) {
+func WithNewDispatchingServer(ctx *context.T,
+ name string, dispatcher rpc.Dispatcher,
+ settingsPublisher *pubsub.Publisher, settingsName string,
+ opts ...rpc.ServerOpt) (*context.T, rpc.Server, error) {
if dispatcher == nil {
- return nil, verror.New(verror.ErrBadArg, ctx, "nil dispatcher")
+ return ctx, nil, verror.New(verror.ErrBadArg, ctx, "nil dispatcher")
}
- ctx, cancel := context.WithRootCancel(ctx)
- flowMgr := v23.ExperimentalGetFlowManager(ctx)
- ns, principal := v23.GetNamespace(ctx), v23.GetPrincipal(ctx)
- statsPrefix := naming.Join("rpc", "server", "routing-id", flowMgr.RoutingID().String())
+
+ rootCtx, cancel := context.WithRootCancel(ctx)
+ fm, err := v23.NewFlowManager(rootCtx)
+ if err != nil {
+ cancel()
+ return ctx, nil, err
+ }
+
+ statsPrefix := naming.Join("rpc", "server", "routing-id", fm.RoutingID().String())
s := &xserver{
- ctx: ctx,
cancel: cancel,
- flowMgr: flowMgr,
- principal: principal,
- blessings: principal.BlessingStore().Default(),
- publisher: publisher.New(ctx, ns, publishPeriod),
- stoppedChan: make(chan struct{}),
- ns: ns,
+ flowMgr: fm,
+ blessings: v23.GetPrincipal(rootCtx).BlessingStore().Default(),
stats: newRPCStats(statsPrefix),
settingsPublisher: settingsPublisher,
settingsName: settingsName,
@@ -114,12 +109,6 @@
typeCache: newTypeCache(),
proxyEndpoints: make(map[string]map[string]*inaming.Endpoint),
}
- ipNets, err := ipNetworks()
- if err != nil {
- return nil, err
- }
- s.ipNets = ipNets
-
for _, opt := range opts {
switch opt := opt.(type) {
case options.ServesMountTable:
@@ -132,19 +121,25 @@
s.preferredProtocols = []string(opt)
}
}
-
- blessingsStatsName := naming.Join(statsPrefix, "security", "blessings")
- // TODO(caprita): revist printing the blessings with %s, and
- // instead expose them as a list.
- stats.NewString(blessingsStatsName).Set(fmt.Sprintf("%s", s.blessings))
- stats.NewStringFunc(blessingsStatsName, func() string {
- return fmt.Sprintf("%s (default)", s.principal.BlessingStore().Default())
- })
- if err = s.listen(ctx, v23.GetListenSpec(ctx)); err != nil {
- s.Stop()
- return nil, err
+ rootCtx, _, err = v23.WithNewClient(rootCtx,
+ clientFlowManagerOpt{fm},
+ PreferredProtocols(s.preferredProtocols))
+ if err != nil {
+ cancel()
+ return ctx, nil, err
}
+ s.ctx = rootCtx
+ s.publisher = publisher.New(rootCtx, v23.GetNamespace(rootCtx), publishPeriod)
+
+ // TODO(caprita): revisit printing the blessings with string, and
+ // instead expose them as a list.
+ blessingsStatsName := naming.Join(statsPrefix, "security", "blessings")
+ stats.NewString(blessingsStatsName).Set(s.blessings.String())
+
+ s.listen(rootCtx, v23.GetListenSpec(rootCtx))
if len(name) > 0 {
+ // TODO(mattr): We only call AddServer here; if someone calls AddName
+ // later, will there be no servers?
s.mu.Lock()
for k, _ := range s.chosenEndpoints {
s.publisher.AddServer(k)
@@ -153,7 +148,66 @@
s.publisher.AddName(name, s.servesMountTable, s.isLeaf)
vtrace.GetSpan(s.ctx).Annotate("Serving under name: " + name)
}
- return s, nil
+
+ go func() {
+ <-ctx.Done()
+ serverDebug := fmt.Sprintf("Dispatcher: %T, Status:[%v]", s.disp, s.Status())
+ s.ctx.VI(1).Infof("Stop: %s", serverDebug)
+ defer s.ctx.VI(1).Infof("Stop done: %s", serverDebug)
+
+ s.stats.stop()
+ s.publisher.Stop()
+
+ done := make(chan struct{})
+ go func() {
+ s.flowMgr.StopListening(ctx)
+ s.publisher.WaitForStop()
+ // At this point no new flows should arrive. Wait for existing calls
+ // to complete.
+ s.active.Wait()
+ close(done)
+ }()
+
+ s.Lock()
+ // TODO(mattr): I don't understand what this is.
+ if dhcp := s.dhcpState; dhcp != nil {
+ // TODO(cnicolaou,caprita): investigate not having to close and drain
+ // the channel here. It's a little awkward right now since we have to
+ // be careful to not close the channel in two places, i.e. here and
+ // from the publisher's Shutdown method.
+ if err := dhcp.publisher.CloseFork(dhcp.name, dhcp.ch); err == nil {
+ drain:
+ for {
+ select {
+ case v := <-dhcp.ch:
+ if v == nil {
+ break drain
+ }
+ default:
+ close(dhcp.ch)
+ break drain
+ }
+ }
+ }
+ }
+ s.Unlock()
+
+ select {
+ case <-done:
+ case <-time.After(5 * time.Second): // TODO(mattr): This should be configurable.
+ s.ctx.Errorf("%s: Timedout waiting for active requests to complete", serverDebug)
+ }
+ // Now we cancel the root context which closes all the connections
+ // in the flow manager and cancels all the contexts used by
+ // ongoing requests. Hopefully this will bring all outstanding
+ // operations to a close.
+ s.cancel()
+ <-s.flowMgr.Closed()
+ // Now we really will wait forever. If this doesn't exit, there's something
+ // wrong with the user's code.
+ <-done
+ }()
+ return rootCtx, s, nil
}
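Servers follow the same pattern: cancelling the context passed to WithNewServer / WithNewDispatchingServer starts the lame-duck sequence above (stop listening, unpublish, wait for in-flight calls with a bounded grace period) before the root context is cancelled. A rough caller-side sketch, with illustrative names:

    sctx, cancel := context.WithCancel(ctx)
    sctx, server, err := WithNewDispatchingServer(sctx, name, disp, spub, sname)
    if err != nil {
        // handle error
    }
    // ... serve ...
    cancel()          // begin lame-duck shutdown
    <-server.Closed() // closed once the server's root context has been cancelled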
func (s *xserver) Status() rpc.ServerStatus {
@@ -185,11 +239,12 @@
}
// resolveToEndpoint resolves an object name or address to an endpoint.
-func (s *xserver) resolveToEndpoint(address string) (naming.Endpoint, error) {
+func (s *xserver) resolveToEndpoint(ctx *context.T, address string) (naming.Endpoint, error) {
var resolved *naming.MountEntry
var err error
- if s.ns != nil {
- if resolved, err = s.ns.Resolve(s.ctx, address); err != nil {
+ // TODO(mattr): Why should ns be nil?
+ if ns := v23.GetNamespace(ctx); ns != nil {
+ if resolved, err = ns.Resolve(ctx, address); err != nil {
return nil, err
}
} else {
@@ -199,7 +254,7 @@
}}
}
// An empty set of protocols means all protocols...
- if resolved.Servers, err = filterAndOrderServers(resolved.Servers, s.preferredProtocols, s.ipNets); err != nil {
+ if resolved.Servers, err = filterAndOrderServers(resolved.Servers, s.preferredProtocols); err != nil {
return nil, err
}
for _, n := range resolved.Names() {
@@ -295,13 +350,14 @@
return ret
}
-func (s *xserver) listen(ctx *context.T, listenSpec rpc.ListenSpec) error {
+func (s *xserver) listen(ctx *context.T, listenSpec rpc.ListenSpec) {
s.Lock()
defer s.Unlock()
var lastErr error
+ var ep naming.Endpoint
if len(listenSpec.Proxy) > 0 {
- var ep naming.Endpoint
- if ep, lastErr = s.resolveToEndpoint(listenSpec.Proxy); lastErr != nil {
+ ep, lastErr = s.resolveToEndpoint(ctx, listenSpec.Proxy)
+ if lastErr != nil {
s.ctx.VI(2).Infof("resolveToEndpoint(%q) failed: %v", listenSpec.Proxy, lastErr)
} else {
lastErr = s.flowMgr.ProxyListen(ctx, ep, s.update(ep))
@@ -320,7 +376,6 @@
}
leps := s.flowMgr.ListeningEndpoints()
-
s.addressChooser = listenSpec.AddressChooser
roaming := false
chosenEps := make(map[string]*inaming.Endpoint)
@@ -346,7 +401,6 @@
s.active.Add(1)
go s.acceptLoop(ctx)
- return nil
}
func (s *xserver) acceptLoop(ctx *context.T) error {
@@ -389,12 +443,9 @@
}
}
case typeFlow:
- write := s.typeCache.writer(fl.Conn())
- if write == nil {
- s.ctx.VI(1).Infof("ignoring duplicate type flow.")
- return
+ if write := s.typeCache.writer(fl.Conn()); write != nil {
+ write(fl)
}
- write(fl)
}
}(fl)
}
@@ -422,105 +473,23 @@
func (s *xserver) Stop() error {
defer apilog.LogCall(nil)(nil) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
+ panic("unimplemented")
+}
- serverDebug := fmt.Sprintf("Dispatcher: %T, Status:[%v]", s.disp, s.Status())
- s.ctx.VI(1).Infof("Stop: %s", serverDebug)
- defer s.ctx.VI(1).Infof("Stop done: %s", serverDebug)
-
- s.Lock()
- if s.disp == nil {
- s.Unlock()
- return nil
- }
- s.disp = nil
- close(s.stoppedChan)
- s.Unlock()
-
- // Delete the stats object.
- s.stats.stop()
-
- // Note, It's safe to Stop/WaitForStop on the publisher outside of the
- // server lock, since publisher is safe for concurrent access.
- // Stop the publisher, which triggers unmounting of published names.
- s.publisher.Stop()
-
- // Wait for the publisher to be done unmounting before we can proceed to
- // close the listeners (to minimize the number of mounted names pointing
- // to endpoint that are no longer serving).
- //
- // TODO(caprita): See if make sense to fail fast on rejecting
- // connections once listeners are closed, and parallelize the publisher
- // and listener shutdown.
- s.publisher.WaitForStop()
-
- s.Lock()
-
- // TODO(mattr): What should we do when we stop a server now? We need to
- // interrupt Accept at some point, but it's weird to stop the flowmanager.
- // Close all listeners. No new flows will be accepted, while in-flight
- // flows will continue until they terminate naturally.
- // nListeners := len(s.listeners)
- // errCh := make(chan error, nListeners)
- // for ln, _ := range s.listeners {
- // go func(ln stream.Listener) {
- // errCh <- ln.Close()
- // }(ln)
- // }
-
- if dhcp := s.dhcpState; dhcp != nil {
- // TODO(cnicolaou,caprita): investigate not having to close and drain
- // the channel here. It's a little awkward right now since we have to
- // be careful to not close the channel in two places, i.e. here and
- // and from the publisher's Shutdown method.
- if err := dhcp.publisher.CloseFork(dhcp.name, dhcp.ch); err == nil {
- drain:
- for {
- select {
- case v := <-dhcp.ch:
- if v == nil {
- break drain
- }
- default:
- close(dhcp.ch)
- break drain
- }
- }
- }
- }
-
- s.Unlock()
-
- // At this point, we are guaranteed that no new requests are going to be
- // accepted.
-
- // Wait for the publisher and active listener + flows to finish.
- done := make(chan struct{}, 1)
- go func() { s.active.Wait(); done <- struct{}{} }()
-
- select {
- case <-done:
- case <-time.After(5 * time.Second):
- s.ctx.Errorf("%s: Timedout waiting for goroutines to stop", serverDebug)
- // TODO(mattr): This doesn't make sense, shouldn't we not wait after timing out?
- <-done
- s.ctx.Infof("%s: Done waiting.", serverDebug)
- }
-
- s.cancel()
- return nil
+func (s *xserver) Closed() <-chan struct{} {
+ return s.ctx.Done()
}
// flowServer implements the RPC server-side protocol for a single RPC, over a
// flow that's already connected to the client.
type xflowServer struct {
- ctx *context.T // context associated with the RPC
server *xserver // rpc.Server that this flow server belongs to
disp rpc.Dispatcher // rpc.Dispatcher that will serve RPCs on this flow
- dec *vom.Decoder // to decode requests and args from the client
- enc *vom.Encoder // to encode responses and results to the client
flow flow.Flow // underlying flow
// Fields filled in during the server invocation.
+ dec *vom.Decoder // to decode requests and args from the client
+ enc *vom.Encoder // to encode responses and results to the client
grantedBlessings security.Blessings
method, suffix string
tags []*vdl.Value
@@ -535,17 +504,10 @@
)
func newXFlowServer(flow flow.Flow, server *xserver) (*xflowServer, error) {
- server.Lock()
- disp := server.disp
- server.Unlock()
- typeEnc, typeDec := server.typeCache.get(flow.Conn())
fs := &xflowServer{
- ctx: server.ctx,
server: server,
- disp: disp,
+ disp: server.disp,
flow: flow,
- enc: vom.NewEncoderWithTypeEncoder(flow, typeEnc),
- dec: vom.NewDecoderWithTypeDecoder(flow, typeDec),
discharges: make(map[string]security.Discharge),
}
return fs, nil
@@ -566,20 +528,19 @@
if fs.server.dispReserved != nil {
_, auth, _ = fs.server.dispReserved.Lookup(ctx, params.Suffix)
}
- return authorize(fs.ctx, security.NewCall(params), auth)
+ return authorize(ctx, security.NewCall(params), auth)
}
func (fs *xflowServer) serve() error {
defer fs.flow.Close()
- results, err := fs.processRequest()
-
- vtrace.GetSpan(fs.ctx).Finish()
+ ctx, results, err := fs.processRequest()
+ vtrace.GetSpan(ctx).Finish()
var traceResponse vtrace.Response
// Check if the caller is permitted to view vtrace data.
- if fs.authorizeVtrace(fs.ctx) == nil {
- traceResponse = vtrace.GetResponse(fs.ctx)
+ if fs.authorizeVtrace(ctx) == nil {
+ traceResponse = vtrace.GetResponse(ctx)
}
// Respond to the client with the response header and positional results.
@@ -593,7 +554,7 @@
if err == io.EOF {
return err
}
- return verror.New(errResponseEncoding, fs.ctx, fs.LocalEndpoint().String(), fs.RemoteEndpoint().String(), err)
+ return verror.New(errResponseEncoding, ctx, fs.LocalEndpoint().String(), fs.RemoteEndpoint().String(), err)
}
if response.Error != nil {
return response.Error
@@ -603,7 +564,7 @@
if err == io.EOF {
return err
}
- return verror.New(errResultEncoding, fs.ctx, ix, fmt.Sprintf("%T=%v", res, res), err)
+ return verror.New(errResultEncoding, ctx, ix, fmt.Sprintf("%T=%v", res, res), err)
}
}
// TODO(ashankar): Should unread data from the flow be drained?
@@ -622,69 +583,67 @@
return nil
}
-func (fs *xflowServer) readRPCRequest() (*rpc.Request, error) {
- // TODO(toddw): How do we set the initial timeout? It might be shorter than
- // the timeout we set later, which we learn after we've decoded the request.
- /*
- // Set a default timeout before reading from the flow. Without this timeout,
- // a client that sends no request or a partial request will retain the flow
- // indefinitely (and lock up server resources).
- initTimer := newTimer(defaultCallTimeout)
- defer initTimer.Stop()
- fs.flow.SetDeadline(initTimer.C)
- */
-
+func (fs *xflowServer) readRPCRequest(ctx *context.T) (*rpc.Request, error) {
// Decode the initial request.
var req rpc.Request
if err := fs.dec.Decode(&req); err != nil {
- return nil, verror.New(verror.ErrBadProtocol, fs.ctx, newErrBadRequest(fs.ctx, err))
+ return nil, verror.New(verror.ErrBadProtocol, ctx, newErrBadRequest(ctx, err))
}
return &req, nil
}
-func (fs *xflowServer) processRequest() ([]interface{}, error) {
+func (fs *xflowServer) processRequest() (*context.T, []interface{}, error) {
fs.starttime = time.Now()
- req, err := fs.readRPCRequest()
+
+ // Set an initial deadline on the flow to ensure that we don't wait forever
+ // for the initial read.
+ ctx := fs.flow.SetDeadlineContext(fs.server.ctx, time.Now().Add(defaultCallTimeout))
+
+ typeEnc, typeDec, err := fs.server.typeCache.get(ctx, fs.flow.Conn())
+ if err != nil {
+ return ctx, nil, err
+ }
+ fs.enc = vom.NewEncoderWithTypeEncoder(fs.flow, typeEnc)
+ fs.dec = vom.NewDecoderWithTypeDecoder(fs.flow, typeDec)
+
+ req, err := fs.readRPCRequest(ctx)
if err != nil {
// We don't know what the rpc call was supposed to be, but we'll create
// a placeholder span so we can capture annotations.
- fs.ctx, _ = vtrace.WithNewSpan(fs.ctx, fmt.Sprintf("\"%s\".UNKNOWN", fs.suffix))
- return nil, err
+ // TODO(mattr): I'm not sure this makes sense anymore, but I'll revisit it
+ // when I'm doing another round of vtrace next quarter.
+ ctx, _ = vtrace.WithNewSpan(ctx, fmt.Sprintf("\"%s\".UNKNOWN", fs.suffix))
+ return ctx, nil, err
}
+
+ // Start building up a new context for the request now that we know
+ // the header information.
+ ctx = fs.server.ctx
+
// We must call fs.drainDecoderArgs for any error that occurs
// after this point, and before we actually decode the arguments.
fs.method = req.Method
fs.suffix = strings.TrimLeft(req.Suffix, "/")
-
if req.Language != "" {
- fs.ctx = i18n.WithLangID(fs.ctx, i18n.LangID(req.Language))
+ ctx = i18n.WithLangID(ctx, i18n.LangID(req.Language))
}
// TODO(mattr): Currently this allows users to trigger trace collection
// on the server even if they will not be allowed to collect the
// results later. This might be considered a DOS vector.
spanName := fmt.Sprintf("\"%s\".%s", fs.suffix, fs.method)
- fs.ctx, _ = vtrace.WithContinuedTrace(fs.ctx, spanName, req.TraceRequest)
+ ctx, _ = vtrace.WithContinuedTrace(ctx, spanName, req.TraceRequest)
+ ctx = fs.flow.SetDeadlineContext(ctx, req.Deadline.Time)
- var cancel context.CancelFunc
- if !req.Deadline.IsZero() {
- fs.ctx, cancel = context.WithDeadline(fs.ctx, req.Deadline.Time)
- } else {
- fs.ctx, cancel = context.WithCancel(fs.ctx)
- }
- fs.flow.SetContext(fs.ctx)
- // TODO(toddw): Explicitly cancel the context when the flow is done.
- _ = cancel
-
- if err := fs.readGrantedBlessings(req); err != nil {
+ if err := fs.readGrantedBlessings(ctx, req); err != nil {
fs.drainDecoderArgs(int(req.NumPosArgs))
- return nil, err
+ return ctx, nil, err
}
// Lookup the invoker.
- invoker, auth, err := fs.lookup(fs.suffix, fs.method)
+ invoker, auth, err := fs.lookup(ctx, fs.suffix, fs.method)
if err != nil {
fs.drainDecoderArgs(int(req.NumPosArgs))
- return nil, err
+ return ctx, nil, err
}
// Note that we strip the reserved prefix when calling the invoker so
@@ -694,31 +653,31 @@
// Prepare invoker and decode args.
numArgs := int(req.NumPosArgs)
- argptrs, tags, err := invoker.Prepare(fs.ctx, strippedMethod, numArgs)
+ argptrs, tags, err := invoker.Prepare(ctx, strippedMethod, numArgs)
fs.tags = tags
if err != nil {
fs.drainDecoderArgs(numArgs)
- return nil, err
+ return ctx, nil, err
}
if called, want := req.NumPosArgs, uint64(len(argptrs)); called != want {
fs.drainDecoderArgs(numArgs)
- return nil, newErrBadNumInputArgs(fs.ctx, fs.suffix, fs.method, called, want)
+ return ctx, nil, newErrBadNumInputArgs(ctx, fs.suffix, fs.method, called, want)
}
for ix, argptr := range argptrs {
if err := fs.dec.Decode(argptr); err != nil {
- return nil, newErrBadInputArg(fs.ctx, fs.suffix, fs.method, uint64(ix), err)
+ return ctx, nil, newErrBadInputArg(ctx, fs.suffix, fs.method, uint64(ix), err)
}
}
// Check application's authorization policy.
- if err := authorize(fs.ctx, fs, auth); err != nil {
- return nil, err
+ if err := authorize(ctx, fs, auth); err != nil {
+ return ctx, nil, err
}
// Invoke the method.
- results, err := invoker.Invoke(fs.ctx, fs, strippedMethod, argptrs)
+ results, err := invoker.Invoke(ctx, fs, strippedMethod, argptrs)
fs.server.stats.record(fs.method, time.Since(fs.starttime))
- return results, err
+ return ctx, results, err
}
// drainDecoderArgs drains the next n arguments encoded onto the flows decoder.
@@ -741,7 +700,7 @@
// with rpc.DebugKeyword, we use the internal debug dispatcher to look up the
// invoker. Otherwise, and we use the server's dispatcher. The suffix and method
// value may be modified to match the actual suffix and method to use.
-func (fs *xflowServer) lookup(suffix string, method string) (rpc.Invoker, security.Authorizer, error) {
+func (fs *xflowServer) lookup(ctx *context.T, suffix string, method string) (rpc.Invoker, security.Authorizer, error) {
if naming.IsReserved(method) {
return reservedInvoker(fs.disp, fs.server.dispReserved), security.AllowEveryone(), nil
}
@@ -749,26 +708,26 @@
if naming.IsReserved(suffix) {
disp = fs.server.dispReserved
} else if fs.server.isLeaf && suffix != "" {
- innerErr := verror.New(errUnexpectedSuffix, fs.ctx, suffix)
- return nil, nil, verror.New(verror.ErrUnknownSuffix, fs.ctx, suffix, innerErr)
+ innerErr := verror.New(errUnexpectedSuffix, ctx, suffix)
+ return nil, nil, verror.New(verror.ErrUnknownSuffix, ctx, suffix, innerErr)
}
if disp != nil {
- obj, auth, err := disp.Lookup(fs.ctx, suffix)
+ obj, auth, err := disp.Lookup(ctx, suffix)
switch {
case err != nil:
return nil, nil, err
case obj != nil:
invoker, err := objectToInvoker(obj)
if err != nil {
- return nil, nil, verror.New(verror.ErrInternal, fs.ctx, "invalid received object", err)
+ return nil, nil, verror.New(verror.ErrInternal, ctx, "invalid received object", err)
}
return invoker, auth, nil
}
}
- return nil, nil, verror.New(verror.ErrUnknownSuffix, fs.ctx, suffix)
+ return nil, nil, verror.New(verror.ErrUnknownSuffix, ctx, suffix)
}
-func (fs *xflowServer) readGrantedBlessings(req *rpc.Request) error {
+func (fs *xflowServer) readGrantedBlessings(ctx *context.T, req *rpc.Request) error {
if req.GrantedBlessings.IsZero() {
return nil
}
@@ -781,7 +740,8 @@
// this - should servers be able to assume that a blessing is something that
// does not have the authorizations that the server's own identity has?
if got, want := req.GrantedBlessings.PublicKey(), fs.LocalPrincipal().PublicKey(); got != nil && !reflect.DeepEqual(got, want) {
- return verror.New(verror.ErrNoAccess, fs.ctx, verror.New(errBlessingsNotBound, fs.ctx, got, want))
+ return verror.New(verror.ErrNoAccess, ctx,
+ verror.New(errBlessingsNotBound, ctx, got, want))
}
fs.grantedBlessings = req.GrantedBlessings
return nil
diff --git a/runtime/internal/rt/runtime.go b/runtime/internal/rt/runtime.go
index 77f07f9..fd3f075 100644
--- a/runtime/internal/rt/runtime.go
+++ b/runtime/internal/rt/runtime.go
@@ -54,7 +54,6 @@
backgroundKey
reservedNameKey
listenKey
- flowManagerKey
// initKey is used to store values that are only set at init time.
initKey
@@ -169,12 +168,6 @@
return nil, nil, nil, err
}
- // Add the flow.Manager to the context.
- // This initial Flow Manager can only be used as a client.
- ctx, _, err = r.setNewClientFlowManager(ctx)
- if err != nil {
- return nil, nil, nil, err
- }
// Add the Client to the context.
ctx, _, err = r.WithNewClient(ctx)
if err != nil {
@@ -306,27 +299,6 @@
return sm, nil
}
-func (r *Runtime) setNewClientFlowManager(ctx *context.T) (*context.T, flow.Manager, error) {
- return r.setNewFlowManager(ctx, naming.NullRoutingID)
-}
-
-func (r *Runtime) setNewBidiFlowManager(ctx *context.T) (*context.T, flow.Manager, error) {
- rid, err := naming.NewRoutingID()
- if err != nil {
- return nil, nil, err
- }
- return r.setNewFlowManager(ctx, rid)
-}
-
-func (r *Runtime) setNewFlowManager(ctx *context.T, rid naming.RoutingID) (*context.T, flow.Manager, error) {
- fm := manager.New(ctx, rid)
- if err := r.addChild(ctx, fm, func() { <-fm.Closed() }); err != nil {
- return ctx, nil, err
- }
- newctx := context.WithValue(ctx, flowManagerKey, fm)
- return newctx, fm, nil
-}
-
func (r *Runtime) setNewStreamManager(ctx *context.T) (*context.T, error) {
sm, err := newStreamManager(ctx)
if err != nil {
@@ -381,14 +353,6 @@
if newctx, err = r.setNewStreamManager(newctx); err != nil {
return ctx, err
}
- if rid := r.ExperimentalGetFlowManager(newctx).RoutingID(); rid == naming.NullRoutingID {
- newctx, _, err = r.setNewClientFlowManager(newctx)
- } else {
- newctx, _, err = r.setNewBidiFlowManager(newctx)
- }
- if err != nil {
- return ctx, err
- }
if newctx, _, err = r.setNewNamespace(newctx, r.GetNamespace(ctx).Roots()...); err != nil {
return ctx, err
}
@@ -412,35 +376,28 @@
p, _ := ctx.Value(principalKey).(security.Principal)
sm, _ := ctx.Value(streamManagerKey).(stream.Manager)
ns, _ := ctx.Value(namespaceKey).(namespace.T)
- fm, _ := ctx.Value(flowManagerKey).(flow.Manager)
otherOpts = append(otherOpts, imanager.DialTimeout(5*time.Minute))
-
if id, _ := ctx.Value(initKey).(*initData); id.protocols != nil {
otherOpts = append(otherOpts, irpc.PreferredProtocols(id.protocols))
}
var client rpc.Client
- var err error
deps := []interface{}{vtraceDependency{}}
- if fm != nil && ref.RPCTransitionState() >= ref.XClients {
- client, err = irpc.NewTransitionClient(ctx, sm, fm, ns, otherOpts...)
- deps = append(deps, fm, sm)
- } else {
- client, err = irpc.DeprecatedNewClient(sm, ns, otherOpts...)
+ if ref.RPCTransitionState() >= ref.XClients {
+ client = irpc.NewTransitionClient(ctx, sm, ns, otherOpts...)
deps = append(deps, sm)
- }
-
- if err != nil {
- return ctx, nil, err
+ } else {
+ client = irpc.DeprecatedNewClient(sm, ns, otherOpts...)
+ deps = append(deps, sm)
}
newctx := context.WithValue(ctx, clientKey, client)
if p != nil {
deps = append(deps, p)
}
- if err = r.addChild(ctx, client, client.Close, deps...); err != nil {
+ if err := r.addChild(ctx, client, client.Close, deps...); err != nil {
return ctx, nil, err
}
- return newctx, client, err
+ return newctx, client, nil
}
func (*Runtime) GetClient(ctx *context.T) rpc.Client {
@@ -539,33 +496,16 @@
return nil
}
-func (*Runtime) ExperimentalGetFlowManager(ctx *context.T) flow.Manager {
- // nologcall
- if d, ok := ctx.Value(flowManagerKey).(flow.Manager); ok {
- return d
- }
- return nil
-}
-
-func (r *Runtime) ExperimentalWithNewFlowManager(ctx *context.T) (*context.T, flow.Manager, error) {
+func (r *Runtime) NewFlowManager(ctx *context.T) (flow.Manager, error) {
defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
- newctx, m, err := r.setNewBidiFlowManager(ctx)
+ rid, err := naming.NewRoutingID()
if err != nil {
- return ctx, nil, err
+ return nil, err
}
- // Create a new client since it depends on the flow manager.
- newctx, _, err = r.WithNewClient(newctx)
- if err != nil {
- return ctx, nil, err
- }
- return newctx, m, nil
+ return manager.New(ctx, rid), nil
}
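Unlike the removed ExperimentalWithNewFlowManager, the new NewFlowManager does not store the manager in the context or rebuild the client; each call simply returns an independent manager with a fresh routing ID. An illustrative sketch:

    m1, _ := v23.NewFlowManager(ctx)
    m2, _ := v23.NewFlowManager(ctx)
    // m1 and m2 are distinct managers; nothing is written back into ctx,
    // so v23.GetClient(ctx) and other context-derived state are unaffected.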
-func (r *Runtime) commonServerInit(ctx *context.T, opts ...rpc.ServerOpt) (*context.T, *pubsub.Publisher, string, []rpc.ServerOpt, error) {
- newctx, _, err := r.ExperimentalWithNewFlowManager(ctx)
- if err != nil {
- return ctx, nil, "", nil, err
- }
+func (r *Runtime) commonServerInit(ctx *context.T, opts ...rpc.ServerOpt) (*pubsub.Publisher, string, []rpc.ServerOpt, error) {
otherOpts := append([]rpc.ServerOpt{}, opts...)
if reservedDispatcher := r.GetReservedNameDispatcher(ctx); reservedDispatcher != nil {
otherOpts = append(otherOpts, irpc.ReservedNameDispatcher{
@@ -576,20 +516,21 @@
if id.protocols != nil {
otherOpts = append(otherOpts, irpc.PreferredServerResolveProtocols(id.protocols))
}
- return newctx, id.settingsPublisher, id.settingsName, otherOpts, nil
+ return id.settingsPublisher, id.settingsName, otherOpts, nil
}
func (r *Runtime) WithNewServer(ctx *context.T, name string, object interface{}, auth security.Authorizer, opts ...rpc.ServerOpt) (*context.T, rpc.Server, error) {
defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
if ref.RPCTransitionState() >= ref.XServers {
- // TODO(mattr): Deal with shutdown deps.
- newctx, spub, sname, opts, err := r.commonServerInit(ctx, opts...)
+ spub, sname, opts, err := r.commonServerInit(ctx, opts...)
if err != nil {
return ctx, nil, err
}
- s, err := irpc.NewServer(newctx, name, object, auth, spub, sname, opts...)
+ newctx, s, err := irpc.WithNewServer(ctx, name, object, auth, spub, sname, opts...)
if err != nil {
- // TODO(mattr): Stop the flow manager.
+ return ctx, nil, err
+ }
+ if err = r.addChild(ctx, s, func() { <-s.Closed() }); err != nil {
return ctx, nil, err
}
return newctx, s, nil
@@ -612,14 +553,15 @@
func (r *Runtime) WithNewDispatchingServer(ctx *context.T, name string, disp rpc.Dispatcher, opts ...rpc.ServerOpt) (*context.T, rpc.Server, error) {
defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
if ref.RPCTransitionState() >= ref.XServers {
- // TODO(mattr): Deal with shutdown deps.
- newctx, spub, sname, opts, err := r.commonServerInit(ctx, opts...)
+ spub, sname, opts, err := r.commonServerInit(ctx, opts...)
if err != nil {
return ctx, nil, err
}
- s, err := irpc.NewDispatchingServer(newctx, name, disp, spub, sname, opts...)
+ newctx, s, err := irpc.WithNewDispatchingServer(ctx, name, disp, spub, sname, opts...)
if err != nil {
- // TODO(mattr): Stop the flow manager.
+ return ctx, nil, err
+ }
+ if err = r.addChild(ctx, s, func() { <-s.Closed() }); err != nil {
return ctx, nil, err
}
return newctx, s, nil
diff --git a/runtime/internal/rt/runtime_test.go b/runtime/internal/rt/runtime_test.go
index da50a92..0a424e7 100644
--- a/runtime/internal/rt/runtime_test.go
+++ b/runtime/internal/rt/runtime_test.go
@@ -141,25 +141,12 @@
ctx, shutdown := test.V23Init()
defer shutdown()
- oldman := v23.ExperimentalGetFlowManager(ctx)
- if oldman == nil {
- t.Error("ExperimentalGetFlowManager should have returned a non-nil value")
+ oldman, err := v23.NewFlowManager(ctx)
+ if err != nil || oldman == nil {
+ t.Error("NewFlowManager failed: %v, %v", oldman, err)
}
- if rid := oldman.RoutingID(); rid != naming.NullRoutingID {
- t.Errorf("Initial flow.Manager should have NullRoutingID, got %v", rid)
- }
- newctx, newman, err := v23.ExperimentalWithNewFlowManager(ctx)
+ newman, err := v23.NewFlowManager(ctx)
if err != nil || newman == nil || newman == oldman {
- t.Fatalf("Could not create flow manager: %v", err)
- }
- if !newctx.Initialized() {
- t.Fatal("Got uninitialized context.")
- }
- man := v23.ExperimentalGetFlowManager(newctx)
- if man != newman || man == oldman {
- t.Error("ExperimentalWithNewFlowManager didn't update the context properly")
- }
- if man.RoutingID() == naming.NullRoutingID {
- t.Error("Newly created flow.Manager should not have NullRoutingID")
+ t.Fatalf("NewFlowManager failed: %v, %v", newman, err)
}
}
diff --git a/services/wspr/internal/lib/signature_manager_test.go b/services/wspr/internal/lib/signature_manager_test.go
index 689df80..97240ab 100644
--- a/services/wspr/internal/lib/signature_manager_test.go
+++ b/services/wspr/internal/lib/signature_manager_test.go
@@ -11,6 +11,7 @@
"v.io/v23"
"v.io/v23/context"
+ "v.io/v23/rpc"
"v.io/v23/vdl"
"v.io/v23/vdlroot/signature"
"v.io/x/ref/runtime/factories/fake"
@@ -38,7 +39,9 @@
"__Signature": []interface{}{initialSig},
},
)
- ctx = fake.SetClient(ctx, client)
+ ctx = fake.SetClientFactory(ctx, func(ctx *context.T, opts ...rpc.ClientOpt) rpc.Client {
+ return client
+ })
return ctx, client, shutdown
}
diff --git a/services/wspr/internal/lib/simple_client.go b/services/wspr/internal/lib/simple_client.go
index a46fd82..17d18ab 100644
--- a/services/wspr/internal/lib/simple_client.go
+++ b/services/wspr/internal/lib/simple_client.go
@@ -90,6 +90,10 @@
func (*simpleMockClient) Close() {
}
+func (*simpleMockClient) Closed() <-chan struct{} {
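+ // Note: a receive from a nil channel blocks forever; that is acceptable
+ // for this mock, which is never shut down.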
+ return nil
+}
+
// mockCall implements rpc.ClientCall
type mockCall struct {
mockStream
diff --git a/services/xproxyd/proxy_test.go b/services/xproxyd/proxy_test.go
index 4a08676..5343d26 100644
--- a/services/xproxyd/proxy_test.go
+++ b/services/xproxyd/proxy_test.go
@@ -89,11 +89,6 @@
if err != nil {
t.Fatal(err)
}
- // Create a new flow manager for the client.
- cctx, _, err := v23.ExperimentalWithNewFlowManager(ctx)
- if err != nil {
- t.Fatal(err)
- }
// Wait for the server to finish listening through the proxy.
eps := s.Status().Endpoints
for ; len(eps) == 0 || eps[0].Addr().Network() == ""; eps = s.Status().Endpoints {
@@ -101,7 +96,7 @@
}
var got string
- if err := v23.GetClient(cctx).Call(ctx, eps[0].Name(), "Echo", []interface{}{"hello"}, []interface{}{&got}); err != nil {
+ if err := v23.GetClient(ctx).Call(ctx, eps[0].Name(), "Echo", []interface{}{"hello"}, []interface{}{&got}); err != nil {
t.Fatal(err)
}
if want := "response:hello"; got != want {
@@ -114,30 +109,29 @@
defer goroutines.NoLeaks(t, leakWaitTime)()
kp := newKillProtocol()
flow.RegisterProtocol("kill", kp)
- pctx, shutdown := v23.Init()
+ ctx, shutdown := v23.Init()
defer shutdown()
- actx, am, err := v23.ExperimentalWithNewFlowManager(pctx)
+ am, err := v23.NewFlowManager(ctx)
if err != nil {
t.Fatal(err)
}
- dctx, dm, err := v23.ExperimentalWithNewFlowManager(pctx)
+ dm, err := v23.NewFlowManager(ctx)
if err != nil {
t.Fatal(err)
}
- pep := startProxy(t, pctx, address{"kill", "127.0.0.1:0"})
+ pep := startProxy(t, ctx, address{"kill", "127.0.0.1:0"})
done := make(chan struct{})
update := func(eps []naming.Endpoint) {
if len(eps) > 0 {
- if err := testEndToEndConnection(t, dctx, actx, dm, am, eps[0]); err != nil {
+ if err := testEndToEndConnection(t, ctx, dm, am, eps[0]); err != nil {
t.Error(err)
}
close(done)
}
}
-
- if err := am.ProxyListen(actx, pep, update); err != nil {
+ if err := am.ProxyListen(ctx, pep, update); err != nil {
t.Fatal(err)
}
<-done
@@ -147,22 +141,22 @@
defer goroutines.NoLeaks(t, leakWaitTime)()
kp := newKillProtocol()
flow.RegisterProtocol("kill", kp)
- pctx, shutdown := v23.Init()
+ ctx, shutdown := v23.Init()
defer shutdown()
- actx, am, err := v23.ExperimentalWithNewFlowManager(pctx)
+ am, err := v23.NewFlowManager(ctx)
if err != nil {
t.Fatal(err)
}
- dctx, dm, err := v23.ExperimentalWithNewFlowManager(pctx)
+ dm, err := v23.NewFlowManager(ctx)
if err != nil {
t.Fatal(err)
}
- pep := startProxy(t, pctx, address{"kill", "127.0.0.1:0"})
+ pep := startProxy(t, ctx, address{"kill", "127.0.0.1:0"})
- p2ep := startProxy(t, pctx, address{"v23", pep.String()}, address{"kill", "127.0.0.1:0"})
+ p2ep := startProxy(t, ctx, address{"v23", pep.String()}, address{"kill", "127.0.0.1:0"})
- p3ep := startProxy(t, pctx, address{"v23", p2ep.String()}, address{"kill", "127.0.0.1:0"})
+ p3ep := startProxy(t, ctx, address{"v23", p2ep.String()}, address{"kill", "127.0.0.1:0"})
done := make(chan struct{})
update := func(eps []naming.Endpoint) {
@@ -173,24 +167,24 @@
// to each other and to the server. For now we at least check a random endpoint so the
// test will at least fail over many runs if something is wrong.
if len(eps) > 0 {
- if err := testEndToEndConnection(t, dctx, actx, dm, am, eps[rand.Int()%3]); err != nil {
+ if err := testEndToEndConnection(t, ctx, dm, am, eps[rand.Int()%3]); err != nil {
t.Error(err)
}
close(done)
}
}
- if err := am.ProxyListen(actx, p3ep, update); err != nil {
+ if err := am.ProxyListen(ctx, p3ep, update); err != nil {
t.Fatal(err)
}
<-done
}
-func testEndToEndConnection(t *testing.T, dctx, actx *context.T, dm, am flow.Manager, aep naming.Endpoint) error {
+func testEndToEndConnection(t *testing.T, ctx *context.T, dm, am flow.Manager, aep naming.Endpoint) error {
// The dialing flow.Manager dials a flow to the accepting flow.Manager.
want := "Do you read me?"
- df, err := dm.Dial(dctx, aep, bfp)
+ df, err := dm.Dial(ctx, aep, bfp)
if err != nil {
return err
}
@@ -198,7 +192,7 @@
if err := writeLine(df, want); err != nil {
return err
}
- af, err := am.Accept(actx)
+ af, err := am.Accept(ctx)
if err != nil {
return err
}
diff --git a/services/xproxyd/proxyd.go b/services/xproxyd/proxyd.go
index 85d66f7..1ead875 100644
--- a/services/xproxyd/proxyd.go
+++ b/services/xproxyd/proxyd.go
@@ -25,7 +25,7 @@
}
func New(ctx *context.T) (*proxy, *context.T, error) {
- ctx, mgr, err := v23.ExperimentalWithNewFlowManager(ctx)
+ mgr, err := v23.NewFlowManager(ctx)
if err != nil {
return nil, nil, err
}