Skip to content

Commit 458564d

Browse files
committed
add forked_from_thread_id turn metadata
1 parent 4234884 commit 458564d

17 files changed

Lines changed: 345 additions & 6 deletions

File tree

codex-rs/app-server/src/request_processors/external_agent_config_processor.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,7 @@ impl ExternalAgentConfigRequestProcessor {
307307
config,
308308
initial_history: InitialHistory::Forked(rollout_items),
309309
session_source: None,
310+
forked_from_thread_id: None,
310311
thread_source: None,
311312
dynamic_tools: Vec::new(),
312313
persist_extended_history: false,

codex-rs/app-server/src/request_processors/thread_processor.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1104,6 +1104,7 @@ impl ThreadRequestProcessor {
11041104
codex_app_server_protocol::ThreadStartSource::Clear => InitialHistory::Cleared,
11051105
},
11061106
session_source: None,
1107+
forked_from_thread_id: None,
11071108
thread_source,
11081109
dynamic_tools: core_dynamic_tools,
11091110
persist_extended_history: false,

codex-rs/app-server/tests/suite/v2/client_metadata.rs

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
use anyhow::Result;
22
use app_test_support::McpProcess;
3+
use app_test_support::create_fake_rollout;
34
use app_test_support::to_response;
45
use codex_app_server_protocol::JSONRPCResponse;
56
use codex_app_server_protocol::RequestId;
7+
use codex_app_server_protocol::ThreadForkParams;
8+
use codex_app_server_protocol::ThreadForkResponse;
9+
use codex_app_server_protocol::ThreadSource;
610
use codex_app_server_protocol::ThreadStartParams;
711
use codex_app_server_protocol::ThreadStartResponse;
812
use codex_app_server_protocol::TurnStartParams;
@@ -99,6 +103,95 @@ async fn turn_start_forwards_client_metadata_to_responses_request_v2() -> Result
99103
Ok(())
100104
}
101105

