当前位置:威尼斯 > 编程 > 执行结果反馈,volley整个源码都是用了接口编程

执行结果反馈,volley整个源码都是用了接口编程

文章作者:编程 上传时间:2019-11-08

功能说明

LongIntervalRetries是基于Quartz.Net的一个长时间间隔重试的类库,其主要解决何时执行威尼斯,以及执行结果反馈的问题。

volley源码学习

之前一直对于源码学习抱着一种又爱又恨的心情。爱的是因为知道源码有一些特别好的设计思路,可以让自己借鉴,而且对于设计模式来说是最好的实战场。那为啥还会恨呢,曾经很多次下载了很多开源库的源码,可是看的看的就感觉云里雾里,不知所踪。心中没有一个总体的框架,总感觉看的细如牛毛,一叶障目。今天又找时间翻出最简单的volley,准备从头再看一遍。没想到收获很多,写下这篇文章,用来记录。

项目开源地址:

产生的原因

简单的说,我们提供了一系列的API供第三方调用,但因为实际API对应的业务处理时间较长,所以为了增加吞吐量,实际的业务逻辑并没包含在API服务器,而是分布在了不同的服务器上进行处理,在业务处理结束后,再通过调用第三方提供的回调Url通知处理结果,为了保证回调正确,那么我们需要有这么一个策略:如果回调失败,我们需要在指定时间间隔之后再次回调,如此反复直至回调成功或者达到重复次数上限。上述回调方案是不是很熟悉?嗯,好吧,直白的讲,回调这一部分我们借鉴(抄袭)了支付宝支付时的回调方案,所以我们要解决的,就是通过代码来实现回调这部分的业务场景,于是也就有了LongIntervalRetries

volley是什么

volley是一个封装好的网络库,是把httpclient或HttpURLConnection又封装了一层,加上了线程,队列,缓存等机制,让网络请求更加容易。

volley是google官方推出的一个开源项目,专门适用于android轻量的请求。但是对于大文件等支持不好。

volley整个源码都是用了接口编程的思想,这也比较符合设计模式的优秀实践。

所以看volley源码会对于接口编程有较深入的理解。

除了volley之外,还有一个retrofit网络框架,是对okhttp的封装,这个框架现在特别火,而且如果你想领略设计模式之美的话,这个retrofit源码必须看,等之后再写一篇分析retrofit的文章

上一篇文章,简单介绍了enode框架中消息队列的设计思路,本文介绍一下enode框架中关系消息的重试机制的设计思路。

为什么不使用Polly

Polly是.NET基金会下的弹性和瞬态故障处理库,其解决的问题天然符合我们回调的业务场景,但为什么在此处却不被采用呢,原因如下:

  • Polly的重试机制是个短时间内的机制,在其重试机制时间内,当前Task一般是通过await阻塞的,而对于我们的场景来说,这明显是不合适的,我们的场景并不应该发生在当前线程内
  • Polly并不支持程序重启时的重试恢复,这一点在我们的业务场景中及其重要,总不能服务器重启后,我们还没回调成功的业务就全部丢了吧

volley工作流程

首先先放一张google官方给的流程图,虽然比较简单,但是可以在心中有一个大致的概念

威尼斯 1

1.png

首先我们从volley调用入手,找到调用入口,这样就可以按图索骥,一点点摸索出整个的流程。

RequestQueue queue = Volley.newRequestQueue(this);
String url ="https://www.gaotenglife.com";

// Request a string response from the provided URL.
StringRequest stringRequest = new StringRequest(Request.Method.GET, url,
            new Response.Listener<String>() {
    @Override
    public void onResponse(String response) {
        // Display the first 500 characters of the response string.
        mTextView.setText("Response is: "+ response.substring(0,500));
    }
}, new Response.ErrorListener() {
    @Override
    public void onErrorResponse(VolleyError error) {
        mTextView.setText("That didn't work!");
    }
});
// Add the request to the RequestQueue.
queue.add(stringRequest);

从上面的代码可以看出,我们首先通过newRequestQueue创建出一个requestqueue,也就是请求队列。然后把一个请求加入到请求队列里面。这里可以看出,当我们把stringrequest加入请求队列后,请求便开始了,所以我们便从add看起。

