2017-02-26 21:19:00 wc0000000 阅读数 26
  • R语言从入门到精通系列之数据导入实战视频课程

    本篇首先介绍了如何将txt、csv、非机构花文本文件数据导入到R语言中,并介绍了四种导入excel数据的方法,在介绍R与MySQL数据库管理中,详细介绍了ROBBC包和RMySQL包的安装及使用;后介绍了网络数据爬虫技术,包括利用XML爬取网络表格数据、利用RCurl包批量下载ftp文件等方法。本课程配套课件和脚本均可下载,方便学员跟着视频自己动手操作。

    4568 人正在学习 去看看 谢佳标

1.基本概念

Rx是RxJava针对Android的定制版本。这个版本中通过增加最少的类使在Android应用中编写响应式组件简单而且无障碍,特别之处在与它还提供了一个Scheduler,可以在主线程或任何给定的Handler上进行调度。Rx编程方式基于三个基本概念:观察者模式、迭代器模式、函数式编程。下面通过几个例子,描述在Android上面Rx方式进行编程的基本使用。

2.观察者模式

在Android Studio里面引入rxjava和rxandroid两个依赖包后,我们就可以进行rx开发了:

compile 'io.reactivex:rxjava:1.1.6'

compile 'io.reactivex:rxandroid:1.2.1'

Rx的两个核心类是Observable(被观察者)和Observer(观察者)。前者发送数据和消息,后者处理数据和消息。Observer收到数据时,它的onNext()方法会被调用,数据在方法的参数里面被传入。另外Observer还有onComplete()方法,来表示Observable的数据发送完毕,而onError()则表示数据发送过程中出错。

 

先看一个例子。假设要发送一系列的http请求,这些请求的相对路径我们已经有了:“/home”, “/productList”, “/productDetail”,但是发送的时候需要使用绝对路径,因此可以这样拼接一下:

String[] urls = {"/home", "/productList", "/productDetails"};
for (String url : urls) {
    sendHttpRequest(BASE_URL + url);
}

 

如果用RxAndroid写的话,可以这样做:

 Observable observable = Observable
            .just("/home", "/productList", "/productDetails");
Observer<String> observer = new Observer<String>() {
    public void onCompleted() {}
    public void onError(Throwable e) {}

    public void onNext(String s) {
        sendHttpRequest(BASE_URL + url);
    }
};
observable.subscribe(observer);

单从代码量来讲,RxAndroid似乎是把事情搞复杂了。这个我们先不讨论。先看看RxAndroid代码的基本逻辑:

1)创建一个Observable,它发送了一系列的数据。

2)创建一个Observer,它的onNext()方法会接收到数据。另外两个方法暂时不感兴趣,不作任何处理。

3)Observer订阅Observable。Subscribe方法将两者关联起来。

 

实际上,上述代码还有一种简单的写法:

Observable
        .just("/home", "/productList", "/productDetails")
        .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                sendHttpRequest(BASE_URL + url);
            }
        });

因为对事件结束和事件出错不感兴趣,所以我们可以用Action1类来接收和处理数据。但是过程是一样的:发送数据;订阅;接收数据。

 

3.异步编程

RxAndroid一个最重要的功能就是简化异步编程。e.g.假设你要读取手机里面的所有联系人,然后在UI上用一个ListView把联系人一个个显示出来,可以这样做:

Observable
        .fromCallable(new Callable<Cursor>() {
            @Override
            public Cursor call() throws Exception {
                return queryContacts();                    // ①
            }
        })
        .subscribeOn(Schedulers.io())                      // 设置I
        .observeOn(AndroidSchedulers.mainThread())       // 设置II
        .subscribe(new Action1<Cursor>() {
            @Override
            public void call(Cursor contacts) {
                showContacts();                             // ②
            }
        });


这里面,queryContacts涉及到IO操作,因此,需要保证它在一个异步线程里面运行。因此,使用了一个Callable对象,来包裹这个IO操作。Callable的call()方法只有在Observable被订阅后才会执行,并且你可以指定call()方法在哪个线程里面执行。我们使用了subscribeOn(Schedulers.io()), 指定Callable.call()方法在一个单独的IO线程里面执行。但是一旦加上这一行,后面的Action1.call()方法也会在异步线程里面执行,所以还得加上一句observeOn(AndroidSchedulers.mainThread())
,它的作用是让Action1.call()在主线程里面执行。

总结一下,假设当前线程为主线程,以下表格列出来了不同情况下代码①和代码②的线程关系。

 

代码①

代码②

设置Ⅰ和Ⅱ都没有的情况

主线程执行

主线程执行

设置Ⅰ

异步IO线程执行

异步IO线程执行

设置Ⅰ + 设置Ⅱ

异步IO线程执行

主线程执行

 

在Android里面,异步编程还有一个需要注意的地方,那就是Activity销毁之后,需要停止线程。上面的代码,在Activity.onDestroy()里面,应该停止对Observable的订阅,因为这个时候,线程可能还没执行完毕,这个时候,需要停止对数据的接收。只要停止订阅就可以了,如何停止订阅呢?实际上,上面的代码最终会返回一个Subscription对象:

Subscription subscription = Observable
        .fromCallable(..
        .subscribeOn(...
        .observeOn(...)

这个Subscription对象代表Observable和Observer的一个连接,我们可以这样来停止订阅:

subscription.unsubscribe();

 

4.操作符(Operator)

在Observer接收到数据前,可以用操作符可以对Observable发出的数据进行转换。还是上面的例子,假设我们在读取联系人后,要根据联系人的名字生成对应的图片,并且,只列出来姓“李”的联系人,我们可以这样做:

Observable
        .fromCallable(new Callable<List<File>>() {
            @Override
            public Cursor call() throws Exception {
                return queryContacts(); // ①
            }
        })
        .map(new Func1<Cursor, List<Contact>() {
            @Override
            public List<Contact> call(Cursor c) {
                return getContactsFromCursor(c);           // ④
            }
        })
        .flatMap(new Func1<List<Contact>, Observable<Contact>>() {
            @Override
            public Observable<Contact> call(List<Contact> contacts) {
                return Observable.from(contacts);                   // ②
            }
        })
        .filter(new Func1<Contact, Boolean>() {
            @Override
            public Boolean call(Contact c) {
                return c.getName().startWith("黄");        // ③
            }
        })
       
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Action1<Contact>() {
            @Override
            public void call(Contact c) {
                showContact(c);
            }
        });

 

这段代码里面展示了RxAndroid里面三个常用的操作符,flatMap,filter,map. 前面提到,操作符是在Observer接收到数据之前,对Observable发送的数据进行中间转换。以下是转换过程:

步骤①:此时所有数据都存在一个List里面,如果没有其它操作符的转换,我们最后的Observer也会接收到一个List,也就是说,数据是一次性接收到的。但是,需求是图片是分批、一个一个传过来,因此我们需要将list展开。

步骤② flatMap操作符:这一步实际上是将上一步的Observable转换成了一个新的Observable,转换过程中,输入是List<Contact>类型,输出是Contac类型。发送的数据并没有减少,只是发送方式变了,由一次性发送变成一个个发送。

步骤③ filter操作符:这一步对数据进行了过滤,只有姓李的联系人才会被发送到Observer

步骤④ map操作符:这一步对数据进行了类型转换。我们读Cursor,并生成联系人列表。输入的是Cursor,输出的是List<Contact>。

 

5.Subject

回顾我们第一个例子,我们的Observer,Observable都是临时对象,数据处理完毕之后,它们也完成了自己的生命周期。假设我们再来一批数据,并且数据处理流程一样,我们是不是需要再创建新的Observable和Observer?代码重新写一遍?

Observable
        .just("/home", "/productList", "/productDetails")
        .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
            }
        });

 

使用RxAndroid里面的Subject对象可以解决这个问题。Subject既是Observable,也是Observer,可以把它想象成一个管道:可以随时往一端输入数据,经过中间处理后,另一端处理输出。考虑这样一个场景,你在联系人列表里面进行搜索,你在搜索框里面每输入一个字符,联系人都会实时重新查询一遍。这里面就是一个标准的Rx流程:输入->处理(查询)->输出。只不过输入条件会不断的变,然后这个流程需要重复执行。

 

mSearchView.setOnQueryTextListener(new OnQueryTextListener() {
    @Override
    public boolean onQueryTextSubmit(String query) {
        return false;
    }

    @Override
    public boolean onQueryTextChange(String query) {
        mSubject.onNext(query);
        return true;
    }
});

mSubject = PublishSubject.create();
mSubject
        .debounce(400, TimeUnit.MILLISECONDS)
        .observeOn(Schedulers.io())
        .map(new Func1<String, List<Contact>>() {
            @Override
            public List<Contact> call(String s) {
                return queryContacts(s);
            }
        })
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Action1<Contact>() {
            @Override
            public void call(Contact contact) {
                addContactToList(contact);
            }
        });

 

这里面,searchView每次输入有变化时,都会重新查询一遍联系人数据库。因此我们用一个全局变量保存subject,这样可以调用subject.onNext()多次发送数据。这里有两个需要说明的地方:

①:我们用了一个debounce方法,它的作用是,当400ms内没有新的数据输入时,subject才会发送数据。因为用户每输入一个字符时,searchView的onQueryTextChange方法都会被触发。而我们不需要这么频繁的查询。

②:我们用了两个observeOn来指定中间操作和最后操作的执行线程。查询的时候,我们需要在异步线程执行,而最终把联系人加入到查询结果列表的时候,我们又需要切换到主线程。

 

参考文档:

① 从案例学RxAndroid开发:

http://www.infoq.com/cn/articles/RxAndroid-basics

② Rx官网Subject文档:

http://reactivex.io/documentation/subject.html

 

2016-05-11 22:04:05 a332324956 阅读数 6749
  • R语言从入门到精通系列之数据导入实战视频课程

    本篇首先介绍了如何将txt、csv、非机构花文本文件数据导入到R语言中,并介绍了四种导入excel数据的方法,在介绍R与MySQL数据库管理中,详细介绍了ROBBC包和RMySQL包的安装及使用;后介绍了网络数据爬虫技术,包括利用XML爬取网络表格数据、利用RCurl包批量下载ftp文件等方法。本课程配套课件和脚本均可下载,方便学员跟着视频自己动手操作。

    4568 人正在学习 去看看 谢佳标

RxAndroid 常见用法

在项目里面添加

     compile 'io.reactivex:rxandroid:1.1.0'
// Because RxAndroid releases are few and far between, it is recommended you also
// explicitly depend on RxJava's latest version for bug fixes and new features.
    compile 'io.reactivex:rxjava:1.1.0'
    ```
例子1:
```java
    Observable.just("one", "two", "three", "four", "five")
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.i("lxm0",s);
                    }
                }, new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        Log.i("lxm0","eeee");
                    }
                }, new Action0() {
                    @Override
                    public void call() {
                        Log.i("lxm0","wwwwwww");
                    }
                });




<div class="se-preview-section-delimiter"></div>

