Commit 4d0547f8 authored by Eriksson Monteiro's avatar Eriksson Monteiro

update millix node v1.10.9-tangled

parent 46ec5ad3
......@@ -774,8 +774,8 @@ export const NETWORK_SHORT_TIME_WAIT_MAX = 1500;
export const DATABASE_ENGINE = 'sqlite';
export const DATABASE_CONNECTION = {};
export const MILLIX_CIRCULATION = 9e15;
export const NODE_MILLIX_BUILD_DATE = 1622201109;
export const NODE_MILLIX_VERSION = '1.10.8-tangled';
export const NODE_MILLIX_BUILD_DATE = 1622937315;
export const NODE_MILLIX_VERSION = '1.10.9-tangled';
export const DATA_BASE_DIR_MAIN_NETWORK = './millix-tangled';
export const DATA_BASE_DIR_TEST_NETWORK = './millix-tangled';
let DATA_BASE_DIR = MODE_TEST_NETWORK ? DATA_BASE_DIR_TEST_NETWORK : DATA_BASE_DIR_MAIN_NETWORK;
......
......@@ -186,21 +186,6 @@
"enable": true
}
},
"wallet_balance_sync": {
"type": "function",
"group": "wallet",
"processor": "localhost_wallet",
"payload": {
"module": "wallet",
"function_name": "_doSyncBalanceForAddresses"
},
"priority": 1,
"option_list": {
"run_always": 1,
"run_delay": 10000,
"enable": true
}
},
"wallet_retry_validation_update": {
"type": "function",
"group": "wallet",
......
......@@ -119,15 +119,28 @@ export class WalletSync {
}
database.firstShardZeroORShardRepository('transaction', shardID, (transactionRepository) => {
return new Promise((resolve, reject) => {
transactionRepository.getTransactionInput({
return transactionRepository.getTransactionInput({
output_transaction_id: transactionID,
output_shard_id : shardID,
output_position : outputPosition
})
.then(input => input ? transactionRepository.getTransaction(input.transaction_id) : reject())
.then(transaction => resolve(transaction))
.catch(() => reject());
}).then(input => {
if (input) {
/* check if there is any input that is double spend.
if so, we should force updating this transaction output as spent.
*/
return transactionRepository.listTransactionInput({
'transaction_input.transaction_id' : input.transaction_id,
is_double_spend: 1
}).then(doubleSpendInputList => {
if (doubleSpendInputList.length > 0) {
return Promise.reject();
}
return transactionRepository.getTransaction(input.transaction_id);
});
}
else {
return Promise.reject();
}
});
}).then(spendingTransaction => {
// skip if we already know that the tx is spent
......
......@@ -7,6 +7,7 @@ import config from '../config/config';
import async from 'async';
import _ from 'lodash';
import wallet from './wallet';
import walletUtils from './wallet-utils';
import ntp from '../ntp';
......@@ -67,7 +68,7 @@ export class WalletTransactionConsensus {
}
removeFromRejectedTransactions(transactionID) {
delete this._transactionValidationRejected.delete(transactionID);
this._transactionValidationRejected.delete(transactionID);
}
resetTransactionValidationRejected() {
......@@ -109,6 +110,9 @@ export class WalletTransactionConsensus {
responseData = {transaction_id: input.transaction_id};
return callback(true);
}
else if (transaction.status === 3) { // invalid transaction
return callback();
}
else if (!doubleSpendSet.has(transaction.transaction_id) && (!responseData || transaction.transaction_date < responseData.transaction_date
|| ((transaction.transaction_date.getTime() === responseData.transaction_date.getTime()) && (transaction.transaction_id < responseData.transaction_id)))) {
......@@ -125,8 +129,8 @@ export class WalletTransactionConsensus {
doubleSpendSet.add(transaction.transaction_id);
return callback();
}
else if (err.cause === 'transaction_not_found') {
responseType = 'transaction_not_found';
else if (err.cause === 'transaction_not_found' || err.cause === 'transaction_invalid') {
responseType = err.cause;
responseData = {transaction_id: err.transaction_id_fail};
}
else {
......@@ -147,16 +151,13 @@ export class WalletTransactionConsensus {
});
}
_setAsDoubleSpend(transactions, doubleSpendTransaction) {
_updateDoubleSpendTransaction(transactions, doubleSpendTransactionInput) {
console.log('[consensus][oracle] setting ', transactions.length, ' transaction as double spend');
async.eachSeries(transactions, (transaction, callback) => database.firstShards((shardID) => {
return new Promise((resolve, reject) => {
const transactionRepository = database.getRepository('transaction', shardID);
transactionRepository.getTransactionObject(transaction.transaction_id)
.then(transaction => transaction ? transactionRepository.setTransactionAsDoubleSpend(transaction, doubleSpendTransaction).then(() => resolve())
: reject());
async.eachSeries(transactions, (transactionID, callback) => {
database.getRepository('transaction')
.setTransactionAsDoubleSpend(transactionID, doubleSpendTransactionInput)
.then(() => callback());
});
}).then(() => callback()));
}
_validateTransaction(transaction, nodeID, depth = 0, transactionVisitedSet = new Set(), doubleSpendSet = new Set()) {
......@@ -177,38 +178,27 @@ export class WalletTransactionConsensus {
return new Promise((resolve, reject) => {
const transactionRepository = database.getRepository('transaction', shardID);
transactionRepository.getTransactionObject(transactionID)
.then(transaction => transaction ? resolve([
transaction,
shardID
]) : reject());
.then(transaction => transaction ? resolve(transaction) : reject());
});
}))().then(data => {
const [transaction, shardID] = data || [];
if (!transaction) {
return [];
}
return database.getRepository('audit_point', shardID)
.getAuditPoint(transactionID)
.then(auditPoint => [
transaction,
auditPoint ? auditPoint.audit_point_id : undefined
]);
}).then(([transaction, auditPointID]) => {
}))().then((transaction) => {
if (transaction && transaction.is_stable && _.every(transaction.transaction_output_list, output => output.is_stable && !output.is_double_spend)) {
console.log('[consensus][oracle] validated in consensus round after found a validated transaction at depth ', depth);
return resolve();
}
else if (transaction && transaction.status === 3) {
return reject({
cause : 'transaction_invalid',
transaction_id_fail: transactionID,
message : 'invalid transaction found: ' + transactionID
});
}
if (transaction && transaction.is_stable !== undefined) { // transaction object needs to be normalized
transaction = database.getRepository('transaction').normalizeTransactionObject(transaction);
}
if (auditPointID) {
console.log('[consensus][oracle] validated in consensus round after found in Local audit point ', auditPointID, ' at depth ', depth);
return resolve();
}
else if (!transaction) {
if (!transaction) {
return reject({
cause : 'transaction_not_found',
transaction_id_fail: transactionID,
......@@ -236,6 +226,16 @@ export class WalletTransactionConsensus {
return resolve();
}
walletUtils.verifyTransaction(transaction)
.then(valid => {
if (!valid) {
return reject({
cause : 'transaction_invalid',
transaction_id_fail: transaction.transaction_id,
message : `invalid transaction found: ${transaction.transaction_id}`
});
}
transactionVisitedSet.add(transactionID);
let sourceTransactions = new Set();
......@@ -247,8 +247,9 @@ export class WalletTransactionConsensus {
if (doubleSpendSet.has(input.output_transaction_id)) {
return callback({
cause : 'transaction_double_spend',
transaction_id_fail: input.output_transaction_id,
message : 'double spend found in ' + input.output_transaction_id
transaction_id_fail : transaction.transaction_id,
transaction_input_double_spend: input,
message : 'double spend found in ' + transaction.transaction_id
}, false);
}
......@@ -271,17 +272,24 @@ export class WalletTransactionConsensus {
this._getValidInputOnDoubleSpend(input.output_transaction_id, doubleSpendTransactions, nodeID, transactionVisitedSet, doubleSpendSet)
.then(({response_type: responseType, data}) => {
if (data && (responseType === 'transaction_double_spend' || responseType === 'transaction_valid')) {
let doubleSpendInputs = _.filter(doubleSpendTransactions, i => i.transaction_id !== data.transaction_id);
doubleSpendInputs.forEach(doubleSpendInput => doubleSpendSet.add(doubleSpendInput.transaction_id));
this._updateDoubleSpendTransaction(doubleSpendInputs, input);
}
if ((responseType === 'transaction_double_spend' && !data) ||
(responseType === 'transaction_valid' && data.transaction_id !== transaction.transaction_id)) {
return reject({
cause : 'transaction_double_spend',
transaction_id_fail: input.output_transaction_id,
message : 'double spend found in ' + input.output_transaction_id
transaction_id_fail : transaction.transaction_id,
message : 'double spend found in ' + transaction.transaction_id,
transaction_input_double_spend: input
});
}
else if (responseType === 'transaction_not_found') {
return reject({
cause : 'transaction_not_found',
cause : responseType,
transaction_id_fail: data.transaction_id,
message : 'no information found for ' + data.transaction_id
});
......@@ -294,10 +302,6 @@ export class WalletTransactionConsensus {
});
}
let doubleSpendInputs = _.filter(doubleSpendTransactions, i => i.transaction_id !== data.transaction_id);
doubleSpendInputs.forEach(doubleSpendInput => doubleSpendSet.add(doubleSpendInput.transaction_id));
this._setAsDoubleSpend(doubleSpendInputs, input.output_transaction_id);
resolve();
});
}
......@@ -397,6 +401,7 @@ export class WalletTransactionConsensus {
});
});
});
});
}
_validateTransactionInConsensusRound(data, ws) {
......@@ -763,8 +768,7 @@ export class WalletTransactionConsensus {
const lockerID = `locker-${consensusCount}`;
this._consensusRoundState[lockerID] = true;
console.log('[consensus][request] get unstable transactions');
return new Promise(resolve => {
database.applyShards((shardID) => {
return database.applyShards((shardID) => {
return database.getRepository('transaction', shardID)
.getWalletUnstableTransactions(wallet.defaultKeyIdentifier, excludeTransactionList)
.then(pendingTransactions => {
......@@ -800,7 +804,7 @@ export class WalletTransactionConsensus {
if (!pendingTransaction) {
console.log('[consensus][request] no pending funds available for validation.');
delete this._consensusRoundState[lockerID];
return resolve();
return;
}
const transactionID = pendingTransaction.transaction_id;
......@@ -813,7 +817,7 @@ export class WalletTransactionConsensus {
if (this._consensusRoundState[transactionID]) {
// remove locker
delete this._consensusRoundState[lockerID];
return resolve();
return;
}
delete this._consensusRoundState[lockerID];
......@@ -856,19 +860,16 @@ export class WalletTransactionConsensus {
return this._startConsensusRound(transactionID);
}
})
.then(() => {
delete this._transactionRetryValidation[transactionID];
delete this._consensusRoundState[transactionID];
delete this._validationPrepareState[transactionID];
resolve();
//check if there is another transaction to
// validate
setTimeout(() => this.doValidateTransaction(), 0);
})
.then(() => transactionID)
.catch((err) => {
console.log('[consensus] transaction not validated internally: ', err);
if (err.cause === 'transaction_double_spend') {
this._setAsDoubleSpend([pendingTransaction], err.transaction_id_fail);
this._transactionValidationRejected.add(transactionID);
delete this._validationPrepareState[transactionID];
}
else if (err.cause === 'transaction_invalid') {
wallet.findAndSetAllSpendersAsInvalid({transaction_id: err.transaction_id_fail})
.then(_ => _);
this._transactionValidationRejected.add(transactionID);
delete this._validationPrepareState[transactionID];
}
......@@ -899,24 +900,15 @@ export class WalletTransactionConsensus {
active : true
};
return this._startConsensusRound(transactionID)
.then(() => {
delete this._transactionRetryValidation[transactionID];
delete this._consensusRoundState[transactionID];
delete this._validationPrepareState[transactionID];
resolve();
}).catch(resolve);
.then(() => transactionID)
.catch(() => Promise.reject(true));
}
else {
// set timeout
this._transactionValidationRejected.add(transactionID);
return database.applyShardZeroAndShardRepository('transaction', pendingTransaction.shard_id, transactionRepository => {
return transactionRepository.timeoutTransaction(transactionID);
}).then(() => {
delete this._transactionRetryValidation[transactionID];
delete this._consensusRoundState[transactionID];
delete this._validationPrepareState[transactionID];
resolve();
});
}).then(() => transactionID);
}
}
}
......@@ -932,15 +924,21 @@ export class WalletTransactionConsensus {
};
}
}
setTimeout(() => {
delete this._transactionRetryValidation[transactionID];
delete this._consensusRoundState[transactionID];
resolve();
}, 5000);
});
}).catch(() => {
resolve();
return Promise.reject(true);
});
}).then(transactionID => {
delete this._consensusRoundState[transactionID];
delete this._validationPrepareState[transactionID];
//check if there is another transaction to
// validate
setTimeout(() => this.doValidateTransaction(), transactionID ? 0 : 10000);
}).catch(restartValidation => {
if (restartValidation) {
setTimeout(() => this.doValidateTransaction(), 1000);
}
return Promise.resolve();
});
}
......
......@@ -560,7 +560,12 @@ class WalletUtils {
resolve(true);
}
else {
this.isConsumingExpiredOutputs(transaction.transaction_input_list, transactionDate)
// before 1620603935 the refresh time was 3 days
// now the refresh time is 10 min (TRANSACTION_OUTPUT_EXPIRE_OLDER_THAN)
const expireMinutes = transactionDate.getTime() <= 1620603935000 ? 4320 : config.TRANSACTION_OUTPUT_EXPIRE_OLDER_THAN;
let maximumOldestDate = new Date(transactionDate.getTime());
maximumOldestDate.setMinutes(maximumOldestDate.getMinutes() - expireMinutes);
this.isConsumingExpiredOutputs(transaction.transaction_input_list, maximumOldestDate)
.then(isConsumingExpired => {
resolve(!isConsumingExpired);
})
......@@ -576,7 +581,7 @@ class WalletUtils {
return signature.verify(objectHash.getHashBuffer(message), sign, publicKey);
}
isConsumingExpiredOutputs(inputList, transactionDate) {
isConsumingExpiredOutputs(inputList, maximumOldestDate) {
return new Promise(resolve => {
async.eachSeries(inputList, (input, callback) => {
let output_shard = input.output_shard_id;
......@@ -589,10 +594,7 @@ class WalletUtils {
callback(false);
}
else {
let maximumOldest = new Date(transactionDate.getTime());
maximumOldest.setMinutes(maximumOldest.getMinutes() - config.TRANSACTION_OUTPUT_EXPIRE_OLDER_THAN);
if ((maximumOldest - sourceTransaction.transaction_date) > 0) {
if ((maximumOldestDate - sourceTransaction.transaction_date) > 0) {
// Meaning it
// consumed an
// expired output
......@@ -768,6 +770,9 @@ class WalletUtils {
return Promise.reject('private key set is required');
}
let maximumOldestDate = new Date(transactionDate.getTime());
maximumOldestDate.setMinutes(maximumOldestDate.getMinutes() - config.TRANSACTION_OUTPUT_EXPIRE_OLDER_THAN);
return new Promise((resolve, reject) => {
let allocatedFunds = 0;
const amount = _.sum(_.map(outputList, o => o.amount)) + _.sum(_.map(feeOutputList, o => o.amount));
......@@ -803,7 +808,7 @@ class WalletUtils {
}))
.then((signatureList) => peer.getNodeAddress()
.then(() => signatureList))
.then(signatureList => this.isConsumingExpiredOutputs(inputList, transactionDate).then(isConsumingExpiredOutputs => [
.then(signatureList => this.isConsumingExpiredOutputs(inputList, maximumOldestDate).then(isConsumingExpiredOutputs => [
signatureList,
isConsumingExpiredOutputs
]))
......
......@@ -380,23 +380,18 @@ class Wallet {
return signature.sign(objectHash.getHashBuffer(message), privateKeyBuf);
}
syncAddresses(ws) {
syncWalletTransactions(ws) {
if (!this.defaultKeyIdentifier) {
return Promise.resolve();
}
return new Promise(resolve => {
mutex.lock(['sync-address-balance-request'], unlock => {
let wallets = Object.keys(this.getActiveWallets());
async.eachSeries(wallets, (walletID, callback) => {
database.getRepository('keychain').getWalletAddresses(walletID)
.then(addresses => {
async.eachSeries(addresses, (address, callbackAddress) => {
mutex.lock(['sync-wallet-balance-request'], unlock => {
database.applyShards((shardID) => {
return database.getRepository('transaction', shardID)
.getLastTransactionByAddress(address.address);
}).then(lastUpdateByShard => _.max(lastUpdateByShard))
.then(updated => peer.addressTransactionSync(address.address, updated ? updated.toISOString() : undefined, ws))
.then(() => callbackAddress());
}, () => callback());
});
}, () => {
const transactionRepository = database.getRepository('transaction', shardID);
return transactionRepository.getTransactionByOutputAddressKeyIdentifier(this.defaultKeyIdentifier);
}).then(transactions => {
peer.walletTransactionSync(this.defaultKeyIdentifier, _.map(transactions, transaction => transaction.transaction_id), ws);
resolve();
unlock();
});
......@@ -571,16 +566,6 @@ class Wallet {
.then(shardInfo => peer.shardSyncResponse(shardInfo, ws));
}
_onTransactionSyncByDateResponse(data, ws) {
if (eventBus.listenerCount('transaction_sync_by_date_response:' + ws.nodeID) > 0) {
eventBus.emit('transaction_sync_by_date_response:' + ws.nodeID, data);
}
else {
walletSync.moveProgressiveSync(ws);
}
}
_onTransactionSyncResponse(data, ws) {
if (data && data.transaction) {
eventBus.emit('transaction_sync_response:' + data.transaction.transaction_id, {transaction_not_found: data.transaction_not_found});
......@@ -784,11 +769,17 @@ class Wallet {
}).then(data => data || []).then(([hasTransaction, isAuditPoint, hasTransactionData]) => {
if (!hasTransaction || isAuditPoint && hasKeyIdentifier) {
console.log('[Wallet] request sync input transaction ', inputTransaction.output_transaction_id);
let options = {};
// only flag transactions that don't have the key identifier and are from a wallet funding lineage, or transactions that are not from a funding lineage and have the key identifier
if (isFundingWallet || hasKeyIdentifier) {
this._transactionFundingActiveWallet[inputTransaction.output_transaction_id] = Date.now();
options = {
dispatch_request : true,
force_request_sync: true
};
}
peer.transactionSyncRequest(inputTransaction.output_transaction_id, {priority: syncPriority})
peer.transactionSyncRequest(inputTransaction.output_transaction_id, {priority: syncPriority, ...options})
.then(() => this._transactionRequested[inputTransaction.output_transaction_id] = Date.now())
.catch(_ => _);
}
......@@ -862,7 +853,8 @@ class Wallet {
// Finds all spenders of a single transaction
// This is a recursive function
// The spenders are added to an array that is passed in
findAllSpenders(transaction) {
findAllSpenders(transaction, processedTransaction = new Set()) {
processedTransaction.add(transaction.transaction_id);
console.log(`[Wallet] Querying all shards for potential spenders of transaction ${transaction.transaction_id}`);
return new Promise((resolve) => {
......@@ -882,8 +874,11 @@ class Wallet {
} // stops recursion
async.mapSeries(transactionSpenders, (spender, callback) => {
if (processedTransaction.has(spender.transaction_id)) {
return callback(false, []);
}
// continues recursion
this.findAllSpenders(spender)
this.findAllSpenders(spender, processedTransaction)
.then((spenders) => callback(false, spenders));
}, (err, mapOfSpenders) => {
let spenders = Array.prototype.concat.apply([], mapOfSpenders);
......@@ -898,33 +893,34 @@ class Wallet {
let spendersByShard = {};
for (let spender of spenders) {
if (!(spender.shard_id in spendersByShard)) {
spendersByShard[spender.shard_id] = [];
const shardID = spender.shard_id || genesisConfig.genesis_shard_id;
if (!(shardID in spendersByShard)) {
spendersByShard[shardID] = [];
}
spendersByShard[spender.shard_id].push(spender.transaction_id);
spendersByShard[shardID].push(spender.transaction_id);
}
return new Promise((resolve) => {
async.eachSeries(Object.entries(spendersByShard), ([shardID, transactionIDs], callback) => {
console.log(`[wallet] marking transactions ${transactionIDs.join(', ')} on shard ${shardID} as invalid.`);
const transactionRepository = database.getRepository('transaction', shardID);
if (!transactionRepository) {
console.log(`[wallet] cannot set transactions ${transactionIDs} as invalid: shard not found`);
return callback();
let chunkTransactionIDs = [];
while (transactionIDs.length) {
chunkTransactionIDs.push(transactionIDs.splice(0, 1000));
}
transactionRepository.markTransactionsAsInvalid(transactionIDs)
.then(() => {
console.log(`[wallet] set transactions ${transactionIDs} as invalid`);
callback();
})
.catch((err) => {
async.eachSeries(chunkTransactionIDs, (transactionIDList, chunkCallback) => {
database.applyShardZeroAndShardRepository('transaction', shardID, transactionRepository => {
return transactionRepository.markTransactionsAsInvalid(transactionIDList);
}).then(() => {
console.log(`[wallet] set transactions ${transactionIDList} as invalid`);
chunkCallback();
}).catch((err) => {
console.log(`[wallet] error while marking transactions as invalid: ${err}`);
callback();
chunkCallback();
});
}, () => callback());
}, () => {
console.log('[wallet] finished setting all spenders as invalid');
resolve();
......@@ -956,10 +952,8 @@ class Wallet {
}).then(transactionsByDate => {
// let's exclude the list of tx already present in our
// peer.
let transactions = new Set(_.map(transactions, transaction => transaction.transaction_id));
excludeTransactionList.forEach(transactionID => transactions.delete(transactionID));
transactionsByDate = _.filter(transactionsByDate, transactionID => transactions.has(transactionID));
transactions = Array.from(transactions);
const excludeTransactionSet = new Set(excludeTransactionList);
const transactions = _.filter(transactionsByDate, transactionID => !excludeTransactionSet.has(transactionID));
// get peers' current web socket
let ws = network.getWebSocketByID(connectionID);
......@@ -1134,23 +1128,24 @@ class Wallet {
});
}
_onSyncAddressBalance(data, ws) {
_onSyncWalletBalance(data, ws) {
let node = ws.node;
let connectionID = ws.connectionID;
mutex.lock(['sync-address-balance'], unlock => {
let address = data.address;
let updated = new Date(data.updated || 0);
console.log('[wallet] transaction sync for address ', address, 'from', updated);
mutex.lock(['sync-wallet-balance'], unlock => {
const addressKeyIdentifier = data.address_key_identifier;
const excludeTransactionIDSet = new Set(data.exclude_transaction_id_list);
console.log('[wallet] transaction sync for wallet key identifier ', addressKeyIdentifier);
eventBus.emit('wallet_event_log', {
type : 'address_transaction_sync',
type : 'wallet_transaction_sync',
content: data,
from : node
});
database.applyShards((shardID) => {
const transactionRepository = database.getRepository('transaction', shardID);
return transactionRepository.getTransactionByOutputAddress(address, updated);
return transactionRepository.getTransactionByOutputAddressKeyIdentifier(addressKeyIdentifier);
}).then(transactions => {
console.log('[wallet] >>', transactions.length, ' transaction will be synced to', address);
transactions = _.filter(transactions, transaction => !excludeTransactionIDSet.has(transaction.transaction_id));
console.log('[wallet] >>', transactions.length, ' transaction will be synced to wallet ', addressKeyIdentifier);
async.eachSeries(transactions, (dbTransaction, callback) => {
database.firstShardZeroORShardRepository('transaction', dbTransaction.shard_id, transactionRepository => {
return new Promise((resolve, reject) => {
......@@ -1744,7 +1739,7 @@ class Wallet {
_onNewPeerConnection(ws) {
if (this.initialized) {
this.syncAddresses(ws).then(_ => _);
this.syncWalletTransactions(ws).then(_ => _);
}
walletSync.doProgressiveSync(ws);
}
......@@ -1805,10 +1800,6 @@ class Wallet {
});
}
_doSyncBalanceForAddresses() {
return this.syncAddresses();
}
_doStateInspector() {
let networkTransactions = _.keys(this._transactionReceivedFromNetwork);
console.log('[wallet] status (_transactionReceivedFromNetwork:', networkTransactions.length, ' | _transactionValidationRejected:', walletTransactionConsensus.getRejectedTransactionList().size, ' | _activeConsensusRound:', _.keys(this._activeConsensusRound).length + ')');
......@@ -1984,15 +1975,26 @@ class Wallet {
proxyTransaction(srcInputs, dstOutputs, outputFee, addressAttributeMap, privateKeyMap, transactionVersion, propagateTransaction = true) {
const transactionRepository = database.getRepository('transaction');
const proxyErrorList = ['proxy_network_error', 'proxy_timeout', 'invalid_proxy_transaction_chain', 'proxy_connection_state_invalid'];
const proxyErrorList = [
'proxy_network_error',
'proxy_timeout',
'invalid_proxy_transaction_chain',
'proxy_connection_state_invalid'
];
return transactionRepository.getPeersAsProxyCandidate(_.uniq(_.map(network.registeredClients, ws => ws.nodeID)))
.then(proxyCandidates => {
return new Promise((resolve, reject) => {
async.eachSeries(proxyCandidates, (proxyCandidateData, callback) => {
this._tryProxyTransaction(proxyCandidateData, srcInputs, dstOutputs, outputFee, addressAttributeMap, privateKeyMap, transactionVersion, propagateTransaction)
.then(transaction => callback({error: false, transaction}))
.catch(e => typeof e === "string" && !proxyErrorList.includes(e) ? callback({error: true, message: e}) : callback());
}, data => data.error && typeof data.message === "string" && !proxyErrorList.includes(data.message) ? reject(data.message) :
.then(transaction => callback({
error: false,
transaction
}))
.catch(e => typeof e === 'string' && !proxyErrorList.includes(e) ? callback({
error : true,
message: e
}) : callback());
}, data => data.error && typeof data.message === 'string' && !proxyErrorList.includes(data.message) ? reject(data.message) :
data && data.transaction ? resolve(data.transaction) : reject('proxy_not_found'));
});
});
......@@ -2045,9 +2047,8 @@ class Wallet {
eventBus.on('transaction_sync', this._onSyncTransaction.bind(this));
eventBus.on('transaction_sync_by_date', this._onSyncTransactionByDate.bind(this));
eventBus.on('transaction_sync_response', this._onTransactionSyncResponse.bind(this));
eventBus.on('transaction_sync_by_date_response', this._onTransactionSyncByDateResponse.bind(this));
eventBus.on('shard_sync_request', this._onSyncShard.bind(this));
eventBus.on('address_transaction_sync', this._onSyncAddressBalance.bind(this));
eventBus.on('wallet_transaction_sync', this._onSyncWalletBalance.bind(this));
eventBus.on('transaction_validation_request', this._onTransactionValidationRequest.bind(this));
eventBus.on('transaction_validation_response', this._onTransactionValidationResponse.bind(this));
eventBus.on('transaction_spend_request', this._onSyncTransactionSpendTransaction.bind(this));
......
......@@ -136,7 +136,7 @@ export default class Transaction {
this.database.get('SELECT SUM(amount) as amount FROM transaction_output ' +
'INNER JOIN `transaction` ON `transaction`.transaction_id = transaction_output.transaction_id ' +
'WHERE transaction_output.address_key_identifier=? AND `transaction`.is_stable = ' + (stable ? 1 : 0) +
' AND is_spent = 0 AND is_double_spend = 0', [keyIdentifier],
' AND is_spent = 0 AND is_double_spend = 0 AND `transaction`.status != 3', [keyIdentifier],
(err, row) => {
resolve(row ? row.amount || 0 : 0);
});
......@@ -146,7 +146,7 @@ export default class Transaction {
getAddressBalance(address, stable) {
return new Promise((resolve) => {
this.database.get('SELECT SUM(amount) as amount FROM transaction_output INNER JOIN `transaction` ON `transaction`.transaction_id = transaction_output.transaction_id ' +
'WHERE address=? AND `transaction`.is_stable = ' + (stable ? 1 : 0) + ' AND is_spent = 0 AND is_double_spend = 0', [address],
'WHERE address=? AND `transaction`.is_stable = ' + (stable ? 1 : 0) + ' AND is_spent = 0 AND is_double_spend = 0 AND `transaction`.status != 3', [address],
(err, row) => {
resolve(row ? row.amount || 0 : 0);
});
......@@ -157,7 +157,7 @@ export default class Transaction {
return new Promise((resolve, reject) => {
this.database.all('SELECT DISTINCT `transaction`.* FROM `transaction` ' +
'INNER JOIN transaction_output ON transaction_output.transaction_id = `transaction`.transaction_id ' +
'WHERE transaction_output.address_key_identifier = ? ' + (excludeTransactionIDList && excludeTransactionIDList.length > 0 ? 'AND `transaction`.transaction_id NOT IN (' + excludeTransactionIDList.map(() => '?').join(',') + ')' : '') + 'AND +`transaction`.is_stable = 0 AND transaction_output.is_spent=0 AND transaction_output.is_double_spend=0 ORDER BY transaction_date DESC LIMIT 100',
'WHERE transaction_output.address_key_identifier = ? ' + (excludeTransactionIDList && excludeTransactionIDList.length > 0 ? 'AND `transaction`.transaction_id NOT IN (' + excludeTransactionIDList.map(() => '?').join(',') + ')' : '') + 'AND +`transaction`.is_stable = 0 AND transaction_output.is_spent=0 AND transaction_output.is_double_spend=0 AND `transaction`.status != 3 ORDER BY transaction_date DESC LIMIT 100',
[
addressKeyIdentifier
].concat(excludeTransactionIDList),
......@@ -193,6 +193,25 @@ export default class Transaction {
});
}
getTransactionByOutputAddressKeyIdentifier(addressKeyIdentifier) {
return new Promise((resolve, reject) => {
const {sql, parameters} = Database.buildQuery('SELECT DISTINCT `transaction`.* FROM `transaction` \
INNER JOIN transaction_output on `transaction`.transaction_id = transaction_output.transaction_id', {
address_key_identifier: addressKeyIdentifier
});
this.database.all(sql, parameters,
(err, rows) => {
if (err) {
console.log(err);
return reject(err);
}
resolve(rows);
}
);
});
}
getProxyCandidates(n, excludeNodeID) {
return new Promise((resolve, reject) => {
this.database.all(
......@@ -248,11 +267,11 @@ export default class Transaction {
'SELECT `transaction`.*, transaction_input.address as input_address, transaction_output.address as output_address, transaction_output.amount, transaction_output.address_key_identifier, transaction_output.output_position FROM `transaction` \
LEFT JOIN transaction_output on transaction_output.transaction_id = `transaction`.transaction_id \
LEFT JOIN transaction_input on transaction_input.transaction_id = `transaction`.transaction_id \
WHERE transaction_output.address_key_identifier = ? \
WHERE transaction_output.address_key_identifier = ? AND `transaction`.status != 3 \
UNION SELECT `transaction`.*, transaction_input.address as input_address, transaction_output.address as output_address, transaction_output.amount, transaction_output.address_key_identifier, transaction_output.output_position FROM `transaction` \
LEFT JOIN transaction_input on transaction_input.transaction_id = `transaction`.transaction_id \
LEFT JOIN transaction_output on transaction_output.transaction_id = `transaction`.transaction_id \
WHERE transaction_input.address_key_identifier = ? \
WHERE transaction_input.address_key_identifier = ? AND `transaction`.status != 3 \
ORDER BY `transaction`.transaction_date DESC',
[
keyIdentifier,
......@@ -983,9 +1002,13 @@ export default class Transaction {
});
}
getTransactionSpenders(transactionID) {
getTransactionSpenders(transactionID, outputPosition) {
return new Promise((resolve, reject) => {
this.database.all('SELECT transaction_id, shard_id FROM transaction_input WHERE output_transaction_id = ?', [transactionID], (err, rows) => {
let {sql, parameters} = Database.buildQuery('SELECT DISTINCT transaction_id, shard_id FROM transaction_input', {
output_transaction_id: transactionID,
output_position : outputPosition
});
this.database.all(sql, parameters, (err, rows) => {
if (err) {
return reject(err);
}
......@@ -1286,7 +1309,7 @@ export default class Transaction {
let unstableDateStart = ntp.now();
unstableDateStart.setMinutes(unstableDateStart.getMinutes() - config.TRANSACTION_OUTPUT_EXPIRE_OLDER_THAN);
unstableDateStart = Math.floor(unstableDateStart.getTime() / 1000);
this.database.all('SELECT DISTINCT `transaction`.* FROM `transaction` INNER JOIN transaction_output ON `transaction`.transaction_id = transaction_output.transaction_id WHERE `transaction`.transaction_date > ? AND `transaction`.create_date < ? AND +`transaction`.is_stable = 0 ' + (excludeTransactionIDList && excludeTransactionIDList.length > 0 ? 'AND `transaction`.transaction_id NOT IN (' + excludeTransactionIDList.map(() => '?').join(',') + ')' : '') + 'ORDER BY transaction_date ASC LIMIT 1',
this.database.all('SELECT DISTINCT `transaction`.* FROM `transaction` INNER JOIN transaction_output ON `transaction`.transaction_id = transaction_output.transaction_id WHERE `transaction`.transaction_date > ? AND `transaction`.create_date < ? AND +`transaction`.is_stable = 0 ' + (excludeTransactionIDList && excludeTransactionIDList.length > 0 ? 'AND `transaction`.transaction_id NOT IN (' + excludeTransactionIDList.map(() => '?').join(',') + ')' : '') + ' AND `transaction`.status != 3 ORDER BY transaction_date ASC LIMIT 1',
[
unstableDateStart,
insertDate
......@@ -1394,31 +1417,37 @@ export default class Transaction {
});
}
setTransactionAsDoubleSpend(rootTransaction, doubleSpendTransaction) {
setTransactionAsDoubleSpend(rootTransactionList, rootDoubleSpendTransactionInput) {
let now = ntp.now();
return new Promise(resolve => {
let depth = 0;
const dfs = (transactions, doubleSpendTransactions) => {
let allNewTransactions = [];
let allNewDoubleTransactions = [];
let processedDoubleSpendInputs = new Set();
async.eachOfSeries(transactions, (transaction, idx, callback) => {
const doubleSpendTransaction = doubleSpendTransactions[idx];
const doubleSpendTransactionInput = doubleSpendTransactions[idx];
// mark tx as stable
database.applyShardZeroAndShardRepository('transaction', transaction.shard_id, transactionRepository => transactionRepository.setTransactionAsStable(transaction.transaction_id))
// reset double spend on inputs
.then(() => database.applyShardZeroAndShardRepository('transaction', transaction.shard_id, transactionRepository => transactionRepository.updateAllTransactionInput(transaction.transaction_id, null)))
.then(() => {
.then(() => database.firstShardORShardZeroRepository('transaction', transaction.shard_id, transactionRepository => transactionRepository.getTransactionObject(transaction.transaction_id)))
.then((transaction) => {
return new Promise(resolve => {
// mark all outputs as double spend.
async.eachSeries(transaction.transaction_output_list, (output, callbackOutputs) => {
database.applyShardZeroAndShardRepository('transaction', transaction.shard_id, transactionRepository => transactionRepository.updateTransactionOutput(transaction.transaction_id, output.output_position, now, now, now))
.then(() => callbackOutputs());
}, () => {
async.eachSeries(transaction.transaction_input_list, (input, callbackInputs) => {
async.eachSeries(transaction.transaction_input_list, (input, callbackInputs) => { // get all transactions spending this input
database.applyShards((shardID) => database.getRepository('transaction', shardID).listOutputSpendTransaction(input.output_transaction_id, input.output_position))
.then(transactions => {
.then(transactions => { // update the spend date using the oldest date
let spendDate = _.min(_.map(_.filter(transactions, t => t.transaction_id !== transaction.transaction_id), t => t.transaction_date));
return database.applyShardZeroAndShardRepository('transaction', input.output_shard_id, transactionRepository => transactionRepository.updateTransactionOutput(input.output_transaction_id, input.output_position, !spendDate ? null : spendDate));
})
.then(() => {
if (input.output_transaction_id === doubleSpendTransaction) {
.then(() => { // mark the input as double spend if it caused the double spend issue.
if (input.output_transaction_id === doubleSpendTransactionInput.output_transaction_id &&
(!doubleSpendTransactionInput.output_position || input.output_position === doubleSpendTransactionInput.output_position)) {
return database.applyShardZeroAndShardRepository('transaction', transaction.shard_id, transactionRepository => transactionRepository.updateTransactionInput(transaction.transaction_id, input.input_position, now));
}
})
......@@ -1427,14 +1456,67 @@ export default class Transaction {
});
});
})
.then(() => { // check if this double spend input was processed and every input used in
// transactions spending were settled
const transactionInputID = `${doubleSpendTransactionInput.output_transaction_id}:${doubleSpendTransactionInput.output_position}`;
if (processedDoubleSpendInputs.has(transactionInputID)) {
return;
}
processedDoubleSpendInputs.add(transactionInputID); // fix inputs used in the double spend transactions. reset state of inputs used just once to unspent.
return database.applyShards(shardID => database.getRepository('transaction', shardID)
.getTransactionSpenders(doubleSpendTransactionInput.output_transaction_id, doubleSpendTransactionInput.output_position))
.then(spenders => { // update the state of all inputs
return new Promise(resolve => {
async.eachSeries(spenders, (spenderTransaction, callbackSpenders) => {
database.getRepository('transaction', spenderTransaction.shard_id)
.getTransactionInputs(spenderTransaction.transaction_id)
.then(transactionInputList => {
async.eachSeries(transactionInputList, (transactionInput, callbackInput) => {
if (transactionInput.output_transaction_id === doubleSpendTransactionInput.output_transaction_id) { // skip the double spend transaction
return callbackInput();
}
else { /* check the input spenders. if there is only this transaction, we should reset the state to unspent */
database.applyShards(shardID => database.getRepository('transaction', shardID)
.getTransactionSpenders(transactionInput.output_transaction_id, transactionInput.output_position))
.then(inputSpenders => {
return new Promise(resolve => {
const uniqueTransactionIDs = new Set(_.map(inputSpenders, i => i.transaction_id));
if (uniqueTransactionIDs.size === 1) {
database.applyShardZeroAndShardRepository('transaction', transactionInput.output_shard_id, transactionRepository => {
return transactionRepository.listTransactionInput({transaction_id: transactionInput.output_transaction_id})
.then(transactionInputList => {
// if any input is marked as double spend we should not toggle the state of the outputs
if (_.some(transactionInputList, input => input.is_double_spend === 1)) {
return Promise.resolve();
}
return database.applyShardZeroAndShardRepository('transaction', transactionInput.output_shard_id, transactionRepository => transactionRepository.updateTransactionOutput(transactionInput.output_transaction_id, transactionInput.output_position, null, undefined, null));
});
}).then(() => resolve());
}
else {
resolve();
}
});
})
.then(() => callbackInput());
}
}, () => callbackSpenders());
});
}, () => resolve());
});
});
})
.then(() => database.applyShards((shardID) => {
// get all transactions spending from this
// double spend transaction.
return database.getRepository('transaction', shardID)
.getTransactionObjectBySpentOutputTransaction(transaction.transaction_id);
}))
.then(newTransactions => {
// new list of double spend transactions
if (newTransactions && newTransactions.length) {
allNewTransactions.push(newTransactions);
allNewDoubleTransactions.push(Array(newTransactions.length).fill(transaction.transaction_id));
allNewDoubleTransactions.push(Array(newTransactions.length).fill({output_transaction_id: transaction.transaction_id}));
}
callback();
});
......@@ -1443,11 +1525,11 @@ export default class Transaction {
return resolve();
}
depth++;
dfs(allNewTransactions, allNewDoubleTransactions);
dfs(_.flatten(allNewTransactions), _.flatten(allNewDoubleTransactions));
});
};
dfs([rootTransaction], [doubleSpendTransaction]);
dfs([rootTransactionList], [rootDoubleSpendTransactionInput]);
});
}
......@@ -1803,7 +1885,8 @@ export default class Transaction {
getOutputSpendDate(outputTransactionID, outputPosition) {
return new Promise((resolve, reject) => {
this.database.get('SELECT `transaction`.transaction_date FROM transaction_input INNER JOIN `transaction` on transaction_input.transaction_id = `transaction`.transaction_id ' +
'WHERE output_transaction_id = ? and output_position = ?', [
'WHERE output_transaction_id = ? AND output_position = ? ' +
'AND NOT EXISTS(SELECT transaction_output.transaction_id FROM transaction_output WHERE transaction_output.transaction_id = `transaction`.transaction_id AND transaction_output.is_double_spend = 1)', [
outputTransactionID,
outputPosition
],
......
......@@ -109,4 +109,4 @@ db.initialize()
}
})
.then(() => setTimeout(() => wallet.syncAddresses(), 2000));
//millix v1.10.8-tangled
//millix v1.10.9-tangled
......@@ -774,18 +774,18 @@ class Peer {
});
}
addressTransactionSync(address, updated, ws) {
walletTransactionSync(addressKeyIdentifier, excludeTransactionList, ws) {
if (network.registeredClients.length === 0) {
return address;
return;
}
console.log('[peer] requesting transaction sync for address:', address, ' from ', updated);
console.log('[peer] requesting transaction sync for wallet: ', addressKeyIdentifier);
let payload = {
type : 'address_transaction_sync',
type : 'wallet_transaction_sync',
content: {
address,
updated
address_key_identifier: addressKeyIdentifier,
exclude_transaction_id_list: excludeTransactionList
}
};
......@@ -800,18 +800,6 @@ class Peer {
console.log('[WARN]: try to send data over a closed connection.');
}
}
else {
network.registeredClients.forEach(ws => {
try {
ws.nodeConnectionReady && !(ws.inBound && !ws.bidirectional) && ws.send(data);
}
catch (e) {
console.log('[WARN]: try to send data over a closed connection.');
}
});
}
return address;
}
transactionSyncResponse(content, ws) {
......@@ -954,11 +942,6 @@ class Peer {
return;
}
return walletSync.getTransactionUnresolvedData(transactionID)
.then(unresolvedTransaction => {
if (unresolvedTransaction) {
return;
}
let payload = {
type : 'transaction_sync_by_date_response:' + network.nodeID,
content: {transaction_id_list: transactionList}
......@@ -973,7 +956,6 @@ class Peer {
catch (e) {
console.log('[WARN]: try to send data over a closed connection.');
}
});
}
transactionSyncByDate(beginTimestamp, endTimestamp, excludeTransactionList, ws) {
......@@ -982,7 +964,7 @@ class Peer {
let start = Date.now();
let nodeID = ws.nodeID;
console.log(`[peer] requesting transaction sync by date from ${new Date(beginTimestamp)} to ${new Date(endTimestamp)} : node ${nodeID}`);
console.log(`[peer] requesting transaction sync by date from ${new Date(beginTimestamp * 1000)} to ${new Date(endTimestamp * 1000)} : node ${nodeID}`);
let payload = {
type : 'transaction_sync_by_date',
content: {
......
......@@ -96,7 +96,7 @@ CREATE TABLE `transaction`
is_parent TINYINT NOT NULL DEFAULT 0 CHECK (is_parent = 0 OR is_parent = 1),
timeout_date INT NULL CHECK(length(timeout_date) <= 10 AND TYPEOF(timeout_date) IN ('integer', 'null')),
is_timeout TINYINT NOT NULL DEFAULT 0 CHECK (is_timeout = 0 OR is_timeout = 1),
status TINYINT NOT NULL DEFAULT 1 CHECK (length(status) <= 3 AND TYPEOF(status) = 'integer'),
status TINYINT NOT NULL DEFAULT 1 CHECK (length(status) <= 3 AND TYPEOF(status) = 'integer'), /*1: default, 2: prune, 3: invalid*/
create_date INT NOT NULL DEFAULT (CAST(strftime('%s', 'now') AS INTEGER)) CHECK(length(create_date) <= 10 AND TYPEOF(create_date) = 'integer')
);
CREATE INDEX idx_transaction_status_is_stable_transaction_date ON `transaction` (status, is_stable, transaction_date);
......@@ -174,7 +174,7 @@ CREATE TABLE transaction_output
is_spent TINYINT NOT NULL DEFAULT 0 CHECK (is_spent = 0 OR is_spent = 1),
double_spend_date INT NULL CHECK(length(double_spend_date) <= 10 AND TYPEOF(double_spend_date) IN ('integer', 'null')), -- NOT NULL if double spend
is_double_spend TINYINT NOT NULL DEFAULT 0 CHECK (is_double_spend = 0 OR is_double_spend = 1),
status TINYINT NOT NULL DEFAULT 1 CHECK (length(status) <= 3 AND TYPEOF(status) = 'integer'),
status TINYINT NOT NULL DEFAULT 1 CHECK (length(status) <= 3 AND TYPEOF(status) = 'integer'), /*1: default, 2: expired*/
create_date INT NOT NULL DEFAULT (CAST(strftime('%s', 'now') AS INTEGER)) CHECK(length(create_date) <= 10 AND TYPEOF(create_date) = 'integer'),
PRIMARY KEY (transaction_id, output_position),
FOREIGN KEY (transaction_id) REFERENCES `transaction` (transaction_id),
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment