first commit
This commit is contained in:
commit
c0337a90a2
12 changed files with 675 additions and 0 deletions
34
.forgejo/workflows/maubot.yaml
Normal file
34
.forgejo/workflows/maubot.yaml
Normal file
|
@ -0,0 +1,34 @@
|
|||
on: [push]
|
||||
jobs:
|
||||
build:
|
||||
runs-on: docker
|
||||
container:
|
||||
image: ghcr.io/catthehacker/ubuntu:act-22.04
|
||||
env:
|
||||
PIP_INDEX_URL: https://pypi.haxxors.com/simple
|
||||
UNSHARED_SECRET: ${{ secrets.UNSHARED_SECRET }}
|
||||
HOMESERVER_URL: ${{ secrets.HOMESERVER_URL }}
|
||||
HOMESERVER_SECRET: ${{ secrets.HOMESERVER_SECRET }}
|
||||
HOMESERVER_DOMAIN: ${{ secrets.HOMESERVER_DOMAIN }}
|
||||
ADMIN_PW: ${{ secrets.ADMIN_PW }}
|
||||
PUBLIC_URL: ${{ secrets.PUBLIC_URL }}
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- run: cp ubottu/config.yaml.deploy ubottu/config.yaml
|
||||
- run: rm -f ubottu/config.yaml.deploy
|
||||
- run: rm -f ubottu/config.yaml.default
|
||||
- run: sed -i "s/%%UNSHARED_SECRET%%/${UNSHARED_SECRET}/g" ubottu/config.yaml
|
||||
- run: sed -i "s/%%HOMESERVER_URL%%/${HOMESERVER_URL}/g" ubottu/config.yaml
|
||||
- run: sed -i "s/%%HOMESERVER_SECRET%%/${HOMESERVER_SECRET}/g" ubottu/config.yaml
|
||||
- run: sed -i "s/%%HOMESERVER_DOMAIN%%/${HOMESERVER_DOMAIN}/g" ubottu/config.yaml
|
||||
- run: sed -i "s/%%ADMIN_PW%%/${ADMIN_PW}/g" ubottu/config.yaml
|
||||
- run: sed -i "s/%%PUBLIC_URL%%/${PUBLIC_URL}/g" ubottu/config.yaml
|
||||
- run: pip install maubot
|
||||
- run: mbc build # Build the project
|
||||
- run: mkdir -p output # Ensure output directory exists, `-p` to prevent error if already exists
|
||||
- run: mv *.mbp output/ubottu-latest-py3.10.mbp # Move built file to output
|
||||
- uses: actions/upload-artifact@v3
|
||||
with:
|
||||
name: ubottu-latest-py3.10.mbp.zip
|
||||
path: output/ubottu-latest-py3.10.mbp
|
16
.gitignore
vendored
Normal file
16
.gitignore
vendored
Normal file
|
@ -0,0 +1,16 @@
|
|||
venv
|
||||
olm
|
||||
.local
|
||||
.cmake
|
||||
.cache
|
||||
.ubottu
|
||||
.bashrc
|
||||
.python_history
|
||||
.bash_history
|
||||
*.mbp
|
||||
*.db
|
||||
*.log
|
||||
config.yaml
|
||||
plugins/*
|
||||
ubottu/maubot.db
|
||||
ubottu/config.yaml
|
0
README.md
Normal file
0
README.md
Normal file
6
base-config.yaml
Normal file
6
base-config.yaml
Normal file
|
@ -0,0 +1,6 @@
|
|||
whitelist:
|
||||
- "@ravage:xentonix.net"
|
||||
rooms:
|
||||
- "!XBHBxuegpdVZEJBOIh:xentonix.net"
|
||||
- "!TloppdJexqToFbZUZW:xentonix.net"
|
||||
- "!EJhpCQHHqqcNicfiql:xentonix.net"
|
11
maubot.yaml
Normal file
11
maubot.yaml
Normal file
|
@ -0,0 +1,11 @@
|
|||
maubot: 0.1.0
|
||||
id: com.ubuntu.ubottu
|
||||
version: 1.0.0
|
||||
license: MIT
|
||||
modules:
|
||||
- ubottu
|
||||
main_class: Ubottu
|
||||
config: true
|
||||
webapp: true
|
||||
extra_files:
|
||||
- base-config.yaml
|
35
requirements.txt
Normal file
35
requirements.txt
Normal file
|
@ -0,0 +1,35 @@
|
|||
aiohttp==3.9.3
|
||||
aiosignal==1.3.1
|
||||
aiosqlite==0.18.0
|
||||
asyncpg==0.28.0
|
||||
attrs==23.2.0
|
||||
bcrypt==4.1.2
|
||||
Brotli==1.1.0
|
||||
certifi==2024.2.2
|
||||
cffi==1.16.0
|
||||
charset-normalizer==3.3.2
|
||||
click==8.1.7
|
||||
colorama==0.4.6
|
||||
commonmark==0.9.1
|
||||
feedparser==6.0.11
|
||||
frozenlist==1.4.1
|
||||
idna==3.6
|
||||
Jinja2==3.1.3
|
||||
MarkupSafe==2.1.5
|
||||
maubot==0.4.2
|
||||
mautrix==0.20.4
|
||||
multidict==6.0.5
|
||||
packaging==24.0
|
||||
prompt-toolkit==3.0.43
|
||||
pycparser==2.21
|
||||
python-olm==3.2.16
|
||||
questionary==1.10.0
|
||||
requests==2.31.0
|
||||
ruamel.yaml==0.17.40
|
||||
ruamel.yaml.clib==0.2.8
|
||||
setuptools==69.2.0
|
||||
sgmllib3k==1.0.0
|
||||
urllib3==2.2.1
|
||||
wcwidth==0.2.13
|
||||
wheel==0.43.0
|
||||
yarl==1.9.4
|
1
ubottu/__init__.py
Normal file
1
ubottu/__init__.py
Normal file
|
@ -0,0 +1 @@
|
|||
from .bot import Ubottu
|
6
ubottu/base-config.yaml
Normal file
6
ubottu/base-config.yaml
Normal file
|
@ -0,0 +1,6 @@
|
|||
whitelist:
|
||||
- "@ravage:xentonix.net"
|
||||
rooms:
|
||||
- "!XBHBxuegpdVZEJBOIh:xentonix.net"
|
||||
- "!TloppdJexqToFbZUZW:xentonix.net"
|
||||
- "!EJhpCQHHqqcNicfiql:xentonix.net"
|
198
ubottu/bot.py
Normal file
198
ubottu/bot.py
Normal file
|
@ -0,0 +1,198 @@
|
|||
import json
|
||||
import sqlite3
|
||||
import os
|
||||
import re
|
||||
import requests
|
||||
from typing import Type, Tuple
|
||||
from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper
|
||||
from maubot import Plugin, MessageEvent
|
||||
from maubot.handlers import command
|
||||
from aiohttp.web import Request, Response, json_response
|
||||
from pathlib import Path
|
||||
from urllib.parse import urlparse, unquote
|
||||
from .floodprotection import FloodProtection
|
||||
from .packages import Apt
|
||||
from launchpadlib.launchpad import Launchpad
|
||||
|
||||
class Config(BaseProxyConfig):
|
||||
def do_update(self, helper: ConfigUpdateHelper) -> None:
|
||||
helper.copy("whitelist")
|
||||
helper.copy("rooms")
|
||||
|
||||
class Ubottu(Plugin):
|
||||
|
||||
def sanitize_string(self, input_string):
|
||||
# Pattern includes single quotes, double quotes, semicolons, and common SQL comment markers
|
||||
pattern = r"[\'\";]|(--)|(/\*)|(\*/)"
|
||||
# Replace the identified patterns with an empty string
|
||||
safe_string = re.sub(pattern, '', input_string)
|
||||
return safe_string
|
||||
|
||||
async def pre_start(self) -> None:
|
||||
#if await self.get_ubottu_db('https://ubottu.com/ubuntu3.db'):
|
||||
# self.db = sqlite3.connect("/home/maubot/.ubottu/ubuntu3.db")
|
||||
#else:
|
||||
# return False
|
||||
return True
|
||||
|
||||
async def start(self) -> None:
|
||||
self.config.load_and_update()
|
||||
self.flood_protection = FloodProtection()
|
||||
|
||||
async def get_ubottu_db(self, url):
|
||||
"""Load a file from a URL into an in-memory filesystem."""
|
||||
# Create a filename if required
|
||||
u = urlparse(url)
|
||||
fn = "/home/maubot/.ubottu/" + os.path.basename(u.path)
|
||||
#if os.path.isfile(fn):
|
||||
# return fn
|
||||
requests.packages.urllib3.util.connection.HAS_IPV6 = False
|
||||
with requests.get(url, stream=True) as r:
|
||||
r.raise_for_status() # Checks if the request was successful
|
||||
# Open the local file in binary write mode
|
||||
with open(fn, 'wb+') as f:
|
||||
for chunk in r.iter_content(chunk_size=8192):
|
||||
# If you have a chunk of data, write it to the file
|
||||
if chunk:
|
||||
f.write(chunk)
|
||||
f.close()
|
||||
return fn
|
||||
|
||||
def check_access(self, sender, room_id):
|
||||
if sender in self.config["whitelist"] and room_id in self.config["rooms"]:
|
||||
return True
|
||||
return False
|
||||
def check_access_sender(self, sender):
|
||||
if sender in self.config["whitelist"]:
|
||||
return True
|
||||
return False
|
||||
|
||||
#@command.new(name="email", aliases=["json"])
|
||||
@command.new(name="jsontest", aliases=["json"])
|
||||
async def email(self, evt: MessageEvent) -> None:
|
||||
if self.check_access(evt.sender, evt.room_id):
|
||||
url='https://xentonix.net/test.json'
|
||||
resp = await self.http.get(url)
|
||||
if resp.status == 200:
|
||||
data = await resp.json()
|
||||
#print(data)
|
||||
await evt.reply(data['employees'][0]['email'])
|
||||
|
||||
async def lookup_factoid_irc(self, command_name, to_user, evt):
|
||||
sql = "SELECT value FROM facts where name = '" + command_name + "' LIMIT 1"
|
||||
db = self.db
|
||||
cur = db.cursor()
|
||||
cur.execute(sql)
|
||||
rows = cur.fetchall()
|
||||
row = None
|
||||
for row in rows:
|
||||
if row[0].startswith('<alias>'):
|
||||
command_name = str(row[0]).replace('<alias> ', '')
|
||||
sql = "SELECT value FROM facts where name = '" + command_name + "' LIMIT 1"
|
||||
cur.execute(sql)
|
||||
rows = cur.fetchall()
|
||||
for row in rows:
|
||||
break
|
||||
break
|
||||
if row is not None and row[0].startswith('<reply>'):
|
||||
output = str(row[0]).replace('<reply> ', '')
|
||||
if to_user:
|
||||
await evt.respond(to_user + ': ' + output)
|
||||
else:
|
||||
await evt.respond(output)
|
||||
return True
|
||||
return False
|
||||
|
||||
async def lookup_factoid_matrix(self, command_name, to_user, evt):
|
||||
api_url = 'http://127.0.0.1:8000/factoids/api/facts/'
|
||||
url = api_url + command_name + '/?format=json'
|
||||
resp = await self.http.get(url)
|
||||
if resp and resp.status == 200:
|
||||
data = await resp.json()
|
||||
if data:
|
||||
id = data['id']
|
||||
name = data['name']
|
||||
value = data['value']
|
||||
ftype = data['ftype']
|
||||
if ftype == 'ALIAS':
|
||||
command_name = value
|
||||
url = api_url + command_name + '/?format=json'
|
||||
resp = await self.http.get(url)
|
||||
if resp and resp.status == 200:
|
||||
data = await resp.json()
|
||||
value = data['value']
|
||||
if to_user:
|
||||
await evt.respond(to_user + ': ' + value)
|
||||
else:
|
||||
await evt.respond(value)
|
||||
return True
|
||||
return False
|
||||
|
||||
@command.passive("^!(.+)$")
|
||||
async def command(self, evt: MessageEvent, match: Tuple[str]) -> None:
|
||||
# allow all rooms and users, only enable flood protection
|
||||
#if self.check_access(evt.sender, evt.room_id):
|
||||
if self.flood_protection.flood_check(evt.sender):
|
||||
args = []
|
||||
to_user = ''
|
||||
command_name = self.sanitize_string(match[0][1:].split(' ')[0])
|
||||
full_command = re.sub(r'\s+', ' ', match[0][1:])
|
||||
if full_command.count('|') > 0:
|
||||
to_user = self.sanitize_string(full_command.split('|')[1].strip())
|
||||
args = full_command.split('|')[0].strip().split(' ')[1:]
|
||||
else:
|
||||
args = full_command.strip().split(' ')[1:]
|
||||
|
||||
#reload stuff
|
||||
if command_name == 'reload' and self.check_access_sender(evt.sender):
|
||||
if self.pre_start():
|
||||
await evt.respond('Reload completed')
|
||||
else:
|
||||
await evt.respond('Reload failed')
|
||||
return True
|
||||
|
||||
#block !tr factoid to allow translation
|
||||
if command_name == 'tr':
|
||||
return False
|
||||
|
||||
if command_name == 'time' or command_name == 'utc':
|
||||
if command_name == 'utc':
|
||||
city = 'London'
|
||||
else:
|
||||
city = " ".join(args)
|
||||
api_url = 'http://127.0.0.1:8000/factoids/api/citytime/' + city + '/?format=json'
|
||||
resp = await self.http.get(api_url)
|
||||
if resp and resp.status == 200:
|
||||
data = await resp.json()
|
||||
if data:
|
||||
await evt.respond('The current time in ' + data['location'] + ' is ' + data['local_time'])
|
||||
|
||||
#!package lookup command
|
||||
if command_name == 'package' or command_name == 'depends':
|
||||
apt = Apt()
|
||||
if len(args) == 0:
|
||||
return False
|
||||
if len(args) == 1:
|
||||
if command_name == 'depends':
|
||||
await evt.respond(apt.depends(args[0], 'noble', False))
|
||||
else:
|
||||
await evt.respond(apt.info(args[0], 'noble', False))
|
||||
return True
|
||||
if len(args) == 2:
|
||||
if args[1] in ['jammy', 'noble', 'mantic']:
|
||||
if command_name == 'depends':
|
||||
await evt.respond(apt.info(args[0], args[1], False))
|
||||
else:
|
||||
await evt.respond(apt.depends(args[0], args[1], False))
|
||||
return True
|
||||
return False
|
||||
|
||||
# check for factoids IRC
|
||||
#if await self.lookup_factoid_irc(command_name, to_user, evt):
|
||||
# return True
|
||||
# check for factoids matrix
|
||||
if await self.lookup_factoid_matrix(command_name, to_user, evt):
|
||||
return True
|
||||
@classmethod
|
||||
def get_config_class(cls) -> Type[BaseProxyConfig]:
|
||||
return Config
|
123
ubottu/config.yaml.deploy
Normal file
123
ubottu/config.yaml.deploy
Normal file
|
@ -0,0 +1,123 @@
|
|||
# The full URI to the database. SQLite and Postgres are fully supported.
|
||||
# Other DBMSes supported by SQLAlchemy may or may not work.
|
||||
# Format examples:
|
||||
# SQLite: sqlite:filename.db
|
||||
# Postgres: postgresql://username:password@hostname/dbname
|
||||
database: sqlite:maubot.db
|
||||
|
||||
# Separate database URL for the crypto database. "default" means use the same database as above.
|
||||
crypto_database: default
|
||||
|
||||
# Additional arguments for asyncpg.create_pool() or sqlite3.connect()
|
||||
# https://magicstack.github.io/asyncpg/current/api/index.html#asyncpg.pool.create_pool
|
||||
# https://docs.python.org/3/library/sqlite3.html#sqlite3.connect
|
||||
# For sqlite, min_size is used as the connection thread pool size and max_size is ignored.
|
||||
database_opts:
|
||||
min_size: 1
|
||||
max_size: 10
|
||||
plugin_directories:
|
||||
# The directory where uploaded new plugins should be stored.
|
||||
upload: ./plugins
|
||||
# The directories from which plugins should be loaded.
|
||||
# Duplicate plugin IDs will be moved to the trash.
|
||||
load:
|
||||
- ./plugins
|
||||
trash: ./trash
|
||||
|
||||
# Configuration for storing plugin databases
|
||||
plugin_databases:
|
||||
# The directory where SQLite plugin databases should be stored.
|
||||
sqlite: ./plugins
|
||||
# The connection URL for plugin databases. If null, all plugins will get SQLite databases.
|
||||
# If set, plugins using the new asyncpg interface will get a Postgres connection instead.
|
||||
# Plugins using the legacy SQLAlchemy interface will always get a SQLite connection.
|
||||
#
|
||||
# To use the same connection pool as the default database, set to "default"
|
||||
# (the default database above must be postgres to do this).
|
||||
#
|
||||
# When enabled, maubot will create separate Postgres schemas in the database for each plugin.
|
||||
# To view schemas in psql, use `\dn`. To view enter and interact with a specific schema,
|
||||
# use `SET search_path = name` (where `name` is the name found with `\dn`) and then use normal
|
||||
# SQL queries/psql commands.
|
||||
postgres:
|
||||
# Maximum number of connections per plugin instance.
|
||||
postgres_max_conns_per_plugin: 3
|
||||
# Overrides for the default database_opts when using a non-"default" postgres connection string.
|
||||
postgres_opts: {}
|
||||
|
||||
server:
|
||||
# The IP and port to listen to.
|
||||
hostname: 127.0.0.1
|
||||
port: 28316
|
||||
# Public base URL where the server is visible.
|
||||
public_url: %%PUBLIC_URL%%
|
||||
# The base path for the UI.
|
||||
ui_base_path: /_matrix/maubot
|
||||
# The base path for plugin endpoints. The instance ID will be appended directly.
|
||||
plugin_base_path: /_matrix/maubot/plugin/
|
||||
# Override path from where to load UI resources.
|
||||
# Set to false to using pkg_resources to find the path.
|
||||
override_resource_path: false
|
||||
# The shared secret to sign API access tokens.
|
||||
# Set to "generate" to generate and save a new token at startup.
|
||||
unshared_secret: %%UNSHARED_SECRET%%
|
||||
|
||||
# Known homeservers. This is required for the `mbc auth` command and also allows
|
||||
# more convenient access from the management UI. This is not required to create
|
||||
# clients in the management UI, since you can also just type the homeserver URL
|
||||
# into the box there.
|
||||
homeservers:
|
||||
%%HOMESERVER_DOMAIN%%:
|
||||
# Client-server API URL
|
||||
url: %%HOMESERVER_URL%%
|
||||
# registration_shared_secret from synapse config
|
||||
secret: %%HOMESERVER_SECRET%%
|
||||
|
||||
# List of administrator users. Plaintext passwords will be bcrypted on startup. Set empty password
|
||||
# to prevent normal login. Root is a special user that can't have a password and will always exist.
|
||||
admins:
|
||||
admin: %%ADMIN_PW%%
|
||||
api_features:
|
||||
login: true
|
||||
plugin: true
|
||||
plugin_upload: true
|
||||
instance: true
|
||||
instance_database: true
|
||||
client: true
|
||||
client_proxy: true
|
||||
client_auth: true
|
||||
dev_open: true
|
||||
log: true
|
||||
|
||||
# Python logging configuration.
|
||||
#
|
||||
# See section 16.7.2 of the Python documentation for more info:
|
||||
# https://docs.python.org/3.6/library/logging.config.html#configuration-dictionary-schema
|
||||
logging:
|
||||
version: 1
|
||||
formatters:
|
||||
colored:
|
||||
(): maubot.lib.color_log.ColorFormatter
|
||||
format: '[%(asctime)s] [%(levelname)s@%(name)s] %(message)s'
|
||||
normal:
|
||||
format: '[%(asctime)s] [%(levelname)s@%(name)s] %(message)s'
|
||||
handlers:
|
||||
file:
|
||||
class: logging.handlers.RotatingFileHandler
|
||||
formatter: normal
|
||||
filename: ./maubot.log
|
||||
maxBytes: 10485760
|
||||
backupCount: 10
|
||||
console:
|
||||
class: logging.StreamHandler
|
||||
formatter: colored
|
||||
loggers:
|
||||
maubot:
|
||||
level: DEBUG
|
||||
mautrix:
|
||||
level: DEBUG
|
||||
aiohttp:
|
||||
level: INFO
|
||||
root:
|
||||
level: DEBUG
|
||||
handlers: [file, console]
|
25
ubottu/floodprotection.py
Normal file
25
ubottu/floodprotection.py
Normal file
|
@ -0,0 +1,25 @@
|
|||
from collections import defaultdict
|
||||
from time import time
|
||||
|
||||
class FloodProtection:
|
||||
def __init__(self):
|
||||
self.user_commands = defaultdict(list) # Stores timestamps of commands for each user
|
||||
self.max_commands = 3
|
||||
self.time_window = 60 # 60 seconds
|
||||
|
||||
def flood_check(self, user_id):
|
||||
"""Check if a user can send a command based on flood protection limits."""
|
||||
current_time = time()
|
||||
if user_id not in self.user_commands:
|
||||
self.user_commands[user_id] = [current_time]
|
||||
return True # Allow the command if the user has no recorded commands
|
||||
|
||||
# Remove commands outside the time window
|
||||
self.user_commands[user_id] = [timestamp for timestamp in self.user_commands[user_id] if current_time - timestamp < self.time_window]
|
||||
|
||||
if len(self.user_commands[user_id]) < self.max_commands:
|
||||
self.user_commands[user_id].append(current_time)
|
||||
return True # Allow the command if under the limit
|
||||
|
||||
# Otherwise, do not allow the command
|
||||
return False
|
220
ubottu/packages.py
Normal file
220
ubottu/packages.py
Normal file
|
@ -0,0 +1,220 @@
|
|||
# -*- Encoding: utf-8 -*-
|
||||
###
|
||||
# Copyright (c) 2006-2007 Dennis Kaarsemaker
|
||||
# Copyright (c) 2008-2010 Terence Simpson
|
||||
# Copyright (c) 2017- Krytarik Raido
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify
|
||||
# it under the terms of version 2 of the GNU General Public License as
|
||||
# published by the Free Software Foundation.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
###
|
||||
|
||||
import warnings
|
||||
warnings.filterwarnings("ignore", "apt API not stable yet", FutureWarning)
|
||||
import subprocess, os, apt, re
|
||||
#import supybot.utils as utils
|
||||
from email.parser import FeedParser
|
||||
|
||||
def component(arg):
|
||||
if '/' in arg:
|
||||
return arg[:arg.find('/')]
|
||||
return 'main'
|
||||
|
||||
def description(pkg):
|
||||
if 'Description-en' in pkg:
|
||||
return pkg['Description-en'].split('\n')[0]
|
||||
elif 'Description' in pkg:
|
||||
return pkg['Description'].split('\n')[0]
|
||||
return "Description not available"
|
||||
|
||||
class Apt:
|
||||
def __init__(self):
|
||||
self.aptdir = os.path.expanduser('~') + '/apt-data'
|
||||
self.distros = []
|
||||
#self.plugin = "plugin"
|
||||
#self.log = "apt.log"
|
||||
os.environ["LANG"] = "C.UTF-8"
|
||||
if self.aptdir:
|
||||
self.distros = sorted([x[:-5] for x in os.listdir(self.aptdir) if x.endswith('.list')])
|
||||
|
||||
def apt_cache(self, distro, cmd, pkg):
|
||||
return subprocess.check_output(['apt-cache',
|
||||
'-oAPT::Architecture=amd64',
|
||||
'-oAPT::Architectures::=i386',
|
||||
'-oAPT::Architectures::=amd64',
|
||||
'-oDir::State::Lists=%s/%s' % (self.aptdir, distro),
|
||||
'-oDir::State::Status=%s/%s.status' % (self.aptdir, distro),
|
||||
'-oDir::Etc::SourceList=%s/%s.list' % (self.aptdir, distro),
|
||||
'-oDir::Etc::SourceParts=""',
|
||||
'-oDir::Cache=%s/cache' % self.aptdir] +
|
||||
cmd + [pkg.lower()]).decode('utf-8')
|
||||
|
||||
def apt_file(self, distro, pkg):
|
||||
return subprocess.check_output(['apt-file',
|
||||
'-oAPT::Architecture=amd64',
|
||||
'-oAPT::Architectures::=i386',
|
||||
'-oAPT::Architectures::=amd64',
|
||||
'-oDir::State::Lists=%s/%s' % (self.aptdir, distro),
|
||||
'-oDir::State::Status=%s/%s.status' % (self.aptdir, distro),
|
||||
'-oDir::Etc::SourceList=%s/%s.list' % (self.aptdir, distro),
|
||||
'-oDir::Etc::SourceParts=""',
|
||||
'-oDir::Cache=%s/cache' % self.aptdir,
|
||||
'-l', '-i', 'search', pkg]).decode('utf-8')
|
||||
|
||||
def _parse(self, pkg):
|
||||
parser = FeedParser()
|
||||
parser.feed(pkg)
|
||||
return parser.close()
|
||||
|
||||
def find(self, pkg, distro, filelookup=True):
|
||||
if distro.split('-')[0] in ('oldstable', 'stable', 'unstable', 'testing', 'experimental'):
|
||||
pkgTracURL = "https://packages.debian.org"
|
||||
else:
|
||||
pkgTracURL = "https://packages.ubuntu.com"
|
||||
|
||||
try:
|
||||
data = self.apt_cache(distro, ['search', '-n'], pkg)
|
||||
except subprocess.CalledProcessError as e:
|
||||
data = e.output
|
||||
if not data:
|
||||
if filelookup:
|
||||
try:
|
||||
data = self.apt_file(distro, pkg).split()
|
||||
except subprocess.CalledProcessError as e:
|
||||
if e.returncode == 1:
|
||||
return 'Package/file %s does not exist in %s' % (pkg, distro)
|
||||
#self.log.error("PackageInfo/packages: Please update the cache for %s" % distro)
|
||||
return "Cache out of date, please contact the administrator"
|
||||
except OSError:
|
||||
#self.log.error("PackageInfo/packages: apt-file is not installed")
|
||||
return "Please use %s/ to search for files" % pkgTracURL
|
||||
if data:
|
||||
if len(data) > 10:
|
||||
return "File %s found in %s and %d others <%s/search?searchon=contents&keywords=%s&mode=exactfilename&suite=%s&arch=any>" % (pkg, ', '.join(data[:10]), len(data)-10, pkgTracURL, utils.web.urlquote(pkg), distro)
|
||||
return "File %s found in %s" % (pkg, ', '.join(data))
|
||||
return 'Package/file %s does not exist in %s' % (pkg, distro)
|
||||
return "No packages matching '%s' could be found" % pkg
|
||||
pkgs = [x.split()[0] for x in data.split('\n') if x]
|
||||
if len(pkgs) > 10:
|
||||
return "Found: %s and %d others <%s/search?keywords=%s&searchon=names&suite=%s§ion=all>" % (', '.join(pkgs[:10]), len(pkgs)-10, pkgTracURL, utils.web.urlquote(pkg), distro)
|
||||
else:
|
||||
return "Found: %s" % ', '.join(pkgs)
|
||||
|
||||
def raw_info(self, pkg, distro, isSource, archlookup=True):
|
||||
try:
|
||||
data = self.apt_cache(distro, ['show'] if not isSource else ['showsrc', '--only-source'], pkg)
|
||||
except subprocess.CalledProcessError:
|
||||
data = ''
|
||||
if not data:
|
||||
return 'Package %s does not exist in %s' % (pkg, distro)
|
||||
|
||||
maxp = {'Version': '0~'}
|
||||
packages = list(map(self._parse, [x for x in data.split('\n\n') if x]))
|
||||
for p in packages:
|
||||
if apt.apt_pkg.version_compare(maxp['Version'], p['Version']) <= 0:
|
||||
maxp = p
|
||||
|
||||
if isSource:
|
||||
bdeps = maxp.get('Build-Depends')
|
||||
vcs = maxp.get('Vcs-Browser')
|
||||
for (key, value) in list(maxp.items()):
|
||||
if key.startswith('Build-Depends-'):
|
||||
bdeps = "%s, %s" % (bdeps, value) if bdeps else value
|
||||
elif key.startswith('Vcs-') and not vcs:
|
||||
vcs = "%s (%s)" % (value, key[4:])
|
||||
maxp['Builddeps'] = bdeps
|
||||
maxp['Vcs'] = vcs
|
||||
return maxp
|
||||
|
||||
if not maxp.get('Source'):
|
||||
maxp['Sourcepkg'] = maxp['Package']
|
||||
else:
|
||||
maxp['Sourcepkg'] = maxp['Source'].split()[0]
|
||||
|
||||
if not archlookup:
|
||||
return maxp
|
||||
|
||||
try:
|
||||
data2 = self.apt_cache(distro, ['showsrc', '--only-source'], maxp['Sourcepkg'])
|
||||
except subprocess.CalledProcessError:
|
||||
data2 = ''
|
||||
if not data2:
|
||||
return maxp
|
||||
|
||||
maxp2 = {'Version': '0~'}
|
||||
packages2 = list(map(self._parse, [x for x in data2.split('\n\n') if x]))
|
||||
for p in packages2:
|
||||
if apt.apt_pkg.version_compare(maxp2['Version'], p['Version']) <= 0:
|
||||
maxp2 = p
|
||||
|
||||
archs = re.match(r'.*^ %s \S+ \S+ \S+ arch=(?P<arch>\S+)$' % re.escape(pkg), maxp2['Package-List'],
|
||||
re.I | re.M | re.DOTALL)
|
||||
if archs:
|
||||
archs = archs.group('arch').split(',')
|
||||
if not ('any' in archs or 'all' in archs):
|
||||
maxp['Architectures'] = ', '.join(archs)
|
||||
|
||||
return maxp
|
||||
|
||||
def info(self, pkg, distro, isSource):
|
||||
maxp = self.raw_info(pkg, distro, isSource)
|
||||
if isinstance(maxp, str):
|
||||
return maxp
|
||||
if isSource:
|
||||
return "%s (%s, %s): Packages %s. Maintained by %s%s" % (
|
||||
maxp['Package'], maxp['Version'], distro, maxp['Binary'].replace('\n',''),
|
||||
re.sub(r' <\S+>$', '', maxp.get('Original-Maintainer', maxp['Maintainer'])),
|
||||
" @ %s" % maxp['Vcs'] if maxp['Vcs'] else "")
|
||||
return "{} ({}, {}): {}. In component {}, is {}. Built by {}. Size {:,} kB / {:,} kB{}".format(
|
||||
maxp['Package'], maxp['Version'], distro, description(maxp), component(maxp['Section']),
|
||||
maxp['Priority'], maxp['Sourcepkg'], int((int(maxp['Size'])/1024)+1), int(maxp['Installed-Size']),
|
||||
". (Only available for %s.)" % maxp['Architectures'] if maxp.get('Architectures') else "")
|
||||
|
||||
def depends(self, pkg, distro, isSource):
|
||||
maxp = self.raw_info(pkg, distro, isSource, archlookup=False)
|
||||
if isinstance(maxp, str):
|
||||
return maxp
|
||||
if isSource:
|
||||
return "%s (%s, %s): Build depends on %s" % (
|
||||
maxp['Package'], maxp['Version'], distro, maxp.get('Builddeps', "nothing").replace('\n',''))
|
||||
return "%s (%s, %s): Depends on %s%s" % (
|
||||
maxp['Package'], maxp['Version'], distro, maxp.get('Depends', "nothing").replace('\n',''),
|
||||
". Recommends %s" % maxp['Recommends'].replace('\n','') if maxp.get('Recommends') else "")
|
||||
|
||||
# Simple test
|
||||
if __name__ == "__main__":
|
||||
import sys
|
||||
argv = sys.argv
|
||||
argc = len(argv)
|
||||
if argc == 1:
|
||||
print("Need at least one arg")
|
||||
sys.exit(1)
|
||||
if argc > 3:
|
||||
print("Only takes 2 args")
|
||||
sys.exit(1)
|
||||
class FakePlugin:
|
||||
class FakeLog:
|
||||
def error(*args, **kwargs):
|
||||
pass
|
||||
def __init__(self):
|
||||
self.log = self.FakeLog()
|
||||
def registryValue(self, *args, **kwargs):
|
||||
return os.path.expanduser('~') + '/apt-data'
|
||||
|
||||
try:
|
||||
(command, lookup) = argv[1].split(None, 1)
|
||||
except:
|
||||
print("Need something to look up")
|
||||
sys.exit(1)
|
||||
dist = "noble"
|
||||
if argc == 3:
|
||||
dist = argv[2]
|
||||
plugin = FakePlugin()
|
||||
aptlookup = Apt(plugin)
|
||||
print(getattr(aptlookup, command)(lookup, dist))
|
Loading…
Add table
Reference in a new issue