Skip to content

Commit db6b80d

Browse files
oskarthrasom
authored andcommitted
chat, protocol: Introduce Lamport clock semantics for message order
This commit ensures messages are ordered correctly when participants join and leave a group chat. Specifically, the last received message will appear last. Previously the user and chat clock was queried and updated in an ad hoc manner. With this change there are only two clock changes to keep track of: Sending messages: time = time+1; time_stamp = time; send(message, time_stamp); Receiving messages: (message, time_stamp) = receive(); time = max(time_stamp, time)+1; (See https://en.wikipedia.org/wiki/Lamport_timestamps) Note that this means we can get rid of all the non-message clock queries and updates.
1 parent 646f61a commit db6b80d

File tree

9 files changed

+140
-110
lines changed

9 files changed

+140
-110
lines changed

src/status_im/chat/handlers.cljs

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -457,41 +457,6 @@
457457
(dispatch [:remove-unviewed-messages chat-id])))]
458458
(u/side-effect! send-seen!))
459459

460-
(defn send-clock-value-request!
461-
[{:keys [web3 current-public-key]
462-
:contacts/keys [contacts]} [_ {:keys [message-id from]}]]
463-
(when-not (get-in contacts [from :dapp?])
464-
(protocol/send-clock-value-request!
465-
{:web3 web3
466-
:message {:from current-public-key
467-
:to from
468-
:message-id message-id}})))
469-
470-
(register-handler :send-clock-value-request! (u/side-effect! send-clock-value-request!))
471-
472-
(defn send-clock-value!
473-
[{:keys [web3 current-public-key]} to message-id clock-value]
474-
(when current-public-key
475-
(protocol/send-clock-value! {:web3 web3
476-
:message {:from current-public-key
477-
:to to
478-
:message-id message-id
479-
:clock-value clock-value}})))
480-
481-
(register-handler :update-clock-value!
482-
(after (fn [db [_ to i {:keys [message-id] :as message} last-clock-value]]
483-
(let [clock-value (+ last-clock-value i 1)]
484-
(messages/update (assoc message :clock-value clock-value))
485-
(send-clock-value! db to message-id clock-value))))
486-
(fn [db [_ _ i {:keys [message-id]} last-clock-value]]
487-
(assoc-in db [:message-extras message-id :clock-value] (+ last-clock-value i 1))))
488-
489-
(register-handler :send-clock-value!
490-
(u/side-effect!
491-
(fn [db [_ to message-id]]
492-
(let [{:keys [clock-value]} (messages/get-by-id message-id)]
493-
(send-clock-value! db to message-id clock-value)))))
494-
495460
(register-handler :check-and-open-dapp!
496461
(u/side-effect!
497462
(fn [{:keys [current-chat-id global-commands]

src/status_im/chat/handlers/receive_message.cljs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@
1111
[cljs.reader :refer [read-string]]
1212
[status-im.data-store.chats :as chats]
1313
[status-im.utils.scheduler :as s]
14-
[taoensso.timbre :as log]))
14+
[taoensso.timbre :as log]
15+
[status-im.utils.clocks :as clocks]))
1516

1617
(defn store-message [{chat-id :chat-id :as message}]
1718
(messages/save chat-id (dissoc message :new?)))
@@ -24,18 +25,16 @@
2425

2526
(defn add-message
2627
[db {:keys [from group-id chat-id
27-
message-id timestamp clock-value show?]
28+
message-id timestamp clock-value]
2829
:as message
2930
:or {clock-value 0}}]
3031
(let [same-message (messages/get-by-id message-id)
3132
current-identity (get-current-identity db)
3233
chat-id' (or group-id chat-id from)
3334
exists? (chats/exists? chat-id')
3435
active? (chats/is-active? chat-id')
35-
chat-clock-value (messages/get-last-clock-value chat-id')
36-
clock-value (if (zero? clock-value)
37-
(inc chat-clock-value)
38-
clock-value)]
36+
local-clock (messages/get-last-clock-value chat-id')
37+
clock-new (clocks/receive clock-value local-clock)]
3938
(when (and (not same-message)
4039
(not= from current-identity)
4140
(or (not exists?) active?))
@@ -44,7 +43,7 @@
4443
message' (assoc (cu/check-author-direction previous-message message)
4544
:chat-id chat-id'
4645
:timestamp (or timestamp (random/timestamp))
47-
:clock-value clock-value)]
46+
:clock-value clock-new)]
4847
(store-message message')
4948
(dispatch [:upsert-chat! {:chat-id chat-id'
5049
:group-chat group-chat?}])
@@ -54,9 +53,7 @@
5453
(dispatch [::set-last-message message'])
5554
(when (= (:content-type message') content-type-command-request)
5655
(dispatch [:add-request chat-id' message']))
57-
(dispatch [:add-unviewed-message chat-id' message-id])
58-
(when-not show?
59-
(dispatch [:send-clock-value-request! message])))
56+
(dispatch [:add-unviewed-message chat-id' message-id]))
6057
(if (and
6158
(= (:content-type message) content-type-command)
6259
(not= chat-id' wallet-chat-id)

src/status_im/chat/handlers/send_message.cljs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@
1919
[status-im.protocol.core :as protocol]
2020
[taoensso.timbre :refer-macros [debug] :as log]
2121
[status-im.chat.handlers.console :as console]
22-
[status-im.utils.types :as types]))
22+
[status-im.utils.types :as types]
23+
[status-im.utils.clocks :as clocks]))
2324

2425
(defn prepare-command
2526
[identity chat-id clock-value
@@ -52,7 +53,7 @@
5253
:to-message to-message
5354
:type (:type command)
5455
:has-handler (:has-handler command)
55-
:clock-value (inc clock-value)
56+
:clock-value (clocks/send clock-value)
5657
:show? true}))
5758

5859
(defn console-command? [chat-id command-name]
@@ -177,7 +178,7 @@
177178
:content-type text-content-type
178179
:outgoing true
179180
:timestamp (time/now-ms)
180-
:clock-value (inc clock-value)
181+
:clock-value (clocks/send clock-value)
181182
:show? true})
182183
message'' (cond-> message'
183184
(and group-chat public?)

src/status_im/protocol/chat.cljs

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -40,23 +40,3 @@
4040
:requires-ack? false)
4141
(assoc-in [:payload :group-id] (:group-id message))
4242
(dissoc :group-id)))))
43-
44-
(defn send-clock-value-request!
45-
[{:keys [web3 message]}]
46-
(debug :send-clock-value-request message)
47-
(d/add-pending-message!
48-
web3
49-
(merge message-defaults
50-
(assoc message
51-
:type :clock-value-request
52-
:requires-ack? false))))
53-
54-
(defn send-clock-value!
55-
[{:keys [web3 message]}]
56-
(debug :send-clock-value message)
57-
(d/add-pending-message!
58-
web3
59-
(merge message-defaults
60-
(assoc message
61-
:type :clock-value
62-
:requires-ack? false))))

src/status_im/protocol/core.cljs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818
;; user
1919
(def send-message! chat/send!)
2020
(def send-seen! chat/send-seen!)
21-
(def send-clock-value-request! chat/send-clock-value-request!)
22-
(def send-clock-value! chat/send-clock-value!)
2321
(def reset-pending-messages! d/reset-pending-messages!)
2422

2523
;; group

src/status_im/protocol/handlers.cljs

Lines changed: 0 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,6 @@
117117
(dispatch [:message-delivered message])
118118
(dispatch [:pending-message-remove message]))
119119
:seen (dispatch [:message-seen message])
120-
:clock-value-request (dispatch [:message-clock-value-request message])
121-
:clock-value (dispatch [:message-clock-value message])
122120
:group-invitation (dispatch [:group-chat-invite-received message])
123121
:update-group (dispatch [:update-group-message message])
124122
:add-group-identity (dispatch [:participant-invited-to-group message])
@@ -299,15 +297,6 @@
299297
(assoc message :message-status status))]
300298
(messages/update message)))))))
301299

