// Copyright 2016 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
import Foundation
import SyncbaseCore
// Counter to allow SyncgroupInviteHandler and WatchChangeHandler structs to be unique and hashable.
// Both handler types share this single counter: each new handler takes the next value through
// OSAtomicIncrement32 and stores it as its `uniqueId`, which backs `hashValue` and `==`.
var uniqueIdCounter: Int32 = 0
/// Handles discovered syncgroup invites.
///
/// Closures cannot be compared, so every handler is stamped with the next value of the shared
/// `uniqueIdCounter` when it is created. That id alone drives `hashValue` and `==`, which lets a
/// handler be used as a dictionary key and be handed back later to the removal APIs. Copies of
/// one handler carry the same id and therefore compare equal.
public struct SyncgroupInviteHandler: Hashable, Equatable {
  /// Called when a syncgroup invitation is discovered. Clients typically handle invites by
  /// calling `acceptSyncgroupInvite` or `ignoreSyncgroupInvite`.
  public let onInvite: SyncgroupInvite -> Void

  /// Called when an error occurs while scanning for syncgroup invitations. Once
  /// `onError` is called, no other methods will be called on this handler.
  public let onError: ErrorType -> Void

  /// Creates a handler from its two callbacks.
  ///
  /// Spelled out explicitly because a struct's implicit memberwise initializer is never public;
  /// without this, code outside the module could not create a handler at all.
  /// - parameter onInvite: Invoked for each discovered invite.
  /// - parameter onError: Invoked when scanning for invites fails.
  public init(
    onInvite: SyncgroupInvite -> Void,
    onError: ErrorType -> Void) {
    self.onInvite = onInvite
    self.onError = onError
  }

  // This internal-only variable allows us to test SyncgroupInviteHandler structs for equality.
  // This cannot be done otherwise as function calls cannot be tested for equality.
  // Equality is important to facilitate the add/remove APIs in Database where
  // handlers are directly passed for removal instead of other indirect identifier.
  let uniqueId = OSAtomicIncrement32(&uniqueIdCounter)

  /// Hash derived solely from `uniqueId`.
  public var hashValue: Int {
    return uniqueId.hashValue
  }
}

/// Two invite handlers are equal exactly when they carry the same `uniqueId`.
public func == (lhs: SyncgroupInviteHandler, rhs: SyncgroupInviteHandler) -> Bool {
  return lhs.uniqueId == rhs.uniqueId
}
/// Handles observed changes to the database.
///
/// Closures cannot be compared, so every handler is stamped with the next value of the shared
/// `uniqueIdCounter` when it is created. That id alone drives `hashValue` and `==`, which lets a
/// handler be used as a dictionary key and be handed back later to the removal APIs. Copies of
/// one handler carry the same id and therefore compare equal.
public struct WatchChangeHandler: Hashable, Equatable {
  /// Called once, when a watch change handler is added, to provide the initial state of the
  /// values being watched.
  public let onInitialState: [WatchChange] -> Void

  /// Called whenever a batch of changes is committed to the database. Individual puts/deletes
  /// surface as a single-change batch.
  public let onChangeBatch: [WatchChange] -> Void

  /// Called when an error occurs while watching for changes. Once `onError` is called,
  /// no other methods will be called on this handler.
  public let onError: ErrorType -> Void

  /// Creates a handler from its three callbacks.
  /// - parameter onInitialState: Receives the initial state of the watched values.
  /// - parameter onChangeBatch: Receives each subsequently committed batch of changes.
  /// - parameter onError: Receives the error that ends the watch.
  public init(
    onInitialState: [WatchChange] -> Void,
    onChangeBatch: [WatchChange] -> Void,
    onError: ErrorType -> Void) {
    self.onInitialState = onInitialState
    self.onChangeBatch = onChangeBatch
    self.onError = onError
  }

  // This internal-only variable allows us to test WatchChangeHandler structs for equality.
  // This cannot be done otherwise as function calls cannot be tested for equality.
  // Equality is important to facilitate the add/remove APIs in Database where
  // handlers are directly passed for removal instead of other indirect identifier.
  let uniqueId = OSAtomicIncrement32(&uniqueIdCounter)

