import * as cluster from 'cluster';
import {Logger} from '../../Logger';
import {DiskManagerTask, ThumbnailTask, WorkerMessage, WorkerTask, WorkerTaskTypes} from './Worker';
import {DirectoryDTO} from '../../../common/entities/DirectoryDTO';
import {RendererInput} from './ThumbnailWorker';
import {Config} from '../../../common/config/private/Config';
import {TaskQue, TaskQueEntry} from './TaskQue';
import {ITaskExecuter} from './TaskExecuter';

/**
 * Pairs a forked cluster worker with the queue entry it is currently processing.
 * `poolTask` is null while the worker is idle.
 */
interface WorkerWrapper {
  worker: cluster.Worker;
  // NOTE(review): if TaskQueEntry is declared generic, its type arguments
  // have to be restored here - confirm against ./TaskQue.
  poolTask: TaskQueEntry | null;
}

/**
 * A fixed-size pool of workers created with `cluster.fork()`.
 *
 * Tasks are put into a queue and handed to the first idle worker; every worker
 * processes one task at a time. A worker that exits is replaced by a new one.
 */
export class ThreadPool {
  /** Number of workers (across all pools) that are currently online. */
  public static WorkerCount = 0;

  private workers: WorkerWrapper[] = [];
  // NOTE(review): if TaskQue is declared generic, its type arguments
  // have to be restored here - confirm against ./TaskQue.
  private taskQue = new TaskQue();

  /**
   * @param size number of workers to fork up front
   */
  constructor(private size: number) {
    Logger.silly('Creating thread pool with', size, 'workers');
    for (let i = 0; i < size; i++) {
      this.startWorker();
    }
  }

  /**
   * Hands the next queued task to an idle worker.
   * Does nothing when the queue is empty or when every worker is busy.
   * At most one task is dispatched per call.
   */
  private run = () => {
    if (this.taskQue.isEmpty()) {
      return;
    }
    const idle = this.getFreeWorker();
    if (idle == null) {
      return;
    }

    const poolTask = this.taskQue.get();
    // Mark the worker busy before sending, so it cannot be picked twice.
    idle.poolTask = poolTask;
    idle.worker.send(poolTask.data);
  };

  /**
   * Queues a task and tries to dispatch it immediately.
   *
   * @param task the task description that is sent to a worker
   * @returns a promise settled with the `result` (or rejected with the `error`)
   *          of the worker's reply message
   */
  protected executeTask<T>(task: WorkerTask): Promise<T> {
    const entry = this.taskQue.add(task);
    this.run();
    // The value is produced by another process and arrives as a message, so the
    // compiler cannot know its type; the caller states what it expects via T.
    return entry.promise.obj as Promise<T>;
  }

  /**
   * @returns the first worker without an assigned task, or null if all are busy
   */
  private getFreeWorker(): WorkerWrapper | null {
    return this.workers.find((w) => w.poolTask == null) || null;
  }

  /**
   * Forks a new worker, adds it to the pool and wires up its lifecycle events.
   */
  private startWorker(): void {
    const wrapper: WorkerWrapper = {poolTask: null, worker: cluster.fork()};
    this.workers.push(wrapper);

    wrapper.worker.on('online', () => {
      ThreadPool.WorkerCount++;
      Logger.debug('Worker ' + wrapper.worker.process.pid + ' is online, worker count:', ThreadPool.WorkerCount);
    });

    wrapper.worker.on('exit', (code, signal) => {
      ThreadPool.WorkerCount--;
      Logger.warn('Worker ' + wrapper.worker.process.pid + ' died with code: ' + code + ', and signal: ' + signal + ', worker count:', ThreadPool.WorkerCount);

      // Remove the dead worker, otherwise getFreeWorker() could return it
      // and the next task would be sent to a process that no longer exists.
      const index = this.workers.indexOf(wrapper);
      if (index !== -1) {
        this.workers.splice(index, 1);
      }

      // A task that was in flight will never get a reply: fail it instead of
      // leaving its promise pending forever. `ready` is called here as well,
      // mirroring the message handler, which calls it for both outcomes.
      const orphaned = wrapper.poolTask;
      if (orphaned != null) {
        wrapper.poolTask = null;
        orphaned.promise.reject(new Error('Worker ' + wrapper.worker.process.pid + ' died before completing its task'));
        this.taskQue.ready(orphaned);
      }

      Logger.debug('Starting a new worker');
      this.startWorker();
      // Let the replacement pick up a task that is already waiting in the queue.
      this.run();
    });

    wrapper.worker.on('message', (msg: WorkerMessage) => {
      const finished = wrapper.poolTask;
      if (finished == null) {
        throw new Error('No worker task after worker task is completed');
      }

      if (msg.error) {
        finished.promise.reject(msg.error);
      } else {
        finished.promise.resolve(msg.result);
      }
      this.taskQue.ready(finished);

      // The worker is idle again: free it and dispatch the next queued task.
      wrapper.poolTask = null;
      this.run();
    });
  }
}

/**
 * Worker pool that executes `WorkerTaskTypes.diskManager` tasks.
 */
// NOTE(review): if ITaskExecuter is declared generic, its type arguments
// have to be restored here - confirm against ./TaskExecuter.
export class DiskManagerTH extends ThreadPool implements ITaskExecuter {
  /**
   * @param relativeDirectoryName directory name that is passed on to the worker
   * @returns the worker's result for that directory
   */
  execute(relativeDirectoryName: string): Promise<DirectoryDTO> {
    const task: DiskManagerTask = {
      type: WorkerTaskTypes.diskManager,
      relativeDirectoryName: relativeDirectoryName
    };
    return super.executeTask<DirectoryDTO>(task);
  }
}

/**
 * Worker pool that executes `WorkerTaskTypes.thumbnail` tasks with the
 * renderer configured in `Config.Server.thumbnail.processingLibrary`.
 */
// NOTE(review): if ITaskExecuter is declared generic, its type arguments
// have to be restored here - confirm against ./TaskExecuter.
export class ThumbnailTH extends ThreadPool implements ITaskExecuter {
  /**
   * @param input rendering input that is passed on to the worker
   * @returns a promise that settles when the worker has replied
   */
  // NOTE(review): the result type is assumed to be void - confirm against ./ThumbnailWorker.
  execute(input: RendererInput): Promise<void> {
    const task: ThumbnailTask = {
      type: WorkerTaskTypes.thumbnail,
      input: input,
      renderer: Config.Server.thumbnail.processingLibrary
    };
    return super.executeTask<void>(task);
  }
}