From 38f36891bddfa6cf20799b849d3219aaa121d60b Mon Sep 17 00:00:00 2001 From: "Patrik J. Braun" Date: Sat, 8 Dec 2018 18:17:33 +0100 Subject: [PATCH] improving multi threading --- .../thumbnail/ThumbnailGeneratorMWs.ts | 20 ++--- backend/model/threading/TaskExecuter.ts | 37 +++++++++ backend/model/threading/TaskQue.ts | 80 +++++++++++-------- backend/model/threading/ThreadPool.ts | 40 +++++----- backend/model/threading/Worker.ts | 15 +++- frontend/app/app.module.ts | 15 +--- .../directory/directory.gallery.component.ts | 2 +- .../photo/photo.grid.gallery.component.ts | 2 +- .../lightbox/lightbox.gallery.component.ts | 2 - .../lightbox.map.gallery.component.ts | 2 +- ....service.ts => thumbnailLoader.service.ts} | 0 ...service.ts => thumbnailManager.service.ts} | 6 +- .../unit/model/threading/TaskExecuter.spec.ts | 66 +++++++++++++++ .../unit/model/threading/TaskQue.spec.ts | 33 ++++++++ 14 files changed, 230 insertions(+), 90 deletions(-) create mode 100644 backend/model/threading/TaskExecuter.ts rename frontend/app/gallery/{thumnailLoader.service.ts => thumbnailLoader.service.ts} (100%) rename frontend/app/gallery/{thumnailManager.service.ts => thumbnailManager.service.ts} (97%) create mode 100644 test/backend/unit/model/threading/TaskExecuter.spec.ts create mode 100644 test/backend/unit/model/threading/TaskQue.spec.ts diff --git a/backend/middlewares/thumbnail/ThumbnailGeneratorMWs.ts b/backend/middlewares/thumbnail/ThumbnailGeneratorMWs.ts index 144cbf18..b7d361aa 100644 --- a/backend/middlewares/thumbnail/ThumbnailGeneratorMWs.ts +++ b/backend/middlewares/thumbnail/ThumbnailGeneratorMWs.ts @@ -7,18 +7,18 @@ import {ErrorCodes, ErrorDTO} from '../../../common/entities/Error'; import {ContentWrapper} from '../../../common/entities/ConentWrapper'; import {DirectoryDTO} from '../../../common/entities/DirectoryDTO'; import {ProjectPath} from '../../ProjectPath'; -import {PhotoDTO} from '../../../common/entities/PhotoDTO'; import {Config} from '../../../common/config/private/Config'; import {ThumbnailProcessingLib} from '../../../common/config/private/IPrivateConfig'; import {ThumbnailTH} from '../../model/threading/ThreadPool'; -import {RendererInput, ThumbnailSourceType} from '../../model/threading/ThumbnailWorker'; -import {ITaskQue, TaskQue} from '../../model/threading/TaskQue'; +import {RendererInput, ThumbnailSourceType, ThumbnailWorker} from '../../model/threading/ThumbnailWorker'; + import {MediaDTO} from '../../../common/entities/MediaDTO'; +import {ITaskExecuter, TaskExecuter} from '../../model/threading/TaskExecuter'; export class ThumbnailGeneratorMWs { private static initDone = false; - private static taskQue: ITaskQue = null; + private static taskQue: ITaskExecuter = null; public static init() { if (this.initDone === true) { @@ -26,8 +26,7 @@ export class ThumbnailGeneratorMWs { } - if (Config.Server.threading.enable === true || - Config.Server.thumbnail.processingLibrary !== ThumbnailProcessingLib.Jimp) { + if (Config.Server.threading.enable === true) { if (Config.Server.threading.thumbnailThreads > 0) { Config.Client.Thumbnail.concurrentThumbnailGenerations = Config.Server.threading.thumbnailThreads; } else { @@ -41,7 +40,8 @@ export class ThumbnailGeneratorMWs { Config.Server.thumbnail.processingLibrary === ThumbnailProcessingLib.Jimp) { this.taskQue = new ThumbnailTH(Config.Client.Thumbnail.concurrentThumbnailGenerations); } else { - this.taskQue = new TaskQue(Config.Client.Thumbnail.concurrentThumbnailGenerations); + this.taskQue = new TaskExecuter(Config.Client.Thumbnail.concurrentThumbnailGenerations, + (input => ThumbnailWorker.render(input, Config.Server.thumbnail.processingLibrary))); } this.initDone = true; @@ -102,10 +102,10 @@ export class ThumbnailGeneratorMWs { } private static addThInfoTODir(directory: DirectoryDTO) { - if (typeof directory.media === 'undefined') { + if (typeof directory.media === 'undefined') { directory.media = []; } - if (typeof directory.directories === 'undefined') { + if (typeof directory.directories === 'undefined') { directory.directories = []; } ThumbnailGeneratorMWs.addThInfoToPhotos(directory.media); @@ -124,7 +124,7 @@ export class ThumbnailGeneratorMWs { const size = Config.Client.Thumbnail.thumbnailSizes[j]; const thPath = path.join(thumbnailFolder, ThumbnailGeneratorMWs.generateThumbnailName(fullMediaPath, size)); if (fs.existsSync(thPath) === true) { - if (typeof photos[i].readyThumbnails === 'undefined') { + if (typeof photos[i].readyThumbnails === 'undefined') { photos[i].readyThumbnails = []; } photos[i].readyThumbnails.push(size); diff --git a/backend/model/threading/TaskExecuter.ts b/backend/model/threading/TaskExecuter.ts new file mode 100644 index 00000000..f3ab6f9e --- /dev/null +++ b/backend/model/threading/TaskExecuter.ts @@ -0,0 +1,37 @@ +import {TaskQue} from './TaskQue'; + + +export interface ITaskExecuter { + execute(input: I): Promise; +} + +export class TaskExecuter implements ITaskExecuter { + + private taskQue = new TaskQue(); + private taskInProgress = 0; + private run = async () => { + if (this.taskQue.isEmpty() || this.taskInProgress >= this.size) { + return; + } + this.taskInProgress++; + const task = this.taskQue.get(); + try { + task.promise.resolve(await this.worker(task.data)); + } catch (err) { + task.promise.reject(err); + } + this.taskQue.ready(task); + this.taskInProgress--; + process.nextTick(this.run); + }; + + constructor(private size: number, private worker: (input: I) => Promise) { + } + + + execute(input: I): Promise { + const promise = this.taskQue.add(input).promise.obj; + this.run().catch(console.error); + return promise; + } +} diff --git a/backend/model/threading/TaskQue.ts b/backend/model/threading/TaskQue.ts index 4d0514c8..6be0da10 100644 --- a/backend/model/threading/TaskQue.ts +++ b/backend/model/threading/TaskQue.ts @@ -1,46 +1,58 @@ -import {RendererInput, ThumbnailWorker} from './ThumbnailWorker'; -import {Config} from '../../../common/config/private/Config'; +import {Utils} from '../../../common/Utils'; -interface QueTask { - data: RendererInput; - promise: { resolve: Function, reject: Function }; +export interface TaskQueEntry { + data: I; + promise: { obj: Promise, resolve: Function, reject: Function }; } -export interface ITaskQue { - execute(input: any): Promise; -} +export class TaskQue { -export class TaskQue implements ITaskQue { + private tasks: TaskQueEntry[] = []; + private processing: TaskQueEntry[] = []; - private tasks: QueTask[] = []; - private taskInProgress = 0; - private run = async () => { - if (this.tasks.length === 0 || this.taskInProgress >= this.size) { - return; - } - this.taskInProgress++; - const task = this.tasks.shift(); - try { - task.promise.resolve(await ThumbnailWorker.render(task.data, Config.Server.thumbnail.processingLibrary)); - } catch (err) { - task.promise.reject(err); - } - this.taskInProgress--; - process.nextTick(this.run); - }; - - constructor(private size: number) { + constructor() { } - execute(input: RendererInput): Promise { - return new Promise((resolve: Function, reject: Function) => { - this.tasks.push({ - data: input, - promise: {resolve: resolve, reject: reject} - }); - this.run(); + + private getSameTask(input: I): TaskQueEntry { + return this.tasks.find(t => Utils.equalsFilter(t.data, input)) || + this.processing.find(t => Utils.equalsFilter(t.data, input)); + } + + private putNewTask(input: I): TaskQueEntry { + const taskEntry: TaskQueEntry = { + data: input, + promise: { + obj: null, + resolve: null, + reject: null + } + }; + this.tasks.push(taskEntry); + taskEntry.promise.obj = new Promise((resolve: Function, reject: Function) => { + taskEntry.promise.reject = reject; + taskEntry.promise.resolve = resolve; }); + return taskEntry; + } + + public isEmpty(): boolean { + return this.tasks.length === 0; + } + + public add(input: I): TaskQueEntry { + return (this.getSameTask(input) || this.putNewTask(input)); + } + + public get(): TaskQueEntry { + const task = this.tasks.shift(); + this.processing.push(task); + return task; + } + + public ready(task: TaskQueEntry): void { + this.processing.slice(this.processing.indexOf(task), 1); } } diff --git a/backend/model/threading/ThreadPool.ts b/backend/model/threading/ThreadPool.ts index c53ddff2..2a118e40 100644 --- a/backend/model/threading/ThreadPool.ts +++ b/backend/model/threading/ThreadPool.ts @@ -4,24 +4,20 @@ import {DiskManagerTask, ThumbnailTask, WorkerMessage, WorkerTask, WorkerTaskTyp import {DirectoryDTO} from '../../../common/entities/DirectoryDTO'; import {RendererInput} from './ThumbnailWorker'; import {Config} from '../../../common/config/private/Config'; -import {ITaskQue} from './TaskQue'; +import {TaskQue, TaskQueEntry} from './TaskQue'; +import {ITaskExecuter} from './TaskExecuter'; -interface PoolTask { - task: WorkerTask; - promise: { resolve: Function, reject: Function }; -} - -interface WorkerWrapper { +interface WorkerWrapper { worker: cluster.Worker; - poolTask: PoolTask; + poolTask: TaskQueEntry; } -export class ThreadPool { +export class ThreadPool { public static WorkerCount = 0; - private workers: WorkerWrapper[] = []; - private tasks: PoolTask[] = []; + private workers: WorkerWrapper[] = []; + private taskQue = new TaskQue(); constructor(private size: number) { Logger.silly('Creating thread pool with', size, 'workers'); @@ -31,7 +27,7 @@ export class ThreadPool { } private run = () => { - if (this.tasks.length === 0) { + if (this.taskQue.isEmpty()) { return; } const worker = this.getFreeWorker(); @@ -39,16 +35,15 @@ export class ThreadPool { return; } - const poolTask = this.tasks.shift(); + const poolTask = this.taskQue.get(); worker.poolTask = poolTask; - worker.worker.send(poolTask.task); + worker.worker.send(poolTask.data); }; - protected executeTask(task: WorkerTask): Promise { - return new Promise((resolve: Function, reject: Function) => { - this.tasks.push({task: task, promise: {resolve: resolve, reject: reject}}); - this.run(); - }); + protected executeTask(task: WorkerTask): Promise { + const promise = this.taskQue.add(task).promise.obj; + this.run(); + return promise; } private getFreeWorker() { @@ -61,7 +56,7 @@ export class ThreadPool { } private startWorker() { - const worker = {poolTask: null, worker: cluster.fork()}; + const worker = >{poolTask: null, worker: cluster.fork()}; this.workers.push(worker); worker.worker.on('online', () => { ThreadPool.WorkerCount++; @@ -84,6 +79,7 @@ export class ThreadPool { } else { worker.poolTask.promise.resolve(msg.result); } + this.taskQue.ready(worker.poolTask); worker.poolTask = null; this.run(); }); @@ -91,7 +87,7 @@ export class ThreadPool { } -export class DiskManagerTH extends ThreadPool implements ITaskQue { +export class DiskManagerTH extends ThreadPool implements ITaskExecuter { execute(relativeDirectoryName: string): Promise { return super.executeTask({ type: WorkerTaskTypes.diskManager, @@ -100,7 +96,7 @@ export class DiskManagerTH extends ThreadPool implements ITaskQue { } } -export class ThumbnailTH extends ThreadPool implements ITaskQue { +export class ThumbnailTH extends ThreadPool implements ITaskExecuter { execute(input: RendererInput): Promise { return super.executeTask({ type: WorkerTaskTypes.thumbnail, diff --git a/backend/model/threading/Worker.ts b/backend/model/threading/Worker.ts index c49f81e6..7397aa9a 100644 --- a/backend/model/threading/Worker.ts +++ b/backend/model/threading/Worker.ts @@ -2,6 +2,8 @@ import {DiskMangerWorker} from './DiskMangerWorker'; import {Logger} from '../../Logger'; import {RendererInput, ThumbnailWorker} from './ThumbnailWorker'; import {ThumbnailProcessingLib} from '../../../common/config/private/IPrivateConfig'; +import {DirectoryDTO} from '../../../common/entities/DirectoryDTO'; +import {Utils} from '../../../common/Utils'; export class Worker { @@ -22,7 +24,6 @@ export class Worker { result = await ThumbnailWorker.render((task).input, (task).renderer); break; default: - Logger.error('Unknown worker task type'); throw new Error('Unknown worker task type'); } process.send({ @@ -54,7 +55,13 @@ export interface ThumbnailTask extends WorkerTask { renderer: ThumbnailProcessingLib; } -export interface WorkerMessage { - error: any; - result: any; +export module WorkerTask { + export const equals = (t1: WorkerTask, t2: WorkerTask): boolean => { + return Utils.equalsFilter(t1, t2); + }; +} + +export interface WorkerMessage { + error: Error; + result: DirectoryDTO | void; } diff --git a/frontend/app/app.module.ts b/frontend/app/app.module.ts index d0552f46..370bccac 100644 --- a/frontend/app/app.module.ts +++ b/frontend/app/app.module.ts @@ -13,7 +13,6 @@ import {appRoutes} from './app.routing'; import {UserService} from './model/network/user.service'; import {GalleryService} from './gallery/gallery.service'; import {NetworkService} from './model/network/network.service'; -import {ThumbnailLoaderService} from './gallery/thumnailLoader.service'; import {GalleryCacheService} from './gallery/cache.gallery.service'; import {FullScreenService} from './gallery/fullscreen.service'; import {AuthenticationService} from './model/network/authentication.service'; @@ -34,7 +33,7 @@ import {GalleryComponent} from './gallery/gallery.component'; import {StringifyRole} from './pipes/StringifyRolePipe'; import {GalleryMapComponent} from './gallery/map/map.gallery.component'; import {GalleryMapLightboxComponent} from './gallery/map/lightbox/lightbox.map.gallery.component'; -import {ThumbnailManagerService} from './gallery/thumnailManager.service'; +import {ThumbnailManagerService} from './gallery/thumbnailManager.service'; import {OverlayService} from './gallery/overlay.service'; import {SlimLoadingBarModule} from 'ng2-slim-loading-bar'; import {GalleryShareComponent} from './gallery/share/share.gallery.component'; @@ -76,6 +75,7 @@ import {DurationPipe} from './pipes/DurationPipe'; import {MapService} from './gallery/map/map.service'; import {Icon} from 'leaflet'; import {MetaFileSettingsComponent} from './settings/metafiles/metafile.settings.component'; +import {ThumbnailLoaderService} from './gallery/thumbnailLoader.service'; @Injectable() @@ -84,16 +84,7 @@ export class MyHammerConfig extends HammerGestureConfig { 'swipe': {direction: 31} // enable swipe up }; } -/* -console.log(Icon); -console.log(Icon.Default); -console.log(Icon.Default.prototype); -console.log(Icon.Default.prototype.options); -Icon.Default.prototype.options.iconRetinaUrl = 'assets/leaflet/marker-icon-2x.png'; -Icon.Default.imagePath = 'assets/leaflet/marker-icon.png'; -Icon.Default.prototype.options.iconUrl = 'assets/leaflet/marker-icon.png'; -Icon.Default.prototype.options.shadowUrl = 'assets/leaflet/marker-shadow.png'; -*/ + export class CustomUrlSerializer implements UrlSerializer { private _defaultUrlSerializer: DefaultUrlSerializer = new DefaultUrlSerializer(); diff --git a/frontend/app/gallery/directory/directory.gallery.component.ts b/frontend/app/gallery/directory/directory.gallery.component.ts index 64007415..4d0585ef 100644 --- a/frontend/app/gallery/directory/directory.gallery.component.ts +++ b/frontend/app/gallery/directory/directory.gallery.component.ts @@ -4,7 +4,7 @@ import {DirectoryDTO} from '../../../../common/entities/DirectoryDTO'; import {RouterLink} from '@angular/router'; import {Utils} from '../../../../common/Utils'; import {Media} from '../Media'; -import {Thumbnail, ThumbnailManagerService} from '../thumnailManager.service'; +import {Thumbnail, ThumbnailManagerService} from '../thumbnailManager.service'; import {PageHelper} from '../../model/page.helper'; import {QueryService} from '../../model/query.service'; import {PhotoDTO} from '../../../../common/entities/PhotoDTO'; diff --git a/frontend/app/gallery/grid/photo/photo.grid.gallery.component.ts b/frontend/app/gallery/grid/photo/photo.grid.gallery.component.ts index 9f15c895..8af6b460 100644 --- a/frontend/app/gallery/grid/photo/photo.grid.gallery.component.ts +++ b/frontend/app/gallery/grid/photo/photo.grid.gallery.component.ts @@ -3,7 +3,7 @@ import {Dimension, IRenderable} from '../../../model/IRenderable'; import {GridMedia} from '../GridMedia'; import {SearchTypes} from '../../../../../common/entities/AutoCompleteItem'; import {RouterLink} from '@angular/router'; -import {Thumbnail, ThumbnailManagerService} from '../../thumnailManager.service'; +import {Thumbnail, ThumbnailManagerService} from '../../thumbnailManager.service'; import {Config} from '../../../../../common/config/public/Config'; import {AnimationBuilder} from '@angular/animations'; import {PageHelper} from '../../../model/page.helper'; diff --git a/frontend/app/gallery/lightbox/lightbox.gallery.component.ts b/frontend/app/gallery/lightbox/lightbox.gallery.component.ts index 20a00dc1..9924d292 100644 --- a/frontend/app/gallery/lightbox/lightbox.gallery.component.ts +++ b/frontend/app/gallery/lightbox/lightbox.gallery.component.ts @@ -2,11 +2,9 @@ import { ChangeDetectorRef, Component, ElementRef, - EventEmitter, HostListener, OnDestroy, OnInit, - Output, QueryList, ViewChild } from '@angular/core'; diff --git a/frontend/app/gallery/map/lightbox/lightbox.map.gallery.component.ts b/frontend/app/gallery/map/lightbox/lightbox.map.gallery.component.ts index 7c16bb1a..43d7217b 100644 --- a/frontend/app/gallery/map/lightbox/lightbox.map.gallery.component.ts +++ b/frontend/app/gallery/map/lightbox/lightbox.map.gallery.component.ts @@ -2,7 +2,7 @@ import {Component, ElementRef, HostListener, Input, OnChanges, ViewChild, AfterV import {PhotoDTO} from '../../../../../common/entities/PhotoDTO'; import {Dimension} from '../../../model/IRenderable'; import {FullScreenService} from '../../fullscreen.service'; -import {IconThumbnail, Thumbnail, ThumbnailManagerService} from '../../thumnailManager.service'; +import {IconThumbnail, Thumbnail, ThumbnailManagerService} from '../../thumbnailManager.service'; import {MediaIcon} from '../../MediaIcon'; import {Media} from '../../Media'; import {PageHelper} from '../../../model/page.helper'; diff --git a/frontend/app/gallery/thumnailLoader.service.ts b/frontend/app/gallery/thumbnailLoader.service.ts similarity index 100% rename from frontend/app/gallery/thumnailLoader.service.ts rename to frontend/app/gallery/thumbnailLoader.service.ts diff --git a/frontend/app/gallery/thumnailManager.service.ts b/frontend/app/gallery/thumbnailManager.service.ts similarity index 97% rename from frontend/app/gallery/thumnailManager.service.ts rename to frontend/app/gallery/thumbnailManager.service.ts index b1b07369..e0dc15d8 100644 --- a/frontend/app/gallery/thumnailManager.service.ts +++ b/frontend/app/gallery/thumbnailManager.service.ts @@ -1,5 +1,5 @@ import {Injectable} from '@angular/core'; -import {ThumbnailLoaderService, ThumbnailLoadingListener, ThumbnailLoadingPriority, ThumbnailTaskEntity} from './thumnailLoader.service'; +import {ThumbnailLoaderService, ThumbnailLoadingListener, ThumbnailLoadingPriority, ThumbnailTaskEntity} from './thumbnailLoader.service'; import {Media} from './Media'; import {MediaIcon} from './MediaIcon'; @@ -101,7 +101,7 @@ export class IconThumbnail extends ThumbnailBase { this.loading = false; this.thumbnailTask = null; }, - onError: (error) => {// onError + onError: () => {// onError this.thumbnailTask = null; this.loading = false; this.error = true; @@ -203,7 +203,7 @@ export class Thumbnail extends ThumbnailBase { this.loading = false; this.thumbnailTask = null; }, - onError: (error) => {// onError + onError: () => {// onError this.thumbnailTask = null; this.loading = false; this.error = true; diff --git a/test/backend/unit/model/threading/TaskExecuter.spec.ts b/test/backend/unit/model/threading/TaskExecuter.spec.ts new file mode 100644 index 00000000..a00ff498 --- /dev/null +++ b/test/backend/unit/model/threading/TaskExecuter.spec.ts @@ -0,0 +1,66 @@ +import {expect} from 'chai'; +import {TaskExecuter} from '../../../../../backend/model/threading/TaskExecuter'; + +describe('TaskExecuter', () => { + + it('should execute', async () => { + const taskWorker = (input: number) => { + return new Promise((resolve, reject) => { + setTimeout(() => { + resolve(input * 2); + }, 1); + }); + }; + + const tq = new TaskExecuter(10, taskWorker); + + expect(await tq.execute(1)).to.be.equal(2); + expect(await tq.execute(10)).to.be.equal(20); + expect(await tq.execute(1)).to.be.equal(2); + expect(await tq.execute(111)).to.be.equal(222); + }); + + it('should fail', async () => { + const taskWorker = (input: number) => { + return new Promise((resolve, reject) => { + setTimeout(() => { + reject((input * 2).toString()); + }, 1); + }); + }; + + const tq = new TaskExecuter(10, taskWorker); + + try { + await tq.execute(1); + expect(false).to.be.equal(true); // should not reach + } catch (e) { + expect(e).to.be.equal('2'); + } + }); + + it('should handle race condition', async () => { + let counter = 0; + const taskWorker = (input: number) => { + return new Promise((resolve, reject) => { + setTimeout(() => { + counter++; + resolve(); + }, 1); + }); + }; + + const tq = new TaskExecuter(10, taskWorker); + + const prs = []; + prs.push(tq.execute(1)); + prs.push(tq.execute(1)); + prs.push(tq.execute(1)); + prs.push(tq.execute(2)); + prs.push(tq.execute(2)); + await Promise.all(prs); + expect(counter).to.be.equal(2); + + }); + +}); diff --git a/test/backend/unit/model/threading/TaskQue.spec.ts b/test/backend/unit/model/threading/TaskQue.spec.ts new file mode 100644 index 00000000..92985588 --- /dev/null +++ b/test/backend/unit/model/threading/TaskQue.spec.ts @@ -0,0 +1,33 @@ +import {expect} from 'chai'; +import {TaskQue} from '../../../../../backend/model/threading/TaskQue'; + +describe('TaskQue', () => { + + it('should be empty', () => { + const tq = new TaskQue(); + expect(tq.isEmpty()).to.be.equal(true); + tq.add(2); + expect(tq.isEmpty()).to.be.equal(false); + tq.ready(tq.get()); + expect(tq.isEmpty()).to.be.equal(true); + }); + + it('should get', () => { + const tq = new TaskQue(); + tq.add(2); + expect(tq.get().data).to.be.equal(2); + expect(tq.get).to.throw(); + }); + it('should set ready', () => { + const tq = new TaskQue(); + tq.add(2); + const task = tq.get(); + tq.ready(task); + try { + tq.ready(task); + expect(false).to.be.equal(true); // should not reach + } catch (e) { + expect(e).not.to.be.equal(null); + } + }); +});