blob: d0cef6dd166ac60df03357a74c07f3d15d2f61fb [file] [log] [blame]
// Copyright 2016 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package client
import (
"fmt"
"math/rand"
"strings"
"sync"
"time"
"v.io/v23/context"
"v.io/v23/syncbase"
"v.io/x/ref/services/syncbase/longevity_tests/model"
)
// init seeds math/rand's default source with the current wall-clock time in
// nanoseconds. defaultKeyValueFunc draws its random values from that source.
func init() {
	seed := time.Now().UnixNano()
	rand.Seed(seed)
}
// Writer is a Client that periodically writes key/value pairs to a set of
// collections in one or more databases. The time between writes can be
// configured by setting WriteInterval. The key/value pairs can be configured
// by setting KeyValueFunc.
//
// Start launches the background goroutine that performs the writes; Stop
// signals that goroutine to exit, waits for it, and returns the first error
// that was recorded while it ran.
type Writer struct {
// Amount of time to wait between db writes. Defaults to 1 second when left
// unset.
WriteInterval time.Duration
// Function that generates successive key/value pairs. It is called once per
// tick with the tick time, and the resulting pair is written to every
// collection. If key is "", no row will be written for that tick.
// If KeyValueFunc is nil, Start replaces it with defaultKeyValueFunc: keys
// are the tick time in Unix nanoseconds formatted as a decimal string and
// values are random 8-digit hex strings.
KeyValueFunc func(time.Time) (string, interface{})
// Function that will run after each write with the collection, key, value,
// and an error if one was encountered. Useful for tests. May be nil, in
// which case nothing is called.
OnWrite func(syncbase.Collection, string, interface{}, error)
// ctx is the context handed to Start; it is passed to every Put.
ctx *context.T
// Map of databases and their respective collections. It is filled in by the
// goroutine that Start launches, from the result of CreateDbsAndCollections.
dbColsMap map[syncbase.Database][]syncbase.Collection
// err holds the first non-nil error passed to setError. Start resets it to
// nil and Stop returns it.
err error
// stopChan is created by Start and closed by Stop to tell the write
// goroutine to exit. The goroutine resets it to nil when it exits.
stopChan chan struct{}
// wg waits for all spawned goroutines to stop.
wg sync.WaitGroup
}
// Compile-time assertion that *Writer satisfies the Client interface.
var _ Client = (*Writer)(nil)
// defaultKeyValueFunc is the KeyValueFunc installed by Start when the caller
// did not supply one. The key is t expressed as Unix nanoseconds in decimal;
// the value is a random non-negative 31-bit integer rendered as a zero-padded,
// 8-digit lowercase hex string.
func defaultKeyValueFunc(t time.Time) (string, interface{}) {
	key := fmt.Sprintf("%d", t.UnixNano())
	value := fmt.Sprintf("%08x", rand.Int31())
	return key, value
}
// String returns "Writer" followed by the name of every database in
// dbColsMap, joined with "-". The names appear in map iteration order, so the
// order is not stable when there is more than one database.
func (w *Writer) String() string {
	parts := []string{"Writer"}
	for db := range w.dbColsMap {
		parts = append(parts, db.Id().Name)
	}
	return strings.Join(parts, "-")
}
// onTick performs one round of writes for the tick at time t. It asks
// KeyValueFunc for a key/value pair and, unless the key is empty, puts that
// pair into every collection of every database in dbColsMap. After each Put
// the OnWrite hook (if set) is invoked with the outcome, and the Put error is
// handed to setError.
//
// The returned error is always nil; write failures are only reported through
// OnWrite and setError.
func (w *Writer) onTick(t time.Time) error {
	k, v := w.KeyValueFunc(t)
	if k != "" {
		for _, cols := range w.dbColsMap {
			for i := range cols {
				c := cols[i]
				putErr := c.Put(w.ctx, k, v)
				if w.OnWrite != nil {
					w.OnWrite(c, k, v, putErr)
				}
				w.setError(putErr)
			}
		}
	}
	return nil
}
// Start begins periodic writes to the given databases on the syncbase named
// sbName. It stores ctx for use by later writes, clears any error left from a
// previous run, and applies defaults: a 1 second interval when WriteInterval
// is not positive, and defaultKeyValueFunc when KeyValueFunc is nil.
//
// The work runs on a background goroutine. It first calls
// CreateDbsAndCollections; if that fails the error is recorded with setError
// and the goroutine exits. Otherwise it calls onTick on every tick until the
// stop channel is closed by Stop. Start itself returns immediately; errors
// are reported by Stop.
func (w *Writer) Start(ctx *context.T, sbName string, databases model.DatabaseSet) {
	w.ctx = ctx
	w.err = nil

	// The goroutine waits on this local copy rather than re-reading the
	// field, so resetting the field can never leave it blocked on a nil
	// channel.
	stopChan := make(chan struct{})
	w.stopChan = stopChan

	interval := w.WriteInterval
	if interval <= 0 {
		// time.NewTicker panics on a non-positive duration, which would
		// bring down the process from inside the goroutine.
		interval = 1 * time.Second
	}
	if w.KeyValueFunc == nil {
		w.KeyValueFunc = defaultKeyValueFunc
	}

	w.wg.Add(1)
	go func() {
		defer func() {
			// Reset stopChan before calling Done. Once Stop's Wait has
			// returned, a second Stop must find nil instead of closing the
			// same channel again, and a following Start must not have its
			// freshly made channel overwritten by this goroutine.
			w.stopChan = nil
			w.wg.Done()
		}()

		var err error
		w.dbColsMap, _, err = CreateDbsAndCollections(ctx, sbName, databases)
		if err != nil {
			w.setError(err)
			return
		}

		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case t := <-ticker.C:
				// setError ignores nil, so this only records a real failure.
				w.setError(w.onTick(t))
			case <-stopChan:
				return
			}
		}
	}()
}
// Stop tells the goroutine launched by Start to exit by closing the stop
// channel, waits for it to finish, and returns the first error recorded while
// it ran (nil if there was none). On a Writer that was never started the stop
// channel is nil, nothing is closed, and the zero-valued error is returned.
func (w *Writer) Stop() error {
	// Read the field exactly once. The goroutine sets stopChan to nil when it
	// exits (for example after a setup failure), so checking the field and
	// then reading it again for close could pass nil to close, which panics.
	if stopChan := w.stopChan; stopChan != nil {
		close(stopChan)
	}
	w.wg.Wait()
	return w.err
}
// setError records err as the Writer's error if err is non-nil and no error
// has been recorded yet. Nil errors and any error after the first are
// ignored, so w.err always holds the first failure.
func (w *Writer) setError(err error) {
	if err == nil || w.err != nil {
		return
	}
	w.err = err
}