1
0
mirror of https://github.com/xuthus83/pigallery2.git synced 2025-01-14 14:43:17 +08:00

improving multi threading

This commit is contained in:
Patrik J. Braun 2018-12-08 18:17:33 +01:00
parent 452ead4fa7
commit 38f36891bd
14 changed files with 230 additions and 90 deletions

View File

@ -7,18 +7,18 @@ import {ErrorCodes, ErrorDTO} from '../../../common/entities/Error';
import {ContentWrapper} from '../../../common/entities/ConentWrapper';
import {DirectoryDTO} from '../../../common/entities/DirectoryDTO';
import {ProjectPath} from '../../ProjectPath';
import {PhotoDTO} from '../../../common/entities/PhotoDTO';
import {Config} from '../../../common/config/private/Config';
import {ThumbnailProcessingLib} from '../../../common/config/private/IPrivateConfig';
import {ThumbnailTH} from '../../model/threading/ThreadPool';
import {RendererInput, ThumbnailSourceType} from '../../model/threading/ThumbnailWorker';
import {ITaskQue, TaskQue} from '../../model/threading/TaskQue';
import {RendererInput, ThumbnailSourceType, ThumbnailWorker} from '../../model/threading/ThumbnailWorker';
import {MediaDTO} from '../../../common/entities/MediaDTO';
import {ITaskExecuter, TaskExecuter} from '../../model/threading/TaskExecuter';
export class ThumbnailGeneratorMWs {
private static initDone = false;
private static taskQue: ITaskQue = null;
private static taskQue: ITaskExecuter<RendererInput, void> = null;
public static init() {
if (this.initDone === true) {
@ -26,8 +26,7 @@ export class ThumbnailGeneratorMWs {
}
if (Config.Server.threading.enable === true ||
Config.Server.thumbnail.processingLibrary !== ThumbnailProcessingLib.Jimp) {
if (Config.Server.threading.enable === true) {
if (Config.Server.threading.thumbnailThreads > 0) {
Config.Client.Thumbnail.concurrentThumbnailGenerations = Config.Server.threading.thumbnailThreads;
} else {
@ -41,7 +40,8 @@ export class ThumbnailGeneratorMWs {
Config.Server.thumbnail.processingLibrary === ThumbnailProcessingLib.Jimp) {
this.taskQue = new ThumbnailTH(Config.Client.Thumbnail.concurrentThumbnailGenerations);
} else {
this.taskQue = new TaskQue(Config.Client.Thumbnail.concurrentThumbnailGenerations);
this.taskQue = new TaskExecuter(Config.Client.Thumbnail.concurrentThumbnailGenerations,
(input => ThumbnailWorker.render(input, Config.Server.thumbnail.processingLibrary)));
}
this.initDone = true;
@ -102,10 +102,10 @@ export class ThumbnailGeneratorMWs {
}
private static addThInfoTODir(directory: DirectoryDTO) {
if (typeof directory.media === 'undefined') {
if (typeof directory.media === 'undefined') {
directory.media = [];
}
if (typeof directory.directories === 'undefined') {
if (typeof directory.directories === 'undefined') {
directory.directories = [];
}
ThumbnailGeneratorMWs.addThInfoToPhotos(directory.media);
@ -124,7 +124,7 @@ export class ThumbnailGeneratorMWs {
const size = Config.Client.Thumbnail.thumbnailSizes[j];
const thPath = path.join(thumbnailFolder, ThumbnailGeneratorMWs.generateThumbnailName(fullMediaPath, size));
if (fs.existsSync(thPath) === true) {
if (typeof photos[i].readyThumbnails === 'undefined') {
if (typeof photos[i].readyThumbnails === 'undefined') {
photos[i].readyThumbnails = [];
}
photos[i].readyThumbnails.push(size);

View File

@ -0,0 +1,37 @@
import {TaskQue} from './TaskQue';
export interface ITaskExecuter<I, O> {
execute(input: I): Promise<O>;
}
export class TaskExecuter<I, O> implements ITaskExecuter<I, O> {
private taskQue = new TaskQue<I, O>();
private taskInProgress = 0;
private run = async () => {
if (this.taskQue.isEmpty() || this.taskInProgress >= this.size) {
return;
}
this.taskInProgress++;
const task = this.taskQue.get();
try {
task.promise.resolve(await this.worker(task.data));
} catch (err) {
task.promise.reject(err);
}
this.taskQue.ready(task);
this.taskInProgress--;
process.nextTick(this.run);
};
constructor(private size: number, private worker: (input: I) => Promise<O>) {
}
execute(input: I): Promise<O> {
const promise = this.taskQue.add(input).promise.obj;
this.run().catch(console.error);
return promise;
}
}

View File

@ -1,46 +1,58 @@
import {RendererInput, ThumbnailWorker} from './ThumbnailWorker';
import {Config} from '../../../common/config/private/Config';
import {Utils} from '../../../common/Utils';
interface QueTask {
data: RendererInput;
promise: { resolve: Function, reject: Function };
export interface TaskQueEntry<I, O> {
data: I;
promise: { obj: Promise<O>, resolve: Function, reject: Function };
}
export interface ITaskQue {
execute(input: any): Promise<any>;
}
export class TaskQue<I, O> {
export class TaskQue implements ITaskQue {
private tasks: TaskQueEntry<I, O>[] = [];
private processing: TaskQueEntry<I, O>[] = [];
private tasks: QueTask[] = [];
private taskInProgress = 0;
private run = async () => {
if (this.tasks.length === 0 || this.taskInProgress >= this.size) {
return;
}
this.taskInProgress++;
const task = this.tasks.shift();
try {
task.promise.resolve(await ThumbnailWorker.render(task.data, Config.Server.thumbnail.processingLibrary));
} catch (err) {
task.promise.reject(err);
}
this.taskInProgress--;
process.nextTick(this.run);
};
constructor(private size: number) {
constructor() {
}
execute(input: RendererInput): Promise<void> {
return new Promise((resolve: Function, reject: Function) => {
this.tasks.push({
data: input,
promise: {resolve: resolve, reject: reject}
});
this.run();
private getSameTask(input: I): TaskQueEntry<I, O> {
return this.tasks.find(t => Utils.equalsFilter(t.data, input)) ||
this.processing.find(t => Utils.equalsFilter(t.data, input));
}
private putNewTask(input: I): TaskQueEntry<I, O> {
const taskEntry: TaskQueEntry<I, O> = {
data: input,
promise: {
obj: null,
resolve: null,
reject: null
}
};
this.tasks.push(taskEntry);
taskEntry.promise.obj = new Promise<O>((resolve: Function, reject: Function) => {
taskEntry.promise.reject = reject;
taskEntry.promise.resolve = resolve;
});
return taskEntry;
}
public isEmpty(): boolean {
return this.tasks.length === 0;
}
public add(input: I): TaskQueEntry<I, O> {
return (this.getSameTask(input) || this.putNewTask(input));
}
public get(): TaskQueEntry<I, O> {
const task = this.tasks.shift();
this.processing.push(task);
return task;
}
public ready(task: TaskQueEntry<I, O>): void {
this.processing.slice(this.processing.indexOf(task), 1);
}
}

View File

@ -4,24 +4,20 @@ import {DiskManagerTask, ThumbnailTask, WorkerMessage, WorkerTask, WorkerTaskTyp
import {DirectoryDTO} from '../../../common/entities/DirectoryDTO';
import {RendererInput} from './ThumbnailWorker';
import {Config} from '../../../common/config/private/Config';
import {ITaskQue} from './TaskQue';
import {TaskQue, TaskQueEntry} from './TaskQue';
import {ITaskExecuter} from './TaskExecuter';
interface PoolTask {
task: WorkerTask;
promise: { resolve: Function, reject: Function };
}
interface WorkerWrapper {
interface WorkerWrapper<O> {
worker: cluster.Worker;
poolTask: PoolTask;
poolTask: TaskQueEntry<WorkerTask, O>;
}
export class ThreadPool {
export class ThreadPool<O> {
public static WorkerCount = 0;
private workers: WorkerWrapper[] = [];
private tasks: PoolTask[] = [];
private workers: WorkerWrapper<O>[] = [];
private taskQue = new TaskQue<WorkerTask, O>();
constructor(private size: number) {
Logger.silly('Creating thread pool with', size, 'workers');
@ -31,7 +27,7 @@ export class ThreadPool {
}
private run = () => {
if (this.tasks.length === 0) {
if (this.taskQue.isEmpty()) {
return;
}
const worker = this.getFreeWorker();
@ -39,16 +35,15 @@ export class ThreadPool {
return;
}
const poolTask = this.tasks.shift();
const poolTask = this.taskQue.get();
worker.poolTask = poolTask;
worker.worker.send(poolTask.task);
worker.worker.send(poolTask.data);
};
protected executeTask<T>(task: WorkerTask): Promise<T> {
return new Promise((resolve: Function, reject: Function) => {
this.tasks.push({task: task, promise: {resolve: resolve, reject: reject}});
this.run();
});
protected executeTask(task: WorkerTask): Promise<O> {
const promise = this.taskQue.add(task).promise.obj;
this.run();
return promise;
}
private getFreeWorker() {
@ -61,7 +56,7 @@ export class ThreadPool {
}
private startWorker() {
const worker = <WorkerWrapper>{poolTask: null, worker: cluster.fork()};
const worker = <WorkerWrapper<O>>{poolTask: null, worker: cluster.fork()};
this.workers.push(worker);
worker.worker.on('online', () => {
ThreadPool.WorkerCount++;
@ -84,6 +79,7 @@ export class ThreadPool {
} else {
worker.poolTask.promise.resolve(msg.result);
}
this.taskQue.ready(worker.poolTask);
worker.poolTask = null;
this.run();
});
@ -91,7 +87,7 @@ export class ThreadPool {
}
export class DiskManagerTH extends ThreadPool implements ITaskQue {
export class DiskManagerTH extends ThreadPool<DirectoryDTO> implements ITaskExecuter<string, DirectoryDTO> {
execute(relativeDirectoryName: string): Promise<DirectoryDTO> {
return super.executeTask(<DiskManagerTask>{
type: WorkerTaskTypes.diskManager,
@ -100,7 +96,7 @@ export class DiskManagerTH extends ThreadPool implements ITaskQue {
}
}
export class ThumbnailTH extends ThreadPool implements ITaskQue {
export class ThumbnailTH extends ThreadPool<void> implements ITaskExecuter<RendererInput, void> {
execute(input: RendererInput): Promise<void> {
return super.executeTask(<ThumbnailTask>{
type: WorkerTaskTypes.thumbnail,

View File

@ -2,6 +2,8 @@ import {DiskMangerWorker} from './DiskMangerWorker';
import {Logger} from '../../Logger';
import {RendererInput, ThumbnailWorker} from './ThumbnailWorker';
import {ThumbnailProcessingLib} from '../../../common/config/private/IPrivateConfig';
import {DirectoryDTO} from '../../../common/entities/DirectoryDTO';
import {Utils} from '../../../common/Utils';
export class Worker {
@ -22,7 +24,6 @@ export class Worker {
result = await ThumbnailWorker.render((<ThumbnailTask>task).input, (<ThumbnailTask>task).renderer);
break;
default:
Logger.error('Unknown worker task type');
throw new Error('Unknown worker task type');
}
process.send(<WorkerMessage>{
@ -54,7 +55,13 @@ export interface ThumbnailTask extends WorkerTask {
renderer: ThumbnailProcessingLib;
}
export interface WorkerMessage {
error: any;
result: any;
export module WorkerTask {
export const equals = (t1: WorkerTask, t2: WorkerTask): boolean => {
return Utils.equalsFilter(t1, t2);
};
}
export interface WorkerMessage {
error: Error;
result: DirectoryDTO | void;
}

View File

@ -13,7 +13,6 @@ import {appRoutes} from './app.routing';
import {UserService} from './model/network/user.service';
import {GalleryService} from './gallery/gallery.service';
import {NetworkService} from './model/network/network.service';
import {ThumbnailLoaderService} from './gallery/thumnailLoader.service';
import {GalleryCacheService} from './gallery/cache.gallery.service';
import {FullScreenService} from './gallery/fullscreen.service';
import {AuthenticationService} from './model/network/authentication.service';
@ -34,7 +33,7 @@ import {GalleryComponent} from './gallery/gallery.component';
import {StringifyRole} from './pipes/StringifyRolePipe';
import {GalleryMapComponent} from './gallery/map/map.gallery.component';
import {GalleryMapLightboxComponent} from './gallery/map/lightbox/lightbox.map.gallery.component';
import {ThumbnailManagerService} from './gallery/thumnailManager.service';
import {ThumbnailManagerService} from './gallery/thumbnailManager.service';
import {OverlayService} from './gallery/overlay.service';
import {SlimLoadingBarModule} from 'ng2-slim-loading-bar';
import {GalleryShareComponent} from './gallery/share/share.gallery.component';
@ -76,6 +75,7 @@ import {DurationPipe} from './pipes/DurationPipe';
import {MapService} from './gallery/map/map.service';
import {Icon} from 'leaflet';
import {MetaFileSettingsComponent} from './settings/metafiles/metafile.settings.component';
import {ThumbnailLoaderService} from './gallery/thumbnailLoader.service';
@Injectable()
@ -84,16 +84,7 @@ export class MyHammerConfig extends HammerGestureConfig {
'swipe': {direction: 31} // enable swipe up
};
}
/*
console.log(Icon);
console.log(Icon.Default);
console.log(Icon.Default.prototype);
console.log(Icon.Default.prototype.options);
Icon.Default.prototype.options.iconRetinaUrl = 'assets/leaflet/marker-icon-2x.png';
Icon.Default.imagePath = 'assets/leaflet/marker-icon.png';
Icon.Default.prototype.options.iconUrl = 'assets/leaflet/marker-icon.png';
Icon.Default.prototype.options.shadowUrl = 'assets/leaflet/marker-shadow.png';
*/
export class CustomUrlSerializer implements UrlSerializer {
private _defaultUrlSerializer: DefaultUrlSerializer = new DefaultUrlSerializer();

View File

@ -4,7 +4,7 @@ import {DirectoryDTO} from '../../../../common/entities/DirectoryDTO';
import {RouterLink} from '@angular/router';
import {Utils} from '../../../../common/Utils';
import {Media} from '../Media';
import {Thumbnail, ThumbnailManagerService} from '../thumnailManager.service';
import {Thumbnail, ThumbnailManagerService} from '../thumbnailManager.service';
import {PageHelper} from '../../model/page.helper';
import {QueryService} from '../../model/query.service';
import {PhotoDTO} from '../../../../common/entities/PhotoDTO';

View File

@ -3,7 +3,7 @@ import {Dimension, IRenderable} from '../../../model/IRenderable';
import {GridMedia} from '../GridMedia';
import {SearchTypes} from '../../../../../common/entities/AutoCompleteItem';
import {RouterLink} from '@angular/router';
import {Thumbnail, ThumbnailManagerService} from '../../thumnailManager.service';
import {Thumbnail, ThumbnailManagerService} from '../../thumbnailManager.service';
import {Config} from '../../../../../common/config/public/Config';
import {AnimationBuilder} from '@angular/animations';
import {PageHelper} from '../../../model/page.helper';

View File

@ -2,11 +2,9 @@ import {
ChangeDetectorRef,
Component,
ElementRef,
EventEmitter,
HostListener,
OnDestroy,
OnInit,
Output,
QueryList,
ViewChild
} from '@angular/core';

View File

@ -2,7 +2,7 @@ import {Component, ElementRef, HostListener, Input, OnChanges, ViewChild, AfterV
import {PhotoDTO} from '../../../../../common/entities/PhotoDTO';
import {Dimension} from '../../../model/IRenderable';
import {FullScreenService} from '../../fullscreen.service';
import {IconThumbnail, Thumbnail, ThumbnailManagerService} from '../../thumnailManager.service';
import {IconThumbnail, Thumbnail, ThumbnailManagerService} from '../../thumbnailManager.service';
import {MediaIcon} from '../../MediaIcon';
import {Media} from '../../Media';
import {PageHelper} from '../../../model/page.helper';

View File

@ -1,5 +1,5 @@
import {Injectable} from '@angular/core';
import {ThumbnailLoaderService, ThumbnailLoadingListener, ThumbnailLoadingPriority, ThumbnailTaskEntity} from './thumnailLoader.service';
import {ThumbnailLoaderService, ThumbnailLoadingListener, ThumbnailLoadingPriority, ThumbnailTaskEntity} from './thumbnailLoader.service';
import {Media} from './Media';
import {MediaIcon} from './MediaIcon';
@ -101,7 +101,7 @@ export class IconThumbnail extends ThumbnailBase {
this.loading = false;
this.thumbnailTask = null;
},
onError: (error) => {// onError
onError: () => {// onError
this.thumbnailTask = null;
this.loading = false;
this.error = true;
@ -203,7 +203,7 @@ export class Thumbnail extends ThumbnailBase {
this.loading = false;
this.thumbnailTask = null;
},
onError: (error) => {// onError
onError: () => {// onError
this.thumbnailTask = null;
this.loading = false;
this.error = true;

View File

@ -0,0 +1,66 @@
import {expect} from 'chai';
import {TaskExecuter} from '../../../../../backend/model/threading/TaskExecuter';
describe('TaskExecuter', () => {
it('should execute', async () => {
const taskWorker = (input: number) => {
return new Promise<number>((resolve, reject) => {
setTimeout(() => {
resolve(input * 2);
}, 1);
});
};
const tq = new TaskExecuter<number, number>(10, taskWorker);
expect(await tq.execute(1)).to.be.equal(2);
expect(await tq.execute(10)).to.be.equal(20);
expect(await tq.execute(1)).to.be.equal(2);
expect(await tq.execute(111)).to.be.equal(222);
});
it('should fail', async () => {
const taskWorker = (input: number) => {
return new Promise<number>((resolve, reject) => {
setTimeout(() => {
reject((input * 2).toString());
}, 1);
});
};
const tq = new TaskExecuter<number, number>(10, taskWorker);
try {
await tq.execute(1);
expect(false).to.be.equal(true); // should not reach
} catch (e) {
expect(e).to.be.equal('2');
}
});
it('should handle race condition', async () => {
let counter = 0;
const taskWorker = (input: number) => {
return new Promise<void>((resolve, reject) => {
setTimeout(() => {
counter++;
resolve();
}, 1);
});
};
const tq = new TaskExecuter<number, void>(10, taskWorker);
const prs = [];
prs.push(tq.execute(1));
prs.push(tq.execute(1));
prs.push(tq.execute(1));
prs.push(tq.execute(2));
prs.push(tq.execute(2));
await Promise.all(prs);
expect(counter).to.be.equal(2);
});
});

View File

@ -0,0 +1,33 @@
import {expect} from 'chai';
import {TaskQue} from '../../../../../backend/model/threading/TaskQue';
describe('TaskQue', () => {
it('should be empty', () => {
const tq = new TaskQue<number, number>();
expect(tq.isEmpty()).to.be.equal(true);
tq.add(2);
expect(tq.isEmpty()).to.be.equal(false);
tq.ready(tq.get());
expect(tq.isEmpty()).to.be.equal(true);
});
it('should get', () => {
const tq = new TaskQue<number, number>();
tq.add(2);
expect(tq.get().data).to.be.equal(2);
expect(tq.get).to.throw();
});
it('should set ready', () => {
const tq = new TaskQue<number, number>();
tq.add(2);
const task = tq.get();
tq.ready(task);
try {
tq.ready(task);
expect(false).to.be.equal(true); // should not reach
} catch (e) {
expect(e).not.to.be.equal(null);
}
});
});