Programming Erlang - sellaprime.

I’ve been slowly working through Joe Armstrong’s Programming Erlang book and I think it is worth writing down some of what I learned from the chapter 23 exercises. Chapter 23 introduces the basics of OTP applications, and the exercises provide an opportunity gradually build up a simple application (only gen_servers and supervisors) but one that incorporates mnesia and a rudimentary distributed fallback mechanism.

I looked around for other examples of solutions to these exercises and I only found one on github that is based on the ppool example presented in Learn You Some Erlang. Since I took a very different approach, I wanted to share my solution as well.

Overview

In the preceding chapter Joe introduces OTP applications by developing an application for the extremely lucrative business of selling prime numbers. The exercises gradually build up a new service for this business: testing whether numbers are prime.

A single server

We start by adding a single prime tester server to the sellaprime supervision tree. Simple enough:

-module(prime_tester_server).
-behaviour(gen_server).

-export([is_prime/1]).

-spec test(N :: pos_integer()) -> boolean().
is_prime(N) ->
    gen_server:call(?MODULE, {test, N}).

%% ...

More parallel

Step two is to create a pool of prime testers and distribute work between them. In the initial implementation there is a single server that maintains a single centralized work queue. Instead of starting a single prime_tester_server we start a server that spawns a pool of ten prime_tester_servers to do the testing. The state of the new server consists of a list of server PIDs and a queue of primes to test (along with where the reply should be sent).

-module(isaprime).

-behaviour(gen_server).

-export([test/1]).

-record(state, {queue = queue:new() :: queue:queue({pos_integer(), term()}),
                available_servers = [] :: [pid()])}).

In the init/1 callback we set up the pool of workers. The number of workers is passed in as an argument so we can easily change the size of the pool in the future.

init(PoolSize) ->
    {ok, #state{available_servers =
        [element(2, {ok, _Pid} = prime_tester_server:start_link())
         || _ <- lists:seq(1, PoolSize)]}}.

When calls are made to the isaprime server, the request is processed by one of the workers when it becomes available. In my implementation I handle two cases. When there is an idle worker available the request is forwarded to that server immediately, but if no workers are available then the request is added to the queue to be processed later.