map对数据作为对输入数据做一次处理,然后再传给Action1

        Observable.just(1, 2, 3, 4, 4)
                .map(new Func1<Integer, String>() {
                    @Override
                    public String call(Integer  num) {
                        return num+"";
                    }
                    ;}).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.i("lxm",s);
            }
        });




<div class="se-preview-section-delimiter"></div>

从上面的代码可以看出, flatMap() 和 map() 有一个相同点:它也是把传入的参数转化之后返回另一个对象。但需要注意,和map() 不同的是, flatMap() 中返回的是个 Observable 对象,并且这个 Observable 对象并不是被直接发送到了 Subscriber的回调方法中。 flatMap() 的原理是这样的:1. 使用传入的事件对象创建一个 Observable 对象;2. 并不发送这个 Observable, 而是将它激活,于是它开始发送事件;3. 每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable ,而这个Observable 负责将这些事件统一交给 Subscriber 的回调方法。这三个步骤,把事件拆成了两级,通过一组新创建的 Observable将初始的对象『铺平』之后通过统一路径分发了下去。而这个『铺平』就是 flatMap() 所谓的 flat。

from从里面拿出每一个,调用flatmap将数据再次包裹起来,变换后再次进入filter进入过滤条件,过滤条件有前后顺序,
flatMap 将里面的元素进行每个重新包装,再次作为一个被观察者发送出去。AndroidSchedulers.mainThread() 主线程。
onStart(): 这是 Subscriber 增加的方法。它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置。这是一个可选方法,默认情况下它的实现为空。需要注意的是,如果对准备工作的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行), onStart() 就不适用了,因为它总是在 subscribe 所发生的线程被调用,而不能指定线程。要在指定的线程来做准备工作,可以使用 doOnSubscribe() 方法,具体可以在后面的文中看到。
3.将传入的 Subscriber 作为 Subscription 返回。这是为了方便 unsubscribe(). 需要撤销

Observable.from(folders)
    .flatMap(new Func1<File, Observable<File>>() {
        @Override
        public Observable<File> call(File file) {
            return Observable.from(file.listFiles());
        }
    })
    .filter(new Func1<File, Boolean>() {
        @Override
        public Boolean call(File file) {
            return file.getName().endsWith(".png");
        }
    })
    .map(new Func1<File, Bitmap>() {
        @Override
        public Bitmap call(File file) {
            return getBitmapFromFile(file);
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Bitmap>() {
        @Override
        public void call(Bitmap bitmap) {
            imageCollectorView.addImage(bitmap);
        }
    });





<div class="se-preview-section-delimiter"></div>

构造一个被观察者

Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread()更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。

int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {
    @Override
    public void call(Subscriber<? super Drawable> subscriber) {
        Drawable drawable = getTheme().getDrawable(drawableRes));
        subscriber.onNext(drawable);
        subscriber.onCompleted();
    }
}).subscribe(new Observer<Drawable>() {
    @Override
    public void onNext(Drawable drawable) {
        imageView.setImageDrawable(drawable);
    }

    @Override
    public void onCompleted() {
    }

    @Override
    public void onError(Throwable e) {
        Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
    }
});




<div class="se-preview-section-delimiter"></div>

继续来看

Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.newThread())
    .map(mapOperator) // 新线程,由 observeOn() 指定
    .observeOn(Schedulers.io())
    .map(mapOperator2) // IO 线程,由 observeOn() 指定
    .observeOn(AndroidSchedulers.mainThread) 
    .subscribe(subscriber);  // Android 主线程,由 observeOn() 指定




<div class="se-preview-section-delimiter"></div>

关键的东西,将被观察者的一些东西执行在主线程.doOnSubscribe的内容运行在下面的subscribeOn指定的线程上

Observable.create(onSubscribe)
    .subscribeOn(Schedulers.io())
    .doOnSubscribe(new Action0() {
        @Override
        public void call() {
            progressBar.setVisibility(View.VISIBLE); // 需要在主线程执行
        }
    })
    .subscribeOn(AndroidSchedulers.mainThread()) // 指定主线程  
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(subscriber);




<div class="se-preview-section-delimiter"></div>

RxJava 的适用场景和使用方式

  1. 与 Retrofit 的结合

Retrofit 是 Square 的一个著名的网络请求库。没有用过 Retrofit 的可以选择跳过这一小节也没关系,我举的每种场景都只是个例子,而且例子之间并无前后关联,只是个抛砖引玉的作用,所以你跳过这里看别的场景也可以的。

Retrofit 除了提供了传统的 Callback 形式的 API,还有 RxJava 版本的 Observable 形式 API。下面我用对比的方式来介绍 Retrofit 的 RxJava 版 API 和传统版本的区别。

以获取一个 User 对象的接口作为例子。使用Retrofit 的传统 API,你可以用这样的方式来定义请求:

@GET("/user")
public void getUser(@Query("userId") String userId, Callback<User> callback);




<div class="se-preview-section-delimiter"></div>

在程序的构建过程中, Retrofit 会把自动把方法实现并生成代码,然后开发者就可以利用下面的方法来获取特定用户并处理响应:

getUser(userId, new Callback<User>() {
    @Override
    public void success(User user) {
        userView.setUser(user);
    }

    @Override
    public void failure(RetrofitError error) {
        // Error handling
        ...
    }
};




<div class="se-preview-section-delimiter"></div>

而使用 RxJava 形式的 API,定义同样的请求是这样的:

@GET("/user")
public Observable<User> getUser(@Query("userId") String userId);




<div class="se-preview-section-delimiter"></div>

代码变为:

getUser(userId)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<User>() {
        @Override
        public void onNext(User user) {
            userView.setUser(user);
        }

        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable error) {
            // Error handling
            ...
        }
    });




<div class="se-preview-section-delimiter"></div>

看到区别了吗?

当 RxJava 形式的时候,Retrofit 把请求封装进 Observable ,在请求结束后调用 onNext() 或在请求失败后调用 onError()。

对比来看, Callback 形式和 Observable 形式长得不太一样,但本质都差不多,而且在细节上 Observable 形式似乎还比Callback 形式要差点。那 Retrofit 为什么还要提供 RxJava 的支持呢?

因为它好用啊!从这个例子看不出来是因为这只是最简单的情况。而一旦情景复杂起来, Callback 形式马上就会开始让人头疼。比如:

假设这么一种情况:你的程序取到的 User 并不应该直接显示,而是需要先与数据库中的数据进行比对和修正后再显示。使用Callback 方式大概可以这么写:

getUser(userId, new Callback<User>() {
    @Override
    public void success(User user) {
        processUser(user); // 尝试修正 User 数据
        userView.setUser(user);
    }

    @Override
    public void failure(RetrofitError error) {
        // Error handling
        ...
    }
};




<div class="se-preview-section-delimiter"></div>

有问题吗?

很简便,但不要这样做。为什么?因为这样做会影响性能。数据库的操作很重,一次读写操作花费 10~20ms 是很常见的,这样的耗时很容易造成界面的卡顿。所以通常情况下,如果可以的话一定要避免在主线程中处理数据库。所以为了提升性能,这段代码可以优化一下:

getUser(userId, new Callback<User>() {
    @Override
    public void success(User user) {
        new Thread() {
            @Override
            public void run() {
                processUser(user); // 尝试修正 User 数据
                runOnUiThread(new Runnable() { // 切回 UI 线程
                    @Override
                    public void run() {
                        userView.setUser(user);
                    }
                });
            }).start();
    }

    @Override
    public void failure(RetrofitError error) {
        // Error handling
        ...
    }
};




<div class="se-preview-section-delimiter"></div>

性能问题解决,但……这代码实在是太乱了,迷之缩进啊!杂乱的代码往往不仅仅是美观问题,因为代码越乱往往就越难读懂,而如果项目中充斥着杂乱的代码,无疑会降低代码的可读性,造成团队开发效率的降低和出错率的升高。

这时候,如果用 RxJava 的形式,就好办多了。 RxJava 形式的代码是这样的:

getUser(userId)
    .doOnNext(new Action1<User>() {
        @Override
        public void call(User user) {
            processUser(user);
        })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<User>() {
        @Override
        public void onNext(User user) {
            userView.setUser(user);
        }

        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable error) {
            // Error handling
            ...
        }
    });
    ```
    后台代码和前台代码全都写在一条链中,明显清晰了很多。

再举一个例子:假设 /user 接口并不能直接访问,而需要填入一个在线获取的 token ,代码应该怎么写?

Callback 方式,可以使用嵌套的 Callback:




<div class="se-preview-section-delimiter"></div>

```java
@GET("/token")
public void getToken(Callback<String> callback);

@GET("/user")
public void getUser(@Query("token") String token, @Query("userId") String userId, Callback<User> callback);

...

getToken(new Callback<String>() {
    @Override
    public void success(String token) {
        getUser(userId, new Callback<User>() {
            @Override
            public void success(User user) {
                userView.setUser(user);
            }

            @Override
            public void failure(RetrofitError error) {
                // Error handling
                ...
            }
        };
    }

    @Override
    public void failure(RetrofitError error) {
        // Error handling
        ...
    }
});

倒是没有什么性能问题,可是迷之缩进毁一生,你懂我也懂,做过大项目的人应该更懂。

而使用 RxJava 的话,代码是这样的:

@GET("/token")
public Observable<String> getToken();

@GET("/user")
public Observable<User> getUser(@Query("token") String token, @Query("userId") String userId);

...

getToken()
    .flatMap(new Func1<String, Observable<User>>() {
        @Override
        public Observable<User> onNext(String token) {
            return getUser(token, userId);
        })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<User>() {
        @Override
        public void onNext(User user) {
            userView.setUser(user);
        }

        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable error) {
            // Error handling
            ...
        }
    });
    ```
用一个 flatMap() 就搞定了逻辑,依然是一条链。看着就很爽,是吧?

好,Retrofit 部分就到这里。
 2. RxBinding

RxBinding 是 Jake Wharton 的一个开源库,它提供了一套在 Android 平台上的基于 RxJava 的 Binding API。所谓 Binding,就是类似设置 OnClickListener 、设置 TextWatcher 这样的注册绑定对象的 API。

举个设置点击监听的例子。使用 RxBinding ,可以把事件监听用这样的方法来设置:
Button button = ...;
RxView.clickEvents(button) // 以 Observable 形式来反馈点击事件
    .subscribe(new Action1<ViewClickEvent>() {
        @Override
        public void call(ViewClickEvent event) {
            // Click handling
        }
    });
看起来除了形式变了没什么区别,实质上也是这样。甚至如果你看一下它的源码,你会发现它连实现都没什么惊喜:它的内部是直接用一个包裹着的 setOnClickListener() 来实现的。然而,仅仅这一个形式的改变,却恰好就是 RxBinding 的目的:扩展性。通过 RxBinding 把点击监听转换成 Observable 之后,就有了对它进行扩展的可能。扩展的方式有很多,根据需求而定。一个例子是前面提到过的 throttleFirst() ,用于去抖动,也就是消除手抖导致的快速连环点击:

```java
// 感谢大家能够完整阅读整篇文章,如果喜欢,请分享并打赏支持下~.~!
// 更多文章,敬请期待啦。
2016-04-12 15:24:39 fengqiaoyebo2008 阅读数 1568
  • R语言从入门到精通系列之数据导入实战视频课程

    本篇首先介绍了如何将txt、csv、非机构花文本文件数据导入到R语言中,并介绍了四种导入excel数据的方法,在介绍R与MySQL数据库管理中,详细介绍了ROBBC包和RMySQL包的安装及使用;后介绍了网络数据爬虫技术,包括利用XML爬取网络表格数据、利用RCurl包批量下载ftp文件等方法。本课程配套课件和脚本均可下载,方便学员跟着视频自己动手操作。

    4568 人正在学习 去看看 谢佳标

从去年开始开始接触RxJava,但项目中期使用的话整体修改较大,目前项目都告一段落,有时间来研究下,既然用我们就用最新的版本。

1、app dependencies

dependencies {
    compile fileTree(dir: 'libs', include: ['*.jar'])
    testCompile 'junit:junit:4.12'
    compile 'com.android.support:appcompat-v7:23.1.1'
    compile 'io.reactivex:rxandroid:1.1.0'
    compile 'com.squareup.retrofit2:adapter-rxjava:2.0.1'
    compile 'com.squareup.retrofit2:converter-gson:2.0.1'
    compile 'com.squareup.okhttp3:logging-interceptor:3.2.0'
    compile 'com.jakewharton.rxbinding:rxbinding:0.4.0'
}

2、Api接口,定义项目中用到的接口

import io.edgarcode.jiaxibao_retrofit.model.User;
import io.edgarcode.jiaxibao_retrofit.model.Task;
import retrofit2.http.GET;
import retrofit2.http.Query;
import rx.Observable;

/**
 * Created by zhangjifeng on 16/4/11.
 */
public interface Api {

    @GET("user/login.json")
    Observable<User> login(
            @Query("username") String username,
            @Query("password") String password,
            @Query("deviceId") String deviceId,
            @Query("role") int role
    );

    @GET("task/list.json")
    Observable<Task> getTaskList (
            @Query("token") String token,
            @Query("state") int taskState,
            @Query("pageNo") int pageNo,
            @Query("pageSize") int pageSize
    );

}

3、定义Retrofit的相关操作,包含设置Retrofit的http处理层,设置log的等级,设置转换工厂

import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
import android.widget.TextView;

import com.jakewharton.rxbinding.view.RxView;

import java.util.concurrent.TimeUnit;

import io.edgarcode.jiaxibao_retrofit.model.Task;
import io.edgarcode.jiaxibao_retrofit.model.User;
import io.edgarcode.jiaxibao_retrofit.net.RetrofitInstance;
import rx.Observable;
import rx.Subscriber;
import rx.android.schedulers.AndroidSchedulers;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;

public class MainActivity extends AppCompatActivity{

    private static final String TAG = "MainActivity";

    private TextView tv;
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        tv = (TextView) findViewById(R.id.tv);
        //RxBind click事件
        RxView.clicks(findViewById(R.id.btn_login))
                .throttleFirst(1, TimeUnit.SECONDS) //防止手快的点击
                .subscribe(new Action1<Void>() {
                    @Override
                    public void call(Void aVoid) {
                        login();
                    }
                });
    }

    private void login() {
        RetrofitInstance.getApi().login("186xxxxxxxx", "123456", "123", 0)
                //转换,user->string(token)
                .map(new Func1<User, String>() {
                    @Override
                    public String call(User user) {
                        return user.getData().getToken();
                    }
                })
                //转换,token->返回Observable<Task>
                .flatMap(new Func1<String, Observable<Task>>() {
                    @Override
                    public Observable<Task> call(String token) {
                        return RetrofitInstance.getApi().getTaskList(token, 1, 1, 10);
                    }
                })
                //转换,将List<Task.TaskBean> 展开,返回Observable<Task.TaskBean>
                .flatMap(new Func1<Task, Observable<Task.TaskBean>>() {
                    @Override
                    public Observable<Task.TaskBean> call(Task task) {
                        return Observable.from(task.getData());
                    }
                })
                .subscribeOn(Schedulers.io()) //在异步线程中处理
                .observeOn(AndroidSchedulers.mainThread()) //订阅后结果在主线程
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        Log.i(TAG, "showLoading...");
                    }
                })
                .doAfterTerminate(new Action0() {
                    @Override
                    public void call() {
                        Log.i(TAG, "hideLoading...");
                    }
                })
                .subscribe(new Subscriber<Task.TaskBean>() {
                    @Override
                    public void onCompleted() {
                        Log.i(TAG ,"onCompleted");
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.i(TAG, "onError", e);
                    }

                    @Override
                    public void onNext(Task.TaskBean bean) {
                        Log.i(TAG,"onNext bean = " + bean.getTitle());
                    }
                });

    }

}