106+
#[tokio::test]
107+
async fn turn_start_sends_fork_lineage_in_turn_metadata_for_thread_fork_v2() -> Result<()> {
108+
skip_if_no_network!(Ok(()));
109+
110+
let server = responses::start_mock_server().await;
111+
let response_mock = responses::mount_sse_once(
112+
&server,
113+
responses::sse(vec![
114+
responses::ev_response_created("resp-1"),
115+
responses::ev_assistant_message("msg-1", "Done"),
116+
responses::ev_completed("resp-1"),
117+
]),
118+
)
119+
.await;
120+
121+
let codex_home = TempDir::new()?;
122+
create_config_toml(
123+
codex_home.path(),
124+
&server.uri(),
125+
/*supports_websockets*/ false,
126+
)?;
127+
128+
let source_thread_id = create_fake_rollout(
129+
codex_home.path(),
130+
"2025-01-05T12-00-00",
131+
"2025-01-05T12:00:00Z",
132+
"Saved user message",
133+
Some("mock_provider"),
134+
/*git_info*/ None,
135+
)?;
136+
137+
let mut mcp = McpProcess::new(codex_home.path()).await?;
138+
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
139+
140+
let fork_req = mcp
141+
.send_thread_fork_request(ThreadForkParams {
142+
thread_id: source_thread_id.clone(),
143+
thread_source: Some(ThreadSource::User),
144+
..Default::default()
145+
})
146+
.await?;
147+
let fork_resp: JSONRPCResponse = timeout(
148+
DEFAULT_READ_TIMEOUT,
149+
mcp.read_stream_until_response_message(RequestId::Integer(fork_req)),
150+
)
151+
.await??;
152+
let ThreadForkResponse { thread, .. } = to_response::<ThreadForkResponse>(fork_resp)?;
153+
154+
let turn_req = mcp
155+
.send_turn_start_request(TurnStartParams {
156+
thread_id: thread.id.clone(),
157+
input: vec![V2UserInput::Text {
158+
text: "Continue".to_string(),
159+
text_elements: Vec::new(),
160+
}],
161+
..Default::default()
162+
})
163+
.await?;
164+
let turn_resp: JSONRPCResponse = timeout(
165+
DEFAULT_READ_TIMEOUT,
166+
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
167+
)
168+
.await??;
169+
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
170+
171+
timeout(
172+
DEFAULT_READ_TIMEOUT,
173+
mcp.read_stream_until_notification_message("turn/completed"),
174+
)
175+
.await??;
176+
177+
let request = response_mock.single_request();
178+
let metadata = request
179+
.header("x-codex-turn-metadata")
180+
.as_deref()
181+
.map(parse_json_header)
182+
.unwrap_or_else(|| panic!("missing x-codex-turn-metadata header"));
183+
assert_eq!(
184+
metadata["forked_from_thread_id"].as_str(),
185+
Some(source_thread_id.as_str())
186+
);
187+
assert_eq!(metadata["thread_id"].as_str(), Some(thread.id.as_str()));
188+
assert_eq!(metadata["turn_id"].as_str(), Some(turn.id.as_str()));
189+
assert!(metadata.get("parent_thread_id").is_none());
190+
assert!(metadata.get("subagent_type").is_none());
191+
192+
Ok(())
193+
}
194+
102195
#[tokio::test]
103196
async fn turn_steer_updates_client_metadata_on_follow_up_responses_request_v2() -> Result<()> {
104197
skip_if_no_network!(Ok(()));

codex-rs/core/src/agent/control.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -487,6 +487,7 @@ impl AgentControl {
487487
self.clone(),
488488
session_source,
489489
/*thread_source*/ Some(ThreadSource::Subagent),
490+
/*forked_from_thread_id*/ Some(parent_thread_id),
490491
/*persist_extended_history*/ false,
491492
inherited_shell_snapshot,
492493
inherited_exec_policy,

codex-rs/core/src/codex_delegate.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ pub(crate) async fn run_codex_thread_interactive(
8686
extensions: Arc::clone(&parent_session.services.extensions),
8787
conversation_history: initial_history.unwrap_or(InitialHistory::New),
8888
session_source: SessionSource::SubAgent(subagent_source.clone()),
89+
forked_from_thread_id: None,
8990
thread_source: Some(ThreadSource::Subagent),
9091
agent_control: parent_session.services.agent_control.clone(),
9192
dynamic_tools: Vec::new(),

codex-rs/core/src/session/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ use crate::realtime_conversation::RealtimeConversationManager;
3636
use crate::session_prefix::format_subagent_notification_message;
3737
use crate::skills::SkillRenderSideEffects;
3838
use crate::skills_load_input_from_config;
39+
use crate::turn_metadata::ThreadMetadataLineage;
3940
use crate::turn_metadata::TurnMetadataState;
4041
use crate::turn_timing::now_unix_timestamp_ms;
4142
use async_channel::Receiver;
@@ -398,6 +399,7 @@ pub(crate) struct CodexSpawnArgs {
398399
pub(crate) extensions: Arc<codex_extension_api::ExtensionRegistry<crate::config::Config>>,
399400
pub(crate) conversation_history: InitialHistory,
400401
pub(crate) session_source: SessionSource,
402+
pub(crate) forked_from_thread_id: Option<ThreadId>,
401403
pub(crate) thread_source: Option<ThreadSource>,
402404
pub(crate) agent_control: AgentControl,
403405
pub(crate) dynamic_tools: Vec<DynamicToolSpec>,
@@ -462,6 +464,7 @@ impl Codex {
462464
extensions,
463465
conversation_history,
464466
session_source,
467+
forked_from_thread_id,
465468
thread_source,
466469
agent_control,
467470
dynamic_tools,
@@ -617,6 +620,8 @@ impl Codex {
617620
app_server_client_name: None,
618621
app_server_client_version: None,
619622
session_source,
623+
forked_from_thread_id: forked_from_thread_id
624+
.or_else(|| conversation_history.forked_from_id()),
620625
thread_source,
621626
dynamic_tools,
622627
persist_extended_history,

codex-rs/core/src/session/review.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ pub(super) async fn spawn_review_thread(
7575
let turn_metadata_state = Arc::new(TurnMetadataState::new(
7676
sess.session_id().to_string(),
7777
sess.thread_id().to_string(),
78+
ThreadMetadataLineage::from_fork_source(sess.forked_from_thread_id()),
7879
parent_turn_context.thread_source,
7980
review_turn_id.clone(),
8081
#[allow(deprecated)]

codex-rs/core/src/session/session.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use tokio::sync::Semaphore;
1818
/// A session has at most 1 running task at a time, and can be interrupted by user input.
1919
pub(crate) struct Session {
2020
pub(crate) conversation_id: ThreadId,
21+
pub(super) forked_from_thread_id: Option<ThreadId>,
2122
pub(crate) installation_id: String,
2223
pub(super) tx_event: Sender<Event>,
2324
pub(super) agent_status: watch::Sender<AgentStatus>,
@@ -95,6 +96,8 @@ pub(crate) struct SessionConfiguration {
9596
pub(super) app_server_client_version: Option<String>,
9697
/// Source of the session (cli, vscode, exec, mcp, ...)
9798
pub(super) session_source: SessionSource,
99+
/// Immediate history source copied into this thread, when this thread was forked.
100+
pub(super) forked_from_thread_id: Option<ThreadId>,
98101
/// Optional analytics source classification for this thread.
99102
pub(super) thread_source: Option<ThreadSource>,
100103
pub(super) dynamic_tools: Vec<DynamicToolSpec>,
@@ -467,6 +470,10 @@ impl Session {
467470
self.conversation_id
468471
}
469472

473+
pub(crate) fn forked_from_thread_id(&self) -> Option<ThreadId> {
474+
self.forked_from_thread_id
475+
}
476+
470477
/// Returns the identity shared by the root thread and all descendant threads.
471478
pub(crate) fn session_id(&self) -> SessionId {
472479
self.services.agent_control.session_id()
@@ -505,7 +512,10 @@ impl Session {
505512
session_configuration.collaboration_mode.model(),
506513
session_configuration.provider
507514
);
508-
let forked_from_id = initial_history.forked_from_id();
515+
let forked_from_id = session_configuration
516+
.forked_from_thread_id
517+
.or_else(|| initial_history.forked_from_id());
518+
session_configuration.forked_from_thread_id = forked_from_id;
509519

510520
let event_persistence_mode = if session_configuration.persist_extended_history {
511521
ThreadEventPersistenceMode::Extended
@@ -1040,6 +1050,7 @@ impl Session {
10401050

10411051
let sess = Arc::new(Session {
10421052
conversation_id: thread_id,
1053+
forked_from_thread_id: forked_from_id,
10431054
installation_id,
10441055
tx_event: tx_event.clone(),
10451056
agent_status,

codex-rs/core/src/session/tests.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2999,6 +2999,7 @@ async fn set_rate_limits_retains_previous_credits() {
29992999
app_server_client_name: None,
30003000
app_server_client_version: None,
30013001
session_source: SessionSource::Exec,
3002+
forked_from_thread_id: None,
30023003
thread_source: None,
30033004
dynamic_tools: Vec::new(),
30043005
persist_extended_history: false,
@@ -3103,6 +3104,7 @@ async fn set_rate_limits_updates_plan_type_when_present() {
31033104
app_server_client_name: None,
31043105
app_server_client_version: None,
31053106
session_source: SessionSource::Exec,
3107+
forked_from_thread_id: None,
31063108
thread_source: None,
31073109
dynamic_tools: Vec::new(),
31083110
persist_extended_history: false,
@@ -3630,6 +3632,7 @@ pub(crate) async fn make_session_configuration_for_tests() -> SessionConfigurati
36303632
app_server_client_name: None,
36313633
app_server_client_version: None,
36323634
session_source: SessionSource::Exec,
3635+
forked_from_thread_id: None,
36333636
thread_source: None,
36343637
dynamic_tools: Vec::new(),
36353638
persist_extended_history: false,
@@ -4373,6 +4376,7 @@ async fn session_new_fails_when_zsh_fork_enabled_without_zsh_path() {
43734376
app_server_client_name: None,
43744377
app_server_client_version: None,
43754378
session_source: SessionSource::Exec,
4379+
forked_from_thread_id: None,
43764380
thread_source: None,
43774381
dynamic_tools: Vec::new(),
43784382
persist_extended_history: false,
@@ -4482,6 +4486,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
44824486
app_server_client_name: None,
44834487
app_server_client_version: None,
44844488
session_source: SessionSource::Exec,
4489+
forked_from_thread_id: None,
44854490
thread_source: None,
44864491
dynamic_tools: Vec::new(),
44874492
persist_extended_history: false,
@@ -4623,6 +4628,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
46234628

46244629
let session = Session {
46254630
conversation_id: thread_id,
4631+
forked_from_thread_id: None,
46264632
installation_id: "11111111-1111-4111-8111-111111111111".to_string(),
46274633
tx_event,
46284634
agent_status: agent_status_tx,
@@ -4715,6 +4721,7 @@ async fn make_session_with_config_and_rx(
47154721
app_server_client_name: None,
47164722
app_server_client_version: None,
47174723
session_source: SessionSource::Exec,
4724+
forked_from_thread_id: None,
47184725
thread_source: None,
47194726
dynamic_tools: Vec::new(),
47204727
persist_extended_history: false,
@@ -4818,6 +4825,7 @@ async fn make_session_with_history_source_and_agent_control_and_rx(
48184825
app_server_client_name: None,
48194826
app_server_client_version: None,
48204827
session_source: session_source.clone(),
4828+
forked_from_thread_id: None,
48214829
thread_source: None,
48224830
dynamic_tools: Vec::new(),
48234831
persist_extended_history: false,
@@ -6311,6 +6319,7 @@ where
63116319
app_server_client_name: None,
63126320
app_server_client_version: None,
63136321
session_source: SessionSource::Exec,
6322+
forked_from_thread_id: None,
63146323
thread_source: None,
63156324
dynamic_tools,
63166325
persist_extended_history: false,
@@ -6452,6 +6461,7 @@ where
64526461

64536462
let session = Arc::new(Session {
64546463
conversation_id: thread_id,
6464+
forked_from_thread_id: None,
64556465
installation_id: "11111111-1111-4111-8111-111111111111".to_string(),
64566466
tx_event,
64576467
agent_status: agent_status_tx,

codex-rs/core/src/session/tests/guardian_tests.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -692,6 +692,7 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() {
692692
session_source: SessionSource::SubAgent(SubAgentSource::Other(
693693
GUARDIAN_REVIEWER_NAME.to_string(),
694694
)),
695+
forked_from_thread_id: None,
695696
thread_source: None,
696697
agent_control: AgentControl::default(),
697698
dynamic_tools: Vec::new(),

0 commit comments

Comments
 (0)