public <T> Request<T> add(Request<T> request) {
    // Tag the request as belonging to this queue and add it to the set of current requests.
    request.setRequestQueue(this);
    synchronized (mCurrentRequests) {
        mCurrentRequests.add(request);
    }

    // Process requests in the order they are added.
    request.setSequence(getSequenceNumber());
    request.addMarker("add-to-queue");

    // If the request is uncacheable, skip the cache queue and go straight to the network.
    if (!request.shouldCache()) {
        mNetworkQueue.add(request);
        return request;
    }
    mCacheQueue.add(request);
    return request;
 }

这里面主要的功能就是把网络请求加入到对应的网络请求队列(mNetworkQueue),如果设置了请求需要缓存的话,同时也加入到缓存队列中。这里比较奇怪,为啥只是单单加入队列网络请求就能发出去呢。于是我们接着向下看,最有可能是在mNetworkQueue.add的时候,发出请求。于是我们继续深入,发现mNetworkQueue也只是一个简单的线程安全的队列,也没有做过多的操作。其实这样也是合理的,队列不应该包含更多的业务逻辑在里面。于是我继续查找,是哪里持有了这个队列。

 public void start() {
    stop();  // Make sure any currently running dispatchers are stopped.
    // Create the cache dispatcher and start it.
    mCacheDispatcher = new CacheDispatcher(mCacheQueue, mNetworkQueue, mCache, mDelivery);
    mCacheDispatcher.start();

    // Create network dispatchers (and corresponding threads) up to the pool size.
    for (int i = 0; i < mDispatchers.length; i++) {
        NetworkDispatcher networkDispatcher = new NetworkDispatcher(mNetworkQueue, mNetwork,
                mCache, mDelivery);
        mDispatchers[i] = networkDispatcher;
        networkDispatcher.start();
    }
}

于是发现两个地方使用了这个mNetworkQueue队列,CacheDispatcher,NetworkDispatcher从名字是就能看出,最有可能就是NetworkDispatcher里面进行网络队列的处理了。于是我们看NetworkDispatcher,发现这个NetworkDispatcher原来就是一个线程。

public void run() {
    Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
    while (true) {
        try {
            processRequest();
        } catch (InterruptedException e) {
            // We may have been interrupted because it was time to quit.
            if (mQuit) {
                return;
            }
        }
    }
}

在它的run方法里面,不断循环调用processRequest,在processRequest里面从请求队列里取出request

Request<?> request = mQueue.take();//取出请求
......


 // Perform the network request.
        NetworkResponse networkResponse = mNetwork.performRequest(request);//通过networt真正去请求网络

从这里看出,单一职责设计思想。队列只处理队列的逻辑,网络只处理网络的逻辑。不交叉写到一起。

那么network里面是如何真正请求网络的呢,因为比较volley是封装了具体网络请求库。
于是我们看到BasicNetwork里面,它里面有BaseHttpStack,而BaseHttpStack是个抽象类,它的子类,比如HurlStack是封装了
HttpURLConnection,而HttpClientStack是封装了httpcliet底层库。这样network就可以根据配置,选取我们需要的底层网络库。

那么请求完之后,返回的结果如何回调回去呢。这里又借助ResponseDelivery,将请求的结果发送到ui线程

 mDelivery.postResponse(request, response);

我们再进入ExecutorDelivery这里面

public void postResponse(Request<?> request, Response<?> response, Runnable runnable) {
    request.markDelivered();
    request.addMarker("post-response");
    mResponsePoster.execute(new ResponseDeliveryRunnable(request, response, runnable));
}

然后通过mResponsePoster这里面调用handler,最后将结果发送给ui线程

 // Deliver a normal response or error, depending.
       if (mResponse.isSuccess()) {
            mRequest.deliverResponse(mResponse.result);
        } else {
            mRequest.deliverError(mResponse.error);
        }

这里最后调用了request里面设置的监听函数,最后把数据给了调用方。

到此我们的一个调用过程就分析完毕了。是不是挺简单的。

