blob: f636a2c15743108fb52a27fac66d235cae150ff1 [file] [log] [blame]
// Copyright 2016 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
import Foundation
/// CollectionRowPattern contains SQL LIKE-style glob patterns ('%' and '_'
/// wildcards, '\' as escape character) for matching rows and collections by
/// name components. It is used by the watch API: `Watch.watch` takes an array
/// of these patterns.
public struct CollectionRowPattern {
/// collectionName is a SQL LIKE-style glob pattern ('%' and '_' wildcards, '\' as escape
/// character) for matching collections. May not be empty.
public let collectionName: String
/// The blessing for collections.
public let collectionBlessing: String
/// rowKey is a SQL LIKE-style glob pattern ('%' and '_' wildcards, '\' as escape character)
/// for matching rows. If empty then only the collection (name and blessing) is matched and
/// NO row events are returned.
/// NOTE(review): the type is optional; presumably nil is treated the same as empty -- confirm.
public let rowKey: String?
/// Creates a pattern from its three components; each argument is stored unchanged.
/// - Parameter collectionName: glob pattern for matching collections.
/// - Parameter collectionBlessing: the blessing for collections.
/// - Parameter rowKey: glob pattern for matching rows, or nil.
public init(collectionName: String, collectionBlessing: String, rowKey: String?) {
self.collectionName = collectionName
self.collectionBlessing = collectionBlessing
self.rowKey = rowKey
/// ResumeMarker is the raw-bytes (NSData) form of a position in a watch stream.
/// See `WatchChange.resumeMarker` for how it is used.
public typealias ResumeMarker = NSData
/// WatchChange describes a single change delivered on a watch stream.
public struct WatchChange {
/// The kind of entity a change applies to. Raw values are assigned implicitly in
/// declaration order (Root = 0, Collection = 1, Row = 2).
public enum EntityType: Int {
case Root
case Collection
case Row
/// The kind of change. Raw values are assigned implicitly in declaration order
/// (Put = 0, Delete = 1).
public enum ChangeType: Int {
case Put
case Delete
/// entityType is the type of the entity - Root, Collection, or Row.
public let entityType: EntityType
/// collectionId is the id of the collection that was changed or contains the
/// changed row. Is nil if entityType is not Collection or Row.
public let collectionId: Identifier?
/// row is the key of the changed row. Nil if entityType is not Row.
public let row: String?
/// changeType describes the type of the change, depending on the entityType:
/// **Row:**
/// - Put: the row exists in the collection, and `value` holds the new value
/// for this row.
/// - Delete: the row was removed from the collection.
/// **Collection:**
/// - Put: the collection exists, and `value` holds the encoded
/// collection info.
/// - Delete: the collection was destroyed.
/// **Root:**
/// - Put: appears as the first (possibly only) change in the initial
/// state batch, only if watching from an empty ResumeMarker. This is the
/// only situation where a Root entity appears.
public let changeType: ChangeType
/// value is the new value for the row for Row Put changes, an encoded
/// StoreChangeCollectionInfo value for Collection Put changes, or nil
/// otherwise.
public let value: NSData?
/// resumeMarker provides a compact representation of all the messages that
/// have been received by the caller for the given Watch call.
/// This marker can be provided in the Request message to allow the caller
/// to resume the stream watching at a specific point without fetching the
/// initial state.
public let resumeMarker: ResumeMarker?
/// isFromSync indicates whether the change came from sync. If isFromSync is false,
/// then the change originated from the local device.
public let isFromSync: Bool
/// If true, this WatchChange is followed by more WatchChanges that are in the
/// same batch as this WatchChange.
public let isContinued: Bool
/// WatchStream is the stream of WatchChange values returned by a watch call
/// (see `Watch.watch`).
public typealias WatchStream = AnonymousStream<WatchChange>
/// Internal namespace for the watch API -- end-users will access this through the public API
/// instead. This is simply here to allow all watch-related code to be located in Watch.swift.
enum Watch {
// Handle holds the shared state for one watch request. It is the hand-off point between the
// callbacks invoked from Go (onChange/onError) and the Swift-side consumer (fetchNext).
// NOTE(review): the comments below describe locking, waiting and signalling on `condition`,
// but the only call on `condition` visible in this view is waitUntilDate in fetchNext, and the
// `if isCanceled {` checks show no body -- confirm against the complete file.
private class Handle {
// Watch works by having Go call Swift as it encounters each watch change.
// This is a bit of a mismatch for the Swift GeneratorType which is pull instead of push-based.
// Similar to collection.Scan, we create a push-pull adapter by blocking on either side using
// condition variables until both are ready for the next data handoff. See collection.Scan
// for more information.
let condition = NSCondition()
// The change waiting to be picked up by fetchNext, if any.
var data: WatchChange? = nil
// The error that ended the stream, if any. Only written by onError.
var streamErr: SyncbaseError? = nil
// True while a change (or the end-of-stream error) is waiting to be picked up by fetchNext.
var updateAvailable = false
// Set to true by the stream's cancel function (see Watch.watch); checked by onChange/onError.
var isCanceled = false
// The function that gets called from Swift; it is passed to AnonymousStream as the
// fetchNextFunction. It blocks until there's an update available from Go, or until the
// optional timeout (in seconds) elapses.
// Returns (nil, nil) on timeout. Otherwise returns the fetched change together with streamErr;
// the change is nil when the update was the end-of-stream error recorded by onError.
func fetchNext(timeout: NSTimeInterval?) -> (WatchChange?, SyncbaseError?) {
while !updateAvailable {
if let timeout = timeout {
// waitUntilDate returns false when the deadline passes without a signal. The deadline is
// recomputed on every loop iteration, so a wakeup without an update restarts the full timeout.
if !condition.waitUntilDate(NSDate(timeIntervalSinceNow: timeout)) {
return (nil, nil)
} else {
// Grab the data from this update and reset for the next update.
let fetchedData = data
data = nil
// We don't need to locally capture streamErr because, unlike data, errors can only come in
// once at the very end of the stream (after which no more callbacks will get called).
updateAvailable = false
// Signal that we've fetched the data to Go.
return (fetchedData, streamErr)
// The callback from Go when there's a new change in the watch stream.
func onChange(change: WatchChange) {
if isCanceled {
// Wait until any existing update has been received by the fetch so we don't just blow
// past it.
while updateAvailable && !isCanceled {
// Re-check cancellation, since the wait loop above also exits when the stream is canceled.
if isCanceled {
// Set the new data.
data = change
updateAvailable = true
// Wake up any blocked fetch.
// The callback from Go when there's been an error in the watch stream. The stream will then
// be closed and no new changes will ever come in from this request.
func onError(err: SyncbaseError) {
if isCanceled {
// Wait until any existing update has been received by the fetch so we don't just blow
// past it.
while updateAvailable && !isCanceled {
// Re-check cancellation, since the wait loop above also exits when the stream is canceled.
if isCanceled {
// Marks the end of data by clearing it and saving the error from Syncbase.
data = nil
streamErr = err
updateAvailable = true
// Wake up any blocked fetch.
// Starts a watch for the given patterns and returns a stream of the resulting changes.
// - encodedDatabaseName: the database name; converted with toCgoString() for the Cgo bridge.
// - patterns: the collection/row patterns; converted with v23_syncbase_CollectionRowPatterns.
// - resumeMarker: optional marker, converted with v23_syncbase_Bytes. Defaults to nil.
// Throws any error raised inside VError.maybeThrow (including the throwing conversions).
// NOTE(review): the call that actually starts the watch on the Go side is not fully visible in
// this view (only its first argument line is) -- confirm against the complete file.
static func watch(
encodedDatabaseName encodedDatabaseName: String,
patterns: [CollectionRowPattern],
resumeMarker: ResumeMarker? = nil) throws -> WatchStream {
let handle = Watch.Handle()
// Retain the handle (+1) so it stays alive while it is referenced only through the opaque
// pointer handed to the callbacks. The balancing release is the takeRetainedValue() in
// onWatchError.
let unmanaged = Unmanaged.passRetained(handle)
let oHandle = UnsafeMutablePointer<Void>(unmanaged.toOpaque())
do {
try VError.maybeThrow { errPtr in
let cPatterns = try v23_syncbase_CollectionRowPatterns(patterns)
let cResumeMarker = v23_syncbase_Bytes(resumeMarker)
// The handle travels through the bridge as an integer-sized value; the callbacks convert it
// back into a Watch.Handle.
let callbacks = v23_syncbase_DbWatchPatternsCallbacks(
handle: v23_syncbase_Handle(unsafeBitCast(oHandle, UInt.self)),
onChange: { Watch.onWatchChange($0, change: $1) },
onError: { Watch.onWatchError($0, err: $1) })
try encodedDatabaseName.toCgoString(),
} catch {
// NOTE(review): as visible here the error is rethrown unchanged and the +1 retain from
// passRetained above is not released on this path -- confirm whether the handle leaks.
throw error
return AnonymousStream(
fetchNextFunction: handle.fetchNext,
cancelFunction: {
// Until the underlying watch can actually be canceled, we hack around it by only marking
// the handle as canceled. (The issue reference that belonged in this comment is missing.)
handle.isCanceled = true
// Callback handlers that convert the Cgo bridge types to native Swift types and pass them to
// the functions inside the passed handle.
//
// onWatchChange is registered as the onChange callback in Watch.watch.
// - handle: the opaque value created in Watch.watch from the retained Watch.Handle.
// - change: the bridge representation of the change.
// NOTE(review): the forwarding call into the handle is not visible in this view -- confirm.
private static func onWatchChange(handle: v23_syncbase_Handle, change: v23_syncbase_WatchChange) {
// Convert the bridge value to its native form (this shadows the parameter).
let change = change.extract()
// Recover the Handle without consuming its retain: the object must stay alive for later
// callbacks on the same watch.
let handle = Unmanaged<Watch.Handle>.fromOpaque(
COpaquePointer(bitPattern: handle)).takeUnretainedValue()
// onWatchError is registered as the onError callback in Watch.watch. It converts the bridge
// error to a SyncbaseError and recovers the Watch.Handle from the opaque handle value.
// - handle: the opaque value created in Watch.watch from the retained Watch.Handle.
// - err: the bridge representation of the error.
// NOTE(review): the forwarding call into the handle is not visible in this view -- confirm.
private static func onWatchError(handle: v23_syncbase_Handle, err: v23_syncbase_VError) {
// Start with a generic error; it is used when no VError can be extracted from err.
var e = SyncbaseError.InvalidOperation(reason: "A watch error occurred")
if let verr: VError = err.extract() {
e = SyncbaseError(verr)
// takeRetainedValue consumes the +1 retain taken by passRetained in Watch.watch, so the
// Handle can be deallocated once this last callback is done with it.
let handle = Unmanaged<Watch.Handle>.fromOpaque(
COpaquePointer(bitPattern: handle)).takeRetainedValue()