Source: indexer/indexer.js

/*!
 * indexer.js - abstract interface for bcoin indexers
 * Copyright (c) 2018, the bcoin developers (MIT License).
 * https://github.com/bcoin-org/bcoin
 */

'use strict';

const assert = require('assert');
const path = require('path');
const fs = require('bfile');
const bio = require('bufio');
const EventEmitter = require('events');
const Logger = require('blgr');
const Network = require('../protocol/network');
const util = require('../utils/util');
const layout = require('./layout');
const CoinView = require('../coins/coinview');
const Block = require('../primitives/block');
const {ZERO_HASH} = require('../protocol/consensus');

/**
 * Indexer
 * The class which indexers inherit from and implement the
 * `indexBlock` and `unindexBlock` methods and database
 * and storage initialization for indexing blocks.
 * @alias module:indexer.Indexer
 * @extends EventEmitter
 * @abstract
 */

class Indexer extends EventEmitter {
  /**
   * Create an indexer.
   * @constructor
   * @param {String} module
   * @param {Object} options
   */

  constructor(module, options) {
    super();

    assert(typeof module === 'string');
    assert(module.length > 0);

    this.options = new IndexOptions(module, options);

    this.network = this.options.network;
    this.logger = this.options.logger.context(`${module}indexer`);
    this.blocks = this.options.blocks;
    this.chain = this.options.chain;

    this.closing = false;
    this.db = null;
    this.batch = null;
    this.bound = [];
    this.syncing = false;
    this.height = 0;
  }

  /**
   * Start a new batch write.
   * @returns {Batch}
   */

  start() {
    assert(this.batch === null, 'Already started.');
    this.batch = this.db.batch();
    return this.batch;
  }

  /**
   * Put key and value to the current batch.
   * @param {String} key
   * @param {Buffer} value
   */

  put(key, value) {
    this.batch.put(key, value);
  }

  /**
   * Delete key from the current batch.
   * @param {String} key
   */

  del(key) {
    this.batch.del(key);
  }

  /**
   * Commit the current batch.
   * @returns {Promise}
   */

  async commit() {
    await this.batch.write();
    this.batch = null;
  }

  /**
   * Open the indexer, open the database,
   * initialize height, and bind to events.
   * @returns {Promise}
   */

  async open() {
    this.logger.info('Indexer is loading.');

    this.closing = false;
    await this.ensure();
    await this.db.open();
    await this.db.verify(layout.V.encode(), 'index', 0);
    await this.verifyNetwork();

    // Initialize the indexed height.
    const data = await this.db.get(layout.R.encode());
    if (data)
      this.height = bio.readU32(data, 0);
    else
      await this.saveGenesis();

    // Bind to chain events.
    this.bind();
  }

  /**
   * Close the indexer, wait for the database to close,
   * unbind all events.
   * @returns {Promise}
   */

  async close() {
    this.closing = true;
    await this.db.close();
    for (const [event, listener] of this.bound)
      this.chain.removeListener(event, listener);

    this.bound.length = 0;
    this.closing = false;
  }

  /**
   * Ensure prefix directory (prefix/index).
   * @returns {Promise}
   */

  async ensure() {
    if (fs.unsupported)
      return;

    if (this.options.memory)
      return;

    await fs.mkdirp(this.options.prefix);
  }

  /**
   * Verify network of index.
   * @returns {Promise}
   */

  async verifyNetwork() {
    let raw = await this.db.get(layout.O.encode());

    if (!raw) {
      raw = bio.write(4).writeU32(this.network.magic).render();
      await this.db.put(layout.O.encode(), raw);
      return;
    }

    const magic = bio.readU32(raw, 0);

    if (magic !== this.network.magic)
      throw new Error('Indexer: Network mismatch.');
  }

  /**
   * A special case for indexing the genesis block. The genesis
   * block coins are not spendable, however indexers can still index
   * the block for historical and informational purposes.
   * @private
   * @returns {Promise}
   */

  async saveGenesis() {
    this.start();

    const block = Block.fromRaw(Buffer.from(this.network.genesisBlock, 'hex'));
    const meta = new BlockMeta(block.hash(), 0);

    await this.indexBlock(meta, block, new CoinView());
    await this._setTip(meta);
    await this.commit();

    this.height = 0;
  }

  /**
   * Bind to chain events and save listeners for removal on close
   * @private
   */

  bind() {
    const listener = async (entry, block, view) => {
      const meta = new BlockMeta(entry.hash, entry.height);

      try {
        await this.sync(meta, block, view);
      } catch (e) {
        this.emit('error', e);
      }
    };

    for (const event of ['connect', 'disconnect', 'reset']) {
      this.bound.push([event, listener]);
      this.chain.on(event, listener);
    }
  }

  /**
   * Get a chain entry for the main chain only.
   * @private
   * @returns {Promise}
   */

  async getEntry(hash) {
    const entry = await this.chain.getEntry(hash);

    if (!entry)
      return null;

    if (!await this.chain.isMainChain(entry))
      return null;

    return entry;
  }

  /**
   * Get a index block meta.
   * @param {Hash} hash
   * @returns {Promise}
   */

