swift: Update WatchChange for EntityType and make Watch unit tests pass

This CL adds the EntityType to Swift's WatchChange and makes the Watch
tests in SyncbaseCore pass. It also has the other following changes:

- Batches fire if the last element was a user data syncgroup change.
- Collection.exists is now part of the unit tests.
- All VErrors are wrapped in SyncbaseError.
- Stream.err() is now SyncbaseError type instead of ErrorType.

Change-Id: Ic75fd896deb71b514ad957e7804ca7502e9ad738
diff --git a/Syncbase/Source/Database.swift b/Syncbase/Source/Database.swift
index 4a456e0..c09148e 100644
--- a/Syncbase/Source/Database.swift
+++ b/Syncbase/Source/Database.swift
@@ -339,14 +339,12 @@
           }
           let change = WatchChange(coreChange: coreChange)
           // Ignore changes to userdata collection.
-          if (change.collectionId.name == Syncbase.USERDATA_SYNCGROUP_NAME) {
-            continue
+          if (change.collectionId?.name != Syncbase.USERDATA_SYNCGROUP_NAME) {
+            batch.append(change)
           }
-          batch.append(change)
           if (!change.isContinued) {
             if (!gotFirstBatch) {
               gotFirstBatch = true
-              let b = batch
               // We synchronously run on Syncbase.queue to facilitate flow control. Go blocks
               // until each callback is consumed before it calls with another WatchChange event.
               // Backpressure in Swift is achieved by blocking until the WatchChange event is
@@ -356,7 +354,7 @@
               // mitigate against out-of-order events should Syncbase.queue be a concurrent queue
               // rather than a serial queue.
               dispatch_sync(Syncbase.queue, {
-                handler.onInitialState(b)
+                handler.onInitialState(batch)
               })
             } else {
               dispatch_sync(Syncbase.queue, {
diff --git a/Syncbase/Source/Error.swift b/Syncbase/Source/Error.swift
index cee530b..bd2e716 100644
--- a/Syncbase/Source/Error.swift
+++ b/Syncbase/Source/Error.swift
@@ -31,8 +31,9 @@
   case InvalidUTF8(invalidUtf8: String)
   case CastError(obj: Any)
   case IllegalArgument(detail: String)
+  case UnknownVError(err: VError)
 
-  init?(coreError: SyncbaseCore.SyncbaseError) {
+  init(coreError: SyncbaseCore.SyncbaseError) {
     switch coreError {
     case .AlreadyConfigured: self = .AlreadyConfigured
     case .NotConfigured: self = .NotConfigured
@@ -55,6 +56,7 @@
     case .InvalidUTF8(let invalidUtf8): self = .InvalidUTF8(invalidUtf8: invalidUtf8)
     case .CastError(let obj): self = .CastError(obj: obj)
     case .IllegalArgument(let detail): self = .IllegalArgument(detail: detail)
+    case .UnknownVError(let err): self = .UnknownVError(err: err)
     }
   }
 
@@ -62,7 +64,9 @@
     do {
       return try block()
     } catch let e as SyncbaseCore.SyncbaseError {
-      throw SyncbaseError(coreError: e)!
+      throw SyncbaseError(coreError: e)
+    } catch let e as VError {
+      throw SyncbaseError.UnknownVError(err: e)
     } catch let e {
       throw SyncbaseError.UnknownError(err: e)
     }
@@ -98,6 +102,7 @@
     case .InvalidUTF8(let invalidUtf8): return "Unable to convert to utf8: \(invalidUtf8)"
     case .CastError(let obj): return "Unable to convert to cast: \(obj)"
     case .IllegalArgument(let detail): return "Illegal argument: \(detail)"
+    case .UnknownVError(let err): return "Unknown error: \(err)"
     }
   }
 }
diff --git a/Syncbase/Source/WatchChange.swift b/Syncbase/Source/WatchChange.swift
index 7c84418..afbbccd 100644
--- a/Syncbase/Source/WatchChange.swift
+++ b/Syncbase/Source/WatchChange.swift
@@ -12,25 +12,53 @@
 public typealias ResumeMarker = NSData
 
 /// Describes a change to a database.
-public class WatchChange {
+public class WatchChange: CustomStringConvertible {
+  public enum EntityType: Int {
+    case Root
+    case Collection
+    case Row
+  }
+
   public enum ChangeType: Int {
     case Put
     case Delete
   }
 
-  /// Collection is the id of the collection that contains the changed row.
-  public let collectionId: Identifier
+  /// EntityType is the type of the entity - Root, Collection, or Row.
+  public let entityType: EntityType
 
-  /// Row is the key of the changed row.
-  public let row: String
+  /// Collection is the id of the collection that was changed or contains the
+  /// changed row. Is nil if EntityType is not Collection or Row.
+  public let collectionId: Identifier?
 
-  /// ChangeType describes the type of the change. If ChangeType is PutChange,
-  /// then the row exists in the collection, and Value can be called to obtain
-  /// the new value for this row. If ChangeType is DeleteChange, then the row was
-  /// removed from the collection.
+  /// Row is the key of the changed row. Nil if EntityType is not Row.
+  public let row: String?
+
+  /// ChangeType describes the type of the change, depending on the EntityType:
+  ///
+  /// **EntityRow:**
+  ///
+  /// - PutChange: the row exists in the collection, and Value can be called to
+  /// obtain the new value for this row.
+  ///
+  /// - DeleteChange: the row was removed from the collection.
+  ///
+  /// **EntityCollection:**
+  ///
+  /// - PutChange: the collection exists, and CollectionInfo can be called to
+  /// obtain the collection info.
+  ///
+  /// - DeleteChange: the collection was destroyed.
+  ///
+  /// **EntityRoot:**
+  ///
+  /// - PutChange: appears as the first (possibly only) change in the initial
+  /// state batch, only if watching from an empty ResumeMarker. This is the
+  /// only situation where an EntityRoot appears.
   public let changeType: ChangeType
 
-  /// value is the new value for the row if the ChangeType is PutChange, or nil
+  /// value is the new value for the row for EntityRow PutChanges, an encoded
+  /// StoreChangeCollectionInfo value for EntityCollection PutChanges, or nil
   /// otherwise.
   public let value: NSData?
 
@@ -39,7 +67,7 @@
   /// This marker can be provided in the Request message to allow the caller
   /// to resume the stream watching at a specific point without fetching the
   /// initial state.
-  public let resumeMarker: ResumeMarker
+  public let resumeMarker: ResumeMarker?
 
   /// FromSync indicates whether the change came from sync. If FromSync is false,
   /// then the change originated from the local device.
@@ -50,7 +78,12 @@
   public let isContinued: Bool
 
   init(coreChange: SyncbaseCore.WatchChange) {
-    self.collectionId = Identifier(coreId: coreChange.collectionId)
+    self.entityType = EntityType(rawValue: coreChange.entityType.rawValue)!
+    if let coreCollectionId = coreChange.collectionId {
+      self.collectionId = Identifier(coreId: coreCollectionId)
+    } else {
+      self.collectionId = nil
+    }
     self.row = coreChange.row
     self.changeType = ChangeType(rawValue: coreChange.changeType.rawValue)!
     self.value = coreChange.value
@@ -58,4 +91,20 @@
     self.isFromSync = coreChange.isFromSync
     self.isContinued = coreChange.isContinued
   }
+
+  public var description: String {
+    var valueDesc = "<nil>"
+    if let v = value {
+      if v.length > 1024 {
+        valueDesc = "<\(v.length) bytes>"
+      } else if let str = String(data: v, encoding: NSUTF8StringEncoding) {
+        valueDesc = str
+      } else {
+        valueDesc = v.description
+      }
+    }
+    return "[Syncbase.WatchChange entityType=\(entityType) changeType=\(changeType) " +
+      "collectionId=\(collectionId) row=\(row) isFromSync=\(isFromSync) isContinued=\(isContinued) " +
+      "value=\(valueDesc) ]"
+  }
 }
\ No newline at end of file
diff --git a/SyncbaseCore/Source/Collection.swift b/SyncbaseCore/Source/Collection.swift
index 1c0f406..557c9be 100644
--- a/SyncbaseCore/Source/Collection.swift
+++ b/SyncbaseCore/Source/Collection.swift
@@ -211,12 +211,12 @@
     // Go with similar condition-variable logic.
     let condition = NSCondition()
     var data: (String, NSData)? = nil
-    var doneErr: ErrorType? = nil
+    var doneErr: SyncbaseError? = nil
     var updateAvailable = false
 
     // The anonymous function that gets called from the Swift. It blocks until there's an update
     // available from Go.
-    func fetchNext(timeout: NSTimeInterval?) -> ((String, GetValueFromScanStream)?, ErrorType?) {
+    func fetchNext(timeout: NSTimeInterval?) -> ((String, GetValueFromScanStream)?, SyncbaseError?) {
       condition.lock()
       while !updateAvailable {
         if let timeout = timeout {
@@ -265,7 +265,7 @@
       condition.unlock()
     }
 
-    func onDone(err: ErrorType?) {
+    func onDone(err: SyncbaseError?) {
       condition.lock()
       // Wait until any existing update has been received by the fetch so we don't just blow
       // past it.
@@ -326,9 +326,12 @@
   }
 
   private static func onScanDone(handle: v23_syncbase_Handle, err: v23_syncbase_VError) {
-    let e = err.toVError()
+    var serr: SyncbaseError?
+    if let e = err.toVError() {
+      serr = SyncbaseError(e)
+    }
     let handle = Unmanaged<ScanHandle>.fromOpaque(COpaquePointer(handle)).takeRetainedValue()
-    handle.onDone(e)
+    handle.onDone(serr)
   }
 
   // MARK: Internal helpers
diff --git a/SyncbaseCore/Source/Errors.swift b/SyncbaseCore/Source/Errors.swift
index c434d73..bab30b7 100644
--- a/SyncbaseCore/Source/Errors.swift
+++ b/SyncbaseCore/Source/Errors.swift
@@ -26,8 +26,9 @@
   case InvalidUTF8(invalidUtf8: String)
   case CastError(obj: Any)
   case IllegalArgument(detail: String)
+  case UnknownVError(err: VError)
 
-  init?(_ err: VError) {
+  init(_ err: VError) {
     // TODO(zinman): Make VError better by having the proper arguments transmitted across
     // so we don't have to use err.msg to repeat our messages.
     switch err.id {
@@ -44,7 +45,7 @@
     case "v.io/v23/services/syncbase.InvalidPermissionsChange": self = SyncbaseError.InvalidPermissionsChange
     case "v.io/v23/verror.Exist": self = SyncbaseError.Exist
     case "v.io/v23/verror.NoExist": self = SyncbaseError.NoExist
-    default: return nil
+    default: self = SyncbaseError.UnknownVError(err: err)
     }
   }
 
@@ -72,6 +73,7 @@
     case .InvalidUTF8(let invalidUtf8): return "Unable to convert to UTF-8: \(invalidUtf8)"
     case .CastError(let obj): return "Unable to convert to cast: \(obj)"
     case .IllegalArgument(let detail): return "Illegal argument: \(detail)"
+    case .UnknownVError(let err): return "Unknown error: \(err)"
     }
   }
 }
diff --git a/SyncbaseCore/Source/Stream.swift b/SyncbaseCore/Source/Stream.swift
index 0a37034..4fddd3a 100644
--- a/SyncbaseCore/Source/Stream.swift
+++ b/SyncbaseCore/Source/Stream.swift
@@ -8,7 +8,7 @@
 public protocol Stream: SequenceType, GeneratorType {
   /// Err returns a non-nil error iff the stream encountered any errors. Err does
   /// not block.
-  func err() -> ErrorType?
+  func err() -> SyncbaseError?
 
   /// Cancel notifies the stream provider that it can stop producing elements.
   /// The client must call Cancel if it does not iterate through all elements
@@ -21,11 +21,11 @@
 /// Typed-stream backed by anonymous callbacks
 public class AnonymousStream<T>: Stream {
   public typealias Element = T
-  public typealias FetchNextFunction = NSTimeInterval? -> (T?, ErrorType?)
+  public typealias FetchNextFunction = NSTimeInterval? -> (T?, SyncbaseError?)
   let fetchNextFunction: FetchNextFunction
   public typealias CancelFunction = Void -> Void
   let cancelFunction: CancelFunction
-  private var lastErr: ErrorType?
+  private var lastErr: SyncbaseError?
   private var isDone: Bool = false
   init(fetchNextFunction: FetchNextFunction, cancelFunction: CancelFunction) {
     self.fetchNextFunction = fetchNextFunction
@@ -63,7 +63,7 @@
 
   /// Err returns a non-nil error iff the stream encountered any errors. Err does
   /// not block.
-  public func err() -> ErrorType? {
+  public func err() -> SyncbaseError? {
     return lastErr
   }
 
diff --git a/SyncbaseCore/Source/Watch.swift b/SyncbaseCore/Source/Watch.swift
index 78e9be3..b801500 100644
--- a/SyncbaseCore/Source/Watch.swift
+++ b/SyncbaseCore/Source/Watch.swift
@@ -16,7 +16,8 @@
   public let collectionBlessing: String
 
   /// rowKey is a SQL LIKE-style glob pattern ('%' and '_' wildcards, '\' as escape character)
-  /// for matching rows. If empty then only the collectionId pattern is matched.
+  /// for matching rows. If empty then only the collectionId pattern is matched and NO row events
+  /// are returned.
   public let rowKey: String?
 
   public init(collectionName: String, collectionBlessing: String, rowKey: String?) {
@@ -29,24 +30,52 @@
 public typealias ResumeMarker = NSData
 
 public struct WatchChange {
+  public enum EntityType: Int {
+    case Root
+    case Collection
+    case Row
+  }
+
   public enum ChangeType: Int {
     case Put
     case Delete
   }
 
-  /// Collection is the id of the collection that contains the changed row.
-  public let collectionId: Identifier
+  /// EntityType is the type of the entity - Root, Collection, or Row.
+  public let entityType: EntityType
 
-  /// Row is the key of the changed row.
-  public let row: String
+  /// Collection is the id of the collection that was changed or contains the
+  /// changed row. Is nil if EntityType is not Collection or Row.
+  public let collectionId: Identifier?
 
-  /// ChangeType describes the type of the change. If ChangeType is PutChange,
-  /// then the row exists in the collection, and Value can be called to obtain
-  /// the new value for this row. If ChangeType is DeleteChange, then the row was
-  /// removed from the collection.
+  /// Row is the key of the changed row. Nil if EntityType is not Row.
+  public let row: String?
+
+  /// ChangeType describes the type of the change, depending on the EntityType:
+  ///
+  /// **EntityRow:**
+  ///
+  /// - PutChange: the row exists in the collection, and Value can be called to
+  /// obtain the new value for this row.
+  ///
+  /// - DeleteChange: the row was removed from the collection.
+  ///
+  /// **EntityCollection:**
+  ///
+  /// - PutChange: the collection exists, and CollectionInfo can be called to
+  /// obtain the collection info.
+  ///
+  /// - DeleteChange: the collection was destroyed.
+  ///
+  /// **EntityRoot:**
+  ///
+  /// - PutChange: appears as the first (possibly only) change in the initial
+  /// state batch, only if watching from an empty ResumeMarker. This is the
+  /// only situation where an EntityRoot appears.
   public let changeType: ChangeType
 
-  /// value is the new value for the row if the ChangeType is PutChange, or nil
+  /// value is the new value for the row for EntityRow PutChanges, an encoded
+  /// StoreChangeCollectionInfo value for EntityCollection PutChanges, or nil
   /// otherwise.
   public let value: NSData?
 
@@ -55,7 +84,7 @@
   /// This marker can be provided in the Request message to allow the caller
   /// to resume the stream watching at a specific point without fetching the
   /// initial state.
-  public let resumeMarker: ResumeMarker
+  public let resumeMarker: ResumeMarker?
 
   /// FromSync indicates whether the change came from sync. If FromSync is false,
   /// then the change originated from the local device.
@@ -79,12 +108,12 @@
     // for more information.
     let condition = NSCondition()
     var data: WatchChange? = nil
-    var streamErr: ErrorType? = nil
+    var streamErr: SyncbaseError? = nil
     var updateAvailable = false
 
     // The anonymous function that gets called from the Swift. It blocks until there's an update
     // available from Go.
-    func fetchNext(timeout: NSTimeInterval?) -> (WatchChange?, ErrorType?) {
+    func fetchNext(timeout: NSTimeInterval?) -> (WatchChange?, SyncbaseError?) {
       condition.lock()
       while !updateAvailable {
         if let timeout = timeout {
@@ -126,7 +155,7 @@
 
     // The callback from Go when there's been an error in the watch stream. The stream will then
     // be closed and no new changes will ever come in from this request.
-    func onError(err: ErrorType) {
+    func onError(err: SyncbaseError) {
       condition.lock()
       // Wait until any existing update has been received by the fetch so we don't just blow
       // past it.
@@ -178,7 +207,10 @@
   }
 
   private static func onWatchError(handle: v23_syncbase_Handle, err: v23_syncbase_VError) {
-    let e: ErrorType = err.toVError() ?? SyncbaseError.InvalidOperation(reason: "A watch error occurred")
+    var e = SyncbaseError.InvalidOperation(reason: "A watch error occurred")
+    if let verr: VError = err.toVError() {
+      e = SyncbaseError(verr)
+    }
     let handle = Unmanaged<Watch.Handle>.fromOpaque(COpaquePointer(handle)).takeRetainedValue()
     handle.onError(e)
   }
diff --git a/SyncbaseCore/Source/util/Types.swift b/SyncbaseCore/Source/util/Types.swift
index 110b488..a505974 100644
--- a/SyncbaseCore/Source/util/Types.swift
+++ b/SyncbaseCore/Source/util/Types.swift
@@ -103,6 +103,12 @@
   }
 }
 
+public extension v23_syncbase_EntityType {
+  func toEntityType() -> WatchChange.EntityType? {
+    return WatchChange.EntityType(rawValue: Int(self.rawValue))
+  }
+}
+
 public extension v23_syncbase_Id {
   init(_ id: Identifier) throws {
     self.name = try id.name.toCgoString()
@@ -328,12 +334,29 @@
   func toWatchChange() -> WatchChange {
     let resumeMarkerData = v23_syncbase_Bytes(
       p: unsafeBitCast(self.resumeMarker.p, UnsafeMutablePointer<UInt8>.self),
-      n: self.resumeMarker.n).toNSData()!
+      n: self.resumeMarker.n).toNSData()
+    // Turn row & collectionId zero-values into nil.
+    var row = self.row.toString()
+    if row == "" {
+      row = nil
+    }
+    var collectionId = self.collection.toIdentifier()
+    if collectionId?.name == "" && collectionId?.blessing == "" {
+      collectionId = nil
+    }
+    // Zero-valued Value does not get turned into a nil on put -- if it's a put then we know
+    // it cannot be nil. This allows us to store empty arrays (an esoteric use case but one that
+    // is supported nevertheless).
+    var value = self.value.toNSData()
+    if value == nil && self.changeType == v23_syncbase_ChangeTypePut {
+      value = NSData()
+    }
     return WatchChange(
-      collectionId: self.collection.toIdentifier()!,
-      row: self.row.toString()!,
+      entityType: self.entityType.toEntityType()!,
+      collectionId: collectionId,
+      row: row,
       changeType: self.changeType.toChangeType()!,
-      value: self.value.toNSData(),
+      value: value,
       resumeMarker: resumeMarkerData,
       isFromSync: self.fromSync,
       isContinued: self.continued)
@@ -364,11 +387,7 @@
     var e = v23_syncbase_VError()
     let res = try f(&e)
     if let err = e.toVError() {
-      // We might be able to convert this VError into a SyncbaseError depending on the ID.
-      if let syncbaseError = SyncbaseError(err) {
-        throw syncbaseError
-      }
-      throw err
+      throw SyncbaseError(err)
     }
     return res
   }
diff --git a/SyncbaseCore/Tests/BasicDatabaseTests.swift b/SyncbaseCore/Tests/BasicDatabaseTests.swift
index 7523c3c..b739bf9 100644
--- a/SyncbaseCore/Tests/BasicDatabaseTests.swift
+++ b/SyncbaseCore/Tests/BasicDatabaseTests.swift
@@ -83,6 +83,7 @@
     withTestCollection { db, collection in
       let key = "testrow"
       try collection.delete(key)
+      XCTAssertFalse(try collection.exists(key))
       try BasicDatabaseTests.testGetPutRow(collection, key: key, targetValue: NSData())
       try BasicDatabaseTests.testGetPutRow(collection, key: key, targetValue: NSJSONSerialization.hackSerializeAnyObject(false))
       try BasicDatabaseTests.testGetPutRow(collection, key: key, targetValue: NSJSONSerialization.hackSerializeAnyObject(M_PI))
@@ -331,23 +332,19 @@
   // MARK: Test watch
 
   func testWatchTimeout() {
-    let completed = expectationWithDescription("Completed watch timeout")
-    withTestDbAsync { (db, cleanup) in
-      self.withTestCollection(db) { db, collection in
-        try collection.put("a", value: NSData())
-        let stream = try db.watch([CollectionRowPattern(
-          collectionName: collection.collectionId.name,
-          collectionBlessing: collection.collectionId.blessing,
-          rowKey: nil)])
-        dispatch_async(dispatch_get_global_queue(QOS_CLASS_BACKGROUND, 0)) {
-          XCTAssertNil(stream.next(timeout: 0.1))
-          XCTAssertNil(stream.err())
-          cleanup()
-          completed.fulfill()
-        }
+    withTestCollection { (db, collection) in
+      try collection.put("a", value: NSData())
+      let stream = try db.watch([CollectionRowPattern(
+        collectionName: collection.collectionId.name,
+        collectionBlessing: collection.collectionId.blessing,
+        rowKey: nil)])
+      if !self.consumeInitialState(stream) {
+        XCTFail("Initial stream died")
+      } else {
+        XCTAssertNil(stream.next(timeout: 0.1))
+        XCTAssertNil(stream.err())
       }
     }
-    waitForExpectationsWithTimeout(2) { XCTAssertNil($0) }
   }
 
   func testWatchPut() {
@@ -362,42 +359,41 @@
       let stream = try db.watch([CollectionRowPattern(
         collectionName: collection.collectionId.name,
         collectionBlessing: collection.collectionId.blessing,
-        rowKey: nil)])
-      let semaphore = dispatch_semaphore_create(0)
+        rowKey: "%")])
+      // Skip all the initial changes
+      if !self.consumeInitialState(stream) {
+        cleanup()
+        XCTFail("Initial stream died")
+        completed.fulfill()
+        return
+      }
       dispatch_async(dispatch_get_global_queue(QOS_CLASS_BACKGROUND, 0)) {
-        dispatch_semaphore_signal(semaphore)
         // Watch for changes in bg thread.
-        for tup in data {
-          let (key, value) = tup
+        for (key, value) in data {
           guard let change = stream.next(timeout: 1) else {
             cleanup()
             XCTFail("Missing put change")
             completed.fulfill()
             return
           }
-          XCTAssertNotNil(change)
           XCTAssertNil(stream.err())
           XCTAssertEqual(change.changeType, WatchChange.ChangeType.Put)
-          XCTAssertEqual(change.collectionId.blessing, collection.collectionId.blessing)
-          XCTAssertEqual(change.collectionId.name, collection.collectionId.name)
+          XCTAssertEqual(change.collectionId!.blessing, collection.collectionId.blessing)
+          XCTAssertEqual(change.collectionId!.name, collection.collectionId.name)
           XCTAssertFalse(change.isContinued)
           XCTAssertFalse(change.isFromSync)
-          XCTAssertGreaterThan(change.resumeMarker.length, 0)
+          XCTAssertGreaterThan(change.resumeMarker!.length, 0)
           XCTAssertEqual(change.row, key)
-          XCTAssertEqual(change.value, value)
+          XCTAssertEqual(change.value!, value)
         }
         cleanup()
         completed.fulfill()
       }
-
-      // Wait for background thread to start.
-      dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER)
-
       // Add data.
       do {
-        NSThread.sleepForTimeInterval(0.05) // Wait until the stream.next is called and blocking
-        for tup in data {
-          try collection.put(tup.0, value: tup.1)
+        for (key, value) in data {
+          try collection.put(key, value: value)
+          XCTAssertTrue(try! collection.exists(key))
         }
       } catch let e {
         XCTFail("Unexpected error: \(e)")
@@ -415,32 +411,40 @@
     withTestDbAsync { (db, cleanup) in
       let collection = try db.collection(Identifier(name: "collectionWatchDelete", blessing: anyPermissions))
       try collection.create(anyCollectionPermissions)
-      for tup in data {
-        try collection.put(tup.0, value: tup.1)
+      for (key, value) in data {
+        try collection.put(key, value: value)
       }
       let stream = try db.watch([CollectionRowPattern(
         collectionName: collection.collectionId.name,
         collectionBlessing: collection.collectionId.blessing,
-        rowKey: nil)])
-      let semaphore = dispatch_semaphore_create(0)
+        rowKey: "%")])
+      // Skip all the initial changes.
+      if !self.consumeInitialState(stream) {
+        cleanup()
+        XCTFail("Initial stream died")
+        completed.fulfill()
+        return
+      }
       dispatch_async(dispatch_get_global_queue(QOS_CLASS_BACKGROUND, 0)) {
-        dispatch_semaphore_signal(semaphore)
-        // Watch for changes in bg thread.
-        for tup in data {
-          let (key, _) = tup
-          guard let change = stream.next(timeout: 1) else {
+        // Watch for put changes.
+        for (key, _) in data {
+          guard let change = stream.next(timeout: 2) else {
             cleanup()
-            XCTFail("Missing delete change")
+            if stream.err() == nil {
+              XCTFail("Timed out")
+            } else {
+              XCTFail("Missing delete change before end of stream: \(stream.err())")
+            }
             completed.fulfill()
             return
           }
           XCTAssertNil(stream.err())
           XCTAssertEqual(change.changeType, WatchChange.ChangeType.Delete)
-          XCTAssertEqual(change.collectionId.blessing, collection.collectionId.blessing)
-          XCTAssertEqual(change.collectionId.name, collection.collectionId.name)
+          XCTAssertEqual(change.collectionId!.blessing, collection.collectionId.blessing)
+          XCTAssertEqual(change.collectionId!.name, collection.collectionId.name)
           XCTAssertFalse(change.isContinued)
           XCTAssertFalse(change.isFromSync)
-          XCTAssertGreaterThan(change.resumeMarker.length, 0)
+          XCTAssertGreaterThan(change.resumeMarker!.length, 0)
           XCTAssertEqual(change.row, key)
           XCTAssertNil(change.value)
         }
@@ -448,14 +452,10 @@
         completed.fulfill()
       }
 
-      // Wait for background thread to start.
-      dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER)
-
       do {
-        NSThread.sleepForTimeInterval(0.05) // Wait until the stream.next is called and blocking
         // Delete rows.
-        for tup in data {
-          try collection.delete(tup.0)
+        for (key, _) in data {
+          try collection.delete(key)
         }
       } catch let e {
         XCTFail("Unexpected error: \(e)")
@@ -464,6 +464,18 @@
     waitForExpectationsWithTimeout(2) { XCTAssertNil($0) }
   }
 
+  func consumeInitialState(stream: WatchStream) -> Bool {
+    // Get all the initial changes.
+    while true {
+      guard let change = stream.next(timeout: 1) else {
+        return false
+      }
+      if !change.isContinued {
+        return true
+      }
+    }
+  }
+
   func testWatchError() {
     var stream: WatchStream? = nil
     withTestDb { db in
@@ -474,13 +486,28 @@
         collectionName: collection.collectionId.name,
         collectionBlessing: collection.collectionId.blessing,
         rowKey: nil)])
+      // Skip all the initial changes.
+      if !self.consumeInitialState(stream!) {
+        XCTFail("Initial stream died")
+        return
+      }
       try collection.destroy()
+      let change = stream!.next(timeout: 1)
+      XCTAssertNotNil(change)
+      XCTAssert(change?.changeType == .Delete)
+      XCTAssert(change?.entityType == .Collection)
     }
     let change = stream!.next(timeout: 1)
-    print("Got watch change: \(stream!.err())")
     XCTAssertNil(change)
-    XCTAssertNotNil(stream!.err())
-    let verr = stream!.err() as! VError
-    XCTAssertTrue(verr.id.hasPrefix("v.io/v23/verror"))
+    guard let err = stream!.err() else {
+      XCTFail("Missing error: \(stream!.err())")
+      return
+    }
+    switch err {
+    case SyncbaseError.UnknownVError(let verr):
+      XCTAssertTrue(verr.id.hasPrefix("v.io/v23/verror"))
+    default:
+      XCTFail("Wrong kind of error: \(err)")
+    }
   }
 }