updates to seen

This commit is contained in:
Zodiac 2025-02-18 23:18:04 -08:00
parent 0dc2758ff1
commit a64e9554c6

View File

@ -20,25 +20,13 @@ import irc3
from irc3.plugins.command import command from irc3.plugins.command import command
import ircstyle import ircstyle
import logging import logging
import re
def truncate(text, length=50): def truncate(text, length=50):
""" """Truncate text to specified length, appending "..." if truncated."""
Truncate text to specified length, appending "..." if truncated.
Args:
text (str): Input text
length (int): Maximum length
Returns:
str: Truncated text
"""
if not isinstance(text, str): if not isinstance(text, str):
text = str(text) text = str(text)
return text if len(text) <= length else text[:length] + "..." return text if len(text) <= length else text[:length] + "..."
@irc3.plugin @irc3.plugin
class UserActivityTracker: class UserActivityTracker:
"""IRC bot plugin to track and report user activity using TinyDB.""" """IRC bot plugin to track and report user activity using TinyDB."""
@ -50,45 +38,40 @@ class UserActivityTracker:
self.User = Query() self.User = Query()
self.log = logging.getLogger(__name__) self.log = logging.getLogger(__name__)
def _add_unique_hostmask(self, nick, hostmask):
"""Add hostmask to user data if it's not already present and not 'unknown'."""
if hostmask == 'unknown':
return []
user = self.users.get(self.User.nick == nick.lower())
if user and 'hostmasks' in user:
if hostmask not in user['hostmasks']:
user['hostmasks'].append(hostmask)
return user['hostmasks']
else:
return [hostmask]
@irc3.event(r'(?P<mask>\S+)!.* NICK :?(?P<new_nick>\S+)') @irc3.event(r'(?P<mask>\S+)!.* NICK :?(?P<new_nick>\S+)')
def handle_nick_change(self, mask, new_nick, **kwargs): def handle_nick_change(self, mask, new_nick, **kwargs):
"""Track nickname changes and update the old nickname with the new one.""" """Track nickname changes."""
self.log.debug(f"Received old_nick: {mask.nick}, new_nick: {new_nick}, mask: {mask}, kwargs: {kwargs}")
try: try:
now = datetime.now().isoformat() now = datetime.now().isoformat()
host = mask.split('!')[1] if '!' in mask else 'unknown' host = mask.split('!')[1] if '!' in mask else 'unknown'
# Update the record for the old nickname self.users.upsert({
old_user = self.users.get(self.User.nick == mask.nick) 'nick': mask.nick.lower(),
if old_user: 'hostmasks': self._add_unique_hostmask(mask.nick.lower(), host),
# Mark the nickname change in the old user's record 'last_nick_change': {'old': mask.nick, 'new': new_nick, 'time': now}
old_user_data = { }, self.User.nick == mask.nick.lower())
'last_nick_change': {'old': mask.nick, 'new': new_nick, 'time': now},
'nick': new_nick
}
self.users.update(old_user_data, self.User.nick == mask.nick)
else:
# Insert a record for the old nickname if it doesn't exist
self.users.insert({
'nick': mask.nick,
'host': host,
'last_nick_change': {'old': mask.nick, 'new': new_nick, 'time': now},
'last_join': None,
'last_message': None
})
# Update or insert the record for the new nickname self.users.upsert({
new_user = self.users.get(self.User.nick == new_nick) 'nick': new_nick.lower(),
if new_user: 'hostmasks': self._add_unique_hostmask(new_nick.lower(), host),
self.users.update({'host': host}, self.User.nick == new_nick)
else:
self.users.insert({
'nick': new_nick,
'host': host,
'last_nick_change': None,
'last_join': None, 'last_join': None,
'last_message': None 'last_message': None,
}) 'last_nick_change': {'old': mask.nick, 'new': new_nick, 'time': now}
}, self.User.nick == new_nick.lower())
except Exception as e: except Exception as e:
self.log.error(f"Error tracking nick change {mask.nick}{new_nick}: {e}") self.log.error(f"Error tracking nick change {mask.nick}{new_nick}: {e}")
@ -96,43 +79,38 @@ class UserActivityTracker:
def handle_join(self, mask, channel, **kwargs): def handle_join(self, mask, channel, **kwargs):
"""Track when users join a channel.""" """Track when users join a channel."""
try: try:
nick = mask.nick
host = mask.host
now = datetime.now().isoformat() now = datetime.now().isoformat()
user = self.users.get(self.User.nick == nick) self.users.upsert({
update_data = {'last_join': now, 'host': host} 'nick': mask.nick.lower(),
'hostmasks': self._add_unique_hostmask(mask.nick.lower(), mask.host),
if user: 'last_join': now,
self.users.update(update_data, self.User.nick == nick) 'last_message': None,
else: 'last_nick_change': None
self.users.insert({'nick': nick, 'host': host, 'last_join': now, }, self.User.nick == mask.nick.lower())
'last_message': None, 'last_nick_change': None})
except Exception as e: except Exception as e:
self.log.error(f"Error tracking join for {nick}: {e}") self.log.error(f"Error tracking join for {mask.nick}: {e}")
@irc3.event(irc3.rfc.PRIVMSG) @irc3.event(irc3.rfc.PRIVMSG)
def handle_message(self, target, data, mask, **kwargs): def handle_message(self, target, data, mask, **kwargs):
"""Track user messages.""" """Track user messages."""
try: try:
now = datetime.now().isoformat() now = datetime.now().isoformat()
nick = mask.nick # Get the nickname from the mask self.users.upsert({
host = mask.host # Get the host from the mask 'nick': mask.nick.lower(),
message = {'text': data, 'time': now} 'hostmasks': self._add_unique_hostmask(mask.nick.lower(), mask.host),
self.users.upsert( 'last_message': {'text': data, 'time': now}
{'nick': nick, 'host': host, 'last_message': message}, }, self.User.nick == mask.nick.lower())
self.User.nick == nick
)
except Exception as e: except Exception as e:
self.log.error(f"Error tracking message from {nick}: {e}") self.log.error(f"Error tracking message from {mask.nick}: {e}")
@command(permission=None) @command
async def seen(self, mask, target, args): async def seen(self, mask, target, args):
"""Retrieve a user's last activity. """Retrieve a user's last activity.
%%seen <nick> %%seen <nick>
""" """
try: try:
requested_nick = args['<nick>'] requested_nick = args['<nick>'].lower()
user = self.users.get(self.User.nick == requested_nick) user = self.users.get(self.User.nick == requested_nick)
if not user: if not user:
@ -144,16 +122,25 @@ class UserActivityTracker:
header = ircstyle.style(f"📊 Activity report for {requested_nick}:", fg="blue", bold=True) header = ircstyle.style(f"📊 Activity report for {requested_nick}:", fg="blue", bold=True)
response.append(header) response.append(header)
# Last join # Hostmasks
if user.get('last_join'): if 'hostmasks' in user and user['hostmasks']:
join_time = datetime.fromisoformat(user['last_join']).strftime('%Y-%m-%d %H:%M:%S') hostmasks_str = ", ".join(user['hostmasks'])
response.append( response.append(
ircstyle.style("🕒 Last join: ", fg="cyan") + ircstyle.style("🌐 Hostmasks: ", fg="cyan") +
ircstyle.style(join_time, fg="white") ircstyle.style(hostmasks_str, fg="white")
) )
# Last join - only if time exists
if 'last_join' in user and user['last_join']:
try:
join_time = datetime.fromisoformat(user['last_join']).strftime('%Y-%m-%d %H:%M:%S')
response.append(ircstyle.style("🕒 Last join: ", fg="cyan") + ircstyle.style(join_time, fg="white"))
except ValueError:
response.append(ircstyle.style("🕒 Last join: ", fg="cyan") + ircstyle.style("Invalid Time Format", fg="white"))
# Last message # Last message
if user.get('last_message'): if 'last_message' in user and user['last_message']:
try:
msg_time = datetime.fromisoformat(user['last_message']['time']).strftime('%Y-%m-%d %H:%M:%S') msg_time = datetime.fromisoformat(user['last_message']['time']).strftime('%Y-%m-%d %H:%M:%S')
msg_text = truncate(user['last_message']['text']) msg_text = truncate(user['last_message']['text'])
response.append( response.append(
@ -161,18 +148,22 @@ class UserActivityTracker:
ircstyle.style(f"[{msg_time}] ", fg="grey") + ircstyle.style(f"[{msg_time}] ", fg="grey") +
ircstyle.style(msg_text, fg="white", italics=True) ircstyle.style(msg_text, fg="white", italics=True)
) )
except (ValueError, KeyError):
response.append(ircstyle.style("💬 Last message: ", fg="green") + ircstyle.style("Invalid or Missing Data", fg="white"))
# Last nick change # Last nick change
if user.get('last_nick_change'): if 'last_nick_change' in user and user['last_nick_change']:
try:
change_time = datetime.fromisoformat(user['last_nick_change']['time']).strftime('%Y-%m-%d %H:%M:%S')
old_nick = user['last_nick_change']['old'] old_nick = user['last_nick_change']['old']
new_nick = user['last_nick_change']['new'] new_nick = user['last_nick_change']['new']
change_time = datetime.fromisoformat(user['last_nick_change']['time']).strftime('%Y-%m-%d %H:%M:%S')
response.append( response.append(
ircstyle.style(f"📛 Nickname change: ", fg="purple") + ircstyle.style(f"📛 Nickname change: ", fg="purple") +
ircstyle.style(f"Changed from {old_nick} to {new_nick} at {change_time}", fg="white") ircstyle.style(f"Changed from {old_nick} to {new_nick} at {change_time}", fg="white")
) )
except (ValueError, KeyError):
response.append(ircstyle.style(f"📛 Nickname change: ", fg="purple") + ircstyle.style("Invalid or Missing Data", fg="white"))
# Handle no tracked activity
if len(response) == 1: # Only header present if len(response) == 1: # Only header present
response.append(ircstyle.style("No tracked activities.", fg="yellow")) response.append(ircstyle.style("No tracked activities.", fg="yellow"))
@ -181,7 +172,4 @@ class UserActivityTracker:
except Exception as e: except Exception as e:
self.log.error(f"Error in !seen command: {e}") self.log.error(f"Error in !seen command: {e}")
self.bot.privmsg( self.bot.privmsg(target, ircstyle.style("❌ Internal error processing request.", fg="red", bold=True))
target,
ircstyle.style("❌ Internal error processing request.", fg="red", bold=True)
)