Введение в RxSwift. Часть 2. Observables


Логотип проекта ReactiveX

В прошлой статье мы разобрались с основными понятиями в мире Rx. Сегодня мы обсудим подробнее различные способы создания наблюдаемых последовательностей (Observables) и рассмотрим особенности подписки на транслируемые события.

Освежим в памяти основные моменты. Наблюдаемая последовательность транслирует (в оригинальной терминологии используется слово «излучает» — emits) события, которые содержат в себе элементы определённого типа. Часто этот процесс визуализируется на временных диаграммах (в оригинальной терминологии «шариковые» диаграммы — marbels). Трансляция событий (onNext) происходит до тех пор, пока последовательность не завершится с успехом (onCompleted) либо с ошибкой (onError).

Создание Observable

Простейший способ создать обозреваемую последовательность — воспользоваться оператором just. Он будет представлять последовательность ровно из одного значения, которое мы передадим ему в качестве параметра.

let justObservable = Observable<Int>.just(1)

Но что если у нас появится законное желание передать больше одного значения? Для этого есть оператор of.

let ofObservable = Observable.of(1, 2, 3, 4, 5)

Если мы подпишемся на такую последовательность, то получим 5 событий. Но что будет, если мы захотим оператору of передать массив? Придет одно событие с массивом из чисел (хотя это и может показаться немного нелогичным). А вот чтобы передать массив и получить последовательность событий со значениями из него, нужно воспользоваться оператором from.

let fromObservable = Observable.from([1, 2, 3, 4, 5])

В случае с целыми числами можно даже не заморачиваться с массивами, а просто задать диапазон значений. Тут нам поможет оператор range.

let rangeObservable = Observable.range(start: 1, count: 5)

Мы рассмотрели несколько способов создания наблюдаемой последовательности для некой заранее определённой последовательности событий. Их существует ещё немало, но думаю, общий принцип понятен, поэтому не будем углубляться в эту тему. Мы помним, что Rx — это про работу с асинхронными событиями, поэтому предлагаю рассмотреть более приближенный к жизни пример: обернём в Observable вызов URLSession при помощи оператора create.

let urlObservable = Observable<String>.create { observer in
    let defaultSession = URLSession(configuration: .default)
    let url = URL(string: "https://api.github.com")!
    
    let dataTask = defaultSession.dataTask(with: url) { data, response, error in
        if let error = error {
            observer.on(.error(error))
        } else if let data = data {
            let responseString = String(decoding: data, as: UTF8.self)
            observer.on(.next(responseString))
            observer.on(.completed)
        }
    }
    
    dataTask.resume()
    
    return Disposables.create {
        dataTask.cancel()
    }
}

Метод create позволяет нам создать Observable с нуля. Мы передаем ему замыкание, которое принимает обозревателя (observer), как параметр. Передавая в метод обозревателя on() соотвествующие события, мы формируем поведение наблюдаемой последовательности. В случае ошибки, мы передадим её в событие .error, а когда получим ответ от сервера, то передадим его, как строку, в событие .next. После того как мы получили данные, нам больше ничего не требуется, поэтому мы завершаем последовательность событием .completed.

Не забываем, что для корректной работы с памятью, мы должны вернуть Disposable-объект, в данном случае он будет пустым, и создается методом Disposables.create(). Данный метод принимает в качестве параметра замыкание, которое будет вызвано при вызове метода dispose. Мы используем эту возможность, чтобы отменить запрос, в случае если, созданная последовательность будет уничтожена раньше, чем запрос завершится.

Последнее о чем важно упомянуть — это оператор deffered. Он позволяет вернуть новый Observable, каждому подписчику на событие. Обычно это используется чтобы создавать фабрики наблюдаемых последовательностей, либо чтобы выполнить какие-то сложные вычисления не при создании наблюдаемой последовательности, а при подписке на неё.

Рассмотрим следующий пример:

var i = 1
let plainObservable = Observable<Int>.just(i)
let deferredObservable = Observable.deferred{ return Observable<Int>.just(i) }
i += 1
plainObservable.subscribe(onNext: { i in print(i) }) // 1
deferredObservable.subscribe(onNext: { i in print(i) }) // 2

