Merge branch 'master' into todoshacks_bt
diff --git a/lib/discovery/advertise.go b/lib/discovery/advertise.go
index 67d43cb..bf93e52 100644
--- a/lib/discovery/advertise.go
+++ b/lib/discovery/advertise.go
@@ -13,6 +13,7 @@
"v.io/v23/discovery"
"v.io/v23/naming"
"v.io/v23/security"
+ "v.io/x/lib/vlog"
"v.io/x/ref/lib/stats"
)
@@ -22,6 +23,7 @@
)
func (d *idiscovery) advertise(ctx *context.T, session sessionId, ad *discovery.Advertisement, visibility []security.BlessingPattern) (<-chan struct{}, error) {
+ vlog.Infof("HACK: advertising")
if !ad.Id.IsValid() {
var err error
if ad.Id, err = discovery.NewAdId(); err != nil {
@@ -33,6 +35,7 @@
}
adinfo := &AdInfo{Ad: *ad}
+ vlog.Infof("HACK: advertising adinfo: %v", adinfo)
if err := encrypt(ctx, adinfo, visibility); err != nil {
return nil, err
}
@@ -117,6 +120,7 @@
}
func (d *idiscovery) startAdvertising(ctx *context.T, adinfo *AdInfo) (func(), error) {
+ vlog.Infof("HACK: startAdvertising adinfo: %v", adinfo)
statName := naming.Join(d.statsPrefix, "ad", adinfo.Ad.Id.String())
stats.NewStringFunc(statName, func() string { return fmt.Sprint(*adinfo) })
ctx, cancel := context.WithCancel(ctx)
diff --git a/lib/discovery/plugins/ble/ble.go b/lib/discovery/plugins/ble/ble.go
index e8a16c4..de7c310 100644
--- a/lib/discovery/plugins/ble/ble.go
+++ b/lib/discovery/plugins/ble/ble.go
@@ -14,6 +14,7 @@
"v.io/v23/context"
"v.io/v23/discovery"
"v.io/v23/naming"
+ "v.io/x/lib/vlog"
idiscovery "v.io/x/ref/lib/discovery"
"v.io/x/ref/lib/stats"
)
@@ -42,6 +43,7 @@
done()
return err
}
+ vlog.Infof("HACK: ble plugin advertising: %v", adinfo)
stop := func() {
p.advertiser.removeAd(adinfo)
done()
@@ -84,6 +86,7 @@
for {
select {
case adinfo := <-listener:
+ vlog.Infof("HACK: ble plugin got adinfo: %v", adinfo)
if adinfo.Lost {
seenMu.Lock()
delete(seen, adinfo.Ad.Id)
diff --git a/lib/discovery/plugins/mdns/mdns.go b/lib/discovery/plugins/mdns/mdns.go
index 77dae7e..82f8226 100644
--- a/lib/discovery/plugins/mdns/mdns.go
+++ b/lib/discovery/plugins/mdns/mdns.go
@@ -29,8 +29,8 @@
"v.io/v23/context"
"v.io/v23/discovery"
-
"v.io/x/lib/netconfig"
+ "v.io/x/lib/vlog"
idiscovery "v.io/x/ref/lib/discovery"
)
@@ -100,6 +100,7 @@
}
// Announce the service.
+ vlog.Infof("HACK: announcing service %v on %v", serviceName, hostName)
err = p.mdns.AddService(serviceName, hostName, 0, txt...)
if err != nil {
done()
@@ -108,11 +109,13 @@
// Announce it as v23 service as well so that we can discover
// all v23 services through mDNS.
+ vlog.Infof("HACK: announcing v23service %v on %v", v23ServiceName, hostName)
err = p.mdns.AddService(v23ServiceName, hostName, 0, txt...)
if err != nil {
done()
return err
}
+ vlog.Infof("HACK: done announcing")
stop := func() {
p.mdns.RemoveService(serviceName, hostName, 0, txt...)
p.mdns.RemoveService(v23ServiceName, hostName, 0, txt...)
@@ -145,12 +148,14 @@
select {
case service = <-watcher:
case <-time.After(p.subscriptionRefreshTime):
+ vlog.Infof("HACK: refreshing subscription for %v", serviceName)
p.refreshSubscription(serviceName)
continue
case <-ctx.Done():
return
}
adinfo, err := newAdInfo(service)
+ vlog.Infof("HACK: mdns plugin got adinfo %v", adinfo)
if err != nil {
ctx.Error(err)
continue
@@ -436,7 +441,7 @@
v4addr = fmt.Sprintf("224.0.0.251:%d", port)
v6addr = fmt.Sprintf("[FF02::FB]:%d", port)
}
- m, err := mdns.NewMDNS(host, v4addr, v6addr, loopback, 0)
+ m, err := mdns.NewMDNS(host, v4addr, v6addr, loopback, 0) // MAYBE INCREASE LOGLEVEL HERE
if err != nil {
// The name may not have been unique. Try one more time with a unique
// name. NewMDNS will replace the "()" with "(hardware mac address)".
diff --git a/lib/discovery/scan.go b/lib/discovery/scan.go
index 095347f..c901b54 100644
--- a/lib/discovery/scan.go
+++ b/lib/discovery/scan.go
@@ -10,6 +10,7 @@
"v.io/v23/context"
"v.io/v23/discovery"
+ "v.io/x/lib/vlog"
)
type scanChanElem struct {
@@ -101,6 +102,22 @@
plugin, adinfo := e.src, e.val
id := adinfo.Ad.Id
prev := seen[adinfo.Ad.Id]
+ /* THIS IS A HACK TO FORCE BT-CLASSIC RPCs.
+ var newAddrs []string
+ for _, a := range adinfo.Ad.Addresses {
+ if strings.HasPrefix(a, "/@6@bt") {
+ newAddrs = append(newAddrs, a)
+ } else if strings.HasPrefix(a, "/") {
+ newAddrs = append(newAddrs, "/invalid"+a)
+ } else {
+ newAddrs = append(newAddrs, a)
+ }
+ }
+ adinfo.Ad.Addresses = newAddrs
+ */
+ if d.getAdSession(id) != session {
+ vlog.Infof("HACK: got adinfo: %v", adinfo)
+ }
if adinfo.Lost {
// A 'Lost' advertisement may not have complete
// information. Send the lost notification on
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index 157e394..5b10499 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -8,6 +8,7 @@
"fmt"
"math"
"reflect"
+ "runtime"
"sync"
"time"
@@ -656,7 +657,11 @@
func (c *Conn) internalCloseLocked(ctx *context.T, closedRemotely bool, err error) {
debug := ctx.VI(2)
- debug.Infof("Closing connection: %v", err)
+ stack := make([]byte, 10000)
+ stackEnd := runtime.Stack(stack, false)
+ stack = stack[:stackEnd]
+ ctx.Infof("Closing connection to %v(%p) because %v: %v [stack size %d] %s", c.remote, c, closedRemotely, err, stackEnd, string(stack))
+ ctx.InfoStack(false)
flows := make([]*flw, 0, len(c.flows))
for _, f := range c.flows {
diff --git a/runtime/internal/flow/manager/conncache.go b/runtime/internal/flow/manager/conncache.go
index 7645c14..ab8821d 100644
--- a/runtime/internal/flow/manager/conncache.go
+++ b/runtime/internal/flow/manager/conncache.go
@@ -164,10 +164,10 @@
for _, e := range entries {
if status := e.conn.Status(); status >= conn.Closing {
// Remove undialable conns.
- c.removeEntryLocked(e)
+ c.removeEntryLocked(ctx, e)
} else if status == conn.LameDuckAcknowledged && e.conn.CloseIfIdle(ctx, c.idleExpiry) {
// Close and remove lameducked or idle connections.
- c.removeEntryLocked(e)
+ c.removeEntryLocked(ctx, e)
num--
} else {
entries[k] = e
@@ -188,7 +188,7 @@
// Kill idle connections.
if num > 0 && !e.conn.IsEncapsulated() && e.conn.CloseIfIdle(ctx, c.idleExpiry) {
num--
- c.removeEntryLocked(e)
+ c.removeEntryLocked(ctx, e)
continue
}
// Lameduck idle connections.
@@ -213,7 +213,7 @@
for i := 0; i < num && i < len(entries); i++ {
e := entries[i]
e.conn.Close(ctx, err)
- c.removeEntryLocked(e)
+ c.removeEntryLocked(ctx, e)
}
return nil
}
@@ -308,7 +308,7 @@
c.mu.Unlock()
return nil, nil, nil, nil
}
- entries := c.makeRTTEntriesLocked(c.ridCache[rid])
+ entries := c.makeRTTEntriesLocked(ctx, c.ridCache[rid])
c.mu.Unlock()
conn, names, rejected := c.pickFirstAuthorizedConn(ctx, remote, entries, auth)
@@ -332,7 +332,7 @@
}
}
c.started[k] = true
- entries := c.makeRTTEntriesLocked(c.addrCache[k])
+ entries := c.makeRTTEntriesLocked(ctx, c.addrCache[k])
c.mu.Unlock()
conn, names, rejected := c.pickFirstAuthorizedConn(ctx, remote, entries, auth)
@@ -355,7 +355,7 @@
return nil, nil, nil, NewErrCacheClosed(ctx)
}
k := key(network, address)
- entries := c.makeRTTEntriesLocked(c.addrCache[k])
+ entries := c.makeRTTEntriesLocked(ctx, c.addrCache[k])
c.mu.Unlock()
if conn, names, rejected := c.pickFirstAuthorizedConn(ctx, remote, entries, auth); conn != nil {
@@ -366,7 +366,7 @@
return nil, nil, nil, nil
}
-func (c *ConnCache) makeRTTEntriesLocked(es []*connEntry) rttEntries {
+func (c *ConnCache) makeRTTEntriesLocked(ctx *context.T, es []*connEntry) rttEntries {
if len(es) == 0 {
return nil
}
@@ -378,7 +378,7 @@
k := 0
for _, e := range entries {
if status := e.conn.Status(); status >= conn.Closing {
- c.removeEntryLocked(e)
+ c.removeEntryLocked(ctx, e)
} else if !e.conn.RemoteLameDuck() {
entries[k] = e
k++
@@ -405,7 +405,12 @@
return nil, nil, nil
}
-func (c *ConnCache) removeEntryLocked(entry *connEntry) {
+func (c *ConnCache) removeEntryLocked(ctx *context.T, entry *connEntry) {
+ if entry != nil {
+ ctx.Infof("removing conn to %v(%p) from cache",
+ entry.conn.RemoteEndpoint(), entry.conn)
+ }
+
addrConns, ok := c.addrCache[entry.addrKey]
if ok {
addrConns = removeEntryFromSlice(addrConns, entry)
diff --git a/runtime/internal/rpc/client.go b/runtime/internal/rpc/client.go
index 2c9220b..f422f32 100644
--- a/runtime/internal/rpc/client.go
+++ b/runtime/internal/rpc/client.go
@@ -84,6 +84,7 @@
flowMgr flow.Manager
preferredProtocols []string
ctx *context.T
+ outstanding *outstandingStats
// stop is kept for backward compatibilty to implement Close().
// TODO(mattr): deprecate Close.
stop func()
@@ -101,11 +102,13 @@
func NewClient(ctx *context.T, opts ...rpc.ClientOpt) rpc.Client {
ctx, cancel := context.WithCancel(ctx)
+ statsPrefix := fmt.Sprintf("rpc/client/outstanding/%p", ctx)
c := &client{
- ctx: ctx,
- typeCache: newTypeCache(),
- stop: cancel,
- closed: make(chan struct{}),
+ ctx: ctx,
+ typeCache: newTypeCache(),
+ stop: cancel,
+ closed: make(chan struct{}),
+ outstanding: newOutstandingStats(statsPrefix),
}
connIdleExpiry := time.Duration(0)
@@ -131,6 +134,7 @@
<-c.flowMgr.Closed()
c.wg.Wait()
+ c.outstanding.close()
close(c.closed)
}()
@@ -179,7 +183,8 @@
return nil, err
}
- fc, err := newFlowClient(ctx, r.flow, r.typeEnc, r.typeDec)
+ removeStat := c.outstanding.start(method, r.flow.RemoteEndpoint())
+ fc, err := newFlowClient(ctx, removeStat, r.flow, r.typeEnc, r.typeDec)
if err != nil {
return nil, err
}
@@ -583,26 +588,33 @@
sendClosedMu sync.Mutex
sendClosed bool // is the send side already closed? GUARDED_BY(sendClosedMu)
finished bool // has Finish() already been called?
+ removeStat func()
}
var _ rpc.ClientCall = (*flowClient)(nil)
var _ rpc.Stream = (*flowClient)(nil)
-func newFlowClient(ctx *context.T, flow flow.Flow, typeEnc *vom.TypeEncoder, typeDec *vom.TypeDecoder) (*flowClient, error) {
+func newFlowClient(ctx *context.T, removeStat func(), flow flow.Flow, typeEnc *vom.TypeEncoder, typeDec *vom.TypeDecoder) (*flowClient, error) {
bf := conn.NewBufferingFlow(ctx, flow)
if _, err := bf.Write([]byte{dataFlow}); err != nil {
flow.Close()
+ removeStat()
return nil, err
}
fc := &flowClient{
- ctx: ctx,
- flow: bf,
- dec: vom.NewDecoderWithTypeDecoder(bf, typeDec),
- enc: vom.NewEncoderWithTypeEncoder(bf, typeEnc),
+ ctx: ctx,
+ flow: bf,
+ dec: vom.NewDecoderWithTypeDecoder(bf, typeDec),
+ enc: vom.NewEncoderWithTypeEncoder(bf, typeEnc),
+ removeStat: removeStat,
}
return fc, nil
}
+func (fc *flowClient) Conn() flow.ManagedConn {
+ return fc.flow.Conn()
+}
+
// close determines the appropriate error to return, in particular,
// if a timeout or cancelation has occured then any error
// is turned into a timeout or cancelation as appropriate.
@@ -617,6 +629,7 @@
// which case we'll get an error. Not clear what to do.
//return verror.New(verror.ErrInternal, fc.ctx, subErr)
}
+ fc.removeStat()
switch verror.ErrorID(err) {
case verror.ErrCanceled.ID:
return err
@@ -866,6 +879,7 @@
return fc.close(berr)
}
}
+ fc.close(nil)
return nil
}
diff --git a/runtime/internal/rpc/server.go b/runtime/internal/rpc/server.go
index 6b8566a..48fd4e4 100644
--- a/runtime/internal/rpc/server.go
+++ b/runtime/internal/rpc/server.go
@@ -78,7 +78,8 @@
isLeaf bool
lameDuckTimeout time.Duration // the time to wait for inflight operations to finish on shutdown
- stats *rpcStats // stats for this server.
+ stats *rpcStats // stats for this server.
+ outstanding *outstandingStats
}
func WithNewServer(ctx *context.T,
@@ -123,6 +124,7 @@
endpoints: make(map[string]naming.Endpoint),
lameDuckTimeout: 5 * time.Second,
closed: make(chan struct{}),
+ outstanding: newOutstandingStats(naming.Join("rpc", "server", "outstanding", rid.String())),
}
channelTimeout := time.Duration(0)
connIdleExpiry := time.Duration(0)
@@ -410,7 +412,7 @@
}
}
if ch, err = s.flowMgr.Listen(ctx, protocol, address); err != nil {
- s.ctx.Errorf("Listen(%q, %q, ...) failed: %v", protocol, address, err)
+ //s.ctx.Errorf("Listen(%q, %q, ...) failed: %v", protocol, address, err)
}
}
}
@@ -607,6 +609,7 @@
discharges map[string]security.Discharge
starttime time.Time
endStreamArgs bool // are the stream args at EOF?
+ removeStat func()
}
var (
@@ -659,6 +662,11 @@
func (fs *flowServer) serve() error {
defer fs.flow.Close()
+ defer func() {
+ if fs.removeStat != nil {
+ fs.removeStat()
+ }
+ }()
ctx, results, err := fs.processRequest()
vtrace.GetSpan(ctx).Finish()
@@ -745,6 +753,7 @@
ctx, _ = vtrace.WithNewSpan(ctx, fmt.Sprintf("\"%s\".UNKNOWN", fs.suffix))
return ctx, nil, err
}
+ fs.removeStat = fs.server.outstanding.start(req.Method, fs.flow.RemoteEndpoint())
// Start building up a new context for the request now that we know
// the header information.
diff --git a/runtime/internal/rpc/stats.go b/runtime/internal/rpc/stats.go
index 640618a..524bdc9 100644
--- a/runtime/internal/rpc/stats.go
+++ b/runtime/internal/rpc/stats.go
@@ -5,6 +5,9 @@
package rpc
import (
+ "bytes"
+ "fmt"
+ "sort"
"sync"
"time"
@@ -15,6 +18,79 @@
"v.io/v23/naming"
)
+type outstandingCall struct {
+ remote naming.Endpoint
+ method string
+ when time.Time
+}
+
+type outstandingCalls []*outstandingCall
+
+func (oc outstandingCalls) Less(i, j int) bool {
+ return oc[i].when.Before(oc[j].when)
+}
+func (oc outstandingCalls) Swap(i, j int) {
+ oc[i], oc[j] = oc[j], oc[i]
+}
+func (oc outstandingCalls) Len() int {
+ return len(oc)
+}
+
+type outstandingStats struct {
+ prefix string
+ mu sync.Mutex
+ outstanding map[*outstandingCall]bool
+}
+
+func newOutstandingStats(prefix string) *outstandingStats {
+ o := &outstandingStats{
+ prefix: prefix,
+ outstanding: make(map[*outstandingCall]bool),
+ }
+ stats.NewStringFunc(prefix, o.String)
+ return o
+}
+
+func (o *outstandingStats) String() string {
+ defer o.mu.Unlock()
+ o.mu.Lock()
+ if len(o.outstanding) == 0 {
+ return "No outstanding calls."
+ }
+ calls := make(outstandingCalls, 0, len(o.outstanding))
+ for o := range o.outstanding {
+ calls = append(calls, o)
+ }
+ sort.Sort(calls)
+ now := time.Now()
+ buf := &bytes.Buffer{}
+ for _, o := range calls {
+ fmt.Fprintf(buf, "%s age:%v from:%v\n", o.method, now.Sub(o.when), o.remote)
+ }
+ return buf.String()
+}
+
+func (o *outstandingStats) close() {
+ stats.Delete(o.prefix)
+}
+
+func (o *outstandingStats) start(method string, remote naming.Endpoint) func() {
+ o.mu.Lock()
+ nw := &outstandingCall{
+ method: method,
+ remote: remote,
+ when: time.Now(),
+ }
+ o.outstanding[nw] = true
+ o.mu.Unlock()
+
+ return func() {
+ o.mu.Lock()
+ delete(o.outstanding, nw)
+ o.mu.Unlock()
+ }
+}
+
type rpcStats struct {
mu sync.RWMutex
prefix string
diff --git a/services/debug/debug/browseserver/sbtree/colltree.go b/services/debug/debug/browseserver/sbtree/colltree.go
index 561ae5d..0e40812 100644
--- a/services/debug/debug/browseserver/sbtree/colltree.go
+++ b/services/debug/debug/browseserver/sbtree/colltree.go
@@ -10,6 +10,7 @@
"v.io/v23/context"
wire "v.io/v23/services/syncbase"
"v.io/v23/syncbase"
+ "v.io/v23/vdl"
)
// CollectionTree has all the data for the collection page of the Syncbase debug
@@ -100,12 +101,14 @@
if state == gathering {
// Grab the value, put it and the key into a KeyVal, and
// add it to the page.
- var value interface{}
- err := stream.Value(&value)
- if err != nil {
- value = fmt.Sprintf("ERROR getting value: %v", err)
+ kv := keyVal{Index: rowCount, Key: key}
+ var value *vdl.Value
+ if err := stream.Value(&value); err != nil {
+ kv.Value = fmt.Sprintf("ERROR getting value: %v", err)
+ } else {
+ kv.Value = value
}
- page.KeyVals = append(page.KeyVals, keyVal{rowCount, key, value})
+ page.KeyVals = append(page.KeyVals, kv)
}
rowCount++
}
diff --git a/services/syncbase/vsync/blob.go b/services/syncbase/vsync/blob.go
index cf33a9b..90528a5 100644
--- a/services/syncbase/vsync/blob.go
+++ b/services/syncbase/vsync/blob.go
@@ -192,8 +192,8 @@
// RPCs for managing blobs between Syncbase and its clients.
func (sd *syncDatabase) CreateBlob(ctx *context.T, call rpc.ServerCall) (wire.BlobRef, error) {
- vlog.VI(2).Infof("sync: CreateBlob: begin")
- defer vlog.VI(2).Infof("sync: CreateBlob: end")
+ vlog.Infof("sync: CreateBlob: begin")
+ defer vlog.Infof("sync: CreateBlob: end")
// Get this Syncbase's blob store handle.
ss := sd.sync.(*syncService)
@@ -206,13 +206,13 @@
defer writer.CloseWithoutFinalize()
name := writer.Name()
- vlog.VI(4).Infof("sync: CreateBlob: blob ref %s", name)
+ vlog.Infof("sync: CreateBlob: blob ref %s", name)
return wire.BlobRef(name), nil
}
func (sd *syncDatabase) PutBlob(ctx *context.T, call wire.BlobManagerPutBlobServerCall, br wire.BlobRef) error {
- vlog.VI(2).Infof("sync: PutBlob: begin br %v", br)
- defer vlog.VI(2).Infof("sync: PutBlob: end br %v", br)
+ vlog.Infof("sync: PutBlob: begin br %v", br)
+ defer vlog.Infof("sync: PutBlob: end br %v", br)
// Get this Syncbase's blob store handle.
ss := sd.sync.(*syncService)
@@ -235,8 +235,8 @@
}
func (sd *syncDatabase) CommitBlob(ctx *context.T, call rpc.ServerCall, br wire.BlobRef) error {
- vlog.VI(2).Infof("sync: CommitBlob: begin br %v", br)
- defer vlog.VI(2).Infof("sync: CommitBlob: end br %v", br)
+ vlog.Infof("sync: CommitBlob: begin br %v", br)
+ defer vlog.Infof("sync: CommitBlob: end br %v", br)
// Get this Syncbase's blob store handle.
ss := sd.sync.(*syncService)
@@ -250,8 +250,8 @@
}
func (sd *syncDatabase) GetBlobSize(ctx *context.T, call rpc.ServerCall, br wire.BlobRef) (int64, error) {
- vlog.VI(2).Infof("sync: GetBlobSize: begin br %v", br)
- defer vlog.VI(2).Infof("sync: GetBlobSize: end br %v", br)
+ vlog.Infof("sync: GetBlobSize: begin br %v", br)
+ defer vlog.Infof("sync: GetBlobSize: end br %v", br)
// Get this Syncbase's blob store handle.
ss := sd.sync.(*syncService)
@@ -271,8 +271,8 @@
}
func (sd *syncDatabase) GetBlob(ctx *context.T, call wire.BlobManagerGetBlobServerCall, br wire.BlobRef, offset int64) error {
- vlog.VI(2).Infof("sync: GetBlob: begin br %v", br)
- defer vlog.VI(2).Infof("sync: GetBlob: end br %v", br)
+ vlog.Infof("sync: GetBlob: begin br %v", br)
+ defer vlog.Infof("sync: GetBlob: end br %v", br)
// First get the blob locally if available.
ss := sd.sync.(*syncService)
@@ -285,8 +285,8 @@
}
func (sd *syncDatabase) FetchBlob(ctx *context.T, call wire.BlobManagerFetchBlobServerCall, br wire.BlobRef, priority uint64) error {
- vlog.VI(2).Infof("sync: FetchBlob: begin br %v", br)
- defer vlog.VI(2).Infof("sync: FetchBlob: end br %v", br)
+ vlog.Infof("sync: FetchBlob: begin br %v", br)
+ defer vlog.Infof("sync: FetchBlob: end br %v", br)
clientStream := call.SendStream()
@@ -331,8 +331,8 @@
func (s *syncService) FetchBlob(ctx *context.T, call interfaces.SyncFetchBlobServerCall, br wire.BlobRef,
remoteSgPriorities interfaces.SgPriorities) (sharesToTransfer interfaces.BlobSharesBySyncgroup, err error) {
- vlog.VI(2).Infof("sync: FetchBlob: sb-sb begin br %v", br)
- defer vlog.VI(2).Infof("sync: FetchBlob: sb-sb end br %v", br)
+ vlog.Infof("sync: FetchBlob: sb-sb begin br %v", br)
+ defer vlog.Infof("sync: FetchBlob: sb-sb end br %v", br)
err = getLocalBlob(ctx, call.SendStream(), s.bst, br, 0)
if err == nil {
@@ -379,8 +379,8 @@
func (s *syncService) HaveBlob(ctx *context.T, call rpc.ServerCall,
br wire.BlobRef) (size int64, signpost interfaces.Signpost, err error) {
- vlog.VI(2).Infof("sync: HaveBlob: begin br %v", br)
- defer vlog.VI(2).Infof("sync: HaveBlob: end br %v", br)
+ vlog.Infof("sync: HaveBlob: begin br %v", br)
+ defer vlog.Infof("sync: HaveBlob: end br %v", br)
// In this routine we do not set err!=nil if the blob is unavailable.
// Instead set size==-1, and set signpost.
@@ -496,8 +496,8 @@
// blob and sends it to the client. If the blob is found, it starts reading it
// from the given offset and sends its bytes into the client stream.
func getLocalBlob(ctx *context.T, stream byteStream, bst blob.BlobStore, br wire.BlobRef, offset int64) error {
- vlog.VI(4).Infof("sync: getLocalBlob: begin br %v, offset %v", br, offset)
- defer vlog.VI(4).Infof("sync: getLocalBlob: end br %v, offset %v", br, offset)
+ vlog.Infof("sync: getLocalBlob: begin br %v, offset %v", br, offset)
+ defer vlog.Infof("sync: getLocalBlob: end br %v, offset %v", br, offset)
reader, err := bst.NewBlobReader(ctx, string(br))
if err != nil {
@@ -529,8 +529,8 @@
}
func (sd *syncDatabase) fetchBlobRemote(ctx *context.T, br wire.BlobRef, statusCall wire.BlobManagerFetchBlobServerCall, dataCall wire.BlobManagerGetBlobServerCall, offset int64) error {
- vlog.VI(4).Infof("sync: fetchBlobRemote: begin br %v, offset %v", br, offset)
- defer vlog.VI(4).Infof("sync: fetchBlobRemote: end br %v, offset %v", br, offset)
+ vlog.Infof("sync: fetchBlobRemote: begin br %v, offset %v", br, offset)
+ defer vlog.Infof("sync: fetchBlobRemote: end br %v, offset %v", br, offset)
// TODO(m3b): If this is called concurrently on the same blobref, we'll do redundant work.
// We might also transfer too many ownership shares.
@@ -838,8 +838,8 @@
// TODO(hpucha): Add syncgroup driven blob discovery.
func (sd *syncDatabase) locateBlob(ctx *context.T, br wire.BlobRef) (string, int64, error) {
- vlog.VI(4).Infof("sync: locateBlob: begin br %v", br)
- defer vlog.VI(4).Infof("sync: locateBlob: end br %v", br)
+ vlog.Infof("sync: locateBlob: begin br %v", br)
+ defer vlog.Infof("sync: locateBlob: end br %v", br)
ss := sd.sync.(*syncService)
var sp interfaces.Signpost
@@ -861,7 +861,7 @@
}
for i := 0; i != len(locationList) && i != maxLocationsInSignpost; i++ {
var p string = locationList[i].peer
- vlog.VI(4).Infof("sync: locateBlob: attempting %s", p)
+ vlog.Infof("sync: locateBlob: attempting %s", p)
// Get the mount tables for this peer.
mtTables, err := sd.getMountTables(ctx, p)
if err != nil {
@@ -876,7 +876,7 @@
if updatedSp {
ss.bst.SetSignpost(ctx, br, &sp)
}
- vlog.VI(4).Infof("sync: locateBlob: found blob on %s", absName)
+ vlog.Infof("sync: locateBlob: found blob on %s", absName)
return absName, size, nil
} else if err == nil { // no size, but remoteSp is valid.
// Add new locations to locationList so
@@ -967,8 +967,8 @@
objid := m.ObjId
srcPeer := syncbaseIdToName(m.Id)
- vlog.VI(4).Infof("sync: processBlobRefs: begin: objid %s, peer %s, src %s", objid, peer, srcPeer)
- defer vlog.VI(4).Infof("sync: processBlobRefs: end: objid %s, peer %s, src %s", objid, peer, srcPeer)
+ vlog.Infof("sync: processBlobRefs: begin: objid %s, peer %s, src %s", objid, peer, srcPeer)
+ defer vlog.Infof("sync: processBlobRefs: end: objid %s, peer %s, src %s", objid, peer, srcPeer)
if rawValue == nil {
return nil
@@ -1014,7 +1014,7 @@
// Associate the blob metadata with each blob ref. Create a separate
// copy of the syncgroup set for each blob ref.
for br := range brs {
- vlog.VI(4).Infof("sync: processBlobRefs: found blobref %v, sgs %v", br, allSgIds)
+ vlog.Infof("sync: processBlobRefs: found blobref %v, sgs %v", br, allSgIds)
sp := interfaces.Signpost{Locations: make(interfaces.PeerToLocationDataMap), SgIds: make(sgSet)}
var peerSyncgroups sgSet = s.syncgroupsWithServer(ctx, dbId, peer, allSgIds)
var srcPeerSyncgroups sgSet
diff --git a/services/syncbase/vsync/clock.go b/services/syncbase/vsync/clock.go
index 7598895..462aee1 100644
--- a/services/syncbase/vsync/clock.go
+++ b/services/syncbase/vsync/clock.go
@@ -23,8 +23,8 @@
// TODO(sadovsky): This method does zero authorization, which means any client
// can send a syncgroup these requests. This seems undesirable.
func (s *syncService) GetTime(ctx *context.T, call rpc.ServerCall, req interfaces.TimeReq, initiator string) (interfaces.TimeResp, error) {
- vlog.VI(2).Infof("sync: GetTime: begin: from initiator %s", initiator)
- defer vlog.VI(2).Infof("sync: GetTime: end: from initiator %s", initiator)
+ vlog.Infof("sync: GetTime: begin: from initiator %s", initiator)
+ defer vlog.Infof("sync: GetTime: end: from initiator %s", initiator)
// For detecting sysclock changes that would break the NTP send/recv time
// calculations. See detailed comment in vclock/ntp.go.
@@ -62,8 +62,8 @@
// Works by treating the peer as an NTP server, obtaining a single sample, and
// then updating the local VClockData as appropriate.
func (s *syncService) syncVClock(ctxIn *context.T, peer connInfo) error {
- vlog.VI(2).Infof("sync: syncVClock: begin: contacting peer %v", peer)
- defer vlog.VI(2).Infof("sync: syncVClock: end: contacting peer %v", peer)
+ vlog.Infof("sync: syncVClock: begin: contacting peer %v", peer)
+ defer vlog.Infof("sync: syncVClock: end: contacting peer %v", peer)
// For detecting sysclock changes that would break the NTP send/recv time
// calculations. See detailed comment in vclock/ntp.go.
@@ -115,7 +115,7 @@
if _, ok := err.(*vclock.NoUpdateErr); ok {
// Note, we still return an error in this case so that UpdateVClockData
// does not even attempt to write VClockData back to the store.
- vlog.VI(2).Infof("sync: syncVClock: decided not to update VClockData: %v", err)
+ vlog.Infof("sync: syncVClock: decided not to update VClockData: %v", err)
} else {
vlog.Errorf("sync: syncVClock: error updating VClockData: %v", err)
}
diff --git a/services/syncbase/vsync/conflict_resolution.go b/services/syncbase/vsync/conflict_resolution.go
index 3b512ee..9b3747e 100644
--- a/services/syncbase/vsync/conflict_resolution.go
+++ b/services/syncbase/vsync/conflict_resolution.go
@@ -33,21 +33,21 @@
// resolved by adding new versions or picking either the local or the remote
// version.
func (iSt *initiationState) resolveConflicts(ctx *context.T) error {
- vlog.VI(2).Infof("cr: resolveConflicts: start")
- defer vlog.VI(2).Infof("cr: resolveConflicts: end")
+ //vlog.Infof("cr: resolveConflicts: start")
+ //defer vlog.Infof("cr: resolveConflicts: end")
// Lookup schema for the database to figure out the CR policy set by the
// application.
schema, err := iSt.getDbSchema(ctx)
if err != nil {
if verror.ErrorID(err) == verror.ErrNoExist.ID {
- vlog.VI(2).Infof("cr: resolveConflicts: no schema found, resolving based on timestamp")
+ //vlog.Infof("cr: resolveConflicts: no schema found, resolving based on timestamp")
return iSt.resolveViaTimestamp(ctx, iSt.updObjects)
}
vlog.Errorf("sync: resolveConflicts: error while fetching schema: %v", err)
return err
}
- vlog.VI(2).Infof("cr: resolveConflicts: schema found.")
+ vlog.Infof("cr: resolveConflicts: schema found.")
// Categorize conflicts based on CR policy in schema.
conflictsByType := iSt.groupConflictsByType(schema)
diff --git a/services/syncbase/vsync/cr_app_resolves.go b/services/syncbase/vsync/cr_app_resolves.go
index 3d1fd9d..759724f 100644
--- a/services/syncbase/vsync/cr_app_resolves.go
+++ b/services/syncbase/vsync/cr_app_resolves.go
@@ -53,15 +53,15 @@
appConn := db.CrConnectionStream()
if appConn == nil {
// CR not possible now, delay conflict resolution
- vlog.VI(2).Infof("sync: resolveViaApp: No ConflictResolution stream available for db. App based resolution cannot move forward.")
+ vlog.Infof("sync: resolveViaApp: No ConflictResolution stream available for db. App based resolution cannot move forward.")
return interfaces.NewErrBrokenCrConnection(ctx)
}
sendStream := appConn.SendStream()
recvStream := appConn.RecvStream()
- vlog.VI(2).Infof("cr: resolveViaApp: starting app based resolution on %d groups", len(groupedConflicts.groups))
+ vlog.Infof("cr: resolveViaApp: starting app based resolution on %d groups", len(groupedConflicts.groups))
for i, group := range groupedConflicts.groups {
- vlog.VI(2).Infof("cr: resolveViaApp: sending conflict group %d to app", i)
+ vlog.Infof("cr: resolveViaApp: sending conflict group %d to app", i)
// Send all batches first
if err := sendBatches(ctx, iSt, sendStream, db, group); err != nil {
return err
@@ -84,11 +84,11 @@
// CR not possible now, delay conflict resolution
// TODO(jlodhia):[usability] send an error message to app.
errStr := fmt.Sprintf("cr: resolveViaApp: Resolution for oid %s expected but not received", oid)
- vlog.VI(2).Infof(errStr)
+ vlog.Infof(errStr)
return verror.New(verror.ErrInternal, ctx, errStr)
}
}
- vlog.VI(2).Infof("cr: resolveViaApp: all resolutions received")
+ vlog.Infof("cr: resolveViaApp: all resolutions received")
// Process resolutions.
conf := iSt.config
@@ -114,7 +114,7 @@
// CR not possible now, delay conflict resolution
// Remove the outdated cr connection object from database.
db.ResetCrConnectionStream()
- vlog.VI(2).Infof("sync: resolveViaApp: Error while sending conflict over stream: %v", err)
+ vlog.Infof("sync: resolveViaApp: Error while sending conflict over stream: %v", err)
return interfaces.NewErrBrokenCrConnection(ctx)
}
}
@@ -134,7 +134,7 @@
// CR not possible now, delay conflict resolution
// Remove the outdated cr connection object from database.
db.ResetCrConnectionStream()
- vlog.VI(2).Infof("sync: resolveViaApp: Error while sending conflict over stream: %v", err)
+ vlog.Infof("sync: resolveViaApp: Error while sending conflict over stream: %v", err)
return interfaces.NewErrBrokenCrConnection(ctx)
}
}
diff --git a/services/syncbase/vsync/dag.go b/services/syncbase/vsync/dag.go
index f969c86..9f845ad 100644
--- a/services/syncbase/vsync/dag.go
+++ b/services/syncbase/vsync/dag.go
@@ -529,7 +529,7 @@
// snapshotted head is the common ancestor.
isConflict = true
if numHeads == 1 {
- vlog.VI(4).Infof("sync: hasConflict: old graft snapshot %v, head %s", graft, oldHead)
+ vlog.Infof("sync: hasConflict: old graft snapshot %v, head %s", graft, oldHead)
ancestor = info.oldHeadSnap
return
}
diff --git a/services/syncbase/vsync/initiator.go b/services/syncbase/vsync/initiator.go
index 85662cf..319365f 100644
--- a/services/syncbase/vsync/initiator.go
+++ b/services/syncbase/vsync/initiator.go
@@ -67,14 +67,14 @@
//
// TODO(hpucha): Check the idempotence, esp in addNode in DAG.
func (s *syncService) getDeltasFromPeer(ctx *context.T, peer connInfo) error {
- vlog.VI(2).Infof("sync: getDeltasFromPeer: begin: contacting peer %v", peer)
- defer vlog.VI(2).Infof("sync: getDeltasFromPeer: end: contacting peer %v", peer)
+ vlog.Infof("sync: getDeltasFromPeer: begin: contacting peer %v", peer)
+ defer vlog.Infof("sync: getDeltasFromPeer: end: contacting peer %v", peer)
var errFinal error // the last non-nil error encountered is returned to the caller.
info := s.copyMemberInfo(ctx, peer.relName)
if info == nil {
- vlog.VI(4).Infof("sync: getDeltasFromPeer: copyMemberInfo failed %v", peer)
+ vlog.Infof("sync: getDeltasFromPeer: copyMemberInfo failed %v", peer)
return verror.New(verror.ErrInternal, ctx, peer.relName, "no member info found")
}
@@ -102,8 +102,8 @@
// getDBDeltas performs an initiation round for the specified database.
func (s *syncService) getDBDeltas(ctx *context.T, dbId wire.Id, info sgMember, peer connInfo) (connInfo, error) {
- vlog.VI(2).Infof("sync: getDBDeltas: begin: contacting peer %v for db %v", peer, dbId)
- defer vlog.VI(2).Infof("sync: getDBDeltas: end: contacting peer %v for db %v", peer, dbId)
+ vlog.Infof("sync: getDBDeltas: begin: contacting peer %v for db %v", peer, dbId)
+ defer vlog.Infof("sync: getDBDeltas: end: contacting peer %v for db %v", peer, dbId)
// Note that the "identify" step is done once per database for privacy
// reasons. When the app blesses Syncbase during syncgroup create/join,
@@ -174,15 +174,15 @@
// syncgroups that are allowed to be synced with it based on its current
// syncgroup acls.
func (s *syncService) filterSyncgroups(ctx *context.T, c *initiationConfig, blessingNames []string) error {
- vlog.VI(2).Infof("sync: filterSyncGroups: begin")
- defer vlog.VI(2).Infof("sync: filterSyncGroups: end")
+ vlog.Infof("sync: filterSyncGroups: begin")
+ defer vlog.Infof("sync: filterSyncGroups: end")
// Perform authorization.
if len(blessingNames) == 0 {
return verror.New(verror.ErrNoAccess, ctx)
}
- vlog.VI(4).Infof("sync: filterSyncGroups: got peer names %v", blessingNames)
+ vlog.Infof("sync: filterSyncGroups: got peer names %v", blessingNames)
remSgIds := make(sgSet)
@@ -218,7 +218,7 @@
// on the GetDeltas RPC to authorize the initiator and this
// makes it symmetric.
if !isAuthorizedForTag(sg.Spec.Perms, access.Read, blessingNames) {
- vlog.VI(4).Infof("sync: filterSyncGroups: skipping sg %v", gid)
+ vlog.Infof("sync: filterSyncGroups: skipping sg %v", gid)
continue
}
@@ -244,8 +244,8 @@
// getDeltas gets the deltas from the chosen peer. If sg flag is set to true, it
// will sync syncgroup metadata. If sg flag is false, it will sync data.
func (s *syncService) getDeltas(ctxIn *context.T, c *initiationConfig, sg bool) error {
- vlog.VI(2).Infof("sync: getDeltas: begin: contacting peer sg %v %v", sg, c.peer)
- defer vlog.VI(2).Infof("sync: getDeltas: end: contacting peer sg %v %v", sg, c.peer)
+ vlog.Infof("sync: getDeltas: begin: contacting peer sg %v %v", sg, c.peer)
+ defer vlog.Infof("sync: getDeltas: end: contacting peer sg %v %v", sg, c.peer)
ctx, cancel := context.WithCancel(ctxIn)
// cancel() is idempotent.
@@ -312,7 +312,7 @@
updateAllSyncgroupPriorities(ctx, s.bst, deltaFinalResp.SgPriorities)
}
- vlog.VI(4).Infof("sync: getDeltas: got reply: %v", iSt.remote)
+ vlog.Infof("sync: getDeltas: got reply: %v", iSt.remote)
// Process deltas locally.
return iSt.processUpdatedObjects(ctx)
@@ -436,7 +436,7 @@
// on incoming pause/resume requests.
if !iSt.config.sync.isDbSyncable(ctx, iSt.config.dbId) {
// The database is offline. Skip the db till it becomes syncable again.
- vlog.VI(1).Infof("sync: prepareDataDeltaReq: database not allowed to sync, skipping sync on db %v", iSt.config.dbId)
+ vlog.Infof("sync: prepareDataDeltaReq: database not allowed to sync, skipping sync on db %v", iSt.config.dbId)
return interfaces.NewErrDbOffline(ctx, iSt.config.dbId)
}
@@ -501,7 +501,7 @@
iSt.req = interfaces.DeltaReqData{req}
- vlog.VI(4).Infof("sync: prepareDataDeltaReq: request: %v", req)
+ vlog.Infof("sync: prepareDataDeltaReq: request: %v", req)
return nil
}
@@ -513,7 +513,7 @@
if !iSt.config.sync.isDbSyncable(ctx, iSt.config.dbId) {
// The database is offline. Skip the db till it becomes syncable again.
- vlog.VI(1).Infof("sync: prepareSGDeltaReq: database not allowed to sync, skipping sync on db %v", iSt.config.dbId)
+ vlog.Infof("sync: prepareSGDeltaReq: database not allowed to sync, skipping sync on db %v", iSt.config.dbId)
return interfaces.NewErrDbOffline(ctx, iSt.config.dbId)
}
@@ -531,7 +531,7 @@
iSt.req = interfaces.DeltaReqSgs{req}
- vlog.VI(4).Infof("sync: prepareSGDeltaReq: request: %v", req)
+ vlog.Infof("sync: prepareSGDeltaReq: request: %v", req)
return nil
}
@@ -594,7 +594,7 @@
}
}
- vlog.VI(4).Infof("sync: recvAndProcessDeltas: processing rec %v", rec)
+ vlog.Infof("sync: recvAndProcessDeltas: processing rec %v", rec)
if err := iSt.insertRecInLogAndDag(ctx, rec, batchId, tx); err != nil {
return err
}
@@ -755,7 +755,7 @@
}()
for {
- vlog.VI(4).Infof("sync: processUpdatedObjects: begin: %d objects updated", len(iSt.updObjects))
+ vlog.Infof("sync: processUpdatedObjects: begin: %d objects updated", len(iSt.updObjects))
iSt.tx = iSt.config.st.NewWatchableTransaction()
watchable.SetTransactionFromSync(iSt.tx) // for echo-suppression
@@ -763,7 +763,7 @@
if count, err := iSt.detectConflicts(ctx); err != nil {
return err
} else {
- vlog.VI(4).Infof("sync: processUpdatedObjects: %d conflicts detected", count)
+ vlog.Infof("sync: processUpdatedObjects: %d conflicts detected", count)
}
if err := iSt.resolveConflicts(ctx); err != nil {
@@ -785,7 +785,7 @@
// we just run it asynchronously in its own goroutine.
go iSt.advertiseSyncgroups(ctx)
- vlog.VI(4).Info("sync: processUpdatedObjects: end: changes committed")
+ vlog.Info("sync: processUpdatedObjects: end: changes committed")
return nil
}
if verror.ErrorID(err) != store.ErrConcurrentTransaction.ID {
@@ -799,7 +799,7 @@
// solution. Next iteration will have coordination with watch
// thread to intelligently retry. Hence this value is not a
// config param.
- vlog.VI(4).Info("sync: processUpdatedObjects: retry due to local mutations")
+ vlog.Info("sync: processUpdatedObjects: retry due to local mutations")
iSt.tx.Abort()
time.Sleep(1 * time.Second)
iSt.resetConflictResolutionState(ctx)
@@ -889,7 +889,7 @@
return err
}
if string(version) != confSt.oldHead {
- vlog.VI(4).Infof("sync: updateDbAndSyncSt: concurrent updates %s %s", version, confSt.oldHead)
+ vlog.Infof("sync: updateDbAndSyncSt: concurrent updates %s %s", version, confSt.oldHead)
return store.NewErrConcurrentTransaction(ctx)
}
} else {
@@ -926,17 +926,17 @@
if !newVersDeleted {
if confSt.res.ty == createNew {
- vlog.VI(4).Infof("sync: updateDbAndSyncSt: PutAtVersion %s %s", objid, newVersion)
+ vlog.Infof("sync: updateDbAndSyncSt: PutAtVersion %s %s", objid, newVersion)
if err := watchable.PutAtVersion(ctx, iSt.tx, []byte(objid), confSt.res.val, []byte(newVersion)); err != nil {
return err
}
}
- vlog.VI(4).Infof("sync: updateDbAndSyncSt: PutVersion %s %s", objid, newVersion)
+ vlog.Infof("sync: updateDbAndSyncSt: PutVersion %s %s", objid, newVersion)
if err := watchable.PutVersion(ctx, iSt.tx, []byte(objid), []byte(newVersion)); err != nil {
return err
}
} else {
- vlog.VI(4).Infof("sync: updateDbAndSyncSt: Deleting obj %s", objid)
+ vlog.Infof("sync: updateDbAndSyncSt: Deleting obj %s", objid)
if err := iSt.tx.Delete([]byte(objid)); err != nil {
return err
}
@@ -1078,7 +1078,7 @@
}
func (iSt *initiationState) createLocalLinkLogRec(ctx *context.T, objid, vers, par string, batchId, batchCount uint64) *LocalLogRec {
- vlog.VI(4).Infof("sync: createLocalLinkLogRec: obj %s vers %s par %s", objid, vers, par)
+ vlog.Infof("sync: createLocalLinkLogRec: obj %s vers %s par %s", objid, vers, par)
var gen, pos uint64
if iSt.sg {
@@ -1148,7 +1148,7 @@
if state.SyncPending {
curgv := genvecs[rpfx]
res := curgv.Compare(state.PendingGenVec)
- vlog.VI(4).Infof("sync: updateSyncSt: checking join pending %v, curgv %v, res %v", state.PendingGenVec, curgv, res)
+ vlog.Infof("sync: updateSyncSt: checking join pending %v, curgv %v, res %v", state.PendingGenVec, curgv, res)
if res >= 0 {
state.SyncPending = false
if err := setSGIdEntry(ctx, iSt.tx, gid, state); err != nil {
@@ -1261,19 +1261,19 @@
// runRemoteOp runs the remoteOp on the server specified by absName.
// It is the caller's responsibility to cancel the ctx argument.
func runRemoteOp(ctxIn *context.T, absName string, op remoteOp) (interface{}, error) {
- vlog.VI(4).Infof("sync: runRemoteOp: begin for %v", absName)
+ vlog.Infof("sync: runRemoteOp: begin for %v", absName)
ctx, cancel := context.WithCancel(ctxIn)
resp, err := op(ctx, absName)
if err == nil {
- vlog.VI(4).Infof("sync: runRemoteOp: end op established for %s", absName)
+ vlog.Infof("sync: runRemoteOp: end op established for %s", absName)
// Responsibility for calling cancel() is passed to caller.
return resp, nil
}
- vlog.VI(4).Infof("sync: runRemoteOp: end for %s, err %v", absName, err)
+ vlog.Infof("sync: runRemoteOp: end for %s, err %v", absName, err)
// TODO(hpucha): Fix error handling so that we do not assume that any error
// that is not ErrNoAccess or ErrGetTimeFailed is a connection error. Need to
diff --git a/services/syncbase/vsync/parameters.go b/services/syncbase/vsync/parameters.go
index 4b18430..b881d2e 100644
--- a/services/syncbase/vsync/parameters.go
+++ b/services/syncbase/vsync/parameters.go
@@ -14,7 +14,7 @@
// channelTimeout is the duration at which health checks will be requested on
// connections to peers.
- channelTimeout = 2 * time.Second
+ channelTimeout = 5 * time.Second
// NeighborConnectionTimeout is the time duration we wait for a connection to be
// established with a peer discovered from the neighborhood.
@@ -22,7 +22,7 @@
// TODO(suharshs): Make the timeouts below more dynamic based on network. (i.e.
// bt vs tcp). Currently bt connection establishment takes ~3 seconds, so
// neighborhood connections get more time than cloud connections.
- NeighborConnectionTimeout = 5 * time.Second
+ NeighborConnectionTimeout = 10 * time.Second
// cloudConnectionTimeout is the duration we wait for a connection to be
// established with a cloud peer.
diff --git a/services/syncbase/vsync/peer_manager.go b/services/syncbase/vsync/peer_manager.go
index 401ea06..ad17402 100644
--- a/services/syncbase/vsync/peer_manager.go
+++ b/services/syncbase/vsync/peer_manager.go
@@ -5,6 +5,8 @@
package vsync
import (
+ "bytes"
+ "fmt"
"sync"
"time"
@@ -14,6 +16,7 @@
"v.io/v23/verror"
"v.io/x/lib/set"
"v.io/x/lib/vlog"
+ "v.io/x/ref/lib/stats"
"v.io/x/ref/services/syncbase/common"
"v.io/x/ref/services/syncbase/ping"
"v.io/x/ref/services/syncbase/server/interfaces"
@@ -48,6 +51,8 @@
// updatePeerFromResponder updates information for a peer that the
// responder responds to.
updatePeerFromResponder(ctx *context.T, peer string, connTs time.Time, gvs interfaces.Knowledge) error
+
+ exportStats(prefix string)
}
////////////////////////////////////////
@@ -121,6 +126,9 @@
// Once pinned.Unpin() is called, the connection will no longer be pinned in
// rpc cache, and healthCheck will return to the rpc default health check interval.
pinned flow.PinnedConn
+
+ // addedTime is the time at which the connection was put into the peer cache.
+ addedTime time.Time
}
type peerManagerImpl struct {
@@ -144,12 +152,37 @@
}
func newPeerManager(ctx *context.T, s *syncService, peerSelectionPolicy int) peerManager {
- return &peerManagerImpl{
+ pm := &peerManagerImpl{
s: s,
policy: peerSelectionPolicy,
peerTbl: make(map[string]*peerSyncInfo),
healthyPeerCache: make(map[string]*connInfo),
}
+ return pm
+}
+
+func (pm *peerManagerImpl) exportStats(prefix string) {
+ stats.NewStringFunc(naming.Join(prefix, "peers"), pm.debugStringForPeers)
+}
+
+func (pm *peerManagerImpl) debugStringForPeers() string {
+ pm.Lock()
+ defer pm.Unlock()
+ buf := &bytes.Buffer{}
+ for _, c := range pm.healthyPeerCache {
+ fmt.Fprintf(buf, "%v\n", c.debugString())
+ fmt.Fprintln(buf)
+ }
+ return buf.String()
+}
+
+func (c *connInfo) debugString() string {
+ buf := &bytes.Buffer{}
+ fmt.Fprintf(buf, "RELNAME: %v\n", c.relName)
+ fmt.Fprintf(buf, "MTTBLS: %v\n", c.mtTbls)
+ fmt.Fprintf(buf, "ADDRS: %v\n", c.addrs)
+ fmt.Fprintf(buf, "ADDEDTIME: %v\n", c.addedTime)
+ return buf.String()
}
func (pm *peerManagerImpl) managePeers(ctx *context.T) {
@@ -170,12 +203,12 @@
break
}
}
- vlog.VI(1).Info("sync: managePeers: channel closed, stop work and exit")
+ vlog.Info("sync: managePeers: channel closed, stop work and exit")
}
func (pm *peerManagerImpl) managePeersInternal(ctx *context.T) {
- vlog.VI(2).Info("sync: managePeersInternal: begin")
- defer vlog.VI(2).Info("sync: managePeersInternal: end")
+ //vlog.Info("sync: managePeersInternal: begin")
+ //defer vlog.Info("sync: managePeersInternal: end")
var peers []*connInfo
var viaMtTbl bool
@@ -218,7 +251,9 @@
// neighborhood peers until the cache entries expire.
}
+ now := time.Now()
for _, p := range peers {
+ p.addedTime = now
pm.healthyPeerCache[p.relName] = p
}
}
@@ -240,12 +275,12 @@
// Remove myself from the set.
delete(members, pm.s.name)
if len(members) == 0 {
- vlog.VI(4).Infof("sync: pickPeersToPingRandom: no sgmembers found")
+ //vlog.Infof("sync: pickPeersToPingRandom: no sgmembers found")
return nil, true
}
if pm.curCount == 0 {
- vlog.VI(4).Infof("sync: pickPeersToPingRandom: picking from all sgmembers")
+ //vlog.Infof("sync: pickPeersToPingRandom: picking from all sgmembers")
// Compute number of available peers.
n := 0
@@ -293,11 +328,11 @@
// Pick peers from the neighborhood if available.
neighbors := pm.s.filterDiscoveryPeers(members)
if len(neighbors) == 0 {
- vlog.VI(4).Infof("sync: pickPeersToPingRandom: no neighbors found")
+ //vlog.Infof("sync: pickPeersToPingRandom: no neighbors found")
return nil, false
}
- vlog.VI(4).Infof("sync: pickPeersToPingRandom: picking from neighbors")
+ //vlog.Infof("sync: pickPeersToPingRandom: picking from neighbors")
// Compute number of available peers.
n := 0
@@ -372,6 +407,7 @@
info.attemptTs = attemptTs
if failed { // Handle failed sync attempt.
// Evict the peer from healthyPeerCache.
+ ctx.Errorf("SUHARSH: Removing peer %v from cache due to failed RPC", peer)
delete(pm.healthyPeerCache, peer.relName)
if peer.addrs != nil {
@@ -416,6 +452,7 @@
// a goroutine monitoring each conn's status and reconnecting as needed.
select {
case <-info.pinned.Conn().Closed():
+ ctx.Errorf("SUHARSH: Removing peer %v from cache due to connection being closed", p)
delete(pm.healthyPeerCache, p)
info.pinned.Unpin()
default:
@@ -454,14 +491,14 @@
}
}
- vlog.VI(4).Infof("sync: pingPeers: sending names %v", names)
+ vlog.Infof("sync: pingPeers: sending names %v", names)
res, err := ping.PingInParallel(ctx, names, NeighborConnectionTimeout, channelTimeout)
if err != nil {
return nil
}
- vlog.VI(4).Infof("sync: pingPeers: returned result %v", res)
+ vlog.Infof("sync: pingPeers: returned result %v", res)
// Make a list of the successful peers with their mount tables or
// neighborhood addresses that succeeded.
diff --git a/services/syncbase/vsync/responder.go b/services/syncbase/vsync/responder.go
index 49c16f3..62706c7 100644
--- a/services/syncbase/vsync/responder.go
+++ b/services/syncbase/vsync/responder.go
@@ -24,8 +24,8 @@
func (s *syncService) GetDeltas(ctx *context.T, call interfaces.SyncGetDeltasServerCall,
req interfaces.DeltaReq, initiator string) (interfaces.DeltaFinalResp, error) {
- vlog.VI(2).Infof("sync: GetDeltas: begin: from initiator %s", initiator)
- defer vlog.VI(2).Infof("sync: GetDeltas: end: from initiator %s", initiator)
+ vlog.Infof("sync: GetDeltas: begin: from initiator %s", initiator)
+ defer vlog.Infof("sync: GetDeltas: end: from initiator %s", initiator)
var finalResp interfaces.DeltaFinalResp
rSt, err := newResponderState(ctx, call, s, req, initiator)
@@ -135,11 +135,11 @@
// embedded, consider using a helper function to auto-fill it instead
// (see http://goo.gl/mEa4L0) but only incur that overhead when the
// logging level specified is enabled.
- vlog.VI(3).Infof("sync: sendDeltasPerDatabase: recvd %v: sgids %v, genvecs %v, sg %v", rSt.dbId, rSt.sgIds, rSt.initVecs, rSt.sg)
+ vlog.Infof("sync: sendDeltasPerDatabase: recvd %v: sgids %v, genvecs %v, sg %v", rSt.dbId, rSt.sgIds, rSt.initVecs, rSt.sg)
if !rSt.sync.isDbSyncable(ctx, rSt.dbId) {
// The database is offline. Skip the db till it becomes syncable again.
- vlog.VI(1).Infof("sync: sendDeltasPerDatabase: database not allowed to sync, skipping sync on db %v", rSt.dbId)
+ vlog.Infof("sync: sendDeltasPerDatabase: database not allowed to sync, skipping sync on db %v", rSt.dbId)
return interfaces.NewErrDbOffline(ctx, rSt.dbId)
}
@@ -149,7 +149,7 @@
// Check error from phase 1.
if err != nil {
- vlog.VI(4).Infof("sync: sendDeltasPerDatabase: failed authorization, err %v", err)
+ vlog.Infof("sync: sendDeltasPerDatabase: failed authorization, err %v", err)
return err
}
@@ -339,7 +339,7 @@
rSt.outVecs[pfx] = respgv
}
- vlog.VI(3).Infof("sync: computeDataDeltas: %v: diff %v, outvecs %v", rSt.dbId, rSt.diff, rSt.outVecs)
+ vlog.Infof("sync: computeDataDeltas: %v: diff %v, outvecs %v", rSt.dbId, rSt.diff, rSt.outVecs)
return nil
}
@@ -417,7 +417,7 @@
}
func (rSt *responderState) sendGenVecs(ctx *context.T) error {
- vlog.VI(3).Infof("sync: sendGenVecs: sending genvecs %v", rSt.outVecs)
+ vlog.Infof("sync: sendGenVecs: sending genvecs %v", rSt.outVecs)
sender := rSt.call.SendStream()
sender.Send(interfaces.DeltaRespGvs{rSt.outVecs})
diff --git a/services/syncbase/vsync/sync.go b/services/syncbase/vsync/sync.go
index d9ff29f..89f9bfd 100644
--- a/services/syncbase/vsync/sync.go
+++ b/services/syncbase/vsync/sync.go
@@ -29,6 +29,7 @@
"v.io/v23/verror"
"v.io/x/lib/vlog"
idiscovery "v.io/x/ref/lib/discovery"
+ "v.io/x/ref/lib/stats"
"v.io/x/ref/services/syncbase/common"
syncdis "v.io/x/ref/services/syncbase/discovery"
blob "v.io/x/ref/services/syncbase/localblobstore"
@@ -38,6 +39,34 @@
"v.io/x/ref/services/syncbase/vclock"
)
+var (
+ statMu sync.Mutex
+ statIdx int
+)
+
+func syncServiceStatName() string {
+ statMu.Lock()
+ ret := naming.Join("syncbase", "vsync", fmt.Sprint(statIdx))
+ statIdx++
+ statMu.Unlock()
+ return ret
+}
+func mapkeys(m map[string]*discovery.Advertisement) []string {
+ var ret []string
+ for k, v := range m {
+ ret = append(ret, fmt.Sprintf("%v: %v\n", k, *v))
+ }
+ return ret
+}
+
+func mapkeys2(m map[interfaces.GroupId]map[string]*discovery.Advertisement) []string {
+ var ret []string
+ for k := range m {
+ ret = append(ret, fmt.Sprint(k))
+ }
+ return ret
+}
+
// syncService contains the metadata for the sync module.
type syncService struct {
// TODO(hpucha): see if "v.io/v23/uniqueid" is a better fit. It is 128
@@ -124,6 +153,9 @@
// Peer manager for managing peers to sync with.
pm peerManager
+
+ // Naming prefix at which debugging information is exported.
+ statPrefix string
}
// syncDatabase contains the metadata for syncing a database. This struct is
@@ -173,7 +205,17 @@
discovery: discovery,
publishInNh: publishInNh,
advSyncgroups: make(map[interfaces.GroupId]syncAdvertisementState),
+ statPrefix: syncServiceStatName(),
}
+ stats.NewStringFunc(s.statPrefix, func() string {
+ s.discoveryLock.Lock()
+ defer s.discoveryLock.Unlock()
+ return fmt.Sprintf(`
+Peers: %v
+Ads: %v
+Syncgroups: %v
+`, mapkeys(s.discoveryPeers), mapkeys(s.discoveryIds), mapkeys2(s.discoverySyncgroups))
+ })
data := &SyncData{}
if err := store.RunInTransaction(sv.St(), func(tx store.Transaction) error {
@@ -195,7 +237,7 @@
s.id = data.Id
s.name = syncbaseIdToName(s.id)
- vlog.VI(1).Infof("sync: New: Syncbase ID is %x", s.id)
+ vlog.Infof("sync: New: Syncbase ID is %x", s.id)
// Initialize in-memory state for the sync module before starting any threads.
if err := s.initSync(ctx); err != nil {
@@ -217,6 +259,7 @@
// Start the peer manager thread to maintain peers viable for syncing.
go s.pm.managePeers(ctx)
+ s.pm.exportStats(naming.Join("syncbase", s.name))
// Start initiator thread to periodically get deltas from peers. The
// initiator threads consults the peer manager to pick peers to sync
@@ -232,13 +275,14 @@
// Close waits for spawned sync threads to shut down, and closes the local blob
// store handle.
func Close(ss interfaces.SyncServerMethods) {
- vlog.VI(2).Infof("sync: Close: begin")
- defer vlog.VI(2).Infof("sync: Close: end")
+ vlog.Infof("sync: Close: begin")
+ defer vlog.Infof("sync: Close: end")
s := ss.(*syncService)
close(s.closed)
s.pending.Wait()
s.bst.Close()
+ stats.Delete(s.statPrefix)
}
func NewSyncDatabase(db interfaces.Database) *syncDatabase {
@@ -268,10 +312,11 @@
break
}
- vlog.VI(3).Infof("discoverNeighborhood: got an update, %+v\n", update)
+ vlog.Infof("discoverNeighborhood: got an update, %+v\n", update)
if update.IsLost() {
- s.updateDiscoveryInfo(update.Id().String(), nil)
+ ctx.Infof("HACK: ignoring LOST event for %s", update.Id())
+ // s.updateDiscoveryInfo(update.Id().String(), nil)
} else {
ad := update.Advertisement()
s.updateDiscoveryInfo(update.Id().String(), &ad)
@@ -282,7 +327,7 @@
}
}
- vlog.VI(1).Info("sync: discoverNeighborhood: channel closed, stop listening and exit")
+ vlog.Info("sync: discoverNeighborhood: channel closed, stop listening and exit")
}
// updateDiscoveryInfo adds or removes information about a sync peer or a
@@ -293,7 +338,7 @@
s.discoveryLock.Lock()
defer s.discoveryLock.Unlock()
- vlog.VI(3).Info("sync: updateDiscoveryInfo: %s: %+v, %p, current discoverySyncgroups is %+v", id, ad, ad, s.discoverySyncgroups)
+ vlog.Infof("sync: updateDiscoveryInfo: %s: %+v, %p, current discoverySyncgroups is %+v", id, ad, ad, s.discoverySyncgroups)
// The first time around initialize all discovery maps.
if s.discoveryIds == nil {
@@ -333,7 +378,7 @@
if admins == nil {
admins = make(map[string]*discovery.Advertisement)
s.discoverySyncgroups[gid] = admins
- vlog.VI(3).Infof("added advertisement %+v, %p as dbId %v, sgId %v, gid %v\n", admins, admins, dbId, sgId, gid)
+ vlog.Infof("added advertisement %+v, %p as dbId %v, sgId %v, gid %v\n", admins, admins, dbId, sgId, gid)
}
admins[id] = ad
@@ -341,7 +386,7 @@
delete(admins, id)
if len(admins) == 0 {
delete(s.discoverySyncgroups, gid)
- vlog.VI(3).Infof("deleted advertisement %+v, %p from dbId %v, sgId %v, gid %v\n", admins, admins, dbId, sgId, gid)
+ vlog.Infof("deleted advertisement %+v, %p from dbId %v, sgId %v, gid %v\n", admins, admins, dbId, sgId, gid)
}
}
}
@@ -403,8 +448,8 @@
// syncbased is restarted so that it can republish itself at the names being
// used in the syncgroups.
func AddNames(ctx *context.T, ss interfaces.SyncServerMethods, svr rpc.Server) error {
- vlog.VI(2).Infof("sync: AddNames: begin")
- defer vlog.VI(2).Infof("sync: AddNames: end")
+ vlog.Infof("sync: AddNames: begin")
+ defer vlog.Infof("sync: AddNames: end")
s := ss.(*syncService)
s.nameLock.Lock()
@@ -414,7 +459,7 @@
info := s.copyMemberInfo(ctx, s.name)
if info == nil || len(info.mtTables) == 0 {
- vlog.VI(2).Infof("sync: AddNames: end returning no names")
+ vlog.Infof("sync: AddNames: end returning no names")
return nil
}
@@ -424,12 +469,12 @@
// successful. So if a node is offline, it will publish the name
// when possible.
if err := svr.AddName(name); err != nil {
- vlog.VI(2).Infof("sync: AddNames: end returning AddName err %v", err)
+ vlog.Infof("sync: AddNames: end returning AddName err %v", err)
return err
}
}
if err := s.advertiseSyncbaseInNeighborhood(); err != nil {
- vlog.VI(2).Infof("sync: AddNames: end returning advertiseSyncbaseInNeighborhood err %v", err)
+ vlog.Infof("sync: AddNames: end returning advertiseSyncbaseInNeighborhood err %v", err)
return err
}
@@ -445,7 +490,7 @@
return err
}
if err := s.advertiseSyncgroupInNeighborhood(sg); err != nil {
- vlog.VI(2).Infof("sync: AddNames: end returning advertiseSyncgroupInNeighborhood err %v", err)
+ vlog.Infof("sync: AddNames: end returning advertiseSyncgroupInNeighborhood err %v", err)
return err
}
}
@@ -461,8 +506,8 @@
return nil
}
- vlog.VI(4).Infof("sync: advertiseSyncbaseInNeighborhood: begin")
- defer vlog.VI(4).Infof("sync: advertiseSyncbaseInNeighborhood: end")
+ vlog.Infof("sync: advertiseSyncbaseInNeighborhood: begin")
+ defer vlog.Infof("sync: advertiseSyncbaseInNeighborhood: end")
s.advLock.Lock()
defer s.advLock.Unlock()
@@ -484,7 +529,7 @@
ch, err := idiscovery.AdvertiseServer(ctx, s.discovery, s.svr, "", &sbService, nil)
if err == nil {
- vlog.VI(4).Infof("sync: advertiseSyncbaseInNeighborhood: successful")
+ vlog.Infof("sync: advertiseSyncbaseInNeighborhood: successful")
s.cancelAdvSyncbase = func() {
stop()
<-ch
@@ -503,8 +548,8 @@
if !s.publishInNh {
return nil
}
- vlog.VI(4).Infof("sync: advertiseSyncgroupInNeighborhood: begin")
- defer vlog.VI(4).Infof("sync: advertiseSyncgroupInNeighborhood: end")
+ vlog.Infof("sync: advertiseSyncgroupInNeighborhood: begin")
+ defer vlog.Infof("sync: advertiseSyncgroupInNeighborhood: end")
s.advLock.Lock()
defer s.advLock.Unlock()
@@ -552,7 +597,7 @@
}
ctx, stop := context.WithCancel(s.ctx)
- vlog.VI(4).Infof("sync: advertiseSyncgroupInNeighborhood: advertising %v", sbService)
+ vlog.Infof("sync: advertiseSyncgroupInNeighborhood: advertising %v", sbService)
// TODO(mattr): Unfortunately, discovery visibility isn't as powerful
// as an ACL. There's no way to represent the NotIn list. For now
@@ -561,7 +606,7 @@
visibility := sg.Spec.Perms[string(access.Read)].In
ch, err := idiscovery.AdvertiseServer(ctx, s.discovery, s.svr, "", &sbService, visibility)
if err == nil {
- vlog.VI(4).Infof("sync: advertiseSyncgroupInNeighborhood: successful")
+ vlog.Infof("sync: advertiseSyncgroupInNeighborhood: successful")
cancel := func() {
stop()
<-ch
diff --git a/services/syncbase/vsync/sync_state.go b/services/syncbase/vsync/sync_state.go
index e63ce3b..ddba1a4 100644
--- a/services/syncbase/vsync/sync_state.go
+++ b/services/syncbase/vsync/sync_state.go
@@ -130,8 +130,8 @@
// c) republish names in mount tables for all syncgroups.
// d) in-memory queue of syncgroups to be published.
func (s *syncService) initSync(ctx *context.T) error {
- vlog.VI(2).Infof("sync: initSync: begin")
- defer vlog.VI(2).Infof("sync: initSync: end")
+ vlog.Infof("sync: initSync: begin")
+ defer vlog.Infof("sync: initSync: end")
s.syncStateLock.Lock()
defer s.syncStateLock.Unlock()
@@ -159,7 +159,7 @@
dsInMem.isPaused = ds.IsPaused
}
- vlog.VI(2).Infof("sync: initSync: initing db %v, dsInMem %v", dbId, dsInMem)
+ vlog.Infof("sync: initSync: initing db %v, dsInMem %v", dbId, dsInMem)
sgCount := 0
@@ -205,13 +205,13 @@
}
info.checkptGen = info.gen - 1
- vlog.VI(4).Infof("sync: initSync: initing db %v sg %v info %v", dbId, sgoid, info)
+ vlog.Infof("sync: initSync: initing db %v sg %v info %v", dbId, sgoid, info)
return false
})
if sgCount == 0 {
- vlog.VI(2).Infof("sync: initSync: initing db %v done (no sgs found)", dbId)
+ vlog.Infof("sync: initSync: initing db %v done (no sgs found)", dbId)
return false
}
@@ -235,7 +235,7 @@
s.syncState[dbId] = dsInMem
- vlog.VI(2).Infof("sync: initSync: initing db %v done dsInMem %v (data %v)", dbId, dsInMem, dsInMem.data)
+ vlog.Infof("sync: initSync: initing db %v done dsInMem %v (data %v)", dbId, dsInMem, dsInMem.data)
return false
})
@@ -508,7 +508,7 @@
// updateDbPauseSt updates the db with the given isPaused state.
func (s *syncService) updateDbPauseSt(ctx *context.T, tx store.Transaction, dbId wire.Id, isPaused bool) error {
- vlog.VI(3).Infof("sync: updateDbPauseSt: updating sync paused for db %v with value %t", dbId, isPaused)
+ vlog.Infof("sync: updateDbPauseSt: updating sync paused for db %v with value %t", dbId, isPaused)
ss, err := getDbSyncState(ctx, tx)
if err != nil {
if verror.ErrorID(err) != verror.ErrNoExist.ID {
diff --git a/services/syncbase/vsync/syncer.go b/services/syncbase/vsync/syncer.go
index a75275b..3a4a5f5 100644
--- a/services/syncbase/vsync/syncer.go
+++ b/services/syncbase/vsync/syncer.go
@@ -42,7 +42,7 @@
break
}
}
- vlog.VI(1).Info("sync: syncer: channel closed, stop work and exit")
+ vlog.Info("sync: syncer: channel closed, stop work and exit")
}
func (s *syncService) syncerWork(ctx *context.T) {
@@ -60,10 +60,18 @@
}
err = s.syncVClock(ctx, peer)
+ if verror.ErrorID(err) == interfaces.ErrConnFail.ID {
+ ctx.Errorf("SUHARSHS: syncVClock failed due to %v", verror.DebugString(err))
+ }
+
// Abort syncing if there is a connection error with peer.
if verror.ErrorID(err) != interfaces.ErrConnFail.ID {
err = s.getDeltasFromPeer(ctx, peer)
}
+ if verror.ErrorID(err) == interfaces.ErrConnFail.ID {
+ ctx.Errorf("SUHARSHS: getDeltasFromPeer failed due to %v", verror.DebugString(err))
+ }
+
s.pm.updatePeerFromSyncer(ctx, peer, attemptTs, verror.ErrorID(err) == interfaces.ErrConnFail.ID)
}
diff --git a/services/syncbase/vsync/syncgroup.go b/services/syncbase/vsync/syncgroup.go
index 21ac9f3..dd678e2 100644
--- a/services/syncbase/vsync/syncgroup.go
+++ b/services/syncbase/vsync/syncgroup.go
@@ -660,8 +660,8 @@
// TODO(hpucha): Pass blessings along.
func (sd *syncDatabase) CreateSyncgroup(ctx *context.T, call rpc.ServerCall, sgId wire.Id, spec wire.SyncgroupSpec, myInfo wire.SyncgroupMemberInfo) error {
- vlog.VI(2).Infof("sync: CreateSyncgroup: begin: %s, spec %+v", sgId, spec)
- defer vlog.VI(2).Infof("sync: CreateSyncgroup: end: %s", sgId)
+ vlog.Infof("sync: CreateSyncgroup: begin: %s, spec %+v", sgId, spec)
+ defer vlog.Infof("sync: CreateSyncgroup: end: %s", sgId)
if err := pubutil.ValidateId(sgId); err != nil {
return verror.New(wire.ErrInvalidName, ctx, pubutil.EncodeId(sgId), err)
@@ -770,8 +770,8 @@
}
func (sd *syncDatabase) JoinSyncgroup(ctx *context.T, call rpc.ServerCall, remoteSyncbaseName string, expectedSyncbaseBlessings []string, sgId wire.Id, myInfo wire.SyncgroupMemberInfo) (wire.SyncgroupSpec, error) {
- vlog.VI(2).Infof("sync: JoinSyncgroup: begin: %v at %s, call is %v", sgId, remoteSyncbaseName, call)
- defer vlog.VI(2).Infof("sync: JoinSyncgroup: end: %v at %s", sgId, remoteSyncbaseName)
+ vlog.Infof("sync: JoinSyncgroup: begin: %v at %s, call is %v", sgId, remoteSyncbaseName, call)
+ defer vlog.Infof("sync: JoinSyncgroup: end: %v at %s", sgId, remoteSyncbaseName)
var sgErr error
var sg *interfaces.Syncgroup
@@ -903,8 +903,8 @@
}
func (sd *syncDatabase) ListSyncgroups(ctx *context.T, call rpc.ServerCall) ([]wire.Id, error) {
- vlog.VI(2).Infof("sync: ListSyncgroups: begin")
- defer vlog.VI(2).Infof("sync: ListSyncgroups: end")
+ vlog.Infof("sync: ListSyncgroups: begin")
+ defer vlog.Infof("sync: ListSyncgroups: end")
sn := sd.db.St().NewSnapshot()
defer sn.Abort()
@@ -923,13 +923,13 @@
sort.Sort(syncgroups(sgIds))
- vlog.VI(2).Infof("sync: ListSyncgroups: %v", sgIds)
+ vlog.Infof("sync: ListSyncgroups: %v", sgIds)
return sgIds, nil
}
func (sd *syncDatabase) GetSyncgroupSpec(ctx *context.T, call rpc.ServerCall, sgId wire.Id) (wire.SyncgroupSpec, string, error) {
- vlog.VI(2).Infof("sync: GetSyncgroupSpec: begin %v", sgId)
- defer vlog.VI(2).Infof("sync: GetSyncgroupSpec: end: %v", sgId)
+ vlog.Infof("sync: GetSyncgroupSpec: begin %v", sgId)
+ defer vlog.Infof("sync: GetSyncgroupSpec: end: %v", sgId)
sn := sd.db.St().NewSnapshot()
defer sn.Abort()
@@ -948,13 +948,13 @@
}
// TODO(hpucha): Check syncgroup ACL.
- vlog.VI(2).Infof("sync: GetSyncgroupSpec: %v spec %v", sgId, sg.Spec)
+ vlog.Infof("sync: GetSyncgroupSpec: %v spec %v", sgId, sg.Spec)
return sg.Spec, sg.SpecVersion, nil
}
func (sd *syncDatabase) GetSyncgroupMembers(ctx *context.T, call rpc.ServerCall, sgId wire.Id) (map[string]wire.SyncgroupMemberInfo, error) {
- vlog.VI(2).Infof("sync: GetSyncgroupMembers: begin %v", sgId)
- defer vlog.VI(2).Infof("sync: GetSyncgroupMembers: end: %v", sgId)
+ vlog.Infof("sync: GetSyncgroupMembers: begin %v", sgId)
+ defer vlog.Infof("sync: GetSyncgroupMembers: end: %v", sgId)
sn := sd.db.St().NewSnapshot()
defer sn.Abort()
@@ -972,7 +972,7 @@
// TODO(hpucha): Check syncgroup ACL.
- vlog.VI(2).Infof("sync: GetSyncgroupMembers: %v members %v, len %v", sgId, sg.Joiners, len(sg.Joiners))
+ vlog.Infof("sync: GetSyncgroupMembers: %v members %v, len %v", sgId, sg.Joiners, len(sg.Joiners))
joiners := make(map[string]wire.SyncgroupMemberInfo)
for key, value := range sg.Joiners {
joiners[key] = value.MemberInfo
@@ -981,8 +981,8 @@
}
func (sd *syncDatabase) SetSyncgroupSpec(ctx *context.T, call rpc.ServerCall, sgId wire.Id, spec wire.SyncgroupSpec, version string) error {
- vlog.VI(2).Infof("sync: SetSyncgroupSpec: begin %v %v %s", sgId, spec, version)
- defer vlog.VI(2).Infof("sync: SetSyncgroupSpec: end: %v", sgId)
+ vlog.Infof("sync: SetSyncgroupSpec: begin %v %v %s", sgId, spec, version)
+ defer vlog.Infof("sync: SetSyncgroupSpec: end: %v", sgId)
if err := verifySyncgroupSpec(ctx, &spec); err != nil {
return err
@@ -1026,7 +1026,7 @@
//
// Check if this peer is allowed to change the spec.
// blessingNames, _ := security.RemoteBlessingNames(ctx, call.Security())
- // vlog.VI(4).Infof("sync: SetSyncgroupSpec: authorizing blessings %v against permissions %v", blessingNames, sg.Spec.Perms)
+ // vlog.Infof("sync: SetSyncgroupSpec: authorizing blessings %v against permissions %v", blessingNames, sg.Spec.Perms)
// if !isAuthorizedForTag(sg.Spec.Perms, access.Admin, blessingNames) {
// return verror.New(verror.ErrNoAccess, ctx)
// }
@@ -1135,7 +1135,7 @@
// or is ejected before the status update)? Eventually
// some admin must decide to update the SG status anyway
// even if that causes extra SG mutations and conflicts.
- vlog.VI(3).Infof("sync: publishSyncgroup: %v: duplicate publish", sgId)
+ vlog.Infof("sync: publishSyncgroup: %v: duplicate publish", sgId)
return nil
}
@@ -1143,7 +1143,7 @@
// The publish operation failed with an error other
// than ErrExist then it must be retried later on.
// TODO(hpucha): Is there an RPC error that we can check here?
- vlog.VI(3).Infof("sync: publishSyncgroup: %v: failed, retry later: %v", sgId, err)
+ vlog.Infof("sync: publishSyncgroup: %v: failed, retry later: %v", sgId, err)
return err
}
}
@@ -1151,7 +1151,7 @@
// The publish operation is done because either it succeeded or it
// failed with the ErrExist error. Update the syncgroup status and, if
// the publish was successful, add the remote peer to the syncgroup.
- vlog.VI(3).Infof("sync: publishSyncgroup: %v: peer %s: done: status %s: %v",
+ vlog.Infof("sync: publishSyncgroup: %v: peer %s: done: status %s: %v",
sgId, peer, status.String(), err)
err = watchable.RunInTransaction(st, func(tx *watchable.Transaction) error {
@@ -1283,7 +1283,7 @@
}
func (sd *syncDatabase) joinSyncgroupAtAdmin(ctxIn *context.T, call rpc.ServerCall, dbId, sgId wire.Id, remoteSyncbaseName string, expectedSyncbaseBlessings []string, localSyncbaseName string, myInfo wire.SyncgroupMemberInfo) (interfaces.Syncgroup, string, interfaces.GenVector, error) {
- vlog.VI(2).Infof("sync: joinSyncgroupAtAdmin: begin, dbId %v, sgId %v, remoteSyncbaseName %v", dbId, sgId, remoteSyncbaseName)
+ vlog.Infof("sync: joinSyncgroupAtAdmin: begin, dbId %v, sgId %v, remoteSyncbaseName %v", dbId, sgId, remoteSyncbaseName)
if remoteSyncbaseName != "" {
ctx, cancel := context.WithTimeout(ctxIn, cloudConnectionTimeout)
@@ -1291,12 +1291,12 @@
sg, vers, gv, err := c.JoinSyncgroupAtAdmin(ctx, dbId, sgId, localSyncbaseName, myInfo)
cancel()
if err == nil {
- vlog.VI(2).Infof("sync: joinSyncgroupAtAdmin: end succeeded at %v, returned sg %v vers %v gv %v", sgId, sg, vers, gv)
+ vlog.Infof("sync: joinSyncgroupAtAdmin: end succeeded at %v, returned sg %v vers %v gv %v", sgId, sg, vers, gv)
return sg, vers, gv, err
}
}
- vlog.VI(2).Infof("sync: joinSyncgroupAtAdmin: try neighborhood %v", sgId)
+ vlog.Infof("sync: joinSyncgroupAtAdmin: try neighborhood %v", sgId)
// TODO(hpucha): Restrict the set of errors when retry happens to
// network related errors or other retriable errors.
@@ -1320,15 +1320,19 @@
// than a single resolved address at a time. This is important because it is possible for
// addresses in the list to be unreachable, causing large delays when trying join calls
// in serial.
+ vlog.Infof("sync: joinSyncgroupAtAdmin: trying neighborhood %v servers %v", sgId, me)
sg, vers, gv, err := c.JoinSyncgroupAtAdmin(ctx, dbId, sgId, localSyncbaseName, myInfo, options.Preresolved{me})
cancel()
if err == nil {
- vlog.VI(2).Infof("sync: joinSyncgroupAtAdmin: end succeeded at addresses %v, returned sg %v vers %v gv %v", me.Servers, sg, vers, gv)
+ vlog.Infof("sync: joinSyncgroupAtAdmin: end succeeded at addresses %v, returned sg %v vers %v gv %v", me.Servers, sg, vers, gv)
+ return sg, vers, gv, err
+ } else {
+ vlog.Infof("sync: joinSyncgroupAtAdmin: end failed at addresses %v: %s", me.Servers, verror.DebugString(err))
return sg, vers, gv, err
}
}
- vlog.VI(2).Infof("sync: joinSyncgroupAtAdmin: failed %v", sgId)
+ vlog.Infof("sync: joinSyncgroupAtAdmin: failed %v", sgId)
return interfaces.Syncgroup{}, "", interfaces.GenVector{}, verror.New(wire.ErrSyncgroupJoinFailed, ctxIn)
}
@@ -1365,8 +1369,8 @@
// Methods for syncgroup create/join between Syncbases.
func (s *syncService) PublishSyncgroup(ctx *context.T, call rpc.ServerCall, publisher string, sg interfaces.Syncgroup, version string, genvec interfaces.GenVector) (string, error) {
- vlog.VI(2).Infof("sync: PublishSyncgroup: begin: %s from peer %s", sg.Id, publisher)
- defer vlog.VI(2).Infof("sync: PublishSyncgroup: end: %s from peer %s", sg.Id, publisher)
+ vlog.Infof("sync: PublishSyncgroup: begin: %s from peer %s", sg.Id, publisher)
+ defer vlog.Infof("sync: PublishSyncgroup: end: %s from peer %s", sg.Id, publisher)
st, err := s.getDbStore(ctx, call, sg.DbId)
if err != nil {
@@ -1425,8 +1429,8 @@
}
func (s *syncService) JoinSyncgroupAtAdmin(ctx *context.T, call rpc.ServerCall, dbId, sgId wire.Id, joinerName string, joinerInfo wire.SyncgroupMemberInfo) (interfaces.Syncgroup, string, interfaces.GenVector, error) {
- vlog.VI(2).Infof("sync: JoinSyncgroupAtAdmin: begin: %+v from peer %s", sgId, joinerName)
- defer vlog.VI(2).Infof("sync: JoinSyncgroupAtAdmin: end: %+v from peer %s", sgId, joinerName)
+ vlog.Infof("sync: JoinSyncgroupAtAdmin: begin: %+v from peer %s", sgId, joinerName)
+ defer vlog.Infof("sync: JoinSyncgroupAtAdmin: end: %+v from peer %s", sgId, joinerName)
nullSG, nullGV := interfaces.Syncgroup{}, interfaces.GenVector{}
@@ -1445,7 +1449,7 @@
gid := SgIdToGid(dbId, sgId)
if _, err = getSyncgroupVersion(ctx, dbSt, gid); err != nil {
- vlog.VI(4).Infof("sync: JoinSyncgroupAtAdmin: end: %v from peer %s, err in sg search %v", sgId, joinerName, err)
+ vlog.Infof("sync: JoinSyncgroupAtAdmin: end: %v from peer %s, err in sg search %v", sgId, joinerName, err)
return nullSG, "", nullGV, verror.New(verror.ErrNoExist, ctx, "Syncgroup not found", sgId)
}
@@ -1494,7 +1498,7 @@
})
if err != nil {
- vlog.VI(4).Infof("sync: JoinSyncgroupAtAdmin: end: %v from peer %s, err in tx %v", sgId, joinerName, err)
+ vlog.Infof("sync: JoinSyncgroupAtAdmin: end: %v from peer %s, err in tx %v", sgId, joinerName, err)
return nullSG, "", nullGV, err
}
@@ -1505,7 +1509,7 @@
sgs := sgSet{gid: struct{}{}}
gv, _, err := s.copyDbGenInfo(ctx, dbId, sgs)
if err != nil {
- vlog.VI(4).Infof("sync: JoinSyncgroupAtAdmin: end: %v from peer %s, err in copy %v", sgId, joinerName, err)
+ vlog.Infof("sync: JoinSyncgroupAtAdmin: end: %v from peer %s, err in copy %v", sgId, joinerName, err)
return nullSG, "", nullGV, err
}
// The retrieved genvector does not contain the mutation that adds the
@@ -1513,6 +1517,6 @@
// generations. Add that generation to this genvector.
gv[sgoid][s.id] = gen
- vlog.VI(2).Infof("sync: JoinSyncgroupAtAdmin: returning: sg %v, vers %v, genvec %v", sg, version, gv[sgoid])
+ vlog.Infof("sync: JoinSyncgroupAtAdmin: returning: sg %v, vers %v, genvec %v", sg, version, gv[sgoid])
return *sg, version, gv[sgoid], nil
}
diff --git a/services/syncbase/vsync/util.go b/services/syncbase/vsync/util.go
index 5bdc7a0..9b59363 100644
--- a/services/syncbase/vsync/util.go
+++ b/services/syncbase/vsync/util.go
@@ -97,7 +97,7 @@
func (na namesAuthorizer) Authorize(ctx *context.T, securityCall security.Call) (err error) {
var peerBlessingNames []string
peerBlessingNames, _ = security.RemoteBlessingNames(ctx, securityCall)
- vlog.VI(4).Infof("sync: Authorize: names %v", peerBlessingNames)
+ //vlog.Infof("sync: Authorize: names %v", peerBlessingNames)
// Put the peerBlessingNames in a set, to make it easy to test whether
// na.expectedBlessingNames is a subset.
peerBlessingNamesMap := make(map[string]bool)
@@ -112,7 +112,7 @@
if !isSubset {
err = verror.New(verror.ErrInternal, ctx, "server blessings changed")
} else {
- vlog.VI(4).Infof("sync: Authorize: remote peer allowed")
+ //vlog.Infof("sync: Authorize: remote peer allowed")
}
return err
}
diff --git a/services/syncbase/vsync/watcher.go b/services/syncbase/vsync/watcher.go
index b14fcbc..c5ff560 100644
--- a/services/syncbase/vsync/watcher.go
+++ b/services/syncbase/vsync/watcher.go
@@ -69,7 +69,7 @@
// When an application makes a single non-transactional put, it is represented
// as a batch of one log record. Thus the watcher only deals with batches.
func (s *syncService) watchStore(ctx *context.T, dbId wire.Id, st *watchable.Store) {
- vlog.VI(1).Infof("sync: watchStore: DB %v: start watching updates", dbId)
+ vlog.Infof("sync: watchStore: DB %v: start watching updates", dbId)
updatesChan, cancel := watchable.WatchUpdates(st)
defer cancel()
@@ -77,9 +77,9 @@
moreWork := true
for moreWork && !s.isClosed() {
if s.processDatabase(ctx, dbId, st) {
- vlog.VI(2).Infof("sync: watchStore: DB %v: had updates", dbId)
+ vlog.Infof("sync: watchStore: DB %v: had updates", dbId)
} else {
- vlog.VI(2).Infof("sync: watchStore: DB %v: idle, wait for updates", dbId)
+ vlog.Infof("sync: watchStore: DB %v: idle, wait for updates", dbId)
select {
case _, moreWork = <-updatesChan:
@@ -89,7 +89,7 @@
}
}
- vlog.VI(1).Infof("sync: watchStore: DB %v: channel closed, stop watching and exit", dbId)
+ vlog.Infof("sync: watchStore: DB %v: channel closed, stop watching and exit", dbId)
}
// processDatabase fetches from the given database at most one new batch update
@@ -97,8 +97,8 @@
// records ending with one record having the "continued" flag set to false. The
// call returns true if a new batch update was processed.
func (s *syncService) processDatabase(ctx *context.T, dbId wire.Id, st store.Store) bool {
- vlog.VI(2).Infof("sync: processDatabase: begin: %v", dbId)
- defer vlog.VI(2).Infof("sync: processDatabase: end: %v", dbId)
+ vlog.Infof("sync: processDatabase: begin: %v", dbId)
+ defer vlog.Infof("sync: processDatabase: end: %v", dbId)
resMark, err := getResMark(ctx, st)
if err != nil {
@@ -146,7 +146,7 @@
return false
}
} else {
- vlog.VI(1).Infof("sync: processDatabase: %v database not allowed to sync, skipping cutting a gen", dbId)
+ vlog.Infof("sync: processDatabase: %v database not allowed to sync, skipping cutting a gen", dbId)
}
return true
}
@@ -210,7 +210,7 @@
}
}
- vlog.VI(3).Infof("sync: processWatchLogBatch: %v: sg snap %t, syncable %d, total %d", dbId, !appBatch, len(batch), totalCount)
+ vlog.Infof("sync: processWatchLogBatch: %v: sg snap %t, syncable %d, total %d", dbId, !appBatch, len(batch), totalCount)
if err := s.processWatchBlobRefs(ctx, dbId, st, batch); err != nil {
// There may be an error here if the database is recently
@@ -268,7 +268,7 @@
gen, pos := s.reserveGenAndPosInDbLog(ctx, dbId, "", count)
- vlog.VI(3).Infof("sync: processBatch: %v: len %d, total %d, btid %x, gen %d, pos %d", dbId, count, totalCount, batchId, gen, pos)
+ vlog.Infof("sync: processBatch: %v: len %d, total %d, btid %x, gen %d, pos %d", dbId, count, totalCount, batchId, gen, pos)
for _, rec := range batch {
// Update the log record. Portions of the record Metadata must
@@ -503,15 +503,15 @@
switch op := op.(type) {
case *sbwatchable.DbStateChangeRequestOp:
dbStateChangeRequest := op
- vlog.VI(1).Infof("sync: processDbStateChangeLogRecord: found a dbState change log record with state %#v", dbStateChangeRequest)
+ vlog.Infof("sync: processDbStateChangeLogRecord: found a dbState change log record with state %#v", dbStateChangeRequest)
isPaused := false
if err := store.RunInTransaction(st, func(tx store.Transaction) error {
switch dbStateChangeRequest.RequestType {
case sbwatchable.StateChangePauseSync:
- vlog.VI(1).Infof("sync: processDbStateChangeLogRecord: PauseSync request found. Pausing sync.")
+ vlog.Infof("sync: processDbStateChangeLogRecord: PauseSync request found. Pausing sync.")
isPaused = true
case sbwatchable.StateChangeResumeSync:
- vlog.VI(1).Infof("sync: processDbStateChangeLogRecord: ResumeSync request found. Resuming sync.")
+ vlog.Infof("sync: processDbStateChangeLogRecord: ResumeSync request found. Resuming sync.")
isPaused = false
default:
return fmt.Errorf("Unexpected DbStateChangeRequest found: %#v", dbStateChangeRequest)
@@ -556,7 +556,7 @@
addWatchPrefixSyncgroup(dbId, prefix, gid)
}
}
- vlog.VI(3).Infof("sync: processSyncgroupLogRecord: %v: gid %s, remove %t, prefixes: %q", dbId, gid, remove, op.Prefixes)
+ vlog.Infof("sync: processSyncgroupLogRecord: %v: gid %s, remove %t, prefixes: %q", dbId, gid, remove, op.Prefixes)
return op, nil
}
}