support blocking rooms to be invited to

This commit is contained in:
Nils Büchner 2024-09-14 21:44:46 +02:00
parent d96dd4ff6c
commit c0d5dfcb5b
3 changed files with 69 additions and 39 deletions

View file

@ -22,3 +22,28 @@ modules:
use_allowlist: true
# Whether to use the blocklist to block certain homeservers (default: true)
use_blocklist: true
blocklist_rooms:
- "#test:matrix.org"
- "!dkgsemSiSMrGfxEwCb:ubuntu.com
Example for the contents of the URL with the json data:
```json
{
"use_allowlist": true,
"use_blocklist": true,
"allowlist": [
"trusted-homeserver.com",
"another-trusted-server.org"
],
"blocklist": [
"malicious-homeserver.com",
"blocked-server.org"
],
"blocklist_rooms": [
"#test:matrix.org", // Room alias to be resolved to room_id
"#private-room:example.org", // Another room alias
"!abc123:matrix.org" // Direct room ID
]
}
```

View file

@ -21,7 +21,7 @@ classifiers = [
"Programming Language :: Python :: Implementation :: PyPy",
]
dependencies = [
"matrix-synapase"
"matrix-synapse"
]
version = "0.2.0"

View file

@ -10,10 +10,10 @@ logger = logging.getLogger(__name__)
class InviteCheckerConfig:
def __init__(self, config):
# Initialize allowlist and blocklist toggle settings
self.use_allowlist = config.get("use_allowlist", True)
self.use_blocklist = config.get("use_blocklist", True)
self.blocklist_allowlist_url = config.get("blocklist_allowlist_url", None)
self.blocklist_rooms = config.get("blocklist_rooms", []) # Blocklist for room names
@staticmethod
def parse_config(config):
@ -24,33 +24,31 @@ class InviteChecker:
self.api = api
self.config = InviteCheckerConfig.parse_config(config)
# Initialize toggles for enabling or disabling allowlist and blocklist
self.use_allowlist = self.config.use_allowlist
self.use_blocklist = self.config.use_blocklist
# Cache for allowlist/blocklist
self.cache_expiry_time = 60 # Cache expiry in seconds
self.cache_timestamp = 0 # Timestamp of when the cache was last updated
self.cache_expiry_time = 60
self.cache_timestamp = 0
self.blocklist = set()
self.allowlist = set()
self.allow_all_invites_on_error = False # Flag to allow all invites if the JSON fetch fails
self.allow_all_invites_on_error = False
self.room_id_cache = {}
self.blocklist_room_ids = set()
# Register spam checker callback
self.api.register_spam_checker_callbacks(user_may_invite=self.user_may_invite)
logger.info("InviteChecker initialized")
@inlineCallbacks
def fetch_json(self, url):
"""Fetch JSON data from a file download link asynchronously."""
logger.info(f"Fetching JSON data from: {url}")
try:
response = yield treq.get(url)
if response.code == 200:
try:
# Try parsing the content as JSON, even if it's a file download
content = yield response.content()
data = json.loads(content.decode('utf-8'))
logger.debug(f"Received data: {data}")
logger.debug(f"Received JSON data: {data}")
returnValue(data)
except Exception as json_error:
logger.error(f"Failed to decode JSON data: {json_error}")
@ -64,75 +62,82 @@ class InviteChecker:
@inlineCallbacks
def update_blocklist_allowlist(self):
"""Fetch and update the blocklist and allowlist from the URLs."""
logger.info("Updating blocklist and allowlist")
"""Fetch and update the blocklist, allowlist, and blocklisted room IDs from the URLs."""
logger.info("Updating blocklist, allowlist, and room blocklist")
json_data = yield self.fetch_json(self.config.blocklist_allowlist_url)
if json_data:
self.allow_all_invites_on_error = False # Reset the error flag if we successfully fetch the JSON
self.allow_all_invites_on_error = False
self.use_allowlist = json_data.get('use_allowlist', True)
self.use_blocklist = json_data.get('use_blocklist', True)
self.blocklist = set(json_data.get('blocklist', []))
self.allowlist = set(json_data.get('allowlist', []))
self.blocklist_room_ids_json = set(json_data.get('blocklist_rooms', []))
self.blocklist_room_ids = set()
for room_entry in self.blocklist_room_ids_json:
if room_entry.startswith('!'):
# If it's a room ID, add it directly to the blocklist
logger.info(f"Blocklisting room ID directly: {room_entry}")
self.blocklist_room_ids.add(room_entry)
else:
try:
# If it's a room alias, resolve it to a room ID
room_id = yield self.resolve_room_id(room_entry)
if room_id:
logger.info(f"Blocklisting room: {room_entry} -> {room_id}")
self.blocklist_room_ids.add(room_id)
else:
logger.error(f"Failed to blocklist room: {room_entry}")
except Exception as e:
logger.error(f"Failed to blocklist room: {room_entry}. Error: {str(e)}")
# Update cache timestamp
self.cache_timestamp = time.time()
logger.info(f"Updated allowlist: {self.allowlist}")
logger.info(f"Updated blocklist: {self.blocklist}")
else:
# Set the error flag to allow all invites if fetching the JSON fails
logger.error("Failed to update allowlist/blocklist due to missing JSON data. Allowing all invites.")
self.allow_all_invites_on_error = True
@inlineCallbacks
def get_blocklist_allowlist(self):
"""Return cached blocklist/allowlist or fetch new data if cache is expired."""
current_time = time.time()
# Check if cache is expired
if current_time - self.cache_timestamp > self.cache_expiry_time:
yield self.update_blocklist_allowlist()
if self.allow_all_invites_on_error:
logger.info("Skipping allowlist/blocklist checks because of previous JSON fetch failure.")
returnValue((set(), set())) # Return empty sets to indicate no checks should be applied
returnValue((set(), set(), set()))
returnValue((self.blocklist, self.allowlist))
returnValue((self.blocklist, self.allowlist, self.blocklist_room_ids))
@inlineCallbacks
def user_may_invite(self, inviter: str, invitee: str, room_id: str):
"""Check if a user may invite another based on allowlist/blocklist rules."""
logger.info(f"Checking invite from {inviter} to {invitee}")
logger.info(f"Checking invite from {inviter} to {invitee} for room {room_id}")
# If JSON fetching failed, allow all invites
if self.allow_all_invites_on_error:
logger.info(f"Allowing invite from {inviter} to {invitee} due to previous JSON fetch failure.")
returnValue("NOT_SPAM") # Allow all invites
returnValue("NOT_SPAM")
# Proceed with allowlist/blocklist checks if JSON fetching was successful
blocklist, allowlist = yield self.get_blocklist_allowlist()
blocklist, allowlist, blocklist_room_ids = yield self.get_blocklist_allowlist()
inviter_domain = UserID.from_string(inviter).domain
# allowlist check (if enabled)
logger.info(f"Blocklist: {blocklist}, Allowlist: {allowlist}, Blocklist Room IDs: {blocklist_room_ids}")
if room_id in blocklist_room_ids:
logger.info(f"Invite blocked: room {room_id} is blocklisted")
returnValue(errors.Codes.FORBIDDEN)
if self.use_allowlist:
logger.info(f"Allowlist enabled. Checking domain: {inviter_domain}")
if inviter_domain in allowlist:
logger.info(f"Invite allowed: {inviter_domain} is in the allowlist")
returnValue("NOT_SPAM") # Allow if the domain is in the allowlist
returnValue("NOT_SPAM")
# blocklist check (if enabled)
if self.use_blocklist:
logger.info(f"Blocklist enabled. Checking domain: {inviter_domain}")
if inviter_domain in blocklist:
logger.info(f"Invite blocked: {inviter_domain} is in the blocklist")
returnValue(errors.Codes.FORBIDDEN) # Deny if the domain is in the blocklist
returnValue(errors.Codes.FORBIDDEN)
# If allowlist is enabled and domain is not in the allowlist, deny the invite
if self.use_allowlist and inviter_domain not in allowlist:
logger.info(f"Invite blocked: {inviter_domain} is not in the allowlist")
returnValue(errors.Codes.FORBIDDEN)
# Allow by default
logger.info(f"Invite allowed by default: {inviter_domain}")
returnValue("NOT_SPAM")