This commit is contained in:
Nils Büchner 2024-07-18 21:20:46 +02:00
parent c8229dd35f
commit b3b9331745
9 changed files with 22 additions and 752 deletions

View file

@ -2,20 +2,20 @@ update_interval: 5
whitelist:
- '@ravage:xentonix.net'
rooms:
'#release:ubuntu.com':
queue: [New, Unapproved]
tracker: Builds
packageset: Packageset
mute: []
'#lubuntu-devel:ubuntu.com':
tracker: Builds
tracker_filter: lubuntu
queue: [New, Unapproved]
queue_filter: lubuntu
packageset: Packageset
packageset_filter: lubuntu
mute: []
'#queuebot:xentonix.net':
queue: [Unapproved]
packageset: Packageset
mute: []
'!XBHBxuegpdVZEJBOIh:xentonix.net':
#maubot test room
member_level: 50
nomember_level: 0
launchpad_groups: [matrix-council]
remove_permissions: yes
enabled: yes
use_socials: yes
'!SqVwRMDWgHlUMdMGzk:ubuntu.com':
#read-only-test:ubuntu.com
member_level: 1
nomember_level: 0
launchpad_groups: [read-only-trusted, matrix-council]
remove_permissions: no
use_socials: no
enabled: yes

View file

@ -1,8 +1,8 @@
id: com.ubuntu.qbot
version: 1.0.1
id: com.ubuntu.roomadmin
version: 0.0.4
modules:
- queuebot
main_class: Queuebot
- roomadmin
main_class: Roomadmin
maubot: 0.1.0
database: false
config: true
@ -11,4 +11,4 @@ license: MIT
extra_files:
- base-config.yaml
dependencies: []
soft_dependencies: []
soft_dependencies: []

View file

@ -1 +0,0 @@
from .bot import Queuebot

View file

