// Syncbase-based implementation of Dispatcher.
//
// Schema design doc (a bit outdated):
// https://docs.google.com/document/d/1GtBk75QmjSorUW6T6BATCoiS_LTqOrGksgqjqJ1Hiow/edit#
//
// TODO(sadovsky): Support arbitrary item reordering using a Dewey-Decimal-like
// order-tracking scheme, with suffix randomization to prevent conflicts.
//
// NOTE: For now, when an item is deleted, any sub-items that were added
// concurrently (on some other device) are orphaned. Eventually, we'll GC
// orphaned records; for now, we don't bother. This orphaning-based approach
// enables us to use simple last-one-wins conflict resolution for all records
// stored in Syncbase.
//
// TODO(sadovsky): Orphaning degrades performance, because scan responses (e.g.
// scan to get all todos for a given list) include orphaned records.

'use strict';

var _ = require('lodash');
var async = require('async');
var inherits = require('inherits');
var randomBytes = require('randombytes');
var syncbase = require('syncbase');
var nosql = syncbase.nosql;
var vanadium = require('vanadium');
var vtrace = vanadium.vtrace;

var bm = require('./benchmark');
var Dispatcher = require('./dispatcher');

inherits(SyncbaseDispatcher, Dispatcher);
module.exports = SyncbaseDispatcher;

function SyncbaseDispatcher(rt, db) {
  Dispatcher.call(this);
  this.rt_ = rt;
  this.ctx_ = rt.getContext();
  this.db_ = db;
  this.tb_ = db.table('tb');

  // Start the watch loop to periodically poll for changes from sync.
  // TODO(sadovsky): Remove this once we have client watch.
  var that = this;
  process.nextTick(function() {
    that.watchLoop_();
  });
}

////////////////////////////////////////
// Helpers

function noop() {}

var SEP = '.';  // separator for key parts

function join() {
  // TODO(sadovsky): Switch to using naming.join() once Syncbase allows slashes
  // in row keys. Also, restrict which chars are allowed in tag names.
  var args = Array.prototype.slice.call(arguments);
  return args.join(SEP);
}

function uuid(len) {
  len = len || 16;
  return randomBytes(Math.ceil(len / 2)).toString('hex').substr(0, len);
}

function newListKey() {
  return uuid();
}

function newTodoKey(listId) {
  return join(listId, 'todos', uuid());
}

function tagKey(todoId, tag) {
  return join(todoId, 'tags', tag);
}

function keyToListId(key) {
  var parts = key.split(SEP);
  // Assume key is for list, todo, or tag.
  if (parts.length === 1 || parts.length === 3 || parts.length === 5) {
    return parts[0];
  }
  throw new Error('bad key: ' + key);
}

// TODO(sadovsky): Maybe switch from JSON to VOM/VDL.
function marshal(x) {
  return JSON.stringify(x);
}
function unmarshal(x) {
  return JSON.parse(x);
}

////////////////////////////////////////
// Vanadium helpers

// Returns a new Vanadium context object with a timeout.
function wt(ctx, timeout) {
  return ctx.withTimeout(timeout || 5000);
}

// Returns a new Vanadium context object with the given name.
function wn(ctx, name) {
  return vtrace.withNewSpan(ctx, name);
}

var SILENT = new vanadium.context.ContextKey();

// Defines a SyncbaseDispatcher method. If the first argument to fn is not a
// context, creates a new context with a timeout.
function define(name, fn) {
  SyncbaseDispatcher.prototype[name] = function() {
    var args = Array.prototype.slice.call(arguments);
    var ctx = args[0];
    if (ctx instanceof vanadium.context.Context) {
      args.shift();
    } else {
      ctx = wt(this.ctx_);
    }
    args = [wn(ctx, name)].concat(args);
    // The callback argument is optional, and clients don't always pass one.
    if (typeof args[args.length - 1] !== 'function') {
      args.push(noop);
    }
    if (!ctx.value(SILENT)) {
      // Build args string for logging the invocation. Drop ctx and cb, convert
      // to JSON, drop square brackets.
      var cb = args[args.length - 1];
      var argsStr = JSON.stringify(args.slice(1, -1)).slice(1, -1);
      args[args.length - 1] = bm.logFn(name + '(' + argsStr + ')', cb);
    }
    return fn.apply(this, args);
  };
}

////////////////////////////////////////
// SyncbaseDispatcher impl

// Returns list objects without 'sg' fields.
// TODO(sadovsky): Use a query for better performance. It should be possible to
// query based on row key regexp.
define('getListsOnly_', function(ctx, cb) {
  this.getRows_(ctx, null, function(err, rows) {
    if (err) return cb(err);
    var lists = [];
    _.forEach(rows, function(row) {
      if (row.key.indexOf(SEP) >= 0) {
        return;  // ignore nested (and therefore non-list) records
      }
      lists.push(_.assign(unmarshal(row.value), {_id: row.key}));
    });
    return cb(null, lists);
  });
});

// Returns a list of objects describing SyncGroups.
// TODO(sadovsky): Would be nice if this could be done with a single RPC.
define('getSyncGroups_', function(ctx, cb) {
  var that = this;
  this.db_.getSyncGroupNames(ctx, function(err, sgNames) {
    if (err) return cb(err);
    async.map(sgNames, function(sgName, cb) {
      that.getSyncGroup_(ctx, that.sgNameToListId_(sgName), cb);
    }, cb);
  });
});

// Returns list objects with 'sg' fields.
define('getLists', function(ctx, cb) {
  var that = this;
  async.parallel([
    function(cb) {
      that.getListsOnly_(ctx, cb);
    },
    function(cb) {
      that.getSyncGroups_(ctx, cb);
    }
  ], function(err, results) {
    if (err) return cb(err);
    var lists = results[0], sgs = results[1];
    // Left join: add 'sg' field to each list for which there's an SG.
    _.forEach(lists, function(list) {
      var listId = list._id;
      _.forEach(sgs, function(sg) {
        console.assert(sg.spec.prefixes.length === 1);
        if (listId === sg.spec.prefixes[0].slice(3)) {  // drop 'tb:' prefix
          list.sg = sg;
        }
      });
    });
    return cb(null, lists);
  });
});

define('getTodos', function(ctx, listId, cb) {
  this.getRows_(ctx, listId, function(err, rows) {
    if (err) return cb(err);
    var todos = [];
    var todo = {};
    _.forEach(rows, function(row) {
      var parts = row.key.split(SEP);
      if (parts.length < 2 || parts[0] !== listId) {
        return;
      } else if (parts.length === 3) {  // next todo
        if (todo._id) {
          todos.push(todo);
        }
        todo = _.assign(unmarshal(row.value), {_id: row.key});
      } else if (parts.length === 5) {  // tag for current todo
        if (!todo.tags) {
          todo.tags = [];
        }
        todo.tags.push(parts[4]);  // push tag name
      } else {
        throw new Error('bad key: ' + row.key);
      }
    });
    return cb(null, todos);
  });
});

define('addList', function(ctx, list, cb) {
  console.assert(!list._id);
  var listId = newListKey();
  var v = marshal(list);
  this.tb_.put(ctx, listId, v, this.maybeEmit_(function(err) {
    if (err) return cb(err);
    return cb(null, listId);
  }, listId));
});

define('editListName', function(ctx, listId, name, cb) {
  this.update_(ctx, listId, function(list) {
    return _.assign(list, {name: name});
  }, cb);
});

define('addTodo', function(ctx, listId, todo, cb) {
  console.assert(!todo._id);
  var that = this;
  var tags = todo.tags;
  delete todo.tags;
  var todoId = newTodoKey(listId);
  var v = marshal(todo);
  // Write todo and tags in a batch.
  var opts = new nosql.BatchOptions();
  nosql.runInBatch(ctx, this.db_, opts, function(db, cb) {
    // NOTE: Dealing with tables is awkward given that batches and syncgroups
    // are database-level. Maybe we should just get rid of tables. Doing so
    // would solve other problems as well, e.g. the API inconsistency for
    // creating databases vs. tables.
    var tb = db.table('tb');
    tb.put(wn(ctx, 'put:' + todoId), todoId, v, function(err) {
      if (err) return cb(err);
      async.each(tags, function(tag, cb) {
        that.addTagImpl_(ctx, tb, todoId, tag, cb);
      }, cb);
    });
  }, this.maybeEmit_(cb, todoId));
});

define('removeTodo', function(ctx, todoId, cb) {
  this.tb_.row(todoId).delete(ctx, this.maybeEmit_(cb, todoId));
});

define('editTodoText', function(ctx, todoId, text, cb) {
  this.update_(ctx, todoId, function(todo) {
    return _.assign(todo, {text: text});
  }, cb);
});

define('markTodoDone', function(ctx, todoId, done, cb) {
  this.update_(ctx, todoId, function(todo) {
    return _.assign(todo, {done: done});
  }, cb);
});

define('addTag', function(ctx, todoId, tag, cb) {
  this.addTagImpl_(ctx, this.tb_, todoId, tag, this.maybeEmit_(cb, todoId));
});

