blob: 761b4f095bf3326d03b2ebc687759565c50148bc [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
part of syncbase_client;
// StreamFlowControl is a mixin that exposes methods initFlowControl() and
// onNextUnlock(), enabling mixers to add flow control to their stream
// controllers.
class StreamFlowControl {
// We are in "locked" state if and only if _mutex is not null.
// We start out locked, until we gain our first subscriber.
Completer _mutex = new Completer();
// Initializes flow control by adding listeners to the stream controller.
// The mixer's constructor should call this method. (Mixins can't have
// constructors.)
initFlowControl(StreamController sc) {
// Unlock when gaining our first subscriber.
sc.onListen = _unlock;
// Lock when losing our last subscriber.
sc.onCancel = _lock;
// Lock when paused.
sc.onPause = _lock;
// Unlock when resumed.
sc.onResume = _unlock;
}
// Returns a future that completes immediately if we are unlocked, or as soon
// as we become unlocked otherwise.
// Mixers can use this method to decide when to ack, i.e. when to tell the
// server to continue sending events.
Future onNextUnlock() {
if (_mutex == null) {
// We are not locked; return a completed future.
return new Future.value();
}
return _mutex.future;
}
// Locks the stream controller.
// When locked, the server cannot send us any more change events.
// This happens because we don't ack the server's change events when locked.
_lock() {
if (_mutex == null) {
_mutex = new Completer();
}
}
// Unlocks the stream controller.
// When unlocked, the server can send us more change events.
// This happens because we ack the server's change events when unlocked.
_unlock() {
if (_mutex != null) {
_mutex.complete();
_mutex = null;
}
}
}