@ -1,239 +0,0 @@
from __future__ import annotations
import json, os, re, requests, asyncio, pytz, importlib, traceback, logging
from aiohttp.web import Request, Response, json_response
from datetime import datetime, timedelta
from launchpadlib.launchpad import Launchpad
from maubot import Plugin, MessageEvent
from maubot.handlers import command
from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper
from mautrix.util import background_task
from mautrix.types import (
EventType,
MemberStateEventContent,
PowerLevelStateEventContent,
RoomID,
RoomAlias,
StateEvent,
UserID,
)
from pathlib import Path
from time import time
from typing import Type, Tuple
from urllib.parse import urlparse, unquote
from .plugs import queue, tracker, packageset
from .floodprotection import FloodProtection
qbot_change_level = EventType.find("com.ubuntu.qbot", t_class=EventType.Class.STATE)
class Config(BaseProxyConfig):
def do_update(self, helper: ConfigUpdateHelper) -> None:
helper.copy("whitelist")
helper.copy("rooms")
class Queuebot(Plugin):
reminder_loop_task: asyncio.Future
VERBOSE=False
plugin_queue_new = queue.Queue("New", VERBOSE)
plugin_queue_unapproved = queue.Queue("Unapproved", VERBOSE)
plugin_packageset = packageset.Packageset("Packageset", VERBOSE)
plugin_tracker = tracker.Tracker("Builds", VERBOSE)
room_ids = []
room_mapping = {}
power_level_cache: dict[RoomID, tuple[int, PowerLevelStateEventContent]]
async def start(self) -> None:
self.config.load_and_update()
self.flood_protection = FloodProtection()
self.power_level_cache = {}
logger = logging.getLogger(self.id)
logger.setLevel(logging.DEBUG)
self.log = logger
if await self.resolve_room_aliases():
self.poll_task = asyncio.create_task(self.poll_plugins())
self.log.info("Queuebot started")
async def stop(self) -> None:
await super().stop()
self.poll_task.cancel()
async def get_power_levels(self, room_id: RoomID) -> PowerLevelStateEventContent:
try:
expiry, levels = self.power_level_cache[room_id]
if expiry < int(time()):
return levels
except KeyError:
pass
levels = await self.client.get_state_event(room_id, EventType.ROOM_POWER_LEVELS)
now = int(time())
self.power_level_cache[room_id] = (now + 5 * 60, levels)
return levels
async def can_manage(self, evt: MessageEvent) -> bool:
if evt.sender in self.config["whitelist"]:
return True
levels = await self.get_power_levels(evt.room_id)
user_level = levels.get_user_level(evt.sender)
state_level = levels.get_event_level(qbot_change_level)
if not isinstance(state_level, int):
state_level = 50
if user_level < state_level:
return False
return True
@command.new(name="qbot", require_subcommand=False)
async def qbot(self, evt: MessageEvent) -> None:
if not await self.can_manage(evt) and self.flood_protection.flood_check(evt.sender):
await evt.respond("You don't have the permission to use this command.")
return False
await evt.respond("Invalid argument. Example: !qbot mute queue")
return False
@qbot.subcommand("mute", aliases=["unmute"])
@command.argument("plugin", "(un)mute a plugin", required=False)
async def mute(self, evt: MessageEvent, plugin: str) -> None:
if not await self.can_manage(evt) and self.flood_protection.flood_check(evt.sender):
await evt.respond("You don't have the permission to manage mutes.")
return False
if not plugin or plugin not in ["queue", "tracker", "packageset"]:
await evt.respond("Invalid plugin. Valid plugins are queue, tracker and packageset. Example: !qbot mute queue")
return False
room_alias = self.room_mapping[evt.room_id]
if plugin in self.config["rooms"][room_alias]['mute']:
self.config["rooms"][room_alias]['mute'].remove(plugin)
await evt.respond(f"Unmuted {plugin}")
else:
self.config["rooms"][room_alias]['mute'].append(plugin)
await evt.respond(f"Muted {plugin}")
self.config.save()
async def resolve_room_alias_to_id(self, room_alias: str):
try:
room_id = await self.client.resolve_room_alias(RoomAlias(room_alias))
return room_id
except Exception as e:
self.log.error(f"Error resolving room alias {room_alias}: {e}")
self.log.debug(traceback.format_exc())
return None
async def resolve_room_aliases(self):
self.room_ids = []
self.room_mapping = {}
for room_alias in self.config["rooms"]:
if room_alias.startswith("#"):
if room_id_obj := await self.resolve_room_alias_to_id(room_alias):
room_id = str(room_id_obj.room_id)
self.room_ids.append(room_id)
self.room_mapping[room_id] = room_alias
self.log.info("Added room " + room_alias + " with id " + room_id)
elif room_alias.startswith("!"):
self.room_ids.append(room_alias)
self.room_mapping[room_alias] = room_alias
self.log.info("Added room id " + room_alias)
else:
self.log.debug("Error addming room " + room_alias)
return True
def check_access_sender(self, sender):
if sender in self.config["whitelist"]:
return True
return False
def check_plugin_filter_mute(self, plugin_name, queue, notice, room_id, room_alias):
try:
# is the plugin enabled
if self.config['rooms'][room_alias].get(plugin_name) is None:
self.log.debug(f"plugin_name {plugin_name} is None for {room_alias}")
return False
queues = self.config['rooms'][room_alias].get(plugin_name)
if isinstance(queues, list):
if queue not in queues:
self.log.debug(f"queue {plugin_name}.{queue} not queues for {room_alias}")
return False
elif isinstance(queues, str):
if queue != queues:
self.log.debug(f"queue {plugin_name}.{queue} not in queue string for {room_alias}")
return False
else:
self.log.debug(f"queues is not str or list for {room_alias}")
return False
if self.config['rooms'][room_alias].get('mute') is None:
return True
mutes = self.config['rooms'][room_alias].get('mute')
mute_name = f"{plugin_name}.{queue}".lower()
if not isinstance(mutes, list) or len(mutes) < 1:
self.log.debug(f"mutes in {room_alias} is not a valid list or empty")
elif mute_name in mutes:
self.log.debug(f"{mute_name} is in mutes for {room_alias}")
return False
# is there a filter
filter_name = plugin_name + '_filter'.lower()
if self.config['rooms'][room_alias].get(filter_name) is not None:
if str(self.config['rooms'][room_alias][filter_name]).lower() not in notice.lower():
self.log.debug(f"not sending notice to {room_alias} as it matches filter " + str(self.config['rooms'][room_alias][filter_name]).lower() )
return False
except Exception as e:
self.log.debug("Error checking filter or mute: " + str(e))
self.log.debug(traceback.format_exc())
return False
return True
async def poll_plugins(self) -> None:
try:
await self._poll_plugins()
except asyncio.CancelledError:
self.log.info("Polling stopped")
except Exception:
self.log.exception("Fatal error while polling plugins")
async def _poll_plugins(self) -> None:
self.log.info("Polling started")
while True:
try:
await self._poll_once()
except asyncio.CancelledError:
self.log.info("Polling stopped")
except Exception:
self.log.exception("Error while polling plugins")
self.log.debug("Sleeping " + str(self.config["update_interval"] * 60) + " seconds")
await asyncio.sleep(self.config["update_interval"] * 60)
async def _poll_once(self) -> None:
try:
plugins_to_poll = [self.plugin_queue_new, self.plugin_queue_unapproved, self.plugin_tracker, self.plugin_packageset]
for plugin_name in plugins_to_poll:
if not hasattr(plugin_name, 'update'):
continue
notices = plugin_name.update()
self.log.debug('update() function called on ' + str(plugin_name.name) + '.' + str(plugin_name.queue))
sent_count = 0
if notices:
self.log.debug(f"New notices available")
if await self.resolve_room_aliases():
for notice in notices:
for room_id in self.room_ids:
self.log.debug(f"Checking notices or {room_id}")
try:
room_alias = self.room_mapping[room_id]
self.log.debug(f"Checking notices or {room_alias} ( {room_id} )")
if self.check_plugin_filter_mute(plugin_name=plugin_name.name,queue=plugin_name.queue, notice=notice[0], room_id=room_id, room_alias=room_alias):
self.log.debug(f"new notice from {plugin_name.name}.{plugin_name.queue} to {room_alias}")
self.log.debug(f"sent count: {sent_count}")
if sent_count >= 5:
self.log.debug(f"sent count reached: {sent_count} sleeping 60 secs")
if await asyncio.sleep(60):
sent_count = 0
await self.client.send_notice(room_id, notice[0])
else:
await self.client.send_notice(room_id, notice[0])
sent_count += 1
except Exception as e:
self.log.debug(f"Error sending notice to {room_id}: {e}")
self.log.debug(traceback.format_exc())
pass
except Exception as e:
self.log.debug(f"Error polling plugins: {e}")
self.log.debug(traceback.format_exc())
pass
@classmethod
def get_config_class(cls) -> Type[BaseProxyConfig]:
return Config

