// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

require('es6-shim');

var promisify = require('es6-promisify');
var syncbase = require('syncbase');
var vanadium = require('vanadium');
var verror = vanadium.verror;

var $ = require('../util/jquery');
var defineClass = require('../util/define-class');

var debug = require('../debug');

/**
 * Create app, db, and table structure in Syncbase.
 */
function setUp(context, app, db) {
  // An entity that already exists is fine; anything else is a real failure.
  function tolerateExisting(err) {
    if (!(err instanceof verror.ExistError)) {
      throw err;
    }
    console.info(err.msg);
  }

  // Promisified `entity.create(context, {})`.
  //TODO(rosswang) If {} will remain empty, can it be omitted?
  function create(entity) {
    return promisify(entity.create.bind(entity))(context, {});
  }

  // Creation is strictly sequential: app, then database, then table 't'.
  // The table handle is only looked up once the database step has run.
  return create(app)
    .catch(tolerateExisting)
    .then(function() {
      return create(db);
    })
    .catch(tolerateExisting)
    .then(function() {
      return create(db.table('t'));
    })
    .catch(tolerateExisting);
}

/**
 * Flattens an array of key elements into a single '.'-separated row key.
 *
 * @param key array of key elements
 * @return the joined key string
 */
function joinKey(key) {
  var separator = '.';
  return key.join(separator);
}

/**
 * Breaks a '.'-separated row key back into its array of key elements.
 *
 * @param key the joined key string
 * @return array of key elements
 */
function splitKey(key) {
  var separator = '.';
  return key.split(separator);
}

/**
 * Translate Syncbase hierarchical keys to object structure for easier
 * processing. '.' is chosen as the separator; '/' is reserved in Syncbase.
 *
 * It might be ideal to have the separator configurable, but certain separators
 * need regex escaping.
 *
 * Stores `value` in the nested object `root` at the path given by `key`,
 * creating intermediate objects as needed. When a path element holds both its
 * own value and descendants, the element's own value lives under the `_`
 * member of its object.
 *
 * Non-null objects found along the path are treated as tree nodes, so values
 * are expected to be primitives (e.g. serialized strings).
 *
 * @param root object to mutate
 * @param key array of key elements (as produced by splitKey)
 * @param value the value to store
 */
function recursiveSet(root, key, value) {
  var member = key[0];
  var existing = root[member];
  // typeof null is 'object', so null has to be excluded explicitly.
  var isNode = existing !== null && typeof existing === 'object';

  if (key.length > 1) {
    var child;
    if (isNode) {
      child = existing;
    } else if (existing === undefined || existing === null) {
      child = root[member] = {};
    } else {
      // A primitive already stored here (including falsy ones such as '' or
      // 0) becomes the node's own value rather than being discarded.
      child = root[member] = { _: existing };
    }

    recursiveSet(child, key.slice(1), value);
  } else if (isNode) {
    // This element has descendants; keep them and update only its own value.
    existing._ = value;
  } else {
    // Absent or primitive leaf: (over)write it. Assigning `_` on a primitive
    // would be a silent no-op (or a TypeError in strict mode).
    root[member] = value;
  }
}

/**
 * Removes the entry at path `key` from the nested object `root`, then prunes
 * any ancestor objects along that path that were left empty.
 *
 * If an intermediate path element is missing or is not an object, nothing is
 * removed.
 *
 * @param root object to mutate
 * @param key array of key elements
 */
function recursiveDelete(root, key) {
  var head = key[0];

  // Last path element: drop it (and anything nested beneath it).
  if (key.length <= 1) {
    delete root[head];
    return;
  }

  var subtree = root[head];
  if (typeof subtree !== 'object') {
    return;
  }

  recursiveDelete(subtree, key.slice(1));
  if ($.isEmptyObject(subtree)) {
    delete root[head];
  }
}

// Default-constructed member info, shared by every syncgroup create/join call
// made in this file.
var SG_MEMBER_INFO = new syncbase.nosql.SyncgroupMemberInfo();

// TODO(rosswang): generalize this
// If this is updated, the regex in escapeKeyElement needs updating too.
// Maps each character that must be escaped in a key element to the code
// character that follows the '_' escape prefix.
var ESC = {
  '_': '_',
  '.': 'd',
  '@': 'a',
  '/': 's',
  ':': 'c'
};

