前言
本篇源码分析介绍的是同步请求和异步请求的执行流程以及调度器的相关知识。如果需要学习拦截器链相关知识的,可以直接跳过本篇直接到下一篇进行学习。
基本使用方法
首先要明确一点,okhttp
使用的是 Builder 模式来进行我们网络请求的各种参数设置的,接下来我们先举个简单的例子来看看 okhttp
的同步请求和异步请求是如何使用的。我们首先来看看同步请求的例子:
同步请求举例
/*** 同步请求的简单举例*/
public static void SyncRequest(){
new Thread(new Runnable() {
@Overridepublic void run() {
// 1. 构建 OkHttpClientOkHttpClient client = new OkHttpClient.Builder().readTimeout(8, TimeUnit.SECONDS).connectTimeout(8, TimeUnit.SECONDS).build();// 2. 构建 Request 对象Request request = new Request.Builder().get().url("https://www.baidu.com").build();try {
// 3. 通过 newCall 方法创建 Call 对象Call call = client.newCall(request);//4. 通过调用 call 的 execute 方法执行同步请求Response response = call.execute();Log.d(TAG, response.body().string());}catch (IOException e){
e.printStackTrace();}}}).start();
}
这里我们只简要地进行一个设置,主要是说明使用步骤。
- 首先,我们要构造出一个
OkHttpClient
对象,它可以说是我们整个流程的一个核心,我们通过 Builder 模式对它进行参数设置,如读写超时、连接超时时间和缓存设置等,这里我只简单地对读超时和连接超时进行了设置。 - 其次,我们需要构造出
Request
对象,它用于设置我们的请求参数,同样它也是通过 Builder 模式进行创建的。 - 构建完
Request
对象后,我们就可以调用OkHttpClient
的newCall
方法,这个方法会将Request
对象封装成一个Call
类型的对象。 - 最后就是通过调用
Call
的execute
方法来执行同步请求了,我们这里通过一个Response
类型的变量来接收请求得到的数据。
在这里我们需要注意一点的是,我们的同步请求是放在子线程中去执行的,具体原因很简单:同步请求在没有得到响应的时候,会阻塞线程,而我们的主线程是不允许有耗时操作的,所以我们在这里需要开辟一个子线程来执行这个耗时操作。
讲完了同步请求的流程步骤,接下来我们来看看异步请求的例子。
异步请求举例
/*** 异步请求的简单举例*/
public static void AsyncRequest(){
// 1. 构建 OkHttpClientOkHttpClient client = new OkHttpClient.Builder().readTimeout(8, TimeUnit.SECONDS).connectTimeout(8, TimeUnit.SECONDS).build();// 2. 构建 Request 对象final Request request = new Request.Builder().get().url("https://www.baidu.com").build();// 3. 通过 newCall 方法创建 Call 对象Call call = client.newCall(request);// 4. 通过调用 call 的 enqueue 方法执行异步请求call.enqueue(new Callback() {
@Overridepublic void onFailure(Call call, IOException e) {
Log.d(TAG, e.getMessage());}@Overridepublic void onResponse(Call call, Response response) throws IOException {
Log.d(TAG, response.body().string());}});
}
可以看到,异步请求和同步请求的设置其实有非常多的相似之处,它们的唯一不同点在于第4步的调用方法上,同步方法调用的是 execute
,而异步调用的则是 enqueue
方法。enqueue
方法会接收一个 Callback
对象,这个对象会回调两个关键方法:onFailure
和 onResponse
。onFailure
会在我们请求失败的时候进行回调,而 onResponse
方法则是在得到响应后进行回调。
这里同样需要注意一点,所谓的异步,也就是不会阻塞我们的线程的意思,所以它实际上是在内部自己开辟了一个子线程来接收回调方法的,所以我们的 onFaliure
方法和 onResponse
方法都是位于我们的子线程中,在我们需要执行 UI 的相关操作时,应当先在方法内部切换回主线程进行操作。
接下来我们通过一张图来对同步和异步的使用做一个简单的总结:
说完了 okhttp
两种基本的使用方法,接下来我们便要从执行流程来一步步分析 okhttp
的源码了。
源码分析
本着先易后难的原则,笔者会首先对同步请求的执行流程进行分析,然后再对异步请求的执行流程进行分析,由于 okhttp
的源码中涉及的重点如调度器器和拦截器等都是非常大的一块知识,所以笔者会将执行的流程顺序进行梳理,然后在梳理完毕之后再介绍调度器器和拦截器等的知识点。接下来我们来看看同步请求的执行流程。
同步请求的执行流程
通过上面的讲解,我们知道了,在同步请求的时候,我们首先会通过 OkHttpClient.Builder()
来对 OkhttpClient
对象进行构造,我们首先来看看 Builder
里面是什么:
public Builder() {
dispatcher = new Dispatcher(); // 调度器protocols = DEFAULT_PROTOCOLS;connectionSpecs = DEFAULT_CONNECTION_SPECS;eventListenerFactory = EventListener.factory(EventListener.NONE);proxySelector = ProxySelector.getDefault();cookieJar = CookieJar.NO_COOKIES;socketFactory = SocketFactory.getDefault();hostnameVerifier = OkHostnameVerifier.INSTANCE;certificatePinner = CertificatePinner.DEFAULT;proxyAuthenticator = Authenticator.NONE;authenticator = Authenticator.NONE;connectionPool = new ConnectionPool(); // 连接池dns = Dns.SYSTEM;followSslRedirects = true;followRedirects = true;retryOnConnectionFailure = true;connectTimeout = 10_000; // 连接超时时间readTimeout = 10_000; // 读超时时间writeTimeout = 10_000; // 写超时时间pingInterval = 0;}
Builder
是 OkHttpClient
的一个静态内部类,可以看到它的构造方法初始化了非常非常多的成员变量,我们可以通过调用相应的方法来设置这些成员变量的值。然后通过调用 build
方法,就可以返回一个构造完毕的 OkHttpClient
的对象了,build
的源码如下所示:
public OkHttpClient build() {
return new OkHttpClient(this);
}
由于这个方法是 Builder
类中的方法,所以 this
所指的毫无疑问就是我们构造好的 Builder
对象了,通过传入这个构造好的对象,我们就可以构造出我们所需的 OkhttpClient
对象了。
好了,在了解完 OkHttpClient
对象是如何构造出来之后,第二步就是构造 Request
对象了,它也是使用的 Builder 模式构建对象的,所以我们也来看看它的 Builder
里面有什么:
public Builder() {
this.method = "GET";this.headers = new Headers.Builder();
}
可以看到在它的内部的 Builder
构造方法中,默认的请求方法是 get
请求,然后还会新建一个请求头。最后也是通过 build
方法构造出我们需要的 Request
对象。
构造完 Request
对象后,接下来就到了我们的第三部,将 Request
对象封装成 Call
对象。它是通过调用 OkHttpClient
的 newCall
方法实现的,我们来看看这个方法里面做了什么:
/*** Prepares the {@code request} to be executed at some point in the future.*/@Override public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false /* for web socket */);}
在这里我们可以看到,这个方法返回的是 RealCall
的 newRealCall
方法,在这里需要先说明,Call
是一个接口,它的实现类就是 RealCall
,我们接下来来看看 RealCall
的 newRealCall
方法做了什么吧!
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
// Safely publish the Call instance to the EventListener.RealCall call = new RealCall(client, originalRequest, forWebSocket);call.eventListener = client.eventListenerFactory().create(call);return call;
}
可以看到这个方法构造了一个 RealCall
对象,然后对这个对象设置了事件监听,我们继续往下看看 RealCall
的构造方法做了什么:
private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
this.client = client;this.originalRequest = originalRequest;this.forWebSocket = forWebSocket;this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
}
这个构造方法会将我们之前构造好的 OkHttpClient
对象和 Request
对象保存起来,然后我们关键看到代码第5行,这是一个重试和失败重定向拦截器,我们后面会对 okhttp
的拦截器链机制进行分析,我们在这里知道对它进行了初始化就可以了。
在知道 newCall
方法最终返回的是一个 RealCall
对象之后,我们的同步请求就剩下最后一个调用方法 execute
了,这也是同步请求的核心方法。它的源码如下所示:
@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");executed = true;}captureCallStackTrace();eventListener.callStart(this);try {
client.dispatcher().executed(this);Response result = getResponseWithInterceptorChain(); // 发生阻塞的地方if (result == null) throw new IOException("Canceled");return result;} catch (IOException e) {
eventListener.callFailed(this, e);throw e;} finally {
client.dispatcher().finished(this);}
}
在这个方法中,首先会在同步块中对 executed
进行判断,它主要是用于判断我们当前的网络请求是否被执行过了,因为一个网络请求只能被执行一次,如果我们的网络请求已经被执行过,就会抛出异常。
接下来我们看到代码第9行,这里我们调用了 OkHttpClient
的 dispatcher
方法,我们来看看这个方法里面做了什么:
public Dispatcher dispatcher() {
return dispatcher;
}
可以看到,它就是简单地返回了我们的在 OkHttpClient
中的 Dispatcher
对象,它是一个调度器,用于控制并发的请求,我们的请求调度都是通过这个对象来完成的,它的源码分析我们会留在后面。在得到 Dispatcher
对象后,我们接下来就是调用它的 executed
方法,它的源码如下所示:
/** Used by {@code Call#execute} to signal it is in-flight. */
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
可以看到这是一个同步方法,它会将我们的 RealCall
对象通过 add
方法添加到 runningSyncCalls
这个对象当中,我们来看看它是什么类型的对象:
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
可以看到这个对象是一个用于存放 RealCall
的队列,通过注释我们可以得知,当我们有产生同步请求时,就会将请求放置在这个队列中。分析到这里,我们知道了,同步请求的 executed
会首先将我们的请求添加到 runningSyncCalls
这个队列中。接下来让我们继续回到 executed
方法往下看,我们接着创建了一个 Response
对象,通过 getResponseWithInterceptorChain
方法来接收我们的请求响应,getResponseWithInterceptorChain
是我们 okhttp
分析的另一个重要的核心,我们同样留在后面再做分析,这里我们只需要知道通过这个方法我们能够获取响应数据就可以了。在取得响应后,最后我们一定会执行 Dispatcher
的 finished
方法,finished
方法的源码如下所示:
/** Used by {@code Call#execute} to signal completion. */
void finished(RealCall call) {
finished(runningSyncCalls, call, false);
}
可以看到它调用的是另一个 finished
方法,我们继续看它的源码:
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;Runnable idleCallback;synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");if (promoteCalls) promoteCalls();runningCallsCount = runningCallsCount();idleCallback = this.idleCallback;}if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();}
}
在这个方法中,我们看到它内部的同步块,在代码第5行,首先判断能否移除这次已经执行结束的同步请求,如果不能就会抛出异常。接下来的 if
语句 promoteCalls
变量由于我们传进来的是 false
,所以不会执行 promoteCalls
方法。跳过这个方法继续往下看,在代码第7行,我们会通过 runningCallsCount
计算现在正在执行的请求数,我们可以看看这段代码的源码:
public synchronized int runningCallsCount() {
return runningAsyncCalls.size() + runningSyncCalls.size();
}
可以看到它返回的是 runningAsuncCalls
和 runningSyncCalls
这两个队列的大小之和,这两个队列分别存放着正在执行的异步和同步请求,所以通过返回它们的大小也就能知道现在到底有多少请求正在执行了。
分析完这个方法后我们继续回到 finished
方法的第11行,如果我们正在执行的请求数为0,也就是不存在请求了并且 idleCallback
不为空的话,就会执行 idleCallback
的 run
方法。
至此,我们的同步请求流程就分析完了,可以看到我们的同步请求流程还是比较简单的,我们来对同步请求的执行流程做一个小总结:
- 同步请求首先构造出
OkHttpClient
对象和Request
对象,设置好相应的参数。它们都是通过 Builder 模式进行构造的,其中Request
对象的默认请求方法是get
。 - 紧接着通过
OkhttpClient
的newCall
方法将Request
对象封装成Call
对象,Call
是一个接口,它的实现类是RealCall
。 - 最后便是调用
Call
的execute
方法了,实际上也就是调用了RealCall
的execute
方法,这个方法会首先判断当前请求是否被执行过,如果已经被执行过就会抛出异常,紧接着会执行调度器Dispatcher
的executed
方法,将Call
对象添加到同步请求队列runningSyncCalls
中。 - 获取的响应数据
Response
对象是通过getResponseWithInterceptorChain
方法来获得的。 - 在执行完请求之后,无论是否抛出异常,都一定会执行
finished
方法来将runningSyncCalls
队列中已执行完毕的同步请求Call
对象移除。
我们接下来先来分析我们的异步请求的执行流程。
异步请求的执行流程
在我们前面的例子中,我们已经知道了,同步和异步的区别仅仅在第4步的调用方法上:同步调用的是 execute
方法,而异步则是调用 enqueue
方法,前3步二者都是一样的,所以在这里的异步请求执行流程我们就直接从 enqueue
方法讲起。
Call
的 enqueue
方法需要传入一个 Callback
对象,我们首先来看看这个参数是怎么定义的:
public interface Callback {
void onFailure(Call call, IOException e);void onResponse(Call call, Response response) throws IOException;
}
可以看到非常简单的一个接口,定义了两个方法,分别对应于我们请求失败和请求成功的情况,这个接口的作用是回调。接下来我们就来看看 enqueue
方法的内部代码,前面提到过了Call
是个接口,所以 enqueue
方法的实现应该在它的实现类 RealCall
里面,RealCall
中的 enqueue
代码如下所示:
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");executed = true;}captureCallStackTrace();eventListener.callStart(this);client.dispatcher().enqueue(new AsyncCall(responseCallback));}
可以看到它其实和我们同步请求调用的 execute
方法非常相似,都是首先在同步块中判断这个请求是否被执行过了,如果为 true
就会抛出异常。接下来就是它和 execute
方法的不同点了,我们看到代码第8行,它通过 OkHttpClient
获得 Dispatcher
对象,然后再通过 Dispatcher
对象调用 enqueue
方法。Dispatcher
的 enqueue
方法接收一个 AsyncCall
类型的参数,在 AsyncCall
中会对我们的 Callback
对象进行包装,我们首先来看看这个 AsyncCall
是何方神圣:
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());this.responseCallback = responseCallback;}......
}
可以看到它内部维护了一个 Callback
对象,我们传进来的 Callback
在这里被保存了起来,我们还看到它是继承自 NamedRunnable
的,我们来继续看看 NamedRunnable
里面是什么:
public abstract class NamedRunnable implements Runnable {
protected final String name;public NamedRunnable(String format, Object... args) {
this.name = Util.format(format, args);}@Override public final void run() {
String oldName = Thread.currentThread().getName();Thread.currentThread().setName(name);try {
execute();} finally {
Thread.currentThread().setName(oldName);}}protected abstract void execute();
}
通过源码我们得知,NamedRunnable
是一个抽象类,它是实现了 Runnable
的。在它的 run
方法中,调用了 execute
方法,而 execute
方法我们在代码第18行可以看到是一个抽象方法,因为它是在 run
方法中被执行的,所以它就是在子线程中被执行的,那么接下来我们便回到 NamedRunnable
的父类 AsyncCall
方法中看看父类的实现方法 execute
里面做了什么:
@Override protected void execute() {
boolean signalledCallback = false;try {
Response response = getResponseWithInterceptorChain();if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;responseCallback.onFailure(RealCall.this, new IOException("Canceled"));} else {
signalledCallback = true;responseCallback.onResponse(RealCall.this, response);}} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);} else {
eventListener.callFailed(RealCall.this, e);responseCallback.onFailure(RealCall.this, e);}} finally {
client.dispatcher().finished(this);}
}
我们首先看到代码第4行,在这里我们依然是通过 getResponseWithInterceptorChain
方法来获得我们的响应数据 Response
,getResponseWithInterceptorChain
方法这里后面做讲解,我们现在只需要知道通过这个方法我们能获取我们想要的数据就行了。接下来看到第5行的 if
判断,这里我们通过 retryAndFollowUpInterceptor
做了一个判断来决定回调哪个方法。
retryAndFollowUpInterceptor
前面我们已经说过了是一个重试和失败重定向拦截器,它的 isCanceled
方法如果返回 true
,说明拦截器链被关闭了,那么我们也就无法获得响应了,关于它的知识我们会留在后面一篇博客进行讲解。所以如果这个 if
判断为 true
,那么我们就会回调我们 Callback
对象的 onFailure
方法,否则就会回调我们 Callback
对象的 onResponse
方法。而在代码第18行我们可以看到,如果抛出异常,也会回调 onFailure
方法。
最后看到 finally
块中调用的 finished
方法,既然是放在 finally
中的,那就一定会被执行,我们来看看它里面做了什么:
void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);
}
可以看到它调用的是另一个 finished
方法,这个方法和我们在同步请求中调用的是同一个 finished
方法,不同点是第一个参数异步请求传进来的是 runnigAsyncCalls
队列,并且第三个参数传进来的是 true
。我们再次来看看这个方法:
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;Runnable idleCallback;synchronized (this) {
if (!calls.remove(call))sho throw new AssertionError("Call wasn't in-flight!");if (promoteCalls) promoteCalls();runningCallsCount = runningCallsCount();idleCallback = this.idleCallback;}if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();}
}
首先在同步块中判断能否移除这个已经执行完毕的异步 Call
请求,不能移除就会抛出异常。紧接着由于 promoteCalls
参数我们传进来的值是 true
,那么在异步中我们就会执行 promoteCalls
方法了,这个方法涉及到了我们 Dispatcher
调度器的内容,我们同样是留在后面做讲解,它的作用是在任务执行完之后,手动清除掉线程池中的缓存区,并把等待中的请求添加到执行队列中去,我们先记住这一点就可以了。接下来就是通过判断正在执行的请求数来决定是否调用 idleCallback
的 run
方法了,这个在前面讲解同步请求的时候已经介绍过了,所以也就不再赘述了。
总结一下 AsyncCall
的 execute
方法,它的作用就是在新开辟的子线程中,获取响应,然后根据不同的情况来决定回调 Callback
的 onFailure
或者 onResponse
方法。这也印证了我们之前的注意事项,在 onFaliure
或 onResponse
方法中操作 UI 需要先调回主线程再进行操作。在执行完当前的异步请求之后,会调用 promoteCalls
方法手动清理线程池中的缓存区,并把等待中的请求添加到执行队列中去。
分析完了 AsyncCall
的 execute
方法,我们的 AsyncCall
就分析到这里了。AsyncCall
的重点也就是它的 execute
方法,要记住它是运行在 run
方法中的。分析完了 Dispatcher
中的 enqueue
方法的参数 AsyncCall
,我们接下来就来看看 Dispatcher
的enqueue
方法里面做了什么吧:
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);executorService().execute(call);} else {
readyAsyncCalls.add(call);}
}
可以看到这是一个同步方法,在这段代码中,我们先来看看 runningAsyncCalls
和 readyAsyncCalls
的含义:
/** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
从注释中我们可以得知,readyAsyncCalls
是一个用于存放异步请求的等待队列,而 runningAsyncCalls
则是一个用存放正在执行异步请求的队列,这个正在执行也包括那些未执行完但被我们取消了的请求。知道了这些信息,我们继续回到 enqueue
方法,看到 enqueue
方法的第2行,在这个 if
判断中,如果 runningAsyncCalls
的大小小于 maxRequests
并且 runningCallsForHost
方法的返回值小于 maxRequestsPerHost
时,我们的 AsyncCall
对象就会被添加到 runningAsyncCalls
队列中,否则 AsyncCall
对象就会被添加到 readyAsyncCalls
队列中。
我们来分析这个 if
判断,maxRequests
的默认值为64,也就是说当异步请求正在执行的个数小于64的时候,第一个条件成立。而 maxRequestsPerHost
的 默认值为5,runningCallsForHost
方法返回当前请求的 host
共享的请求数,如果不超过5个,那么第二个条件就成立了。也就是说,当正在执行的异步请求个数少于64个并且当前请求的 host
请求数不超过5个时,我们的 AsyncCall
对象就会被直接添加到 runningAsyncCalls
队列中,否则就会添加到 readyAsyncCalls
队列中。
在 if
条件成立后,我们除了将 AsyncCall
对象添加到 runningAsyncCalls
队列中外,我们还执行了 executorService().execute(call)
语句,我们首先来看看 executorService
方法里面做了什么:
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));}return executorService;
}
可以看到这个方法返回的是一个线程池,紧接着这个返回的线程池就会调用它的 execute
方法来执行我们的 AsyncCall
对象的线程的 run
方法,我们之前分析过了 AsyncCall
方法的线程执行的 run
方法其实就是执行 AsyncCall
的 execute
方法,在它里面会根据不同的情况回调我们 Callback
对象的 onFailure
方法或 onResponse
方法。所以千万要记住一句话:onFailure
和 onResponse
方法都是位于子线程中的方法,不要在里面直接进行 UI 操作!
到这里,我们的异步请求的执行流程分析就结束了。我们来对异步请求的执行流程做一个小总结:
- 首先异步请求调用的
enqueue
方法内部会判断这个请求是否被执行过了,如果是执行过的就会抛出异常,因为每个请求只能被执行一次。 - 接着就会通过调度器来执行
Dispatcher
的enqueue
方法,这个方法中我们会传入Callback
对象的包装类AsyncCall
的对象,然后通过判断我们的异步请求应当添加到runningAsyncCalls
队列中直接执行还是readyAsyncCalls
队列中进行等待,如果是直接执行,那么就会在线程池中执行我们的请求。 - 在线程池中执行请求实际上会调用到
AsyncCall
中的execute
方法,这个方法会根据不同情况决定回调Callback
对象的onFailure
方法还是onResponse
方法,所以这两个方法是在子线程中执行的,不要在里面直接操作 UI !!! - 在执行完请求之后,无论是否抛出异常,都一定会执行
finished
方法来将runningAsyncCalls
队列中的请求对象移除。
调度器 Dispatcher
我们在前面的源码分析中,一直跳过的调度器部分,就要在这里进行分析了。从前面的源码中我们也能感受的到,无论是同步请求还是异步请求,在执行的方法(execute
/ enqueue
)中总会通过获取到的 Dispatcher
对象的 execute
或 enqueue
方法来执行我们的网络请求。所以 okhttp
的请求管理,也就是通过 Dispatcher
对象来调度的了。首先我们来看到 Dispatcher
这个类的代码:
public final class Dispatcher {
....../** Executes calls. Created lazily. */private @Nullable ExecutorService executorService;/** Ready async calls in the order they'll be run. */private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();/** Running synchronous calls. Includes canceled calls that haven't finished yet. */private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();public Dispatcher(ExecutorService executorService) {
this.executorService = executorService;}public Dispatcher() {
}public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));}return executorService;}......
}
可以看到,这个类的内部定义了一个线程池 ExecutorService
,还有3个队列,分别是readyAsyncCalls
、runningAsyncCalls
和 runningSyncCalls
。这些我们在之前的分析中都有介绍过,在这里我们再来对它们进行一次介绍,因为它们太重要了。
首先是线程池 ExecutorService
,它可以通过构造方法传入我们自定义的线程池进行构造或者通过默认的线程池进行构造,从注释中我们知道它是通过懒汉模式加载的,也就是用到它的时候它才会被实例化。它是通过 executorService
方法实例化的,这个方法代码如下:
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));}return executorService;
}
这个方法在之前我们粗略地介绍了一下,我们在这里仔细地看看它是如何构造出一个线程池的。在代码第3行,可以看到新建了一个线程池,传进的前4个参数分别为 0,Integer.MAX_VALUE
,60 和 TimeUnit.SECONDS
。因为这里涉及到的是线程池的知识,不懂的同学请自行搜索相关知识进行学习,这里不会进行细致的介绍。
这个线程池的核心线程数为 0,最大线程数为 Integer
的最大值,也就是说理论上我们可以创建出无限多个线程,但事实上它会受到我们其它变量的约束,所以我们实际上无法创建出无限多的线程出来。
紧接着两个参数设置的是非核心线程闲置的超时时间,在这里的设置是 60s。而我们的线程池中所有线程均为非核心线程,也就是说,当我们的线程在线程池中超过 60s 没有被执行的时候,它就会被回收掉。
介绍完了线程池,我们接下来看到三个队列:
- readyAsyncCalls:异步等待队列,当异步请求无法添加到异步执行队列中时,就会被添加到这个队列中来。
- runningAsyncCalls:异步执行队列,在这个队列中的异步请求不但包括了正在执行的请求,也包括未执行完成但被取消了的请求。
- runningSyncCalls:同步执行队列,在这个队列中的同步请求不但包括了正在执行的请求,也包括未执行完成但被取消了的请求。
在简单分析完 Dispatcher 这个类的几个重要的成员之后,我们依然分为同步请求和异步请求的调度来进行分析。
同步请求的调度
我们在这里就直接看到同步请求的 execute
方法:
@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");executed = true;}captureCallStackTrace();eventListener.callStart(this);try {
client.dispatcher().executed(this); Response result = getResponseWithInterceptorChain();if (result == null) throw new IOException("Canceled");return result;} catch (IOException e) {
eventListener.callFailed(this, e);throw e;} finally {
client.dispatcher().finished(this);}}
这段代码我们之前已经分析过了,接下来我们站在 Dispatcher
的角度来进行分析。我们直接看到代码第9行,调用 Dispatcher
的 executed
方法,这个方法的代码如下:
/** Used by {@code Call#execute} to signal it is in-flight. */
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
这个方法非常的简单,就是直接将 Call
对象添加到了 runningSyncCalls
这个同步执行队列中,表明它正在被执行,紧接着当执行结束之后,我们一定会调用到第17行 Dispatcher
的 finished
方法,我们重新来看看这个方法里面干了什么:
/** Used by {@code Call#execute} to signal completion. */void finished(RealCall call) {
finished(runningSyncCalls, call, false);}private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;Runnable idleCallback;synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");if (promoteCalls) promoteCalls();runningCallsCount = runningCallsCount();idleCallback = this.idleCallback;}if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();}}
这个方法调用的就是下面的 finished
方法,在这个方法中,我们看到代码第10行,我们会在这里从 runningSyncCalls
队列中移除我们的 Call
对象,因为它已经执行完了嘛。如果无法移除了,那么它就会抛出异常。所以到这里,我们就把同步请求中调度器对 Call
从添加到移除的流程介绍完了,接下来介绍更为复杂的异步请求调度过程。
异步请求的调度
在异步请求中,我们也是直接看到我们的 enqueue
方法:
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");executed = true;}captureCallStackTrace();eventListener.callStart(this);client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
我们直接看到代码第8行,在这里我们执行了 Dispatcher
的 enqueue
方法,它的代码如下:
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);executorService().execute(call);} else {
readyAsyncCalls.add(call);}
}
这段代码我们在之前同样也进行过分析,我们在这里重新在 Dispatcher
的角度进行分析。首先 if
判断语句会判断正在执行的异步请求线程数是否小于 64 以及当前请求的主机请求数是否小于 5,如果都满足的话,那么就会将请求直接添加到 runningAsyncCalls
队列中,并且将请求交由线程池进行处理,否则就会被添加到 readyAsyncCalls
队列中进行等待。线程池中处理的 run
方法实际上处理的就是 AsyncCall
的 execute
方法,这个方法的源码如下:
@Override protected void execute() {
boolean signalledCallback = false;try {
Response response = getResponseWithInterceptorChain();if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;responseCallback.onFailure(RealCall.this, new IOException("Canceled"));} else {
signalledCallback = true;responseCallback.onResponse(RealCall.this, response);}} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);} else {
eventListener.callFailed(RealCall.this, e);responseCallback.onFailure(RealCall.this, e);}} finally {
client.dispatcher().finished(this);}
}
这段代码我们之前也是分析过,我们这里同样只分析和 Dispatcher
相关的部分,我们直接看到代码第21行的 finally
块中,在这里的时候我们的请求已经执行完毕了,所以接下来就会调用 Dispatcher
的 finished
方法:
/** Used by {@code AsyncCall#run} to signal completion. */
void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);
}
它内部调用的 finished
和同步请求的调用的是同一个 finished
方法,区别是它的第一个参数传入的是异步执行队列 runningAsyncCalls
,第三个参数传入的是 true
。我们再次看到 finished
方法:
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;Runnable idleCallback;synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");if (promoteCalls) promoteCalls();runningCallsCount = runningCallsCount();idleCallback = this.idleCallback;}if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();}
}
同样的,在同步块中,我们会先将执行完的 Call
移除,由于我们传进的 promoteCalls
值为 true
,所以在异步请求中 promoteCalls
方法会得到执行,它的源码如下所示:
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();runningAsyncCalls.add(call);executorService().execute(call);}if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.}
}
这个方法主要是用于将异步等待队列中的请求调度到异步执行队列中。在代码第2行的 if
判断中,首先判断异步执行队列 runningAsyncCalls
的数量是否超过了最大值,如果是就直接返回,因为这意味着我们的异步执行队列没有空间留给我们等待中的请求了。接下来看到代码第3行的 if
判断,如果异步等待队列为空,也是直接返回,这也好理解,如果没有等待中的请求了,也就不需要调度了。
接下来就到了代码第5行的 for
循环了,这是整段代码的关键。在这个 for
循环中,我们会遍历异步等待队列 readyAsyncCalls
的请求,然后在执行队列 runningAsyncCalls
有空闲位置的时候就会将等待队列中的 Call
移除,然后添加到执行队列中去。也就是在这里,okhttp
实现了对异步等待队列中的请求的调度。对于异步请求的调度,到这里也就分析完了。
在最后,我们对调度器做个小总结:
- 在同步请求中,调度器在同步请求执行前将其添加到同步队列
runningSyncCalls
中,在执行结束后从队列中将它移除。 - 在异步请求中,调度器首先会判断异步执行队列
runningAsyncCalls
是否未满以及请求的主机请求数是否未超过5个,如果这2个条件均满足就会直接将请求添加到runningAsyncCalls
中并在线程池中执行这个请求,当请求执行结束后,调度器会通过promoteCalls
方法将readyAsyncCalls
队列中的请求调度到runningAsyncCalls
中去执行。
总结
本次源码分析主要是对同步请求和异步请求的异同、执行流程和调度器 Dispatcher
做了一个分析,相对而言还是比较简单的。对于本篇遗留下来的知识点拦截器链,会在后面的系列文章进行讲解。祝大家学习愉快!有问题的话可以在下面评论区给我留言。