View file

@ -1,25 +0,0 @@
from collections import defaultdict
from time import time
class FloodProtection:
def __init__(self):
self.user_commands = defaultdict(list) # Stores timestamps of commands for each user
self.max_commands = 3
self.time_window = 60 # 60 seconds
def flood_check(self, user_id):
"""Check if a user can send a command based on flood protection limits."""
current_time = time()
if user_id not in self.user_commands:
self.user_commands[user_id] = [current_time]
return True # Allow the command if the user has no recorded commands
# Remove commands outside the time window
self.user_commands[user_id] = [timestamp for timestamp in self.user_commands[user_id] if current_time - timestamp < self.time_window]
if len(self.user_commands[user_id]) < self.max_commands:
self.user_commands[user_id].append(current_time)
return True # Allow the command if under the limit
# Otherwise, do not allow the command
return False

View file

@ -1,113 +0,0 @@
#!/usr/bin/python
from __future__ import print_function
import traceback
import threading
from launchpadlib.launchpad import Launchpad
class PackagesetScanner(threading.Thread):
notices = list()
def run(self):
try:
# Authenticated login to Launchpad
self.lp = Launchpad.login_anonymously(
'maubot-queuebot', 'production',
launchpadlib_dir="/tmp/queuebot-%s/" % self.queue)
self.notices = list()
ubuntu = self.lp.distributions['ubuntu']
ubuntu_series = [series for series in ubuntu.series
if series.active]
# In verbose mode, show the current content of the queue
if self.verbose and self.queue not in self.queue_state:
self.queue_state[self.queue] = set()
# Get the content of the current queue
new_list = set()
for series in ubuntu_series:
for pkgset in self.lp.packagesets.getBySeries(
distroseries=series):
for pkg in list(pkgset.getSourcesIncluded()):
new_list.add(";".join([
series.self_link,
series.name,
pkgset.name,
pkg
]))
if self.queue in self.queue_state:
if len(new_list - self.queue_state[self.queue]) > 25:
self.notices.append(("%s: %s entries have been"
" added or removed" %
(self.queue,
len(new_list -
self.queue_state[self.queue])),
['packageset']))
elif len(self.queue_state[self.queue] - new_list) > 25:
self.notices.append(("%s: %s entries have been"
" added or removed" %
(self.queue,
len(self.queue_state[self.queue] -
new_list)),
['packageset']))
else:
# Print removed packages
for pkg in sorted(self.queue_state[self.queue] - new_list):
pkg_seriesurl, pkg_series, pkg_set, \
pkg_name = pkg.split(';')
self.notices.append(("%s: Removed %s from %s in %s" % (
self.queue, pkg_name, pkg_set, pkg_series),
['packageset']))
# Print added packages
for pkg in sorted(new_list - self.queue_state[self.queue]):
pkg_seriesurl, pkg_series, pkg_set, \
pkg_name = pkg.split(';')
self.notices.append(("%s: Added %s to %s in %s" % (
self.queue, pkg_name, pkg_set, pkg_series),
['packageset']))
self.queue_state[self.queue] = new_list
except:
# We don't want the bot to crash when LP fails
traceback.print_exc()
class Packageset():
queue_state = dict()
scanner = PackagesetScanner()
name = "packageset"
queue = ""
def __init__(self, queue, verbose=False):
self.queue = queue
self.verbose = verbose
self.spawn_scanner()
def spawn_scanner(self):
if self.scanner.is_alive():
raise Exception("Scanner is already running")
self.scanner = PackagesetScanner()
self.scanner.queue_state = self.queue_state
self.scanner.verbose = self.verbose
self.scanner.queue = self.queue
self.scanner.start()
def update(self):
if self.scanner.is_alive():
return False
# Get the result from the thread
notices = list(self.scanner.notices)
# Spawn a new insance of the monitoring thread
self.spawn_scanner()
return notices

