Add the store/sync service to the boxes app. We still dont use the store to communicate box information. That would be the next step. For now, this was tested by just making sure an array of values stayed in sync across the phones.
Change-Id: Id680b0106fa0d36d7a695f31774fdda0422fa5ad
diff --git a/examples/boxes/android/src/boxesp2p/main.go b/examples/boxes/android/src/boxesp2p/main.go
index 7bd3cd8..b76d464 100644
--- a/examples/boxes/android/src/boxesp2p/main.go
+++ b/examples/boxes/android/src/boxesp2p/main.go
@@ -57,17 +57,28 @@
import "C"
import (
+ "bytes"
"fmt"
"io"
"net"
+ "os"
+ "runtime"
+ "strings"
"unsafe"
"veyron/examples/boxes"
+ vsync "veyron/runtimes/google/vsync"
+ inaming "veyron/runtimes/google/naming"
+ "veyron/services/store/raw"
+ storage "veyron/services/store/server"
"veyron2"
"veyron2/ipc"
"veyron2/naming"
"veyron2/rt"
+ "veyron2/security"
+ "veyron2/services/store"
+ "veyron2/vom"
)
type jniState struct {
@@ -76,23 +87,28 @@
jMID C.jmethodID
}
-type veyronState struct {
+type goState struct {
runtime veyron2.Runtime
+ store *storage.Server
+ ipc ipc.Server
+
drawStream boxes.DrawInterfaceDrawStream
signalling boxes.BoxSignalling
boxList chan boxes.Box
}
var (
- veyron veyronState
+ gs goState
nativeJava jniState
+ myIPAddr string
)
const (
- signallingServer = "@1@tcp@54.200.34.44:8509@9282acd6784022b1945ee1044bddb804@@"
- signallingService = "signalling"
- drawService = "draw"
drawServicePort = ":8509"
+ storeServicePort = ":8000"
+ syncServicePort = ":8001"
+ storePath = "/data/data/com.boxes.android.draganddraw/files/vsync"
+ storeDatabase = "veyron_store.db"
)
func (jni *jniState) registerAddBox(env *C.JNIEnv, obj C.jobject) {
@@ -113,7 +129,14 @@
C.callMethod(env, jni.jObj, jni.jMID, jBoxId, &jPoints[0])
}
-func (v *veyronState) Draw(_ ipc.Context, stream boxes.DrawInterfaceServiceDrawStream) error {
+func (gs *goState) Draw(context ipc.Context, stream boxes.DrawInterfaceServiceDrawStream) error {
+ // Get the endpoint of the remote process
+ endPt, err := inaming.NewEndpoint(context.RemoteAddr().String())
+ if err != nil {
+ panic(fmt.Errorf("failed to parse endpoint:%v\n", err))
+ }
+ // Launch the sync service
+ initSyncService(endPt.Addr().String())
for {
box, err := stream.Recv()
if err == io.EOF {
@@ -127,65 +150,60 @@
return nil
}
-func (v *veyronState) sendBox(box boxes.Box) {
- v.boxList <- box
+func (gs *goState) sendBox(box boxes.Box) {
+ gs.boxList <- box
}
-func (v *veyronState) sendDrawLoop() {
- v.boxList = make(chan boxes.Box, 256)
+func (gs *goState) sendDrawLoop() {
for {
- if err := v.drawStream.Send(<-v.boxList); err != nil {
+ if err := gs.drawStream.Send(<-gs.boxList); err != nil {
break
}
}
}
-func (v *veyronState) registerAsPeer() {
- var registerIP string
- // Get an IP that can be published to the signal server
- ifaces, err := net.InterfaceAddrs()
- for _, addr := range ifaces {
- if ipa, ok := addr.(*net.IPNet); ok {
- if ip4 := ipa.IP.To4(); ip4 != nil && ip4.String() != "127.0.0.1" {
- registerIP = ip4.String()
- break
- }
- }
+func (gs *goState) registerAsPeer() {
+ auth := security.NewACLAuthorizer(security.ACL{security.AllPrincipals: security.LabelSet(security.AdminLabel)})
+ srv, err := gs.runtime.NewServer()
+ if err != nil {
+ panic(fmt.Errorf("Failed runtime.NewServer:%v", err))
}
- rtServer, err := veyron.runtime.NewServer()
+ drawServer := boxes.NewServerDrawInterface(gs)
+ if err := srv.Register("draw", ipc.SoloDispatcher(drawServer, auth)); err != nil {
+ panic(fmt.Errorf("Failed Register:%v", err))
+ }
+ endPt, err := srv.Listen("tcp", myIPAddr + drawServicePort)
if err != nil {
- panic(fmt.Errorf("failed runtime.NewServer:%v\n", err))
+ panic(fmt.Errorf("Failed to Listen:%v", err))
}
- drawServer := boxes.NewServerDrawInterface(&veyron)
- if err := rtServer.Register(drawService, ipc.SoloDispatcher(drawServer, nil)); err != nil {
- panic(fmt.Errorf("failed Register:%v\n", err))
- }
- endPt, err := rtServer.Listen("tcp", registerIP+drawServicePort)
- if err != nil {
- panic(fmt.Errorf("failed to Listen:%v\n", err))
- }
- if err := v.signalling.Add(endPt.String()); err != nil {
- panic(fmt.Errorf("failed to Add endpoint to signalling server:%v\n", err))
- }
- if err := rtServer.Publish("/" + drawService); err != nil {
- panic(fmt.Errorf("failed to Publish:%v\n", err))
+ if err := gs.signalling.Add(endPt.String()); err != nil {
+ panic(fmt.Errorf("Failed to Add endpoint to signalling server:%v", err))
}
}
-func (v *veyronState) connectPeer() {
- endPt, err := v.signalling.Get()
+func (gs *goState) connectPeer() {
+ endpointStr, err := gs.signalling.Get()
if err != nil {
panic(fmt.Errorf("failed to Get peer endpoint from signalling server:%v\n", err))
}
- drawInterface, err := boxes.BindDrawInterface(naming.JoinAddressName(endPt, drawService))
+ drawInterface, err := boxes.BindDrawInterface(naming.JoinAddressName(endpointStr, "draw"))
if err != nil {
panic(fmt.Errorf("failed BindDrawInterface:%v\n", err))
}
- if v.drawStream, err = drawInterface.Draw(); err != nil {
+ if gs.drawStream, err = drawInterface.Draw(); err != nil {
panic(fmt.Errorf("failed to get handle to Draw stream:%v\n", err))
}
- go v.sendDrawLoop()
+
+ // Initialize the store sync service that listens for updates from a peer
+ endpoint, err := inaming.NewEndpoint(endpointStr)
+ if err != nil {
+ panic(fmt.Errorf("failed to parse endpoint:%v\n", err))
+ }
+ initSyncService(endpoint.Addr().String())
+
+ gs.boxList = make(chan boxes.Box, 256)
+ go gs.sendDrawLoop()
}
//export JNI_OnLoad
@@ -196,18 +214,18 @@
//export Java_com_boxes_GoNative_registerAsPeer
func Java_com_boxes_GoNative_registerAsPeer(env *C.JNIEnv) {
- veyron.registerAsPeer()
+ gs.registerAsPeer()
}
//export Java_com_boxes_GoNative_connectPeer
func Java_com_boxes_GoNative_connectPeer(env *C.JNIEnv) {
- veyron.connectPeer()
+ gs.connectPeer()
}
//export Java_com_boxes_GoNative_sendDrawBox
func Java_com_boxes_GoNative_sendDrawBox(env *C.JNIEnv, class C.jclass,
boxId C.jstring, ox C.jfloat, oy C.jfloat, cx C.jfloat, cy C.jfloat) {
- veyron.sendBox(boxes.Box{BoxId: C.GoString(C.JToCString(env, boxId)), Points: [4]float32{float32(ox), float32(oy), float32(cx), float32(cy)}})
+ gs.sendBox(boxes.Box{BoxId: C.GoString(C.JToCString(env, boxId)), Points: [4]float32{float32(ox), float32(oy), float32(cx), float32(cy)}})
}
//export Java_com_boxes_GoNative_registerAddBox
@@ -215,10 +233,100 @@
nativeJava.registerAddBox(env, obj)
}
-func main() {
- veyron.runtime = rt.Init()
+func initStoreService() {
+ publicID := gs.runtime.Identity().PublicID()
+ if len(publicID.Names()) == 0 {
+ panic(fmt.Errorf("invalid PublicID:%v\n", publicID))
+ }
+
+ // Create a new store server
var err error
- if veyron.signalling, err = boxes.BindBoxSignalling(naming.JoinAddressName(signallingServer, signallingService)); err != nil {
+ storeDBName := storePath + "/" + storeDatabase
+ if gs.store, err = storage.New(storage.ServerConfig{Admin: publicID, DBName: storeDBName}); err != nil {
+ panic(fmt.Errorf("storage.New() failed:%v", err))
+ }
+
+ // Create ACL Authorizer with read/write permissions for the identity
+ acl, err := security.LoadACL(bytes.NewBufferString("{\"" + publicID.Names()[0] + "\":\"RW\"}"))
+ if err != nil {
+ panic(fmt.Errorf("LoadACL failed:%v", err))
+ }
+ auth := security.NewACLAuthorizer(acl)
+
+ // Register the services
+ if err = gs.ipc.Register(store.StoreSuffix, storage.NewStoreDispatcher(gs.store, auth)); err != nil {
+ panic(fmt.Errorf("s.Register(store) failed:%v", err))
+ }
+ if err := gs.ipc.Register(raw.RawStoreSuffix, storage.NewRawStoreDispatcher(gs.store, auth)); err != nil {
+ panic(fmt.Errorf("s.Register(rawstore) failed:%v", err))
+ }
+ if err := gs.ipc.Register("", storage.NewObjectDispatcher(gs.store, auth)); err != nil {
+ panic(fmt.Errorf("s.Register(object) failed:%v", err))
+ }
+
+ // Create an endpoint and start listening
+ if _, err = gs.ipc.Listen("tcp", myIPAddr + storeServicePort); err != nil {
+ panic(fmt.Errorf("s.Listen() failed:%v", err))
+ }
+}
+
+func initSyncService(peerEndpoint string) {
+ peerSyncAddr := strings.Split(peerEndpoint, ":")[0]
+ storeName := "/" + myIPAddr + storeServicePort
+ srv:= vsync.NewServerSync(vsync.NewSyncd(peerSyncAddr + syncServicePort, peerSyncAddr /* peer deviceID */, myIPAddr /* my deviceID */, storePath, storeName, 1))
+ if err := gs.ipc.Register("sync", ipc.SoloDispatcher(srv, nil)); err != nil {
+ panic(fmt.Errorf("syncd:: error registering service: err %v", err))
+ }
+ if _, err := gs.ipc.Listen("tcp", myIPAddr + syncServicePort); err != nil {
+ panic(fmt.Errorf("syncd:: error listening to service: err %v", err))
+ }
+}
+
+func init() {
+ vom.Register(&raw.Mutation{})
+ runtime.GOMAXPROCS(runtime.NumCPU())
+}
+
+func main() {
+ // Get an IP that can be published to the signal server
+ ifaces, _ := net.InterfaceAddrs()
+ for _, addr := range ifaces {
+ if ipa, ok := addr.(*net.IPNet); ok {
+ if ip4 := ipa.IP.To4(); ip4 != nil && ip4.String() != "127.0.0.1" {
+ myIPAddr = ip4.String()
+ break
+ }
+ }
+ }
+ if len(myIPAddr) <= 0 {
+ panic(fmt.Errorf("Failed to get value IPAddr:%v", ifaces))
+ }
+
+ myIdentity := "_4EEGgFCAP-DNBoBQwEudmV5cm9uL3J1bnRpbWVzL2dvb2dsZS9zZWN1cml0eS5jaGFpblByaXZhdGVJRAD_hVEYAQIBRAEIUHVibGljSUQAAQQBBlNlY3JldAABM3ZleXJvbi9ydW50aW1lcy9nb29nbGUvc2VjdXJpdHkvd2lyZS5DaGFpblByaXZhdGVJRAD_hwQaAUUA_4lJGAEBAUYBDENlcnRpZmljYXRlcwABMnZleXJvbi9ydW50aW1lcy9nb29nbGUvc2VjdXJpdHkvd2lyZS5DaGFpblB1YmxpY0lEAP-LBBIBRwD_jWcYAQQBAwEETmFtZQABSAEJUHVibGljS2V5AAFJAQdDYXZlYXRzAAFKAQlTaWduYXR1cmUAATB2ZXlyb24vcnVudGltZXMvZ29vZ2xlL3NlY3VyaXR5L3dpcmUuQ2VydGlmaWNhdGUA_49FGAECAUsBBUN1cnZlAAEEAQJYWQABLnZleXJvbi9ydW50aW1lcy9nb29nbGUvc2VjdXJpdHkvd2lyZS5QdWJsaWNLZXkA_5UzEAEyAS12ZXlyb24vcnVudGltZXMvZ29vZ2xlL3NlY3VyaXR5L3dpcmUua2V5Q3VydmUA_5EEEgFMAP-XRxgBAgFNAQdTZXJ2aWNlAAEEAQVCeXRlcwABK3ZleXJvbi9ydW50aW1lcy9nb29nbGUvc2VjdXJpdHkvd2lyZS5DYXZlYXQA_5knEAEDASF2ZXlyb24yL3NlY3VyaXR5LlByaW5jaXBhbFBhdHRlcm4A_5NAGAECAQQBAVIAAQQBAVMAAS52ZXlyb24vcnVudGltZXMvZ29vZ2xlL3NlY3VyaXR5L3dpcmUuU2lnbmF0dXJlAP-C_8ABAwEFAQEBCGdhdXRoYW10AQJBBL1M858IVO3sxJTYFxv1EiDVLFG6WdH-l4OpOHQXlZn5MO8LXNdnRhJ_r_Zwe92VHpbemsPNek_SJOfsSmsVRA8AAgEgm5eZNI1lUqwPCqlQOgesp-7zx0zLxvJe9IcwRbnMycoBINYA7GSPqFxDkbWYScL3Kj0k4AZK-e9sF01a7RAjcGMRAAAAASBldL9q-34I_W5yrapTmqaItm66RGNGtiUrFTlfh8VaNwA="
+
+ // Load identity that has been generated.
+ privateID, err := security.LoadIdentity(strings.NewReader(myIdentity))
+ if err != nil {
+ panic(fmt.Errorf("Failed to load identity:%v\n", err))
+ }
+
+ // Remove any remnant files from the store directory
+ if err = os.RemoveAll(storePath); err != nil {
+ panic(fmt.Errorf("Failed to remove remnant store files:%v\n", err))
+ }
+
+ // Initialize veyron runtime and bind to the signalling server used to rendezvous with
+ // another peer device. TODO(gauthamt): Switch to using the nameserver for signalling.
+ gs.runtime = rt.Init(veyron2.LocalID(privateID))
+ if gs.signalling, err = boxes.BindBoxSignalling(naming.JoinAddressName("@2@tcp@162.222.181.93:8509@08a93d90836cd94d4dc1acbe40b9048d@1@1@@", "signalling")); err != nil {
panic(fmt.Errorf("failed to bind to signalling server:%v\n", err))
}
+
+ // Create a new server instance
+ if gs.ipc, err = gs.runtime.NewServer(); err != nil {
+ panic(fmt.Errorf("r.NewServer() failed:%v", err))
+ }
+
+ // Initialize an instance of the store service for this device
+ initStoreService()
}