Commit 67dd86dd authored by Eriksson Monteiro's avatar Eriksson Monteiro

update millix-node module

parent 6266c93a
......@@ -5,6 +5,7 @@ import _ from 'lodash';
import network from '../../net/network';
import logManager from '../../core/log-manager';
import genesisConfig from '../../core/genesis/genesis-config';
import config from '../../core/config/config';
/**
......@@ -44,10 +45,14 @@ class _rKclyiLtHx0dx55M extends Endpoint {
unstable
},
network : {
online : network.initialized,
peer_count : network.registeredClients.length,
node_is_public: network.nodeIsPublic === undefined ? 'unknown' : network.nodeIsPublic,
node_public_ip: network.nodePublicIp
online : network.initialized,
peer_count : network.registeredClients.length,
node_id : network.nodeID,
node_port : config.NODE_PORT,
node_bind_ip : config.NODE_BIND_IP,
node_is_public : network.nodeIsPublic === undefined ? 'unknown' : network.nodeIsPublic,
node_public_ip : network.nodePublicIp,
node_network_addresses: network.networkInterfaceAddresses
},
log : {
log_count : logManager.lastIdx,
......
import _ from 'lodash';
import task from './task';
class Cache {
constructor() {
this.initialized = false;
this.jobRunning = false;
this.cache = {};
}
_purgeCache() {
const now = Date.now();
_.each(_.keys(this.cache), store => {
_.each(_.keys(this.cache[store]), key => {
if (now > this.cache[store][key].purge_time) {
delete this.cache[store][key];
}
});
});
}
removeCacheItem(store, key) {
if (this.cache[store] && this.cache[store][key]) {
delete this.cache[store][key];
}
}
refreshCacheTime(store, key, cacheTime = 30000) {
if (this.cache[store] && this.cache[store][key]) {
this.cache[store][key].purge_time = Date.now() + cacheTime;
}
}
setCacheItem(store, key, value, cacheTime = 30000) {
if (!this.cache[store]) {
this.cache[store] = {};
}
this.cache[store][key] = {
value,
purge_time: Date.now() + cacheTime
};
}
getCacheItem(store, key) {
if (this.cache[store] && this.cache[store][key]) {
return this.cache[store][key].value;
}
return null;
}
initialize() {
if (this.initialized) {
if (!this.jobRunning) {
task.scheduleTask('cache_purge', this._purgeCache.bind(this), 30000);
}
return Promise.resolve();
}
this.initialized = true;
task.scheduleTask('cache_purge', this._purgeCache.bind(this), 30000);
return Promise.resolve();
}
stop() {
task.removeTask('cache_purge');
this.jobRunning = false;
}
}
export default new Cache();
......@@ -772,7 +772,7 @@ export const DATABASE_ENGINE = 'sqlite';
export const DATABASE_CONNECTION = {};
export const MILLIX_CIRCULATION = 9e15;
export const NODE_MILLIX_BUILD_DATE = 1631097631;
export const NODE_MILLIX_VERSION = '1.11.15-tangled';
export const NODE_MILLIX_VERSION = '1.11.16-tangled';
export const DATA_BASE_DIR_MAIN_NETWORK = './millix-tangled';
export const DATA_BASE_DIR_TEST_NETWORK = './millix-tangled';
let DATA_BASE_DIR = MODE_TEST_NETWORK ? DATA_BASE_DIR_TEST_NETWORK : DATA_BASE_DIR_MAIN_NETWORK;
......
......@@ -7,6 +7,7 @@ import jobEngine from '../../job/job-engine';
import console from '../console';
import logManager from '../log-manager';
import database from '../../database/database';
import cache from '../cache';
class Service {
......@@ -30,6 +31,7 @@ class Service {
return logManager.initialize()
.then(() => server.initialize())
.then(() => wallet.setMode(this.mode).initialize(initializeWalletEvent))
.then(() => cache.initialize())
.then(() => network.initialize())
.then(() => peer.initialize())
.then(() => peerRotation.initialize())
......@@ -49,6 +51,7 @@ class Service {
}
this.initialized = false;
wallet.stop();
cache.stop();
network.stop();
peer.stop();
peerRotation.stop();
......
......@@ -118,7 +118,7 @@ export class WalletSync {
output_transaction_id : transactionID,
output_shard_id : outputShardID,
output_position : outputPosition,
'``transaction`.status!': 3
'`transaction`.status!': 3
}).then(inputList => {
const spendingInputs = [];
return new Promise((resolve) => {
......@@ -213,7 +213,7 @@ export class WalletSync {
output_transaction_id: transactionID,
output_shard_id : outputShardID,
output_position : outputPosition,
'``transaction`.status!': 3
'`transaction`.status!': 3
}).then(inputList => {
const spendingInputs = [];
return new Promise((resolve) => {
......
......@@ -12,6 +12,7 @@ import walletUtils from './wallet-utils';
import ntp from '../ntp';
import console from '../console';
import task from '../task';
import cache from '../cache';
export class WalletTransactionConsensus {
......@@ -456,11 +457,13 @@ export class WalletTransactionConsensus {
console.log('[wallet-transaction-consensus-oracle] transaction ', transactionID, ' was validated for a consensus');
let ws = network.getWebSocketByID(connectionID);
if (ws) {
peer.transactionValidationResponse({
const validationResult = {
transaction_id: transactionID,
valid : true,
type : 'validation_response'
}, ws, true);
};
cache.setCacheItem('validation', transactionID, validationResult, 90000);
peer.transactionValidationResponse(validationResult, ws, true);
}
delete this._transactionValidationState[nodeID];
})
......@@ -478,12 +481,14 @@ export class WalletTransactionConsensus {
}
if (ws) {
peer.transactionValidationResponse({
const validationResult = {
...err,
transaction_id: transactionID,
valid : false,
type : 'validation_response'
}, ws, true);
};
cache.setCacheItem('validation', transactionID, validationResult, 90000);
peer.transactionValidationResponse(validationResult, ws, true);
}
});
......@@ -608,6 +613,16 @@ export class WalletTransactionConsensus {
processTransactionValidationRequest(data, ws) {
// deal with the allocation process
const cachedValidation = cache.getCacheItem('validation', data.transaction_id);
if (cachedValidation) {
peer.transactionValidationResponse({
...data,
type: 'validation_start'
}, ws);
peer.transactionValidationResponse(cachedValidation, ws, true);
cache.refreshCacheTime('validation', data.transaction_id, 90000);
return;
}
if (_.keys(this._transactionValidationState).length >= config.CONSENSUS_VALIDATION_PARALLEL_REQUEST_MAX) {
peer.transactionValidationResponse({
......@@ -754,6 +769,7 @@ export class WalletTransactionConsensus {
consensusData.consensus_round_double_spend_count++;
console.log('[wallet-transaction-consensus] increase number of double spend rounds to', consensusData.consensus_round_double_spend_count);
if (consensusData.consensus_round_double_spend_count >= config.CONSENSUS_ROUND_DOUBLE_SPEND_MAX) {
cache.removeCacheItem('validation', transactionID);
consensusData.active = false;
this._transactionValidationRejected.add(transactionID);
console.log('[wallet-transaction-consensus] the transaction ', transactionID, ' was not validated (due to double spend) during consensus round number ', consensusData.consensus_round_count);
......@@ -772,6 +788,7 @@ export class WalletTransactionConsensus {
consensusData.consensus_round_not_found_count++;
console.log('[wallet-transaction-consensus] increase number of not found rounds to', consensusData.consensus_round_not_found_count);
if (consensusData.consensus_round_not_found_count >= config.CONSENSUS_ROUND_NOT_FOUND_MAX) {
cache.removeCacheItem('validation', transactionID);
consensusData.active = false;
console.log('[wallet-transaction-consensus] the transaction ', transactionID, ' was not validated (due to not found reply) during consensus round number ', consensusData.consensus_round_count);
this._transactionValidationRejected.add(transactionID);
......@@ -786,6 +803,7 @@ export class WalletTransactionConsensus {
consensusData.consensus_round_invalid_count++;
console.log('[wallet-transaction-consensus] increase number of double spend rounds to', consensusData.consensus_round_invalid_count);
if (consensusData.consensus_round_invalid_count >= config.CONSENSUS_ROUND_DOUBLE_SPEND_MAX) {
cache.removeCacheItem('validation', transactionID);
consensusData.active = false;
console.log('[wallet-transaction-consensus] the transaction ', transactionID, ' was not validated (due to not invalid tx) during consensus round number ', consensusData.consensus_round_count);
this._transactionValidationRejected.add(transactionID);
......@@ -803,6 +821,7 @@ export class WalletTransactionConsensus {
console.log('[wallet-transaction-consensus] increase number of valid rounds to', consensusData.consensus_round_validation_count);
if (consensusData.consensus_round_validation_count >= config.CONSENSUS_ROUND_VALIDATION_REQUIRED) {
console.log('[wallet-transaction-consensus] transaction ', transactionID, ' validated after receiving all replies for this consensus round');
cache.removeCacheItem('validation', transactionID);
consensusData.active = false;
if (!transaction) {
......@@ -933,7 +952,7 @@ export class WalletTransactionConsensus {
this._transactionRetryValidation[transactionID] = Date.now();
if (isTransactionFundingWallet) {
this._runningValidationForWalletTransaction = true;
this._runningValidationForWalletTransaction = true;
}
delete this._consensusRoundState[lockerID];
......
......@@ -19,6 +19,7 @@ import path from 'path';
import console from '../console';
import base58 from 'bs58';
import task from '../task';
import cache from '../cache';
export const WALLET_MODE = {
CONSOLE: 'CONSOLE',
......@@ -41,7 +42,6 @@ class Wallet {
this._maxBacklogThresholdReached = false;
this.initialized = false;
this._transactionSendInterrupt = false;
this.cache = {};
}
get isProcessingNewTransactionFromNetwork() {
......@@ -1084,7 +1084,7 @@ class Wallet {
mutex.lock(['sync-wallet-balance-response'], unlock => {
const transactions = data.transaction_id_list || [];
async.eachSeries(transactions, (transactionID, callback) => {
if (!!this._getCacheItem('sync', transactionID)) {
if (!!cache.getCacheItem('sync', transactionID)) {
return callback();
}
database.firstShards((shardID) => {
......@@ -1098,7 +1098,7 @@ class Wallet {
.catch(_ => _);
}
else {
this._setCacheItem('sync', transactionID, true, Number.MAX_SAFE_INTEGER);
cache.setCacheItem('sync', transactionID, true, Number.MAX_SAFE_INTEGER);
}
callback();
});
......@@ -1663,7 +1663,7 @@ class Wallet {
if (transactions && transactions.length > 0) {
mutex.lock(['transaction-list-propagate'], unlock => {
async.eachSeries(transactions, (transaction, callback) => {
if (!!this._getCacheItem('propagation', transaction.transaction_id)) {
if (!!cache.getCacheItem('propagation', transaction.transaction_id)) {
return callback();
}
const transactionRepository = database.getRepository('transaction');
......@@ -1678,7 +1678,7 @@ class Wallet {
.catch(_ => _);
}
else {
this._setCacheItem('propagation', transaction.transaction_id, true, (transaction.transaction_date * 1000) + (config.TRANSACTION_OUTPUT_REFRESH_OLDER_THAN * 60 * 1000));
cache.setCacheItem('propagation', transaction.transaction_id, true, (transaction.transaction_date * 1000) + (config.TRANSACTION_OUTPUT_REFRESH_OLDER_THAN * 60 * 1000));
}
callback();
});
......@@ -1687,40 +1687,10 @@ class Wallet {
}
}
_purgeCache() {
const now = Date.now();
_.each(_.keys(this.cache), store => {
_.each(_.keys(this.cache[store]), key => {
if (now > this.cache[store][key].purge_time) {
delete this.cache[store][key];
}
});
});
}
_setCacheItem(store, key, value, cacheTime = 30000) {
if (!this.cache[store]) {
this.cache[store] = {};
}
this.cache[store][key] = {
value,
purge_time: Date.now() + cacheTime
};
}
_getCacheItem(store, key) {
if (this.cache[store] && this.cache[store][key]) {
return this.cache[store][key].value;
}
return null;
}
_initializeEvents() {
walletSync.initialize()
.then(() => walletTransactionConsensus.initialize())
.then(() => {
task.scheduleTask('cache_purge', this._purgeCache.bind(this), 30000);
task.scheduleTask('transaction_propagate', this._propagateTransactions.bind(this), 10000);
eventBus.on('transaction_list_propagate', this.onPropagateTransactionList.bind(this));
eventBus.on('peer_connection_new', this._onNewPeerConnection.bind(this));
......@@ -1793,7 +1763,6 @@ class Wallet {
stop() {
this.initialized = false;
walletSync.close().then(_ => _).catch(_ => _);
task.removeTask('cache_purge');
eventBus.removeAllListeners('peer_connection_new');
eventBus.removeAllListeners('peer_connection_closed');
eventBus.removeAllListeners('transaction_new_request_proxy');
......
......@@ -126,6 +126,6 @@ db.initialize()
});
}
});
//millix v1.11.15-tangled
//millix v1.11.16-tangled
\ No newline at end of file
......@@ -19,6 +19,7 @@ import signature from '../core/crypto/signature';
import NatAPI from 'nat-api';
import statistics from '../core/statistics';
import console from '../core/console';
import os from 'os';
const WebSocketServer = Server;
......@@ -33,6 +34,7 @@ class Network {
this._bidirectionaOutboundConnectionCount = 0;
this._bidirectionaInboundConnectionCount = 0;
this._wss = null;
this.networkInterfaceAddresses = [];
this.nodeID = null;
this.nodeIsPublic = undefined;
this.certificatePem = null;
......@@ -957,7 +959,21 @@ class Network {
eventBus.on('nat_check_response', this._onNATCheckResponse.bind(this));
}
loadNetworkInterfaceIpList() {
this.networkInterfaceAddresses = [];
const nets = os.networkInterfaces();
for (const name of Object.keys(nets)) {
for (const net of nets[name]) {
// Skip over non-IPv4 and internal (i.e. 127.0.0.1) addresses
if (net.family === 'IPv4' && !net.internal) {
this.networkInterfaceAddresses.push(net.address);
}
}
}
}
initialize() {
this.loadNetworkInterfaceIpList();
this.nodeConnectionID = this.generateNewID();
return new Promise(resolve => {
console.log('[network] starting network');
......
......@@ -1280,20 +1280,21 @@ class Peer {
this.nodeAttributeCache[content.node_id] = {};
}
const now = Date.now();
if (!this.nodeAttributeCache[content.node_id][content.attribute_type]) {
this.nodeAttributeCache[content.node_id] = {
[content.attribute_type]: {
value : content.value,
updatedAt: Date.now()
updatedAt: now
}
};
}
else {
this.nodeAttributeCache[content.node_id][content.attribute_type].updatedAt = Date.now();
if (this.nodeAttributeCache[content.node_id][content.attribute_type].value === content.value) {
this.nodeAttributeCache[content.node_id][content.attribute_type].value = content.value;
if (now < this.nodeAttributeCache[content.node_id][content.attribute_type].updatedAt + 60000) {
return;
}
this.nodeAttributeCache[content.node_id][content.attribute_type].value = content.value;
this.nodeAttributeCache[content.node_id][content.attribute_type].updatedAt = now;
}
statistics.newEvent('add_or_update_attribute');
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment