blob: 4a2fe7120832eae2cabd64249cdd18ef20e834ca [file] [log] [blame]
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001package vsync
2
3// Package vsync provides veyron sync DevTable utility functions.
// DevTable is indexed by the device ID and stores device-level
// information needed by sync. The main component of a device's info is
6// its generation vector. Generation vector is the version vector for
7// a device's store, representing all the different generations (from
8// different devices) seen by a given device. A generation represents
9// a collection of updates that originated on a device during an
10// interval of time. It serves as a checkpoint when communicating with
11// other devices. Generations do not overlap and all updates belong to
12// a generation.
13//
// Synchronization between two devices A and B uses generation vectors
// as follows:
//   1. B sends its generation vector to A.
//   2. A computes diff(A's generation vector, B's generation vector).
//   3. A sends B the log records of the missing generations.
//   4. A caches B's generation vector (for space reclamation).
21//
22// Implementation notes: DevTable is stored in a persistent K/V
23// database in the current implementation. Generation vector is
24// implemented as a map of (Device ID -> Generation ID), one entry for
25// every known device. If the generation vector contains an entry
26// (Device ID -> Generation ID), it implies that the device has
// learned of all the generations up to and including Generation
// ID. Generation IDs start from 1. A generation ID of 0 is a
// reserved bootstrap value and indicates the device has no updates.
30import (
31 "errors"
32 "sort"
33 "time"
34
35 "veyron2/vlog"
36)
37
// errInvalidDTab is returned by devTable operations invoked on a table whose
// database handle is nil, e.g. after close has zeroed the struct.
var errInvalidDTab = errors.New("invalid devtable db")
41
// devInfo is the information stored per device in the "devices" table,
// keyed by the device ID.
type devInfo struct {
	Vector GenVector // device generation vector.
	Ts     time.Time // last communication time stamp; putGenVec sets it (UTC) only when it creates the record.
}
47
// devTableHeader contains the header metadata. It is persisted under the
// key "Head" of the "header" table and cached in memory as devTable.head.
type devTableHeader struct {
	Resmark []byte // resume marker for watch.
	// Generation vector for space reclamation. For each device, all
	// generations less than or equal to its entry have been garbage
	// collected; the oldest generation still available is entry+1
	// (see getOldestGen and diffGenVectors).
	ReclaimVec GenVector
}
55
// devTable contains the metadata for the device table db.
type devTable struct {
	fname   string   // file pathname of the kvdb; also used by compact.
	db      *kvdb    // underlying K/V DB; nil once the table has been closed.
	devices *kvtable // pointer to the "devices" table in the kvdb. Contains device info keyed by device ID.

	// Key:"Head" Value:devTableHeader
	header *kvtable        // pointer to the "header" table in the kvdb. Contains device table header.
	head   *devTableHeader // devTable head cached in memory; written back to "header" by flush.

	s *syncd // pointer to the sync daemon object.
}
68
69// genOrder represents a generation along with its position in the log.
70type genOrder struct {
71 devID DeviceID
72 genID GenID
73 order uint32
74}
75
76// byOrder is used to sort the genOrder array.
77type byOrder []*genOrder
78
79func (a byOrder) Len() int {
80 return len(a)
81}
82
83func (a byOrder) Swap(i, j int) {
84 a[i], a[j] = a[j], a[i]
85}
86
87func (a byOrder) Less(i, j int) bool {
88 return a[i].order < a[j].order
89}
90
91// openDevTable opens or creates a devTable for the given filename.
92func openDevTable(filename string, sin *syncd) (*devTable, error) {
93 dtab := &devTable{
94 fname: filename,
95 s: sin,
96 }
97 // Open the file and create it if it does not exist.
98 // Also initialize the kvdb and its collection.
99 db, tbls, err := kvdbOpen(filename, []string{"devices", "header"})
100 if err != nil {
101 return nil, err
102 }
103
104 dtab.db = db
105 dtab.devices = tbls[0]
106 dtab.header = tbls[1]
107
108 // Initialize local gen vector with own device id and
109 // generation id of 0 if local vector doesn't exist.
110 if !dtab.hasDevInfo(dtab.s.id) {
111 vector := GenVector{
112 dtab.s.id: 0,
113 }
114 if err := dtab.putGenVec(dtab.s.id, vector); err != nil {
115 dtab.db.close() // this also closes the tables.
116 return nil, err
117 }
118 }
119
120 // Initialize the devTable header.
121 dtab.head = &devTableHeader{
122 ReclaimVec: GenVector{
123 dtab.s.id: 0,
124 },
125 }
126 // If header already exists in db, read it back from db.
127 if dtab.hasHead() {
128 if err := dtab.getHead(); err != nil {
129 dtab.db.close() // this also closes the tables.
130 return nil, err
131 }
132 }
133
134 return dtab, nil
135}
136
137// close closes the devTable and invalidates its struct.
138func (dt *devTable) close() error {
139 if dt.db == nil {
140 return errInvalidDTab
141 }
142 // Flush the dirty data.
143 if err := dt.flush(); err != nil {
144 return err
145 }
146 dt.db.close() // this also closes the tables.
147
148 *dt = devTable{} // zero out the devTable struct.
149 return nil
150}
151
152// flush flushes the devTable db to storage.
153func (dt *devTable) flush() error {
154 if dt.db == nil {
155 return errInvalidDTab
156 }
157 // Set the head from memory before flushing.
158 if err := dt.putHead(); err != nil {
159 return err
160 }
161 dt.db.flush()
162 return nil
163}
164
165// compact compacts the file associated with kvdb.
166func (dt *devTable) compact() error {
167 if dt.db == nil {
168 return errInvalidDTab
169 }
170 db, tbls, err := dt.db.compact(dt.fname, []string{"devices", "header"})
171 if err != nil {
172 return err
173 }
174 dt.db = db
175 dt.devices = tbls[0]
176 dt.header = tbls[1]
177 return nil
178}
179
180// putHead puts the devTable head into the devTable db.
181func (dt *devTable) putHead() error {
182 return dt.header.set("Head", dt.head)
183}
184
185// getHead gets the devTable head from the devTable db.
186func (dt *devTable) getHead() error {
187 if dt.head == nil {
188 return errors.New("nil devTable header")
189 }
190 return dt.header.get("Head", dt.head)
191}
192
193// hasHead returns true if the devTable db has a devTable head.
194func (dt *devTable) hasHead() bool {
195 return dt.header.hasKey("Head")
196}
197
198// putDevInfo puts a devInfo struct in the devTable db.
199func (dt *devTable) putDevInfo(devid DeviceID, info *devInfo) error {
200 if dt.db == nil {
201 return errInvalidDTab
202 }
203 return dt.devices.set(string(devid), info)
204}
205
206// getDevInfo gets a devInfo struct from the devTable db.
207func (dt *devTable) getDevInfo(devid DeviceID) (*devInfo, error) {
208 if dt.db == nil {
209 return nil, errInvalidDTab
210 }
211 var info devInfo
212 if err := dt.devices.get(string(devid), &info); err != nil {
213 return nil, err
214 }
215 if info.Vector == nil {
216 return nil, errors.New("nil genvector")
217 }
218 return &info, nil
219}
220
221// hasDevInfo returns true if the device (devid) has any devInfo in the devTable db.
222func (dt *devTable) hasDevInfo(devid DeviceID) bool {
223 if dt.db == nil {
224 return false
225 }
226 return dt.devices.hasKey(string(devid))
227}
228
229// putGenVec puts a generation vector in the devTable db.
230func (dt *devTable) putGenVec(devid DeviceID, v GenVector) error {
231 if dt.db == nil {
232 return errInvalidDTab
233 }
234 var info *devInfo
235 if dt.hasDevInfo(devid) {
236 var err error
237 if info, err = dt.getDevInfo(devid); err != nil {
238 return err
239 }
240 info.Vector = v
241 } else {
242 info = &devInfo{
243 Vector: v,
244 Ts: time.Now().UTC(),
245 }
246 }
247 return dt.putDevInfo(devid, info)
248}
249
250// getGenVec gets a generation vector from the devTable db.
251func (dt *devTable) getGenVec(devid DeviceID) (GenVector, error) {
252 if dt.db == nil {
253 return nil, errInvalidDTab
254 }
255 info, err := dt.getDevInfo(devid)
256 if err != nil {
257 return nil, err
258 }
259 return info.Vector, nil
260}
261
262// populateGenOrderEntry populates a genOrder entry.
263func (dt *devTable) populateGenOrderEntry(e *genOrder, id DeviceID, gnum GenID) error {
264 e.devID = id
265 e.genID = gnum
266
267 o, err := dt.s.log.getGenMetadata(id, gnum)
268 if err != nil {
269 return err
270 }
271 e.order = o.Pos
272 return nil
273}
274
275// updateGeneration updates a single generation (upID, upGen) in a device's generation vector.
276func (dt *devTable) updateGeneration(key, upID DeviceID, upGen GenID) error {
277 if dt.db == nil {
278 return errInvalidDTab
279 }
280 info, err := dt.getDevInfo(key)
281 if err != nil {
282 return err
283 }
284
285 info.Vector[upID] = upGen
286
287 return dt.putDevInfo(key, info)
288}
289
290// updateLocalGenVector updates local generation vector based on the remote generation vector.
291func (dt *devTable) updateLocalGenVector(local, remote GenVector) error {
292 if dt.db == nil {
293 return errInvalidDTab
294 }
295 if local == nil || remote == nil {
296 return errors.New("invalid input args to function")
297 }
298 for rid, rgen := range remote {
299 lgen, ok := local[rid]
300 if !ok || lgen < rgen {
301 local[rid] = rgen
302 }
303 }
304 return nil
305}
306
307// diffGenVectors diffs generation vectors belonging to src and dest
308// and returns the generations known to src and not known to dest. In
309// addition, sync needs to maintain the order in which device
310// generations are created/received. Hence, when two generation
311// vectors are diffed, the differing generations are returned in a
312// sorted order based on their position in the src's log. genOrder
313// array consists of every generation that is missing between src and
314// dest sorted using its position in the src's log.
315// Example: Generation vector for device A (src) AVec = {A:10, B:5, C:1}
316// Generation vector for device B (dest) BVec = {A:5, B:10, D:2}
317// Missing generations in unsorted order: {A:6, A:7, A:8, A:9, A:10,
318// C:1} TODO(hpucha): Revisit for the case of a lot of generations to
319// send back (say during bootstrap).
320func (dt *devTable) diffGenVectors(srcVec, destVec GenVector) ([]*genOrder, error) {
321 if dt.db == nil {
322 return nil, errInvalidDTab
323 }
324
325 // Create an array for the generations that need to be returned.
326 var gens []*genOrder
327
328 // Compute missing generations for devices that are in destination and source vector.
329 for devid, genid := range destVec {
330 srcGenID, ok := srcVec[devid]
331 // Skip since src doesn't know of this device.
332 if !ok {
333 continue
334 }
335 // Need to include all generations in the interval [genid+1, srcGenID],
336 // genid+1 and srcGenID inclusive.
337 // Check against reclaimVec to see if required generations are already GCed.
338 // Starting gen is then max(oldGen, genid+1)
339 startGen := genid + 1
340 oldGen := dt.getOldestGen(devid) + 1
341 if startGen < oldGen {
342 vlog.VI(1).Infof("diffGenVectors:: Adjusting starting generations from %d to %d",
343 startGen, oldGen)
344 startGen = oldGen
345 }
346 for i := startGen; i <= srcGenID; i++ {
347 // Populate the genorder entry.
348 var entry genOrder
349 if err := dt.populateGenOrderEntry(&entry, devid, i); err != nil {
350 return nil, err
351 }
352 gens = append(gens, &entry)
353 }
354 }
355 // Compute missing generations for devices not in destination vector but in source vector.
356 for devid, genid := range srcVec {
357 // Add devices destination does not know about.
358 if _, ok := destVec[devid]; !ok {
359 // Bootstrap generation to oldest available.
360 destGenID := dt.getOldestGen(devid) + 1
361 // Need to include all generations in the interval [destGenID, genid],
362 // destGenID and genid inclusive.
363 for i := destGenID; i <= genid; i++ {
364 // Populate the genorder entry.
365 var entry genOrder
366 if err := dt.populateGenOrderEntry(&entry, devid, i); err != nil {
367 return nil, err
368 }
369 gens = append(gens, &entry)
370 }
371 }
372 }
373
374 // Sort generations in log order.
375 sort.Sort(byOrder(gens))
376 return gens, nil
377}
378
379// getOldestGen returns the most recent gc'ed generation for the device "dev".
380func (dt *devTable) getOldestGen(dev DeviceID) GenID {
381 return dt.head.ReclaimVec[dev]
382}
383
384// computeReclaimVector computes a generation vector such that the
385// generations less than or equal to those in the vector can be
386// garbage collected. Caller holds a lock on s.lock.
387//
388// Approach: For each device in the system, we compute its maximum
389// generation known to all the other devices in the system. This is a
390// O(N^2) algorithm where N is the number of devices in the system. N
391// is assumed to be small, of the order of hundreds of devices.
392func (dt *devTable) computeReclaimVector() (GenVector, error) {
393 // Get local generation vector to create the set of devices in
394 // the system. Local generation vector is a good bootstrap
395 // device set since it contains all the devices whose log
396 // records were ever stored locally.
397 devSet, err := dt.getGenVec(dt.s.id)
398 if err != nil {
399 return nil, err
400 }
401
402 newReclaimVec := GenVector{}
403 for devid := range devSet {
404 if !dt.hasDevInfo(devid) {
405 // This node knows of devid, but hasn't yet
406 // contacted the device. Do not garbage
407 // collect any further. For instance, when
408 // node A learns of node C's generations from
409 // node B, node A may not have an entry for
410 // node C yet, but node C will be part of its
411 // devSet.
412 for dev := range devSet {
413 newReclaimVec[dev] = dt.getOldestGen(dev)
414 }
415 return newReclaimVec, nil
416 }
417
418 vec, err := dt.getGenVec(devid)
419 if err != nil {
420 return nil, err
421 }
422 for dev := range devSet {
423 gen1, ok := vec[dev]
424 // Device "devid" does not know about device "dev".
425 if !ok {
426 newReclaimVec[dev] = dt.getOldestGen(dev)
427 continue
428 }
429 gen2, ok := newReclaimVec[dev]
430 if !ok || (gen1 < gen2) {
431 newReclaimVec[dev] = gen1
432 }
433 }
434 }
435 return newReclaimVec, nil
436}
437
438// addDevice adds a newly learned device to the devTable state.
439func (dt *devTable) addDevice(newDev DeviceID) error {
440 // Create an entry in the device table for the new device.
441 vector := GenVector{
442 newDev: 0,
443 }
444 if err := dt.putGenVec(newDev, vector); err != nil {
445 return err
446 }
447
448 // Update local generation vector with the new device.
449 local, err := dt.getDevInfo(dt.s.id)
450 if err != nil {
451 return err
452 }
453 if err := dt.updateLocalGenVector(local.Vector, vector); err != nil {
454 return err
455 }
456 if err := dt.putDevInfo(dt.s.id, local); err != nil {
457 return err
458 }
459 return nil
460}
461
462// updateReclaimVec updates the reclaim vector to track gc'ed generations.
463func (dt *devTable) updateReclaimVec(minGens GenVector) error {
464 for dev, min := range minGens {
465 gen, ok := dt.head.ReclaimVec[dev]
466 if !ok {
467 if min < 1 {
468 vlog.Errorf("updateReclaimVec:: Received bad generation %s %d",
469 dev, min)
470 dt.head.ReclaimVec[dev] = 0
471 } else {
472 dt.head.ReclaimVec[dev] = min - 1
473 }
474 continue
475 }
476
477 // We obtained a generation that is already reclaimed.
478 if min <= gen {
479 return errors.New("requested gen smaller than GC'ed gen")
480 }
481 }
482 return nil
483}