g1mp/plugins/upload.py

217 lines
9.6 KiB
Python
Raw Normal View History

2025-02-13 04:55:42 +00:00
import aiohttp
import aiofiles
import irc3
import tempfile
import os
import re
import asyncio
from irc3.plugins.command import command
import ircstyle
import yt_dlp
from yt_dlp.utils import DownloadError
from urllib.parse import urlparse
@irc3.plugin
class UploadPlugin:
"""IRC bot plugin for uploading files to hardfiles.org using yt-dlp for downloads."""
def __init__(self, bot):
self.bot = bot
@command
async def upload(self, mask, target, args):
"""
Upload a file to hardfiles.org (Max 100MB).
%%upload [--mp3] <url>
"""
url = args.get('<url>')
mp3 = args.get('--mp3')
if not url:
self.bot.privmsg(
target,
ircstyle.style("Usage: !upload [--mp3] <url>", fg="red", bold=True, reset=True)
)
return
try:
# Directly await the upload task.
await self.do_upload(url, target, mp3)
except Exception as exc:
self.bot.privmsg(
target,
ircstyle.style(f"Upload task error: {exc}", fg="red", bold=True, reset=True)
)
async def do_upload(self, url, target, mp3):
"""Download a file using yt-dlp and upload it."""
max_size = 100 * 1024 * 1024 # 100MB limit
# Use a temporary directory context manager so cleanup is automatic.
with tempfile.TemporaryDirectory() as tmp_dir:
# Parse URL and determine if a header check is needed.
parsed_url = urlparse(url)
domain = parsed_url.netloc.lower()
skip_check_domains = ("x.com", "instagram.com", "youtube.com", "youtu.be", "streamable.com")
should_check_headers = not any(domain.endswith(d) for d in skip_check_domains)
if should_check_headers:
async with aiohttp.ClientSession() as session:
async with session.head(url) as response:
if response.status != 200:
self.bot.privmsg(
target,
ircstyle.style(f"Failed to fetch headers: HTTP {response.status}", fg="red", bold=True, reset=True)
)
return
content_length = response.headers.get('Content-Length')
if content_length and int(content_length) > max_size:
self.bot.privmsg(
target,
ircstyle.style(
f"File size ({int(content_length) // (1024 * 1024)}MB) exceeds 100MB limit",
fg="red", bold=True, reset=True
)
)
return
# Set up yt-dlp options.
ydl_opts = {
'outtmpl': os.path.join(tmp_dir, '%(title)s.%(ext)s'),
'format': 'bestaudio/best' if mp3 else 'best[ext=mp4]/best',
'restrictfilenames': True,
'noplaylist': True,
'quiet': True,
'concurrent_fragment_downloads': 5,
'postprocessors': [{
'key': 'FFmpegExtractAudio',
'preferredcodec': 'mp3',
'preferredquality': '192',
}] if mp3 else [],
}
# Use yt_dlp to extract information (first without downloading).
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
try:
info = await asyncio.to_thread(ydl.extract_info, url, download=False)
except DownloadError as e:
self.bot.privmsg(
target,
ircstyle.style(f"Info extraction failed: {e}", fg="red", bold=True, reset=True)
)
return
# Check the estimated file size if available.
estimated_size = info.get('filesize') or info.get('filesize_approx')
if estimated_size and estimated_size > max_size:
self.bot.privmsg(
target,
ircstyle.style(
f"File size ({estimated_size // (1024 * 1024)}MB) exceeds 100MB limit",
fg="red", bold=True, reset=True
)
)
return
# Proceed with the download.
try:
info = await asyncio.to_thread(ydl.extract_info, url, download=True)
except DownloadError as e:
self.bot.privmsg(
target,
ircstyle.style(f"Download failed: {e}", fg="red", bold=True, reset=True)
)
return
# Prepare and send metadata (if available).
metadata_parts = []
title = info.get("title")
uploader = info.get("uploader")
duration = info.get("duration")
upload_date = info.get("upload_date")
view_count = info.get("view_count")
description = info.get("description")
if title:
metadata_parts.append(ircstyle.style(f"Title: {title}", fg="yellow", bold=True, reset=True))
if uploader:
metadata_parts.append(ircstyle.style(f"Uploader: {uploader}", fg="purple", bold=True, reset=True))
if duration:
metadata_parts.append(ircstyle.style(f"Duration: {self._format_duration(duration)}", fg="green", bold=True, reset=True))
if upload_date:
# Format date as YYYY-MM-DD if possible.
formatted_date = f"{upload_date[:4]}-{upload_date[4:6]}-{upload_date[6:]}" if len(upload_date) == 8 else upload_date
metadata_parts.append(ircstyle.style(f"Upload Date: {formatted_date}", fg="aqua", bold=True, reset=True))
if view_count is not None:
metadata_parts.append(ircstyle.style(f"Views: {view_count}", fg="royal", bold=True, reset=True))
if description:
if len(description) > 200:
description = description[:200] + "..."
metadata_parts.append(ircstyle.style(f"Description: {description}", fg="silver", reset=True))
if metadata_parts:
self.bot.privmsg(target, " | ".join(metadata_parts))
# Verify that a file was downloaded.
downloaded_files = info.get('requested_downloads', [])
if not downloaded_files:
self.bot.privmsg(
target,
ircstyle.style("No files downloaded", fg="red", bold=True, reset=True)
)
return
# Retrieve the file path.
first_file = downloaded_files[0]
downloaded_file = first_file.get('filepath', first_file.get('filename'))
if not downloaded_file or not os.path.exists(downloaded_file):
self.bot.privmsg(
target,
ircstyle.style(f"Downloaded file not found: {downloaded_file}", fg="red", bold=True, reset=True)
)
return
# Check the actual file size as an extra safeguard.
file_size = os.path.getsize(downloaded_file)
if file_size > max_size:
self.bot.privmsg(
target,
ircstyle.style(
f"File size ({file_size // (1024 * 1024)}MB) exceeds 100MB limit",
fg="red", bold=True, reset=True
)
)
return
# Upload the file to hardfiles.org.
async with aiohttp.ClientSession() as session:
form = aiohttp.FormData()
async with aiofiles.open(downloaded_file, 'rb') as f:
file_content = await f.read()
form.add_field('file', file_content, filename=os.path.basename(downloaded_file))
async with session.post('https://hardfiles.org/', data=form, allow_redirects=False) as resp:
if resp.status not in [200, 201, 302, 303]:
self.bot.privmsg(
target,
ircstyle.style(f"Upload failed: HTTP {resp.status}", fg="red", bold=True, reset=True)
)
else:
response_text = await resp.text()
upload_url = self.extract_url_from_response(response_text) or "Unknown URL"
response_msg = (
ircstyle.style("Upload successful: ", fg="green", bold=True, reset=True) +
ircstyle.style(upload_url, fg="blue", underline=True, reset=True)
)
self.bot.privmsg(target, response_msg)
def extract_url_from_response(self, response_text):
"""Extract the first URL found in the response text."""
match = re.search(r'https?://\S+', response_text)
return match.group(0) if match else None
def _format_duration(self, seconds):
"""Convert seconds into a human-readable Hh Mm Ss format."""
seconds = int(seconds)
m, s = divmod(seconds, 60)
h, m = divmod(m, 60)
return f"{h}h {m}m {s}s" if h else f"{m}m {s}s"