import * as cluster from 'cluster'; import {Logger} from '../../Logger'; import {DiskManagerTask, ThumbnailTask, WorkerMessage, WorkerTask, WorkerTaskTypes} from './Worker'; import {DirectoryDTO} from '../../../common/entities/DirectoryDTO'; import {RendererInput} from './ThumbnailWoker'; import {Config} from '../../../common/config/private/Config'; import {ITaskQue} from './TaskQue'; interface PoolTask { task: WorkerTask; promise: { resolve: Function, reject: Function }; } interface WorkerWrapper { worker: cluster.Worker; poolTask: PoolTask; } export class ThreadPool { public static WorkerCount = 0; private workers: WorkerWrapper[] = []; private tasks: PoolTask[] = []; constructor(private size: number) { Logger.silly('Creating thread pool with', size, 'workers'); for (let i = 0; i < size; i++) { this.startWorker(); } } private startWorker() { const worker = {poolTask: null, worker: cluster.fork()}; this.workers.push(worker); worker.worker.on('online', () => { ThreadPool.WorkerCount++; Logger.debug('Worker ' + worker.worker.process.pid + ' is online, worker count:', ThreadPool.WorkerCount); }); worker.worker.on('exit', (code, signal) => { ThreadPool.WorkerCount--; Logger.warn('Worker ' + worker.worker.process.pid + ' died with code: ' + code + ', and signal: ' + signal + ', worker count:', ThreadPool.WorkerCount); Logger.debug('Starting a new worker'); this.startWorker(); }); worker.worker.on('message', (msg: WorkerMessage) => { if (worker.poolTask == null) { throw 'No worker task after worker task is completed'; } if (msg.error) { worker.poolTask.promise.reject(msg.error); } else { worker.poolTask.promise.resolve(msg.result); } worker.poolTask = null; this.run(); }); } protected executeTask(task: WorkerTask): Promise { return new Promise((resolve: Function, reject: Function) => { this.tasks.push({task: task, promise: {resolve: resolve, reject: reject}}); this.run(); }); } private getFreeWorker() { for (let i = 0; i < this.workers.length; i++) { if (this.workers[i].poolTask == null) { return this.workers[i]; } } return null; } private run = () => { if (this.tasks.length == 0) { return; } const worker = this.getFreeWorker(); if (worker == null) { return; } const poolTask = this.tasks.shift(); worker.poolTask = poolTask; worker.worker.send(poolTask.task); }; } export class DiskManagerTH extends ThreadPool implements ITaskQue { execute(relativeDirectoryName: string): Promise { return super.executeTask({ type: WorkerTaskTypes.diskManager, relativeDirectoryName: relativeDirectoryName }); } } export class ThumbnailTH extends ThreadPool implements ITaskQue { execute(input: RendererInput): Promise { return super.executeTask({ type: WorkerTaskTypes.thumbnail, input: input, renderer: Config.Server.thumbnail.processingLibrary }); } }