2017-03-12 11:50:30 z529905310 阅读数 2536
  • R语言从入门到精通系列之数据导入实战视频课程

    本篇首先介绍了如何将txt、csv、非机构花文本文件数据导入到R语言中,并介绍了四种导入excel数据的方法,在介绍R与MySQL数据库管理中,详细介绍了ROBBC包和RMySQL包的安装及使用;后介绍了网络数据爬虫技术,包括利用XML爬取网络表格数据、利用RCurl包批量下载ftp文件等方法。本课程配套课件和脚本均可下载,方便学员跟着视频自己动手操作。

    4568 人正在学习 去看看 谢佳标

RxJava —— WHAT | WHEN | HOW (基于rxjava2)

1.WHAT

RxJava本质上是一个实现异步操作的库,是扩展的观察者模式。

实现异步还有可以使用AsyncTask、Handler等其他线程类,但是RxJava可以做到简洁。

从SD中读取所有图片并添加到图片管理,因为有IO操作,所以使用线程处理。

new Thread() {
    @Override
    public void run() {
        super.run();
        for (File folder : folders) {
            File[] files = folder.listFiles();
            for (File file : files) {
                if (file.getName().endsWith(".png")) {
                    final Bitmap bitmap = getBitmapFromFile(file);
                    getActivity().runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            imageManager.add(bitmap);
                        }
                    });
                }
            }
        }
    }
}.start();    