// Inverse of ESC: escape code character => original character.
var INV = Object.keys(ESC).reduce(function(inverse, ch) {
  inverse[ESC[ch]] = ch;
  return inverse;
}, {});

var SyncbaseWrapper = defineClass({
  statics: {
    start: function(context, mountName) {
      var service = syncbase.newService(mountName);
      var app = service.app('travel');
      var db = app.noSqlDatabase('db');

      return setUp(context, app, db).then(function() {
        return new SyncbaseWrapper(context, db, mountName);
      });
    },

    escapeKeyElement: function(str) {
      return str.replace(/_|\.|@|\/|:/g, function(m) {
        return '_' + ESC[m];
      });
    },

    unescapeKeyElement: function(str) {
      return str.replace(/_(.)/g, function(m, p1) {
        return INV[p1];
      });
    }
  },

  publics: {
    /**
     * @param fn a function executing the batch operations, receiving as its
     *  `this` context and first parameter the batch operation methods
     *  (put, delete), each of which returns a promise. The callback must return
     *  the overarching promise.
     */
    batch: function(fn){
      var self = this;
      var opts = new syncbase.nosql.BatchOptions();

      return this.manageWrite(this.runInBatch(this.context, this.db, opts,
        function(db, cb) {
          var t = db.table('t');
          var putToSyncbase = promisify(t.put.bind(t));
          var deleteFromSyncbase = promisify(t.deleteRange.bind(t));

          var ops = {
            put: function(k, v) {
              return self.standardPut(putToSyncbase, k, v);
            },
            delete: function(k) {
              return self.standardDelete(deleteFromSyncbase, k);
            }
          };

          var p = fn.call(ops, ops);
          if (p) {
            p.then(function(result) {
              return cb(null, result);
            }, function(err) {
              return cb(err);
            });
          } else {
            cb();
          }
        }));
    },

    /**
     * @param k array of key elements
     * @param v serialized value
     */
    put: function(k, v) {
      return this.manageWrite(this.standardPut(this.putToSyncbase, k, v));
    },

    delete: function(k) {
      return this.manageWrite(this.standardDelete(this.deleteFromSyncbase, k));
    },

    // TODO(rosswang): transitional

    getData: function() {
      return this.data;
    },

    /**
     * Since I/O is asynchronous, sparse, and fast, let's avoid concurrency/
     * merging with the local syncbase instance by only starting a refresh if
     * no writes are in progress and the refresh finishes before any new writes
     * have started. Client watch should help make this better. In any case if
     * this becomes starved, we can be smarter by being sensitive to keys being
     * updated at any given time.
     *
     * We can also get around this problem by restructuring the data flow to
     * be unidirectional with the local Syncbase as the authority, though that
     * introduces (hopefully negligible) latency and complicates forked response
     * on user input for the same data.
     *
     * @returns a void promise for this refresh
     */
    refresh: function() {
      var self = this;

      var current = this.pull.current;
      if (!current) {
        current = this.pull.current = this.pull().then(function(data) {
            self.pull.current = null;
            self.data = data;
            self.onUpdate(data);
            return data;
          }, function(err) {
            self.pull.current = null;
            throw err;
          });
      }

      return current;
    },

    /**
     * Reads rows of table 't' (optionally restricted to keys starting with
     * `prefix`) into a nested object built with recursiveSet.
     *
     * While any tracked writes are outstanding, the pull is deferred: it waits
     * on them and then tries again. Once started, the pull is abandoned and
     * retried if the dirty flag is raised (see manageWrite) before the query
     * completes.
     *
     * A verror.InternalError is logged and swallowed, in which case the
     * returned promise resolves with undefined; other errors reject.
     *
     * @param prefix optional array of key elements to restrict the query to
     * @return a promise resolving with the pulled data
     * @see refresh
     */
    pull: function(prefix) {
      var self = this;

      // Restarts the whole pull, including the wait-for-idle check.
      function repull() {
        return self.pull(prefix);
      }

      if (this.writes.size) {
        // Retry whether the outstanding writes succeeded or failed.
        return Promise.all(this.writes)
          .then(repull, repull);

      } else {
        // Any write started from here on sets this back to true.
        this.dirty = false;

        return new Promise(function(resolve, reject) {
          var newData = {};
          // Set once this pull has been abandoned in favor of a repull.
          var abort = false;

          // The first row emitted by the stream is skipped as a header.
          var isHeader = true;

          var query = 'select k, v from t';
          if (prefix) {
            query += ' where k like "' + joinKey(prefix) + '%"';
          }

          // The callback fires when the query finishes; rows arrive through
          // the 'data' events of the returned stream.
          self.db.exec(self.context, query, function(err) {
            if (err) {
              reject(err);
            } else if (abort) {
              //no-op; promise has already been resolved.
            } else if (self.dirty) {
              debug.log('Syncbase: aborting refresh due to writes');
              resolve(repull()); //try/wait for idle again
            } else {
              resolve(newData);
            }
          }).on('data', function(row) {
            if (isHeader) {
              isHeader = false;
              return;
            }

            if (abort) {
              //no-op
            } else if (self.dirty) {
              abort = true;
              resolve(repull()); //try/wait for idle again
              /* It would be nice to abort this stream for real, but we can't.
               * Leave this handler attached but no-oping to drain the stream.
               */
            } else {
              // row[0] is the joined key, row[1] the value.
              recursiveSet(newData, splitKey(row[0]), row[1]);
            }
          }).on('error', reject);
        }).catch(function(err) {
          // Internal errors are only logged; the promise then resolves with
          // undefined rather than rejecting.
          if (err instanceof verror.InternalError) {
            console.error(err);
          } else {
            throw err;
          }
        });
      }
    },

    // TODO(rosswang): end transitional

    /**
     * Builds a promise-based facade over the syncgroup identified by joining
     * `sgAdmin`, '%%sync', and `name`.
     *
     * The returned object offers buildSpec, create, destroy, join, getSpec,
     * setSpec, changeSpec, createOrJoin, and joinOrCreate. create, join, and
     * setSpec resolve with the facade itself so that calls can be chained;
     * getSpec resolves with { spec, version }.
     *
     * @param sgAdmin name of the syncgroup admin, used as the name prefix
     * @param name syncgroup name relative to the admin
     * @return the promisified syncgroup facade
     */
    syncgroup: function(sgAdmin, name) {
      var self = this;

      name = vanadium.naming.join(sgAdmin, '%%sync', name);
      var sg = this.db.syncgroup(name);

      //syncgroup-promisified
      var sgp;

      // Adapts a node-style callback so that success yields sgp instead of
      // whatever the underlying call produced.
      function chainable(cb) {
        return function(err) {
          cb(err, sgp);
        };
      }

      var create = promisify(function(spec, cb) {
        debug.log('Syncbase: create syncgroup ' + name);
        sg.create(self.context, spec, SG_MEMBER_INFO, chainable(cb));
      });

      var destroy = promisify(function(cb) {
        debug.log('Syncbase: destroy syncgroup ' + name);
        sg.destroy(self.context, cb);
      });

      var join = promisify(function(cb) {
        sg.join(self.context, SG_MEMBER_INFO, chainable(cb));
      });

      // Bundles the two success values into one object, since a promise can
      // only resolve with a single value.
      var getSpec = promisify(function(cb) {
        sg.getSpec(self.context, function(err, spec, version) {
          cb(err, {
            spec: spec,
            version: version
          });
        });
      });

      var setSpec = promisify(function(spec, version, cb) {
        sg.setSpec(self.context, spec, version, chainable(cb));
      });

      /* Be explicit about arg lists because promisify is sensitive to extra
       * args. i.e. even though destroy and join could just be fn refs, since
       * they're made by promisify, wrap them in a fn that actually takes 0
       * args. */
      sgp = {
        /**
         * Constructs a SyncgroupSpec over table 't'.
         *
         * @param prefixes array of keys (each an array of key elements)
         * @param mountTables passed through as the spec's mountTables
         * @param admin sole entry of the Admin and Debug permission lists
         * @param initialPermissions list used for Read, Write, and Resolve
         */
        buildSpec: function(prefixes, mountTables, admin, initialPermissions) {
          return new syncbase.nosql.SyncgroupSpec({
            perms: new Map([
              ['Admin', {in: [admin]}],
              ['Read', {in: initialPermissions}],
              ['Write', {in: initialPermissions}],
              ['Resolve', {in: initialPermissions}],
              ['Debug', {in: [admin]}]
            ]),
            prefixes: prefixes.map(function(p) {
              return new syncbase.nosql.TableRow({
                tableName: 't',
                row: joinKey(p)
              });
            }),
            mountTables: mountTables
          });
        },

        create: function(spec) { return create(spec); },
        destroy: function() { return destroy(); },
        join: function() { return join(); },
        getSpec: function() { return getSpec(); },
        setSpec: function(spec, version) { return setSpec(spec, version); },
        /**
         * Read-modify-write of the spec. `fn` receives the current spec and
         * may either mutate it in place or return a replacement. The whole
         * cycle is retried whenever setSpec fails with a VersionError.
         */
        changeSpec: function(fn) {
          return sgp.getSpec().then(function(versionedSpec) {
            var spec = versionedSpec.spec;
            return sgp.setSpec(fn(spec) || spec, versionedSpec.version)
              .catch(function(err) {
                if (err instanceof verror.VersionError) {
                  return sgp.changeSpec(fn);
                } else {
                  throw err;
                }
              });
          });
        },

        /** Creates the syncgroup, joining instead if it already exists. */
        createOrJoin: function(spec) {
          return sgp.create(spec)
            .catch(function(err) {
              if (err instanceof verror.ExistError) {
                debug.log('Syncbase: syncgroup ' + name + ' already exists.');
                return sgp.join();
              } else {
                throw err;
              }
            });
        },

        /**
         * Joins the syncgroup, falling back to createOrJoin on failure. With
         * the NoExistError check commented out, every join failure takes the
         * fallback path, not only a missing syncgroup.
         */
        joinOrCreate: function(spec) {
          return sgp.join()
            .catch(function(err) {
              //if (err instanceof verror.NoExistError) {
                debug.log('Syncbase: syncgroup ' + name + ' does not exist.');
                return sgp.createOrJoin(spec);
              /*} else {
                throw err;
              }*/
            });
        }
      };

      return sgp;
    },

    /**
     * @return {
     *    data,
     *    onChange*(key, ?value, continued),
     *    onUpdate*(data),
     *    onError*(err),
     *    onClose*(?err)
     *  }
     */
    getWatchedObject: function(prefix) {
      var result = {
        data: {}
      };

      var onChange = defineClass.event(result, 'onChange');
      var onUpdate = defineClass.event(result, 'onUpdate');
      var onError = defineClass.event(result, 'onError');
      var onClose = defineClass.event(result, 'onClose', 'memory');

      function put(k, v) {
        recursiveSet(result.data, k, v);
      }

      return this.getRawWatched(prefix, {
        onData: put
      }, {
        onPut: function(k, v, continued) {
          put(k, v);
          onChange(k, v, continued);
        },
        onDelete: function(k, continued) {
          recursiveDelete(result.data, k);
          onChange(k, null, continued);
        },
        onBatchEnd: function() {
          onUpdate(result.data);
        },
        onError: onError,
        onClose: onClose
      }).then(function() {
        return result;
      });
    },

    /**
     * Pulls data from Syncbase and registers watch handlers. Returns a promise
     * resolving after the initial pull.
     *
     * @param pullHandler { onData(key, value), onError(err) }
     * @param streamHandler {
     *    ?onPut(key, value, continued),
     *    ?onDelete(key, continued),
     *    ?onBatchEnd(),
     *    onError(err),
     *    onClose(?err)
     *  }; These are callbacks rather than events to guarantee that no updates
     *  are missed. `continued` indicates whether a change is followed by more
     *  changes in same batch.
     * @return a promise resolving after the initial pull completes. Watch
     *  callbacks may continue to be called until onClose.
     */
    getRawWatched: function(prefix, pullHandler, streamHandler) {
      var self = this;

      var resumeMarker;

      var opts = new syncbase.nosql.BatchOptions();
      return self.runInBatch(self.context, self.db, opts, function(db, cb) {
          Promise.all([
            self.pull2(db, prefix, pullHandler.onData, pullHandler.onError),
            promisify(db.getResumeMarker.bind(db))(self.context)
          ]).then(function(args) {
            resumeMarker = args[1];
            cb('abort');
          }, cb);
        }).catch(function(err) {
          if (err !== 'abort') {
            throw err;
          }

          var stream = self.db.watch(self.context, 't', joinKey(prefix),
            resumeMarker, streamHandler.onClose);
          stream.on('data', function(change) {
            try {
              switch(change.changeType) {
              case 'put':
                new Promise(function(resolve, reject) {
                  if (streamHandler.onPut) {
                    change.getValue(function(err, value) {
                      if (err) {
                        reject(err);
                      } else {
                        resolve(streamHandler.onPut(splitKey(change.rowName),
                          value, change.continued));
                      }
                    });
                  } else {
                    resolve();
                  }
                }).then(function() {
                  if (!change.continued && streamHandler.onBatchEnd) {
                    return streamHandler.onBatchEnd();
                  }
                }).catch(streamHandler.onError);
                break;
              case 'delete':
                new Promise(function(resolve, reject) {
                  if (streamHandler.onDelete) {
                    resolve(streamHandler.onDelete(splitKey(change.rowName),
                      change.continued));
                  } else {
                    resolve();
                  }
                }).then(function() {
                  if (!change.continued && streamHandler.onBatchEnd) {
                    return streamHandler.onBatchEnd();
                  }
                }).catch(streamHandler.onError);
                break;
              default:
                streamHandler.onError(
                  new Error('Invalid change type ' + change.changeType));
              }
            } catch(err) {
              streamHandler.onError(err);
            }
          }).on('error', streamHandler.onError);
        });
    },
  },

  privates: {
    /**
     * TODO(rosswang): transitional: pull2 => pull
     *
     * Handlers may continue to be called even after the promise has been
     * rejected.
     *
     * @param onData handler callback that takes a key, value pair.
     * @param onError handler callback that takes an error.
     * @return a promise that resolves when the pull is complete or rejects if
     *  the pull failed or either handler threw.
     */
    pull2: function(db, prefix, onData, onError) {
      var self = this;

      return new Promise(function(resolve, reject) {
        var isHeader = true;

        var query = 'select k, v from t';
        if (prefix) {
          query += ' where k like "' + joinKey(prefix) + '%"';
        }

        db.exec(self.context, query, function(err) {
          if (err) {
            reject(err);
          } else {
            resolve();
          }
        }).on('data', function(row) {
          if (isHeader) {
            isHeader = false;
            return;
          }

          try {
            onData(splitKey(row[0]), row[1]);
          } catch (err) {
            reject(err);
          }
        }).on('error', function(err) {
          if (!onError) {
            reject(err);
          } else {
            try {
              onError(err);
            } catch (err2) {
              reject(err2);
            }
          }
        });
      });
    },

    /* TODO(rosswang): Keep this around even though the dirty flag and write
     * records are not used since watch integration, because there is still a
     * potential race condition; I'm just not particularly sure how to deal with
     * it yet. If it turns out we don't have to worry about it, delete this. */
    manageWrite: function(promise) {
      var writes = this.writes;

      this.dirty = true;
      writes.add(promise);

      return promise.then(function(v) {
        writes.delete(promise);
        return v;
      }, function(err) {
        writes.delete(promise);
        throw err;
      });
    },

    standardPut: function(fn, k, v) {
      k = joinKey(k);
      return fn(this.context, k, v);
    },

    standardDelete: function(fn, k) {
      k = joinKey(k);
      debug.log('Syncbase: delete ' + k);
      return fn(this.context, syncbase.nosql.rowrange.prefix(k));
    }
  },

  // Names of instance members declared as constants; `mountName` is assigned
  // in init. NOTE(review): presumably defineClass exposes these read-only on
  // the public interface - confirm against define-class.
  constants: [ 'mountName' ],

  // TODO(rosswang): transitional
  // Events declared on the class; each value is an option string handed to
  // defineClass. NOTE(review): 'memory' presumably replays the last firing to
  // handlers added later (as with jQuery Callbacks flags) - confirm.
  events: {
    onError: 'memory',
    onUpdate: '',
  },
  // TODO(rosswang): end transitional

  init: function(context, db, mountName) {
    // TODO(rosswang): mountName probably won't be necessary after syncgroup
    // admin instances are hosted (see group-manager).
    var self = this;
    this.context = context;
    this.db = db;
    this.t = db.table('t');
    this.mountName = mountName;

    this.writes = new Set();

    this.runInBatch = promisify(syncbase.nosql.runInBatch);
    this.putToSyncbase = promisify(this.t.put.bind(this.t));
    this.deleteFromSyncbase = promisify(this.t.deleteRange.bind(this.t));

    // TODO(rosswang): transitional
    function watchLoop() {
      if (!self.pull.current) {
        self.refresh().catch(self.onError);
      }
      setTimeout(watchLoop, 500);
    }
    process.nextTick(watchLoop);
    // TODO(rosswang): end transitional
  }
});

module.exports = SyncbaseWrapper;
