g1mp/plugins/upload.py

415 lines
16 KiB
Python
Raw Normal View History

2025-02-13 06:35:15 +00:00
# -*- coding: utf-8 -*-
2025-02-13 05:31:42 +00:00
"""
IRC Bot Plugin for Uploading Files to hardfiles.org

This plugin allows users to upload files to hardfiles.org using yt-dlp for downloads.
It supports downloading files from various sources (YouTube, Instagram, etc.) and can
optionally convert videos to MP3 format before uploading. Files larger than 100MB are rejected.

Usage:
    !upload [--mp3] <url>

Dependencies:
    - aiohttp
    - aiofiles
    - irc3
    - yt-dlp
    - ircstyle

Author: Zodiac
Version: 1.2
Date: 2025-02-12
"""
2025-02-13 04:55:42 +00:00
import aiohttp
import aiofiles
import irc3
import tempfile
import os
import re
import asyncio
from irc3.plugins.command import command
import ircstyle
import yt_dlp
from yt_dlp.utils import DownloadError
from urllib.parse import urlparse
2025-02-13 11:33:31 +00:00
# Browser-like request headers handed to the aiohttp client sessions this
# plugin opens, so remote hosts treat the bot like a regular browser
# (ban evasion).
HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/120.0.0.0 Safari/537.36"
    ),
    "Accept": (
        "text/html,application/xhtml+xml,application/xml;q=0.9,"
        "image/webp,image/apng,*/*;q=0.8"
    ),
    "Accept-Language": "en-US,en;q=0.9",
    "Referer": "https://www.google.com/",
    "Connection": "keep-alive",
    "DNT": "1",
    "Upgrade-Insecure-Requests": "1",
}
2025-02-13 05:31:42 +00:00
2025-02-13 04:55:42 +00:00
@irc3.plugin
class UploadPlugin:
2025-02-13 06:35:15 +00:00
"""IRC bot plugin for downloading files via yt-dlp and uploading them to hardfiles.org."""
2025-02-13 04:55:42 +00:00
def __init__(self, bot):
    """
    Create the plugin and remember the bot it is attached to.

    Args:
        bot (irc3.IrcBot): The bot instance; kept on ``self.bot`` and used
            by the other methods to send messages.
    """
    # The bot handle is the only state this plugin stores.
    self.bot = bot
2025-02-13 05:33:15 +00:00
def _ensure_str(self, value):
    """
    Coerce an arbitrary value into text that is safe to embed in a message.

    Args:
        value (Union[str, bytes, None]): The object to convert.

    Returns:
        str: An empty string for ``None``; the UTF-8 decoding for ``bytes``
        (undecodable bytes are replaced rather than raising); ``str(value)``
        for anything else.
    """
    if value is None:
        return ''
    if not isinstance(value, bytes):
        return str(value)
    # errors='replace' keeps malformed input from raising UnicodeDecodeError.
    return value.decode('utf-8', errors='replace')
2025-02-13 04:55:42 +00:00
@command
async def upload(self, mask, target, args):
    """
    Upload a file to hardfiles.org (Max 100MB).

    Args:
        mask (str): The user mask (nickname@host) of the command issuer.
        target (str): The channel or user where the command was issued.
        args (dict): Parsed command arguments.

    Usage:
        %%upload [--mp3] <url>
    """
    # NOTE(review): irc3's command plugin reads this docstring (in particular
    # the ``%%upload`` line) to parse arguments and build help output, so the
    # docstring text is deliberately left as-is.
    url, mp3 = args.get('<url>'), args.get('--mp3')

    # Guard clause: without a URL there is nothing to do but show the usage.
    if not url:
        usage = ircstyle.style(
            "Usage: !upload [--mp3] <url>", fg="red", bold=True, reset=True
        )
        self.bot.privmsg(target, usage)
        return

    try:
        await self.do_upload(url, target, mp3)
    except Exception as err:
        # Last-resort boundary: report anything do_upload did not handle
        # itself to the requester, as a safe Unicode string.
        failure = self._ensure_str(err)
        self.bot.privmsg(
            target,
            ircstyle.style(
                f"Upload task error: {failure}", fg="red", bold=True, reset=True
            ),
        )
