blob: 6ed2d1afbabe904f5456c9305b30829e6aab2dd5 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// measured is the Sensor Log measuring daemon. It runs on any device, sampling
// data points and writing them to Syncbase. The sampling configuration is read
// from Syncbase, where it is written by the client.
package main
import (
"flag"
"os"
"v.io/v23"
"v.io/v23/context"
"v.io/v23/naming"
"v.io/x/lib/vlog"
"v.io/x/ref/lib/signals"
_ "v.io/x/ref/runtime/factories/roaming"
"v.io/x/sensorlog/internal/config"
"v.io/x/sensorlog/internal/measure"
"v.io/x/sensorlog/internal/measure/runloop"
"v.io/x/sensorlog/internal/sbmodel"
"v.io/x/sensorlog/internal/sbutil"
"v.io/x/sensorlog/internal/util"
)
// Command-line flags read by runMain.
var (
	// flagSbService names the Syncbase service whose database is opened.
	flagSbService = flag.String(
		"service",
		config.DefaultSbService,
		"Name of the Syncbase service to connect to. Can be absolute or relative to the namespace root.",
	)

	// flagDevId is the device id claimed by this instance; runMain rejects an
	// empty value.
	flagDevId = flag.String(
		"devid",
		"",
		"DevId to be claimed by this measured. Must be specified.",
	)

	// The remaining flags only take effect on the first run of measured
	// against a Syncbase service with the same devid.

	// flagAdmin is the admin blessing; runMain rejects an empty value.
	flagAdmin = flag.String(
		"admin",
		"",
		"Blessing of admin user allowed to join the syncgroup. Must be specified.",
	)

	// flagPublishSb is where the syncgroup is published; runMain requires it
	// to be a rooted name.
	flagPublishSb = flag.String(
		"publish-sb",
		"",
		"Syncbase service to publish the syncgroup at. Must be absolute. Must be specified. The syncgroup is published as '"+config.SyncgroupName("<devid>")+"' at <publish-sb>.",
	)

	// flagPublishMt, when non-empty, is appended to the namespace roots used
	// as rendezvous mount tables.
	flagPublishMt = flag.String(
		"publish-mt",
		"",
		"Additional mount table to use for sync rendezvous in addition to namespace roots. Optional.",
	)
)
// main exits the process with the status code produced by runMain. The work
// lives in runMain so that its deferred calls finish before os.Exit is invoked.
func main() {
	exitCode := runMain()
	os.Exit(exitCode)
}
// runMain validates the flags, opens the Syncbase database, initializes the
// syncgroup, and then watches for stream definitions, registering enabled
// streams with the measuring loop and unregistering disabled ones.
//
// It returns 0 when terminated by a shutdown signal and 1 on invalid flags,
// on a setup failure, or when the context is done (which happens when the
// watcher or the measuring loop reports an error).
func runMain() int {
	ctx, shutdown := v23.Init()
	defer shutdown()

	// Flag validation: the first failing check is reported and ends the run.
	switch {
	case *flagDevId == "":
		vlog.Errorf("-devid must be specified")
		return 1
	case *flagAdmin == "":
		vlog.Errorf("-admin must be specified")
		return 1
	case !naming.Rooted(*flagPublishSb):
		vlog.Errorf("-publish-sb must be rooted")
		return 1
	}

	// Rendezvous mount tables are the namespace roots plus the optional
	// -publish-mt entry.
	publishMts := v23.GetNamespace(ctx).Roots()
	if extraMt := *flagPublishMt; extraMt != "" {
		publishMts = append(publishMts, extraMt)
	}

	db, err := sbutil.CreateOrOpenDB(ctx, *flagSbService, sbmodel.MeasuredCollections)
	if err != nil {
		vlog.Errorf("Failed opening Syncbase db: %v", err)
		return 1
	}
	vlog.VI(0).Infof("measured connected to %s", db.FullName())

	if err := measure.InitSyncgroup(ctx, db, *flagDevId, *flagAdmin, *flagPublishSb, publishMts); err != nil {
		vlog.Errorf("Failed initializing syncgroup: %v", err)
		return 1
	}

	// From here on ctx is the cancellable context; cancelling it is how both
	// failure callbacks and the signal path request termination.
	ctx, cancel := context.WithCancel(ctx)

	onLoopError := func(err error) {
		vlog.Errorf("Measure loop failed: %v", err)
		cancel()
	}
	loop := runloop.NewLoop(onLoopError)

	// storePoint writes one data point into the collection named by its key.
	storePoint := func(pctx *context.T, key *sbmodel.KDataPoint, val sbmodel.VDataPoint) error {
		coll := db.Collection(pctx, key.Collection())
		return coll.Put(pctx, key.Key(), val)
	}

	// onStreamDef reacts to a watched stream definition: disabled streams are
	// unregistered, enabled ones are (re)registered with their sampler.
	onStreamDef := func(key *sbmodel.KStreamDef, val *sbmodel.VStreamDef) error {
		if !val.Enabled {
			loop.Unregister(key)
			return nil
		}
		loop.Register(ctx, key, &val.Sampler, storePoint)
		return nil
	}

	onWatchError := func(err error) {
		vlog.Errorf("Watch failed: %v", err)
		cancel()
	}
	joinWatch := util.AsyncRun(func() error {
		return measure.WatchForStreams(ctx, db, *flagDevId, onStreamDef)
	}, onWatchError)

	// Before returning, wait for the watcher and then for the measuring loop.
	// The watcher's error is discarded here; onWatchError is the callback
	// that logs it.
	defer func() {
		_ = joinWatch()
		loop.WaitAll()
	}()

	sigCh := signals.ShutdownOnSignals(nil)
	select {
	case <-ctx.Done():
		vlog.VI(0).Infof("Exiting on error")
		return 1
	case <-sigCh:
		cancel()
		vlog.VI(0).Infof("Exiting on signal")
		return 0
	}
}