对于一个EDA架构为基础的框架,核心就是消息驱动,然后基于最终一致性的原则。所以,非常重要的一点是,如果消息一次执行不成功,那该怎么办?我能想到的对策就是消息的重试。我发现,这篇文章比较难写,因为感觉要把复杂的事情清晰的表达出来,感觉确实不容易。说到重试,那什么是消息的重试呢?怎么重试呢?我这里提到的重试是指,一个消息,从消息队列取出来后,要处理,但是处理失败了,然后要重新尝试再处理该消息;怎么重试?这个问题比较复杂,不能用简单的一两句话来说明。

设计思路及演变

一开始我们的设计思路非常简单,就是如何定时触发回调这个业务代码,但之后发现,为什么我们要仅限于回调呢?回调只是一个业务场景,但我们完全有可能有其它业务场景,恰恰我们也的确存在这样的业务场景,我们需要向第三方服务商进行一些请求,该请求同样耗时较长,该场景是不是很熟悉?只不过与我们作为服务商不同,该服务商居然没提供回调方案,它需要我们自己定时去回调!而同样是这个第三方,其业务请求参数具有相当的定制性,其需要我们预先做很多业务性的预处理,也就是需要做一些顺序性的工作后,我们才能得到完整的请求参数。
于是我们的设计思路开始调整,最终得出该封装应当具备的功能点:

  • 其内部应该封装掉如何定时触发这个功能
  • 其应该具备同时支持多种业务策略(策略模式)
  • 其应该允许设置什么时候来触发要执行的策略
  • 其应该支持服务启动时自动恢复未结束任务的能力
  • 其应该具备最终执行结果通知的能力

缓存的实现

在刚刚上面分析代码的时候说到,mNetworkQueue被两个类持有,一个就是咱们已经分析过的NetworkDispatcher,还有一个就是咱们这一节要讲的CacheDispatcher。

public <T> Request<T> add(Request<T> request)
    ...
    ...
    mCacheQueue.add(request);

在上面队列add的时候,如果请求设置了需要缓存,request.shouldCache(),也就是这个为true,那么会先将这个request加入到缓存的队列里面。

而处理缓存队列的,就是CacheDispatcher。和NetworkDispatcher类似,这个CacheDispatcher也是一个线程,在线程的run方法里面,执行了processRequest这个方法。这里面主要处理了缓存相关逻辑

首先先从缓存队列里面将缓存的request取出来。

final Request<?> request = mCacheQueue.take();

....省略非主要代码
 // Attempt to retrieve this item from cache.
    Cache.Entry entry = mCache.get(request.getCacheKey());
    if (entry == null) {
        request.addMarker("cache-miss");
        // Cache miss; send off to the network dispatcher.
        if (!mWaitingRequestManager.maybeAddToWaitingRequests(request)) {
            mNetworkQueue.put(request);
        }
        return;
    }

    // If it is completely expired, just send it to the network.
    if (entry.isExpired()) {
        request.addMarker("cache-hit-expired");
        request.setCacheEntry(entry);
        if (!mWaitingRequestManager.maybeAddToWaitingRequests(request)) {
            mNetworkQueue.put(request);
        }
        return;
    }

然后进行了两个判断,第一个就是先从缓存中获取这个request,如果没有缓存,则将请求直接加入到之前的网络队列,进行网络请求。如果有缓存,则再通过判断entry.isExpired是否过期,如果缓存的request已经过期,则也加入到网络请求的队列中。

接下来,如果有缓存并且缓存没有过期,则从缓存中取到之前请求过的数据,并进行解析。如下

 Response<?> response = request.parseNetworkResponse(
            new NetworkResponse(entry.data, entry.responseHeaders));

