| // Copyright 2015 The Vanadium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package io.v.syncslides.db; |
| |
| import android.os.Handler; |
| import android.os.Looper; |
| import android.util.Log; |
| |
| import com.google.common.collect.Sets; |
| |
| import java.util.Set; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| |
| import io.v.impl.google.naming.NamingUtil; |
| import io.v.syncslides.model.Session; |
| import io.v.v23.InputChannels; |
| import io.v.v23.VIterable; |
| import io.v.v23.context.CancelableVContext; |
| import io.v.v23.context.VContext; |
| import io.v.v23.services.watch.ResumeMarker; |
| import io.v.v23.syncbase.nosql.BatchDatabase; |
| import io.v.v23.syncbase.nosql.ChangeType; |
| import io.v.v23.syncbase.nosql.Database; |
| import io.v.v23.syncbase.nosql.Table; |
| import io.v.v23.syncbase.nosql.WatchChange; |
| import io.v.v23.verror.VException; |
| import io.v.v23.vom.VomUtil; |
| |
| import static io.v.v23.VFutures.sync; |
| |
| /** |
| * Watches both the local slide number as well as the live presentation's slide number. |
| * If the local number is INVALID_LOCAL_SLIDE_NUM, notifies listeners whenever the live |
| * presentation's slide number changes. Otherwise, it notifies listeners whenever the |
| * local number changes. |
| */ |
| class SlideNumberWatcher { |
| private static final String TAG = "SlideNumberWatcher"; |
| |
| private final VContext mBaseContext; |
| private final Database mDb; |
| private final Set<Session.SlideNumberListener> mListeners; |
| private final ExecutorService mExecutor; |
| private final Handler mHandler; |
| private final String mSessionId; |
| private final String mDeckId; |
| private final String mPresentationId; |
| private int mLocalSlideNum; |
| private VCurrentSlide mCurrentSlide; |
| private CancelableVContext mCurrentContext; |
| |
| /** |
| * If presentationId is non-null, SlideNumberWatcher will watch for changes in addition |
| * to watching the session's local slide number. |
| */ |
| SlideNumberWatcher(VContext context, Database db, String sessionId, String deckId, |
| String presentationId) { |
| mBaseContext = context; |
| mDb = db; |
| mSessionId = sessionId; |
| mDeckId = deckId; |
| mPresentationId = presentationId; |
| mListeners = Sets.newHashSet(); |
| mLocalSlideNum = SyncbaseSession.INVALID_LOCAL_SLIDE_NUM; |
| mHandler = new Handler(Looper.getMainLooper()); |
| mExecutor = Executors.newFixedThreadPool(2); |
| } |
| |
| void addListener(Session.SlideNumberListener listener) { |
| mListeners.add(listener); |
| if (mListeners.size() == 1) { |
| // First listener. Start the threads. |
| mCurrentContext = mBaseContext.withCancel(); |
| mExecutor.submit(() -> watchLocalSlideNum()); |
| mExecutor.submit(() -> watchCurrentSlide()); |
| } |
| listener.onChange(getSlideNum()); |
| } |
| |
| void removeListener(Session.SlideNumberListener listener) { |
| mListeners.remove(listener); |
| if (mListeners.isEmpty()) { |
| // Stop watchers via cancel. |
| mCurrentContext.cancel(); |
| mCurrentContext = null; |
| mHandler.removeCallbacksAndMessages(null); |
| } |
| } |
| |
| private void currentSlideChanged(VCurrentSlide slide) { |
| mCurrentSlide = slide; |
| notifyListeners(); |
| } |
| |
| private void localSlideChanged(int localSlide) { |
| mLocalSlideNum = localSlide; |
| notifyListeners(); |
| } |
| |
| private void notifyListeners() { |
| int slideNum = getSlideNum(); |
| for (Session.SlideNumberListener listener : mListeners) { |
| listener.onChange(slideNum); |
| } |
| } |
| |
| private int getSlideNum() { |
| int slideNum = mLocalSlideNum; |
| if (slideNum == SyncbaseSession.INVALID_LOCAL_SLIDE_NUM && mCurrentSlide != null) { |
| slideNum = mCurrentSlide.getSlideNum(); |
| } |
| return slideNum; |
| } |
| |
| private void notifyError(Exception e) { |
| for (Session.SlideNumberListener listener : mListeners) { |
| listener.onError(e); |
| } |
| } |
| |
| // Runs in a background thread. |
| private void watchCurrentSlide() { |
| try { |
| Log.i(TAG, "watchCurrentSlide"); |
| String rowKey = NamingUtil.join(mDeckId, mPresentationId, SyncbaseDB.CURRENT_SLIDE); |
| BatchDatabase batch = sync(mDb.beginBatch(mCurrentContext, null)); |
| Table presentations = batch.getTable(SyncbaseDB.PRESENTATIONS_TABLE); |
| if (sync(presentations.getRow(rowKey).exists(mCurrentContext))) { |
| final VCurrentSlide slide = (VCurrentSlide) presentations.get( |
| mCurrentContext, rowKey, VCurrentSlide.class); |
| mHandler.post(() -> currentSlideChanged(slide)); |
| } |
| ResumeMarker marker = sync(batch.getResumeMarker(mCurrentContext)); |
| |
| VIterable<WatchChange> changes = InputChannels.asIterable( |
| mDb.watch(mCurrentContext, SyncbaseDB.PRESENTATIONS_TABLE, rowKey, marker)); |
| for (WatchChange change : changes) { |
| String key = change.getRowName(); |
| Log.i(TAG, "Found CurrentSlide change " + key); |
| if (!key.equals(rowKey)) { |
| continue; |
| } |
| if (change.getChangeType().equals(ChangeType.PUT_CHANGE)) { |
| final VCurrentSlide slide = (VCurrentSlide) VomUtil.decode( |
| change.getVomValue(), VCurrentSlide.class); |
| mHandler.post(() -> currentSlideChanged(slide)); |
| } |
| } |
| if (changes.error() != null) { |
| throw changes.error(); |
| } |
| } catch (final VException e) { |
| mHandler.post(() -> notifyError(e)); |
| } |
| } |
| |
| // Runs in a background thread. |
| private void watchLocalSlideNum() { |
| try { |
| BatchDatabase batch = sync(mDb.beginBatch(mCurrentContext, null)); |
| Table ui = batch.getTable(SyncbaseDB.UI_TABLE); |
| final VSession vSession = (VSession) sync(ui.get( |
| mCurrentContext, mSessionId, VSession.class)); |
| mHandler.post(() -> localSlideChanged(vSession.getLocalSlide())); |
| ResumeMarker marker = sync(batch.getResumeMarker(mCurrentContext)); |
| VIterable<WatchChange> changes = |
| InputChannels.asIterable(mDb.watch( |
| mCurrentContext, SyncbaseDB.UI_TABLE, mSessionId, marker)); |
| for (WatchChange change : changes) { |
| String key = change.getRowName(); |
| Log.i(TAG, "Found local slide change " + key); |
| if (!key.equals(mSessionId)) { |
| continue; |
| } |
| if (change.getChangeType().equals(ChangeType.PUT_CHANGE)) { |
| final VSession vSession1 = (VSession) VomUtil.decode( |
| change.getVomValue(), VSession.class); |
| mHandler.post(() -> localSlideChanged(vSession1.getLocalSlide())); |
| } |
| } |
| if (changes.error() != null) { |
| throw changes.error(); |
| } |
| } catch (final VException e) { |
| mHandler.post(() -> notifyError(e)); |
| } |
| } |
| } |