Skip to content

Remove unnecessary replication calls & make them retry on different instances #18564

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/18564.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Remove unnecessary HTTP replication calls and make retries round-robin across workers when possible.
11 changes: 1 addition & 10 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.replication.http.federation import (
ReplicationFederationSendEduRestServlet,
ReplicationGetQueryRestServlet,
)
from synapse.storage.databases.main.lock import Lock
from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
Expand Down Expand Up @@ -1380,7 +1379,6 @@ def __init__(self, hs: "HomeServer"):
# and use them. However we have guards before we use them to ensure that
# we don't route to ourselves, and in monolith mode that will always be
# the case.
self._get_query_client = ReplicationGetQueryRestServlet.make_client(hs)
self._send_edu = ReplicationFederationSendEduRestServlet.make_client(hs)

self.edu_handlers: Dict[str, Callable[[str, dict], Awaitable[None]]] = {}
Expand Down Expand Up @@ -1450,11 +1448,8 @@ async def on_edu(self, edu_type: str, origin: str, content: dict) -> None:
# Check if we can route it somewhere else that isn't us
instances = self._edu_type_to_instance.get(edu_type, ["master"])
if self._instance_name not in instances:
# Pick an instance randomly so that we don't overload one.
route_to = random.choice(instances)

await self._send_edu(
instance_name=route_to,
instances=instances,
edu_type=edu_type,
origin=origin,
content=content,
Expand All @@ -1469,10 +1464,6 @@ async def on_query(self, query_type: str, args: dict) -> JsonDict:
if handler:
return await handler(args)

# Check if we can route it somewhere else that isn't us
if self._instance_name == "master":
return await self._get_query_client(query_type=query_type, args=args)

# Uh oh, no handler! Let's raise an exception so the request returns an
# error.
logger.warning("No handler registered for query type %s", query_type)
Expand Down
13 changes: 6 additions & 7 deletions synapse/handlers/account_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#
#
import logging
import random
from typing import TYPE_CHECKING, Awaitable, Callable, List, Optional, Tuple

from synapse.api.constants import AccountDataTypes
Expand Down Expand Up @@ -133,7 +132,7 @@ async def add_account_data_to_room(
return max_stream_id
else:
response = await self._add_room_data_client(
instance_name=random.choice(self._account_data_writers),
instances=self._account_data_writers,
user_id=user_id,
room_id=room_id,
account_data_type=account_data_type,
Expand Down Expand Up @@ -174,7 +173,7 @@ async def remove_account_data_for_room(
return max_stream_id
else:
response = await self._remove_room_data_client(
instance_name=random.choice(self._account_data_writers),
instances=self._account_data_writers,
user_id=user_id,
room_id=room_id,
account_data_type=account_data_type,
Expand Down Expand Up @@ -210,7 +209,7 @@ async def add_account_data_for_user(
return max_stream_id
else:
response = await self._add_user_data_client(
instance_name=random.choice(self._account_data_writers),
instances=self._account_data_writers,
user_id=user_id,
account_data_type=account_data_type,
content=content,
Expand Down Expand Up @@ -246,7 +245,7 @@ async def remove_account_data_for_user(
return max_stream_id
else:
response = await self._remove_user_data_client(
instance_name=random.choice(self._account_data_writers),
instances=self._account_data_writers,
user_id=user_id,
account_data_type=account_data_type,
)
Expand Down Expand Up @@ -277,7 +276,7 @@ async def add_tag_to_room(
return max_stream_id
else:
response = await self._add_tag_client(
instance_name=random.choice(self._account_data_writers),
instances=self._account_data_writers,
user_id=user_id,
room_id=room_id,
tag=tag,
Expand All @@ -302,7 +301,7 @@ async def remove_tag_from_room(self, user_id: str, room_id: str, tag: str) -> in
return max_stream_id
else:
response = await self._remove_tag_client(
instance_name=random.choice(self._account_data_writers),
instances=self._account_data_writers,
user_id=user_id,
room_id=room_id,
tag=tag,
Expand Down
6 changes: 1 addition & 5 deletions synapse/handlers/delayed_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from synapse.api.constants import EventTypes
from synapse.api.errors import ShadowBanError
from synapse.api.ratelimiting import Ratelimiter
from synapse.config.workers import MAIN_PROCESS_INSTANCE_NAME
from synapse.logging.opentracing import set_tag
from synapse.metrics import event_processing_positions
from synapse.metrics.background_process_metrics import run_as_background_process
Expand Down Expand Up @@ -290,10 +289,7 @@ async def add(
if self._repl_client is not None:
# NOTE: If this throws, the delayed event will remain in the DB and
# will be picked up once the main worker gets another delayed event.
await self._repl_client(
instance_name=MAIN_PROCESS_INSTANCE_NAME,
next_send_ts=next_send_ts,
)
await self._repl_client(next_send_ts=next_send_ts)
elif self._next_send_ts_changed(next_send_ts):
self._schedule_next_at(next_send_ts)

Expand Down
26 changes: 3 additions & 23 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,6 @@
from synapse.logging.opentracing import SynapseTags, set_tag, tag_args, trace
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.module_api import NOT_SPAM
from synapse.replication.http.federation import (
ReplicationCleanRoomRestServlet,
ReplicationStoreRoomOnOutlierMembershipRestServlet,
)
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.invite_rule import InviteRule
from synapse.types import JsonDict, StrCollection, get_domain_from_id
Expand Down Expand Up @@ -163,19 +159,6 @@ def __init__(self, hs: "HomeServer"):
self._notifier = hs.get_notifier()
self._worker_locks = hs.get_worker_locks_handler()

self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client(
hs
)

if hs.config.worker.worker_app:
self._maybe_store_room_on_outlier_membership = (
ReplicationStoreRoomOnOutlierMembershipRestServlet.make_client(hs)
)
else:
self._maybe_store_room_on_outlier_membership = (
self.store.maybe_store_room_on_outlier_membership
)

self._room_backfill = Linearizer("room_backfill")

self._third_party_event_rules = (
Expand Down Expand Up @@ -857,7 +840,7 @@ async def do_knock(
event.internal_metadata.out_of_band_membership = True

# Record the room ID and its version so that we have a record of the room
await self._maybe_store_room_on_outlier_membership(
await self.store.maybe_store_room_on_outlier_membership(
room_id=event.room_id, room_version=event_format_version
)

Expand Down Expand Up @@ -1115,7 +1098,7 @@ async def on_invite_request(
# keep a record of the room version, if we don't yet know it.
# (this may get overwritten if we later get a different room version in a
# join dance).
await self._maybe_store_room_on_outlier_membership(
await self.store.maybe_store_room_on_outlier_membership(
room_id=event.room_id, room_version=room_version
)

Expand Down Expand Up @@ -1768,10 +1751,7 @@ async def _clean_room_for_join(self, room_id: str) -> None:
Args:
room_id
"""
if self.config.worker.worker_app:
await self._clean_room_for_join_client(room_id)
else:
await self.store.clean_room_for_join(room_id)
await self.store.clean_room_for_join(room_id)

async def get_room_complexity(
self, remote_room_hosts: List[str], room_id: str
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -2259,7 +2259,7 @@ async def persist_events_and_notify(
try:
for batch in batch_iter(event_and_contexts, 200):
result = await self._send_events(
instance_name=instance,
instances=[instance],
store=self._store,
room_id=room_id,
event_and_contexts=batch,
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -1578,7 +1578,7 @@ async def _persist_events(

try:
result = await self.send_events(
instance_name=writer_instance,
instances=[writer_instance],
events_and_context=events_and_context,
store=self.store,
requester=requester,
Expand Down
8 changes: 4 additions & 4 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ def __exit__(
class WorkerPresenceHandler(BasePresenceHandler):
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self._presence_writer_instance = hs.config.worker.writers.presence[0]
self._presence_writer_instances = hs.config.worker.writers.presence

# Route presence EDUs to the right worker
hs.get_federation_registry().register_instances_for_edu(
Expand Down Expand Up @@ -717,7 +717,7 @@ async def set_state(

# Proxy request to instance that writes presence
await self._set_state_client(
instance_name=self._presence_writer_instance,
instances=self._presence_writer_instances,
user_id=user_id,
device_id=device_id,
state=state,
Expand All @@ -738,7 +738,7 @@ async def bump_presence_active_time(
# Proxy request to instance that writes presence
user_id = user.to_string()
await self._bump_active_client(
instance_name=self._presence_writer_instance,
instances=self._presence_writer_instances,
user_id=user_id,
device_id=device_id,
)
Expand Down Expand Up @@ -2476,7 +2476,7 @@ async def get_replication_rows(
# If not local we query over http replication from the presence
# writer
result = await self._repl_client(
instance_name=instance_name,
instances=[instance_name],
stream_name=PresenceFederationStream.NAME,
from_token=from_token,
upto_token=upto_token,
Expand Down
4 changes: 2 additions & 2 deletions synapse/handlers/room_member.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ def __init__(self, hs: "HomeServer"):
self._is_push_writer = (
hs.get_instance_name() in hs.config.worker.writers.push_rules
)
self._push_writer = hs.config.worker.writers.push_rules[0]
self._push_writers = hs.config.worker.writers.push_rules
self._copy_push_client = ReplicationCopyPusherRestServlet.make_client(hs)

def _on_user_joined_room(self, event_id: str, room_id: str) -> None:
Expand Down Expand Up @@ -1414,7 +1414,7 @@ async def copy_user_state_on_room_upgrade(
)
else:
await self._copy_push_client(
instance_name=self._push_writer,
instances=self._push_writers,
user_id=user_id,
old_room_id=old_room_id,
new_room_id=new_room_id,
Expand Down
Loading
Loading