blob: 45b492e938660a830e69106dce9a170325bfb2f2 [file] [log] [blame]
package memstore
import (
// cancellableContext implements ipc.Context.
type cancellableContext struct {
cancelled chan struct{}
func newCancellableContext() *cancellableContext {
return &cancellableContext{cancelled: make(chan struct{})}
func (*cancellableContext) Server() ipc.Server {
return nil
func (*cancellableContext) Method() string {
return ""
func (*cancellableContext) Name() string {
return ""
func (*cancellableContext) Suffix() string {
return ""
func (*cancellableContext) Label() (l security.Label) {
func (*cancellableContext) CaveatDischarges() security.CaveatDischargeMap {
return nil
func (*cancellableContext) LocalID() security.PublicID {
return rootPublicID
func (*cancellableContext) RemoteID() security.PublicID {
return rootPublicID
func (*cancellableContext) LocalAddr() net.Addr {
return nil
func (*cancellableContext) RemoteAddr() net.Addr {
return nil
func (*cancellableContext) Deadline() (t time.Time) {
func (ctx *cancellableContext) IsClosed() bool {
select {
case <-ctx.cancelled:
return true
return false
func (ctx *cancellableContext) Closed() <-chan struct{} {
return ctx.cancelled
// cancel synchronously closes the context. After cancel returns, calls to
// IsClosed will return true and the stream returned by Closed will be closed.
func (ctx *cancellableContext) cancel() {
// serverStream implements raw.StoreServicePutMutationsStream
type serverStream struct {
mus <-chan raw.Mutation
func (s *serverStream) Recv() (raw.Mutation, error) {
mu, ok := <-s.mus
if !ok {
return mu, io.EOF
return mu, nil
// clientStream implements raw.StorePutMutationsStream
type clientStream struct {
ctx ipc.Context
closed bool
mus chan<- raw.Mutation
err <-chan error
func (s *clientStream) Send(mu raw.Mutation) error {
s.mus <- mu
return nil
func (s *clientStream) CloseSend() error {
if !s.closed {
s.closed = true
return nil
func (s *clientStream) Finish() error {
return <-s.err
func (s *clientStream) Cancel() {
func putMutations(st *Store) raw.StorePutMutationsStream {
ctx := newCancellableContext()
mus := make(chan raw.Mutation)
err := make(chan error)
go func() {
err <- st.PutMutations(ctx, &serverStream{mus})
return &clientStream{
ctx: ctx,
mus: mus,
err: err,
func putMutationsBatch(t *testing.T, st *Store, mus []raw.Mutation) {
clientStream := putMutations(st)
for _, mu := range mus {
if err := clientStream.Finish(); err != nil {
_, file, line, _ := runtime.Caller(1)
t.Errorf("%s(%d): can't put mutations %s: %s", file, line, mus, err)
func mkdir(t *testing.T, st *Store, tr storage.Transaction, path string) (storage.ID, interface{}) {
_, file, line, _ := runtime.Caller(1)
dir := &Dir{}
stat, err := st.Bind(path).Put(rootPublicID, tr, dir)
if err != nil || stat == nil {
t.Errorf("%s(%d): mkdir %s: %s", file, line, path, err)
return stat.ID, dir
func get(t *testing.T, st *Store, tr storage.Transaction, path string) interface{} {
_, file, line, _ := runtime.Caller(1)
e, err := st.Bind(path).Get(rootPublicID, tr)
if err != nil {
t.Fatalf("%s(%d): can't get %s: %s", file, line, path, err)
return e.Value
func put(t *testing.T, st *Store, tr storage.Transaction, path string, v interface{}) storage.ID {
_, file, line, _ := runtime.Caller(1)
stat, err := st.Bind(path).Put(rootPublicID, tr, v)
if err != nil {
t.Errorf("%s(%d): can't put %s: %s", file, line, path, err)
if _, err := st.Bind(path).Get(rootPublicID, tr); err != nil {
t.Errorf("%s(%d): can't get %s: %s", file, line, path, err)
if stat != nil {
return stat.ID
return storage.ID{}
func remove(t *testing.T, st *Store, tr storage.Transaction, path string) {
if err := st.Bind(path).Remove(rootPublicID, tr); err != nil {
_, file, line, _ := runtime.Caller(1)
t.Errorf("%s(%d): can't remove %s: %s", file, line, path, err)
func commit(t *testing.T, tr storage.Transaction) {
if err := tr.Commit(); err != nil {
_, file, line, _ := runtime.Caller(1)
t.Fatalf("%s(%d): Transaction aborted: %s", file, line, err)
func expectExists(t *testing.T, st *Store, tr storage.Transaction, path string) {
_, file, line, _ := runtime.Caller(1)
if ok, _ := st.Bind(path).Exists(rootPublicID, tr); !ok {
t.Errorf("%s(%d): does not exist: %s", file, line, path)
func expectNotExists(t *testing.T, st *Store, tr storage.Transaction, path string) {
if e, err := st.Bind(path).Get(rootPublicID, tr); err == nil {
_, file, line, _ := runtime.Caller(1)
t.Errorf("%s(%d): should not exist: %s: got %+v", file, line, path, e.Value)
func expectValue(t *testing.T, st *Store, tr storage.Transaction, path string, v interface{}) {
_, file, line, _ := runtime.Caller(1)
e, err := st.Bind(path).Get(rootPublicID, tr)
if err != nil {
t.Errorf("%s(%d): does not exist: %s", file, line, path)
if e.Value != v {
t.Errorf("%s(%d): expected %+v, got %+v", file, line, e.Value, v)