Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,201 +3,216 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.junit.Ignore;
import org.junit.Test;

import rx.Observable;
import rx.observers.TestSubscriber;
import rx.schedulers.TestScheduler;

@Ignore("Manual only")
public class RxJavaTimeFilteringOperatorsTest {

@Test
public void givenTimedObservable_whenSampling_thenOnlySampleItemsAreEmitted() throws InterruptedException {
public void givenTimedObservable_whenSampling_thenOnlySampleItemsAreEmitted() {

TestScheduler testScheduler = new TestScheduler();

Observable<Integer> timedObservable =
Observable.just(1, 2, 3, 4, 5, 6)
.zipWith(
Observable.interval(0, 1, TimeUnit.SECONDS),
(item, time) -> item
);
Observable.just(1, 2, 3, 4, 5, 6)
.zipWith(
Observable.interval(0, 1, TimeUnit.SECONDS, testScheduler),
(item, time) -> item
);

TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> sampledObservable =
timedObservable.sample(Observable.interval(2500L, TimeUnit.MILLISECONDS));
timedObservable.sample(2500L, TimeUnit.MILLISECONDS, testScheduler);

sampledObservable.subscribe(subscriber);

Thread.sleep(7000);
testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);

subscriber.assertCompleted();
subscriber.assertNoErrors();
subscriber.assertValues(3, 5, 6);
}

@Test
public void givenTimedObservable_whenThrottlingLast_thenThrottleLastItemsAreEmitted() throws InterruptedException {
public void givenTimedObservable_whenThrottlingLast_thenThrottleLastItemsAreEmitted() {

TestScheduler testScheduler = new TestScheduler();

Observable<Integer> timedObservable =
Observable.just(1, 2, 3, 4, 5, 6)
.zipWith(
Observable.interval(0, 1, TimeUnit.SECONDS),
(item, time) -> item
);
Observable.just(1, 2, 3, 4, 5, 6)
.zipWith(
Observable.interval(0, 1, TimeUnit.SECONDS, testScheduler),
(item, time) -> item
);

TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = timedObservable.throttleLast(3100L, TimeUnit.MILLISECONDS);
Observable<Integer> filteredObservable = timedObservable.throttleLast(3100L, TimeUnit.MILLISECONDS, testScheduler);

filteredObservable.subscribe(subscriber);

Thread.sleep(7000);
testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);

subscriber.assertCompleted();
subscriber.assertNoErrors();
subscriber.assertValues(4, 6);
}

@Test
public void givenRangeObservable_whenThrottlingFirst_thenThrottledFirstItemsAreEmitted() throws InterruptedException {
public void givenRangeObservable_whenThrottlingFirst_thenThrottledFirstItemsAreEmitted() {

TestScheduler testScheduler = new TestScheduler();

Observable<Integer> timedObservable =
Observable.just(1, 2, 3, 4, 5, 6)
.zipWith(
Observable.interval(0, 1, TimeUnit.SECONDS),
(item, time) -> item
);
Observable.just(1, 2, 3, 4, 5, 6)
.zipWith(
Observable.interval(0, 1, TimeUnit.SECONDS, testScheduler),
(item, time) -> item
);

TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable =
timedObservable.throttleFirst(4100L, TimeUnit.MILLISECONDS);
timedObservable.throttleFirst(4100L, TimeUnit.MILLISECONDS, testScheduler);

filteredObservable.subscribe(subscriber);

Thread.sleep(7000);
testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);

subscriber.assertCompleted();
subscriber.assertNoErrors();
subscriber.assertValues(1, 6);
}

@Test
public void givenTimedObservable_whenThrottlingWithTimeout_thenLastItemIsEmitted() throws InterruptedException {
public void givenTimedObservable_whenThrottlingWithTimeout_thenLastItemIsEmitted() {

TestScheduler testScheduler = new TestScheduler();

Observable<Integer> timedObservable =
Observable.just(1, 2, 3, 4, 5, 6)
.zipWith(
Observable.interval(0, 1, TimeUnit.SECONDS),
(item, time) -> item
);
Observable.just(1, 2, 3, 4, 5, 6)
.zipWith(
Observable.interval(0, 1, TimeUnit.SECONDS, testScheduler),
(item, time) -> item
);

TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = timedObservable.throttleWithTimeout(2000L, TimeUnit.MILLISECONDS);
Observable<Integer> filteredObservable = timedObservable.throttleWithTimeout(2000L, TimeUnit.MILLISECONDS, testScheduler);

filteredObservable.subscribe(subscriber);

Thread.sleep(7000);
testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);

subscriber.assertCompleted();
subscriber.assertNoErrors();
subscriber.assertValue(6);
}

@Test
public void givenTimedObservable_whenDebounceOperatorIsApplied_thenLastItemIsEmitted() throws InterruptedException {
public void givenTimedObservable_whenDebounceOperatorIsApplied_thenLastItemIsEmitted() {

TestScheduler testScheduler = new TestScheduler();

Observable<Integer> timedObservable =
Observable.just(1, 2, 3, 4, 5, 6)
.zipWith(
Observable.interval(0, 1, TimeUnit.SECONDS),
(item, time) -> item
);
Observable.just(1, 2, 3, 4, 5, 6)
.zipWith(
Observable.interval(0, 1, TimeUnit.SECONDS, testScheduler),
(item, time) -> item
);

TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = timedObservable.debounce(2000L, TimeUnit.MILLISECONDS);
Observable<Integer> filteredObservable = timedObservable.debounce(2000L, TimeUnit.MILLISECONDS, testScheduler);

filteredObservable.subscribe(subscriber);

Thread.sleep(7000);
testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);

subscriber.assertCompleted();
subscriber.assertNoErrors();
subscriber.assertValue(6);
}

@Test
public void givenTimedObservable_whenUsingTimeout_thenTimeOutException() throws InterruptedException {
public void givenTimedObservable_whenUsingTimeout_thenTimeOutException() {

TestScheduler testScheduler = new TestScheduler();

Observable<Integer> timedObservable =
Observable.just(1, 2, 3, 4, 5, 6)
.zipWith(
Observable.interval(0, 1, TimeUnit.SECONDS),
(item, time) -> item
);
Observable.just(1, 2, 3, 4, 5, 6)
.zipWith(
Observable.interval(0, 1, TimeUnit.SECONDS, testScheduler),
(item, time) -> item
);

TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = timedObservable.timeout(500L, TimeUnit.MILLISECONDS);
Observable<Integer> filteredObservable = timedObservable.timeout(500L, TimeUnit.MILLISECONDS, testScheduler);

filteredObservable.subscribe(subscriber);

Thread.sleep(7000);
testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);

subscriber.assertError(TimeoutException.class);
subscriber.assertValues(1);
}

@Test
public void givenObservable_whenSkippingUntil_thenItemsAreSkippedUntilSecondObservableEmitsItems() throws InterruptedException {
public void givenObservable_whenSkippingUntil_thenItemsAreSkippedUntilSecondObservableEmitsItems() {

TestScheduler testScheduler = new TestScheduler();

Observable<Integer> timedObservable =
Observable.just(1, 2, 3, 4, 5, 6)
.zipWith(
Observable.interval(0, 1, TimeUnit.SECONDS),
(item, time) -> item
);
Observable.just(1, 2, 3, 4, 5, 6)
.zipWith(
Observable.interval(0, 1, TimeUnit.SECONDS, testScheduler),
(item, time) -> item
);


Observable<Integer> delayedObservable = Observable.just(1)
.delay(3000, TimeUnit.MILLISECONDS);
.delay(3000, TimeUnit.MILLISECONDS, testScheduler);

TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = timedObservable.skipUntil(delayedObservable);

filteredObservable.subscribe(subscriber);

Thread.sleep(7000);
testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);

subscriber.assertCompleted();
subscriber.assertNoErrors();
subscriber.assertValues(4, 5, 6);
}

@Test
public void givenObservable_whenSkippingWhile_thenItemsAreEmittedUntilSecondObservableEmitsItems() throws InterruptedException {
public void givenObservable_whenSkippingWhile_thenItemsAreEmittedUntilSecondObservableEmitsItems() {

TestScheduler testScheduler = new TestScheduler();

Observable<Integer> timedObservable =
Observable.just(1, 2, 3, 4, 5, 6)
.zipWith(
Observable.interval(0, 1, TimeUnit.SECONDS),
(item, time) -> item
);
Observable.just(1, 2, 3, 4, 5, 6)
.zipWith(
Observable.interval(0, 1, TimeUnit.SECONDS, testScheduler),
(item, time) -> item
);

TestSubscriber subscriber = new TestSubscriber();

Observable<Integer> delayedObservable = Observable.just(1)
.delay(3000, TimeUnit.MILLISECONDS);
.delay(3000, TimeUnit.MILLISECONDS, testScheduler);

Observable<Integer> filteredObservable = timedObservable.takeUntil(delayedObservable);

filteredObservable.subscribe(subscriber);

Thread.sleep(7000);
testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);

subscriber.assertCompleted();
subscriber.assertNoErrors();
Expand Down