之后又通过了一层判断

 if (!entry.refreshNeeded()) {
        // Completely unexpired cache hit. Just deliver the response.
        mDelivery.postResponse(request, response);
    } else {
        // Soft-expired cache hit. We can deliver the cached response,
        // but we need to also send the request to the network for
        // refreshing.
        request.addMarker("cache-hit-refresh-needed");
        request.setCacheEntry(entry);
        // Mark the response as intermediate.
        response.intermediate = true;

        if (!mWaitingRequestManager.maybeAddToWaitingRequests(request)) {
            // Post the intermediate response back to the user and have
            // the delivery then forward the request along to the network.
            mDelivery.postResponse(request, response, new Runnable() {
                @Override
                public void run() {
                    try {
                        mNetworkQueue.put(request);
                    } catch (InterruptedException e) {
                        // Restore the interrupted status
                        Thread.currentThread().interrupt();
                    }
                }
            });
        } else {
            // request has been added to list of waiting requests
            // to receive the network response from the first request once it returns.
            mDelivery.postResponse(request, response);
        }
    }

也就是说,如果缓存的数据需要刷新,那么还是需要将request发送给网络队列进行请求。如果数据不需要刷新,则直接通过mDelivery将缓存的数据发送给ui线程。

这里面有两个概念,如下

  /** True if the entry is expired. */
    public boolean isExpired() {
        return this.ttl < System.currentTimeMillis();
    }

    /** True if a refresh is needed from the original data source. */
    public boolean refreshNeeded() {
        return this.softTtl < System.currentTimeMillis();
    }

ttl和softttl这两个是http协议里面通过header计算出来的两个值。详细可以查看HttpHeaderParser这个类。

上面说到,如果消息处理失败要再重试,其实是一个比较粗的回答。因为比如一个消息在处理的时候总共有5个步骤,如果前2步都成功,但是第3步失败了,那重试的时候,前2步还需要再执行吗?我的想法是,在能办到的情况下,就不要再做前2步操作了,而是直接从第3步开始重试。所以说,这种做法相当于是“哪里跌倒,哪里继续”;

为什么会考虑基于Quartz.Net

无论是定时触发,还是业务策略,以及设置触发时间,这些都很明显的具备Job特性,而Quartz.Net本身就是一个Job类库,而且其本身允许进行并发线程数量设置,如果基于它,明显我们可以不用考虑线程相关的问题,这可以省掉我们很大的工作量

重试策略

看到重试策略的时候,我首先自己想了下,如果要我自己实现重试策略,我会如何做呢,很直白的的思维就是,在网络请求失败的时候,判断是否有重试策略,然后在网络请求失败的地方,重新发起网络请求。

所以我就一直按照这个思路去寻找,可以在volley网路失败的地方,我只找到了如下的代码

private static void attemptRetryOnException(String logPrefix, Request<?> request,
        VolleyError exception) throws VolleyError {
    RetryPolicy retryPolicy = request.getRetryPolicy();
    int oldTimeout = request.getTimeoutMs();

    try {
        retryPolicy.retry(exception);
    } catch (VolleyError e) {
        request.addMarker(
                String.format("%s-timeout-giveup [timeout=%s]", logPrefix, oldTimeout));
        throw e;
    }
    request.addMarker(String.format("%s-retry [timeout=%s]", logPrefix, oldTimeout));
}

这个函数是所有网络有有异常的时候,会调用的。但是我发现这里面除了设置了重试策略的一些属性,其他没有做网络请求操作。这就奇怪了。难道网络请求会自己发起。这让我百思不得其解。

于是我又一遍一遍看了网络请求的代码,终于发现了端倪

原来在networkdispater调用Network进行网络请求的时候,network里面竟然写了一个while循环

 @Override
public NetworkResponse performRequest(Request<?> request) throws VolleyError {
    long requestStart = SystemClock.elapsedRealtime();
    while (true) {
    .....
    .....
    }
}

这样也就能解释了,这个循环会一直尝试去请求网络,直到不满足重试策略之后,退出循环。也就是说,这个重试策略没有正真参与具体了重试逻辑。而只是保存了自己的重试状态,真正的重试逻辑还是网络请求去保证。
看到这里,我突然感觉到写这样代码的人,真是思路别具一格。而且这样的好处显而易见,你可以重写重试策略,而不需要重新修改网络重试的逻辑。这也就是设计模式里面策略模式比较好的运用吧。

那么怎么重试呢?

类库相关

该类库在github上的地址为:
该类库目前为v1.0.0版本,其nuget地址为:

结论体会

通过完整的看了一遍volley源码,体会到了几个比较重要的思想

  • 一个就是单一职责,每一层都分开,负责每一层应该有的功能。互相解耦,底层不依赖上层。比如网络请求Network这个类,不依赖与上层的队列类,而具体封装底层网络请求的BaseHttpStack也不依赖上层Network。可以让network层很轻松的更换底层网络请求库。

  • 策略模式的运用,也让代码逻辑与策略分离,策略里面不依赖具体逻辑。逻辑代码里面通过改变不同策略对象,来达到控制不同策略的目的。

  • 面向接口的编程,整个volley各个模块都是通过接口互相之间调用,这样不依赖与具体实现,就将整个框架都搭建好了,感觉很受启发。

经过分析,我们发现整个enode框架中需要重试的点是非常多的,比如command产生的event要发送到队列时,如果失败那需要重试;比如event持久化时失败了,也需要重试,等等。所以,显而易见,我们应该设计一个可以被重用的重试服务,提供对某些特定的重试场景的支持。

快速使用

此处仅是简单的代码示例,后续会有详细的使用说明
首先我们需要声明Job

    public class SomeJob: IJob
    {
        public virtual Task Execute(IJobExecutionContext context)
        {
            return Task.FromResult(1);//默认LongIntervalRetries是通过Job是否产生异常来判断是否执行成功的
        }
    }

然后我们可以将这个Job注册到LongIntervalRetries,同时设置重试策略,以及注册事件监控执行结果,完整的示例如下

var retry = new StdRetry();
//声明并注册重试规则
string simpleRuleName = "SimpleRepeatRetryRule";
var simpleRepeatRule = new SimpleRepeatRetryRule(simpleRuleName, 5, TimeSpan.FromSeconds(2));
retry.RuleManager.AddRule(simpleRepeatRule);
var registerInfo = new RetryJobRegisterInfo
{
    //指定要采用的重试规则,如果不设置,则默认使用已注册的第一项
    UsedRuleName = simpleRuleName,
    //需要传递给IJob的上下文数据
    JobMap = new Dictionary<string, object>
    {
        {"SomeKey","SomeValue" }
    },
    //开始执行时间,如果不指定则表示立刻执行
    StartAt = DateTimeOffset.UtcNow.AddSeconds(3),
};
//注册要执行的Job
retry.RegisterJob<SomeJob>(registerInfo);
//注册每次Job执行后的通知事件
retry.RegisterEvent<SomeJob>(e =>
{//Some code
});
retry.Start();//启动Quartz服务
//启动服务后仍可以RegisterJob、RegisterEvent

我们先来想一下,我们希望有什么样的重试功能。以“event持久化时失败”为例,如果这一步失败,我们希望立马对这个步骤重试几次,比如3次,如果3次内成功了,那就成功了,继续往下做下面的逻辑;如果还是失败了呢?我们难道就放弃了吗?实际上,我们不能放弃,因为一般如果事件持久化失败很有可能是由于网络问题或eventstore有什么问题,而且如果我们就这样放弃了,那很可能整个业务逻辑的流程就被中断了,这样就无法做到数据的最终一致性了。所以,因为这种暂时的IO问题导致的失败,我们不能随便就放弃重试,应该在尝试几次重试仍失败时采取必要的手段,可以在IO恢复时,能自动再处理该消息;但是我们又不能使用当前线程无限制的重试下去,因为这样就导致没办法处理其他的消息了;所以我们自然就能想到:我们应该在消息重试几次仍失败时,将该消息放入一个专门的重试队列,然后有另外一个独立的线程会定时从该队列取出要重试的消息,然后重试这些消息;这样,当IO恢复时,这些消息就能很快被成功处理了;

另外一个问题,那这种专门的重试队列需要支持消息持久化吗?不用,我们只需要内存队列就行了,因为当一个消息还没被完全成功处理前,是不会从message store删除的;所以,就算机器重启了,该消息还是能在该机器重启后被处理的;而当该机器没重启时,该专门重试的内存队列会不断地以独立的线程定时重试该消息;

