| // Copyright 2015 The Vanadium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package vsync |
| |
| // Sync utility functions |
| |
| import ( |
| "time" |
| |
| "v.io/v23/context" |
| "v.io/v23/rpc" |
| "v.io/x/lib/vlog" |
| "v.io/x/ref/services/syncbase/server/util" |
| "v.io/x/ref/services/syncbase/store" |
| ) |
| |
| const ( |
| nanoPerSec = int64(1000000000) |
| ) |
| |
| // forEachDatabaseStore iterates over all Databases in all Apps within the |
| // service and invokes the callback function on each database. The callback |
| // returns a "done" flag to make forEachDatabaseStore() stop the iteration |
| // earlier; otherwise the function loops across all databases of all apps. |
| func (s *syncService) forEachDatabaseStore(ctx *context.T, callback func(string, string, store.Store) bool) { |
| // Get the apps and iterate over them. |
| // TODO(rdaoud): use a "privileged call" parameter instead of nil (here and |
| // elsewhere). |
| appNames, err := s.sv.AppNames(ctx, nil) |
| if err != nil { |
| vlog.Errorf("sync: forEachDatabaseStore: cannot get all app names: %v", err) |
| return |
| } |
| |
| for _, a := range appNames { |
| // For each app, get its databases and iterate over them. |
| app, err := s.sv.App(ctx, nil, a) |
| if err != nil { |
| vlog.Errorf("sync: forEachDatabaseStore: cannot get app %s: %v", a, err) |
| continue |
| } |
| dbNames, err := app.NoSQLDatabaseNames(ctx, nil) |
| if err != nil { |
| vlog.Errorf("sync: forEachDatabaseStore: cannot get all db names for app %s: %v", a, err) |
| continue |
| } |
| |
| for _, d := range dbNames { |
| // For each database, get its Store and invoke the callback. |
| db, err := app.NoSQLDatabase(ctx, nil, d) |
| if err != nil { |
| vlog.Errorf("sync: forEachDatabaseStore: cannot get db %s for app %s: %v", d, a, err) |
| continue |
| } |
| |
| if callback(a, d, db.St()) { |
| return // done, early exit |
| } |
| } |
| } |
| } |
| |
| // getDbStore gets the store handle to the database. |
| func (s *syncService) getDbStore(ctx *context.T, call rpc.ServerCall, appName, dbName string) (store.Store, error) { |
| app, err := s.sv.App(ctx, call, appName) |
| if err != nil { |
| return nil, err |
| } |
| db, err := app.NoSQLDatabase(ctx, call, dbName) |
| if err != nil { |
| return nil, err |
| } |
| return db.St(), nil |
| } |
| |
| // unixNanoToTime converts a Unix timestamp in nanoseconds to a Time object. |
| func unixNanoToTime(timestamp int64) time.Time { |
| if timestamp < 0 { |
| vlog.Fatalf("sync: unixNanoToTime: invalid timestamp %d", timestamp) |
| } |
| return time.Unix(timestamp/nanoPerSec, timestamp%nanoPerSec) |
| } |
| |
| // extractAppKey extracts the app key from the key sent over the wire between |
| // two Syncbases. The on-wire key starts with one of the store's reserved |
| // prefixes for managed namespaces (e.g. $row, $perms). This function removes |
| // that prefix and returns the application component of the key. This is done |
| // typically before comparing keys with the SyncGroup prefixes which are defined |
| // by the application. |
| func extractAppKey(key string) string { |
| parts := util.SplitKeyParts(key) |
| if len(parts) < 2 { |
| vlog.Fatalf("sync: extractAppKey: invalid entry key %s", key) |
| } |
| return util.JoinKeyParts(parts[1:]...) |
| } |