而使用RxJava中可以这样写:

Observable.from(folders)
.flatMap(new Func1<File, Observable<File>>() {
    @Override
    public Observable<File> call(File file) {
        return Observable.from(file.listFiles());
    }
})
.filter(new Func1<File, Boolean>() {
    @Override
    public Boolean call(File file) {
        return file.getName().endsWith(".png");
    }
})
.map(new Func1<File, Bitmap>() {
    @Override
    public Bitmap call(File file) {
        return getBitmapFromFile(file);
    }
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Bitmap>() {
    @Override
    public void call(Bitmap bitmap) {
        imageManager.add(bitmap);
    }
});    

虽然代码上是增加了,但是逻辑上采用了链式调用,更加清晰。使用了lamda还可以是这样:

Observable.from(folders)
    .flatMap((Func1) (folder) -> { Observable.from(file.listFiles()) })
    .filter((Func1) (file) -> { file.getName().endsWith(".png") })
    .map((Func1) (file) -> { getBitmapFromFile(file) })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe((Action1) (bitmap) -> { imageCollectorView.addImage(bitmap) });    

RxJava中有四个基本概念:观察者、被观察者/主题、订阅、事件。

  • Observable/Observer(不支持backpress)

  • Flowable/FlowableSubscriber(支持backpress)

  • Single/SingleObserver(只发送一个onSuccess或者onError的通知)

  • Completable/CompletableObserver(只发送一个onComplete或者onError的通知)

  • Maybe/MaybeObserver(Single与Completable的结合,只发送一个onSuccess或者onComplete或者onError的通知)

Backpress:当被观察者不停发射数据流,而观察者的响应不及时就会产生MissingBackpressureException,所以需要一定的策略来应对无法这种情况。

2.WHEN

常见的使用场景可以是请求网络、io操作等,具体可以参考的业务逻辑,如:
- 检查缓存是否失效取然后选择数据源;
- 需要多个接口返回数据后更新界面;
- 接口的请求参数另一个接口请求返回的数据;
- 界面按钮需要防止连续点击的情况;
- 响应式的界面;
- 复杂的数据变换;

3. HOW

3.1 操作符

creat、just、fromXXX

Observable.create从零开始创建一个Observable;
Observable.just的参数即为将要发射的数据,可传入多个基类相同对象;
Observable.fromXXX方法包括:fromArray、fromCallable、fromFuture、fromIterable、fromPublisher;from方法参数类型:数组、Callable接口对象、Future接口对象、Iterable接口对象、Publisher接口对象

  • Future,Callable都是Java并发框架的接口,Callable 、Future、ThreadPoolExecutor需要一起研究;Publisher接口是rxjava的基础接口。

empty、never、error

empty:创建一个不发射任何数据但是正常终止的Observable,此时会回调onCompleted

never:创建一个不发射数据也不终止的Observable

error:创建一个不发射数据以一个错误终止的Observable

range

该操作符创建特定整数序列的Observable,它接受两个参数,一个是范围的起始值,一个是范围的数据的数目。如果你将第二个参数设为0,将导致Observable不发射任何数据

timer

timer操作符创建一个在给定的时间段之后返回一个特殊值的Observable。它在延迟一段给定的时间后发射一个简单的数字0 。

interval

该操作符按固定的时间间隔发射一个无限递增的整数序列,它接受一个表示时间间隔的参数和一个表示时间单位的参数。

repeat

该操作符是重复的发射某个数据序列,并且可以自己设置重复的次数。

defer

直到有观察者订阅时才创建Observable,并且为每个观察者创建一个新的Observable,该操作符能保证订阅执行时数据源是最新的数据。

map

该操作符是对原始Observable发射的每一项数据运用一个函数,然后返回一个发射这些结果的Observable。

flatMap

该操作符与map操作符的区别是它将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的Observable。

cast

该操作符就是做一些强制类型转换操作的。例如,当我们在页面跳转时数据对象往往是序列化的,当我们在新的页面收到数据后就要强制转换为我们想要的类型。底层调用map操作符。

concatMap

该操作符是类似于最简单版本的flatMap,但是它按次序连接而不是合并那些生成的Observables,然后产生自己的数据序列。

switchMap

当原始Observable发射一个新的数据(Observable)时,它将取消订阅并停止监视产生执之前那个数据的Observable,只监视当前这一个。

filter

该操作符接收一个Predicate参数,可以在其中通过运用自己的判断条件去判断我们要过滤的数据,当数据通过判断条件后返回true表示发射该项数据,否则就不发射。

first

只对Observable发射的第一项数据,或者满足某个条件的第一项数据进行处理,则可以使用First操作符。

last

该操作符与first意义相反,若我们只对Observable发射的最后一项数据,或者满足某个条件的最后一项数据感兴趣时使用该操作符。

publish、refCount

Observable 有 Cold 和 Hot 之分:Hot Observable 无论有没有 Observer 订阅,事件都会发出;Cold Observable 只有 Subscriber 订阅时,才开始执行发射数据流的代码。

Observable 的 just、creat、range、fromXXX 等操作符都能生成Cold Observable。

使用 publish 操作符,可以让 Cold Observable 转换成 Hot Observable,它将Observable 转换成 ConnectableObservable,而ConnectableObservable在被订阅后需要调用connect()才会开始发射数据流;ConnectableObservable的refCount操作符可以将Hot Observable转换成 Cold Observable。

3.2 源码分析

订阅

通过查看源码会看到Observable.just调用了Observable.fromXXX方法,而Observable.fromXXX和Observable.create调用了RxJavaPlugins.onAssembly。
RxJavaPlugins.onAssembly参数:

ObservableFromCallable<T> extends Observable<T> implements Callable<T>
ObservableFromArray<T> extends Observable<T>
ObservableFromFuture<T> extends Observable<T>
ObservableFromIterable<T> extends Observable<T>
ObservableFromPublisher<T> extends Observable<T>

创建Observable对象的from方法返回的就是ObservableFrom这些对象,ObservableFrom对象继承自Observable,实现了subscribeActual方法。

Observable.just("").subscribe(new Observer());  

Observable的非静态订阅方法subscribe实际上是调用了subscribeActual(Observer s)

发射数据

Cold Observable的subscribeActual(Observer s)方法实际上也是数据流的发送方法。在源码中会发现,在Observer订阅当前Cold Observable之后便立即开始了数据发射方法,以fromArray举例:

@Override
public void subscribeActual(Observer<? super T> s) {
    FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);

    s.onSubscribe(d);

    if (d.fusionMode) {
        return;
    }

    d.run();
}

