Combine: Yet One Another Quick Start Guide

All you need to know to start using Combine

Apple announced the Combine framework at WWDC 2019. It lets you process values over time and is Apple's native Swift implementation of the functional reactive programming (FRP) paradigm. Despite its usefulness, many developers still don't use it in their projects. I didn't use it either until I joined my current project, where Combine is used heavily, so I had to get up to speed quickly. This article is meant to help others understand the basic concepts of Combine and start using it, even if they have never worked with FRP frameworks before.

FRP is a combination of functional and reactive programming that manages time-varying values, such as user inputs and server responses, in a clear and straightforward way. In FRP, values and events are represented as streams of values that change over time, and reactive functions are used to show how changes in input streams affect output streams. The aim of FRP is to provide a straightforward method for describing the behavior of a system over time.

Observer pattern

A good way to start comprehending Combine is by familiarizing oneself with the Observer design pattern.

The Observer pattern allows an object, called the Subject (aka Observable aka Publisher), to notify a set of dependent objects, called Observers (aka Subscribers), about changes in its state.

An example should help clarify the concept:

protocol Observer: AnyObject {
    func update(value: Int)
}

protocol Subject {
    func addObserver(_ observer: Observer)
    func removeObserver(_ observer: Observer)

    func notifyObservers()
}

class ConcreteObserver: Observer {

    let id: String // printed in console to identify observers 

    init(id: String) {
        self.id = id
    }

    func update(value: Int) {
        print("\(id): Value changed to \(value)")
    }
}

class ConcreteSubject: Subject {

    private var observers: [Observer] = []

    private var value: Int = 0 {
        didSet {
            notifyObservers()
        }
    }

    func addObserver(_ observer: Observer) {
        observers.append(observer)
    }

    func removeObserver(_ observer: Observer) {
        if let index = observers.firstIndex(where: { $0 === observer }) {
            observers.remove(at: index)
        }
    }

    func notifyObservers() {
        observers.forEach { $0.update(value: value) }
    }

    func setValue(value: Int) {
        self.value = value
    }
}

let subject = ConcreteSubject()
// The subject registers observers so it can notify them
// later about changes to its value

let firstObserver = ConcreteObserver(id: "First")
let secondObserver = ConcreteObserver(id: "Second")

subject.addObserver(firstObserver)

subject.setValue(value: 1)
// First: Value changed to 1

subject.addObserver(secondObserver)

subject.setValue(value: 2)
// First: Value changed to 2
// Second: Value changed to 2

// remove the observer to stop notifying it about value changes
subject.removeObserver(firstObserver)

subject.setValue(value: 3)
// Second: Value changed to 3

The Observer pattern establishes a one-to-many relationship between the subject and its observers, so that all observers are automatically notified of any change in the subject.

The Observer pattern provides a way to achieve loose coupling between objects in a system: the subject only knows its observers through a protocol, not their concrete types. This makes it easier to add or remove observers, as well as to modify the subject or observer objects independently, without affecting the rest of the system.

The Observer pattern also helps you follow the Open/Closed Principle. You can introduce new subscriber classes without having to change the publisher’s code and vice versa.

TIP: You can consider Combine as the advanced Observer (or Observer on steroids).

Let's take a look at a simple example of the Observer pattern implemented using Combine:

import Combine

let subject = CurrentValueSubject<String, Never>("Hello World!")

// Keep the returned token: releasing it cancels the subscription
let cancellable = subject.sink { // addObserver and get current value via the closure
    print("Value:", $0)
}
// Value: Hello World!

subject.value = "Brand New World!"
// Value: Brand New World!

As you can see, it is very similar and easy to use, maybe even easier than our Observer implementation above. Combine allows you not only to subscribe to data updates but also to use operators to manipulate the stream of updates. In this example, we map the value from the subject to a new one:

let subject = CurrentValueSubject<String, Never>("Hello World!")

subject
    .map {
        $0 + " Here I am!"
    }
    .sink {
        print("Value:", $0)
    }
// Value: Hello World! Here I am!

Core concepts

There are three core Combine concepts: Publisher, Subscriber and Operator. You already know about publishers and subscribers from the Observer pattern example above but let’s look at them more deeply from the Combine perspective.

Publisher

protocol Publisher {
    associatedtype Output
    associatedtype Failure : Error

    func receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input
}

The Publisher is a protocol describing an object which can transmit values over time. It is generic over two types:

  • Publisher.Output is the type of the output values of the publisher. If it’s an Int publisher, it can never emit a String or a Data value.

  • Publisher.Failure is the type of error the publisher can emit if it fails. If the publisher can never fail, you specify that by using a Never failure type.

As you can see, every publisher must be able to receive new subscribers so you need to implement the receive method, in which you are given access to the Subscriber when it subscribes to the publisher.

Subscriber

protocol Subscriber {
    associatedtype Input
    associatedtype Failure: Error

    func receive(subscription: Subscription)
    func receive(_ input: Self.Input) -> Subscribers.Demand
    func receive(completion: Subscribers.Completion<Self.Failure>)
}

The subscriber’s Subscriber.Input and Subscriber.Failure associated types must match the Publisher.Output and Publisher.Failure types declared by the publisher.

Subscribers are usually created implicitly using two methods of the Publisher protocol:

  • sink (closure);

  • assign (bind key path).

publisher
    .sink(
      receiveCompletion: {
        print("Received completion", $0)
      },
      receiveValue: {
        print("Received value", $0)
      }
    ) // closure based

publisher
     .assign(to: \.value, on: object) // assign to the object's key path

publisher
     .assign(to: &object.$value) // assign to publisher
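The snippets above leave publisher and object undefined. Here is a minimal, self-contained sketch of the first two variants; the Label class and the variable names are made up for this illustration and are not part of Combine.

```swift
import Combine

// Hypothetical model object used only for this illustration
final class Label {
    var text = ""
}

let label = Label()
let greetings = PassthroughSubject<String, Never>()
var events: [String] = []

// Closure-based subscriber: one closure for values, one for completion
let sinkToken = greetings.sink(
    receiveCompletion: { events.append("completion: \($0)") },
    receiveValue: { events.append("value: \($0)") }
)

// Key-path-based subscriber: writes each value into label.text
let assignToken = greetings.assign(to: \.text, on: label)

greetings.send("Hello")
greetings.send(completion: .finished)

print(events)     // ["value: Hello", "completion: finished"]
print(label.text) // Hello
```

Both calls return a token that is stored in a constant here, so the subscriptions stay alive for the whole example.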

Apple provides the classes Subscribers.Sink and Subscribers.Assign, which implement the Subscriber protocol, but you'll likely never need to create them manually. The sink and assign(to:on:) methods create them under the hood and return them wrapped in type-erased AnyCancellable objects, so you can cancel a subscription without knowing its details.

class Sink<Input, Failure>: Subscriber, Cancellable where Failure: Error {
    // Subscriber
    func receive(subscription: Subscription)
    func receive(_ value: Input) -> Subscribers.Demand
    func receive(completion: Subscribers.Completion<Failure>)

    // Cancellable
    func cancel()
}

AnyCancellable is an example of type erasure in Swift: it hides the concrete subscriber type and exposes only what is needed to cancel it.

class AnyCancellable: Cancellable, Hashable { // type erasure
    func cancel()
}

Memory Management

With the help of the Cancellable protocol, you don’t need to manage a subscription's memory by hand. Your subscription code returns a Cancellable object; whenever that object is released from memory, it cancels the whole subscription and releases its resources. This means you can easily bind the lifespan of a subscription to an owner by storing it in a property, for example on your view controller. When the user dismisses the view controller and it is deallocated, its properties are deinitialized and your subscription is canceled as well.

let cancellable1 = publisher
    .sink {
        print("Received value", $0)
    }

let cancellable2 = publisher // assign to the object's key path
     .assign(to: \.value, on: object)

cancellable1.cancel() // cancel the subscription
// cancellable2 is cancelled when the local variable cancellable2 goes out of scope
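To see the "release means cancel" rule in action, here is a small sketch that drops the token on purpose. The names ticks, received and token are only illustrative.

```swift
import Combine

let ticks = PassthroughSubject<Int, Never>()
var received: [Int] = []

// Keep the token in an optional so it can be released explicitly
var token: AnyCancellable? = ticks.sink { received.append($0) }

ticks.send(1)   // delivered: the token is still alive
token = nil     // releasing the token cancels the subscription
ticks.send(2)   // dropped: nobody is subscribed any more

print(received) // [1]
```

The same thing happens silently if you never store the token at all, which is a common reason for a sink closure that never fires.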

To automate this process, you can just have a Set<AnyCancellable> property and throw as many subscriptions inside it as you want. They’ll all be automatically canceled and released when the property is released from memory. (And this is why AnyCancellable conforms to Hashable - to store it in a Set).

var cancellables = Set<AnyCancellable>()

publisher
    .sink {
        print("Received value", $0)
    }
    .store(in: &cancellables)

publisher // assign to the object's key path
    .assign(to: \.value, on: object)
    .store(in: &cancellables)

NOTE: The second variant, assign(to:), takes a Published.Publisher and doesn’t return an AnyCancellable token, because it manages the lifecycle internally and cancels the subscription when the @Published property deinitializes (we will discuss @Published later in this post).
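As a rough sketch of that token-free variant, the hypothetical SearchViewModel below republishes an input stream through its own @Published property. The class and property names are assumptions for the example; assign(to:) itself needs iOS 14 / macOS 11 or later and an upstream whose Failure is Never.

```swift
import Combine

// Hypothetical view model used only for this illustration
final class SearchViewModel {
    @Published var query = ""

    init(input: PassthroughSubject<String, Never>) {
        // No token to store: the subscription lives as long as the @Published property
        input.assign(to: &$query)
    }
}

let input = PassthroughSubject<String, Never>()
let viewModel = SearchViewModel(input: input)

input.send("combine")

print(viewModel.query) // combine
```

One reason to prefer this form inside a class is that assign(to:on:) keeps a strong reference to the target object, so assigning to self through it and storing the token on self can create a retain cycle.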

Subject

Subject is an extended publisher which knows how to send values into a stream, by calling its send(_:) method. Subject also has methods to send completion and subscription to the subscribers. Subject can be useful for adapting existing imperative code to the Combine model.

protocol Subject<Output, Failure>: AnyObject, Publisher {
    func send(_ value: Self.Output)
    func send(completion: Subscribers.Completion<Self.Failure>)
    func send(subscription: Subscription)
}
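To make the "adapting imperative code" point concrete, here is a minimal sketch that bridges a callback-based API into a subject. LegacyDownloader and its callbacks are invented for the example and stand in for whatever imperative code you already have.

```swift
import Combine

// Hypothetical callback-based API standing in for existing imperative code
final class LegacyDownloader {
    var onProgress: ((Double) -> Void)?
    var onFinish: (() -> Void)?

    func run() {
        onProgress?(0.5)
        onProgress?(1.0)
        onFinish?()
    }
}

let progress = PassthroughSubject<Double, Never>()
let downloader = LegacyDownloader()

// Bridge the callbacks into the subject
downloader.onProgress = { progress.send($0) }
downloader.onFinish = { progress.send(completion: .finished) }

var events: [String] = []
let token = progress.sink(
    receiveCompletion: { _ in events.append("finished") },
    receiveValue: { events.append("progress \($0)") }
)

downloader.run()
progress.send(2.0) // ignored: the subject has already completed

print(events) // ["progress 0.5", "progress 1.0", "finished"]
```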

In your daily work instead of implementing the Publisher or Subject protocols yourself, you can create a publisher by using one of several types provided by the Combine framework:

  1. Use a concrete implementation of Subject, such as PassthroughSubject, to publish values on demand by calling its send(_:) method.

  2. Use a CurrentValueSubject to publish whenever you update the subject’s underlying value.

  3. Add the @Published property wrapper to a property. In doing so, the property gains a publisher that emits an event whenever the property’s value changes.

  4. Use one of Apple’s Convenience Publishers like Empty, Just, Deferred or Future.

We will take a look at them in practice a little bit later.

Subscription

protocol Subscription: Cancellable {
    // Tells a publisher that it may send more values to the subscriber.
    func request(_ demand: Subscribers.Demand)
}

Subscription is the protocol representing the connection of a subscriber to a publisher.

The Subscription.request(_:) method is how a subscriber signals demand for new values. After the publisher calls Subscriber.receive(subscription:) to hand over the subscription, the subscriber uses it to tell the publisher how many more values it may send.

Subscription is cancellable. The subscription is canceled when cancel() is called on it, for example through the AnyCancellable returned by sink. Canceling a subscription frees up any resources previously allocated.

TIP: You can only cancel a Subscription once.
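You rarely write a subscriber by hand, but doing it once makes demand easier to picture. The sketch below uses a made-up TwoValueSubscriber that requests exactly two values and never asks for more; the sequence publisher comes from the standard publisher property Combine adds to Swift sequences.

```swift
import Combine

// A subscriber that asks for two values up front and never asks for more
final class TwoValueSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    private(set) var received: [Int] = []

    func receive(subscription: Subscription) {
        // Initial demand: at most two values
        subscription.request(.max(2))
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        received.append(input)
        return .none // no additional demand
    }

    func receive(completion: Subscribers.Completion<Never>) {}
}

let subscriber = TwoValueSubscriber()
[1, 2, 3, 4].publisher.subscribe(subscriber)

print(subscriber.received) // [1, 2]
```

The publisher holds four values but only delivers the two that were requested. By contrast, sink requests an unlimited number of values as soon as it receives its subscription.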

Lifecycle of subscription

  1. The subscriber subscribes to the publisher.

  2. The publisher creates a subscription and gives it to the subscriber.

  3. The subscriber requests values via subscription.

  4. The publisher sends values.

  5. The publisher sends a completion.

TIP: Once a publisher emits a completion event (whether it’s a normal completion or an error), it’s finished and can no longer emit any more events.
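One way to watch these steps happen is the handleEvents operator, which lets you hook into each stage of the lifecycle. This is only a sketch for observation; the events array is an assumption of the example.

```swift
import Combine

var events: [String] = []

let token = [1, 2].publisher
    .handleEvents(
        receiveSubscription: { _ in events.append("subscription") },
        receiveOutput: { events.append("value \($0)") },
        receiveCompletion: { _ in events.append("completion") },
        receiveRequest: { _ in events.append("request") }
    )
    .sink { _ in }

print(events)
// ["subscription", "request", "value 1", "value 2", "completion"]
```

The recorded order matches steps 2 to 5 of the list: the subscription is handed over, the subscriber requests values, the values arrive, and the completion ends the stream.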

Publishers in practice

Let’s take a look at publishers provided by Apple out of the box.

PassthroughSubject

PassthroughSubject allows you to publish new values on demand, whenever you ask the subject to send them. Use the send(_:) method to deliver new values to the subscribers.

let subject = PassthroughSubject<Int, Never>()

let subscription = subject.sink {
    print("Received value:", $0)
}

subject.send(1) // Received value: 1
subject.send(2) // Received value: 2

CurrentValueSubject

CurrentValueSubject stores the current value. It broadcasts the current value to subscribers whenever they subscribe. Calling send(_:) on a current value subject is one way to send a new value. Another way is to assign a new value to its value property. You still need to send completion events by using send(completion:).

let subject = CurrentValueSubject<Int, Never>(0)

let subscription1 = subject.sink {
    print("Subscriber 1 received value:", $0)
} 
// Subscriber 1 received value: 0

subject.send(1) 
// Subscriber 1 received value: 1

let subscription2 = subject.sink {
    print("Subscriber 2 received value:", $0)
} 
// Subscriber 2 received value: 1

subject.value = 2 // Same as calling send(2)
// Subscriber 1 received value: 2
// Subscriber 2 received value: 2

@Published

@Published is a property wrapper that allows you to easily turn a property into a publisher.

class MyClass {
    @Published var myValue: Int = 0
}

let myInstance = MyClass()

let cancellable = myInstance.$myValue.sink { // keep the token, or the subscription is cancelled
    print("new value:", $0)
}
// new value: 0

myInstance.myValue = 1
// new value: 1

In essence, @Published is very similar to CurrentValueSubject. It can be simpler to use and is designed to work well with SwiftUI. However, there are some differences:

  • @Published fires on willSet of its wrapped property, while CurrentValueSubject fires on didSet;

  • @Published is not supported in a protocol declaration;

  • @Published is class-constrained, you can only use it in classes.
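The willSet difference is the one most likely to surprise you, so here is a small sketch of it. The Settings class is hypothetical; the point is what the property itself holds at the moment the subscriber is called.

```swift
import Combine

// Hypothetical class used only for this illustration
final class Settings {
    @Published var volume = 0
}

let settings = Settings()
var emitted: [Int] = []
var storedAtEmission: [Int] = []

let token = settings.$volume.sink { newValue in
    emitted.append(newValue)
    // Read the property while the subscriber is being notified
    storedAtEmission.append(settings.volume)
}

settings.volume = 5

print(emitted)          // [0, 5]
print(storedAtEmission) // [0, 0]
```

The subscriber receives 5, but the property still reads 0 at that moment, because the value is published before it is stored. Inside a sink, it is safer to use the value passed to the closure than to read the property again.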

Hot and cold signals

In reactive programming, hot and cold signals are two types of signals that describe the behavior of data streams.

A hot signal is a signal that starts emitting events as soon as it is created, regardless of whether there are any subscribers. For example, a hot signal could be a continuous stream of data from a sensor, such as the accelerometer of a device, or a live feed from a stock market.

In contrast, a cold signal is a signal that only starts emitting events when there is a subscriber. When the subscriber cancels its subscription, the signal stops emitting events. For example, a cold signal could be a file download, where the download only starts when a subscriber subscribes to the signal and stops when the subscriber cancels its subscription.

The distinction between hot and cold signals is important because it affects the behavior of subscribers, and the way events are emitted and processed. It can be useful to know about a hot and cold signal when you try to understand or explain how a specific data stream works.

TIP: CurrentValueSubject and PassthroughSubject are hot signals.
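A short sketch can make the difference tangible. It contrasts a PassthroughSubject (hot) with a sequence publisher (cold); the variable names are only illustrative.

```swift
import Combine

// Hot: values sent before anyone subscribes are simply lost
let hot = PassthroughSubject<Int, Never>()
hot.send(1)

var hotValues: [Int] = []
let hotToken = hot.sink { hotValues.append($0) }
hot.send(2)

// Cold: every subscriber triggers its own full run of the sequence
let cold = [1, 2, 3].publisher

var firstValues: [Int] = []
var secondValues: [Int] = []
let firstToken = cold.sink { firstValues.append($0) }
let secondToken = cold.sink { secondValues.append($0) }

print(hotValues)    // [2]
print(firstValues)  // [1, 2, 3]
print(secondValues) // [1, 2, 3]
```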

Just

Just is a publisher that emits an output to each subscriber just once after the subscription and then it completes. Just can’t fail with an error.

let justPublisher = Just("Hello World")

justPublisher
    .sink {
         print("First subscriber", $0)
     }
// First subscriber Hello World

justPublisher
    .sink {
         print("Second subscriber", $0)
     }
// Second subscriber Hello World

TIP: Just is an example of a cold signal in Combine.

Just can be useful for writing stubs in your unit tests when you need to mock a method that returns a publisher.
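Here is what such a stub might look like. ProfileService and StubProfileService are hypothetical names for this sketch; the only Combine pieces are Just, setFailureType(to:) and eraseToAnyPublisher().

```swift
import Combine

// Hypothetical protocol and stub, used only for this illustration
protocol ProfileService {
    func loadName() -> AnyPublisher<String, Error>
}

struct StubProfileService: ProfileService {
    func loadName() -> AnyPublisher<String, Error> {
        // Just never fails, so its failure type has to be widened to Error
        Just("Test User")
            .setFailureType(to: Error.self)
            .eraseToAnyPublisher()
    }
}

var loaded: [String] = []
var didFinish = false

let token = StubProfileService().loadName().sink(
    receiveCompletion: { completion in
        if case .finished = completion { didFinish = true }
    },
    receiveValue: { loaded.append($0) }
)

print(loaded)    // ["Test User"]
print(didFinish) // true
```

Because Just delivers its value synchronously on subscription, a test using this stub does not need expectations or waiting.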

Futures

A Future is a publisher that eventually produces a single value and then completes, or fails with an error. Each new subscriber receives that same result.

When you create a Future, you pass it a closure that receives a promise, and you call the promise with the value or error once it is available. If the promise is never called, the subscriber never gets a value.

let future = Future<String, Never> { promise in
    print("First future started")
    promise(.success("First success"))
}
// First future started

future.sink { value in
    print(value)
}
// First success

let future2 = Future<String, Never> { promise in
    print("Second future started")
    // promise is not called
}
// Second future started

future2.sink { value in
    print(value) // will never be called
}

TIP: Future is greedy, meaning it executes as soon as it’s created and doesn’t wait for a subscription. Unlike most publishers, which are lazy, it does not require a subscriber to start its work.

TIP: Future does not re-execute its promise; instead, it replays its output.
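If you want a Future that waits for a subscriber, a common approach is to wrap it in Deferred, one of the convenience publishers mentioned earlier. The sketch below counts how often each closure runs; the counters and variable names are assumptions of the example.

```swift
import Combine

var eagerRuns = 0
var lazyRuns = 0

// A plain Future runs its closure immediately, once
let eagerFuture = Future<Int, Never> { promise in
    eagerRuns += 1
    promise(.success(42))
}

// Deferred postpones creating the Future until a subscriber arrives,
// and creates a fresh one for every subscriber
let deferredFuture = Deferred {
    Future<Int, Never> { promise in
        lazyRuns += 1
        promise(.success(42))
    }
}

print(eagerRuns, lazyRuns) // 1 0

var values: [Int] = []
let tokens = [
    eagerFuture.sink { values.append($0) },
    eagerFuture.sink { values.append($0) },    // replayed, closure not re-run
    deferredFuture.sink { values.append($0) },
    deferredFuture.sink { values.append($0) }  // closure runs again
]

print(eagerRuns, lazyRuns) // 1 2
print(values)              // [42, 42, 42, 42]
```

Which one you want depends on the work: the plain Future shares one result, while the deferred version repeats the work for every subscriber.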

Futures are a good fit for network requests because they model an asynchronous operation that either succeeds with a value or fails with an error. By using futures, you can ensure that the network request is only made once, even if multiple parts of your application need the data.

import Combine
import Foundation

enum NetworkError: Error {
    case invalidResponse
    case decodingError
}

struct User: Decodable {
    let name: String
    let email: String
}

func fetchUser() -> AnyPublisher<User, NetworkError> {
    Future<User, NetworkError> { promise in
        guard let url = URL(string: "https://api.example.com/user") else {
            // Always fulfil the promise; otherwise subscribers wait forever
            promise(.failure(.invalidResponse))
            return
        }

        URLSession.shared.dataTask(with: url) { data, response, error in
            // Transport errors, non-200 statuses and missing bodies
            // are all reported as invalidResponse
            guard error == nil,
                  let httpResponse = response as? HTTPURLResponse,
                  httpResponse.statusCode == 200,
                  let data = data else {
                promise(.failure(.invalidResponse))
                return
            }
            do {
                let user = try JSONDecoder().decode(User.self, from: data)
                promise(.success(user))
            } catch {
                promise(.failure(.decodingError))
            }
        }.resume()
    }
    .eraseToAnyPublisher()
}

You probably noticed something new in the code above - AnyPublisher.

AnyPublisher

AnyPublisher is a type-erased struct that conforms to the Publisher protocol. It allows you to hide details about the publisher that you may not want to expose to subscribers, which is good for encapsulation and access control. It does for Publisher what AnyCancellable does for Subscriber.

TIP: AnyPublisher does not have a send(_:) method or a value property, so you cannot add new values to that publisher directly.

You can call the eraseToAnyPublisher() operator to wrap your publisher in an AnyPublisher and erase its type.
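A typical way to use this for encapsulation is to keep a subject private and expose only an AnyPublisher. The Counter class below is a hypothetical example of that shape.

```swift
import Combine

// Hypothetical type used only for this illustration
final class Counter {
    // Only Counter itself can push values
    private let subject = CurrentValueSubject<Int, Never>(0)

    // Callers see a read-only publisher with no send(_:) or value
    var count: AnyPublisher<Int, Never> {
        subject.eraseToAnyPublisher()
    }

    func increment() {
        subject.value += 1
    }
}

let counter = Counter()
var seen: [Int] = []
let token = counter.count.sink { seen.append($0) }

counter.increment()
counter.increment()

print(seen) // [0, 1, 2]
```

Callers can observe the count, but the only way to change it is through increment().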

Operators

Combine operators are methods provided by Apple that allow you to process and transform streams of values over time. Some are intuitive and already familiar from Swift's collections, such as filter, map, and first. Others can be quite complex and hard to understand. We will take a look at some of them.

Each operator is a method defined in an extension of Publisher:

extension Publisher {
    // map operator
    func map<T>(_ transform: @escaping (Self.Output) -> T) -> Publishers.Map<Self, T>
}

As you can see it returns Publishers.Map as a result.

Publishers is an enum that serves as the namespace for publisher structs such as Map.

struct Map<Upstream, Output>: Publisher where Upstream: Publisher {

    typealias Failure = Upstream.Failure

    // The publisher from which this publisher receives elements.
    let upstream: Upstream

    // The closure that transforms elements from the upstream publisher.
    let transform: (Upstream.Output) -> Output

    // Creates a publisher that transforms all elements from the upstream publisher with a provided closure.
    public init(upstream: Upstream, transform: @escaping (Upstream.Output) -> Output)

    // Attaches the specified subscriber to this publisher.
    public func receive<S>(subscriber: S) where Output == S.Input, S: Subscriber, Upstream.Failure == S.Failure
}

As you might guess already, under the hood, operators create a new publisher that transforms the elements from the upstream publisher and returns this new publisher as the result. This explains why you can chain many operators in a row.
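You can see this nesting directly in the type of a chained pipeline. In the sketch below the explicit type annotation is there only to show what the compiler infers; in real code you would normally omit it or erase it with eraseToAnyPublisher().

```swift
import Combine

let numbers = PassthroughSubject<Int, Never>()

// Each operator wraps the previous publisher in a new publisher type
let pipeline: Publishers.Map<Publishers.Filter<PassthroughSubject<Int, Never>>, String> =
    numbers
        .filter { $0 % 2 == 0 }
        .map { "even: \($0)" }

var output: [String] = []
let token = pipeline.sink { output.append($0) }

(1...4).forEach { numbers.send($0) }

print(output) // ["even: 2", "even: 4"]
```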

To give you a quick introduction, we'll focus on some of the most useful examples of operators. I won't go into detail about all of them, as there are many, but I'll provide the best use cases that you're likely to encounter in your daily work.

flatMap

The flatMap is used to transform a stream of values into a new stream of values. It works by taking the values emitted by one publisher and transforming them into new publishers using a closure.

The most common use case for flatMap is network requests. For example, you might have a publisher that emits the result of an API request, and you want to make another API request when the initial one completes.

In the example below we call the getUserData request when authenticateUser request completes. So flatMap can be very useful to chain the API requests.

struct API {
    typealias Token = String

    func authenticateUser(userId: Int) -> AnyPublisher<Token, Error> {
        return Future<Token, Error> { promise in
            // Simulate a network request
            DispatchQueue.global().asyncAfter(deadline: .now() + 0.5) {
                promise(.success("Token"))
            }
        }
        .eraseToAnyPublisher()
    }

    func getUserData(token: Token) -> AnyPublisher<Data, Error> {
        return Future<Data, Error> { promise in
            // Simulate a network request
            DispatchQueue.global().asyncAfter(deadline: .now() + 0.5) {
                promise(.success(Data()))
            }
        }
        .eraseToAnyPublisher()
    }
}

var cancellable = Set<AnyCancellable>()
let api = API()

api.authenticateUser(userId: 1234)
    .flatMap { token in
        api.getUserData(token: token)
    }
    .sink(receiveCompletion: { completion in
        // handle error if needed
    }, receiveValue: { data in
        // handle user data
    })
    .store(in: &cancellable)

combineLatest

combineLatest is used to combine the latest values from multiple publishers into a single publisher. The resulting publisher emits a tuple of the latest values from each of the input publishers whenever any of the input publishers emits a new value.

A common use case for combineLatest is in user interface development, where you want to update the UI based on multiple values that are changing over time. For example, you might have two publishers, one that emits the user's current location and one that emits the user's current heading. You can use combineLatest to combine the latest values from these publishers and update the UI with the combined information.

Another use case for combineLatest is in data processing, where you want to process multiple streams of data in parallel. For example, you might have two publishers that emit data from different sensors, and you want to process the data in real time as it arrives. You can use combineLatest to combine the latest values from these publishers and process the data in a single stream.

Imagine that you have two text fields, one for the first name and one for the family name, and you need the full name whenever the user changes either of them. This is easy to do with combineLatest:

let firstNamePublisher = CurrentValueSubject<String, Never>("John")
let familyNamePublisher = CurrentValueSubject<String, Never>("Snow")

firstNamePublisher
    .combineLatest(familyNamePublisher)
    .sink { firstName, familyName in
        print(firstName, familyName) // prints: John Snow
    }

TIP: combineLatest emits a tuple only after each of the combined publishers has emitted at least one value.
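The example above uses CurrentValueSubject, so both publishers have a value from the start. With PassthroughSubject the waiting becomes visible; this is a small sketch with illustrative names.

```swift
import Combine

let firstName = PassthroughSubject<String, Never>()
let familyName = PassthroughSubject<String, Never>()
var fullNames: [String] = []

let token = firstName
    .combineLatest(familyName)
    .sink { first, family in
        fullNames.append("\(first) \(family)")
    }

firstName.send("Ada")        // nothing yet: familyName has not emitted
familyName.send("Lovelace")  // Ada Lovelace
firstName.send("Augusta")    // Augusta Lovelace

print(fullNames) // ["Ada Lovelace", "Augusta Lovelace"]
```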

merge

The merge operator merges the values from multiple publishers into a single publisher. Unlike combineLatest, it doesn’t emit a tuple: the values are interleaved into a single stream, so all merged publishers must share the same Output and Failure types.

A common use case for merge is getting data from multiple sources (API, WebSocket, cache, UI events) in parallel when you don’t care which one arrives first and only need the event. It does not wait for events from both sources as combineLatest does: first come, first served.

let apiPublisher = PassthroughSubject<String, Error>()
let websocketPublisher = PassthroughSubject<String, Error>()

// Keep the returned token: releasing it cancels the subscription
let cancellable = apiPublisher
    .merge(with: websocketPublisher)
    .sink { completion in
         // handle the error if needed
     } receiveValue: { value in
         print("Merged value received:", value)
     }

apiPublisher.send("New value")
// Merged value received: New value
websocketPublisher.send("Another value")
// Merged value received: Another value

share

share is a useful operator that enables you to share a single subscription to a publisher among multiple subscribers, which can help to reduce resource usage and improve the efficiency of your reactive programming system. For example, by using share, you can ensure that the network request is only decoded to the domain model once, even if multiple parts of your application need the data.

let request = api.makeRequest()
    .map { data in
        // called only once with share
        try? JSONDecoder().decode(MyDecodable.self, from: data) 
    }
    .share()

// one place
request
    .sink(...)

// another place
request
    .sink(...)

TIP: Also note that Publishers.Share is a class rather than a structure like most other publishers. This means you can use this operator to create a publisher instance that uses reference semantics.
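To check the "only once" claim, here is a sketch that counts how many times a map closure runs with and without share. The subject stands in for a network response and the counters are assumptions of the example.

```swift
import Combine

let responses = PassthroughSubject<Int, Never>()
var unsharedCalls = 0
var sharedCalls = 0

// Without share, every subscriber gets its own copy of the pipeline
let unshared = responses.map { value -> Int in
    unsharedCalls += 1
    return value * 2
}

// With share, one upstream subscription feeds all subscribers
let shared = responses
    .map { value -> Int in
        sharedCalls += 1
        return value * 2
    }
    .share()

let tokens = [
    unshared.sink { _ in },
    unshared.sink { _ in },
    shared.sink { _ in },
    shared.sink { _ in }
]

responses.send(21)

print(unsharedCalls) // 2
print(sharedCalls)   // 1
```

Keep in mind that share does not replay anything: a subscriber that arrives after a value was delivered will not see that value.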

Conclusion

In conclusion, Combine offers a robust collection of tools for implementing reactive programming in Swift and working with asynchronous events. However, a solid understanding of reactive programming is still necessary, and there is a steep learning curve to master Combine. It is important to use it responsibly and not blindly. Keep in mind the phrase, "With great power comes great responsibility."

What's next? Although we have covered a lot of information about Combine in this article, if you want to take your understanding of it to the next level, I would recommend reading the Kodeco book for a deeper understanding.
