188 lines
6.9 KiB
Python
188 lines
6.9 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
IRC Bot Plugin for Tracking User Activity with TinyDB.
|
|
|
|
This plugin tracks user join times, messages, and nickname changes. Provides
|
|
a !seen command to check last activity. Stores data in "seen.json".
|
|
|
|
Dependencies:
|
|
- irc3
|
|
- tinydb
|
|
- ircstyle
|
|
- logging
|
|
- re
|
|
"""
|
|
|
|
from datetime import datetime
|
|
from tinydb import TinyDB, Query
|
|
import irc3
|
|
from irc3.plugins.command import command
|
|
import ircstyle
|
|
import logging
|
|
import re
|
|
|
|
|
|
def truncate(text, length=50):
|
|
"""
|
|
Truncate text to specified length, appending "..." if truncated.
|
|
|
|
Args:
|
|
text (str): Input text
|
|
length (int): Maximum length
|
|
|
|
Returns:
|
|
str: Truncated text
|
|
"""
|
|
if not isinstance(text, str):
|
|
text = str(text)
|
|
return text if len(text) <= length else text[:length] + "..."
|
|
|
|
|
|
@irc3.plugin
|
|
class UserActivityTracker:
|
|
"""IRC bot plugin to track and report user activity using TinyDB."""
|
|
|
|
def __init__(self, bot):
|
|
self.bot = bot
|
|
self.db = TinyDB('seen.json')
|
|
self.users = self.db.table('users')
|
|
self.User = Query()
|
|
self.log = logging.getLogger(__name__)
|
|
|
|
@irc3.event(r'(?P<mask>\S+)!.* NICK :?(?P<new_nick>\S+)')
|
|
def handle_nick_change(self, mask, new_nick, **kwargs):
|
|
"""Track nickname changes and update the old nickname with the new one."""
|
|
self.log.debug(f"Received old_nick: {mask.nick}, new_nick: {new_nick}, mask: {mask}, kwargs: {kwargs}")
|
|
try:
|
|
now = datetime.now().isoformat()
|
|
host = mask.split('!')[1] if '!' in mask else 'unknown'
|
|
|
|
# Update the record for the old nickname
|
|
old_user = self.users.get(self.User.nick == mask.nick)
|
|
if old_user:
|
|
# Mark the nickname change in the old user's record
|
|
old_user_data = {
|
|
'last_nick_change': {'old': mask.nick, 'new': new_nick, 'time': now},
|
|
'nick': new_nick
|
|
}
|
|
self.users.update(old_user_data, self.User.nick == mask.nick)
|
|
else:
|
|
# Insert a record for the old nickname if it doesn't exist
|
|
self.users.insert({
|
|
'nick': mask.nick,
|
|
'host': host,
|
|
'last_nick_change': {'old': mask.nick, 'new': new_nick, 'time': now},
|
|
'last_join': None,
|
|
'last_message': None
|
|
})
|
|
|
|
# Update or insert the record for the new nickname
|
|
new_user = self.users.get(self.User.nick == new_nick)
|
|
if new_user:
|
|
self.users.update({'host': host}, self.User.nick == new_nick)
|
|
else:
|
|
self.users.insert({
|
|
'nick': new_nick,
|
|
'host': host,
|
|
'last_nick_change': None,
|
|
'last_join': None,
|
|
'last_message': None
|
|
})
|
|
except Exception as e:
|
|
self.log.error(f"Error tracking nick change {mask.nick}→{new_nick}: {e}")
|
|
|
|
@irc3.event(irc3.rfc.JOIN)
|
|
def handle_join(self, mask, channel, **kwargs):
|
|
"""Track when users join a channel."""
|
|
try:
|
|
nick = mask.nick
|
|
host = mask.host
|
|
now = datetime.now().isoformat()
|
|
user = self.users.get(self.User.nick == nick)
|
|
update_data = {'last_join': now, 'host': host}
|
|
|
|
if user:
|
|
self.users.update(update_data, self.User.nick == nick)
|
|
else:
|
|
self.users.insert({'nick': nick, 'host': host, 'last_join': now,
|
|
'last_message': None, 'last_nick_change': None})
|
|
except Exception as e:
|
|
self.log.error(f"Error tracking join for {nick}: {e}")
|
|
|
|
@irc3.event(irc3.rfc.PRIVMSG)
|
|
def handle_message(self, target, data, mask, **kwargs):
|
|
"""Track user messages."""
|
|
try:
|
|
now = datetime.now().isoformat()
|
|
nick = mask.nick # Get the nickname from the mask
|
|
host = mask.host # Get the host from the mask
|
|
message = {'text': data, 'time': now}
|
|
self.users.upsert(
|
|
{'nick': nick, 'host': host, 'last_message': message},
|
|
self.User.nick == nick
|
|
)
|
|
except Exception as e:
|
|
self.log.error(f"Error tracking message from {nick}: {e}")
|
|
|
|
@command(permission=None)
|
|
async def seen(self, mask, target, args):
|
|
"""Retrieve a user's last activity.
|
|
|
|
%%seen <nick>
|
|
"""
|
|
try:
|
|
requested_nick = args['<nick>']
|
|
user = self.users.get(self.User.nick == requested_nick)
|
|
|
|
if not user:
|
|
msg = ircstyle.style(f"🚫 {requested_nick} has never been observed.", fg="red")
|
|
self.bot.privmsg(target, msg)
|
|
return
|
|
|
|
response = []
|
|
header = ircstyle.style(f"📊 Activity report for {requested_nick}:", fg="blue", bold=True)
|
|
response.append(header)
|
|
|
|
# Last join
|
|
if user.get('last_join'):
|
|
join_time = datetime.fromisoformat(user['last_join']).strftime('%Y-%m-%d %H:%M:%S')
|
|
response.append(
|
|
ircstyle.style("🕒 Last join: ", fg="cyan") +
|
|
ircstyle.style(join_time, fg="white")
|
|
)
|
|
|
|
# Last message
|
|
if user.get('last_message'):
|
|
msg_time = datetime.fromisoformat(user['last_message']['time']).strftime('%Y-%m-%d %H:%M:%S')
|
|
msg_text = truncate(user['last_message']['text'])
|
|
response.append(
|
|
ircstyle.style("💬 Last message: ", fg="green") +
|
|
ircstyle.style(f"[{msg_time}] ", fg="grey") +
|
|
ircstyle.style(msg_text, fg="white", italics=True)
|
|
)
|
|
|
|
# Last nick change
|
|
if user.get('last_nick_change'):
|
|
old_nick = user['last_nick_change']['old']
|
|
new_nick = user['last_nick_change']['new']
|
|
change_time = datetime.fromisoformat(user['last_nick_change']['time']).strftime('%Y-%m-%d %H:%M:%S')
|
|
response.append(
|
|
ircstyle.style(f"📛 Nickname change: ", fg="purple") +
|
|
ircstyle.style(f"Changed from {old_nick} to {new_nick} at {change_time}", fg="white")
|
|
)
|
|
|
|
# Handle no tracked activity
|
|
if len(response) == 1: # Only header present
|
|
response.append(ircstyle.style("No tracked activities.", fg="yellow"))
|
|
|
|
for line in response:
|
|
self.bot.privmsg(target, line)
|
|
|
|
except Exception as e:
|
|
self.log.error(f"Error in !seen command: {e}")
|
|
self.bot.privmsg(
|
|
target,
|
|
ircstyle.style("❌ Internal error processing request.", fg="red", bold=True)
|
|
)
|