302-
(defn save-message-clock-value!
303-
[{:keys [message-extras]}
304-
[_ {{:keys [message-id clock-value]} :payload}]]
305-
(when-let [{old-clock-value :clock-value
306-
:as message} (merge (messages/get-by-id message-id)
307-
(get message-extras message-id))]
308-
(if (>= clock-value old-clock-value)
309-
(messages/update (assoc message :clock-value clock-value :show? true)))))
310-
311300
(defn update-message-status [status]
312301
(fn [db
313302
[_ {:keys [from]
@@ -346,33 +335,6 @@
346335
[(after (save-message-status! :seen))]
347336
(update-message-status :seen))
348337

349-
(register-handler :message-clock-value-request
350-
(u/side-effect!
351-
(fn [_ [_ {:keys [from] {:keys [message-id]} :payload}]]
352-
(let [{:keys [chat-id]} (messages/get-by-id message-id)
353-
message-overhead (chats/get-message-overhead chat-id)
354-
last-clock-value (messages/get-last-clock-value chat-id)]
355-
(if (pos? message-overhead)
356-
(let [last-outgoing (->> (messages/get-last-outgoing chat-id message-overhead)
357-
(reverse)
358-
(map-indexed vector))]
359-
(chats/reset-message-overhead chat-id)
360-
(doseq [[i message] last-outgoing]
361-
(dispatch [:update-clock-value! from i message (+ last-clock-value 100)])))
362-
(dispatch [:send-clock-value! from message-id]))))))
363-
364-
(register-handler :message-clock-value
365-
(after save-message-clock-value!)
366-
(fn [{:keys [message-extras] :as db}
367-
[_ {{:keys [message-id clock-value]} :payload}]]
368-
(if-let [{old-clock-value :clock-value} (merge (messages/get-by-id message-id)
369-
(get message-extras message-id))]
370-
(if (> clock-value old-clock-value)
371-
(assoc-in db [:message-extras message-id] {:clock-value clock-value
372-
:show? true})
373-
db)
374-
db)))
375-
376338
(register-handler :pending-message-upsert
377339
(after
378340
(fn [_ [_ {:keys [type id] :as pending-message}]]

src/status_im/utils/clocks.cljs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
(ns status-im.utils.clocks)
2+
3+
;; We use Lamport clocks to ensure correct ordering of events in chats. This is
4+
;; necessary because we operate in a distributed system and there is no central
5+
;; coordinator for what happened before what.
6+
;;
7+
;; For example, the last received message in a group chat will appear last,
8+
;; regardless if that person has seen all the previous group chat messages. The
9+
;; principal invariant to maintain is that clock-values should be monotonically
10+
;; increasing.
11+
;;
12+
;; All clock updates happens as part of sending or receiving a message. Here's
13+
;; the basic algorithm:
14+
;;
15+
;; Sending messages:
16+
;; time = time+1;
17+
;; time_stamp = time;
18+
;; send(message, time_stamp);
19+
;;
20+
;; Receiving messages:
21+
;; (message, time_stamp) = receive();
22+
;; time = max(time_stamp, time)+1;
23+
;;
24+
;; Details:
25+
;; https://en.wikipedia.org/wiki/Lamport_timestamps
26+
;; http://amturing.acm.org/p558-lamport.pdf
27+
28+
(defn send [local-clock]
29+
(inc local-clock))
30+
31+
(defn receive [message-clock local-clock]
32+
(inc (max message-clock local-clock)))

test/cljs/status_im/test/runner.cljs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44
[status-im.test.chat.models.input]
55
[status-im.test.handlers]
66
[status-im.test.utils.utils]
7-
[status-im.test.utils.money]))
7+
[status-im.test.utils.money]
8+
[status-im.test.utils.clocks]))
89

910
(enable-console-print!)
1011

@@ -18,4 +19,5 @@
1819
'status-im.test.chat.models.input
1920
'status-im.test.handlers
2021
'status-im.test.utils.utils
21-
'status-im.test.utils.money)
22+
'status-im.test.utils.money
23+
'status-im.test.utils.clocks)
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
(ns status-im.test.utils.clocks
2+
(:require [cljs.test :refer-macros [deftest is testing]]
3+
[status-im.utils.clocks :as clocks]))
4+
5+
;; Messages are shown on a per-chat basis, ordered by the message clock-value.
6+
;; See status-im-utils.clocks namespace for details.
7+
8+
;; We are not a monolith.
9+
(def a (atom {:identity "a"}))
10+
(def b (atom {:identity "b"}))
11+
(def c (atom {:identity "c"}))
12+
13+
;; The network is unreliable.
14+
(defn random-broadcast! [chat-id message]
15+
(when (> (rand-int 10) 5) (recv! a chat-id message))
16+
(when (> (rand-int 10) 5) (recv! b chat-id message))
17+
(when (> (rand-int 10) 5) (recv! c chat-id message)))
18+
19+
(defn get-last-clock-value
20+
[db chat-id]
21+
(if-let [messages (-> @db :chats chat-id :messages)]
22+
(-> (sort-by :clock-value > messages)
23+
first
24+
:clock-value)
25+
0))
26+
27+
(defn save! [db chat-id message]
28+
(swap! db
29+
(fn [state]
30+
(let [messages (-> state :chats chat-id :messages)]
31+
(assoc-in state [:chats chat-id :messages]
32+
(conj messages message))))))
33+
34+
(defn send! [db chat-id message]
35+
(let [clock-value (get-last-clock-value db chat-id)
36+
prepared-message (assoc message :clock-value (clocks/send clock-value))]
37+
(save! db chat-id prepared-message)
38+
(random-broadcast! chat-id prepared-message)))
39+
40+
(defn recv! [db chat-id {:keys [clock-value] :as message}]
41+
(let [local-clock (get-last-clock-value db chat-id)
42+
new-clock (clocks/receive clock-value local-clock)]
43+
(when-not (= (:from message) (:identity @db))
44+
(save! db chat-id (assoc message :clock-value new-clock)))))
45+
46+
(defn thread [db chat-id]
47+
(let [messages (-> @db :chats chat-id :messages)]
48+
(sort-by :clock-value < messages)))
49+
50+
(defn format-message [{:keys [from text]}]
51+
(str from ": " text ", "))
52+
53+
(defn format-thread [thread]
54+
(apply str (map format-message thread)))
55+
56+
;; Invariant we want to maintain.
57+
(defn ordered-increasing-text? [thread]
58+
(let [xs (map :text thread)]
59+
(or (empty? xs) (apply < xs))))
60+
61+
(defn simulate! []
62+
(send! a :foo {:from "a" :text "1"})
63+
(send! a :foo {:from "a" :text "2"})
64+
65+
(send! a :bar {:from "a" :text "1"})
66+
67+
(send! b :foo {:from "b" :text "3"})
68+
(send! c :foo {:from "c" :text "4"})
69+
(send! a :foo {:from "a" :text "5"})
70+
71+
(send! c :bar {:from "c" :text "7"}))
72+
73+
(deftest clocks
74+
(testing "Message order preserved"
75+
(simulate!)
76+
(is (ordered-increasing-text? (thread a :foo)))
77+
(is (ordered-increasing-text? (thread b :foo)))
78+
(is (ordered-increasing-text? (thread c :foo)))
79+
(is (ordered-increasing-text? (thread a :bar))))
80+
81+
(testing "Bad thread recognized as such"
82+
(let [bad-thread '({:from "a", :text "1", :clock-value 1}
83+
{:from "c", :text "4", :clock-value 1}
84+
{:from "a", :text "2", :clock-value 2}
85+
{:from "a", :text "5", :clock-value 8})]
86+
(is (not (ordered-increasing-text? bad-thread))))))
87+
88+
;; Debugging
89+
;;(println "******************************************")
90+
;;(println "A's POV :foo" (format-thread (thread a :foo)))
91+
;;(println "B's POV :foo" (format-thread (thread b :foo)))
92+
;;(println "C's POV :foo" (format-thread (thread c :foo)))
93+
;;(println "******************************************")

0 commit comments

Comments
 (0)