This commit is contained in:
Zodiac 2025-02-12 21:31:42 -08:00
parent c8025e04eb
commit 5c7b0e987b

View File

@ -1,3 +1,25 @@
"""
IRC Bot Plugin for Uploading Files to hardfiles.org
This plugin allows users to upload files to hardfiles.org using yt-dlp for downloads.
It supports downloading files from various sources (YouTube, Instagram, etc.) and can
optionally convert videos to MP3 format before uploading. Files larger than 100MB are rejected.
Usage:
!upload [--mp3] <url>
Dependencies:
- aiohttp
- aiofiles
- irc3
- yt-dlp
- ircstyle
Author: Your Name
Version: 1.1
Date: 2025-02-12
"""
import aiohttp import aiohttp
import aiofiles import aiofiles
import irc3 import irc3
@ -11,9 +33,12 @@ import yt_dlp
from yt_dlp.utils import DownloadError from yt_dlp.utils import DownloadError
from urllib.parse import urlparse from urllib.parse import urlparse
@irc3.plugin @irc3.plugin
class UploadPlugin: class UploadPlugin:
"""IRC bot plugin for uploading files to hardfiles.org using yt-dlp for downloads.""" """
IRC bot plugin for downloading files via yt-dlp and uploading them to hardfiles.org.
"""
def __init__(self, bot): def __init__(self, bot):
self.bot = bot self.bot = bot
@ -23,10 +48,17 @@ class UploadPlugin:
""" """
Upload a file to hardfiles.org (Max 100MB). Upload a file to hardfiles.org (Max 100MB).
Args:
mask: The user mask (nickname@host) of the command issuer.
target: The channel or user where the command was issued.
args: Parsed command arguments.
Usage:
%%upload [--mp3] <url> %%upload [--mp3] <url>
""" """
url = args.get('<url>') url = args.get('<url>')
mp3 = args.get('--mp3') mp3 = args.get('--mp3')
if not url: if not url:
self.bot.privmsg( self.bot.privmsg(
target, target,
@ -35,7 +67,6 @@ class UploadPlugin:
return return
try: try:
# Directly await the upload task.
await self.do_upload(url, target, mp3) await self.do_upload(url, target, mp3)
except Exception as exc: except Exception as exc:
self.bot.privmsg( self.bot.privmsg(
@ -44,38 +75,51 @@ class UploadPlugin:
) )
async def do_upload(self, url, target, mp3): async def do_upload(self, url, target, mp3):
"""Download a file using yt-dlp and upload it.""" """
Download a file using yt-dlp and upload it to hardfiles.org.
Handles binary data properly to avoid UTF-8 decoding errors.
"""
max_size = 100 * 1024 * 1024 # 100MB limit max_size = 100 * 1024 * 1024 # 100MB limit
# Use a temporary directory context manager so cleanup is automatic. # Use a temporary directory for downloads (auto-cleanup)
with tempfile.TemporaryDirectory() as tmp_dir: with tempfile.TemporaryDirectory() as tmp_dir:
# Parse URL and determine if a header check is needed. # Determine whether to check headers for file size.
parsed_url = urlparse(url) parsed_url = urlparse(url)
domain = parsed_url.netloc.lower() domain = parsed_url.netloc.lower()
skip_check_domains = ("x.com", "instagram.com", "youtube.com", "youtu.be", "streamable.com") skip_check_domains = ("x.com", "instagram.com", "youtube.com", "youtu.be", "streamable.com")
should_check_headers = not any(domain.endswith(d) for d in skip_check_domains) should_check_headers = not any(domain.endswith(d) for d in skip_check_domains)
if should_check_headers: if should_check_headers:
async with aiohttp.ClientSession() as session: try:
async with session.head(url) as response: async with aiohttp.ClientSession() as session:
if response.status != 200: async with session.head(url) as response:
self.bot.privmsg( if response.status != 200:
target, self.bot.privmsg(
ircstyle.style(f"Failed to fetch headers: HTTP {response.status}", fg="red", bold=True, reset=True) target,
) ircstyle.style(
return f"Failed to fetch headers: HTTP {response.status}",
content_length = response.headers.get('Content-Length') fg="red", bold=True, reset=True
if content_length and int(content_length) > max_size: )
self.bot.privmsg(
target,
ircstyle.style(
f"File size ({int(content_length) // (1024 * 1024)}MB) exceeds 100MB limit",
fg="red", bold=True, reset=True
) )
) return
return content_length = response.headers.get('Content-Length')
if content_length and int(content_length) > max_size:
self.bot.privmsg(
target,
ircstyle.style(
f"File size ({int(content_length) // (1024 * 1024)}MB) exceeds 100MB limit",
fg="red", bold=True, reset=True
)
)
return
except Exception as e:
self.bot.privmsg(
target,
ircstyle.style(f"Error during header check: {e}", fg="red", bold=True, reset=True)
)
return
# Set up yt-dlp options. # Configure yt-dlp options.
ydl_opts = { ydl_opts = {
'outtmpl': os.path.join(tmp_dir, '%(title)s.%(ext)s'), 'outtmpl': os.path.join(tmp_dir, '%(title)s.%(ext)s'),
'format': 'bestaudio/best' if mp3 else 'best[ext=mp4]/best', 'format': 'bestaudio/best' if mp3 else 'best[ext=mp4]/best',
@ -90,7 +134,7 @@ class UploadPlugin:
}] if mp3 else [], }] if mp3 else [],
} }
# Use yt_dlp to extract information (first without downloading). # Extract info without downloading first.
with yt_dlp.YoutubeDL(ydl_opts) as ydl: with yt_dlp.YoutubeDL(ydl_opts) as ydl:
try: try:
info = await asyncio.to_thread(ydl.extract_info, url, download=False) info = await asyncio.to_thread(ydl.extract_info, url, download=False)
@ -100,8 +144,14 @@ class UploadPlugin:
ircstyle.style(f"Info extraction failed: {e}", fg="red", bold=True, reset=True) ircstyle.style(f"Info extraction failed: {e}", fg="red", bold=True, reset=True)
) )
return return
except UnicodeDecodeError:
self.bot.privmsg(
target,
ircstyle.style("Error: Received non-UTF-8 output during info extraction", fg="red", bold=True, reset=True)
)
return
# Check the estimated file size if available. # Check estimated file size if available.
estimated_size = info.get('filesize') or info.get('filesize_approx') estimated_size = info.get('filesize') or info.get('filesize_approx')
if estimated_size and estimated_size > max_size: if estimated_size and estimated_size > max_size:
self.bot.privmsg( self.bot.privmsg(
@ -113,7 +163,7 @@ class UploadPlugin:
) )
return return
# Proceed with the download. # Download the file.
try: try:
info = await asyncio.to_thread(ydl.extract_info, url, download=True) info = await asyncio.to_thread(ydl.extract_info, url, download=True)
except DownloadError as e: except DownloadError as e:
@ -122,6 +172,12 @@ class UploadPlugin:
ircstyle.style(f"Download failed: {e}", fg="red", bold=True, reset=True) ircstyle.style(f"Download failed: {e}", fg="red", bold=True, reset=True)
) )
return return
except UnicodeDecodeError:
self.bot.privmsg(
target,
ircstyle.style("Error: Received non-UTF-8 output during download", fg="red", bold=True, reset=True)
)
return
# Prepare and send metadata (if available). # Prepare and send metadata (if available).
metadata_parts = [] metadata_parts = []
@ -139,7 +195,6 @@ class UploadPlugin:
if duration: if duration:
metadata_parts.append(ircstyle.style(f"Duration: {self._format_duration(duration)}", fg="green", bold=True, reset=True)) metadata_parts.append(ircstyle.style(f"Duration: {self._format_duration(duration)}", fg="green", bold=True, reset=True))
if upload_date: if upload_date:
# Format date as YYYY-MM-DD if possible.
formatted_date = f"{upload_date[:4]}-{upload_date[4:6]}-{upload_date[6:]}" if len(upload_date) == 8 else upload_date formatted_date = f"{upload_date[:4]}-{upload_date[4:6]}-{upload_date[6:]}" if len(upload_date) == 8 else upload_date
metadata_parts.append(ircstyle.style(f"Upload Date: {formatted_date}", fg="aqua", bold=True, reset=True)) metadata_parts.append(ircstyle.style(f"Upload Date: {formatted_date}", fg="aqua", bold=True, reset=True))
if view_count is not None: if view_count is not None:
@ -151,7 +206,7 @@ class UploadPlugin:
if metadata_parts: if metadata_parts:
self.bot.privmsg(target, " | ".join(metadata_parts)) self.bot.privmsg(target, " | ".join(metadata_parts))
# Verify that a file was downloaded. # Retrieve the downloaded file path.
downloaded_files = info.get('requested_downloads', []) downloaded_files = info.get('requested_downloads', [])
if not downloaded_files: if not downloaded_files:
self.bot.privmsg( self.bot.privmsg(
@ -160,7 +215,6 @@ class UploadPlugin:
) )
return return
# Retrieve the file path.
first_file = downloaded_files[0] first_file = downloaded_files[0]
downloaded_file = first_file.get('filepath', first_file.get('filename')) downloaded_file = first_file.get('filepath', first_file.get('filename'))
if not downloaded_file or not os.path.exists(downloaded_file): if not downloaded_file or not os.path.exists(downloaded_file):
@ -170,7 +224,7 @@ class UploadPlugin:
) )
return return
# Check the actual file size as an extra safeguard. # Extra safeguard: verify actual file size.
file_size = os.path.getsize(downloaded_file) file_size = os.path.getsize(downloaded_file)
if file_size > max_size: if file_size > max_size:
self.bot.privmsg( self.bot.privmsg(
@ -183,33 +237,52 @@ class UploadPlugin:
return return
# Upload the file to hardfiles.org. # Upload the file to hardfiles.org.
async with aiohttp.ClientSession() as session: try:
form = aiohttp.FormData() async with aiohttp.ClientSession() as session:
async with aiofiles.open(downloaded_file, 'rb') as f: form = aiohttp.FormData()
file_content = await f.read() async with aiofiles.open(downloaded_file, 'rb') as f:
form.add_field('file', file_content, filename=os.path.basename(downloaded_file)) file_content = await f.read()
async with session.post('https://hardfiles.org/', data=form, allow_redirects=False) as resp: # Ensure binary data is handled correctly.
if resp.status not in [200, 201, 302, 303]: form.add_field(
self.bot.privmsg( 'file',
target, file_content,
ircstyle.style(f"Upload failed: HTTP {resp.status}", fg="red", bold=True, reset=True) filename=os.path.basename(downloaded_file),
) content_type='application/octet-stream'
else: )
response_text = await resp.text() async with session.post('https://hardfiles.org/', data=form, allow_redirects=False) as resp:
if resp.status not in [200, 201, 302, 303]:
self.bot.privmsg(
target,
ircstyle.style(f"Upload failed: HTTP {resp.status}", fg="red", bold=True, reset=True)
)
return
# Read raw response bytes and decode with error replacement to avoid decode issues.
raw_response = await resp.read()
response_text = raw_response.decode('utf-8', errors='replace')
upload_url = self.extract_url_from_response(response_text) or "Unknown URL" upload_url = self.extract_url_from_response(response_text) or "Unknown URL"
response_msg = ( response_msg = (
ircstyle.style("Upload successful: ", fg="green", bold=True, reset=True) + ircstyle.style("Upload successful: ", fg="green", bold=True, reset=True) +
ircstyle.style(upload_url, fg="blue", underline=True, reset=True) ircstyle.style(upload_url, fg="blue", underline=True, reset=True)
) )
self.bot.privmsg(target, response_msg) self.bot.privmsg(target, response_msg)
except Exception as e:
self.bot.privmsg(
target,
ircstyle.style(f"Error during file upload: {e}", fg="red", bold=True, reset=True)
)
return
def extract_url_from_response(self, response_text): def extract_url_from_response(self, response_text):
"""Extract the first URL found in the response text.""" """
Extract the first URL found in the response text.
"""
match = re.search(r'https?://\S+', response_text) match = re.search(r'https?://\S+', response_text)
return match.group(0) if match else None return match.group(0) if match else None
def _format_duration(self, seconds): def _format_duration(self, seconds):
"""Convert seconds into a human-readable Hh Mm Ss format.""" """
Convert seconds into a human-readable duration string.
"""
seconds = int(seconds) seconds = int(seconds)
m, s = divmod(seconds, 60) m, s = divmod(seconds, 60)
h, m = divmod(m, 60) h, m = divmod(m, 60)