Merge "Revert "rpc/stream/vif: Temporarily make new applications compatible with very old agents.""
diff --git a/profiles/chrome/chromeinit.go b/profiles/chrome/chromeinit.go
index 6445080..ccbfc83 100644
--- a/profiles/chrome/chromeinit.go
+++ b/profiles/chrome/chromeinit.go
@@ -37,7 +37,7 @@
protocols := []string{"wsh", "ws"}
listenSpec := rpc.ListenSpec{Addrs: rpc.ListenAddrs{{Protocol: "ws", Address: ""}}}
- runtime, ctx, shutdown, err := grt.Init(ctx, nil, protocols, &listenSpec, commonFlags.RuntimeFlags(), nil)
+ runtime, ctx, shutdown, err := grt.Init(ctx, nil, protocols, &listenSpec, nil, "", commonFlags.RuntimeFlags(), nil)
if err != nil {
return nil, nil, shutdown, err
}
diff --git a/profiles/gce/init.go b/profiles/gce/init.go
index 73a675f..d71409d 100644
--- a/profiles/gce/init.go
+++ b/profiles/gce/init.go
@@ -63,7 +63,7 @@
}
}
- runtime, ctx, shutdown, err := grt.Init(ctx, ac, nil, &listenSpec, commonFlags.RuntimeFlags(), nil)
+ runtime, ctx, shutdown, err := grt.Init(ctx, ac, nil, &listenSpec, nil, "", commonFlags.RuntimeFlags(), nil)
if err != nil {
return nil, nil, shutdown, err
}
diff --git a/profiles/genericinit.go b/profiles/genericinit.go
index b04189e..484f621 100644
--- a/profiles/genericinit.go
+++ b/profiles/genericinit.go
@@ -49,6 +49,8 @@
ac,
nil,
&listenSpec,
+ nil,
+ "",
commonFlags.RuntimeFlags(),
nil)
if err != nil {
diff --git a/profiles/internal/rpc/full_test.go b/profiles/internal/rpc/full_test.go
index ed52512..ad89931 100644
--- a/profiles/internal/rpc/full_test.go
+++ b/profiles/internal/rpc/full_test.go
@@ -19,6 +19,10 @@
"testing"
"time"
+ "v.io/x/lib/netstate"
+ "v.io/x/lib/pubsub"
+ "v.io/x/lib/vlog"
+
"v.io/v23"
"v.io/v23/context"
"v.io/v23/namespace"
@@ -31,8 +35,7 @@
"v.io/v23/vdl"
"v.io/v23/verror"
"v.io/v23/vtrace"
- "v.io/x/lib/netstate"
- "v.io/x/lib/vlog"
+
"v.io/x/ref/lib/stats"
"v.io/x/ref/profiles/internal/lib/publisher"
"v.io/x/ref/profiles/internal/lib/websocket"
@@ -75,12 +78,16 @@
c.Unlock()
}
-func testInternalNewServer(ctx *context.T, streamMgr stream.Manager, ns namespace.T, principal security.Principal, opts ...rpc.ServerOpt) (rpc.Server, error) {
+func testInternalNewServerWithPubsub(ctx *context.T, streamMgr stream.Manager, ns namespace.T, settingsPublisher *pubsub.Publisher, settingsStreamName string, principal security.Principal, opts ...rpc.ServerOpt) (rpc.Server, error) {
client, err := InternalNewClient(streamMgr, ns)
if err != nil {
return nil, err
}
- return InternalNewServer(ctx, streamMgr, ns, client, principal, opts...)
+ return InternalNewServer(ctx, streamMgr, ns, settingsPublisher, settingsStreamName, client, principal, opts...)
+}
+
+func testInternalNewServer(ctx *context.T, streamMgr stream.Manager, ns namespace.T, principal security.Principal, opts ...rpc.ServerOpt) (rpc.Server, error) {
+ return testInternalNewServerWithPubsub(ctx, streamMgr, ns, nil, "", principal, opts...)
}
type userType string
diff --git a/profiles/internal/rpc/protocols/tcp/init.go b/profiles/internal/rpc/protocols/tcp/init.go
index 08f95e6..a6067b9 100644
--- a/profiles/internal/rpc/protocols/tcp/init.go
+++ b/profiles/internal/rpc/protocols/tcp/init.go
@@ -22,7 +22,6 @@
}
func tcpDial(network, address string, timeout time.Duration) (net.Conn, error) {
- vlog.Infof("tcp.Dial %v", address)
conn, err := net.DialTimeout(network, address, timeout)
if err != nil {
return nil, err
diff --git a/profiles/internal/rpc/pubsub.go b/profiles/internal/rpc/pubsub.go
new file mode 100644
index 0000000..7bfe46c
--- /dev/null
+++ b/profiles/internal/rpc/pubsub.go
@@ -0,0 +1,30 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package rpc
+
+import (
+ "net"
+
+ "v.io/x/lib/pubsub"
+)
+
+// NewAddAddrsSetting creates the Setting to be sent to Listen to inform
+// it of new addresses that have become available since the last change.
+func NewAddAddrsSetting(a []net.Addr) pubsub.Setting {
+ return pubsub.NewAny(NewAddrsSetting, NewAddrsSettingDesc, a)
+}
+
+// NewRmAddrsSetting creates the Setting to be sent to Listen to inform
+// it of addresses that are no longer available.
+func NewRmAddrsSetting(a []net.Addr) pubsub.Setting {
+ return pubsub.NewAny(RmAddrsSetting, RmAddrsSettingDesc, a)
+}
+
+const (
+ NewAddrsSetting = "NewAddrs"
+ NewAddrsSettingDesc = "New Addresses discovered since last change"
+ RmAddrsSetting = "RmAddrs"
+ RmAddrsSettingDesc = "Addresses that have been removed since last change"
+)
diff --git a/profiles/internal/rpc/resolve_test.go b/profiles/internal/rpc/resolve_test.go
index 20bf957..e0f6878 100644
--- a/profiles/internal/rpc/resolve_test.go
+++ b/profiles/internal/rpc/resolve_test.go
@@ -50,6 +50,8 @@
ac,
nil,
&listenSpec,
+ nil,
+ "",
commonFlags.RuntimeFlags(),
nil)
if err != nil {
diff --git a/profiles/internal/rpc/server.go b/profiles/internal/rpc/server.go
index 0d5a11c..102f82a 100644
--- a/profiles/internal/rpc/server.go
+++ b/profiles/internal/rpc/server.go
@@ -83,15 +83,17 @@
type server struct {
sync.Mutex
// context used by the server to make internal RPCs, error messages etc.
- ctx *context.T
- cancel context.CancelFunc // function to cancel the above context.
- state serverState // track state of the server.
- streamMgr stream.Manager // stream manager to listen for new flows.
- publisher publisher.Publisher // publisher to publish mounttable mounts.
- listenerOpts []stream.ListenerOpt // listener opts for Listen.
- dhcpState *dhcpState // dhcpState, nil if not using dhcp
- principal security.Principal
- blessings security.Blessings
+ ctx *context.T
+ cancel context.CancelFunc // function to cancel the above context.
+ state serverState // track state of the server.
+ streamMgr stream.Manager // stream manager to listen for new flows.
+ publisher publisher.Publisher // publisher to publish mounttable mounts.
+ listenerOpts []stream.ListenerOpt // listener opts for Listen.
+ settingsPublisher *pubsub.Publisher // pubsub publisher for dhcp
+ settingsName string // pubsub stream name for dhcp
+ dhcpState *dhcpState // dhcpState, nil if not using dhcp
+ principal security.Principal
+ blessings security.Blessings
// maps that contain state on listeners.
listenState map[*listenState]struct{}
@@ -173,6 +175,8 @@
ctx *context.T,
streamMgr stream.Manager,
ns namespace.T,
+ settingsPublisher *pubsub.Publisher,
+ settingsName string,
client rpc.Client,
principal security.Principal,
opts ...rpc.ServerOpt) (rpc.Server, error) {
@@ -180,18 +184,20 @@
ctx, _ = vtrace.WithNewSpan(ctx, "NewServer")
statsPrefix := naming.Join("rpc", "server", "routing-id", streamMgr.RoutingID().String())
s := &server{
- ctx: ctx,
- cancel: cancel,
- streamMgr: streamMgr,
- principal: principal,
- publisher: publisher.New(ctx, ns, publishPeriod),
- listenState: make(map[*listenState]struct{}),
- listeners: make(map[stream.Listener]struct{}),
- proxies: make(map[string]proxyState),
- stoppedChan: make(chan struct{}),
- ipNets: ipNetworks(),
- ns: ns,
- stats: newRPCStats(statsPrefix),
+ ctx: ctx,
+ cancel: cancel,
+ streamMgr: streamMgr,
+ principal: principal,
+ publisher: publisher.New(ctx, ns, publishPeriod),
+ listenState: make(map[*listenState]struct{}),
+ listeners: make(map[stream.Listener]struct{}),
+ proxies: make(map[string]proxyState),
+ stoppedChan: make(chan struct{}),
+ ipNets: ipNetworks(),
+ ns: ns,
+ stats: newRPCStats(statsPrefix),
+ settingsPublisher: settingsPublisher,
+ settingsName: settingsName,
}
var (
dischargeExpiryBuffer = vc.DefaultServerDischargeExpiryBuffer
@@ -412,11 +418,11 @@
return nil, verror.New(verror.ErrBadArg, s.ctx, "failed to create any listeners")
}
- if roaming && s.dhcpState == nil && listenSpec.StreamPublisher != nil {
+ if roaming && s.dhcpState == nil && s.settingsPublisher != nil {
// Create a dhcp listener if we haven't already done so.
dhcp := &dhcpState{
- name: listenSpec.StreamName,
- publisher: listenSpec.StreamPublisher,
+ name: s.settingsName,
+ publisher: s.settingsPublisher,
watchers: make(map[chan<- rpc.NetworkChange]struct{}),
}
s.dhcpState = dhcp
@@ -624,20 +630,17 @@
s.Unlock()
return
}
- var err error
- var changed []naming.Endpoint
- switch setting.Name() {
- case rpc.NewAddrsSetting:
- changed = s.addAddresses(v)
- case rpc.RmAddrsSetting:
- changed, err = s.removeAddresses(v)
- }
change := rpc.NetworkChange{
- Time: time.Now(),
- State: externalStates[s.state],
- Setting: setting,
- Changed: changed,
- Error: err,
+ Time: time.Now(),
+ State: externalStates[s.state],
+ }
+ switch setting.Name() {
+ case NewAddrsSetting:
+ change.Changed = s.addAddresses(v)
+ change.AddedAddrs = v
+ case RmAddrsSetting:
+ change.Changed, change.Error = s.removeAddresses(v)
+ change.RemovedAddrs = v
}
vlog.VI(2).Infof("rpc: dhcp: change %v", change)
for ch, _ := range s.dhcpState.watchers {
@@ -703,14 +706,11 @@
// to ensure that those addresses are externally reachable.
func (s *server) addAddresses(addrs []net.Addr) []naming.Endpoint {
var added []naming.Endpoint
- vlog.Infof("HERE WITH %v -> %v", addrs, netstate.ConvertToAddresses(addrs))
for _, address := range netstate.ConvertToAddresses(addrs) {
if !netstate.IsAccessibleIP(address) {
- vlog.Infof("RETURN A %v", added)
return added
}
host := getHost(address)
- vlog.Infof("LISTEN ST: %v", s.listenState)
for ls, _ := range s.listenState {
if ls != nil && ls.roaming {
niep := ls.protoIEP
@@ -724,7 +724,6 @@
}
}
}
- vlog.Infof("RETURN B %v", added)
return added
}
@@ -957,26 +956,26 @@
discharges: make(map[string]security.Discharge),
}
var err error
- typedec := flow.VCDataCache().Get(vc.TypeDecoderKey{})
- if typedec == nil {
- if fs.dec, err = vom.NewDecoder(flow); err != nil {
- flow.Close()
- return nil, err
- }
+ typeenc := flow.VCDataCache().Get(vc.TypeEncoderKey{})
+ if typeenc == nil {
if fs.enc, err = vom.NewEncoder(flow); err != nil {
flow.Close()
return nil, err
}
- } else {
- if fs.dec, err = vom.NewDecoderWithTypeDecoder(flow, typedec.(*vom.TypeDecoder)); err != nil {
+ if fs.dec, err = vom.NewDecoder(flow); err != nil {
flow.Close()
return nil, err
}
- typeenc := flow.VCDataCache().Get(vc.TypeEncoderKey{})
+ } else {
if fs.enc, err = vom.NewEncoderWithTypeEncoder(flow, typeenc.(*vom.TypeEncoder)); err != nil {
flow.Close()
return nil, err
}
+ typedec := flow.VCDataCache().Get(vc.TypeDecoderKey{})
+ if fs.dec, err = vom.NewDecoderWithTypeDecoder(flow, typedec.(*vom.TypeDecoder)); err != nil {
+ flow.Close()
+ return nil, err
+ }
}
return fs, nil
}
diff --git a/profiles/internal/rpc/server_test.go b/profiles/internal/rpc/server_test.go
index c23b3bd..900db9a 100644
--- a/profiles/internal/rpc/server_test.go
+++ b/profiles/internal/rpc/server_test.go
@@ -424,12 +424,6 @@
ns := tnaming.NewSimpleNamespace()
ctx, shutdown := initForTest()
defer shutdown()
- server, err := testInternalNewServer(ctx, sm, ns, testutil.NewPrincipal("test"))
- defer server.Stop()
-
- if err != nil {
- t.Fatal(err)
- }
publisher := pubsub.NewPublisher()
roaming := make(chan pubsub.Setting)
@@ -439,6 +433,12 @@
}
defer func() { publisher.Shutdown(); <-stop }()
+ server, err := testInternalNewServerWithPubsub(ctx, sm, ns, publisher, "TestRoaming", testutil.NewPrincipal("test"))
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer server.Stop()
+
ipv4And6 := func(network string, addrs []net.Addr) ([]net.Addr, error) {
accessible := netstate.ConvertToAddresses(addrs)
ipv4 := accessible.Filter(netstate.IsUnicastIPv4)
@@ -451,9 +451,7 @@
{"tcp", ":0"},
{"tcp", ":0"},
},
- StreamName: "TestRoaming",
- StreamPublisher: publisher,
- AddressChooser: ipv4And6,
+ AddressChooser: ipv4And6,
}
eps, err := server.Listen(spec)
@@ -488,7 +486,7 @@
server.WatchNetwork(watcher)
defer close(watcher)
- roaming <- rpc.NewAddAddrsSetting([]net.Addr{n1, n2})
+ roaming <- NewAddAddrsSetting([]net.Addr{n1, n2})
waitForChange := func() *rpc.NetworkChange {
vlog.Infof("Waiting on %p", watcher)
@@ -527,7 +525,7 @@
t.Fatalf("got %d, want %d", got, want)
}
- roaming <- rpc.NewRmAddrsSetting([]net.Addr{n1})
+ roaming <- NewRmAddrsSetting([]net.Addr{n1})
// We expect 2 changes, one for each usable listen spec addr.
change = waitForChange()
@@ -548,7 +546,7 @@
}
// Remove all addresses to mimic losing all connectivity.
- roaming <- rpc.NewRmAddrsSetting(getIPAddrs(nepsR))
+ roaming <- NewRmAddrsSetting(getIPAddrs(nepsR))
// We expect changes for all of the current endpoints
change = waitForChange()
@@ -561,7 +559,7 @@
t.Fatalf("got %d, want %d: %v", got, want, status.Mounts)
}
- roaming <- rpc.NewAddAddrsSetting([]net.Addr{n1})
+ roaming <- NewAddAddrsSetting([]net.Addr{n1})
// We expect 2 changes, one for each usable listen spec addr.
change = waitForChange()
if got, want := len(change.Changed), 2; got != want {
@@ -576,12 +574,6 @@
ns := tnaming.NewSimpleNamespace()
ctx, shutdown := initForTest()
defer shutdown()
- server, err := testInternalNewServer(ctx, sm, ns, testutil.NewPrincipal("test"))
- defer server.Stop()
-
- if err != nil {
- t.Fatal(err)
- }
publisher := pubsub.NewPublisher()
roaming := make(chan pubsub.Setting)
@@ -591,12 +583,16 @@
}
defer func() { publisher.Shutdown(); <-stop }()
+ server, err := testInternalNewServerWithPubsub(ctx, sm, ns, publisher, "TestWatcherDeadlock", testutil.NewPrincipal("test"))
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer server.Stop()
+
spec := rpc.ListenSpec{
Addrs: rpc.ListenAddrs{
{"tcp", ":0"},
},
- StreamName: "TestWatcherDeadlock",
- StreamPublisher: publisher,
}
eps, err := server.Listen(spec)
if err != nil {
@@ -615,12 +611,12 @@
defer close(watcher)
// Remove all addresses to mimic losing all connectivity.
- roaming <- rpc.NewRmAddrsSetting(getIPAddrs(eps))
+ roaming <- NewRmAddrsSetting(getIPAddrs(eps))
// Add in two new addresses
n1 := netstate.NewNetAddr("ip", "1.1.1.1")
n2 := netstate.NewNetAddr("ip", "2.2.2.2")
- roaming <- rpc.NewAddAddrsSetting([]net.Addr{n1, n2})
+ roaming <- NewAddAddrsSetting([]net.Addr{n1, n2})
neps := make([]naming.Endpoint, 0, len(eps))
for _, p := range getUniqPorts(eps) {
diff --git a/profiles/internal/rpc/stream/vc/knobs.go b/profiles/internal/rpc/stream/vc/knobs.go
index c2bdf5b..7271f7a 100644
--- a/profiles/internal/rpc/stream/vc/knobs.go
+++ b/profiles/internal/rpc/stream/vc/knobs.go
@@ -25,10 +25,11 @@
// (and not any specific flow)
SharedFlowID = 0
- // Special flow used for handshaking between VCs (e.g. setting up encryption).
- HandshakeFlowID = 1
// Special flow used for authenticating between VCs.
AuthFlowID = 2
- // Special Flow ID used for interchanging of VOM types between VCs.
+ // Special flow used for interchanging of VOM types between VCs.
TypeFlowID = 3
+ // Special flow over which discharges for third-party caveats
+ // on the server's blessings are sent.
+ DischargeFlowID = 4
)
diff --git a/profiles/internal/rpc/stream/vc/vc.go b/profiles/internal/rpc/stream/vc/vc.go
index 5889cc6..7a8a567 100644
--- a/profiles/internal/rpc/stream/vc/vc.go
+++ b/profiles/internal/rpc/stream/vc/vc.go
@@ -54,17 +54,17 @@
errIgnoringMessageOnClosedVC = reg(".errIgnoringMessageOnClosedVC", "ignoring message for Flow {3} on closed VC {4}")
errVomTypeDecoder = reg(".errVomDecoder", "failed to create vom type decoder{:3}")
errVomTypeEncoder = reg(".errVomEncoder", "failed to create vom type encoder{:3}")
+ errFailedToCreateFlowForAuth = reg(".errFailedToCreateFlowForAuth", "failed to create a Flow for authentication{:3}")
+ errAuthFlowNotAccepted = reg(".errAuthFlowNotAccepted", "authentication Flow not accepted{:3}")
errFailedToCreateFlowForWireType = reg(".errFailedToCreateFlowForWireType", "fail to create a Flow for wire type{:3}")
errFlowForWireTypeNotAccepted = reg(".errFlowForWireTypeNotAccepted", "Flow for wire type not accepted{:3}")
- errFailedToCreateTLSFlow = reg(".errFailedToCreateTLSFlow", "failed to create a Flow for setting up TLS{3:}")
+ errFailedToCreateFlowForDischarge = reg(".errFailedToCreateFlowForDischarge", "fail to create a Flow for discharge{:3}")
+ errFlowForDischargeNotAccepted = reg(".errFlowForDischargesNotAccepted", "Flow for discharge not accepted{:3}")
errFailedToSetupEncryption = reg(".errFailedToSetupEncryption", "failed to setup channel encryption{:3}")
- errFailedToCreateFlowForAuth = reg(".errFailedToCreateFlowForAuth", "failed to create a Flow for authentication{:3}")
errAuthFailed = reg(".errAuthFailed", "authentication failed{:3}")
errNoActiveListener = reg(".errNoActiveListener", "no active listener on VCI {3}")
errFailedToCreateWriterForNewFlow = reg(".errFailedToCreateWriterForNewFlow", "failed to create writer for new flow({3}){:4}")
errFailedToEnqueueFlow = reg(".errFailedToEnqueueFlow", "failed to enqueue flow at listener{:3}")
- errTLSFlowNotAccepted = reg(".errTLSFlowNotAccepted", "TLS handshake Flow not accepted{:3}")
- errAuthFlowNotAccepted = reg(".errAuthFlowNotAccepted", "authentication Flow not accepted{:3}")
errFailedToAcceptSystemFlows = reg(".errFailedToAcceptSystemFlows", "failed to accept system flows{:3}")
)
@@ -306,10 +306,9 @@
payload.Release()
return verror.New(stream.ErrNetwork, nil, verror.New(errIgnoringMessageOnClosedVC, nil, fid, vc.VCI()))
}
- // TLS decryption is stateful, so even if the message will be discarded
- // because of other checks further down in this method, go through with
- // the decryption.
- if fid != HandshakeFlowID && fid != AuthFlowID {
+ // Authentication is done by encrypting/decrypting its payload by itself,
+ // so we do not go through with the decryption for auth flow.
+ if fid != AuthFlowID {
vc.waitForHandshakeLocked()
var err error
if payload, err = vc.crypter.Decrypt(payload); err != nil {
@@ -531,14 +530,6 @@
vers := vc.Version()
// Authenticate (exchange identities)
- // Unfortunately, handshakeConn cannot be used for the authentication protocol.
- // This is because the Crypter implementation uses crypto/tls.Conn,
- // which can consume data beyond the handshake message boundaries (call
- // to readFromUntil in
- // https://code.google.com/p/go/source/browse/src/pkg/crypto/tls/conn.go?spec=svn654b2703fcc466a29692068ab56efedd09fb3d05&r=654b2703fcc466a29692068ab56efedd09fb3d05#539).
- // This is not a problem when tls.Conn is used as intended (to wrap over a stream), but
- // becomes a problem when shoehorning a block encrypter (Crypter interface) over this
- // stream API.
authConn, err := vc.connectFID(AuthFlowID, systemFlowPriority)
if err != nil {
return vc.appendCloseReason(verror.New(stream.ErrSecurity, nil, verror.New(errFailedToCreateFlowForAuth, nil, err)))
@@ -555,7 +546,7 @@
if err != nil {
return vc.appendCloseReason(err)
}
- } else {
+ } else if vers < version.RPCVersion10 {
go vc.recvDischargesLoop(authConn)
}
@@ -689,14 +680,14 @@
vc.acceptHandshakeDone = nil
vc.mu.Unlock()
- if len(lBlessings.ThirdPartyCaveats()) > 0 {
+ if len(lBlessings.ThirdPartyCaveats()) > 0 && vers < version.RPCVersion10 {
go vc.sendDischargesLoop(authConn, dischargeClient, lBlessings.ThirdPartyCaveats(), dischargeExpiryBuffer)
} else {
authConn.Close()
}
// Accept system flows.
- if err = vc.acceptSystemFlows(ln); err != nil {
+ if err = vc.acceptSystemFlows(ln, dischargeClient, dischargeExpiryBuffer); err != nil {
sendErr(verror.New(stream.ErrNetwork, nil, verror.New(errFailedToAcceptSystemFlows, nil, err)))
}
@@ -808,26 +799,59 @@
return verror.New(stream.ErrSecurity, nil, verror.New(errVomTypeDecoder, nil, err))
}
vc.dataCache.Insert(TypeDecoderKey{}, typeDec)
+
+ if vc.Version() < version.RPCVersion10 {
+ return nil
+ }
+
+ vc.mu.Lock()
+ rBlessings := vc.remoteBlessings
+ vc.mu.Unlock()
+ if len(rBlessings.ThirdPartyCaveats()) > 0 {
+ conn, err = vc.connectFID(DischargeFlowID, systemFlowPriority)
+ if err != nil {
+ return verror.New(stream.ErrSecurity, nil, verror.New(errFailedToCreateFlowForDischarge, nil, err))
+ }
+ go vc.recvDischargesLoop(conn)
+ }
+
return nil
}
-func (vc *VC) acceptSystemFlows(ln stream.Listener) error {
+func (vc *VC) acceptSystemFlows(ln stream.Listener, dischargeClient DischargeClient, dischargeExpiryBuffer time.Duration) error {
conn, err := ln.Accept()
if err != nil {
return verror.New(errFlowForWireTypeNotAccepted, nil, err)
}
- typeDec, err := vom.NewTypeDecoder(conn)
- if err != nil {
- conn.Close()
- return verror.New(errVomTypeDecoder, nil, err)
- }
- vc.dataCache.Insert(TypeDecoderKey{}, typeDec)
typeEnc, err := vom.NewTypeEncoder(conn)
if err != nil {
conn.Close()
return verror.New(errVomTypeEncoder, nil, err)
}
vc.dataCache.Insert(TypeEncoderKey{}, typeEnc)
+ typeDec, err := vom.NewTypeDecoder(conn)
+ if err != nil {
+ conn.Close()
+ return verror.New(errVomTypeDecoder, nil, err)
+ }
+ vc.dataCache.Insert(TypeDecoderKey{}, typeDec)
+
+ if vc.Version() < version.RPCVersion10 {
+ return nil
+ }
+
+ vc.mu.Lock()
+ lBlessings := vc.localBlessings
+ vc.mu.Unlock()
+ tpCaveats := lBlessings.ThirdPartyCaveats()
+ if len(tpCaveats) > 0 {
+ conn, err := ln.Accept()
+ if err != nil {
+ return verror.New(errFlowForDischargeNotAccepted, nil, err)
+ }
+ go vc.sendDischargesLoop(conn, dischargeClient, tpCaveats, dischargeExpiryBuffer)
+ }
+
return nil
}
@@ -838,7 +862,7 @@
return nil, nil
}
vc.mu.Lock()
- if fid == HandshakeFlowID || fid == AuthFlowID {
+ if fid == AuthFlowID {
cipherslice = plaintext
} else {
cipherslice, err = vc.crypter.Encrypt(plaintext)
diff --git a/profiles/internal/rpc/test/proxy_test.go b/profiles/internal/rpc/test/proxy_test.go
index bd326f5..e6bd616 100644
--- a/profiles/internal/rpc/test/proxy_test.go
+++ b/profiles/internal/rpc/test/proxy_test.go
@@ -191,7 +191,7 @@
}
defer client.Close()
serverCtx, _ := v23.WithPrincipal(ctx, pserver)
- server, err := irpc.InternalNewServer(serverCtx, smserver, ns, nil, pserver)
+ server, err := irpc.InternalNewServer(serverCtx, smserver, ns, nil, "", nil, pserver)
if err != nil {
t.Fatal(err)
}
diff --git a/profiles/internal/rpc/version/version.go b/profiles/internal/rpc/version/version.go
index 2d3119d..c23913b 100644
--- a/profiles/internal/rpc/version/version.go
+++ b/profiles/internal/rpc/version/version.go
@@ -21,7 +21,7 @@
// change that's not both forward and backward compatible.
// Min should be incremented whenever we want to remove
// support for old protocol versions.
- SupportedRange = &Range{Min: version.RPCVersion9, Max: version.RPCVersion9}
+ SupportedRange = &Range{Min: version.RPCVersion9, Max: version.RPCVersion10}
)
const pkgPath = "v.io/x/ref/profiles/internal/rpc/version"
diff --git a/profiles/internal/rt/runtime.go b/profiles/internal/rt/runtime.go
index f0ec69f..bc9c3af 100644
--- a/profiles/internal/rt/runtime.go
+++ b/profiles/internal/rt/runtime.go
@@ -13,6 +13,8 @@
"syscall"
"time"
+ "v.io/x/lib/pubsub"
+
"v.io/v23"
"v.io/v23/context"
"v.io/v23/i18n"
@@ -52,9 +54,11 @@
)
type initData struct {
- appCycle v23.AppCycle
- listenSpec *rpc.ListenSpec
- protocols []string
+ appCycle v23.AppCycle
+ listenSpec *rpc.ListenSpec
+ protocols []string
+ settingsPublisher *pubsub.Publisher
+ settingsName string
}
type vtraceDependency struct{}
@@ -71,14 +75,18 @@
appCycle v23.AppCycle,
protocols []string,
listenSpec *rpc.ListenSpec,
+ settingsPublisher *pubsub.Publisher,
+ settingsName string,
flags flags.RuntimeFlags,
reservedDispatcher rpc.Dispatcher) (*Runtime, *context.T, v23.Shutdown, error) {
r := &Runtime{deps: dependency.NewGraph()}
ctx = context.WithValue(ctx, initKey, &initData{
- protocols: protocols,
- listenSpec: listenSpec,
- appCycle: appCycle,
+ protocols: protocols,
+ listenSpec: listenSpec,
+ appCycle: appCycle,
+ settingsPublisher: settingsPublisher,
+ settingsName: settingsName,
})
if reservedDispatcher != nil {
@@ -236,7 +244,7 @@
Blessings: principal.BlessingStore().Default(),
})
}
- server, err := irpc.InternalNewServer(ctx, sm, ns, r.GetClient(ctx), principal, otherOpts...)
+ server, err := irpc.InternalNewServer(ctx, sm, ns, id.settingsPublisher, id.settingsName, r.GetClient(ctx), principal, otherOpts...)
if err != nil {
return nil, err
}
diff --git a/profiles/internal/rt/runtime_test.go b/profiles/internal/rt/runtime_test.go
index cf6706d..f4cded9 100644
--- a/profiles/internal/rt/runtime_test.go
+++ b/profiles/internal/rt/runtime_test.go
@@ -20,7 +20,7 @@
// InitForTest creates a context for use in a test.
func InitForTest(t *testing.T) (*rt.Runtime, *context.T, v23.Shutdown) {
ctx, cancel := context.RootContext()
- r, ctx, shutdown, err := rt.Init(ctx, nil, nil, nil, flags.RuntimeFlags{}, nil)
+ r, ctx, shutdown, err := rt.Init(ctx, nil, nil, nil, nil, "", flags.RuntimeFlags{}, nil)
if err != nil {
t.Fatal(err)
}
diff --git a/profiles/roaming/.api b/profiles/roaming/.api
new file mode 100644
index 0000000..c15af37
--- /dev/null
+++ b/profiles/roaming/.api
@@ -0,0 +1,4 @@
+pkg roaming, const SettingsStreamDesc ideal-string
+pkg roaming, const SettingsStreamName ideal-string
+pkg roaming, func Init(*context.T) (v23.Runtime, *context.T, v23.Shutdown, error)
+pkg roaming, func NewProxy(*context.T, rpc.ListenSpec, ...string) (func(), naming.Endpoint, error)
diff --git a/profiles/roaming/roaminginit.go b/profiles/roaming/roaminginit.go
index 8ec4482..c6b37f6 100644
--- a/profiles/roaming/roaminginit.go
+++ b/profiles/roaming/roaminginit.go
@@ -31,6 +31,7 @@
"v.io/x/ref/profiles/internal"
"v.io/x/ref/profiles/internal/lib/appcycle"
"v.io/x/ref/profiles/internal/lib/websocket"
+ irpc "v.io/x/ref/profiles/internal/rpc"
_ "v.io/x/ref/profiles/internal/rpc/protocols/tcp"
_ "v.io/x/ref/profiles/internal/rpc/protocols/ws"
_ "v.io/x/ref/profiles/internal/rpc/protocols/wsh"
@@ -40,6 +41,7 @@
const (
SettingsStreamName = "roaming"
+ SettingsStreamDesc = "pubsub stream used by the roaming profile"
)
var commonFlags *flags.Flags
@@ -74,7 +76,7 @@
// flag to configure both the protocol and address.
return []net.Addr{netstate.NewNetAddr("wsh", addr.String())}, nil
}
- runtime, ctx, shutdown, err := rt.Init(ctx, ac, nil, &listenSpec, commonFlags.RuntimeFlags(), reservedDispatcher)
+ runtime, ctx, shutdown, err := rt.Init(ctx, ac, nil, &listenSpec, nil, "", commonFlags.RuntimeFlags(), reservedDispatcher)
if err != nil {
return nil, nil, shutdown, err
}
@@ -91,7 +93,8 @@
// Create stream in Init function to avoid a race between any
// goroutines started here and consumers started after Init returns.
ch := make(chan pubsub.Setting)
- stop, err := publisher.CreateStream(SettingsStreamName, SettingsStreamName, ch)
+ // TODO(cnicolaou): use stop to shut down this stream when the profile shuts down.
+ stop, err := publisher.CreateStream(SettingsStreamName, SettingsStreamDesc, ch)
if err != nil {
ac.Shutdown()
return nil, nil, nil, err
@@ -113,11 +116,9 @@
cleanupCh := make(chan struct{})
watcherCh := make(chan struct{})
- listenSpec.StreamPublisher = publisher
- listenSpec.StreamName = SettingsStreamName
listenSpec.AddressChooser = internal.IPAddressChooser
- runtime, ctx, shutdown, err := rt.Init(ctx, ac, nil, &listenSpec, commonFlags.RuntimeFlags(), reservedDispatcher)
+ runtime, ctx, shutdown, err := rt.Init(ctx, ac, nil, &listenSpec, publisher, SettingsStreamName, commonFlags.RuntimeFlags(), reservedDispatcher)
if err != nil {
return nil, nil, shutdown, err
}
@@ -170,12 +171,12 @@
}
if len(removed) > 0 {
vlog.VI(2).Infof("Sending removed: %s", removed)
- ch <- rpc.NewRmAddrsSetting(removed.AsNetAddrs())
+ ch <- irpc.NewRmAddrsSetting(removed.AsNetAddrs())
}
// We will always send the best currently available address
if chosen, err := listenSpec.AddressChooser(listenSpec.Addrs[0].Protocol, cur.AsNetAddrs()); err == nil && chosen != nil {
vlog.VI(2).Infof("Sending added and chosen: %s", chosen)
- ch <- rpc.NewAddAddrsSetting(chosen)
+ ch <- irpc.NewAddAddrsSetting(chosen)
} else {
vlog.VI(2).Infof("Ignoring added %s", added)
}
diff --git a/profiles/static/.api b/profiles/static/.api
new file mode 100644
index 0000000..2d3ebb3
--- /dev/null
+++ b/profiles/static/.api
@@ -0,0 +1,2 @@
+pkg static, func Init(*context.T) (v23.Runtime, *context.T, v23.Shutdown, error)
+pkg static, func NewProxy(*context.T, rpc.ListenSpec, ...string) (func(), naming.Endpoint, error)
diff --git a/profiles/static/staticinit.go b/profiles/static/staticinit.go
index 339e32a..dc93eae 100644
--- a/profiles/static/staticinit.go
+++ b/profiles/static/staticinit.go
@@ -57,7 +57,7 @@
listenSpec.AddressChooser = func(string, []net.Addr) ([]net.Addr, error) {
return []net.Addr{addr}, nil
}
- runtime, ctx, shutdown, err := rt.Init(ctx, ac, nil, &listenSpec, commonFlags.RuntimeFlags(), reservedDispatcher)
+ runtime, ctx, shutdown, err := rt.Init(ctx, ac, nil, &listenSpec, nil, "", commonFlags.RuntimeFlags(), reservedDispatcher)
if err != nil {
return nil, nil, nil, err
}
@@ -70,7 +70,7 @@
}
listenSpec.AddressChooser = internal.IPAddressChooser
- runtime, ctx, shutdown, err := rt.Init(ctx, ac, nil, &listenSpec, commonFlags.RuntimeFlags(), reservedDispatcher)
+ runtime, ctx, shutdown, err := rt.Init(ctx, ac, nil, &listenSpec, nil, "", commonFlags.RuntimeFlags(), reservedDispatcher)
if err != nil {
return nil, nil, shutdown, err
}
diff --git a/services/device/device/doc.go b/services/device/device/doc.go
index cd1ba14..53c9a17 100644
--- a/services/device/device/doc.go
+++ b/services/device/device/doc.go
@@ -41,6 +41,9 @@
-alsologtostderr=true
log to standard error as well as files
+ -chown=false
+ Change owner of files and directories given as command-line arguments to the
+ user specified by this flag
-dryrun=false
Elides root-requiring systemcalls.
-kill=false
diff --git a/services/device/deviced/doc.go b/services/device/deviced/doc.go
index 567ed5c..3468792 100644
--- a/services/device/deviced/doc.go
+++ b/services/device/deviced/doc.go
@@ -43,6 +43,9 @@
-alsologtostderr=true
log to standard error as well as files
+ -chown=false
+ Change owner of files and directories given as command-line arguments to the
+ user specified by this flag
-dryrun=false
Elides root-requiring systemcalls.
-kill=false
diff --git a/services/device/internal/impl/app_service.go b/services/device/internal/impl/app_service.go
index 3f2b49d..066b667 100644
--- a/services/device/internal/impl/app_service.go
+++ b/services/device/internal/impl/app_service.go
@@ -766,6 +766,7 @@
saArgs.workspace = rootDir
logDir := filepath.Join(instanceDir, "logs")
+ suidHelper.chownTree(suidHelper.getCurrentUser(), instanceDir, os.Stdout, os.Stderr)
if err := mkdirPerm(logDir, 0755); err != nil {
return nil, err
}
diff --git a/services/device/internal/impl/helper_manager.go b/services/device/internal/impl/helper_manager.go
index f4369eb..280db2b 100644
--- a/services/device/internal/impl/helper_manager.go
+++ b/services/device/internal/impl/helper_manager.go
@@ -40,6 +40,10 @@
}
}
+func (s suidHelperState) getCurrentUser() string {
+ return s.dmUser
+}
+
// terminatePid sends a SIGKILL to the target pid
func (s suidHelperState) terminatePid(pid int, stdout, stderr io.Writer) error {
if err := s.internalModalOp(stdout, stderr, "--kill", strconv.Itoa(pid)); err != nil {
@@ -56,6 +60,16 @@
return nil
}
+// chownTree passes --chown and --username to internalModalOp to chown dirOrFile to username; a failure is returned wrapped with the path.
+func (s suidHelperState) chownTree(username string, dirOrFile string, stdout, stderr io.Writer) error {
+ args := []string{"--chown", "--username", username, dirOrFile}
+
+ if err := s.internalModalOp(stdout, stderr, args...); err != nil {
+ return fmt.Errorf("devicemanager's invocation of suidhelper chown %v failed: %v", dirOrFile, err)
+ }
+ return nil
+}
+
type suidAppCmdArgs struct {
// args to helper
targetUser, progname, workspace, logdir, binpath string
@@ -116,6 +130,7 @@
if err := cmd.Run(); err != nil {
vlog.Errorf("failed calling helper with args (%v):%v", arg, err)
+ return err
}
return nil
}
diff --git a/services/device/internal/suid/args.go b/services/device/internal/suid/args.go
index 10ccfac..1473654 100644
--- a/services/device/internal/suid/args.go
+++ b/services/device/internal/suid/args.go
@@ -8,6 +8,7 @@
"bytes"
"encoding/json"
"flag"
+ "fmt"
"os"
"os/user"
"strconv"
@@ -38,6 +39,7 @@
envv []string
dryrun bool
remove bool
+ chown bool
kill bool
killPids []int
}
@@ -54,7 +56,7 @@
var (
flagUsername, flagWorkspace, flagLogDir, flagRun, flagProgName *string
flagMinimumUid *int64
- flagRemove, flagKill, flagDryrun *bool
+ flagRemove, flagKill, flagChown, flagDryrun *bool
)
func init() {
@@ -71,6 +73,7 @@
flagMinimumUid = fs.Int64("minuid", uidThreshold, "UIDs cannot be less than this number.")
flagRemove = fs.Bool("rm", false, "Remove the file trees given as command-line arguments.")
flagKill = fs.Bool("kill", false, "Kill process ids given as command-line arguments.")
+ flagChown = fs.Bool("chown", false, "Change owner of files and directories given as command-line arguments to the user specified by --username.")
flagDryrun = fs.Bool("dryrun", false, "Elides root-requiring systemcalls.")
}
@@ -84,29 +87,62 @@
return nenv
}
+// checkFlagCombinations rejects flag sets that mix one of the modal flags
+// (--rm, --kill, --chown) with flags that it does not accept.
+//
+// --rm and --kill must each be the only flag set. --chown may additionally be
+// given --username, --dryrun and --minuid, with any value, but nothing else.
+// If none of the three modal flags is set, nothing is checked.
+func checkFlagCombinations(fs *flag.FlagSet) error {
+	if modal := *flagRemove || *flagKill || *flagChown; !modal {
+		return nil
+	}
+
+	// tolerated maps a flag name to the single value that is not counted
+	// below; "*" stands for any value.
+	tolerated := make(map[string]string)
+	if inTest := os.Getenv("V23_SUIDHELPER_TEST") != ""; inTest {
+		// The device manager test always sets these two flags.
+		tolerated["minuid"] = "1"
+		tolerated["test.run"] = "TestSuidHelper"
+	}
+	if *flagChown {
+		tolerated["username"], tolerated["dryrun"], tolerated["minuid"] = "*", "*", "*"
+	}
+
+	// Count the flags that were set and are not tolerated.
+	numSet := 0
+	fs.Visit(func(f *flag.Flag) {
+		if want := tolerated[f.Name]; want == "*" || want == f.Value.String() {
+			return
+		}
+		numSet++
+	})
+	if numSet <= 1 {
+		return nil
+	}
+	const msg = "--rm and --kill cannot be used with any other flag. --chown can only be used with --username, --dryrun, and --minuid"
+	return verror.New(errInvalidFlags, nil, numSet, msg)
+}
+
+// warnMissingSuidPrivs eases debugging of missing suid privileges: it warns on
+// stderr unless the real or effective uid is root or uid. It is not comprehensive,
+// e.g. running as <username> hides the warning though a chown may still fail.
+func warnMissingSuidPrivs(uid int) {
+	for _, id := range []int{os.Getuid(), os.Geteuid()} {
+		if id == 0 || id == uid {
+			return
+		}
+	}
+	fmt.Fprintln(os.Stderr, "uid is ", os.Getuid(), ", effective uid is ", os.Geteuid())
+	fmt.Fprintln(os.Stderr, "WARNING: suidhelper is not root. Is your filesystem mounted with nosuid?")
+}
+
// ParseArguments populates the WorkParameter object from the provided args
// and env strings.
func (wp *WorkParameters) ProcessArguments(fs *flag.FlagSet, env []string) error {
- // --rm and --kill are modal. Complain if any other flag is set along with one of those.
- if *flagRemove || *flagKill {
- // Count flags that are set. The device manager test always sets --minuid=1
- // and --test.run=TestSuidHelper so when in a test, tolerate those
- flagsToIgnore := map[string]string{}
- if os.Getenv("V23_SUIDHELPER_TEST") != "" {
- flagsToIgnore["minuid"] = "1"
- flagsToIgnore["test.run"] = "TestSuidHelper"
- }
-
- counter := 0
- fs.Visit(func(f *flag.Flag) {
- if flagsToIgnore[f.Name] != f.Value.String() {
- counter++
- }
- })
-
- if counter > 1 {
- return verror.New(errInvalidFlags, nil, counter, "--rm and --kill cannot be used with any other flag")
- }
+ if err := checkFlagCombinations(fs); err != nil {
+ return err
}
if *flagRemove {
@@ -146,15 +182,25 @@
if err != nil {
return verror.New(errInvalidGID, nil, usr.Gid)
}
+ warnMissingSuidPrivs(int(uid))
// Uids less than 501 can be special so we forbid running as them.
if uid < *flagMinimumUid {
return verror.New(errUIDTooLow, nil,
uid, *flagMinimumUid)
}
+ wp.uid = int(uid)
+ wp.gid = int(gid)
wp.dryrun = *flagDryrun
+ // At this point, all flags allowed by --chown have been processed
+ if *flagChown {
+ wp.chown = true
+ wp.argv = fs.Args()
+ return nil
+ }
+
// Preserve the arguments for examination by the test harness if executed
// in the course of a test.
if os.Getenv("V23_SUIDHELPER_TEST") != "" {
@@ -171,8 +217,6 @@
wp.dryrun = true
}
- wp.uid = int(uid)
- wp.gid = int(gid)
wp.workspace = *flagWorkspace
wp.argv0 = *flagRun
wp.logDir = *flagLogDir
diff --git a/services/device/internal/suid/args_test.go b/services/device/internal/suid/args_test.go
index 092c1ab..491a171 100644
--- a/services/device/internal/suid/args_test.go
+++ b/services/device/internal/suid/args_test.go
@@ -41,6 +41,7 @@
envv: []string{"A=B"},
dryrun: false,
remove: false,
+ chown: false,
kill: false,
killPids: nil,
},
@@ -61,6 +62,7 @@
envv: []string{"A=B"},
dryrun: false,
remove: false,
+ chown: false,
kill: false,
killPids: nil,
},
@@ -87,6 +89,27 @@
envv: nil,
dryrun: false,
remove: true,
+ chown: false,
+ kill: false,
+ killPids: nil,
+ },
+ },
+
+ {
+ []string{"setuidhelper", "--chown", "--username", testUserName, "--dryrun", "--minuid", "1", "/tmp/foo", "/tmp/bar"},
+ []string{"A=B"},
+ "",
+ WorkParameters{
+ uid: testUid,
+ gid: testGid,
+ workspace: "",
+ logDir: "",
+ argv0: "",
+ argv: []string{"/tmp/foo", "/tmp/bar"},
+ envv: nil,
+ dryrun: true,
+ remove: false,
+ chown: true,
kill: false,
killPids: nil,
},
@@ -106,6 +129,7 @@
envv: nil,
dryrun: false,
remove: false,
+ chown: false,
kill: true,
killPids: []int{235, 451},
},
@@ -125,6 +149,7 @@
envv: nil,
dryrun: false,
remove: false,
+ chown: false,
kill: true,
killPids: nil,
},
@@ -145,6 +170,7 @@
envv: []string{"A=B"},
dryrun: true,
remove: false,
+ chown: false,
kill: false,
killPids: nil,
},
diff --git a/services/device/internal/suid/system.go b/services/device/internal/suid/system.go
index 6aad84b..285c4e3 100644
--- a/services/device/internal/suid/system.go
+++ b/services/device/internal/suid/system.go
@@ -39,8 +39,15 @@
return os.Chown(path, hw.uid, hw.gid)
}
- for _, p := range []string{hw.workspace, hw.logDir} {
+ chownPaths := hw.argv
+ if !hw.chown {
+ // Chown was invoked as part of regular suid execution, rather than directly
+ // via --chown. In that case, we chown the workspace and log directory
// TODO(rjkroege): Ensure that the device manager can read log entries.
+ chownPaths = []string{hw.workspace, hw.logDir}
+ }
+
+ for _, p := range chownPaths {
if err := filepath.Walk(p, chown); err != nil {
return verror.New(errChownFailed, nil, p, hw.uid, hw.gid, err)
}