先创建一个FromArrayDisposable对象对需要发射的数据流进行封装,在执行订阅s.onSubscribe(d);后立即开始了d.run();数据流的发射操作。


ConnectableObservable是Hot Observable,不会主动发射数据流。
Observable的publish操作符在创建ConnectableObservable是实际上是创建了一个ObservablePublish对象,ObservablePublish继承了抽象类ConnectableObservable, ConnectableObservable也是Observable的子类,ObservablePublish在实现subscribeActual(Observer s)方法的时候只进行了Observer对Observable的订阅,而没有立刻发射数据流。同时实现了ConnectableObservable的connect方法,在connect方法中才开始进行数据流的发送。
这也就是之前提到的,需要在使用publish操作符将Cold Observable转为Hot Observable需要调用connect才能发送数据流。


Hot Observable转为Cold Observable,关键点在ConnectableObservable的refCount操作符会调用RxJavaPlugins.onAssembly()来创建一个继承了Observable的ObservableRefCount对象,ObservableRefCount的构造器接受ConnectableObservable对象,ObservableRefCount在实现subscribeActual(Observer s)方法的时候调用了ConnectableObservable的connect方法,以此来达到Hot Observable转为Cold Observable的目的。


总结:RxJava各种操作符的实现,实际上是装饰模式的各种妙用

线程切换

//常用的线程策略
Schedulers.immediate()//在当前线程运行,默认为此策略;
Schedulers.newThread()//每次都创建新的线程执行操作;
Schedulers.io()//类似newThread()但是此策略有无限量的线程池,主要用于读写文件、数据库、网络请求等;
Schedulers.computation()//用于需要计算的策略,使用线程池,池大小为CPU核心数;
Schedulers.trampoline()//将任务加入一个队列,等待执行
AndroidSchedulers.mainThread()//在Android主线程中执行,    
RxAndroid独有

使用Rxjava可以非常方便指定订阅者对执行线程。例:

Flowable<String> flowableJust = Flowable.just("test Just Flowable");
    Disposable disposable = flowableJust
            .map(new Function<String, String>() {
                public String apply(String s) throws Exception {
                    System.out.println("map1 thread  = " + Thread.currentThread().getName());
                    return s + "mp1";
                }
            })
            .map(new Function<String, String>() {
                public String apply(String s) throws Exception {
                    System.out.println("map2 thread  = " + Thread.currentThread().getName());
                    return s + "mp2";
                }
            })
            .subscribeOn(Schedulers.computation())
            .observeOn(Schedulers.io())
            .map(new Function<String, String>() {
                public String apply(String s) throws Exception {
                    System.out.println("map3 thread  = " + Thread.currentThread().getName());
                    return s + "mp3";
                }
            })
            .map(new Function<String, String>() {
                public String apply(String s) throws Exception {
                    System.out.println("map4 thread  = " + Thread.currentThread().getName());
                    return s + "mp4";
                }
            })
            .subscribe(new Consumer<String>() {
                public void accept(String s) throws Exception {
                    System.out.println("onNext Just s = " + Thread.currentThread().getName());
                }
            }, new Consumer<Throwable>() {
                public void accept(Throwable throwable) throws Exception {
                    System.out.println("onError Just throwable = " + throwable.getMessage());
                }
            }, new Action() {
                public void run() throws Exception {
                    System.out.println("onComplete Just");
                }
            }, new Consumer<Subscription>() {
                public void accept(Subscription subscription) throws Exception {
                    subscription.request(Long.MAX_VALUE);
                    System.out.println("onSubscribe Just subscription = " + Thread.currentThread().getName());
                }
            });

运行结果:

onSubscribe Just subscription = main
map1 thread  = RxComputationThreadPool-1
map2 thread  = RxComputationThreadPool-1

map3 thread  = RxCachedThreadScheduler-1
map4 thread  = RxCachedThreadScheduler-1
onNext Just s = RxCachedThreadScheduler-1

subscribeOn操作符改变调用它之前代码的线程;observeOn操作符改变调用它之后代码的线程。

除了订阅操作本身在主线程中运行,其他操作都在subscribeOn与observeOn两个操作符指定的线程中运行。