define('removeTag', function(ctx, todoId, tag, cb) {
  // NOTE: Table.delete is awkward (it takes a range), so instead we use
  // Row.delete. It would be nice for Table.delete to operate on a single row
  // and have a separate Table.deleteRowRange.
  this.tb_.row(tagKey(todoId, tag)).delete(ctx, this.maybeEmit_(cb, todoId));
});

////////////////////////////////////////
// SyncGroup methods

// TODO(sadovsky): It's not clear from the Syncbase API how SG names should be
// constructed, and it's also weird that db.SyncGroup(name) expects an absolute
// name. We should probably allow clients to specify DB-relative SG names.

// Currently, SG names must be of the form <syncbaseName>/$sync/<suffix>.
// We use <app>/<db>/<table>/<listId> for the suffix part.

SyncbaseDispatcher.prototype.sgNameToListId_ = function(sgName) {
  return sgName.replace(new RegExp('.*/\\$sync/todos/db/tb/'), '');
};

SyncbaseDispatcher.prototype.listIdToSgName_ = function(listId) {
  var prefix = this.tb_.fullName.replace('/todos/db/tb',
                                         '/$sync/todos/db/tb');
  return prefix + '/' + listId;
};

// Returns an object describing the SyncGroup for the given list id.
// Currently, this object will have just one field, 'spec'.
define('getSyncGroup_', function(ctx, listId, cb) {
  var sgName = this.listIdToSgName_(listId), sg = this.db_.syncGroup(sgName);
  async.parallel([
    function(cb) {
      sg.getSpec(ctx, function(err, spec, version) {
        if (err) return cb(err);
        cb(null, spec);
      });
    },
    function(cb) {
      // TODO(sadovsky): For now, in the UI we just want to show who's on the
      // ACL for a given list, so we don't bother with getMembers. On top of
      // that, currently getMembers returns a map of random Syncbase instance
      // ids to SyncGroupMemberInfo structs, neither of which tell us anything
      // useful.
      if (true) {
        process.nextTick(cb);
      } else {
        sg.getMembers(ctx, cb);
      }
    }
  ], function(err, results) {
    if (err) return cb(err);
    cb(null, {
      spec: results[0],
    });
  });
});

// TODO(sadovsky): Copied from test-syncgroup.js. I have no idea whether this
// value is appropriate.
var MEMBER_INFO = new nosql.SyncGroupMemberInfo({
  syncPriority: 8
});

define('createSyncGroup', function(ctx, listId, blessings, cb) {
  var sgName = this.listIdToSgName_(listId), sg = this.db_.syncGroup(sgName);
  var spec = new nosql.SyncGroupSpec({
    // TODO(sadovsky): Maybe make perms more restrictive.
    perms: new Map([
      ['Admin',   {'in': blessings}],
      ['Read',    {'in': blessings}],
      ['Write',   {'in': blessings}],
      ['Resolve', {'in': blessings}],
      ['Debug',   {'in': blessings}]
    ]),
    // TODO(sadovsky): Update this once we switch to {table, prefix} tuples.
    prefixes: ['tb:' + listId]
  });
  sg.create(ctx, spec, MEMBER_INFO, this.maybeEmit_(cb));
});

define('joinSyncGroup', function(ctx, listId, cb) {
  var sgName = this.listIdToSgName_(listId), sg = this.db_.syncGroup(sgName);
  sg.join(ctx, MEMBER_INFO, this.maybeEmit_(cb));
});

////////////////////////////////////////
// vtrace methods

// DO NOT USE THIS. vtrace RPCs are extremely slow in JavaScript because VOM
// decoding is slow for trace records, which are deeply nested. E.g. 100 puts
// can take 20+ seconds with vtrace vs. 2 seconds without.
// EDIT: It might not be quite that bad - the "20+ seconds" cited above might
// also include the latency added by having the Chrome dev console open.
SyncbaseDispatcher.prototype.resetTraceRecords = function() {
  this.ctx_ = vtrace.withNewStore(this.rt_.getContext());
  vtrace.getStore(this.ctx_).setCollectRegexp('.*');
};

SyncbaseDispatcher.prototype.getTraceRecords = function() {
  return vtrace.getStore(this.ctx_).traceRecords();
};

SyncbaseDispatcher.prototype.logTraceRecords = function() {
  console.log(vtrace.formatTraces(this.getTraceRecords()));
};

////////////////////////////////////////
// Polling-based watch

// Random number, used to implement watch. Each client writes to their own watch
// key to signify that they've written new data, and each client periodically
// polls all watch keys to see if anything has changed.
var clientId = uuid();
function watchPrefix(listId) {
  return join(listId, 'watch');
}
function watchKey(listId) {
  return join(watchPrefix(listId), clientId);
}