那这种专门的重试队列需要多少个呢?理论上我们可以为每个需要重试的点都设计一个重试队列来支持,但是这样一方面过于复杂,而且线程多了还会影响系统的性能;所以我们需要权衡一下,只对同一个阶段中要做的所有的事情设计一个重试队列,该阶段中这些要做的事情中有任何一步失败,就都放到该阶段对应的重试队列里;

还有一个问题,如果一个消息在某一次重试时成功了,但是我们希望在成功后继续对该消息做后续的步骤,该如何实现呢?这个问题初想想感觉比较麻烦,因为我们可能已经没有了该消息的一些上下文环境。最重要的是,我们如何知道该消息重试成功后接下来该做什么呢?而且就算知道接下来要做什么了,但是要是我们在做这个下一步的步骤时,要是又失败了呢?是不是也要重试呢?所以,我们发现这里很关键。

经过我的一些思考,我发现,如果一个消息在某个阶段要被处理多个步骤,且有些步骤之间有条件依赖,比如只有在第2步处理的结果是成功时,我们才有必要做后面的3步;正常情况,如果一切顺利,那就是一步步从上往下的去做;但是因为考虑到任何一步可能都会出问题,而且我们希望在任何一步失败然后重试成功后,能继续后续的步骤。所以,基于这些特征,我觉得我们可以设计一种类似回调函数的机制,当某个逻辑执行成功后,执行回调函数,我们可以在回调函数中存放接下来要做的逻辑;显然,我觉得我们需要某种递归的数据结构;为了支持上面这种类似回调函数的需求,我设计了如下的一个数据结构:

    /// <summary>一个数据结构,封装了一段要执行的逻辑以及一些相关的上下文信息
    /// </summary>
    public class ActionInfo
    {
        /// <summary>表示某个Action的名字
        /// </summary>
        public string Name { get; private set; }
        /// <summary>表示某个Action,封装了一段逻辑
        /// </summary>
        public Func<object, bool> Action { get; private set; }
        /// <summary>表示Action执行时所需要的数据信息
        /// </summary>
        public object Data { get; private set; }
        /// <summary>表示Action执行成功后,要执行的下一个Action的信息,这里体现出递归
        /// </summary>
        public ActionInfo Next { get; private set; }

        public ActionInfo(string name, Func<object, bool> action, object data, ActionInfo next)
        {
            if (action == null)
            {
                throw new ArgumentNullException("action");
            }
            Name = name;
            Action = action;
            Data = data;
            Next = next;
        }
    }

从上面的代码,我们可以清晰的看到,我们设计了一个简单的数据结构,用来包含要执行的逻辑,该逻辑执行时所需要的参数信息,以及该逻辑执行成功后要做的下一个逻辑;通过上面这个数据结构,我们已经为实现上面的重试需求做好了数据结构方面的准备;

接下来,我们需要想想,如何设计一个重试服务。经过上面的分析,我们只要,我们的重试服务需要两个主要功能:1)对某段逻辑连续重试指定次数;2)将某段逻辑放入重试队列定时重试;对于第一个功能需求,比较简单,直接设计一个递归函数即可,代码如下:

        public bool TryAction(string actionName, Func<bool> action, int maxRetryCount)
        {
            return TryRecursively(actionName, (x, y, z) => action(), 0, maxRetryCount);
        }
        private bool TryRecursively(string actionName, Func<string, int, int, bool> action, int retriedCount, int maxRetryCount)
        {
            var success = false;
            try
            {
                success = action(actionName, retriedCount, maxRetryCount);
                if (retriedCount > 0)
                {
                    _logger.InfoFormat("Retried action {0} for {1} times.", actionName, retriedCount);
                }
            }
            catch (Exception ex)
            {
                _logger.Error(string.Format("Exception raised when tring action {0}, retrid count {1}.", actionName, retriedCount), ex);
            }

            if (success)
            {
                return true;
            }
            else if (retriedCount < maxRetryCount)
            {
                return TryRecursively(actionName, action, retriedCount + 1, maxRetryCount);
            }
            else
            {
                return false;
            }
        }

