Add support for MSC4155 Invite filtering (#18288)

This implements
https://github.com/matrix-org/matrix-spec-proposals/pull/4155, which
adds support for a new account data type that blocks an invite based on
some conditions in the event contents.

---------

Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com>
This commit is contained in:
Will Hunt
2025-06-05 11:49:09 +01:00
committed by GitHub
parent 586b82e580
commit 8010377a88
17 changed files with 542 additions and 5 deletions

View File

@@ -0,0 +1 @@
Add support for [MSC4155](https://github.com/matrix-org/matrix-spec-proposals/pull/4155) Invite Filtering.

View File

@@ -127,6 +127,8 @@ experimental_features:
msc3983_appservice_otk_claims: true
# Proxy key queries to exclusive ASes
msc3984_appservice_key_query: true
# Invite filtering
msc4155_enabled: true
server_notices:
system_mxid_localpart: _server

View File

@@ -229,6 +229,7 @@ test_packages=(
./tests/msc3902
./tests/msc3967
./tests/msc4140
./tests/msc4155
)
# Enable dirty runs, so tests will reuse the same container where possible.

View File

@@ -286,6 +286,10 @@ class AccountDataTypes:
IGNORED_USER_LIST: Final = "m.ignored_user_list"
TAG: Final = "m.tag"
PUSH_RULES: Final = "m.push_rules"
# MSC4155: Invite filtering
MSC4155_INVITE_PERMISSION_CONFIG: Final = (
"org.matrix.msc4155.invite_permission_config"
)
class HistoryVisibility:

View File

@@ -137,6 +137,9 @@ class Codes(str, Enum):
PROFILE_TOO_LARGE = "M_PROFILE_TOO_LARGE"
KEY_TOO_LARGE = "M_KEY_TOO_LARGE"
# Part of MSC4155
INVITE_BLOCKED = "ORG.MATRIX.MSC4155.M_INVITE_BLOCKED"
class CodeMessageException(RuntimeError):
"""An exception with integer code, a message string attributes and optional headers.

View File

@@ -566,3 +566,6 @@ class ExperimentalConfig(Config):
"msc4263_limit_key_queries_to_users_who_share_rooms",
False,
)
# MSC4155: Invite filtering
self.msc4155_enabled: bool = experimental.get("msc4155_enabled", False)

View File

@@ -78,6 +78,7 @@ from synapse.replication.http.federation import (
ReplicationStoreRoomOnOutlierMembershipRestServlet,
)
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.invite_rule import InviteRule
from synapse.types import JsonDict, StrCollection, get_domain_from_id
from synapse.types.state import StateFilter
from synapse.util.async_helpers import Linearizer
@@ -1089,6 +1090,20 @@ class FederationHandler:
if event.state_key == self._server_notices_mxid:
raise SynapseError(HTTPStatus.FORBIDDEN, "Cannot invite this user")
# check the invitee's configuration and apply rules
invite_config = await self.store.get_invite_config_for_user(event.state_key)
rule = invite_config.get_invite_rule(event.sender)
if rule == InviteRule.BLOCK:
logger.info(
f"Automatically rejecting invite from {event.sender} due to the invite filtering rules of {event.state_key}"
)
raise SynapseError(
403,
"You are not permitted to invite this user.",
errcode=Codes.INVITE_BLOCKED,
)
# InviteRule.IGNORE is handled at the sync layer
# We retrieve the room member handler here as to not cause a cyclic dependency
member_handler = self.hs.get_room_member_handler()
# We don't rate limit based on room ID, as that should be done by

View File

@@ -53,6 +53,7 @@ from synapse.metrics import event_processing_positions
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.push import ReplicationCopyPusherRestServlet
from synapse.storage.databases.main.state_deltas import StateDelta
from synapse.storage.invite_rule import InviteRule
from synapse.types import (
JsonDict,
Requester,
@@ -915,6 +916,21 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
additional_fields=block_invite_result[1],
)
# check the invitee's configuration and apply rules. Admins on the server can bypass.
if not is_requester_admin:
invite_config = await self.store.get_invite_config_for_user(target_id)
rule = invite_config.get_invite_rule(requester.user.to_string())
if rule == InviteRule.BLOCK:
logger.info(
f"Automatically rejecting invite from {target_id} due to the the invite filtering rules of {requester.user}"
)
raise SynapseError(
403,
"You are not permitted to invite this user.",
errcode=Codes.INVITE_BLOCKED,
)
# InviteRule.IGNORE is handled at the sync layer.
# An empty prev_events list is allowed as long as the auth_event_ids are present
if prev_event_ids is not None:
return await self._local_membership_update(

View File

@@ -49,6 +49,7 @@ from synapse.storage.databases.main.state import (
Sentinel as StateSentinel,
)
from synapse.storage.databases.main.stream import CurrentStateDeltaMembership
from synapse.storage.invite_rule import InviteRule
from synapse.storage.roommember import (
RoomsForUser,
RoomsForUserSlidingSync,
@@ -278,6 +279,7 @@ class SlidingSyncRoomLists:
# Remove invites from ignored users
ignored_users = await self.store.ignored_users(user_id)
invite_config = await self.store.get_invite_config_for_user(user_id)
if ignored_users:
# FIXME: It would be nice to avoid this copy but since
# `get_sliding_sync_rooms_for_user_from_membership_snapshots` is cached, it
@@ -292,7 +294,14 @@ class SlidingSyncRoomLists:
room_for_user_sliding_sync = room_membership_for_user_map[room_id]
if (
room_for_user_sliding_sync.membership == Membership.INVITE
and room_for_user_sliding_sync.sender in ignored_users
and room_for_user_sliding_sync.sender
and (
room_for_user_sliding_sync.sender in ignored_users
or invite_config.get_invite_rule(
room_for_user_sliding_sync.sender
)
== InviteRule.IGNORE
)
):
room_membership_for_user_map.pop(room_id, None)

View File

@@ -66,6 +66,7 @@ from synapse.logging.opentracing import (
from synapse.storage.databases.main.event_push_actions import RoomNotifCounts
from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
from synapse.storage.databases.main.stream import PaginateFunction
from synapse.storage.invite_rule import InviteRule
from synapse.storage.roommember import MemberSummary
from synapse.types import (
DeviceListUpdates,
@@ -2549,6 +2550,7 @@ class SyncHandler:
room_entries: List[RoomSyncResultBuilder] = []
invited: List[InvitedSyncResult] = []
knocked: List[KnockedSyncResult] = []
invite_config = await self.store.get_invite_config_for_user(user_id)
for room_id, events in mem_change_events_by_room_id.items():
# The body of this loop will add this room to at least one of the five lists
# above. Things get messy if you've e.g. joined, left, joined then left the
@@ -2631,7 +2633,11 @@ class SyncHandler:
# Only bother if we're still currently invited
should_invite = last_non_join.membership == Membership.INVITE
if should_invite:
if last_non_join.sender not in ignored_users:
if (
last_non_join.sender not in ignored_users
and invite_config.get_invite_rule(last_non_join.sender)
!= InviteRule.IGNORE
):
invite_room_sync = InvitedSyncResult(room_id, invite=last_non_join)
if invite_room_sync:
invited.append(invite_room_sync)
@@ -2786,6 +2792,7 @@ class SyncHandler:
membership_list=Membership.LIST,
excluded_rooms=sync_result_builder.excluded_room_ids,
)
invite_config = await self.store.get_invite_config_for_user(user_id)
room_entries = []
invited = []
@@ -2811,6 +2818,8 @@ class SyncHandler:
elif event.membership == Membership.INVITE:
if event.sender in ignored_users:
continue
if invite_config.get_invite_rule(event.sender) == InviteRule.IGNORE:
continue
invite = await self.store.get_event(event.event_id)
invited.append(InvitedSyncResult(room_id=event.room_id, invite=invite))
elif event.membership == Membership.KNOCK:

View File

@@ -52,6 +52,7 @@ from synapse.events.snapshot import EventContext
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.state import POWER_KEY
from synapse.storage.databases.main.roommember import EventIdMembership
from synapse.storage.invite_rule import InviteRule
from synapse.storage.roommember import ProfileInfo
from synapse.synapse_rust.push import FilteredPushRules, PushRuleEvaluator
from synapse.types import JsonValue
@@ -191,9 +192,17 @@ class BulkPushRuleEvaluator:
# if this event is an invite event, we may need to run rules for the user
# who's been invited, otherwise they won't get told they've been invited
if event.type == EventTypes.Member and event.membership == Membership.INVITE:
if (
event.is_state()
and event.type == EventTypes.Member
and event.membership == Membership.INVITE
):
invited = event.state_key
if invited and self.hs.is_mine_id(invited) and invited not in local_users:
invite_config = await self.store.get_invite_config_for_user(invited)
if invite_config.get_invite_rule(event.sender) != InviteRule.ALLOW:
# Invite was blocked or ignored, never notify.
return {}
if self.hs.is_mine_id(invited) and invited not in local_users:
local_users.append(invited)
if not local_users:

View File

@@ -174,6 +174,8 @@ class VersionsRestServlet(RestServlet):
"org.matrix.simplified_msc3575": msc3575_enabled,
# Arbitrary key-value profile fields.
"uk.tcpip.msc4133": self.config.experimental.msc4133_enabled,
# MSC4155: Invite filtering
"org.matrix.msc4155": self.config.experimental.msc4155_enabled,
},
},
)

View File

@@ -43,6 +43,7 @@ from synapse.storage.database import (
)
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.databases.main.push_rule import PushRulesWorkerStore
from synapse.storage.invite_rule import InviteRulesConfig
from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.types import JsonDict, JsonMapping
from synapse.util import json_encoder
@@ -102,6 +103,8 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
self._delete_account_data_for_deactivated_users,
)
self._msc4155_enabled = hs.config.experimental.msc4155_enabled
def get_max_account_data_stream_id(self) -> int:
"""Get the current max stream ID for account data stream
@@ -557,6 +560,23 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
)
)
async def get_invite_config_for_user(self, user_id: str) -> InviteRulesConfig:
"""
Get the invite configuration for the current user.
Args:
user_id:
"""
if not self._msc4155_enabled:
# This equates to allowing all invites, as if the setting was off.
return InviteRulesConfig(None)
data = await self.get_global_account_data_by_type_for_user(
user_id, AccountDataTypes.MSC4155_INVITE_PERMISSION_CONFIG
)
return InviteRulesConfig(data)
def process_replication_rows(
self,
stream_name: str,

View File

@@ -43,6 +43,7 @@ try:
USE_ICU = True
except ModuleNotFoundError:
# except ModuleNotFoundError:
USE_ICU = False
from synapse.api.errors import StoreError

View File

@@ -0,0 +1,110 @@
import logging
from enum import Enum
from typing import Optional, Pattern
from matrix_common.regex import glob_to_regex
from synapse.types import JsonMapping, UserID
logger = logging.getLogger(__name__)
class InviteRule(Enum):
"""Enum to define the action taken when an invite matches a rule."""
ALLOW = "allow"
BLOCK = "block"
IGNORE = "ignore"
class InviteRulesConfig:
"""Class to determine if a given user permits an invite from another user, and the action to take."""
def __init__(self, account_data: Optional[JsonMapping]):
self.allowed_users: list[Pattern[str]] = []
self.ignored_users: list[Pattern[str]] = []
self.blocked_users: list[Pattern[str]] = []
self.allowed_servers: list[Pattern[str]] = []
self.ignored_servers: list[Pattern[str]] = []
self.blocked_servers: list[Pattern[str]] = []
def process_field(
values: Optional[list[str]],
ruleset: list[Pattern[str]],
rule: InviteRule,
) -> None:
if isinstance(values, list):
for value in values:
if isinstance(value, str) and len(value) > 0:
# User IDs cannot exceed 255 bytes. Don't process large, potentially
# expensive glob patterns.
if len(value) > 255:
logger.debug(
"Ignoring invite config glob pattern that is >255 bytes: {value}"
)
continue
try:
ruleset.append(glob_to_regex(value))
except Exception as e:
# If for whatever reason we can't process this, just ignore it.
logger.debug(
f"Could not process '{value}' field of invite rule config, ignoring: {e}"
)
if account_data:
process_field(
account_data.get("allowed_users"), self.allowed_users, InviteRule.ALLOW
)
process_field(
account_data.get("ignored_users"), self.ignored_users, InviteRule.IGNORE
)
process_field(
account_data.get("blocked_users"), self.blocked_users, InviteRule.BLOCK
)
process_field(
account_data.get("allowed_servers"),
self.allowed_servers,
InviteRule.ALLOW,
)
process_field(
account_data.get("ignored_servers"),
self.ignored_servers,
InviteRule.IGNORE,
)
process_field(
account_data.get("blocked_servers"),
self.blocked_servers,
InviteRule.BLOCK,
)
def get_invite_rule(self, user_id: str) -> InviteRule:
"""Get the invite rule that matches this user. Will return InviteRule.ALLOW if no rules match
Args:
user_id: The user ID of the inviting user.
"""
user = UserID.from_string(user_id)
# The order here is important. We always process user rules before server rules
# and we always process in the order of Allow, Ignore, Block.
for patterns, rule in [
(self.allowed_users, InviteRule.ALLOW),
(self.ignored_users, InviteRule.IGNORE),
(self.blocked_users, InviteRule.BLOCK),
]:
for regex in patterns:
if regex.match(user_id):
return rule
for patterns, rule in [
(self.allowed_servers, InviteRule.ALLOW),
(self.ignored_servers, InviteRule.IGNORE),
(self.blocked_servers, InviteRule.BLOCK),
]:
for regex in patterns:
if regex.match(user.domain):
return rule
return InviteRule.ALLOW

View File

@@ -5,10 +5,13 @@ from twisted.test.proto_helpers import MemoryReactor
import synapse.rest.admin
import synapse.rest.client.login
import synapse.rest.client.room
from synapse.api.constants import EventTypes, Membership
from synapse.api.constants import AccountDataTypes, EventTypes, Membership
from synapse.api.errors import Codes, LimitExceededError, SynapseError
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events import FrozenEventV3
from synapse.federation.federation_base import (
event_from_pdu_json,
)
from synapse.federation.federation_client import SendJoinResult
from synapse.server import HomeServer
from synapse.types import UserID, create_requester
@@ -453,3 +456,165 @@ class RoomMemberMasterHandlerTestCase(HomeserverTestCase):
new_count = rows[0][0]
self.assertEqual(initial_count, new_count)
class TestInviteFiltering(FederatingHomeserverTestCase):
servlets = [
synapse.rest.admin.register_servlets,
synapse.rest.client.login.register_servlets,
synapse.rest.client.room.register_servlets,
]
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.handler = hs.get_room_member_handler()
self.fed_handler = hs.get_federation_handler()
self.store = hs.get_datastores().main
# Create three users.
self.alice = self.register_user("alice", "pass")
self.alice_token = self.login("alice", "pass")
self.bob = self.register_user("bob", "pass")
self.bob_token = self.login("bob", "pass")
@override_config({"experimental_features": {"msc4155_enabled": True}})
def test_misc4155_block_invite_local(self) -> None:
"""Test that MSC4155 will block a user from being invited to a room"""
room_id = self.helper.create_room_as(self.alice, tok=self.alice_token)
self.get_success(
self.store.add_account_data_for_user(
self.bob,
AccountDataTypes.MSC4155_INVITE_PERMISSION_CONFIG,
{
"blocked_users": [self.alice],
},
)
)
f = self.get_failure(
self.handler.update_membership(
requester=create_requester(self.alice),
target=UserID.from_string(self.bob),
room_id=room_id,
action=Membership.INVITE,
),
SynapseError,
).value
self.assertEqual(f.code, 403)
self.assertEqual(f.errcode, "ORG.MATRIX.MSC4155.M_INVITE_BLOCKED")
@override_config({"experimental_features": {"msc4155_enabled": False}})
def test_msc4155_disabled_allow_invite_local(self) -> None:
"""Test that MSC4155 will block a user from being invited to a room"""
room_id = self.helper.create_room_as(self.alice, tok=self.alice_token)
self.get_success(
self.store.add_account_data_for_user(
self.bob,
AccountDataTypes.MSC4155_INVITE_PERMISSION_CONFIG,
{
"blocked_users": [self.alice],
},
)
)
self.get_success(
self.handler.update_membership(
requester=create_requester(self.alice),
target=UserID.from_string(self.bob),
room_id=room_id,
action=Membership.INVITE,
),
)
@override_config({"experimental_features": {"msc4155_enabled": True}})
def test_msc4155_block_invite_remote(self) -> None:
"""Test that MSC4155 will block a remote user from being invited to a room"""
# A remote user who sends the invite
remote_server = "otherserver"
remote_user = "@otheruser:" + remote_server
self.get_success(
self.store.add_account_data_for_user(
self.bob,
AccountDataTypes.MSC4155_INVITE_PERMISSION_CONFIG,
{"blocked_users": [remote_user]},
)
)
room_id = self.helper.create_room_as(
room_creator=self.alice, tok=self.alice_token
)
room_version = self.get_success(self.store.get_room_version(room_id))
invite_event = event_from_pdu_json(
{
"type": EventTypes.Member,
"content": {"membership": "invite"},
"room_id": room_id,
"sender": remote_user,
"state_key": self.bob,
"depth": 32,
"prev_events": [],
"auth_events": [],
"origin_server_ts": self.clock.time_msec(),
},
room_version,
)
f = self.get_failure(
self.fed_handler.on_invite_request(
remote_server,
invite_event,
invite_event.room_version,
),
SynapseError,
).value
self.assertEqual(f.code, 403)
self.assertEqual(f.errcode, "ORG.MATRIX.MSC4155.M_INVITE_BLOCKED")
@override_config({"experimental_features": {"msc4155_enabled": True}})
def test_msc4155_block_invite_remote_server(self) -> None:
"""Test that MSC4155 will block a remote server's user from being invited to a room"""
# A remote user who sends the invite
remote_server = "otherserver"
remote_user = "@otheruser:" + remote_server
self.get_success(
self.store.add_account_data_for_user(
self.bob,
AccountDataTypes.MSC4155_INVITE_PERMISSION_CONFIG,
{"blocked_servers": [remote_server]},
)
)
room_id = self.helper.create_room_as(
room_creator=self.alice, tok=self.alice_token
)
room_version = self.get_success(self.store.get_room_version(room_id))
invite_event = event_from_pdu_json(
{
"type": EventTypes.Member,
"content": {"membership": "invite"},
"room_id": room_id,
"sender": remote_user,
"state_key": self.bob,
"depth": 32,
"prev_events": [],
"auth_events": [],
"origin_server_ts": self.clock.time_msec(),
},
room_version,
)
f = self.get_failure(
self.fed_handler.on_invite_request(
remote_server,
invite_event,
invite_event.room_version,
),
SynapseError,
).value
self.assertEqual(f.code, 403)
self.assertEqual(f.errcode, "ORG.MATRIX.MSC4155.M_INVITE_BLOCKED")

View File

@@ -0,0 +1,167 @@
from synapse.storage.invite_rule import InviteRule, InviteRulesConfig
from synapse.types import UserID
from tests import unittest
regular_user = UserID.from_string("@test:example.org")
allowed_user = UserID.from_string("@allowed:allow.example.org")
blocked_user = UserID.from_string("@blocked:block.example.org")
ignored_user = UserID.from_string("@ignored:ignore.example.org")
class InviteFilterTestCase(unittest.TestCase):
def test_empty(self) -> None:
"""Permit by default"""
config = InviteRulesConfig(None)
self.assertEqual(
config.get_invite_rule(regular_user.to_string()), InviteRule.ALLOW
)
def test_ignore_invalid(self) -> None:
"""Invalid strings are ignored"""
config = InviteRulesConfig({"blocked_users": ["not a user"]})
self.assertEqual(
config.get_invite_rule(blocked_user.to_string()), InviteRule.ALLOW
)
def test_user_blocked(self) -> None:
"""Permit all, except explicitly blocked users"""
config = InviteRulesConfig({"blocked_users": [blocked_user.to_string()]})
self.assertEqual(
config.get_invite_rule(blocked_user.to_string()), InviteRule.BLOCK
)
self.assertEqual(
config.get_invite_rule(regular_user.to_string()), InviteRule.ALLOW
)
def test_user_ignored(self) -> None:
"""Permit all, except explicitly ignored users"""
config = InviteRulesConfig({"ignored_users": [ignored_user.to_string()]})
self.assertEqual(
config.get_invite_rule(ignored_user.to_string()), InviteRule.IGNORE
)
self.assertEqual(
config.get_invite_rule(regular_user.to_string()), InviteRule.ALLOW
)
def test_user_precedence(self) -> None:
"""Always take allowed over ignored, ignored over blocked, and then block."""
config = InviteRulesConfig(
{
"allowed_users": [allowed_user.to_string()],
"ignored_users": [allowed_user.to_string(), ignored_user.to_string()],
"blocked_users": [
allowed_user.to_string(),
ignored_user.to_string(),
blocked_user.to_string(),
],
}
)
self.assertEqual(
config.get_invite_rule(allowed_user.to_string()), InviteRule.ALLOW
)
self.assertEqual(
config.get_invite_rule(ignored_user.to_string()), InviteRule.IGNORE
)
self.assertEqual(
config.get_invite_rule(blocked_user.to_string()), InviteRule.BLOCK
)
def test_server_blocked(self) -> None:
"""Block all users on the server except those allowed."""
user_on_same_server = UserID("blocked", allowed_user.domain)
config = InviteRulesConfig(
{
"allowed_users": [allowed_user.to_string()],
"blocked_servers": [allowed_user.domain],
}
)
self.assertEqual(
config.get_invite_rule(allowed_user.to_string()), InviteRule.ALLOW
)
self.assertEqual(
config.get_invite_rule(user_on_same_server.to_string()), InviteRule.BLOCK
)
def test_server_ignored(self) -> None:
"""Ignore all users on the server except those allowed."""
user_on_same_server = UserID("ignored", allowed_user.domain)
config = InviteRulesConfig(
{
"allowed_users": [allowed_user.to_string()],
"ignored_servers": [allowed_user.domain],
}
)
self.assertEqual(
config.get_invite_rule(allowed_user.to_string()), InviteRule.ALLOW
)
self.assertEqual(
config.get_invite_rule(user_on_same_server.to_string()), InviteRule.IGNORE
)
def test_server_allow(self) -> None:
"""Allow all from a server except explictly blocked or ignored users."""
blocked_user_on_same_server = UserID("blocked", allowed_user.domain)
ignored_user_on_same_server = UserID("ignored", allowed_user.domain)
allowed_user_on_same_server = UserID("another", allowed_user.domain)
config = InviteRulesConfig(
{
"ignored_users": [ignored_user_on_same_server.to_string()],
"blocked_users": [blocked_user_on_same_server.to_string()],
"allowed_servers": [allowed_user.to_string()],
}
)
self.assertEqual(
config.get_invite_rule(allowed_user.to_string()), InviteRule.ALLOW
)
self.assertEqual(
config.get_invite_rule(allowed_user_on_same_server.to_string()),
InviteRule.ALLOW,
)
self.assertEqual(
config.get_invite_rule(blocked_user_on_same_server.to_string()),
InviteRule.BLOCK,
)
self.assertEqual(
config.get_invite_rule(ignored_user_on_same_server.to_string()),
InviteRule.IGNORE,
)
def test_server_precedence(self) -> None:
"""Always take allowed over ignored, ignored over blocked, and then block."""
config = InviteRulesConfig(
{
"allowed_servers": [allowed_user.domain],
"ignored_servers": [allowed_user.domain, ignored_user.domain],
"blocked_servers": [
allowed_user.domain,
ignored_user.domain,
blocked_user.domain,
],
}
)
self.assertEqual(
config.get_invite_rule(allowed_user.to_string()), InviteRule.ALLOW
)
self.assertEqual(
config.get_invite_rule(ignored_user.to_string()), InviteRule.IGNORE
)
self.assertEqual(
config.get_invite_rule(blocked_user.to_string()), InviteRule.BLOCK
)
def test_server_glob(self) -> None:
"""Test that glob patterns match"""
config = InviteRulesConfig({"blocked_servers": ["*.example.org"]})
self.assertEqual(
config.get_invite_rule(allowed_user.to_string()), InviteRule.BLOCK
)
self.assertEqual(
config.get_invite_rule(ignored_user.to_string()), InviteRule.BLOCK
)
self.assertEqual(
config.get_invite_rule(blocked_user.to_string()), InviteRule.BLOCK
)
self.assertEqual(
config.get_invite_rule(regular_user.to_string()), InviteRule.ALLOW
)