RxJava的subscribeOn和observeOn区别与原理

RxJava代码语义清晰,数据产生、变换、消费一目了然,极大的提高了代码的可读性、维护性,同时还提供了另外一个特性——线程切换

RxJava用于线程切换的主要有2个操作符:subscribeOn和observeOn

区别

  • subscrieOn
    指定Observable在特定的调度器上发射数据
  • observeOn
    指定Observer在特定的调度器上接收Observable的数据

    observeOn在收到错误通知时会立即回调observer的onError方法,即使之前还有未消费的数据,onError会在它们之前被传递

特性

  • subscribeOn
    Observable的subscribe方法将在操作符链条中第一个subscribeOn指定的调度器上执行,就算出现多个subscribeOn操作符也是如此
  • observeOn
    observeOn会将直接紧跟的后续操作符在其指定的调度器上执行
    例如:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    Observable.create((ObservableOnSubscribe<String>) emitter -> {
    System.out.println(Thread.currentThread());
    String s = "from 1 created";
    System.out.println(s);
    emitter.onNext(s);
    emitter.onComplete();
    })
    .subscribeOn(Schedulers.computation())
    .map(s -> {
    System.out.println(Thread.currentThread());
    System.out.println(s + " in map");
    return s;
    })
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.io())
    .filter(s -> {
    System.out.println(Thread.currentThread());
    System.out.println(s + " in filter");
    return true;
    })
    .subscribeOn(Schedulers.newThread())
    .observeOn(Schedulers.computation())
    .subscribe(s -> {
    System.out.println(Thread.currentThread());
    System.out.println(s + " in observer");
    });
    try {
    Thread.sleep(100);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }

结果:

1
2
3
4
5
6
7
8
Thread[RxComputationThreadPool-1,5,main]
from 1 created
Thread[RxComputationThreadPool-1,5,main]
from 1 created in map
Thread[RxCachedThreadScheduler-2,5,main]
from 1 created in filter
Thread[RxComputationThreadPool-2,5,main]
from 1 created in observer

源码分析

首先看subscribeOn,只是将原Observable封装成ObservableSubscribeOn

1
2
3
4
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

那么在发生subscribe时会调用其subscribeActual方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;

public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}

@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);

observer.onSubscribe(parent);

parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> downstream;

final AtomicReference<Disposable> upstream;

SubscribeOnObserver(Observer<? super T> downstream) {
this.downstream = downstream;
this.upstream = new AtomicReference<Disposable>();
}

@Override
public void onSubscribe(Disposable d) {
DisposableHelper.setOnce(this.upstream, d);
}

@Override
public void onNext(T t) {
downstream.onNext(t);
}

@Override
public void onError(Throwable t) {
downstream.onError(t);
}

@Override
public void onComplete() {
downstream.onComplete();
}

@Override
public void dispose() {
DisposableHelper.dispose(upstream);
DisposableHelper.dispose(this);
}

@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}

void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}

final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;

SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}

@Override
public void run() {
source.subscribe(parent);
}
}
}

在订阅时会将后续Observer封装成SubscribeOnObserver(parent字段),并且将上游Observable(也就是source字段)订阅parent的操作封装成SubscribeTask(Runnable),最后使用调度器scheduler安排Runnable的执行,也就是将上游Observable的订阅操作放到一个线程中执行,Observable将在那个线程产生数据并发射给Observer;

如果出现多个subscribeOn的情形,将在第一个出现的subscibeOn所指定的调度器中执行,因为多个操作符拼接的过程本质是封装Observable的过程,自上而下,新调用的操作符会封装上一个Observable,产生新的Observable,直到最终调用subscribe(observer)方法,而调用subscribe方法的过程本质是封装Observer的过程,自下而上,依次调用前面封装的Observable的subscribeActual方法,其中会封装Observer,最终顶层的Observable看到的是一层一层封装的Observer,所以数据会流经各个Observer(外层Observer的onNext调用里层Observer的onNext),所以第一个subscribeOn封装的ObservableSubscribeOn的subscribeActual最后调用,最终订阅操作也就在其指定的调度器中执行,当然如果出现多个subscribeOn中夹杂observeOn,那就以这个observeOn为分割线;

再看下observeOn,首先封装原Observable为ObservableObserveOn

1
2
3
4
5
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

在ObservableObserveOn中,会将Observer封装成ObserveOnObserver,目的是在收到上游数据后先缓存,并在新的线程中将数据转发给下游Observer,即下游Observer就在调度器指定的线程中接收到数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}

@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();

source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {

private static final long serialVersionUID = 6576896619930983584L;
final Observer<? super T> downstream;
final Scheduler.Worker worker;
final boolean delayError;
final int bufferSize;

SimpleQueue<T> queue;

Disposable upstream;

Throwable error;
volatile boolean done;

volatile boolean disposed;

int sourceMode;

boolean outputFused;

ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.downstream = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}

@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
if (d instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<T> qd = (QueueDisposable<T>) d;

int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);

if (m == QueueDisposable.SYNC) {
sourceMode = m;
queue = qd;
done = true;
downstream.onSubscribe(this);
schedule();
return;
}
if (m == QueueDisposable.ASYNC) {
sourceMode = m;
queue = qd;
downstream.onSubscribe(this);
return;
}
}

queue = new SpscLinkedArrayQueue<T>(bufferSize);

downstream.onSubscribe(this);
}
}

@Override
public void onNext(T t) {
if (done) {
return;
}

if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}

@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
error = t;
done = true;
schedule();
}

@Override
public void onComplete() {
if (done) {
return;
}
done = true;
schedule();
}

@Override
public void dispose() {
if (!disposed) {
disposed = true;
upstream.dispose();
worker.dispose();
if (getAndIncrement() == 0) {
queue.clear();
}
}
}

@Override
public boolean isDisposed() {
return disposed;
}

void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}

void drainNormal() {
int missed = 1;

final SimpleQueue<T> q = queue;
final Observer<? super T> a = downstream;

for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}

for (;;) {
boolean d = done;
T v;

try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
disposed = true;
upstream.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;

if (checkTerminated(d, empty, a)) {
return;
}

if (empty) {
break;
}

a.onNext(v);
}

missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}

void drainFused() {
int missed = 1;

for (;;) {
if (disposed) {
return;
}

boolean d = done;
Throwable ex = error;

if (!delayError && d && ex != null) {
disposed = true;
downstream.onError(error);
worker.dispose();
return;
}

downstream.onNext(null);

if (d) {
disposed = true;
ex = error;
if (ex != null) {
downstream.onError(ex);
} else {
downstream.onComplete();
}
worker.dispose();
return;
}

missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}

@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}

boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
if (disposed) {
queue.clear();
return true;
}
if (d) {
Throwable e = error;
if (delayError) {
if (empty) {
disposed = true;
if (e != null) {
a.onError(e);
} else {
a.onComplete();
}
worker.dispose();
return true;
}
} else {
if (e != null) {
disposed = true;
queue.clear();
a.onError(e);
worker.dispose();
return true;
} else
if (empty) {
disposed = true;
a.onComplete();
worker.dispose();
return true;
}
}
}
return false;
}

@Override
public int requestFusion(int mode) {
if ((mode & ASYNC) != 0) {
outputFused = true;
return ASYNC;
}
return NONE;
}

@Nullable
@Override
public T poll() throws Exception {
return queue.poll();
}

@Override
public void clear() {
queue.clear();
}

@Override
public boolean isEmpty() {
return queue.isEmpty();
}
}
}

所以Observable最本质的调用过程就是top-level Observable将数据发射给各个操作符创建的Observer,直至bottom-level Observer;

0%