lambda 서버는 request가 올 때 최대 5분까지 살아있다가 다시 다음 request가 오기 전까지 죽어 있습니다.
이런 서버환경에서 request를 날리면 서버가 다 기동하기 전에 504 에러가 리턴됩니다. 일단 504가 리터되면 다시 한 번 request를 날리면 응답을 받을 수 있어서 이런 요구사항이 나오게 되었습니다.
- 504 에러가 온 경우
- 총 3번의 재 시도를 해본다.
- 처음엔 1초, 다음에 2초, 마지막으로 4초 후에 시도한다.
- 그래도 에러가 나오면 무시해 버린다.
- 504가 아닌 에러에 대해서는 기존과 동일하게 처리한다.
이것을 Rx에서 처리하기 위해 여러 삽질 끝에 다음과 같은 코드를 얻게 되었습니다.
1 2 3 4 5 6 7 8 9 10 11 12
| .retryWhen { error -> Observable<Int> in error .flatMap { e -> Observable<Void> in guard (e as NSError).code == 504 else { return .error(e) } return .just(()) } .zip(with: Observable<RxTimeInterval>.from([1, 2, 4])) { $1 } .flatMap { Observable<Int> .timer($0, scheduler: ConcurrentDispatchQueueScheduler(qos: .background)) } }
|
테스트했던 코드와 함께 보면 이렇습니다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| Observable<String>.create { emitter in emitter.onNext("emit 1") emitter.onNext("emit 2") emitter.onError(NSError(domain: "error", code: 504, userInfo: nil))
return Disposables.create() } .retryWhen { error -> Observable<Int> in error .flatMap { e -> Observable<Void> in guard (e as NSError).code == 504 else { return .error(e) } return .just(()) } .zip(with: Observable<RxTimeInterval>.from([1, 2, 4])) { $1 } .flatMap { Observable<Int> .timer($0, scheduler: ConcurrentDispatchQueueScheduler(qos: .background)) } } .subscribe { print($0) } .disposed(by: disposeBag)
|
이제 이것을 extension 으로 만듭니다. RxSwiftExt의 의존성도 제거하였습니다.
Gist 에서 보기
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| extension ObservableType { public func retryInterval(_ intervals: [RxTimeInterval], when f: @escaping (Error) -> Bool) -> Observable<Self.E> { return retryWhen { error -> Observable<Int> in let filteredError = error .flatMap { e -> Observable<Void> in guard f(e) else { return .error(e) } return .just(()) } let intervalsObservable = Observable<RxTimeInterval>.from(intervals) return Observable.zip(filteredError, intervalsObservable) { $1 } .flatMap { Observable<Int>.timer($0, scheduler: ConcurrentDispatchQueueScheduler(qos: .background)) } } } }
|
그리고 코드는 이렇게 간결해 졌습니다.
1 2 3 4 5 6 7 8 9 10
| Observable<String>.create { emitter in emitter.onNext("emit 1") emitter.onNext("emit 2") emitter.onError(NSError(domain: "error", code: 504, userInfo: nil))
return Disposables.create() } .retryInterval([1, 2, 4]) { ($0 as NSError).code == 504 } .subscribe { print($0) } .disposed(by: disposeBag)
|
이 코드가 만들어 지는 데 최완복님께서 많은 도움을 주셨습니다. 감사합니다.
RxJava에서도 Kotlin용으로 확장을 만들어 안드로이드에서도 적용할 수 있도록 만들었습니다.
Gist에서 보기
1 2 3 4 5 6 7 8 9 10 11
| fun <T> Observable<T>.retryInterval(intervals: List<Long>, filter: (Throwable) -> Boolean): Observable<T> { return this.retryWhen { throwable -> throwable .flatMap { e -> if (!filter(e)) return@flatMap Observable.error<Throwable>(e) return@flatMap Observable.just(e) } .zipWith(Observable.fromIterable(intervals)) { _, i -> i } .flatMap { Observable.timer(it, TimeUnit.MILLISECONDS, Schedulers.newThread()) } } }
|
역시 사용은 매우 간결해 졌습니다.
1 2 3 4 5 6 7 8 9 10
| Observable.create<String> { emitter -> emitter.onNext("emit 1") emitter.onNext("emit 2") emitter.onError(NetworkException(code = 503)) } .retryInterval(listOf(1000L, 2000L, 4000L)) { e -> (e as? NetworkException)?.code == 504 } .subscribe(System.out::println, System.out::println, { print("completed") }) .addTo(disposeBag)
|