// blob: 9bd3c5728d1e175e937460a34e04fa896adf72a1 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package io.v.baku.toolkit.bind;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import io.v.rx.syncbase.RangeWatchBatch;
import io.v.rx.syncbase.RangeWatchEvent;
import io.v.rx.syncbase.RxTable;
import io.v.v23.syncbase.nosql.ChangeType;
import rx.Observable;
import rx.android.schedulers.AndroidSchedulers;
import rx.functions.Func2;
/**
 * This class accumulates prefix watch streams into observable lists. It is meant to be used in
 * conjunction with {@link Observable#scan(Object, Func2)}, with an instance of this class as the
 * seed and {@link #withUpdates(Collection)} as the accumulator function;
 * {@link #scanFrom(Observable)} wires this up for a stream of {@link RangeWatchBatch}es.
 *
 * <p>The accumulator is mutable: every batch is applied in place and the same instance is emitted
 * again. Rows are tracked twice, in a map keyed by row name and in a list sorted by the ordering
 * given at construction; the two are expected to always describe the same set of rows.
 *
 * @param <T> the type of the row values
 */
public class PrefixListAccumulator<T> implements ListAccumulator<RxTable.Row<T>> {
    private static final String ERR_INCONSISTENT = "Sorted data are inconsistent with map data";

    /** Current value of every known row, keyed by row name. */
    private final Map<String, T> mRows = new HashMap<>();
    /** The rows of {@link #mRows}, kept sorted according to {@link #mOrdering}. */
    private final List<RxTable.Row<T>> mSorted = new ArrayList<>();
    private final Ordering<? super RxTable.Row<T>> mOrdering;

    /**
     * @param ordering the primary sort order for rows; ties are broken by row name
     */
    public PrefixListAccumulator(final Ordering<? super RxTable.Row<T>> ordering) {
        // ensure deterministic ordering by always applying secondary order on row name
        mOrdering = ordering.compound(Ordering.natural().onResultOf(RxTable.Row::getRowName));
    }

    /**
     * Folds a stream of watch batches into this accumulator.
     *
     * @param watch the watch batches to accumulate
     * @return an observable that emits this (mutated) accumulator, as produced by
     * {@link Observable#scan(Object, Func2)} seeded with {@code this}
     */
    public Observable<? extends PrefixListAccumulator<T>> scanFrom(
            final Observable<RangeWatchBatch<T>> watch) {
        return watch
                .concatMap(RangeWatchBatch::collectChanges)
                .observeOn(AndroidSchedulers.mainThread()) // required unless we copy
                .scan(this, PrefixListAccumulator::withUpdates);
    }

    /**
     * Locates a row that is known to the map within the sorted list.
     *
     * @param rowName  the name of the row to find
     * @param oldValue the value the row currently has in the sorted list
     * @return the index of the row in the sorted list
     * @throws ConcurrentModificationException if the row is not found in the sorted list, meaning
     *                                         the list and the map have diverged
     */
    private int findRowForEdit(final String rowName, final T oldValue) {
        final int oldIndex = Collections.binarySearch(mSorted,
                new RxTable.Row<>(rowName, oldValue), mOrdering);
        if (oldIndex < 0) {
            throw new ConcurrentModificationException(ERR_INCONSISTENT);
        } else {
            return oldIndex;
        }
    }

    /**
     * Applies a batch of watch events to this accumulator in iteration order. Events of type
     * {@link ChangeType#DELETE_CHANGE} remove their row; all other events insert or update it.
     *
     * @param events the events to apply
     * @return this accumulator, modified in place
     */
    protected PrefixListAccumulator<T> withUpdates(final Collection<RangeWatchEvent<T>> events) {
        // TODO(rosswang): more efficient updates for larger batches
        // TODO(rosswang): allow option to copy on add (immutable accumulator)
        // If we copy on add, don't forget to override the clone in PrefixListDeltaAccumulator.
        for (final RangeWatchEvent<T> e : events) {
            if (e.getChangeType() == ChangeType.DELETE_CHANGE) {
                removeOne(e.getRow());
            } else {
                updateOne(e.getRow());
            }
        }
        return this;
    }

