syncbase/bridge/cgo: Two changes to the C API for handling callbacks
The first change is to switch the handle from 'int' to 'void*'. This
will help interfacing with language where is easier to pass a pointer
to a struct instead of explicitly managing the mapping to it. Both
Java and Swift manage memory in a way that makes is hard/impossible
to use pointers so the 'void*' will be used as a larger 'int'.
The second change is to use a single handle instead of two for both
watching and scanning. This makes the C API simpler and is more inline
with what the Java/Swift need to do (namely, keeping a struct that
describes both callbacks).
Note: the author of the Swift code is Aaron Zinman <aaron@azinman.com>.
MultiPart: 2/2
Change-Id: Id0c16356fda53825a4b2821e5ce374bf051481ca
diff --git a/SyncbaseCore/Source/Collection.swift b/SyncbaseCore/Source/Collection.swift
index 56925af..748a4b9 100644
--- a/SyncbaseCore/Source/Collection.swift
+++ b/SyncbaseCore/Source/Collection.swift
@@ -191,14 +191,7 @@
}
}
- /// Scan returns all rows in the given half-open range [start, limit). If limit
- /// is "", all rows with keys >= start are included.
- /// Concurrency semantics: It is legal to perform writes concurrently with
- /// Scan. The returned stream reads from a consistent snapshot taken at the
- /// time of the RPC (or at the time of BeginBatch, if in a batch), and will not
- /// reflect subsequent writes to keys not yet reached by the stream.
- /// See helpers Prefix(), Range(), SingleRow().
- public func scan(r: RowRange) throws -> ScanStream {
+ private class ScanHandle {
// Scan works by having Go call Swift as encounters each row. This is a bit of a mismatch
// for the Swift GeneratorType which is pull instead of push-based. We create a push-pull
// adapter by blocking on either side using condition variables until both are ready for the
@@ -213,7 +206,7 @@
// The anonymous function that gets called from the Swift. It blocks until there's an update
// available from Go.
- let fetchNext = { (timeout: NSTimeInterval?) -> ((String, GetValueFromScanStream)?, ErrorType?) in
+ func fetchNext(timeout: NSTimeInterval?) -> ((String, GetValueFromScanStream)?, ErrorType?) {
condition.lock()
while !updateAvailable {
if let timeout = timeout {
@@ -247,7 +240,7 @@
}
// The callback from Go when there's a new Row (key-value) scanned.
- let onKV = { (key: String, valueBytes: NSData) in
+ func onKeyValue(key: String, valueBytes: NSData) {
condition.lock()
// Wait until any existing update has been received by the fetch so we don't just blow
// past it.
@@ -262,7 +255,7 @@
condition.unlock()
}
- let onDone = { (err: ErrorType?) in
+ func onDone(err: ErrorType?) {
condition.lock()
// Wait until any existing update has been received by the fetch so we don't just blow
// past it.
@@ -277,8 +270,19 @@
condition.signal()
condition.unlock()
}
+ }
+ /// Scan returns all rows in the given half-open range [start, limit). If limit
+ /// is "", all rows with keys >= start are included.
+ /// Concurrency semantics: It is legal to perform writes concurrently with
+ /// Scan. The returned stream reads from a consistent snapshot taken at the
+ /// time of the RPC (or at the time of BeginBatch, if in a batch), and will not
+ /// reflect subsequent writes to keys not yet reached by the stream.
+ /// See helpers Prefix(), Range(), SingleRow().
+ public func scan(r: RowRange) throws -> ScanStream {
+ let handle = ScanHandle()
try VError.maybeThrow { errPtr in
+ let oHandle = UnsafeMutablePointer<Void>(Unmanaged.passRetained(handle).toOpaque())
let cStartStr = try r.start.toCgoString()
let cLimitStr = try r.limit.toCgoString()
let cStartBytes = v23_syncbase_Bytes(
@@ -286,10 +290,9 @@
let cLimitBytes = v23_syncbase_Bytes(
p: unsafeBitCast(cLimitStr.p, UnsafeMutablePointer<UInt8>.self), n: cLimitStr.n)
let callbacks = v23_syncbase_CollectionScanCallbacks(
- hOnKeyValue: Collection.onScanKVClosures.ref(onKV),
- hOnDone: Collection.onScanDoneClosures.ref(onDone),
- onKeyValue: { Collection.onScanKV(AsyncId($0), kv: $1) },
- onDone: { Collection.onScanDone(AsyncId($0), doneHandle: AsyncId($1), err: $2) })
+ handle: v23_syncbase_Handle(oHandle),
+ onKeyValue: { Collection.onScanKeyValue($0, kv: $1) },
+ onDone: { Collection.onScanDone($0, err: $1) })
v23_syncbase_CollectionScan(
try encodedCollectionName.toCgoString(),
try cBatchHandle(),
@@ -298,36 +301,24 @@
callbacks,
errPtr)
}
-
return AnonymousStream(
- fetchNextFunction: fetchNext,
+ fetchNextFunction: handle.fetchNext,
cancelFunction: { preconditionFailure("stub") })
}
- // Reference maps between closured functions and handles passed back/forth with Go.
- private static var onScanKVClosures = RefMap < (String, NSData) -> Void > ()
- private static var onScanDoneClosures = RefMap < ErrorType? -> Void > ()
-
// Callback handlers that convert the Cgo bridge types to native Swift types and pass them to
- // the closured functions reference by the passed handle.
- private static func onScanKV(handle: AsyncId, kv: v23_syncbase_KeyValue) {
- guard let key = kv.key.toString(),
- valueBytes = kv.value.toNSData(),
- callback = onScanKVClosures.get(handle) else {
- fatalError("Could not fully unpact scan kv callback or find handle")
- }
- callback(key, valueBytes)
+ // the functions inside the passed handle.
+ private static func onScanKeyValue(handle: v23_syncbase_Handle, kv: v23_syncbase_KeyValue) {
+ let key = kv.key.toString()!
+ let valueBytes = kv.value.toNSData()!
+ let handle = Unmanaged<ScanHandle>.fromOpaque(COpaquePointer(handle)).takeUnretainedValue()
+ handle.onKeyValue(key, valueBytes: valueBytes)
}
- private static func onScanDone(kvHandle: AsyncId, doneHandle: AsyncId, err: v23_syncbase_VError) {
+ private static func onScanDone(handle: v23_syncbase_Handle, err: v23_syncbase_VError) {
let e = err.toVError()
- if onScanKVClosures.unref(kvHandle) == nil {
- fatalError("Could not find closure for scan onKV handle (via onDone callback)")
- }
- guard let callback = onScanDoneClosures.unref(doneHandle) else {
- fatalError("Could not find closure for scan onDone handle")
- }
- callback(e)
+ let handle = Unmanaged<ScanHandle>.fromOpaque(COpaquePointer(handle)).takeRetainedValue()
+ handle.onDone(e)
}
// MARK: Internal helpers
diff --git a/SyncbaseCore/Source/Locking.swift b/SyncbaseCore/Source/Locking.swift
deleted file mode 100644
index c73dd61..0000000
--- a/SyncbaseCore/Source/Locking.swift
+++ /dev/null
@@ -1,17 +0,0 @@
-// Copyright 2016 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-import Foundation
-
-protocol Lockable {
- func lock(block:()->())
-}
-
-extension Lockable where Self : AnyObject {
- func lock(block:()->()) {
- objc_sync_enter(self)
- block()
- objc_sync_exit(self)
- }
-}
\ No newline at end of file
diff --git a/SyncbaseCore/Source/Refs.swift b/SyncbaseCore/Source/Refs.swift
deleted file mode 100644
index 585fddb..0000000
--- a/SyncbaseCore/Source/Refs.swift
+++ /dev/null
@@ -1,35 +0,0 @@
-// Copyright 2016 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-/// RefMap holds onto objs that get referenced by AsyncId between Go and Swift.
-/// The larger motivation is closures that get passed as function pointers have to be 'context-free'
-/// so we can't closure on a reference to a given future. By allowing the end user to hold onto
-/// this in various places (strongly typed to the appropriate T) then we can pass a handle
-/// back and forth safely.
-typealias AsyncId = Int32
-
-class RefMap<T>: Lockable {
- private (set) var lastId: AsyncId = 0
- private var refs = [AsyncId: T]()
-
- /// Stores an object and returns the associated asyncId. If the object is already in the map
- /// it will be stored twice -- ref does not actually perform reference counting.
- func ref(obj: T) -> AsyncId {
- let asyncId = OSAtomicIncrement32(&lastId)
- lock { self.refs[asyncId] = obj }
- return asyncId
- }
-
- /// Gets the associated value for a given asyncId.
- func get(asyncId: AsyncId) -> T? {
- return refs[asyncId]
- }
-
- /// Get and deletes any associated asyncId, returning the associated value.
- func unref(asyncId: AsyncId) -> T? {
- guard let p = refs[asyncId] else { return nil }
- lock { self.refs[asyncId] = nil }
- return p
- }
-}
\ No newline at end of file
diff --git a/SyncbaseCore/Source/Watch.swift b/SyncbaseCore/Source/Watch.swift
index d04cc19..84303ea 100644
--- a/SyncbaseCore/Source/Watch.swift
+++ b/SyncbaseCore/Source/Watch.swift
@@ -67,88 +67,91 @@
/// Internal namespace for the watch API -- end-users will access this through Database.watch
/// instead. This is simply here to allow all watch-related code to be located in Watch.swift.
enum Watch {
+ private class Handle {
+ // Watch works by having Go call Swift as encounters each watch change.
+ // This is a bit of a mismatch for the Swift GeneratorType which is pull instead of push-based.
+ // Similar to collection.Scan, we create a push-pull adapter by blocking on either side using
+ // condition variables until both are ready for the next data handoff. See collection.Scan
+ // for more information.
+ let condition = NSCondition()
+ var data: WatchChange? = nil
+ var streamErr: ErrorType? = nil
+ var updateAvailable = false
+
+ // The anonymous function that gets called from the Swift. It blocks until there's an update
+ // available from Go.
+ func fetchNext(timeout: NSTimeInterval?) -> (WatchChange?, ErrorType?) {
+ condition.lock()
+ while !updateAvailable {
+ if let timeout = timeout {
+ if !condition.waitUntilDate(NSDate(timeIntervalSinceNow: timeout)) {
+ condition.unlock()
+ return (nil, nil)
+ }
+ } else {
+ condition.wait()
+ }
+ }
+ // Grab the data from this update and reset for the next update.
+ let fetchedData = data
+ data = nil
+ // We don't need to locally capture doneErr because, unlike data, errors can only come in
+ // once at the very end of the stream (after which no more callbacks will get called).
+ updateAvailable = false
+ // Signal that we've fetched the data to Go.
+ condition.signal()
+ condition.unlock()
+ return (fetchedData, streamErr)
+ }
+
+ // The callback from Go when there's a new Row (key-value) scanned.
+ func onChange(change: WatchChange) {
+ condition.lock()
+ // Wait until any existing update has been received by the fetch so we don't just blow
+ // past it.
+ while updateAvailable {
+ condition.wait()
+ }
+ // Set the new data.
+ data = change
+ updateAvailable = true
+ // Wake up any blocked fetch.
+ condition.signal()
+ condition.unlock()
+ }
+
+ // The callback from Go when there's been an error in the watch stream. The stream will then
+ // be closed and no new changes will ever come in from this request.
+ func onError(err: ErrorType) {
+ condition.lock()
+ // Wait until any existing update has been received by the fetch so we don't just blow
+ // past it.
+ while updateAvailable {
+ condition.wait()
+ }
+ // Marks the end of data by clearing it and saving the error from Syncbase.
+ data = nil
+ streamErr = err
+ updateAvailable = true
+ // Wake up any blocked fetch.
+ condition.signal()
+ condition.unlock()
+ }
+ }
+
static func watch(
encodedDatabaseName encodedDatabaseName: String,
patterns: [CollectionRowPattern],
resumeMarker: ResumeMarker? = nil) throws -> WatchStream {
- // Watch works by having Go call Swift as encounters each watch change.
- // This is a bit of a mismatch for the Swift GeneratorType which is pull instead of push-based.
- // Similar to collection.Scan, we create a push-pull adapter by blocking on either side using
- // condition variables until both are ready for the next data handoff. See collection.Scan
- // for more information.
- let condition = NSCondition()
- var data: WatchChange? = nil
- var streamErr: ErrorType? = nil
- var updateAvailable = false
-
- // The anonymous function that gets called from the Swift. It blocks until there's an update
- // available from Go.
- let fetchNext = { (timeout: NSTimeInterval?) -> (WatchChange?, ErrorType?) in
- condition.lock()
- while !updateAvailable {
- if let timeout = timeout {
- if !condition.waitUntilDate(NSDate(timeIntervalSinceNow: timeout)) {
- condition.unlock()
- return (nil, nil)
- }
- } else {
- condition.wait()
- }
- }
- // Grab the data from this update and reset for the next update.
- let fetchedData = data
- data = nil
- // We don't need to locally capture doneErr because, unlike data, errors can only come in
- // once at the very end of the stream (after which no more callbacks will get called).
- updateAvailable = false
- // Signal that we've fetched the data to Go.
- condition.signal()
- condition.unlock()
- return (fetchedData, streamErr)
- }
-
- // The callback from Go when there's a new Row (key-value) scanned.
- let onChange = { (change: WatchChange) in
- condition.lock()
- // Wait until any existing update has been received by the fetch so we don't just blow
- // past it.
- while updateAvailable {
- condition.wait()
- }
- // Set the new data.
- data = change
- updateAvailable = true
- // Wake up any blocked fetch.
- condition.signal()
- condition.unlock()
- }
-
- // The callback from Go when there's been an error in the watch stream. The stream will then
- // be closed and no new changes will ever come in from this request.
- let onError = { (err: ErrorType) in
- condition.lock()
- // Wait until any existing update has been received by the fetch so we don't just blow
- // past it.
- while updateAvailable {
- condition.wait()
- }
- // Marks the end of data by clearing it and saving the error from Syncbase.
- data = nil
- streamErr = err
- updateAvailable = true
- // Wake up any blocked fetch.
- condition.signal()
- condition.unlock()
- }
-
+ let handle = Watch.Handle()
try VError.maybeThrow { errPtr in
+ let oHandle = UnsafeMutablePointer<Void>(Unmanaged.passRetained(handle).toOpaque())
let cPatterns = try v23_syncbase_CollectionRowPatterns(patterns)
let cResumeMarker = v23_syncbase_Bytes(resumeMarker?.data)
let callbacks = v23_syncbase_DbWatchPatternsCallbacks(
- hOnChange: onWatchChangeClosures.ref(onChange),
- hOnError: onWatchErrorClosures.ref(onError),
- onChange: { Watch.onWatchChange(AsyncId($0), change: $1) },
- onError: { Watch.onWatchError(AsyncId($0), errorHandle: AsyncId($1), err: $2) })
+ handle: v23_syncbase_Handle(oHandle),
+ onChange: { Watch.onWatchChange($0, change: $1) },
+ onError: { Watch.onWatchError($0, err: $1) })
v23_syncbase_DbWatchPatterns(
try encodedDatabaseName.toCgoString(),
cResumeMarker,
@@ -158,32 +161,21 @@
}
return AnonymousStream(
- fetchNextFunction: fetchNext,
+ fetchNextFunction: handle.fetchNext,
cancelFunction: { preconditionFailure("stub") })
}
- // Reference maps between closured functions and handles passed back/forth with Go.
- private static var onWatchChangeClosures = RefMap < WatchChange -> Void > ()
- private static var onWatchErrorClosures = RefMap < ErrorType -> Void > ()
-
// Callback handlers that convert the Cgo bridge types to native Swift types and pass them to
- // the closured functions reference by the passed handle.
- private static func onWatchChange(changeHandle: AsyncId, change: v23_syncbase_WatchChange) {
+ // the functions inside the passed handle.
+ private static func onWatchChange(handle: v23_syncbase_Handle, change: v23_syncbase_WatchChange) {
let change = change.toWatchChange()
- guard let callback = onWatchChangeClosures.get(changeHandle) else {
- fatalError("Could not find closure for watch onChange handle")
- }
- callback(change)
+ let handle = Unmanaged<Watch.Handle>.fromOpaque(COpaquePointer(handle)).takeUnretainedValue()
+ handle.onChange(change)
}
- private static func onWatchError(changeHandle: AsyncId, errorHandle: AsyncId, err: v23_syncbase_VError) {
+ private static func onWatchError(handle: v23_syncbase_Handle, err: v23_syncbase_VError) {
let e: ErrorType = err.toVError() ?? SyncbaseError.InvalidOperation(reason: "A watch error occurred")
- if onWatchChangeClosures.unref(changeHandle) == nil {
- fatalError("Could not find closure for watch onChange handle (via onWatchError callback)")
- }
- guard let callback = onWatchErrorClosures.unref(errorHandle) else {
- fatalError("Could not find closure for watch onError handle")
- }
- callback(e)
+ let handle = Unmanaged<Watch.Handle>.fromOpaque(COpaquePointer(handle)).takeRetainedValue()
+ handle.onError(e)
}
}
\ No newline at end of file
diff --git a/SyncbaseCore/SyncbaseCore.xcodeproj/project.pbxproj b/SyncbaseCore/SyncbaseCore.xcodeproj/project.pbxproj
index ba7d08e..1620101 100644
--- a/SyncbaseCore/SyncbaseCore.xcodeproj/project.pbxproj
+++ b/SyncbaseCore/SyncbaseCore.xcodeproj/project.pbxproj
@@ -37,8 +37,6 @@
30D45FCA1CE14A8A004A59FA /* go_types.h in Headers */ = {isa = PBXBuildFile; fileRef = 30D45FC51CE14A8A004A59FA /* go_types.h */; settings = {ATTRIBUTES = (Public, ); }; };
30D45FCD1CE14A8A004A59FA /* sbcore_arm64.a in Frameworks */ = {isa = PBXBuildFile; fileRef = 30D45FC81CE14A8A004A59FA /* sbcore_arm64.a */; };
930DFCE21CEE46DE00738DB8 /* OAuth.swift in Sources */ = {isa = PBXBuildFile; fileRef = 930DFCE11CEE46DE00738DB8 /* OAuth.swift */; };
- 930DFCEB1CEED8BD00738DB8 /* Refs.swift in Sources */ = {isa = PBXBuildFile; fileRef = 930DFCEA1CEED8BD00738DB8 /* Refs.swift */; };
- 930DFCF51CEED96B00738DB8 /* Locking.swift in Sources */ = {isa = PBXBuildFile; fileRef = 930DFCF41CEED96B00738DB8 /* Locking.swift */; };
9351A4941CE46DB9009CC4F4 /* sbcore_amd64.a in Frameworks */ = {isa = PBXBuildFile; fileRef = 9351A4931CE46DB9009CC4F4 /* sbcore_amd64.a */; };
9374F6681D00FFE5004ECE59 /* TestHelpers.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9374F6661D00FF68004ECE59 /* TestHelpers.swift */; };
93D3AD5C1CE4392A00A80CDA /* libleveldb_amd64.a in Frameworks */ = {isa = PBXBuildFile; fileRef = 93D3AD581CE4392A00A80CDA /* libleveldb_amd64.a */; };
@@ -91,8 +89,6 @@
30D45FC51CE14A8A004A59FA /* go_types.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = go_types.h; sourceTree = "<group>"; };
30D45FC81CE14A8A004A59FA /* sbcore_arm64.a */ = {isa = PBXFileReference; lastKnownFileType = archive.ar; path = sbcore_arm64.a; sourceTree = "<group>"; };
930DFCE11CEE46DE00738DB8 /* OAuth.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = OAuth.swift; sourceTree = "<group>"; };
- 930DFCEA1CEED8BD00738DB8 /* Refs.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Refs.swift; sourceTree = "<group>"; };
- 930DFCF41CEED96B00738DB8 /* Locking.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Locking.swift; sourceTree = "<group>"; };
9351A4931CE46DB9009CC4F4 /* sbcore_amd64.a */ = {isa = PBXFileReference; lastKnownFileType = archive.ar; path = sbcore_amd64.a; sourceTree = "<group>"; };
9374F6661D00FF68004ECE59 /* TestHelpers.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = TestHelpers.swift; sourceTree = "<group>"; };
93D3AD581CE4392A00A80CDA /* libleveldb_amd64.a */ = {isa = PBXFileReference; lastKnownFileType = archive.ar; path = libleveldb_amd64.a; sourceTree = "<group>"; };
@@ -204,9 +200,7 @@
children = (
30AD2E861CDD56B700A28A0C /* Exceptions.h */,
30AD2E871CDD56B700A28A0C /* Exceptions.m */,
- 930DFCF41CEED96B00738DB8 /* Locking.swift */,
30AD2E791CDD569200A28A0C /* Logging.swift */,
- 930DFCEA1CEED8BD00738DB8 /* Refs.swift */,
30AD2E7A1CDD569200A28A0C /* Strings.swift */,
30AD2E7B1CDD569200A28A0C /* Threads.swift */,
30AD2E921CDD60A600A28A0C /* Types.swift */,
@@ -342,10 +336,8 @@
30AD2E3F1CDD508700A28A0C /* Syncbase.swift in Sources */,
30AD2E401CDD508700A28A0C /* Syncgroup.swift in Sources */,
30AD2E811CDD569200A28A0C /* Strings.swift in Sources */,
- 930DFCF51CEED96B00738DB8 /* Locking.swift in Sources */,
30AD2E451CDD508700A28A0C /* Watch.swift in Sources */,
30AD2E821CDD569200A28A0C /* Threads.swift in Sources */,
- 930DFCEB1CEED8BD00738DB8 /* Refs.swift in Sources */,
30AD2E801CDD569200A28A0C /* Logging.swift in Sources */,
30AD2E351CDD508700A28A0C /* Errors.swift in Sources */,
930DFCE21CEE46DE00738DB8 /* OAuth.swift in Sources */,