From 36a56ed8309289c925727d00b8aae8f81ee7b5c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nils=20B=C3=BCchner?= Date: Sun, 7 Apr 2024 01:19:56 +0200 Subject: [PATCH] add {moderators} placeholder for facts --- ubottu/bot.py | 67 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/ubottu/bot.py b/ubottu/bot.py index cc5cc5a..993d77e 100644 --- a/ubottu/bot.py +++ b/ubottu/bot.py @@ -2,10 +2,22 @@ import json import os import re import requests +import traceback +from time import time from typing import Type, Tuple from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper from maubot import Plugin, MessageEvent from maubot.handlers import command +from mautrix.types import ( + EventType, + MemberStateEventContent, + PowerLevelStateEventContent, + RoomID, + RoomAlias, + StateEvent, + UserID, +) +from mautrix.errors import MForbidden from aiohttp.web import Request, Response, json_response from pathlib import Path from urllib.parse import urlparse, unquote @@ -13,12 +25,60 @@ from .floodprotection import FloodProtection from .packages import Apt from launchpadlib.launchpad import Launchpad +ubottu_change_level = EventType.find("com.ubuntu.ubottu", t_class=EventType.Class.STATE) + class Config(BaseProxyConfig): def do_update(self, helper: ConfigUpdateHelper) -> None: helper.copy("whitelist") helper.copy("rooms") class Ubottu(Plugin): + power_level_cache: dict[RoomID, tuple[int, PowerLevelStateEventContent]] + + async def get_power_levels(self, room_id: RoomID) -> PowerLevelStateEventContent: + try: + expiry, levels = self.power_level_cache[room_id] + if expiry < int(time()): + return levels + except KeyError: + self.log.info(f"Cache miss for {room_id}") + pass + levels = await self.client.get_state_event(room_id, EventType.ROOM_POWER_LEVELS) + #self.log.info(f"Levels: {levels}") + if levels: + now = int(time()) + self.power_level_cache[room_id] = (now + 5 * 60, levels) + return levels + return False + + async def can_manage(self, evt: MessageEvent) -> bool: + if evt.sender in self.config["whitelist"]: + return True + levels = await self.get_power_levels(evt.room_id) + user_level = levels.get_user_level(evt.sender) + state_level = levels.get_event_level(ubottu_change_level) + if not isinstance(state_level, int): + state_level = 50 + if user_level < state_level: + return False + return True + + async def get_room_mods_and_admins(self, evt: MessageEvent) -> list: + high_level_user_ids = [] # Initialize an empty list to store user IDs + try: + # Fetch the state of the room, focusing on power levels + levels = await self.get_power_levels(evt.room_id) + for user_id, level in levels.users.items(): + if level > 50: + high_level_user_ids.append(user_id) + else: + self.log.info("No power levels found in {evt.room_id}") + except Exception as e: + print(f"Failed to access room state: {e}") + self.log.info(f"Failed to access room state: {e}") + self.log.info(f"Failed to access room state: {traceback.print_exc()}") + # Optionally, handle the error more gracefully here (e.g., by logging or by returning an error message) + return high_level_user_ids def sanitize_string(self, input_string): # Pattern includes single quotes, double quotes, semicolons, and common SQL comment markers @@ -33,6 +93,7 @@ class Ubottu(Plugin): async def start(self) -> None: self.config.load_and_update() self.flood_protection = FloodProtection() + self.power_level_cache = {} def check_access(self, sender, room_id): if sender in self.config["whitelist"] and room_id in self.config["rooms"]: @@ -88,6 +149,12 @@ class Ubottu(Plugin): if resp and resp.status == 200: data = await resp.json() value = data['value'] + if "{moderators}" in value: + moderators = await self.get_room_mods_and_admins(evt) + if isinstance(moderators, list) and len(moderators) > 0: + value = value.replace("{moderators}", "https://matrix.to/#/" + "https://matrix.to/#/".join(moderators)) + else: + return False if to_user: await evt.respond(to_user + ': ' + value) else: