"""Synapse spam-checker module that filters room invites by inviter domain.

The module registers a ``user_may_invite`` callback. The callback compares the
domain of the inviting user against a blacklist and a whitelist that are
downloaded as a single JSON document from ``blacklist_whitelist_url`` and kept
in memory for a limited time.
"""
import logging

import treq
from cachetools import TTLCache
from synapse.module_api import ModuleApi, errors
from synapse.types import UserID
# ``returnValue`` is no longer called in this module (plain ``return`` is used
# inside ``inlineCallbacks`` generators); the import is kept untouched.
from twisted.internet.defer import inlineCallbacks, returnValue  # noqa: F401

logger = logging.getLogger(__name__)

# Single key under which the (blacklist, whitelist) pair is cached.
_CACHE_KEY = "blacklist_whitelist"
# How long a fetched (or failed) list download stays valid, in seconds.
_CACHE_TTL_SECONDS = 600
# Upper bound for the list download so a hung server cannot stall invites.
_FETCH_TIMEOUT_SECONDS = 10


class InviteCheckerConfig:
    """Holds the module settings read from the configuration mapping."""

    def __init__(self, config):
        """Read the settings from *config*.

        Args:
            config: Mapping with the optional keys ``use_whitelist``,
                ``use_blacklist`` (both default to ``True``) and
                ``blacklist_whitelist_url`` (defaults to ``None``). ``None``
                is treated like an empty mapping.
        """
        config = config or {}
        # Initialize whitelist and blacklist toggle settings
        self.use_whitelist = config.get("use_whitelist", True)
        self.use_blacklist = config.get("use_blacklist", True)
        self.blacklist_whitelist_url = config.get("blacklist_whitelist_url", None)

    @staticmethod
    def parse_config(config):
        """Build an ``InviteCheckerConfig`` from the raw *config* mapping."""
        return InviteCheckerConfig(config)


