Refactoring
This commit is contained in:
@@ -1,10 +1,18 @@
|
||||
const { spawn } = require('child_process');
|
||||
const { from, interval } = require('rxjs');
|
||||
const { throttle } = require('rxjs/operators');
|
||||
const { insertDownload, deleteDownloadByPID, pruneDownloads } = require('./db');
|
||||
const { Socket } = require('socket.io');
|
||||
const { pruneDownloads } = require('./db');
|
||||
const { logger } = require('./logger');
|
||||
const { retriveStdoutFromProcFd, killProcess } = require('./procUtils');
|
||||
const Process = require('./Process');
|
||||
const ProcessPool = require('./ProcessPool');
|
||||
const { killProcess } = require('./procUtils');
|
||||
|
||||
// settings read from settings.json
|
||||
let settings;
|
||||
let coldRestart = true;
|
||||
|
||||
const pool = new ProcessPool();
|
||||
|
||||
try {
|
||||
settings = require('../settings.json');
|
||||
@@ -13,10 +21,14 @@ catch (e) {
|
||||
console.warn("settings.json not found");
|
||||
}
|
||||
|
||||
const isWindows = process.platform === 'win32';
|
||||
|
||||
/**
|
||||
* Invoke a new download.
|
||||
* Called by the websocket messages listener.
|
||||
* @param {Socket} socket current connection socket
|
||||
* @param {object} payload frontend download payload
|
||||
* @returns
|
||||
*/
|
||||
async function download(socket, payload) {
|
||||
|
||||
if (!payload || payload.url === '' || payload.url === null) {
|
||||
socket.emit('progress', { status: 'Done!' });
|
||||
return;
|
||||
@@ -25,82 +37,124 @@ async function download(socket, payload) {
|
||||
const url = payload.url
|
||||
const params = payload.params?.xa ? '-x' : '';
|
||||
|
||||
await getDownloadInfo(socket, url);
|
||||
const p = new Process(url, params, settings);
|
||||
|
||||
const ytldp = spawn(`./lib/yt-dlp${isWindows ? '.exe' : ''}`,
|
||||
[
|
||||
'-o', `${settings.download_path || 'downloads/'}%(title)s.%(ext)s`,
|
||||
params,
|
||||
url
|
||||
]
|
||||
);
|
||||
p.start().then(downloader => {
|
||||
pool.add(p)
|
||||
let infoLock = true;
|
||||
let pid = downloader.getPid();
|
||||
|
||||
await insertDownload(url, null, null, null, ytldp.pid);
|
||||
|
||||
from(ytldp.stdout) // stdout as observable
|
||||
.pipe(throttle(() => interval(500))) // discard events closer than 500ms
|
||||
.subscribe({
|
||||
next: (stdout) => {
|
||||
//let _stdout = String(stdout)
|
||||
socket.emit('progress', formatter(String(stdout))) // finally, emit
|
||||
//logger('download', `Fetching ${_stdout}`)
|
||||
},
|
||||
complete: () => {
|
||||
socket.emit('progress', { status: 'Done!' })
|
||||
}
|
||||
});
|
||||
|
||||
ytldp.on('exit', () => {
|
||||
socket.emit('progress', { status: 'Done!' })
|
||||
logger('download', 'Done!')
|
||||
|
||||
deleteDownloadByPID(ytldp.pid).then(() => {
|
||||
logger('db', `Deleted ${ytldp.pid} because SIGKILL`)
|
||||
})
|
||||
from(downloader.getStdout()) // stdout as observable
|
||||
.pipe(throttle(() => interval(500))) // discard events closer than 500ms
|
||||
.subscribe({
|
||||
next: (stdout) => {
|
||||
if (infoLock) {
|
||||
if (downloader.getInfo() === null) {
|
||||
return;
|
||||
}
|
||||
socket.emit('info', downloader.getInfo());
|
||||
infoLock = false;
|
||||
}
|
||||
socket.emit('progress', formatter(String(stdout), pid)) // finally, emit
|
||||
},
|
||||
complete: () => {
|
||||
downloader.kill().then(() => {
|
||||
socket.emit('progress', {
|
||||
status: 'Done!',
|
||||
process: pid,
|
||||
})
|
||||
pool.remove(downloader);
|
||||
})
|
||||
},
|
||||
error: () => {
|
||||
socket.emit('progress', { status: 'Done!' });
|
||||
}
|
||||
});
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieve all downloads.
|
||||
* If the server has just been launched retrieve the ones saved to the database.
|
||||
* If the server is running fetches them from the process pool.
|
||||
* @param {Socket} socket current connection socket
|
||||
* @returns
|
||||
*/
|
||||
async function retriveDownload(socket) {
|
||||
const downloads = await pruneDownloads();
|
||||
if (downloads.length > 0) {
|
||||
for (const _download of downloads) {
|
||||
await killProcess(_download.process_pid);
|
||||
await download(socket, _download);
|
||||
// it's a cold restart: the server has just been started with pending
|
||||
// downloads, so fetch them from the database and resume.
|
||||
if (coldRestart) {
|
||||
coldRestart = false;
|
||||
let downloads = await pruneDownloads();
|
||||
downloads = [... new Set(downloads)];
|
||||
logger('dl', `Cold restart, retrieving ${downloads.length} jobs`)
|
||||
for (const entry of downloads) {
|
||||
if (entry) {
|
||||
await download(socket, entry);
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// it's an hot-reload the server it's running and the frontend ask for
|
||||
// the pending job: retrieve them from the "in-memory database" (ProcessPool)
|
||||
logger('dl', `Retrieving jobs from pool`)
|
||||
const it = pool.iterator();
|
||||
|
||||
for (const entry of it) {
|
||||
const [pid, process] = entry;
|
||||
await killProcess(pid);
|
||||
await download(socket, {
|
||||
url: process.url,
|
||||
params: process.params
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async function getDownloadInfo(socket, url) {
|
||||
let stdoutChunks = [];
|
||||
const ytdlpInfo = spawn(`./lib/yt-dlp${isWindows ? '.exe' : ''}`, ['-s', '-j', url]);
|
||||
/**
|
||||
* Abort a specific download if pid is provided, in the other case
|
||||
* calls the abortAllDownloads function
|
||||
* @see abortAllDownloads
|
||||
* @param {Socket} socket currenct connection socket
|
||||
* @param {*} args args sent by the frontend. MUST contain the PID.
|
||||
* @returns
|
||||
*/
|
||||
function abortDownload(socket, args) {
|
||||
if (!args) {
|
||||
abortAllDownloads(socket);
|
||||
return;
|
||||
}
|
||||
const { pid } = args;
|
||||
|
||||
ytdlpInfo.stdout.on('data', (data) => {
|
||||
stdoutChunks.push(data);
|
||||
});
|
||||
spawn('kill', [pid])
|
||||
.on('exit', () => {
|
||||
socket.emit('progress', {
|
||||
status: 'Aborted',
|
||||
process: pid,
|
||||
});
|
||||
logger('dl', `Aborting download ${pid}`);
|
||||
});
|
||||
}
|
||||
|
||||
ytdlpInfo.on('exit', () => {
|
||||
try {
|
||||
const buffer = Buffer.concat(stdoutChunks);
|
||||
const json = JSON.parse(buffer.toString());
|
||||
socket.emit('info', json);
|
||||
} catch (e) {
|
||||
/**
|
||||
* Unconditionally kills all yt-dlp process.
|
||||
* @param {Socket} socket currenct connection socket
|
||||
*/
|
||||
function abortAllDownloads(socket) {
|
||||
spawn('killall', ['yt-dlp'])
|
||||
.on('exit', () => {
|
||||
socket.emit('progress', { status: 'Aborted' });
|
||||
logger('download', 'Done!');
|
||||
}
|
||||
})
|
||||
logger('dl', 'Aborting downloads');
|
||||
});
|
||||
}
|
||||
|
||||
function abortDownload(socket) {
|
||||
const res = process.platform === 'win32' ?
|
||||
spawn('taskkill', ['/IM', 'yt-dlp.exe', '/F', '/T']) :
|
||||
spawn('killall', ['yt-dlp']);
|
||||
res.on('exit', () => {
|
||||
socket.emit('progress', { status: 'Aborted' });
|
||||
logger('download', 'Aborting downloads');
|
||||
});
|
||||
}
|
||||
|
||||
const formatter = (stdout) => {
|
||||
/**
|
||||
* @private Formats the yt-dlp stdout to a frontend-readable format
|
||||
* @param {string} stdout stdout as string
|
||||
* @param {number} pid current process id relative to stdout
|
||||
* @returns
|
||||
*/
|
||||
const formatter = (stdout, pid) => {
|
||||
const cleanStdout = stdout
|
||||
.replace(/\s\s+/g, ' ')
|
||||
.split(' ');
|
||||
@@ -112,6 +166,7 @@ const formatter = (stdout) => {
|
||||
progress: cleanStdout[1],
|
||||
size: cleanStdout[3],
|
||||
dlSpeed: cleanStdout[5],
|
||||
pid: pid,
|
||||
}
|
||||
case 'merge':
|
||||
return {
|
||||
@@ -126,5 +181,6 @@ const formatter = (stdout) => {
|
||||
module.exports = {
|
||||
download: download,
|
||||
abortDownload: abortDownload,
|
||||
abortAllDownloads: abortAllDownloads,
|
||||
retriveDownload: retriveDownload,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user