android

[안드로이드] ReactiveX 개념 소개

sieunju 2022. 7. 3. 22:33
반응형

안녕하세요. 안드로이드에서 주로 사용하는 비동기 라이브러리중 하나인 Rx 에 대해서 설명 해보도록 하겠습니다.

안드로이드에서 어떤 서비스나 프로젝트를 만들때 네트워크 통신을 합니다. 그러면 자연스레 따라오는 비동기 네트워크 통신을 해야 합니다. 이유는 아래와 같습니다.

네트워크 통신은 메인 쓰레드에서 사용하지 말라는 Exception 리턴합니다.

어쩔수 없이 WorkThread 로 접근하여 네트워크 통신을 해야 합니다.

현재 Java 비동기를 자체적으로 지원해주는건 Callable, Runnable, ExecutorService .. 등이 있는데 이것들은 사실상 네트워크를 통해서 데이터를 가져오고 그걸 UiModel 에 맞게 가공하고 처리하는 과정을 간단하게 처리하는게 어렵습니다.

간단하게 Callable 과 Rx 을 사용 했을때 코드 스타일이 어떻게 다르고 어떤게 좋아 보이는지 간단한 예시로 보여드리겠습니다.

Callable Rx

Mapper 함수

 

어떤 타입이 더 나아보이시나요??

해당 예시는 정말 간단한 예시입니다.. 또한, 한가지 중요한 사실은 Callable 방식으로 사용하게 되면 "Mapper" 에 대한 처리는 반 강제적으로 UIThread 로 처리하게 되고, 이를 Worker Thread 로 하기가 좀 까다롭습니다. 이런 여러 이유 때문에 안드로이드에서는 아주 좋은 라이브러리들인 Rx, Coroutines 를 사용합니다.

그래서 이번시간에 흔히들 사용하는 Rx 에 대해서 설명해보도록 하겠습니다.

ReactiveX 란,

비동기 라이브러리중 하나로, 안드로이드에서만 존재하는게 아닌 RxJava, RxSwift 등등..여러 언어들을 지원하는 비동기 라이브러리 입니다.

Rx에는 크게 Single, Maybe, Flowable, Observable(이건 Rx2 부터 잘 안쓰고 Flowable 를 사용합니다.)

추가적으로 이 글에서 Observable 에 대한 설명은 안하도록 하겠습니다.  (굳이..할필요가..)

설명하면서 Rx의 버전은 v3.1.5 입니다.
Rx2 는 이미 업데이트 끝난지가 오래되서 굳이 거기에 맞춰서 설명할 필요는 없습니다.

 

Index

1. Single, Maybe, Flowable 간단 소개

2. 차가운 Observable, 뜨거운 Observable 간단 소개

3. 데이터 가공이나 유틸적인 부분을 처리할수 있는 함수들 몇가지만 소개

 

1. Single 

성공 또는 실패만 리턴하는 확실한 Class 주로 Network 통신할때 사용합니다. 그이유는 네트워크 통신은 성공 혹은 실패 둘중하나기 때문에 Single 과 아주 적합한 타입이기 때문에 사용됩니다.

간단한 예제로 설명 해드리면

- Just Type.

Single.just(System.currentTimeMillis())
            .subscribe(object : SingleObserver<Long> {
                override fun onSubscribe(d: Disposable) {
                    d.addTo(compositeDisposable)
                }

                override fun onSuccess(t: Long) {
                    Timber.d("onSuccess $t")
                }

                override fun onError(e: Throwable) {
                    Timber.d("onError $e")
                }
            })

보통은 subscribe (구독) 함수 호출할때 저렇게 안하고, Consumer 를 사용하는 함수 타입을 사용하는데 이해를 위해 SingleObserver Class 사용해서 처리했습니다. 

- Create Type

Single.create<Long> { emitter ->
            if (Random.nextBoolean()) {
                emitter.onSuccess(System.currentTimeMillis())
            } else {
                emitter.onError(RuntimeException("Sample Error"))
            }
        }.subscribe(object : Consumer<Long> {
            override fun accept(t: Long) {
                Timber.d("SUCC $t")
            }
        }, object : Consumer<Throwable> {
            override fun accept(t: Throwable) {
                Timber.d("ERROR")
            }
        }).addTo(compositeDisposable)

이번에는 Consumer 함수를 사용해서 표현해봤습니다.

위 방법과 같이 아주 간단하게 처리를 할수 있습니다. 하지만, 서비스에서는 절대 저렇게 간단하게 표현하는 곳은 아마..없을겁니다..

Maybe, Flowable 에 대해 간단한 설명이후 실제로 "데이터 스트림" 을 어떻게 가공하고, 병렬로 처리하는지 설명 하도록 하겠습니다. :)

 

2. Maybe

성공, 실패, 완료 이렇게 3가지 타입을 주는 애매한 Class 입니다.

- Just Type

Maybe.just(System.currentTimeMillis())
            .subscribe(object : Consumer<Long> {
                override fun accept(t: Long) {
                    Timber.d("SUCC $t")
                }
            }, object : Consumer<Throwable> {
                override fun accept(t: Throwable) {
                    Timber.d("ERROR")
                }
            }, object : Action {
                override fun run() {
                    Timber.d("onCompleted")
                }
            }).addTo(compositeDisposable)

- Create Type

        Maybe.create<Long> { emitter ->
            val ran = Random.nextInt(0 until 10)
            if (ran < 3) {
                emitter.onSuccess(System.currentTimeMillis())
            } else if (ran in 3..5) {
                emitter.onError(RuntimeException("Sample Error"))
            } else {
                emitter.onComplete()
            }
        }.subscribe(object : Consumer<Long> {
            override fun accept(t: Long) {
                Timber.d("SUCC $t")
            }
        }, object : Consumer<Throwable> {
            override fun accept(t: Throwable) {
                Timber.d("ERROR")
            }
        }, object : Action {
            override fun run() {
                Timber.d("onCompleted")
            }
        }).addTo(compositeDisposable)

 

위에서 말했다 싶이 "Success", "Error", "Completed" 이렇게 3가지 타입으로 리턴 할수 있습니다. 그러다보니 성공을 스킵하고 바로 완료로 넘겨버리는 아주 애매한 클래스라고 볼수 있겠습니다.

3. Flowable 

Observable Class 에서 "배압이슈" 로 인한 대응책으로 나온 클래스라고 보시면 되겠습니다. Single 의 상위버전? 이라고 생각하시면 되겠습니다. Single 보다 좀더 기능적으로 많은 Class 라고 생각 하시면 되겠습니다. 

배압 이슈가 머죠? 

쉽게 말해 몇초단위로 어떤 데이터를 계속 처리하는 구조에서 "subscribe" 안에 처리가 데이터를 가져오는 속도보다 느린경우 "배압" 이 생기게 됩니다. 그러다 보면 기존에 Observable 같은 경우는 정상적인 처리가 안됐습니다. 하지만, Rx 에서는 이를 처리하기 위해 몇가지 솔루션을 냈는데 그래서 나온게 Flowable 입니다. 

간단한 예제로 배압 이슈를 설명해드리겠습니다.

Flowable.interval(100,TimeUnit.MILLISECONDS)
            .onBackpressureBuffer()
            .subscribe({
               // 여기서의 처리가 500ms 걸리는 경우 배압 이슈가 생깁니다.
            },{
                
            }).addTo(compositeDisposable)

100ms 단위로 데이터를 방출합니다. subscribe 로 콜백 때립니다. 하지만 subscribe 에서의 처리가 500ms  걸리는 로직이라면 점점 데이터를 방출해야 하는데 쌓이겠죠?? 이걸 보고 배압 이슈라고 합니다. 

여기서는 설명하지는 않을건데 이 배압 이슈를 완벽히 Flowable 가 처리한건 아닙니다.  그냥 좀더 배압 이슈를 막을 솔루션을 내놨다 정도?

- Just Type

Flowable.just(System.currentTimeMillis())
            .subscribe(object : Consumer<Long> {
                override fun accept(t: Long) {
                    Timber.d("SUCC $t")
                }
            }, object : Consumer<Throwable> {
                override fun accept(t: Throwable) {
                    Timber.d("ERROR")
                }
            }, object : Action {
                override fun run() {
                    Timber.d("onCompleted")
                }
            }).addTo(compositeDisposable)

- Create Type

Flowable.create<Long>(object : FlowableOnSubscribe<Long> {
            override fun subscribe(emitter: FlowableEmitter<Long>) {
                val ran = Random.nextInt(0 until 10)
                if (ran < 3) {
                    emitter.onNext(System.currentTimeMillis())
                } else if (ran in 3..5) {
                    emitter.onError(RuntimeException("Sample Error"))
                } else {
                    emitter.onComplete()
                }
            }
        }, BackpressureStrategy.BUFFER)
            .subscribe(object : Consumer<Long> {
                override fun accept(t: Long) {
                    Timber.d("SUCC $t")
                }
            }, object : Consumer<Throwable> {
                override fun accept(t: Throwable) {
                    Timber.d("ERROR")
                }
            }, object : Action {
                override fun run() {
                    Timber.d("onCompleted")
                }
            }).addTo(compositeDisposable)

