| // Copyright 2015 The Vanadium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package io.v.v23.syncbase; |
| |
| import com.google.common.util.concurrent.AsyncFunction; |
| import com.google.common.util.concurrent.FutureCallback; |
| import com.google.common.util.concurrent.Futures; |
| import com.google.common.util.concurrent.ListenableFuture; |
| import com.google.common.util.concurrent.SettableFuture; |
| import io.v.v23.VFutures; |
| import io.v.v23.context.VContext; |
| import io.v.v23.services.syncbase.BatchOptions; |
| import io.v.v23.services.syncbase.ConcurrentBatchException; |
| |
| import javax.annotation.CheckReturnValue; |
| |
| /** |
| * Various utility methods for the batch operations. |
| */ |
| public class Batch { |
| |
| /** |
| * Runs the given batch operation, managing retries and |
| * {@link BatchDatabase#commit commit()}/{@link BatchDatabase#abort abort()}s. |
| * <p> |
| * The returned future is guaranteed to be executed on an {@link java.util.concurrent.Executor} |
| * specified in {@code context} (see {@link io.v.v23.V#withExecutor}). |
| * <p> |
| * The returned future will fail with {@link java.util.concurrent.CancellationException} if |
| * {@code context} gets canceled. |
| * |
| * @param context Vanadium context |
| * @param db database on which the batch operation is to be performed |
| * @param opts batch configuration |
| * @param op batch operation |
| */ |
| @CheckReturnValue |
| public static ListenableFuture<Void> runInBatch(VContext context, Database db, |
| BatchOptions opts, BatchOperation op) { |
| return VFutures.withUserLandChecks(context, |
| Futures.transform(Futures.immediateFuture(false), |
| getRetryFn(context, db, opts, op, 0))); |
| } |
| |
| private static AsyncFunction<Boolean, Void> getRetryFn(final VContext ctx, |
| final Database db, |
| final BatchOptions opts, |
| final BatchOperation op, |
| final int round) { |
| return new AsyncFunction<Boolean, Void>() { |
| @Override |
| public ListenableFuture<Void> apply(Boolean success) throws Exception { |
| if (success) { |
| return Futures.immediateFuture(null); |
| } |
| if (round >= 3) { |
| throw new ConcurrentBatchException(ctx); |
| } |
| return Futures.transform(tryBatch(ctx, db, opts, op), |
| getRetryFn(ctx, db, opts, op, round + 1)); |
| } |
| }; |
| } |
| |
| @CheckReturnValue |
| private static ListenableFuture<Boolean> tryBatch(final VContext ctx, |
| final Database db, |
| final BatchOptions opts, |
| final BatchOperation op) { |
| final SettableFuture<Boolean> ret = SettableFuture.create(); |
| Futures.addCallback(db.beginBatch(ctx, opts), new FutureCallback<BatchDatabase>() { |
| @Override |
| public void onFailure(Throwable t) { |
| ret.setException(t); |
| } |
| |
| @Override |
| public void onSuccess(final BatchDatabase batch) { |
| Futures.addCallback(op.run(batch), new FutureCallback<Void>() { |
| @Override |
| public void onFailure(final Throwable t) { |
| Futures.addCallback(batch.abort(ctx), new FutureCallback<Void>() { |
| @Override |
| public void onSuccess(Void result) { |
| ret.setException(t); |
| } |
| |
| @Override |
| public void onFailure(Throwable newT) { |
| ret.setException(t); |
| } |
| }); |
| } |
| |
| @Override |
| public void onSuccess(Void result) { |
| Futures.addCallback( |
| opts.getReadOnly() ? batch.abort(ctx) : batch.commit(ctx), |
| new FutureCallback<Void>() { |
| @Override |
| public void onSuccess(Void result) { |
| ret.set(true); // success |
| } |
| |
| @Override |
| public void onFailure(Throwable t) { |
| if (t instanceof ConcurrentBatchException) { |
| // retry |
| ret.set(false); |
| } else { |
| ret.setException(t); |
| } |
| } |
| }); |
| } |
| }); |
| } |
| }); |
| return ret; |
| } |
| |
| /** |
| * Interface for a batch operation that is executed as part of {@link #runInBatch runInBatch()}. |
| */ |
| public interface BatchOperation { |
| /** |
| * Performs the batch operation. |
| * |
| * @param db batch database on which the operation is performed |
| */ |
| @CheckReturnValue |
| ListenableFuture<Void> run(BatchDatabase db); |
| } |
| |
| } |