Commit 18eaa302 authored by Dmitry Khaleev's avatar Dmitry Khaleev

Merge remote-tracking branch 'origin/master' into issue/TG-7

parents 0744f8df d1e2fa5e
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
<br> <br>
<a href="#"><img src="https://github.com/millix/millix-wallet/blob/master/app/icon.png?raw=true" alt="millix node" width="200"></a> <a href="#"><img src="https://github.com/millix/millix-wallet/blob/master/app/icon.png?raw=true" alt="millix node" width="200"></a>
<br> <br>
millix node <small>v1.11.2</small> millix node <small>v1.11.3</small>
<br> <br>
</h1> </h1>
......
...@@ -774,8 +774,8 @@ export const NETWORK_SHORT_TIME_WAIT_MAX = 1500; ...@@ -774,8 +774,8 @@ export const NETWORK_SHORT_TIME_WAIT_MAX = 1500;
export const DATABASE_ENGINE = 'sqlite'; export const DATABASE_ENGINE = 'sqlite';
export const DATABASE_CONNECTION = {}; export const DATABASE_CONNECTION = {};
export const MILLIX_CIRCULATION = 9e15; export const MILLIX_CIRCULATION = 9e15;
export const NODE_MILLIX_BUILD_DATE = 1629213000; export const NODE_MILLIX_BUILD_DATE = 1629813663;
export const NODE_MILLIX_VERSION = '1.11.2-tangled'; export const NODE_MILLIX_VERSION = '1.11.4-tangled';
export const DATA_BASE_DIR_MAIN_NETWORK = './millix-tangled'; export const DATA_BASE_DIR_MAIN_NETWORK = './millix-tangled';
export const DATA_BASE_DIR_TEST_NETWORK = './millix-tangled'; export const DATA_BASE_DIR_TEST_NETWORK = './millix-tangled';
let DATA_BASE_DIR = MODE_TEST_NETWORK ? DATA_BASE_DIR_TEST_NETWORK : DATA_BASE_DIR_MAIN_NETWORK; let DATA_BASE_DIR = MODE_TEST_NETWORK ? DATA_BASE_DIR_TEST_NETWORK : DATA_BASE_DIR_MAIN_NETWORK;
...@@ -786,6 +786,7 @@ export const WALLET_KEY_PATH = DATA_BASE_DIR + ...@@ -786,6 +786,7 @@ export const WALLET_KEY_PATH = DATA_BASE_DIR +
export const JOB_CONFIG_PATH = DATA_BASE_DIR + '/job.json'; export const JOB_CONFIG_PATH = DATA_BASE_DIR + '/job.json';
export const JOB_CONFIG_VERSION = 4; export const JOB_CONFIG_VERSION = 4;
export const SHARD_ZERO_NAME = 'shard_zero'; export const SHARD_ZERO_NAME = 'shard_zero';
export const DEBUG_LOG_FILTER = [];
export const PEER_ROTATION_MORE_THAN_AVERAGE = 0.5; export const PEER_ROTATION_MORE_THAN_AVERAGE = 0.5;
export const PEER_ROTATION_MORE_THAN_MOST = 0.2; export const PEER_ROTATION_MORE_THAN_MOST = 0.2;
export const PEER_ROTATION_MORE_THAN_ALL = 0.01; export const PEER_ROTATION_MORE_THAN_ALL = 0.01;
...@@ -893,5 +894,6 @@ export default { ...@@ -893,5 +894,6 @@ export default {
PEER_ROTATION_MORE_THAN_ALL, PEER_ROTATION_MORE_THAN_ALL,
PEER_ROTATION_CONFIG, PEER_ROTATION_CONFIG,
JOB_CONFIG_PATH, JOB_CONFIG_PATH,
JOB_CONFIG_VERSION JOB_CONFIG_VERSION,
DEBUG_LOG_FILTER
}; };
...@@ -19,4 +19,6 @@ console.log = function() { ...@@ -19,4 +19,6 @@ console.log = function() {
enabled && showLog && config.MODE_DEBUG && _consoleLog.apply(console, arguments); enabled && showLog && config.MODE_DEBUG && _consoleLog.apply(console, arguments);
}; };
config.DEBUG_LOG_FILTER.forEach(filter => console.addFilter(filter));
export default console; export default console;
...@@ -29,7 +29,13 @@ class Task { ...@@ -29,7 +29,13 @@ class Task {
}); });
} }
else { else {
task(); try {
task();
}
catch (e) {
this.debug && console.log(`[task] error running task ${taskName}: ${e}`);
}
if (!once) { if (!once) {
self.runningTask[taskName] = setTimeout(run, waitTime); self.runningTask[taskName] = setTimeout(run, waitTime);
} }
......
...@@ -225,7 +225,7 @@ export class WalletTransactionConsensus { ...@@ -225,7 +225,7 @@ export class WalletTransactionConsensus {
message : `not validated in a depth of ${depth}` message : `not validated in a depth of ${depth}`
}); });
} }
else if (transaction.status === 2 || database.getRepository('transaction').isExpired(transaction.transaction_date)) { else if (transaction && (transaction.status === 2 || database.getRepository('transaction').isExpired(transaction.transaction_date))) {
return reject({ return reject({
cause : 'transaction_validation_max_depth', cause : 'transaction_validation_max_depth',
transaction_id_fail: transactionID, transaction_id_fail: transactionID,
...@@ -883,7 +883,9 @@ export class WalletTransactionConsensus { ...@@ -883,7 +883,9 @@ export class WalletTransactionConsensus {
} }
delete this._consensusRoundState[lockerID]; delete this._consensusRoundState[lockerID];
this._consensusRoundState[transactionID] = {}; this._consensusRoundState[transactionID] = {
timestamp: Date.now()
};
let unstableDateStart = ntp.now(); let unstableDateStart = ntp.now();
unstableDateStart.setMinutes(unstableDateStart.getMinutes() - config.TRANSACTION_OUTPUT_EXPIRE_OLDER_THAN); unstableDateStart.setMinutes(unstableDateStart.getMinutes() - config.TRANSACTION_OUTPUT_EXPIRE_OLDER_THAN);
...@@ -901,7 +903,8 @@ export class WalletTransactionConsensus { ...@@ -901,7 +903,8 @@ export class WalletTransactionConsensus {
return (() => { return (() => {
if (unstableDateStart.getTime() < pendingTransaction.transaction_date.getTime()) { // if not hibernated yet, we try to do a local validation first if (unstableDateStart.getTime() < pendingTransaction.transaction_date.getTime()) { // if not hibernated yet, we try to do a local validation first
return this._validateTransaction(transactionID, null, 0); return this._validateTransaction(transactionID, null, 0)
.catch(() => Promise.resolve());
} }
else { else {
return Promise.resolve(); return Promise.resolve();
......
...@@ -156,14 +156,14 @@ export default class Transaction { ...@@ -156,14 +156,14 @@ export default class Transaction {
this.database.all('SELECT * FROM (SELECT `transaction`.* FROM `transaction` ' + this.database.all('SELECT * FROM (SELECT `transaction`.* FROM `transaction` ' +
'INNER JOIN transaction_input ON transaction_input.transaction_id = `transaction`.transaction_id ' + 'INNER JOIN transaction_input ON transaction_input.transaction_id = `transaction`.transaction_id ' +
'INNER JOIN transaction_output ON transaction_output.transaction_id = transaction_input.transaction_id ' + 'INNER JOIN transaction_output ON transaction_output.transaction_id = transaction_input.transaction_id ' +
'WHERE transaction_input.address_key_identifier = ?1 ' + (excludeTransactionIDList && excludeTransactionIDList.length > 0 ? 'AND `transaction`.transaction_id NOT IN (' + excludeTransactionIDList.map((_, idx) => `?${idx + 2}`).join(',') + ')' : '') + 'AND transaction_output.is_stable = 0 ORDER BY transaction_date LIMIT ' + config.CONSENSUS_VALIDATION_PARALLEL_PROCESS_MAX + ')' + 'WHERE transaction_input.address_key_identifier = ?1 ' + (excludeTransactionIDList && excludeTransactionIDList.length > 0 ? 'AND `transaction`.transaction_id NOT IN (' + excludeTransactionIDList.map((_, idx) => `?${idx + 2}`).join(',') + ')' : '') + 'AND transaction_output.is_stable = 0 ORDER BY transaction_date DESC LIMIT ' + config.CONSENSUS_VALIDATION_PARALLEL_PROCESS_MAX + ') ' +
'UNION SELECT * FROM (SELECT `transaction`.* FROM `transaction` ' + 'UNION SELECT * FROM (SELECT `transaction`.* FROM `transaction` ' +
'INNER JOIN transaction_output ON transaction_output.transaction_id = `transaction`.transaction_id ' + 'INNER JOIN transaction_output ON transaction_output.transaction_id = `transaction`.transaction_id ' +
'WHERE transaction_output.address_key_identifier = ?1 ' + (excludeTransactionIDList && excludeTransactionIDList.length > 0 ? 'AND `transaction`.transaction_id NOT IN (' + excludeTransactionIDList.map((_, idx) => `?${idx + 2}`).join(',') + ')' : '') + 'AND transaction_output.is_stable = 0 ORDER BY transaction_date LIMIT ' + config.CONSENSUS_VALIDATION_PARALLEL_PROCESS_MAX + ')' + 'WHERE transaction_output.address_key_identifier = ?1 ' + (excludeTransactionIDList && excludeTransactionIDList.length > 0 ? 'AND `transaction`.transaction_id NOT IN (' + excludeTransactionIDList.map((_, idx) => `?${idx + 2}`).join(',') + ')' : '') + 'AND transaction_output.is_stable = 0 ORDER BY transaction_date DESC LIMIT ' + config.CONSENSUS_VALIDATION_PARALLEL_PROCESS_MAX + ') ' +
'UNION SELECT * FROM (SELECT t.* FROM transaction_input i ' + 'UNION SELECT * FROM (SELECT `transaction`.* FROM transaction_input ' +
'INNER JOIN `transaction` t ON t.transaction_id = i.transaction_id ' + 'INNER JOIN `transaction` ON `transaction`.transaction_id = transaction_input.transaction_id ' +
'WHERE output_transaction_id IN (SELECT transaction_id FROM transaction_output WHERE address_key_identifier = ?1 ' + 'WHERE output_transaction_id IN (SELECT transaction_id FROM transaction_output WHERE address_key_identifier = ?1 ' +
'AND is_stable = 1 AND is_spent = 1 AND status = 2) ' + (excludeTransactionIDList && excludeTransactionIDList.length > 0 ? 'AND `transaction`.transaction_id NOT IN (' + excludeTransactionIDList.map((_, idx) => `?${idx + 2}`).join(',') + ')' : '') + 'AND t.is_stable = 0 ORDER BY transaction_date LIMIT ' + config.CONSENSUS_VALIDATION_PARALLEL_PROCESS_MAX + ')', 'AND is_stable = 1 AND is_spent = 1 AND status = 2) ' + (excludeTransactionIDList && excludeTransactionIDList.length > 0 ? 'AND `transaction`.transaction_id NOT IN (' + excludeTransactionIDList.map((_, idx) => `?${idx + 2}`).join(',') + ')' : '') + 'AND `transaction`.is_stable = 0 ORDER BY transaction_date DESC LIMIT ' + config.CONSENSUS_VALIDATION_PARALLEL_PROCESS_MAX + ') ',
[ [
addressKeyIdentifier addressKeyIdentifier
].concat(excludeTransactionIDList), ].concat(excludeTransactionIDList),
...@@ -1034,7 +1034,8 @@ export default class Transaction { ...@@ -1034,7 +1034,8 @@ export default class Transaction {
on i.output_transaction_id = s.transaction_id on i.output_transaction_id = s.transaction_id
) )
select transaction_id select transaction_id
from transaction_invalid_spenders where status != 3; from transaction_invalid_spenders
where status != 3;
update 'transaction' update 'transaction'
set status = 3, set status = 3,
is_stable = 1, is_stable = 1,
...@@ -1783,68 +1784,51 @@ export default class Transaction { ...@@ -1783,68 +1784,51 @@ export default class Transaction {
setPathAsStableFrom(transactionID) { setPathAsStableFrom(transactionID) {
return new Promise(resolve => { return new Promise((resolve, reject) => {
const dfs = (transactions, depth) => { this.database.exec(`
let newTransactions = []; DROP TABLE IF EXISTS transaction_input_chain;
async.eachSeries(transactions, (transaction, callback) => { CREATE TEMPORARY TABLE transaction_input_chain AS
mutex.lock(['path-as-stable' + (this.database.shardID ? '_' + this.database.shardID : '')], pathAsStableUnlock => { WITH RECURSIVE transaction_input_chain (transaction_id, status)
mutex.lock(['transaction' + (this.database.shardID ? '_' + this.database.shardID : '')], transactionUnlock => { AS (
const transactionRepository = transaction.repository; SELECT "${transactionID}", 1
if (!transactionRepository) { UNION
transactionUnlock(); SELECT i.output_transaction_id, i.status
pathAsStableUnlock(); FROM transaction_input i
return callback(); INNER JOIN transaction_input_chain c
} ON i.transaction_id = c.transaction_id)
transactionRepository.setTransactionAsStable(transaction.transaction_id) SELECT transaction_id
.then(() => transactionRepository.listTransactionOutput({'`transaction`.transaction_id': transaction.transaction_id})) FROM transaction_input_chain
.then(outputs => { WHERE status = 1;
return new Promise(resolve => { UPDATE 'transaction' AS t
async.eachSeries(outputs, (output, callback) => { SET is_stable = 1, stable_date = CAST(strftime('%s', 'now') AS INTEGER)
database.applyShards((shardID) => { WHERE transaction_id IN (SELECT transaction_id FROM transaction_input_chain);
const transactionRepositorySpendDate = database.getRepository('transaction', shardID); UPDATE transaction_input
return transactionRepositorySpendDate.getOutputSpendDate(output.transaction_id, output.output_position) SET is_double_spend = 0,
.then(spendDate => !!spendDate ? Promise.resolve(spendDate) : Promise.reject()); double_spend_date = NULL
}).then(spendDate => { WHERE transaction_id IN
spendDate = spendDate.length > 0 ? new Date(_.min(spendDate) * 1000) : undefined; (SELECT transaction_id FROM transaction_input_chain);
transactionRepository.updateTransactionOutput(output.transaction_id, output.output_position, spendDate, ntp.now()) UPDATE transaction_output AS o
.then(() => callback()); SET is_double_spend = 0, double_spend_date = NULL, is_stable = 1, stable_date = CAST(strftime('%s', 'now') AS INTEGER), is_spent = EXISTS (
}); SELECT i.output_transaction_id FROM transaction_input i
}, () => resolve()); INNER JOIN transaction_output o2 ON i.transaction_id = o2.transaction_id
}); WHERE i.output_transaction_id = o.transaction_id AND i.output_position = o.output_position AND
}) o2.status != 3 AND o2.is_double_spend = 0
.then(() => transactionRepository.setInputsAsSpend(transaction.transaction_id)) ), spent_date = (
.then(() => transactionRepository.getTransactionUnstableInputs(transaction.transaction_id)) SELECT t.transaction_date FROM 'transaction' t
.then(inputs => { INNER JOIN transaction_input i ON i.transaction_id = t.transaction_id
_.each(inputs, input => { INNER JOIN transaction_output o2 ON i.transaction_id = o2.transaction_id
newTransactions.push({ WHERE i.output_transaction_id = o.transaction_id AND i.output_position = o.output_position AND
transaction_id: input.output_transaction_id, o2.status != 3 and o2.is_double_spend = 0
repository : database.getRepository('transaction') // shard zero )
}); WHERE transaction_id IN (SELECT transaction_id FROM transaction_input_chain);
newTransactions.push({ `, (err) => {
transaction_id: input.output_transaction_id, if (err) {
repository : database.getRepository('transaction', input.output_shard_id) return reject(err);
});
});
transactionUnlock();
pathAsStableUnlock();
callback();
});
}, true);
});
}, () => {
if (newTransactions.length === 0 || depth >= config.CONSENSUS_VALIDATION_REQUEST_DEPTH_MAX) {
console.log('[setPathAsStableFrom] max depth was', depth);
return resolve();
}
dfs(newTransactions, depth + 1);
});
};
dfs([
{
transaction_id: transactionID,
repository : this
} }
], 0); else {
return resolve();
}
});
}); });
} }
......
...@@ -108,4 +108,4 @@ db.initialize() ...@@ -108,4 +108,4 @@ db.initialize()
}); });
} }
}); });
//millix v1.11.2-tangled //millix v1.11.4-tangled
...@@ -19,6 +19,8 @@ import os from 'os'; ...@@ -19,6 +19,8 @@ import os from 'os';
class JobEngine { class JobEngine {
static JOB_WAIT_TIME = 1000;
constructor() { constructor() {
this.debug = false; this.debug = false;
this.configJobEngine = null; this.configJobEngine = null;
...@@ -93,95 +95,91 @@ class JobEngine { ...@@ -93,95 +95,91 @@ class JobEngine {
this.debug && console.log(`[job-engine] running job ${job.job_id} : ${job.job_name}`); this.debug && console.log(`[job-engine] running job ${job.job_id} : ${job.job_name}`);
let unlocked = false; let unlocked = false;
const timestampBegin = ntp.now(); const timestampBegin = ntp.now();
this.jobRepository.updateJobProgressStatus(job.job_id, 1, {last_date_begin: timestampBegin}) const payload = JSON.parse(job.job_payload);
.then(() => this.jobRepository.lockJobObject(job.job_id)) const module = payload ? this.modules[payload.module] : undefined;
.then(() => { return this.jobRepository.updateJobProgressStatus(job.job_id, 1, {last_date_begin: timestampBegin})
unlock(); .then(() => this.jobRepository.lockJobObject(job.job_id))
unlocked = true; .then(() => {
// run job unlock();
if (job.job_type_id === this.types['function']) { unlocked = true;
const payload = JSON.parse(job.job_payload); // run job
const module = this.modules[payload.module]; if (job.job_type_id === this.types['function']) {
this.debug && console.log(`[job-engine] running function ${payload.module}:${payload.function_name}`); this.debug && console.log(`[job-engine] running function ${payload.module}:${payload.function_name}`);
let postJob = () => { if (!module || !payload || !module[payload.function_name]) {
const timestampEnd = ntp.now(); return Promise.reject('job_invalid');
const lastElapse = timestampEnd.getTime() - timestampBegin.getTime(); }
this.debug && console.log(`[job-engine] done ${payload.module}:${payload.function_name} - ${lastElapse} ms`);
mutex.lock(['job-engine'], (unlockUpdate) => {
this.jobRepository.updateJobProgressStatus(job.job_id, 0, {
last_date_end: timestampEnd,
last_elapse : lastElapse,
last_response: 'done'
})
.then(() => this.jobRepository.unlockJobObject(job.job_id))
.then(() => {
this.processorsStatus[processorTag]['running'] = false;
unlockUpdate();
this.running && task.scheduleTask(processorTag, this._getTask.bind(this, processorTag, processorID), 500, false, true);
})
.catch(() => {
this.processorsStatus[processorTag]['running'] = false;
unlockUpdate();
this.running && task.scheduleTask(processorTag, this._getTask.bind(this, processorTag, processorID), 500, false, true);
});
});
};
module[payload.function_name]() let postJob = () => {
.then(postJob) const timestampEnd = ntp.now();
.catch(postJob); const lastElapse = timestampEnd.getTime() - timestampBegin.getTime();
} this.debug && console.log(`[job-engine] done ${payload.module}:${payload.function_name} - ${lastElapse} ms`);
else { return new Promise((resolve, reject) => {
mutex.lock(['job-engine'], (unlockUpdate) => { mutex.lock(['job-engine'], (unlockUpdate) => {
this.jobRepository this.jobRepository.updateJobProgressStatus(job.job_id, 0, {
.updateJobProgressStatus(job.job_id, 0, { last_date_end: timestampEnd,
last_date_end: ntp.now(), last_elapse : lastElapse,
last_elapse : 0, last_response: 'done'
last_response: 'fail' })
}) .then(() => this.jobRepository.unlockJobObject(job.job_id))
.then(() => this.jobRepository.unlockJobObject(job.job_id)) .then(() => {
.then(() => { unlockUpdate();
this.processorsStatus[processorTag]['running'] = false; resolve();
unlockUpdate(); })
this.running && task.scheduleTask(processorTag, this._getTask.bind(this, processorTag, processorID), 500, false, true); .catch(() => {
}) unlockUpdate();
.catch(() => { reject();
this.processorsStatus[processorTag]['running'] = false; });
unlockUpdate(); });
this.running && task.scheduleTask(processorTag, this._getTask.bind(this, processorTag, processorID), 500, false, true); });
}); };
});
}
})
.catch(() => {
mutex.lock(['job-engine'], (unlockUpdate) => {
let postJob = () => {
this.processorsStatus[processorTag]['running'] = false;
unlockUpdate();
if (!unlocked) {
unlock();
}
this.running && task.scheduleTask(processorTag, this._getTask.bind(this, processorTag, processorID), 500, false, true);
};
this.jobRepository return module[payload.function_name]()
.updateJobProgressStatus(job.job_id, 0, { .then(postJob)
last_date_end: ntp.now(), .catch(postJob);
last_elapse : 0, }
last_response: 'fail' else {
}) return Promise.reject('job_not_supported');
.then(() => this.jobRepository.unlockJobObject(job.job_id)) }
.then(postJob) })
.catch(postJob); .catch(() => {
}); const timestampEnd = ntp.now();
}); const lastElapse = timestampEnd.getTime() - timestampBegin.getTime();
this.debug && console.log(`[job-engine] done ${payload.module}:${payload.function_name} - ${lastElapse} ms`);
mutex.lock(['job-engine'], (unlockUpdate) => {
let postJob = () => {
this.processorsStatus[processorTag]['running'] = false;
unlockUpdate();
if (!unlocked) {
unlock();
}
this.running && task.scheduleTask(processorTag, this._getTask.bind(this, processorTag, processorID), JobEngine.JOB_WAIT_TIME, false, true);
};
this.jobRepository
.updateJobProgressStatus(job.job_id, 0, {
last_date_end: ntp.now(),
last_elapse : 0,
last_response: 'fail'
})
.then(() => this.jobRepository.unlockJobObject(job.job_id))
.then(postJob)
.catch(postJob);
});
});
} }
else { else {
this.processorsStatus[processorTag]['running'] = false; return Promise.reject('job_not_found');
unlock();
this.running && task.scheduleTask(processorTag, this._getTask.bind(this, processorTag, processorID), 500, false, true);
} }
})
.then(() => {
this.processorsStatus[processorTag]['running'] = false;
this.running && task.scheduleTask(processorTag, this._getTask.bind(this, processorTag, processorID), 500, false, true);
})
.catch(() => {
this.processorsStatus[processorTag]['running'] = false;
unlock();
this.running && task.scheduleTask(processorTag, this._getTask.bind(this, processorTag, processorID), JobEngine.JOB_WAIT_TIME, false, true);
}); });
}); });
} }
......
...@@ -287,11 +287,11 @@ class Network { ...@@ -287,11 +287,11 @@ class Network {
database.getRepository('node') database.getRepository('node')
.listNodes() .listNodes()
.then((nodes) => { .then((nodes) => {
async.eachSeries(nodes, (node, callback) => { async.eachSeries(_.shuffle(nodes), (node, callback) => {
this.addNode(node.node_prefix, node.node_address, node.node_port, node.node_port_api, node.node_id); this.addNode(node.node_prefix, node.node_address, node.node_port, node.node_port_api, node.node_id);
callback(); callback();
}, () => { }, () => {
_.each(config.NODE_INITIAL_LIST, ({host, port_protocol: port, port_api: portApi}) => { _.each(_.shuffle(config.NODE_INITIAL_LIST), ({host, port_protocol: port, port_api: portApi}) => {
let prefix = config.WEBSOCKET_PROTOCOL; let prefix = config.WEBSOCKET_PROTOCOL;
let url = `${prefix}://${host}:${port}`; let url = `${prefix}://${host}:${port}`;
if ((!this._nodeList[url] || !this._nodeList[url].node_id) && (prefix && host && port && portApi)) { if ((!this._nodeList[url] || !this._nodeList[url].node_id) && (prefix && host && port && portApi)) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment