swift: Add support for the userdata syncgroup

This CL also adds support passing a CollectionRowPattern to HLAPI watch,
sets defaults for SyncgroupMemberInfo, refactors the HLAPI unit tests
slightly, and fixes a bug with converting permissions that have null
values.

Change-Id: I188d66a1a8079edb86ce725afdc3ae209f5f5095
diff --git a/Syncbase/Source/Database.swift b/Syncbase/Source/Database.swift
index c09148e..2be7690 100644
--- a/Syncbase/Source/Database.swift
+++ b/Syncbase/Source/Database.swift
@@ -163,10 +163,12 @@
       }
     }
     return try SyncbaseError.wrap {
-      let res = Syncgroup(
-        coreSyncgroup: self.coreDatabase.syncgroup(id.toCore()),
-        database: self)
+      let res = Syncgroup(coreSyncgroup: self.coreDatabase.syncgroup(id.toCore()), database: self)
       try res.createIfMissing(collections)
+      // Remember this syncgroup in the userdata collection. The value doesn't matter, so we use
+      // empty data.
+      // Note: We may eventually want to use the value to deal with rejected invitations.
+      try Syncbase.userDataCollection!.put(try id.encode(), value: NSData())
       return res
     }
   }
@@ -184,7 +186,7 @@
   /// Returns all syncgroups in the database.
   public func syncgroups() throws -> [Syncgroup] {
     return try SyncbaseError.wrap {
-      let coreIds = try self.coreDatabase.listSyncgroups()
+      let coreIds = try self.coreDatabase.listSyncgroups().filter({ return $0.name != Syncbase.USERDATA_SYNCGROUP_NAME })
       return coreIds.map { coreId in
         return Syncgroup(coreSyncgroup: self.coreDatabase.syncgroup(coreId), database: self)
       }
@@ -303,79 +305,80 @@
   /// handler is canceled by `removeWatchChangeHandler` then no subsequent calls will be made. Note
   /// that there may be WatchChanges queued up for a `OnChangeBatch` that will be ignored.
   /// Callbacks to `handler` occur on Syncbase.queue, which defaults to main.
-  public func addWatchChangeHandler(resumeMarker: ResumeMarker? = nil, handler: WatchChangeHandler) throws {
-    // Note: Eventually we'll add a watch variant that takes a query, where the query can be
-    // constructed using some sort of query builder API.
-    // TODO(sadovsky): Support specifying resumeMarker. Note, watch-from-resumeMarker may be
-    // problematic in that we don't track the governing ACL for changes in the watch log.
-    if let resumeMarker = resumeMarker where resumeMarker.length != 0 {
-      throw SyncbaseError.IllegalArgument(detail: "Specifying resumeMarker is not yet supported")
-    }
-    return try SyncbaseError.wrap {
-      let stream = try self.coreDatabase.watch(
-        [CollectionRowPattern(collectionName: "%", collectionBlessing: "%", rowKey: "%")],
-        resumeMarker: resumeMarker)
-      // Create a serial queue to immediately run this watch operation on via SyncbaseCore's
-      // blocking stream.
-      let watchQueue = dispatch_queue_create(
-        "Syncbase WatchChangeHandler \(handler.uniqueId)",
-        DISPATCH_QUEUE_SERIAL)
-      var isCanceled = false
-      Database.watchChangeHandlersMu.lock()
-      defer { Database.watchChangeHandlersMu.unlock() }
-      Database.watchChangeHandlers[handler] = HandlerOperation(
-        databaseId: self.coreDatabase.databaseId,
-        queue: watchQueue,
-        cancel: {
-          isCanceled = true
-          stream.cancel()
-      })
-      dispatch_async(watchQueue) {
-        var gotFirstBatch = false
-        var batch = [WatchChange]()
-        for coreChange in stream {
-          if isCanceled {
-            break
-          }
-          let change = WatchChange(coreChange: coreChange)
-          // Ignore changes to userdata collection.
-          if (change.collectionId?.name != Syncbase.USERDATA_SYNCGROUP_NAME) {
-            batch.append(change)
-          }
-          if (!change.isContinued) {
-            if (!gotFirstBatch) {
-              gotFirstBatch = true
-              // We synchronously run on Syncbase.queue to facilitate flow control. Go blocks
-              // until each callback is consumed before it calls with another WatchChange event.
-              // Backpressure in Swift is achieved by blocking until the WatchChange event is
-              // consumed by the app on Syncbase.queue using dispatch_sync. If we used
-              // dispatch_async, we could potentially queue up events faster than the
-              // ability for the app to consume them. By using dispatch_sync we also help the user
-              // mitigate against out-of-order events should Syncbase.queue be a concurrent queue
-              // rather than a serial queue.
-              dispatch_sync(Syncbase.queue, {
-                handler.onInitialState(batch)
-              })
-            } else {
-              dispatch_sync(Syncbase.queue, {
-                handler.onChangeBatch(batch)
-              })
-            }
-            batch.removeAll()
-          }
-        }
-        // Notify of error if we're permitted (not canceled).
-        if let err = stream.err() where !isCanceled {
-          dispatch_sync(Syncbase.queue, {
-            handler.onError(err)
-          })
-        }
-        // Cleanup
+  public func addWatchChangeHandler(
+    pattern pattern: CollectionRowPattern = CollectionRowPattern.Everything,
+    resumeMarker: ResumeMarker? = nil,
+    handler: WatchChangeHandler) throws {
+      // Note: Eventually we'll add a watch variant that takes a query, where the query can be
+      // constructed using some sort of query builder API.
+      // TODO(sadovsky): Support specifying resumeMarker. Note, watch-from-resumeMarker may be
+      // problematic in that we don't track the governing ACL for changes in the watch log.
+      if let resumeMarker = resumeMarker where resumeMarker.length != 0 {
+        throw SyncbaseError.IllegalArgument(detail: "Specifying resumeMarker is not yet supported")
+      }
+      return try SyncbaseError.wrap {
+        let stream = try self.coreDatabase.watch([pattern.toCore()], resumeMarker: resumeMarker)
+        // Create a serial queue to immediately run this watch operation on via SyncbaseCore's
+        // blocking stream.
+        let watchQueue = dispatch_queue_create(
+          "Syncbase WatchChangeHandler \(handler.uniqueId)",
+          DISPATCH_QUEUE_SERIAL)
+        var isCanceled = false
         Database.watchChangeHandlersMu.lock()
         defer { Database.watchChangeHandlersMu.unlock() }
-        Database.watchChangeHandlers[handler] = nil
+        Database.watchChangeHandlers[handler] = HandlerOperation(
+          databaseId: self.coreDatabase.databaseId,
+          queue: watchQueue,
+          cancel: {
+            isCanceled = true
+            stream.cancel()
+        })
+        dispatch_async(watchQueue) {
+          var gotFirstBatch = false
+          var batch = [WatchChange]()
+          for coreChange in stream {
+            if isCanceled {
+              break
+            }
+            let change = WatchChange(coreChange: coreChange)
+            // Ignore changes to userdata collection.
+            if (change.collectionId?.name != Syncbase.USERDATA_SYNCGROUP_NAME) {
+              batch.append(change)
+            }
+            if (!change.isContinued) {
+              if (!gotFirstBatch) {
+                gotFirstBatch = true
+                // We synchronously run on Syncbase.queue to facilitate flow control. Go blocks
+                // until each callback is consumed before it calls with another WatchChange event.
+                // Backpressure in Swift is achieved by blocking until the WatchChange event is
+                // consumed by the app on Syncbase.queue using dispatch_sync. If we used
+                // dispatch_async, we could potentially queue up events faster than the
+                // ability for the app to consume them. By using dispatch_sync we also help the user
+                // mitigate against out-of-order events should Syncbase.queue be a concurrent queue
+                // rather than a serial queue.
+                dispatch_sync(Syncbase.queue, {
+                  handler.onInitialState(batch)
+                })
+              } else {
+                dispatch_sync(Syncbase.queue, {
+                  handler.onChangeBatch(batch)
+                })
+              }
+              batch.removeAll()
+            }
+          }
+          // Notify of error if we're permitted (not canceled).
+          if let err = stream.err() where !isCanceled {
+            dispatch_sync(Syncbase.queue, {
+              handler.onError(err)
+            })
+          }
+          // Cleanup
+          Database.watchChangeHandlersMu.lock()
+          defer { Database.watchChangeHandlersMu.unlock() }
+          Database.watchChangeHandlers[handler] = nil
+        }
       }
-    }
   }
 
   /// Makes it so `handler` stops receiving notifications. Note there may be queued WatchChanges
diff --git a/Syncbase/Source/Syncbase.swift b/Syncbase/Source/Syncbase.swift
index bcb9898..1058b0b 100644
--- a/Syncbase/Source/Syncbase.swift
+++ b/Syncbase/Source/Syncbase.swift
@@ -16,8 +16,11 @@
   // Initialization state
   static var didInit = false
   static var didPostLogin = false
+  static var didStartShutdown = false
   // Main database.
   static var db: Database?
+  // The userData collection, created post-login.
+  static var userDataCollection: Collection?
   // Options for opening a database.
   static var adminUserId = "alexfandrianto@google.com"
   static var defaultBlessingStringPrefix = "dev.v.io:o:608941808256-43vtfndets79kf5hac8ieujto8837660.apps.googleusercontent.com:"
@@ -91,6 +94,7 @@
       Syncbase.disableSyncgroupPublishing = disableSyncgroupPublishing
       Syncbase.disableUserdataSyncgroup = disableUserdataSyncgroup
       Syncbase.didPostLogin = false
+      Syncbase.didStartShutdown = false
       // We don't need to set Syncbase.queue as it is a proxy for SyncbaseCore's queue, which is
       // set in the configure below.
       try SyncbaseError.wrap {
@@ -105,27 +109,64 @@
           // If we get an exception after configuring the low-level API, make sure we shutdown
           // Syncbase so that any subsequent call to this configure method doesn't get a
           // SyncbaseError.AlreadyConfigured exception from SyncbaseCore.Syncbase.configure.
-          SyncbaseCore.Syncbase.shutdown()
+          Syncbase.shutdown()
           throw e
         }
       }
       Syncbase.didInit = true
   }
 
+  /// Shuts down the Syncbase service. You must call configure again before any calls will work.
+  public static func shutdown() {
+    Syncbase.didStartShutdown = true
+    SyncbaseCore.Syncbase.shutdown()
+    Syncbase.didInit = false
+    Syncbase.didPostLogin = false
+  }
+
   private static func postLoginCreateDefaults() throws {
     let coreDb = try SyncbaseCore.Syncbase.database(Syncbase.DB_NAME)
     let database = Database(coreDatabase: coreDb)
     try database.createIfMissing()
-    if (Syncbase.disableUserdataSyncgroup) {
-      try database.collection(Syncbase.USERDATA_SYNCGROUP_NAME, withoutSyncgroup: true)
-    } else {
-      // FIXME(zinman): Implement create-or-join (and watch) of userdata syncgroup.
-      throw SyncbaseError.IllegalArgument(detail: "Synced userdata collection is not yet supported")
+    userDataCollection = try database.collection(Syncbase.USERDATA_SYNCGROUP_NAME, withoutSyncgroup: true)
+    if (!Syncbase.disableUserdataSyncgroup) {
+      let syncgroup = try userDataCollection!.syncgroup()
+      do {
+        try syncgroup.join()
+      } catch {
+        try syncgroup.createIfMissing([userDataCollection!])
+      }
+      // TODO(zinman): Figure out when/how this can throw and if we should handle it better.
+      try database.addWatchChangeHandler(
+        pattern: CollectionRowPattern(collectionName: Syncbase.USERDATA_SYNCGROUP_NAME),
+        handler: WatchChangeHandler(
+          onInitialState: onUserDataWatchChange,
+          onChangeBatch: onUserDataWatchChange,
+          onError: { err in
+            if !Syncbase.didStartShutdown {
+              NSLog("Syncbase - Error watching userdata syncgroups: %@", "\(err)")
+            }
+        }))
     }
     Syncbase.db = database
     Syncbase.didPostLogin = true
   }
 
+  private static func onUserDataWatchChange(changes: [WatchChange]) {
+    for change in changes {
+      guard let row = change.row where change.entityType == .Row && change.changeType == .Put else {
+        continue
+      }
+      do {
+        let syncgroupId = try Identifier.decode(row)
+        let syncgroup = try Syncbase.database().syncgroup(syncgroupId)
+        try syncgroup.join()
+      } catch let e {
+        NSLog("Syncbase - Error joining syncgroup: %@", "\(e)")
+      }
+    }
+  }
+
   /// Returns the shared database handle. Must have already called `configure` and be logged in,
   /// otherwise this will throw a `SyncbaseError.NotConfigured` or `SyncbaseError.NotLoggedIn`
   /// error.
diff --git a/Syncbase/Source/Syncgroup.swift b/Syncbase/Source/Syncgroup.swift
index e27065e..773f425 100644
--- a/Syncbase/Source/Syncgroup.swift
+++ b/Syncbase/Source/Syncgroup.swift
@@ -13,7 +13,7 @@
 
   static var syncgroupMemberInfo: SyncgroupMemberInfo {
     // TODO(zinman): Validate these are correct.
-    return SyncgroupMemberInfo(syncPriority: UInt8(3), blobDevType: BlobDevType.BlobDevTypeLeaf)
+    return SyncgroupMemberInfo(syncPriority: UInt8(3), blobDevType: BlobDevType.Leaf)
   }
 
   func createIfMissing(collections: [Collection]) throws {
@@ -46,6 +46,12 @@
     return Identifier(coreId: coreSyncgroup.syncgroupId)
   }
 
+  func join() throws {
+    // TODO(razvanm): Find a way to restrict the remote blessing. Cloud is one thing the remote
+    // blessings should include.
+    try coreSyncgroup.join("", expectedSyncbaseBlessings: ["..."], myInfo: Syncgroup.syncgroupMemberInfo)
+  }
+
   /// Returns the `AccessList` for this syncgroup.
   public func accessList() throws -> AccessList {
     return try AccessList(perms: try coreSyncgroup.getSpec().spec.permissions)
diff --git a/Syncbase/Source/WatchChange.swift b/Syncbase/Source/WatchChange.swift
index afbbccd..1f07c31 100644
--- a/Syncbase/Source/WatchChange.swift
+++ b/Syncbase/Source/WatchChange.swift
@@ -5,6 +5,48 @@
 import Foundation
 import SyncbaseCore
 
+/// CollectionRowPattern contains SQL LIKE-style glob patterns ('%' and '_'
+/// wildcards, '\' as escape character) for matching rows and collections by
+/// name components. It is used by the Database.watch API.
+public struct CollectionRowPattern {
+  /// WildCard matches everything.
+  public static var Everything = CollectionRowPattern(
+    collectionName: Wildcard, collectionBlessing: Wildcard, rowKey: Wildcard)
+
+  public static var Wildcard = "%"
+
+  /// collectionName is a SQL LIKE-style glob pattern ('%' and '_' wildcards, '\' as escape
+  /// character) for matching collections. May not be empty.
+  public let collectionName: String
+
+  /// The blessing for collections.
+  public let collectionBlessing: String
+
+  /// rowKey is a SQL LIKE-style glob pattern ('%' and '_' wildcards, '\' as escape character)
+  /// for matching rows. If empty then only the collectionId pattern is matched and NO row events
+  /// are returned.
+  public let rowKey: String?
+
+  public init(collectionName: String = Wildcard, collectionBlessing: String = Wildcard, rowKey: String? = Wildcard) {
+    self.collectionName = collectionName
+    self.collectionBlessing = collectionBlessing
+    self.rowKey = rowKey
+  }
+
+  public init(collectionId: Identifier, rowKey: String? = Wildcard) {
+    self.collectionName = collectionId.name
+    self.collectionBlessing = collectionId.blessing
+    self.rowKey = rowKey
+  }
+
+  func toCore() -> SyncbaseCore.CollectionRowPattern {
+    return SyncbaseCore.CollectionRowPattern(
+      collectionName: collectionName,
+      collectionBlessing: collectionBlessing,
+      rowKey: rowKey)
+  }
+}
+
 /// ResumeMarker provides a compact representation of all the messages that
 /// have been received by the caller for the given Watch call. It is not
 /// something that you would ever generate; it is always provided to you
diff --git a/Syncbase/Tests/BasicDatabaseTests.swift b/Syncbase/Tests/BasicDatabaseTests.swift
index 2a3f191..bc8f8d4 100644
--- a/Syncbase/Tests/BasicDatabaseTests.swift
+++ b/Syncbase/Tests/BasicDatabaseTests.swift
@@ -11,32 +11,15 @@
 
 class BasicDatabaseTests: XCTestCase {
   override class func setUp() {
-    SyncbaseCore.Syncbase.isUnitTest = true
-    let rootDir = NSFileManager.defaultManager()
-      .URLsForDirectory(.ApplicationSupportDirectory, inDomains: .UserDomainMask)[0]
-      .URLByAppendingPathComponent("SyncbaseUnitTest")
-      .path!
-    // TODO(zinman): Once we have create-and-join implemented don't always set
-    // disableUserdataSyncgroup to true.
-    try! Syncbase.configure(
-      adminUserId: "unittest@google.com",
-      rootDir: rootDir,
-      disableUserdataSyncgroup: true,
-      queue: testQueue)
-    let semaphore = dispatch_semaphore_create(0)
-    Syncbase.login(GoogleOAuthCredentials(token: ""), callback: { err in
-      XCTAssertNil(err)
-      dispatch_semaphore_signal(semaphore)
-    })
-    dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER)
+    configureDb(disableUserdataSyncgroup: true, disableSyncgroupPublishing: true)
   }
 
   override class func tearDown() {
-    SyncbaseCore.Syncbase.shutdown()
+    Syncbase.shutdown()
   }
 
   func testDatabaseInit() {
-    asyncDbTest() { db in
+    withDb { db in
       // Must be idempotent.
       try db.createIfMissing()
       try db.createIfMissing()
@@ -44,7 +27,7 @@
   }
 
   func testCollection() {
-    asyncDbTest() { db in
+    withDb { db in
       var collections = try db.collections()
       XCTAssertEqual(collections.count, 0)
 
@@ -65,3 +48,31 @@
 
   // TODO(zinman): Add more unit tests.
 }
+
+class SyncgroupTests: XCTestCase {
+  override class func setUp() {
+    configureDb(disableUserdataSyncgroup: false, disableSyncgroupPublishing: true)
+  }
+
+  override class func tearDown() {
+    Syncbase.shutdown()
+  }
+
+  func testUserdata() {
+    withDb { db in
+      let coreCollections = try db.coreDatabase.listCollections()
+      XCTAssertEqual(coreCollections.count, 1)
+      XCTAssertEqual(coreCollections[0].name, Syncbase.USERDATA_SYNCGROUP_NAME)
+
+      let coreSyncgroups = try db.coreDatabase.listSyncgroups()
+      XCTAssertEqual(coreSyncgroups.count, 1)
+      XCTAssertEqual(coreSyncgroups[0].name, Syncbase.USERDATA_SYNCGROUP_NAME)
+
+      let verSpec = try db.coreDatabase.syncgroup(Syncbase.USERDATA_SYNCGROUP_NAME).getSpec()
+      XCTAssertEqual(verSpec.spec.collections.count, 1)
+
+      // TODO(razvanm): Make the userdata syncgroup private.
+      XCTAssertEqual(verSpec.spec.isPrivate, false)
+    }
+  }
+}
diff --git a/Syncbase/Tests/TestHelpers.swift b/Syncbase/Tests/TestHelpers.swift
index bfc3e33..16cc823 100644
--- a/Syncbase/Tests/TestHelpers.swift
+++ b/Syncbase/Tests/TestHelpers.swift
@@ -3,13 +3,45 @@
 // license that can be found in the LICENSE file.
 
 import XCTest
-import Syncbase
+@testable import Syncbase
+import enum Syncbase.Syncbase
+import class Syncbase.Database
+@testable import SyncbaseCore
+
+let unitTestRootDir = NSFileManager.defaultManager()
+  .URLsForDirectory(.ApplicationSupportDirectory, inDomains: .UserDomainMask)[0]
+  .URLByAppendingPathComponent("SyncbaseUnitTest")
+  .path!
 
 extension XCTestCase {
-  func asyncDbTest(runBlock: Database throws -> Void) {
+  class func configureDb(disableUserdataSyncgroup disableUserdataSyncgroup: Bool, disableSyncgroupPublishing: Bool) {
+    SyncbaseCore.Syncbase.isUnitTest = true
+
+    try! NSFileManager.defaultManager().removeItemAtPath(unitTestRootDir)
+
+    // TODO(zinman): Once we have create-and-join implemented don't always set
+    // disableUserdataSyncgroup to true.
+    try! Syncbase.configure(
+      adminUserId: "unittest@google.com",
+      rootDir: unitTestRootDir,
+      disableUserdataSyncgroup: disableUserdataSyncgroup,
+      disableSyncgroupPublishing: disableSyncgroupPublishing,
+      queue: testQueue)
+    let semaphore = dispatch_semaphore_create(0)
+    Syncbase.login(GoogleOAuthCredentials(token: ""), callback: { err in
+      XCTAssertNil(err)
+      dispatch_semaphore_signal(semaphore)
+    })
+    dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER)
+  }
+
+  func withDb(runBlock: Database throws -> Void) {
     do {
       let db = try Syncbase.database()
       try runBlock(db)
+      try db.collections().forEach { try $0.destroy() }
+      // TODO(zinman): Re-enable when supported in Syncbase
+//      try db.syncgroups().forEach { try $0.coreSyncgroup.destroy() }
     } catch let e {
       XCTFail("Unexpected error: \(e)")
     }
diff --git a/SyncbaseCore/Source/Blob.swift b/SyncbaseCore/Source/Blob.swift
index 1176866..cd9102b 100644
--- a/SyncbaseCore/Source/Blob.swift
+++ b/SyncbaseCore/Source/Blob.swift
@@ -9,11 +9,11 @@
 
 public enum BlobDevType: Int {
   /// Blobs migrate toward servers, which store them.  (example: server in cloud)
-  case BlobDevTypeServer = 0
+  case Server = 0
   /// Ordinary devices (example: laptop)
-  case BlobDevTypeNormal = 1
+  case Normal = 1
   /// Blobs migrate from leaves, which have less storage (examples: a camera, phone)
-  case BlobDevTypeLeaf = 2
+  case Leaf = 2
 }
 
 let kNullBlobRef = BlobRef("")
diff --git a/SyncbaseCore/Source/Permissions.swift b/SyncbaseCore/Source/Permissions.swift
index 9ccb892..fa791c4 100644
--- a/SyncbaseCore/Source/Permissions.swift
+++ b/SyncbaseCore/Source/Permissions.swift
@@ -62,9 +62,22 @@
   }
 
   static func fromJsonable(jsonable: [String: AnyObject]) -> AccessList? {
-    guard let castIn = jsonable["In"] as? [String],
-      castNotIn = jsonable["NotIn"] as? [String] else {
+    // We do this funky casting structure because we want to differentiate between a map element
+    // where it's value is null (legal and happens), and non-nils where we want to return nil
+    // if it's anything other than a string array.
+    var castIn = [String]()
+    var castNotIn = [String]()
+    if let allowed = jsonable["In"] {
+      guard let allowedStr = allowed as? [String] else {
         return nil
+      }
+      castIn = allowedStr
+    }
+    if let notAllowed = jsonable["In"] {
+      guard let notAllowedStr = notAllowed as? [String] else {
+        return nil
+      }
+      castNotIn = notAllowedStr
     }
     return AccessList(
       allowed: castIn as [BlessingPattern],
diff --git a/SyncbaseCore/Source/Syncgroup.swift b/SyncbaseCore/Source/Syncgroup.swift
index 4604f87..928187c 100644
--- a/SyncbaseCore/Source/Syncgroup.swift
+++ b/SyncbaseCore/Source/Syncgroup.swift
@@ -170,7 +170,7 @@
   public let syncPriority: UInt8
   public let blobDevType: BlobDevType /// See BlobDevType* constants.
 
-  public init(syncPriority: UInt8, blobDevType: BlobDevType) {
+  public init(syncPriority: UInt8 = 0, blobDevType: BlobDevType = .Leaf) {
     self.syncPriority = syncPriority
     self.blobDevType = blobDevType
   }