async def do_upload(self, url, target, mp3):
    """
    Download a file using yt-dlp and upload it to hardfiles.org.

    Steps, all performed inside a temporary directory that is removed when
    the method returns:

    1. Unless the URL's host is one of the known media sites, send a HEAD
       request and reject non-200 responses or a Content-Length over 100MB.
    2. Ask yt-dlp for metadata only and reject if the reported (or
       approximate) file size is over 100MB.
    3. Download (and, with ``mp3``, extract audio as MP3 via the
       FFmpegExtractAudio post-processor).
    4. Announce the media metadata on IRC.
    5. Re-check the size on disk, then POST the file to hardfiles.org and
       announce the resulting URL.

    Every failure along the way is reported to ``target`` as a red, bold
    message and ends the method early. Exceptions that are not handled here
    propagate to the caller.

    Args:
        url (str): The URL of the file to download.
        target (str): The channel or user to send messages to.
        mp3 (bool): Whether to convert the downloaded file to MP3.
    """
    # Function-scope import: only ``urlparse`` is imported at file level.
    from urllib.parse import urljoin

    max_size = 100 * 1024 * 1024  # 100MB limit
    upload_endpoint = 'https://hardfiles.org/'

    def report_error(text):
        # All failure notices share the same red/bold styling.
        self.bot.privmsg(target, ircstyle.style(text, fg="red", bold=True, reset=True))

    def too_large(size):
        # int() so a float size estimate is not rendered as e.g. "150.0MB".
        return f"File size ({int(size) // (1024 * 1024)}MB) exceeds 100MB limit"

    with tempfile.TemporaryDirectory() as tmp_dir:
        parsed_url = urlparse(url)
        # ``hostname`` drops any port/userinfo that ``netloc`` would include.
        host = (parsed_url.hostname or "").lower()
        skip_check_domains = (
            "x.com",
            "instagram.com",
            "youtube.com",
            "youtu.be",
            "streamable.com",
            "reddit.com",
            "twitter.com",
            "tiktok.com",
            "facebook.com",
            "dailymotion.com",
        )
        # Match the domain itself or a true subdomain. A bare suffix test
        # would also match unrelated hosts such as "netflix.com" for "x.com".
        should_check_headers = not any(
            host == d or host.endswith("." + d) for d in skip_check_domains
        )

        if should_check_headers:
            try:
                async with aiohttp.ClientSession(headers=HEADERS) as session:
                    # Follow redirects explicitly so that a redirecting URL is
                    # judged by its final response rather than rejected as 30x.
                    async with session.head(url, allow_redirects=True) as response:
                        if response.status != 200:
                            report_error(f"Failed to fetch headers: HTTP {response.status}")
                            return
                        content_length = response.headers.get('Content-Length')
                        if content_length and int(content_length) > max_size:
                            report_error(too_large(int(content_length)))
                            return
            except Exception as e:
                report_error(f"Error during header check: {self._ensure_str(e)}")
                return

        ydl_opts = {
            'outtmpl': os.path.join(tmp_dir, '%(title)s.%(ext)s'),
            'format': 'bestaudio/best' if mp3 else 'best[ext=mp4]/best',
            'restrictfilenames': True,
            'noplaylist': True,
            'quiet': True,
            'concurrent_fragment_downloads': 5,
            'postprocessors': [
                {
                    'key': 'FFmpegExtractAudio',
                    'preferredcodec': 'mp3',
                    'preferredquality': '192',
                }
            ]
            if mp3
            else [],
        }

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            # First pass: metadata only, so oversized media can be rejected
            # before anything is downloaded. extract_info is blocking, hence
            # the worker thread.
            try:
                info = await asyncio.to_thread(ydl.extract_info, url, download=False)
            except DownloadError as e:
                report_error(f"Info extraction failed: {self._ensure_str(e)}")
                return
            except UnicodeDecodeError:
                report_error("Error: Received non-UTF-8 output during info extraction")
                return

            estimated_size = info.get('filesize') or info.get('filesize_approx')
            if estimated_size and estimated_size > max_size:
                report_error(too_large(estimated_size))
                return

            # Second pass: the actual download (plus post-processing).
            try:
                info = await asyncio.to_thread(ydl.extract_info, url, download=True)
            except DownloadError as e:
                report_error(f"Download failed: {self._ensure_str(e)}")
                return
            except UnicodeDecodeError:
                report_error("Error: Received non-UTF-8 output during download")
                return

        # Safely convert metadata to strings.
        metadata_parts = []
        title = self._ensure_str(info.get("title"))
        uploader = self._ensure_str(info.get("uploader"))
        duration = info.get("duration")
        upload_date = self._ensure_str(info.get("upload_date"))
        view_count = info.get("view_count")
        description = self._ensure_str(info.get("description"))

        if title:
            metadata_parts.append(
                ircstyle.style(f"Title: {title}", fg="yellow", bold=True, reset=True)
            )
        if uploader:
            metadata_parts.append(
                ircstyle.style(f"Uploader: {uploader}", fg="purple", bold=True, reset=True)
            )
        if duration:
            metadata_parts.append(
                ircstyle.style(
                    f"Duration: {self._format_duration(duration)}",
                    fg="green",
                    bold=True,
                    reset=True,
                )
            )
        if upload_date:
            # An 8-character date (YYYYMMDD) is shown as YYYY-MM-DD; anything
            # else is shown unchanged.
            formatted_date = (
                f"{upload_date[:4]}-{upload_date[4:6]}-{upload_date[6:]}"
                if len(upload_date) == 8
                else upload_date
            )
            metadata_parts.append(
                ircstyle.style(
                    f"Upload Date: {formatted_date}", fg="aqua", bold=True, reset=True
                )
            )
        if view_count is not None:
            metadata_parts.append(
                ircstyle.style(f"Views: {view_count}", fg="royal", bold=True, reset=True)
            )
        if description:
            if len(description) > 200:
                description = description[:200] + "..."
            metadata_parts.append(
                ircstyle.style(f"Description: {description}", fg="silver", reset=True)
            )
        if metadata_parts:
            self.bot.privmsg(target, " | ".join(metadata_parts))

        downloaded_files = info.get('requested_downloads', [])
        if not downloaded_files:
            report_error("No files downloaded")
            return

        first_file = downloaded_files[0]
        downloaded_file = first_file.get('filepath', first_file.get('filename'))
        if not downloaded_file or not os.path.exists(downloaded_file):
            report_error(f"Downloaded file not found: {downloaded_file}")
            return

        # The size estimate may have been missing or wrong, so enforce the
        # limit again on the file actually written to disk.
        file_size = os.path.getsize(downloaded_file)
        if file_size > max_size:
            report_error(too_large(file_size))
            return

        try:
            async with aiohttp.ClientSession(headers=HEADERS) as session:
                form = aiohttp.FormData()
                async with aiofiles.open(downloaded_file, 'rb') as f:
                    file_content = await f.read()
                form.add_field(
                    'file',
                    file_content,
                    filename=os.path.basename(downloaded_file),
                    content_type='application/octet-stream',
                )
                async with session.post(
                    upload_endpoint, data=form, allow_redirects=False
                ) as resp:
                    if resp.status not in (200, 201, 302, 303):
                        report_error(f"Upload failed: HTTP {resp.status}")
                        return
                    raw_response = await resp.read()
                    # Decode the response safely even if non-UTF-8 bytes are present.
                    response_text = raw_response.decode('utf-8', errors='replace')
                    upload_url = self.extract_url_from_response(response_text)
                    if not upload_url:
                        # Redirect responses are accepted as success above and
                        # redirects are not followed, so the Location header is
                        # the fallback when the body has no URL. It may be
                        # relative, so resolve it against the endpoint.
                        location = resp.headers.get('Location')
                        if location:
                            upload_url = urljoin(upload_endpoint, location)
                    upload_url = self._ensure_str(upload_url or "Unknown URL")
                    response_msg = (
                        ircstyle.style("Upload successful: ", fg="green", bold=True, reset=True)
                        + ircstyle.style(upload_url, fg="blue", underline=True, reset=True)
                    )
                    self.bot.privmsg(target, response_msg)
        except Exception as e:
            report_error(f"Error during file upload: {self._ensure_str(e)}")
2025-02-13 04:55:42 +00:00
def extract_url_from_response(self, response_text):
    """
    Find the first http(s) URL in a block of text.

    A URL is taken to be ``http://`` or ``https://`` followed by the longest
    run of non-whitespace characters.

    Args:
        response_text (str): The text to scan.

    Returns:
        str: The first URL encountered, or None when the text contains none.
    """
    found = re.search(r'https?://\S+', response_text)
    if found is None:
        return None
    return found.group(0)
def _format_duration(self, seconds):
    """
    Render a duration given in seconds as ``"Hh Mm Ss"`` or ``"Mm Ss"``.

    Args:
        seconds (int): The duration in seconds; the value is passed through
            ``int()`` first, so fractional input is truncated.

    Returns:
        str: ``"{h}h {m}m {s}s"`` when the hour component is non-zero,
        otherwise ``"{m}m {s}s"``.
    """
    total = int(seconds)
    hours, remainder = divmod(total, 3600)
    minutes, secs = divmod(remainder, 60)
    if hours:
        return f"{hours}h {minutes}m {secs}s"
    return f"{minutes}m {secs}s"