Three real-world examples of distributed Elixir (pt. 3)

The distributed download requester and progress tracker.

Ricardo García Vega
14 min read · Jun 18, 2021

In the last part of the series, we saw how to create a singleton process across a cluster of nodes, using three different global registries. This approach is commonly used to run a unique background task that you want to keep running no matter what changes happen in the cluster’s topology. However, we can use a similar pattern to run short-lived tasks that die once they finish their job, with the same guarantees.

An excellent example of this is an application where users can download a file, but generating the file is an expensive task that can take several seconds, and you want to notify the user of the progress. Eventually, when the file is ready, the application should provide the user with the download URL. Of course, all of this should keep working even if the node where the download task is running shuts down, the user refreshes the page and connects to a different instance, etc. Let’s get cracking!

The base application

Source code of this example

For today’s example, we will create a web application on top of:

  • libcluster for building the cluster of nodes.
  • Horde to handle both the process registry and supervision, as we saw in the previous part of the series.
  • Phoenix, Elixir’s web framework, designed and built from the ground up to take advantage of Elixir’s distribution features.
  • Phoenix.LiveView for the front-end to give real-time updates to the user.

Let’s start by generating a new project from the terminal:

mix phx.new download_manager --no-ecto --live --no-dashboard --no-gettext

Next, let’s add the initial dependencies that we need and configure the cluster definition:

# ./mix.exs
defmodule DownloadManager.MixProject do
  use Mix.Project
  # ...
  defp deps do
    [
      # ...
      {:libcluster, "~> 3.3"},
      {:horde, "~> 0.8.3"}
    ]
  end
  # ...
end

# ./lib/download_manager/application.ex
defmodule DownloadManager.Application do
  use Application

  def start(_type, _args) do
    children = [
      {Cluster.Supervisor, [topologies(), [name: BackgroundJob.ClusterSupervisor]]},
      # Start the Telemetry supervisor
      DownloadManagerWeb.Telemetry,
      # Start the PubSub system
      {Phoenix.PubSub, name: DownloadManager.PubSub},
      DownloadManager.HordeRegistry,
      DownloadManager.HordeSupervisor,
      DownloadManager.NodeObserver,
      # Start the Endpoint (http/https)
      DownloadManagerWeb.Endpoint
    ]

    opts = [strategy: :one_for_one, name: DownloadManager.Supervisor]
    Supervisor.start_link(children, opts)
  end

  def config_change(changed, _new, removed) do
    DownloadManagerWeb.Endpoint.config_change(changed, removed)
    :ok
  end

  defp topologies do
    [
      background_job: [
        strategy: Cluster.Strategy.Gossip
      ]
    ]
  end
end

We are not going to dive into the details of DownloadManager.HordeRegistry, DownloadManager.HordeSupervisor, and DownloadManager.NodeObserver, since we already did in the last part. Still, they are the modules needed to make Horde work properly in a cluster of dynamic nodes, so we will just copy them and rename their namespace to match the current DownloadManager.

Since we will start multiple nodes and Phoenix runs on port 4000 by default, every instance after the first would fail to start because the port is already taken. Let's change the development configuration so that it takes the port number from the environment:

# ./config/dev.exs
use Mix.Config

config :download_manager, DownloadManagerWeb.Endpoint,
  http: [
    port: String.to_integer(System.get_env("PORT") || "4000")
  ],
  # ...

Last but not least, let’s get rid of all the predefined Phoenix styles and, to create a simple yet beautiful UI, let’s add the Tailwind CSS CDN stylesheet link:

# ./lib/download_manager_web/templates/layout/root.html.leex
<!DOCTYPE html>
<html lang="en">
  <head>
    # ...
    <link href="https://unpkg.com/tailwindcss@^2/dist/tailwind.min.css" rel="stylesheet">
    # ...
  </head>
  <body>
    <%= @inner_content %>
  </body>
</html>

We are not going to do an in-depth exploration of the styles either. However, you can find all the files in the example repository, and this is what the final result looks like:

The download struct and repository

With the basic project structure ready, let’s start working on the download request logic. First of all, let’s define what a download request looks like:

# ./lib/download_manager/download.ex
defmodule DownloadManager.Download do
  alias __MODULE__
  alias DownloadManager.Token

  @pending_state :pending
  @processing_state :processing
  @error_state :error
  @ready_state :ready

  @enforce_keys [:id, :state, :user_id]
  defstruct [
    :file_url,
    :id,
    :state,
    :user_id
  ]

  def new(params) do
    with {:ok, user_id} <- Keyword.fetch(params, :user_id) do
      %Download{
        id: Token.generate(),
        state: @pending_state,
        user_id: user_id
      }
    end
  end
end

The DownloadManager.Download struct holds the following data:

  • id: the unique internal ID of the download.
  • state: the current state of the download, which can be any of :pending, :processing, :error, or :ready.
  • file_url: the downloadable file's URL.
  • user_id: the ID of the user who requested the download.
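The worker and the live component shown later in this article call helpers such as with_processing_state/1, with_ready_state/1, with_file_url/2, ready?/1, and pending_state/0 on this struct, which the listing above leaves out. Here is a minimal sketch of what they might look like, assuming they are plain struct updates. The Sketch.Download module name and the inline ID generator (standing in for DownloadManager.Token) are hypothetical and only there to keep the example self-contained:

```elixir
defmodule Sketch.Download do
  @moduledoc "Hypothetical stand-in for DownloadManager.Download."

  @pending_state :pending
  @processing_state :processing
  @ready_state :ready

  @enforce_keys [:id, :state, :user_id]
  defstruct [:file_url, :id, :state, :user_id]

  @type t :: %__MODULE__{}

  # Expose the states so other modules can pattern match on them.
  def pending_state, do: @pending_state
  def processing_state, do: @processing_state
  def ready_state, do: @ready_state

  def new(params) do
    with {:ok, user_id} <- Keyword.fetch(params, :user_id) do
      %__MODULE__{id: generate_id(), state: @pending_state, user_id: user_id}
    end
  end

  # Each transition returns a new struct; nothing is mutated in place.
  def with_processing_state(%__MODULE__{} = download),
    do: %{download | state: @processing_state}

  def with_ready_state(%__MODULE__{} = download),
    do: %{download | state: @ready_state}

  def with_file_url(%__MODULE__{} = download, url),
    do: %{download | file_url: url}

  def ready?(%__MODULE__{state: state}), do: state == @ready_state

  # Assumed ID format: 10 URL-safe characters built from random bytes.
  defp generate_id do
    8
    |> :crypto.strong_rand_bytes()
    |> Base.url_encode64(padding: false)
    |> binary_part(0, 10)
  end
end
```

Keeping the transitions as small pure functions means the worker can pipe a download through them before handing the result to the repository.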

We are also adding a convenient new/1 helper function to build download structs, which by default sets a randomly generated :id value along with the :pending state.

In a real-life application, we would temporarily store download requests in some sort of in-memory data storage like the good old Redis. However, we will stick to our no-external-dependencies approach and define a new module that serves as the download request repository:

# ./lib/download_manager/download/repo.ex
defmodule DownloadManager.Download.Repo do
  alias DownloadManager.Download

  @adapter Application.compile_env(:download_manager, __MODULE__)[:adapter]

  @type user_id :: String.t()
  @type result :: {:ok, Download.t()} | {:error, term}

  @callback start(keyword) :: GenServer.on_start()
  @callback fetch(user_id()) :: result
  @callback insert(Download.t()) :: result
  @callback update(Download.t()) :: result
  @callback remove(Download.t()) :: result

  defdelegate start_link(opts), to: @adapter
  defdelegate fetch(user_id), to: @adapter
  defdelegate insert(download), to: @adapter
  defdelegate update(download), to: @adapter
  defdelegate remove(download), to: @adapter

  def child_spec(opts) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [opts]},
      type: :worker,
      restart: :permanent,
      shutdown: 500
    }
  end
end

This module defines five different behavior callbacks:

  • start/1: which starts the current adapter.
  • fetch/1: which searches the current download for a given user_id.
  • insert/1: which inserts the given download.
  • update/1: which updates the given download.
  • remove/1: which deletes the given download.

Using dependency injection to set the @adapter module attribute, it delegates all the public functions to the configured adapter. This technique is very convenient when you want to have different adapters for different environments, especially when the production adapter points to an external service, but you want your tests to use a mock implementation to avoid any external dependencies.

For the sake of simplicity, we will use nebulex, which is a distributed in-memory cache library, so let's add it to the application dependencies:

# ./mix.exs
defmodule DownloadManager.MixProject do
  use Mix.Project
  # ...
  defp deps do
    [
      # ...
      # Additional deps
      # ...
      {:nebulex, "~> 2.1"}
    ]
  end
  # ...
end

Next, we can implement a DownloadManager.Download.Repo adapter module following nebulex's documentation:

# ./lib/download_manager/download/repo/nebulex.ex
defmodule DownloadManager.Download.Repo.Nebulex do
  use Nebulex.Cache,
    otp_app: :download_manager,
    adapter: Nebulex.Adapters.Replicated

  alias DownloadManager.{Download, Download.Repo}

  @behaviour Repo

  @impl Repo
  def start(opts) do
    start_link(opts)
  end

  @impl Repo
  def fetch(user_id) do
    case get(user_id) do
      nil ->
        {:error, :not_found}

      download ->
        {:ok, download}
    end
  end

  @impl Repo
  def insert(%Download{user_id: user_id} = download) do
    if put_new(user_id, download) do
      {:ok, download}
    else
      {:error, :unexpected_error}
    end
  end

  @impl Repo
  def update(%Download{user_id: user_id} = download) do
    :ok = put(user_id, download, ttl: :timer.seconds(5))
    {:ok, download}
  end

  @impl Repo
  def remove(%Download{user_id: user_id} = download) do
    :ok = delete(user_id)
    {:ok, download}
  end
end

The module implements the DownloadManager.Download.Repo behavior by calling the specific Nebulex functions in its callbacks, thanks to Nebulex's convenient API. Finally, we have to configure the repository's adapter in the application's configuration, and add the repository module to the main application supervision tree:

# ./config/config.exs
use Mix.Config
# ...
config :download_manager, DownloadManager.Download.Repo,
  adapter: DownloadManager.Download.Repo.Nebulex
# ...

# ./lib/download_manager/application.ex
defmodule DownloadManager.Application do
  use Application

  def start(_type, _args) do
    children = [
      # ...
      DownloadManager.Download.Repo,
      # ...
    ]

    opts = [strategy: :one_for_one, name: DownloadManager.Supervisor]
    Supervisor.start_link(children, opts)
  end
  # ...
end
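Because the adapter is resolved from configuration, swapping it for a test double is mostly a matter of implementing the same callbacks. The following is a minimal sketch of that idea, not part of the example repository: Sketch.Repo is a hypothetical behaviour that mirrors the repository callbacks, and Sketch.Repo.InMemory is a hypothetical single-node adapter backed by an Agent, using plain maps in place of download structs:

```elixir
defmodule Sketch.Repo do
  @moduledoc "Hypothetical behaviour mirroring the repository callbacks."

  @callback start(keyword) :: {:ok, pid} | {:error, term}
  @callback fetch(String.t()) :: {:ok, map} | {:error, term}
  @callback insert(map) :: {:ok, map} | {:error, term}
  @callback update(map) :: {:ok, map} | {:error, term}
  @callback remove(map) :: {:ok, map} | {:error, term}
end

defmodule Sketch.Repo.InMemory do
  @moduledoc "Single-node test double; it does not replicate across nodes."
  @behaviour Sketch.Repo

  @impl true
  def start(_opts), do: Agent.start_link(fn -> %{} end, name: __MODULE__)

  @impl true
  def fetch(user_id) do
    case Agent.get(__MODULE__, &Map.fetch(&1, user_id)) do
      {:ok, download} -> {:ok, download}
      :error -> {:error, :not_found}
    end
  end

  @impl true
  def insert(%{user_id: user_id} = download) do
    # Only one download per user: refuse to overwrite an existing entry.
    Agent.get_and_update(__MODULE__, fn state ->
      if Map.has_key?(state, user_id) do
        {{:error, :already_exists}, state}
      else
        {{:ok, download}, Map.put(state, user_id, download)}
      end
    end)
  end

  @impl true
  def update(%{user_id: user_id} = download) do
    Agent.update(__MODULE__, &Map.put(&1, user_id, download))
    {:ok, download}
  end

  @impl true
  def remove(%{user_id: user_id} = download) do
    Agent.update(__MODULE__, &Map.delete(&1, user_id))
    {:ok, download}
  end
end
```

In a test environment, an adapter like this could be selected through the same :adapter configuration key, so the rest of the code would not need to change.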

Let’s jump to the interactive shell and test out the repository:

➜ iex --sname n1 -S mix
# ...
iex(n1@mbp)1> [user_id: "user-1"] |> DownloadManager.Download.new() |> DownloadManager.Download.Repo.insert()
{:ok,
%DownloadManager.Download{
file_url: nil,
id: "sOB4hY6ylz",
state: :pending,
user_id: "user-1"
}}

If we start a different node and try to get the download for user-1, we should be able to see it:

➜ iex --sname n2 -S mix
# ...
iex(n2@mbp)1> DownloadManager.Download.Repo.fetch("user-1")
{:ok,
%DownloadManager.Download{
file_url: nil,
id: "XLaXS2YeOO",
state: :pending,
user_id: "user-1"
}}

Creating and tracking downloads

With the download definition and the distributed repository working, we can move on to the next thing: requesting a download and tracking its progress. First, let’s implement the module which starts new download request processes:

# ./lib/download_manager/download/tracker.ex
defmodule DownloadManager.Download.Tracker do
  alias __MODULE__.Worker
  alias DownloadManager.{Download, Download.Repo, HordeRegistry, HordeSupervisor}

  @spec start(String.t()) :: {:ok, Download.t()} | {:error, term}
  def start(user_id) do
    with download <- Download.new(user_id: user_id),
         {:ok, download} <- Repo.insert(download),
         child_spec <- worker_spec(download),
         {:ok, _} <- HordeSupervisor.start_child(child_spec) do
      {:ok, download}
    end
  end

  defp worker_spec(%Download{user_id: user_id} = download) do
    %{
      id: {Worker, user_id},
      start: {Worker, :start_link, [[download: download, name: via_tuple(user_id)]]},
      type: :worker,
      restart: :transient
    }
  end

  defp via_tuple(id) do
    {:via, Horde.Registry, {HordeRegistry, {Download, id}}}
  end
end
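The via tuple in worker_spec/1 is what guarantees a single tracker per user. To get a feel for it without a cluster, here is a minimal single-node sketch that swaps Horde for Elixir's built-in Registry and DynamicSupervisor. It shows the same registration pattern, but not Horde's cross-node distribution, and every Sketch.* name is hypothetical:

```elixir
defmodule Sketch.Worker do
  use GenServer

  def start_link(opts) do
    user_id = Keyword.fetch!(opts, :user_id)
    name = Keyword.fetch!(opts, :name)
    GenServer.start_link(__MODULE__, user_id, name: name)
  end

  @impl GenServer
  def init(user_id), do: {:ok, user_id}
end

defmodule Sketch.Tracker do
  # Local stand-ins for DownloadManager.HordeRegistry and HordeSupervisor.
  @registry Sketch.Registry
  @supervisor Sketch.Supervisor

  def start_infrastructure do
    {:ok, _} = Registry.start_link(keys: :unique, name: @registry)
    {:ok, _} = DynamicSupervisor.start_link(strategy: :one_for_one, name: @supervisor)
    :ok
  end

  def start(user_id) do
    spec = %{
      id: {Sketch.Worker, user_id},
      start: {Sketch.Worker, :start_link, [[user_id: user_id, name: via_tuple(user_id)]]},
      restart: :transient
    }

    DynamicSupervisor.start_child(@supervisor, spec)
  end

  def whereis(user_id), do: GenServer.whereis(via_tuple(user_id))

  # The registry key identifies the worker by user, so a second start
  # for the same user is rejected instead of spawning a duplicate.
  defp via_tuple(user_id), do: {:via, Registry, {@registry, {:download, user_id}}}
end
```

After calling Sketch.Tracker.start_infrastructure(), starting a worker twice for the same user returns {:error, {:already_started, pid}} on the second call, while a different user gets its own process.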

The Tracker module exposes a public start/1 function that expects a user_id. Using this ID, it generates a new download, inserts it into the repository, builds a worker child spec, and finally starts the tracker worker process using the Horde supervisor, returning the created download. Please notice the {:via, Horde.Registry, {HordeRegistry, {Download, id}}} name option it sets in the worker spec to register the process globally across all nodes.

The worker's responsibility is simple: to keep track of the download file generation and report its progress. Nevertheless, depending on your particular case, generating downloadable files can imply different things, from processing significant amounts of data locally to relying on an external service and handling the communication between both applications. Therefore, let's follow the same dependency injection approach that we took in the download repository and create a worker behavior module:

# ./lib/download_manager/download/tracker/worker.ex
defmodule DownloadManager.Download.Tracker.Worker do
  @adapter Application.compile_env(:download_manager, __MODULE__)[:adapter]

  @callback start_link(keyword) :: GenServer.on_start()

  defdelegate start_link(opts), to: @adapter
end

Again, for the sake of simplicity, we are going to build a fake worker implementation that gives us enough time to mess around in the interactive shell:

# ./lib/download_manager/download/tracker/worker/fake.ex
defmodule DownloadManager.Download.Tracker.Worker.Fake do
  use GenServer

  alias DownloadManager.{Download, Download.Repo}
  alias Phoenix.PubSub

  def start_link(opts) do
    name = Keyword.get(opts, :name, __MODULE__)
    download = Keyword.fetch!(opts, :download)
    GenServer.start_link(__MODULE__, download, name: name)
  end

  @impl GenServer
  def init(download) do
    schedule(:start, 1_000)
    {:ok, download}
  end

  @impl GenServer
  def handle_info(:start, download) do
    {:ok, new_download} =
      download
      |> Download.with_pending_state()
      |> Repo.update()

    broadcast(new_download)
    schedule(:process, 1_000)
    {:noreply, new_download}
  end

  def handle_info(:process, download) do
    {:ok, new_download} =
      download
      |> Download.with_processing_state()
      |> Repo.update()

    broadcast(new_download)
    schedule(:ready, 5_000)
    {:noreply, new_download}
  end

  def handle_info(:ready, %Download{id: id} = download) do
    {:ok, new_download} =
      download
      |> Download.with_ready_state()
      |> Download.with_file_url("/downloads/#{id}.pdf")
      |> Repo.update()

    broadcast(new_download)
    # Exiting with :normal keeps the :transient child from being restarted
    {:stop, :normal, new_download}
  end

  defp schedule(action, timeout) do
    Process.send_after(self(), action, timeout)
  end

  defp broadcast(%Download{user_id: user_id} = download) do
    PubSub.broadcast(DownloadManager.PubSub, "download:#{user_id}", {:update, download})
  end
end

The module is a straightforward GenServer implementation, which takes a name and a download in its options, the download being its internal state. It then starts a loop of scheduled internal messages, in which it moves its download to the next state, updates the download stored in the repo, and broadcasts the new download through Phoenix.PubSub using the download:#{user_id} topic. Eventually, when the download is ready, it sets a fake download URL and exits normally. Let's jump into iex and test it out:

➜ iex -S mix
Erlang/OTP 24 [erts-12.0.1] [source] [64-bit] [smp:12:12] [ds:12:12:10] [async-threads:1] [jit]
[info] Starting Horde.RegistryImpl with name DownloadManager.HordeRegistry
[info] Starting Horde.DynamicSupervisorImpl with name DownloadManager.HordeSupervisor
Interactive Elixir (1.12.1) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> download = [user_id: "user-1"] |> DownloadManager.Download.new()
%DownloadManager.Download{
file_url: nil,
id: "QOotjQuQSP",
state: :pending,
user_id: "user-1"
}
iex(2)> DownloadManager.Download.Tracker.Worker.start_link(download: download)
{:ok, #PID<0.379.0>}
iex(3)> [info] ----[nonode@nohost-#PID<0.379.0>] Elixir.DownloadManager.Download.Tracker.Worker.Fake starting download: %DownloadManager.Download{file_url: nil, id: "QOotjQuQSP", state: :pending, user_id: "user-1"}
[info] ----[nonode@nohost-#PID<0.379.0>] Elixir.DownloadManager.Download.Tracker.Worker.Fake processing download: %DownloadManager.Download{file_url: nil, id: "QOotjQuQSP", state: :pending, user_id: "user-1"}
[info] ----[nonode@nohost-#PID<0.379.0>] Elixir.DownloadManager.Download.Tracker.Worker.Fake download ready: %DownloadManager.Download{file_url: "/downloads/QOotjQuQSP.pdf", id: "QOotjQuQSP", state: :ready, user_id: "user-1"}

It is working as expected, excellent. We can’t forget about configuring the tracker’s worker adapter though:

# ./config/config.exs
use Mix.Config
# ...
config :download_manager, DownloadManager.Download.Tracker.Worker,
  adapter: DownloadManager.Download.Tracker.Worker.Fake
# ...
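The scheduled-message loop that drives the fake worker can be reduced to a few lines of plain GenServer code. This is a minimal sketch of that loop under simplifying assumptions: it sends updates directly to a pid instead of going through Phoenix.PubSub, it keeps a plain map instead of a Download struct, and the Sketch.StepWorker name and its :notify and :delay options are hypothetical:

```elixir
defmodule Sketch.StepWorker do
  use GenServer

  # Hypothetical options: :notify is the pid that receives updates and
  # :delay is the pause between steps, in milliseconds.
  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl GenServer
  def init(opts) do
    state = %{
      notify: Keyword.fetch!(opts, :notify),
      delay: Keyword.get(opts, :delay, 10),
      download: %{state: :pending, file_url: nil}
    }

    schedule(:process, state.delay)
    {:ok, state}
  end

  @impl GenServer
  def handle_info(:process, state) do
    state = put_in(state.download.state, :processing)
    send(state.notify, {:update, state.download})
    schedule(:ready, state.delay)
    {:noreply, state}
  end

  def handle_info(:ready, state) do
    download = %{state.download | state: :ready, file_url: "/downloads/example.pdf"}
    send(state.notify, {:update, download})
    # A :normal exit is not restarted by a :transient child spec.
    {:stop, :normal, %{state | download: download}}
  end

  # Each step schedules the next one by messaging the process itself.
  defp schedule(message, delay), do: Process.send_after(self(), message, delay)
end
```

Because every step is just a message to self(), the process stays responsive between steps, and stopping with :normal is what lets a :transient supervisor leave it alone once the job is done.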

And this is pretty much it in terms of the back-end logic. Therefore, let’s move forward to the front-end and implement the interaction with the user.

Requesting downloads from the UI

User experience-wise, we want to focus on three requirements primarily:

  • A user can only request one download at a time.
  • If the user refreshes the browser while a download is in progress, the user shouldn’t lose track of the current download once the page loads again.
  • The previous point also applies when the instance to which the user connected goes down.

Having this in mind, let’s start by creating a Plug to identify users uniquely, fulfilling the first point:

# ./lib/download_manager_web/plug/auth_plug.ex
defmodule DownloadManagerWeb.AuthPlug do
  @behaviour Plug

  alias DownloadManager.Token
  alias Plug.Conn

  @impl Plug
  def init(opts), do: opts

  @impl Plug
  def call(conn, _opts) do
    case Conn.get_session(conn) do
      %{"user_id" => _} ->
        conn

      _ ->
        Conn.put_session(conn, :user_id, Token.generate())
    end
  end
end

This plug fakes an authentication mechanism, checking if there is already a user_id in the session and setting a new random one if there is not. This user_id gets stored in a browser cookie, hence every time we visit the application we will use the same user_id. Let's add the plug into the router:

# ./lib/download_manager_web/router.ex
defmodule DownloadManagerWeb.Router do
  use DownloadManagerWeb, :router

  pipeline :browser do
    # ...
    plug DownloadManagerWeb.AuthPlug
  end
  # ...
end

Having the user_id ready in the session, we can edit the live page module, which is the main and only entry point to our application:

# ./lib/download_manager_web/live/page_live.ex
defmodule DownloadManagerWeb.PageLive do
  use DownloadManagerWeb, :live_view

  alias Phoenix.PubSub

  @impl Phoenix.LiveView
  def mount(_params, %{"user_id" => user_id}, socket) do
    PubSub.subscribe(DownloadManager.PubSub, "download:#{user_id}")

    download =
      case DownloadManager.fetch_download(user_id) do
        {:ok, download} ->
          download

        {:error, :not_found} ->
          nil
      end

    {:ok, assign(socket, user_id: user_id, download: download)}
  end
  # ...
end

In the mount/3 callback, it takes the user_id from the session and immediately subscribes to the download:#{user_id} topic. Next, it checks if the user already has a download in progress, assigning it to the socket (or nil if there is no download). Notice that we are using DownloadManager.fetch_download/1 to fetch the download. I like adding all the functions the DownloadManagerWeb.* namespace needs from DownloadManager.* in the main DownloadManager module, acting as a public contract between them, so that the web modules don't need to know any implementation details of the business modules. Let's quickly add the functions that we need:

# ./lib/download_manager.ex
defmodule DownloadManager do
  defdelegate start_download(user_id), to: DownloadManager.Download.Tracker, as: :start
  defdelegate fetch_download(user_id), to: DownloadManager.Download.Repo, as: :fetch
  defdelegate delete_download(download), to: DownloadManager.Download.Repo, as: :remove
end

As you can see, it exposes three different functions that delegate to the proper business modules. We can now edit the live view template to add the button which triggers the download request:

# ./lib/download_manager_web/live/page_live.html.leex
<section class="px-6">
  # ...
  <div class="my-6 flex items-center gap-x-4">
    # ...
    <%= if is_nil(@download) do %>
      <button phx-click="request_download" class="rounded text-purple-200 text-sm bg-purple-900 ml-auto h-8 px-6 flex items-center">Download PDF</button>
    <% end %>
  </div>
  # ...
</section>

If there is no download set in the socket assigns, it renders the button, which triggers a request_download event when clicked. Let's implement the corresponding callback in the live page module:

# ./lib/download_manager_web/live/page_live.ex
defmodule DownloadManagerWeb.PageLive do
  use DownloadManagerWeb, :live_view
  # ...

  @impl Phoenix.LiveView
  def handle_event("request_download", _, socket) do
    case DownloadManager.start_download(socket.assigns.user_id) do
      {:ok, download} ->
        {:noreply, assign(socket, download: download)}

      _ ->
        {:noreply, put_flash(socket, :error, "Error creating download request")}
    end
  end
  # ...
end

The function starts by calling DownloadManager.start_download/1, which we have previously delegated to DownloadManager.Download.Tracker.start/1, using the current user_id. If the call succeeds, it assigns the resulting download to the socket; otherwise, it sets a flash error. By assigning the download, we can show its progress to the user by adding a live component to the page template:

# ./lib/download_manager_web/live/page_live.html.leex
# ...
<%= if @download != nil do %>
  <%= live_component DownloadManagerWeb.DownloadLiveComponent, download: @download %>
<% end %>

# ./lib/download_manager_web/live/components/download_component.ex
defmodule DownloadManagerWeb.DownloadLiveComponent do
  use Phoenix.LiveComponent

  alias DownloadManager.Download

  @pending_state Download.pending_state()
  @processing_state Download.processing_state()
  @ready_state Download.ready_state()

  def render(%{download: %Download{state: state}} = assigns) do
    ~L"""
    <div>
      <div class="bg-white p-5 rounded shadow-md absolute top-0 right-0 w-80 mt-6 mr-6 border-gray-100 border text-sm leading-7 flex gap-x-4">
        <%= icon(state) %>
        <div>
          <h4 class="font-bold">Generating downloadable file</h4>
          <p class="text-gray-500"><%= state_text(state) %></p>
          <%= if Download.ready?(@download) do %>
            <a phx-click="delete_download" class="text-purple-800 cursor-pointer hover:underline" href="<%= @download.file_url %>">Click me to download the file</a>
          <% end %>
        </div>
      </div>
    </div>
    """
  end

  defp state_text(@pending_state), do: "Starting download request"
  defp state_text(@processing_state), do: "Generating file..."
  defp state_text(@ready_state), do: "File generated with success"

  defp icon(@ready_state) do
    # icon content
  end

  defp icon(_) do
    # icon content
  end
end

The download component is pretty straightforward. It renders a popup, changing its content depending on the current state of the assigned download. However, we haven’t yet handled any download progress updates in live view, so let’s fix this:

# ./lib/download_manager_web/live/page_live.ex
defmodule DownloadManagerWeb.PageLive do
  use DownloadManagerWeb, :live_view
  # ...

  @impl Phoenix.LiveView
  def handle_info({:update, download}, socket) do
    {:noreply, assign(socket, download: download)}
  end
end

By doing this, every time the DownloadManager.Download.Tracker.Worker.Fake module broadcasts a download update, the updated download is assigned to the socket, forcing a new render of the component. The last thing we need to implement is the delete_download event triggered from the download component when the user clicks the file URL:

# ./lib/download_manager_web/live/page_live.ex
defmodule DownloadManagerWeb.PageLive do
  use DownloadManagerWeb, :live_view
  # ...

  def handle_event("delete_download", _, socket) do
    download = socket.assigns.download
    DownloadManager.delete_download(download)
    {:noreply, assign(socket, download: nil)}
  end
  # ...
end

The callback function deletes the current user download from the repository. It also sets the assigned download to nil, conveniently rendering the button again to let the user start a new download request without refreshing the browser.
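The topic-based flow between the worker and the live view can also be tried without Phoenix. Here is a minimal sketch that swaps Phoenix.PubSub for Elixir's built-in Registry with duplicate keys, which gives a local publish/subscribe mechanism. Unlike Phoenix.PubSub, it only reaches processes on the same node, and the Sketch.PubSub module is hypothetical:

```elixir
defmodule Sketch.PubSub do
  @registry __MODULE__.Registry

  # Duplicate keys allow many processes to register under the same topic.
  def start_link, do: Registry.start_link(keys: :duplicate, name: @registry)

  # Registers the calling process under the topic.
  def subscribe(topic) do
    {:ok, _owner} = Registry.register(@registry, topic, nil)
    :ok
  end

  # Sends the message to every process subscribed to the topic.
  def broadcast(topic, message) do
    Registry.dispatch(@registry, topic, fn entries ->
      for {pid, _value} <- entries, do: send(pid, message)
    end)
  end
end
```

A process that calls Sketch.PubSub.subscribe("download:user-1") receives every message broadcast to that topic in its mailbox, which is the same shape of message the live view handles in handle_info/2, while broadcasts to other topics never reach it.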

Testing the application locally

Now that we have finished implementing the solution, let’s do some local testing. The tricky part of running several simultaneous instances, and letting Phoenix do its magic regarding process communication and socket connections, is that:

  1. We need to run each instance on a different port number.
  2. We need a single entry point for all the possible nodes.

The first point is straightforward as we have already configured the application to accept a PORT number from the environment. We can start different instances running PORT=4001 elixir --sname n1 -S mix phx.server, using different ports and node names. On the other hand, the second point requires a bit more work, as we need to set up a load balancer. A simple way of doing this is using NGINX with a configuration similar to the following:

# /usr/local/etc/nginx/sites-available/default.conf
upstream loadbalancer {
  server 127.0.0.1:4001 weight=1;
  server 127.0.0.1:4002 weight=1;
  server 127.0.0.1:4003 weight=1;
}

resolver 127.0.0.11;

server {
  location / {
    proxy_pass http://loadbalancer;
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "Upgrade";
  }
}

Once we have the load balancer up and running, let’s start three different nodes on three different terminal windows/panes:

If we jump to the browser and visit http://localhost, we should see our live page. Let’s click the download button and see what happens:

The download message pops up, and we can see the different progress messages until the download is finally ready. Let’s inspect the terminal to see what happens under the hood:

n2 receives the browser connection request, serving the live page. When we clicked the download button, the request started in n2, but the download process spawned in n3 due to Horde's internals. Once the download is ready, thanks to Elixir's distribution model and Phoenix's PubSub, n2 receives the corresponding message, passing the result back to the user through the socket. Let's start a new download request, and shut down the node where the download process runs:

This time, both the socket connection and the download process start in n2. Shutting down n2 causes the following:

  1. The live page reconnects to n1.
  2. The download process dies, but thanks to Horde it is started again on n3.
  3. n1 receives the corresponding message from n3 once the download is ready.

And all of this is totally transparent to the user. Isn’t it amazing?

Conclusion

In this example, we have taken advantage of Elixir’s distributed capabilities to build a practical solution around cross-node process messaging, in-memory storage, and dynamic global supervision. The implementation has been very straightforward, without the need for any external dependency such as job queues or third-party in-memory storage. In the next part, we will implement the last example of the series: a distributed mechanism that monitors the deployed version of the application in each node and sends a message to the front-end suggesting that the user refresh the browser if a new version gets deployed. In the meantime, don’t forget to take a look at the source code of this example.

Happy coding!

Originally published at https://bigardone.dev on June 18, 2021.