View file

@ -1,191 +0,0 @@
#!/usr/bin/python
from __future__ import print_function
import traceback
import threading
from launchpadlib.launchpad import Launchpad
class QueueScanner(threading.Thread):
notices = list()
def run(self):
try:
# Authenticated login to Launchpad
self.lp = Launchpad.login_anonymously(
'maubot-queuebot', 'production',
launchpadlib_dir="/tmp/queuebot-%s/" % self.queue)
self.notices = list()
ubuntu = self.lp.distributions['ubuntu']
ubuntu_series = [series for series in ubuntu.series
if series.active]
# In verbose mode, show the current content of the queue
if self.verbose and self.queue not in self.queue_state:
self.queue_state[self.queue] = set()
# Get the content of the current queue
new_list = set()
for series in ubuntu_series:
for pkg in series.getPackageUploads(status=self.queue):
# Split the different sub-packages
all_name = pkg.display_name.split(', ')
all_arch = pkg.display_arches.split(', ')
all_pkg = []
for name in all_name:
all_pkg.append((name, all_arch[all_name.index(name)]))
for (name, arch) in all_pkg:
if name.startswith('language-pack-'):
continue
if name.startswith('kde-l10n-'):
continue
if arch.startswith('raw-'):
continue
if arch == 'uefi' or arch == 'signing':
continue
new_list.add(";".join([
series.self_link,
"%s-%s" % (series.name.lower(),
pkg.pocket.lower()),
name,
pkg.display_version,
arch,
pkg.archive.name,
pkg.self_link,
]))
if self.queue in self.queue_state:
# Print removed packages
for pkg in sorted(self.queue_state[self.queue] - new_list):
pkg_seriesurl, pkg_pocket, pkg_name, pkg_version, \
pkg_arch, pkg_archive, pkg_self = pkg.split(';')
pkg_status = self.lp.load(pkg_self).status
if pkg_status == "Rejected":
status = "rejected"
elif pkg_status in ("Accepted", "Done"):
status = "accepted"
else:
print("Impossible package status: %s "
"(%s, %s, %s, %s, %s)" %
(pkg_status, self.queue, pkg_name,
pkg_arch, pkg_pocket, pkg_version))
continue
mute = (
"queue;%s" % (pkg_pocket),
"queue;%s" % (self.queue.lower()),
"queue;%s;%s" % (pkg_pocket, self.queue.lower()),
"queue;%s;%s" % (self.queue.lower(), pkg_pocket)
)
self.notices.append(("%s: %s %s [%s] (%s) [%s]" % (
self.queue, status, pkg_name, pkg_arch,
pkg_pocket, pkg_version), mute))
# Print added packages
for pkg in sorted(new_list - self.queue_state[self.queue]):
pkg_seriesurl, pkg_pocket, pkg_name, pkg_version, \
pkg_arch, pkg_archive, pkg_self = pkg.split(';')
pkg_series = self.lp.load(pkg_seriesurl)
# Try to get some more data by looking at
# the current archive
current_component = 'none'
current_version = 'none'
current_pkgsets = set()
for archive in ubuntu.archives:
current_pkg = archive.getPublishedSources(
source_name=pkg_name, status="Published",
distro_series=pkg_series, exact_match=True)
if list(current_pkg):
current_component = current_pkg[0].component_name
current_version = \
current_pkg[0].source_package_version
break
for pkgset in self.lp.packagesets.setsIncludingSource(
distroseries=pkg_series,
sourcepackagename=pkg_name):
current_pkgsets.add(pkgset.name)
# Prepare the packageset list
if current_pkgsets:
pkg_pkgsets = ", ".join(sorted(current_pkgsets))
else:
pkg_pkgsets = "no packageset"
# Post the mssage to the channel
message = ""
if self.queue == 'New':
if pkg_arch == "source":
message = "%s source: %s (%s/%s) [%s]" % (
self.queue, pkg_name, pkg_pocket,
pkg_archive, pkg_version)
elif pkg_arch == "sync":
message = "%s sync: %s (%s/%s) [%s]" % (
self.queue, pkg_name, pkg_pocket,
pkg_archive, pkg_version)
else:
message = "%s binary: %s [%s] (%s/%s) [%s] (%s)" \
% (self.queue, pkg_name, pkg_arch,
pkg_pocket, current_component,
pkg_version, pkg_pkgsets)
else:
message = "%s: %s (%s/%s) [%s => %s] (%s)" % (
self.queue, pkg_name, pkg_pocket,
current_component, current_version,
pkg_version, pkg_pkgsets)
if pkg_arch == "sync":
message += " (sync)"
mute = (
"queue;%s" % (pkg_pocket),
"queue;%s" % (self.queue.lower()),
"queue;%s;%s" % (pkg_pocket, self.queue.lower()),
"queue;%s;%s" % (self.queue.lower(), pkg_pocket)
)
self.notices.append((message, mute))
self.queue_state[self.queue] = new_list
except:
# We don't want the bot to crash when LP fails
traceback.print_exc()
class Queue():
queue_state = dict()
scanner = QueueScanner()
name = "queue"
queue = ""
def __init__(self, queue, verbose=False):
self.queue = queue
self.verbose = verbose
self.spawn_scanner()
def spawn_scanner(self):
if self.scanner.is_alive():
raise Exception("Scanner is already running")
self.scanner = QueueScanner()
self.scanner.queue_state = self.queue_state
self.scanner.verbose = self.verbose
self.scanner.queue = self.queue
self.scanner.start()
def update(self):
if self.scanner.is_alive():
return False
# Get the result from the thread
notices = list(self.scanner.notices)
# Spawn a new insance of the monitoring thread
self.spawn_scanner()
return notices

