add {moderators} placeholder for facts

This commit is contained in:
Nils Büchner 2024-04-07 01:19:56 +02:00
parent 65388c5d09
commit 36a56ed830

View file

@ -2,10 +2,22 @@ import json
import os
import re
import requests
import traceback
from time import time
from typing import Type, Tuple
from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper
from maubot import Plugin, MessageEvent
from maubot.handlers import command
from mautrix.types import (
EventType,
MemberStateEventContent,
PowerLevelStateEventContent,
RoomID,
RoomAlias,
StateEvent,
UserID,
)
from mautrix.errors import MForbidden
from aiohttp.web import Request, Response, json_response
from pathlib import Path
from urllib.parse import urlparse, unquote
@ -13,12 +25,60 @@ from .floodprotection import FloodProtection
from .packages import Apt
from launchpadlib.launchpad import Launchpad
ubottu_change_level = EventType.find("com.ubuntu.ubottu", t_class=EventType.Class.STATE)
class Config(BaseProxyConfig):
def do_update(self, helper: ConfigUpdateHelper) -> None:
helper.copy("whitelist")
helper.copy("rooms")
class Ubottu(Plugin):
power_level_cache: dict[RoomID, tuple[int, PowerLevelStateEventContent]]
async def get_power_levels(self, room_id: RoomID) -> PowerLevelStateEventContent:
try:
expiry, levels = self.power_level_cache[room_id]
if expiry < int(time()):
return levels
except KeyError:
self.log.info(f"Cache miss for {room_id}")
pass
levels = await self.client.get_state_event(room_id, EventType.ROOM_POWER_LEVELS)
#self.log.info(f"Levels: {levels}")
if levels:
now = int(time())
self.power_level_cache[room_id] = (now + 5 * 60, levels)
return levels
return False
async def can_manage(self, evt: MessageEvent) -> bool:
if evt.sender in self.config["whitelist"]:
return True
levels = await self.get_power_levels(evt.room_id)
user_level = levels.get_user_level(evt.sender)
state_level = levels.get_event_level(ubottu_change_level)
if not isinstance(state_level, int):
state_level = 50
if user_level < state_level:
return False
return True
async def get_room_mods_and_admins(self, evt: MessageEvent) -> list:
high_level_user_ids = [] # Initialize an empty list to store user IDs
try:
# Fetch the state of the room, focusing on power levels
levels = await self.get_power_levels(evt.room_id)
for user_id, level in levels.users.items():
if level > 50:
high_level_user_ids.append(user_id)
else:
self.log.info("No power levels found in {evt.room_id}")
except Exception as e:
print(f"Failed to access room state: {e}")
self.log.info(f"Failed to access room state: {e}")
self.log.info(f"Failed to access room state: {traceback.print_exc()}")
# Optionally, handle the error more gracefully here (e.g., by logging or by returning an error message)
return high_level_user_ids
def sanitize_string(self, input_string):
# Pattern includes single quotes, double quotes, semicolons, and common SQL comment markers
@ -33,6 +93,7 @@ class Ubottu(Plugin):
async def start(self) -> None:
self.config.load_and_update()
self.flood_protection = FloodProtection()
self.power_level_cache = {}
def check_access(self, sender, room_id):
if sender in self.config["whitelist"] and room_id in self.config["rooms"]:
@ -88,6 +149,12 @@ class Ubottu(Plugin):
if resp and resp.status == 200:
data = await resp.json()
value = data['value']
if "{moderators}" in value:
moderators = await self.get_room_mods_and_admins(evt)
if isinstance(moderators, list) and len(moderators) > 0:
value = value.replace("{moderators}", "https://matrix.to/#/" + "https://matrix.to/#/".join(moderators))
else:
return False
if to_user:
await evt.respond(to_user + ': ' + value)
else: