syncbase/bridge/cgo: Two changes to the C API for handling callbacks

The first change is to switch the handle from 'int' to 'void*'. This
will help interfacing with language where is easier to pass a pointer
to a struct instead of explicitly managing the mapping to it. Both
Java and Swift manage memory in a way that makes is hard/impossible
to use pointers so the 'void*' will be used as a larger 'int'.

The second change is to use a single handle instead of two for both
watching and scanning. This makes the C API simpler and is more inline
with what the Java/Swift need to do (namely, keeping a struct that
describes both callbacks).

Note: the author of the Swift code is Aaron Zinman <aaron@azinman.com>.

MultiPart: 2/2
Change-Id: Id0c16356fda53825a4b2821e5ce374bf051481ca
diff --git a/SyncbaseCore/Source/Collection.swift b/SyncbaseCore/Source/Collection.swift
index 56925af..748a4b9 100644
--- a/SyncbaseCore/Source/Collection.swift
+++ b/SyncbaseCore/Source/Collection.swift
@@ -191,14 +191,7 @@
     }
   }
 
-  /// Scan returns all rows in the given half-open range [start, limit). If limit
-  /// is "", all rows with keys >= start are included.
-  /// Concurrency semantics: It is legal to perform writes concurrently with
-  /// Scan. The returned stream reads from a consistent snapshot taken at the
-  /// time of the RPC (or at the time of BeginBatch, if in a batch), and will not
-  /// reflect subsequent writes to keys not yet reached by the stream.
-  /// See helpers Prefix(), Range(), SingleRow().
-  public func scan(r: RowRange) throws -> ScanStream {
+  private class ScanHandle {
     // Scan works by having Go call Swift as encounters each row. This is a bit of a mismatch
     // for the Swift GeneratorType which is pull instead of push-based. We create a push-pull
     // adapter by blocking on either side using condition variables until both are ready for the
@@ -213,7 +206,7 @@
 
     // The anonymous function that gets called from the Swift. It blocks until there's an update
     // available from Go.
-    let fetchNext = { (timeout: NSTimeInterval?) -> ((String, GetValueFromScanStream)?, ErrorType?) in
+    func fetchNext(timeout: NSTimeInterval?) -> ((String, GetValueFromScanStream)?, ErrorType?) {
       condition.lock()
       while !updateAvailable {
         if let timeout = timeout {
@@ -247,7 +240,7 @@
     }
 
     // The callback from Go when there's a new Row (key-value) scanned.
-    let onKV = { (key: String, valueBytes: NSData) in
+    func onKeyValue(key: String, valueBytes: NSData) {
       condition.lock()
       // Wait until any existing update has been received by the fetch so we don't just blow
       // past it.
@@ -262,7 +255,7 @@
       condition.unlock()
     }
 
-    let onDone = { (err: ErrorType?) in
+    func onDone(err: ErrorType?) {
       condition.lock()
       // Wait until any existing update has been received by the fetch so we don't just blow
       // past it.
@@ -277,8 +270,19 @@
       condition.signal()
       condition.unlock()
     }
+  }
 
+  /// Scan returns all rows in the given half-open range [start, limit). If limit
+  /// is "", all rows with keys >= start are included.
+  /// Concurrency semantics: It is legal to perform writes concurrently with
+  /// Scan. The returned stream reads from a consistent snapshot taken at the
+  /// time of the RPC (or at the time of BeginBatch, if in a batch), and will not
+  /// reflect subsequent writes to keys not yet reached by the stream.
+  /// See helpers Prefix(), Range(), SingleRow().
+  public func scan(r: RowRange) throws -> ScanStream {
+    let handle = ScanHandle()
     try VError.maybeThrow { errPtr in
+      let oHandle = UnsafeMutablePointer<Void>(Unmanaged.passRetained(handle).toOpaque())
       let cStartStr = try r.start.toCgoString()
       let cLimitStr = try r.limit.toCgoString()
       let cStartBytes = v23_syncbase_Bytes(
@@ -286,10 +290,9 @@
       let cLimitBytes = v23_syncbase_Bytes(
         p: unsafeBitCast(cLimitStr.p, UnsafeMutablePointer<UInt8>.self), n: cLimitStr.n)
       let callbacks = v23_syncbase_CollectionScanCallbacks(
-        hOnKeyValue: Collection.onScanKVClosures.ref(onKV),
-        hOnDone: Collection.onScanDoneClosures.ref(onDone),
-        onKeyValue: { Collection.onScanKV(AsyncId($0), kv: $1) },
-        onDone: { Collection.onScanDone(AsyncId($0), doneHandle: AsyncId($1), err: $2) })
+        handle: v23_syncbase_Handle(oHandle),
+        onKeyValue: { Collection.onScanKeyValue($0, kv: $1) },
+        onDone: { Collection.onScanDone($0, err: $1) })
       v23_syncbase_CollectionScan(
         try encodedCollectionName.toCgoString(),
         try cBatchHandle(),
@@ -298,36 +301,24 @@
         callbacks,
         errPtr)
     }
-
     return AnonymousStream(
-      fetchNextFunction: fetchNext,
+      fetchNextFunction: handle.fetchNext,
       cancelFunction: { preconditionFailure("stub") })
   }
 
-  // Reference maps between closured functions and handles passed back/forth with Go.
-  private static var onScanKVClosures = RefMap < (String, NSData) -> Void > ()
-  private static var onScanDoneClosures = RefMap < ErrorType? -> Void > ()
-
   // Callback handlers that convert the Cgo bridge types to native Swift types and pass them to
-  // the closured functions reference by the passed handle.
-  private static func onScanKV(handle: AsyncId, kv: v23_syncbase_KeyValue) {
-    guard let key = kv.key.toString(),
-      valueBytes = kv.value.toNSData(),
-      callback = onScanKVClosures.get(handle) else {
-        fatalError("Could not fully unpact scan kv callback or find handle")
-    }
-    callback(key, valueBytes)
+  // the functions inside the passed handle.
+  private static func onScanKeyValue(handle: v23_syncbase_Handle, kv: v23_syncbase_KeyValue) {
+    let key = kv.key.toString()!
+    let valueBytes = kv.value.toNSData()!
+    let handle = Unmanaged<ScanHandle>.fromOpaque(COpaquePointer(handle)).takeUnretainedValue()
+    handle.onKeyValue(key, valueBytes: valueBytes)
   }
 
-  private static func onScanDone(kvHandle: AsyncId, doneHandle: AsyncId, err: v23_syncbase_VError) {
+  private static func onScanDone(handle: v23_syncbase_Handle, err: v23_syncbase_VError) {
     let e = err.toVError()
-    if onScanKVClosures.unref(kvHandle) == nil {
-      fatalError("Could not find closure for scan onKV handle (via onDone callback)")
-    }
-    guard let callback = onScanDoneClosures.unref(doneHandle) else {
-      fatalError("Could not find closure for scan onDone handle")
-    }
-    callback(e)
+    let handle = Unmanaged<ScanHandle>.fromOpaque(COpaquePointer(handle)).takeRetainedValue()
+    handle.onDone(e)
   }
 
   // MARK: Internal helpers
diff --git a/SyncbaseCore/Source/Locking.swift b/SyncbaseCore/Source/Locking.swift
deleted file mode 100644
index c73dd61..0000000
--- a/SyncbaseCore/Source/Locking.swift
+++ /dev/null
@@ -1,17 +0,0 @@
-// Copyright 2016 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-import Foundation
-
-protocol Lockable {
-  func lock(block:()->())
-}
-
-extension Lockable where Self : AnyObject {
-  func lock(block:()->()) {
-    objc_sync_enter(self)
-    block()
-    objc_sync_exit(self)
-  }
-}
\ No newline at end of file
diff --git a/SyncbaseCore/Source/Refs.swift b/SyncbaseCore/Source/Refs.swift
deleted file mode 100644
index 585fddb..0000000
--- a/SyncbaseCore/Source/Refs.swift
+++ /dev/null
@@ -1,35 +0,0 @@
-// Copyright 2016 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-/// RefMap holds onto objs that get referenced by AsyncId between Go and Swift.
-/// The larger motivation is closures that get passed as function pointers have to be 'context-free'
-/// so we can't closure on a reference to a given future. By allowing the end user to hold onto
-/// this in various places (strongly typed to the appropriate T) then we can pass a handle
-/// back and forth safely.
-typealias AsyncId = Int32
-
-class RefMap<T>: Lockable {
-  private (set) var lastId: AsyncId = 0
-  private var refs = [AsyncId: T]()
-
-  /// Stores an object and returns the associated asyncId. If the object is already in the map
-  /// it will be stored twice -- ref does not actually perform reference counting.
-  func ref(obj: T) -> AsyncId {
-    let asyncId = OSAtomicIncrement32(&lastId)
-    lock { self.refs[asyncId] = obj }
-    return asyncId
-  }
-
-  /// Gets the associated value for a given asyncId.
-  func get(asyncId: AsyncId) -> T? {
-    return refs[asyncId]
-  }
-
-  /// Get and deletes any associated asyncId, returning the associated value.
-  func unref(asyncId: AsyncId) -> T? {
-    guard let p = refs[asyncId] else { return nil }
-    lock { self.refs[asyncId] = nil }
-    return p
-  }
-}
\ No newline at end of file
diff --git a/SyncbaseCore/Source/Watch.swift b/SyncbaseCore/Source/Watch.swift
index d04cc19..84303ea 100644
--- a/SyncbaseCore/Source/Watch.swift
+++ b/SyncbaseCore/Source/Watch.swift
@@ -67,88 +67,91 @@
 /// Internal namespace for the watch API -- end-users will access this through Database.watch
 /// instead. This is simply here to allow all watch-related code to be located in Watch.swift.
 enum Watch {
+  private class Handle {
+    // Watch works by having Go call Swift as encounters each watch change.
+    // This is a bit of a mismatch for the Swift GeneratorType which is pull instead of push-based.
+    // Similar to collection.Scan, we create a push-pull adapter by blocking on either side using
+    // condition variables until both are ready for the next data handoff. See collection.Scan
+    // for more information.
+    let condition = NSCondition()
+    var data: WatchChange? = nil
+    var streamErr: ErrorType? = nil
+    var updateAvailable = false
+
+    // The anonymous function that gets called from the Swift. It blocks until there's an update
+    // available from Go.
+    func fetchNext(timeout: NSTimeInterval?) -> (WatchChange?, ErrorType?) {
+      condition.lock()
+      while !updateAvailable {
+        if let timeout = timeout {
+          if !condition.waitUntilDate(NSDate(timeIntervalSinceNow: timeout)) {
+            condition.unlock()
+            return (nil, nil)
+          }
+        } else {
+          condition.wait()
+        }
+      }
+      // Grab the data from this update and reset for the next update.
+      let fetchedData = data
+      data = nil
+      // We don't need to locally capture doneErr because, unlike data, errors can only come in
+      // once at the very end of the stream (after which no more callbacks will get called).
+      updateAvailable = false
+      // Signal that we've fetched the data to Go.
+      condition.signal()
+      condition.unlock()
+      return (fetchedData, streamErr)
+    }
+
+    // The callback from Go when there's a new Row (key-value) scanned.
+    func onChange(change: WatchChange) {
+      condition.lock()
+      // Wait until any existing update has been received by the fetch so we don't just blow
+      // past it.
+      while updateAvailable {
+        condition.wait()
+      }
+      // Set the new data.
+      data = change
+      updateAvailable = true
+      // Wake up any blocked fetch.
+      condition.signal()
+      condition.unlock()
+    }
+
+    // The callback from Go when there's been an error in the watch stream. The stream will then
+    // be closed and no new changes will ever come in from this request.
+    func onError(err: ErrorType) {
+      condition.lock()
+      // Wait until any existing update has been received by the fetch so we don't just blow
+      // past it.
+      while updateAvailable {
+        condition.wait()
+      }
+      // Marks the end of data by clearing it and saving the error from Syncbase.
+      data = nil
+      streamErr = err
+      updateAvailable = true
+      // Wake up any blocked fetch.
+      condition.signal()
+      condition.unlock()
+    }
+  }
+
   static func watch(
     encodedDatabaseName encodedDatabaseName: String,
     patterns: [CollectionRowPattern],
     resumeMarker: ResumeMarker? = nil) throws -> WatchStream {
-      // Watch works by having Go call Swift as encounters each watch change.
-      // This is a bit of a mismatch for the Swift GeneratorType which is pull instead of push-based.
-      // Similar to collection.Scan, we create a push-pull adapter by blocking on either side using
-      // condition variables until both are ready for the next data handoff. See collection.Scan
-      // for more information.
-      let condition = NSCondition()
-      var data: WatchChange? = nil
-      var streamErr: ErrorType? = nil
-      var updateAvailable = false
-
-      // The anonymous function that gets called from the Swift. It blocks until there's an update
-      // available from Go.
-      let fetchNext = { (timeout: NSTimeInterval?) -> (WatchChange?, ErrorType?) in
-        condition.lock()
-        while !updateAvailable {
-          if let timeout = timeout {
-            if !condition.waitUntilDate(NSDate(timeIntervalSinceNow: timeout)) {
-              condition.unlock()
-              return (nil, nil)
-            }
-          } else {
-            condition.wait()
-          }
-        }
-        // Grab the data from this update and reset for the next update.
-        let fetchedData = data
-        data = nil
-        // We don't need to locally capture doneErr because, unlike data, errors can only come in
-        // once at the very end of the stream (after which no more callbacks will get called).
-        updateAvailable = false
-        // Signal that we've fetched the data to Go.
-        condition.signal()
-        condition.unlock()
-        return (fetchedData, streamErr)
-      }
-
-      // The callback from Go when there's a new Row (key-value) scanned.
-      let onChange = { (change: WatchChange) in
-        condition.lock()
-        // Wait until any existing update has been received by the fetch so we don't just blow
-        // past it.
-        while updateAvailable {
-          condition.wait()
-        }
-        // Set the new data.
-        data = change
-        updateAvailable = true
-        // Wake up any blocked fetch.
-        condition.signal()
-        condition.unlock()
-      }
-
-      // The callback from Go when there's been an error in the watch stream. The stream will then
-      // be closed and no new changes will ever come in from this request.
-      let onError = { (err: ErrorType) in
-        condition.lock()
-        // Wait until any existing update has been received by the fetch so we don't just blow
-        // past it.
-        while updateAvailable {
-          condition.wait()
-        }
-        // Marks the end of data by clearing it and saving the error from Syncbase.
-        data = nil
-        streamErr = err
-        updateAvailable = true
-        // Wake up any blocked fetch.
-        condition.signal()
-        condition.unlock()
-      }
-
+      let handle = Watch.Handle()
       try VError.maybeThrow { errPtr in
+        let oHandle = UnsafeMutablePointer<Void>(Unmanaged.passRetained(handle).toOpaque())
         let cPatterns = try v23_syncbase_CollectionRowPatterns(patterns)
         let cResumeMarker = v23_syncbase_Bytes(resumeMarker?.data)
         let callbacks = v23_syncbase_DbWatchPatternsCallbacks(
-          hOnChange: onWatchChangeClosures.ref(onChange),
-          hOnError: onWatchErrorClosures.ref(onError),
-          onChange: { Watch.onWatchChange(AsyncId($0), change: $1) },
-          onError: { Watch.onWatchError(AsyncId($0), errorHandle: AsyncId($1), err: $2) })
+          handle: v23_syncbase_Handle(oHandle),
+          onChange: { Watch.onWatchChange($0, change: $1) },
+          onError: { Watch.onWatchError($0, err: $1) })
         v23_syncbase_DbWatchPatterns(
           try encodedDatabaseName.toCgoString(),
           cResumeMarker,
@@ -158,32 +161,21 @@
       }
 
       return AnonymousStream(
-        fetchNextFunction: fetchNext,
+        fetchNextFunction: handle.fetchNext,
         cancelFunction: { preconditionFailure("stub") })
   }
 
-  // Reference maps between closured functions and handles passed back/forth with Go.
-  private static var onWatchChangeClosures = RefMap < WatchChange -> Void > ()
-  private static var onWatchErrorClosures = RefMap < ErrorType -> Void > ()
-
   // Callback handlers that convert the Cgo bridge types to native Swift types and pass them to
-  // the closured functions reference by the passed handle.
-  private static func onWatchChange(changeHandle: AsyncId, change: v23_syncbase_WatchChange) {
+  // the functions inside the passed handle.
+  private static func onWatchChange(handle: v23_syncbase_Handle, change: v23_syncbase_WatchChange) {
     let change = change.toWatchChange()
-    guard let callback = onWatchChangeClosures.get(changeHandle) else {
-      fatalError("Could not find closure for watch onChange handle")
-    }
-    callback(change)
+    let handle = Unmanaged<Watch.Handle>.fromOpaque(COpaquePointer(handle)).takeUnretainedValue()
+    handle.onChange(change)
   }
 
-  private static func onWatchError(changeHandle: AsyncId, errorHandle: AsyncId, err: v23_syncbase_VError) {
+  private static func onWatchError(handle: v23_syncbase_Handle, err: v23_syncbase_VError) {
     let e: ErrorType = err.toVError() ?? SyncbaseError.InvalidOperation(reason: "A watch error occurred")
-    if onWatchChangeClosures.unref(changeHandle) == nil {
-      fatalError("Could not find closure for watch onChange handle (via onWatchError callback)")
-    }
-    guard let callback = onWatchErrorClosures.unref(errorHandle) else {
-      fatalError("Could not find closure for watch onError handle")
-    }
-    callback(e)
+    let handle = Unmanaged<Watch.Handle>.fromOpaque(COpaquePointer(handle)).takeRetainedValue()
+    handle.onError(e)
   }
 }
\ No newline at end of file
diff --git a/SyncbaseCore/SyncbaseCore.xcodeproj/project.pbxproj b/SyncbaseCore/SyncbaseCore.xcodeproj/project.pbxproj
index ba7d08e..1620101 100644
--- a/SyncbaseCore/SyncbaseCore.xcodeproj/project.pbxproj
+++ b/SyncbaseCore/SyncbaseCore.xcodeproj/project.pbxproj
@@ -37,8 +37,6 @@
 		30D45FCA1CE14A8A004A59FA /* go_types.h in Headers */ = {isa = PBXBuildFile; fileRef = 30D45FC51CE14A8A004A59FA /* go_types.h */; settings = {ATTRIBUTES = (Public, ); }; };
 		30D45FCD1CE14A8A004A59FA /* sbcore_arm64.a in Frameworks */ = {isa = PBXBuildFile; fileRef = 30D45FC81CE14A8A004A59FA /* sbcore_arm64.a */; };
 		930DFCE21CEE46DE00738DB8 /* OAuth.swift in Sources */ = {isa = PBXBuildFile; fileRef = 930DFCE11CEE46DE00738DB8 /* OAuth.swift */; };
-		930DFCEB1CEED8BD00738DB8 /* Refs.swift in Sources */ = {isa = PBXBuildFile; fileRef = 930DFCEA1CEED8BD00738DB8 /* Refs.swift */; };
-		930DFCF51CEED96B00738DB8 /* Locking.swift in Sources */ = {isa = PBXBuildFile; fileRef = 930DFCF41CEED96B00738DB8 /* Locking.swift */; };
 		9351A4941CE46DB9009CC4F4 /* sbcore_amd64.a in Frameworks */ = {isa = PBXBuildFile; fileRef = 9351A4931CE46DB9009CC4F4 /* sbcore_amd64.a */; };
 		9374F6681D00FFE5004ECE59 /* TestHelpers.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9374F6661D00FF68004ECE59 /* TestHelpers.swift */; };
 		93D3AD5C1CE4392A00A80CDA /* libleveldb_amd64.a in Frameworks */ = {isa = PBXBuildFile; fileRef = 93D3AD581CE4392A00A80CDA /* libleveldb_amd64.a */; };
@@ -91,8 +89,6 @@
 		30D45FC51CE14A8A004A59FA /* go_types.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = go_types.h; sourceTree = "<group>"; };
 		30D45FC81CE14A8A004A59FA /* sbcore_arm64.a */ = {isa = PBXFileReference; lastKnownFileType = archive.ar; path = sbcore_arm64.a; sourceTree = "<group>"; };
 		930DFCE11CEE46DE00738DB8 /* OAuth.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = OAuth.swift; sourceTree = "<group>"; };
-		930DFCEA1CEED8BD00738DB8 /* Refs.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Refs.swift; sourceTree = "<group>"; };
-		930DFCF41CEED96B00738DB8 /* Locking.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Locking.swift; sourceTree = "<group>"; };
 		9351A4931CE46DB9009CC4F4 /* sbcore_amd64.a */ = {isa = PBXFileReference; lastKnownFileType = archive.ar; path = sbcore_amd64.a; sourceTree = "<group>"; };
 		9374F6661D00FF68004ECE59 /* TestHelpers.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = TestHelpers.swift; sourceTree = "<group>"; };
 		93D3AD581CE4392A00A80CDA /* libleveldb_amd64.a */ = {isa = PBXFileReference; lastKnownFileType = archive.ar; path = libleveldb_amd64.a; sourceTree = "<group>"; };
@@ -204,9 +200,7 @@
 			children = (
 				30AD2E861CDD56B700A28A0C /* Exceptions.h */,
 				30AD2E871CDD56B700A28A0C /* Exceptions.m */,
-				930DFCF41CEED96B00738DB8 /* Locking.swift */,
 				30AD2E791CDD569200A28A0C /* Logging.swift */,
-				930DFCEA1CEED8BD00738DB8 /* Refs.swift */,
 				30AD2E7A1CDD569200A28A0C /* Strings.swift */,
 				30AD2E7B1CDD569200A28A0C /* Threads.swift */,
 				30AD2E921CDD60A600A28A0C /* Types.swift */,
@@ -342,10 +336,8 @@
 				30AD2E3F1CDD508700A28A0C /* Syncbase.swift in Sources */,
 				30AD2E401CDD508700A28A0C /* Syncgroup.swift in Sources */,
 				30AD2E811CDD569200A28A0C /* Strings.swift in Sources */,
-				930DFCF51CEED96B00738DB8 /* Locking.swift in Sources */,
 				30AD2E451CDD508700A28A0C /* Watch.swift in Sources */,
 				30AD2E821CDD569200A28A0C /* Threads.swift in Sources */,
-				930DFCEB1CEED8BD00738DB8 /* Refs.swift in Sources */,
 				30AD2E801CDD569200A28A0C /* Logging.swift in Sources */,
 				30AD2E351CDD508700A28A0C /* Errors.swift in Sources */,
 				930DFCE21CEE46DE00738DB8 /* OAuth.swift in Sources */,