g1mp/plugins/upload.py
2025-02-13 19:29:24 -08:00

472 lines
18 KiB
Python

# -*- coding: utf-8 -*-
"""
IRC Bot Plugin for Uploading Files to hardfiles.org
This plugin allows users to upload files to hardfiles.org using yt-dlp for downloads.
It supports downloading files from various sources (YouTube, Instagram, etc.) and can
optionally convert videos to MP3 format before uploading. Files larger than 500MB are rejected.
Usage:
!upload [--mp3] <url_or_search_term>...
Example: !upload never gonna give you up
Dependencies:
- aiohttp
- aiofiles
- irc3
- yt-dlp
- ircstyle
- google-api-python-client (YouTube search for non-URL arguments)
Author: Zodiac
Version: 1.2
Date: 2025-02-12
"""
import aiohttp
import aiofiles
import irc3
import tempfile
import os
import re
import asyncio
from irc3.plugins.command import command
import ircstyle
import yt_dlp
from yt_dlp.utils import DownloadError
from urllib.parse import urlparse
import googleapiclient.discovery
# Browser-like request headers shared by every aiohttp session this module
# opens (the HEAD pre-check and the upload POST), so the requests resemble
# ordinary Chrome traffic (ban evasion).
HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/120.0.0.0 Safari/537.36"
    ),
    "Accept": (
        "text/html,application/xhtml+xml,application/xml;q=0.9,"
        "image/webp,image/apng,*/*;q=0.8"
    ),
    "Accept-Language": "en-US,en;q=0.9",
    "Referer": "https://www.google.com/",
    "Connection": "keep-alive",
    "DNT": "1",
    "Upgrade-Insecure-Requests": "1",
}
# Constants for the YouTube Data API client used by UploadPlugin.search_youtube.
API_SERVICE_NAME = "youtube"
API_VERSION = "v3"
# SECURITY NOTE(review): an API key is committed in source control. The key is
# now taken from the YOUTUBE_API_KEY environment variable when it is set; the
# literal below remains only as a fallback so existing deployments keep
# working. Rotate this key and delete the literal.
DEVELOPER_KEY = (
    os.environ.get("YOUTUBE_API_KEY")
    or "AIzaSyBNrqOA0ZIziUVLYm0K5W76n9ndqz6zTxI"
)
# Module-level YouTube API client, built once at import time and shared by all
# search requests.
youtube = googleapiclient.discovery.build(
    API_SERVICE_NAME, API_VERSION, developerKey=DEVELOPER_KEY
)
@irc3.plugin
class UploadPlugin:
    """IRC bot plugin for downloading files via yt-dlp and uploading them to hardfiles.org.

    The public entry point is the ``upload`` command. It resolves its argument
    to a URL (searching YouTube when the argument is not an http(s) URL) and
    hands it to ``do_upload``, which downloads the media into a temporary
    directory, reports its metadata to the channel and posts the file to
    hardfiles.org.
    """

    # Largest file, in bytes, the plugin is willing to upload (500MB).
    MAX_SIZE = 500 * 1024 * 1024

    # URLs on these sites skip the HEAD pre-check and go straight to yt-dlp.
    SKIP_CHECK_DOMAINS = (
        "x.com",
        "instagram.com",
        "youtube.com",
        "youtu.be",
        "streamable.com",
        "reddit.com",
        "twitter.com",
        "tiktok.com",
        "facebook.com",
        "dailymotion.com",
    )

    def __init__(self, bot):
        """
        Initialize the UploadPlugin with an IRC bot instance.

        Args:
            bot (irc3.IrcBot): The IRC bot instance.
        """
        self.bot = bot

    def _ensure_str(self, value):
        """
        Ensure the value is a string.

        Bytes are decoded as UTF-8 with undecodable bytes replaced, ``None``
        becomes the empty string, and anything else goes through ``str()``.

        Args:
            value (Union[str, bytes, None]): The value to ensure as a string.

        Returns:
            str: The value as a string.
        """
        if isinstance(value, bytes):
            return value.decode('utf-8', errors='replace')
        if value is None:
            return ''
        return str(value)

    def _single_line(self, value):
        """
        Return ``value`` as a string with every run of CR/LF replaced by a space.

        Titles, descriptions and exception texts come from remote sites and
        may span several lines; an IRC message must stay on one line.

        Args:
            value (Union[str, bytes, None]): The text to flatten.

        Returns:
            str: The text without carriage returns or line feeds.
        """
        return re.sub(r'[\r\n]+', ' ', self._ensure_str(value))

    def _say_error(self, target, text):
        """
        Send ``text`` to ``target`` as a bold red, single-line message.

        Args:
            target (str): The channel or user to send the message to.
            text (str): The error text.
        """
        self.bot.privmsg(
            target,
            ircstyle.style(self._single_line(text), fg="red", bold=True, reset=True),
        )

    def _too_large_message(self, size):
        """
        Build the "exceeds limit" message for a size given in bytes.

        Args:
            size (Union[int, float, str]): The size in bytes.

        Returns:
            str: The user-facing message with the size in whole megabytes.
        """
        # int() keeps the output as "12MB" even when yt-dlp reports a float.
        return f"File size ({int(size) // (1024 * 1024)}MB) exceeds 500MB limit"

    async def search_youtube(self, query):
        """
        Search YouTube for the given query and return the URL of the first result.

        Args:
            query (str): The search query.

        Returns:
            Optional[str]: The watch URL of the first search result, or None
            when there is no result or the API call fails (the failure is
            logged through the bot's logger).
        """
        try:
            request = youtube.search().list(
                part="id",
                q=query,
                type="video",
                maxResults=1,
            )
            # The API client is blocking, so run it in the default executor.
            result = await self.bot.loop.run_in_executor(None, request.execute)
            if result.get("items"):
                video_id = result["items"][0]["id"]["videoId"]
                return f"https://www.youtube.com/watch?v={video_id}"
        except Exception as e:
            self.bot.log.error(f"YouTube search error: {e}")
        return None

    @command
    async def upload(self, mask, target, args):
        """
        Upload a file to hardfiles.org (Max 500MB).

        Args:
            mask (str): The user mask (nickname@host) of the command issuer.
            target (str): The channel or user where the command was issued.
            args (dict): Parsed command arguments.

        Usage:
            %%upload [--mp3] <url_or_search_term>...

        Example: !upload never gonna give you up
        """
        url_or_search_term = args.get('<url_or_search_term>')
        # If multiple words are provided, join them into a single string.
        if isinstance(url_or_search_term, list):
            url_or_search_term = " ".join(url_or_search_term)
        mp3 = args.get('--mp3')

        if not url_or_search_term:
            self._say_error(target, "Usage: !upload [--mp3] <url_or_search_term>...")
            return

        # Anything that is not an http(s) URL is treated as a search term.
        if re.match(r'^https?://', url_or_search_term):
            url = url_or_search_term
        else:
            self.bot.privmsg(
                target,
                ircstyle.style(
                    f"Searching YouTube for: {url_or_search_term}",
                    fg="blue",
                    bold=True,
                    reset=True,
                ),
            )
            url = await self.search_youtube(url_or_search_term)
            if not url:
                self._say_error(target, "No results found on YouTube.")
                return

        try:
            await self.do_upload(url, target, mp3)
        except Exception as exc:
            # Top-level boundary of the command: report instead of crashing.
            self._say_error(target, f"Upload task error: {self._ensure_str(exc)}")

    def _needs_header_check(self, url):
        """
        Tell whether ``url`` should be probed with a HEAD request first.

        Args:
            url (str): The URL about to be downloaded.

        Returns:
            bool: False when the URL's host is one of ``SKIP_CHECK_DOMAINS``
            or a subdomain of one, True otherwise.
        """
        # hostname drops any port/userinfo and is already lower-cased.
        host = urlparse(url).hostname or ""
        # Match the exact domain or a dot-separated subdomain; a bare
        # endswith("x.com") would also match hosts such as netflix.com.
        return not any(
            host == domain or host.endswith("." + domain)
            for domain in self.SKIP_CHECK_DOMAINS
        )

    async def _check_headers(self, url, target):
        """
        Probe ``url`` with a HEAD request and report problems to ``target``.

        Args:
            url (str): The URL to probe.
            target (str): The channel or user to send messages to.

        Returns:
            bool: True when the download may proceed; False when the request
            failed, the final status was not 200, or the advertised
            Content-Length exceeds ``MAX_SIZE`` (a message has been sent).
        """
        try:
            async with aiohttp.ClientSession(headers=HEADERS) as session:
                # aiohttp does not follow redirects for HEAD unless asked to;
                # without this every redirecting URL is rejected as 301/302.
                async with session.head(url, allow_redirects=True) as response:
                    if response.status != 200:
                        self._say_error(
                            target, f"Failed to fetch headers: HTTP {response.status}"
                        )
                        return False
                    content_length = response.headers.get('Content-Length')
                    if content_length and int(content_length) > self.MAX_SIZE:
                        self._say_error(target, self._too_large_message(content_length))
                        return False
        except Exception as e:
            self._say_error(target, f"Error during header check: {self._ensure_str(e)}")
            return False
        return True

    def _format_metadata(self, info):
        """
        Build the one-line, colorized metadata summary for a downloaded item.

        Args:
            info (dict): The info dictionary returned by yt-dlp.

        Returns:
            str: The present fields joined with " | ", or an empty string when
            none of them is available.
        """
        title = self._single_line(info.get("title")).strip()
        uploader = self._single_line(info.get("uploader")).strip()
        duration = info.get("duration")
        upload_date = self._single_line(info.get("upload_date")).strip()
        view_count = info.get("view_count")
        description = self._single_line(info.get("description")).strip()

        parts = []
        if title:
            parts.append(
                ircstyle.style(f"Title: {title}", fg="yellow", bold=True, reset=True)
            )
        if uploader:
            parts.append(
                ircstyle.style(f"Uploader: {uploader}", fg="purple", bold=True, reset=True)
            )
        if duration:
            parts.append(
                ircstyle.style(
                    f"Duration: {self._format_duration(duration)}",
                    fg="green",
                    bold=True,
                    reset=True,
                )
            )
        if upload_date:
            # An 8-character date (YYYYMMDD) is shown as YYYY-MM-DD.
            if len(upload_date) == 8:
                formatted_date = f"{upload_date[:4]}-{upload_date[4:6]}-{upload_date[6:]}"
            else:
                formatted_date = upload_date
            parts.append(
                ircstyle.style(
                    f"Upload Date: {formatted_date}", fg="aqua", bold=True, reset=True
                )
            )
        if view_count is not None:
            parts.append(
                ircstyle.style(f"Views: {view_count}", fg="royal", bold=True, reset=True)
            )
        if description:
            if len(description) > 200:
                description = description[:200] + "..."
            parts.append(
                ircstyle.style(f"Description: {description}", fg="silver", reset=True)
            )
        return " | ".join(parts)

    async def _upload_file(self, path, target):
        """
        POST the file at ``path`` to hardfiles.org and announce the result.

        Args:
            path (str): The path of the file to upload.
            target (str): The channel or user to send messages to.
        """
        try:
            async with aiohttp.ClientSession(headers=HEADERS) as session:
                # Hand aiohttp the open file so the body is streamed from disk
                # instead of holding up to 500MB in memory.
                with open(path, 'rb') as fh:
                    form = aiohttp.FormData()
                    form.add_field(
                        'file',
                        fh,
                        filename=os.path.basename(path),
                        content_type='application/octet-stream',
                    )
                    async with session.post(
                        'https://hardfiles.org/', data=form, allow_redirects=False
                    ) as resp:
                        if resp.status not in (200, 201, 302, 303):
                            self._say_error(target, f"Upload failed: HTTP {resp.status}")
                            return
                        raw_response = await resp.read()
            # Decode the response safely even if non-UTF-8 bytes are present.
            response_text = raw_response.decode('utf-8', errors='replace')
            upload_url = self.extract_url_from_response(response_text) or "Unknown URL"
            upload_url = self._ensure_str(upload_url)
            self.bot.privmsg(
                target,
                ircstyle.style("Upload successful: ", fg="green", bold=True, reset=True)
                + ircstyle.style(upload_url, fg="blue", underline=True, reset=True),
            )
        except Exception as e:
            self._say_error(target, f"Error during file upload: {self._ensure_str(e)}")

    async def do_upload(self, url, target, mp3):
        """
        Download a file using yt-dlp and upload it to hardfiles.org.

        Steps: optional HEAD pre-check, metadata extraction with a size
        estimate check, download, metadata announcement, size check on the
        file on disk, upload. Each failure is reported to ``target`` and ends
        the operation.

        Args:
            url (str): The URL of the file to download.
            target (str): The channel or user to send messages to.
            mp3 (bool): Whether to convert the downloaded file to MP3.
        """
        if self._needs_header_check(url) and not await self._check_headers(url, target):
            return

        # The upload must happen inside this block: the directory and the
        # downloaded file are removed when it exits.
        with tempfile.TemporaryDirectory() as tmp_dir:
            postprocessors = []
            if mp3:
                postprocessors.append(
                    {
                        'key': 'FFmpegExtractAudio',
                        'preferredcodec': 'mp3',
                        'preferredquality': '192',
                    }
                )
            ydl_opts = {
                'outtmpl': os.path.join(tmp_dir, '%(title)s.%(ext)s'),
                'format': 'bestaudio/best' if mp3 else 'best[ext=mp4]/best',
                'restrictfilenames': True,
                'noplaylist': True,
                'quiet': True,
                'concurrent_fragment_downloads': 5,
                'postprocessors': postprocessors,
            }

            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                # First pass: metadata only, to reject oversized media early.
                try:
                    info = await asyncio.to_thread(ydl.extract_info, url, download=False)
                except DownloadError as e:
                    self._say_error(
                        target, f"Info extraction failed: {self._ensure_str(e)}"
                    )
                    return
                except UnicodeDecodeError:
                    self._say_error(
                        target, "Error: Received non-UTF-8 output during info extraction"
                    )
                    return

                estimated_size = info.get('filesize') or info.get('filesize_approx')
                if estimated_size and estimated_size > self.MAX_SIZE:
                    self._say_error(target, self._too_large_message(estimated_size))
                    return

                # Second pass: the actual download (yt-dlp is blocking).
                try:
                    info = await asyncio.to_thread(ydl.extract_info, url, download=True)
                except DownloadError as e:
                    self._say_error(target, f"Download failed: {self._ensure_str(e)}")
                    return
                except UnicodeDecodeError:
                    self._say_error(
                        target, "Error: Received non-UTF-8 output during download"
                    )
                    return

                metadata_line = self._format_metadata(info)
                if metadata_line:
                    self.bot.privmsg(target, metadata_line)

                downloaded_files = info.get('requested_downloads', [])
                if not downloaded_files:
                    self._say_error(target, "No files downloaded")
                    return

                first_file = downloaded_files[0]
                downloaded_file = first_file.get('filepath', first_file.get('filename'))
                if not downloaded_file or not os.path.exists(downloaded_file):
                    self._say_error(
                        target, f"Downloaded file not found: {downloaded_file}"
                    )
                    return

                # The estimate may have been missing or wrong; check the real size.
                file_size = os.path.getsize(downloaded_file)
                if file_size > self.MAX_SIZE:
                    self._say_error(target, self._too_large_message(file_size))
                    return

                await self._upload_file(downloaded_file, target)

    def extract_url_from_response(self, response_text):
        """
        Extract the first URL found in the response text.

        The match stops at whitespace and at ``"``, ``<`` or ``>``, so a URL
        embedded in HTML markup is returned without trailing markup.

        Args:
            response_text (str): The response text to search for URLs.

        Returns:
            Optional[str]: The first URL found in the response text, or None
            if no URL is found.
        """
        match = re.search(r'https?://[^\s"<>]+', response_text)
        return match.group(0) if match else None

    def _format_duration(self, seconds):
        """
        Convert seconds into a human-readable duration string.

        Args:
            seconds (Union[int, float]): The duration in seconds.

        Returns:
            str: ``"{h}h {m}m {s}s"`` when the duration is at least one hour,
            ``"{m}m {s}s"`` otherwise.
        """
        seconds = int(seconds)
        m, s = divmod(seconds, 60)
        h, m = divmod(m, 60)
        return f"{h}h {m}m {s}s" if h else f"{m}m {s}s"