    /**
     * Removes the row named by {@code entry} from both the map and the sorted list. Does nothing
     * if the row is not known.
     */
    private void removeOne(final RxTable.Row<T> entry) {
        final String rowName = entry.getRowName();
        // Map.remove also returns null for a key mapped to null, so test for presence explicitly;
        // otherwise such a row would leave the map but linger in the sorted list.
        if (mRows.containsKey(rowName)) {
            final T old = mRows.remove(rowName);
            remove(findRowForEdit(rowName, old));
        }
    }

    /**
     * @return the index at which {@code entry} is found in the sorted list as it currently
     * stands, or, if it is not found, the index at which it would have to be inserted
     */
    private int insertionIndex(final RxTable.Row<T> entry) {
        final int bs = Collections.binarySearch(mSorted, entry, mOrdering);
        return bs < 0 ? ~bs : bs;
    }

    /**
     * Inserts {@code entry} as a new row, or replaces the existing row of the same name, keeping
     * the sorted list in order.
     */
    private void updateOne(final RxTable.Row<T> entry) {
        final String rowName = entry.getRowName();
        // Map.put also returns null for a key mapped to null, so test for presence explicitly;
        // otherwise such a row would be inserted into the sorted list a second time.
        final boolean existed = mRows.containsKey(rowName);
        final T old = mRows.put(rowName, entry.getValue());
        if (!existed) {
            insert(insertionIndex(entry), entry);
        } else {
            final int oldIndex = findRowForEdit(rowName, old);
            // insertionIndex is computed while the old row is still in the list. Positions past
            // the old row shift down by one once it is removed, so convert to the index the row
            // will actually occupy afterwards.
            int newIndex = insertionIndex(entry);
            if (newIndex > oldIndex) {
                newIndex--;
            }
            if (oldIndex == newIndex) {
                change(newIndex, entry);
            } else {
                move(oldIndex, newIndex, entry);
            }
        }
    }

    /**
     * Removes the row at {@code index} from the sorted list.
     */
    protected void remove(final int index) {
        mSorted.remove(index);
    }

    /**
     * Inserts {@code entry} into the sorted list at {@code index}.
     */
    protected void insert(final int index, final RxTable.Row<T> entry) {
        mSorted.add(index, entry);
    }

    /**
     * Replaces the row at {@code from} with {@code entry} at a different position.
     *
     * @param from  the current index of the row
     * @param to    the index {@code entry} occupies once the row at {@code from} has been removed
     * @param entry the new contents of the row
     */
    protected void move(final int from, final int to, final RxTable.Row<T> entry) {
        mSorted.remove(from);
        mSorted.add(to, entry);
    }

    /**
     * Replaces the row at {@code index} with {@code entry} without changing its position.
     */
    protected void change(final int index, final RxTable.Row<T> entry) {
        mSorted.set(index, entry);
    }

    /**
     * @return the number of rows tracked in the row map
     */
    @Override
    public int getCount() {
        return mRows.size();
    }

    /**
     * @return the row at {@code position} in sorted order
     */
    @Override
    public RxTable.Row<T> getRowAt(final int position) {
        return mSorted.get(position);
    }

    /**
     * @return the value currently stored for {@code rowName}, or {@code null} if the map has no
     * value for it
     */
    public T getValue(final String rowName) {
        return mRows.get(rowName);
    }

    /**
     * Looks up the position of a row in the sorted list by binary search, using the row's current
     * value from the map as the search key.
     *
     * @return the index of the row if found; otherwise a negative number, as returned by
     * {@link Collections#binarySearch(List, Object, java.util.Comparator)}
     */
    @Override
    public int getRowIndex(final String rowName) {
        // NOTE(review): for an unknown row name the probe row carries a null value, so this relies
        // on the ordering tolerating null values - confirm against the orderings in use.
        return Collections.binarySearch(mSorted, new RxTable.Row<>(rowName, mRows.get(rowName)),
                mOrdering);
    }

    /**
     * @return whether the row map has an entry for {@code rowName}
     */
    @Override
    public boolean containsRow(final String rowName) {
        return mRows.containsKey(rowName);
    }

    /**
     * @return an immutable copy of the rows in their current sorted order
     */
    @Override
    public ImmutableList<RxTable.Row<T>> getListSnapshot() {
        return ImmutableList.copyOf(mSorted);
    }
}