调用的代码示例如下:

    if (_retryService.TryAction("TrySendEvent", () => TrySendEvent(eventStream), 3))
    {
        FinishExecution(command, queue);
    }

简单说明一下:

当我们要重试时,我们首先调用retryService的TrtAction方法,该方法就是用来支持“对某段逻辑的指定次数的连续重试”。该方法的第一个参数是一个字符串,表示要执行的逻辑的名称,这个名称没什么实际用途,只是帮助我们区分当前在执行的逻辑是哪段逻辑,该名称会在记录日志时使用,方便我们后续通过日志分析到底是哪里出错了,或者重试过了;然后第二个参数表示要重试的某个委托;当然,因为我们要知道该委托内部的逻辑是否处理成功,所以需要一个布尔类型的返回值;最后一个参数则是指定需要连续重试多少次,上面的示例代码表示:先执行指定逻辑,如果失败,则连续重试3次;所以,如果每次都失败,相当于总共会执行4次;上面的代码应该不难理解,就不多分析了;

接下来分析一下第一个需求“将某段逻辑放入重试队列定时重试”:

当连续重试还是失败后,我们就会放入内存队列,然后定时重试了。那么如何定时呢?一般用定时器即可;那定时多少呢?这个目前我也是拍脑袋的,目前设定为5秒。为什么是5秒呢?主要是两个考虑:1)为了不希望太频繁的重试,因为太频繁的重试会占用更多的系统资源,导致会影响框架中正常的消息处理性能;2)因为这种定时的重试对实时性一般不会很高,就是说,比如当IO恢复后,我们一般不会要求马上就能重试,过个几秒甚至几分钟后再重试,也能接受。实际上,如果没有这种自动定时的重试机制,我们可能只能等到机器重启后才能再次被重试了,相比之下,已经非常自动和及时了。

所依,总结一下,我们需要:1)定时器,用于定时执行;2)ActionInfo包装要重试的逻辑的相关信息;3)内存队列,用于存放ActionInfo;所以,代码如下:

public class DefaultRetryService : IRetryService
    {
        private const long DefaultPeriod = 5000;
        private BlockingCollection<ActionInfo> _retryQueue = new BlockingCollection<ActionInfo>(new ConcurrentQueue<ActionInfo>());
        private Timer _timer;
        private ILogger _logger;
        private bool _looping;

        public DefaultRetryService(ILoggerFactory loggerFactory)
        {
            _logger = loggerFactory.Create(GetType().Name);
            _timer = new Timer(Loop, null, 0, DefaultPeriod);
        }

        public void Initialize(long period)
        {
            _timer.Change(0, period);
        }
        public void RetryInQueue(ActionInfo actionInfo)
        {
            _retryQueue.Add(actionInfo);
        }

        private void Loop(object data)
        {
            try
            {
                if (!_looping)
                {
                    _looping = true;
                    RetryAction();
                    _looping = false;
                }
            }
            catch (Exception ex)
            {
                _logger.Error("Exception raised when retring action.", ex);
                _looping = false;
            }
        }
        private void RetryAction()
        {
            var actionInfo = _retryQueue.Take();
            if (actionInfo != null)
            {
                var success = false;
                try
                {
                    success = actionInfo.Action(actionInfo.Data);
                    _logger.InfoFormat("Executed action {0} from queue.", actionInfo.Name);
                }
                catch (Exception ex)
                {
                    _logger.Error(string.Format("Exception raised when executing action {0}.", actionInfo.Name), ex);
                }
                finally
                {
                    if (success)
                    {
                        if (actionInfo.Next != null)
                        {
                            _retryQueue.Add(actionInfo.Next);
                        }
                    }
                    else
                    {
                        _retryQueue.Add(actionInfo);
                    }
                }
            }
        }
    }

