Skip to content

Commit ecf8328

Browse files
nolouchovervenus
authored andcommitted
raft: leader respond to learner read index message (#220)
Signed-off-by: nolouch <nolouch@gmail.com> Signed-off-by: Neil Shen <overvenus@gmail.com>
1 parent 3b41216 commit ecf8328

File tree

2 files changed

+90
-5
lines changed

2 files changed

+90
-5
lines changed

src/raft.rs

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1486,11 +1486,23 @@ impl<T: Storage> Raft<T> {
14861486
}
14871487
}
14881488
} else {
1489-
let rs = ReadState {
1490-
index: self.raft_log.committed,
1491-
request_ctx: m.take_entries()[0].take_data(),
1492-
};
1493-
self.read_states.push(rs);
1489+
// there is only one voting member (the leader) in the cluster
1490+
if m.get_from() == INVALID_ID || m.get_from() == self.id {
1491+
// from leader itself
1492+
let rs = ReadState {
1493+
index: self.raft_log.committed,
1494+
request_ctx: m.take_entries()[0].take_data(),
1495+
};
1496+
self.read_states.push(rs);
1497+
} else {
1498+
// from learner member
1499+
let mut to_send = Message::default();
1500+
to_send.set_to(m.get_from());
1501+
to_send.set_msg_type(MessageType::MsgReadIndexResp);
1502+
to_send.set_index(self.raft_log.committed);
1503+
to_send.set_entries(m.take_entries());
1504+
self.send(to_send);
1505+
}
14941506
}
14951507
return Ok(());
14961508
}

tests/integration_cases/test_raft.rs

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2205,6 +2205,79 @@ fn test_read_only_option_safe() {
22052205
}
22062206
}
22072207

2208+
#[test]
2209+
fn test_read_only_with_learner() {
2210+
setup_for_test();
2211+
let a = new_test_learner_raft(1, vec![1], vec![2], 10, 1, new_storage());
2212+
let b = new_test_learner_raft(2, vec![1], vec![2], 10, 1, new_storage());
2213+
2214+
let mut nt = Network::new(vec![Some(a), Some(b)]);
2215+
2216+
// we can not let system choose the value of randomizedElectionTimeout
2217+
// otherwise it will introduce some uncertainty into this test case
2218+
// we need to ensure randomizedElectionTimeout > electionTimeout here
2219+
let b_election_timeout = nt.peers[&2].get_election_timeout();
2220+
nt.peers
2221+
.get_mut(&2)
2222+
.unwrap()
2223+
.set_randomized_election_timeout(b_election_timeout + 1);
2224+
2225+
for _ in 0..b_election_timeout {
2226+
nt.peers.get_mut(&2).unwrap().tick();
2227+
}
2228+
nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]);
2229+
2230+
assert_eq!(nt.peers[&1].state, StateRole::Leader);
2231+
assert_eq!(nt.peers[&2].state, StateRole::Follower);
2232+
2233+
let mut tests = vec![
2234+
(1, 10, 11, "ctx1"),
2235+
(2, 10, 21, "ctx2"),
2236+
(1, 10, 31, "ctx3"),
2237+
(2, 10, 41, "ctx4"),
2238+
];
2239+
2240+
for (i, (id, proposals, wri, wctx)) in tests.drain(..).enumerate() {
2241+
for _ in 0..proposals {
2242+
nt.send(vec![new_message(1, 1, MessageType::MsgPropose, 1)]);
2243+
}
2244+
2245+
let e = new_entry(0, 0, Some(wctx));
2246+
nt.send(vec![new_message_with_entries(
2247+
id,
2248+
id,
2249+
MessageType::MsgReadIndex,
2250+
vec![e],
2251+
)]);
2252+
2253+
let read_states: Vec<ReadState> = nt
2254+
.peers
2255+
.get_mut(&id)
2256+
.unwrap()
2257+
.read_states
2258+
.drain(..)
2259+
.collect();
2260+
assert_eq!(
2261+
read_states.is_empty(),
2262+
false,
2263+
"#{}: read_states is empty, want non-empty",
2264+
i
2265+
);
2266+
let rs = &read_states[0];
2267+
assert_eq!(
2268+
rs.index, wri,
2269+
"#{}: read_index = {}, want {}",
2270+
i, rs.index, wri
2271+
);
2272+
let vec_wctx = wctx.as_bytes().to_vec();
2273+
assert_eq!(
2274+
rs.request_ctx, vec_wctx,
2275+
"#{}: request_ctx = {:?}, want {:?}",
2276+
i, rs.request_ctx, vec_wctx
2277+
);
2278+
}
2279+
}
2280+
22082281
#[test]
22092282
fn test_read_only_option_lease() {
22102283
setup_for_test();

0 commit comments

Comments
 (0)