blob: afc5b368b673375c13def76bfdaeb5fb22a1845b [file] [log] [blame]
Jiri Simsad7616c92015-03-24 23:44:30 -07001// Copyright 2015 The Vanadium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
Todd Wang8c4e5cc2015-04-09 11:30:52 -07005// Package timekeeper implements simulated time against the
6// v.io/x/ref/lib/timekeeper.TimeKeeper interface.
Jiri Simsa5293dcb2014-05-10 09:56:38 -07007package timekeeper
8
9import (
10 "container/heap"
11 "sync"
12 "time"
13
Jiri Simsaffceefa2015-02-28 11:03:34 -080014 "v.io/x/ref/lib/timekeeper"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070015)
16
17// ManualTime is a time keeper that allows control over the advancement of time.
18type ManualTime interface {
19 timekeeper.TimeKeeper
20 // AdvanceTime advances the current time by d.
21 AdvanceTime(d time.Duration)
22 // Requests provides a channel where the requested delays for After and
23 // Sleep can be observed.
24 Requests() <-chan time.Duration
25}
26
27// item is a heap element: every request for After is added to the heap with its
28// wake-up time as key (where wake-up time is current time + requested delay
29// duration). As current time advances, items are plucked from the heap and the
30// clients are notified.
31type item struct {
32 t time.Time // Wake-up time.
33 ch chan<- time.Time // Client notification channel.
34}
35
36type timeHeap []*item
37
38func (th timeHeap) Len() int { return len(th) }
39
40func (th timeHeap) Less(i, j int) bool {
41 return th[i].t.Before(th[j].t)
42}
43
44func (th timeHeap) Swap(i, j int) {
45 th[i], th[j] = th[j], th[i]
46}
47
48func (th *timeHeap) Push(x interface{}) {
49 item := x.(*item)
50 *th = append(*th, item)
51}
52
53func (th *timeHeap) Pop() interface{} {
54 old := *th
55 n := len(old)
56 item := old[n-1]
57 *th = old[0 : n-1]
58 return item
59}
60
61// manualTime implements TimeKeeper.
62type manualTime struct {
63 sync.Mutex
64 current time.Time // The current time.
65 schedule timeHeap // The heap of items still to be woken up.
66 requests chan time.Duration
67}
68
69// After implements TimeKeeper.After.
70func (mt *manualTime) After(d time.Duration) <-chan time.Time {
71 mt.Lock()
72 defer mt.Unlock()
73 ch := make(chan time.Time, 1)
74 if d <= 0 {
75 ch <- mt.current
76 } else {
77 heap.Push(&mt.schedule, &item{t: mt.current.Add(d), ch: ch})
78 }
79 mt.requests <- d
80 return ch
81}
82
83// Sleep implements TimeKeeper.Sleep.
84func (mt *manualTime) Sleep(d time.Duration) {
85 <-mt.After(d)
86}
87
88// AdvanceTime implements ManualTime.AdvanceTime.
89func (mt *manualTime) AdvanceTime(d time.Duration) {
90 mt.Lock()
91 defer mt.Unlock()
92 if d > 0 {
93 mt.current = mt.current.Add(d)
94 }
95 for {
96 if mt.schedule.Len() == 0 {
97 break
98 }
99 top := mt.schedule[0]
100 if top.t.After(mt.current) {
101 break
102 }
103 top.ch <- mt.current
104 heap.Pop(&mt.schedule)
105 }
106}
107
108// Requests implements ManualTime.Requests.
109func (mt *manualTime) Requests() <-chan time.Duration { return mt.requests }
110
111// NewManualTime constructs a new instance of ManualTime, with current time set
112// at 0.
113func NewManualTime() ManualTime {
114 mt := &manualTime{
115 schedule: make([]*item, 0),
116 // 1000 should be plenty to avoid blocking on adding items
117 // to the channel, but technically we can still end up blocked.
118 requests: make(chan time.Duration, 1000),
119 }
120 heap.Init(&mt.schedule)
121 return mt
122}