blob: 9df19d0fd6753a395d7f39fc4a2f6fba42cf31c9 [file] [log] [blame]
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001package vsync
2
// Package vsync provides veyron sync daemon utility functions. The sync
// daemon serves incoming GetDeltas requests and contacts other peers
// to get deltas from them. When it receives a GetDeltas request, it
// diffs the incoming generation vector with the local generation
// vector and sends back the missing generations. When it receives
// log records in response to a GetDeltas request, it replays those
// log records to get in sync with the sender.
10import (
Raja Daoud4b5b00f2014-06-05 15:02:53 -070011 "fmt"
Himabindu Puchaa7b4df42014-07-10 15:06:51 -070012 "strings"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070013 "sync"
Raja Daoudc861ab42014-05-20 14:32:54 -070014 "time"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070015
Tilak Sharma2af70022014-05-14 11:29:27 -070016 "veyron/services/store/raw"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070017
18 "veyron2/ipc"
Tilak Sharma79cfb412014-05-27 18:34:57 -070019 "veyron2/naming"
Himabindu Puchaa7b4df42014-07-10 15:06:51 -070020 "veyron2/security"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070021 "veyron2/storage"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070022 "veyron2/vlog"
23 "veyron2/vom"
Tilak Sharma96eb25a2014-05-21 18:52:55 -070024
25 _ "veyron/services/store/typeregistryhack"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070026)
27
// syncd contains the metadata for the sync daemon.
//
// Instances are built by newSyncdCore, which opens the on-disk metadata
// structures, records the local store connection and starts the gc,
// watcher and initiator goroutines. Close closes the closed channel and
// waits on pending for those goroutines.
type syncd struct {
	// Pointers to metadata structures.
	log    *iLog     // opened from storePath+"/ilog" in newSyncdCore
	devtab *devTable // opened from storePath+"/dtab" in newSyncdCore
	dag    *dag      // opened from storePath+"/dag" in newSyncdCore

	// Local device id.
	id DeviceID

	// RWlock to concurrently access log and device table data structures.
	lock sync.RWMutex
	// State to coordinate shutting down all spawned goroutines.
	pending sync.WaitGroup // one count per goroutine started in newSyncdCore
	closed  chan struct{}  // closed by Close to signal shutdown

	// Local Veyron store.
	storeEndpoint string
	store         raw.Store // nil when NewSyncd was given an empty store endpoint

	// Handlers for goroutine procedures.
	hdlGC        *syncGC
	hdlWatcher   *syncWatcher
	hdlInitiator *syncInitiator
}
53
// syncDispatcher is the ipc.Dispatcher returned by NewSyncDispatcher. It
// hands out the same invoker and authorizer for every name accepted by
// Lookup.
type syncDispatcher struct {
	server ipc.Invoker         // reflection-based invoker built by NewSyncDispatcher
	auth   security.Authorizer // returned alongside server on every successful Lookup
}
58
59// NewSyncDispatcher returns an object dispatcher.
60func NewSyncDispatcher(s interface{}, auth security.Authorizer) ipc.Dispatcher {
61 return &syncDispatcher{ipc.ReflectInvoker(s), auth}
62}
63
Cosmos Nicolaou8bfacf22014-08-19 11:19:36 -070064func (d *syncDispatcher) Lookup(suffix, method string) (ipc.Invoker, security.Authorizer, error) {
Himabindu Puchaa7b4df42014-07-10 15:06:51 -070065 if strings.HasSuffix(suffix, "sync") {
66 return d.server, d.auth, nil
67 }
68 return nil, nil, fmt.Errorf("Lookup:: failed on suffix: %s", suffix)
69}
70
Jiri Simsa5293dcb2014-05-10 09:56:38 -070071// NewSyncd creates a new syncd instance.
72//
73// Syncd concurrency: syncd initializes three goroutines at
74// startup. The "watcher" thread is responsible for watching the store
75// for changes to its objects. The "initiator" thread is responsible
76// for periodically checking the neighborhood and contacting a peer to
77// obtain changes from that peer. The "gc" thread is responsible for
78// periodically checking if any log records and dag state can be
79// pruned. All these 3 threads perform write operations to the data
80// structures, and synchronize by acquiring a write lock on s.lock. In
81// addition, when syncd receives an incoming RPC, it responds to the
82// request by acquiring a read lock on s.lock. Thus, at any instant in
83// time, either one of the watcher, initiator or gc threads is active,
84// or any number of responders can be active, serving incoming
85// requests. Fairness between these threads follows from
86// sync.RWMutex. The spec says that the writers cannot be starved by
87// the readers but it does not guarantee FIFO. We may have to revisit
88// this in the future.
Tilak Sharma79cfb412014-05-27 18:34:57 -070089func NewSyncd(peerEndpoints, peerDeviceIDs, devid, storePath, storeEndpoint string, syncTick time.Duration) *syncd {
Jiri Simsa5293dcb2014-05-10 09:56:38 -070090 // Connect to the local Veyron store.
91 // At present this is optional to allow testing (from the command-line) w/o Veyron store running.
92 // TODO: connecting to Veyron store should be mandatory.
Tilak Sharma79cfb412014-05-27 18:34:57 -070093 var store raw.Store
94 if storeEndpoint != "" {
95 var err error
96 store, err = raw.BindStore(naming.JoinAddressName(storeEndpoint, raw.RawStoreSuffix))
Jiri Simsa5293dcb2014-05-10 09:56:38 -070097 if err != nil {
Tilak Sharma79cfb412014-05-27 18:34:57 -070098 vlog.Fatalf("NewSyncd: cannot connect to Veyron store endpoint (%s): %s", storeEndpoint, err)
Jiri Simsa5293dcb2014-05-10 09:56:38 -070099 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700100 }
101
Tilak Sharma79cfb412014-05-27 18:34:57 -0700102 return newSyncdCore(peerEndpoints, peerDeviceIDs, devid, storePath, storeEndpoint, store, syncTick)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700103}
104
Himabindu Pucha38ddeb32014-05-12 10:57:22 -0700105// newSyncdCore is the internal function that creates the Syncd
106// structure and initilizes its thread (goroutines). It takes a
107// Veyron Store parameter to separate the core of Syncd setup from the
108// external dependency on Veyron Store.
Tilak Sharma79cfb412014-05-27 18:34:57 -0700109func newSyncdCore(peerEndpoints, peerDeviceIDs, devid, storePath, storeEndpoint string,
110 store raw.Store, syncTick time.Duration) *syncd {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700111 s := &syncd{}
112
113 // Bootstrap my own DeviceID.
114 s.id = DeviceID(devid)
115
116 var err error
117 // Log init.
118 if s.log, err = openILog(storePath+"/ilog", s); err != nil {
119 vlog.Fatalf("newSyncd: ILogInit failed: err %v", err)
120 }
121
122 // DevTable init.
123 if s.devtab, err = openDevTable(storePath+"/dtab", s); err != nil {
124 vlog.Fatalf("newSyncd: DevTableInit failed: err %v", err)
125 }
126
127 // Dag Init.
128 if s.dag, err = openDAG(storePath + "/dag"); err != nil {
129 vlog.Fatalf("newSyncd: OpenDag failed: err %v", err)
130 }
131
132 // Veyron Store.
Tilak Sharma79cfb412014-05-27 18:34:57 -0700133 s.storeEndpoint = storeEndpoint
134 s.store = store
135 vlog.VI(1).Infof("newSyncd: Local Veyron store: %s", s.storeEndpoint)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700136
137 // Register these Watch data types with VOM.
138 // TODO(tilaks): why aren't they auto-retrieved from the IDL?
Tilak Sharma2af70022014-05-14 11:29:27 -0700139 vom.Register(&raw.Mutation{})
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700140 vom.Register(&storage.DEntry{})
141
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700142 // Channel to propagate close event to all threads.
143 s.closed = make(chan struct{})
144
145 s.pending.Add(3)
Himabindu Pucha38ddeb32014-05-12 10:57:22 -0700146
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700147 // Get deltas every peerSyncInterval.
Raja Daoudc861ab42014-05-20 14:32:54 -0700148 s.hdlInitiator = newInitiator(s, peerEndpoints, peerDeviceIDs, syncTick)
Himabindu Pucha38ddeb32014-05-12 10:57:22 -0700149 go s.hdlInitiator.contactPeers()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700150
151 // Garbage collect every garbageCollectInterval.
152 s.hdlGC = newGC(s)
153 go s.hdlGC.garbageCollect()
154
155 // Start a watcher thread that will get updates from local store.
156 s.hdlWatcher = newWatcher(s)
157 go s.hdlWatcher.watchStore()
158
159 return s
160}
161
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700162// Close cleans up syncd state.
163func (s *syncd) Close() {
164 close(s.closed)
165 s.pending.Wait()
166
167 // TODO(hpucha): close without flushing.
168}
169
170// isSyncClosing returns true if Close() was called i.e. the "closed" channel is closed.
171func (s *syncd) isSyncClosing() bool {
172 select {
173 case <-s.closed:
174 return true
175 default:
176 return false
177 }
178}
179
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700180// GetDeltas responds to the incoming request from a client by sending missing generations to the client.
Matt Rosencrantzf5afcaf2014-06-02 11:31:22 -0700181func (s *syncd) GetDeltas(_ ipc.ServerContext, In GenVector, ClientID DeviceID, Stream SyncServiceGetDeltasStream) (GenVector, error) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700182 vlog.VI(1).Infof("GetDeltas:: Received vector %v from client %s", In, ClientID)
183
Raja Daoud4b5b00f2014-06-05 15:02:53 -0700184 // Handle misconfiguration: the client cannot have the same ID as me.
185 if ClientID == s.id {
186 vlog.VI(1).Infof("GetDeltas:: impostor alert: client ID %s is the same as mine %s", ClientID, s.id)
187 return GenVector{}, fmt.Errorf("impostor: you cannot be %s, for I am %s", ClientID, s.id)
188 }
189
Himabindu Pucha8b746a32014-05-21 12:35:05 -0700190 if err := s.updateDeviceInfo(ClientID, In); err != nil {
191 vlog.Fatalf("GetDeltas:: updateDeviceInfo failed with err %v", err)
192 }
193
Himabindu Pucha8b746a32014-05-21 12:35:05 -0700194 out, gens, gensInfo, err := s.prepareGensToReply(In)
195 if err != nil {
196 vlog.Fatalf("GetDeltas:: prepareGensToReply failed with err %v", err)
197 }
198
199 for pos, v := range gens {
200 gen := gensInfo[pos]
201 var count uint64
202 for i := LSN(0); i <= gen.MaxLSN; i++ {
203 count++
204 rec, err := s.getLogRec(v.devID, v.genID, i)
205 if err != nil {
206 vlog.Fatalf("GetDeltas:: Couldn't get log record %s %d %d, err %v",
207 v.devID, v.genID, i, err)
208 }
209 vlog.VI(1).Infof("Sending log record %v", rec)
Shyam Jayaraman97b9dca2014-07-31 13:30:46 -0700210 if err := Stream.SendStream().Send(*rec); err != nil {
Himabindu Pucha8b746a32014-05-21 12:35:05 -0700211 vlog.Errorf("GetDeltas:: Couldn't send stream err: %v", err)
212 return GenVector{}, err
213 }
214 }
215 if count != gen.Count {
216 vlog.Fatalf("GetDeltas:: GenMetadata has incorrect log records for generation %s %d %v",
217 v.devID, v.genID, gen)
218 }
219 }
220 return out, nil
221}
222
223// updateDeviceInfo updates the remote device's information based on
224// the incoming GetDeltas request.
225func (s *syncd) updateDeviceInfo(ClientID DeviceID, In GenVector) error {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700226 s.lock.Lock()
Himabindu Pucha8b746a32014-05-21 12:35:05 -0700227 defer s.lock.Unlock()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700228
229 // Note that the incoming client generation vector cannot be
230 // used for garbage collection. We can only garbage collect
231 // based on the generations we receive from other
232 // devices. Receiving a set of generations assures that all
233 // updates branching from those generations are also received
234 // and hence generations present on all devices can be
235 // GC'ed. This function sends generations to other devices and
236 // hence does not use the generation vector for GC.
237 //
238 // TODO(hpucha): Cache the client's incoming generation vector
239 // to assist in tracking missing generations and hence next
240 // peer to contact.
241 if !s.devtab.hasDevInfo(ClientID) {
242 if err := s.devtab.addDevice(ClientID); err != nil {
Himabindu Pucha8b746a32014-05-21 12:35:05 -0700243 return err
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700244 }
245 }
Himabindu Pucha8b746a32014-05-21 12:35:05 -0700246 return nil
247}
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700248
Himabindu Pucha8b746a32014-05-21 12:35:05 -0700249// prepareGensToReply processes the incoming generation vector and
250// returns the metadata of all the missing generations between the
251// incoming and the local generation vector.
252func (s *syncd) prepareGensToReply(In GenVector) (GenVector, []*genOrder, []*genMetadata, error) {
253 s.lock.RLock()
254 defer s.lock.RUnlock()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700255
256 // Get local generation vector.
257 out, err := s.devtab.getGenVec(s.id)
258 if err != nil {
Himabindu Pucha8b746a32014-05-21 12:35:05 -0700259 return GenVector{}, nil, nil, err
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700260 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700261
262 // Diff the two generation vectors.
263 gens, err := s.devtab.diffGenVectors(out, In)
264 if err != nil {
Himabindu Pucha8b746a32014-05-21 12:35:05 -0700265 return GenVector{}, nil, nil, err
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700266 }
267
Himabindu Pucha8b746a32014-05-21 12:35:05 -0700268 // Get the metadata for all the generations in the reply.
269 gensInfo := make([]*genMetadata, len(gens))
270 for pos, v := range gens {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700271 gen, err := s.log.getGenMetadata(v.devID, v.genID)
272 if err != nil || gen.Count <= 0 {
Himabindu Pucha8b746a32014-05-21 12:35:05 -0700273 return GenVector{}, nil, nil, err
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700274 }
Himabindu Pucha8b746a32014-05-21 12:35:05 -0700275 gensInfo[pos] = gen
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700276 }
277
Himabindu Pucha8b746a32014-05-21 12:35:05 -0700278 return out, gens, gensInfo, nil
279}
280
281// getLogRec gets the log record for a given generation and lsn.
282func (s *syncd) getLogRec(dev DeviceID, gen GenID, lsn LSN) (*LogRec, error) {
283 s.lock.RLock()
284 defer s.lock.RUnlock()
285 return s.log.getLogRec(dev, gen, lsn)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700286}