Подписка на plainObservable выведет в коносоль “1”, а на deferredObservable — “2”. Почему так происходит? Потому что в случае использования deferred обращение к переменной i произойдет только в момент подписки на него, т.е. тогда, когда мы уже увеличили её на единицу.

Подписываемся на события

Мы рассмотрели несколько способов создания наблюдаемой последовательности, которая сможет транслировать нам события. Теперь рассмотрим подробнее, как мы можем на неё подписаться. Сразу отметим, что наблюдаемая последовательность не будет транслировать события до тех пор, пока мы на неё не подпишемся, а закончит она это делать, когда произойдёт событие .completed или .error.

Самый простой способ — вызвать метод subscribe, который принимает в качестве параметра замыкание, в которое в свою очередь передаётся событие класса Event.

rangeObservable.subscribe { (event) in
    print(event)
}

Внутри замыкания мы cделаем просто print этого события и посмотрим, что у нас получается. В результате мы обнаружим в консоли следующую последовательность:

next(1)
next(2)
next(3)
next(4)
next(5)
completed

Мы получили 6 событий: 5 соответствующих диапазону значений, который мы превратили в последовательность, и событие .completed, которое нам говорит, что наблюдаемая последовательность успешно завершила свою работу. Сам же метод subscribe вернет нам объект типа Disposable, который мы должны будем положить в DisposableBag, что мы обсудили довольно подробно в предыдущей статье.

Конечно, нам интересны не столько сами события, сколько значения, которые они возвращают, да и неплохо было бы как-то обрабатывать возникающие ошибки и т.д. Поэтому методом subscribe, в том виде, который я привел выше, пользуются редко, а обычно используют следующий вариант:

fromObservable.subscribe(
    onNext: { (number) in
        print(number)
},
    onError: { (err) in
        print(err.localizedDescription)
},
    onCompleted: {
        print("Completed")
},
    onDisposed: {
        print("Disposed")
})

Он чуть более многословный, но зато для каждого вида события у нас есть отдельное замыкание, где мы его сможем обработать. Вывод в консоль, после выполнения этого метода будет такой:

1
2
3
4
5
Completed
Disposed

Если заглянуть в исходник, то обнаружим, что все параметры являются опциональными, так что можно, например, оставить только обработчик события onNext, если другие не нужны (как в нашем случае):

fromObservable.subscribe(
    onNext: { (number) in
        print(number)
})

Traits

Помимо самих Observable в RxSwift для удобства есть их частные производные, которые называются Traits. Принцип работы тот же: мы на них подписываемся, они нам возвращают события. Всего их три вида:

  • Single — представляет собой последовательность, которая содержит один элемент (возвращает .success или .error);
  • Completable — последовательность, которая содержит 0 элементов (возвращает .completed или .error);
  • Maybe — последовательность, которая может содержать 0 или 1 элемент (может возвращать .success, .completed или error).

Внимательному читателю могла прийти в голову идея, переписать пример с URLSession на Single, ведь в данном случае этот тип гораздо лучше подходит для подобных целей.

let urlSingle = Single<String>.create { single in
    let defaultSession = URLSession(configuration: .default)
    let url = URL(string: "https://api.github.com")!
    
    let dataTask = defaultSession.dataTask(with: url) { data, response, error in
        if let error = error {
            single(.error(error))
        } else if let data = data {
            let responseString = String(decoding: data, as: UTF8.self)
            single(.success(responseString))
        }
    }
    
    dataTask.resume()
    
    return Disposables.create {
        dataTask.cancel()
    }
}
urlSingle.subscribe(
    onSuccess: { (response) in
        print("SUCCESS: \(response)")
},
    onError: { (error) in
        print("ERROR: \(error.localizedDescription)")
})

Как можно заметить, код не сильно изменился, по сути у нас только пропала строка передававшая обозревателю событие .completed (так как теперь за него отвечает событие .success).

На этом мы закончим знакомство с обозреваемыми последовательностями. За кадром остаётся ещё немало особенностей, но сегодня мы получили в своё распоряжение довольно мощный набор инструментов, который уже можно начать применять на практике.

Исходный код примеров