당연하겠지만, Single이나 Maybe, Flowable 함수 선언되는 방식은 비슷 비슷 합니다. ㅎㅎ Single 에서는 이게 되고 안되고.. 차이정도?

 

차가운 Observable? 뜨거운 Observable?

간단히 말해서 차가운 Observable 이라함은 subscribe 함수를 호출해야지만 데이터를 받을수 있다.

뜨거운은 subscribe 함수를 호출안하고 "onNext" 로 데이터를 방출하면 그냥 받아야 하는 곳에서 떄에 맞게 받을수 있고, Flowable 로 처리합니다. 

감이 안오신다고요? 몇가지 예시로 설명해보겠습니다. :)

- 차가운 Observable

apiService.fetchSingle()
            .map { toPayload(it) }
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe({
                // API 요청하고 toPayload 로 데이터 가공할때까지 Cache Thread
                // Current UI Thread
                tvTitle.text = it.toString()
            }, {
                Timber.d("ERROR $it")
            }).addTo(compositeDisposable)

예..아까 위에 적었던 기본 예시중하나 맞습니다. 저게 차가운 Observable 입니다.

아..그래도 잘 모르시겠다고요? 한가지더 예시를 설명하겠습니다.

Flowable.interval(1000,TimeUnit.MILLISECONDS)
            .onBackpressureBuffer()
            .subscribe({
                // Do Working
            },{
                
            }).addTo(compositeDisposable)

그냥 단순하게 subscribe 를 해야지만 데이터를 받을수 있는것을 차가운 Observable 이라고 합니다. 

- 뜨거운 Observable

subscribe (구독) 을 받고 있는 곳과는 상관 없이 데이터 발행을 할수 있는 것 주로 "뒤로가기", "글로벌하게 회원 상태 변경시에 대한 UI 처리" 이럴때 많이 사용합니다.

크게 "subscribe" (구독) 이전 값을 받고 싶으면 Behavior, "subscribe" (구독) 이후 값을 받고 싶으면 Publish

몇가지 간단한 예제로 설명해드리겠습니다.

private val behavior = BehaviorProcessor.create<Long>()

// 데이터 받는 곳--1
behavior.subscribe({
    Timber.d("One Sub $it")
},{

})

// 데이터 방출!!
behavior.onNext(System.currentTimeMillis())

// 데이터 받는 곳--2
behavior.subscribe({
	Timber.d("Two Sub $it")
},{

})

구독이전 값을 받을수 있는 Behavior 를 사용했습니다. 저렇게 구성 했을때 One Sub 와 Two Sub 둘다 데이터를 받을수 있습니다. 

여러군데에 선언되어있어도 해당 클래스를 "onNext" 를 호출하면 subscribe 가 된곳 어디든 받을수 있습니다. like RxBusEvent..

private val publisher = PublishProcessor.create<Long>()

// 데이터 받는 곳--1
publisher.subscribe({
    Timber.d("One Sub $it")
},{

})

// 데이터 방출!!
publisher.onNext(System.currentTimeMillis())

// 데이터 받는 곳--2
publisher.subscribe({
	Timber.d("Two Sub $it")
},{

})

구독이후 값을 받을수 있는 Publish 를 사용했습니다. 저렇게 구성하면 One Sub 만 받고 Two 는 데이터를 받을수 없습니다. 왜냐면 데이터를 방출하는 시점이 Two Sub 보다 위에 있기 때문입니다. 

 

각 클래스별 간단히 설명을 해봤습니다.  그렇다면 이제 어떤 상황에서 어떤걸 사용하는게 가장 효율적일지 제 아주 주관적인 생각으로 설명해보겠습니다.

1.  로그인 성공이후 좋아요한 상품들을 보고싶어!

이 경우에서 중요한점은 로그인 성공 이후입니다. 그러니 "순차적" 으로 처리해야 합니다.

- 않좋은예

private fun exampleOne(){
        apiService.postLogin()
            .subscribe({
                // 로그인 성궁 후 좋아요한 상품 조회
                if(it.status) {
                    fetchUserLike()
                }
            },{
                
            }).addTo(compositeDisposable)
    }
    
