I’ve been slowly working through Joe Armstrong’s Programming Erlang
book and I think it is worth writing down some of what I learned from
the chapter 23 exercises. Chapter 23 introduces the basics of OTP
applications, and the exercises provide an opportunity to gradually
build up a simple application (only gen_servers and supervisors), but
one that incorporates mnesia and a rudimentary distributed fallback
mechanism.
I looked around for other examples of solutions to these exercises and
I only found one on github that is based on the ppool
example
presented in Learn You Some Erlang. Since I took a very different
approach, I wanted to share my
solution as well.
Overview
In the preceding chapter Joe introduces OTP applications by developing an application for the extremely lucrative business of selling prime numbers. The exercises gradually build up a new service for this business: testing whether numbers are prime.
A single server
We start by adding a single prime tester server to the sellaprime supervision tree. Simple enough:
-module(prime_tester_server).
-behaviour(gen_server).

-export([is_prime/1]).

-spec is_prime(N :: pos_integer()) -> boolean().
is_prime(N) ->
    gen_server:call(?MODULE, {test, N}).

%% ...
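A minimal sketch of the elided pieces might look like the following, assuming the actual primality test is done with lib_primes:is_prime/1 from the book’s sellaprime code (the details aren’t important for the rest of the exercises).
%% Sketch only: start_link/0 registers the server locally so that
%% is_prime/1 can find it, and handle_call/3 runs the test inline.
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

init([]) ->
    {ok, no_state}.

handle_call({test, N}, _From, State) ->
    {reply, lib_primes:is_prime(N), State}.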
More parallel
Step two is to create a pool of prime testers and distribute work
between them. In this initial implementation there is a single server
that maintains a single centralized work queue. Instead of starting a
single prime_tester_server we start a server that spawns a pool of ten
prime_tester_servers to do the testing. The state of the new server
consists of a list of tester PIDs and a queue of numbers to test
(along with where each reply should be sent).
-module(isaprime).
-behaviour(gen_server).

-export([test/1]).

-record(state, {queue = queue:new() :: queue:queue({pos_integer(), term()}),
                available_servers = [] :: [pid()]}).
In the init/1
callback we set up the pool of workers. The number of
workers is passed in as an argument so we can easily change the size
of the pool in the future.
init(PoolSize) ->
{ok, #state{available_servers =
[element(2, {ok, _Pid} = prime_tester_server:start_link())
|| _ <- lists:seq(1, PoolSize)]}}.
When calls are made to the isaprime
server, the request is processed
by one of the workers when it becomes available. In my implementation
I handle two cases. When there is an idle worker available the request
is forwarded to that server immediately, but if no workers are
available then the request is added to the queue to be processed
later.
handle_call({is_prime, N}, From, State = #state{ available_servers = [],
queue = Q}) ->
{noreply, State#state{queue = queue:in({N, From}, Q)}};
handle_call({is_prime, N}, From, State = #state{ available_servers = [S|Servers] }) ->
do_test_prime(S, From, N),
{noreply, State#state{ available_servers = Servers }}.
The function do_test_prime
spawns a new process that calls the
tester server, sends the response to the original requester, and
notifies isaprime
that it is ready for more work.
do_test_prime(S, From, N) ->
spawn_link(fun() ->
IsPrime = prime_tester_server:is_prime(S, N),
gen_server:reply(From, IsPrime),
?SERVER ! {ready, S}
end).
Finally, the isaprime server needs to distribute new work to the
testers in response to the ready message. If no work is available then
the worker is placed back in the ready list to wait for additional
requests.
handle_info({ready, S}, State = #state{ available_servers = Servers, queue = Q }) ->
case queue:out(Q) of
{empty, _} ->
{noreply, State#state{ available_servers = [S|Servers]}};
{{value, {N, From}}, NewQ} ->
do_test_prime(S, From, N),
{noreply, State#state{ queue = NewQ }}
end.
That’s pretty much it. It gets the job done, but it also leaves a lot to be desired. If any of the tester servers dies, then the work queue and all the other testers are killed along with it, since everything is linked. Unfortunately that means that all the pending requests are lost, even if they could have been completed successfully. We will fix both of these problems later, but first we address a smaller issue of performance.
Removing the bottleneck
The first improvement we make will be to remove the queue from
isaprime and let each prime_tester_server manage its own queue,
relegating isaprime to load balancing. This removes the bottleneck in
the previous version of isaprime that resulted from every tester
needing to come back to it to request new work.
Turning isaprime into a load balancer
The first change is to remove the queue from the isaprime state. At
the same time we will replace the list of available testers with
information about the load on each server.
%% isaprime.erl
-record(state, {tester_load = []}).
init(PoolSize) ->
Testers = [prime_tester_server:start_link() || _ <- lists:seq(1, PoolSize)],
{ok, #state{tester_load = [{Pid, 0} || {ok, Pid} <- Testers]}}.
Now when we get a request to test a number we identify the server with the lowest load, and forward the request to that server. Finding the lowest load is simple because we keep the load information sorted.
handle_call({is_prime, N}, From, State=#state{ tester_load = Load }) ->
Tester = lowest_load(Load),
prime_tester_server:is_prime(Tester, N, From),
{noreply, State#state{ tester_load = increase_load(Tester, Load) }}.
update_load(Tester, Load, Increment) ->
{value, {_, L}, Load2} = lists:keytake(Tester, 1, Load),
lists:keysort(2, [{Tester, L + Increment}|Load2]).
increase_load(Tester, Load) ->
update_load(Tester, Load, 1).
decrease_load(Tester, Load) ->
update_load(Tester, Load, -1).
lowest_load([{T, _}|_]) -> T.
One particularly notable change above is that the prime_tester_server
API has changed. I added an argument to the is_prime function and
changed it from a call to a cast. In this way the request is entirely
handed off to the tester server, and the load balancer doesn’t need to
keep any information about it other than noting the increased load on
the tester.
This is all we need to do to handle incoming requests, but it isn’t
quite right. We still need to know when a tester has finished a test
so that its load can be reduced. For that we add a work_done/1
function, which causes the load balancer to reduce the load of the
tester that called it.
handle_cast({work_done, Tester}, State = #state{tester_load = Load}) ->
    {noreply, State#state{tester_load = decrease_load(Tester, Load)}}.
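The client side of this isn’t shown above, but it is just a cast from the tester identifying itself; something like the following (assuming the load balancer is registered as ?SERVER, as used earlier).
%% isaprime.erl -- called by a tester (passing its own pid) when it
%% finishes a job.
work_done(Tester) ->
    gen_server:cast(?SERVER, {work_done, Tester}).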
Giving each tester its own queue
As I mentioned above, the first thing we need to do is change the
is_prime
function from a call to the server into a function that
hands off the work via a cast. There is a little more we need to do
though, since I want to keep the server available while requests are
being processed. To do this we basically move the same structure we
had with the original worker pool into the tester server. It spawns a
process that will do the actual prime tests so the server itself never
blocks. Here’s what it looks like.
-module(prime_tester_server).
-behaviour(gen_server).

-export([is_prime/3]).

-record(state, {queue = queue:new() :: queue:queue({pos_integer(), term()}),
                worker :: pid()}).

-spec is_prime(S :: pid(), N :: pos_integer(), From :: term()) -> ok.
is_prime(S, N, From) ->
    gen_server:cast(S, {is_prime, {N, From}}).

init([]) ->
    Self = self(),
    {ok, #state{worker = spawn_link(fun() -> worker(Self) end)}}.
The worker is a little tricky. It calls the server to request work, and if there is no work available, it waits for the server to notify it that there is work to do.
handle_call(get_work, _From, State=#state{queue=Q}) ->
case queue:peek(Q) of
empty ->
{reply, no_work, State};
{value, V} ->
{reply, {work, V}, State}
end.
handle_cast(work_done, State=#state{queue=Q}) ->
{noreply, State#state{queue=queue:drop(Q)}};
handle_cast({is_prime, Request}, State=#state{queue=Q, worker=W}) ->
case queue:is_empty(Q) of
true -> alert_worker(W);
false -> ok
end,
{noreply, State#state{queue=queue:in(Request, Q)}}.
get_work(Server) ->
gen_server:call(Server, get_work).
work_done(Server) ->
gen_server:cast(Server, work_done),
isaprime:work_done(Server).
alert_worker(W) -> W ! work.
worker(Server) ->
receive
work ->
do_work(Server),
worker(Server);
stop ->
ok
end.
do_work(Server) ->
case get_work(Server) of
{work, {N, From}} ->
IsPrime = lib_primes:is_prime(N),
gen_server:reply(From, IsPrime),
work_done(Server),
do_work(Server);
no_work ->
waiting
end.
What if the tester dies?
The implementation above still suffers from the problem that if one tester dies, then all of them die. Furthermore, because the pending requests are kept in a queue in the server state, if a tester dies we will lose all the pending requests that had been sent to it, even once we add the testers to a supervision tree. To address this I changed the data in the queue into an index used to look up the full request in an ETS table owned by the load balancer.
Using an ETS table to store the work
I’ll forgo dumping all the code onto the page here, but in general the
changes required are relatively minor. The isaprime server creates a
single ETS table that holds tuples of the form {JobId, Number, From,
Tester}. When work is assigned to a tester, just the job ID is sent to
it. When the tester services a request it looks up the job ID in the
table, runs the test, and then deletes the entry from the table. I
also maintain the load on each tester in the same table, which allows
each tester to decrement its own load whenever it finishes a job (it
may have been better to use a separate table).
To try to manage the complexity of shared state (and set myself up for
the changes in the next part of the exercise) I implemented all
interactions with the table through functions in the isaprime
module.
-module(isaprime).
-export([work_done/2, get_job/2]).
work_done(WorkTable, JobId) ->
%% This could be a problem since it is not atomic, if the server
%% is killed externally then the load could be inconsistent.
ets:delete(WorkTable, JobId),
decrease_load(self(), WorkTable).
get_job(WorkTable, JobId) ->
[{JobId, N, From, _}] = ets:lookup(WorkTable, JobId),
{N, From}.
decrease_load(Tester, Work) ->
ets:update_counter(Work, Tester, -1).
increase_load(Tester, Work) ->
ets:update_counter(Work, Tester, 1).
Assigning the job to a tester now looks like this.
handle_call({is_prime, N}, From, State=#state{ work = Work, next_job = JobId }) ->
assign_work(JobId, N, From, Work),
{noreply, State#state{next_job = JobId + 1}}.
assign_work(JobId, N, From, Work) ->
Tester = lowest_load(Work),
ets:insert(Work, {JobId, N, From, Tester}),
prime_tester_server:is_prime(Tester, JobId),
increase_load(Tester, Work).
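I haven’t shown lowest_load/1 here; it now has to find the least loaded tester in the ETS table rather than in a sorted list. A sketch of one way to do it, relying on the fact that the load entries are the two-element {Tester, Count} tuples in the shared table:
%% isaprime.erl -- sketch only; selects just the load entries (the
%% two-element tuples) and sorts them by the counter in position 2.
lowest_load(Work) ->
    Loads = ets:select(Work, [{{'$1', '$2'}, [{is_pid, '$1'}], ['$_']}]),
    [{Tester, _} | _] = lists:keysort(2, Loads),
    Tester.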
On the tester server side we need to pass the table ID to the
start_link function so each tester can access the table
directly. Other than that the tester remains largely the same; we just
hold job IDs in the queue now, instead of the actual job data.
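I won’t show the full tester changes, but the start-up side is roughly this (a sketch, not the exact code), with the table handed to the worker process so it can look jobs up on its own.
%% prime_tester_server.erl -- sketch: the load balancer passes its ETS
%% table ID when starting each tester.
start_link(WorkTable) ->
    gen_server:start_link(?MODULE, WorkTable, []).

init(WorkTable) ->
    Self = self(),
    {ok, #state{worker = spawn_link(fun() -> worker(Self, WorkTable) end)}}.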
Keeping the records of the pending jobs in an ETS table goes a long way towards handling failures. When a tester fails, its pending jobs are still safe in memory and can be redistributed to other workers. To make that happen we need to add the testers to a supervision tree.
Making it resilient
We will build a supervision tree that includes a dedicated supervisor for the prime tester servers. When the supervised testers start they register themselves with the load balancer. To make this work, I take advantage of the order supervisors start their children to ensure that the load balancer is up before the testers start.
When a tester registers itself an entry representing its load is added to the ETS table, and the load balancer monitors it. The monitor is used so the load balancer can detect a failure and reassign the work that the tester was responsible for.
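I’ll skip the full registration code too, but the shape of it is a call from each tester’s init/1 into the load balancer; a sketch (register_tester/1 is just the name I’m using here):
%% isaprime.erl -- sketch of the registration path.
register_tester(Tester) ->
    gen_server:call(?SERVER, {register, Tester}).

%% This clause sits alongside the existing handle_call clauses.
handle_call({register, Tester}, _From,
            State = #state{ monitors = Monitors, work = Work }) ->
    Ref = erlang:monitor(process, Tester),
    ets:insert(Work, {Tester, 0}),  % fresh load entry for the new tester
    {reply, ok, State#state{ monitors = Monitors#{ Ref => Tester } }}.
When one of those monitors fires, the corresponding DOWN message is handled by redistributing the dead tester’s work.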
handle_info({'DOWN', Ref, process, _, _},
State = #state{ monitors = Monitors, work = Work}) ->
{Tester, NewMonitors} = maps:take(Ref, Monitors),
%% Distribute the work to other workers
redistribute_work(Tester, Work),
{noreply, State#state{monitors = NewMonitors}}.
redistribute_work(Tester, Work) ->
ets:delete(Work, Tester),
WorkToReassign = ets:select(Work, [{{'$1', '$2', '$3', '$4'},
[{'=:=', Tester, '$4'}],
[{{'$1', '$2', '$3'}}]}]),
%% If there is a lot of pending work in the failed worker's
%% queue this could result in a bad imbalance since all of
%% work is reassigned before the restarted tester can
%% register itself. We will fix this problem in the next
%% part.
lists:foreach(fun({JobId, N, From}) ->
assign_work(JobId, N, From, Work)
end, WorkToReassign).
Replication
Now we need to replicate the job data across two nodes. This part gets a little contrived. Since the servers are registered locally any calls to the prime tester are coming from local processes, so all the requests die with the node. We’ll pretend this isn’t the case, which is fair since the architecture of the application could apply to a problem that doesn’t suffer from this flaw.
We’ll use mnesia to do the data replication. It’s not a big jump from
ETS. The API in isaprime will remain basically unchanged, but we will
build a new module to handle all the database interaction. We’ll also
take this opportunity (because we have to) to fix my ugly design that
combined the job data and the load data in one table.
The new module is called jobdb. It exposes pretty much the same API as
the functions we used to update the ETS table, along with a few other
utility functions. In isaprime the ETS operations are replaced with
calls to jobdb. The interesting changes are to the code that handles
tester failures and, since the database persists even if the load
balancer fails, to the code that recovers after the load balancer is
restarted.
-module(jobdb).
-record(job, {id, n, from, tester}).
-record(load, {tester, jobs}).
initialize(Nodes) ->
ok = mnesia:create_schema(Nodes),
rpc:multicall(Nodes, application, start, [mnesia]),
mnesia:create_table(job, [{attributes, record_info(fields, job)},
{ram_copies, Nodes}]),
mnesia:create_table(load, [{attributes, record_info(fields, load)},
{ram_copies, Nodes}]),
rpc:multicall(Nodes, application, stop, [mnesia]).
start() ->
mnesia:wait_for_tables([job, load], 10000),
NumWorkers = mnesia:table_info(load, size),
if
NumWorkers =:= 0 ->
ok;
NumWorkers > 0 ->
{error, already_up}
end.
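As a taste of the rest of the module, here is a sketch of how assign_job/3 might be written (the other helpers used below, like jobs/1, delete_load/1, testers/0, and reset/0, are similar transactions). This isn’t the exact code; it just illustrates picking the least loaded tester and recording the job in a single transaction.
%% jobdb.erl -- sketch of assign_job/3: choose the least loaded tester,
%% record the job against it, and bump that tester's load atomically.
assign_job(JobId, N, From) ->
    {atomic, Tester} =
        mnesia:transaction(
          fun() ->
                  Loads = [{T, J} || #load{tester = T, jobs = J}
                                         <- mnesia:match_object(#load{_ = '_'})],
                  [{Least, Jobs} | _] = lists:keysort(2, Loads),
                  ok = mnesia:write(#job{id = JobId, n = N,
                                         from = From, tester = Least}),
                  ok = mnesia:write(#load{tester = Least, jobs = Jobs + 1}),
                  Least
          end),
    Tester.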
When a tester fails we remove its monitor and attempt to redistribute its work. Unlike the ETS version, the redistribution is done one job at a time. I accomplish this by sending messages to the load balancer to do the redistribution. This allows other messages (like the registration of a restarted tester) to come through and be processed, reducing the potential load imbalance.
%% isaprime.erl
handle_info({reassign, JobId, N, From}, State) ->
    assign_job(JobId, N, From),
    {noreply, State};
handle_info({recover_tester, Tester, Attempt}, State = #state{ monitors = Monitors })
  when map_size(Monitors) =:= 0, Attempt =< 3 ->
    %% No testers have been registered yet. Wait and try again.
    timer:send_after(10 * Attempt, {recover_tester, Tester, Attempt + 1}),
    {noreply, State};
handle_info({recover_tester, Tester, _}, State) ->
    redistribute_jobs(Tester),
    {noreply, State};
handle_info({'DOWN', Ref, process, _, _},
            State = #state{ monitors = Monitors }) ->
    {Tester, NewMonitors} = maps:take(Ref, Monitors),
    %% Distribute the work to other workers
    redistribute_jobs(Tester),
    {noreply, State#state{monitors = NewMonitors}}.
redistribute_jobs(Tester) ->
jobdb:delete_load(Tester),
Jobs = jobdb:jobs(Tester),
lists:foreach(
fun([JobId, N, From]) ->
%% reassign asynchronously, one at a time, to allow for
%% a new tester (or testers) to come up. Otherwise we
%% could end up with a huge load imbalance.
self() ! {reassign, JobId, N, From}
end,
Jobs).
assign_job(JobId, N, From) ->
Tester = jobdb:assign_job(JobId, N, From),
prime_tester_server:is_prime(Tester, JobId).
The handle_info implementation above includes some clauses to support
recovering from a load balancer failure. The main idea is to check
whether each tester is still alive and re-establish monitoring if it
is. If a tester is dead, then its work is redistributed. Since it is
possible that all the testers have died (e.g. if the whole supervision
tree was restarted) there is a special clause to detect this and retry
later. The jobdb:start/0 function detects whether there is already
data in the tables, and this information is used by the load balancer
to know that it needs to recover from a failure (if it was starting
normally the load table would be empty). To make sure that the testers
are able to start if the whole supervision tree has been restarted,
the recovery doesn’t actually take place in init/1. Instead we use
{continue, recover} to let init return immediately and allow the
supervisor to start the testers.
%% isaprime.erl
init([]) ->
    case jobdb:start() of
        ok -> {ok, #state{}};
        {error, already_up} -> {ok, #state{}, {continue, recover}}
    end.
handle_continue(recover, State = #state{ monitors = M }) ->
Testers = jobdb:testers(),
Monitors = lists:foldl(
fun(Tester, Monitors) ->
case is_process_alive(Tester) of
true -> Monitors#{ erlang:monitor(process, Tester) => Tester };
false -> recover_tester(Tester), Monitors
end
end, M, Testers),
%% TODO get the max job ID
{noreply, State#state{ monitors = Monitors }}.
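The recover_tester/1 helper used above just kicks off the retry loop handled by the recover_tester clauses of handle_info; something like:
%% isaprime.erl -- schedule redistribution of a (possibly dead) tester's
%% jobs, starting at attempt 1.
recover_tester(Tester) ->
    self() ! {recover_tester, Tester, 1}.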
There is one substantial problem with this implementation. If the load
balancer dies, all of the processes waiting for a response will get an
error, since gen_server:call/2 monitors the server for the duration of
the call. There is not much point in keeping all the requests alive in
the database if all the callers are just going to fail anyway, so we
need a way to make a request that doesn’t monitor the server. The
option I went with is to use a cast instead and wait for the reply in
a plain receive (direct messages rather than gen_server:reply/2).
%% isaprime.erl
test(N) ->
    gen_server:cast(?SERVER, {is_prime, N, self()}),
    receive
        IsPrime -> IsPrime
    after 5000 ->
            error(timeout)
    end.

%% Use this instead of gen_server:reply/2; it keeps the details of the
%% workaround contained within one module.
respond(To, Response) ->
    To ! Response.
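On the load balancer side the corresponding handle_cast clause is a small variation on the earlier handle_call; roughly (a sketch), with the caller’s pid taking the place of the old From:
%% isaprime.erl -- sketch of the matching handle_cast clause.
handle_cast({is_prime, N, ReplyTo}, State = #state{ next_job = JobId }) ->
    assign_job(JobId, N, ReplyTo),
    {noreply, State#state{ next_job = JobId + 1 }}.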
However, there is a subtle problem with this. What happens when there
is a timeout? As I wrote it above, the reply can still arrive after
the timeout and sit in the caller’s mailbox as an unexpected
message. This is kind of rude to our clients; we shouldn’t force them
to handle messages they no longer want. There really wasn’t a good
answer for this in older versions of Erlang, but in OTP 24 we got
aliases. An alias is effectively a temporary PID that we can send to
the load balancer instead of our real one. If a message is sent to an
alias that is no longer valid, Erlang will just drop the
message. Usually aliases are created by passing special options to
monitor, but since we aren’t creating a monitor here we use the basic
alias/0 and unalias/1 functions instead.
%% isaprime.erl
test(N) ->
Alias = alias(),
gen_server:cast(?SERVER, {is_prime, N, Alias}),
receive
IsPrime ->
unalias(Alias),
IsPrime
after
5000 ->
unalias(Alias),
error(timeout)
end.
It’s basically that simple. No unwanted messages after the timeout, and the callers can tolerate a complete failure and restart of the load balancer. Pretty cool.
Falling back
This is probably the least satisfying part of the exercise, and I go about it in a hacky way that doesn’t use any of the tools for distribution provided by OTP. It is still interesting, though, because it reveals something about the problem that I hadn’t really thought about before.
The basic idea is simple enough: the application will run on the node
primary. Mnesia is configured to keep a copy of its tables in RAM on
both primary and backup. Finally, we spawn a process on the backup
node that monitors primary and, if it sees it die, starts the
sellaprime application on the backup node. This seems pretty
straightforward, but what if our monitoring process dies? Then we are
left unprotected. So we need to put it under a supervision tree. I
also want it to start at the same time as the application on the
primary node, which means we should start it from
sellaprime_app:start/2. The way I built this is by passing the names
of the primary and backup nodes as the start arguments. If the node we
are running on is the primary node, then it uses rpc to start the
sellaprime application on the backup node. If it is the backup node
then we just start the sellaprime_supervisor. The start arguments are
passed to the supervisor and used in the same way to determine whether
to start the full application or just the watchdog.
-module(sellaprime_supervisor).
init(Nodes) ->
%% ...
PrimaryNode = proplists:get_value(primary, Nodes),
BackupNode = proplists:get_value(backup, Nodes),
SupSpec = {one_for_one, 5, 10},
ChildSpecs = if node() =:= PrimaryNode ->
?SELLAPRIME_CHILDREN;
node() =:= BackupNode ->
[{watchdog,
{watchdog_server, start_link, [PrimaryNode]},
transient,
10000,
worker,
[watchdog_server]}]
end,
{ok, {SupSpec, ChildSpecs}}.
recover() ->
jobdb:reset(),
lists:foreach(
fun(ChildSpec) -> supervisor:start_child(?MODULE, ChildSpec) end,
?SELLAPRIME_CHILDREN).
The watchdog is a very simple gen_server. It is a transient process,
so if it exits abnormally the supervisor will restart it, but if it
exits normally then it is not restarted. The watchdog just waits for a
{nodedown, Node} message, and when that message arrives it uses
sellaprime_supervisor:recover/0 to start the full sellaprime
supervision tree. Once the application is started it exits normally,
since it has nothing else to do.
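The whole module is only a handful of lines; a sketch of it, assuming monitor_node/2 is what delivers the nodedown notification:
-module(watchdog_server).
-behaviour(gen_server).

-export([start_link/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

start_link(PrimaryNode) ->
    gen_server:start_link(?MODULE, PrimaryNode, []).

init(PrimaryNode) ->
    %% Ask for a {nodedown, PrimaryNode} message if the node dies or is
    %% already unreachable.
    true = monitor_node(PrimaryNode, true),
    {ok, PrimaryNode}.

handle_info({nodedown, PrimaryNode}, PrimaryNode) ->
    %% The primary is gone: bring up the full sellaprime tree here, then
    %% exit normally so the transient watchdog is not restarted.
    sellaprime_supervisor:recover(),
    {stop, normal, PrimaryNode}.

handle_call(_Request, _From, State) -> {reply, ok, State}.
handle_cast(_Msg, State) -> {noreply, State}.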
It occurs to me that I basically just wrote an event handler using a
gen_server. It would probably be better to create an event manager and
register a gen_event handler to monitor the primary node. I think I
got gen_server tunnel vision.
The last thing that needs to happen is to clean up after the failed
node. My original intention was to have the new node pick up where the
old one left off, answering all the pending requests with the only
interruption being a few timeouts. This is where my hopes were crushed
though, because the processes waiting for answers are all dead along
with the primary node. So our application is fundamentally flawed, and
I don’t want to do the work of figuring out some kind of “out of band”
channel for making and replying to requests. Alternatively we could
use the global process registry so that processes can test primes from
a third node and stay alive when the primary node fails. Instead we’ll
just accept this problem and start the application with a clean slate,
ready to handle any new requests with no downtime. That just leaves us
with the task of recognizing when we are recovering from a primary
node failure and clearing the database tables. We already took care of
this in sellaprime_supervisor:recover/0 by calling jobdb:reset/0.
In the future, if we implemented some mechanism where the callers
don’t die with the node, the changes to make the recovery work the way
I originally hoped would not be substantial. Instead of resetting the
job database, we would start the load balancer like normal and let the
recovery mechanisms that are already in place for smaller failures do
what they are meant to do. There is just one subtle bug that would
need to be addressed: is_process_alive/1 can only be called on local
process IDs. When we restart the server on a new node and it checks
whether the testers are still up, it will crash and prevent the
application from starting at all. The simplest option would be to
catch the badarg error and treat it the same as if the process were
dead. The other option is to clear the load table before the load
balancer starts (from sellaprime_supervisor:recover/0) and treat that
as an indicator that we are recovering from a major failure and all
the jobs need to be reassigned. I don’t really know which of these two
is the best option, but I’m more inclined towards the latter (probably
because I’m afraid of error handling for some reason).
~wfv
26 June 2022
Albuquerque, NM