1515{-# LANGUAGE TypeApplications #-}
1616{-# LANGUAGE TypeFamilies #-}
1717{-# LANGUAGE TypeOperators #-}
18+ {-# OPTIONS_GHC -Wno-orphans #-}
1819{-
1920
2021Implements a scheduler for cooperative multitasking. The scheduler supports
@@ -53,6 +54,7 @@ module Plutus.Trace.Scheduler(
5354 ) where
5455
5556
57+ import Control.Applicative ((<|>) )
5658import Control.Lens hiding (Empty )
5759import Control.Monad.Freer
5860import Control.Monad.Freer.Coroutine
@@ -72,6 +74,8 @@ import Plutus.Trace.Tag (Tag)
7274import Prettyprinter
7375import Prettyprinter.Extras (PrettyShow (.. ), Tagged (.. ))
7476
77+ deriving instance Functor (Status effs a b )
78+
7579{- Note [Thread Tag]
7680
7781Within the scheduler, threads are identified by their 'ThreadId'. The thread
@@ -157,24 +161,27 @@ data WithPriority t
157161 , _thread :: t
158162 } deriving Functor
159163
160- type SuspendedThread effs systemEvent = WithPriority (EmThread effs systemEvent )
164+ type SuspendedThread effs systemEvent a = WithPriority (EmThread effs systemEvent a )
161165
162166type EmSystemCall effs systemEvent = WithPriority (SysCall effs systemEvent )
163167
164168type AgentSystemCall systemEvent = WithPriority (MessageCall systemEvent )
165169
166170-- | Thread that can be run by the scheduler
167- data EmThread effs systemEvent =
171+ data EmThread effs systemEvent a =
168172 EmThread
169- { _continuation :: Maybe systemEvent -> Eff effs (Status effs (EmSystemCall effs systemEvent ) (Maybe systemEvent ) () ) -- ^ The continuation to be run when the thread is resumed.
173+ { _continuation :: Maybe systemEvent -> Eff effs (Status effs (EmSystemCall effs systemEvent ) (Maybe systemEvent ) (Maybe a )) -- ^ The continuation to be run when the thread is resumed.
170174 , _threadId :: ThreadId -- ^ Thread ID
171175 , _tag :: Tag -- ^ Tag of the thread. See note [Thread Tag]
172176 }
173177
178+ mapEmThread :: (Maybe a -> Maybe b ) -> EmThread effs systemEvent a -> EmThread effs systemEvent b
179+ mapEmThread f EmThread {_continuation, _threadId, _tag} = EmThread { _threadId, _tag, _continuation = fmap (fmap (fmap f)) _continuation }
180+
174181-- | The system calls we can make to the scheduler, affecting the the threads
175182-- that are currently running.
176183data ThreadCall effs systemEvent
177- = Fork (ThreadId -> SuspendedThread effs systemEvent ) -- ^ Start a new thread with a new thread ID.
184+ = Fork (ThreadId -> SuspendedThread effs systemEvent () ) -- ^ Start a new thread with a new thread ID.
178185 | Thaw ThreadId -- ^ Unfreeze a thread.
179186 | Exit -- ^ Terminate the scheduler.
180187
@@ -190,11 +197,11 @@ makePrisms ''MessageCall
190197makePrisms ''ThreadCall
191198
192199-- | Scheduler state
193- data SchedulerState effs systemEvent
200+ data SchedulerState effs systemEvent a
194201 = SchedulerState
195- { _normalPrio :: Seq (EmThread effs systemEvent ) -- ^ Threads running at normal priority
196- , _sleeping :: Seq (EmThread effs systemEvent ) -- ^ Sleeping threads (waiting for an external event)
197- , _frozen :: Seq (EmThread effs systemEvent ) -- ^ Frozen threads (will not be resumed until they are explicitly unfrozen)
202+ { _normalPrio :: Seq (EmThread effs systemEvent a ) -- ^ Threads running at normal priority
203+ , _sleeping :: Seq (EmThread effs systemEvent a ) -- ^ Sleeping threads (waiting for an external event)
204+ , _frozen :: Seq (EmThread effs systemEvent a ) -- ^ Frozen threads (will not be resumed until they are explicitly unfrozen)
198205 , _lastThreadId :: ThreadId -- ^ Last thread id assigned to a thread
199206 , _mailboxes :: HashMap ThreadId (Seq systemEvent ) -- ^ The mailboxes of all active threads.
200207 , _activeThreads :: Map Tag (HashSet ThreadId ) -- ^ Map of tags to thread IDs. See note [Thread Tag]
@@ -204,16 +211,16 @@ makeLenses ''SchedulerState
204211
205212-- | Remove a thread from the set of active threads. Usually called when the
206213-- thread is finished.
207- removeActiveThread :: ThreadId -> SchedulerState effs systemEvent -> SchedulerState effs systemEvent
214+ removeActiveThread :: ThreadId -> SchedulerState effs systemEvent a -> SchedulerState effs systemEvent a
208215removeActiveThread tid = over (activeThreads . mapped) (HashSet. delete tid)
209216
210217-- | A suspended thread with a priority and the thread itself.
211- suspendThread :: Priority -> EmThread effs systemEvent -> SuspendedThread effs systemEvent
218+ suspendThread :: Priority -> EmThread effs systemEvent a -> SuspendedThread effs systemEvent a
212219suspendThread = WithPriority
213220
214221-- | Make a thread with the given priority from an action. This is a
215222-- convenience for defining 'SimulatorInterpreter' values.
216- mkThread :: Tag -> Priority -> Eff (Reader ThreadId ': Yield (EmSystemCall effs systemEvent ) (Maybe systemEvent ) ': effs ) () -> ThreadId -> SuspendedThread effs systemEvent
223+ mkThread :: Tag -> Priority -> Eff (Reader ThreadId ': Yield (EmSystemCall effs systemEvent ) (Maybe systemEvent ) ': effs ) (Maybe a ) -> ThreadId -> SuspendedThread effs systemEvent a
217224mkThread tag prio action tid =
218225 let action' = runReader tid action
219226 in WithPriority
@@ -248,7 +255,7 @@ fork :: forall effs systemEvent effs2.
248255 -> Priority -- ^ Priority of the new thread.
249256 -> Eff (Reader ThreadId ': Yield (EmSystemCall effs systemEvent ) (Maybe systemEvent ) ': effs ) ()
250257 -> Eff effs2 (Maybe systemEvent )
251- fork tag prio action = mkSysCall prio (Right $ Fork $ mkThread tag prio action)
258+ fork tag prio action = mkSysCall prio (Right $ Fork $ mkThread tag prio $ Just <$> action)
252259
253260-- | Suspend the current thread
254261sleep :: forall effs systemEvent effs2 .
@@ -271,59 +278,60 @@ initialThreadTag = "initial thread"
271278-- effect using the scheduler, see note [Scheduler]. 'runThreads' only
272279-- returns when all threads are finished.
273280runThreads ::
274- forall effs systemEvent .
281+ forall a effs systemEvent .
275282 ( Eq systemEvent
276283 , Member (LogMsg SchedulerLog ) effs
277284 )
278- => Eff (Reader ThreadId ': Yield (EmSystemCall effs systemEvent ) (Maybe systemEvent ) ': effs ) ()
279- -> Eff effs ()
285+ => Eff (Reader ThreadId ': Yield (EmSystemCall effs systemEvent ) (Maybe systemEvent ) ': effs ) a
286+ -> Eff effs (Maybe a )
280287runThreads e = do
281288 k <- runC $ runReader initialThreadId e
282289 case k of
283- Done () -> pure ()
290+ Done a -> pure $ Just a
284291 Continue _ k' ->
285- let initialThread = EmThread {_continuation = k', _threadId = initialThreadId, _tag = initialThreadTag}
286- in loop
292+ let initialThread = EmThread {_continuation = fmap ( fmap Just ) . k', _threadId = initialThreadId, _tag = initialThreadTag}
293+ in loop Nothing
287294 $ initialState
288295 & activeThreads . at initialThreadTag . non mempty %~ HashSet. insert initialThreadId
289296 & mailboxes . at initialThreadId ?~ Seq. empty
290297 & (fst . nextThreadId)
291298 & enqueue (suspendThread Normal initialThread)
292299
293300-- | Run the threads that are scheduled in a 'SchedulerState' to completion.
294- loop :: forall effs systemEvent .
301+ loop :: forall a effs systemEvent .
295302 ( Eq systemEvent
296303 , Member (LogMsg SchedulerLog ) effs
297304 )
298- => SchedulerState effs systemEvent
299- -> Eff effs ()
300- loop s = do
305+ => Maybe a
306+ -> SchedulerState effs systemEvent a
307+ -> Eff effs (Maybe a )
308+ loop ma s = do
301309 case dequeue s of
302310 AThread EmThread {_continuation, _threadId, _tag} event schedulerState prio -> do
303311 let mkLog e = SchedulerLog {slEvent= e, slThread= _threadId, slPrio= prio, slTag = _tag}
304312 result <- _continuation event
305313 case result of
306- Done () -> loop $ schedulerState & removeActiveThread _threadId
314+ Done ma' -> loop (ma <|> ma') $ schedulerState & removeActiveThread _threadId
307315 Continue WithPriority {_priority, _thread= sysCall} k -> do
308316 let thisThread = suspendThread _priority EmThread {_threadId= _threadId, _continuation= k, _tag = _tag}
309317 newState <- schedulerState & enqueue thisThread & handleSysCall sysCall
310318 case newState of
311- Left r -> logDebug (mkLog $ Stopped r)
312- Right newState' -> loop newState'
313- _ -> pure ()
319+ Left r -> logDebug (mkLog $ Stopped r) >> pure ma
320+ Right newState' -> loop ma newState'
321+ _ -> pure ma
314322
315323-- | Deal with a system call from a running thread.
316324handleSysCall ::
317325 ( Eq systemEvent
318326 , Member (LogMsg SchedulerLog ) effs
319327 )
320328 => SysCall effs systemEvent
321- -> SchedulerState effs systemEvent
322- -> Eff effs (Either StopReason (SchedulerState effs systemEvent ))
329+ -> SchedulerState effs systemEvent a
330+ -> Eff effs (Either StopReason (SchedulerState effs systemEvent a ))
323331handleSysCall sysCall schedulerState = case sysCall of
324332 Right (Fork newThread) -> do
325333 let (schedulerState', tid) = nextThreadId schedulerState
326- t = newThread tid
334+ t = mapEmThread ( const Nothing ) <$> newThread tid
327335 tag = _tag $ _thread t
328336 newState = enqueue t schedulerState'
329337 & activeThreads . at tag . non mempty %~ HashSet. insert tid
@@ -344,15 +352,15 @@ handleSysCall sysCall schedulerState = case sysCall of
344352
345353
346354-- | Return a fresh thread ID and increment the counter
347- nextThreadId :: SchedulerState effs systemEvent -> (SchedulerState effs systemEvent , ThreadId )
355+ nextThreadId :: SchedulerState effs systemEvent a -> (SchedulerState effs systemEvent a , ThreadId )
348356nextThreadId s = (s & lastThreadId %~ ThreadId . succ . unThreadId, s ^. lastThreadId)
349357
350358-- | State of the scheduler before any threads are run.
351- initialState :: SchedulerState effs systemEvent
359+ initialState :: SchedulerState effs systemEvent a
352360initialState = SchedulerState Seq. empty Seq. empty Seq. empty initialThreadId HashMap. empty Map. empty
353361
354362-- | Add a suspended thread to the queue.
355- enqueue :: SuspendedThread effs systemEvent -> SchedulerState effs systemEvent -> SchedulerState effs systemEvent
363+ enqueue :: SuspendedThread effs systemEvent a -> SchedulerState effs systemEvent a -> SchedulerState effs systemEvent a
356364enqueue WithPriority {_priority, _thread} s =
357365 case _priority of
358366 Normal -> s & normalPrio %~ (|> _thread)
@@ -361,11 +369,11 @@ enqueue WithPriority {_priority, _thread} s =
361369
362370-- | Result of calling 'dequeue'. Either a thread that is ready to receive a
363371-- message, or no more threads.
364- data SchedulerDQResult effs systemEvent
365- = AThread (EmThread effs systemEvent ) (Maybe systemEvent ) (SchedulerState effs systemEvent ) Priority
372+ data SchedulerDQResult effs systemEvent a
373+ = AThread (EmThread effs systemEvent a ) (Maybe systemEvent ) (SchedulerState effs systemEvent a ) Priority
366374 | NoMoreThreads
367375
368- dequeue :: SchedulerState effs systemEvent -> SchedulerDQResult effs systemEvent
376+ dequeue :: SchedulerState effs systemEvent a -> SchedulerDQResult effs systemEvent a
369377dequeue s = case dequeueThread s of
370378 Nothing -> NoMoreThreads
371379 Just (s', thread, prio) -> case dequeueMessage s' (_threadId thread) of
@@ -374,7 +382,7 @@ dequeue s = case dequeueThread s of
374382
375383-- | Find the next thread that is ready to be resumed.
376384-- See note [Thread Priority]
377- dequeueThread :: SchedulerState effs systemEvent -> Maybe (SchedulerState effs systemEvent , EmThread effs systemEvent , Priority )
385+ dequeueThread :: SchedulerState effs systemEvent a -> Maybe (SchedulerState effs systemEvent a , EmThread effs systemEvent a , Priority )
378386dequeueThread s =
379387 case s ^. normalPrio of
380388 x :<| xs -> Just (s & normalPrio .~ xs, x, Normal )
@@ -383,7 +391,7 @@ dequeueThread s =
383391 Empty -> Nothing
384392
385393-- | Get the first message for the thread.
386- dequeueMessage :: SchedulerState effs systemEvent -> ThreadId -> Maybe (SchedulerState effs systemEvent , systemEvent )
394+ dequeueMessage :: SchedulerState effs systemEvent a -> ThreadId -> Maybe (SchedulerState effs systemEvent a , systemEvent )
387395dequeueMessage s i = do
388396 mailbox <- s ^. mailboxes . at i
389397 (x, xs) <- case mailbox of { Empty -> Nothing ; x :<| xs -> Just (x, xs) }
0 commit comments