Totally Ordered Multicast.

Suppose a group of processes broadcasting messages to each other needs to agree on the order in which messages are delivered. This problem is called totally ordered multicasting. If we can guarantee that all messages from a single process arrive in the order they were sent and that no message is lost, the problem has a simple solution using Lamport clocks.

Lamport clocks

Before looking at the algorithm for totally ordered multicast, we need to understand Lamport clocks. A Lamport clock is just a non-negative integer maintained by each process. The value of the clock is influenced by the messages that the process sends and receives. Effectively, the clock is an indicator of the events that the process is aware of or may have been influenced by. A clock is updated in two circumstances:

  1. Before a process sends a message, its clock is incremented by 1.
  2. When a process receives a message with timestamp p, its clock is updated to the maximum of p and its current clock value.
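
The two rules can be sketched as a pair of pure functions. This is only an illustration and is separate from the server shown later; the module name `lamport_clock` and the function names `new/0`, `tick/1` and `merge/2` are assumptions made for this example.

```erlang
-module(lamport_clock).
-export([new/0, tick/1, merge/2]).

%% A fresh clock. Starting at 0 is an assumption of this sketch.
-spec new() -> non_neg_integer().
new() -> 0.

%% Rule 1: increment the clock before sending a message.
-spec tick(non_neg_integer()) -> non_neg_integer().
tick(Clock) -> Clock + 1.

%% Rule 2: on receiving a message stamped Timestamp, move to the
%% larger of the local clock and the timestamp.
-spec merge(non_neg_integer(), non_neg_integer()) -> non_neg_integer().
merge(Clock, Timestamp) -> max(Clock, Timestamp).
```

For example, a process whose clock is 3 and that receives a message stamped 7 moves to `merge(3, 7)`, which is 7. If it then sends a message of its own, it stamps it with `tick(7)`, which is 8.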


The algorithm for totally ordered multicast is fairly simple. A process P sends a multicast message m to all other processes. When process Q receives the message, it does not immediately deliver it to the application. Instead, Q multicasts an acknowledgment to all other processes. (Note that even though P sent the message, it also implicitly receives the multicast and responds by sending an acknowledgment.) The message is delivered to the application only once Q has received, from every other process, an acknowledgment with a timestamp greater than the timestamp of m. Implemented in Erlang, the algorithm looks like this.



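%% The module name is hypothetical; the original listing does not show it.
-module(total_order).
-behaviour(gen_server).

-export([start_link/0, send/2, recv/1]).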
-export([init/1, handle_cast/2, handle_call/3]).

-type timestamped_message() :: {{non_neg_integer(), pid()}, any()}.

-record(state, {clock = 0 :: non_neg_integer(),
                queue = [] :: [timestamped_message()],
                procs = [] :: [pid()]}).

start_link() ->
    gen_server:start_link(?MODULE, [], []).

send(Server, Message) ->
    gen_server:cast(Server, {send, Message}).

recv(Server) ->
    gen_server:call(Server, recv).

init([]) ->
    {ok, #state{clock = 0, queue = [], procs = []}}.

handle_cast({send, Message}, State) ->
    %% Tick the clock, then multicast to the group and to ourselves.
    Clock = State#state.clock + 1,
    lists:foreach(
      fun (P) ->
              gen_server:cast(P, {message, {Clock, self()}, Message})
      end, [self() | State#state.procs]),
    {noreply, State#state{clock = Clock}};
handle_cast({message, {C, _} = Timestamp, Message}, State) ->
    %% Merge the sender's timestamp, then tick before sending the ack.
    Clock = max(C, State#state.clock) + 1,
    lists:foreach(
      fun (P) ->
              gen_server:cast(P, {ack, {Clock, self()}})
      end, State#state.procs),
    Q = [{Timestamp, {message, Message}} | State#state.queue],
    {noreply, State#state{clock = Clock, queue = Q}};
handle_cast({ack, Clk}, State) ->
    {noreply, State#state{queue = [{Clk, ack} | State#state.queue]}}.

handle_call({set_group, Procs}, _From, State) ->
    {reply, ok, State#state{procs = Procs -- [self()]}};
handle_call(recv, _From, #state{queue = []} = State) ->
    {reply, false, State};
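handle_call(recv, _From, State) ->
    %% Sort by {Clock, Pid}: lowest timestamp first, ties broken by sender.
    case lists:keysort(1, State#state.queue) of
        [{{Clk, _}, {message, Message}} | Q] ->
            %% Processes that have acknowledged with a later timestamp.
            AckProcs = [P || {{C, P}, ack} <- Q, C > Clk],
            case State#state.procs -- AckProcs of
                [] ->
                    {reply, {message, Message},
                     State#state{
                       queue = lists:dropwhile(fun ({_, M}) -> M =:= ack end, Q)}};
                _ ->
                    {reply, false, State}
            end;
        _ ->
            %% Only acknowledgments are queued so far; nothing to deliver.
            {reply, false, State}
    end.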

When a message or an acknowledgment is received, it is stored in a list along with its timestamp. Only once the list contains an acknowledgment from every other process with a timestamp greater than the timestamp of the message are the message and its associated acknowledgments removed from the list. The message with the lowest timestamp is always delivered first, with any ties broken by the PID of the sender.
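
The delivery rule can also be looked at in isolation as a pure function over the queue. The sketch below is an illustration only: the module name `delivery_rule`, the function `next_deliverable/2` and its return values are assumptions, and atoms such as `p`, `q` and `r` stand in for PIDs.

```erlang
-module(delivery_rule).
-export([next_deliverable/2]).

%% Queue entries are {{Clock, Sender}, {message, Body}} or
%% {{Clock, Sender}, ack}. Others lists every process except the
%% local one. Returns {deliver, Body, RestOfQueue} or wait.
next_deliverable(Queue, Others) ->
    %% Tuples compare element by element, so this sorts by clock
    %% value first and by sender second, which breaks ties.
    case lists:keysort(1, Queue) of
        [{{Clock, _Sender}, {message, Body}} | Rest] ->
            %% Senders of acknowledgments stamped later than the message.
            Acked = [P || {{C, P}, ack} <- Rest, C > Clock],
            case Others -- Acked of
                [] ->
                    %% Drop the acknowledgments that sort before the
                    %% next queued message.
                    IsAck = fun({_, Entry}) -> Entry =:= ack end,
                    {deliver, Body, lists:dropwhile(IsAck, Rest)};
                _Missing ->
                    wait
            end;
        _ ->
            %% Empty queue, or only acknowledgments so far.
            wait
    end.
```

With `Others = [q, r]`, a queue holding the message `{{1, p}, {message, hello}}` and the acknowledgments `{{2, q}, ack}` and `{{2, r}, ack}` yields `{deliver, hello, []}`. If the acknowledgment from `r` is missing, the result is `wait`.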

Why does it work?

Messages are delivered in the order given by their timestamps, so a message M with timestamp t + k will not be delivered before a message O with timestamp t. This ordering is guaranteed because there must exist an acknowledgment for message O from some process, with a timestamp between t and t + k, that has to be sent before any acknowledgment to message M is sent. Because all messages are held until an acknowledgment with a greater timestamp is received, and because messages from the same process arrive in order, the acknowledgment for message O must arrive before the one for M. Thus it is impossible for all acknowledgments to M to arrive before all acknowledgments to O; therefore O will be acknowledged first and delivered before M.

Multi cast photo by Austin Neill