@@ -744,8 +744,9 @@ function remotecall_wait(w::Worker, f, args...)
744744 rv. waitingfor = w. id
745745 rr = RemoteRef (w)
746746 send_msg (w, CallWaitMsg (f, args, rr2id (rr), prid))
747- wait (rv)
747+ v = fetch (rv. c )
748748 delete! (PGRP. refs, prid)
749+ isa (v, RemoteException) && throw (v)
749750 rr
750751end
751752
@@ -778,8 +779,18 @@ function call_on_owner(f, rr::RemoteRef, args...)
778779 end
779780end
780781
781- wait_ref (rid, args... ) = (wait (lookup_ref (rid). c, args... ); nothing )
782- wait (r:: RemoteRef , args... ) = (call_on_owner (wait_ref, r, args... ); r)
782+ function wait_ref (rid, callee, args... )
783+ v = fetch_ref (rid, args... )
784+ if isa (v, RemoteException)
785+ if myid () == callee
786+ throw (v)
787+ else
788+ return v
789+ end
790+ end
791+ nothing
792+ end
793+ wait (r:: RemoteRef , args... ) = (call_on_owner (wait_ref, r, myid (), args... ); r)
783794
784795fetch_ref (rid, args... ) = fetch (lookup_ref (rid). c, args... )
785796fetch (r:: RemoteRef , args... ) = call_on_owner (fetch_ref, r, args... )
@@ -791,19 +802,23 @@ put_ref(rid, args...) = put!(lookup_ref(rid), args...)
791802put! (rr:: RemoteRef , args... ) = (call_on_owner (put_ref, rr, args... ); rr)
792803
793804take! (rv:: RemoteValue , args... ) = take! (rv. c, args... )
794- take_ref (rid, args... ) = take! (lookup_ref (rid), args... )
795- take! (rr:: RemoteRef , args... ) = call_on_owner (take_ref, rr, args... )
805+ function take_ref (rid, callee, args... )
806+ v= take! (lookup_ref (rid), args... )
807+ isa (v, RemoteException) && (myid () == callee) && throw (v)
808+ v
809+ end
810+ take! (rr:: RemoteRef , args... ) = call_on_owner (take_ref, rr, myid (), args... )
796811
797812close_ref (rid) = (close (lookup_ref (rid). c); nothing )
798813close (rr:: RemoteRef ) = call_on_owner (close_ref, rr)
799814
800815
801816function deliver_result (sock:: IO , msg, oid, value)
802817 # print("$(myid()) sending result $oid\n")
803- if is (msg,:call_fetch )
818+ if is (msg,:call_fetch ) || isa (value, RemoteException)
804819 val = value
805820 else
806- val = oid
821+ val = :OK
807822 end
808823 try
809824 send_msg_now (sock, ResultMsg (oid, val))
898913function handle_msg (msg:: CallWaitMsg , r_stream, w_stream)
899914 @schedule begin
900915 rv = schedule_call (msg. response_oid, ()-> msg. f (msg. args... ))
901- deliver_result (w_stream, :call_wait , msg. notify_oid, wait (rv))
916+ deliver_result (w_stream, :call_wait , msg. notify_oid, fetch (rv. c ))
902917 end
903918end
904919
0 commit comments