import asyncio
import os
import re
import tempfile
from urllib.parse import urlparse

import aiofiles
import aiohttp
import irc3
import ircstyle
import yt_dlp
from irc3.plugins.command import command
from yt_dlp.utils import DownloadError


@irc3.plugin
class UploadPlugin:
    """IRC bot plugin for uploading files to hardfiles.org using yt-dlp for downloads.

    The ``upload`` command downloads the media behind a URL with yt-dlp
    (optionally extracting MP3 audio), reports some metadata to the channel,
    and posts the resulting file to hardfiles.org.
    """

    # Upload size limit enforced before, during and after the download.
    MAX_SIZE = 100 * 1024 * 1024  # 100MB limit

    # Sites for which the HEAD pre-check is skipped.
    SKIP_CHECK_DOMAINS = (
        "x.com",
        "instagram.com",
        "youtube.com",
        "youtu.be",
        "streamable.com",
    )

    def __init__(self, bot):
        self.bot = bot

    @command
    async def upload(self, mask, target, args):
        """Upload a file to hardfiles.org (Max 100MB).

            %%upload [--mp3] <url>
        """
        # ``args`` is the docopt-style mapping built from the usage line in
        # the docstring above: '<url>' is the positional argument and
        # '--mp3' is the optional audio-extraction flag.
        url = args.get('<url>')
        mp3 = args.get('--mp3')

        if not url:
            self._say_error(target, "Usage: !upload [--mp3] <url>")
            return

        try:
            # Directly await the upload task.
            await self.do_upload(url, target, mp3)
        except Exception as exc:
            # Command boundary: report any failure to the channel instead of
            # letting it escape from the handler.
            self._say_error(target, f"Upload task error: {exc}")

    async def do_upload(self, url, target, mp3):
        """Download a file using yt-dlp and upload it.

        Args:
            url: Address of the media or file to fetch.
            target: Channel or nick that receives status messages.
            mp3: Truthy to extract the audio track as MP3.

        Every failure that is detected here is reported to ``target`` and
        ends the call early; nothing is returned.
        """
        if self._needs_header_check(url):
            if not await self._remote_size_ok(url, target):
                return

        # Use a temporary directory context manager so cleanup is automatic.
        with tempfile.TemporaryDirectory() as tmp_dir:
            downloaded_file = await self._download(url, target, mp3, tmp_dir)
            if downloaded_file is None:
                return
            await self._send_to_host(downloaded_file, target)

    def extract_url_from_response(self, response_text):
        """Extract the first URL found in the response text.

        Returns the matched URL, or None when the text is empty/None or
        contains no http(s) URL.
        """
        if not response_text:
            return None
        match = re.search(r'https?://\S+', response_text)
        return match.group(0) if match else None

    def _format_duration(self, seconds):
        """Convert seconds into a human-readable Hh Mm Ss format."""
        seconds = int(seconds)
        m, s = divmod(seconds, 60)
        h, m = divmod(m, 60)
        return f"{h}h {m}m {s}s" if h else f"{m}m {s}s"

    @staticmethod
    def _one_line(text):
        """Collapse CR/LF runs in *text* into single spaces.

        IRC messages are line-delimited, so untrusted text containing line
        breaks must be flattened before it is sent.
        """
        return re.sub(r"\s*[\r\n]+\s*", " ", str(text)).strip()

    def _say_error(self, target, text):
        """Send *text* to *target* as a bold red, single-line message."""
        self.bot.privmsg(
            target,
            ircstyle.style(self._one_line(text), fg="red", bold=True, reset=True)
        )

    def _say_too_large(self, target, size):
        """Report that *size* (in bytes) is over the upload limit."""
        self._say_error(
            target,
            f"File size ({size // (1024 * 1024)}MB) exceeds 100MB limit"
        )

    @classmethod
    def _needs_header_check(cls, url):
        """Return True unless the URL's host is one of SKIP_CHECK_DOMAINS.

        The host must equal a listed domain or be a subdomain of it; a bare
        suffix match would also catch unrelated hosts such as
        ``netflix.com`` for ``x.com``.
        """
        host = (urlparse(url).hostname or "").lower()
        return not any(
            host == domain or host.endswith("." + domain)
            for domain in cls.SKIP_CHECK_DOMAINS
        )

    async def _remote_size_ok(self, url, target):
        """HEAD the URL and return False (after reporting) if it must be rejected."""
        async with aiohttp.ClientSession() as session:
            # Redirects are followed explicitly so that a redirecting URL is
            # judged by its final response rather than by the 30x itself.
            async with session.head(url, allow_redirects=True) as response:
                if response.status != 200:
                    self._say_error(
                        target, f"Failed to fetch headers: HTTP {response.status}"
                    )
                    return False
                content_length = response.headers.get('Content-Length')

        try:
            remote_size = int(content_length) if content_length else 0
        except ValueError:
            # Malformed header: treat the size as unknown; the size of the
            # downloaded file is still checked later.
            remote_size = 0

        if remote_size > self.MAX_SIZE:
            self._say_too_large(target, remote_size)
            return False
        return True

    async def _download(self, url, target, mp3, tmp_dir):
        """Download *url* into *tmp_dir* and return the file path, or None on failure."""
        # Set up yt-dlp options.
        ydl_opts = {
            'outtmpl': os.path.join(tmp_dir, '%(title)s.%(ext)s'),
            'format': 'bestaudio/best' if mp3 else 'best[ext=mp4]/best',
            'restrictfilenames': True,
            'noplaylist': True,
            'quiet': True,
            'concurrent_fragment_downloads': 5,
            'postprocessors': [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': 'mp3',
                'preferredquality': '192',
            }] if mp3 else [],
        }

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            # Use yt_dlp to extract information (first without downloading).
            # extract_info is blocking, so it runs in a worker thread.
            try:
                info = await asyncio.to_thread(ydl.extract_info, url, download=False)
            except DownloadError as e:
                self._say_error(target, f"Info extraction failed: {e}")
                return None

            # Check the estimated file size if available.
            estimated_size = info.get('filesize') or info.get('filesize_approx')
            if estimated_size and estimated_size > self.MAX_SIZE:
                self._say_too_large(target, estimated_size)
                return None

            # Proceed with the download.
            try:
                info = await asyncio.to_thread(ydl.extract_info, url, download=True)
            except DownloadError as e:
                self._say_error(target, f"Download failed: {e}")
                return None

        # Prepare and send metadata (if available).
        metadata_parts = self._metadata_parts(info)
        if metadata_parts:
            self.bot.privmsg(target, " | ".join(metadata_parts))

        # Verify that a file was downloaded.
        downloaded_files = info.get('requested_downloads', [])
        if not downloaded_files:
            self._say_error(target, "No files downloaded")
            return None

        # Retrieve the file path.
        first_file = downloaded_files[0]
        downloaded_file = first_file.get('filepath', first_file.get('filename'))
        if not downloaded_file or not os.path.exists(downloaded_file):
            self._say_error(target, f"Downloaded file not found: {downloaded_file}")
            return None

        # Check the actual file size as an extra safeguard.
        file_size = os.path.getsize(downloaded_file)
        if file_size > self.MAX_SIZE:
            self._say_too_large(target, file_size)
            return None

        return downloaded_file

    def _metadata_parts(self, info):
        """Build the styled metadata fragments for the fields present in *info*."""
        parts = []
        title = info.get("title")
        uploader = info.get("uploader")
        duration = info.get("duration")
        upload_date = info.get("upload_date")
        view_count = info.get("view_count")
        description = info.get("description")

        if title:
            parts.append(ircstyle.style(
                f"Title: {self._one_line(title)}", fg="yellow", bold=True, reset=True))
        if uploader:
            parts.append(ircstyle.style(
                f"Uploader: {self._one_line(uploader)}", fg="purple", bold=True, reset=True))
        if duration:
            parts.append(ircstyle.style(
                f"Duration: {self._format_duration(duration)}", fg="green", bold=True, reset=True))
        if upload_date:
            # Format date as YYYY-MM-DD if possible.
            if len(upload_date) == 8:
                formatted_date = f"{upload_date[:4]}-{upload_date[4:6]}-{upload_date[6:]}"
            else:
                formatted_date = upload_date
            parts.append(ircstyle.style(
                f"Upload Date: {formatted_date}", fg="aqua", bold=True, reset=True))
        if view_count is not None:
            parts.append(ircstyle.style(
                f"Views: {view_count}", fg="royal", bold=True, reset=True))
        if description:
            # Flatten first so the 200-character budget is not spent on
            # line breaks, then truncate.
            description = self._one_line(description)
            if len(description) > 200:
                description = description[:200] + "..."
            parts.append(ircstyle.style(
                f"Description: {description}", fg="silver", reset=True))
        return parts

    async def _send_to_host(self, downloaded_file, target):
        """Upload the file to hardfiles.org and announce the resulting URL."""
        async with aiofiles.open(downloaded_file, 'rb') as f:
            file_content = await f.read()

        form = aiohttp.FormData()
        form.add_field('file', file_content, filename=os.path.basename(downloaded_file))

        async with aiohttp.ClientSession() as session:
            async with session.post('https://hardfiles.org/', data=form,
                                    allow_redirects=False) as resp:
                if resp.status not in (200, 201, 302, 303):
                    self._say_error(target, f"Upload failed: HTTP {resp.status}")
                    return
                response_text = await resp.text()
                # Redirects are not followed, so on a 302/303 the address may
                # only be present in the Location header.
                upload_url = (
                    self.extract_url_from_response(response_text)
                    or resp.headers.get('Location')
                    or "Unknown URL"
                )

        response_msg = (
            ircstyle.style("Upload successful: ", fg="green", bold=True, reset=True)
            + ircstyle.style(upload_url, fg="blue", underline=True, reset=True)
        )
        self.bot.privmsg(target, response_msg)