@@ -222,6 +222,9 @@ class HostStreamer {
222222 // queue (sycl_q_) to perform the request. It also performs the callbacks
223223 // to the user code when the requests have been completed.
224224 static void KernelLaunchAndWaitThread () {
225+ size_t producer_count = 0 ;
226+ size_t consumer_count = 0 ;
227+
225228 // Do this loop until told (by main thread) to stop via the
226229 // 'kill_kernel_thread_flag_' atomic shared variable.
227230 while (!kill_kernel_thread_flag_) {
@@ -238,6 +241,9 @@ class HostStreamer {
238241
239242 // pop from the Producer queue
240243 produce_q_.Pop ();
244+
245+ // accumulate producer count
246+ producer_count += count;
241247 }
242248
243249 // If there is a Consume request to launch, do it
@@ -247,12 +253,18 @@ class HostStreamer {
247253 size_t count;
248254 std::tie (buf_idx, count) = consume_q_.Front ();
249255
250- // launch the kernel and push the request to the launch queue
251- auto e = LaunchConsumerKernel (consumer_buffer_[buf_idx], count);
252- launch_q_.Push (std::make_tuple (buf_idx, count, e, false ));
256+ // Only launch consumer when there is enough producer count
257+ if (producer_count >= consumer_count + count) {
258+ // launch the kernel and push the request to the launch queue
259+ auto e = LaunchConsumerKernel (consumer_buffer_[buf_idx], count);
260+ launch_q_.Push (std::make_tuple (buf_idx, count, e, false ));
253261
254- // pop from the Consumer queue
255- consume_q_.Pop ();
262+ // pop from the Consumer queue
263+ consume_q_.Pop ();
264+
265+ // accumulate consumer count
266+ consumer_count += count;
267+ }
256268 }
257269
258270 // Wait on the oldest event to finish given 2 conditions:
@@ -261,7 +273,7 @@ class HostStreamer {
261273 // 2) the user has requested us to flush the launch queue and the
262274 // launch queue is not empty (i.e. flush_ && launch_q_.size() != 0)
263275 if ((launch_q_.Size () >= wait_threshold_) ||
264- (flush_ && !LaunchQueueEmpty () && ProducerQueueEmpty () && ConsumerQueueEmpty () )) {
276+ (flush_ && !LaunchQueueEmpty ())) {
265277 // grab the oldest request from the launch queue
266278 size_t buf_idx;
267279 size_t count;
@@ -276,9 +288,11 @@ class HostStreamer {
276288 if (request_was_producer) {
277289 // std::cout << "Calling Producer Callback" << std::endl;
278290 producer_callback (count);
291+ producer_count -= count;
279292 } else {
280293 // std::cout << "Calling Consumer Callback" << std::endl;
281294 consumer_callback (consumer_buffer_[buf_idx], count);
295+ consumer_count -= count;
282296 }
283297
284298 // Pop from the launch queue. This has to be done AFTER waiting on
0 commit comments