  async getBlockMeta(height) {
    const data = await this.db.get(layout.h.encode(height));

    if (!data)
      return null;

    return new BlockMeta(data, height);
  }

  /**
   * Sync with the chain.
   * @param {BlockMeta} meta
   * @param {Block} block
   * @param {CoinView} view
   * @returns {Promise}
   */

  async sync(meta, block, view) {
    if (this.syncing)
      return;

    this.syncing = true;

    const connected = await this._syncBlock(meta, block, view);

    if (connected) {
      this.syncing = false;
    } else {
      (async () => {
        try {
          await this._syncChain();
        } catch (e) {
          this.emit('error', e);
        } finally {
          this.syncing = false;
        }
      })();
    }
  }

  /**
   * Sync with the chain with a block.
   * @private
   * @param {BlockMeta} meta
   * @param {Block} block
   * @param {CoinView} view
   * @returns {Promise}
   */

  async _syncBlock(meta, block, view) {
    // In the case that the next block is being
    // connected or the current block disconnected
    // use the block and view being passed directly,
    // instead of reading that information again.
    if (meta && block && view) {
      if (meta.height === this.height + 1) {
        // Make sure that the block is connected to
        // the indexer chain.
        const prev = await this.getBlockMeta(this.height);
        if (prev.hash.compare(block.prevBlock) !== 0)
          return false;

        await this._addBlock(meta, block, view);
        return true;
      } else if (meta.height === this.height) {
        // Make sure that this is the current block.
        const current = await this.getBlockMeta(this.height);
        if (current.hash.compare(block.hash()) !== 0)
          return false;

        await this._removeBlock(meta, block, view);
        return true;
      }
    }
    return false;
  }

  /**
   * Sync with the chain.
   * @private
   * @returns {Promise}
   */

  async _syncChain() {
    let height = this.height;

    // In the case that the indexer has never
    // started, sync to the best height.
    if (!height) {
      await this._rollforward();
      return;
    }

    // Check for a re-org that might
    // leave chain in a different state.
    // Scan chain backwards until we
    // find a common height.
    while (height > 0) {
      const meta = await this.getBlockMeta(height);
      assert(meta);

      if (await this.getEntry(meta.hash))
        break;

      height -= 1;
    }

    if (height < this.height) {
      await this._rollback(height);
      await this._rollforward();
    } else {
      await this._rollforward();
    }
  }

  /**
   * Scan blockchain to the best chain height.
   * @private
   * @returns {Promise}
   */

  async _rollforward() {
    this.logger.info('Indexing to best height from height (%d).', this.height);

    for (let height = this.height + 1; ; height++) {
      const entry = await this.getEntry(height);
      if (!entry)
        break;

      const meta = new BlockMeta(entry.hash, height);

      const block = await this.chain.getBlock(entry.hash);
      assert(block);

      const view = await this.chain.getBlockView(block);
      assert(view);

      if (this.closing)
        return;

      await this._addBlock(meta, block, view);
    }
  }

  /**
   * Rollback to a given chain height.
   * @param {Number} height
   * @returns {Promise}
   */

  async _rollback(height) {
    if (height > this.height) {
      this.logger.warning(
        'Ignoring rollback to future height (%d).',
        height);
      return;
    }

    this.logger.info('Rolling back to height %d.', height);

    while (this.height > height && this.height > 1) {
      const meta = await this.getBlockMeta(this.height);
      assert(meta);

      const block = await this.chain.getBlock(meta.hash);
      assert(block);

      const view = await this.chain.getBlockView(block);
      assert(view);

      await this._removeBlock(meta, block, view);
    }
  }

  /**
   * Add a block's transactions without a lock.
   * @private
   * @param {BlockMeta} meta
   * @param {Block} block
   * @param {CoinView} view
   * @returns {Promise}
   */

  async _addBlock(meta, block, view) {
    const start = util.bench();

    if (meta.height !== this.height + 1)
      throw new Error('Indexer: Can not add block.');

    // Start the batch write.
    this.start();

    // Call the implemented indexer to add to
    // the batch write.
    await this.indexBlock(meta, block, view);

    // Sync the height to the new tip.
    const height = await this._setTip(meta);

    // Commit the write batch to disk.
    await this.commit();

    // Update height _after_ successful commit.
    this.height = height;

    // Log the current indexer status.
    this.logStatus(start, block, meta);
  }

  /**
   * Process block indexing
   * Indexers will implement this method to process the block for indexing
   * @param {BlockMeta} meta
   * @param {Block} block
   * @param {CoinView} view
   * @returns {Promise}
   */

  async indexBlock(meta, block, view) {
    ;
  }

  /**
   * Undo block indexing
   * Indexers will implement this method to undo indexing for the block
   * @param {BlockMeta} meta
   * @param {Block} block
   * @param {CoinView} view
   * @returns {Promise}
   */

  async unindexBlock(meta, block, view) {
    ;
  }

  /**
   * Prune block indexing
   * Indexers will implement this method to prune indexing for the block
   * @param {BlockMeta} meta
   * @param {Block} block
   * @param {CoinView} view
   * @returns {Promise}
   */