handle_call({is_prime, N}, From, State = #state{ available_servers = [],
                                                  queue = Q}) ->
    {noreply, State#state{queue = queue:in({N, From}, Q)}};
handle_call({is_prime, N}, From, State = #state{ available_servers = [S|Servers] }) ->
    do_test_prime(S, From, N),
    {noreply, State#state{ available_servers = Servers }}.

The function do_test_prime spawns a new process that calls the tester server, sends the response to the original requester, and notifies isaprime that it is ready for more work.

do_test_prime(S, From, N) ->
    spawn_link(fun() ->
                       IsPrime = prime_tester_server:is_prime(S, N),
                       gen_server:reply(From, IsPrime),
                       ?SERVER ! {ready, S}
               end).

Finally, the isaprime server needs to distribute new work to testers in response to the ready message. If no work is available then the worker is places back in the ready list to wait for additional requests.

handle_info({ready, S}, State = #state{ available_servers = Servers, queue = Q }) ->
    case queue:out(Q) of
        {empty, _} ->
            {noreply, State#state{ available_servers = [S|Servers]}};
        {{value, {N, From}}, NewQ} ->
            do_test_prime(S, From, N),
            {noreply, State#state{ queue = NewQ }}
    end.

That’s pretty much it. It gets the job done, but it also leaves a lot to be desired. If any of the tester servers dies, then all the work queue and all the other testers are also killed. Unfortunately that means that all the pending requests are lost, even if they could have been completed successfully. We will fix both of these problems later, but first we address a smaller issue of performance.

Removing the bottleneck

The first improvement we make will be to remove the queue from the isaprime and let each prime_tester_server manage its own queue, relegating isaprime to load balancing. This will remove the bottleneck in the previous version of isaprime that resulted from every tester needing to request new work.

Turning isaprime into a load balancer

The first change is to remove the queue from the isaprime state. At the same time we will replace the list of available testers with information about the load on each server.

%% isaprime.erl

-record(state, {tester_load = []}).

init(PoolSize) ->
    Testers = [prime_tester_server:start_link() || _ <- lists:seq(1, PoolSize)],
    {ok, #state{tester_load = [{Pid, 0} || {ok, Pid} <- Testers]}}.

Now when we get a request to test a number we identify the server with the lowest load, and forward the request to that server. Finding the lowest load is simple because we keep the load information sorted.

handle_call({is_prime, N}, From, State=#state{ tester_load = Load }) ->
    Tester = lowest_load(Load),
    prime_tester_server:is_prime(Tester, N, From),
    {noreply, State#state{ tester_load = increase_load(Tester, Load) }}.

update_load(Tester, Load, Increment) ->
    {value, {_, L}, Load2} = lists:keytake(Tester, 1, Load),
    lists:keysort(2, [{Tester, L + Increment}|Load2]).

increase_load(Tester, Load) ->
    update_load(Tester, Load, 1).

decrease_load(Tester, Load) ->
    update_load(Tester, Load, -1).

lowest_load([{T, _}|_]) -> T.

One particularly notable change above is that the prime_tester_server API has changed. I added and argument to the is_prime function, and changed it from a call to a cast`. In this way the request is entirely handed off to the tester server, and the load balancer doesn’t need to keep any information about it other than noting the increased load on the tester.

This is all we need to do to handle incoming requests, but it isn’t quite right. We still need to know when the tester has finished a test and reduced its load. For that we add the work_done/0 function, which causes the load balancer to reduce the load of the tester that called it.

handle_cast({work_done, Tester}, #state{tester_load=Load}) ->
    {noreply, #state{tester_load=reduce_load(Tester, Load)}}.

Giving each tester its own queue

As I mentioned above, the first thing we need to do is change the is_prime function from a call to the server into a function that hands off the work via a cast. There is a little more we need to do though, since I want to keep the server available while requests are being processed. To do this we basically move the same structure we had with the original worker pool into the tester server. It spawns a process that will do the actual prime tests so the server itself never blocks. Here’s what it looks like.

-module(prime_tester_server).

-export([is_prime/3]).

-record(state, {worker :: pid()}).

-spec is_prime(S :: pid(), N :: pos_integer(), From :: term()) -> ok.
is_prime(S, N, From) ->
    gen_server:cast(S, {is_prime, {N, From}}).

init([]) ->
    Self = self(),
    {ok, #state{ worker = spawn_link(fun() -> worker(Self) end)}}.

The worker is a little tricky. It calls the server to request work, and if there is no work available, it waits for the server to notify it that there is work to do.

handle_call(get_work, _From, State=#state{queue=Q}) ->
    case queue:peek(Q) of
        empty ->
            {reply, no_work, State};
        {value, V} ->
            {reply, {work, V}, State}
    end.

handle_cast(work_done, State=#state{queue=Q}) ->
    {noreply, State#state{queue=queue:drop(Q)}};
handle_cast({is_prime, Request}, State=#state{queue=Q, worker=W}) ->
    case queue:is_empty(Q) of
        true -> alert_worker(W);
        false -> ok
    end,
    {noreply, State#state{queue=queue:in(Request, Q)}}.

get_work(Server) ->
    gen_server:call(Server, get_work).

work_done(Server) ->
    gen_server:cast(Server, work_done),
    isaprime:work_done(Server).

alert_worker(W) -> W ! work.

worker(Server) ->
    receive
        work ->
            do_work(Server),
            worker(Server);
        stop ->
            ok
    end.

do_work(Server) ->
    case get_work(Server) of
        {work, {N, From}} ->
            IsPrime = lib_primes:is_prime(N),
            gen_server:reply(From, IsPrime),
            work_done(Server),
            do_work(Server);
        no_work ->
            waiting
    end.

What if the tester dies?

The implementation above still suffers from the problem that if one tester dies, then all of them die. Furthermore, because the pending requests are kept in a queue in the server state if a tester dies we will lose all the pending requests that had been sent to it, even once we add the testers to a supervision tree. To address this I changed the data in the queue into an index used to look up the full request in an ETS table owned by the load balancer.

Using an ETS table to store the work

I’ll forgo dumping all the code onto the page here, but in general the changes required are relatively minor. The isaprime server creates a single ETS table that holds tuples of the form {JobId, Number, From, Tester}. When work is assigned to the tester, just the job ID is sent to it. When it services a request it looks up the job id in the table, services the request, and deletes the request from the table. I also maintain the load on each tester in the same table which allows each tester to decrement its own load whenever it finishes a job (it may have been better to use a separate table).

To try to manage the complexity of shared state (and set myself up for the changes in the next part of the exercise) I implemented all interactions with the table through functions in the isaprime module.

-module(isaprime).

-export([work_done/2, get_job/2]).

work_done(WorkTable, JobId) ->
    %% This could be a problem since it is not atomic, if the server
    %% is killed externally then the load could be inconsistent.
    ets:delete(WorkTable, JobId),
    decrease_load(self(), WorkTable).

get_job(WorkTable, JobId) ->
    [{JobId, N, From, _}] = ets:lookup(WorkTable, JobId),
    {N, From}.

decrease_load(Tester, Work) ->
    ets:update_counter(Work, Tester, -1).

increase_load(Tester, Work) ->
    ets:update_counter(Work, Tester, 1).

Assigning the job to a tester now looks like this.

handle_call({is_prime, N}, From, State=#state{ work = Work, next_job = JobId }) ->
    assign_work(JobId, N, From, Work),
    {noreply, State#state{next_job = JobId + 1}}.

assign_work(JobId, N, From, Work) ->
    Tester = lowest_load(Work),
    ets:insert(Work, {JobId, N, From, Tester}),
    prime_tester_server:is_prime(Tester, JobId),
    increase_load(Tester, Work).

On the tester server side we need to pass the table ID to the start_link function so each tester can access the table directly. Other than that the tester remains largely the same, we just hold job IDs in the queue now, instead of the actual job data.

Keeping the records of the pending jobs in an ETS table goes a long way towards handling failures. When a tester fails, its pending jobs are still safe in memory and can be redistributed to other workers. To make that happen we need to add the testers to a supervision tree.

Making it resilient

We will build a supervision tree that includes a dedicated supervisor for the prime tester servers. When the supervised testers start they register themselves with the load balancer. To make this work, I take advantage of the order supervisors start their children to ensure that the load balancer is up before the testers start.

i s a p p r r i i m m e e t p e r s i t m _ e s _ u t p e s p t r e i r m _ s e s e _ e l p t r l r e v a i s e p m t r r e e i _ r m s _ e e s _ r u s v p u e p r a r e a _ s e r v e r

When a tester registers itself an entry representing its load is added to the ETS table, and the load balancer monitors it. The monitor is used so the load balancer can detect a failure and reassign the work that the tester was responsible for.

handle_info({'DOWN', Ref, process, _, _},
            State = #state{ monitors = Monitors, work = Work}) ->
    {Tester, NewMonitors} = maps:take(Ref, Monitors),
    %% Distribute the work to other workers
    redistribute_work(Tester, Work),
    {noreply, State#state{monitors = NewMonitors}}.

redistribute_work(Tester, Work) ->
    ets:delete(Work, Tester),
    WorkToReassign = ets:select(Work, [{{'$1', '$2', '$3', '$4'},
                                        [{'=:=', Tester, '$4'}],
                                        [{{'$1', '$2', '$3'}}]}]),
    %% If there is a lot of pending work in the failed worker's
    %% queue this could result in a bad imbalance since all of
    %% work is reassigned before the restarted tester can
    %% register itself. We will fix this problem in the next
    %% part.
    lists:foreach(fun({JobId, N, From}) ->
                          assign_work(JobId, N, From, Work)
                  end, WorkToReassign).

Replication

Now we need to replicate the job data across two nodes. This part gets a little contrived. Since the servers are registered locally any calls to the prime tester are coming from local processes, so all the requests die with the node. We’ll pretend this isn’t the case, which is fair since the architecture of the application could apply to a problem that doesn’t suffer from this flaw.

We’ll use mnesia to do the data replication. It’s not a big jump from ETS. The API in isaprime will remain basically unchanged but we will build a new module to handle all the database interaction. We’ll also take this opportunity (because we have to) to fix my ugly design that combined the job data and the load data in one table.

The new module is called jobdb. It exposes pretty much the same API as the functions we used to update the ETS table along with a few other utility functions. In isaprime the ETS functions are replaces with calls to jobdb. The interesting changes are to the code that handles tester failures and, since the database persists even if the load balancer fails, handles recovery after the load balancer is restarted.

-module(jobdb).

-record(job, {id, n, from, tester}).
-record(load, {tester, jobs}).

initialize(Nodes) ->
    ok = mnesia:create_schema(Nodes),
    rpc:multicall(Nodes, application, start, [mnesia]),
    mnesia:create_table(job, [{attributes, record_info(fields, job)},
                              {ram_copies, Nodes}]),
    mnesia:create_table(load, [{attributes, record_info(fields, load)},
                               {ram_copies, Nodes}]),
    rpc:multicall(Nodes, application, stop, [mnesia]).

start() ->
    mnesia:wait_for_tables([job, load], 10000),
    NumWorkers = mnesia:table_info(load, size),
    if
        NumWorkers =:= 0 ->
            ok;
        NumWorkers > 0 ->
            {error, already_up}
    end.

When a tester fails we remove its monitor and attempt to redistribute its work. Unlike the ETS version, the redistribution is done one job at a time. I accomplish this by sending messages to the load balancer to do the redistribution. This allows other messages (like the registration of a restarted tester) to come through and be processed, reducing the potential load imbalance.

%% isaprime.erl

handle_info({reassign, JobId, N, From}, State) ->
    assign_job(JobId, N, From),
    {noreply, State};
handle_info({recover_tester, Tester, Attempt}, State=#state{ monitors = #{} })
  when Attempt =< 3 ->
    %% If no testers have been registered. Wait and try again.
    timer:send_after(10 * Attempt, {recover_tester, Tester, Attempt + 1}),
    {noreply, State};
handle_info({recover_tester, Tester, _}, State) ->
    redistribute_jobs(Tester),
    {noreply, State};
handle_info({'DOWN', Ref, process, _, _},
            State = #state{ monitors = Monitors, work = Work}) ->
            State = #state{ monitors = Monitors}) ->
    {Tester, NewMonitors} = maps:take(Ref, Monitors),
    %% Distribute the work to other workers
    redistribute_work(Tester, Work),
    redistribute_jobs(Tester),
    {noreply, State#state{monitors = NewMonitors}}.

redistribute_jobs(Tester) ->
    jobdb:delete_load(Tester),
    Jobs = jobdb:jobs(Tester),
    lists:foreach(
      fun([JobId, N, From]) ->
              %% reassign asynchronously, one at a time, to allow for
              %% a new tester (or testers) to come up. Otherwise we
              %% could end up with a huge load imbalance.
              self() ! {reassign, JobId, N, From}
      end,
      Jobs).

assign_job(JobId, N, From) ->
    Tester = jobdb:assign_job(JobId, N, From),
    prime_tester_server:is_prime(Tester, JobId).

The handle_info implementation above includes some clauses to support recovering from a load balancer failure. The main idea of recovering from a load balancer failure is to check whether each tester is still alive and re-establish monitoring if it is. If a tester is dead, then it’s work is redistributed. Since it is possible that all the testers have died (e.g. if the whole supervision tree was restarted) there is a special clause to detect this and retry later. The jobdb:start/0 function detects whether there is already data in the tables and this information is used by the load balancer to know that it needs to recover from a failure (if it was starting normally the load table would be empty). To make sure that the testers are able to start if the whole supervision tree has been restarted, the recovery doesn’t actually take place in init/1. Instead we use continue to let init return immediately and allow the supervisor to start the testers.

%% isaprime.erl

init([]) ->
    %% Set up a table of work
    T = ets:new(work, [set, public, {write_concurrency, true}]),  % only write concurrency since reads and writes are interleaved
    {ok, #state{work = T}}.
    case jobdb:start() of
        ok -> {ok, #state{}};
        {error, already_up} -> {ok, #state{}, {continue, recover}}
    end.

handle_continue(recover, State = #state{ monitors = M }) ->
    Testers = jobdb:testers(),
    Monitors = lists:foldl(
                 fun(Tester, Monitors) ->
                         case is_process_alive(Tester) of
                             true -> Monitors#{ erlang:monitor(process, Tester) => Tester };
                             false -> recover_tester(Tester), Monitors
                         end
                 end, M, Testers),
    %% TODO get the max job ID
    {noreply, State#state{ monitors = Monitors }}.

There is one substantial problem with this implementation. If the load balancer dies, all of the processes waiting for a response will get an error, since they monitor the server while blocked in gen_server:call/2. Not much point in keeping all the requests alive in the database if all the callers are just going to fail anyway. So we need to do a call without a monitor. Well, the other option is to use cast instead and wait for a reply in a receive (direct messages rather than using gen_server:reply/2).

%% isaprime.erl

test(N) ->
    gen_server:cast(?SERVER, {is_prime, N, self()}),
    receive
        IsPrime -> IsPrime;
    after
        5000 ->
            error(timeout)
    end.

%% Use this instead of gen_server:reply, this keeps the details
%% of the workaround contained within one module
respond(To, Response) ->
    To ! Response.

However, there is a subtle problem with this. What happens when there is a timeout? As I wrote it above, the process will just get an unexpected message. This is kind of rude to our clients. We shouldn’t force them to handle messages they don’t want anymore. There really wasn’t a good answer for this in older versions of Erlang, but in OTP 24 we got aliases. Aliases let us create what is effectively temporary PID that we can send to the load balancer instead. If we send a message to an alias that is no longer valid, Erlang will just drop the message. Usually aliases seem to be created by passing special options to monitor, but since we aren’t creating a monitor here we use the basic functions instead.

%% isaprime.erl

test(N) ->
    Alias = alias(),
    gen_server:cast(?SERVER, {is_prime, N, Alias}),
    receive
        IsPrime ->
            unalias(Alias),
            IsPrime
    after
        5000 ->
            unalias(Alias),
            error(timeout)
    end.

It’s basically that simple. No unwanted messages after the timeout, and the callers can tolerate a complete failure and restart of the load balancer. Pretty cool.

Falling back

This is probably the least satisfying part of the exercise, and I go about it in a hacky way that doesn’t use any of the tools for distribution provided by OTP. This is an interesting exercise because it reveals a little about the problem that I hadn’t really thought about before.

The basic idea is simple enough, the application will run on the node primary. Mnesia is configured to keep a copy of its tables in RAM on both primary and backup. Finally we spawn a process on the backup node that monitors primary and if is sees it die, starts the sellaprime application on the backup node. This seems pretty straightforward, but what if our monitoring process dies? Then we are left unprotected. So we need to put it under a supervision tree. I also want it to start at the same time as the application on the primary node, which means we should start it from the sellaprime_app:start/2. The way I built this is by passing the names of the primary and backup nodes as the start arguments. If the node we are running on is the primary node, then it uses rpc to start the sellaprime application on the backup node. If it is the backup node then we just start the sellaprime_supervisor. The start arguments are passed to the supervisor and used in the same way to determine whether to start the full application or just the watchdog.

-module(sellaprime_supervisor).

init(Nodes) ->
    %% ...
    PrimaryNode = proplists:get_value(primary, Nodes),
    BackupNode = proplists:get_value(backup, Nodes),
    SupSpec = {one_for_one, 5, 10},
    ChildSpecs = if node() =:= PrimaryNode ->
                         ?SELLAPRIME_CHILDREN;
                    node() =:= BackupNode ->
                         [{watchdog,
                           {watchdog_server, start_link, [PrimaryNode]},
                            transient,
                            10000,
                            worker,
                            [watchdog_server]}]
                 end,
    {ok, {SupSpec, ChildSpecs}}.

recover() ->
    jobdb:reset(),
    lists:foreach(
      fun(ChildSpec) -> supervisor:start_child(?MODULE, ChildSpec) end,
      ?SELLAPRIME_CHILDREN).

The watchdog is a very simple gen_server. It is a transient process, so if it exits abnormally the supervisor will restart it, but if it exits normally then it is not restarted. The watchdog just waits for a {nodedown, Node} message, and when it is received uses sellaprime_supervisor:recover/0 to start the full sellaprime supervision tree. Once the application is started it exits normally since it has nothing else to do.

It occurs to me that I basically just wrote an event handler using a gen_server. It would probably be better create an event manager and register a gen_event event handler to monitor the primary node. I think I got gen_server tunnel vision.

The last thing that needs to happen is to clean up after the failed node. My original intention was to have the new node pick up where the old one left off, answering all the pending requests with the only interruption being a few timeouts. This is where my hopes were crushed though, because the processes waiting for answers are all dead along with the primary node. So our application is fundamentally flawed, and I don’t want to do the work of figuring out some kind of “out of band” channel for making and replying to requests. Alternatively we could use the global process registry so that processes can test primes from a third node and stay alive when the primary node fails. Instead we’ll just accept this problem and start the application with a clean slate, ready to handle any new requests with no downtime. That just leaves us with the task of recognizing when we are recovering from a primary node failure and clearing the database tables. We already took care of this in sellaprime_supervisor:recover/0 by calling jobdb:reset/0.

In the future, if we implemented some mechanism where the callers don’t die with the node, the changes to make the recovery work how I originally hoped would not be substantial. Instead of resetting the job database, we would start the load balancer like normal and let the recovery mechanisms that are already in place for smaller failures do what they are meant to do. There is just one subtle bug that would need to be addressed: is_process_alive/1 can only be called on local process IDs. When we restart the server on a new node and it checks whether the testers are still up it will crash, and prevent the application from starting at all. The simplest option would be to catch the badarg error and treat it the same as if the process is dead. The other option is to clear the load table before the load balancer starts (from sellaprime_supervisor:recover/0) and treat that as an indicator that we are recovering from a major failure and all the jobs need to be reassigned. I don’t really know what the best option is out of these two, but I’m more inclined towards the latter (probably because I’m afraid of error handling for some reason).

~wfv

26 June 2022

Albuquerque, NM