Reworked resume download feature.

This commit is contained in:
2022-08-28 17:55:45 +02:00
parent ed127d68b1
commit 80478cfa67
10 changed files with 120 additions and 205 deletions

View File

@@ -1,20 +1,19 @@
import { spawn } from 'child_process';
import { from, interval } from 'rxjs';
import { map, throttle } from 'rxjs/operators';
import { killProcess } from '../utils/procUtils';
import { Socket } from 'socket.io';
import { IPayload } from '../interfaces/IPayload';
import { ISettings } from '../interfaces/ISettings';
import Logger from '../utils/BetterLogger';
import Process from './Process';
import ProcessPool from './ProcessPool';
import MemoryDB from '../db/memoryDB';
// settings read from settings.json
let settings: ISettings;
let coldRestart = true;
const log = Logger.instance;
const pool = new ProcessPool();
const mem_db = new MemoryDB();
try {
settings = require('../../settings.json');
@@ -56,42 +55,62 @@ export async function download(socket: Socket, payload: IPayload) {
let p = new Process(url, params, settings);
p.start().then(downloader => {
pool.add(p)
const pid = downloader.getPid();
p.getInfo().then(info => {
socket.emit('info', {
pid: pid,
info: info
});
});
from(downloader.getStdout()) // stdout as observable
.pipe(
throttle(() => interval(500)), // discard events closer than 500ms
map(stdout => formatter(String(stdout), pid))
)
.subscribe({
next: (stdout) => socket.emit('progress', stdout),
complete: () => {
downloader.kill().then(() => {
socket.emit('progress', {
status: 'Done!',
pid: pid,
})
pool.remove(downloader);
})
},
error: () => {
socket.emit('progress', {
status: 'Done!', pid: pid
});
pool.remove(downloader);
}
});
mem_db.add(downloader)
displayDownloadInfo(downloader, socket);
streamProcess(downloader, socket);
});
}
/**
* Send via websocket download info "chunk"
* @param process
* @param socket
*/
function displayDownloadInfo(process: Process, socket: Socket) {
process.getInfo().then(info => {
socket.emit('info', {
pid: process.getPid(),
info: info
});
});
}
/**
* Stream via websocket download stdoud "chunks"
* @param process
* @param socket
*/
function streamProcess(process: Process, socket: Socket) {
const emitAbort = () => {
socket.emit('progress', {
status: 'Done!',
pid: process.getPid(),
});
}
const stdout = process.getStdout()
stdout.removeAllListeners()
from(stdout) // stdout as observable
.pipe(
throttle(() => interval(500)), // discard events closer than 500ms
map(stdout => formatter(String(stdout), process.getPid()))
)
.subscribe({
next: (stdout) => socket.emit('progress', stdout),
complete: () => {
process.kill().then(() => {
emitAbort();
mem_db.remove(process);
});
},
error: () => {
emitAbort();
mem_db.remove(process);
}
});
}
/**
* Retrieve all downloads.
* If the server has just been launched retrieve the ones saved to the database.
@@ -102,43 +121,35 @@ export async function download(socket: Socket, payload: IPayload) {
export async function retrieveDownload(socket: Socket) {
// it's a cold restart: the server has just been started with pending
// downloads, so fetch them from the database and resume.
if (coldRestart) {
coldRestart = false;
let downloads = [];
// sanitize
downloads = [...new Set(downloads.filter(el => el !== undefined))];
log.info('dl', `Cold restart, retrieving ${downloads.length} jobs`)
for (const entry of downloads) {
if (entry) {
await download(socket, entry);
}
}
return;
}
// if (coldRestart) {
// coldRestart = false;
// let downloads = [];
// // sanitize
// downloads = [...new Set(downloads.filter(el => el !== undefined))];
// log.info('dl', `Cold restart, retrieving ${downloads.length} jobs`)
// for (const entry of downloads) {
// if (entry) {
// await download(socket, entry);
// }
// }
// return;
// }
// it's an hot-reload the server it's running and the frontend ask for
// the pending job: retrieve them from the "in-memory database" (ProcessPool)
const _poolSize = pool.size()
const _poolSize = mem_db.size()
log.info('dl', `Retrieving ${_poolSize} jobs from pool`)
socket.emit('pending-jobs', _poolSize)
const it = pool.iterator();
const tempWorkQueue = new Array<Process>();
// sanitize
for (const entry of it) {
const [pid, process] = entry;
pool.removeByPid(pid);
await killProcess(pid);
tempWorkQueue.push(process);
}
const it = mem_db.iterator();
// resume the jobs
for (const entry of tempWorkQueue) {
await download(socket, {
url: entry.url,
params: entry.params,
});
for (const entry of it) {
const [, process] = entry
displayDownloadInfo(process, socket);
streamProcess(process, socket);
}
}
@@ -177,13 +188,14 @@ export function abortAllDownloads(socket: Socket) {
socket.emit('progress', { status: 'Aborted' });
log.info('dl', 'Aborting downloads');
});
mem_db.flush();
}
/**
* Get pool current size
*/
export function getQueueSize(): number {
return pool.size();
return mem_db.size();
}
/**

View File

View File

@@ -1,103 +0,0 @@
// import { v1 } from 'uuid';
// import { existsInProc } from '../utils/procUtils';
// import { IRecord } from '../interfaces/IRecord';
// import Logger from '../utils/BetterLogger';
// const db = require('better-sqlite3')('downloads.db');
// const log = new Logger();
// /**
// * Inits the repository, the tables.
// */
// export async function init() {
// try {
// db.exec(`CREATE TABLE downloads (
// uid varchar(36) NOT NULL,
// url text NOT NULL,
// title text,
// thumbnail text,
// created date,
// size text,
// params text,
// pid int NOT NULL,
// PRIMARY KEY (uid)
// )`)
// } catch (e) {
// log.warn('db', 'Table already created, ignoring')
// }
// }
// /**
// * Get an instance of the db.
// * @returns {BetterSqlite3.Database} Current database instance
// */
// export async function get_db(): Promise<any> {
// return db
// }
// /**
// * Insert an new download to the database
// * @param {string} url the video url
// * @param {string} title the title fetched by the info process
// * @param {string} thumbnail the thumbnail url fetched by the info process
// * @param {string} size optional - the download size
// * @param {string} params optional - the download parameters, cli arguments
// * @param {number} PID the pid of the downloader
// * @returns {Promise<string>} the download UUID
// */
// export async function insertDownload(url: string, title: string, thumbnail: string, size: string, params: string, PID: number): Promise<string> {
// const uid = v1()
// try {
// db
// .prepare(`
// INSERT INTO downloads
// (uid, url, title, thumbnail, size, params, pid)
// VALUES (?, ?, ?, ?, ?, ?, ?)`
// )
// .run(uid, url, title, thumbnail, size, params, PID)
// } catch (error) {
// log.err('db', error)
// }
// return uid
// }
// /**
// * Retrieve all downloads from the database
// * @returns {ArrayLike} a collection of results
// */
// export async function retrieveAll(): Promise<Array<IRecord>> {
// return db
// .prepare('SELECT * FROM downloads')
// .all()
// }
// /**
// * Delete a download by its uuid
// * @param {string} uid the to-be-deleted download uuid
// */
// export async function deleteDownloadById(uid: string) {
// db.prepare(`DELETE FROM downloads WHERE uid=${uid}`).run()
// }
// /**
// * Delete a download by its pid
// * @param {string} pid the to-be-deleted download pid
// */
// export async function deleteDownloadByPID(pid: number) {
// db.prepare(`DELETE FROM downloads WHERE pid=${pid}`).run()
// }
// /**
// * Deletes the downloads that aren't active anymore
// * @returns {Promise<ArrayLike>}
// */
// export async function pruneDownloads(): Promise<Array<IRecord>> {
// const all = await retrieveAll()
// return all.map(job => {
// if (existsInProc(job.pid)) {
// return job
// }
// deleteDownloadByPID(job.pid)
// })
// }

View File

@@ -1,17 +1,20 @@
/**
* @class
* Represents a download process that spawns yt-dlp.
*/
import Process from "./Process";
import Process from "../core/Process";
class ProcessPool {
private _pool: Map<number, Process>;
private _size: number;
class MemoryDB {
private _pool: Map<number, Process>
private _size: number
constructor() {
this._pool = new Map();
this._size = 0;
this.init()
}
private init() {
this._pool = new Map<number, Process>()
this._size = 0
}
/**
@@ -19,7 +22,7 @@ class ProcessPool {
* @returns {number} pool's size
*/
size(): number {
return this._size;
return this._size
}
/**
@@ -28,6 +31,7 @@ class ProcessPool {
*/
add(process: Process) {
this._pool.set(process.getPid(), process)
this._size++
}
/**
@@ -36,6 +40,7 @@ class ProcessPool {
*/
remove(process: Process) {
this._pool.delete(process.getPid())
this._size--
}
/**
@@ -62,6 +67,13 @@ class ProcessPool {
getByPid(pid: number): Process {
return this._pool.get(pid)
}
/**
* Clear memory db
*/
flush() {
this.init()
}
}
export default ProcessPool;
export default MemoryDB;

View File

@@ -1,5 +1,5 @@
import { exec, spawn } from 'child_process';
import fs = require('fs');
import { statSync } from 'fs';
import Logger from './BetterLogger';
// import net = require('net');
@@ -12,7 +12,7 @@ const log = Logger.instance;
*/
export function existsInProc(pid: number): any {
try {
return fs.statSync(`/proc/${pid}`)
return statSync(`/proc/${pid}`)
} catch (e) {
log.warn('proc', `pid ${pid} not found in procfs`)
}
@@ -43,8 +43,8 @@ export async function killProcess(pid: number) {
}
export function getFreeDiskSpace(socket: any) {
let message: string = 'free-space';
exec("df -h / | tail -1 | awk '{print $4}'", (_, stdout) => {
const message: string = 'free-space';
exec("df -P -h | tail -1 | awk '{print $4}'", (_, stdout) => {
socket.emit(message, stdout)
})
}