| // Copyright 2015 The Vanadium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| //=== High-level schema === |
| // Each playground bundle is stored once as a BundleData in the bundle_data |
| // table. The BundleData contains the json string corresponding to the bundle |
| // files, and is indexed by the hash of the json. |
| // |
| // Links to a BundleData are stored in BundleLinks. There can be multiple |
| // BundleLinks corresponding to a single BundleData. BundleLinks are indexed by |
| // a unique id, and contain the hash of the BundleData that they correspond to. |
| // |
| // The additional layer of indirection provided by BundleLinks allows storing |
| // identical bundles more efficiently and makes the bundle Id independent of |
| // its contents, allowing implementation of change history, sharing, expiration |
| // etc. |
| // |
| // Each bundle save request first generates and stores a new BundleLink object, |
| // and will store a new BundleData only if it does not already exist in the |
| // database. |
| // |
| // Note: If bundles larger than ~1 MiB are to be stored, the max_allowed_packet |
| // SQL connection parameter must be increased. |
| // |
| // TODO(ivanpi): Normalize the Json (e.g. file order). |
| |
| package storage |
| |
| import ( |
| crand "crypto/rand" |
| "database/sql" |
| "database/sql/driver" |
| "encoding/binary" |
| "errors" |
| "fmt" |
| "time" |
| |
| "github.com/jmoiron/sqlx" |
| |
| "v.io/x/playground/lib/hash" |
| ) |
| |
// Sentinel errors used by this package. Code in this file compares them by
// identity (==), so they must be returned unwrapped wherever a caller is
// expected to recognise them.
var (
	// ErrNotFound is returned when the requested item is not found in the
	// database.
	ErrNotFound = errors.New("Not found")

	// errIDCollision is returned when an autogenerated ID matches an existing
	// ID. Extremely unlikely for a reasonably utilized database.
	errIDCollision = errors.New("ID collision")

	// errTooManyRetries is returned when retries are exhausted.
	errTooManyRetries = errors.New("Too many retries")

	// errRetryTransaction is returned from a transaction callback to trigger
	// a rollback and retry. Other errors cause a rollback and abort.
	errRetryTransaction = errors.New("Retry transaction")
)
| |
| ////////////////////////////////////////// |
| // Relational type definitions |
| |
// BundleData is a row of the bundle_data table: the contents of a playground
// bundle, keyed by the hash of those contents.
type BundleData struct {
	// Raw SHA256 of the bundle contents
	Hash []byte `db:"hash"` // primary key
	// The bundle contents (JSON string describing the bundle files)
	Json string `db:"json"`
}
| |
// BundleLink is a row of the bundle_link table: a uniquely identified
// reference to a BundleData. Several BundleLinks may carry the same Hash and
// thus refer to the same BundleData.
type BundleLink struct {
	// 64-byte printable ASCII string
	Id string `db:"id"` // primary key
	// Part of the BundleLink specified by bundle author
	BundleDesc
	// Marks default bundles, returned by GetDefaultBundleList()
	IsDefault bool `db:"is_default"`
	// Raw SHA256 of the bundle contents
	Hash []byte `db:"hash"` // foreign key => BundleData.Hash
	// Link record creation time. Not written by the INSERT in
	// storeBundleLink; presumably filled in by a database default (verify
	// against the schema).
	CreatedAt time.Time `db:"created_at"`
}
| |
// BundleDesc holds the author-specified part of a bundle link. It is embedded
// in both BundleLink (stored form) and NewBundle (input form).
type BundleDesc struct {
	// Human-readable, URL-friendly unique name, up to 128 Unicode characters;
	// used for newest version of default bundles (see `is_default`).
	// An empty Slug is stored as NULL (see EmptyNullString).
	Slug EmptyNullString `db:"slug"`
}
| |
| ////////////////////////////////////////// |
| // Helper type definitions |
| |
// NewBundle is a bundle that has not yet been saved into the database. It is
// used as the input type for storing bundles.
type NewBundle struct {
	// Part of the BundleLink specified by bundle author
	BundleDesc
	// The bundle contents
	Json string `db:"json"`
}
| |
| /////////////////////////////////////// |
| // DB read-only methods |
| |
| // TODO(nlacasse): Use prepared statements, otherwise we have an extra |
| // round-trip to the db, which is slow on cloud sql. |
| |
| func getBundleLinkById(q sqlx.Queryer, id string) (*BundleLink, error) { |
| var bLink BundleLink |
| if err := sqlx.Get(q, &bLink, "SELECT * FROM bundle_link WHERE id=?", id); err != nil { |
| if err == sql.ErrNoRows { |
| err = ErrNotFound |
| } |
| return nil, err |
| } |
| return &bLink, nil |
| } |
| |
| // Only default bundles can be retrieved by slug for now. |
| func getDefaultBundleLinkBySlug(q sqlx.Queryer, slug string) (*BundleLink, error) { |
| var bLink BundleLink |
| if err := sqlx.Get(q, &bLink, "SELECT * FROM bundle_link WHERE slug=? AND is_default", slug); err != nil { |
| if err == sql.ErrNoRows { |
| err = ErrNotFound |
| } |
| return nil, err |
| } |
| return &bLink, nil |
| } |
| |
| func getBundleDataByHash(q sqlx.Queryer, hash []byte) (*BundleData, error) { |
| var bData BundleData |
| if err := sqlx.Get(q, &bData, "SELECT * FROM bundle_data WHERE hash=?", hash); err != nil { |
| if err == sql.ErrNoRows { |
| err = ErrNotFound |
| } |
| return nil, err |
| } |
| return &bData, nil |
| } |
| |
| // All default bundles have non-empty slugs. Check just in case. |
| func getDefaultBundleList(q sqlx.Queryer) ([]*BundleLink, error) { |
| var bLinks []*BundleLink |
| if err := sqlx.Select(q, &bLinks, "SELECT * FROM bundle_link WHERE is_default AND slug IS NOT NULL"); err != nil { |
| return nil, err |
| } |
| return bLinks, nil |
| } |
| |
| // GetBundleByLinkIdOrSlug retrieves a BundleData object linked to by a |
| // BundleLink with a particular id or slug. Id is tried first, slug if id |
| // doesn't exist. |
| // Note: This can fail if the bundle is deleted between fetching BundleLink |
| // and BundleData. However, it is highly unlikely, costly to mitigate (using |
| // a serializable transaction), and unimportant (error 500 instead of 404). |
| func GetBundleByLinkIdOrSlug(idOrSlug string) (*BundleLink, *BundleData, error) { |
| bLink, err := getBundleLinkById(dbRead, idOrSlug) |
| if err == ErrNotFound { |
| bLink, err = getDefaultBundleLinkBySlug(dbRead, idOrSlug) |
| } |
| if err != nil { |
| return nil, nil, err |
| } |
| bData, err := getBundleDataByHash(dbRead, bLink.Hash) |
| if err != nil { |
| return nil, nil, err |
| } |
| return bLink, bData, nil |
| } |
| |
| // GetDefaultBundleList retrieves a list of BundleLink objects describing |
| // default bundles. All default bundles have slugs. |
| func GetDefaultBundleList() ([]*BundleLink, error) { |
| return getDefaultBundleList(dbRead) |
| } |
| |
| //////////////////////////////////// |
| // DB write methods |
| |
| func storeBundleData(ext sqlx.Ext, bData *BundleData) error { |
| _, err := sqlx.NamedExec(ext, "INSERT INTO bundle_data (hash, json) VALUES (:hash, :json)", bData) |
| return err |
| } |
| |
| func storeBundleLink(ext sqlx.Ext, bLink *BundleLink) error { |
| _, err := sqlx.NamedExec(ext, "INSERT INTO bundle_link (id, slug, is_default, hash) VALUES (:id, :slug, :is_default, :hash)", bLink) |
| return err |
| } |
| |
| func storeBundle(tx *sqlx.Tx, bundle *NewBundle, asDefault bool) (*BundleLink, *BundleData, error) { |
| // All default bundles must have non-empty slugs. |
| if asDefault && bundle.Slug == "" { |
| return nil, nil, fmt.Errorf("default bundle must have non-empty slug") |
| } |
| |
| bHashRaw := hash.Raw([]byte(bundle.Json)) |
| bHash := bHashRaw[:] |
| |
| // Generate a random id for the bundle link. |
| id, err := randomLink(bHash) |
| if err != nil { |
| return nil, nil, fmt.Errorf("error creating link id: %v", err) |
| } |
| |
| // Check if bundle link with this id already exists in DB. |
| if _, err = getBundleLinkById(tx, id); err == nil { |
| // Bundle was found. Return ID collision error. |
| return nil, nil, errIDCollision |
| } else if err != ErrNotFound { |
| return nil, nil, fmt.Errorf("error checking for bundle link: %v", err) |
| } |
| |
| // Check if bundle data with this hash already exists in DB. |
| bData, err := getBundleDataByHash(tx, bHash) |
| if err != nil { |
| if err != ErrNotFound { |
| return nil, nil, fmt.Errorf("error checking for bundle data: %v", err) |
| } |
| |
| // Bundle does not exist in DB. Store it. |
| bData = &BundleData{ |
| Hash: bHash, |
| Json: bundle.Json, |
| } |
| if err = storeBundleData(tx, bData); err != nil { |
| return nil, nil, fmt.Errorf("error storing bundle data: %v", err) |
| } |
| } |
| |
| // Store the bundle link. |
| bLink := &BundleLink{ |
| Id: id, |
| BundleDesc: bundle.BundleDesc, |
| IsDefault: asDefault, |
| Hash: bHash, |
| } |
| if err = storeBundleLink(tx, bLink); err != nil { |
| return nil, nil, fmt.Errorf("error storing bundle link: %v", err) |
| } |
| |
| return bLink, bData, nil |
| } |
| |
| func unmarkDefaultBundles(ext sqlx.Ext) error { |
| _, err := ext.Exec("UPDATE bundle_link SET slug=NULL, is_default=false WHERE is_default") |
| if err != nil { |
| return fmt.Errorf("failed unmarking default bundles: %v", err) |
| } |
| |
| return nil |
| } |
| |
| // StoreBundleLinkAndData creates a new bundle data for a given json byte slice |
| // if one does not already exist. It will create a new bundle link pointing to |
| // that data. All DB access is done in a transaction, which will retry up to 3 |
| // times. Both the link and the data are returned, or an error if one occured. |
| // Slugs are currently not allowed for user-stored bundles. |
| func StoreBundleLinkAndData(json string) (bLink *BundleLink, bData *BundleData, retErr error) { |
| retErr = runInTransaction(dbSeq, 3, func(tx *sqlx.Tx) (err error) { |
| bLink, bData, err = storeBundle(tx, &NewBundle{Json: string(json)}, false) |
| if err == errIDCollision { |
| return errRetryTransaction |
| } |
| return err |
| }) |
| |
| return |
| } |
| |
| // ReplaceDefaultBundles removes slugs and default flags from all existing |
| // default bundles and inserts all bundles in newDefBundles as default bundles. |
| // Each bundle in newDefBundles must have a unique non-empty slug. |
| func ReplaceDefaultBundles(newDefBundles []*NewBundle) (retErr error) { |
| retErr = runInTransaction(dbSeq, 5, func(tx *sqlx.Tx) error { |
| if err := unmarkDefaultBundles(tx); err != nil { |
| return err |
| } |
| |
| for _, bundle := range newDefBundles { |
| if _, _, err := storeBundle(tx, bundle, true); err != nil { |
| if err == errIDCollision { |
| return errRetryTransaction |
| } |
| return err |
| } |
| } |
| |
| return nil |
| }) |
| |
| return |
| } |
| |
| ////////////////////////////////////////// |
| // Transaction support |
| |
| // Runs function txf inside a SQL transaction. txf should only use the database |
| // handle passed to it, which shares the prepared transaction cache with the |
| // original handle. If txf returns nil, the transaction is committed. |
| // Otherwise, it is rolled back. |
| // txf is retried at most maxRetries times, with a fresh transaction for every |
| // attempt, until the commit is successful. txf should not have side effects |
| // that could affect subsequent retries (apart from database operations, which |
| // are rolled back). |
| // If the error returned from txf is errRetryTransaction, txf is retried as if |
| // the commit failed. Otherwise, txf is not retried, and RunInTransaction |
| // returns the error. |
| // In rare cases, txf may be retried even if the transaction was successfully |
| // committed (when commit falsely returns an error). txf should be idempotent |
| // or able to detect this case. |
| // If maxRetries is exhausted, runInTransaction returns errTooManyRetries. |
| // Nested transactions are not supported and result in undefined behaviour. |
| // Inspired by https://cloud.google.com/appengine/docs/go/datastore/reference#RunInTransaction |
| func runInTransaction(db *sqlx.DB, maxRetries int, txf func(tx *sqlx.Tx) error) error { |
| for i := 0; i < maxRetries; i++ { |
| err := attemptInTransaction(db, txf) |
| if err == nil { |
| return nil |
| } else if err != errRetryTransaction { |
| return err |
| } |
| } |
| return errTooManyRetries |
| } |
| |
| func attemptInTransaction(db *sqlx.DB, txf func(tx *sqlx.Tx) error) (rerr error) { |
| tx, err := db.Beginx() |
| if err != nil { |
| return fmt.Errorf("Failed opening transaction: %v", err) |
| } |
| defer func() { |
| // UPSTREAM BUG WORKAROUND: Rollback anyway to release transaction after |
| // manual commit. |
| //if rerr != nil { |
| if true { |
| // Silently ignore rollback error, we cannot do anything. Transaction |
| // will timeout eventually. |
| // UPSTREAM BUG: Transaction does not timeout, connection gets reused. |
| // This case is unlikely, but dangerous. |
| // TODO(ivanpi): Remove workaround when bug is resolved. |
| _ = tx.Rollback() |
| } |
| }() |
| // Call txf with the transaction handle - a shallow copy of the database |
| // handle (sharing the mutex, database connection, queries) with the |
| // transaction object added. |
| if err := txf(tx); err != nil { |
| return err |
| } |
| // UPSTREAM BUG WORKAROUND: Commit manually. |
| //if err = tx.Commit(); err != nil { |
| if _, err = tx.Exec("COMMIT"); err != nil { |
| return errRetryTransaction |
| } |
| return nil |
| } |
| |
| //////////////////////////////////////////// |
| // SQL helper types |
| |
// EmptyNullString is a convenience type mapping an empty string value to a
// NULL value in the database and vice-versa. It is less cumbersome and
// error-prone to use for this purpose than sql.NullString.
// NULL values are used instead of empty strings to allow a UNIQUE index on
// the value in MySQL with the semantics 'unique if not NULL'. MySQL doesn't
// have support for filtered (conditional) indexes otherwise.
type EmptyNullString string

