| // Copyright 2015 The Vanadium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| // measured is the Sensor Log measuring daemon. Runs on any device, sampling |
| // data points and writing them to Syncbase. Sampling configuration is read |
| // from Syncbase as written by the client. |
| package main |
| |
| import ( |
| "flag" |
| "os" |
| |
| "v.io/v23" |
| "v.io/v23/context" |
| "v.io/v23/naming" |
| "v.io/x/lib/vlog" |
| "v.io/x/ref/lib/signals" |
| _ "v.io/x/ref/runtime/factories/generic" |
| "v.io/x/sensorlog/internal/config" |
| "v.io/x/sensorlog/internal/measure" |
| "v.io/x/sensorlog/internal/measure/runloop" |
| "v.io/x/sensorlog/internal/sbmodel" |
| "v.io/x/sensorlog/internal/sbutil" |
| "v.io/x/sensorlog/internal/util" |
| ) |
| |
| var ( |
| flagSbService = flag.String("service", config.DefaultSbService, "Name of the Syncbase service to connect to. Can be absolute or relative to the namespace root.") |
| flagDevId = flag.String("devid", "", "DevId to be claimed by this measured. Must be specified.") |
| // Flags below are only applied the first time measured is run against a Syncbase service with the same devid. |
| flagAdmin = flag.String("admin", "", "Blessing of admin user allowed to join the syncgroup. Must be specified.") |
| flagPublishSb = flag.String("publish-sb", "", "Syncbase service to publish the syncgroup at. Must be absolute. Must be specified. The syncgroup is published as '"+config.SyncgroupName("<publish-sb>", "<devid>")+"'.") |
| flagPublishMt = flag.String("publish-mt", "", "Additional mounttable to use for sync rendezvous in addition to namespace roots. Optional.") |
| ) |
| |
| func main() { |
| os.Exit(runMain()) |
| } |
| |
| func runMain() int { |
| ctx, shutdown := v23.Init() |
| defer shutdown() |
| |
| if *flagDevId == "" { |
| vlog.Errorf("-devid must be specified") |
| return 1 |
| } |
| if *flagAdmin == "" { |
| vlog.Errorf("-admin must be specified") |
| return 1 |
| } |
| if !naming.Rooted(*flagPublishSb) { |
| vlog.Errorf("-publish-sb must be rooted") |
| return 1 |
| } |
| publishMts := v23.GetNamespace(ctx).Roots() |
| if *flagPublishMt != "" { |
| publishMts = append(publishMts, *flagPublishMt) |
| } |
| |
| db, err := sbutil.CreateOrOpenDB(ctx, *flagSbService, sbmodel.MeasuredTables) |
| if err != nil { |
| vlog.Errorf("Failed opening Syncbase db: %v", err) |
| return 1 |
| } |
| vlog.VI(0).Infof("measured connected to %s", db.FullName()) |
| |
| if err := measure.InitSyncgroup(ctx, db, *flagDevId, *flagAdmin, *flagPublishSb, publishMts); err != nil { |
| vlog.Errorf("Failed initializing syncgroup: %v", err) |
| return 1 |
| } |
| |
| ctx, stop := context.WithCancel(ctx) |
| |
| ml := runloop.NewLoop(func(err error) { |
| vlog.Errorf("Measure loop failed: %v", err) |
| stop() |
| }) |
| |
| writer := func(ctx *context.T, key *sbmodel.KDataPoint, val sbmodel.VDataPoint) error { |
| return db.Table(key.Table()).Put(ctx, key.Key(), val) |
| } |
| |
| waitWatch := util.AsyncRun(func() error { |
| return measure.WatchForStreams(ctx, db, *flagDevId, func(key *sbmodel.KStreamDef, val *sbmodel.VStreamDef) error { |
| if val.Enabled { |
| ml.Register(ctx, key, &val.Sampler, writer) |
| } else { |
| ml.Unregister(key) |
| } |
| return nil |
| }) |
| }, func(err error) { |
| vlog.Errorf("Watch failed: %v", err) |
| stop() |
| }) |
| |
| defer func() { |
| _ = waitWatch() |
| ml.WaitAll() |
| }() |
| |
| select { |
| case <-signals.ShutdownOnSignals(nil): |
| stop() |
| vlog.VI(0).Infof("Exiting on signal") |
| return 0 |
| case <-ctx.Done(): |
| vlog.VI(0).Infof("Exiting on error") |
| return 1 |
| } |
| } |