TBR v.io/x/jni: support for asynchronous streaming
MultiPart: 2/2
Change-Id: Iea3761728271db644e38af75ffb10e66392d9f37
diff --git a/impl/google/channel/jni.go b/impl/google/channel/jni.go
index 4daff97..81ede12 100644
--- a/impl/google/channel/jni.go
+++ b/impl/google/channel/jni.go
@@ -7,10 +7,8 @@
package channel
import (
- "fmt"
"unsafe"
- "v.io/v23/verror"
jutil "v.io/x/jni/util"
)
@@ -18,12 +16,10 @@
import "C"
var (
- // Global reference for io.v.impl.google.channel.ChannelIterable class.
- jChannelIterableClass jutil.Class
+ // Global reference for io.v.impl.google.channel.InputChannelImpl class.
+ jInputChannelImplClass jutil.Class
// Global reference for io.v.impl.google.channel.OutputChannelImpl class.
jOutputChannelImplClass jutil.Class
- // Global reference for java.io.EOFException class.
- jEOFExceptionClass jutil.Class
)
// Init initializes the JNI code with the given Java environment. This method
@@ -31,7 +27,7 @@
// from the main Java thread (e.g., On_Load()).
func Init(env jutil.Env) error {
var err error
- jChannelIterableClass, err = jutil.JFindClass(env, "io/v/impl/google/channel/ChannelIterable")
+ jInputChannelImplClass, err = jutil.JFindClass(env, "io/v/impl/google/channel/InputChannelImpl")
if err != nil {
return err
}
@@ -39,63 +35,54 @@
if err != nil {
return err
}
- jEOFExceptionClass, err = jutil.JFindClass(env, "java/io/EOFException")
+ return nil
+}
+
+//export Java_io_v_impl_google_channel_InputChannelImpl_nativeRecv
+func Java_io_v_impl_google_channel_InputChannelImpl_nativeRecv(jenv *C.JNIEnv, jInputChannelImpl C.jobject, goRecvPtr C.jlong, jCallbackObj C.jobject) {
+ env := jutil.Env(uintptr(unsafe.Pointer(jenv)))
+ recv := *(*func() (jutil.Object, error))(jutil.NativePtr(goRecvPtr))
+ jCallback := jutil.Object(uintptr(unsafe.Pointer(jCallbackObj)))
+ jutil.DoAsyncCall(env, jCallback, recv)
+}
+
+//export Java_io_v_impl_google_channel_InputChannelImpl_nativeFinalize
+func Java_io_v_impl_google_channel_InputChannelImpl_nativeFinalize(jenv *C.JNIEnv, jInputChannelImpl C.jobject, goRecvPtr C.jlong) {
+ jutil.GoUnref(jutil.NativePtr(goRecvPtr))
+}
+
+//export Java_io_v_impl_google_channel_OutputChannelImpl_nativeSend
+func Java_io_v_impl_google_channel_OutputChannelImpl_nativeSend(jenv *C.JNIEnv, jOutputChannelClass C.jclass, goConvertPtr C.jlong, goSendPtr C.jlong, jItemObj C.jobject, jCallbackObj C.jobject) {
+ env := jutil.Env(uintptr(unsafe.Pointer(jenv)))
+ convert := *(*func(jutil.Object) (interface{}, error))(jutil.NativePtr(goConvertPtr))
+ send := *(*func(interface{}) error)(jutil.NativePtr(goSendPtr))
+ jItem := jutil.Object(uintptr(unsafe.Pointer(jItemObj)))
+ jCallback := jutil.Object(uintptr(unsafe.Pointer(jCallbackObj)))
+ // NOTE(spetrovic): Conversion must be done outside of DoAsyncCall as it references a Java
+ // object.
+ item, err := convert(jItem)
if err != nil {
- return err
+ jutil.CallbackOnFailure(env, jCallback, err)
+ return
}
- return nil
-}
-
-//export Java_io_v_impl_google_channel_ChannelIterable_nativeReadValue
-func Java_io_v_impl_google_channel_ChannelIterable_nativeReadValue(jenv *C.JNIEnv, jChannelIterable C.jobject, goChanPtr C.jlong, goConvertPtr C.jlong) C.jobject {
- env := jutil.Env(uintptr(unsafe.Pointer(jenv)))
- c := *(*chan interface{})(jutil.NativePtr(goChanPtr))
- convert := *(*func(jutil.Env, interface{})(jutil.Object, error))(jutil.NativePtr(goConvertPtr))
- val, ok := <-c
- if !ok { // channel closed
- jutil.JThrow(env, jEOFExceptionClass, "Reached end of input channel.")
- return nil
- }
- jVal, err := convert(env, val)
- if err == nil {
- return C.jobject(unsafe.Pointer(jVal))
- }
- if verr, ok := err.(verror.E); ok && verr.ID == verror.ErrCanceled.ID { // EOF
- jutil.JThrow(env, jEOFExceptionClass, "User canceled the operation.")
- return nil
- }
- jutil.JThrowV(env, err)
- return nil
-}
-
-//export Java_io_v_impl_google_channel_ChannelIterable_nativeFinalize
-func Java_io_v_impl_google_channel_ChannelIterable_nativeFinalize(jenv *C.JNIEnv, jChannelIterable C.jobject, goChanPtr C.jlong, goConvertPtr C.jlong) {
- jutil.GoUnref(jutil.NativePtr(goChanPtr))
- jutil.GoUnref(jutil.NativePtr(goConvertPtr))
-}
-
-//export Java_io_v_impl_google_channel_OutputChannelImpl_nativeWriteValue
-func Java_io_v_impl_google_channel_OutputChannelImpl_nativeWriteValue(jenv *C.JNIEnv, jOutputChannelClass C.jclass, goChanPtr C.jlong, jObject C.jobject) {
- env := jutil.Env(uintptr(unsafe.Pointer(jenv)))
- outCh := *(*outputChannel)(jutil.NativePtr(goChanPtr))
- // The other side of the channel is responsible for deleting this
- // global reference.
- if err := outCh.ReadFunc(jutil.NewGlobalRef(env, jutil.Object(uintptr(unsafe.Pointer(jObject))))); err != nil {
- jutil.JThrowV(env, fmt.Errorf("Exception while writing to OutputChannel: %+v", err))
- }
+ jutil.DoAsyncCall(env, jCallback, func() (jutil.Object, error) {
+ return jutil.NullObject, send(item)
+ })
}
//export Java_io_v_impl_google_channel_OutputChannelImpl_nativeClose
-func Java_io_v_impl_google_channel_OutputChannelImpl_nativeClose(jenv *C.JNIEnv, jOutputChannelClass C.jclass, goChanPtr C.jlong) {
+func Java_io_v_impl_google_channel_OutputChannelImpl_nativeClose(jenv *C.JNIEnv, jOutputChannelClass C.jclass, goClosePtr C.jlong, jCallbackObj C.jobject) {
env := jutil.Env(uintptr(unsafe.Pointer(jenv)))
- outCh := *(*outputChannel)(jutil.NativePtr(goChanPtr))
-
- if err := outCh.CloseFunc(); err != nil {
- jutil.JThrowV(env, fmt.Errorf("Exception while closing OutputChannel: %+v", err))
- }
+ close := *(*func() error)(jutil.NativePtr(goClosePtr))
+ jCallback := jutil.Object(uintptr(unsafe.Pointer(jCallbackObj)))
+ jutil.DoAsyncCall(env, jCallback, func() (jutil.Object, error) {
+ return jutil.NullObject, close()
+ })
}
//export Java_io_v_impl_google_channel_OutputChannelImpl_nativeFinalize
-func Java_io_v_impl_google_channel_OutputChannelImpl_nativeFinalize(jenv *C.JNIEnv, jOutputChannelClass C.jclass, goChanPtr C.jlong) {
- jutil.GoUnref(jutil.NativePtr(goChanPtr))
+func Java_io_v_impl_google_channel_OutputChannelImpl_nativeFinalize(jenv *C.JNIEnv, jOutputChannelClass C.jclass, goConvertPtr C.jlong, goSendPtr C.jlong, goClosePtr C.jlong) {
+ jutil.GoUnref(jutil.NativePtr(goConvertPtr))
+ jutil.GoUnref(jutil.NativePtr(goSendPtr))
+ jutil.GoUnref(jutil.NativePtr(goClosePtr))
}
diff --git a/impl/google/channel/util.go b/impl/google/channel/util.go
index 331d542..486307a 100644
--- a/impl/google/channel/util.go
+++ b/impl/google/channel/util.go
@@ -13,54 +13,30 @@
// #include "jni.h"
import "C"
-// JavaIterable converts the provided Go channel into a Java VIterable object, using the
-// given convert function.
-func JavaIterable(env jutil.Env, ch chan interface{}, convert func(jutil.Env, interface{}) (jutil.Object, error)) (jutil.Object, error) {
- jIterable, err := jutil.NewObject(env, jChannelIterableClass, []jutil.Sign{jutil.LongSign, jutil.LongSign}, int64(jutil.PtrValue(&ch)), int64(jutil.PtrValue(&convert)))
+// JavaInputChannel creates a new Java InputChannel object given the provided Go recv function.
+//
+// All objects returned by the recv function must be globally references.
+//
+// The recv function must return verror.ErrEndOfFile when there are no more elements
+// to receive.
+func JavaInputChannel(env jutil.Env, recv func() (jutil.Object, error)) (jutil.Object, error) {
+ jInputChannel, err := jutil.NewObject(env, jInputChannelImplClass, []jutil.Sign{jutil.LongSign}, int64(jutil.PtrValue(&recv)))
if err != nil {
return jutil.NullObject, err
}
- jutil.GoRef(&ch) // Un-refed when ChannelIterable is finalized.
- jutil.GoRef(&convert) // Un-refed when ChannelIterable is finalized.
- return jIterable, nil
+ jutil.GoRef(&recv) // Un-refed when jInputChannel is finalized.
+ return jInputChannel, nil
}
-// outputChannel represents the Go-side of a Java OutputChannel. Each time the
-// Java side writes an object, the ReadFunc will be called. If the ReadFunc
-// returns an error, an exception will be raised on the Java side.
-type outputChannel struct {
- // ReadFunc is invoked when an object has been read from Java.
- //
- // The jutil.Object passed to this function is globally referenced. It is
- // required that the ReadFunc implementation delete the global
- // reference jutil.DeleteGlobalRef. Failure to do so will
- // result in a reference leak.
- //
- // If the ReadFunc implementation returns an error, that error will be
- // passed back to the Java writer in the form of a VException to the
- // OutputChannel.writeValue() method.
- ReadFunc func(jutil.Object) error
-
- // CloseFunc is invoked when the Java side has closed its OutputChannel
- // and no more values will be written.
- //
- // If CloseFunc returns an error, that error will be passed back to the
- // Java side in the form of a VException OutputChannel.close() method.
- CloseFunc func() error
-}
-
-// JavaOutputChannel converts the provided Go channel of jutil.Object values
-// into a Java OutputChannel object.
-func JavaOutputChannel(env jutil.Env, readFunc func(jutil.Object) error, closeFunc func() error) (jutil.Object, error) {
- outCh := outputChannel{
- ReadFunc: readFunc,
- CloseFunc: closeFunc,
- }
-
- jOutputChannel, err := jutil.NewObject(env, jOutputChannelImplClass, []jutil.Sign{jutil.LongSign}, int64(jutil.PtrValue(&outCh)))
+// JavaOutputChannel creates a new Java OutputChannel object given the provided Go convert, send
+// and close functions. Send is invoked with the result of convert, which must be non-blocking.
+func JavaOutputChannel(env jutil.Env, convert func(jutil.Object) (interface{}, error), send func(interface{}) error, close func() error) (jutil.Object, error) {
+ jOutputChannel, err := jutil.NewObject(env, jOutputChannelImplClass, []jutil.Sign{jutil.LongSign, jutil.LongSign, jutil.LongSign}, int64(jutil.PtrValue(&convert)), int64(jutil.PtrValue(&send)), int64(jutil.PtrValue(&close)))
if err != nil {
return jutil.NullObject, err
}
- jutil.GoRef(&outCh) // Un-refed when the OutputChannel is finalized.
+ jutil.GoRef(&convert) // Un-refed when jOutputChannel is finalized.
+ jutil.GoRef(&send) // Un-refed when jOutputChannel is finalized.
+ jutil.GoRef(&close) // Un-refed when jOutputChannel is finalized.
return jOutputChannel, nil
}
diff --git a/impl/google/namespace/jni.go b/impl/google/namespace/jni.go
index 96a6035..35c9481 100644
--- a/impl/google/namespace/jni.go
+++ b/impl/google/namespace/jni.go
@@ -71,31 +71,39 @@
return
}
-func doGlob(env jutil.Env, n namespace.T, context *context.T, pattern string, opts []naming.NamespaceOpt) (jutil.Object, error) {
+func doGlob(n namespace.T, context *context.T, pattern string, opts []naming.NamespaceOpt) (jutil.Object, error) {
c, err := n.Glob(context, pattern, opts...)
if err != nil {
return jutil.NullObject, err
}
- valChan := make(chan interface{}, 100)
- go func() {
- for val := range c {
- valChan <- val
+ env, freeFunc := jutil.GetEnv()
+ defer freeFunc()
+ jChannel, err := jchannel.JavaInputChannel(env, func() (jutil.Object, error) {
+ // This is a blocking call, so don't call GetEnv() before this line.
+ val, ok := <-c
+ if !ok {
+ return jutil.NullObject, verror.NewErrEndOfFile(context)
}
- close(valChan)
- }()
- return jchannel.JavaIterable(env, valChan, func(env jutil.Env, val interface{}) (jutil.Object, error) {
globReply, ok := val.(naming.GlobReply)
if !ok {
return jutil.NullObject, fmt.Errorf("Expected value of GlobReply type, got type %T: %v", val, val)
}
- // Check for a canceled context error, we surface these as EOF.
- if errorEntry, ok := globReply.(*naming.GlobReplyError); ok {
- if verr, ok := errorEntry.Value.Error.(verror.E); ok && verr.ID == verror.ErrCanceled.ID {
- return jutil.NullObject, verr
- }
+ env, freeFunc := jutil.GetEnv()
+ defer freeFunc()
+ jReply, err := jutil.JVomCopy(env, globReply, jGlobReplyClass)
+ if err != nil {
+ return jutil.NullObject, err
}
- return jutil.JVomCopy(env, val, jGlobReplyClass)
+ // Must grab a global reference as we free up the env and all local references that come
+ // along with it.
+ return jutil.NewGlobalRef(env, jReply), nil // Un-refed by InputChannelImpl_nativeRecv
})
+ if err != nil {
+ return jutil.NullObject, err
+ }
+ // Must grab a global reference as we free up the env and all local references that come along
+ // with it.
+ return jutil.NewGlobalRef(env, jChannel), nil // Un-refed by DoAsyncCall
}
//export Java_io_v_impl_google_namespace_NamespaceImpl_nativeGlob
@@ -108,8 +116,8 @@
jutil.CallbackOnFailure(env, jCallback, err)
return
}
- jutil.DoAsyncCall(env, jCallback, func(env jutil.Env) (jutil.Object, error) {
- return doGlob(env, n, context, pattern, opts)
+ jutil.DoAsyncCall(env, jCallback, func() (jutil.Object, error) {
+ return doGlob(n, context, pattern, opts)
})
}
@@ -138,7 +146,7 @@
jutil.CallbackOnFailure(env, jCallback, err)
return
}
- jutil.DoAsyncCall(env, jCallback, func(env jutil.Env) (jutil.Object, error) {
+ jutil.DoAsyncCall(env, jCallback, func() (jutil.Object, error) {
return jutil.NullObject, n.Mount(context, name, server, duration, options...)
})
}
@@ -164,7 +172,7 @@
jutil.CallbackOnFailure(env, jCallback, err)
return
}
- jutil.DoAsyncCall(env, jCallback, func(env jutil.Env) (jutil.Object, error) {
+ jutil.DoAsyncCall(env, jCallback, func() (jutil.Object, error) {
return jutil.NullObject, n.Unmount(context, name, server, options...)
})
}
@@ -193,7 +201,7 @@
jutil.CallbackOnFailure(env, jCallback, err)
return
}
- jutil.DoAsyncCall(env, jCallback, func(env jutil.Env) (jutil.Object, error) {
+ jutil.DoAsyncCall(env, jCallback, func() (jutil.Object, error) {
return jutil.NullObject, n.Delete(context, name, deleteSubtree, options...)
})
}
@@ -211,12 +219,20 @@
return
}
-func doResolve(env jutil.Env, n namespace.T, context *context.T, name string, options []naming.NamespaceOpt) (jutil.Object, error) {
+func doResolve(n namespace.T, context *context.T, name string, options []naming.NamespaceOpt) (jutil.Object, error) {
entry, err := n.Resolve(context, name, options...)
if err != nil {
return jutil.NullObject, err
}
- return jutil.JVomCopy(env, entry, jMountEntryClass)
+ env, freeFunc := jutil.GetEnv()
+ defer freeFunc()
+ jEntry, err := jutil.JVomCopy(env, entry, jMountEntryClass)
+ if err != nil {
+ return jutil.NullObject, err
+ }
+ // Must grab a global reference as we free up the env and all local references that come along
+ // with it.
+ return jutil.NewGlobalRef(env, jEntry), nil // Un-refed in DoAsyncCall
}
//export Java_io_v_impl_google_namespace_NamespaceImpl_nativeResolve
@@ -229,8 +245,8 @@
jutil.CallbackOnFailure(env, jCallback, err)
return
}
- jutil.DoAsyncCall(env, jCallback, func(env jutil.Env) (jutil.Object, error) {
- return doResolve(env, n, context, name, options)
+ jutil.DoAsyncCall(env, jCallback, func() (jutil.Object, error) {
+ return doResolve(n, context, name, options)
})
}
@@ -247,12 +263,20 @@
return
}
-func doResolveToMountTable(env jutil.Env, n namespace.T, context *context.T, name string, options []naming.NamespaceOpt) (jutil.Object, error) {
+func doResolveToMountTable(n namespace.T, context *context.T, name string, options []naming.NamespaceOpt) (jutil.Object, error) {
entry, err := n.ResolveToMountTable(context, name, options...)
if err != nil {
return jutil.NullObject, err
}
- return jutil.JVomCopy(env, entry, jMountEntryClass)
+ env, freeFunc := jutil.GetEnv()
+ defer freeFunc()
+ jEntry, err := jutil.JVomCopy(env, entry, jMountEntryClass)
+ if err != nil {
+ return jutil.NullObject, err
+ }
+ // Must grab a global reference as we free up the env and all local references that come along
+ // with it.
+ return jutil.NewGlobalRef(env, jEntry), nil // Un-refed in DoAsyncCall
}
//export Java_io_v_impl_google_namespace_NamespaceImpl_nativeResolveToMountTable
@@ -265,8 +289,8 @@
jutil.CallbackOnFailure(env, jCallback, err)
return
}
- jutil.DoAsyncCall(env, jCallback, func(env jutil.Env) (jutil.Object, error) {
- return doResolveToMountTable(env, n, context, name, options)
+ jutil.DoAsyncCall(env, jCallback, func() (jutil.Object, error) {
+ return doResolveToMountTable(n, context, name, options)
})
}
@@ -327,7 +351,7 @@
jutil.CallbackOnFailure(env, jCallback, err)
return
}
- jutil.DoAsyncCall(env, jCallback, func(env jutil.Env) (jutil.Object, error) {
+ jutil.DoAsyncCall(env, jCallback, func() (jutil.Object, error) {
return jutil.NullObject, n.SetPermissions(context, name, permissions, version, options...)
})
}
@@ -345,18 +369,26 @@
return
}
-func doGetPermissions(env jutil.Env, n namespace.T, context *context.T, name string, options []naming.NamespaceOpt) (jutil.Object, error) {
+func doGetPermissions(n namespace.T, context *context.T, name string, options []naming.NamespaceOpt) (jutil.Object, error) {
permissions, version, err := n.GetPermissions(context, name, options...)
if err != nil {
return jutil.NullObject, err
}
+ env, freeFunc := jutil.GetEnv()
+ defer freeFunc()
jPermissions, err := jutil.JVomCopy(env, permissions, jPermissionsClass)
if err != nil {
return jutil.NullObject, err
}
result := make(map[jutil.Object]jutil.Object)
result[jutil.JString(env, version)] = jPermissions
- return jutil.JObjectMap(env, result)
+ jResult, err := jutil.JObjectMap(env, result)
+ if err != nil {
+ return jutil.NullObject, err
+ }
+ // Must grab a global reference as we free up the env and all local references that come along
+ // with it.
+ return jutil.NewGlobalRef(env, jResult), nil // Un-refed in DoAsyncCall
}
//export Java_io_v_impl_google_namespace_NamespaceImpl_nativeGetPermissions
@@ -369,8 +401,8 @@
jutil.CallbackOnFailure(env, jCallback, err)
return
}
- jutil.DoAsyncCall(env, jCallback, func(env jutil.Env) (jutil.Object, error) {
- return doGetPermissions(env, n, context, name, options)
+ jutil.DoAsyncCall(env, jCallback, func() (jutil.Object, error) {
+ return doGetPermissions(n, context, name, options)
})
}
diff --git a/impl/google/rpc/invoker.go b/impl/google/rpc/invoker.go
index 28449b4..7dc85a6 100644
--- a/impl/google/rpc/invoker.go
+++ b/impl/google/rpc/invoker.go
@@ -201,24 +201,26 @@
return err
}
- readFunc := func(input jutil.Object) error {
+ convert := func(input jutil.Object) (interface{}, error) {
env, freeFunc := jutil.GetEnv()
- defer jutil.DeleteGlobalRef(env, input)
+ defer freeFunc()
var reply naming.GlobReply
- err := jutil.GoVomCopy(env, input, jGlobReplyClass, &reply)
- if err != nil {
- freeFunc()
- return err
+ if err := jutil.GoVomCopy(env, input, jGlobReplyClass, &reply); err != nil {
+ return nil, err
}
- freeFunc()
- call.SendStream().Send(reply)
+ return reply, nil
+ }
+ send := func(item interface{}) error {
+ reply, ok := item.(naming.GlobReply)
+ if !ok {
+ return fmt.Errorf("Expected item of type naming.GlobReply, got: %T", reply)
+ }
+ return call.SendStream().Send(reply)
+ }
+ close := func() error {
return nil
}
- closeFunc := func() error {
- return nil
- }
-
- jOutputChannel, err := jchannel.JavaOutputChannel(env, readFunc, closeFunc)
+ jOutputChannel, err := jchannel.JavaOutputChannel(env, convert, send, close)
if err != nil {
return err
}
diff --git a/impl/google/rpc/jni.go b/impl/google/rpc/jni.go
index be7af99..b643d86 100644
--- a/impl/google/rpc/jni.go
+++ b/impl/google/rpc/jni.go
@@ -22,6 +22,7 @@
jcontext "v.io/x/jni/v23/context"
jnaming "v.io/x/jni/v23/naming"
jsecurity "v.io/x/jni/v23/security"
+ "v.io/v23/verror"
)
// #include "jni.h"
@@ -74,8 +75,6 @@
jServerStateClass jutil.Class
// Global reference for io.v.v23.OptionDefs class.
jOptionDefsClass jutil.Class
- // Global reference for java.io.EOFException class.
- jEOFExceptionClass jutil.Class
// Global reference for io.v.v23.naming.Endpoint.
jEndpointClass jutil.Class
// Global reference for io.v.v23.vdlroot.signature.Interface class.
@@ -170,10 +169,6 @@
if err != nil {
return err
}
- jEOFExceptionClass, err = jutil.JFindClass(env, "java/io/EOFException")
- if err != nil {
- return err
- }
jEndpointClass, err = jutil.JFindClass(env, "io/v/v23/naming/Endpoint")
if err != nil {
return err
@@ -257,7 +252,7 @@
return args, nil
}
-func doStartCall(env jutil.Env, context *context.T, name, method string, skipServerAuth bool, goPtr C.jlong, args []interface{}) (jutil.Object, error) {
+func doStartCall(context *context.T, name, method string, skipServerAuth bool, goPtr C.jlong, args []interface{}) (jutil.Object, error) {
var opts []rpc.CallOpt
if skipServerAuth {
opts = append(opts,
@@ -269,11 +264,15 @@
if err != nil {
return jutil.NullObject, err
}
+ env, freeFunc := jutil.GetEnv()
+ defer freeFunc()
jCall, err := javaCall(env, call)
if err != nil {
return jutil.NullObject, err
}
- return jCall, nil
+ // Must grab a global reference as we free up the env and all local references that come along
+ // with it.
+ return jutil.NewGlobalRef(env, jCall), nil // Un-refed in DoAsyncCall
}
//export Java_io_v_impl_google_rpc_ClientImpl_nativeStartCall
@@ -293,8 +292,8 @@
return
}
skipServerAuth := jSkipServerAuth == C.JNI_TRUE
- jutil.DoAsyncCall(env, jCallback, func(env jutil.Env) (jutil.Object, error) {
- return doStartCall(env, context, name, method, skipServerAuth, goPtr, args)
+ jutil.DoAsyncCall(env, jCallback, func() (jutil.Object, error) {
+ return doStartCall(context, name, method, skipServerAuth, goPtr, args)
})
}
@@ -309,43 +308,46 @@
}
//export Java_io_v_impl_google_rpc_StreamImpl_nativeSend
-func Java_io_v_impl_google_rpc_StreamImpl_nativeSend(jenv *C.JNIEnv, jStream C.jobject, goPtr C.jlong, jVomItem C.jbyteArray) {
+func Java_io_v_impl_google_rpc_StreamImpl_nativeSend(jenv *C.JNIEnv, jStream C.jobject, goPtr C.jlong, jVomItem C.jbyteArray, jCallbackObj C.jobject) {
env := jutil.Env(uintptr(unsafe.Pointer(jenv)))
+ jCallback := jutil.Object(uintptr(unsafe.Pointer(jCallbackObj)))
vomItem := jutil.GoByteArray(env, jutil.Object(uintptr(unsafe.Pointer(jVomItem))))
- item, err := jutil.VomDecodeToValue(vomItem)
- if err != nil {
- jutil.JThrowV(env, err)
- return
- }
- if err := (*(*rpc.Stream)(jutil.NativePtr(goPtr))).Send(item); err != nil {
- jutil.JThrowV(env, err)
- return
- }
+ jutil.DoAsyncCall(env, jCallback, func() (jutil.Object, error) {
+ item, err := jutil.VomDecodeToValue(vomItem)
+ if err != nil {
+ return jutil.NullObject, err
+ }
+ return jutil.NullObject, (*(*rpc.Stream)(jutil.NativePtr(goPtr))).Send(item)
+ })
}
//export Java_io_v_impl_google_rpc_StreamImpl_nativeRecv
-func Java_io_v_impl_google_rpc_StreamImpl_nativeRecv(jenv *C.JNIEnv, jStream C.jobject, goPtr C.jlong) C.jbyteArray {
+func Java_io_v_impl_google_rpc_StreamImpl_nativeRecv(jenv *C.JNIEnv, jStream C.jobject, goPtr C.jlong, jCallbackObj C.jobject) {
env := jutil.Env(uintptr(unsafe.Pointer(jenv)))
+ jCallback := jutil.Object(uintptr(unsafe.Pointer(jCallbackObj)))
result := new(vdl.Value)
- if err := (*(*rpc.Stream)(jutil.NativePtr(goPtr))).Recv(&result); err != nil {
- if err == io.EOF {
- jutil.JThrow(env, jEOFExceptionClass, err.Error())
- return nil
+ jutil.DoAsyncCall(env, jCallback, func() (jutil.Object, error) {
+ if err := (*(*rpc.Stream)(jutil.NativePtr(goPtr))).Recv(&result); err != nil {
+ if err == io.EOF {
+ // Java uses EndOfFile error to detect EOF.
+ err = verror.NewErrEndOfFile(nil)
+ }
+ return jutil.NullObject, err
}
- jutil.JThrowV(env, err)
- return nil
- }
- vomResult, err := vom.Encode(result)
- if err != nil {
- jutil.JThrowV(env, err)
- return nil
- }
- jArr, err := jutil.JByteArray(env, vomResult)
- if err != nil {
- jutil.JThrowV(env, err)
- return nil
- }
- return C.jbyteArray(unsafe.Pointer(jArr))
+ vomResult, err := vom.Encode(result)
+ if err != nil {
+ return jutil.NullObject, err
+ }
+ env, freeFunc := jutil.GetEnv()
+ defer freeFunc()
+ jResult, err := jutil.JByteArray(env, vomResult)
+ if err != nil {
+ return jutil.NullObject, err
+ }
+ // Must grab a global reference as we free up the env and all local references that come along
+ // with it.
+ return jutil.NewGlobalRef(env, jResult), nil // Un-refed in DoAsyncCall
+ })
}
//export Java_io_v_impl_google_rpc_StreamImpl_nativeFinalize
@@ -354,15 +356,15 @@
}
//export Java_io_v_impl_google_rpc_ClientCallImpl_nativeCloseSend
-func Java_io_v_impl_google_rpc_ClientCallImpl_nativeCloseSend(jenv *C.JNIEnv, jCall C.jobject, goPtr C.jlong) {
+func Java_io_v_impl_google_rpc_ClientCallImpl_nativeCloseSend(jenv *C.JNIEnv, jCall C.jobject, goPtr C.jlong, jCallbackObj C.jobject) {
env := jutil.Env(uintptr(unsafe.Pointer(jenv)))
- if err := (*(*rpc.ClientCall)(jutil.NativePtr(goPtr))).CloseSend(); err != nil {
- jutil.JThrowV(env, err)
- return
- }
+ jCallback := jutil.Object(uintptr(unsafe.Pointer(jCallbackObj)))
+ jutil.DoAsyncCall(env, jCallback, func() (jutil.Object, error) {
+ return jutil.NullObject, (*(*rpc.ClientCall)(jutil.NativePtr(goPtr))).CloseSend()
+ })
}
-func doFinish(env jutil.Env, goPtr C.jlong, numResults int) (jutil.Object, error) {
+func doFinish(goPtr C.jlong, numResults int) (jutil.Object, error) {
// Have all the results be decoded into *vdl.Value.
resultPtrs := make([]interface{}, numResults)
for i := 0; i < numResults; i++ {
@@ -384,11 +386,15 @@
return jutil.NullObject, err
}
}
+ env, freeFunc := jutil.GetEnv()
+ defer freeFunc()
jArr, err := jutil.JByteArrayArray(env, vomResults)
if err != nil {
return jutil.NullObject, err
}
- return jArr, nil
+ // Must grab a global reference as we free up the env and all local references that come along
+ // with it.
+ return jutil.NewGlobalRef(env, jArr), nil // Un-refed in DoAsyncCall
}
//export Java_io_v_impl_google_rpc_ClientCallImpl_nativeFinish
@@ -396,8 +402,8 @@
env := jutil.Env(uintptr(unsafe.Pointer(jenv)))
numResults := int(jNumResults)
jCallback := jutil.Object(uintptr(unsafe.Pointer(jCallbackObj)))
- jutil.DoAsyncCall(env, jCallback, func(env jutil.Env) (jutil.Object, error) {
- return doFinish(env, goPtr, numResults)
+ jutil.DoAsyncCall(env, jCallback, func() (jutil.Object, error) {
+ return doFinish(goPtr, numResults)
})
}
diff --git a/test/fortune/fortune.vdl b/test/fortune/fortune.vdl
index 8b9470e..c829464 100644
--- a/test/fortune/fortune.vdl
+++ b/test/fortune/fortune.vdl
@@ -29,12 +29,17 @@
// Get returns a random fortune.
Get() (Fortune string | error) {access.Read}
- // StreamingGet returns a stream that can be used to obtain fortunes.
+ // StreamingGet returns a stream that can be used to obtain fortunes, and returns the
+ // total number of items sent.
StreamingGet() stream<bool, string> (total int32 | error) {access.Read}
// MultipleGet returns the same fortune twice.
MultipleGet() (Fortune string, Another string | error) {access.Read}
+ // MultipleStreamingGet returns a stream that can be used to obtain fortunes, and returns
+ // the total number of items sent, twice.
+ MultipleStreamingGet() stream<bool, string> (total int32, another int32 | error) {access.Read}
+
// GetComplexError returns (always!) ErrComplex.
GetComplexError() error {access.Read}
diff --git a/util/call.go b/util/call.go
index 31ff97f..57560f4 100644
--- a/util/call.go
+++ b/util/call.go
@@ -295,7 +295,8 @@
// DoAsyncCall invokes the given fnToWrap in a goroutine. If fnToWrap returns an
// error, the given callback's onFailure method is invoked with the error as a
// parameter. If fnToWrap succeeds, its Object result is passed as a
-// parameter to the callback's onSuccess method.
+// parameter to the callback's onSuccess method. fnToWrap must return a global reference
+// to any non-null objects - this reference will be deleted by DoAsyncCall.
//
// The caller of doAsyncCall must take care that no local JNI references are
// used in fnToWrap's closure. For example:
@@ -312,31 +313,39 @@
// JNI references are only valid in the scope of a particular thread. You are
// free to capture any pure-Go variables and we recommend that you use that
// approach to pass parameters through to fnToWrap.
-func DoAsyncCall(env Env, jCallback Object, fnToWrap func(env Env) (Object, error)) {
+func DoAsyncCall(env Env, jCallback Object, fnToWrap func() (Object, error)) {
go func(jCallback Object) {
+ jResult, err := fnToWrap() // probably blocking, so don't call GetEnv() before this line
env, freeFunc := GetEnv()
defer freeFunc()
defer DeleteGlobalRef(env, jCallback)
- if result, err := fnToWrap(env); err != nil {
- CallbackOnFailure(env, jCallback, err)
- } else {
- CallbackOnSuccess(env, jCallback, result)
+ if !jResult.IsNull() {
+ if !IsGlobalRef(env, jResult) {
+ CallbackOnFailure(env, jCallback, fmt.Errorf("Function passed to DoAsyncCall must return global object references"))
+ return
+ }
+ defer DeleteGlobalRef(env, jResult)
}
+ if err != nil {
+ CallbackOnFailure(env, jCallback, err)
+ return
+ }
+ CallbackOnSuccess(env, jCallback, jResult)
}(NewGlobalRef(env, jCallback))
}
// CallbackOnFailure calls the given callback's "onFailure" method with the given error and
// panic-s if the method couldn't be invoked.
-func CallbackOnFailure(env Env, callback Object, err error) {
- if err := CallVoidMethod(env, callback, "onFailure", []Sign{VExceptionSign}, err); err != nil {
+func CallbackOnFailure(env Env, jCallback Object, err error) {
+ if err := CallVoidMethod(env, jCallback, "onFailure", []Sign{VExceptionSign}, err); err != nil {
panic(fmt.Sprintf("couldn't call Java onFailure method: %v", err))
}
}
// CalbackOnSuccess calls the given callback's "onSuccess" method with the given result
// and panic-s if the method couldn't be invoked.
-func CallbackOnSuccess(env Env, callback Object, jClientCall Object) {
- if err := CallVoidMethod(env, callback, "onSuccess", []Sign{ObjectSign}, jClientCall); err != nil {
+func CallbackOnSuccess(env Env, jCallback Object, jResult Object) {
+ if err := CallVoidMethod(env, jCallback, "onSuccess", []Sign{ObjectSign}, jResult); err != nil {
panic(fmt.Sprintf("couldn't call Java onSuccess method: %v", err))
}
}
diff --git a/util/jni_wrapper.c b/util/jni_wrapper.c
index 21c4a74..054a94c 100644
--- a/util/jni_wrapper.c
+++ b/util/jni_wrapper.c
@@ -166,6 +166,10 @@
(*env)->DeleteGlobalRef(env, globalRef);
}
+jobjectRefType GetObjectRefType(JNIEnv* env, jobject obj) {
+ return (*env)->GetObjectRefType(env, obj);
+}
+
jint GetJavaVM(JNIEnv* env, JavaVM** vm) {
return (*env)->GetJavaVM(env, vm);
}
diff --git a/util/jni_wrapper.h b/util/jni_wrapper.h
index 2241c30..ef66a03 100644
--- a/util/jni_wrapper.h
+++ b/util/jni_wrapper.h
@@ -115,6 +115,10 @@
// Deletes the global reference pointed to by globalRef.
void DeleteGlobalRef(JNIEnv* env, jobject globalRef);
+// Returns the type of the object referred to by the obj argument.
+// The argument obj can either be a local, global or weak global reference.
+jobjectRefType GetObjectRefType(JNIEnv* env, jobject obj);
+
// Returns the Java VM interface (used in the Invocation API) associated with
// the current thread.
jint GetJavaVM(JNIEnv* env, JavaVM** vm);
diff --git a/util/ref.go b/util/ref.go
index 2b19433..7fdbc11 100644
--- a/util/ref.go
+++ b/util/ref.go
@@ -40,6 +40,16 @@
C.DeleteLocalRef(env.value(), obj.value())
}
+// IsGlobalRef returns true iff the reference pointed to by obj is a global reference.
+func IsGlobalRef(env Env, obj Object) bool {
+ return C.GetObjectRefType(env.value(), obj.value()) == C.JNIGlobalRefType
+}
+
+// IsLocalRef returns true iff the reference pointed to by obj is a local reference.
+func IsLocalRef(env Env, obj Object) bool {
+ return C.GetObjectRefType(env.value(), obj.value()) == C.JNILocalRefType
+}
+
// GoRef creates a new reference to the value addressed by the provided pointer.
// The value will remain referenced until it is explicitly unreferenced using
// goUnref().
diff --git a/util/util.go b/util/util.go
index 18cdde5..f2d97bd 100644
--- a/util/util.go
+++ b/util/util.go
@@ -233,7 +233,6 @@
C.AttachCurrentThreadAsDaemon(jVM, &jenv, nil)
}
env = Env(uintptr(unsafe.Pointer(jenv)))
- //env := Env{jenv}
// GetEnv is called by Go code that wishes to call Java methods. In
// this case, JNI cannot automatically free unused local refererences.
// We must do it manually by pushing a new local reference frame. The