RxJava2 使用

超时截断

Observable.fromIterable(nameList).map(name -> delay(name))
                .timeout(200, TimeUnit.MILLISECONDS)
                .subscribe(System.out::println,ex -> ex.printStackTrace());

合并

Observable.zip(fastName(), slowAge(), (name, age) -> {
            return name + " : " + age;
        }).blockingSubscribe(item -> System.out.println(item));

批量执行

Observable.fromIterable(nameList).buffer(3).map(items -> {
            return String.join(",", items);
        }).subscribe(result -> System.out.println(result));

并发执行

Observable.fromIterable(nameList).flatMap(name -> Observable.just(name).subscribeOn(Schedulers.io()))
                .subscribe(item -> {
                    System.out.println(Thread.currentThread().getName() + " :: " + item);
                });

完整示例

public class RxJavaDemo {

    public static List<String> nameList = Arrays.asList("Hello", "World", "Yes", "No");

    public static List<Integer> ageList = Arrays.asList(20, 21, 22, 23);

    public static String delay(String name) {
        try {
            TimeUnit.MILLISECONDS.sleep(300);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return name;
    }

    public static Observable<String> fastName() {
        return Observable.fromIterable(nameList);
    }

    public static Observable<Integer> slowAge() {
        return Observable.fromIterable(ageList).delay(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) {

        // 1.超时截断
        Observable.fromIterable(nameList).map(name -> delay(name))
                .timeout(200, TimeUnit.MILLISECONDS)
                .subscribe(System.out::println,
                        ex -> ex.printStackTrace());
        // 2.合并
        Observable.zip(fastName(), slowAge(), (name, age) -> {
            return name + " : " + age;
        }).blockingSubscribe(item -> System.out.println(item));

        // 3.批量执行
        Observable.fromIterable(nameList).buffer(3).map(items -> {
            return String.join(",", items);
        }).subscribe(result -> System.out.println(result));
        // 4.并发执行
        Observable.fromIterable(nameList).subscribeOn(Schedulers.io()).subscribe(item -> {
            System.out.println(Thread.currentThread().getName() + " :: " + item);
        });
        /***
         *RxCachedThreadScheduler-1 :: Hello
         *RxCachedThreadScheduler-1 :: World
         *RxCachedThreadScheduler-1 :: Yes
         *RxCachedThreadScheduler-1 :: No
         */

        Observable.fromIterable(nameList).flatMap(name -> Observable.just(name).subscribeOn(Schedulers.io()))
                .subscribe(item -> {
                    System.out.println(Thread.currentThread().getName() + " :: " + item);
                });
        /**
         * output:
         *RxCachedThreadScheduler-3 :: Yes
         *RxCachedThreadScheduler-4 :: No
         *RxCachedThreadScheduler-2 :: World
         *RxCachedThreadScheduler-1 :: Hello
         */

        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}
添加新评论