var seq = 0;  // for our own writes
var prevVersions = null;  // map of list id to version string

// Increments stored seq for this client.
define('bumpSeq_', function(ctx, listId, cb) {
  seq++;
  this.tb_.put(ctx, watchKey(listId), seq, cb);
});

// Returns a watch seq map for the given list id.
define('getWatchSeqMap_', function(ctx, listId, cb) {
  this.getRows_(ctx, watchPrefix(listId), function(err, rows) {
    if (err) return cb(err);
    var seqMap = {};  // map of client id to seq
    _.forEach(rows, function(row) {
      var parts = row.key.split(SEP);
      console.assert(parts.length === 3);  // <listId>/watch/<clientId>
      seqMap[parts[2]] = row.value;
    });
    cb(null, seqMap);
  });
});

// Returns true if any data has arrived via sync, false if not.
define('checkForChanges_', function(ctx, cb) {
  var that = this;
  this.getListsOnly_(ctx, function(err, lists) {
    if (err) return cb(err);
    // Build a map of list id to current version.
    var currVersions = {};
    var listIds = _.pluck(lists, '_id');
    async.each(listIds, function(listId, cb) {
      that.getWatchSeqMap_(ctx, listId, function(err, seqMap) {
        if (err) return cb(err);
        // Remove self from seqMap, join the rest into a version string.
        delete seqMap[clientId];
        var strs = _.map(seqMap, function(v, k) {
          return k + ':' + v;
        });
        currVersions[listId] = strs.sort().join(',');
        cb();
      });
    }, function(err) {
      if (err) return cb(err);
      // Note that _.isEqual performs a deep comparison.
      var changed = prevVersions && !_.isEqual(currVersions, prevVersions);
      prevVersions = currVersions;
      cb(null, changed);
    });
  });
});

// Runs checkForChanges_ periodically and emits a 'change' event whenever a
// change (from remote peer) is detected.
SyncbaseDispatcher.prototype.watchLoop_ = function() {
  var that = this;
  var ctx = wt(this.ctx_).withValue(SILENT, 1);
  this.checkForChanges_(ctx, function(err, changed) {
    if (err) {
      console.log('checkForChanges_ failed: ' + err);
    } else if (changed) {
      console.log('checkForChanges_ detected a change');
      that.emit('change');
    }
    window.setTimeout(that.watchLoop_.bind(that), 500);
  });
};

////////////////////////////////////////
// Internal helpers

// TODO(sadovsky): Drop this method once we have client watch.
SyncbaseDispatcher.prototype.maybeEmit_ = function(cb, key) {
  var that = this;
  cb = cb || noop;
  return function(err) {
    cb.apply(null, arguments);
    if (err) return;
    that.emit('change');
    if (key) {
      // Run bumpSeq_ in the background.
      that.bumpSeq_(keyToListId(key), function(err) {
        if (err) console.log('bumpSeq_ failed: ' + err);
      });
    }
  };
};

// Writes the given tag into the given table.
define('addTagImpl_', function(ctx, tb, todoId, tag, cb) {
  // TODO(sadovsky): Syncbase currently disallows whitespace in keys, so as a
  // quick hack we drop all whitespace before storing tags.
  tag = tag.replace(/\s+/g, '');
  tb.put(ctx, tagKey(todoId, tag), null, cb);
});

// Returns all rows in the table.
define('getRows_', function(ctx, prefix, cb) {
  var rows = [], streamErr = null;
  var range = nosql.rowrange.prefix(prefix || '');
  this.tb_.scan(ctx, range, function(err) {
    if (err) return cb(err);
    if (streamErr) return cb(streamErr);
    cb(null, rows);
  }).on('data', function(row) {
    rows.push(row);
  }).on('error', function(err) {
    streamErr = streamErr || err.error;
  });
});

// Performs a read-modify-write on key, applying updateFn to the value.
// Takes care of value marshalling and unmarshalling.
// TODO(sadovsky): Atomic read-modify-write requires 4 RPCs. MongoDB-style API
// would bring it down to 1.
define('update_', function(ctx, key, updateFn, cb) {
  var opts = new nosql.BatchOptions();
  nosql.runInBatch(ctx, this.db_, opts, function(db, cb) {
    var tb = db.table('tb');
    tb.get(wn(ctx, 'get:' + key), key, function(err, value) {
      if (err) return cb(err);
      var newValue = marshal(updateFn(unmarshal(value)));
      tb.put(wn(ctx, 'put:' + key), key, newValue, cb);
    });
  }, this.maybeEmit_(cb, key));
});
