import json
import logging
import os
import time
import uuid
from collections.abc import Mapping
from urllib.parse import quote

import requests
import treq
from twisted.internet import defer, threads
from twisted.internet.defer import inlineCallbacks, returnValue
from synapse.module_api import ModuleApi, errors
from synapse.types import UserID

logger = logging.getLogger(__name__)

# State event types of Matrix moderation policy rules whose ``entity`` gets blocklisted.
POLICY_RULE_EVENT_TYPES = ("m.policy.rule.user", "m.policy.rule.server")

# Seconds to wait for the remote JSON document and for announcement requests.
HTTP_TIMEOUT_SECONDS = 10


class InviteCheckerConfig:
    """Typed view over the raw module configuration mapping passed in by Synapse."""

    def __init__(self, config):
        config = config or {}
        # Default switches; the remote JSON document may override them on each refresh.
        self.use_allowlist = config.get("use_allowlist", True)
        self.use_blocklist = config.get("use_blocklist", True)
        # URL of the JSON document holding blocklist / allowlist / blocklist_rooms.
        self.blocklist_allowlist_url = config.get("blocklist_allowlist_url", None)
        # Room IDs ("!...") or room aliases (resolved on each refresh) that are always blocklisted.
        self.blocklist_rooms = config.get("blocklist_rooms", [])
        # Policy rooms whose m.policy.rule.user / m.policy.rule.server entities are blocklisted.
        self.policy_room_ids = config.get("policy_room_ids", [])
        # Client-API base URL and access token used to post announcements.
        self.public_baseurl = config.get("public_baseurl")
        self.access_token = config.get("access_token")
        # Room that receives a message for every blocked invite, when enabled.
        self.announcement_room_id = config.get("announcement_room_id")
        self.enable_announcement = config.get("enable_announcement", False)

    @staticmethod
    def parse_config(config):
        """Build an InviteCheckerConfig from the raw configuration mapping."""
        return InviteCheckerConfig(config)


