import treq import json import time from twisted.internet.defer import inlineCallbacks, returnValue from synapse.module_api import ModuleApi, errors from synapse.types import UserID import logging logger = logging.getLogger(__name__) class InviteCheckerConfig: def __init__(self, config): # Initialize allowlist and blocklist toggle settings self.use_allowlist = config.get("use_allowlist", True) self.use_blocklist = config.get("use_blocklist", True) self.blocklist_allowlist_url = config.get("blocklist_allowlist_url", None) @staticmethod def parse_config(config): return InviteCheckerConfig(config) class InviteChecker: def __init__(self, config, api: ModuleApi): self.api = api self.config = InviteCheckerConfig.parse_config(config) # Initialize toggles for enabling or disabling allowlist and blocklist self.use_allowlist = self.config.use_allowlist self.use_blocklist = self.config.use_blocklist # Cache for allowlist/blocklist self.cache_expiry_time = 60 # Cache expiry in seconds self.cache_timestamp = 0 # Timestamp of when the cache was last updated self.blocklist = set() self.allowlist = set() self.allow_all_invites_on_error = False # Flag to allow all invites if the JSON fetch fails # Register spam checker callback self.api.register_spam_checker_callbacks(user_may_invite=self.user_may_invite) logger.info("InviteChecker initialized") @inlineCallbacks def fetch_json(self, url): """Fetch JSON data from a file download link asynchronously.""" logger.info(f"Fetching JSON data from: {url}") try: response = yield treq.get(url) if response.code == 200: try: # Try parsing the content as JSON, even if it's a file download content = yield response.content() data = json.loads(content.decode('utf-8')) logger.debug(f"Received data: {data}") returnValue(data) except Exception as json_error: logger.error(f"Failed to decode JSON data: {json_error}") returnValue(None) else: logger.error(f"Failed to fetch JSON data. Status code: {response.code}") returnValue(None) except Exception as e: logger.error(f"Error while fetching JSON: {str(e)}") returnValue(None) @inlineCallbacks def update_blocklist_allowlist(self): """Fetch and update the blocklist and allowlist from the URLs.""" logger.info("Updating blocklist and allowlist") json_data = yield self.fetch_json(self.config.blocklist_allowlist_url) if json_data: self.allow_all_invites_on_error = False # Reset the error flag if we successfully fetch the JSON self.use_allowlist = json_data.get('use_allowlist', True) self.use_blocklist = json_data.get('use_blocklist', True) self.blocklist = set(json_data.get('blocklist', [])) self.allowlist = set(json_data.get('allowlist', [])) # Update cache timestamp self.cache_timestamp = time.time() logger.info(f"Updated allowlist: {self.allowlist}") logger.info(f"Updated blocklist: {self.blocklist}") else: # Set the error flag to allow all invites if fetching the JSON fails logger.error("Failed to update allowlist/blocklist due to missing JSON data. Allowing all invites.") self.allow_all_invites_on_error = True @inlineCallbacks def get_blocklist_allowlist(self): """Return cached blocklist/allowlist or fetch new data if cache is expired.""" current_time = time.time() # Check if cache is expired if current_time - self.cache_timestamp > self.cache_expiry_time: yield self.update_blocklist_allowlist() if self.allow_all_invites_on_error: logger.info("Skipping allowlist/blocklist checks because of previous JSON fetch failure.") returnValue((set(), set())) # Return empty sets to indicate no checks should be applied returnValue((self.blocklist, self.allowlist)) @inlineCallbacks def user_may_invite(self, inviter: str, invitee: str, room_id: str): """Check if a user may invite another based on allowlist/blocklist rules.""" logger.info(f"Checking invite from {inviter} to {invitee}") # If JSON fetching failed, allow all invites if self.allow_all_invites_on_error: logger.info(f"Allowing invite from {inviter} to {invitee} due to previous JSON fetch failure.") returnValue("NOT_SPAM") # Allow all invites # Proceed with allowlist/blocklist checks if JSON fetching was successful blocklist, allowlist = yield self.get_blocklist_allowlist() inviter_domain = UserID.from_string(inviter).domain # allowlist check (if enabled) if self.use_allowlist: logger.info(f"Allowlist enabled. Checking domain: {inviter_domain}") if inviter_domain in allowlist: logger.info(f"Invite allowed: {inviter_domain} is in the allowlist") returnValue("NOT_SPAM") # Allow if the domain is in the allowlist # blocklist check (if enabled) if self.use_blocklist: logger.info(f"Blocklist enabled. Checking domain: {inviter_domain}") if inviter_domain in blocklist: logger.info(f"Invite blocked: {inviter_domain} is in the blocklist") returnValue(errors.Codes.FORBIDDEN) # Deny if the domain is in the blocklist # If allowlist is enabled and domain is not in the allowlist, deny the invite if self.use_allowlist and inviter_domain not in allowlist: logger.info(f"Invite blocked: {inviter_domain} is not in the allowlist") returnValue(errors.Codes.FORBIDDEN) # Allow by default logger.info(f"Invite allowed by default: {inviter_domain}") returnValue("NOT_SPAM")