From c0337a90a29cdd0451b94d9df3c09bd443ed32be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nils=20B=C3=BCchner?= Date: Mon, 25 Mar 2024 22:14:25 +0100 Subject: [PATCH] first commit --- .forgejo/workflows/maubot.yaml | 34 +++++ .gitignore | 16 +++ README.md | 0 base-config.yaml | 6 + maubot.yaml | 11 ++ requirements.txt | 35 ++++++ ubottu/__init__.py | 1 + ubottu/base-config.yaml | 6 + ubottu/bot.py | 198 +++++++++++++++++++++++++++++ ubottu/config.yaml.deploy | 123 ++++++++++++++++++ ubottu/floodprotection.py | 25 ++++ ubottu/packages.py | 220 +++++++++++++++++++++++++++++++++ 12 files changed, 675 insertions(+) create mode 100644 .forgejo/workflows/maubot.yaml create mode 100644 .gitignore create mode 100644 README.md create mode 100644 base-config.yaml create mode 100644 maubot.yaml create mode 100644 requirements.txt create mode 100644 ubottu/__init__.py create mode 100644 ubottu/base-config.yaml create mode 100644 ubottu/bot.py create mode 100644 ubottu/config.yaml.deploy create mode 100644 ubottu/floodprotection.py create mode 100644 ubottu/packages.py diff --git a/.forgejo/workflows/maubot.yaml b/.forgejo/workflows/maubot.yaml new file mode 100644 index 0000000..c16d968 --- /dev/null +++ b/.forgejo/workflows/maubot.yaml @@ -0,0 +1,34 @@ +on: [push] +jobs: + build: + runs-on: docker + container: + image: ghcr.io/catthehacker/ubuntu:act-22.04 + env: + PIP_INDEX_URL: https://pypi.haxxors.com/simple + UNSHARED_SECRET: ${{ secrets.UNSHARED_SECRET }} + HOMESERVER_URL: ${{ secrets.HOMESERVER_URL }} + HOMESERVER_SECRET: ${{ secrets.HOMESERVER_SECRET }} + HOMESERVER_DOMAIN: ${{ secrets.HOMESERVER_DOMAIN }} + ADMIN_PW: ${{ secrets.ADMIN_PW }} + PUBLIC_URL: ${{ secrets.PUBLIC_URL }} + + steps: + - uses: actions/checkout@v4 + - run: cp ubottu/config.yaml.deploy ubottu/config.yaml + - run: rm -f ubottu/config.yaml.deploy + - run: rm -f ubottu/config.yaml.default + - run: sed -i "s/%%UNSHARED_SECRET%%/${UNSHARED_SECRET}/g" ubottu/config.yaml + - run: sed -i "s/%%HOMESERVER_URL%%/${HOMESERVER_URL}/g" ubottu/config.yaml + - run: sed -i "s/%%HOMESERVER_SECRET%%/${HOMESERVER_SECRET}/g" ubottu/config.yaml + - run: sed -i "s/%%HOMESERVER_DOMAIN%%/${HOMESERVER_DOMAIN}/g" ubottu/config.yaml + - run: sed -i "s/%%ADMIN_PW%%/${ADMIN_PW}/g" ubottu/config.yaml + - run: sed -i "s/%%PUBLIC_URL%%/${PUBLIC_URL}/g" ubottu/config.yaml + - run: pip install maubot + - run: mbc build # Build the project + - run: mkdir -p output # Ensure output directory exists, `-p` to prevent error if already exists + - run: mv *.mbp output/ubottu-latest-py3.10.mbp # Move built file to output + - uses: actions/upload-artifact@v3 + with: + name: ubottu-latest-py3.10.mbp.zip + path: output/ubottu-latest-py3.10.mbp diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..52fca32 --- /dev/null +++ b/.gitignore @@ -0,0 +1,16 @@ +venv +olm +.local +.cmake +.cache +.ubottu +.bashrc +.python_history +.bash_history +*.mbp +*.db +*.log +config.yaml +plugins/* +ubottu/maubot.db +ubottu/config.yaml diff --git a/README.md b/README.md new file mode 100644 index 0000000..e69de29 diff --git a/base-config.yaml b/base-config.yaml new file mode 100644 index 0000000..8e05a9f --- /dev/null +++ b/base-config.yaml @@ -0,0 +1,6 @@ +whitelist: + - "@ravage:xentonix.net" +rooms: + - "!XBHBxuegpdVZEJBOIh:xentonix.net" + - "!TloppdJexqToFbZUZW:xentonix.net" + - "!EJhpCQHHqqcNicfiql:xentonix.net" \ No newline at end of file diff --git a/maubot.yaml b/maubot.yaml new file mode 100644 index 0000000..046fc1a --- /dev/null +++ b/maubot.yaml @@ -0,0 +1,11 @@ +maubot: 0.1.0 +id: com.ubuntu.ubottu +version: 1.0.0 +license: MIT +modules: + - ubottu +main_class: Ubottu +config: true +webapp: true +extra_files: + - base-config.yaml diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..7013b40 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,35 @@ +aiohttp==3.9.3 +aiosignal==1.3.1 +aiosqlite==0.18.0 +asyncpg==0.28.0 +attrs==23.2.0 +bcrypt==4.1.2 +Brotli==1.1.0 +certifi==2024.2.2 +cffi==1.16.0 +charset-normalizer==3.3.2 +click==8.1.7 +colorama==0.4.6 +commonmark==0.9.1 +feedparser==6.0.11 +frozenlist==1.4.1 +idna==3.6 +Jinja2==3.1.3 +MarkupSafe==2.1.5 +maubot==0.4.2 +mautrix==0.20.4 +multidict==6.0.5 +packaging==24.0 +prompt-toolkit==3.0.43 +pycparser==2.21 +python-olm==3.2.16 +questionary==1.10.0 +requests==2.31.0 +ruamel.yaml==0.17.40 +ruamel.yaml.clib==0.2.8 +setuptools==69.2.0 +sgmllib3k==1.0.0 +urllib3==2.2.1 +wcwidth==0.2.13 +wheel==0.43.0 +yarl==1.9.4 diff --git a/ubottu/__init__.py b/ubottu/__init__.py new file mode 100644 index 0000000..b5aa82d --- /dev/null +++ b/ubottu/__init__.py @@ -0,0 +1 @@ +from .bot import Ubottu \ No newline at end of file diff --git a/ubottu/base-config.yaml b/ubottu/base-config.yaml new file mode 100644 index 0000000..8e05a9f --- /dev/null +++ b/ubottu/base-config.yaml @@ -0,0 +1,6 @@ +whitelist: + - "@ravage:xentonix.net" +rooms: + - "!XBHBxuegpdVZEJBOIh:xentonix.net" + - "!TloppdJexqToFbZUZW:xentonix.net" + - "!EJhpCQHHqqcNicfiql:xentonix.net" \ No newline at end of file diff --git a/ubottu/bot.py b/ubottu/bot.py new file mode 100644 index 0000000..91bb5db --- /dev/null +++ b/ubottu/bot.py @@ -0,0 +1,198 @@ +import json +import sqlite3 +import os +import re +import requests +from typing import Type, Tuple +from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper +from maubot import Plugin, MessageEvent +from maubot.handlers import command +from aiohttp.web import Request, Response, json_response +from pathlib import Path +from urllib.parse import urlparse, unquote +from .floodprotection import FloodProtection +from .packages import Apt +from launchpadlib.launchpad import Launchpad + +class Config(BaseProxyConfig): + def do_update(self, helper: ConfigUpdateHelper) -> None: + helper.copy("whitelist") + helper.copy("rooms") + +class Ubottu(Plugin): + + def sanitize_string(self, input_string): + # Pattern includes single quotes, double quotes, semicolons, and common SQL comment markers + pattern = r"[\'\";]|(--)|(/\*)|(\*/)" + # Replace the identified patterns with an empty string + safe_string = re.sub(pattern, '', input_string) + return safe_string + + async def pre_start(self) -> None: + #if await self.get_ubottu_db('https://ubottu.com/ubuntu3.db'): + # self.db = sqlite3.connect("/home/maubot/.ubottu/ubuntu3.db") + #else: + # return False + return True + + async def start(self) -> None: + self.config.load_and_update() + self.flood_protection = FloodProtection() + + async def get_ubottu_db(self, url): + """Load a file from a URL into an in-memory filesystem.""" + # Create a filename if required + u = urlparse(url) + fn = "/home/maubot/.ubottu/" + os.path.basename(u.path) + #if os.path.isfile(fn): + # return fn + requests.packages.urllib3.util.connection.HAS_IPV6 = False + with requests.get(url, stream=True) as r: + r.raise_for_status() # Checks if the request was successful + # Open the local file in binary write mode + with open(fn, 'wb+') as f: + for chunk in r.iter_content(chunk_size=8192): + # If you have a chunk of data, write it to the file + if chunk: + f.write(chunk) + f.close() + return fn + + def check_access(self, sender, room_id): + if sender in self.config["whitelist"] and room_id in self.config["rooms"]: + return True + return False + def check_access_sender(self, sender): + if sender in self.config["whitelist"]: + return True + return False + + #@command.new(name="email", aliases=["json"]) + @command.new(name="jsontest", aliases=["json"]) + async def email(self, evt: MessageEvent) -> None: + if self.check_access(evt.sender, evt.room_id): + url='https://xentonix.net/test.json' + resp = await self.http.get(url) + if resp.status == 200: + data = await resp.json() + #print(data) + await evt.reply(data['employees'][0]['email']) + + async def lookup_factoid_irc(self, command_name, to_user, evt): + sql = "SELECT value FROM facts where name = '" + command_name + "' LIMIT 1" + db = self.db + cur = db.cursor() + cur.execute(sql) + rows = cur.fetchall() + row = None + for row in rows: + if row[0].startswith(''): + command_name = str(row[0]).replace(' ', '') + sql = "SELECT value FROM facts where name = '" + command_name + "' LIMIT 1" + cur.execute(sql) + rows = cur.fetchall() + for row in rows: + break + break + if row is not None and row[0].startswith(''): + output = str(row[0]).replace(' ', '') + if to_user: + await evt.respond(to_user + ': ' + output) + else: + await evt.respond(output) + return True + return False + + async def lookup_factoid_matrix(self, command_name, to_user, evt): + api_url = 'http://127.0.0.1:8000/factoids/api/facts/' + url = api_url + command_name + '/?format=json' + resp = await self.http.get(url) + if resp and resp.status == 200: + data = await resp.json() + if data: + id = data['id'] + name = data['name'] + value = data['value'] + ftype = data['ftype'] + if ftype == 'ALIAS': + command_name = value + url = api_url + command_name + '/?format=json' + resp = await self.http.get(url) + if resp and resp.status == 200: + data = await resp.json() + value = data['value'] + if to_user: + await evt.respond(to_user + ': ' + value) + else: + await evt.respond(value) + return True + return False + + @command.passive("^!(.+)$") + async def command(self, evt: MessageEvent, match: Tuple[str]) -> None: + # allow all rooms and users, only enable flood protection + #if self.check_access(evt.sender, evt.room_id): + if self.flood_protection.flood_check(evt.sender): + args = [] + to_user = '' + command_name = self.sanitize_string(match[0][1:].split(' ')[0]) + full_command = re.sub(r'\s+', ' ', match[0][1:]) + if full_command.count('|') > 0: + to_user = self.sanitize_string(full_command.split('|')[1].strip()) + args = full_command.split('|')[0].strip().split(' ')[1:] + else: + args = full_command.strip().split(' ')[1:] + + #reload stuff + if command_name == 'reload' and self.check_access_sender(evt.sender): + if self.pre_start(): + await evt.respond('Reload completed') + else: + await evt.respond('Reload failed') + return True + + #block !tr factoid to allow translation + if command_name == 'tr': + return False + + if command_name == 'time' or command_name == 'utc': + if command_name == 'utc': + city = 'London' + else: + city = " ".join(args) + api_url = 'http://127.0.0.1:8000/factoids/api/citytime/' + city + '/?format=json' + resp = await self.http.get(api_url) + if resp and resp.status == 200: + data = await resp.json() + if data: + await evt.respond('The current time in ' + data['location'] + ' is ' + data['local_time']) + + #!package lookup command + if command_name == 'package' or command_name == 'depends': + apt = Apt() + if len(args) == 0: + return False + if len(args) == 1: + if command_name == 'depends': + await evt.respond(apt.depends(args[0], 'noble', False)) + else: + await evt.respond(apt.info(args[0], 'noble', False)) + return True + if len(args) == 2: + if args[1] in ['jammy', 'noble', 'mantic']: + if command_name == 'depends': + await evt.respond(apt.info(args[0], args[1], False)) + else: + await evt.respond(apt.depends(args[0], args[1], False)) + return True + return False + + # check for factoids IRC + #if await self.lookup_factoid_irc(command_name, to_user, evt): + # return True + # check for factoids matrix + if await self.lookup_factoid_matrix(command_name, to_user, evt): + return True + @classmethod + def get_config_class(cls) -> Type[BaseProxyConfig]: + return Config \ No newline at end of file diff --git a/ubottu/config.yaml.deploy b/ubottu/config.yaml.deploy new file mode 100644 index 0000000..dc7796b --- /dev/null +++ b/ubottu/config.yaml.deploy @@ -0,0 +1,123 @@ +# The full URI to the database. SQLite and Postgres are fully supported. +# Other DBMSes supported by SQLAlchemy may or may not work. +# Format examples: +# SQLite: sqlite:filename.db +# Postgres: postgresql://username:password@hostname/dbname +database: sqlite:maubot.db + +# Separate database URL for the crypto database. "default" means use the same database as above. +crypto_database: default + +# Additional arguments for asyncpg.create_pool() or sqlite3.connect() +# https://magicstack.github.io/asyncpg/current/api/index.html#asyncpg.pool.create_pool +# https://docs.python.org/3/library/sqlite3.html#sqlite3.connect +# For sqlite, min_size is used as the connection thread pool size and max_size is ignored. +database_opts: + min_size: 1 + max_size: 10 +plugin_directories: + # The directory where uploaded new plugins should be stored. + upload: ./plugins + # The directories from which plugins should be loaded. + # Duplicate plugin IDs will be moved to the trash. + load: + - ./plugins + trash: ./trash + +# Configuration for storing plugin databases +plugin_databases: + # The directory where SQLite plugin databases should be stored. + sqlite: ./plugins + # The connection URL for plugin databases. If null, all plugins will get SQLite databases. + # If set, plugins using the new asyncpg interface will get a Postgres connection instead. + # Plugins using the legacy SQLAlchemy interface will always get a SQLite connection. + # + # To use the same connection pool as the default database, set to "default" + # (the default database above must be postgres to do this). + # + # When enabled, maubot will create separate Postgres schemas in the database for each plugin. + # To view schemas in psql, use `\dn`. To view enter and interact with a specific schema, + # use `SET search_path = name` (where `name` is the name found with `\dn`) and then use normal + # SQL queries/psql commands. + postgres: + # Maximum number of connections per plugin instance. + postgres_max_conns_per_plugin: 3 + # Overrides for the default database_opts when using a non-"default" postgres connection string. + postgres_opts: {} + +server: + # The IP and port to listen to. + hostname: 127.0.0.1 + port: 28316 + # Public base URL where the server is visible. + public_url: %%PUBLIC_URL%% + # The base path for the UI. + ui_base_path: /_matrix/maubot + # The base path for plugin endpoints. The instance ID will be appended directly. + plugin_base_path: /_matrix/maubot/plugin/ + # Override path from where to load UI resources. + # Set to false to using pkg_resources to find the path. + override_resource_path: false + # The shared secret to sign API access tokens. + # Set to "generate" to generate and save a new token at startup. + unshared_secret: %%UNSHARED_SECRET%% + +# Known homeservers. This is required for the `mbc auth` command and also allows +# more convenient access from the management UI. This is not required to create +# clients in the management UI, since you can also just type the homeserver URL +# into the box there. +homeservers: + %%HOMESERVER_DOMAIN%%: + # Client-server API URL + url: %%HOMESERVER_URL%% + # registration_shared_secret from synapse config + secret: %%HOMESERVER_SECRET%% + +# List of administrator users. Plaintext passwords will be bcrypted on startup. Set empty password +# to prevent normal login. Root is a special user that can't have a password and will always exist. +admins: + admin: %%ADMIN_PW%% +api_features: + login: true + plugin: true + plugin_upload: true + instance: true + instance_database: true + client: true + client_proxy: true + client_auth: true + dev_open: true + log: true + +# Python logging configuration. +# +# See section 16.7.2 of the Python documentation for more info: +# https://docs.python.org/3.6/library/logging.config.html#configuration-dictionary-schema +logging: + version: 1 + formatters: + colored: + (): maubot.lib.color_log.ColorFormatter + format: '[%(asctime)s] [%(levelname)s@%(name)s] %(message)s' + normal: + format: '[%(asctime)s] [%(levelname)s@%(name)s] %(message)s' + handlers: + file: + class: logging.handlers.RotatingFileHandler + formatter: normal + filename: ./maubot.log + maxBytes: 10485760 + backupCount: 10 + console: + class: logging.StreamHandler + formatter: colored + loggers: + maubot: + level: DEBUG + mautrix: + level: DEBUG + aiohttp: + level: INFO + root: + level: DEBUG + handlers: [file, console] diff --git a/ubottu/floodprotection.py b/ubottu/floodprotection.py new file mode 100644 index 0000000..7f405cf --- /dev/null +++ b/ubottu/floodprotection.py @@ -0,0 +1,25 @@ +from collections import defaultdict +from time import time + +class FloodProtection: + def __init__(self): + self.user_commands = defaultdict(list) # Stores timestamps of commands for each user + self.max_commands = 3 + self.time_window = 60 # 60 seconds + + def flood_check(self, user_id): + """Check if a user can send a command based on flood protection limits.""" + current_time = time() + if user_id not in self.user_commands: + self.user_commands[user_id] = [current_time] + return True # Allow the command if the user has no recorded commands + + # Remove commands outside the time window + self.user_commands[user_id] = [timestamp for timestamp in self.user_commands[user_id] if current_time - timestamp < self.time_window] + + if len(self.user_commands[user_id]) < self.max_commands: + self.user_commands[user_id].append(current_time) + return True # Allow the command if under the limit + + # Otherwise, do not allow the command + return False \ No newline at end of file diff --git a/ubottu/packages.py b/ubottu/packages.py new file mode 100644 index 0000000..8395445 --- /dev/null +++ b/ubottu/packages.py @@ -0,0 +1,220 @@ +# -*- Encoding: utf-8 -*- +### +# Copyright (c) 2006-2007 Dennis Kaarsemaker +# Copyright (c) 2008-2010 Terence Simpson +# Copyright (c) 2017- Krytarik Raido +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of version 2 of the GNU General Public License as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +### + +import warnings +warnings.filterwarnings("ignore", "apt API not stable yet", FutureWarning) +import subprocess, os, apt, re +#import supybot.utils as utils +from email.parser import FeedParser + +def component(arg): + if '/' in arg: + return arg[:arg.find('/')] + return 'main' + +def description(pkg): + if 'Description-en' in pkg: + return pkg['Description-en'].split('\n')[0] + elif 'Description' in pkg: + return pkg['Description'].split('\n')[0] + return "Description not available" + +class Apt: + def __init__(self): + self.aptdir = os.path.expanduser('~') + '/apt-data' + self.distros = [] + #self.plugin = "plugin" + #self.log = "apt.log" + os.environ["LANG"] = "C.UTF-8" + if self.aptdir: + self.distros = sorted([x[:-5] for x in os.listdir(self.aptdir) if x.endswith('.list')]) + + def apt_cache(self, distro, cmd, pkg): + return subprocess.check_output(['apt-cache', + '-oAPT::Architecture=amd64', + '-oAPT::Architectures::=i386', + '-oAPT::Architectures::=amd64', + '-oDir::State::Lists=%s/%s' % (self.aptdir, distro), + '-oDir::State::Status=%s/%s.status' % (self.aptdir, distro), + '-oDir::Etc::SourceList=%s/%s.list' % (self.aptdir, distro), + '-oDir::Etc::SourceParts=""', + '-oDir::Cache=%s/cache' % self.aptdir] + + cmd + [pkg.lower()]).decode('utf-8') + + def apt_file(self, distro, pkg): + return subprocess.check_output(['apt-file', + '-oAPT::Architecture=amd64', + '-oAPT::Architectures::=i386', + '-oAPT::Architectures::=amd64', + '-oDir::State::Lists=%s/%s' % (self.aptdir, distro), + '-oDir::State::Status=%s/%s.status' % (self.aptdir, distro), + '-oDir::Etc::SourceList=%s/%s.list' % (self.aptdir, distro), + '-oDir::Etc::SourceParts=""', + '-oDir::Cache=%s/cache' % self.aptdir, + '-l', '-i', 'search', pkg]).decode('utf-8') + + def _parse(self, pkg): + parser = FeedParser() + parser.feed(pkg) + return parser.close() + + def find(self, pkg, distro, filelookup=True): + if distro.split('-')[0] in ('oldstable', 'stable', 'unstable', 'testing', 'experimental'): + pkgTracURL = "https://packages.debian.org" + else: + pkgTracURL = "https://packages.ubuntu.com" + + try: + data = self.apt_cache(distro, ['search', '-n'], pkg) + except subprocess.CalledProcessError as e: + data = e.output + if not data: + if filelookup: + try: + data = self.apt_file(distro, pkg).split() + except subprocess.CalledProcessError as e: + if e.returncode == 1: + return 'Package/file %s does not exist in %s' % (pkg, distro) + #self.log.error("PackageInfo/packages: Please update the cache for %s" % distro) + return "Cache out of date, please contact the administrator" + except OSError: + #self.log.error("PackageInfo/packages: apt-file is not installed") + return "Please use %s/ to search for files" % pkgTracURL + if data: + if len(data) > 10: + return "File %s found in %s and %d others <%s/search?searchon=contents&keywords=%s&mode=exactfilename&suite=%s&arch=any>" % (pkg, ', '.join(data[:10]), len(data)-10, pkgTracURL, utils.web.urlquote(pkg), distro) + return "File %s found in %s" % (pkg, ', '.join(data)) + return 'Package/file %s does not exist in %s' % (pkg, distro) + return "No packages matching '%s' could be found" % pkg + pkgs = [x.split()[0] for x in data.split('\n') if x] + if len(pkgs) > 10: + return "Found: %s and %d others <%s/search?keywords=%s&searchon=names&suite=%s§ion=all>" % (', '.join(pkgs[:10]), len(pkgs)-10, pkgTracURL, utils.web.urlquote(pkg), distro) + else: + return "Found: %s" % ', '.join(pkgs) + + def raw_info(self, pkg, distro, isSource, archlookup=True): + try: + data = self.apt_cache(distro, ['show'] if not isSource else ['showsrc', '--only-source'], pkg) + except subprocess.CalledProcessError: + data = '' + if not data: + return 'Package %s does not exist in %s' % (pkg, distro) + + maxp = {'Version': '0~'} + packages = list(map(self._parse, [x for x in data.split('\n\n') if x])) + for p in packages: + if apt.apt_pkg.version_compare(maxp['Version'], p['Version']) <= 0: + maxp = p + + if isSource: + bdeps = maxp.get('Build-Depends') + vcs = maxp.get('Vcs-Browser') + for (key, value) in list(maxp.items()): + if key.startswith('Build-Depends-'): + bdeps = "%s, %s" % (bdeps, value) if bdeps else value + elif key.startswith('Vcs-') and not vcs: + vcs = "%s (%s)" % (value, key[4:]) + maxp['Builddeps'] = bdeps + maxp['Vcs'] = vcs + return maxp + + if not maxp.get('Source'): + maxp['Sourcepkg'] = maxp['Package'] + else: + maxp['Sourcepkg'] = maxp['Source'].split()[0] + + if not archlookup: + return maxp + + try: + data2 = self.apt_cache(distro, ['showsrc', '--only-source'], maxp['Sourcepkg']) + except subprocess.CalledProcessError: + data2 = '' + if not data2: + return maxp + + maxp2 = {'Version': '0~'} + packages2 = list(map(self._parse, [x for x in data2.split('\n\n') if x])) + for p in packages2: + if apt.apt_pkg.version_compare(maxp2['Version'], p['Version']) <= 0: + maxp2 = p + + archs = re.match(r'.*^ %s \S+ \S+ \S+ arch=(?P\S+)$' % re.escape(pkg), maxp2['Package-List'], + re.I | re.M | re.DOTALL) + if archs: + archs = archs.group('arch').split(',') + if not ('any' in archs or 'all' in archs): + maxp['Architectures'] = ', '.join(archs) + + return maxp + + def info(self, pkg, distro, isSource): + maxp = self.raw_info(pkg, distro, isSource) + if isinstance(maxp, str): + return maxp + if isSource: + return "%s (%s, %s): Packages %s. Maintained by %s%s" % ( + maxp['Package'], maxp['Version'], distro, maxp['Binary'].replace('\n',''), + re.sub(r' <\S+>$', '', maxp.get('Original-Maintainer', maxp['Maintainer'])), + " @ %s" % maxp['Vcs'] if maxp['Vcs'] else "") + return "{} ({}, {}): {}. In component {}, is {}. Built by {}. Size {:,} kB / {:,} kB{}".format( + maxp['Package'], maxp['Version'], distro, description(maxp), component(maxp['Section']), + maxp['Priority'], maxp['Sourcepkg'], int((int(maxp['Size'])/1024)+1), int(maxp['Installed-Size']), + ". (Only available for %s.)" % maxp['Architectures'] if maxp.get('Architectures') else "") + + def depends(self, pkg, distro, isSource): + maxp = self.raw_info(pkg, distro, isSource, archlookup=False) + if isinstance(maxp, str): + return maxp + if isSource: + return "%s (%s, %s): Build depends on %s" % ( + maxp['Package'], maxp['Version'], distro, maxp.get('Builddeps', "nothing").replace('\n','')) + return "%s (%s, %s): Depends on %s%s" % ( + maxp['Package'], maxp['Version'], distro, maxp.get('Depends', "nothing").replace('\n',''), + ". Recommends %s" % maxp['Recommends'].replace('\n','') if maxp.get('Recommends') else "") + +# Simple test +if __name__ == "__main__": + import sys + argv = sys.argv + argc = len(argv) + if argc == 1: + print("Need at least one arg") + sys.exit(1) + if argc > 3: + print("Only takes 2 args") + sys.exit(1) + class FakePlugin: + class FakeLog: + def error(*args, **kwargs): + pass + def __init__(self): + self.log = self.FakeLog() + def registryValue(self, *args, **kwargs): + return os.path.expanduser('~') + '/apt-data' + + try: + (command, lookup) = argv[1].split(None, 1) + except: + print("Need something to look up") + sys.exit(1) + dist = "noble" + if argc == 3: + dist = argv[2] + plugin = FakePlugin() + aptlookup = Apt(plugin) + print(getattr(aptlookup, command)(lookup, dist))