From 742502c1548f73b3683a9ab6d5401df5bdeb8ddb Mon Sep 17 00:00:00 2001 From: Daria Sukhonina Date: Mon, 1 Jun 2026 13:37:56 +0300 Subject: [PATCH 1/3] Move new WorkerLocal implementation to the old location --- compiler/rustc_data_structures/src/sync.rs | 3 +- .../src/sync/worker_local.rs | 149 --------------- compiler/rustc_interface/src/util.rs | 3 +- compiler/rustc_thread_pool/src/lib.rs | 2 +- .../rustc_thread_pool/src/worker_local.rs | 175 +++++++++++++----- 5 files changed, 133 insertions(+), 199 deletions(-) delete mode 100644 compiler/rustc_data_structures/src/sync/worker_local.rs diff --git a/compiler/rustc_data_structures/src/sync.rs b/compiler/rustc_data_structures/src/sync.rs index 3d5bc85278286..47e08dd1fce3a 100644 --- a/compiler/rustc_data_structures/src/sync.rs +++ b/compiler/rustc_data_structures/src/sync.rs @@ -42,14 +42,13 @@ pub use self::parallel::{ try_par_for_each_in, }; pub use self::vec::{AppendOnlyIndexVec, AppendOnlyVec}; -pub use self::worker_local::{Registry, WorkerLocal}; +pub use rustc_thread_pool::{ComplementaryRegistry, WorkerLocal}; pub use crate::marker::*; mod freeze; mod lock; mod parallel; mod vec; -mod worker_local; /// Keep the conditional imports together in a submodule, so that import-sorting /// doesn't split them up. diff --git a/compiler/rustc_data_structures/src/sync/worker_local.rs b/compiler/rustc_data_structures/src/sync/worker_local.rs deleted file mode 100644 index d75af00985047..0000000000000 --- a/compiler/rustc_data_structures/src/sync/worker_local.rs +++ /dev/null @@ -1,149 +0,0 @@ -use std::cell::{Cell, OnceCell}; -use std::num::NonZero; -use std::ops::Deref; -use std::ptr; -use std::sync::Arc; - -use parking_lot::Mutex; - -use crate::outline; -use crate::sync::CacheAligned; - -/// A pointer to the `RegistryData` which uniquely identifies a registry. -/// This identifier can be reused if the registry gets freed. -#[derive(Clone, Copy, PartialEq)] -struct RegistryId(*const RegistryData); - -impl RegistryId { - #[inline(always)] - /// Verifies that the current thread is associated with the registry and returns its unique - /// index within the registry. This panics if the current thread is not associated with this - /// registry. - /// - /// Note that there's a race possible where the identifier in `THREAD_DATA` could be reused - /// so this can succeed from a different registry. - fn verify(self) -> usize { - let (id, index) = THREAD_DATA.with(|data| (data.registry_id.get(), data.index.get())); - - if id == self { index } else { outline(|| panic!("Unable to verify registry association")) } - } -} - -struct RegistryData { - thread_limit: NonZero, - threads: Mutex, -} - -/// Represents a list of threads which can access worker locals. -#[derive(Clone)] -pub struct Registry(Arc); - -thread_local! { - /// The registry associated with the thread. - /// This allows the `WorkerLocal` type to clone the registry in its constructor. - static REGISTRY: OnceCell = const { OnceCell::new() }; -} - -struct ThreadData { - registry_id: Cell, - index: Cell, -} - -thread_local! { - /// A thread local which contains the identifier of `REGISTRY` but allows for faster access. - /// It also holds the index of the current thread. - static THREAD_DATA: ThreadData = const { ThreadData { - registry_id: Cell::new(RegistryId(ptr::null())), - index: Cell::new(0), - }}; -} - -impl Registry { - /// Creates a registry which can hold up to `thread_limit` threads. - pub fn new(thread_limit: NonZero) -> Self { - Registry(Arc::new(RegistryData { thread_limit, threads: Mutex::new(0) })) - } - - /// Gets the registry associated with the current thread. Panics if there's no such registry. - pub fn current() -> Self { - REGISTRY.with(|registry| registry.get().cloned().expect("No associated registry")) - } - - /// Registers the current thread with the registry so worker locals can be used on it. - /// Panics if the thread limit is hit or if the thread already has an associated registry. - pub fn register(&self) { - let mut threads = self.0.threads.lock(); - if *threads < self.0.thread_limit.get() { - REGISTRY.with(|registry| { - if registry.get().is_some() { - drop(threads); - panic!("Thread already has a registry"); - } - registry.set(self.clone()).ok(); - THREAD_DATA.with(|data| { - data.registry_id.set(self.id()); - data.index.set(*threads); - }); - *threads += 1; - }); - } else { - drop(threads); - panic!("Thread limit reached"); - } - } - - /// Gets the identifier of this registry. - fn id(&self) -> RegistryId { - RegistryId(&*self.0) - } -} - -/// Holds worker local values for each possible thread in a registry. You can only access the -/// worker local value through the `Deref` impl on the registry associated with the thread it was -/// created on. It will panic otherwise. -pub struct WorkerLocal { - locals: Box<[CacheAligned]>, - registry: Registry, -} - -// This is safe because the `deref` call will return a reference to a `T` unique to each thread -// or it will panic for threads without an associated local. So there isn't a need for `T` to do -// it's own synchronization. The `verify` method on `RegistryId` has an issue where the id -// can be reused, but `WorkerLocal` has a reference to `Registry` which will prevent any reuse. -unsafe impl Sync for WorkerLocal {} - -impl WorkerLocal { - /// Creates a new worker local where the `initial` closure computes the - /// value this worker local should take for each thread in the registry. - #[inline] - pub fn new T>(mut initial: F) -> WorkerLocal { - let registry = Registry::current(); - WorkerLocal { - locals: (0..registry.0.thread_limit.get()).map(|i| CacheAligned(initial(i))).collect(), - registry, - } - } - - /// Returns the worker-local values for each thread - #[inline] - pub fn into_inner(self) -> impl Iterator { - self.locals.into_vec().into_iter().map(|local| local.0) - } -} - -impl Deref for WorkerLocal { - type Target = T; - - #[inline(always)] - fn deref(&self) -> &T { - // This is safe because `verify` will only return values less than - // `self.registry.thread_limit` which is the size of the `self.locals` array. - unsafe { &self.locals.get_unchecked(self.registry.id().verify()).0 } - } -} - -impl Default for WorkerLocal { - fn default() -> Self { - WorkerLocal::new(|_| T::default()) - } -} diff --git a/compiler/rustc_interface/src/util.rs b/compiler/rustc_interface/src/util.rs index d7d306918fd0d..644ad778c0616 100644 --- a/compiler/rustc_interface/src/util.rs +++ b/compiler/rustc_interface/src/util.rs @@ -190,7 +190,8 @@ pub(crate) fn run_in_thread_pool_with_globals< let thread_stack_size = init_stack_size(thread_builder_diag); - let registry = sync::Registry::new(std::num::NonZero::new(threads).unwrap()); + let registry = + sync::ComplementaryRegistry::new(std::num::NonZero::new(threads).unwrap()); let Some(proof) = sync::check_dyn_thread_safe() else { return run_in_thread_with_globals( diff --git a/compiler/rustc_thread_pool/src/lib.rs b/compiler/rustc_thread_pool/src/lib.rs index 7ce7fbc27eabe..4d7c914094284 100644 --- a/compiler/rustc_thread_pool/src/lib.rs +++ b/compiler/rustc_thread_pool/src/lib.rs @@ -90,7 +90,7 @@ mod tests; pub mod tlv; -pub use worker_local::WorkerLocal; +pub use worker_local::{ComplementaryRegistry, WorkerLocal}; pub use self::broadcast::{BroadcastContext, broadcast, spawn_broadcast}; pub use self::join::{join, join_context}; diff --git a/compiler/rustc_thread_pool/src/worker_local.rs b/compiler/rustc_thread_pool/src/worker_local.rs index 912001233bfea..fde106d8408b9 100644 --- a/compiler/rustc_thread_pool/src/worker_local.rs +++ b/compiler/rustc_thread_pool/src/worker_local.rs @@ -1,67 +1,142 @@ -use std::fmt; +use std::cell::{Cell, OnceCell}; +use std::num::NonZero; use std::ops::Deref; -use std::sync::Arc; +use std::ptr; +use std::sync::{Arc, Mutex}; -use crate::registry::{Registry, WorkerThread}; +use crossbeam_utils::CachePadded; -#[repr(align(64))] -#[derive(Debug)] -struct CacheAligned(T); +/// A pointer to the `ComplementaryRegistryData` which uniquely identifies a registry. +/// This identifier can be reused if the registry gets freed. +#[derive(Clone, Copy, PartialEq)] +struct ComplementaryRegistryId(*const ComplementaryRegistryData); -/// Holds worker-locals values for each thread in a thread pool. -/// You can only access the worker local value through the Deref impl -/// on the thread pool it was constructed on. It will panic otherwise +impl ComplementaryRegistryId { + #[inline(always)] + /// Verifies that the current thread is associated with the registry and returns its unique + /// index within the registry. This panics if the current thread is not associated with this + /// registry. + /// + /// Note that there's a race possible where the identifier in `THREAD_DATA` could be reused + /// so this can succeed from a different registry. + fn verify(self) -> usize { + let (id, index) = THREAD_DATA.with(|data| (data.registry_id.get(), data.index.get())); + + if id == self { index } else { ComplementaryRegistryId::verification_error() } + } + + #[cold] + #[inline(never)] + fn verification_error() -> ! { + panic!("Unable to verify registry association") + } +} + +struct ComplementaryRegistryData { + thread_limit: NonZero, + threads: Mutex, +} + +/// Represents a list of threads which can access worker locals. +#[derive(Clone)] +pub struct ComplementaryRegistry(Arc); + +thread_local! { + /// The registry associated with the thread. + /// This allows the `WorkerLocal` type to clone the registry in its constructor. + static REGISTRY: OnceCell = const { OnceCell::new() }; +} + +struct ThreadData { + registry_id: Cell, + index: Cell, +} + +thread_local! { + /// A thread local which contains the identifier of `REGISTRY` but allows for faster access. + /// It also holds the index of the current thread. + static THREAD_DATA: ThreadData = const { ThreadData { + registry_id: Cell::new(ComplementaryRegistryId(ptr::null())), + index: Cell::new(0), + }}; +} + +impl ComplementaryRegistry { + /// Creates a registry which can hold up to `thread_limit` threads. + pub fn new(thread_limit: NonZero) -> Self { + ComplementaryRegistry(Arc::new(ComplementaryRegistryData { + thread_limit, + threads: Mutex::new(0), + })) + } + + /// Gets the registry associated with the current thread. Panics if there's no such registry. + pub fn current() -> Self { + REGISTRY.with(|registry| registry.get().cloned().expect("No associated registry")) + } + + /// Registers the current thread with the registry so worker locals can be used on it. + /// Panics if the thread limit is hit or if the thread already has an associated registry. + pub fn register(&self) { + let mut threads = self.0.threads.lock().unwrap(); + if *threads < self.0.thread_limit.get() { + REGISTRY.with(|registry| { + if registry.get().is_some() { + drop(threads); + panic!("Thread already has a registry"); + } + registry.set(self.clone()).ok(); + THREAD_DATA.with(|data| { + data.registry_id.set(self.id()); + data.index.set(*threads); + }); + *threads += 1; + }); + } else { + drop(threads); + panic!("Thread limit reached"); + } + } + + /// Gets the identifier of this registry. + fn id(&self) -> ComplementaryRegistryId { + ComplementaryRegistryId(&*self.0) + } +} + +/// Holds worker local values for each possible thread in a registry. You can only access the +/// worker local value through the `Deref` impl on the registry associated with the thread it was +/// created on. It will panic otherwise. pub struct WorkerLocal { - locals: Vec>, - registry: Arc, + locals: Box<[CachePadded]>, + registry: ComplementaryRegistry, } -/// We prevent concurrent access to the underlying value in the -/// Deref impl, thus any values safe to send across threads can -/// be used with WorkerLocal. +// This is safe because the `deref` call will return a reference to a `T` unique to each thread +// or it will panic for threads without an associated local. So there isn't a need for `T` to do +// it's own synchronization. The `verify` method on `RegistryId` has an issue where the id +// can be reused, but `WorkerLocal` has a reference to `Registry` which will prevent any reuse. unsafe impl Sync for WorkerLocal {} impl WorkerLocal { /// Creates a new worker local where the `initial` closure computes the - /// value this worker local should take for each thread in the thread pool. + /// value this worker local should take for each thread in the registry. #[inline] + #[track_caller] pub fn new T>(mut initial: F) -> WorkerLocal { - let registry = Registry::current(); + let registry = ComplementaryRegistry::current(); WorkerLocal { - locals: (0..registry.num_threads()).map(|i| CacheAligned(initial(i))).collect(), + locals: (0..registry.0.thread_limit.get()) + .map(|i| CachePadded::new(initial(i))) + .collect(), registry, } } - /// Returns the worker-local value for each thread + /// Returns the worker-local values for each thread #[inline] - pub fn into_inner(self) -> Vec { - self.locals.into_iter().map(|c| c.0).collect() - } - - fn current(&self) -> &T { - unsafe { - let worker_thread = WorkerThread::current(); - if worker_thread.is_null() - || !std::ptr::eq(&*(*worker_thread).registry, &*self.registry) - { - panic!("WorkerLocal can only be used on the thread pool it was created on") - } - &self.locals[(*worker_thread).index].0 - } - } -} - -impl WorkerLocal> { - /// Joins the elements of all the worker locals into one Vec - pub fn join(self) -> Vec { - self.into_inner().into_iter().flatten().collect() - } -} - -impl fmt::Debug for WorkerLocal { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("WorkerLocal").field("registry", &self.registry.id()).finish() + pub fn into_inner(self) -> impl Iterator { + self.locals.into_vec().into_iter().map(CachePadded::into_inner) } } @@ -70,6 +145,14 @@ impl Deref for WorkerLocal { #[inline(always)] fn deref(&self) -> &T { - self.current() + // This is safe because `verify` will only return values less than + // `self.registry.thread_limit` which is the size of the `self.locals` array. + unsafe { &*self.locals.get_unchecked(self.registry.id().verify()) } + } +} + +impl Default for WorkerLocal { + fn default() -> Self { + WorkerLocal::new(|_| T::default()) } } From 49681c817ff7f6247c63bf39fa70920cb3f9c98d Mon Sep 17 00:00:00 2001 From: Daria Sukhonina Date: Mon, 1 Jun 2026 14:12:58 +0300 Subject: [PATCH 2/3] Update comments --- compiler/rustc_thread_pool/src/worker_local.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/compiler/rustc_thread_pool/src/worker_local.rs b/compiler/rustc_thread_pool/src/worker_local.rs index fde106d8408b9..6fd371439b741 100644 --- a/compiler/rustc_thread_pool/src/worker_local.rs +++ b/compiler/rustc_thread_pool/src/worker_local.rs @@ -6,7 +6,7 @@ use std::sync::{Arc, Mutex}; use crossbeam_utils::CachePadded; -/// A pointer to the `ComplementaryRegistryData` which uniquely identifies a registry. +/// A pointer to the `ComplementaryRegistryData` which uniquely identifies a complementary registry. /// This identifier can be reused if the registry gets freed. #[derive(Clone, Copy, PartialEq)] struct ComplementaryRegistryId(*const ComplementaryRegistryData); @@ -42,7 +42,7 @@ struct ComplementaryRegistryData { pub struct ComplementaryRegistry(Arc); thread_local! { - /// The registry associated with the thread. + /// The complementary registry associated with the thread. /// This allows the `WorkerLocal` type to clone the registry in its constructor. static REGISTRY: OnceCell = const { OnceCell::new() }; } @@ -104,9 +104,9 @@ impl ComplementaryRegistry { } } -/// Holds worker local values for each possible thread in a registry. You can only access the -/// worker local value through the `Deref` impl on the registry associated with the thread it was -/// created on. It will panic otherwise. +/// Holds worker local values for each possible thread in a complementary registry. +/// You can only access the worker local value through the `Deref` impl on the registry associated +/// with the thread it was created on. It will panic otherwise. pub struct WorkerLocal { locals: Box<[CachePadded]>, registry: ComplementaryRegistry, @@ -114,8 +114,9 @@ pub struct WorkerLocal { // This is safe because the `deref` call will return a reference to a `T` unique to each thread // or it will panic for threads without an associated local. So there isn't a need for `T` to do -// it's own synchronization. The `verify` method on `RegistryId` has an issue where the id -// can be reused, but `WorkerLocal` has a reference to `Registry` which will prevent any reuse. +// it's own synchronization. The `verify` method on `ComplementaryRegistryId` has an issue where the +// id can be reused, but `WorkerLocal` has a reference to `ComplementaryRegistry` which will prevent +// any reuse. unsafe impl Sync for WorkerLocal {} impl WorkerLocal { From a94aa003999460b0be6200bbcebc9d922bf4b4e6 Mon Sep 17 00:00:00 2001 From: Daria Sukhonina Date: Mon, 1 Jun 2026 16:43:18 +0300 Subject: [PATCH 3/3] Formatter --- compiler/rustc_data_structures/src/sync.rs | 2 +- compiler/rustc_interface/src/util.rs | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/compiler/rustc_data_structures/src/sync.rs b/compiler/rustc_data_structures/src/sync.rs index 47e08dd1fce3a..b6b6ae778ca61 100644 --- a/compiler/rustc_data_structures/src/sync.rs +++ b/compiler/rustc_data_structures/src/sync.rs @@ -29,6 +29,7 @@ pub use parking_lot::{ MappedRwLockReadGuard as MappedReadGuard, MappedRwLockWriteGuard as MappedWriteGuard, RwLockReadGuard as ReadGuard, RwLockWriteGuard as WriteGuard, }; +pub use rustc_thread_pool::{ComplementaryRegistry, WorkerLocal}; pub use self::atomic::AtomicU64; pub use self::freeze::{FreezeLock, FreezeReadGuard, FreezeWriteGuard}; @@ -42,7 +43,6 @@ pub use self::parallel::{ try_par_for_each_in, }; pub use self::vec::{AppendOnlyIndexVec, AppendOnlyVec}; -pub use rustc_thread_pool::{ComplementaryRegistry, WorkerLocal}; pub use crate::marker::*; mod freeze; diff --git a/compiler/rustc_interface/src/util.rs b/compiler/rustc_interface/src/util.rs index 644ad778c0616..d2d0d74f2a55a 100644 --- a/compiler/rustc_interface/src/util.rs +++ b/compiler/rustc_interface/src/util.rs @@ -190,8 +190,7 @@ pub(crate) fn run_in_thread_pool_with_globals< let thread_stack_size = init_stack_size(thread_builder_diag); - let registry = - sync::ComplementaryRegistry::new(std::num::NonZero::new(threads).unwrap()); + let registry = sync::ComplementaryRegistry::new(std::num::NonZero::new(threads).unwrap()); let Some(proof) = sync::check_dyn_thread_safe() else { return run_in_thread_with_globals(