Schedulers是创建Scheduler的工厂, 提供了若干静态方法用来创建各种Scheduler;

Scheduler提供创建Workder的接口;

Worker提供了几种执行任务的接口,用来执行任务, 它底下利用各种类型的线程或者线程池完成任务的执行,它是真正执行任务的地方;

同一类型的Scheduler只有一个,但是对应的worker是不同的.比如Schedulers.computation, 对应的Scheduler只有一个,但是每次调用createWorker,获得的worker是scheduler里面worker数组中的一个(数组数目和处理器的数目相同)

2016-08-15 15:31:07 nicolelili1 阅读数 445
  • R语言从入门到精通系列之数据导入实战视频课程

    本篇首先介绍了如何将txt、csv、非机构花文本文件数据导入到R语言中,并介绍了四种导入excel数据的方法,在介绍R与MySQL数据库管理中,详细介绍了ROBBC包和RMySQL包的安装及使用;后介绍了网络数据爬虫技术,包括利用XML爬取网络表格数据、利用RCurl包批量下载ftp文件等方法。本课程配套课件和脚本均可下载,方便学员跟着视频自己动手操作。

    4568 人正在学习 去看看 谢佳标

使用如下android需要在build.gradle中加入compile 'io.reactivex:rxandroid:1.2.1'

一、在UI现场观察

 //需要导入compile 'io.reactivex:rxandroid:1.2.1'
        Observable<Integer> averageObservable =
                MathObservable.sumInteger(Observable.just(11, 2, 3, 4, 5))
                .subscribeOn(Schedulers.newThread())                                        //subscribeOn 操作符声明你要在子线程执行运算
                .observeOn(AndroidSchedulers.mainThread());                                 //observeOn 操作符声明你要在主线程等待 Observable 的结果

        final Subscriber<Integer> subscriber = new Subscriber<Integer>() {
            int result = -1;
            @Override
            public void onNext(Integer s) {
                result = s;
                Log.e(TAG, "onNext................." + s+"..............."+Thread.currentThread().getName());
            }

            @Override
            public void onCompleted() {
                tvContent.setText(result+"");
                Log.e(TAG, "onCompleted................."+Thread.currentThread().getName());
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError.....................");
            }
        };
        averageObservable.subscribe(subscriber);

运行结果:



二、RXAndroid中handler的使用

Android使用一个叫 Handler 的类绑定异步通信到消息循环。为了在任意线程 观察 一个Observable,需要创建一个与那个类关联的 Handler,然后使用AndroidSchedulers.handlerThread 、HandlerScheduler.from(handler)调度器:

Observable在一个新的线程执行:

 private void testObserveHandler() {
        final Handler[] observeHandler = new Handler[1];
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                Looper.prepare();
                observeHandler[0] = new Handler(Looper.myLooper()){
                    @Override
                    public void handleMessage(Message msg) {
                        super.handleMessage(msg);
                        switch (msg.what){
                            case 1:
                                int result = (int) msg.obj;
                                Log.e(TAG, "handmessage................." + result + ".............." + Thread.currentThread().getName());
                                break;
                            default:
                                break;
                        }
                    }
                };
                Looper.loop();
            }
        });
        MathObservable.sumInteger(Observable.just(11, 2, 3, 4, 5))
                .subscribeOn(Schedulers.newThread())                                        //subscribeOn 操作符声明你要在子线程执行运算
                .observeOn(HandlerScheduler.from(observeHandler[0]))                        //observeOn 操作符声明subscribe在子线程中执行
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.e(TAG, "sendMessage................." + integer + "............" + Thread.currentThread().getName());
                        Message message = new Message();
                        message.what = 1;
                        message.obj = integer;
                        try {
                            Thread.sleep(10 * 1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        if (observeHandler[0] != null) {
                            observeHandler[0].sendMessage(message);
                        }
                    }
                });

    }

运行结果:



三、Observable何时应该订阅和取消订阅

在Android上,要在异步操作中访问框架中的对象有些棘手,那是因为Andoid系统可以决定销毁(destroy)一个 Activity,例如,当一个后台线程还在运行的时候,如果这个线程尝试访问一个已经死掉的Activity中的View对象,会导致异常退出(Crash)。(这也会导致内存泄露,因为 Activity 已经不可见了,你的后台线程还持有它的引用。)

这仍然是在Android上使用RxJava需要关注的一个问题,但是通过使用 Subscription和其它Observable操作符,你可以优雅地解决这个问题。通常来说,当你在Activity中订阅一个Observable的结果时(无论是直接的还是通过一个内部类),你必须在 onDestroy 里取消订阅,就像下面例子里展示的那样:

// MyActivity
private Subscription subscription;

protected void onCreate(Bundle savedInstanceState) {
    this.subscription = observable.subscribe(this);
}

...

protected void onDestroy() {
    this.subscription.unsubscribe();
    super.onDestroy();
}

这样确保所有指向订阅者(这个Activity)的引用尽快释放,不会再有通知通过 onNext 发射给这个订阅者。

有一个问题,如果由于屏幕方向的变化导致这个 Activity 被销毁,在 onCreate 中这个Observable会再次启动。你可以使用 cache 或 replay 操作符阻止它发生,这些操作符保证Observable在 Activity 的生命周期内存在(你可以在一个全局的缓存中保存它,比如放在Fragment中。)你可以使用任何操作符,只要能保证:当订阅者订阅一个已经在运行的Observable时,在它与Activity 解除关联的这段时间里发射的数据都会被回放,并且来自这个Observable的任何离线通知都会正常分发。




博文 来自: ZkMao1007
没有更多推荐了,返回首页