# -*- coding: utf-8 -*-
"""
IRC Bot Plugin for Uploading Files to hardfiles.org

This plugin allows users to upload files to hardfiles.org using yt-dlp for
downloads. It supports downloading files from various sources (YouTube,
Instagram, etc.) and can optionally convert videos to MP3 format before
uploading. Files larger than 500MB are rejected.

Usage:
    !upload [--mp3] <url_or_search_term>...

Example:
    !upload never gonna give you up

Dependencies:
    - aiohttp
    - aiofiles
    - irc3
    - yt-dlp
    - ircstyle
    - google-api-python-client (imported as ``googleapiclient``)

Author: Zodiac
Version: 1.3
Date: 2025-02-12
"""
import asyncio
import os
import re
import tempfile
from urllib.parse import urlparse

import aiofiles
import aiohttp
import googleapiclient.discovery
import irc3
import ircstyle
import yt_dlp
from irc3.compat import Queue
from irc3.plugins.command import command
from yt_dlp.utils import DownloadError

# Global headers to mimic a real browser (ban evasion)
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.9",
    "Referer": "https://www.google.com/",
    "Connection": "keep-alive",
    "DNT": "1",
    "Upgrade-Insecure-Requests": "1",
}

# Constants for YouTube API
API_SERVICE_NAME = "youtube"
API_VERSION = "v3"
# NOTE(security): this key is committed to source control and should be
# rotated. The YOUTUBE_API_KEY environment variable, when set, takes
# precedence; the literal is kept only as a backward-compatible fallback.
DEVELOPER_KEY = os.environ.get(
    "YOUTUBE_API_KEY", "AIzaSyBNrqOA0ZIziUVLYm0K5W76n9ndqz6zTxI"
)

# Initialize YouTube API client
youtube = googleapiclient.discovery.build(
    API_SERVICE_NAME, API_VERSION, developerKey=DEVELOPER_KEY
)