  /// Hash derived solely from `uniqueId`.
  public var hashValue: Int {
    return uniqueId.hashValue
  }
}

/// Two watch handlers are equal exactly when they carry the same `uniqueId`.
public func == (lhs: WatchChangeHandler, rhs: WatchChangeHandler) -> Bool {
  return lhs.uniqueId == rhs.uniqueId
}
/// Bookkeeping record for one registered handler; used as the value type of `Database`'s static
/// handler tables.
struct HandlerOperation {
  /// Id of the database the handler was registered on. The tables are shared by every `Database`
  /// instance, so this is what ties an entry back to a particular database.
  let databaseId: SyncbaseCore.Identifier

  /// Dispatch queue associated with the handler's operation.
  let queue: dispatch_queue_t

  /// Closure to invoke in order to cancel the operation.
  let cancel: Void -> Void
}
/// A set of collections and syncgroups.
/// To get a Database handle, call `Syncbase.database`.
public class Database: DatabaseHandle, CustomStringConvertible {
/// The SyncbaseCore database that the members of this class forward to.
let coreDatabase: SyncbaseCore.Database
// These are all static because we might have active handlers to a database while it goes
// out of scope (e.g. the user gets the database, then adds a watch handler, but doesn't retain
// the original reference to Database). Instead, we store the databaseId to make sure that any two
// Database instances generated from Syncbase.database behave the same with respect to
// adding/removing watch handlers.
/// Lock paired with `syncgroupInviteHandlers`.
private static let syncgroupInviteHandlersMu = NSLock()
/// Lock paired with `watchChangeHandlers`.
private static let watchChangeHandlersMu = NSLock()
/// Registered syncgroup invite handlers, keyed by handler, across all Database instances.
private static var syncgroupInviteHandlers: [SyncgroupInviteHandler: HandlerOperation] = [:]
/// Registered watch change handlers, keyed by handler, across all Database instances.
private static var watchChangeHandlers: [WatchChangeHandler: HandlerOperation] = [:]
/// Creates the core database with the permissions returned by `defaultDatabasePerms()`.
/// An "already exists" outcome is treated as success, which makes the call safe to repeat.
/// - throws: Any error other than `SyncbaseError.Exist` raised while creating the database.
func createIfMissing() throws {
  do {
    try SyncbaseError.wrap {
      try self.coreDatabase.create(try defaultDatabasePerms())
    }
  } catch SyncbaseError.Exist {
    // Database already exists, presumably from a previous run of the app.
  }
}
/// Creates a Database backed by the given core database.
/// - parameter coreDatabase: The `SyncbaseCore.Database` that this instance forwards to.
init(coreDatabase: SyncbaseCore.Database) {
  self.coreDatabase = coreDatabase
}
/// The id of this database, built from the core database's id.
public var databaseId: Identifier {
  return Identifier(coreId: coreDatabase.databaseId)
}
/// Returns the collection with the given name under the current user's personal blessing,
/// creating it if it does not exist yet. Unless `withoutSyncgroup` is true, a syncgroup with the
/// same name that contains just this collection is also created as needed.
/// - parameter name: Name of the collection.
/// - parameter withoutSyncgroup: If true, skip the syncgroup step.
/// - returns: The collection handle.
/// - throws: Any error raised while creating the collection or its syncgroup.
public func collection(name: String, withoutSyncgroup: Bool = false) throws -> Collection {
  let res = try collection(Identifier(name: name, blessing: personalBlessingString()))
  try res.createIfMissing()
  // TODO(sadovsky): Unwind collection creation on syncgroup creation failure? It would be
  // nice if we could create the collection and syncgroup in a batch.
  if !withoutSyncgroup {
    try syncgroup(name, collections: [res])
  }
  return res
}
/// Returns a handle for the collection with the given id. Nothing is created here.
/// - parameter collectionId: Id of the collection.
/// - returns: The collection handle.
/// - throws: Any error surfaced through `SyncbaseError.wrap` while building the handle.
public func collection(collectionId: Identifier) throws -> Collection {
  // TODO(sadovsky): Consider throwing an exception or returning null if the collection does
  // not exist. But note, a collection can get destroyed via sync after a client obtains a
  // handle for it, so perhaps we should instead add an 'exists' method.
  return try SyncbaseError.wrap {
    return try Collection(
      coreCollection: self.coreDatabase.collection(collectionId.toCore()),
      databaseHandle: self)
  }
}
/// Returns all collections in the database, leaving out the internal userdata collection.
/// - returns: One handle per collection id listed by the core database.
/// - throws: Any error surfaced through `SyncbaseError.wrap` while listing or building handles.
public func collections() throws -> [Collection] {
  return try SyncbaseError.wrap {
    // NOTE(review): the filter compares the listed id's `name` with the userdata name; confirm
    // that `name` is the right field on the core identifier.
    let coreIds = try self.coreDatabase.listCollections().filter({ return $0.name != Syncbase.USERDATA_SYNCGROUP_NAME })
    return try coreIds.map { coreId in
      return Collection(
        coreCollection: try self.coreDatabase.collection(coreId),
        databaseHandle: self)
    }
  }
}
/// **FOR ADVANCED USERS**. Creates syncgroup and adds it to the user's "userdata" collection, as
/// needed. Idempotent. The id of the new syncgroup will include the creator's user id and the
/// given syncgroup name. Requires that all collections were created by the current user.
///
/// - parameter name: Name of the syncgroup.
/// - parameter collections: Collections in the syncgroup. Must be non-empty, and every
///   collection must carry the same blessing as the first one.
/// - returns: The created syncgroup.
/// - throws: `SyncbaseError.IllegalArgument` if `collections` is empty,
///   `SyncbaseError.BlessingError` if the collections' blessings differ, or any error raised
///   while creating the syncgroup.
public func syncgroup(name: String, collections: [Collection]) throws -> Syncgroup {
  guard !collections.isEmpty else {
    throw SyncbaseError.IllegalArgument(detail: "No collections specified")
  }
  // The syncgroup takes its blessing from the first collection; all others must agree with it.
  let id = Identifier(name: name, blessing: collections[0].collectionId.blessing)
  for cx in collections where cx.collectionId.blessing != id.blessing {
    throw SyncbaseError.BlessingError(detail: "Collections must all have the same creator")
  }
  return try SyncbaseError.wrap {
    let res = Syncgroup(
      coreSyncgroup: self.coreDatabase.syncgroup(id.toCore()),
      database: self)
    try res.createIfMissing(collections)
    return res
  }
}
/// Returns the syncgroup with the given id. Only a handle is built; nothing is created.
/// - parameter syncgroupId: Id of the syncgroup.
/// - returns: The syncgroup handle.
public func syncgroup(syncgroupId: Identifier) -> Syncgroup {
  // TODO(sadovsky): Consider throwing an exception or returning null if the syncgroup does
  // not exist. But note, a syncgroup can get destroyed via sync after a client obtains a
  // handle for it, so perhaps we should instead add an 'exists' method.
  return Syncgroup(
    coreSyncgroup: coreDatabase.syncgroup(syncgroupId.toCore()),
    database: self)
}
/// Returns all syncgroups in the database.
/// - returns: One handle per syncgroup id listed by the core database.
/// - throws: Any error surfaced through `SyncbaseError.wrap` while listing the syncgroups.
public func syncgroups() throws -> [Syncgroup] {
  return try SyncbaseError.wrap {
    let coreIds = try self.coreDatabase.listSyncgroups()
    return coreIds.map { coreId in
      return Syncgroup(coreSyncgroup: self.coreDatabase.syncgroup(coreId), database: self)
    }
  }
}
/// Notifies `handler` of any existing syncgroup invites, and of all subsequent new invites.
///
/// Not implemented yet: calling this currently traps with `preconditionFailure`.
/// - parameter handler: The handler to register.
public func addSyncgroupInviteHandler(handler: SyncgroupInviteHandler) {
  // Acquire the lock before arranging its release, so the deferred unlock is always balanced.
  Database.syncgroupInviteHandlersMu.lock()
  defer { Database.syncgroupInviteHandlersMu.unlock() }
  preconditionFailure("Not implemented")
}
/// Makes it so `handler` stops receiving notifications.
///
/// Looks the handler up in the shared table and, if it is registered, invokes its operation's
/// `cancel` closure. Unknown handlers are ignored.
/// - parameter handler: The handler to remove.
public func removeSyncgroupInviteHandler(handler: SyncgroupInviteHandler) {
  // Hold the lock only for the lookup; cancelling happens outside it so a cancel closure can
  // never deadlock against the handler table.
  Database.syncgroupInviteHandlersMu.lock()
  let op = Database.syncgroupInviteHandlers[handler]
  Database.syncgroupInviteHandlersMu.unlock()
  if let op = op {
    op.cancel()
  }
}
/// Makes it so all syncgroup invite handlers stop receiving notifications.
///
/// Only operations registered against this database's id are cancelled; entries that belong to
/// other databases in the shared table are left alone.
public func removeAllSyncgroupInviteHandlers() {
  // Grab a local copy by value so we don't need to worry about concurrency or deadlocking
  // on the main mutex.
  Database.syncgroupInviteHandlersMu.lock()
  let handlers = Database.syncgroupInviteHandlers
  Database.syncgroupInviteHandlersMu.unlock()
  for op in handlers.values {
    if op.databaseId == coreDatabase.databaseId {
      op.cancel()
    }
  }
}
/// Completion callback for `acceptSyncgroupInvite`. On success it is called with the joined
/// syncgroup as `sg` and a nil `err`; on failure with a nil `sg` and the error as `err`.
public typealias AcceptSyncgroupInviteCallback = (sg: Syncgroup?, err: ErrorType?) -> Void
/// Joins the syncgroup associated with the given invite and adds it to the user's "userdata"
/// collection, as needed.
///
/// The join runs asynchronously on a background-QoS global queue; this method returns
/// immediately and reports the outcome through `callback`.
///
/// - parameter invite: The syncgroup invite.
/// - parameter callback: The callback run on Syncbase.`queue` with either the accepted Syncgroup
///   or an error.
public func acceptSyncgroupInvite(invite: SyncgroupInvite, callback: AcceptSyncgroupInviteCallback) {
  dispatch_async(dispatch_get_global_queue(QOS_CLASS_BACKGROUND, 0)) {
    do {
      try SyncbaseError.wrap {
        let coreSyncgroup = self.coreDatabase.syncgroup(invite.syncgroupId.toCore())
        let syncgroup = Syncgroup(coreSyncgroup: coreSyncgroup, database: self)
        try coreSyncgroup.join(invite.remoteSyncbaseName,
          expectedSyncbaseBlessings: invite.expectedSyncbaseBlessings,
          myInfo: Syncgroup.syncgroupMemberInfo)
        // Success is reported from inside the wrapped body, where `syncgroup` is in scope.
        dispatch_async(Syncbase.queue) {
          callback(sg: syncgroup, err: nil)
        }
      }
    } catch let e {
      dispatch_async(Syncbase.queue) {
        callback(sg: nil, err: e)
      }
    }
  }
}
/// Records that the user has ignored this invite, such that it's never surfaced again.
///
/// Not implemented yet: calling this currently traps with `preconditionFailure`.
/// - parameter invite: The syncgroup invite to ignore.
public func ignoreSyncgroupInvite(invite: SyncgroupInvite) {
  preconditionFailure("Not implemented")
}
/// Runs the given operation in a batch, managing retries and commit/abort. Writable batches are
/// committed, retrying if commit fails due to a concurrent batch. Read-only batches are aborted.
///
/// - parameter readOnly: Run batch in read-only mode.
/// - parameter op: The operation to run. It receives a `BatchDatabase` wrapping the core batch.
/// - throws: Any error surfaced through `SyncbaseError.wrap` from the batch or from `op`.
public func runInBatch(readOnly: Bool = false, op: BatchOperation) throws {
  // TODO(zinman): Is this supposed to be async?
  try SyncbaseError.wrap {
    try SyncbaseCore.Batch.runInBatchSync(
      db: self.coreDatabase,
      opts: SyncbaseCore.BatchOptions(readOnly: readOnly),
      op: { coreDb in try op(BatchDatabase(coreBatchDatabase: coreDb)) })
  }
}
/// Creates a new batch. Instead of calling this function directly, clients are encouraged to use
/// the `runInBatch` helper function, which detects "concurrent batch" errors and handles
/// retries internally.
///
/// Default concurrency semantics:
///
/// - Reads (e.g. gets, scans) inside a batch operate over a consistent snapshot taken during
///   `beginBatch`, and will see the effects of prior writes performed inside the batch.
/// - `commit` may fail with `ConcurrentBatch` exception, indicating that after
///   `beginBatch` but before `commit`, some concurrent routine wrote to a key that
///   matches a key or row-range read inside this batch.
/// - Other methods will never fail with error `ConcurrentBatch`, even if it is
///   known that `commit` will fail with this error.
///
/// Once a batch has been committed or aborted, subsequent method calls will fail with no
/// effect.
///
/// Concurrency semantics can be configured using the optional parameters.
///
/// - parameter readOnly: If true, set the transaction to read-only.
/// - returns: The BatchDatabase object representing the transaction.
/// - throws: Any error surfaced through `SyncbaseError.wrap` while starting the batch.
public func beginBatch(readOnly: Bool = false) throws -> BatchDatabase {
  return try SyncbaseError.wrap {
    return BatchDatabase(coreBatchDatabase:
      try self.coreDatabase.beginBatch(BatchOptions(readOnly: readOnly)))
  }
}
/// Notifies `handler` of initial state, and of all subsequent changes to this database. If this
/// handler is canceled by `removeWatchChangeHandler` then no subsequent calls will be made. Note
/// that there may be WatchChanges queued up for an `onChangeBatch` that will be ignored.
/// Callbacks to `handler` occur on Syncbase.queue, which defaults to main.
/// - parameter resumeMarker: Must be nil or have zero `length`; any other value is rejected.
/// - parameter handler: The handler to register. It is used as the key for this watch in the
/// static `Database.watchChangeHandlers` table.
/// - throws: `SyncbaseError.IllegalArgument` when a non-empty `resumeMarker` is passed, or any
/// error surfaced through `SyncbaseError.wrap` while setting up the watch.
public func addWatchChangeHandler(resumeMarker: ResumeMarker? = nil, handler: WatchChangeHandler) throws {
// Note: Eventually we'll add a watch variant that takes a query, where the query can be
// constructed using some sort of query builder API.
// TODO(sadovsky): Support specifying resumeMarker. Note, watch-from-resumeMarker may be
// problematic in that we don't track the governing ACL for changes in the watch log.
if let resumeMarker = resumeMarker where resumeMarker.length != 0 {
throw SyncbaseError.IllegalArgument(detail: "Specifying resumeMarker is not yet supported")
return try SyncbaseError.wrap {
// NOTE(review): the expression that should follow `try` is not present; the next two lines read
// as the argument list of a watch call using a match-everything pattern ("%" for collection
// name, blessing and row key). Confirm the intended callee.
let stream = try
[CollectionRowPattern(collectionName: "%", collectionBlessing: "%", rowKey: "%")],
resumeMarker: resumeMarker)
// Create a serial queue to immediately run this watch operation on via SyncbaseCore's
// blocking stream.
// NOTE(review): only the label argument of dispatch_queue_create is present below; the queue
// attribute argument and closing parenthesis are missing. Confirm.
let watchQueue = dispatch_queue_create(
"Syncbase WatchChangeHandler \(handler.uniqueId)",
// Cancellation flag, captured by both the `cancel` closure and the watch loop below.
var isCanceled = false
// NOTE(review): no matching lock() on watchChangeHandlersMu is visible before this deferred
// unlock. Confirm the lock is taken first.
defer { Database.watchChangeHandlersMu.unlock() }
// Register the operation so the remove APIs can find it and cancel it later.
Database.watchChangeHandlers[handler] = HandlerOperation(
databaseId: self.coreDatabase.databaseId,
queue: watchQueue,
cancel: {
isCanceled = true
// Consume the stream on the dedicated watch queue.
dispatch_async(watchQueue) {
// False until the first complete batch has been seen.
var gotFirstBatch = false
// Changes collected for the batch currently being assembled.
var batch = [WatchChange]()
for coreChange in stream {
// NOTE(review): the body of this cancellation check is not visible; presumably it leaves the
// loop. Confirm.
if isCanceled {
let change = WatchChange(coreChange: coreChange)
// Ignore changes to userdata collection.
// NOTE(review): neither the userdata check announced above nor any append to `batch` is
// visible here. Confirm.
// A change with `isContinued` false marks the end of a batch.
if (!change.isContinued) {
if (!gotFirstBatch) {
gotFirstBatch = true
let b = batch
// We synchronously run on Syncbase.queue to facilitate flow control. Go blocks
// until each callback is consumed before it calls with another WatchChange event.
// Backpressure in Swift is achieved by blocking until the WatchChange event is
// consumed by the app on Syncbase.queue using dispatch_sync. If we used
// dispatch_async, we could potentially queue up events faster than the
// ability for the app to consume them. By using dispatch_sync we also help the user
// mitigate against out-of-order events should Syncbase.queue be a concurrent queue
// rather than a serial queue.
// NOTE(review): the bodies of the three dispatch_sync closures below are not visible;
// presumably they invoke the handler's onInitialState / onChangeBatch / onError callbacks.
// Confirm.
dispatch_sync(Syncbase.queue, {
} else {
dispatch_sync(Syncbase.queue, {
// Notify of error if we're permitted (not canceled).
if let err = stream.err() where !isCanceled {
dispatch_sync(Syncbase.queue, {
// Cleanup: drop this handler's entry from the shared table.
// NOTE(review): as above, no lock() is visible before this deferred unlock. Confirm.
defer { Database.watchChangeHandlersMu.unlock() }
Database.watchChangeHandlers[handler] = nil
/// Makes it so `handler` stops receiving notifications. Note there may be WatchChanges
/// queued up for an `onChangeBatch` that will be ignored.
///
/// Looks the handler up in the shared table and, if it is registered, invokes its operation's
/// `cancel` closure. Unknown handlers are ignored.
/// - parameter handler: The handler to remove.
public func removeWatchChangeHandler(handler: WatchChangeHandler) {
  // Hold the lock only for the lookup; cancelling happens outside it so a cancel closure can
  // never deadlock against the handler table.
  Database.watchChangeHandlersMu.lock()
  let op = Database.watchChangeHandlers[handler]
  Database.watchChangeHandlersMu.unlock()
  if let op = op {
    op.cancel()
  }
}
/// Makes it so all watch change handlers attached to this database stop receiving
/// notifications.
public func removeAllWatchChangeHandlers() {
  // Grab a local copy by value so we don't need to worry about concurrency or deadlocking
  // on the main mutex.
  Database.watchChangeHandlersMu.lock()
  let handlers = Database.watchChangeHandlers
  Database.watchChangeHandlersMu.unlock()
  for op in handlers.values {
    // Because Database.watchChangeHandlers is static, we must make sure the database ids
    // are the same before we cancel it.
    if op.databaseId == self.coreDatabase.databaseId {
      op.cancel()
    }
  }
}
/// Human-readable summary of this database that includes its id (`CustomStringConvertible`).
public var description: String {
  return "[Syncbase.Database id=\(databaseId)]"
}