Refactoring code and structure
This commit is contained in:
120
server/Process.js
Normal file
120
server/Process.js
Normal file
@@ -0,0 +1,120 @@
|
||||
const { spawn } = require('child_process');
|
||||
const { deleteDownloadByPID, insertDownload } = require('./db');
|
||||
const { logger } = require('./logger');
|
||||
|
||||
/**
|
||||
* Represents a download process that spawns yt-dlp.
|
||||
* @constructor
|
||||
* @param {string} url - The downlaod url.
|
||||
* @param {Array<String>} params - The cli arguments passed by the frontend.
|
||||
* @param {*} settings - The download settings passed by the frontend.
|
||||
*/
|
||||
|
||||
class Process {
|
||||
constructor(url, params, settings) {
|
||||
this.url = url;
|
||||
this.params = params || ' ';
|
||||
this.settings = settings
|
||||
this.stdout = undefined;
|
||||
this.pid = undefined;
|
||||
this.info = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* function that launch the download process, sets the stdout property and the pid
|
||||
* @param {Function} callback not yet implemented
|
||||
* @returns {Promise<this>} the process instance
|
||||
*/
|
||||
async start(callback) {
|
||||
await this.#__internalGetInfo();
|
||||
|
||||
const ytldp = spawn('./server/yt-dlp',
|
||||
['-o', `${this.settings?.download_path || 'downloads/'}%(title)s.%(ext)s`]
|
||||
.concat(this.params)
|
||||
.concat([this.url])
|
||||
);
|
||||
|
||||
this.pid = ytldp.pid;
|
||||
this.stdout = ytldp.stdout;
|
||||
|
||||
logger('proc', `Spawned a new process, pid: ${this.pid}`)
|
||||
|
||||
await insertDownload(this.url, null, null, null, this.pid);
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @private
|
||||
* function used internally by the download process to fetch information, usually thumbnail and title
|
||||
* @returns Promise to the lock
|
||||
*/
|
||||
async #__internalGetInfo() {
|
||||
let lock = true;
|
||||
let stdoutChunks = [];
|
||||
const ytdlpInfo = spawn('./server/yt-dlp', ['-s', '-j', this.url]);
|
||||
|
||||
ytdlpInfo.stdout.on('data', (data) => {
|
||||
stdoutChunks.push(data);
|
||||
});
|
||||
|
||||
ytdlpInfo.on('exit', () => {
|
||||
try {
|
||||
const buffer = Buffer.concat(stdoutChunks);
|
||||
const json = JSON.parse(buffer.toString());
|
||||
this.info = json;
|
||||
this.lock = false;
|
||||
|
||||
} catch (e) {
|
||||
this.info = {
|
||||
title: "",
|
||||
thumbnail: "",
|
||||
};
|
||||
}
|
||||
});
|
||||
|
||||
if (!lock) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* function that kills the current process
|
||||
*/
|
||||
async kill() {
|
||||
spawn('kill', [this.pid]).on('exit', () => {
|
||||
deleteDownloadByPID(this.pid).then(() => {
|
||||
logger('db', `Deleted ${this.pid} because SIGKILL`)
|
||||
})
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* pid getter function
|
||||
* @returns {number} pid
|
||||
*/
|
||||
getPid() {
|
||||
if (!this.pid) {
|
||||
throw "Process isn't started"
|
||||
}
|
||||
return this.pid;
|
||||
}
|
||||
|
||||
/**
|
||||
* stdout getter function
|
||||
* @returns {ReadableStream} stdout as stream
|
||||
*/
|
||||
getStdout() {
|
||||
return this.stdout
|
||||
}
|
||||
|
||||
/**
|
||||
* download info getter function
|
||||
* @returns {object}
|
||||
*/
|
||||
getInfo() {
|
||||
return this.info
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = Process;
|
||||
62
server/ProcessPool.js
Normal file
62
server/ProcessPool.js
Normal file
@@ -0,0 +1,62 @@
|
||||
/**
|
||||
* @class
|
||||
* Represents a download process that spawns yt-dlp.
|
||||
*/
|
||||
|
||||
class ProcessPool {
|
||||
constructor() {
|
||||
this._pool = new Map();
|
||||
this._size = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Pool size getter
|
||||
* @returns {number} pool's size
|
||||
*/
|
||||
size() {
|
||||
return this._size;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a process to the pool
|
||||
* @param {Process} process
|
||||
*/
|
||||
add(process) {
|
||||
this._pool.set(process.getPid(), process)
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete a process from the pool
|
||||
* @param {Process} process
|
||||
*/
|
||||
remove(process) {
|
||||
this._pool.delete(process.getPid())
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete a process from the pool by its pid
|
||||
* @param {number} pid
|
||||
*/
|
||||
removeByPid(pid) {
|
||||
this._pool.delete(pid)
|
||||
}
|
||||
|
||||
/**
|
||||
* get an iterator for the pool
|
||||
* @returns {IterableIterator} iterator
|
||||
*/
|
||||
iterator() {
|
||||
return this._pool.entries()
|
||||
}
|
||||
|
||||
/**
|
||||
* get a process by its pid
|
||||
* @param {number} pid
|
||||
* @returns {Process}
|
||||
*/
|
||||
getByPid(pid) {
|
||||
return this._pool.get(pid)
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = ProcessPool;
|
||||
109
server/db.js
Normal file
109
server/db.js
Normal file
@@ -0,0 +1,109 @@
|
||||
const uuid = require('uuid')
|
||||
const { logger } = require('./logger')
|
||||
const { existsInProc } = require('./procUtils')
|
||||
|
||||
const db = require('better-sqlite3')('downloads.db')
|
||||
|
||||
/**
|
||||
* Inits the repository, the tables.
|
||||
*/
|
||||
async function init() {
|
||||
try {
|
||||
db.exec(`CREATE TABLE downloads (
|
||||
uid varchar(36) NOT NULL,
|
||||
url text NOT NULL,
|
||||
title text,
|
||||
thumbnail text,
|
||||
created date,
|
||||
size text,
|
||||
process_pid int NOT NULL,
|
||||
PRIMARY KEY (uid)
|
||||
)`)
|
||||
} catch (e) {
|
||||
logger('db', 'Table already created, ignoring')
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get an instance of the db.
|
||||
* @returns {BetterSqlite3.Database} Current database instance
|
||||
*/
|
||||
async function get_db() {
|
||||
return db
|
||||
}
|
||||
|
||||
/**
|
||||
* Insert an new download to the database
|
||||
* @param {string} url the video url
|
||||
* @param {string} title the title fetched by the info process
|
||||
* @param {string} thumbnail the thumbnail url fetched by the info process
|
||||
* @param {string} size optional - the download size
|
||||
* @param {number} PID the pid of the downloader
|
||||
* @returns {Promise<string>} the download UUID
|
||||
*/
|
||||
async function insertDownload(url, title, thumbnail, size, PID) {
|
||||
const uid = uuid.v1()
|
||||
try {
|
||||
db
|
||||
.prepare(`
|
||||
INSERT INTO downloads
|
||||
(uid, url, title, thumbnail, size, process_pid)
|
||||
VALUES (?, ?, ?, ?, ?, ?)`
|
||||
)
|
||||
.run(uid, url, title, thumbnail, size, PID)
|
||||
} catch (error) {
|
||||
logger('db', 'some error occourred')
|
||||
}
|
||||
|
||||
return uid
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieve all downloads from the database
|
||||
* @returns {ArrayLike} a collection of results
|
||||
*/
|
||||
async function retrieveAll() {
|
||||
return db
|
||||
.prepare('SELECT * FROM downloads')
|
||||
.all()
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete a download by its uuid
|
||||
* @param {string} uid the to-be-deleted download uuid
|
||||
*/
|
||||
async function deleteDownloadById(uid) {
|
||||
db.prepare(`DELETE FROM downloads WHERE uid=${uid}`).run()
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete a download by its pid
|
||||
* @param {string} pid the to-be-deleted download pid
|
||||
*/
|
||||
async function deleteDownloadByPID(PID) {
|
||||
db.prepare(`DELETE FROM downloads WHERE process_pid=${PID}`).run()
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes the downloads that aren't active anymore
|
||||
* @returns {Promise<ArrayLike>}
|
||||
*/
|
||||
async function pruneDownloads() {
|
||||
const all = await retrieveAll()
|
||||
return all.map(job => {
|
||||
if (existsInProc(job.process_pid)) {
|
||||
return job
|
||||
}
|
||||
deleteDownloadByPID(job.process_pid)
|
||||
})
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
init: init,
|
||||
getDB: get_db,
|
||||
insertDownload: insertDownload,
|
||||
retrieveAll: retrieveAll,
|
||||
deleteDownloadById: deleteDownloadById,
|
||||
deleteDownloadByPID: deleteDownloadByPID,
|
||||
pruneDownloads: pruneDownloads,
|
||||
}
|
||||
202
server/downloader.js
Normal file
202
server/downloader.js
Normal file
@@ -0,0 +1,202 @@
|
||||
const { spawn } = require('child_process');
|
||||
const { from, interval } = require('rxjs');
|
||||
const { throttle } = require('rxjs/operators');
|
||||
const { Socket } = require('socket.io');
|
||||
const { pruneDownloads } = require('./db');
|
||||
const { logger } = require('./logger');
|
||||
const Process = require('./Process');
|
||||
const ProcessPool = require('./ProcessPool');
|
||||
const { killProcess } = require('./procUtils');
|
||||
|
||||
// settings read from settings.json
|
||||
let settings;
|
||||
let coldRestart = true;
|
||||
|
||||
const pool = new ProcessPool();
|
||||
|
||||
try {
|
||||
settings = require('../settings.json');
|
||||
}
|
||||
catch (e) {
|
||||
console.warn("settings.json not found");
|
||||
}
|
||||
|
||||
/**
|
||||
* Invoke a new download.
|
||||
* Called by the websocket messages listener.
|
||||
* @param {Socket} socket current connection socket
|
||||
* @param {object} payload frontend download payload
|
||||
* @returns
|
||||
*/
|
||||
async function download(socket, payload) {
|
||||
if (!payload || payload.url === '' || payload.url === null) {
|
||||
socket.emit('progress', { status: 'Done!' });
|
||||
return;
|
||||
}
|
||||
|
||||
const url = payload.url;
|
||||
const params = payload.params.split(' ');
|
||||
|
||||
console.log(params)
|
||||
|
||||
const p = new Process(url, params, settings);
|
||||
|
||||
p.start().then(downloader => {
|
||||
|
||||
pool.add(p)
|
||||
let infoLock = true;
|
||||
let pid = downloader.getPid();
|
||||
|
||||
from(downloader.getStdout()) // stdout as observable
|
||||
.pipe(throttle(() => interval(500))) // discard events closer than 500ms
|
||||
.subscribe({
|
||||
next: (stdout) => {
|
||||
if (infoLock) {
|
||||
if (downloader.getInfo() === null) {
|
||||
return;
|
||||
}
|
||||
socket.emit('info', {
|
||||
pid: pid, info: downloader.getInfo()
|
||||
});
|
||||
infoLock = false;
|
||||
}
|
||||
socket.emit('progress', formatter(String(stdout), pid)) // finally, emit
|
||||
},
|
||||
complete: () => {
|
||||
downloader.kill().then(() => {
|
||||
socket.emit('progress', {
|
||||
status: 'Done!',
|
||||
pid: pid,
|
||||
})
|
||||
pool.remove(downloader);
|
||||
})
|
||||
},
|
||||
error: () => {
|
||||
socket.emit('progress', {
|
||||
status: 'Done!', pid: pid
|
||||
});
|
||||
}
|
||||
});
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieve all downloads.
|
||||
* If the server has just been launched retrieve the ones saved to the database.
|
||||
* If the server is running fetches them from the process pool.
|
||||
* @param {Socket} socket current connection socket
|
||||
* @returns
|
||||
*/
|
||||
async function retriveDownload(socket) {
|
||||
// it's a cold restart: the server has just been started with pending
|
||||
// downloads, so fetch them from the database and resume.
|
||||
if (coldRestart) {
|
||||
coldRestart = false;
|
||||
let downloads = await pruneDownloads();
|
||||
downloads = [... new Set(downloads)];
|
||||
logger('dl', `Cold restart, retrieving ${downloads.length} jobs`)
|
||||
for (const entry of downloads) {
|
||||
if (entry) {
|
||||
await download(socket, entry);
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// it's an hot-reload the server it's running and the frontend ask for
|
||||
// the pending job: retrieve them from the "in-memory database" (ProcessPool)
|
||||
logger('dl', `Retrieving jobs from pool`)
|
||||
|
||||
const it = pool.iterator();
|
||||
tempWorkQueue = new Array();
|
||||
|
||||
// sanitize
|
||||
for (const entry of it) {
|
||||
const [pid, process] = entry;
|
||||
pool.removeByPid(pid);
|
||||
await killProcess(pid);
|
||||
tempWorkQueue.push(process);
|
||||
}
|
||||
|
||||
// resume the jobs
|
||||
for (const entry of tempWorkQueue) {
|
||||
await download(socket, {
|
||||
url: entry.url,
|
||||
params: entry.params,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Abort a specific download if pid is provided, in the other case
|
||||
* calls the abortAllDownloads function
|
||||
* @see abortAllDownloads
|
||||
* @param {Socket} socket currenct connection socket
|
||||
* @param {*} args args sent by the frontend. MUST contain the PID.
|
||||
* @returns
|
||||
*/
|
||||
function abortDownload(socket, args) {
|
||||
if (!args) {
|
||||
abortAllDownloads(socket);
|
||||
return;
|
||||
}
|
||||
const { pid } = args;
|
||||
|
||||
spawn('kill', [pid])
|
||||
.on('exit', () => {
|
||||
socket.emit('progress', {
|
||||
status: 'Aborted',
|
||||
process: pid,
|
||||
});
|
||||
logger('dl', `Aborting download ${pid}`);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Unconditionally kills all yt-dlp process.
|
||||
* @param {Socket} socket currenct connection socket
|
||||
*/
|
||||
function abortAllDownloads(socket) {
|
||||
spawn('killall', ['yt-dlp'])
|
||||
.on('exit', () => {
|
||||
socket.emit('progress', { status: 'Aborted' });
|
||||
logger('dl', 'Aborting downloads');
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @private Formats the yt-dlp stdout to a frontend-readable format
|
||||
* @param {string} stdout stdout as string
|
||||
* @param {number} pid current process id relative to stdout
|
||||
* @returns
|
||||
*/
|
||||
const formatter = (stdout, pid) => {
|
||||
const cleanStdout = stdout
|
||||
.replace(/\s\s+/g, ' ')
|
||||
.split(' ');
|
||||
const status = cleanStdout[0].replace(/\[|\]|\r/g, '');
|
||||
switch (status) {
|
||||
case 'download':
|
||||
return {
|
||||
status: cleanStdout[0].replace(/\[|\]|\r/g, ''),
|
||||
progress: cleanStdout[1],
|
||||
size: cleanStdout[3],
|
||||
dlSpeed: cleanStdout[5],
|
||||
pid: pid,
|
||||
}
|
||||
case 'merge':
|
||||
return {
|
||||
status: 'merging',
|
||||
progress: '100',
|
||||
}
|
||||
default:
|
||||
return { progress: '0' }
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
download: download,
|
||||
abortDownload: abortDownload,
|
||||
abortAllDownloads: abortAllDownloads,
|
||||
retriveDownload: retriveDownload,
|
||||
}
|
||||
16
server/fetch-yt-dlp.sh
Executable file
16
server/fetch-yt-dlp.sh
Executable file
@@ -0,0 +1,16 @@
|
||||
#!/bin/bash
|
||||
|
||||
echo "Downloading latest yt-dlp build..."
|
||||
|
||||
rm -f yt-dlp
|
||||
|
||||
RELEASE=$(curl --silent "https://api.github.com/repos/yt-dlp/yt-dlp/releases/latest" |
|
||||
grep '"tag_name":' |
|
||||
sed -E 's/.*"([^"]+)".*/\1/'
|
||||
)
|
||||
|
||||
wget "https://github.com/yt-dlp/yt-dlp/releases/download/$RELEASE/yt-dlp"
|
||||
|
||||
chmod +x yt-dlp
|
||||
|
||||
echo "Done!"
|
||||
23
server/logger.js
Normal file
23
server/logger.js
Normal file
@@ -0,0 +1,23 @@
|
||||
/**
|
||||
* Simplest logger function, takes two argument: first one put between
|
||||
* square brackets (the protocol), the second one it's the effective message
|
||||
* @param {string} proto protocol
|
||||
* @param {string} args message
|
||||
*/
|
||||
const logger = (proto, args) => {
|
||||
console.log(`[${proto}]\t${args}`)
|
||||
}
|
||||
|
||||
/**
|
||||
* CLI splash
|
||||
*/
|
||||
const splash = () => {
|
||||
console.log("-------------------------------------------------")
|
||||
console.log("yt-dlp-webUI - A web-ui for yt-dlp, simply enough")
|
||||
console.log("-------------------------------------------------")
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
logger: logger,
|
||||
splash: splash,
|
||||
}
|
||||
47
server/procUtils.js
Normal file
47
server/procUtils.js
Normal file
@@ -0,0 +1,47 @@
|
||||
const { spawn } = require('child_process');
|
||||
const fs = require('fs');
|
||||
const net = require('net');
|
||||
const { logger } = require('./logger');
|
||||
|
||||
/**
|
||||
* Browse /proc in order to find the specific pid
|
||||
* @param {number} pid
|
||||
* @returns {*} process stats if any
|
||||
*/
|
||||
function existsInProc(pid) {
|
||||
try {
|
||||
return fs.statSync(`/proc/${pid}`)
|
||||
} catch (e) {
|
||||
logger('proc', `pid ${pid} not found in procfs`)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
function retriveStdoutFromProcFd(pid) {
|
||||
if (existsInProc(pid)) {
|
||||
const unixSocket = fs.readlinkSync(`/proc/${pid}/fd/1`).replace('socket:[', '127.0.0.1:').replace(']', '')
|
||||
if (unixSocket) {
|
||||
console.log(unixSocket)
|
||||
logger('proc', `found pending job on pid: ${pid} attached to UNIX socket: ${unixSocket}`)
|
||||
return net.createConnection(unixSocket)
|
||||
}
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
/**
|
||||
* Kills a process with a sys-call
|
||||
* @param {number} pid the killed process pid
|
||||
*/
|
||||
async function killProcess(pid) {
|
||||
const res = spawn('kill', [pid])
|
||||
res.on('exit', () => {
|
||||
logger('proc', `Successfully killed yt-dlp process, pid: ${pid}`)
|
||||
})
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
existsInProc: existsInProc,
|
||||
//retriveStdoutFromProcFd: retriveStdoutFromProcFd,
|
||||
killProcess: killProcess,
|
||||
}
|
||||
95
server/updater.js
Normal file
95
server/updater.js
Normal file
@@ -0,0 +1,95 @@
|
||||
const https = require('https');
|
||||
const fs = require('fs');
|
||||
const path = require('path');
|
||||
const { Socket } = require('socket.io');
|
||||
|
||||
// endpoint to github API
|
||||
const options = {
|
||||
hostname: 'api.github.com',
|
||||
path: '/repos/yt-dlp/yt-dlp/releases/latest',
|
||||
headers: {
|
||||
'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:88.0) Gecko/20100101 Firefox/88.0'
|
||||
},
|
||||
method: 'GET',
|
||||
port: 443,
|
||||
}
|
||||
|
||||
/**
|
||||
* Build the binary url based on the release tag
|
||||
* @param {string} release yt-dlp GitHub release tag
|
||||
* @returns {*} the fetch options with the correct tag and headers
|
||||
*/
|
||||
function buildDonwloadOptions(release) {
|
||||
return {
|
||||
hostname: 'github.com',
|
||||
path: `/yt-dlp/yt-dlp/releases/download/${release}/yt-dlp`,
|
||||
headers: {
|
||||
'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:88.0) Gecko/20100101 Firefox/88.0'
|
||||
},
|
||||
method: 'GET',
|
||||
port: 443,
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* gets the yt-dlp latest binary URL from GitHub API
|
||||
*/
|
||||
async function update() {
|
||||
// ensure that the binary has been removed
|
||||
try {
|
||||
fs.rmSync(path.join(__dirname, 'yt-dlp'))
|
||||
}
|
||||
catch (e) {
|
||||
console.log('file not found!')
|
||||
}
|
||||
// body buffer
|
||||
let chunks = []
|
||||
https.get(options, res => {
|
||||
// push the http packets chunks into the buffer
|
||||
res.on('data', chunk => {
|
||||
chunks.push(chunk)
|
||||
});
|
||||
// the connection has ended so build the body from the buffer
|
||||
// parse it as a JSON and get the tag_name
|
||||
res.on('end', () => {
|
||||
const buffer = Buffer.concat(chunks)
|
||||
const release = JSON.parse(buffer.toString())['tag_name']
|
||||
console.log('The latest release is:', release)
|
||||
// invoke the binary downloader
|
||||
downloadBinary(buildDonwloadOptions(release))
|
||||
})
|
||||
})
|
||||
}
|
||||
/**
|
||||
* Utility that Pipes the latest binary to a file
|
||||
* @param {string} url yt-dlp GitHub release url
|
||||
*/
|
||||
function downloadBinary(url) {
|
||||
https.get(url, res => {
|
||||
// if it is a redirect follow the url
|
||||
if (res.statusCode === 301 || res.statusCode === 302) {
|
||||
return downloadBinary(res.headers.location)
|
||||
}
|
||||
let bin = fs.createWriteStream(path.join(__dirname, 'yt-dlp'))
|
||||
res.pipe(bin)
|
||||
// once the connection has ended make the file executable
|
||||
res.on('end', () => {
|
||||
fs.chmod(path.join(__dirname, 'yt-dlp'), 0o775, err => {
|
||||
err ? console.error('failed updating!') : console.log('done!')
|
||||
})
|
||||
})
|
||||
})
|
||||
}
|
||||
/**
|
||||
* Invoke the yt-dlp update procedure
|
||||
* @param {Socket} socket the current connection socket
|
||||
*/
|
||||
function updateFromFrontend(socket) {
|
||||
update().then(() => {
|
||||
socket.emit('updated')
|
||||
})
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
ytdlpUpdater: updateFromFrontend
|
||||
}
|
||||
Reference in New Issue
Block a user