blob: 759ff4374b5a16ee131baa0fd0d522db319f1dcd [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//=== High-level schema ===
// Each playground bundle is stored once as a BundleData in the bundle_data
// table. The BundleData contains the json string corresponding to the bundle
// files, and is indexed by the hash of the json.
//
// Links to a BundleData are stored in BundleLinks. There can be multiple
// BundleLinks corresponding to a single BundleData. BundleLinks are indexed by
// a unique id, and contain the hash of the BundleData that they correspond to.
//
// The additional layer of indirection provided by BundleLinks allows storing
// identical bundles more efficiently and makes the bundle Id independent of
// its contents, allowing implementation of change history, sharing, expiration
// etc.
//
// Each bundle save request first generates and stores a new BundleLink object,
// and will store a new BundleData only if it does not already exist in the
// database.
//
// Note: If bundles larger than ~1 MiB are to be stored, the max_allowed_packet
// SQL connection parameter must be increased.
//
// TODO(ivanpi): Normalize the Json (e.g. file order).
package storage
import (
crand "crypto/rand"
"database/sql"
"database/sql/driver"
"encoding/binary"
"errors"
"fmt"
"time"
"github.com/jmoiron/sqlx"
"v.io/x/playground/lib/hash"
)
// ErrNotFound is returned when a requested item is not found in the database.
var ErrNotFound = errors.New("Not found")

// Internal sentinel errors used by the write path and the transaction helpers.
var (
	// errRetryTransaction is returned from a transaction callback to trigger a
	// rollback followed by a retry. Any other error causes a rollback and
	// abort.
	errRetryTransaction = errors.New("Retry transaction")

	// errTooManyRetries is returned when retries are exhausted.
	errTooManyRetries = errors.New("Too many retries")

	// errIDCollision is returned when an autogenerated ID matches an existing
	// ID. Extremely unlikely for a reasonably utilized database.
	errIDCollision = errors.New("ID collision")
)
//////////////////////////////////////////
// Relational type definitions
// BundleData is a row of the bundle_data table: the contents of one bundle,
// keyed by the hash of those contents.
type BundleData struct {
	// Raw SHA256 of the bundle contents. Primary key.
	Hash []byte `db:"hash"`

	// The bundle contents (a json string).
	Json string `db:"json"`
}
// BundleLink is a row of the bundle_link table: a uniquely identified link
// pointing, via Hash, at the BundleData holding the bundle contents.
type BundleLink struct {
	// 64-byte printable ASCII string. Primary key.
	Id string `db:"id"`

	// Part of the BundleLink specified by bundle author.
	BundleDesc

	// Marks default bundles, returned by GetDefaultBundleList().
	IsDefault bool `db:"is_default"`

	// Raw SHA256 of the bundle contents. Foreign key => BundleData.Hash.
	Hash []byte `db:"hash"`

	// Link record creation time.
	// NOTE(review): the INSERT in storeBundleLink does not set created_at, so
	// this is presumably filled in by a column default - confirm against the
	// schema.
	CreatedAt time.Time `db:"created_at"`
}
// BundleDesc holds the author-specified part of a bundle link. It is embedded
// in both BundleLink and NewBundle.
type BundleDesc struct {
	// Human-readable, URL-friendly unique name, up to 128 Unicode characters;
	// used for newest version of default bundles (see `is_default`).
	// An empty slug is stored as NULL in the database (see EmptyNullString).
	Slug EmptyNullString `db:"slug"`
}
//////////////////////////////////////////
// Helper type definitions
// NewBundle is a bundle that has not yet been saved into the database. It is
// used as an input type for the store functions.
type NewBundle struct {
	// Part of the BundleLink specified by bundle author.
	BundleDesc

	// The bundle contents.
	Json string `db:"json"`
}
///////////////////////////////////////
// DB read-only methods
// TODO(nlacasse): Use prepared statements, otherwise we have an extra
// round-trip to the db, which is slow on cloud sql.
// getBundleLinkById fetches the bundle_link row whose id column equals id.
// A missing row is reported as ErrNotFound; any other query error is returned
// unchanged.
func getBundleLinkById(q sqlx.Queryer, id string) (*BundleLink, error) {
	link := new(BundleLink)
	switch err := sqlx.Get(q, link, "SELECT * FROM bundle_link WHERE id=?", id); err {
	case nil:
		return link, nil
	case sql.ErrNoRows:
		// Translate the driver-level "no rows" into the package sentinel.
		return nil, ErrNotFound
	default:
		return nil, err
	}
}
// getDefaultBundleLinkBySlug fetches the bundle_link row that has the given
// slug and is marked as default. Only default bundles can be retrieved by slug
// for now. A missing row is reported as ErrNotFound; any other query error is
// returned unchanged.
func getDefaultBundleLinkBySlug(q sqlx.Queryer, slug string) (*BundleLink, error) {
	const query = "SELECT * FROM bundle_link WHERE slug=? AND is_default"

	var link BundleLink
	err := sqlx.Get(q, &link, query, slug)
	if err == sql.ErrNoRows {
		return nil, ErrNotFound
	}
	if err != nil {
		return nil, err
	}
	return &link, nil
}
// getBundleDataByHash fetches the bundle_data row whose hash column equals
// hash. A missing row is reported as ErrNotFound; any other query error is
// returned unchanged.
func getBundleDataByHash(q sqlx.Queryer, hash []byte) (*BundleData, error) {
	const query = "SELECT * FROM bundle_data WHERE hash=?"

	data := &BundleData{}
	err := sqlx.Get(q, data, query, hash)
	switch {
	case err == nil:
		return data, nil
	case err == sql.ErrNoRows:
		return nil, ErrNotFound
	}
	return nil, err
}
// getDefaultBundleList returns every bundle_link row that is marked as default
// and has a non-NULL slug. All default bundles are expected to have non-empty
// slugs; the slug condition is a safety check.
func getDefaultBundleList(q sqlx.Queryer) ([]*BundleLink, error) {
	const query = "SELECT * FROM bundle_link WHERE is_default AND slug IS NOT NULL"

	var defaults []*BundleLink
	err := sqlx.Select(q, &defaults, query)
	if err != nil {
		return nil, err
	}
	return defaults, nil
}
// GetBundleByLinkIdOrSlug retrieves the BundleLink identified by idOrSlug
// together with the BundleData it points to. The argument is first treated as
// a link id; only if no link with that id exists is it treated as the slug of
// a default bundle.
//
// Returns ErrNotFound if neither lookup matches, or if the linked data is
// missing.
//
// Note: This can fail if the bundle is deleted between fetching BundleLink
// and BundleData. However, it is highly unlikely, costly to mitigate (using
// a serializable transaction), and unimportant (error 500 instead of 404).
func GetBundleByLinkIdOrSlug(idOrSlug string) (*BundleLink, *BundleData, error) {
	link, linkErr := getBundleLinkById(dbRead, idOrSlug)
	switch linkErr {
	case nil:
		// Found by id; no slug lookup needed.
	case ErrNotFound:
		// No link with that id: fall back to the default-bundle slug lookup.
		link, linkErr = getDefaultBundleLinkBySlug(dbRead, idOrSlug)
		if linkErr != nil {
			return nil, nil, linkErr
		}
	default:
		return nil, nil, linkErr
	}

	data, dataErr := getBundleDataByHash(dbRead, link.Hash)
	if dataErr != nil {
		return nil, nil, dataErr
	}
	return link, data, nil
}
// GetDefaultBundleList retrieves the BundleLink objects describing the default
// bundles, querying through the dbRead handle. All default bundles have slugs.
func GetDefaultBundleList() ([]*BundleLink, error) {
	defaults, err := getDefaultBundleList(dbRead)
	return defaults, err
}
////////////////////////////////////
// DB write methods
// storeBundleData inserts bData as a new bundle_data row (hash and json
// columns). Any error from the insert is returned unchanged.
func storeBundleData(ext sqlx.Ext, bData *BundleData) error {
	const stmt = "INSERT INTO bundle_data (hash, json) VALUES (:hash, :json)"

	if _, err := sqlx.NamedExec(ext, stmt, bData); err != nil {
		return err
	}
	return nil
}
// storeBundleLink inserts bLink as a new bundle_link row. Only the id, slug,
// is_default and hash columns are written; created_at is not part of the
// statement. Any error from the insert is returned unchanged.
func storeBundleLink(ext sqlx.Ext, bLink *BundleLink) error {
	const stmt = "INSERT INTO bundle_link (id, slug, is_default, hash) VALUES (:id, :slug, :is_default, :hash)"

	if _, err := sqlx.NamedExec(ext, stmt, bLink); err != nil {
		return err
	}
	return nil
}
// storeBundle saves bundle inside the transaction tx and returns the newly
// created link together with the (new or pre-existing) bundle data.
//
// Steps, in order:
//  1. reject a default bundle that has an empty slug;
//  2. hash the bundle json and derive a random link id from that hash;
//  3. fail with errIDCollision if a link with that id already exists;
//  4. store the bundle data unless data with the same hash is already present;
//  5. store the new link, marked as default when asDefault is true.
//
// errIDCollision is returned unwrapped so callers can compare against it; all
// other failures are returned as formatted errors.
func storeBundle(tx *sqlx.Tx, bundle *NewBundle, asDefault bool) (*BundleLink, *BundleData, error) {
	// All default bundles must have non-empty slugs.
	if asDefault && bundle.Slug == "" {
		return nil, nil, fmt.Errorf("default bundle must have non-empty slug")
	}

	digest := hash.Raw([]byte(bundle.Json))
	contentHash := digest[:]

	// Generate a random id for the bundle link.
	linkID, err := randomLink(contentHash)
	if err != nil {
		return nil, nil, fmt.Errorf("error creating link id: %v", err)
	}

	// The id must not be in use yet: "not found" is the only acceptable
	// outcome of this lookup.
	switch _, lookupErr := getBundleLinkById(tx, linkID); lookupErr {
	case ErrNotFound:
		// Id is free; carry on.
	case nil:
		// A link with this id already exists.
		return nil, nil, errIDCollision
	default:
		return nil, nil, fmt.Errorf("error checking for bundle link: %v", lookupErr)
	}

	// Reuse existing bundle data with this hash, or store it if absent.
	data, err := getBundleDataByHash(tx, contentHash)
	if err == ErrNotFound {
		data = &BundleData{
			Hash: contentHash,
			Json: bundle.Json,
		}
		if err = storeBundleData(tx, data); err != nil {
			return nil, nil, fmt.Errorf("error storing bundle data: %v", err)
		}
	} else if err != nil {
		return nil, nil, fmt.Errorf("error checking for bundle data: %v", err)
	}

	// Store the bundle link.
	link := &BundleLink{
		Id:         linkID,
		BundleDesc: bundle.BundleDesc,
		IsDefault:  asDefault,
		Hash:       contentHash,
	}
	if err = storeBundleLink(tx, link); err != nil {
		return nil, nil, fmt.Errorf("error storing bundle link: %v", err)
	}
	return link, data, nil
}
// unmarkDefaultBundles clears the slug (sets it to NULL) and the is_default
// flag on every bundle_link row currently marked as default. The bundles
// themselves are kept.
func unmarkDefaultBundles(ext sqlx.Ext) error {
	const stmt = "UPDATE bundle_link SET slug=NULL, is_default=false WHERE is_default"

	if _, err := ext.Exec(stmt); err != nil {
		return fmt.Errorf("failed unmarking default bundles: %v", err)
	}
	return nil
}
// StoreBundleLinkAndData creates a new bundle data for the given json string
// if one does not already exist, and creates a new bundle link pointing to
// that data. All DB access is done in a transaction, which is attempted at
// most 3 times (an ID collision or a failed commit triggers another attempt).
//
// On success both the link and the data are returned. On failure the error is
// returned and both results are nil, even if an attempt got as far as
// producing them before its commit failed.
//
// Slugs are currently not allowed for user-stored bundles.
func StoreBundleLinkAndData(json string) (bLink *BundleLink, bData *BundleData, retErr error) {
	retErr = runInTransaction(dbSeq, 3, func(tx *sqlx.Tx) error {
		link, data, err := storeBundle(tx, &NewBundle{Json: json}, false)
		if err == errIDCollision {
			return errRetryTransaction
		}
		if err != nil {
			return err
		}
		bLink, bData = link, data
		return nil
	})
	if retErr != nil {
		// The callback may have succeeded on an attempt whose commit then
		// failed; those objects describe rows that were rolled back, so they
		// must not be handed to the caller alongside an error.
		return nil, nil, retErr
	}
	return bLink, bData, nil
}
// ReplaceDefaultBundles removes slugs and default flags from all existing
// default bundles and inserts all bundles in newDefBundles as default bundles.
// Each bundle in newDefBundles must have a unique non-empty slug.
//
// The whole replacement runs in one transaction, attempted at most 5 times; an
// ID collision while storing any bundle restarts the transaction.
func ReplaceDefaultBundles(newDefBundles []*NewBundle) (retErr error) {
	replaceAll := func(tx *sqlx.Tx) error {
		if err := unmarkDefaultBundles(tx); err != nil {
			return err
		}
		for _, nb := range newDefBundles {
			_, _, err := storeBundle(tx, nb, true)
			if err == errIDCollision {
				return errRetryTransaction
			}
			if err != nil {
				return err
			}
		}
		return nil
	}
	return runInTransaction(dbSeq, 5, replaceAll)
}
//////////////////////////////////////////
// Transaction support
// runInTransaction runs function txf inside a SQL transaction. txf should only
// use the database handle passed to it, which shares the prepared transaction
// cache with the original handle. If txf returns nil, the transaction is
// committed. Otherwise, it is rolled back.
//
// txf is attempted at most maxRetries times, with a fresh transaction for
// every attempt, until the commit is successful. txf should not have side
// effects that could affect subsequent retries (apart from database
// operations, which are rolled back).
//
// If the error returned from txf is errRetryTransaction, txf is retried as if
// the commit failed. Otherwise, txf is not retried, and runInTransaction
// returns the error.
//
// In rare cases, txf may be retried even if the transaction was successfully
// committed (when commit falsely returns an error). txf should be idempotent
// or able to detect this case.
//
// If maxRetries is exhausted, runInTransaction returns errTooManyRetries.
// Nested transactions are not supported and result in undefined behaviour.
//
// Inspired by https://cloud.google.com/appengine/docs/go/datastore/reference#RunInTransaction
func runInTransaction(db *sqlx.DB, maxRetries int, txf func(tx *sqlx.Tx) error) error {
	for attempt := 0; attempt < maxRetries; attempt++ {
		// Anything other than a retry request ends the loop: nil means the
		// attempt committed, any other error aborts without retrying.
		if err := attemptInTransaction(db, txf); err != errRetryTransaction {
			return err
		}
	}
	return errTooManyRetries
}
// attemptInTransaction performs a single transactional attempt: it opens a
// transaction on db, runs txf with it and, if txf returns nil, commits.
//
// Returns nil on a successful commit, the error from txf unchanged if txf
// fails, errRetryTransaction if the commit fails, or a formatted error if the
// transaction cannot be opened.
//
// Two workarounds for an upstream bug are in effect (TODO(ivanpi): remove them
// when the bug is resolved):
//   - the commit is issued manually as a "COMMIT" statement rather than via
//     tx.Commit();
//   - the transaction is always rolled back on return, even after a successful
//     manual commit, to release it. Without the bug, the rollback would only
//     be needed when rerr is non-nil.
func attemptInTransaction(db *sqlx.DB, txf func(tx *sqlx.Tx) error) (rerr error) {
	tx, beginErr := db.Beginx()
	if beginErr != nil {
		return fmt.Errorf("Failed opening transaction: %v", beginErr)
	}

	// UPSTREAM BUG WORKAROUND: roll back unconditionally so the transaction is
	// released after the manual commit as well.
	// The rollback error is silently ignored, we cannot do anything about it.
	// The transaction should time out eventually, but because of the upstream
	// bug it does not and the connection gets reused. This case is unlikely,
	// but dangerous.
	defer func() {
		_ = tx.Rollback()
	}()

	// Call txf with the transaction handle - a shallow copy of the database
	// handle (sharing the mutex, database connection, queries) with the
	// transaction object added.
	if txErr := txf(tx); txErr != nil {
		return txErr
	}

	// UPSTREAM BUG WORKAROUND: commit manually instead of calling tx.Commit().
	// A failed commit is reported as a retry request.
	if _, commitErr := tx.Exec("COMMIT"); commitErr != nil {
		return errRetryTransaction
	}
	return nil
}
////////////////////////////////////////////
// SQL helper types
// EmptyNullString is a string that is stored as NULL in the database when it
// is empty, and reads back as the empty string when the database value is
// NULL. It is less cumbersome and error-prone to use for this purpose than
// sql.NullString.
//
// NULL values are used instead of empty strings to allow a UNIQUE index on
// the value in MySQL with the semantics 'unique if not NULL'. MySQL doesn't
// have support for filtered (conditional) indexes otherwise.
type EmptyNullString string

// Scan implements sql.Scanner. A NULL database value yields the empty string;
// any other value is converted exactly as sql.NullString would convert it.
func (s *EmptyNullString) Scan(value interface{}) error {
	var scanned sql.NullString
	if err := scanned.Scan(value); err != nil {
		return err
	}
	if !scanned.Valid {
		// Database NULL.
		*s = ""
		return nil
	}
	*s = EmptyNullString(scanned.String)
	return nil
}

// Value implements driver.Valuer. The empty string is written as NULL; any
// other value is written as a plain string.
func (s EmptyNullString) Value() (driver.Value, error) {
	if s == "" {
		return nil, nil
	}
	return string(s), nil
}
////////////////////////////////////////////
// Helper methods
// randomLink creates a random link id for a given hash. It draws 32 bytes from
// the cryptographic RNG, appends bHash, passes the result through hash.String,
// and replaces the first character of that string with "_".
//
// Returns an error if the RNG cannot be read.
func randomLink(bHash []byte) (string, error) {
	const nonceLen = 32

	// Reserve room for the hash up front so the append below does not
	// reallocate.
	seed := make([]byte, nonceLen, nonceLen+len(bHash))
	if err := binary.Read(crand.Reader, binary.LittleEndian, seed); err != nil {
		return "", fmt.Errorf("RNG failed: %v", err)
	}
	seed = append(seed, bHash...)

	digest := hash.String(seed)
	return "_" + digest[1:], nil
}