runtime/internal/flow/protocols/tcp: Add framed tcp protocol.
This change registers a framed tcp protocol to the new flow
protocol registry.
Once all the protocols are implemented we should import these
packages to the runtime factories and use these protocols in
the flow/stream managers.
Change-Id: Icf1cc815baa46a7adee61301a67051132ba3d897
diff --git a/runtime/internal/flow/manager/errors.vdl b/runtime/internal/flow/manager/errors.vdl
index 463e2cb..7598b21 100644
--- a/runtime/internal/flow/manager/errors.vdl
+++ b/runtime/internal/flow/manager/errors.vdl
@@ -10,7 +10,6 @@
// since all of their errors are intended to be used as arguments to higher level errors.
// TODO(suharshs,toddw): Allow skipping of {1}{2} in vdl generated errors.
error (
- LargerThan3ByteUInt() {"en":"integer too large to represent in 3 bytes"}
UnknownProtocol(protocol string) {"en":"unknown protocol{:protocol}"}
ManagerClosed() {"en": "manager is already closed"}
AcceptFailed(err error) {"en": "accept failed{:err}"}
diff --git a/runtime/internal/flow/manager/errors.vdl.go b/runtime/internal/flow/manager/errors.vdl.go
index bfa716e..be7aa07 100644
--- a/runtime/internal/flow/manager/errors.vdl.go
+++ b/runtime/internal/flow/manager/errors.vdl.go
@@ -15,7 +15,6 @@
)
var (
- ErrLargerThan3ByteUInt = verror.Register("v.io/x/ref/runtime/internal/flow/manager.LargerThan3ByteUInt", verror.NoRetry, "{1:}{2:} integer too large to represent in 3 bytes")
ErrUnknownProtocol = verror.Register("v.io/x/ref/runtime/internal/flow/manager.UnknownProtocol", verror.NoRetry, "{1:}{2:} unknown protocol{:3}")
ErrManagerClosed = verror.Register("v.io/x/ref/runtime/internal/flow/manager.ManagerClosed", verror.NoRetry, "{1:}{2:} manager is already closed")
ErrAcceptFailed = verror.Register("v.io/x/ref/runtime/internal/flow/manager.AcceptFailed", verror.NoRetry, "{1:}{2:} accept failed{:3}")
@@ -24,7 +23,6 @@
)
func init() {
- i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrLargerThan3ByteUInt.ID), "{1:}{2:} integer too large to represent in 3 bytes")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrUnknownProtocol.ID), "{1:}{2:} unknown protocol{:3}")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrManagerClosed.ID), "{1:}{2:} manager is already closed")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrAcceptFailed.ID), "{1:}{2:} accept failed{:3}")
@@ -32,11 +30,6 @@
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrConnKilledToFreeResources.ID), "{1:}{2:} Connection killed to free resources.")
}
-// NewErrLargerThan3ByteUInt returns an error with the ErrLargerThan3ByteUInt ID.
-func NewErrLargerThan3ByteUInt(ctx *context.T) error {
- return verror.New(ErrLargerThan3ByteUInt, ctx)
-}
-
// NewErrUnknownProtocol returns an error with the ErrUnknownProtocol ID.
func NewErrUnknownProtocol(ctx *context.T, protocol string) error {
return verror.New(ErrUnknownProtocol, ctx, protocol)
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
index 7dc3582..748a5bc 100644
--- a/runtime/internal/flow/manager/manager.go
+++ b/runtime/internal/flow/manager/manager.go
@@ -20,6 +20,7 @@
"v.io/v23/vom"
"v.io/x/ref/runtime/internal/flow/conn"
+ "v.io/x/ref/runtime/internal/lib/framer"
"v.io/x/ref/runtime/internal/lib/upcqueue"
inaming "v.io/x/ref/runtime/internal/naming"
"v.io/x/ref/runtime/internal/rpc/version"
@@ -134,7 +135,7 @@
}
c, err := conn.NewAccepted(
ctx,
- &framer{ReadWriteCloser: netConn},
+ framer.New(netConn),
local,
version.Supported,
&flowHandler{q: m.q, closed: m.closed},
@@ -288,7 +289,7 @@
// "serving flow manager" by passing a 0 RID to non-serving flow managers?
c, err = conn.NewDialed(
ctx,
- &framer{ReadWriteCloser: netConn}, // TODO(suharshs): Don't frame if the net.Conn already has framing in its protocol.
+ framer.New(netConn), // TODO(suharshs): Don't frame if the net.Conn already has framing in its protocol.
localEndpoint(netConn, m.rid),
remote,
version.Supported,
diff --git a/runtime/internal/flow/protocols/tcp/init.go b/runtime/internal/flow/protocols/tcp/init.go
new file mode 100644
index 0000000..04d0824
--- /dev/null
+++ b/runtime/internal/flow/protocols/tcp/init.go
@@ -0,0 +1,77 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package tcp
+
+import (
+ "net"
+ "time"
+
+ "v.io/v23/context"
+ "v.io/v23/flow"
+
+ "v.io/x/ref/runtime/internal/lib/framer"
+ "v.io/x/ref/runtime/internal/lib/tcputil"
+)
+
+func init() {
+ tcp := tcpProtocol{}
+ flow.RegisterProtocol("tcp", tcp, "tcp4", "tcp6")
+ flow.RegisterProtocol("tcp4", tcp)
+ flow.RegisterProtocol("tcp6", tcp)
+}
+
+type tcpProtocol struct{}
+
+// Dial dials a net.Conn to a the specific address and adds framing to the connection.
+func (tcpProtocol) Dial(ctx *context.T, network, address string, timeout time.Duration) (flow.MsgReadWriteCloser, error) {
+ conn, err := net.DialTimeout(network, address, timeout)
+ if err != nil {
+ return nil, err
+ }
+ if err := tcputil.EnableTCPKeepAlive(conn); err != nil {
+ return nil, err
+ }
+ return framer.New(conn), nil
+}
+
+// Resolve performs a DNS resolution on the provided network and address.
+func (tcpProtocol) Resolve(ctx *context.T, network, address string) (string, string, error) {
+ tcpAddr, err := net.ResolveTCPAddr(network, address)
+ if err != nil {
+ return "", "", err
+ }
+ return tcpAddr.Network(), tcpAddr.String(), nil
+}
+
+// Listen returns a listener that sets KeepAlive on all accepted connections.
+// Connections returned from the listener will be framed.
+func (tcpProtocol) Listen(ctx *context.T, network, address string) (flow.MsgListener, error) {
+ ln, err := net.Listen(network, address)
+ if err != nil {
+ return nil, err
+ }
+ return &tcpListener{ln}, nil
+}
+
+// tcpListener is a wrapper around net.Listener that sets KeepAlive on all
+// accepted connections and returns framed flow.MsgReadWriteClosers.
+type tcpListener struct {
+ netLn net.Listener
+}
+
+func (ln *tcpListener) Accept(ctx *context.T) (flow.MsgReadWriteCloser, error) {
+ conn, err := ln.netLn.Accept()
+ if err != nil {
+ return nil, err
+ }
+ if err := tcputil.EnableTCPKeepAlive(conn); err != nil {
+ return nil, err
+ }
+ return framer.New(conn), nil
+}
+
+func (ln *tcpListener) Addr() net.Addr {
+ return ln.netLn.Addr()
+}
diff --git a/runtime/internal/lib/framer/errors.vdl b/runtime/internal/lib/framer/errors.vdl
new file mode 100644
index 0000000..0c2f74b
--- /dev/null
+++ b/runtime/internal/lib/framer/errors.vdl
@@ -0,0 +1,14 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package framer
+
+// These messages are constructed so as to avoid embedding a component/method name
+// and are thus more suitable for inclusion in other verrors.
+// This practice of omitting {1}{2} should be used throughout the flow implementations
+// since all of their errors are intended to be used as arguments to higher level errors.
+// TODO(suharshs,toddw): Allow skipping of {1}{2} in vdl generated errors.
+error (
+ LargerThan3ByteUInt() {"en":"integer too large to represent in 3 bytes"}
+)
\ No newline at end of file
diff --git a/runtime/internal/lib/framer/errors.vdl.go b/runtime/internal/lib/framer/errors.vdl.go
new file mode 100644
index 0000000..2da6252
--- /dev/null
+++ b/runtime/internal/lib/framer/errors.vdl.go
@@ -0,0 +1,28 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated by the vanadium vdl tool.
+// Source: errors.vdl
+
+package framer
+
+import (
+ // VDL system imports
+ "v.io/v23/context"
+ "v.io/v23/i18n"
+ "v.io/v23/verror"
+)
+
+var (
+ ErrLargerThan3ByteUInt = verror.Register("v.io/x/ref/runtime/internal/lib/framer.LargerThan3ByteUInt", verror.NoRetry, "{1:}{2:} integer too large to represent in 3 bytes")
+)
+
+func init() {
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrLargerThan3ByteUInt.ID), "{1:}{2:} integer too large to represent in 3 bytes")
+}
+
+// NewErrLargerThan3ByteUInt returns an error with the ErrLargerThan3ByteUInt ID.
+func NewErrLargerThan3ByteUInt(ctx *context.T) error {
+ return verror.New(ErrLargerThan3ByteUInt, ctx)
+}
diff --git a/runtime/internal/flow/manager/framer.go b/runtime/internal/lib/framer/framer.go
similarity index 93%
rename from runtime/internal/flow/manager/framer.go
rename to runtime/internal/lib/framer/framer.go
index a1170f2..2e0d806 100644
--- a/runtime/internal/flow/manager/framer.go
+++ b/runtime/internal/lib/framer/framer.go
@@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
-package manager
+package framer
import (
"io"
@@ -17,7 +17,9 @@
buf []byte
}
-var _ flow.MsgReadWriteCloser = (*framer)(nil)
+func New(c io.ReadWriteCloser) flow.MsgReadWriteCloser {
+ return &framer{ReadWriteCloser: c}
+}
func (f *framer) WriteMsg(data ...[]byte) (int, error) {
// Compute the message size.
diff --git a/runtime/internal/flow/manager/framer_test.go b/runtime/internal/lib/framer/framer_test.go
similarity index 98%
rename from runtime/internal/flow/manager/framer_test.go
rename to runtime/internal/lib/framer/framer_test.go
index b863fe0..bc1aef1 100644
--- a/runtime/internal/flow/manager/framer_test.go
+++ b/runtime/internal/lib/framer/framer_test.go
@@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
-package manager
+package framer
import (
"bytes"