Source: indexer/indexer.js

  1. /*!
  2. * indexer.js - abstract interface for bcoin indexers
  3. * Copyright (c) 2018, the bcoin developers (MIT License).
  4. * https://github.com/bcoin-org/bcoin
  5. */
  6. 'use strict';
  7. const assert = require('assert');
  8. const path = require('path');
  9. const fs = require('bfile');
  10. const bio = require('bufio');
  11. const EventEmitter = require('events');
  12. const Logger = require('blgr');
  13. const Network = require('../protocol/network');
  14. const util = require('../utils/util');
  15. const layout = require('./layout');
  16. const CoinView = require('../coins/coinview');
  17. const Block = require('../primitives/block');
  18. const {ZERO_HASH} = require('../protocol/consensus');
  19. /**
  20. * Indexer
  21. * The class which indexers inherit from and implement the
  22. * `indexBlock` and `unindexBlock` methods and database
  23. * and storage initialization for indexing blocks.
  24. * @alias module:indexer.Indexer
  25. * @extends EventEmitter
  26. * @abstract
  27. */
  28. class Indexer extends EventEmitter {
  29. /**
  30. * Create an indexer.
  31. * @constructor
  32. * @param {String} module
  33. * @param {Object} options
  34. */
  35. constructor(module, options) {
  36. super();
  37. assert(typeof module === 'string');
  38. assert(module.length > 0);
  39. this.options = new IndexOptions(module, options);
  40. this.network = this.options.network;
  41. this.logger = this.options.logger.context(`${module}indexer`);
  42. this.blocks = this.options.blocks;
  43. this.chain = this.options.chain;
  44. this.closing = false;
  45. this.db = null;
  46. this.batch = null;
  47. this.bound = [];
  48. this.syncing = false;
  49. this.height = 0;
  50. }
  51. /**
  52. * Start a new batch write.
  53. * @returns {Batch}
  54. */
  55. start() {
  56. assert(this.batch === null, 'Already started.');
  57. this.batch = this.db.batch();
  58. return this.batch;
  59. }
  60. /**
  61. * Put key and value to the current batch.
  62. * @param {String} key
  63. * @param {Buffer} value
  64. */
  65. put(key, value) {
  66. this.batch.put(key, value);
  67. }
  68. /**
  69. * Delete key from the current batch.
  70. * @param {String} key
  71. */
  72. del(key) {
  73. this.batch.del(key);
  74. }
  75. /**
  76. * Commit the current batch.
  77. * @returns {Promise}
  78. */
  79. async commit() {
  80. await this.batch.write();
  81. this.batch = null;
  82. }
  83. /**
  84. * Open the indexer, open the database,
  85. * initialize height, and bind to events.
  86. * @returns {Promise}
  87. */
  88. async open() {
  89. this.logger.info('Indexer is loading.');
  90. this.closing = false;
  91. await this.ensure();
  92. await this.db.open();
  93. await this.db.verify(layout.V.encode(), 'index', 0);
  94. await this.verifyNetwork();
  95. // Initialize the indexed height.
  96. const data = await this.db.get(layout.R.encode());
  97. if (data)
  98. this.height = bio.readU32(data, 0);
  99. else
  100. await this.saveGenesis();
  101. // Bind to chain events.
  102. this.bind();
  103. }
  104. /**
  105. * Close the indexer, wait for the database to close,
  106. * unbind all events.
  107. * @returns {Promise}
  108. */
  109. async close() {
  110. this.closing = true;
  111. await this.db.close();
  112. for (const [event, listener] of this.bound)
  113. this.chain.removeListener(event, listener);
  114. this.bound.length = 0;
  115. this.closing = false;
  116. }
  117. /**
  118. * Ensure prefix directory (prefix/index).
  119. * @returns {Promise}
  120. */
  121. async ensure() {
  122. if (fs.unsupported)
  123. return;
  124. if (this.options.memory)
  125. return;
  126. await fs.mkdirp(this.options.prefix);
  127. }
  128. /**
  129. * Verify network of index.
  130. * @returns {Promise}
  131. */
  132. async verifyNetwork() {
  133. let raw = await this.db.get(layout.O.encode());
  134. if (!raw) {
  135. raw = bio.write(4).writeU32(this.network.magic).render();
  136. await this.db.put(layout.O.encode(), raw);
  137. return;
  138. }
  139. const magic = bio.readU32(raw, 0);
  140. if (magic !== this.network.magic)
  141. throw new Error('Indexer: Network mismatch.');
  142. }
  143. /**
  144. * A special case for indexing the genesis block. The genesis
  145. * block coins are not spendable, however indexers can still index
  146. * the block for historical and informational purposes.
  147. * @private
  148. * @returns {Promise}
  149. */
  150. async saveGenesis() {
  151. this.start();
  152. const block = Block.fromRaw(Buffer.from(this.network.genesisBlock, 'hex'));
  153. const meta = new BlockMeta(block.hash(), 0);
  154. await this.indexBlock(meta, block, new CoinView());
  155. await this._setTip(meta);
  156. await this.commit();
  157. this.height = 0;
  158. }
  159. /**
  160. * Bind to chain events and save listeners for removal on close
  161. * @private
  162. */
  163. bind() {
  164. const listener = async (entry, block, view) => {
  165. const meta = new BlockMeta(entry.hash, entry.height);
  166. try {
  167. await this.sync(meta, block, view);
  168. } catch (e) {
  169. this.emit('error', e);
  170. }
  171. };
  172. for (const event of ['connect', 'disconnect', 'reset']) {
  173. this.bound.push([event, listener]);
  174. this.chain.on(event, listener);
  175. }
  176. }
  177. /**
  178. * Get a chain entry for the main chain only.
  179. * @private
  180. * @returns {Promise}
  181. */
  182. async getEntry(hash) {
  183. const entry = await this.chain.getEntry(hash);
  184. if (!entry)
  185. return null;
  186. if (!await this.chain.isMainChain(entry))
  187. return null;
  188. return entry;
  189. }
  190. /**
  191. * Get a index block meta.
  192. * @param {Hash} hash
  193. * @returns {Promise}
  194. */
  195. async getBlockMeta(height) {
  196. const data = await this.db.get(layout.h.encode(height));
  197. if (!data)
  198. return null;
  199. return new BlockMeta(data, height);
  200. }
  201. /**
  202. * Sync with the chain.
  203. * @param {BlockMeta} meta
  204. * @param {Block} block
  205. * @param {CoinView} view
  206. * @returns {Promise}
  207. */
  208. async sync(meta, block, view) {
  209. if (this.syncing)
  210. return;
  211. this.syncing = true;
  212. const connected = await this._syncBlock(meta, block, view);
  213. if (connected) {
  214. this.syncing = false;
  215. } else {
  216. (async () => {
  217. try {
  218. await this._syncChain();
  219. } catch (e) {
  220. this.emit('error', e);
  221. } finally {
  222. this.syncing = false;
  223. }
  224. })();
  225. }
  226. }
  227. /**
  228. * Sync with the chain with a block.
  229. * @private
  230. * @param {BlockMeta} meta
  231. * @param {Block} block
  232. * @param {CoinView} view
  233. * @returns {Promise}
  234. */
  235. async _syncBlock(meta, block, view) {
  236. // In the case that the next block is being
  237. // connected or the current block disconnected
  238. // use the block and view being passed directly,
  239. // instead of reading that information again.
  240. if (meta && block && view) {
  241. if (meta.height === this.height + 1) {
  242. // Make sure that the block is connected to
  243. // the indexer chain.
  244. const prev = await this.getBlockMeta(this.height);
  245. if (prev.hash.compare(block.prevBlock) !== 0)
  246. return false;
  247. await this._addBlock(meta, block, view);
  248. return true;
  249. } else if (meta.height === this.height) {
  250. // Make sure that this is the current block.
  251. const current = await this.getBlockMeta(this.height);
  252. if (current.hash.compare(block.hash()) !== 0)
  253. return false;
  254. await this._removeBlock(meta, block, view);
  255. return true;
  256. }
  257. }
  258. return false;
  259. }
  260. /**
  261. * Sync with the chain.
  262. * @private
  263. * @returns {Promise}
  264. */
  265. async _syncChain() {
  266. let height = this.height;
  267. // In the case that the indexer has never
  268. // started, sync to the best height.
  269. if (!height) {
  270. await this._rollforward();
  271. return;
  272. }
  273. // Check for a re-org that might
  274. // leave chain in a different state.
  275. // Scan chain backwards until we
  276. // find a common height.
  277. while (height > 0) {
  278. const meta = await this.getBlockMeta(height);
  279. assert(meta);
  280. if (await this.getEntry(meta.hash))
  281. break;
  282. height -= 1;
  283. }
  284. if (height < this.height) {
  285. await this._rollback(height);
  286. await this._rollforward();
  287. } else {
  288. await this._rollforward();
  289. }
  290. }
  291. /**
  292. * Scan blockchain to the best chain height.
  293. * @private
  294. * @returns {Promise}
  295. */
  296. async _rollforward() {
  297. this.logger.info('Indexing to best height from height (%d).', this.height);
  298. for (let height = this.height + 1; ; height++) {
  299. const entry = await this.getEntry(height);
  300. if (!entry)
  301. break;
  302. const meta = new BlockMeta(entry.hash, height);
  303. const block = await this.chain.getBlock(entry.hash);
  304. assert(block);
  305. const view = await this.chain.getBlockView(block);
  306. assert(view);
  307. if (this.closing)
  308. return;
  309. await this._addBlock(meta, block, view);
  310. }
  311. }
  312. /**
  313. * Rollback to a given chain height.
  314. * @param {Number} height
  315. * @returns {Promise}
  316. */
  317. async _rollback(height) {
  318. if (height > this.height) {
  319. this.logger.warning(
  320. 'Ignoring rollback to future height (%d).',
  321. height);
  322. return;
  323. }
  324. this.logger.info('Rolling back to height %d.', height);
  325. while (this.height > height && this.height > 1) {
  326. const meta = await this.getBlockMeta(this.height);
  327. assert(meta);
  328. const block = await this.chain.getBlock(meta.hash);
  329. assert(block);
  330. const view = await this.chain.getBlockView(block);
  331. assert(view);
  332. await this._removeBlock(meta, block, view);
  333. }
  334. }
  335. /**
  336. * Add a block's transactions without a lock.
  337. * @private
  338. * @param {BlockMeta} meta
  339. * @param {Block} block
  340. * @param {CoinView} view
  341. * @returns {Promise}
  342. */
  343. async _addBlock(meta, block, view) {
  344. const start = util.bench();
  345. if (meta.height !== this.height + 1)
  346. throw new Error('Indexer: Can not add block.');
  347. // Start the batch write.
  348. this.start();
  349. // Call the implemented indexer to add to
  350. // the batch write.
  351. await this.indexBlock(meta, block, view);
  352. // Sync the height to the new tip.
  353. const height = await this._setTip(meta);
  354. // Commit the write batch to disk.
  355. await this.commit();
  356. // Update height _after_ successful commit.
  357. this.height = height;
  358. // Log the current indexer status.
  359. this.logStatus(start, block, meta);
  360. }
  361. /**
  362. * Process block indexing
  363. * Indexers will implement this method to process the block for indexing
  364. * @param {BlockMeta} meta
  365. * @param {Block} block
  366. * @param {CoinView} view
  367. * @returns {Promise}
  368. */
  369. async indexBlock(meta, block, view) {
  370. ;
  371. }
  372. /**
  373. * Undo block indexing
  374. * Indexers will implement this method to undo indexing for the block
  375. * @param {BlockMeta} meta
  376. * @param {Block} block
  377. * @param {CoinView} view
  378. * @returns {Promise}
  379. */
  380. async unindexBlock(meta, block, view) {
  381. ;
  382. }
  383. /**
  384. * Prune block indexing
  385. * Indexers will implement this method to prune indexing for the block
  386. * @param {BlockMeta} meta
  387. * @param {Block} block
  388. * @param {CoinView} view
  389. * @returns {Promise}
  390. */
  391. async pruneBlock(meta, block, view) {
  392. ;
  393. }
  394. /**
  395. * Unconfirm a block's transactions.
  396. * @private
  397. * @param {BlockMeta} meta
  398. * @param {Block} block
  399. * @param {CoinView} view
  400. * @returns {Promise}
  401. */
  402. async _removeBlock(meta, block, view) {
  403. const start = util.bench();
  404. if (meta.height !== this.height)
  405. throw new Error('Indexer: Can not remove block.');
  406. // Start the batch write.
  407. this.start();
  408. // Call the implemented indexer to add to
  409. // the batch write.
  410. await this.unindexBlock(meta, block, view);
  411. const prev = await this.getBlockMeta(meta.height - 1);
  412. assert(prev);
  413. // Sync the height to the previous tip.
  414. const height = await this._setTip(prev);
  415. // Commit the write batch to disk.
  416. await this.commit();
  417. // Prune block data _after_ successful commit.
  418. await this.pruneBlock(meta);
  419. // Update height _after_ successful commit.
  420. this.height = height;
  421. // Log the current indexer status.
  422. this.logStatus(start, block, meta, true);
  423. }
  424. /**
  425. * Update the current height to tip.
  426. * @param {BlockMeta} tip
  427. * @returns {Promise}
  428. */
  429. async _setTip(meta) {
  430. if (meta.height < this.height) {
  431. assert(meta.height === this.height - 1);
  432. this.del(layout.h.encode(this.height));
  433. } else if (meta.height > this.height) {
  434. assert(meta.height === this.height + 1);
  435. }
  436. // Add to batch write to save tip and height.
  437. this.put(layout.h.encode(meta.height), meta.hash);
  438. const raw = bio.write(4).writeU32(meta.height).render();
  439. this.put(layout.R.encode(), raw);
  440. return meta.height;
  441. }
  442. /**
  443. * Test whether the indexer has reached its slow height.
  444. * @private
  445. * @returns {Boolean}
  446. */
  447. isSlow() {
  448. if (this.height === 1 || this.height % 20 === 0)
  449. return true;
  450. if (this.height >= this.network.block.slowHeight)
  451. return true;
  452. return false;
  453. }
  454. /**
  455. * Log the current indexer status.
  456. * @private
  457. * @param {Array} start
  458. * @param {Block} block
  459. * @param {BlockMeta} meta
  460. * @param {Boolean} reverse
  461. */
  462. logStatus(start, block, meta, reverse) {
  463. if (!this.isSlow())
  464. return;
  465. const elapsed = util.bench(start);
  466. const msg = reverse ? 'removed from' : 'added to';
  467. this.logger.info(
  468. 'Block (%d) %s indexer (txs=%d time=%d).',
  469. meta.height,
  470. msg,
  471. block.txs.length,
  472. elapsed);
  473. }
  474. }
  475. /**
  476. * Block Meta
  477. */
  478. class BlockMeta {
  479. constructor(hash, height) {
  480. this.hash = hash || ZERO_HASH;
  481. this.height = height || 0;
  482. assert(Buffer.isBuffer(this.hash) && this.hash.length === 32);
  483. assert(Number.isInteger(this.height));
  484. }
  485. }
  486. /**
  487. * Index Options
  488. */
  489. class IndexOptions {
  490. /**
  491. * Create index options.
  492. * @constructor
  493. * @param {String} module
  494. * @param {Object} options
  495. */
  496. constructor(module, options) {
  497. this.module = module;
  498. this.network = Network.primary;
  499. this.logger = Logger.global;
  500. this.blocks = null;
  501. this.chain = null;
  502. this.prefix = null;
  503. this.location = null;
  504. this.memory = true;
  505. this.maxFiles = 64;
  506. this.cacheSize = 16 << 20;
  507. this.compression = true;
  508. if (options)
  509. this.fromOptions(options);
  510. }
  511. /**
  512. * Inject properties from object.
  513. * @private
  514. * @param {Object} options
  515. * @returns {IndexOptions}
  516. */
  517. fromOptions(options) {
  518. assert(options.blocks && typeof options.blocks === 'object',
  519. 'Indexer requires a blockstore.');
  520. assert(options.chain && typeof options.chain === 'object',
  521. 'Indexer requires chain.');
  522. assert(!options.prune, 'Can not index while pruned.');
  523. this.blocks = options.blocks;
  524. this.chain = options.chain;
  525. if (options.network != null)
  526. this.network = Network.get(options.network);
  527. if (options.logger != null) {
  528. assert(typeof options.logger === 'object');
  529. this.logger = options.logger;
  530. }
  531. if (options.prefix != null) {
  532. assert(typeof options.prefix === 'string');
  533. this.prefix = options.prefix;
  534. this.prefix = path.join(this.prefix, 'index');
  535. this.location = path.join(this.prefix, this.module);
  536. }
  537. if (options.location != null) {
  538. assert(typeof options.location === 'string');
  539. this.location = options.location;
  540. }
  541. if (options.memory != null) {
  542. assert(typeof options.memory === 'boolean');
  543. this.memory = options.memory;
  544. }
  545. if (options.maxFiles != null) {
  546. assert((options.maxFiles >>> 0) === options.maxFiles);
  547. this.maxFiles = options.maxFiles;
  548. }
  549. if (options.cacheSize != null) {
  550. assert(Number.isSafeInteger(options.cacheSize) && options.cacheSize >= 0);
  551. this.cacheSize = options.cacheSize;
  552. }
  553. if (options.compression != null) {
  554. assert(typeof options.compression === 'boolean');
  555. this.compression = options.compression;
  556. }
  557. return this;
  558. }
  559. /**
  560. * Instantiate indexer options from object.
  561. * @param {Object} options
  562. * @returns {IndexOptions}
  563. */
  564. static fromOptions(options) {
  565. return new this().fromOptions(options);
  566. }
  567. }
  568. /*
  569. * Expose
  570. */
  571. module.exports = Indexer;