blob: 0ebeea722b253339c562663aa6ecd18d0c8d168b [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package vsync
// Utilities for testing sync.
import (
"fmt"
"os"
"path"
"testing"
"time"
"v.io/v23/context"
"v.io/v23/rpc"
wire "v.io/v23/services/syncbase"
pubutil "v.io/v23/syncbase/util"
"v.io/v23/verror"
"v.io/x/ref/services/syncbase/common"
"v.io/x/ref/services/syncbase/server/interfaces"
"v.io/x/ref/services/syncbase/store"
storeutil "v.io/x/ref/services/syncbase/store/util"
"v.io/x/ref/services/syncbase/store/watchable"
"v.io/x/ref/services/syncbase/vclock"
"v.io/x/ref/test"
)
// Package-level fixtures shared by the mock service and mock database.
// NOTE(review): mockCRStream is mutable and unsynchronized; presumably the
// tests that use it do not run in parallel -- confirm.
var (
// mockDbId is the fixed database id reported by mockService.DatabaseIds and
// mockDatabase.Id, and passed to Database by createDatabase.
mockDbId = wire.Id{Blessing: "mockapp", Name: "mockdb"}
// mockCRStream is the conflict resolver stream returned by
// mockDatabase.CrConnectionStream. It is installed by setMockCRStream and
// cleared by mockDatabase.ResetCrConnectionStream.
mockCRStream *conflictResolverStream
)
// mockService emulates a Syncbase service that includes store and sync.
// It is used to access mock databases. Instances are built by createService
// and torn down by destroyService.
type mockService struct {
// engine is the storage engine name used to open the store.
engine string
// dir is the per-test directory; the store itself is opened at dir/engine
// and dir is the path handed to storeutil.DestroyStore on teardown.
dir string
// st is the store opened by createService.
st store.Store
// vclock is the clock created over st; it is passed to watchable.Wrap and
// to the sync service constructor.
vclock *vclock.VClock
// sync is the sync service created by New in createService.
sync *syncService
// shutdown is the teardown function returned by test.V23Init.
shutdown func()
}
// St returns the store backing this mock service.
func (s *mockService) St() store.Store {
return s.st
}
// Sync returns the mock service's sync service (s.sync) as an
// interfaces.SyncServerMethods.
func (s *mockService) Sync() interfaces.SyncServerMethods {
return s.sync
}
// Database returns a new mockDatabase whose store is s.st wrapped by
// watchable.Wrap, configured with common.RowPrefix and
// common.CollectionPermsPrefix as its managed prefixes. The ctx, call and
// dbId arguments are ignored, and the store is wrapped afresh on every call.
// An error from watchable.Wrap is returned unchanged with a nil Database.
func (s *mockService) Database(ctx *context.T, call rpc.ServerCall, dbId wire.Id) (interfaces.Database, error) {
	watchOpts := &watchable.Options{
		ManagedPrefixes: []string{common.RowPrefix, common.CollectionPermsPrefix},
	}
	wrapped, wrapErr := watchable.Wrap(s.st, s.vclock, watchOpts)
	if wrapErr != nil {
		return nil, wrapErr
	}
	return &mockDatabase{st: wrapped}, nil
}
// DatabaseIds reports the single mock database id (mockDbId). The ctx and
// call arguments are ignored and the error is always nil.
func (s *mockService) DatabaseIds(ctx *context.T, call rpc.ServerCall) ([]wire.Id, error) {
return []wire.Id{mockDbId}, nil
}
// mockDatabase emulates a Syncbase Database. It is used to test sync
// functionality.
type mockDatabase struct {
// st is the watchable store exposed through St; mockService.Database sets
// it to the wrapped service store.
st *watchable.Store
}
// Id returns the fixed mock database id (mockDbId), regardless of receiver.
func (d *mockDatabase) Id() wire.Id {
return mockDbId
}
// Service always returns nil: the mock database keeps no reference to a
// service.
func (d *mockDatabase) Service() interfaces.Service {
return nil
}
// St returns the watchable store backing this mock database.
func (d *mockDatabase) St() *watchable.Store {
return d.st
}
// CheckPermsInternal is a stub: it performs no permission check and always
// returns the error built by verror.NewErrNotImplemented. The call and st
// arguments are ignored.
func (d *mockDatabase) CheckPermsInternal(ctx *context.T, call rpc.ServerCall, st store.StoreReader) error {
return verror.NewErrNotImplemented(ctx)
}
// GetSchemaMetadataInternal is a stub: the mock database has no schema
// metadata, so it always returns nil together with the error built by
// verror.NewErrNoExist.
func (d *mockDatabase) GetSchemaMetadataInternal(ctx *context.T) (*wire.SchemaMetadata, error) {
return nil, verror.NewErrNoExist(ctx)
}
// CrConnectionStream returns the package-level mock conflict resolver stream
// installed by setMockCRStream, or a nil interface when none is installed.
//
// The explicit nil check matters: returning the nil *conflictResolverStream
// directly would yield a non-nil interface value wrapping a nil pointer, so a
// caller comparing the result against nil would never see "no stream".
func (d *mockDatabase) CrConnectionStream() wire.ConflictManagerStartConflictResolverServerStream {
	if mockCRStream == nil {
		return nil
	}
	return mockCRStream
}
// ResetCrConnectionStream clears the package-level mockCRStream. The stream
// is shared rather than per-database, so this affects every mockDatabase.
func (d *mockDatabase) ResetCrConnectionStream() {
mockCRStream = nil
}
// createService creates a mock Syncbase service used for testing sync
// functionality. It initializes the test runtime, opens a store under a
// fresh per-test directory, initializes the vclock data and creates the sync
// service.
//
// Any failure aborts the test through t.Fatalf. Because the caller never
// receives a service it could hand to destroyService in that case, whatever
// was already set up (runtime, on-disk store) is released here first.
func createService(t *testing.T) *mockService {
	ctx, shutdown := test.V23Init()
	engine := store.EngineForTest
	opts := storeutil.OpenOptions{CreateIfMissing: true, ErrorIfExists: false}
	// The pid and a nanosecond timestamp are embedded in the directory name
	// to keep separate test runs apart.
	dir := fmt.Sprintf("%s/vsync_test_%d_%d", os.TempDir(), os.Getpid(), time.Now().UnixNano())

	st, err := storeutil.OpenStore(engine, path.Join(dir, engine), opts)
	if err != nil {
		shutdown()
		t.Fatalf("cannot create store %s (%s): %v", engine, dir, err)
	}

	// cleanup undoes the setup performed so far. A failure to destroy the
	// store is only logged so that it does not mask the error that triggered
	// the cleanup.
	cleanup := func() {
		if derr := storeutil.DestroyStore(engine, dir); derr != nil {
			t.Logf("cannot destroy store %s (%s): %v", engine, dir, derr)
		}
		shutdown()
	}

	cl := vclock.NewVClock(st)
	if err := cl.InitVClockData(); err != nil {
		cleanup()
		t.Fatalf("InitVClockData failed: %v", err)
	}

	s := &mockService{
		engine:   engine,
		dir:      dir,
		st:       st,
		vclock:   cl,
		shutdown: shutdown,
	}
	if s.sync, err = New(ctx, s, engine, dir, cl, true); err != nil {
		cleanup()
		t.Fatalf("cannot create sync service: %v", err)
	}
	return s
}
// createDatabase creates a mock database by asking the mock service for
// mockDbId. The test is aborted with t.Fatalf if the lookup fails or returns
// something other than a *mockDatabase; continuing would panic on the type
// assertion of a nil interface.
// TODO(sadovsky): These mocks use methods like s.Database in ways that violate
// the API contract. Can we eliminate them and just use the actual
// implementations?
func createDatabase(t *testing.T, s *mockService) *mockDatabase {
	db, err := s.Database(nil, nil, mockDbId)
	if err != nil {
		t.Fatalf("Error while accessing Database: %v", err)
	}
	// TODO(razvanm): Get rid of the type assertion from below.
	mdb, ok := db.(*mockDatabase)
	if !ok {
		t.Fatalf("Error while accessing Database: unexpected type %T", db)
	}
	return mdb
}
// setMockCRStream installs crs as the package-level conflict resolver stream
// that mockDatabase.CrConnectionStream hands out. Passing nil clears it.
func setMockCRStream(crs *conflictResolverStream) {
mockCRStream = crs
}
// destroyService tears down a mock Syncbase service built by createService:
// it destroys the on-disk store, closes the sync service and finally invokes
// the service's shutdown function. A failure to destroy the store fails the
// test via t.Fatalf; the deferred teardown steps still run in that case.
func destroyService(t *testing.T, s *mockService) {
	// Deferred calls run last-in-first-out, so Close(s.sync) executes before
	// s.shutdown(), and both execute after the store has been destroyed.
	defer s.shutdown()
	defer Close(s.sync)

	destroyErr := storeutil.DestroyStore(s.engine, s.dir)
	if destroyErr == nil {
		return
	}
	t.Fatalf("cannot destroy store %s (%s): %v", s.engine, s.dir, destroyErr)
}
// makeCxId returns a collection id with the given name and the fixed default
// user blessing "v.io:u:sam".
func makeCxId(name string) wire.Id {
return wire.Id{Blessing: "v.io:u:sam", Name: name}
}
// makeRowKey returns the database row key for a given application key, built
// by joining common.RowPrefix and key with common.JoinKeyParts.
func makeRowKey(key string) string {
return common.JoinKeyParts(common.RowPrefix, key)
}
func makeRowKeyFromParts(cxBlessing, cxName, row string) string {
return common.JoinKeyParts(common.RowPrefix, pubutil.EncodeId(wire.Id{cxBlessing, cxName}), row)
}
func makeCollectionPermsKey(cxBlessing, cxName string) string {
// TODO(rdaoud,ivanpi): See hack in collection.go.
return common.JoinKeyParts(common.CollectionPermsPrefix, pubutil.EncodeId(wire.Id{cxBlessing, cxName}), "")
}
// conflictResolverStream is a mock conflict resolver stream for testing. It
// records what is sent and replays a prepared queue of resolutions.
type conflictResolverStream struct {
// sendQ accumulates every item passed to Send on the send stream.
sendQ []wire.ConflictInfo
// recvQ holds the items that the receive stream iterates over.
recvQ []wire.ResolutionInfo
// sendErr, when non-nil, is returned by Send instead of queueing the item.
sendErr error
// recvErr is the value reported by the receive stream's Err method.
recvErr error
}
// RecvStream returns a receive-side iterator over crs.recvQ. The iterator
// starts positioned before the first element (index -1), so Advance must be
// called before Value. Every call returns a new, independent iterator that
// starts again from the beginning of the queue.
func (crs *conflictResolverStream) RecvStream() interface {
Advance() bool
Value() wire.ResolutionInfo
Err() error
} {
return &recvStream{index: -1, cr: crs}
}
// SendStream returns a send-side handle whose Send appends to crs.sendQ (or
// fails with crs.sendErr). Every call returns a new handle bound to the same
// underlying stream.
func (crs *conflictResolverStream) SendStream() interface {
Send(item wire.ConflictInfo) error
} {
return &sendStream{cr: crs}
}
// sendStream is the send side of a conflictResolverStream.
type sendStream struct {
// cr is the mock stream whose sendQ and sendErr this handle operates on.
cr *conflictResolverStream
}
// Send records item on the underlying mock stream's sendQ and returns nil.
// If the mock has been configured with a sendErr, that error is returned
// instead and item is not recorded.
func (ss *sendStream) Send(item wire.ConflictInfo) error {
	stream := ss.cr
	if failure := stream.sendErr; failure != nil {
		return failure
	}
	stream.sendQ = append(stream.sendQ, item)
	return nil
}
// recvStream is the receive side of a conflictResolverStream: a cursor over
// the mock's recvQ.
type recvStream struct {
// index is the position in cr.recvQ of the current item; RecvStream
// initializes it to -1, i.e. before the first item.
index int
// cr is the mock stream whose recvQ and recvErr this cursor reads.
cr *conflictResolverStream
}
// Advance moves the cursor to the next queued item and reports whether such
// an item exists. The cursor is incremented unconditionally, including on
// calls made after the end of the queue has been reached.
func (rs *recvStream) Advance() bool {
	next := rs.index + 1
	rs.index = next
	return next < len(rs.cr.recvQ)
}
// Value returns the item at the cursor's current position. It performs no
// bounds check, so it panics with an index-out-of-range error when the cursor
// is outside recvQ (before the first Advance, or after Advance returned
// false).
func (rs *recvStream) Value() wire.ResolutionInfo {
return rs.cr.recvQ[rs.index]
}
// Err returns the receive error configured on the underlying mock stream
// (recvErr), which is nil unless a test has set it.
func (rs *recvStream) Err() error {
return rs.cr.recvErr
}