private fun fetchUserLike(){
	apiService.fetchUserLike().subscribe({},{}).addTo(compositeDisposable)
}

 사실 이렇게 해도 되긴 되죠..상관은 없습니다만, Rx 에서 제공하는 함수들을 적극적으로 활용하지 못한 예입니다.

- 좋은 예

apiService.postLogin()
            .flatMap {
                if (it.status) {
                    apiService.fetchUserLike()
                } else {
                    throw NullPointerException("Login is Fail")
                }
            }.subscribe({ list ->
                // 좋아요한 상품들..
            }, {

            }).addTo(compositeDisposable)

Rx 에서 제공하는 flatMap 을 중간에 사용하면 되겠습니다. 그렇게 되면 로그인 성공했을때 "좋아요한 상품" 을 호출하면 그 이후에 대한 처리는 subscribe 에서 처리할수 있겠습니다.

 

2.  메인 탭에 탑배너, 하단배너, 날씨추천 상품 등등..을 병렬로 처리하고 싶어!

- 않좋은 예

    private fun start(){
        탑배너()
        하단배너()
        날씨추천상품()
    }

    private fun 탑배너(){
        Single.just("탑배너 API 호출해서 데이터에 추가합니다.")
            .subscribe({

            },{

            }).addTo(compositeDisposable)
    }

    private fun 하단배너(){
        Single.just("하단 배너 API 호출해서 데이터에 추가합니다.")
            .subscribe({

            },{

            }).addTo(compositeDisposable)
    }

    private fun 날씨추천상품(){
        Single.just("날씨추천상품 API 호출하여 데이터에 추가합니다.")
            .subscribe({

            },{

            }).addTo(compositeDisposable)
    }

이렇게 해도 상관은 없습니다만, 리스트 화면에 각 인덱스에 맞게 처리하기에는 한곳에서 처리 안하니까 버거워보이기도 하죠??

"merge" 를 이용한 방법

- 좋은예

private fun start() {
        Single.merge(탑배너(), 하단배너(), 날씨추천상품())
            .observeOn(AndroidSchedulers.mainThread())
            .buffer(3)
            .subscribe({ list ->
                // buffer 함수를 통해 호출한 데이터들을 한꺼번에 List 형식으로도 처리할수 있습니다.
            }, {

            }).addTo(compositeDisposable)
    }

    private fun 탑배너(): Single<String> {
        return Single.just("탑배너 API 호출해서 데이터에 추가합니다.").subscribeOn(Schedulers.io())
    }

    private fun 하단배너(): Single<String> {
        return Single.just("하단 배너 API 호출해서 데이터에 추가합니다.").subscribeOn(Schedulers.io())
    }

    private fun 날씨추천상품(): Single<String> {
        return Single.just("날씨추천상품 API 호출하여 데이터에 추가합니다.").subscribeOn(Schedulers.io())
    }

유의사항이라면 merger 를 사용할떄 스트림을 좀더 병렬적으로 처리하기위해 각 Single 마다 작업할 곳에 스케줄러를 따로 설정해줘야 합니다. 

이와 비슷한 mergeDelayError 도 있지만, 그거는 쉽게 말해서 "merge" 에서 병렬로 수행하다가 에러가 발생시 못받은 데이터들은 Dispose 되는 이슈가 있습니다. 그거에 대한 솔루션이라고 볼수 있겠습니다.

하지만, 제 경험상 스트림이 단순한 구조인경우 저게 제대로 되지만, 복잡한 스트림 구조들이라면 차리리 "onErrorReturn" 을 사용하는게 정신건강에 좋습니다. :)

 

아주 간단하게 flatMap, merge 에 대해서 설명해봤습니다. Rx 에서는 이것 말고도 진~짜 많은 기능?들을 지원합니다. 여러개의 "job" 들을 조합해서 새로운 데이터를 방출하는 zip, 특정 데이터들을 바꿔치기 하는 Flowable.switchMap.. 순서를 보장하는 concatMap 등등 진짜 많은 기능들을 지원하기 때문에 적제적소에 맞게 처리할수 있겠습니다. 

해당 포스팅에 관련된 자료는 제 깃허브에 올려놨습니다. 최대한 간단하게 예제들을 만들었으니 참고하시면 되겠습니다. :)

https://github.com/sieunju/rxSample

 

GitHub - sieunju/rxSample: RxJava 관련 설명 입니다다다

RxJava 관련 설명 입니다다다. Contribute to sieunju/rxSample development by creating an account on GitHub.

github.com

 

마지막으로 좀더 궁금한 사항이 있으면 댓글 남겨두시면 빠른시간내에 답변 달아주도록 하겠습니다.

반응형