  async pruneBlock(meta, block, view) {
    ;
  }

  /**
   * Unconfirm a block's transactions.
   * @private
   * @param {BlockMeta} meta
   * @param {Block} block
   * @param {CoinView} view
   * @returns {Promise}
   */

  async _removeBlock(meta, block, view) {
    const start = util.bench();

    if (meta.height !== this.height)
      throw new Error('Indexer: Can not remove block.');

    // Start the batch write.
    this.start();

    // Call the implemented indexer to add to
    // the batch write.
    await this.unindexBlock(meta, block, view);

    const prev = await this.getBlockMeta(meta.height - 1);
    assert(prev);

    // Sync the height to the previous tip.
    const height = await this._setTip(prev);

    // Commit the write batch to disk.
    await this.commit();

    // Prune block data _after_ successful commit.
    await this.pruneBlock(meta);

    // Update height _after_ successful commit.
    this.height = height;

    // Log the current indexer status.
    this.logStatus(start, block, meta, true);
  }

  /**
   * Update the current height to tip.
   * @param {BlockMeta} tip
   * @returns {Promise}
   */

  async _setTip(meta) {
    if (meta.height < this.height) {
      assert(meta.height === this.height - 1);
      this.del(layout.h.encode(this.height));
    } else if (meta.height > this.height) {
      assert(meta.height === this.height + 1);
    }

    // Add to batch write to save tip and height.
    this.put(layout.h.encode(meta.height), meta.hash);

    const raw = bio.write(4).writeU32(meta.height).render();
    this.put(layout.R.encode(), raw);

    return meta.height;
  }

  /**
   * Test whether the indexer has reached its slow height.
   * @private
   * @returns {Boolean}
   */

  isSlow() {
    if (this.height === 1 || this.height % 20 === 0)
      return true;

    if (this.height >= this.network.block.slowHeight)
      return true;

    return false;
  }

  /**
   * Log the current indexer status.
   * @private
   * @param {Array} start
   * @param {Block} block
   * @param {BlockMeta} meta
   * @param {Boolean} reverse
   */

  logStatus(start, block, meta, reverse) {
    if (!this.isSlow())
      return;

    const elapsed = util.bench(start);

    const msg = reverse ? 'removed from' : 'added to';

    this.logger.info(
      'Block (%d) %s indexer (txs=%d time=%d).',
      meta.height,
      msg,
      block.txs.length,
      elapsed);
  }
}

/**
 * Block Meta
 */

class BlockMeta {
  constructor(hash, height) {
    this.hash = hash || ZERO_HASH;
    this.height =  height || 0;

    assert(Buffer.isBuffer(this.hash) && this.hash.length === 32);
    assert(Number.isInteger(this.height));
  }
}

/**
 * Index Options
 */

class IndexOptions {
  /**
   * Create index options.
   * @constructor
   * @param {String} module
   * @param {Object} options
   */

  constructor(module, options) {
    this.module = module;
    this.network = Network.primary;
    this.logger = Logger.global;
    this.blocks = null;
    this.chain = null;

    this.prefix = null;
    this.location = null;
    this.memory = true;
    this.maxFiles = 64;
    this.cacheSize = 16 << 20;
    this.compression = true;

    if (options)
      this.fromOptions(options);
  }

  /**
   * Inject properties from object.
   * @private
   * @param {Object} options
   * @returns {IndexOptions}
   */

  fromOptions(options) {
    assert(options.blocks && typeof options.blocks === 'object',
           'Indexer requires a blockstore.');
    assert(options.chain && typeof options.chain === 'object',
           'Indexer requires chain.');
    assert(!options.prune, 'Can not index while pruned.');

    this.blocks = options.blocks;
    this.chain = options.chain;

    if (options.network != null)
      this.network = Network.get(options.network);

    if (options.logger != null) {
      assert(typeof options.logger === 'object');
      this.logger = options.logger;
    }

    if (options.prefix != null) {
      assert(typeof options.prefix === 'string');
      this.prefix = options.prefix;
      this.prefix = path.join(this.prefix, 'index');
      this.location = path.join(this.prefix, this.module);
    }

    if (options.location != null) {
      assert(typeof options.location === 'string');
      this.location = options.location;
    }

    if (options.memory != null) {
      assert(typeof options.memory === 'boolean');
      this.memory = options.memory;
    }

    if (options.maxFiles != null) {
      assert((options.maxFiles >>> 0) === options.maxFiles);
      this.maxFiles = options.maxFiles;
    }

    if (options.cacheSize != null) {
      assert(Number.isSafeInteger(options.cacheSize) && options.cacheSize >= 0);
      this.cacheSize = options.cacheSize;
    }

    if (options.compression != null) {
      assert(typeof options.compression === 'boolean');
      this.compression = options.compression;
    }

    return this;
  }

  /**
   * Instantiate indexer options from object.
   * @param {Object} options
   * @returns {IndexOptions}
   */

  static fromOptions(options) {
    return new this().fromOptions(options);
  }
}

/*
 * Expose
 */

module.exports = Indexer;