@irc3.plugin
class UploadPlugin:
    """IRC bot plugin for downloading files via yt-dlp and uploading them to hardfiles.org.

    Upload requests are placed on a queue and handled one at a time by a
    background task started in ``__init__``.
    """

    # Sites for which the HEAD-request size pre-check is skipped; the size
    # check for these relies on the metadata returned by yt-dlp instead.
    SKIP_CHECK_DOMAINS = (
        "x.com",
        "instagram.com",
        "youtube.com",
        "youtu.be",
        "streamable.com",
        "reddit.com",
        "twitter.com",
        "tiktok.com",
        "facebook.com",
        "dailymotion.com",
    )

    def __init__(self, bot):
        """
        Initialize the UploadPlugin with an IRC bot instance.

        Args:
            bot (irc3.IrcBot): The IRC bot instance.
        """
        self.bot = bot
        self.queue = Queue()  # Pending (url, target, mp3) upload tasks
        self.bot.loop.create_task(self.process_queue())  # Start queue processing

    async def process_queue(self):
        """Process upload tasks from the queue, one at a time, forever.

        Any exception raised by an upload is reported to the task's target
        so that a single failure does not stop the worker loop.
        """
        while True:
            task = await self.queue.get()
            url, target, mp3 = task
            try:
                await self.do_upload(url, target, mp3)
            except Exception as e:
                exc_msg = self._ensure_str(e)
                self._send_error(target, f"Upload failed: {exc_msg}")
            finally:
                self.queue.task_done()

    def _send_error(self, target, message):
        """Send ``message`` to ``target`` as a red, bold error line prefixed with a cross mark."""
        self.bot.privmsg(
            target,
            ircstyle.style("❌ ", fg="red")
            + ircstyle.style(message, fg="red", bold=True)
        )

    def _ensure_str(self, value):
        """
        Ensure the value is a string.

        If it's bytes, decode it as UTF-8 with error replacement.

        Args:
            value (Union[str, bytes, None]): The value to ensure as a string.

        Returns:
            str: The value as a string ('' for None).
        """
        if isinstance(value, bytes):
            return value.decode('utf-8', errors='replace')
        if value is None:
            return ''
        return str(value)

    async def search_youtube(self, query):
        """
        Search YouTube for the given query and return the URL of the first result.

        The API request is executed in the loop's default executor.

        Args:
            query (str): The search query.

        Returns:
            str: The URL of the first search result, or None when there are
            no results or the search raised an error (the error is logged).
        """
        try:
            request = youtube.search().list(
                part="id",
                q=query,
                type="video",
                maxResults=1
            )
            result = await self.bot.loop.run_in_executor(None, request.execute)
            if result.get("items"):
                video_id = result["items"][0]["id"]["videoId"]
                return f"https://www.youtube.com/watch?v={video_id}"
        except Exception as e:
            self.bot.log.error(f"YouTube search error: {e}")
        return None

    @command(aliases=['u'], public=True)
    async def upload(self, mask, target, args):
        """
        Upload a file to hardfiles.org (Max 500MB).

        Args:
            mask (str): The user mask (nickname@host) of the command issuer.
            target (str): The channel or user where the command was issued.
            args (dict): Parsed command arguments.

        Usage:
            %%upload [--mp3] <url_or_search_term>...

        Example:
            !upload never gonna give you up
        """
        # The key must match the positional placeholder in the usage line above.
        url_or_search_term = args.get('<url_or_search_term>')
        if isinstance(url_or_search_term, list):
            url_or_search_term = " ".join(url_or_search_term)
        mp3 = args.get('--mp3')

        if not url_or_search_term:
            self.bot.privmsg(
                target,
                ircstyle.style("❓ ", fg="red")
                + ircstyle.style(
                    "Usage: !upload [--mp3] <url_or_search_term>...",
                    fg="red", bold=True
                )
            )
            return

        if not re.match(r'^https?://', url_or_search_term):
            # Not a URL: treat the argument as a YouTube search query.
            self.bot.privmsg(
                target,
                ircstyle.style("🔍 ", fg="blue")
                + ircstyle.style("Searching YouTube for: ", fg="blue")
                + ircstyle.style(url_or_search_term, fg="green", underline=True, italics=True)
            )
            url = await self.search_youtube(url_or_search_term)
            if not url:
                self._send_error(target, "No results found on YouTube.")
                return
        else:
            url = url_or_search_term

        # Enqueue the upload task
        queue_size = self.queue.qsize() + 1
        self.queue.put_nowait((url, target, mp3))
        self.bot.privmsg(
            target,
            ircstyle.style("⏳ ", fg="yellow")
            + ircstyle.style("Upload queued ", fg="blue", bold=False, italics=True)
            + ircstyle.style(f"(Position #{queue_size})", italics=True, fg="grey")
            + ircstyle.style(" Processing soon...", fg="blue", italics=True)
        )

    @classmethod
    def _should_check_headers(cls, url):
        """Return True unless the URL's host is one of SKIP_CHECK_DOMAINS or a subdomain of one.

        Matching is done on the hostname (no port or userinfo) and requires
        an exact match or a dot-separated suffix, so "dropbox.com" is not
        mistaken for "x.com".
        """
        host = (urlparse(url).hostname or "").lower()
        return not any(
            host == domain or host.endswith("." + domain)
            for domain in cls.SKIP_CHECK_DOMAINS
        )

    async def _passes_header_check(self, url, target, max_size):
        """Issue a HEAD request and reject non-200 responses or an oversized Content-Length.

        Returns:
            bool: True when the download may proceed; False after an error
            message has been sent to ``target``.
        """
        try:
            async with aiohttp.ClientSession(headers=HEADERS) as session:
                async with session.head(url) as response:
                    if response.status != 200:
                        self._send_error(
                            target,
                            f"Failed to fetch headers: HTTP {response.status}"
                        )
                        return False
                    content_length = response.headers.get('Content-Length')
                    if content_length and int(content_length) > max_size:
                        self._send_error(
                            target,
                            f"File size ({int(content_length) // (1024 * 1024)}MB) exceeds 500MB limit"
                        )
                        return False
        except Exception as e:
            err_msg = self._ensure_str(e)
            self._send_error(target, f"Error during header check: {err_msg}")
            return False
        return True

    def _build_metadata_parts(self, info):
        """Build the styled metadata fragments (title, uploader, duration, ...) from a yt-dlp info dict."""
        metadata_parts = []
        title = self._ensure_str(info.get("title"))
        uploader = self._ensure_str(info.get("uploader"))
        duration = info.get("duration")
        upload_date = self._ensure_str(info.get("upload_date"))
        view_count = info.get("view_count")
        description = self._ensure_str(info.get("description"))
        # Collapse every run of whitespace (including newlines) to one space.
        description = re.sub(r'\s+', ' ', description)

        if title:
            metadata_parts.append(
                ircstyle.style("📝 Title:", fg="yellow", bold=True) + " "
                + ircstyle.style(title, fg="yellow", bold=False, underline=True, italics=True)
            )
        if uploader:
            metadata_parts.append(
                ircstyle.style("👤 Uploader:", fg="purple", bold=True) + " "
                + ircstyle.style(uploader, fg="purple", bold=False)
            )
        if duration:
            metadata_parts.append(
                ircstyle.style("⏱ Duration:", fg="green", bold=True) + " "
                + ircstyle.style(self._format_duration(duration), fg="green", bold=False)
            )
        if upload_date:
            # An 8-character date is reformatted from YYYYMMDD to YYYY-MM-DD.
            if len(upload_date) == 8:
                formatted_date = f"{upload_date[:4]}-{upload_date[4:6]}-{upload_date[6:]}"
            else:
                formatted_date = upload_date
            metadata_parts.append(
                ircstyle.style("Upload Date:", fg="silver", bold=True) + " "
                + ircstyle.style(formatted_date, fg="silver", bold=False, italics=True)
            )
        if view_count is not None:
            metadata_parts.append(
                ircstyle.style("Views:", fg="teal", bold=True) + " "
                + ircstyle.style(str(view_count), fg="teal", bold=False)
            )
        if description:
            if len(description) > 200:
                description = description[:200] + "..."
            metadata_parts.append(
                ircstyle.style("Description:", fg="silver", bold=True) + " "
                + ircstyle.style(description, fg="silver", bold=False, italics=True)
            )
        return metadata_parts

    async def _upload_file(self, downloaded_file, target):
        """POST the file to hardfiles.org as multipart form data and report the outcome to ``target``."""
        try:
            async with aiohttp.ClientSession(headers=HEADERS) as session:
                form = aiohttp.FormData()
                # NOTE: the whole file (up to the size limit) is read into memory.
                async with aiofiles.open(downloaded_file, 'rb') as f:
                    file_content = await f.read()
                form.add_field(
                    'file',
                    file_content,
                    filename=os.path.basename(downloaded_file),
                    content_type='application/octet-stream'
                )
                # Redirects are not followed; 302/303 are treated as success.
                async with session.post(
                    'https://hardfiles.org/', data=form, allow_redirects=False
                ) as resp:
                    if resp.status not in [200, 201, 302, 303]:
                        self._send_error(target, f"Upload failed: HTTP {resp.status}")
                        return
                    raw_response = await resp.read()
                    response_text = raw_response.decode('utf-8', errors='replace')
                    upload_url = self.extract_url_from_response(response_text) or "Unknown URL"
                    upload_url = self._ensure_str(upload_url)
                    response_msg = (
                        ircstyle.style("✅ Upload successful: ", fg="green", bold=True)
                        + ircstyle.style(upload_url, fg="blue", underline=True)
                    )
                    self.bot.privmsg(target, response_msg)
        except Exception as e:
            err_msg = self._ensure_str(e)
            self._send_error(target, f"Error during file upload: {err_msg}")
            return

    async def do_upload(self, url, target, mp3):
        """
        Download a file using yt-dlp and upload it to hardfiles.org.

        Steps: optional HEAD size pre-check, yt-dlp metadata extraction with
        a size estimate check, download into a temporary directory, metadata
        announcement, on-disk size check, then upload. Each failure is
        reported to ``target`` and ends the task.

        Args:
            url (str): The URL of the file to download.
            target (str): The channel or user to send messages to.
            mp3 (bool): Whether to convert the downloaded file to MP3.
        """
        max_size = 500 * 1024 * 1024  # 500MB limit

        # The temporary directory (and the download in it) is removed on exit.
        with tempfile.TemporaryDirectory() as tmp_dir:
            if self._should_check_headers(url):
                if not await self._passes_header_check(url, target, max_size):
                    return

            ydl_opts = {
                'outtmpl': os.path.join(tmp_dir, '%(title)s.%(ext)s'),
                'format': 'bestaudio/best' if mp3 else 'best[ext=mp4]/best',
                'restrictfilenames': True,
                'noplaylist': True,
                'quiet': True,
                'concurrent_fragment_downloads': 5,
                'postprocessors': [
                    {
                        'key': 'FFmpegExtractAudio',
                        'preferredcodec': 'mp3',
                        'preferredquality': '192',
                    }
                ] if mp3 else [],
            }

            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                # First pass: metadata only, to reject oversized files early.
                try:
                    info = await asyncio.to_thread(ydl.extract_info, url, download=False)
                except DownloadError as e:
                    err_msg = self._ensure_str(e)
                    self._send_error(target, f"Info extraction failed: {err_msg}")
                    return
                except UnicodeDecodeError:
                    self._send_error(
                        target,
                        "Error: Received non-UTF-8 output during info extraction"
                    )
                    return

                estimated_size = info.get('filesize') or info.get('filesize_approx')
                if estimated_size and estimated_size > max_size:
                    self._send_error(
                        target,
                        f"File size ({estimated_size // (1024 * 1024)}MB) exceeds 500MB limit"
                    )
                    return

                # Second pass: the actual download, run off the event loop.
                try:
                    info = await asyncio.to_thread(ydl.extract_info, url, download=True)
                except DownloadError as e:
                    err_msg = self._ensure_str(e)
                    self._send_error(target, f"Download failed: {err_msg}")
                    return
                except UnicodeDecodeError:
                    self._send_error(
                        target,
                        "Error: Received non-UTF-8 output during download"
                    )
                    return

            # Build metadata for display.
            metadata_parts = self._build_metadata_parts(info)
            if metadata_parts:
                self.bot.privmsg(target, " │ ".join(metadata_parts))

            downloaded_files = info.get('requested_downloads', [])
            if not downloaded_files:
                self._send_error(target, "No files downloaded")
                return

            first_file = downloaded_files[0]
            downloaded_file = first_file.get('filepath', first_file.get('filename'))
            if not downloaded_file or not os.path.exists(downloaded_file):
                self._send_error(target, f"Downloaded file not found: {downloaded_file}")
                return

            # Final check on the real size: the estimate may be missing or wrong.
            file_size = os.path.getsize(downloaded_file)
            if file_size > max_size:
                self._send_error(
                    target,
                    f"File size ({file_size // (1024 * 1024)}MB) exceeds 500MB limit"
                )
                return

            await self._upload_file(downloaded_file, target)

    def extract_url_from_response(self, response_text):
        """
        Extract the first URL found in the response text.

        Args:
            response_text (str): The response text to search for URLs.

        Returns:
            str: The first http(s) URL found in the response text, or None
            if no URL is found.
        """
        match = re.search(r'https?://\S+', response_text)
        return match.group(0) if match else None

    def _format_duration(self, seconds):
        """
        Convert seconds into a human-readable duration string.

        Args:
            seconds (int): The duration in seconds.

        Returns:
            str: "{h}h {m}m {s}s" when at least one hour, else "{m}m {s}s".
        """
        seconds = int(seconds)
        m, s = divmod(seconds, 60)
        h, m = divmod(m, 60)
        return f"{h}h {m}m {s}s" if h else f"{m}m {s}s"