Rx.retryInterval

lambda 서버는 request가 올 때 최대 5분까지 살아있다가 다시 다음 request가 오기 전까지 죽어 있습니다.

이런 서버환경에서 request를 날리면 서버가 다 기동하기 전에 504 에러가 리턴됩니다. 일단 504가 리터되면 다시 한 번 request를 날리면 응답을 받을 수 있어서 이런 요구사항이 나오게 되었습니다.

  • 504 에러가 온 경우
  • 총 3번의 재 시도를 해본다.
  • 처음엔 1초, 다음에 2초, 마지막으로 4초 후에 시도한다.
  • 그래도 에러가 나오면 무시해 버린다.
  • 504가 아닌 에러에 대해서는 기존과 동일하게 처리한다.

이것을 Rx에서 처리하기 위해 여러 삽질 끝에 다음과 같은 코드를 얻게 되었습니다.

1
2
3
4
5
6
7
8
9
10
11
12
.retryWhen { error -> Observable<Int> in
error
.flatMap { e -> Observable<Void> in
guard (e as NSError).code == 504 else { return .error(e) }
return .just(())
}
.zip(with: Observable<RxTimeInterval>.from([1, 2, 4])) { $1 }
.flatMap {
Observable<Int>
.timer($0, scheduler: ConcurrentDispatchQueueScheduler(qos: .background))
}
}

테스트했던 코드와 함께 보면 이렇습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Observable<String>.create { emitter in
emitter.onNext("emit 1")
emitter.onNext("emit 2")
emitter.onError(NSError(domain: "error", code: 504, userInfo: nil))

return Disposables.create()
}
.retryWhen { error -> Observable<Int> in
error
.flatMap { e -> Observable<Void> in
guard (e as NSError).code == 504 else { return .error(e) }
return .just(())
}
.zip(with: Observable<RxTimeInterval>.from([1, 2, 4])) { $1 }
.flatMap {
Observable<Int>
.timer($0, scheduler: ConcurrentDispatchQueueScheduler(qos: .background))
}
}
.subscribe { print($0) }
.disposed(by: disposeBag)

이제 이것을 extension 으로 만듭니다. RxSwiftExt의 의존성도 제거하였습니다.

Gist 에서 보기

1
2
3
4
5
6
7
8
9
10
11
12
13
14
extension ObservableType {
public func retryInterval(_ intervals: [RxTimeInterval], when f: @escaping (Error) -> Bool) -> Observable<Self.E> {
return retryWhen { error -> Observable<Int> in
let filteredError = error
.flatMap { e -> Observable<Void> in
guard f(e) else { return .error(e) }
return .just(())
}
let intervalsObservable = Observable<RxTimeInterval>.from(intervals)
return Observable.zip(filteredError, intervalsObservable) { $1 }
.flatMap { Observable<Int>.timer($0, scheduler: ConcurrentDispatchQueueScheduler(qos: .background)) }
}
}
}

그리고 코드는 이렇게 간결해 졌습니다.

1
2
3
4
5
6
7
8
9
10
Observable<String>.create { emitter in
emitter.onNext("emit 1")
emitter.onNext("emit 2")
emitter.onError(NSError(domain: "error", code: 504, userInfo: nil))

return Disposables.create()
}
.retryInterval([1, 2, 4]) { ($0 as NSError).code == 504 }
.subscribe { print($0) }
.disposed(by: disposeBag)

이 코드가 만들어 지는 데 최완복님께서 많은 도움을 주셨습니다. 감사합니다.




RxJava에서도 Kotlin용으로 확장을 만들어 안드로이드에서도 적용할 수 있도록 만들었습니다.

Gist에서 보기

1
2
3
4
5
6
7
8
9
10
11
fun <T> Observable<T>.retryInterval(intervals: List<Long>, filter: (Throwable) -> Boolean): Observable<T> {
return this.retryWhen { throwable ->
throwable
.flatMap { e ->
if (!filter(e)) return@flatMap Observable.error<Throwable>(e)
return@flatMap Observable.just(e)
}
.zipWith(Observable.fromIterable(intervals)) { _, i -> i }
.flatMap { Observable.timer(it, TimeUnit.MILLISECONDS, Schedulers.newThread()) }
}
}

역시 사용은 매우 간결해 졌습니다.

1
2
3
4
5
6
7
8
9
10
Observable.create<String> { emitter ->
emitter.onNext("emit 1")
emitter.onNext("emit 2")
emitter.onError(NetworkException(code = 503))
}
.retryInterval(listOf(1000L, 2000L, 4000L)) { e ->
(e as? NetworkException)?.code == 504
}
.subscribe(System.out::println, System.out::println, { print("completed") })
.addTo(disposeBag)