From 3026b55ca0d37e355e18c7f9246e1e98c40c9545 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Tue, 26 Mar 2024 16:23:11 +0000 Subject: [PATCH 1/3] Use receipts `event_stream_ordering` instead of joins This should reduce IOPs incurred by joining to the events table to lookup stream ordering, which happens in many receipt handling code paths. --- .../databases/main/event_push_actions.py | 22 +++++++------------ synapse/storage/databases/main/receipts.py | 20 ++++++++--------- 2 files changed, 17 insertions(+), 25 deletions(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 3a5666cd9b0..ef3a63cb832 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -385,7 +385,6 @@ def _get_unread_counts_by_room_for_user_txn( WITH all_receipts AS ( SELECT room_id, thread_id, MAX(event_stream_ordering) AS max_receipt_stream_ordering FROM receipts_linearized - LEFT JOIN events USING (room_id, event_id) WHERE {receipt_types_clause} AND user_id = ? @@ -621,13 +620,12 @@ def _get_thread(thread_id: str) -> NotifCounts: SELECT notif_count, COALESCE(unread_count, 0), thread_id FROM event_push_summary LEFT JOIN ( - SELECT thread_id, MAX(stream_ordering) AS threaded_receipt_stream_ordering + SELECT thread_id, MAX(event_stream_ordering) AS threaded_receipt_stream_ordering FROM receipts_linearized - LEFT JOIN events USING (room_id, event_id) WHERE user_id = ? AND room_id = ? - AND stream_ordering > ? + AND event_stream_ordering > ? AND {receipt_types_clause} GROUP BY thread_id ) AS receipts USING (thread_id) @@ -659,13 +657,12 @@ def _get_thread(thread_id: str) -> NotifCounts: sql = f""" SELECT COUNT(*), thread_id FROM event_push_actions LEFT JOIN ( - SELECT thread_id, MAX(stream_ordering) AS threaded_receipt_stream_ordering + SELECT thread_id, MAX(event_stream_ordering) AS threaded_receipt_stream_ordering FROM receipts_linearized - LEFT JOIN events USING (room_id, event_id) WHERE user_id = ? AND room_id = ? - AND stream_ordering > ? + AND event_stream_ordering > ? AND {receipt_types_clause} GROUP BY thread_id ) AS receipts USING (thread_id) @@ -738,13 +735,12 @@ def _get_thread(thread_id: str) -> NotifCounts: thread_id FROM event_push_actions LEFT JOIN ( - SELECT thread_id, MAX(stream_ordering) AS threaded_receipt_stream_ordering + SELECT thread_id, MAX(event_stream_ordering) AS threaded_receipt_stream_ordering FROM receipts_linearized - LEFT JOIN events USING (room_id, event_id) WHERE user_id = ? AND room_id = ? - AND stream_ordering > ? + AND event_stream_ordering > ? AND {receipt_types_clause} GROUP BY thread_id ) AS receipts USING (thread_id) @@ -881,9 +877,8 @@ def _get_receipts_by_room_txn( ) sql = f""" - SELECT room_id, thread_id, MAX(stream_ordering) + SELECT room_id, thread_id, MAX(event_stream_ordering) FROM receipts_linearized - INNER JOIN events USING (room_id, event_id) WHERE {receipt_types_clause} AND user_id = ? GROUP BY room_id, thread_id @@ -1362,9 +1357,8 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool: ) sql = """ - SELECT r.stream_id, r.room_id, r.user_id, r.thread_id, e.stream_ordering + SELECT r.stream_id, r.room_id, r.user_id, r.thread_id, r.event_stream_ordering FROM receipts_linearized AS r - INNER JOIN events AS e USING (event_id) WHERE ? < r.stream_id AND r.stream_id <= ? AND user_id LIKE ? ORDER BY r.stream_id ASC LIMIT ? diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index d513c425301..d999171133c 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -178,14 +178,13 @@ def get_last_unthreaded_receipt_for_user_txn( ) sql = f""" - SELECT event_id, stream_ordering + SELECT event_id, event_stream_ordering FROM receipts_linearized - INNER JOIN events USING (room_id, event_id) WHERE {clause} AND user_id = ? AND room_id = ? AND thread_id IS NULL - ORDER BY stream_ordering DESC + ORDER BY event_stream_ordering DESC LIMIT 1 """ @@ -394,9 +393,9 @@ def f(txn: LoggingTransaction) -> List[Tuple[str, str, str, str]]: content: JsonDict = {} for receipt_type, user_id, event_id, data in rows: - content.setdefault(event_id, {}).setdefault(receipt_type, {})[user_id] = ( - db_to_json(data) - ) + content.setdefault(event_id, {}).setdefault(receipt_type, {})[ + user_id + ] = db_to_json(data) return [{"type": EduTypes.RECEIPT, "room_id": room_id, "content": content}] @@ -483,9 +482,9 @@ def f( if user_id in receipt_type_dict: # existing receipt # is the existing receipt threaded and we are currently processing an unthreaded one? if "thread_id" in receipt_type_dict[user_id] and not thread_id: - receipt_type_dict[user_id] = ( - receipt_data # replace with unthreaded one - ) + receipt_type_dict[ + user_id + ] = receipt_data # replace with unthreaded one else: # receipt does not exist, just set it receipt_type_dict[user_id] = receipt_data if thread_id: @@ -736,8 +735,7 @@ def _insert_linearized_receipt_txn( thread_args = (thread_id,) sql = f""" - SELECT stream_ordering, event_id FROM events - INNER JOIN receipts_linearized AS r USING (event_id, room_id) + SELECT r.event_stream_ordering, r.event_id FROM receipts_linearized AS r WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ? AND {thread_clause} """ txn.execute( From 118d42f7e98a69df3f08b6ce478e5071d620536d Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Tue, 26 Mar 2024 16:34:32 +0000 Subject: [PATCH 2/3] Add changelog --- changelog.d/17032.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/17032.misc diff --git a/changelog.d/17032.misc b/changelog.d/17032.misc new file mode 100644 index 00000000000..b03f6f42e5f --- /dev/null +++ b/changelog.d/17032.misc @@ -0,0 +1 @@ +Use new receipts column to optimise receipt and push action SQL queries. Contributed by Nick @ Beeper (@fizzadar). From ca75be367787d718e372e76347e5419a2c832392 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Tue, 26 Mar 2024 16:37:53 +0000 Subject: [PATCH 3/3] Linting --- synapse/storage/databases/main/receipts.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index d999171133c..9660fc46997 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -393,9 +393,9 @@ def f(txn: LoggingTransaction) -> List[Tuple[str, str, str, str]]: content: JsonDict = {} for receipt_type, user_id, event_id, data in rows: - content.setdefault(event_id, {}).setdefault(receipt_type, {})[ - user_id - ] = db_to_json(data) + content.setdefault(event_id, {}).setdefault(receipt_type, {})[user_id] = ( + db_to_json(data) + ) return [{"type": EduTypes.RECEIPT, "room_id": room_id, "content": content}] @@ -482,9 +482,9 @@ def f( if user_id in receipt_type_dict: # existing receipt # is the existing receipt threaded and we are currently processing an unthreaded one? if "thread_id" in receipt_type_dict[user_id] and not thread_id: - receipt_type_dict[ - user_id - ] = receipt_data # replace with unthreaded one + receipt_type_dict[user_id] = ( + receipt_data # replace with unthreaded one + ) else: # receipt does not exist, just set it receipt_type_dict[user_id] = receipt_data if thread_id: