Building Concurrent, Scalable Data Pipelines with GenStage and Phoenix Channels

In a real time application, keeping users up to date with the latest information possible is crucial. As such, creating an efficient data pipeline is a necessity. According to Stephen Bussey in his book , there are a few traits a data pipeline must have to be considered efficient.

  • The data pipeline delivers messages to all relevant clients and sends the correct data.
  • Fast data delivery. The pipeline should be as fast as possible.
  • As durable as needed. Your guarantees of delivery should be as strong as your application calls for. It should also be adaptive enough to change as needed throughout your application’s lifespan.
  • As concurrent as needed. The concurrency of the pipeline should not overwhelm the application.
  • Measurability. It is important we know how long it takes to send data to clients.

Lucky for us, the OTP framework shipped with the Elixir language comes with a solution that makes it easy to handle all of these traits. GenStage. GenStage is not an out of the box data pipeline. It merely provides specs on how to pass around data. It is up to us as developers to implement GenStage within our pipeline.

GenStage comes with two main stage types that are used to model our pipeline: Producer and ConsumerSupervisor. Producer’s handle the fetching of data while Consumers process it. ConsumerSupervisor adds additional functionality to the generic Consumer model. It allows for built in concurrency through the use of built in functions. The way it works is instead of relying on one Elixir process to handle each item in the GenStage queue, it generates a unique process for each item and handles it in batches. This sounds expensive but due to Elixir’s unique nature of having very cheap individual processes, it is actually quite cheap. The BEAM also takes those individual processes and runs them in parallel over multiple cores. Parallelism is different in concurrency in the sense that the concurrent processes are all handling work simultaneously and the BEAM is making the system parallel by running those concurrent processes over multiple CPU cores. They go hand in hand but are different! Let’s look at some code!

Defining the Consumer Module

Here using the ConsumerSupervisor module, we are able to start an individual Worker process for each process in the queue.

Defining the Worker Module

Here each Worker process will start up on an item by item basis. The individual worker process then broadcasts to the user:* channel a “push” message. It takes a second pause between broadcasts.

Defining the Producer module

Here is the definition of the Producer module. We first pattern match our list of options into tuples using the Keyword.split/2 function. Next, we start a Producer process for each name found. The init/1 function handles initialization and returns a tuple with a :producer tuple. This lets GenStage know we are returning a Producer process.

Phoenix application.ex file

Our application.ex file also needs to be updated. We must name our Producer and Consumer in our supervision tree. We must add these before our endpoint because we want our data pipeline to be available before our web endpoints are. The min/max arguments allow us to control how concurrent we want our application to be. For applications using in memory configuration, lower concurrency is fine. However, once there is a database involved, using higher levels of concurrency could boost performance drastically by limiting the amount of asynchronous database calls.

Updating the socket.js file

Updating our JavaScript client to perform some functionality on the push message we send using our broadcast in our Worker processes.

After starting our server using the iex -S mix phx.server command, we alias our Producer module and define a push lambda function. The push lambda function calls on our Producer.push function. We send that out for each item in a range of 1 to 50. Our JavaScript client receives the message and the console output should look something like this:

As you can see, sometimes the messages come in out of order. This is expected as we have our concurrency set to allow 5 items at a time. These messages are being delivered at the same exact time and as a result might appear in a seemingly random order.

With our data pipeline fully built, we can now start building custom logic into it. In a future blog, I plan on showing how to add efficiency metrics and tests to our newly built pipeline!

Full-Stack Engineer