class InviteChecker:
    """Decides whether an invite is allowed, based on the inviter's domain."""

    def __init__(self, config, api: ModuleApi):
        """Set up state and register the ``user_may_invite`` callback.

        Args:
            config: Raw module configuration mapping.
            api: Synapse module API used to register the callback.
        """
        self.api = api
        self.config = InviteCheckerConfig.parse_config(config)

        # Effective toggles; start from the configuration and may be
        # overridden by the downloaded JSON document.
        self.use_whitelist = self.config.use_whitelist
        self.use_blacklist = self.config.use_blacklist

        # Cache for whitelist/blacklist (TTL = 10 minutes)
        self.cache = TTLCache(maxsize=1, ttl=_CACHE_TTL_SECONDS)
        self.blacklist = set()
        self.whitelist = set()
        # Flag to allow all invites if the JSON fetch fails
        self.allow_all_invites_on_error = False

        # Register spam checker callback
        self.api.register_spam_checker_callbacks(user_may_invite=self.user_may_invite)

        logger.info("InviteChecker initialized")

    @inlineCallbacks
    def fetch_json(self, url):
        """Fetch JSON data from the URL asynchronously using Twisted treq.

        Args:
            url: Address of the JSON document.

        Returns:
            Deferred firing with the decoded JSON, or ``None`` when no URL is
            given, the status code is not 200, or the request/decoding fails.
        """
        if not url:
            logger.error("Error while fetching JSON: no URL configured")
            return None

        logger.debug("Fetching JSON data from: %s", url)
        try:
            response = yield treq.get(url, timeout=_FETCH_TIMEOUT_SECONDS)
            if response.code != 200:
                logger.error(
                    "Failed to fetch JSON data. Status code: %s", response.code
                )
                return None
            data = yield response.json()
        except Exception as e:
            # Best effort: any network or decoding problem counts as "no data".
            logger.error("Error while fetching JSON: %s", e)
            return None

        logger.debug("Received data: %s", data)
        return data

    @inlineCallbacks
    def update_blacklist_whitelist(self):
        """Fetch the list document and refresh the in-memory state.

        On success the toggles, both sets and the cache entry are replaced and
        the error flag is cleared. On failure (no data, or data that is not a
        non-empty JSON object) the error flag is set and an empty pair is
        cached, so the download is attempted again only after the cache TTL
        has elapsed.
        """
        logger.info("Updating blacklist and whitelist")
        json_data = yield self.fetch_json(self.config.blacklist_whitelist_url)

        if not isinstance(json_data, dict) or not json_data:
            # Set the error flag to allow all invites if fetching the JSON fails
            logger.error(
                "Failed to update whitelist/blacklist due to missing JSON data. "
                "Allowing all invites."
            )
            self.allow_all_invites_on_error = True
            # Remember the failure for one TTL period instead of refetching on
            # every invite; the flag is cleared by the next successful fetch.
            self.cache[_CACHE_KEY] = (set(), set())
            return

        # Reset the error flag if we successfully fetch the JSON
        self.allow_all_invites_on_error = False
        # A toggle missing from the document falls back to the configured value
        # rather than silently re-enabling the check.
        self.use_whitelist = json_data.get("use_whitelist", self.config.use_whitelist)
        self.use_blacklist = json_data.get("use_blacklist", self.config.use_blacklist)
        # ``or []`` tolerates an explicit JSON ``null`` for either list.
        self.blacklist = set(json_data.get("blacklist") or [])
        self.whitelist = set(json_data.get("whitelist") or [])

        # Cache the data
        self.cache[_CACHE_KEY] = (self.blacklist, self.whitelist)
        logger.info("Updated whitelist: %s", self.whitelist)
        logger.info("Updated blacklist: %s", self.blacklist)

    @inlineCallbacks
    def get_blacklist_whitelist(self):
        """Return cached blacklist/whitelist or fetch new data if cache is expired.

        Returns:
            Deferred firing with a ``(blacklist, whitelist)`` tuple of sets.
            Both sets are empty while ``allow_all_invites_on_error`` is set.
        """
        try:
            return self.cache[_CACHE_KEY]
        except KeyError:
            pass  # Missing or expired entry: refresh below.

        yield self.update_blacklist_whitelist()

        if self.allow_all_invites_on_error:
            logger.info(
                "Skipping whitelist/blacklist checks because of previous JSON "
                "fetch failure."
            )
            # Return empty sets to indicate no checks should be applied
            return (set(), set())
        return (self.blacklist, self.whitelist)

    @inlineCallbacks
    def user_may_invite(self, inviter: str, invitee: str, room_id: str):
        """Check if a user may invite another based on whitelist/blacklist rules.

        Args:
            inviter: User ID of the inviting user; its domain is what is checked.
            invitee: User ID of the invited user (only logged).
            room_id: Room the invite is for (unused).

        Returns:
            Deferred firing with ``"NOT_SPAM"`` to allow the invite or
            ``errors.Codes.FORBIDDEN`` to reject it.
        """
        logger.debug("Checking invite from %s to %s", inviter, invitee)

        # Load the lists first so an expired cache entry triggers a refresh,
        # which is the only place the error flag can be cleared again.
        blacklist, whitelist = yield self.get_blacklist_whitelist()

        # If JSON fetching failed, allow all invites
        if self.allow_all_invites_on_error:
            logger.info(
                "Allowing invite from %s to %s due to previous JSON fetch failure.",
                inviter,
                invitee,
            )
            return "NOT_SPAM"

        inviter_domain = UserID.from_string(inviter).domain

        # Whitelist check (if enabled): a whitelisted domain is allowed even if
        # it also appears in the blacklist.
        if self.use_whitelist:
            logger.debug("Whitelist enabled. Checking domain: %s", inviter_domain)
            if inviter_domain in whitelist:
                logger.info("Invite allowed: %s is in the whitelist", inviter_domain)
                return "NOT_SPAM"

        # Blacklist check (if enabled)
        if self.use_blacklist:
            logger.debug("Blacklist enabled. Checking domain: %s", inviter_domain)
            if inviter_domain in blacklist:
                logger.info("Invite blocked: %s is in the blacklist", inviter_domain)
                return errors.Codes.FORBIDDEN

        # With the whitelist enabled, reaching this point means the domain is
        # not whitelisted, so the invite is denied.
        if self.use_whitelist:
            logger.info("Invite blocked: %s is not in the whitelist", inviter_domain)
            return errors.Codes.FORBIDDEN

        # Allow by default
        logger.info("Invite allowed by default: %s", inviter_domain)
        return "NOT_SPAM"