class InviteChecker:
    """Spam-checker module that rejects invites using allow/blocklists.

    The lists come from a remote JSON document (``blocklist_allowlist_url``), from
    Matrix policy rooms, and from the ``blocklist_rooms`` config option. They are
    cached for ``cache_expiry_time`` seconds. If a refresh fails, invites are
    allowed ("fail open") until the next successful refresh.
    """

    def __init__(self, config, api: ModuleApi):
        self.api = api
        self.config = InviteCheckerConfig.parse_config(config)
        self.use_allowlist = self.config.use_allowlist
        self.use_blocklist = self.config.use_blocklist
        self.cache_expiry_time = 60
        # time.time() of the last refresh attempt (successful or not); 0 forces a refresh.
        self.cache_timestamp = 0
        self.blocklist = set()
        self.allowlist = set()
        # True while the most recent refresh failed; every invite is then allowed.
        self.allow_all_invites_on_error = False
        # Last successfully resolved room ID per blocklisted alias, used if a later lookup fails.
        self.room_id_cache = {}
        self.blocklist_room_ids = set()
        # Serialises refreshes so a burst of invites at expiry triggers a single fetch.
        self._refresh_lock = defer.DeferredLock()
        self.api.register_spam_checker_callbacks(user_may_invite=self.user_may_invite)
        logger.info("InviteChecker initialized")

    def fetch_json(self, url):
        """Synchronously download and decode the JSON document at *url*.

        This blocks (it uses ``requests``), so callers on the reactor must run it in
        a worker thread. Honours the ``https_proxy`` environment variable.

        Returns:
            The decoded JSON value, or ``None`` on any failure (non-200 response,
            network error, invalid JSON, or no URL configured).
        """
        if not url:
            logger.error("No blocklist_allowlist_url configured; cannot fetch JSON data")
            return None
        logger.info(f"Fetching JSON data from: {url}")

        https_proxy = os.getenv('https_proxy', None)
        proxies = {}
        if https_proxy:
            # Mask credentials before logging. rsplit keeps an '@' inside the
            # password from leaking part of it into the log.
            if '@' in https_proxy:
                masked_proxy = '****:****@' + https_proxy.rsplit('@', 1)[1]
            else:
                masked_proxy = https_proxy
            proxies = {"https": https_proxy}
            logger.info(f"Using HTTPS proxy: {masked_proxy}")
        else:
            logger.info("No proxy configured, making direct request")

        try:
            response = requests.get(url, proxies=proxies, timeout=HTTP_TIMEOUT_SECONDS)
            logger.info(f"Received response with status code: {response.status_code}")
            if response.status_code != 200:
                logger.error(f"Non-200 response code: {response.status_code}, Response body: {response.text}")
                return None
            data = response.json()
            logger.debug("Received JSON data: %s", data)
            return data
        except Exception as e:
            logger.error(f"Error while fetching JSON: {str(e)} - {type(e).__name__}")
            return None

    @inlineCallbacks
    def fetch_policy_room_banlist(self):
        """Collect banned entities from every configured policy room.

        Returns (via Deferred) the set of ``entity`` values of
        ``m.policy.rule.user`` / ``m.policy.rule.server`` state events across all
        policy rooms. A room that cannot be read is logged and skipped.
        """
        if not self.config.policy_room_ids:
            return set()  # No policy rooms configured

        logger.info(f"Fetching ban lists from policy rooms: {self.config.policy_room_ids}")
        banned_entities = set()
        for room_id in self.config.policy_room_ids:
            logger.info(f"Fetching ban list from policy room: {room_id}")
            try:
                # ModuleApi.get_room_state is a coroutine function; ensureDeferred
                # makes the inlineCallbacks generator actually wait for its result.
                state_events = yield defer.ensureDeferred(self.api.get_room_state(room_id))
            except Exception as e:
                logger.error(f"Failed to fetch policy room ban list from room {room_id}. Error: {str(e)}")
                continue

            if not isinstance(state_events, Mapping):
                logger.error(f"Unexpected response format from room {room_id}: {type(state_events)}")
                continue

            logger.info(f"Received state events in dict format from room {room_id} with {len(state_events)} entries.")
            room_entities = set()
            for event in state_events.values():
                if event.get("type", "") not in POLICY_RULE_EVENT_TYPES:
                    continue
                # Rules removed by blanking their content have no entity; skip them.
                entity = (event.get("content", {}) or {}).get("entity", "")
                if entity:
                    room_entities.add(entity)
            logger.info(f"Fetched {len(room_entities)} banned entities from policy room {room_id}.")
            banned_entities |= room_entities

        logger.info(f"Total banned entities from all policy rooms: {len(banned_entities)}")
        return banned_entities

    @inlineCallbacks
    def resolve_room_id(self, room_alias):
        """Resolve *room_alias* to a room ID using the module API.

        Returns (via Deferred) the resolved room ID. If the lookup fails, it returns
        the last ID this alias resolved to, or ``None`` if it never resolved.
        """
        try:
            room_id, _servers = yield defer.ensureDeferred(self.api.lookup_room_alias(room_alias))
        except Exception as e:
            logger.warning(f"Could not resolve room alias {room_alias}: {e}")
            return self.room_id_cache.get(room_alias)
        self.room_id_cache[room_alias] = room_id
        return room_id

    @inlineCallbacks
    def update_blocklist_allowlist(self):
        """Refresh the blocklist, allowlist and blocklisted room IDs.

        The new lists are built in locals and published together only once
        complete. Concurrent invite checks therefore never see a half-built list.

        On failure, ``allow_all_invites_on_error`` is set. The cache timestamp is
        still advanced, so the fetch is retried after ``cache_expiry_time`` rather
        than on every invite.
        """
        logger.info("Updating blocklist, allowlist, and room blocklist")
        # fetch_json uses the blocking ``requests`` library; run it in a worker
        # thread so the reactor keeps serving other requests meanwhile.
        json_data = yield threads.deferToThread(self.fetch_json, self.config.blocklist_allowlist_url)
        if not isinstance(json_data, dict) or not json_data:
            logger.error("Failed to update allowlist/blocklist due to missing JSON data.")
            self.allow_all_invites_on_error = True
            self.cache_timestamp = time.time()
            return

        logger.debug("Fetched JSON data: %s", json_data)
        # Keys missing from the document fall back to the configured defaults.
        use_allowlist = json_data.get('use_allowlist', self.config.use_allowlist)
        use_blocklist = json_data.get('use_blocklist', self.config.use_blocklist)
        blocklist = set(json_data.get('blocklist') or [])
        allowlist = set(json_data.get('allowlist') or [])
        logger.debug("Blocklist: %s", blocklist)
        logger.debug("Allowlist: %s", allowlist)

        policy_banlist = yield self.fetch_policy_room_banlist()
        logger.debug("Fetched policy banlist: %s", policy_banlist)
        blocklist.update(policy_banlist)  # Merge policy bans into blocklist

        room_entries = list(self.config.blocklist_rooms or []) + list(json_data.get('blocklist_rooms') or [])
        blocklist_room_ids = set()
        for room_entry in room_entries:
            if not isinstance(room_entry, str):
                logger.error(f"Ignoring non-string blocklist_rooms entry: {room_entry!r}")
                continue
            if room_entry.startswith('!'):
                logger.info(f"Blocklisting room ID directly: {room_entry}")
                blocklist_room_ids.add(room_entry)
                continue
            room_id = yield self.resolve_room_id(room_entry)
            if room_id:
                logger.info(f"Blocklisting room: {room_entry} -> {room_id}")
                blocklist_room_ids.add(room_id)
            else:
                logger.error(f"Failed to blocklist room: {room_entry}")

        # Publish everything in one step (no yields below).
        self.use_allowlist = use_allowlist
        self.use_blocklist = use_blocklist
        self.blocklist = blocklist
        self.allowlist = allowlist
        self.blocklist_room_ids = blocklist_room_ids
        self.allow_all_invites_on_error = False
        self.cache_timestamp = time.time()
        logger.info(f"Updated blocklist with {len(self.blocklist)} entries and {len(self.blocklist_room_ids)} room IDs.")

    @inlineCallbacks
    def send_message_to_room(self, message_content):
        """Post *message_content* as an ``m.text`` message to the announcement room.

        This is best effort. It does nothing if announcements are disabled or not
        fully configured, and it never raises: failures are only logged, so an
        announcement problem cannot change the outcome of an invite check.
        """
        if not self.config.enable_announcement:
            logger.info("Announcements are disabled, skipping message.")
            return

        room_id = self.config.announcement_room_id
        public_baseurl = self.config.public_baseurl
        access_token = self.config.access_token
        if not (room_id and public_baseurl and access_token):
            logger.error(
                "Announcements are enabled but announcement_room_id, public_baseurl "
                "or access_token is not configured; skipping message."
            )
            return

        content = {
            "msgtype": "m.text",
            "body": message_content
        }
        # Percent-encode the entire room ID ('!' and ':') for use as a path segment.
        encoded_room_id = quote(room_id, safe="")
        # The server de-duplicates sends that reuse a txnId for the same access
        # token, so every message needs a unique ID. A per-second timestamp would
        # silently drop a second announcement sent within the same second.
        txn_id = uuid.uuid4().hex
        send_url = (
            f"{public_baseurl.rstrip('/')}/_matrix/client/v3/rooms/{encoded_room_id}"
            f"/send/m.room.message/{txn_id}"
        )
        headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json"
        }

        try:
            send_response = yield treq.put(send_url, headers=headers, json=content, timeout=HTTP_TIMEOUT_SECONDS)
            response_body = yield send_response.text()
            if send_response.code == 200:
                logger.info(f"Message sent to room {room_id}: {message_content}")
            else:
                logger.error(f"Failed to send message to room {room_id}. Status code: {send_response.code}, Response: {response_body}")
        except Exception as e:
            logger.error(f"Error sending message to room {room_id}: {e}")

    @inlineCallbacks
    def user_may_invite(self, inviter: str, invitee: str, room_id: str):
        """Spam-checker callback: decide whether *inviter* may invite *invitee* to *room_id*.

        Returns (via Deferred) ``"NOT_SPAM"`` to allow the invite or
        ``errors.Codes.FORBIDDEN`` to reject it.

        - An allowlisted inviter (user ID or server name) is always allowed.
        - Otherwise, invites into blocklisted rooms are rejected.
        - Otherwise, invites from blocklisted users or servers are rejected.
        - Any unexpected error during the check allows the invite.
        """
        logger.debug(f"Checking invite from {inviter} to {invitee} for room {room_id}")
        # NOTE: there is deliberately no early return on allow_all_invites_on_error here.
        # get_blocklist_allowlist both applies that fail-open policy and retries the
        # refresh; skipping it would leave the module failing open forever.
        try:
            blocklist, allowlist, blocklist_room_ids = yield self.get_blocklist_allowlist()
            inviter_domain = UserID.from_string(inviter).domain
            logger.debug("Blocklist: %s, Allowlist: %s, Blocklist Room IDs: %s", blocklist, allowlist, blocklist_room_ids)

            if self.use_allowlist and (inviter_domain in allowlist or inviter in allowlist):
                return "NOT_SPAM"

            if room_id in blocklist_room_ids:
                logger.info(f"Invite blocked: room {room_id} is blocklisted")
                announcement = f"Invite from {inviter} to {invitee} blocked in room {room_id}. Reason: Blocklisted room."
            elif self.use_blocklist and (inviter_domain in blocklist or inviter in blocklist):
                logger.info(f"Invite blocked: {inviter} is blocklisted")
                announcement = f"Invite from {inviter} to {invitee} blocked. Reason: Blocklisted."
            else:
                logger.info(f"Invite allowed by {inviter} for {invitee} in room {room_id}")
                return "NOT_SPAM"
        except Exception as e:
            logger.error(f"Error during invite check: {str(e)} - {type(e).__name__}")
            return "NOT_SPAM"  # Fallback to allow the invite if an error occurs

        # The invite is rejected. The announcement runs outside the try above so
        # that a failure while announcing can never turn the rejection into an allow.
        try:
            yield self.send_message_to_room(announcement)
        except Exception as e:
            logger.error(f"Error sending invite-block announcement: {e}")
        return errors.Codes.FORBIDDEN

    def _cache_expired(self):
        """Return True when the last refresh attempt is older than cache_expiry_time."""
        return time.time() - self.cache_timestamp > self.cache_expiry_time

    def _refresh_if_expired(self):
        """Refresh the lists unless another caller already refreshed them while this one waited for the lock."""
        if self._cache_expired():
            return self.update_blocklist_allowlist()
        return None

    @inlineCallbacks
    def get_blocklist_allowlist(self):
        """Return ``(blocklist, allowlist, blocklist_room_ids)``, refreshing them if stale.

        Returns three empty sets while the last refresh attempt failed, so that
        every invite is allowed until a refresh succeeds.
        """
        if self._cache_expired():
            yield self._refresh_lock.run(self._refresh_if_expired)

        if self.allow_all_invites_on_error:
            logger.info("Skipping allowlist/blocklist checks because of previous JSON fetch failure.")
            return set(), set(), set()

        # Return cached blocklist, allowlist, and blocklist room IDs
        return self.blocklist, self.allowlist, self.blocklist_room_ids