blob: aee1f6ae597259a847c276c802e66c66a3f50612 [file] [log] [blame]
package bttest
import (
"fmt"
"math/rand"
"sync"
"sync/atomic"
"testing"
"time"
"golang.org/x/net/context"
btdpb "google.golang.org/cloud/bigtable/internal/data_proto"
btspb "google.golang.org/cloud/bigtable/internal/service_proto"
bttdpb "google.golang.org/cloud/bigtable/internal/table_data_proto"
bttspb "google.golang.org/cloud/bigtable/internal/table_service_proto"
)
// TestConcurrentMutationsAndGC starts 100 goroutines that repeatedly call
// MutateRow on a single table, interleaved with 100 goroutines that each call
// the table's gc method once. The test reports an error if those goroutines
// have not all returned within 100ms.
func TestConcurrentMutationsAndGC(t *testing.T) {
	const (
		tableName = `cluster/tables/t`
		rounds    = 100
	)

	srv := &server{tables: make(map[string]*table)}

	// The writer goroutines below keep looping until this context expires.
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	createTable := &bttspb.CreateTableRequest{Name: "cluster", TableId: "t"}
	if _, err := srv.CreateTable(ctx, createTable); err != nil {
		t.Fatal(err)
	}
	target := srv.tables[tableName]

	family, err := srv.CreateColumnFamily(ctx, &bttspb.CreateColumnFamilyRequest{
		Name:           tableName,
		ColumnFamilyId: "cf",
	})
	if err != nil {
		t.Fatal(err)
	}

	// Attach a MaxNumVersions: 1 GC rule to the family before starting the
	// concurrent writers and gc calls.
	family.GcRule = &bttdpb.GcRule{
		Rule: &bttdpb.GcRule_MaxNumVersions{MaxNumVersions: 1},
	}
	if _, err := srv.UpdateColumnFamily(ctx, family); err != nil {
		t.Fatal(err)
	}

	// lastMicros is shared by all writers; each new mutation atomically
	// advances it by 1000 and uses the result as its cell timestamp.
	var lastMicros int64
	newMutations := func() []*btdpb.Mutation {
		cell := &btdpb.Mutation_SetCell{
			FamilyName:      "cf",
			ColumnQualifier: []byte(`col`),
			TimestampMicros: atomic.AddInt64(&lastMicros, 1000),
		}
		return []*btdpb.Mutation{
			&btdpb.Mutation{Mutation: &btdpb.Mutation_SetCell_{SetCell: cell}},
		}
	}

	// writeUntilExpired issues single-cell mutations to row keys drawn from
	// "0".."99" until ctx is done. MutateRow's results are discarded.
	writeUntilExpired := func() {
		for ctx.Err() == nil {
			srv.MutateRow(ctx, &btspb.MutateRowRequest{
				TableName: tableName,
				RowKey:    []byte(fmt.Sprint(rand.Intn(100))),
				Mutations: newMutations(),
			})
		}
	}

	// spawn runs work on its own goroutine and tracks it in pending.
	var pending sync.WaitGroup
	spawn := func(work func()) {
		pending.Add(1)
		go func() {
			defer pending.Done()
			work()
		}()
	}

	for n := rounds; n > 0; n-- {
		spawn(writeUntilExpired)
		spawn(func() { target.gc() })
	}

	// Turn WaitGroup completion into a channel close so it can be raced
	// against a timer.
	finished := make(chan struct{})
	go func() {
		pending.Wait()
		close(finished)
	}()

	deadline := time.After(100 * time.Millisecond)
	select {
	case <-deadline:
		t.Error("Concurrent mutations and GCs haven't completed after 100ms")
	case <-finished:
	}
}