blob: 48bcd8838f8bc7cc24357563e685369f9fc5a965 [file] [log] [blame]
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -08001package ipc
2
3import (
4 "fmt"
5 "net"
6 "strings"
7
8 "veyron.io/veyron/veyron2/naming"
9 "veyron.io/veyron/veyron2/vlog"
10
11 "veyron.io/veyron/veyron/lib/netstate"
12 "veyron.io/veyron/veyron/runtimes/google/ipc/version"
13 inaming "veyron.io/veyron/veyron/runtimes/google/naming"
14)
15
16type errorAccumulator struct {
17 errs []error
18}
19
20func (e *errorAccumulator) add(err error) {
21 e.errs = append(e.errs, err)
22}
23
24func (e *errorAccumulator) failed() bool {
25 return len(e.errs) > 0
26}
27
28func (e *errorAccumulator) String() string {
29 r := ""
30 for _, err := range e.errs {
31 r += fmt.Sprintf("(%s)", err)
32 }
33 return r
34}
35
36// TODO(cnicolaou): simplify this code, especially the use of maps+slices
37// and special cases.
38
39func newErrorAccumulator() *errorAccumulator {
40 return &errorAccumulator{errs: make([]error, 0, 4)}
41}
42
43type serverEndpoint struct {
44 iep *inaming.Endpoint
45 suffix string
46}
47
48func (se *serverEndpoint) String() string {
49 return fmt.Sprintf("(%s, %q)", se.iep, se.suffix)
50}
51
52func filterCompatibleEndpoints(errs *errorAccumulator, servers []string) []*serverEndpoint {
53 se := make([]*serverEndpoint, 0, len(servers))
54 for _, server := range servers {
55 name := server
56 address, suffix := naming.SplitAddressName(name)
57 if len(address) == 0 {
58 errs.add(fmt.Errorf("%q is not a rooted name", name))
59 continue
60 }
61 iep, err := inaming.NewEndpoint(address)
62 if err != nil {
63 errs.add(fmt.Errorf("%q: %s", name, err))
64 continue
65 }
66 if err = version.CheckCompatibility(iep); err != nil {
67 errs.add(fmt.Errorf("%q: %s", name, err))
68 continue
69 }
70 sep := &serverEndpoint{iep, suffix}
71 se = append(se, sep)
72 }
73 return se
74}
75
Cosmos Nicolaouba6c68b2014-11-18 22:13:16 -080076// sortByProtocols sorts the supplied slice of serverEndpoints into a hash
77// map keyed by the protocol of each of those endpoints where that protocol
78// is listed in the protocols parameter and keyed by '*' if not so listed.
79func sortByProtocol(eps []*serverEndpoint, protocols []string) (bool, map[string][]*serverEndpoint) {
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -080080 byProtocol := make(map[string][]*serverEndpoint)
Cosmos Nicolaouba6c68b2014-11-18 22:13:16 -080081 matched := false
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -080082 for _, ep := range eps {
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -080083 found := false
Cosmos Nicolaouba6c68b2014-11-18 22:13:16 -080084 for _, p := range protocols {
85 if ep.iep.Protocol == p || (p == "tcp" && strings.HasPrefix(ep.iep.Protocol, "tcp")) {
86 byProtocol[p] = append(byProtocol[p], ep)
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -080087 found = true
Cosmos Nicolaouba6c68b2014-11-18 22:13:16 -080088 matched = true
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -080089 break
90 }
91 }
92 if !found {
Cosmos Nicolaouba6c68b2014-11-18 22:13:16 -080093 byProtocol["*"] = append(byProtocol["*"], ep)
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -080094 }
95 }
Cosmos Nicolaouba6c68b2014-11-18 22:13:16 -080096 return matched, byProtocol
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -080097}
98
99func orderByLocality(ifcs netstate.AddrList, eps []*serverEndpoint) []*serverEndpoint {
100 if len(ifcs) <= 1 {
101 return append([]*serverEndpoint{}, eps...)
102 }
103 ipnets := make([]*net.IPNet, 0, len(ifcs))
104 for _, a := range ifcs {
105 // Try IP
106 _, ipnet, err := net.ParseCIDR(a.Address().String())
107 if err != nil {
108 continue
109 }
110 ipnets = append(ipnets, ipnet)
111 }
112 if len(ipnets) == 0 {
113 return eps
114 }
115 // TODO(cnicolaou): this can obviously be made more efficient...
116 local := make([]*serverEndpoint, 0, len(eps))
117 remote := make([]*serverEndpoint, 0, len(eps))
118 notip := make([]*serverEndpoint, 0, len(eps))
119 for _, ep := range eps {
120 if strings.HasPrefix(ep.iep.Protocol, "tcp") || strings.HasPrefix(ep.iep.Protocol, "ws") {
121 // Take care to use the Address directly, since the network
122 // may be marked as a 'websocket'. This throws out any thought
123 // of dealing with IPv6 etc and web sockets.
124 host, _, err := net.SplitHostPort(ep.iep.Address)
125 if err != nil {
126 host = ep.iep.Address
127 }
128 ip := net.ParseIP(host)
129 if ip == nil {
130 notip = append(notip, ep)
131 continue
132 }
133 found := false
134 for _, ipnet := range ipnets {
135 if ipnet.Contains(ip) {
136 local = append(local, ep)
137 found = true
138 break
139 }
140 }
141 if !found {
142 remote = append(remote, ep)
143 }
144 } else {
145 notip = append(notip, ep)
146 }
147 }
148 return append(local, append(remote, notip...)...)
149}
150
151func slice(eps []*serverEndpoint) []string {
152 r := make([]string, len(eps))
153 for i, a := range eps {
154 r[i] = naming.JoinAddressName(a.iep.String(), a.suffix)
155 }
156 return r
157}
158
159func sliceByProtocol(eps map[string][]*serverEndpoint, protocols []string) []string {
160 r := make([]string, 0, 10)
161 for _, p := range protocols {
162 r = append(r, slice(eps[p])...)
163 }
164 return r
165}
166
Cosmos Nicolaouba6c68b2014-11-18 22:13:16 -0800167var defaultPreferredProtocolOrder = []string{"unixfd", "tcp4", "tcp", "*"}
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800168
Cosmos Nicolaouba6c68b2014-11-18 22:13:16 -0800169// filterAndOrderServers returns a set of servers that are compatible with
170// the current client in order of 'preference' specified by the supplied
171// protocols and a notion of 'locality' according to the supplied protocol
172// list as follows:
173// - if the protocol parameter is non-empty, then only servers matching those
174// protocols are returned and the endpoints are ordered first by protocol
175// and then by locality within each protocol. If tcp4 and unixfd are requested
176// for example then only protocols that match tcp4 and unixfd will returned
177// with the tcp4 ones preceeding the unixfd ones.
178// - if the protocol parameter is empty, then a default protocol ordering
179// will be used, but unlike the previous case, any servers that don't support
180// these protocols will be returned also, but following the default
181// preferences.
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800182func filterAndOrderServers(servers []string, protocols []string) ([]string, error) {
183 errs := newErrorAccumulator()
184 vlog.VI(3).Infof("Candidates[%v]: %v", protocols, servers)
Cosmos Nicolaou28f35c32014-12-01 20:36:27 -0800185
186 // TODO(cnicolaou): ideally we should filter out unsupported protocols
187 // here - e.g. no point dialing on a ws protocol if it's not registered
188 // etc.
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800189 compatible := filterCompatibleEndpoints(errs, servers)
190 if len(compatible) == 0 {
191 return nil, fmt.Errorf("failed to find any compatible servers: %s", errs)
192 }
193 vlog.VI(3).Infof("Version Compatible: %v", compatible)
194
Cosmos Nicolaouba6c68b2014-11-18 22:13:16 -0800195 defaultOrdering := len(protocols) == 0
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800196 preferredProtocolOrder := defaultPreferredProtocolOrder
Cosmos Nicolaouba6c68b2014-11-18 22:13:16 -0800197 if !defaultOrdering {
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800198 preferredProtocolOrder = protocols
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800199 }
200
Cosmos Nicolaou28f35c32014-12-01 20:36:27 -0800201 // put the server endpoints into per-protocol lists
202 matched, byProtocol := sortByProtocol(compatible, preferredProtocolOrder)
203 if !defaultOrdering && !matched {
204 return nil, fmt.Errorf("failed to find any servers compatible with %v from %s", protocols, servers)
205 }
206
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800207 vlog.VI(3).Infof("Have Protocols(%v): %v", protocols, byProtocol)
208
209 networks, err := netstate.GetAll()
210 if err != nil {
Cosmos Nicolaouba6c68b2014-11-18 22:13:16 -0800211 // return whatever we have now, just not sorted by locality.
212 return sliceByProtocol(byProtocol, preferredProtocolOrder), nil
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800213 }
214
215 ordered := make([]*serverEndpoint, 0, len(byProtocol))
216 for _, protocol := range preferredProtocolOrder {
217 o := orderByLocality(networks, byProtocol[protocol])
Cosmos Nicolaou28f35c32014-12-01 20:36:27 -0800218 vlog.VI(3).Infof("Protocol: %q ordered by locality: %v", protocol, o)
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800219 ordered = append(ordered, o...)
220 }
221
Cosmos Nicolaou4e8da642014-11-13 08:32:05 -0800222 vlog.VI(2).Infof("Ordered By Locality: %v", ordered)
223 return slice(ordered), nil
224}