In this post I’m going to combine retryWhen operator with Reachability and delay options inspired by RxSwiftExt to implement an effective retry strategy.

The requirements are fairly straightforward. Let’s say you have an observable sequence which wraps a networking call. If the observable sequence fails with a network error I would like to:

  • automatically retry it up to N times
  • use exponential backoff or other delay options
  • retry immediately when a network connection is re-established

This sounds like a tall order for a single retryWhen operator, but it’s actually flexible enough to support all of those requirements. In this post, I’m going to create a new custom retry operator which would wrap this entire logic.

The complete implementation is available here.

Check out API Client in Swift and Introducing RxNuke for more awesome use-cases of RxSwift.

Usage

Here’s how our custom retry operator is defined:

extension ObservableType {
    /// Retries the source observable sequence on error using a provided retry
    /// strategy.
    /// - parameter maxAttemptCount: Maximum number of times to repeat the
    /// sequence. `Int.max` by default.
    /// - parameter delay: Algorithm for computing retry interval.
    /// - parameter didBecomeReachable: Trigger which is fired when network
    /// connection becomes reachable.
    /// - parameter shouldRetry: Always returns `true` by default.
    func retry(_ maxAttemptCount: Int = default,
               delay: DelayOptions,
               didBecomeReachable: Observable<Void> = default,
               shouldRetry: @escaping (Error) -> Bool = default) -> Observable<E>
}

I’ve also added a couple of extension to some of the RxSwift traits to make using retry easier.

You would use the retry operator just as any other of the built-in operators. For example, here’s how it fits into a classic search example:

let isBusy = ActivityIndicator()

let results = input
    .throttle(0.3)
    .distinctUntilChanged()
    .flatMapLatest { input in
        guard input.characters.count > 1 else { return .just([]) }
        return service(input)
            .retry(delay: .exponentialDelayed(initial: 2, multiplier: 1, maxDelay: 16))
            .trackActivity(isBusy)
            .asDriver(onErrorJustReturn: [])

Notice that trackActivity(isBusy) is called after the retry operator. If it were called before it then it would be tracking the activity of each of the individual requests.

Implementation

Let’s jump straight into the final implementation and then work our way down:

The complete implementation is available here

extension ObservableType {
    /// Retries the source observable sequence on error using a provided retry
    /// strategy.
    /// - parameter maxAttemptCount: Maximum number of times to repeat the
    /// sequence. `Int.max` by default.
    /// - parameter didBecomeReachable: Trigger which is fired when network
    /// connection becomes reachable.
    /// - parameter shouldRetry: Always returns `true` by default.
    func retry(_ maxAttemptCount: Int = Int.max,
               delay: DelayOptions,
               didBecomeReachable: Observable<Void> = Reachability.shared.didBecomeReachable,
               shouldRetry: @escaping (Error) -> Bool = { _ in true }) -> Observable<E> {
        return retryWhen { (errors: Observable<Error>) in
            return errors.flatMapWithIndex { error, attempt -> Observable<Void> in
                guard shouldRetry(error), maxAttemptCount > attempt + 1 else {
                    return .error(error)
                }

                let timer = Observable<Int>.timer(
                    RxTimeInterval(delay.make(attempt + 1)),
                    scheduler: MainScheduler.instance
                ).map { _ in () } // cast to Observable<Void>

                return Observable.merge(timer, didBecomeReachable)
            }
        }
    }
}

That’s a lot to chew on. Let’s approach it piece by piece.

retryWhen Operator

The retryWhen operator is the most important and the most complex part. It’s very powerful, but it takes some time to grasp.

Here’s how it is defined in RxSwift:

extension ObservableType {
    /**
     Repeats the source observable sequence on error when the notifier emits
     a next value. If the source observable errors and the notifier completes,
     it will complete the source sequence.

     - seealso: [retry operator on reactivex.io](http://reactivex.io/documentation/operators/retry.html)

     - parameter notificationHandler: A handler that is passed an observable
     sequence of errors raised by the source observable and returns and
     observable that either continues, completes or errors. This behavior is
     then applied to the source observable.
     - returns: An observable sequence producing the elements of the given
     sequence repeatedly until it terminates successfully or is notified to
     error or complete.
     */
    public func retryWhen<TriggerObservable: ObservableType, Error: Swift.Error>(
        _ notificationHandler: @escaping (Observable<Error>) -> TriggerObservable)
        -> Observable<E>

The documentation describes it really nicely. The basic idea is the retryWhen gives you an observable sequence of errors over which you can then for example flatMap over to handle each of the individual errors.

A couple of important points to keep in mind are:

  • You should not ignore errors observable sequence in your notificationHandler. If you do and for example just return a timer from the handler, then the timer will simply run in parallel with the source sequence! This would probably not want you intended.
  • In RxSwift 3.x if the trigger completes the source sequence also completes. This might not be what you expect. This behavior might change in RxSwift 4. I would suggest not to rely on it in your code.

Delay Options

The DelayOptions is just a simple enum which defines a number of strategies for calculating delay interval for each attempt:

enum DelayOptions {
    case immediate()
    case constant(time: Double)
    case exponential(initial: Double, multiplier: Double, maxDelay: Double)
    case custom(closure: (Int) -> Double)
}

extension DelayOptions {
    func make(_ attempt: Int) -> Double {
        switch self {
        case .immediate: return 0.0
        case .constant(let time): return time
        case .exponential(let initial, let multiplier, let maxDelay):
            // if it's first attempt, simply use initial delay, otherwise calculate delay
            let delay = attempt == 1 ? initial : initial * pow(1 + multiplier, Double(attempt - 1))
            return min(maxDelay, delay)
        case .custom(let closure): return closure(attempt)
        }
    }
}

Reachability

Reachability monitors network state. There are a couple of open source libraries that provide a convenient API on top of it. For example, I personally use NetworkReachabilityManager which is part of Alamofire since I already use this library in my projects.

final class Reachability {
    static let shared = Reachability()

    private let reachability = NetworkReachabilityManager()

    var didBecomeReachable: Observable<Void> { return _didBecomeReachable.asObservable() }
    private let _didBecomeReachable = PublishSubject<Void>()

    init() {
        if let reachability = self.reachability {
            reachability.listener = { [weak self] in
                self?.update($0)
            }
            reachability.startListening()
        }
    }

    private func update(_ status: NetworkReachabilityManager.NetworkReachabilityStatus) {
        if case .reachable = status {
            _didBecomeReachable.onNext(())
        }
    }
}

Links