g1mp/plugins/upload.py
2025-02-13 03:43:09 -08:00

434 lines
17 KiB
Python

# -*- coding: utf-8 -*-
"""
IRC Bot Plugin for Uploading Files to hardfiles.org
This plugin allows users to upload files to hardfiles.org using yt-dlp for downloads.
It supports downloading files from various sources (YouTube, Instagram, Reddit, etc.) and can
optionally convert videos to MP3 format before uploading. Files larger than 100MB are rejected.
Usage:
!upload [--mp3] <url>
Dependencies:
- aiohttp
- aiofiles
- irc3
- yt-dlp
- ircstyle
Author: Zodiac
Version: 1.2
Date: 2025-02-12
"""
import aiohttp
import aiofiles
import irc3
import tempfile
import os
import re
import asyncio
from irc3.plugins.command import command
import ircstyle
import yt_dlp
from yt_dlp.utils import DownloadError
from urllib.parse import urlparse
# Default request headers shared by every outgoing HTTP session in this
# module. They imitate an ordinary desktop browser (ban evasion).
HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/120.0.0.0 Safari/537.36"
    ),
    "Accept": (
        "text/html,application/xhtml+xml,application/xml;q=0.9,"
        "image/webp,image/apng,*/*;q=0.8"
    ),
    "Accept-Language": "en-US,en;q=0.9",
    "Referer": "https://www.google.com/",
    "Connection": "keep-alive",
    "DNT": "1",
    "Upgrade-Insecure-Requests": "1",
}
@irc3.plugin
class UploadPlugin:
    """IRC bot plugin for downloading files via yt-dlp and uploading them to hardfiles.org."""

    # Size cap in bytes; checked against the HEAD Content-Length, yt-dlp's
    # size estimate, yt-dlp's downloader and finally the file on disk.
    MAX_SIZE = 100 * 1024 * 1024

    # Endpoint that receives the multipart upload.
    UPLOAD_ENDPOINT = 'https://hardfiles.org/'

    # Hosts (and their subdomains) for which the HEAD size pre-check is skipped.
    SKIP_CHECK_DOMAINS = (
        "x.com",
        "instagram.com",
        "youtube.com",
        "youtu.be",
        "streamable.com",
        "reddit.com",
        "twitter.com",
        "tiktok.com",
        "facebook.com",
        "dailymotion.com",
    )

    def __init__(self, bot):
        """
        Initialize the UploadPlugin with an IRC bot instance.

        Args:
            bot (irc3.IrcBot): The IRC bot instance.
        """
        self.bot = bot

    def _ensure_str(self, value):
        """
        Ensure the value is a string.

        Bytes are decoded as UTF-8 with undecodable bytes replaced, ``None``
        becomes the empty string, and anything else goes through ``str()``.

        Args:
            value (Union[str, bytes, None]): The value to ensure as a string.

        Returns:
            str: The value as a string.
        """
        if isinstance(value, bytes):
            return value.decode('utf-8', errors='replace')
        if value is None:
            return ''
        return str(value)

    def _send_error(self, target, text):
        """Send ``text`` to ``target`` styled as an error (bold red)."""
        self.bot.privmsg(
            target,
            ircstyle.style(text, fg="red", bold=True, reset=True),
        )

    # NOTE: the docstring below is read by irc3 at runtime (the ``%%`` line is
    # the docopt usage), so its text is kept as-is; notes live in comments.
    @command
    async def upload(self, mask, target, args):
        """
        Upload a file to hardfiles.org (Max 100MB).
        Args:
        mask (str): The user mask (nickname@host) of the command issuer.
        target (str): The channel or user where the command was issued.
        args (dict): Parsed command arguments.
        Usage:
        %%upload [--mp3] <url>
        """
        url = args.get('<url>')
        mp3 = args.get('--mp3')
        if not url:
            self._send_error(target, "Usage: !upload [--mp3] <url>")
            return
        try:
            await self.do_upload(url, target, mp3)
        except Exception as exc:
            # Top-level boundary: report anything do_upload did not handle
            # itself instead of letting it escape the command handler.
            exc_msg = self._ensure_str(exc)
            self._send_error(target, f"Upload task error: {exc_msg}")

    async def do_upload(self, url, target, mp3):
        """
        Download a file using yt-dlp and upload it to hardfiles.org.

        Steps: optional HEAD size pre-check, download into a temporary
        directory, post the media metadata to ``target``, verify the
        downloaded file's size, then upload it. Every failure is reported to
        ``target`` and ends the operation early.

        Args:
            url (str): The URL of the file to download.
            target (str): The channel or user to send messages to.
            mp3 (bool): Whether to convert the downloaded file to MP3.
        """
        if self._should_check_headers(url):
            if not await self._precheck_size(url, target):
                return
        # The temporary directory (and the downloaded file in it) is removed
        # when this block exits, whatever the outcome.
        with tempfile.TemporaryDirectory() as tmp_dir:
            info = await self._download(url, target, mp3, tmp_dir)
            if info is None:
                return
            self._announce_metadata(info, target)
            downloaded_file = self._locate_download(info, target)
            if downloaded_file is None:
                return
            await self._upload_file(downloaded_file, target)

    def _should_check_headers(self, url):
        """
        Return True unless the URL's host is one of ``SKIP_CHECK_DOMAINS``.

        The host must equal a listed domain or be a subdomain of it. A bare
        suffix test would wrongly treat e.g. ``dropbox.com`` as ``x.com``.
        """
        host = (urlparse(url).hostname or '').lower()
        return not any(
            host == domain or host.endswith('.' + domain)
            for domain in self.SKIP_CHECK_DOMAINS
        )

    async def _precheck_size(self, url, target):
        """
        Issue a HEAD request and reject the URL if it is not fetchable or too big.

        Returns:
            bool: True when the download may proceed; False when an error
            message has already been sent to ``target``.
        """
        try:
            async with aiohttp.ClientSession(headers=HEADERS) as session:
                # Follow redirects explicitly so a redirecting URL is judged
                # by its final response rather than rejected as "HTTP 30x".
                async with session.head(url, allow_redirects=True) as response:
                    if response.status != 200:
                        self._send_error(
                            target,
                            f"Failed to fetch headers: HTTP {response.status}",
                        )
                        return False
                    content_length = response.headers.get('Content-Length')
                    if content_length and int(content_length) > self.MAX_SIZE:
                        self._send_error(
                            target,
                            f"File size ({int(content_length) // (1024 * 1024)}MB) exceeds 100MB limit",
                        )
                        return False
        except Exception as e:
            err_msg = self._ensure_str(e)
            self._send_error(target, f"Error during header check: {err_msg}")
            return False
        return True

    async def _download(self, url, target, mp3, tmp_dir):
        """
        Fetch the media behind ``url`` into ``tmp_dir`` with yt-dlp.

        Metadata is extracted first so an oversized file can be rejected
        before downloading. If that extraction fails with an authentication
        style error, the download is attempted directly instead.

        Returns:
            dict | None: yt-dlp's info dict for the downloaded media, or None
            when an error message has already been sent to ``target``.
        """
        ydl_opts = {
            'outtmpl': os.path.join(tmp_dir, '%(title)s.%(ext)s'),
            'format': 'bestaudio/best' if mp3 else 'best[ext=mp4]/best',
            'restrictfilenames': True,
            'noplaylist': True,
            'quiet': True,
            'concurrent_fragment_downloads': 5,
            # Downloader-level cap so an oversized file with no advertised
            # size is not pulled to disk in full before being rejected.
            'max_filesize': self.MAX_SIZE,
            'postprocessors': [
                {
                    'key': 'FFmpegExtractAudio',
                    'preferredcodec': 'mp3',
                    'preferredquality': '192',
                }
            ] if mp3 else [],
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            already_downloaded = False
            try:
                # Try to extract info without downloading
                info = await asyncio.to_thread(ydl.extract_info, url, download=False)
            except DownloadError as e:
                err_msg = self._ensure_str(e)
                auth_problem = (
                    "Account authentication is required" in err_msg
                    or "[Reddit]" in err_msg
                )
                if not auth_problem:
                    self._send_error(target, f"Info extraction failed: {err_msg}")
                    return None
                # If authentication is required (e.g. for Reddit), skip info
                # extraction and go straight to the download.
                self.bot.privmsg(
                    target,
                    ircstyle.style(
                        f"Info extraction failed (auth error): {err_msg}. Skipping info extraction and proceeding with download.",
                        fg="yellow",
                        bold=True,
                        reset=True,
                    ),
                )
                try:
                    info = await asyncio.to_thread(ydl.extract_info, url, download=True)
                except Exception as e2:
                    err_msg2 = self._ensure_str(e2)
                    self._send_error(target, f"Download failed: {err_msg2}")
                    return None
                # Remember this so the media is not fetched a second time below.
                already_downloaded = True

            if not info:
                self._send_error(
                    target, "Info extraction failed: no information returned"
                )
                return None

            estimated_size = info.get('filesize') or info.get('filesize_approx')
            if estimated_size and estimated_size > self.MAX_SIZE:
                # filesize_approx may be a float; int() keeps the message tidy.
                self._send_error(
                    target,
                    f"File size ({int(estimated_size) // (1024 * 1024)}MB) exceeds 100MB limit",
                )
                return None

            if not already_downloaded:
                try:
                    info = await asyncio.to_thread(ydl.extract_info, url, download=True)
                except DownloadError as e:
                    err_msg = self._ensure_str(e)
                    self._send_error(target, f"Download failed: {err_msg}")
                    return None
                except UnicodeDecodeError:
                    self._send_error(
                        target, "Error: Received non-UTF-8 output during download"
                    )
                    return None
                if not info:
                    self._send_error(
                        target, "Download failed: no information returned"
                    )
                    return None
            return info

    def _announce_metadata(self, info, target):
        """Post a one-line, colourised summary of the media metadata to ``target``."""
        # Safely convert metadata to strings
        title = self._ensure_str(info.get("title"))
        uploader = self._ensure_str(info.get("uploader"))
        duration = info.get("duration")
        upload_date = self._ensure_str(info.get("upload_date"))
        view_count = info.get("view_count")
        description = self._ensure_str(info.get("description"))

        metadata_parts = []
        if title:
            metadata_parts.append(
                ircstyle.style(f"Title: {title}", fg="yellow", bold=True, reset=True)
            )
        if uploader:
            metadata_parts.append(
                ircstyle.style(f"Uploader: {uploader}", fg="purple", bold=True, reset=True)
            )
        if duration:
            metadata_parts.append(
                ircstyle.style(
                    f"Duration: {self._format_duration(duration)}",
                    fg="green",
                    bold=True,
                    reset=True,
                )
            )
        if upload_date:
            # An 8-character value is rendered as YYYY-MM-DD; anything else
            # is shown unchanged.
            formatted_date = (
                f"{upload_date[:4]}-{upload_date[4:6]}-{upload_date[6:]}"
                if len(upload_date) == 8
                else upload_date
            )
            metadata_parts.append(
                ircstyle.style(
                    f"Upload Date: {formatted_date}", fg="aqua", bold=True, reset=True
                )
            )
        if view_count is not None:
            metadata_parts.append(
                ircstyle.style(f"Views: {view_count}", fg="royal", bold=True, reset=True)
            )
        if description:
            if len(description) > 200:
                description = description[:200] + "..."
            metadata_parts.append(
                ircstyle.style(f"Description: {description}", fg="silver", reset=True)
            )
        if metadata_parts:
            self.bot.privmsg(target, " | ".join(metadata_parts))

    def _locate_download(self, info, target):
        """
        Find the downloaded file on disk and enforce the size limit on it.

        Returns:
            str | None: Path of the first downloaded file, or None when an
            error message has already been sent to ``target``.
        """
        downloaded_files = info.get('requested_downloads', [])
        if not downloaded_files:
            self._send_error(target, "No files downloaded")
            return None
        first_file = downloaded_files[0]
        downloaded_file = first_file.get('filepath', first_file.get('filename'))
        if not downloaded_file or not os.path.exists(downloaded_file):
            self._send_error(target, f"Downloaded file not found: {downloaded_file}")
            return None
        file_size = os.path.getsize(downloaded_file)
        if file_size > self.MAX_SIZE:
            self._send_error(
                target,
                f"File size ({file_size // (1024 * 1024)}MB) exceeds 100MB limit",
            )
            return None
        return downloaded_file

    async def _upload_file(self, downloaded_file, target):
        """POST ``downloaded_file`` to hardfiles.org and report the resulting URL."""
        # Local import: only this helper needs it.
        from urllib.parse import urljoin

        try:
            async with aiohttp.ClientSession(headers=HEADERS) as session:
                form = aiohttp.FormData()
                async with aiofiles.open(downloaded_file, 'rb') as f:
                    file_content = await f.read()
                form.add_field(
                    'file',
                    file_content,
                    filename=os.path.basename(downloaded_file),
                    content_type='application/octet-stream',
                )
                async with session.post(
                    self.UPLOAD_ENDPOINT, data=form, allow_redirects=False
                ) as resp:
                    if resp.status not in (200, 201, 302, 303):
                        self._send_error(target, f"Upload failed: HTTP {resp.status}")
                        return
                    raw_response = await resp.read()
                    response_text = raw_response.decode('utf-8', errors='replace')
                    upload_url = self.extract_url_from_response(response_text)
                    if not upload_url:
                        # Redirects are not followed, so a 302/303 may carry
                        # the URL only in its Location header.
                        location = resp.headers.get('Location')
                        if location:
                            upload_url = urljoin(self.UPLOAD_ENDPOINT, location)
                    upload_url = self._ensure_str(upload_url or "Unknown URL")
                    response_msg = (
                        ircstyle.style("Upload successful: ", fg="green", bold=True, reset=True)
                        + ircstyle.style(upload_url, fg="blue", underline=True, reset=True)
                    )
                    self.bot.privmsg(target, response_msg)
        except Exception as e:
            err_msg = self._ensure_str(e)
            self._send_error(target, f"Error during file upload: {err_msg}")
            return

    def extract_url_from_response(self, response_text):
        """
        Extract the first URL found in the response text.

        The match stops at whitespace, quotes and angle brackets, and
        trailing punctuation is trimmed, so a URL embedded in HTML or JSON
        comes back without the surrounding markup.

        Args:
            response_text (str): The response text to search for URLs.

        Returns:
            str: The first URL found in the response text, or None if no URL is found.
        """
        if not response_text:
            return None
        match = re.search(r'https?://[^\s"\'<>]+', response_text)
        if not match:
            return None
        return match.group(0).rstrip('.,;:!?)]}')

    def _format_duration(self, seconds):
        """
        Convert seconds into a human-readable duration string.

        Args:
            seconds (int): The duration in seconds (truncated with ``int()``).

        Returns:
            str: ``"{h}h {m}m {s}s"`` when at least one hour, else ``"{m}m {s}s"``.
        """
        seconds = int(seconds)
        m, s = divmod(seconds, 60)
        h, m = divmod(m, 60)
        return f"{h}h {m}m {s}s" if h else f"{m}m {s}s"