Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Ensure the to-be-upserted rows have the expected thread ID.
  • Loading branch information
clokep committed Oct 4, 2022
commit 5fb86d95349555526d6534aff00ada3370dd6c54
34 changes: 23 additions & 11 deletions synapse/storage/databases/main/event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -1103,19 +1103,26 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool:
txn, room_id, user_id, stream_ordering, old_rotate_stream_ordering
)

# First ensure that the existing rows have an updated thread_id field.
self.db_pool.simple_update_txn(
txn,
table="event_push_summary",
keyvalues={"room_id": room_id, "user_id": user_id, "thread_id": None},
updatevalues={"thread_id": "main"},
)

# Replace the previous summary with the new counts.
#
# TODO(threads): Upsert per-thread instead of setting them all to main.
self.db_pool.simple_upsert_txn(
txn,
table="event_push_summary",
keyvalues={"room_id": room_id, "user_id": user_id},
keyvalues={"room_id": room_id, "user_id": user_id, "thread_id": "main"},
values={
"notif_count": notif_count,
"unread_count": unread_count,
"stream_ordering": old_rotate_stream_ordering,
"last_receipt_stream_ordering": stream_ordering,
"thread_id": "main",
},
)

Expand Down Expand Up @@ -1264,20 +1271,25 @@ def _rotate_notifs_before_txn(

logger.info("Rotating notifications, handling %d rows", len(summaries))

# Ensure that any updated threads have an updated thread_id.
self.db_pool.simple_update_many_txn(
txn,
table="event_push_summary",
key_names=("user_id", "room_id", "thread_id"),
key_values=[(user_id, room_id, None) for user_id, room_id in summaries],
value_names=("thread_id",),
value_values=[("main",) for _ in summaries],
)

# TODO(threads): Update on a per-thread basis.
self.db_pool.simple_upsert_many_txn(
txn,
table="event_push_summary",
key_names=("user_id", "room_id"),
key_values=[(user_id, room_id) for user_id, room_id in summaries],
value_names=("notif_count", "unread_count", "stream_ordering", "thread_id"),
key_names=("user_id", "room_id", "thread_id"),
key_values=[(user_id, room_id, "main") for user_id, room_id in summaries],
value_names=("notif_count", "unread_count", "stream_ordering"),
value_values=[
(
summary.notif_count,
summary.unread_count,
summary.stream_ordering,
"main",
)
(summary.notif_count, summary.unread_count, summary.stream_ordering)
for summary in summaries.values()
],
)
Expand Down