diff --git a/rxjava/src/test/java/com/baeldung/rxjava/filters/RxJavaTimeFilteringOperatorsTest.java b/rxjava/src/test/java/com/baeldung/rxjava/filters/RxJavaTimeFilteringOperatorsTest.java index 868f1234024a..63076cbc4ad9 100644 --- a/rxjava/src/test/java/com/baeldung/rxjava/filters/RxJavaTimeFilteringOperatorsTest.java +++ b/rxjava/src/test/java/com/baeldung/rxjava/filters/RxJavaTimeFilteringOperatorsTest.java @@ -3,33 +3,34 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import org.junit.Ignore; import org.junit.Test; import rx.Observable; import rx.observers.TestSubscriber; +import rx.schedulers.TestScheduler; -@Ignore("Manual only") public class RxJavaTimeFilteringOperatorsTest { @Test - public void givenTimedObservable_whenSampling_thenOnlySampleItemsAreEmitted() throws InterruptedException { + public void givenTimedObservable_whenSampling_thenOnlySampleItemsAreEmitted() { + + TestScheduler testScheduler = new TestScheduler(); Observable timedObservable = - Observable.just(1, 2, 3, 4, 5, 6) - .zipWith( - Observable.interval(0, 1, TimeUnit.SECONDS), - (item, time) -> item - ); + Observable.just(1, 2, 3, 4, 5, 6) + .zipWith( + Observable.interval(0, 1, TimeUnit.SECONDS, testScheduler), + (item, time) -> item + ); TestSubscriber subscriber = new TestSubscriber(); Observable sampledObservable = - timedObservable.sample(Observable.interval(2500L, TimeUnit.MILLISECONDS)); + timedObservable.sample(2500L, TimeUnit.MILLISECONDS, testScheduler); sampledObservable.subscribe(subscriber); - Thread.sleep(7000); + testScheduler.advanceTimeBy(7, TimeUnit.SECONDS); subscriber.assertCompleted(); subscriber.assertNoErrors(); @@ -37,22 +38,24 @@ public void givenTimedObservable_whenSampling_thenOnlySampleItemsAreEmitted() th } @Test - public void givenTimedObservable_whenThrottlingLast_thenThrottleLastItemsAreEmitted() throws InterruptedException { + public void givenTimedObservable_whenThrottlingLast_thenThrottleLastItemsAreEmitted() { + + TestScheduler testScheduler = new TestScheduler(); Observable timedObservable = - Observable.just(1, 2, 3, 4, 5, 6) - .zipWith( - Observable.interval(0, 1, TimeUnit.SECONDS), - (item, time) -> item - ); + Observable.just(1, 2, 3, 4, 5, 6) + .zipWith( + Observable.interval(0, 1, TimeUnit.SECONDS, testScheduler), + (item, time) -> item + ); TestSubscriber subscriber = new TestSubscriber(); - Observable filteredObservable = timedObservable.throttleLast(3100L, TimeUnit.MILLISECONDS); + Observable filteredObservable = timedObservable.throttleLast(3100L, TimeUnit.MILLISECONDS, testScheduler); filteredObservable.subscribe(subscriber); - Thread.sleep(7000); + testScheduler.advanceTimeBy(7, TimeUnit.SECONDS); subscriber.assertCompleted(); subscriber.assertNoErrors(); @@ -60,23 +63,25 @@ public void givenTimedObservable_whenThrottlingLast_thenThrottleLastItemsAreEmit } @Test - public void givenRangeObservable_whenThrottlingFirst_thenThrottledFirstItemsAreEmitted() throws InterruptedException { + public void givenRangeObservable_whenThrottlingFirst_thenThrottledFirstItemsAreEmitted() { + + TestScheduler testScheduler = new TestScheduler(); Observable timedObservable = - Observable.just(1, 2, 3, 4, 5, 6) - .zipWith( - Observable.interval(0, 1, TimeUnit.SECONDS), - (item, time) -> item - ); + Observable.just(1, 2, 3, 4, 5, 6) + .zipWith( + Observable.interval(0, 1, TimeUnit.SECONDS, testScheduler), + (item, time) -> item + ); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = - timedObservable.throttleFirst(4100L, TimeUnit.MILLISECONDS); + timedObservable.throttleFirst(4100L, TimeUnit.MILLISECONDS, testScheduler); filteredObservable.subscribe(subscriber); - Thread.sleep(7000); + testScheduler.advanceTimeBy(7, TimeUnit.SECONDS); subscriber.assertCompleted(); subscriber.assertNoErrors(); @@ -84,22 +89,24 @@ public void givenRangeObservable_whenThrottlingFirst_thenThrottledFirstItemsAreE } @Test - public void givenTimedObservable_whenThrottlingWithTimeout_thenLastItemIsEmitted() throws InterruptedException { + public void givenTimedObservable_whenThrottlingWithTimeout_thenLastItemIsEmitted() { + + TestScheduler testScheduler = new TestScheduler(); Observable timedObservable = - Observable.just(1, 2, 3, 4, 5, 6) - .zipWith( - Observable.interval(0, 1, TimeUnit.SECONDS), - (item, time) -> item - ); + Observable.just(1, 2, 3, 4, 5, 6) + .zipWith( + Observable.interval(0, 1, TimeUnit.SECONDS, testScheduler), + (item, time) -> item + ); TestSubscriber subscriber = new TestSubscriber(); - Observable filteredObservable = timedObservable.throttleWithTimeout(2000L, TimeUnit.MILLISECONDS); + Observable filteredObservable = timedObservable.throttleWithTimeout(2000L, TimeUnit.MILLISECONDS, testScheduler); filteredObservable.subscribe(subscriber); - Thread.sleep(7000); + testScheduler.advanceTimeBy(7, TimeUnit.SECONDS); subscriber.assertCompleted(); subscriber.assertNoErrors(); @@ -107,22 +114,24 @@ public void givenTimedObservable_whenThrottlingWithTimeout_thenLastItemIsEmitted } @Test - public void givenTimedObservable_whenDebounceOperatorIsApplied_thenLastItemIsEmitted() throws InterruptedException { + public void givenTimedObservable_whenDebounceOperatorIsApplied_thenLastItemIsEmitted() { + + TestScheduler testScheduler = new TestScheduler(); Observable timedObservable = - Observable.just(1, 2, 3, 4, 5, 6) - .zipWith( - Observable.interval(0, 1, TimeUnit.SECONDS), - (item, time) -> item - ); + Observable.just(1, 2, 3, 4, 5, 6) + .zipWith( + Observable.interval(0, 1, TimeUnit.SECONDS, testScheduler), + (item, time) -> item + ); TestSubscriber subscriber = new TestSubscriber(); - Observable filteredObservable = timedObservable.debounce(2000L, TimeUnit.MILLISECONDS); + Observable filteredObservable = timedObservable.debounce(2000L, TimeUnit.MILLISECONDS, testScheduler); filteredObservable.subscribe(subscriber); - Thread.sleep(7000); + testScheduler.advanceTimeBy(7, TimeUnit.SECONDS); subscriber.assertCompleted(); subscriber.assertNoErrors(); @@ -130,40 +139,44 @@ public void givenTimedObservable_whenDebounceOperatorIsApplied_thenLastItemIsEmi } @Test - public void givenTimedObservable_whenUsingTimeout_thenTimeOutException() throws InterruptedException { + public void givenTimedObservable_whenUsingTimeout_thenTimeOutException() { + + TestScheduler testScheduler = new TestScheduler(); Observable timedObservable = - Observable.just(1, 2, 3, 4, 5, 6) - .zipWith( - Observable.interval(0, 1, TimeUnit.SECONDS), - (item, time) -> item - ); + Observable.just(1, 2, 3, 4, 5, 6) + .zipWith( + Observable.interval(0, 1, TimeUnit.SECONDS, testScheduler), + (item, time) -> item + ); TestSubscriber subscriber = new TestSubscriber(); - Observable filteredObservable = timedObservable.timeout(500L, TimeUnit.MILLISECONDS); + Observable filteredObservable = timedObservable.timeout(500L, TimeUnit.MILLISECONDS, testScheduler); filteredObservable.subscribe(subscriber); - Thread.sleep(7000); + testScheduler.advanceTimeBy(7, TimeUnit.SECONDS); subscriber.assertError(TimeoutException.class); subscriber.assertValues(1); } @Test - public void givenObservable_whenSkippingUntil_thenItemsAreSkippedUntilSecondObservableEmitsItems() throws InterruptedException { + public void givenObservable_whenSkippingUntil_thenItemsAreSkippedUntilSecondObservableEmitsItems() { + + TestScheduler testScheduler = new TestScheduler(); Observable timedObservable = - Observable.just(1, 2, 3, 4, 5, 6) - .zipWith( - Observable.interval(0, 1, TimeUnit.SECONDS), - (item, time) -> item - ); + Observable.just(1, 2, 3, 4, 5, 6) + .zipWith( + Observable.interval(0, 1, TimeUnit.SECONDS, testScheduler), + (item, time) -> item + ); Observable delayedObservable = Observable.just(1) - .delay(3000, TimeUnit.MILLISECONDS); + .delay(3000, TimeUnit.MILLISECONDS, testScheduler); TestSubscriber subscriber = new TestSubscriber(); @@ -171,7 +184,7 @@ public void givenObservable_whenSkippingUntil_thenItemsAreSkippedUntilSecondObse filteredObservable.subscribe(subscriber); - Thread.sleep(7000); + testScheduler.advanceTimeBy(7, TimeUnit.SECONDS); subscriber.assertCompleted(); subscriber.assertNoErrors(); @@ -179,25 +192,27 @@ public void givenObservable_whenSkippingUntil_thenItemsAreSkippedUntilSecondObse } @Test - public void givenObservable_whenSkippingWhile_thenItemsAreEmittedUntilSecondObservableEmitsItems() throws InterruptedException { + public void givenObservable_whenSkippingWhile_thenItemsAreEmittedUntilSecondObservableEmitsItems() { + + TestScheduler testScheduler = new TestScheduler(); Observable timedObservable = - Observable.just(1, 2, 3, 4, 5, 6) - .zipWith( - Observable.interval(0, 1, TimeUnit.SECONDS), - (item, time) -> item - ); + Observable.just(1, 2, 3, 4, 5, 6) + .zipWith( + Observable.interval(0, 1, TimeUnit.SECONDS, testScheduler), + (item, time) -> item + ); TestSubscriber subscriber = new TestSubscriber(); Observable delayedObservable = Observable.just(1) - .delay(3000, TimeUnit.MILLISECONDS); + .delay(3000, TimeUnit.MILLISECONDS, testScheduler); Observable filteredObservable = timedObservable.takeUntil(delayedObservable); filteredObservable.subscribe(subscriber); - Thread.sleep(7000); + testScheduler.advanceTimeBy(7, TimeUnit.SECONDS); subscriber.assertCompleted(); subscriber.assertNoErrors();