// Compile-time checks that EmptyNullString can be read from and written to
// the database through the standard database/sql interfaces.
var (
	_ sql.Scanner   = (*EmptyNullString)(nil)
	_ driver.Valuer = EmptyNullString("")
)

// Scan implements sql.Scanner. A NULL database value is stored as the empty
// string; any other value is converted the same way sql.NullString converts
// it. On a conversion error the receiver is left unchanged and the error is
// returned.
func (s *EmptyNullString) Scan(value interface{}) error {
	var ns sql.NullString
	if err := ns.Scan(value); err != nil {
		return err
	}
	if !ns.Valid {
		// NULL in the database maps to the empty string.
		*s = ""
		return nil
	}
	*s = EmptyNullString(ns.String)
	return nil
}

// Value implements driver.Valuer. The empty string is written as NULL; any
// other value is written as a plain string.
func (s EmptyNullString) Value() (driver.Value, error) {
	ns := sql.NullString{
		String: string(s),
		Valid:  s != "",
	}
	return ns.Value()
}
| |
| //////////////////////////////////////////// |
| // Helper methods |
| |
| // randomLink creates a random link id for a given hash. |
| func randomLink(bHash []byte) (string, error) { |
| h := make([]byte, 32, 32+len(bHash)) |
| err := binary.Read(crand.Reader, binary.LittleEndian, h) |
| if err != nil { |
| return "", fmt.Errorf("RNG failed: %v", err) |
| } |
| return "_" + hash.String(append(h, bHash...))[1:], nil |
| } |