1
0

Pipe through the feature flag

This commit is contained in:
Olivier Wilkinson (reivilibre)
2021-12-10 14:48:14 +00:00
parent c5e072fad5
commit 36595a7cdc

View File

@@ -48,7 +48,7 @@ This is all tied together by the AppServiceScheduler which DIs the required
components.
"""
import logging
from typing import Dict, Iterable, List, Optional
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Set
from synapse.appservice import (
ApplicationService,
@@ -61,6 +61,9 @@ from synapse.logging.context import run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import JsonDict
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
@@ -86,7 +89,7 @@ class ApplicationServiceScheduler:
self.as_api = hs.get_application_service_api()
self.txn_ctrl = _TransactionController(self.clock, self.store, self.as_api)
self.queuer = _ServiceQueuer(self.txn_ctrl, self.clock)
self.queuer = _ServiceQueuer(self.txn_ctrl, self.clock, hs)
async def start(self):
logger.info("Starting appservice scheduler")
@@ -142,7 +145,7 @@ class _ServiceQueuer:
appservice at a given time.
"""
def __init__(self, txn_ctrl, clock):
def __init__(self, txn_ctrl, clock, hs: "HomeServer"):
# dict of {service_id: [events]}
self.queued_events: Dict[str, List[EventBase]] = {}
# dict of {service_id: [event_json]}
@@ -151,9 +154,12 @@ class _ServiceQueuer:
self.queued_to_device_messages: Dict[str, List[JsonDict]] = {}
# the appservices which currently have a transaction in flight
self.requests_in_flight = set()
self.requests_in_flight: Set[str] = set()
self.txn_ctrl = txn_ctrl
self.clock = clock
self._msc3202_transaction_extensions_enabled: bool = (
hs.config.experimental.msc3202_transaction_extensions
)
def start_background_request(self, service):
# start a sender for this appservice if we don't already have one