g1mp/plugins/upload.py
2025-02-14 19:17:00 -08:00

449 lines
18 KiB
Python

# -*- coding: utf-8 -*-
"""
IRC Bot Plugin for Uploading Files to hardfiles.org
This plugin allows users to upload files to hardfiles.org using yt-dlp for downloads.
It supports downloading files from various sources (YouTube, Instagram, etc.) and can
optionally convert videos to MP3 format before uploading. Files larger than 500MB are rejected.
Usage:
!upload [--mp3] <url_or_search_term>...
Example: !upload never gonna give you up
Dependencies:
- aiohttp
- aiofiles
- irc3
- yt-dlp
- ircstyle
Author: Zodiac
Version: 1.3
Date: 2025-02-12
"""
import aiohttp
import aiofiles
import irc3
import tempfile
import os
import re
import asyncio
from irc3.plugins.command import command
import ircstyle
import yt_dlp
from yt_dlp.utils import DownloadError
from urllib.parse import urlparse
import googleapiclient.discovery
from irc3.compat import Queue
# Global headers to mimic a real browser (ban evasion)
HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.9",
"Referer": "https://www.google.com/",
"Connection": "keep-alive",
"DNT": "1",
"Upgrade-Insecure-Requests": "1"
}
# Constants for YouTube API
API_SERVICE_NAME = "youtube"
API_VERSION = "v3"
DEVELOPER_KEY = "AIzaSyBNrqOA0ZIziUVLYm0K5W76n9ndqz6zTxI"
# Initialize YouTube API client
youtube = googleapiclient.discovery.build(
API_SERVICE_NAME, API_VERSION, developerKey=DEVELOPER_KEY
)
@irc3.plugin
class UploadPlugin:
"""IRC bot plugin for downloading files via yt-dlp and uploading them to hardfiles.org."""
def __init__(self, bot):
"""
Initialize the UploadPlugin with an IRC bot instance.
Args:
bot (irc3.IrcBot): The IRC bot instance.
"""
self.bot = bot
self.queue = Queue() # Initialize the queue
self.bot.loop.create_task(self.process_queue()) # Start queue processing
async def process_queue(self):
"""Process upload tasks from the queue."""
while True:
task = await self.queue.get()
url, target, mp3 = task
try:
await self.do_upload(url, target, mp3)
except Exception as e:
exc_msg = self._ensure_str(e)
self.bot.privmsg(
target,
ircstyle.style("", fg="red") +
ircstyle.style(f"Upload failed: {exc_msg}", fg="red", bold=True)
)
finally:
self.queue.task_done()
def _ensure_str(self, value):
"""
Ensure the value is a string. If it's bytes, decode it as UTF-8 with error replacement.
Args:
value (Union[str, bytes, None]): The value to ensure as a string.
Returns:
str: The value as a string.
"""
if isinstance(value, bytes):
return value.decode('utf-8', errors='replace')
if value is None:
return ''
return str(value)
async def search_youtube(self, query):
"""
Search YouTube for the given query and return the URL of the first result.
Args:
query (str): The search query.
Returns:
str: The URL of the first search result.
"""
try:
request = youtube.search().list(
part="id",
q=query,
type="video",
maxResults=1
)
result = await self.bot.loop.run_in_executor(None, request.execute)
if result.get("items"):
video_id = result["items"][0]["id"]["videoId"]
return f"https://www.youtube.com/watch?v={video_id}"
except Exception as e:
self.bot.log.error(f"YouTube search error: {e}")
return None
@command
async def upload(self, mask, target, args):
"""
Upload a file to hardfiles.org (Max 500MB).
Args:
mask (str): The user mask (nickname@host) of the command issuer.
target (str): The channel or user where the command was issued.
args (dict): Parsed command arguments.
Usage:
%%upload [--mp3] <url_or_search_term>...
Example: !upload never gonna give you up
"""
url_or_search_term = args.get('<url_or_search_term>')
if isinstance(url_or_search_term, list):
url_or_search_term = " ".join(url_or_search_term)
mp3 = args.get('--mp3')
if not url_or_search_term:
self.bot.privmsg(
target,
ircstyle.style("", fg="red") +
ircstyle.style("Usage: !upload [--mp3] <url_or_search_term>...", fg="red", bold=True)
)
return
if not re.match(r'^https?://', url_or_search_term):
self.bot.privmsg(
target,
ircstyle.style("🔍 ", fg="blue") +
ircstyle.style("Searching YouTube for: ", fg="blue") +
ircstyle.style(url_or_search_term, fg="green", underline=True, italics=True)
)
url = await self.search_youtube(url_or_search_term)
if not url:
self.bot.privmsg(
target,
ircstyle.style("", fg="red") +
ircstyle.style("No results found on YouTube.", fg="red", bold=True)
)
return
else:
url = url_or_search_term
# Enqueue the upload task
queue_size = self.queue.qsize() + 1
self.queue.put_nowait((url, target, mp3))
self.bot.privmsg(
target,
ircstyle.style("", fg="yellow") +
ircstyle.style("Upload queued ", fg="blue", bold=False, italics=True) +
ircstyle.style(f"(Position #{queue_size})",italics=True, fg="grey") +
ircstyle.style(" Processing soon...", fg="blue", italics=True)
)
async def do_upload(self, url, target, mp3):
"""
Download a file using yt-dlp and upload it to hardfiles.org.
Handles binary data and non-UTF-8 strings to avoid decoding errors.
Args:
url (str): The URL of the file to download.
target (str): The channel or user to send messages to.
mp3 (bool): Whether to convert the downloaded file to MP3.
"""
max_size = 500 * 1024 * 1024 # 500MB limit
with tempfile.TemporaryDirectory() as tmp_dir:
parsed_url = urlparse(url)
domain = parsed_url.netloc.lower()
skip_check_domains = (
"x.com",
"instagram.com",
"youtube.com",
"youtu.be",
"streamable.com",
"reddit.com",
"twitter.com",
"tiktok.com",
"facebook.com",
"dailymotion.com",
)
should_check_headers = not any(domain.endswith(d) for d in skip_check_domains)
if should_check_headers:
try:
async with aiohttp.ClientSession(headers=HEADERS) as session:
async with session.head(url) as response:
if response.status != 200:
self.bot.privmsg(
target,
ircstyle.style("", fg="red") +
ircstyle.style(f"Failed to fetch headers: HTTP {response.status}", fg="red", bold=True)
)
return
content_length = response.headers.get('Content-Length')
if content_length and int(content_length) > max_size:
self.bot.privmsg(
target,
ircstyle.style("", fg="red") +
ircstyle.style(f"File size ({int(content_length) // (1024 * 1024)}MB) exceeds 500MB limit", fg="red", bold=True)
)
return
except Exception as e:
err_msg = self._ensure_str(e)
self.bot.privmsg(
target,
ircstyle.style("", fg="red") +
ircstyle.style(f"Error during header check: {err_msg}", fg="red", bold=True)
)
return
ydl_opts = {
'outtmpl': os.path.join(tmp_dir, '%(title)s.%(ext)s'),
'format': 'bestaudio/best' if mp3 else 'best[ext=mp4]/best',
'restrictfilenames': True,
'noplaylist': True,
'quiet': True,
'concurrent_fragment_downloads': 5,
'postprocessors': [
{
'key': 'FFmpegExtractAudio',
'preferredcodec': 'mp3',
'preferredquality': '192',
}
] if mp3 else [],
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
try:
info = await asyncio.to_thread(ydl.extract_info, url, download=False)
except DownloadError as e:
err_msg = self._ensure_str(e)
self.bot.privmsg(
target,
ircstyle.style("", fg="red") +
ircstyle.style(f"Info extraction failed: {err_msg}", fg="red", bold=True)
)
return
except UnicodeDecodeError:
self.bot.privmsg(
target,
ircstyle.style("", fg="red") +
ircstyle.style("Error: Received non-UTF-8 output during info extraction", fg="red", bold=True)
)
return
estimated_size = info.get('filesize') or info.get('filesize_approx')
if estimated_size and estimated_size > max_size:
self.bot.privmsg(
target,
ircstyle.style("", fg="red") +
ircstyle.style(f"File size ({estimated_size // (1024 * 1024)}MB) exceeds 500MB limit", fg="red", bold=True)
)
return
try:
info = await asyncio.to_thread(ydl.extract_info, url, download=True)
except DownloadError as e:
err_msg = self._ensure_str(e)
self.bot.privmsg(
target,
ircstyle.style("", fg="red") +
ircstyle.style(f"Download failed: {err_msg}", fg="red", bold=True)
)
return
except UnicodeDecodeError:
self.bot.privmsg(
target,
ircstyle.style("", fg="red") +
ircstyle.style("Error: Received non-UTF-8 output during download", fg="red", bold=True)
)
return
# Build metadata for display.
metadata_parts = []
title = self._ensure_str(info.get("title"))
uploader = self._ensure_str(info.get("uploader"))
duration = info.get("duration")
upload_date = self._ensure_str(info.get("upload_date"))
view_count = info.get("view_count")
description = self._ensure_str(info.get("description"))
description = re.sub(r'\s+', ' ', description) # Strip multiple spaces to single space
if title:
metadata_parts.append(
ircstyle.style("📝 Title:", fg="yellow", bold=True) + " " +
ircstyle.style(title, fg="yellow", bold=False, underline=True, italics=True)
)
if uploader:
metadata_parts.append(
ircstyle.style("👤 Uploader:", fg="purple", bold=True) + " " +
ircstyle.style(uploader, fg="purple", bold=False)
)
if duration:
metadata_parts.append(
ircstyle.style("⏱ Duration:", fg="green", bold=True) + " " +
ircstyle.style(self._format_duration(duration), fg="green", bold=False)
)
if upload_date:
formatted_date = f"{upload_date[:4]}-{upload_date[4:6]}-{upload_date[6:]}" if len(upload_date) == 8 else upload_date
metadata_parts.append(
ircstyle.style("Upload Date:", fg="silver", bold=True) + " " +
ircstyle.style(formatted_date, fg="silver", bold=False, italics=True)
)
if view_count is not None:
metadata_parts.append(
ircstyle.style("Views:", fg="teal", bold=True) + " " +
ircstyle.style(str(view_count), fg="teal", bold=False)
)
if description:
if len(description) > 200:
description = description[:200] + "..."
metadata_parts.append(
ircstyle.style("Description:", fg="silver", bold=True) + " " +
ircstyle.style(description, fg="silver", bold=False, italics=True)
)
if metadata_parts:
# Send metadata header in teal.
# self.bot.privmsg(target, ircstyle.style("📁 Metadata:", fg="teal", bold=True))
self.bot.privmsg(target, "".join(metadata_parts))
downloaded_files = info.get('requested_downloads', [])
if not downloaded_files:
self.bot.privmsg(
target,
ircstyle.style("", fg="red") +
ircstyle.style("No files downloaded", fg="red", bold=True)
)
return
first_file = downloaded_files[0]
downloaded_file = first_file.get('filepath', first_file.get('filename'))
if not downloaded_file or not os.path.exists(downloaded_file):
self.bot.privmsg(
target,
ircstyle.style("", fg="red") +
ircstyle.style(f"Downloaded file not found: {downloaded_file}", fg="red", bold=True)
)
return
file_size = os.path.getsize(downloaded_file)
if file_size > max_size:
self.bot.privmsg(
target,
ircstyle.style("", fg="red") +
ircstyle.style(f"File size ({file_size // (1024 * 1024)}MB) exceeds 500MB limit", fg="red", bold=True)
)
return
try:
async with aiohttp.ClientSession(headers=HEADERS) as session:
form = aiohttp.FormData()
async with aiofiles.open(downloaded_file, 'rb') as f:
file_content = await f.read()
form.add_field(
'file',
file_content,
filename=os.path.basename(downloaded_file),
content_type='application/octet-stream'
)
async with session.post('https://hardfiles.org/', data=form, allow_redirects=False) as resp:
if resp.status not in [200, 201, 302, 303]:
self.bot.privmsg(
target,
ircstyle.style("", fg="red") +
ircstyle.style(f"Upload failed: HTTP {resp.status}", fg="red", bold=True)
)
return
raw_response = await resp.read()
response_text = raw_response.decode('utf-8', errors='replace')
upload_url = self.extract_url_from_response(response_text) or "Unknown URL"
upload_url = self._ensure_str(upload_url)
response_msg = (
ircstyle.style("✅ Upload successful: ", fg="green", bold=True) +
ircstyle.style(upload_url, fg="blue", underline=True)
)
self.bot.privmsg(target, response_msg)
except Exception as e:
err_msg = self._ensure_str(e)
self.bot.privmsg(
target,
ircstyle.style("", fg="red") +
ircstyle.style(f"Error during file upload: {err_msg}", fg="red", bold=True)
)
return
def extract_url_from_response(self, response_text):
"""
Extract the first URL found in the response text.
Args:
response_text (str): The response text to search for URLs.
Returns:
str: The first URL found in the response text, or None if no URL is found.
"""
match = re.search(r'https?://\S+', response_text)
return match.group(0) if match else None
def _format_duration(self, seconds):
"""
Convert seconds into a human-readable duration string.
Args:
seconds (int): The duration in seconds.
Returns:
str: The formatted duration string.
"""
seconds = int(seconds)
m, s = divmod(seconds, 60)
h, m = divmod(m, 60)
return f"{h}h {m}m {s}s" if h else f"{m}m {s}s"