View file

@ -1,161 +0,0 @@
#!/usr/bin/python
from __future__ import print_function
import threading
import traceback
import xmlrpc.client as xmlrpclib
class TrackerScanner(threading.Thread):
notices = list()
def run(self):
try:
self.notices = list()
# In verbose mode, show the current content of the queue
if self.verbose and self.queue not in self.tracker_state:
self.tracker_state[self.queue] = set()
milestones = [milestone for milestone in
self.drupal.qatracker.milestones.get_list([0])
if milestone['notify'] == "1"
and 'Touch' not in milestone['title']]
products = {}
for product in self.drupal.qatracker.products.get_list([0]):
products[product['id']] = product
new_list = set()
for milestone in milestones:
for build in self.drupal.qatracker.builds.get_list(
int(milestone['id']), [0, 1, 4]):
build_milestone = milestone['title']
build_product = products[build['productid']]['title']
build_version = build['version']
build_status = build['status_string']
new_list.add("%s;%s;%s;%s" % (build_milestone,
build_product,
build_version,
build_status))
if self.queue in self.tracker_state:
build_products = [";".join(build.split(';')[0:2])
for build in self.tracker_state[self.queue]]
new_build_products = [";".join(build.split(';')[0:2])
for build in new_list]
# Print removed images
for build in self.tracker_state[self.queue] - new_list:
build_milestone, build_product, build_version, \
build_status = build.split(';')
if "%s;%s" % (build_milestone, build_product) \
in new_build_products:
continue
# Post to the channels. Don't mark all the records
# as removed when we remove a milestone
skip = False
for milestone in milestones:
if build_milestone == milestone['title']:
skip = True
break
else:
skip = True
if not skip:
self.notices.append(("%s: %s [%s] has been removed" % (
self.queue, build_product, build_milestone),
("tracker",)))
# Print other changes and deal with cases where a released
# milestone is moved back to testing
if len(new_list - self.tracker_state[self.queue]) > 25:
self.notices.append((
"%s: %s entries have been "
"added, updated or disabled" % (
self.queue, len(new_list -
self.tracker_state[self.queue])),
("tracker",)))
elif len(self.tracker_state[self.queue] - new_list) > 25:
self.notices.append((
"%s: %s entries have been "
"added, updated or disabled" % (
self.queue,
len(self.tracker_state[self.queue] - new_list)),
("tracker",)))
else:
for build in sorted(
new_list - self.tracker_state[self.queue]):
build_milestone, build_product, build_version, \
build_status = build.split(';')
if "%s;%s" % (build_milestone, build_product) \
in build_products:
if build_status == "Re-building":
self.notices.append((
"%s: %s [%s] has been disabled" % (
self.queue, build_product,
build_milestone), ("tracker",)))
elif build_status == "Ready":
self.notices.append((
"%s: %s [%s] has been marked as ready" % (
self.queue, build_product,
build_milestone), ("tracker",)))
else:
self.notices.append((
"%s: %s [%s] has been updated (%s)" % (
self.queue, build_product,
build_milestone, build_version),
("tracker",)))
else:
self.notices.append((
"%s: %s [%s] (%s) has been added" % (
self.queue, build_product, build_milestone,
build_version), ("tracker",)))
self.tracker_state[self.queue] = new_list
except:
# We don't want the bot to crash when LP fails
traceback.print_exc()
class Tracker():
tracker_state = dict()
scanner = TrackerScanner()
name = "tracker"
queue = ""
def __init__(self, queue, verbose=False):
self.queue = queue
self.verbose = verbose
# Setup ISO tracker
self.drupal = xmlrpclib.ServerProxy(
"https://iso.qa.ubuntu.com/xmlrpc.php")
self.spawn_scanner()
def spawn_scanner(self):
if self.scanner.is_alive():
raise Exception("Scanner is already running")
self.scanner = TrackerScanner()
self.scanner.tracker_state = self.tracker_state
self.scanner.verbose = self.verbose
self.scanner.drupal = self.drupal
self.scanner.queue = self.queue
self.scanner.start()
def update(self):
if self.scanner.is_alive():
return False
# Get the result from the thread
notices = list(self.scanner.notices)
# Spawn a new insance of the monitoring thread
self.spawn_scanner()
return notices