/*!
* mempool.js - mempool for bcoin
* Copyright (c) 2014-2017, Christopher Jeffrey (MIT License).
* https://github.com/bcoin-org/bcoin
*/
'use strict';
const assert = require('bsert');
const path = require('path');
const EventEmitter = require('events');
const bdb = require('bdb');
const {RollingFilter} = require('bfilter');
const Heap = require('bheep');
const {BufferMap, BufferSet} = require('buffer-map');
const common = require('../blockchain/common');
const consensus = require('../protocol/consensus');
const policy = require('../protocol/policy');
const util = require('../utils/util');
const random = require('bcrypto/lib/random');
const {VerifyError} = require('../protocol/errors');
const Script = require('../script/script');
const Outpoint = require('../primitives/outpoint');
const TX = require('../primitives/tx');
const Coin = require('../primitives/coin');
const TXMeta = require('../primitives/txmeta');
const MempoolEntry = require('./mempoolentry');
const Network = require('../protocol/network');
const layout = require('./layout');
const AddrIndexer = require('./addrindexer');
const Fees = require('./fees');
const CoinView = require('../coins/coinview');
/**
* Mempool
* Represents a mempool.
* @extends EventEmitter
* @alias module:mempool.Mempool
*/
class Mempool extends EventEmitter {
/**
* Create a mempool.
* @constructor
* @param {Object} options
*/
constructor(options) {
super();
this.opened = false;
this.options = new MempoolOptions(options);
this.network = this.options.network;
this.logger = this.options.logger.context('mempool');
this.workers = this.options.workers;
this.chain = this.options.chain;
this.fees = this.options.fees;
this.locker = this.chain.locker;
this.cache = new MempoolCache(this.options);
this.size = 0;
this.freeCount = 0;
this.lastTime = 0;
this.lastFlush = 0;
this.tip = this.network.genesis.hash;
this.waiting = new BufferMap();
this.orphans = new BufferMap();
this.map = new BufferMap();
this.spents = new BufferMap();
this.rejects = new RollingFilter(120000, 0.000001);
this.addrindex = new AddrIndexer(this.network);
}
/**
* Open the chain, wait for the database to load.
* @returns {Promise}
*/
async open() {
assert(!this.opened, 'Mempool is already open.');
this.opened = true;
await this.cache.open();
if (this.options.persistent) {
const entries = await this.cache.getEntries();
for (const entry of entries)
this.trackEntry(entry);
for (const entry of entries) {
this.updateAncestors(entry, addFee);
if (this.options.indexAddress) {
const view = await this.getCoinView(entry.tx);
this.indexEntry(entry, view);
}
}
this.logger.info(
'Loaded mempool from disk (%d entries).',
entries.length);
if (this.fees) {
const fees = await this.cache.getFees();
if (fees) {
this.fees.inject(fees);
this.logger.info(
'Loaded mempool fee data (rate=%d).',
this.fees.estimateFee());
}
}
}
this.tip = this.chain.tip.hash;
const size = (this.options.maxSize / 1024).toFixed(2);
this.logger.info('Mempool loaded (maxsize=%dkb).', size);
}
/**
* Close the chain, wait for the database to close.
* @returns {Promise}
*/
async close() {
assert(this.opened, 'Mempool is not open.');
this.opened = false;
return this.cache.close();
}
/**
* Notify the mempool that a new block has come
* in (removes all transactions contained in the
* block from the mempool).
* @method
* @param {ChainEntry} block
* @param {TX[]} txs
* @returns {Promise}
*/
async addBlock(block, txs) {
const unlock = await this.locker.lock();
try {
return await this._addBlock(block, txs);
} finally {
unlock();
}
}
/**
* Notify the mempool that a new block
* has come without a lock.
* @private
* @param {ChainEntry} block
* @param {TX[]} txs
* @returns {Promise}
*/
async _addBlock(block, txs) {
if (this.map.size === 0) {
this.tip = block.hash;
return;
}
const entries = [];
for (let i = txs.length - 1; i >= 1; i--) {
const tx = txs[i];
const hash = tx.hash();
const entry = this.getEntry(hash);
if (!entry) {
this.removeOrphan(hash);
this.removeDoubleSpends(tx);
if (this.waiting.has(hash))
await this.handleOrphans(tx);
continue;
}
this.removeEntry(entry);
this.emit('confirmed', tx, block);
entries.push(entry);
}
// We need to reset the rejects filter periodically.
// There may be a locktime in a TX that is now valid.
this.rejects.reset();
if (this.fees) {
this.fees.processBlock(block.height, entries, this.chain.synced);
this.cache.writeFees(this.fees);
}
this.cache.sync(block.hash);
await this.cache.flush();
this.tip = block.hash;
if (entries.length === 0)
return;
this.logger.debug(
'Removed %d txs from mempool for block %d.',
entries.length, block.height);
}
/**
* Notify the mempool that a block has been disconnected
* from the main chain (reinserts transactions into the mempool).
* @method
* @param {ChainEntry} block
* @param {TX[]} txs
* @returns {Promise}
*/
async removeBlock(block, txs) {
const unlock = await this.locker.lock();
try {
return await this._removeBlock(block, txs);
} finally {
unlock();
}
}
/**
* Notify the mempool that a block
* has been disconnected without a lock.
* @method
* @private
* @param {ChainEntry} block
* @param {TX[]} txs
* @returns {Promise}
*/
async _removeBlock(block, txs) {
if (this.map.size === 0) {
this.tip = block.prevBlock;
return;
}
let total = 0;
for (let i = 1; i < txs.length; i++) {
const tx = txs[i];
const hash = tx.hash();
if (this.hasEntry(hash))
continue;
try {
await this.insertTX(tx, -1);
total++;
} catch (e) {
this.emit('error', e);
continue;
}
this.emit('unconfirmed', tx, block);
}
this.rejects.reset();
this.cache.sync(block.prevBlock);
await this.cache.flush();
this.tip = block.prevBlock;
if (total === 0)
return;
this.logger.debug(
'Added %d txs back into the mempool for block %d.',
total, block.height);
}
/**
* Sanitize the mempool after a reorg.
* @private
* @returns {Promise}
*/
async _handleReorg() {
const height = this.chain.height + 1;
const mtp = await this.chain.getMedianTime(this.chain.tip);
const remove = [];
for (const [hash, entry] of this.map) {
const {tx} = entry;
if (!tx.isFinal(height, mtp)) {
remove.push(hash);
continue;
}
if (tx.version > 1) {
let hasLocks = false;
for (const {sequence} of tx.inputs) {
if (!(sequence & consensus.SEQUENCE_DISABLE_FLAG)) {
hasLocks = true;
break;
}
}
if (hasLocks) {
remove.push(hash);
continue;
}
}
if (entry.coinbase)
remove.push(hash);
}
for (const hash of remove) {
const entry = this.getEntry(hash);
if (!entry)
continue;
this.evictEntry(entry);
}
}
/**
* Reset the mempool.
* @method
* @returns {Promise}
*/
async reset() {
const unlock = await this.locker.lock();
try {
return await this._reset();
} finally {
unlock();
}
}
/**
* Reset the mempool without a lock.
* @private
*/
async _reset() {
this.logger.info('Mempool reset (%d txs removed).', this.map.size);
this.size = 0;
this.waiting.clear();
this.orphans.clear();
this.map.clear();
this.spents.clear();
this.addrindex.reset();
this.freeCount = 0;
this.lastTime = 0;
if (this.fees)
this.fees.reset();
this.rejects.reset();
if (this.options.persistent) {
await this.cache.wipe();
this.cache.clear();
}
this.tip = this.chain.tip.hash;
}
/**
* Ensure the size of the mempool stays below `maxSize`.
* Evicts entries by timestamp and cumulative fee rate.
* @param {MempoolEntry} added
* @returns {Promise}
*/
limitSize(added) {
const maxSize = this.options.maxSize;
if (this.size <= maxSize)
return false;
const threshold = maxSize - (maxSize / 10);
const expiryTime = this.options.expiryTime;
const now = util.now();
let start = util.bench();
const queue = new Heap(cmpRate);
for (const entry of this.map.values()) {
if (this.hasDepends(entry.tx))
continue;
if (now < entry.time + expiryTime) {
queue.insert(entry);
continue;
}
this.logger.debug(
'Removing package %h from mempool (too old).',
entry.hash());
this.evictEntry(entry);
}
if (this.size <= threshold)
return !this.hasEntry(added);
this.logger.debug(
'(bench) Heap mempool traversal: %d.',
util.bench(start));
start = util.bench();
this.logger.debug(
'(bench) Heap mempool queue size: %d.',
queue.size());
while (queue.size() > 0) {
const entry = queue.shift();
const hash = entry.hash();
assert(this.hasEntry(hash));
this.logger.debug(
'Removing package %h from mempool (low fee).',
entry.hash());
this.evictEntry(entry);
if (this.size <= threshold)
break;
}
this.logger.debug(
'(bench) Heap mempool map removal: %d.',
util.bench(start));
return !this.hasEntry(added);
}
/**
* Retrieve a transaction from the mempool.
* @param {Hash} hash
* @returns {TX}
*/
getTX(hash) {
const entry = this.map.get(hash);
if (!entry)
return null;
return entry.tx;
}
/**
* Retrieve a transaction from the mempool.
* @param {Hash} hash
* @returns {MempoolEntry}
*/
getEntry(hash) {
return this.map.get(hash);
}
/**
* Retrieve a coin from the mempool (unspents only).
* @param {Hash} hash
* @param {Number} index
* @returns {Coin}
*/
getCoin(hash, index) {
const entry = this.map.get(hash);
if (!entry)
return null;
if (this.isSpent(hash, index))
return null;
if (index >= entry.tx.outputs.length)
return null;
return Coin.fromTX(entry.tx, index, -1);
}
/**
* Check whether coin is still unspent.
* @param {Hash} hash
* @param {Number} index
* @returns {boolean}
*/
hasCoin(hash, index) {
const entry = this.map.get(hash);
if (!entry)
return false;
if (this.isSpent(hash, index))
return false;
if (index >= entry.tx.outputs.length)
return false;
return true;
}
/**
* Check to see if a coin has been spent. This differs from
* {@link ChainDB#isSpent} in that it actually maintains a
* map of spent coins, whereas ChainDB may return `true`
* for transaction outputs that never existed.
* @param {Hash} hash
* @param {Number} index
* @returns {Boolean}
*/
isSpent(hash, index) {
const key = Outpoint.toKey(hash, index);
return this.spents.has(key);
}
/**
* Get an output's spender entry.
* @param {Hash} hash
* @param {Number} index
* @returns {MempoolEntry}
*/
getSpent(hash, index) {
const key = Outpoint.toKey(hash, index);
return this.spents.get(key);
}
/**
* Get an output's spender transaction.
* @param {Hash} hash
* @param {Number} index
* @returns {MempoolEntry}
*/
getSpentTX(hash, index) {
const key = Outpoint.toKey(hash, index);
const entry = this.spents.get(key);
if (!entry)
return null;
return entry.tx;
}
/**
* Find all transactions pertaining to a certain address.
* @param {Address} addr
* @param {Object} options
* @param {Number} options.limit
* @param {Number} options.reverse
* @param {Buffer} options.after
* @returns {TX[]}
*/
getTXByAddress(addr, options) {
return this.addrindex.get(addr, options);
}
/**
* Find all transactions pertaining to a certain address.
* @param {Address} addr
* @param {Object} options
* @param {Number} options.limit
* @param {Number} options.reverse
* @param {Buffer} options.after
* @returns {TXMeta[]}
*/
getMetaByAddress(addr, options) {
return this.addrindex.getMeta(addr, options);
}
/**
* Retrieve a transaction from the mempool.
* @param {Hash} hash
* @returns {TXMeta}
*/
getMeta(hash) {
const entry = this.getEntry(hash);
if (!entry)
return null;
const meta = TXMeta.fromTX(entry.tx);
meta.mtime = entry.time;
return meta;
}
/**
* Test the mempool to see if it contains a transaction.
* @param {Hash} hash
* @returns {Boolean}
*/
hasEntry(hash) {
return this.map.has(hash);
}
/**
* Test the mempool to see if it
* contains a transaction or an orphan.
* @param {Hash} hash
* @returns {Boolean}
*/
has(hash) {
if (this.locker.has(hash))
return true;
if (this.hasOrphan(hash))
return true;
return this.hasEntry(hash);
}
/**
* Test the mempool to see if it
* contains a transaction or an orphan.
* @private
* @param {Hash} hash
* @returns {Boolean}
*/
exists(hash) {
if (this.locker.pending(hash))
return true;
if (this.hasOrphan(hash))
return true;
return this.hasEntry(hash);
}
/**
* Test the mempool to see if it
* contains a recent reject.
* @param {Hash} hash
* @returns {Boolean}
*/
hasReject(hash) {
return this.rejects.test(hash);
}
/**
* Add a transaction to the mempool. Note that this
* will lock the mempool until the transaction is
* fully processed.
* @method
* @param {TX} tx
* @param {Number?} id
* @returns {Promise}
*/
async addTX(tx, id) {
const hash = tx.hash();
const unlock = await this.locker.lock(hash);
try {
return await this._addTX(tx, id);
} finally {
unlock();
}
}
/**
* Add a transaction to the mempool without a lock.
* @method
* @private
* @param {TX} tx
* @param {Number?} id
* @returns {Promise}
*/
async _addTX(tx, id) {
if (id == null)
id = -1;
let missing;
try {
missing = await this.insertTX(tx, id);
} catch (err) {
if (err.type === 'VerifyError') {
if (!tx.hasWitness() && !err.malleated)
this.rejects.add(tx.hash());
}
throw err;
}
if (util.now() - this.lastFlush > 10) {
await this.cache.flush();
this.lastFlush = util.now();
}
return missing;
}
/**
* Add a transaction to the mempool without a lock.
* @method
* @private
* @param {TX} tx
* @param {Number?} id
* @returns {Promise}
*/
async insertTX(tx, id) {
assert(!tx.mutable, 'Cannot add mutable TX to mempool.');
const lockFlags = common.lockFlags.STANDARD_LOCKTIME_FLAGS;
const height = this.chain.height;
const hash = tx.hash();
// Basic sanity checks.
// This is important because it ensures
// other functions will be overflow safe.
const [valid, reason, score] = tx.checkSanity();
if (!valid)
throw new VerifyError(tx, 'invalid', reason, score);
// Coinbases are an insta-ban.
// Why? Who knows.
if (tx.isCoinbase()) {
throw new VerifyError(tx,
'invalid',
'coinbase',
100);
}
// Do not allow CSV until it's activated.
if (this.options.requireStandard) {
if (!this.chain.state.hasCSV() && tx.version >= 2) {
throw new VerifyError(tx,
'nonstandard',
'premature-version2-tx',
0);
}
}
// Do not allow segwit until it's activated.
if (!this.chain.state.hasWitness() && !this.options.prematureWitness) {
if (tx.hasWitness()) {
throw new VerifyError(tx,
'nonstandard',
'no-witness-yet',
0,
true);
}
}
// Non-contextual standardness checks.
if (this.options.requireStandard) {
const [valid, reason, score] = tx.checkStandard();
if (!valid)
throw new VerifyError(tx, 'nonstandard', reason, score);
if (!this.options.replaceByFee) {
if (tx.isRBF()) {
throw new VerifyError(tx,
'nonstandard',
'replace-by-fee',
0);
}
}
}
// Verify transaction finality (see isFinal()).
if (!await this.verifyFinal(tx, lockFlags)) {
throw new VerifyError(tx,
'nonstandard',
'non-final',
0);
}
// We can maybe ignore this.
if (this.exists(hash)) {
throw new VerifyError(tx,
'alreadyknown',
'txn-already-in-mempool',
0);
}
// We can test whether this is an
// non-fully-spent transaction on
// the chain.
if (await this.chain.hasCoins(tx)) {
throw new VerifyError(tx,
'alreadyknown',
'txn-already-known',
0);
}
// Quick and dirty test to verify we're
// not double-spending an output in the
// mempool.
if (this.isDoubleSpend(tx)) {
this.emit('conflict', tx);
throw new VerifyError(tx,
'duplicate',
'bad-txns-inputs-spent',
0);
}
// Get coin viewpoint as it
// pertains to the mempool.
const view = await this.getCoinView(tx);
// Maybe store as an orphan.
const missing = this.maybeOrphan(tx, view, id);
// Return missing outpoint hashes.
if (missing)
return missing;
// Create a new mempool entry
// at current chain height.
const entry = MempoolEntry.fromTX(tx, view, height);
// Contextual verification.
await this.verify(entry, view);
// Add and index the entry.
await this.addEntry(entry, view);
// Trim size if we're too big.
if (this.limitSize(hash)) {
throw new VerifyError(tx,
'insufficientfee',
'mempool full',
0);
}
return null;
}
/**
* Verify a transaction with mempool standards.
* @method
* @param {TX} tx
* @param {CoinView} view
* @returns {Promise}
*/
async verify(entry, view) {
const height = this.chain.height + 1;
const lockFlags = common.lockFlags.STANDARD_LOCKTIME_FLAGS;
const tx = entry.tx;
// Verify sequence locks.
if (!await this.verifyLocks(tx, view, lockFlags)) {
throw new VerifyError(tx,
'nonstandard',
'non-BIP68-final',
0);
}
// Check input an witness standardness.
if (this.options.requireStandard) {
if (!tx.hasStandardInputs(view)) {
throw new VerifyError(tx,
'nonstandard',
'bad-txns-nonstandard-inputs',
0);
}
if (this.chain.state.hasWitness()) {
if (!tx.hasStandardWitness(view)) {
throw new VerifyError(tx,
'nonstandard',
'bad-witness-nonstandard',
0,
true);
}
}
}
// Annoying process known as sigops counting.
if (entry.sigops > policy.MAX_TX_SIGOPS_COST) {
throw new VerifyError(tx,
'nonstandard',
'bad-txns-too-many-sigops',
0);
}
// Make sure this guy gave a decent fee.
const minFee = policy.getMinFee(entry.size, this.options.minRelay);
if (this.options.relayPriority && entry.fee < minFee) {
if (!entry.isFree(height)) {
throw new VerifyError(tx,
'insufficientfee',
'insufficient priority',
0);
}
}
// Continuously rate-limit free (really, very-low-fee)
// transactions. This mitigates 'penny-flooding'.
if (this.options.limitFree && entry.fee < minFee) {
const now = util.now();
// Use an exponentially decaying ~10-minute window.
this.freeCount *= Math.pow(1 - 1 / 600, now - this.lastTime);
this.lastTime = now;
// The limitFreeRelay unit is thousand-bytes-per-minute
// At default rate it would take over a month to fill 1GB.
if (this.freeCount > this.options.limitFreeRelay * 10 * 1000) {
throw new VerifyError(tx,
'insufficientfee',
'rate limited free transaction',
0);
}
this.freeCount += entry.size;
}
// Important safety feature.
if (this.options.rejectAbsurdFees && entry.fee > minFee * 10000)
throw new VerifyError(tx, 'highfee', 'absurdly-high-fee', 0);
// Why do we have this here? Nested transactions are cool.
if (this.countAncestors(entry) + 1 > this.options.maxAncestors) {
throw new VerifyError(tx,
'nonstandard',
'too-long-mempool-chain',
0);
}
// Contextual sanity checks.
const [fee, reason, score] = tx.checkInputs(view, height);
if (fee === -1)
throw new VerifyError(tx, 'invalid', reason, score);
// Script verification.
let flags = Script.flags.STANDARD_VERIFY_FLAGS;
try {
await this.verifyInputs(tx, view, flags);
} catch (err) {
if (tx.hasWitness())
throw err;
// Try without segwit and cleanstack.
flags &= ~Script.flags.VERIFY_WITNESS;
flags &= ~Script.flags.VERIFY_CLEANSTACK;
// If it failed, the first verification
// was the only result we needed.
if (!await this.verifyResult(tx, view, flags))
throw err;
// If it succeeded, segwit may be causing the
// failure. Try with segwit but without cleanstack.
flags |= Script.flags.VERIFY_WITNESS;
// Cleanstack was causing the failure.
if (await this.verifyResult(tx, view, flags))
throw err;
// Do not insert into reject cache.
err.malleated = true;
throw err;
}
// Paranoid checks.
if (this.options.paranoidChecks) {
const flags = Script.flags.MANDATORY_VERIFY_FLAGS;
assert(await this.verifyResult(tx, view, flags),
'BUG: Verify failed for mandatory but not standard.');
}
}
/**
* Verify inputs, return a boolean
* instead of an error based on success.
* @method
* @param {TX} tx
* @param {CoinView} view
* @param {VerifyFlags} flags
* @returns {Promise}
*/
async verifyResult(tx, view, flags) {
try {
await this.verifyInputs(tx, view, flags);
} catch (err) {
if (err.type === 'VerifyError')
return false;
throw err;
}
return true;
}
/**
* Verify inputs for standard
* _and_ mandatory flags on failure.
* @method
* @param {TX} tx
* @param {CoinView} view
* @param {VerifyFlags} flags
* @returns {Promise}
*/
async verifyInputs(tx, view, flags) {
if (await tx.verifyAsync(view, flags, this.workers))
return;
if (flags & Script.flags.ONLY_STANDARD_VERIFY_FLAGS) {
flags &= ~Script.flags.ONLY_STANDARD_VERIFY_FLAGS;
if (await tx.verifyAsync(view, flags, this.workers)) {
throw new VerifyError(tx,
'nonstandard',
'non-mandatory-script-verify-flag',
0);
}
}
throw new VerifyError(tx,
'nonstandard',
'mandatory-script-verify-flag',
100);
}
/**
* Add a transaction to the mempool without performing any
* validation. Note that this method does not lock the mempool
* and may lend itself to race conditions if used unwisely.
* This function will also resolve orphans if possible (the
* resolved orphans _will_ be validated).
* @method
* @param {MempoolEntry} entry
* @param {CoinView} view
* @returns {Promise}
*/
async addEntry(entry, view) {
const tx = entry.tx;
this.trackEntry(entry, view);
this.updateAncestors(entry, addFee);
this.emit('tx', tx, view);
this.emit('add entry', entry);
if (this.fees)
this.fees.processTX(entry, this.chain.synced);
this.logger.debug(
'Added %h to mempool (txs=%d).',
tx.hash(), this.map.size);
this.cache.save(entry);
await this.handleOrphans(tx);
}
/**
* Remove a transaction from the mempool.
* Generally only called when a new block
* is added to the main chain.
* @param {MempoolEntry} entry
*/
removeEntry(entry) {
const tx = entry.tx;
const hash = tx.hash();
this.untrackEntry(entry);
if (this.fees)
this.fees.removeTX(hash);
this.cache.remove(hash);
this.emit('remove entry', entry);
}
/**
* Remove a transaction from the mempool.
* Recursively remove its spenders.
* @param {MempoolEntry} entry
*/
evictEntry(entry) {
this.removeSpenders(entry);
this.updateAncestors(entry, removeFee);
this.removeEntry(entry);
}
/**
* Recursively remove spenders of a transaction.
* @private
* @param {MempoolEntry} entry
*/
removeSpenders(entry) {
const tx = entry.tx;
const hash = tx.hash();
for (let i = 0; i < tx.outputs.length; i++) {
const spender = this.getSpent(hash, i);
if (!spender)
continue;
this.removeSpenders(spender);
this.removeEntry(spender);
}
}
/**
* Count the highest number of
* ancestors a transaction may have.
* @param {MempoolEntry} entry
* @returns {Number}
*/
countAncestors(entry) {
return this._countAncestors(entry, new BufferSet(), entry, nop);
}
/**
* Count the highest number of
* ancestors a transaction may have.
* Update descendant fees and size.
* @param {MempoolEntry} entry
* @param {Function} map
* @returns {Number}
*/
updateAncestors(entry, map) {
return this._countAncestors(entry, new BufferSet(), entry, map);
}
/**
* Traverse ancestors and count.
* @private
* @param {MempoolEntry} entry
* @param {Object} set
* @param {MempoolEntry} child
* @param {Function} map
* @returns {Number}
*/
_countAncestors(entry, set, child, map) {
const tx = entry.tx;
for (const {prevout} of tx.inputs) {
const hash = prevout.hash;
const parent = this.getEntry(hash);
if (!parent)
continue;
if (set.has(hash))
continue;
set.add(hash);
map(parent, child);
if (set.size > this.options.maxAncestors)
break;
this._countAncestors(parent, set, child, map);
if (set.size > this.options.maxAncestors)
break;
}
return set.size;
}
/**
* Count the highest number of
* descendants a transaction may have.
* @param {MempoolEntry} entry
* @returns {Number}
*/
countDescendants(entry) {
return this._countDescendants(entry, new BufferSet());
}
/**
* Count the highest number of
* descendants a transaction may have.
* @private
* @param {MempoolEntry} entry
* @param {Object} set
* @returns {Number}
*/
_countDescendants(entry, set) {
const tx = entry.tx;
const hash = tx.hash();
for (let i = 0; i < tx.outputs.length; i++) {
const child = this.getSpent(hash, i);
if (!child)
continue;
const next = child.hash();
if (set.has(next))
continue;
set.add(next);
this._countDescendants(child, set);
}
return set.size;
}
/**
* Get all transaction ancestors.
* @param {MempoolEntry} entry
* @returns {MempoolEntry[]}
*/
getAncestors(entry) {
return this._getAncestors(entry, [], new BufferSet());
}
/**
* Get all transaction ancestors.
* @private
* @param {MempoolEntry} entry
* @param {MempoolEntry[]} entries
* @param {Object} set
* @returns {MempoolEntry[]}
*/
_getAncestors(entry, entries, set) {
const tx = entry.tx;
for (const {prevout} of tx.inputs) {
const hash = prevout.hash;
const parent = this.getEntry(hash);
if (!parent)
continue;
if (set.has(hash))
continue;
set.add(hash);
entries.push(parent);
this._getAncestors(parent, entries, set);
}
return entries;
}
/**
* Get all a transaction descendants.
* @param {MempoolEntry} entry
* @returns {MempoolEntry[]}
*/
getDescendants(entry) {
return this._getDescendants(entry, [], new BufferSet());
}
/**
* Get all a transaction descendants.
* @param {MempoolEntry} entry
* @param {MempoolEntry[]} entries
* @param {Object} set
* @returns {MempoolEntry[]}
*/
_getDescendants(entry, entries, set) {
const tx = entry.tx;
const hash = tx.hash();
for (let i = 0; i < tx.outputs.length; i++) {
const child = this.getSpent(hash, i);
if (!child)
continue;
const next = child.hash();
if (set.has(next))
continue;
set.add(next);
entries.push(child);
this._getDescendants(child, entries, set);
}
return entries;
}
/**
* Find a unconfirmed transactions that
* this transaction depends on.
* @param {TX} tx
* @returns {Hash[]}
*/
getDepends(tx) {
const prevout = tx.getPrevout();
const depends = [];
for (const hash of prevout) {
if (this.hasEntry(hash))
depends.push(hash);
}
return depends;
}
/**
* Test whether a transaction has dependencies.
* @param {TX} tx
* @returns {Boolean}
*/
hasDepends(tx) {
for (const {prevout} of tx.inputs) {
if (this.hasEntry(prevout.hash))
return true;
}
return false;
}
/**
* Return the full balance of all unspents in the mempool
* (not very useful in practice, only used for testing).
* @returns {Amount}
*/
getBalance() {
let total = 0;
for (const [hash, entry] of this.map) {
const tx = entry.tx;
for (let i = 0; i < tx.outputs.length; i++) {
const coin = this.getCoin(hash, i);
if (coin)
total += coin.value;
}
}
return total;
}
/**
* Retrieve _all_ transactions from the mempool.
* @returns {TX[]}
*/
getHistory() {
const txs = [];
for (const entry of this.map.values())
txs.push(entry.tx);
return txs;
}
/**
* Retrieve an orphan transaction.
* @param {Hash} hash
* @returns {TX}
*/
getOrphan(hash) {
return this.orphans.get(hash);
}
/**
* @param {Hash} hash
* @returns {Boolean}
*/
hasOrphan(hash) {
return this.orphans.has(hash);
}
/**
* Maybe store an orphaned transaction.
* @param {TX} tx
* @param {CoinView} view
* @param {Number} id
*/
maybeOrphan(tx, view, id) {
const hashes = new BufferSet();
const missing = [];
for (const {prevout} of tx.inputs) {
if (view.hasEntry(prevout))
continue;
if (this.hasReject(prevout.hash)) {
this.logger.debug(
'Not storing orphan %h (rejected parents).',
tx.hash());
this.rejects.add(tx.hash());
return missing;
}
if (this.hasEntry(prevout.hash)) {
this.logger.debug(
'Not storing orphan %h (non-existent output).',
tx.hash());
this.rejects.add(tx.hash());
return missing;
}
hashes.add(prevout.hash);
}
// Not an orphan.
if (hashes.size === 0)
return null;
// Weight limit for orphans.
if (tx.getWeight() > policy.MAX_TX_WEIGHT) {
this.logger.debug('Ignoring large orphan: %h', tx.hash());
if (!tx.hasWitness())
this.rejects.add(tx.hash());
return missing;
}
if (this.options.maxOrphans === 0)
return missing;
this.limitOrphans();
const hash = tx.hash();
for (const prev of hashes.keys()) {
if (!this.waiting.has(prev))
this.waiting.set(prev, new BufferSet());
this.waiting.get(prev).add(hash);
missing.push(prev);
}
this.orphans.set(hash, new Orphan(tx, missing.length, id));
this.logger.debug('Added orphan %h to mempool.', tx.hash());
this.emit('add orphan', tx);
return missing;
}
/**
* Resolve orphans and attempt to add to mempool.
* @method
* @param {TX} parent
* @returns {Promise} - Returns {@link TX}[].
*/
async handleOrphans(parent) {
const resolved = this.resolveOrphans(parent);
for (const orphan of resolved) {
let tx, missing;
try {
tx = orphan.toTX();
} catch (e) {
this.logger.warning('%s %s',
'Warning: possible memory corruption.',
'Orphan failed deserialization.');
continue;
}
try {
missing = await this.insertTX(tx, orphan.id);
} catch (err) {
if (err.type === 'VerifyError') {
this.logger.debug(
'Could not resolve orphan %h: %s.',
tx.hash(), err.message);
if (!tx.hasWitness() && !err.malleated)
this.rejects.add(tx.hash());
this.emit('bad orphan', err, orphan.id);
continue;
}
throw err;
}
// Can happen if an existing parent is
// evicted in the interim between fetching
// the non-present parents.
if (missing && missing.length > 0) {
this.logger.debug(
'Transaction %h was double-orphaned in mempool.',
tx.hash());
this.removeOrphan(tx.hash());
continue;
}
this.logger.debug('Resolved orphan %h in mempool.', tx.hash());
}
}
/**
* Potentially resolve any transactions
* that redeem the passed-in transaction.
* Deletes all orphan entries and
* returns orphan objects.
* @param {TX} parent
* @returns {Orphan[]}
*/
resolveOrphans(parent) {
const hash = parent.hash();
const set = this.waiting.get(hash);
if (!set)
return [];
assert(set.size > 0);
const resolved = [];
for (const hash of set.keys()) {
const orphan = this.getOrphan(hash);
assert(orphan);
if (--orphan.missing === 0) {
this.orphans.delete(hash);
resolved.push(orphan);
}
}
this.waiting.delete(hash);
return resolved;
}
/**
* Remove a transaction from the mempool.
* @param {Hash} tx
* @returns {Boolean}
*/
removeOrphan(hash) {
const orphan = this.getOrphan(hash);
if (!orphan)
return false;
let tx;
try {
tx = orphan.toTX();
} catch (e) {
this.orphans.delete(hash);
this.logger.warning('%s %s',
'Warning: possible memory corruption.',
'Orphan failed deserialization.');
return false;
}
for (const prev of tx.getPrevout()) {
const set = this.waiting.get(prev);
if (!set)
continue;
assert(set.has(hash));
set.delete(hash);
if (set.size === 0)
this.waiting.delete(prev);
}
this.orphans.delete(hash);
this.emit('remove orphan', tx);
return true;
}
/**
* Remove a random orphan transaction from the mempool.
* @returns {Boolean}
*/
limitOrphans() {
if (this.orphans.size < this.options.maxOrphans)
return false;
let index = random.randomRange(0, this.orphans.size);
let hash;
for (hash of this.orphans.keys()) {
if (index === 0)
break;
index--;
}
assert(hash);
this.logger.debug('Removing orphan %h from mempool.', hash);
this.removeOrphan(hash);
return true;
}
/**
* Test all of a transactions outpoints to see if they are doublespends.
* Note that this will only test against the mempool spents, not the
* blockchain's. The blockchain spents are not checked against because
* the blockchain does not maintain a spent list. The transaction will
* be seen as an orphan rather than a double spend.
* @param {TX} tx
* @returns {Promise} - Returns Boolean.
*/
isDoubleSpend(tx) {
for (const {prevout} of tx.inputs) {
const {hash, index} = prevout;
if (this.isSpent(hash, index))
return true;
}
return false;
}
/**
* Get coin viewpoint (lock).
* Note: this does not return
* historical view of coins from the indexers.
* @method
* @param {TX} tx
* @returns {Promise} - Returns {@link CoinView}.
*/
async getSpentView(tx) {
const unlock = await this.locker.lock();
try {
return await this._getSpentView(tx);
} finally {
unlock();
}
}
/**
* Get coin viewpoint
* @param {TX} tx
* @returns {Promise} - Returns {@link CoinView}
*/
async _getSpentView(tx) {
const view = new CoinView();
for (const {prevout} of tx.inputs) {
const {hash, index} = prevout;
const tx = this.getTX(hash);
if (tx) {
if (index < tx.outputs.length)
view.addIndex(tx, index, -1);
continue;
}
const coin = await this.chain.readCoin(prevout);
if (coin)
view.addEntry(prevout, coin);
}
return view;
}
/**
* Get coin viewpoint (no lock).
* @method
* @param {TX} tx
* @returns {Promise} - Returns {@link CoinView}.
*/
async getCoinView(tx) {
const view = new CoinView();
for (const {prevout} of tx.inputs) {
const {hash, index} = prevout;
const tx = this.getTX(hash);
if (tx) {
if (this.hasCoin(hash, index))
view.addIndex(tx, index, -1);
continue;
}
const coin = await this.chain.readCoin(prevout);
if (coin)
view.addEntry(prevout, coin);
}
return view;
}
/**
* Get a snapshot of all transaction hashes in the mempool. Used
* for generating INV packets in response to MEMPOOL packets.
* @returns {Hash[]}
*/
getSnapshot() {
const keys = [];
for (const hash of this.map.keys())
keys.push(hash);
return keys;
}
/**
* Check sequence locks on a transaction against the current tip.
* @param {TX} tx
* @param {CoinView} view
* @param {LockFlags} flags
* @returns {Promise} - Returns Boolean.
*/
verifyLocks(tx, view, flags) {
return this.chain.verifyLocks(this.chain.tip, tx, view, flags);
}
/**
* Check locktime on a transaction against the current tip.
* @param {TX} tx
* @param {LockFlags} flags
* @returns {Promise} - Returns Boolean.
*/
verifyFinal(tx, flags) {
return this.chain.verifyFinal(this.chain.tip, tx, flags);
}
/**
* Map a transaction to the mempool.
* @private
* @param {MempoolEntry} entry
* @param {CoinView} view
*/
trackEntry(entry, view) {
const tx = entry.tx;
const hash = tx.hash();
assert(!this.map.has(hash));
this.map.set(hash, entry);
assert(!tx.isCoinbase());
for (const {prevout} of tx.inputs) {
const key = prevout.toKey();
this.spents.set(key, entry);
}
if (this.options.indexAddress && view)
this.indexEntry(entry, view);
this.size += entry.memUsage();
}
/**
* Unmap a transaction from the mempool.
* @private
* @param {MempoolEntry} entry
*/
untrackEntry(entry) {
const tx = entry.tx;
const hash = tx.hash();
assert(this.map.has(hash));
this.map.delete(hash);
assert(!tx.isCoinbase());
for (const {prevout} of tx.inputs) {
const key = prevout.toKey();
this.spents.delete(key);
}
if (this.options.indexAddress)
this.unindexEntry(entry);
this.size -= entry.memUsage();
}
/**
* Index an entry by address.
* @private
* @param {MempoolEntry} entry
* @param {CoinView} view
*/
indexEntry(entry, view) {
this.addrindex.insert(entry, view);
}
/**
* Unindex an entry by address.
* @private
* @param {MempoolEntry} entry
*/
unindexEntry(entry) {
const hash = entry.tx.hash();
this.addrindex.remove(hash);
}
/**
* Recursively remove double spenders
* of a mined transaction's outpoints.
* @private
* @param {TX} tx
*/
removeDoubleSpends(tx) {
for (const {prevout} of tx.inputs) {
const {hash, index} = prevout;
const spent = this.getSpent(hash, index);
if (!spent)
continue;
this.logger.debug(
'Removing double spender from mempool: %h.',
spent.hash());
this.evictEntry(spent);
this.emit('double spend', spent);
}
}
/**
* Calculate the memory usage of the entire mempool.
* @see DynamicMemoryUsage()
* @returns {Number} Usage in bytes.
*/
getSize() {
return this.size;
}
/**
* Prioritise transaction.
* @param {MempoolEntry} entry
* @param {Number} pri
* @param {Amount} fee
*/
prioritise(entry, pri, fee) {
if (-pri > entry.priority)
pri = -entry.priority;
entry.priority += pri;
if (-fee > entry.deltaFee)
fee = -entry.deltaFee;
if (fee === 0)
return;
this.updateAncestors(entry, prePrioritise);
entry.deltaFee += fee;
entry.descFee += fee;
this.updateAncestors(entry, postPrioritise);
}
}
/**
* Mempool Options
* @alias module:mempool.MempoolOptions
*/
class MempoolOptions {
/**
* Create mempool options.
* @constructor
* @param {Object}
*/
constructor(options) {
this.network = Network.primary;
this.chain = null;
this.logger = null;
this.workers = null;
this.fees = null;
this.limitFree = true;
this.limitFreeRelay = 15;
this.relayPriority = true;
this.requireStandard = this.network.requireStandard;
this.rejectAbsurdFees = true;
this.prematureWitness = false;
this.paranoidChecks = false;
this.replaceByFee = false;
this.maxSize = policy.MEMPOOL_MAX_SIZE;
this.maxOrphans = policy.MEMPOOL_MAX_ORPHANS;
this.maxAncestors = policy.MEMPOOL_MAX_ANCESTORS;
this.expiryTime = policy.MEMPOOL_EXPIRY_TIME;
this.minRelay = this.network.minRelay;
this.prefix = null;
this.location = null;
this.memory = true;
this.maxFiles = 64;
this.cacheSize = 32 << 20;
this.compression = true;
this.persistent = false;
this.fromOptions(options);
}
/**
* Inject properties from object.
* @private
* @param {Object} options
* @returns {MempoolOptions}
*/
fromOptions(options) {
assert(options, 'Mempool requires options.');
assert(options.chain && typeof options.chain === 'object',
'Mempool requires a blockchain.');
this.chain = options.chain;
this.network = options.chain.network;
this.logger = options.chain.logger;
this.workers = options.chain.workers;
this.requireStandard = this.network.requireStandard;
this.minRelay = this.network.minRelay;
if (options.logger != null) {
assert(typeof options.logger === 'object');
this.logger = options.logger;
}
if (options.workers != null) {
assert(typeof options.workers === 'object');
this.workers = options.workers;
}
if (options.fees != null) {
assert(typeof options.fees === 'object');
this.fees = options.fees;
}
if (options.limitFree != null) {
assert(typeof options.limitFree === 'boolean');
this.limitFree = options.limitFree;
}
if (options.limitFreeRelay != null) {
assert((options.limitFreeRelay >>> 0) === options.limitFreeRelay);
this.limitFreeRelay = options.limitFreeRelay;
}
if (options.relayPriority != null) {
assert(typeof options.relayPriority === 'boolean');
this.relayPriority = options.relayPriority;
}
if (options.requireStandard != null) {
assert(typeof options.requireStandard === 'boolean');
this.requireStandard = options.requireStandard;
}
if (options.rejectAbsurdFees != null) {
assert(typeof options.rejectAbsurdFees === 'boolean');
this.rejectAbsurdFees = options.rejectAbsurdFees;
}
if (options.prematureWitness != null) {
assert(typeof options.prematureWitness === 'boolean');
this.prematureWitness = options.prematureWitness;
}
if (options.paranoidChecks != null) {
assert(typeof options.paranoidChecks === 'boolean');
this.paranoidChecks = options.paranoidChecks;
}
if (options.replaceByFee != null) {
assert(typeof options.replaceByFee === 'boolean');
this.replaceByFee = options.replaceByFee;
}
if (options.maxSize != null) {
assert((options.maxSize >>> 0) === options.maxSize);
this.maxSize = options.maxSize;
}
if (options.maxOrphans != null) {
assert((options.maxOrphans >>> 0) === options.maxOrphans);
this.maxOrphans = options.maxOrphans;
}
if (options.maxAncestors != null) {
assert((options.maxAncestors >>> 0) === options.maxAncestors);
this.maxAncestors = options.maxAncestors;
}
if (options.expiryTime != null) {
assert((options.expiryTime >>> 0) === options.expiryTime);
this.expiryTime = options.expiryTime;
}
if (options.minRelay != null) {
assert((options.minRelay >>> 0) === options.minRelay);
this.minRelay = options.minRelay;
}
if (options.prefix != null) {
assert(typeof options.prefix === 'string');
this.prefix = options.prefix;
this.location = path.join(this.prefix, 'mempool');
}
if (options.location != null) {
assert(typeof options.location === 'string');
this.location = options.location;
}
if (options.memory != null) {
assert(typeof options.memory === 'boolean');
this.memory = options.memory;
}
if (options.maxFiles != null) {
assert((options.maxFiles >>> 0) === options.maxFiles);
this.maxFiles = options.maxFiles;
}
if (options.cacheSize != null) {
assert(Number.isSafeInteger(options.cacheSize));
assert(options.cacheSize >= 0);
this.cacheSize = options.cacheSize;
}
if (options.compression != null) {
assert(typeof options.compression === 'boolean');
this.compression = options.compression;
}
if (options.persistent != null) {
assert(typeof options.persistent === 'boolean');
this.persistent = options.persistent;
}
if (options.indexAddress != null) {
assert(typeof options.indexAddress === 'boolean');
this.indexAddress = options.indexAddress;
}
return this;
}
/**
* Instantiate mempool options from object.
* @param {Object} options
* @returns {MempoolOptions}
*/
static fromOptions(options) {
return new MempoolOptions().fromOptions(options);
}
}
/**
* Orphan
* @ignore
*/
class Orphan {
/**
* Create an orphan.
* @constructor
* @param {TX} tx
* @param {Hash[]} missing
* @param {Number} id
*/
constructor(tx, missing, id) {
this.raw = tx.toRaw();
this.missing = missing;
this.id = id;
}
toTX() {
return TX.fromRaw(this.raw);
}
}
/**
* Mempool Cache
* @ignore
*/
class MempoolCache {
/**
* Create a mempool cache.
* @constructor
* @param {Object} options
*/
constructor(options) {
this.logger = options.logger;
this.chain = options.chain;
this.network = options.network;
this.db = null;
this.batch = null;
if (options.persistent)
this.db = bdb.create(options);
}
async getVersion() {
const data = await this.db.get(layout.v.encode());
if (!data)
return -1;
return data.readUInt32LE(0, true);
}
async getTip() {
return this.db.get(layout.R.encode());
}
async getFees() {
const data = await this.db.get(layout.F.encode());
if (!data)
return null;
let fees = null;
try {
fees = Fees.fromRaw(data);
} catch (e) {
this.logger.warning(
'Fee data failed deserialization: %s.',
e.message);
}
return fees;
}
getEntries() {
return this.db.values({
gte: layout.e.min(),
lte: layout.e.max(),
parse: data => MempoolEntry.fromRaw(data)
});
}
getKeys() {
return this.db.keys({
gte: layout.e.min(),
lte: layout.e.max()
});
}
async open() {
if (!this.db)
return;
await this.db.open();
await this.db.verify(layout.V.encode(), 'mempool', 0);
await this.verify();
this.batch = this.db.batch();
}
async close() {
if (!this.db)
return;
await this.db.close();
this.batch = null;
}
save(entry) {
if (!this.db)
return;
this.batch.put(layout.e.encode(entry.hash()), entry.toRaw());
}
remove(hash) {
if (!this.db)
return;
this.batch.del(layout.e.encode(hash));
}
sync(tip) {
if (!this.db)
return;
this.batch.put(layout.R.encode(), tip);
}
writeFees(fees) {
if (!this.db)
return;
this.batch.put(layout.F.encode(), fees.toRaw());
}
clear() {
this.batch.clear();
this.batch = this.db.batch();
}
async flush() {
if (!this.db)
return;
await this.batch.write();
this.batch = this.db.batch();
}
async init(hash) {
const batch = this.db.batch();
batch.put(layout.v.encode(), fromU32(MempoolCache.VERSION));
batch.put(layout.R.encode(), hash);
await batch.write();
}
async verify() {
let version = await this.getVersion();
let tip;
if (version === -1) {
version = MempoolCache.VERSION;
tip = this.chain.tip.hash;
this.logger.info(
'Mempool cache is empty. Writing tip %h.',
tip);
await this.init(tip);
}
if (version !== MempoolCache.VERSION) {
this.logger.warning(
'Mempool cache version mismatch (%d != %d)!',
version,
MempoolCache.VERSION);
this.logger.warning('Invalidating mempool cache.');
await this.wipe();
return false;
}
tip = await this.getTip();
if (!tip || !tip.equals(this.chain.tip.hash)) {
this.logger.warning(
'Mempool tip not consistent with chain tip (%h != %h)!',
tip,
this.chain.tip.hash);
this.logger.warning('Invalidating mempool cache.');
await this.wipe();
return false;
}
return true;
}
async wipe() {
const batch = this.db.batch();
const keys = await this.getKeys();
for (const key of keys)
batch.del(key);
batch.put(layout.v.encode(), fromU32(MempoolCache.VERSION));
batch.put(layout.R.encode(), this.chain.tip.hash);
batch.del(layout.F.encode());
await batch.write();
this.logger.info('Removed %d mempool entries from disk.', keys.length);
}
}
MempoolCache.VERSION = 2;
/*
* Helpers
*/
function nop(parent, child) {
;
}
function addFee(parent, child) {
parent.descFee += child.deltaFee;
parent.descSize += child.size;
}
function removeFee(parent, child) {
parent.descFee -= child.descFee;
parent.descSize -= child.descSize;
}
function prePrioritise(parent, child) {
parent.descFee -= child.deltaFee;
}
function postPrioritise(parent, child) {
parent.descFee += child.deltaFee;
}
function cmpRate(a, b) {
let xf = a.deltaFee;
let xs = a.size;
let yf = b.deltaFee;
let ys = b.size;
let x, y;
if (useDesc(a)) {
xf = a.descFee;
xs = a.descSize;
}
if (useDesc(b)) {
yf = b.descFee;
ys = b.descSize;
}
x = xf * ys;
y = xs * yf;
if (x === y) {
x = a.time;
y = b.time;
}
return x - y;
}
function useDesc(a) {
const x = a.deltaFee * a.descSize;
const y = a.descFee * a.size;
return y > x;
}
function fromU32(num) {
const data = Buffer.allocUnsafe(4);
data.writeUInt32LE(num, 0, true);
return data;
}
/*
* Expose
*/
module.exports = Mempool;