""" GoatPlugin.py A plugin for the irc3 IRC bot framework that reads the contents of 'goat.txt' and sends each line to a specified channel at regular intervals. If a task is already running for the target channel, it notifies the user and prevents starting a new task. Usage: %%goat [] %%goatstop Commands: %%goat [] Starts sending the contents of 'goat.txt' line by line to the target channel. Optionally, specify a nickname to prepend to each message. %%goatstop Stops the ongoing goat task for the target channel. Author: [Your Name] Date Created: [Creation Date] Last Modified: [Last Modification Date] License: [License Information] """ import asyncio import irc3 from irc3.plugins.command import command @irc3.plugin class GoatPlugin: """ A plugin to send the contents of goat.txt line by line to a channel. This plugin reads the contents of 'goat.txt' and sends each line to the target channel at a regular interval. If a task is already running for the target channel, it will notify the user and prevent starting a new task. Attributes: bot (irc3.IrcBot): The IRC bot instance. goat_tasks (dict): A dictionary to keep track of running tasks for each target channel. """ def __init__(self, bot): """ Initialize the plugin with the bot reference. Args: bot (irc3.IrcBot): The IRC bot instance. """ self.bot = bot self.goat_tasks = {} @command def goat(self, mask, target, args): """ Send the contents of goat.txt line by line to the channel and resend when reaching the end. Args: mask (str): The user mask. target (str): The target channel or user. args (dict): Command arguments. Usage: %%goat [] """ nick = args.get("") if target in self.goat_tasks: self.bot.privmsg(target, "A goat task is already running.") return try: with open('goat.txt', 'r', encoding='utf-8') as file: lines = file.readlines() except Exception as e: self.bot.privmsg(target, f"Error reading goat.txt: {e}") return task = self.bot.loop.create_task(self.send_lines(target, nick, lines)) self.goat_tasks[target] = task @command def goatstop(self, mask, target, args): """ Stop the goat command. Args: mask (str): The user mask. target (str): The target channel or user. args (dict): Command arguments. Usage: %%goatstop """ if target in self.goat_tasks: task = self.goat_tasks.pop(target) task.cancel() self.bot.privmsg(target, "Goat task stopped.") else: self.bot.privmsg(target, "No goat task is currently running.") async def send_lines(self, target, nick, lines): """ Send lines of text to a target channel or user periodically. Args: target (str): The target channel or user. nick (str): Optional nickname to prepend to each message. lines (list): List of lines to send. """ message_count = 0 try: while True: for line in lines: stripped_line = line.strip() msg = f"{nick} : {stripped_line}" if nick else stripped_line self.bot.privmsg(target, msg) message_count += 1 await asyncio.sleep(0.007) except asyncio.CancelledError: self.bot.privmsg(target, "Goat task cancelled.") raise