Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 1 | package vsync |
| 2 | |
| 3 | // Package vsync provides veyron sync daemon utility functions. Sync |
| 4 | // daemon serves incoming GetDeltas requests and contacts other peers |
| 5 | // to get deltas from them. When it receives a GetDeltas request, the |
| 6 | // incoming generation vector is diffed with the local generation |
| 7 | // vector, and missing generations are sent back. When it receives |
| 8 | // log records in response to a GetDeltas request, it replays those |
| 9 | // log records to get in sync with the sender. |
| 10 | import ( |
Raja Daoud | 4b5b00f | 2014-06-05 15:02:53 -0700 | [diff] [blame] | 11 | "fmt" |
Himabindu Pucha | a7b4df4 | 2014-07-10 15:06:51 -0700 | [diff] [blame] | 12 | "strings" |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 13 | "sync" |
Raja Daoud | c861ab4 | 2014-05-20 14:32:54 -0700 | [diff] [blame] | 14 | "time" |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 15 | |
Tilak Sharma | 2af7002 | 2014-05-14 11:29:27 -0700 | [diff] [blame] | 16 | "veyron/services/store/raw" |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 17 | |
| 18 | "veyron2/ipc" |
Tilak Sharma | 79cfb41 | 2014-05-27 18:34:57 -0700 | [diff] [blame] | 19 | "veyron2/naming" |
Himabindu Pucha | a7b4df4 | 2014-07-10 15:06:51 -0700 | [diff] [blame] | 20 | "veyron2/security" |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 21 | "veyron2/storage" |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 22 | "veyron2/vlog" |
| 23 | "veyron2/vom" |
Tilak Sharma | 96eb25a | 2014-05-21 18:52:55 -0700 | [diff] [blame] | 24 | |
| 25 | _ "veyron/services/store/typeregistryhack" |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 26 | ) |
| 27 | |
// syncd contains the metadata for the sync daemon.
type syncd struct {
	// Pointers to metadata structures: the log, the device table and the DAG,
	// opened from files under the store path by newSyncdCore.
	log    *iLog
	devtab *devTable
	dag    *dag

	// Local device id.
	id DeviceID

	// RWlock to concurrently access log and device table data structures.
	// The watcher, initiator and gc goroutines (per NewSyncd's doc) and
	// updateDeviceInfo take it for writing; prepareGensToReply and getLogRec
	// take it for reading.
	lock sync.RWMutex
	// State to coordinate shutting down all spawned goroutines.
	// pending counts the goroutines started by newSyncdCore (Close waits on
	// it); closed is closed by Close to signal shutdown and is polled through
	// isSyncClosing.
	pending sync.WaitGroup
	closed  chan struct{}

	// Local Veyron store. storeEndpoint may be empty, in which case NewSyncd
	// leaves store nil (running without a store is currently allowed).
	storeEndpoint string
	store         raw.Store

	// Handlers for goroutine procedures (gc, store watcher, peer initiator).
	hdlGC        *syncGC
	hdlWatcher   *syncWatcher
	hdlInitiator *syncInitiator
}
| 53 | |
// syncDispatcher is the ipc.Dispatcher built by NewSyncDispatcher: it hands
// out one invoker, together with one authorizer, for every suffix that its
// Lookup method accepts.
type syncDispatcher struct {
	server ipc.Invoker         // invoker returned for every accepted suffix
	auth   security.Authorizer // authorizer returned alongside server
}
| 58 | |
| 59 | // NewSyncDispatcher returns an object dispatcher. |
| 60 | func NewSyncDispatcher(s interface{}, auth security.Authorizer) ipc.Dispatcher { |
| 61 | return &syncDispatcher{ipc.ReflectInvoker(s), auth} |
| 62 | } |
| 63 | |
Cosmos Nicolaou | 8bfacf2 | 2014-08-19 11:19:36 -0700 | [diff] [blame^] | 64 | func (d *syncDispatcher) Lookup(suffix, method string) (ipc.Invoker, security.Authorizer, error) { |
Himabindu Pucha | a7b4df4 | 2014-07-10 15:06:51 -0700 | [diff] [blame] | 65 | if strings.HasSuffix(suffix, "sync") { |
| 66 | return d.server, d.auth, nil |
| 67 | } |
| 68 | return nil, nil, fmt.Errorf("Lookup:: failed on suffix: %s", suffix) |
| 69 | } |
| 70 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 71 | // NewSyncd creates a new syncd instance. |
| 72 | // |
| 73 | // Syncd concurrency: syncd initializes three goroutines at |
| 74 | // startup. The "watcher" thread is responsible for watching the store |
| 75 | // for changes to its objects. The "initiator" thread is responsible |
| 76 | // for periodically checking the neighborhood and contacting a peer to |
| 77 | // obtain changes from that peer. The "gc" thread is responsible for |
| 78 | // periodically checking if any log records and dag state can be |
| 79 | // pruned. All these 3 threads perform write operations to the data |
| 80 | // structures, and synchronize by acquiring a write lock on s.lock. In |
| 81 | // addition, when syncd receives an incoming RPC, it responds to the |
| 82 | // request by acquiring a read lock on s.lock. Thus, at any instant in |
| 83 | // time, either one of the watcher, initiator or gc threads is active, |
| 84 | // or any number of responders can be active, serving incoming |
| 85 | // requests. Fairness between these threads follows from |
| 86 | // sync.RWMutex. The spec says that the writers cannot be starved by |
| 87 | // the readers but it does not guarantee FIFO. We may have to revisit |
| 88 | // this in the future. |
Tilak Sharma | 79cfb41 | 2014-05-27 18:34:57 -0700 | [diff] [blame] | 89 | func NewSyncd(peerEndpoints, peerDeviceIDs, devid, storePath, storeEndpoint string, syncTick time.Duration) *syncd { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 90 | // Connect to the local Veyron store. |
| 91 | // At present this is optional to allow testing (from the command-line) w/o Veyron store running. |
| 92 | // TODO: connecting to Veyron store should be mandatory. |
Tilak Sharma | 79cfb41 | 2014-05-27 18:34:57 -0700 | [diff] [blame] | 93 | var store raw.Store |
| 94 | if storeEndpoint != "" { |
| 95 | var err error |
| 96 | store, err = raw.BindStore(naming.JoinAddressName(storeEndpoint, raw.RawStoreSuffix)) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 97 | if err != nil { |
Tilak Sharma | 79cfb41 | 2014-05-27 18:34:57 -0700 | [diff] [blame] | 98 | vlog.Fatalf("NewSyncd: cannot connect to Veyron store endpoint (%s): %s", storeEndpoint, err) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 99 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 100 | } |
| 101 | |
Tilak Sharma | 79cfb41 | 2014-05-27 18:34:57 -0700 | [diff] [blame] | 102 | return newSyncdCore(peerEndpoints, peerDeviceIDs, devid, storePath, storeEndpoint, store, syncTick) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 103 | } |
| 104 | |
Himabindu Pucha | 38ddeb3 | 2014-05-12 10:57:22 -0700 | [diff] [blame] | 105 | // newSyncdCore is the internal function that creates the Syncd |
| 106 | // structure and initilizes its thread (goroutines). It takes a |
| 107 | // Veyron Store parameter to separate the core of Syncd setup from the |
| 108 | // external dependency on Veyron Store. |
Tilak Sharma | 79cfb41 | 2014-05-27 18:34:57 -0700 | [diff] [blame] | 109 | func newSyncdCore(peerEndpoints, peerDeviceIDs, devid, storePath, storeEndpoint string, |
| 110 | store raw.Store, syncTick time.Duration) *syncd { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 111 | s := &syncd{} |
| 112 | |
| 113 | // Bootstrap my own DeviceID. |
| 114 | s.id = DeviceID(devid) |
| 115 | |
| 116 | var err error |
| 117 | // Log init. |
| 118 | if s.log, err = openILog(storePath+"/ilog", s); err != nil { |
| 119 | vlog.Fatalf("newSyncd: ILogInit failed: err %v", err) |
| 120 | } |
| 121 | |
| 122 | // DevTable init. |
| 123 | if s.devtab, err = openDevTable(storePath+"/dtab", s); err != nil { |
| 124 | vlog.Fatalf("newSyncd: DevTableInit failed: err %v", err) |
| 125 | } |
| 126 | |
| 127 | // Dag Init. |
| 128 | if s.dag, err = openDAG(storePath + "/dag"); err != nil { |
| 129 | vlog.Fatalf("newSyncd: OpenDag failed: err %v", err) |
| 130 | } |
| 131 | |
| 132 | // Veyron Store. |
Tilak Sharma | 79cfb41 | 2014-05-27 18:34:57 -0700 | [diff] [blame] | 133 | s.storeEndpoint = storeEndpoint |
| 134 | s.store = store |
| 135 | vlog.VI(1).Infof("newSyncd: Local Veyron store: %s", s.storeEndpoint) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 136 | |
| 137 | // Register these Watch data types with VOM. |
| 138 | // TODO(tilaks): why aren't they auto-retrieved from the IDL? |
Tilak Sharma | 2af7002 | 2014-05-14 11:29:27 -0700 | [diff] [blame] | 139 | vom.Register(&raw.Mutation{}) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 140 | vom.Register(&storage.DEntry{}) |
| 141 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 142 | // Channel to propagate close event to all threads. |
| 143 | s.closed = make(chan struct{}) |
| 144 | |
| 145 | s.pending.Add(3) |
Himabindu Pucha | 38ddeb3 | 2014-05-12 10:57:22 -0700 | [diff] [blame] | 146 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 147 | // Get deltas every peerSyncInterval. |
Raja Daoud | c861ab4 | 2014-05-20 14:32:54 -0700 | [diff] [blame] | 148 | s.hdlInitiator = newInitiator(s, peerEndpoints, peerDeviceIDs, syncTick) |
Himabindu Pucha | 38ddeb3 | 2014-05-12 10:57:22 -0700 | [diff] [blame] | 149 | go s.hdlInitiator.contactPeers() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 150 | |
| 151 | // Garbage collect every garbageCollectInterval. |
| 152 | s.hdlGC = newGC(s) |
| 153 | go s.hdlGC.garbageCollect() |
| 154 | |
| 155 | // Start a watcher thread that will get updates from local store. |
| 156 | s.hdlWatcher = newWatcher(s) |
| 157 | go s.hdlWatcher.watchStore() |
| 158 | |
| 159 | return s |
| 160 | } |
| 161 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 162 | // Close cleans up syncd state. |
| 163 | func (s *syncd) Close() { |
| 164 | close(s.closed) |
| 165 | s.pending.Wait() |
| 166 | |
| 167 | // TODO(hpucha): close without flushing. |
| 168 | } |
| 169 | |
| 170 | // isSyncClosing returns true if Close() was called i.e. the "closed" channel is closed. |
| 171 | func (s *syncd) isSyncClosing() bool { |
| 172 | select { |
| 173 | case <-s.closed: |
| 174 | return true |
| 175 | default: |
| 176 | return false |
| 177 | } |
| 178 | } |
| 179 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 180 | // GetDeltas responds to the incoming request from a client by sending missing generations to the client. |
Matt Rosencrantz | f5afcaf | 2014-06-02 11:31:22 -0700 | [diff] [blame] | 181 | func (s *syncd) GetDeltas(_ ipc.ServerContext, In GenVector, ClientID DeviceID, Stream SyncServiceGetDeltasStream) (GenVector, error) { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 182 | vlog.VI(1).Infof("GetDeltas:: Received vector %v from client %s", In, ClientID) |
| 183 | |
Raja Daoud | 4b5b00f | 2014-06-05 15:02:53 -0700 | [diff] [blame] | 184 | // Handle misconfiguration: the client cannot have the same ID as me. |
| 185 | if ClientID == s.id { |
| 186 | vlog.VI(1).Infof("GetDeltas:: impostor alert: client ID %s is the same as mine %s", ClientID, s.id) |
| 187 | return GenVector{}, fmt.Errorf("impostor: you cannot be %s, for I am %s", ClientID, s.id) |
| 188 | } |
| 189 | |
Himabindu Pucha | 8b746a3 | 2014-05-21 12:35:05 -0700 | [diff] [blame] | 190 | if err := s.updateDeviceInfo(ClientID, In); err != nil { |
| 191 | vlog.Fatalf("GetDeltas:: updateDeviceInfo failed with err %v", err) |
| 192 | } |
| 193 | |
Himabindu Pucha | 8b746a3 | 2014-05-21 12:35:05 -0700 | [diff] [blame] | 194 | out, gens, gensInfo, err := s.prepareGensToReply(In) |
| 195 | if err != nil { |
| 196 | vlog.Fatalf("GetDeltas:: prepareGensToReply failed with err %v", err) |
| 197 | } |
| 198 | |
| 199 | for pos, v := range gens { |
| 200 | gen := gensInfo[pos] |
| 201 | var count uint64 |
| 202 | for i := LSN(0); i <= gen.MaxLSN; i++ { |
| 203 | count++ |
| 204 | rec, err := s.getLogRec(v.devID, v.genID, i) |
| 205 | if err != nil { |
| 206 | vlog.Fatalf("GetDeltas:: Couldn't get log record %s %d %d, err %v", |
| 207 | v.devID, v.genID, i, err) |
| 208 | } |
| 209 | vlog.VI(1).Infof("Sending log record %v", rec) |
Shyam Jayaraman | 97b9dca | 2014-07-31 13:30:46 -0700 | [diff] [blame] | 210 | if err := Stream.SendStream().Send(*rec); err != nil { |
Himabindu Pucha | 8b746a3 | 2014-05-21 12:35:05 -0700 | [diff] [blame] | 211 | vlog.Errorf("GetDeltas:: Couldn't send stream err: %v", err) |
| 212 | return GenVector{}, err |
| 213 | } |
| 214 | } |
| 215 | if count != gen.Count { |
| 216 | vlog.Fatalf("GetDeltas:: GenMetadata has incorrect log records for generation %s %d %v", |
| 217 | v.devID, v.genID, gen) |
| 218 | } |
| 219 | } |
| 220 | return out, nil |
| 221 | } |
| 222 | |
| 223 | // updateDeviceInfo updates the remote device's information based on |
| 224 | // the incoming GetDeltas request. |
| 225 | func (s *syncd) updateDeviceInfo(ClientID DeviceID, In GenVector) error { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 226 | s.lock.Lock() |
Himabindu Pucha | 8b746a3 | 2014-05-21 12:35:05 -0700 | [diff] [blame] | 227 | defer s.lock.Unlock() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 228 | |
| 229 | // Note that the incoming client generation vector cannot be |
| 230 | // used for garbage collection. We can only garbage collect |
| 231 | // based on the generations we receive from other |
| 232 | // devices. Receiving a set of generations assures that all |
| 233 | // updates branching from those generations are also received |
| 234 | // and hence generations present on all devices can be |
| 235 | // GC'ed. This function sends generations to other devices and |
| 236 | // hence does not use the generation vector for GC. |
| 237 | // |
| 238 | // TODO(hpucha): Cache the client's incoming generation vector |
| 239 | // to assist in tracking missing generations and hence next |
| 240 | // peer to contact. |
| 241 | if !s.devtab.hasDevInfo(ClientID) { |
| 242 | if err := s.devtab.addDevice(ClientID); err != nil { |
Himabindu Pucha | 8b746a3 | 2014-05-21 12:35:05 -0700 | [diff] [blame] | 243 | return err |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 244 | } |
| 245 | } |
Himabindu Pucha | 8b746a3 | 2014-05-21 12:35:05 -0700 | [diff] [blame] | 246 | return nil |
| 247 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 248 | |
Himabindu Pucha | 8b746a3 | 2014-05-21 12:35:05 -0700 | [diff] [blame] | 249 | // prepareGensToReply processes the incoming generation vector and |
| 250 | // returns the metadata of all the missing generations between the |
| 251 | // incoming and the local generation vector. |
| 252 | func (s *syncd) prepareGensToReply(In GenVector) (GenVector, []*genOrder, []*genMetadata, error) { |
| 253 | s.lock.RLock() |
| 254 | defer s.lock.RUnlock() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 255 | |
| 256 | // Get local generation vector. |
| 257 | out, err := s.devtab.getGenVec(s.id) |
| 258 | if err != nil { |
Himabindu Pucha | 8b746a3 | 2014-05-21 12:35:05 -0700 | [diff] [blame] | 259 | return GenVector{}, nil, nil, err |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 260 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 261 | |
| 262 | // Diff the two generation vectors. |
| 263 | gens, err := s.devtab.diffGenVectors(out, In) |
| 264 | if err != nil { |
Himabindu Pucha | 8b746a3 | 2014-05-21 12:35:05 -0700 | [diff] [blame] | 265 | return GenVector{}, nil, nil, err |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 266 | } |
| 267 | |
Himabindu Pucha | 8b746a3 | 2014-05-21 12:35:05 -0700 | [diff] [blame] | 268 | // Get the metadata for all the generations in the reply. |
| 269 | gensInfo := make([]*genMetadata, len(gens)) |
| 270 | for pos, v := range gens { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 271 | gen, err := s.log.getGenMetadata(v.devID, v.genID) |
| 272 | if err != nil || gen.Count <= 0 { |
Himabindu Pucha | 8b746a3 | 2014-05-21 12:35:05 -0700 | [diff] [blame] | 273 | return GenVector{}, nil, nil, err |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 274 | } |
Himabindu Pucha | 8b746a3 | 2014-05-21 12:35:05 -0700 | [diff] [blame] | 275 | gensInfo[pos] = gen |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 276 | } |
| 277 | |
Himabindu Pucha | 8b746a3 | 2014-05-21 12:35:05 -0700 | [diff] [blame] | 278 | return out, gens, gensInfo, nil |
| 279 | } |
| 280 | |
| 281 | // getLogRec gets the log record for a given generation and lsn. |
| 282 | func (s *syncd) getLogRec(dev DeviceID, gen GenID, lsn LSN) (*LogRec, error) { |
| 283 | s.lock.RLock() |
| 284 | defer s.lock.RUnlock() |
| 285 | return s.log.getLogRec(dev, gen, lsn) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 286 | } |