blob: de89f98cd3fb067621068368391430e8a547641e [file] [log] [blame]
// Copyright 2016 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package internal
import (
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
"io/ioutil"
"math/rand"
"strconv"
"strings"
"time"
netcontext "golang.org/x/net/context"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
"google.golang.org/cloud"
"google.golang.org/cloud/bigtable"
"google.golang.org/cloud/bigtable/bttest"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"v.io/v23/context"
"v.io/v23/conventions"
"v.io/v23/security"
"v.io/v23/security/access"
v23mt "v.io/v23/services/mounttable"
"v.io/x/ref/lib/timekeeper"
)
// Names of the column families and columns used in the bigtable tables.
const (
// metadataFamily is created on both the counter table and the node
// table by SetupTable. createRow writes the creator, permissions, and
// version columns in this family.
metadataFamily = "m"
// serversFamily is a column family created on the node table.
serversFamily = "s"
// childrenFamily is a column family created on the node table.
childrenFamily = "c"
// creatorColumn stores the creator of a node row.
creatorColumn = "c"
// permissionsColumn stores the JSON-encoded permissions of a node row.
permissionsColumn = "p"
// stickyColumn is not referenced in this part of the file; presumably it
// marks a node as sticky -- TODO confirm against its uses.
stickyColumn = "s"
// versionColumn stores the version string of a node row.
versionColumn = "v"
)
// NewBigTable returns a BigTable object that abstracts some aspects of the
// Cloud Bigtable API.
//
// keyFile is passed to getTokenSource to obtain credentials; project, zone,
// and cluster identify the bigtable cluster; tableName is the name of the
// node table (the counter table name is derived from it).
func NewBigTable(keyFile, project, zone, cluster, tableName string) (*BigTable, error) {
	bgCtx := netcontext.Background()
	tokens, err := getTokenSource(bgCtx, bigtable.Scope, keyFile)
	if err != nil {
		return nil, err
	}
	dataClient, err := bigtable.NewClient(bgCtx, project, zone, cluster, cloud.WithTokenSource(tokens))
	if err != nil {
		return nil, err
	}
	// Admin clients are only created when this factory is called.
	newAdmin := func() (*bigtable.AdminClient, error) {
		return bigtable.NewAdminClient(bgCtx, project, zone, cluster, cloud.WithTokenSource(tokens))
	}
	bt := &BigTable{
		tableName:         tableName,
		cache:             &rowCache{},
		createAdminClient: newAdmin,
	}
	bt.nodeTbl = dataClient.Open(bt.nodeTableName())
	bt.counterTbl = dataClient.Open(bt.counterTableName())
	return bt, nil
}
// NewTestBigTable returns a BigTable object that is connected to an in-memory
// fake bigtable cluster.
//
// On success, the returned function shuts down the fake server and should be
// called when the caller is done with the BigTable. On failure, everything
// that was started before the error is released and the returned function is
// nil.
func NewTestBigTable(tableName string) (*BigTable, func(), error) {
	srv, err := bttest.NewServer("127.0.0.1:0")
	if err != nil {
		return nil, nil, err
	}
	ctx := netcontext.Background()
	conn, err := grpc.Dial(srv.Addr, grpc.WithInsecure())
	if err != nil {
		// Don't leak the fake server when we can't connect to it.
		srv.Close()
		return nil, nil, err
	}
	client, err := bigtable.NewClient(ctx, "", "", "", cloud.WithBaseGRPC(conn))
	if err != nil {
		// Release both the connection and the fake server.
		conn.Close()
		srv.Close()
		return nil, nil, err
	}
	bt := &BigTable{
		tableName: tableName,
		testMode:  true,
		cache:     &rowCache{},
		// Each admin client gets its own connection to the fake server.
		createAdminClient: func() (*bigtable.AdminClient, error) {
			conn, err := grpc.Dial(srv.Addr, grpc.WithInsecure())
			if err != nil {
				return nil, err
			}
			return bigtable.NewAdminClient(ctx, "", "", "", cloud.WithBaseGRPC(conn))
		},
	}
	bt.nodeTbl = client.Open(bt.nodeTableName())
	bt.counterTbl = client.Open(bt.counterTableName())
	return bt, func() { srv.Close() }, nil
}
// BigTable holds the table handles and helpers used to store mounttable
// nodes in bigtable. Instances are created with NewBigTable or
// NewTestBigTable.
type BigTable struct {
// tableName is the name of the node table. The counter table name is
// derived from it (see counterTableName).
tableName string
// testMode is set by NewTestBigTable. When set, readRow panics if a read
// fails with DeadlineExceeded.
testMode bool
// nodeTbl is the handle for the node table.
nodeTbl *bigtable.Table
// counterTbl is the handle for the counter table.
counterTbl *bigtable.Table
// cache is consulted by readRow and invalidated by apply.
cache *rowCache
// createAdminClient returns a new admin client. SetupTable and
// DeleteTable call it and close the returned client when done.
createAdminClient func() (*bigtable.AdminClient, error)
}
// nodeTableName returns the name of the table that stores the nodes, which
// is the table name the BigTable was created with.
func (b *BigTable) nodeTableName() string {
	name := b.tableName
	return name
}
// counterTableName returns the name of the table that stores the counters:
// the node table name followed by "-counters".
func (b *BigTable) counterTableName() string {
	const suffix = "-counters"
	return b.tableName + suffix
}
// SetupTable creates the counter and node tables, their column families, and
// the GC policy of each family, then creates the initial rows.
//
// If permissionsFile is not empty, the initial nodes are created from that
// file. Otherwise, a single root row is created that gives the Admin tag to
// all principals.
func (b *BigTable) SetupTable(ctx *context.T, permissionsFile string) error {
	bctx, cancel := btctx(ctx)
	defer cancel()
	admin, err := b.createAdminClient()
	if err != nil {
		return err
	}
	defer admin.Close()

	// The counter table is created first, then the node table.
	for _, table := range []string{b.counterTableName(), b.nodeTableName()} {
		if err := admin.CreateTable(bctx, table); err != nil {
			return err
		}
	}

	type family struct {
		table  string
		name   string
		policy bigtable.GCPolicy
	}
	wanted := []family{
		{table: b.counterTableName(), name: metadataFamily, policy: bigtable.MaxVersionsPolicy(1)},
		{table: b.nodeTableName(), name: metadataFamily, policy: bigtable.MaxVersionsPolicy(1)},
		{table: b.nodeTableName(), name: serversFamily, policy: bigtable.UnionPolicy(bigtable.MaxVersionsPolicy(1), bigtable.MaxAgePolicy(time.Second))},
		{table: b.nodeTableName(), name: childrenFamily, policy: bigtable.MaxVersionsPolicy(1)},
	}
	for _, fam := range wanted {
		if err := admin.CreateColumnFamily(bctx, fam.table, fam.name); err != nil {
			return err
		}
		if err := admin.SetGCPolicy(bctx, fam.table, fam.name, fam.policy); err != nil {
			return err
		}
	}

	if permissionsFile == "" {
		// No permissions file: create only the root node.
		rootPerms := make(access.Permissions)
		rootPerms.Add(security.AllPrincipals, string(v23mt.Admin))
		return b.createRow(ctx, "", rootPerms, "", b.now())
	}
	return createNodesFromFile(ctx, b, permissionsFile)
}
// timeFloor drops the sub-millisecond part of t, i.e. it returns t truncated
// to a multiple of 1000.
func (b *BigTable) timeFloor(t bigtable.Timestamp) bigtable.Timestamp {
	// The bigtable server expects millisecond granularity, but
	// bigtable.Now() returns a timestamp with microsecond granularity.
	//
	// https://github.com/GoogleCloudPlatform/gcloud-golang/blob/master/bigtable/bttest/inmem.go#L734
	// https://github.com/GoogleCloudPlatform/gcloud-golang/blob/master/bigtable/bigtable.go#L531
	return t - t%1000
}
// timeNext returns the multiple of 1000 that follows the truncated value of
// t, i.e. t truncated to a multiple of 1000, plus 1000.
func (b *BigTable) timeNext(t bigtable.Timestamp) bigtable.Timestamp {
	truncated := t - t%1000
	return truncated + 1000
}
// now returns the current bigtable timestamp, truncated by timeFloor.
func (b *BigTable) now() bigtable.Timestamp {
	current := bigtable.Now()
	return b.timeFloor(current)
}
// time converts t to a bigtable timestamp, truncated by timeFloor.
func (b *BigTable) time(t time.Time) bigtable.Timestamp {
	converted := bigtable.Time(t)
	return b.timeFloor(converted)
}
// DeleteTable deletes the counter table and then the node table. It stops at
// the first error, so the node table is left in place if the counter table
// can't be deleted.
func (b *BigTable) DeleteTable(ctx *context.T) error {
	bctx, cancel := btctx(ctx)
	defer cancel()
	admin, err := b.createAdminClient()
	if err != nil {
		return err
	}
	defer admin.Close()
	for _, table := range []string{b.counterTableName(), b.nodeTableName()} {
		if err := admin.DeleteTable(bctx, table); err != nil {
			return err
		}
	}
	return nil
}
// DumpTable prints all the mounttable nodes stored in the bigtable to
// standard output, one line per row of the node table. Only the latest
// version of each cell is read.
func (b *BigTable) DumpTable(ctx *context.T) error {
	bctx, cancel := btctx(ctx)
	defer cancel()
	clock := timekeeper.RealTime()

	// printNode writes one line describing the node stored in row. It
	// always returns true so that the scan continues.
	printNode := func(row bigtable.Row) bool {
		node := nodeFromRow(ctx, b, row, clock)
		if node.name == "" {
			node.name = "(root)"
		}
		fmt.Printf("%s version: %s", node.name, node.version)
		if node.sticky {
			fmt.Print(" sticky")
		}
		fmt.Printf(" perms: %s", node.permissions)
		if len(node.servers) > 0 {
			fmt.Print(" servers:")
			for _, srv := range node.servers {
				// Time left until the server's deadline, in whole seconds.
				remaining := srv.Deadline.Time.Sub(clock.Now())
				fmt.Printf(" [%s %ds]", srv.Server, int(remaining.Seconds()))
			}
			fmt.Printf(" flags: %+v", node.mountFlags)
		}
		if kids := node.children; len(kids) > 0 {
			fmt.Printf(" children: [%s]", strings.Join(kids, " "))
		}
		fmt.Println()
		return true
	}

	latestOnly := bigtable.RowFilter(bigtable.LatestNFilter(1))
	return b.nodeTbl.ReadRows(bctx, bigtable.InfiniteRange(""), printNode, latestOnly)
}
// CountRows returns the number of rows in the node table. If the scan fails,
// it returns 0 and the error.
func (b *BigTable) CountRows(ctx *context.T) (int, error) {
	bctx, cancel := btctx(ctx)
	defer cancel()

	var total int
	tally := func(bigtable.Row) bool {
		total++
		return true
	}
	latestOnly := bigtable.RowFilter(bigtable.LatestNFilter(1))
	err := b.nodeTbl.ReadRows(bctx, bigtable.InfiniteRange(""), tally, latestOnly)
	if err != nil {
		return 0, err
	}
	return total, nil
}
// getTokenSource returns an oauth2 token source for the given scope. If
// keyFile is empty, the default Google token source is used. Otherwise,
// keyFile is read and its content is used as a JSON key to build a JWT
// config, whose token source is returned.
func getTokenSource(ctx netcontext.Context, scope, keyFile string) (oauth2.TokenSource, error) {
	if keyFile == "" {
		return google.DefaultTokenSource(ctx, scope)
	}
	keyJSON, err := ioutil.ReadFile(keyFile)
	if err != nil {
		return nil, err
	}
	jwt, err := google.JWTConfigFromJSON(keyJSON, scope)
	if err != nil {
		return nil, err
	}
	return jwt.TokenSource(ctx), nil
}
// btctx returns a context, and its cancel function, to use for bigtable
// operations. The returned context is derived from the background context,
// not from ctx; only the deadline is taken from ctx. The deadline is always
// at least one minute from now: if ctx has no deadline, or a deadline less
// than one minute away, now plus one minute is used instead.
func btctx(ctx *context.T) (netcontext.Context, func()) {
	const minTimeout = time.Minute
	deadline, ok := ctx.Deadline()
	now := time.Now()
	if ok && deadline.Sub(now) >= minTimeout {
		return netcontext.WithDeadline(netcontext.Background(), deadline)
	}
	return netcontext.WithDeadline(netcontext.Background(), now.Add(minTimeout))
}
// apply applies the mutation m to the given row of the node table, and
// invalidates the cache entry of that row.
func (b *BigTable) apply(ctx *context.T, row string, m *bigtable.Mutation, opts ...bigtable.ApplyOption) error {
	opCtx, cancel := btctx(ctx)
	defer cancel()
	// Whatever the outcome of the mutation, the cached copy of this row
	// can't be used anymore:
	//  - on success, the row has changed and the cached data is stale;
	//  - on failure, the likely cause is a concurrent mutation by
	//    another server.
	defer b.cache.invalidate(row)
	err := b.nodeTbl.Apply(opCtx, row, m, opts...)
	return err
}
// readRow returns the row with the given key, going through the row cache.
// The cache is given a function that reads the row from the node table.
//
// A DeadlineExceeded error is logged, and causes a panic in test mode. In
// every other case, the row and error are returned as is.
func (b *BigTable) readRow(ctx *context.T, key string, opts ...bigtable.ReadOption) (bigtable.Row, error) {
	fetch := func() (bigtable.Row, error) {
		bctx, cancel := btctx(ctx)
		defer cancel()
		return b.nodeTbl.ReadRow(bctx, key, opts...)
	}
	row, err := b.cache.getRefresh(key, fetch)
	if grpc.Code(err) != codes.DeadlineExceeded {
		return row, err
	}
	ctx.Errorf("Received DeadlineExceeded for %s", key)
	if b.testMode {
		panic("DeadlineExceeded from testserver")
	}
	return row, err
}
// createRow writes the metadata of the node called name: its creator (with
// timestamp ts), its permissions encoded as JSON, and a random version. It
// then adds one to the node count of the creator. If creator is empty,
// conventions.ServerUser is used.
func (b *BigTable) createRow(ctx *context.T, name string, perms access.Permissions, creator string, ts bigtable.Timestamp) error {
	encodedPerms, err := json.Marshal(perms)
	if err != nil {
		return err
	}
	if creator == "" {
		creator = conventions.ServerUser
	}
	version := strconv.FormatUint(uint64(rand.Uint32()), 10)

	row := bigtable.NewMutation()
	row.Set(metadataFamily, creatorColumn, ts, []byte(creator))
	row.Set(metadataFamily, permissionsColumn, bigtable.ServerTime, encodedPerms)
	row.Set(metadataFamily, versionColumn, bigtable.ServerTime, []byte(version))
	if err := b.apply(ctx, rowKey(name), row); err != nil {
		return err
	}
	return b.incrementCreatorNodeCount(ctx, creator, 1)
}
// incrementCreatorNodeCount adds delta to the counter of the given creator
// in the counter table. When the returned row has exactly one cell in the
// metadata family, its value is decoded as a big-endian int64 and logged.
func (b *BigTable) incrementCreatorNodeCount(ctx *context.T, creator string, delta int64) error {
	bctx, cancel := btctx(ctx)
	defer cancel()

	counterKey := "num-nodes-per-user:" + creator
	rmw := bigtable.NewReadModifyWrite()
	rmw.Increment(metadataFamily, "c", delta)
	updated, err := b.counterTbl.ApplyReadModifyWrite(bctx, counterKey, rmw)
	if err != nil {
		return err
	}

	cells := updated[metadataFamily]
	if len(cells) != 1 {
		return nil
	}
	var count int64
	if err := binary.Read(bytes.NewReader(cells[0].Value), binary.BigEndian, &count); err != nil {
		return err
	}
	ctx.Infof("Counter %s = %d", counterKey, count)
	return nil
}