Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 1 | package vsync |
| 2 | |
| 3 | // Tests for the Veyron Sync watcher. |
| 4 | |
| 5 | import ( |
| 6 | "bytes" |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 7 | "fmt" |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 8 | "os" |
Himabindu Pucha | 5f8293d | 2014-07-29 13:18:47 -0700 | [diff] [blame] | 9 | "reflect" |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 10 | "testing" |
| 11 | "time" |
| 12 | |
Tilak Sharma | 2af7002 | 2014-05-14 11:29:27 -0700 | [diff] [blame] | 13 | "veyron/services/store/raw" |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 14 | |
Matt Rosencrantz | 29147f7 | 2014-06-06 12:46:01 -0700 | [diff] [blame] | 15 | "veyron2/context" |
Tilak Sharma | 79cfb41 | 2014-05-27 18:34:57 -0700 | [diff] [blame] | 16 | "veyron2/ipc" |
Matt Rosencrantz | f5afcaf | 2014-06-02 11:31:22 -0700 | [diff] [blame] | 17 | "veyron2/rt" |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 18 | "veyron2/services/watch" |
| 19 | "veyron2/storage" |
| 20 | ) |
| 21 | |
| 22 | var ( |
Raja Daoud | 62df0da | 2014-05-13 11:23:14 -0700 | [diff] [blame] | 23 | info testInfo |
| 24 | recvBlocked chan struct{} |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 25 | ) |
| 26 | |
// testInfo is the control block for the fake store and the fake reply stream
// that together simulate the Watch API.
type testInfo struct {
	// failWatch makes every fakeVStore.Watch call return an error;
	// failWatchCount counts how many times that happened.
	failWatch      bool
	failWatchCount int

	// failRecv makes every fakeStream.Advance call fail with an error;
	// failRecvCount counts how many times that happened.
	failRecv      bool
	failRecvCount int

	// eofRecv makes the next Advance report end-of-stream (false, nil error).
	eofRecv bool

	// blockRecv makes Advance block until the stream is canceled.
	blockRecv bool

	// watchResmark is the resume marker carried by the most recent
	// successful Watch request.
	watchResmark []byte
}
| 38 | |
// fakeVStore is a stateless stand-in for the store client. Only Watch() has a
// working simulation; every other store method in this file panics.
type fakeVStore struct{}
| 42 | |
Matt Rosencrantz | 29147f7 | 2014-06-06 12:46:01 -0700 | [diff] [blame] | 43 | func (*fakeVStore) GetMethodTags(_ context.T, _ string, _ ...ipc.CallOpt) ([]interface{}, error) { |
Tilak Sharma | 79cfb41 | 2014-05-27 18:34:57 -0700 | [diff] [blame] | 44 | panic("not implemented") |
| 45 | } |
| 46 | |
Matt Rosencrantz | 29147f7 | 2014-06-06 12:46:01 -0700 | [diff] [blame] | 47 | func (*fakeVStore) UnresolveStep(_ context.T, _ ...ipc.CallOpt) ([]string, error) { |
Tilak Sharma | 79cfb41 | 2014-05-27 18:34:57 -0700 | [diff] [blame] | 48 | panic("not implemented") |
| 49 | } |
| 50 | |
Matt Rosencrantz | 29147f7 | 2014-06-06 12:46:01 -0700 | [diff] [blame] | 51 | func (*fakeVStore) Signature(_ context.T, _ ...ipc.CallOpt) (ipc.ServiceSignature, error) { |
Tilak Sharma | 79cfb41 | 2014-05-27 18:34:57 -0700 | [diff] [blame] | 52 | panic("not implemented") |
| 53 | } |
| 54 | |
Shyam Jayaraman | 97b9dca | 2014-07-31 13:30:46 -0700 | [diff] [blame] | 55 | func (v *fakeVStore) Watch(_ context.T, req raw.Request, _ ...ipc.CallOpt) (raw.StoreWatchCall, error) { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 56 | // If "failWatch" is set, simulate a failed RPC call. |
| 57 | if info.failWatch { |
| 58 | info.failWatchCount++ |
| 59 | return nil, fmt.Errorf("fakeWatch forced error: %d", info.failWatchCount) |
| 60 | } |
| 61 | |
| 62 | // Save the resmark from the Watch request. |
| 63 | info.watchResmark = req.ResumeMarker |
| 64 | |
| 65 | // Return a fake stream to access the batch of changes (store mutations). |
| 66 | return newFakeStream(), nil |
| 67 | } |
| 68 | |
Shyam Jayaraman | 97b9dca | 2014-07-31 13:30:46 -0700 | [diff] [blame] | 69 | func (*fakeVStore) PutMutations(_ context.T, _ ...ipc.CallOpt) (raw.StorePutMutationsCall, error) { |
Tilak Sharma | 79cfb41 | 2014-05-27 18:34:57 -0700 | [diff] [blame] | 70 | panic("not implemented") |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 71 | } |
| 72 | |
// fakeStream simulates the reply stream returned by the Watch() API.
type fakeStream struct {
	// canceled is closed by Cancel() to release an Advance() call that is blocked.
	canceled chan struct{}
	// err is the error recorded by the most recent Advance() call, reported by Err().
	err error
}
| 78 | |
| 79 | func newFakeStream() *fakeStream { |
| 80 | s := &fakeStream{} |
| 81 | s.canceled = make(chan struct{}) |
| 82 | return s |
| 83 | } |
| 84 | |
Shyam Jayaraman | 97b9dca | 2014-07-31 13:30:46 -0700 | [diff] [blame] | 85 | func (s *fakeStream) RecvStream() interface { |
| 86 | Advance() bool |
| 87 | Value() watch.ChangeBatch |
| 88 | Err() error |
| 89 | } { |
| 90 | return s |
| 91 | } |
| 92 | |
Shyam Jayaraman | c4aed6e | 2014-07-22 14:25:06 -0700 | [diff] [blame] | 93 | func (s *fakeStream) Advance() bool { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 94 | // If "failRecv" is set, simulate a failed call. |
| 95 | if info.failRecv { |
| 96 | info.failRecvCount++ |
Shyam Jayaraman | c4aed6e | 2014-07-22 14:25:06 -0700 | [diff] [blame] | 97 | s.err = fmt.Errorf("fake recv error on fake stream: %d", info.failRecvCount) |
| 98 | return false |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 99 | } |
| 100 | |
| 101 | // If "eofRecv" is set, simulate a closed stream and make sure the next Recv() call blocks. |
| 102 | if info.eofRecv { |
| 103 | info.eofRecv, info.blockRecv = false, true |
Shyam Jayaraman | c4aed6e | 2014-07-22 14:25:06 -0700 | [diff] [blame] | 104 | s.err = nil |
| 105 | return false |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 106 | } |
| 107 | |
| 108 | // If "blockRecv" is set, simulate blocking the call until the stream is canceled. |
| 109 | if info.blockRecv { |
Raja Daoud | 62df0da | 2014-05-13 11:23:14 -0700 | [diff] [blame] | 110 | close(recvBlocked) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 111 | <-s.canceled |
Shyam Jayaraman | c4aed6e | 2014-07-22 14:25:06 -0700 | [diff] [blame] | 112 | s.err = nil |
| 113 | return false |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 114 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 115 | // Otherwise return a batch of changes, and make sure the next Recv() call returns EOF on the stream. |
| 116 | // Adjust the resume marker of the change records to follow the one given to the Watch request. |
| 117 | info.eofRecv = true |
Shyam Jayaraman | c4aed6e | 2014-07-22 14:25:06 -0700 | [diff] [blame] | 118 | return true |
| 119 | } |
| 120 | |
| 121 | func (s *fakeStream) Value() watch.ChangeBatch { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 122 | changes := getChangeBatch() |
| 123 | |
| 124 | var lastCount byte |
| 125 | if info.watchResmark != nil { |
| 126 | lastCount = info.watchResmark[0] |
| 127 | } |
| 128 | |
| 129 | for i := range changes.Changes { |
| 130 | ch := &changes.Changes[i] |
| 131 | if !ch.Continued { |
| 132 | lastCount++ |
| 133 | resmark := []byte{lastCount, 0, 0, 0, 0, 0, 0, 0} |
| 134 | changes.Changes[i].ResumeMarker = resmark |
| 135 | } |
| 136 | } |
| 137 | |
Shyam Jayaraman | c4aed6e | 2014-07-22 14:25:06 -0700 | [diff] [blame] | 138 | return changes |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 139 | } |
| 140 | |
Shyam Jayaraman | c4aed6e | 2014-07-22 14:25:06 -0700 | [diff] [blame] | 141 | func (s *fakeStream) Err() error { |
| 142 | return s.err |
| 143 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 144 | func (s *fakeStream) Finish() error { |
| 145 | return nil |
| 146 | } |
| 147 | |
| 148 | func (s *fakeStream) Cancel() { |
| 149 | close(s.canceled) |
| 150 | } |
| 151 | |
| 152 | // getChangeBatch returns a batch of store mutations used to simulate the Watch API. |
| 153 | // The batch contains two transactions to verify both new-object creation and the |
| 154 | // mutation of an existing object. |
| 155 | func getChangeBatch() watch.ChangeBatch { |
| 156 | var batch watch.ChangeBatch |
| 157 | |
| 158 | batch.Changes = []watch.Change{ |
| 159 | // 1st transaction: create "/" and "/a" and "/a/b" as 3 new objects (prior versions are 0). |
| 160 | watch.Change{ |
Tilak Sharma | 0ed29c3 | 2014-07-23 23:24:55 -0700 | [diff] [blame] | 161 | Name: "", |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 162 | State: 0, |
Tilak Sharma | 2af7002 | 2014-05-14 11:29:27 -0700 | [diff] [blame] | 163 | Value: &raw.Mutation{ |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 164 | ID: storage.ID{0x4c, 0x6d, 0xb5, 0x1a, 0xa7, 0x40, 0xd8, 0xc6, |
| 165 | 0x2b, 0x90, 0xdf, 0x87, 0x45, 0x3, 0xe2, 0x85}, |
| 166 | PriorVersion: 0x0, |
| 167 | Version: 0x4d65822107fcfd52, |
| 168 | Value: "value-root", |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 169 | Dir: []storage.DEntry{ |
| 170 | storage.DEntry{ |
| 171 | Name: "a", |
| 172 | ID: storage.ID{0x8, 0x2b, 0xc4, 0x2e, 0x15, 0xaf, 0x4f, 0xcf, |
| 173 | 0x61, 0x1d, 0x7f, 0x19, 0xa8, 0xd7, 0x83, 0x1f}, |
| 174 | }, |
| 175 | }, |
| 176 | }, |
| 177 | ResumeMarker: nil, |
| 178 | Continued: true, |
| 179 | }, |
| 180 | watch.Change{ |
Tilak Sharma | 0ed29c3 | 2014-07-23 23:24:55 -0700 | [diff] [blame] | 181 | Name: "", |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 182 | State: 0, |
Tilak Sharma | 2af7002 | 2014-05-14 11:29:27 -0700 | [diff] [blame] | 183 | Value: &raw.Mutation{ |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 184 | ID: storage.ID{0x8, 0x2b, 0xc4, 0x2e, 0x15, 0xaf, 0x4f, 0xcf, |
| 185 | 0x61, 0x1d, 0x7f, 0x19, 0xa8, 0xd7, 0x83, 0x1f}, |
| 186 | PriorVersion: 0x0, |
| 187 | Version: 0x57e9d1860d1d68d8, |
| 188 | Value: "value-a", |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 189 | Dir: []storage.DEntry{ |
| 190 | storage.DEntry{ |
| 191 | Name: "b", |
| 192 | ID: storage.ID{0x6e, 0x4a, 0x32, 0x7c, 0x29, 0x7d, 0x76, 0xfb, |
| 193 | 0x51, 0x42, 0xb1, 0xb1, 0xd9, 0x5b, 0x2d, 0x7}, |
| 194 | }, |
| 195 | }, |
| 196 | }, |
| 197 | ResumeMarker: nil, |
| 198 | Continued: true, |
| 199 | }, |
| 200 | watch.Change{ |
Tilak Sharma | 0ed29c3 | 2014-07-23 23:24:55 -0700 | [diff] [blame] | 201 | Name: "", |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 202 | State: 0, |
Tilak Sharma | 2af7002 | 2014-05-14 11:29:27 -0700 | [diff] [blame] | 203 | Value: &raw.Mutation{ |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 204 | ID: storage.ID{0x6e, 0x4a, 0x32, 0x7c, 0x29, 0x7d, 0x76, 0xfb, |
| 205 | 0x51, 0x42, 0xb1, 0xb1, 0xd9, 0x5b, 0x2d, 0x7}, |
| 206 | PriorVersion: 0x0, |
| 207 | Version: 0x55104dc76695721d, |
| 208 | Value: "value-b", |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 209 | Dir: nil, |
| 210 | }, |
| 211 | ResumeMarker: nil, |
| 212 | Continued: false, |
| 213 | }, |
| 214 | |
| 215 | // 2nd transaction: create "/a/c" as a new object, which also updates "a" (its "Dir" field). |
| 216 | watch.Change{ |
Tilak Sharma | 0ed29c3 | 2014-07-23 23:24:55 -0700 | [diff] [blame] | 217 | Name: "", |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 218 | State: 0, |
Tilak Sharma | 2af7002 | 2014-05-14 11:29:27 -0700 | [diff] [blame] | 219 | Value: &raw.Mutation{ |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 220 | ID: storage.ID{0x8, 0x2b, 0xc4, 0x2e, 0x15, 0xaf, 0x4f, 0xcf, |
| 221 | 0x61, 0x1d, 0x7f, 0x19, 0xa8, 0xd7, 0x83, 0x1f}, |
| 222 | PriorVersion: 0x57e9d1860d1d68d8, |
| 223 | Version: 0x365a858149c6e2d1, |
| 224 | Value: "value-a", |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 225 | Dir: []storage.DEntry{ |
| 226 | storage.DEntry{ |
| 227 | Name: "b", |
| 228 | ID: storage.ID{0x6e, 0x4a, 0x32, 0x7c, 0x29, 0x7d, 0x76, 0xfb, |
| 229 | 0x51, 0x42, 0xb1, 0xb1, 0xd9, 0x5b, 0x2d, 0x7}, |
| 230 | }, |
| 231 | storage.DEntry{ |
| 232 | Name: "c", |
| 233 | ID: storage.ID{0x70, 0xff, 0x65, 0xec, 0xf, 0x82, 0x5f, 0x44, |
| 234 | 0xb6, 0x9f, 0x89, 0x5e, 0xea, 0x75, 0x9d, 0x71}, |
| 235 | }, |
| 236 | }, |
| 237 | }, |
| 238 | ResumeMarker: nil, |
| 239 | Continued: true, |
| 240 | }, |
| 241 | watch.Change{ |
Tilak Sharma | 0ed29c3 | 2014-07-23 23:24:55 -0700 | [diff] [blame] | 242 | Name: "", |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 243 | State: 0, |
Tilak Sharma | 2af7002 | 2014-05-14 11:29:27 -0700 | [diff] [blame] | 244 | Value: &raw.Mutation{ |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 245 | ID: storage.ID{0x70, 0xff, 0x65, 0xec, 0xf, 0x82, 0x5f, 0x44, |
| 246 | 0xb6, 0x9f, 0x89, 0x5e, 0xea, 0x75, 0x9d, 0x71}, |
| 247 | PriorVersion: 0x0, |
| 248 | Version: 0x380704bb7b4d7c03, |
| 249 | Value: "value-c", |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 250 | Dir: nil, |
| 251 | }, |
| 252 | ResumeMarker: nil, |
| 253 | Continued: false, |
| 254 | }, |
Himabindu Pucha | df263cd | 2014-08-12 22:22:21 -0700 | [diff] [blame] | 255 | |
| 256 | // 3rd transaction: remove "/a/b" which updates "a" (its "Dir" field) and deletes "b". |
| 257 | watch.Change{ |
| 258 | Name: "", |
| 259 | State: 0, |
| 260 | Value: &raw.Mutation{ |
| 261 | ID: storage.ID{0x8, 0x2b, 0xc4, 0x2e, 0x15, 0xaf, 0x4f, 0xcf, |
| 262 | 0x61, 0x1d, 0x7f, 0x19, 0xa8, 0xd7, 0x83, 0x1f}, |
| 263 | PriorVersion: 0x365a858149c6e2d1, |
| 264 | Version: 0xa858149c6e2d1000, |
| 265 | Value: "value-a", |
| 266 | Dir: []storage.DEntry{ |
| 267 | storage.DEntry{ |
| 268 | Name: "c", |
| 269 | ID: storage.ID{0x70, 0xff, 0x65, 0xec, 0xf, 0x82, 0x5f, 0x44, |
| 270 | 0xb6, 0x9f, 0x89, 0x5e, 0xea, 0x75, 0x9d, 0x71}, |
| 271 | }, |
| 272 | }, |
| 273 | }, |
| 274 | ResumeMarker: nil, |
| 275 | Continued: true, |
| 276 | }, |
| 277 | watch.Change{ |
| 278 | Name: "", |
| 279 | State: watch.DoesNotExist, |
| 280 | Value: &raw.Mutation{ |
| 281 | ID: storage.ID{0x6e, 0x4a, 0x32, 0x7c, 0x29, 0x7d, 0x76, 0xfb, |
| 282 | 0x51, 0x42, 0xb1, 0xb1, 0xd9, 0x5b, 0x2d, 0x7}, |
| 283 | PriorVersion: 0x55104dc76695721d, |
| 284 | Version: 0x0, |
| 285 | Value: "", |
| 286 | Dir: nil, |
| 287 | }, |
| 288 | ResumeMarker: nil, |
| 289 | Continued: false, |
| 290 | }, |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 291 | } |
| 292 | |
| 293 | return batch |
| 294 | } |
| 295 | |
| 296 | // initTestDir creates a per-test directory to store the Sync DB files and returns it. |
| 297 | // It also initializes (resets) the test control metadata. |
| 298 | func initTestDir(t *testing.T) string { |
| 299 | info = testInfo{} |
Raja Daoud | 62df0da | 2014-05-13 11:23:14 -0700 | [diff] [blame] | 300 | recvBlocked = make(chan struct{}) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 301 | watchRetryDelay = 10 * time.Millisecond |
| 302 | streamRetryDelay = 5 * time.Millisecond |
| 303 | |
| 304 | path := fmt.Sprintf("%s/sync_test_%d_%d/", os.TempDir(), os.Getpid(), time.Now().UnixNano()) |
| 305 | if err := os.Mkdir(path, 0775); err != nil { |
| 306 | t.Fatalf("makeTestDir: cannot create directory %s: %s", path, err) |
| 307 | } |
| 308 | return path |
| 309 | } |
| 310 | |
| 311 | // fakeSyncd creates a Syncd server structure with enough metadata to be used |
| 312 | // in watcher unit tests. If "withStore" is true, create a fake store entry. |
| 313 | // Otherwise simulate a no-store Sync server. |
| 314 | func fakeSyncd(t *testing.T, storeDir string, withStore bool) *syncd { |
| 315 | var s *syncd |
| 316 | if withStore { |
Raja Daoud | c861ab4 | 2014-05-20 14:32:54 -0700 | [diff] [blame] | 317 | s = newSyncdCore("", "", "fake-dev", storeDir, "", &fakeVStore{}, 0) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 318 | } else { |
Raja Daoud | c861ab4 | 2014-05-20 14:32:54 -0700 | [diff] [blame] | 319 | s = newSyncdCore("", "", "fake-dev", storeDir, "", nil, 0) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 320 | } |
| 321 | if s == nil { |
| 322 | t.Fatal("cannot create a Sync server") |
| 323 | } |
| 324 | return s |
| 325 | } |
| 326 | |
| 327 | // TestWatcherNoStore tests the watcher without a connection to a local store. |
| 328 | // It verifies that the watcher exits without side-effects. |
| 329 | func TestWatcherNoStore(t *testing.T) { |
| 330 | dir := initTestDir(t) |
| 331 | defer os.RemoveAll(dir) |
| 332 | |
| 333 | s := fakeSyncd(t, dir, false) |
| 334 | s.Close() |
| 335 | } |
| 336 | |
| 337 | // TestWatcherRPCError tests the watcher reacting to an error from the Watch() RPC. |
| 338 | // It verifies that the watcher retries the RPC after a delay. |
| 339 | func TestWatcherRPCError(t *testing.T) { |
Matt Rosencrantz | f5afcaf | 2014-06-02 11:31:22 -0700 | [diff] [blame] | 340 | rt.Init() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 341 | dir := initTestDir(t) |
| 342 | defer os.RemoveAll(dir) |
| 343 | |
| 344 | info.failWatch = true |
| 345 | s := fakeSyncd(t, dir, true) |
| 346 | |
| 347 | n := 4 |
| 348 | time.Sleep(time.Duration(n) * watchRetryDelay) |
| 349 | |
| 350 | s.Close() |
| 351 | |
Raja Daoud | c009028 | 2014-07-31 16:32:45 -0700 | [diff] [blame] | 352 | if info.failWatchCount == 0 { |
| 353 | t.Fatal("Watch() RPC retry count is zero") |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 354 | } |
| 355 | } |
| 356 | |
| 357 | // TestWatcherRecvError tests the watcher reacting to an error from the stream receive. |
| 358 | // It verifies that the watcher retries the Watch() RPC after a delay. |
| 359 | func TestWatcherRecvError(t *testing.T) { |
Himabindu Pucha | df263cd | 2014-08-12 22:22:21 -0700 | [diff] [blame] | 360 | rt.Init() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 361 | dir := initTestDir(t) |
| 362 | defer os.RemoveAll(dir) |
| 363 | |
| 364 | info.failRecv = true |
| 365 | s := fakeSyncd(t, dir, true) |
| 366 | |
| 367 | n := 2 |
| 368 | time.Sleep(time.Duration(n) * streamRetryDelay) |
| 369 | |
| 370 | s.Close() |
| 371 | |
Raja Daoud | c009028 | 2014-07-31 16:32:45 -0700 | [diff] [blame] | 372 | if info.failRecvCount == 0 { |
| 373 | t.Fatal("Recv() retry count is zero") |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 374 | } |
| 375 | } |
| 376 | |
| 377 | // TestWatcherChanges tests the watcher applying changes received from store. |
| 378 | func TestWatcherChanges(t *testing.T) { |
Himabindu Pucha | df263cd | 2014-08-12 22:22:21 -0700 | [diff] [blame] | 379 | rt.Init() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 380 | dir := initTestDir(t) |
| 381 | defer os.RemoveAll(dir) |
| 382 | |
| 383 | s := fakeSyncd(t, dir, true) |
| 384 | |
Raja Daoud | 62df0da | 2014-05-13 11:23:14 -0700 | [diff] [blame] | 385 | // Wait for the watcher to block on the Recv(), i.e. it finished processing the updates. |
| 386 | <-recvBlocked |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 387 | |
Raja Daoud | 62df0da | 2014-05-13 11:23:14 -0700 | [diff] [blame] | 388 | // Verify the state of the Sync DAG and Device Table before terminating it. |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 389 | oidRoot := storage.ID{0x4c, 0x6d, 0xb5, 0x1a, 0xa7, 0x40, 0xd8, 0xc6, 0x2b, 0x90, 0xdf, 0x87, 0x45, 0x3, 0xe2, 0x85} |
| 390 | oidA := storage.ID{0x8, 0x2b, 0xc4, 0x2e, 0x15, 0xaf, 0x4f, 0xcf, 0x61, 0x1d, 0x7f, 0x19, 0xa8, 0xd7, 0x83, 0x1f} |
| 391 | oidB := storage.ID{0x6e, 0x4a, 0x32, 0x7c, 0x29, 0x7d, 0x76, 0xfb, 0x51, 0x42, 0xb1, 0xb1, 0xd9, 0x5b, 0x2d, 0x7} |
| 392 | oidC := storage.ID{0x70, 0xff, 0x65, 0xec, 0xf, 0x82, 0x5f, 0x44, 0xb6, 0x9f, 0x89, 0x5e, 0xea, 0x75, 0x9d, 0x71} |
| 393 | |
Himabindu Pucha | df263cd | 2014-08-12 22:22:21 -0700 | [diff] [blame] | 394 | oids := []storage.ID{oidRoot, oidA, oidC} |
| 395 | heads := []storage.Version{0x4d65822107fcfd52, 0xa858149c6e2d1000, 0x380704bb7b4d7c03} |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 396 | |
| 397 | for i, oid := range oids { |
| 398 | expHead := heads[i] |
| 399 | head, err := s.dag.getHead(oid) |
| 400 | if err != nil { |
| 401 | t.Errorf("cannot find head node for object %d: %s", oid, err) |
| 402 | } else if head != expHead { |
| 403 | t.Errorf("wrong head for object %d: %d instead of %d", oid, head, expHead) |
| 404 | } |
| 405 | } |
| 406 | |
Himabindu Pucha | df263cd | 2014-08-12 22:22:21 -0700 | [diff] [blame] | 407 | // Verify oidB. |
| 408 | headB, err := s.dag.getHead(oidB) |
| 409 | if err != nil { |
| 410 | t.Errorf("cannot find head node for object %d: %s", oidB, err) |
| 411 | } |
| 412 | if headB == storage.NoVersion || headB == storage.Version(0x55104dc76695721d) { |
| 413 | t.Errorf("wrong head for object B %d: %d ", oidB, headB) |
| 414 | } |
| 415 | |
Himabindu Pucha | 5f8293d | 2014-07-29 13:18:47 -0700 | [diff] [blame] | 416 | // Verify transaction state for the first transaction. |
| 417 | node, err := s.dag.getNode(oidRoot, heads[0]) |
| 418 | if err != nil { |
| 419 | t.Errorf("cannot find dag node for object %d %v: %s", oidRoot, heads[0], err) |
| 420 | } |
| 421 | if node.TxID == NoTxID { |
| 422 | t.Errorf("expecting non nil txid for object %d:%v", oidRoot, heads[0]) |
| 423 | } |
| 424 | txMap, err := s.dag.getTransaction(node.TxID) |
| 425 | if err != nil { |
| 426 | t.Errorf("cannot find transaction for id %v: %s", node.TxID, err) |
| 427 | } |
| 428 | expTxMap := dagTxMap{ |
| 429 | oidRoot: heads[0], |
| 430 | oidA: storage.Version(0x57e9d1860d1d68d8), |
Himabindu Pucha | df263cd | 2014-08-12 22:22:21 -0700 | [diff] [blame] | 431 | oidB: storage.Version(0x55104dc76695721d), |
Himabindu Pucha | 5f8293d | 2014-07-29 13:18:47 -0700 | [diff] [blame] | 432 | } |
| 433 | if !reflect.DeepEqual(txMap, expTxMap) { |
| 434 | t.Errorf("Data mismatch for txid %v txmap %v instead of %v", |
| 435 | node.TxID, txMap, expTxMap) |
| 436 | } |
| 437 | |
| 438 | // Verify transaction state for the second transaction. |
Himabindu Pucha | df263cd | 2014-08-12 22:22:21 -0700 | [diff] [blame] | 439 | node, err = s.dag.getNode(oidC, heads[2]) |
| 440 | if err != nil { |
| 441 | t.Errorf("cannot find dag node for object %d %v: %s", oidC, heads[2], err) |
| 442 | } |
| 443 | if node.TxID == NoTxID { |
| 444 | t.Errorf("expecting non nil txid for object %d:%v", oidC, heads[2]) |
| 445 | } |
| 446 | txMap, err = s.dag.getTransaction(node.TxID) |
| 447 | if err != nil { |
| 448 | t.Errorf("cannot find transaction for id %v: %s", node.TxID, err) |
| 449 | } |
| 450 | expTxMap = dagTxMap{ |
| 451 | oidA: storage.Version(0x365a858149c6e2d1), |
| 452 | oidC: heads[2], |
| 453 | } |
| 454 | if !reflect.DeepEqual(txMap, expTxMap) { |
| 455 | t.Errorf("Data mismatch for txid %v txmap %v instead of %v", |
| 456 | node.TxID, txMap, expTxMap) |
| 457 | } |
| 458 | |
| 459 | // Verify transaction state for the third transaction. |
Himabindu Pucha | 5f8293d | 2014-07-29 13:18:47 -0700 | [diff] [blame] | 460 | node, err = s.dag.getNode(oidA, heads[1]) |
| 461 | if err != nil { |
| 462 | t.Errorf("cannot find dag node for object %d %v: %s", oidA, heads[1], err) |
| 463 | } |
| 464 | if node.TxID == NoTxID { |
| 465 | t.Errorf("expecting non nil txid for object %d:%v", oidA, heads[1]) |
| 466 | } |
| 467 | txMap, err = s.dag.getTransaction(node.TxID) |
| 468 | if err != nil { |
| 469 | t.Errorf("cannot find transaction for id %v: %s", node.TxID, err) |
| 470 | } |
| 471 | expTxMap = dagTxMap{ |
| 472 | oidA: heads[1], |
Himabindu Pucha | df263cd | 2014-08-12 22:22:21 -0700 | [diff] [blame] | 473 | oidB: headB, |
Himabindu Pucha | 5f8293d | 2014-07-29 13:18:47 -0700 | [diff] [blame] | 474 | } |
| 475 | if !reflect.DeepEqual(txMap, expTxMap) { |
| 476 | t.Errorf("Data mismatch for txid %v txmap %v instead of %v", |
| 477 | node.TxID, txMap, expTxMap) |
| 478 | } |
| 479 | |
Himabindu Pucha | df263cd | 2014-08-12 22:22:21 -0700 | [diff] [blame] | 480 | // Verify deletion tracking. |
| 481 | node, err = s.dag.getNode(oidB, headB) |
| 482 | if err != nil { |
| 483 | t.Errorf("cannot find dag node for object %d %v: %s", oidB, headB, err) |
| 484 | } |
| 485 | if !node.Deleted { |
| 486 | t.Errorf("deleted node not found for object %d %v: %s", oidB, headB, err) |
| 487 | } |
| 488 | if !s.dag.hasDeletedDescendant(oidB, storage.Version(0x55104dc76695721d)) { |
| 489 | t.Errorf("link to deleted node not found for object %d %v: %s", oidB, headB, err) |
| 490 | } |
| 491 | |
| 492 | expResmark := []byte{3, 0, 0, 0, 0, 0, 0, 0} |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 493 | |
| 494 | if bytes.Compare(s.devtab.head.Resmark, expResmark) != 0 { |
| 495 | t.Errorf("error in watch device table resume marker: %v instead of %v", s.devtab.head.Resmark, expResmark) |
| 496 | } |
| 497 | |
| 498 | if bytes.Compare(info.watchResmark, expResmark) != 0 { |
| 499 | t.Errorf("error in watch call final resume marker: %v instead of %v", info.watchResmark, expResmark) |
| 500 | } |
| 501 | |
| 502 | s.Close() |
| 503 | } |