经过上面的分析后,相信大家看代码都应该能理解了。需要注意的点:

  1. 我用了BlockingCollection,这是一个支持并发且支持阻塞的基于publish-consumer模式的集合,而且这里,该集合内部封装了ConcurrentQueue,所以,他也是一个队列;这样设计的好处是,在队列中没有元素的时候,线程会被卡住,从而不会浪费资源;只有当队列中有元素时,才会在当天timer周期到来时,能够从队列取出要重试的ActionInfo,然后进行重试操作。
  2. Timer的周期默认设置为5秒,那么,我们为了避免同一时刻,有两个ActionInfo在被同时处理,我加了一个标记位_looping,当当前有ActionIno正在被处理时,则该标记位为true,否则为false。通过该标记位,我们能确保队列中的元素会一个个按顺序被处理,这样就不会混乱,导致莫名其妙的bug出现;
  3. 从上面的RetryAction方法中,我们可以看出,当当前的ActionInfo处理成功后,如果下一个ActionInfo存在(Next属性不等于空),则把下一个ActionInfo放入重试队列,等待被处理;通过这样的设计,我们能够以非常统一的方式重试用户希望重试的ActionInfo以及这些ActionInfo重试成功后的回调ActionInfo。另外,如果当前ActionInfo执行失败,则仍然将当前ActionInfo再放回队列,继续重试;

下面我们看一个简单的调用示例吧:

        private void CommitAggregate(AggregateRoot dirtyAggregate, ICommand command, IMessageQueue<ICommand> queue)
        {
            var eventStream = BuildEvents(dirtyAggregate, command);

            if (_retryService.TryAction("TrySendEvent", () => TrySendEvent(eventStream), 3))
            {
                FinishExecution(command, queue);
            }
            else
            {
                _retryService.RetryInQueue(
                    new ActionInfo(
                        "TrySendEvent",
                        (obj) => TrySendEvent(obj as EventStream),
                        eventStream,
                        new ActionInfo(
                            "SendEventSuccessAction",
                            (obj) =>
                            {
                                var data = obj as dynamic;
                                var currentCommand = data.Command as ICommand;
                                var currentQueue = data.Queue as IMessageQueue<ICommand>;
                                FinishExecution(currentCommand, currentQueue);
                                return true;
                            },
                            new { Command = command, Queue = queue },
                            null)));
            }
        }

说明:

上面的代码是在一个command执行完成后对于产生的事件,框架要提交该聚合根产生的事件;通过BuildEvents方法获取聚合根上产生的事件,然后我们接下来是尝试将该事件发送到一个事件队列,但是因为该事件队列在消息入队时会持久化消息,也就是会有IO操作,所以就可能失败,所以我们先尝试执行一次,如果失败则立马连续尝试重试3次,如果这4次中任意一次成功了,则做成功的逻辑,上例是调用FinishExecution方法;如果这4次都失败,则进入else的逻辑,即放入队列定时重试,但是我们希望在放入队列重试时如果某一次重试成功了也需要保证能调用FinishExecution方法,所以也定义了一个回调的ActionInfo。最后,为了尽量让每个ActionInfo所需要的参数信息语义明确,避免语言层面的闭包等复杂难理解的问题,我们尽量将ActionInfo中的Action所需要的参数信息明确的设置到ActionInfo上,而不是从外层的函数中拿,从外层的函数中拿,要是再多线程时,容易出现问题,而且也容易引起代码修改导致的难以检查出来的闭包问题;当然,这里,大家可以看到我使用了匿名对象,我是偷懒了,如果希望性能更高,则可以显示定义一个类来封装需要的参数信息;

总结:

本文通过代码加思路的方式大概介绍了enode框架中关于消息重试的设计思路。但是我没有介绍enode中到底哪些点会用到重试机制,有很多,至少五六个地方吧。但我觉得这不是重点了,重点是上面我分析的一些思路,具体需要重试的场景是偏业务性质了,涉及到enode框架中从command开始处理到最后event被发布到query端的整个过程中的每个关键的环节。我觉得通过本文的分析,可以帮助想看代码的朋友更容易理解enode中关于重试方面的代码,这样就够了;关于重试方面,还有一个点没有说,就是command的重试,关于这一点,和本文提到的重试有点不同,我准备专门写一篇文章介绍一下吧。

本文由威尼斯发布于编程,转载请注明出处:执行结果反馈,volley整个源码都是用了接口编程

关键词: