Commit 17ed0b1c authored by Eriksson Monteiro's avatar Eriksson Monteiro

update millix node: use database connection pool (workers)

parent 95fbeb30
......@@ -2,7 +2,7 @@
<br>
<a href="#"><img src="https://github.com/millix/millix-wallet/blob/master/app/icon.png?raw=true" alt="millix node" width="200"></a>
<br>
millix node <small>v1.18.0</small>
millix node <small>v1.18.1</small>
<br>
</h1>
......
......@@ -779,8 +779,8 @@ export const NETWORK_SHORT_TIME_WAIT_MAX = 1500;
export const DATABASE_ENGINE = 'sqlite';
export const DATABASE_CONNECTION = {};
export const MILLIX_CIRCULATION = 9e15;
export const NODE_MILLIX_BUILD_DATE = 1650496416;
export const NODE_MILLIX_VERSION = '1.18.0-tangled';
export const NODE_MILLIX_BUILD_DATE = 1650985641;
export const NODE_MILLIX_VERSION = '1.18.1-tangled';
export const DATA_BASE_DIR_MAIN_NETWORK = './millix-tangled';
export const DATA_BASE_DIR_TEST_NETWORK = './millix-tangled';
let DATA_BASE_DIR = MODE_TEST_NETWORK ? DATA_BASE_DIR_TEST_NETWORK : DATA_BASE_DIR_MAIN_NETWORK;
......
......@@ -1011,7 +1011,12 @@ export class WalletTransactionConsensus {
cache.removeCacheItem('validation', transactionID);
consensusData.active = false;
walletSync.syncTransactionSpendingOutputs(transaction, config.MODE_NODE_SYNC_FULL);
if(transaction) {
walletSync.syncTransactionSpendingOutputs(transaction, config.MODE_NODE_SYNC_FULL);
} else {
console.log('[wallet-transaction-consensus] unexpected null transaction object detected');
}
console.log('[wallet-transaction-consensus] transaction object no present for tx id:', transactionID);
return database.applyShards(shardID => {
const transactionRepository = database.getRepository('transaction', shardID);
......
This diff is collapsed.
......@@ -8,10 +8,11 @@ import wallet from '../core/wallet/wallet';
import console from '../core/console';
import path from 'path';
import async from 'async';
import {Address, API, Config, Job, Keychain, Node, Schema, Shard as ShardRepository, Wallet, Normalization} from './repositories/repositories';
import {Address, API, Config, Job, Keychain, Node, Normalization, Schema, Shard as ShardRepository, Wallet} from './repositories/repositories';
import Shard from './shard';
import _ from 'lodash';
import eventBus from '../core/event-bus';
import {Pool} from './pool/pool';
export class Database {
......@@ -197,108 +198,14 @@ export class Database {
}
_initializeMillixSqlite3() {
return new Promise(resolve => {
const sqlite3 = require('sqlite3');
sqlite3.Database.prototype.runAsync = function(sql, ...params) {
return new Promise((resolve, reject) => {
this.run(sql, params, function(err) {
if (err) {
return reject(err);
}
resolve(this);
});
});
};
this.databaseRootFolder = path.join(os.homedir(), config.DATABASE_CONNECTION.FOLDER);
if (!fs.existsSync(this.databaseRootFolder)) {
fs.mkdirSync(path.join(this.databaseRootFolder));
}
let dbFile = path.join(this.databaseRootFolder, config.DATABASE_CONNECTION.FILENAME_MILLIX);
let doInitialize = false;
if (!fs.existsSync(dbFile)) {
doInitialize = true;
}
this.databaseMillix = new sqlite3.Database(dbFile, (err) => {
if (err) {
throw Error(err.message);
}
console.log('Connected to the millix database.');
config.MODE_DEBUG && Database.enableDebugger(this.databaseMillix);
if (doInitialize) {
console.log('Initializing database');
fs.readFile(config.DATABASE_CONNECTION.SCRIPT_INIT_MILLIX, 'utf8', (err, data) => {
if (err) {
throw Error(err.message);
}
this.databaseMillix.exec(data, (err) => {
if (err) {
return console.log(err.message);
}
console.log('Database initialized');
this.databaseMillix.run('PRAGMA journal_mode = WAL', () => this.databaseMillix.run('PRAGMA synchronous = NORMAL', () => resolve()));
});
});
}
else {
this.databaseMillix.run('PRAGMA journal_mode = WAL', () => this.databaseMillix.run('PRAGMA synchronous = NORMAL', () => resolve()));
}
});
});
this.databaseRootFolder = path.join(os.homedir(), config.DATABASE_CONNECTION.FOLDER);
this.databaseMillix = new Pool(this.databaseRootFolder, config.DATABASE_CONNECTION.FILENAME_MILLIX, config.DATABASE_CONNECTION.SCRIPT_INIT_MILLIX);
return this.databaseMillix.initialize();
}
_initializeJobEngineSqlite3() {
return new Promise(resolve => {
const sqlite3 = require('sqlite3');
if (!fs.existsSync(path.join(os.homedir(), config.DATABASE_CONNECTION.FOLDER))) {
fs.mkdirSync(path.join(os.homedir(), config.DATABASE_CONNECTION.FOLDER));
}
let dbFile = path.join(os.homedir(), config.DATABASE_CONNECTION.FOLDER + config.DATABASE_CONNECTION.FILENAME_JOB_ENGINE);
let doInitialize = false;
if (!fs.existsSync(dbFile)) {
doInitialize = true;
}
this.databaseJobEngine = new sqlite3.Database(dbFile, (err) => {
if (err) {
throw Error(err.message);
}
console.log('Connected to the job engine database.');
if (doInitialize) {
console.log('Initializing database');
fs.readFile(config.DATABASE_CONNECTION.SCRIPT_INIT_MILLIX_JOB_ENGINE, 'utf8', (err, data) => {
if (err) {
throw Error(err.message);
}
this.databaseJobEngine.exec(data, function(err) {
if (err) {
return console.log(err.message);
}
console.log('Database initialized');
resolve();
});
});
}
else {
resolve();
}
});
});
this.databaseJobEngine = new Pool(this.databaseRootFolder, config.DATABASE_CONNECTION.FILENAME_JOB_ENGINE, config.DATABASE_CONNECTION.SCRIPT_INIT_MILLIX_JOB_ENGINE, 1);
return this.databaseJobEngine.initialize();
}
addShard(shard, updateTables) {
......@@ -669,7 +576,7 @@ export class Database {
if (err) {
console.error(err.message);
}
console.log('Close the millix database connection.');
console.log('[database] the millix database connection was closed.');
callback();
});
}
......@@ -683,13 +590,25 @@ export class Database {
if (err) {
console.error(err.message);
}
console.log('Close the job engine database connection.');
console.log('[database] job engine database connection was closed.');
callback();
});
}
else {
callback();
}
},
(callback) => {
async.eachSeries(_.keys(this.shards), (shardID, callback) => {
if (this.shards[shardID]) {
this.shards[shardID].close().then(() => callback());
}
else {
callback();
}
}, () => {
callback();
});
}
], () => resolve());
});
......
import {Worker} from 'worker_threads';
import os from 'os';
import async from 'async';
export class Pool {
constructor(databaseFolder, databaseName, initScriptFile, size) {
this.initialized = false;
this.closed = false;
this.databaseFolder = databaseFolder;
this.databaseName = databaseName;
this.initScriptFile = initScriptFile;
this.workerList = [];
this.queue = [];
this.size = size || os.cpus().length;
}
drainQueue() {
for (const worker of this.workerList) {
worker.takeWork();
}
}
initialize() {
if (this.initialized) {
return Promise.resolve();
}
this.initialized = true;
return this._spawnAllWorkers()
.then(() => new Promise((resolve, reject) => {
async.eachSeries(this.workerList, (worker, callback) => {
this._sendJobToWorker(worker, 'init', {
database_folder : this.databaseFolder,
database_name : this.databaseName,
init_script_file: this.initScriptFile
}).then(() => callback()).catch(err => callback(err));
}, err => err ? reject(err) : resolve());
}));
}
_spawnAllWorkers() {
return new Promise((resolve, reject) => {
/*
Spawn workers that try to drain the queue.
*/
async.timesSeries(this.size, (i, callback) => {
this._createWorker()
.then(() => callback())
.catch(err => callback(err));
}, err => err ? reject(err) : resolve());
});
}
_getParameterAndCallback(...parameters) {
if (parameters.length === 1) {
return {
callback : parameters[0],
parameters: undefined
};
}
else {
return {
callback : parameters[1],
parameters: parameters[0]
};
}
}
_execSQL(type, sql, ...data) {
const {
callback,
parameters
} = this._getParameterAndCallback(...data);
if (this.initialized === false || this.closed === true) {
return callback({
error : 'database_closed',
message: 'database closed'
});
}
this._addJob(type, {
sql,
parameters
}).then(result => callback(result.err, result.data)).catch(callback);
}
all(sql, ...parameters) {
this._execSQL('all', sql, ...parameters);
}
get(sql, ...parameters) {
this._execSQL('get', sql, ...parameters);
}
run(sql, ...parameters) {
this._execSQL('run', sql, ...parameters);
}
exec(sql, ...parameters) {
this._execSQL('exec', sql, ...parameters);
}
serialize(callback) {
callback();
}
close(callback) {
this.closed = true;
async.eachSeries(this.workerList, (worker, eachCallback) => {
this._sendJobToWorker(worker, 'close', {})
.then(() => eachCallback())
.catch(err => eachCallback(err));
}, err => callback(err));
}
_addJob(type, data) {
return new Promise((resolve, reject) => {
this.queue.push({
resolve,
reject,
message: {
type,
data
}
});
this.drainQueue();
});
}
_sendJobToWorker(worker, type, data) {
return new Promise((resolve, reject) => {
worker.priorityWork.push({
resolve,
reject,
message: {
type,
data
}
});
if (!worker.job) {
worker.takeWork();
}
});
}
_createWorker() {
return new Promise((resolve) => {
const worker = new Worker('./database/pool/worker.mjs');
let error = null; // Error that caused the worker to crash
worker.priorityWork = [];
worker.takeWork = () => {
if (!worker.job && (worker.priorityWork.length > 0 || this.queue.length)) {
// If there's a job in the queue, send it to the worker
worker.job = worker.priorityWork.shift() || this.queue.shift();
worker.postMessage(worker.job.message);
}
};
worker
.on('online', () => {
this.workerList.push(worker);
worker.takeWork();
resolve();
})
.on('message', (result) => {
worker.job.resolve(result);
worker.job = null;
worker.takeWork(); // Check if there's more work to do
})
.on('error', (err) => {
console.log('[pool-worker] error', err);
error = err;
})
.on('exit', (code) => {
this.workerList = this.workerList.filter(w => w !== worker);
if (worker.job) {
worker.job.reject(error || new Error('worker died'));
}
if (code !== 0) {
console.log(`[pool-worker] worker exited with code ${code}`);
// Worker died, so spawn a new one
this._createWorker().then(_ => _);
}
});
});
}
}
import {parentPort} from 'worker_threads';
import sqlite3 from 'sqlite3';
import path from 'path';
import fs from 'fs';
let database;
function initializeDB(databaseRootFolder, databaseName, initializeScriptFile) {
return new Promise((resolve, reject) => {
if (!fs.existsSync(databaseRootFolder)) {
fs.mkdirSync(path.join(databaseRootFolder));
}
let dbFile = path.join(databaseRootFolder, databaseName);
let doInitialize = false;
if (!fs.existsSync(dbFile)) {
doInitialize = true;
}
database = new sqlite3.Database(dbFile, (err) => {
if (err) {
return reject(err.message);
}
if (doInitialize) {
fs.readFile(initializeScriptFile, 'utf8', (err, data) => {
if (err) {
return reject(err.message);
}
database.exec(data, (err) => {
if (err) {
return reject(err.message);
}
database.run('PRAGMA journal_mode = WAL', () => database.run('PRAGMA synchronous = NORMAL', () => resolve()));
});
});
} else {
database.run('PRAGMA journal_mode = WAL', () => database.run('PRAGMA synchronous = NORMAL', () => resolve()));
}
});
});
}
parentPort.on('message', ({
type,
data
}) => {
if (type === 'init') {
initializeDB(data.database_folder, data.database_name, data.init_script_file)
.then(() => parentPort.postMessage({
type: 'init_response',
initialized: true
}))
.catch(e => {
throw Error(e)
});
} else if (type === 'close') {
database.close(() => {
parentPort.postMessage({type: 'close_response', closed: true});
setImmediate(() => process.exit(0))
})
} else if (type === 'all' || type === 'get' || type === 'run') {
const {
sql,
parameters
} = data;
database[type](sql, parameters, (err, data) => {
parentPort.postMessage({err, data});
});
} else if (type === 'exec') {
const {
sql
} = data;
database.exec(sql, (err, data) => {
parentPort.postMessage({err, data});
});
} else {
throw Error('execution type not supported');
}
});
......@@ -1064,8 +1064,7 @@ export default class Transaction {
});
})
.then(transaction => resolve(_.cloneDeep(transaction))) /*TODO: addTransactionFromObject is deleting address from output and input object. that is changing the cachedItem. we should not change the object. now we are creating a clone (refactor)*/
.catch((e) => {
console.log('[transaction-object] cannot get transaction with id', transactionID, 'err:', e);
.catch(() => {
resolve(null);
});
});
......@@ -2701,14 +2700,14 @@ export default class Transaction {
if (err) {
return reject();
}
async.eachSeries(transactionList, (transaction, callback) => {
if (transaction.shard_id === 'AyAC3kjLtjM4vktAJ5Xq6mbXKjzEqXoSsmGhhgjnkXUvjtF2M') { /* do not prune this special shard id. TODO: refactor and activate shard */
async.eachSeries(transactionList, (data, callback) => {
if (data.shard_id === 'AyAC3kjLtjM4vktAJ5Xq6mbXKjzEqXoSsmGhhgjnkXUvjtF2M') { /* do not prune this special shard id. TODO: refactor and activate shard */
return callback();
}
this.getTransactionObject(transaction.transaction_id)
this.getTransactionObject(data.transaction_id)
.then(transaction => {
if (!transaction) {
return;
return this.deleteTransaction(data.transaction_id);
}
if (database.getShard(transaction.shard_id)) {
......
......@@ -8,6 +8,7 @@ import eventBus from '../core/event-bus';
import os from 'os';
import async from 'async';
import _ from 'lodash';
import {Pool} from './pool/pool';
export default class Shard {
constructor(databaseFile, shardID) {
......@@ -99,59 +100,9 @@ export default class Shard {
}
_initializeMillixShardSqlite3() {
return new Promise(resolve => {
const sqlite3 = require('sqlite3');
sqlite3.Database.prototype.runAsync = function(sql, ...params) {
return new Promise((resolve, reject) => {
this.run(sql, params, function(err) {
if (err) {
return reject(err);
}
resolve(this);
});
});
};
let shardFolder = path.dirname(this.databaseFile);
if (!fs.existsSync(shardFolder)) {
fs.mkdirSync(shardFolder);
}
let doInitialize = false;
if (!fs.existsSync(this.databaseFile)) {
doInitialize = true;
}
this.database = new sqlite3.Database(this.databaseFile, (err) => {
if (err) {
throw Error(err.message);
}
console.log('[shard] connected to the shard database: ', this.shardID);
config.MODE_DEBUG && Database.enableDebugger(this.database);
if (doInitialize) {
console.log('[shard] initializing database');
fs.readFile(config.DATABASE_CONNECTION.SCRIPT_INIT_MILLIX_SHARD, 'utf8', (err, data) => {
if (err) {
throw Error(err.message);
}
this.database.exec(data, (err) => {
if (err) {
throw Error(err.message);
}
console.log('[shard] database initialized');
this.database.shardID = this.shardID;
this.database.run('PRAGMA journal_mode = WAL', () => this.database.run('PRAGMA synchronous = NORMAL', () => resolve()));
});
});
}
else {
this.database.run('PRAGMA journal_mode = WAL', () => this.database.run('PRAGMA synchronous = NORMAL', () => resolve()));
}
});
});
this.database = new Pool(path.dirname(this.databaseFile), path.basename(this.databaseFile), config.DATABASE_CONNECTION.SCRIPT_INIT_MILLIX_SHARD);
this.database.shardID = this.shardID;
return this.database.initialize();
}
checkup() {
......@@ -167,4 +118,16 @@ export default class Shard {
});
}
close() {
return new Promise(resolve => {
this.database.close(err => {
if (err) {
console.error(err.message);
}
console.log('[shard] the database connection was closed.');
resolve();
});
});
}
}
......@@ -85,13 +85,16 @@ if (!argv.natPmp) {
process.title = 'millix-node';
process.on('SIGINT', function() {
console.log('\nGracefully shutting down from SIGINT (Crtl-C)');
process.exit(0);
});
process.on('exit', async() => {
await db.close();
let shutdown = false;
process.on('SIGINT', async function() {
if (!shutdown) {
shutdown = true;
console.log('\n[main] gracefully shutting down from SIGINT (Crtl-C)');
console.log('[main] closing all db connections');
await db.close();
console.log('[main] all db connections closed');
process.exit(0);
}
});
console.log('starting millix-core');
......@@ -125,5 +128,5 @@ db.initialize()
});
}
});
//millix v1.18.0-tangled
//millix v1.18.1-tangled
\ No newline at end of file
......@@ -52,7 +52,7 @@
"request": "^2.88.0",
"secp256k1": "^3.6.2",
"socks": "^2.3.2",
"sqlite3": "^4.0.6",
"sqlite3": "^5.0.5",
"thirty-two": "^1.0.1",
"utf-8-validate": "^5.0.2",
"ws": "^6.2.1",
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment