Envío → PubSub inside OTP
OTP stands for Open Telecom Platform. It is a historical quirk; Ericsson was a culpritin initiating the development of fault-tolerant software platform, hence Telecom. It turned out, OTP as we know it has nothing to do with Telecom. Nowadays the title has the same connotations with its functionality, as apples are related to medium quality phones.
The main distinguishing feature of the OTP according to the authors, is fault tolerance. Not multithreading, not the actor model, not the rich pattern matching, even not transparent clustering and not hot code upgrades. Fault tolerance.
The virtual machine of erlang on the surface is very simple: there are a bunch of “processes” (not system processes, erlang processes) with isolated memory, that can exchange messages. That’s it. As Joe Armstrong had written once:
In my blog I argued that processes should behave pretty much like people. People have private memories and exchange data by message passing.
— Why I don’t like shared memory
The exchange of messages within OTP is deadly simple. One process sends a message to another process (or to a group of other processes,) synchronously, or asynchronously. Note, that it is necessary to know who is to receive these messages. That is, the manager of the exchange is the sender. But what if we wanted to send broadcast and to provide an opportunity to all interested processes to subscribe to this message?
Yes, we are naturally coming to a generic PubSub, but it is not available in OTP out of the box. Luckily enough, all the bricks to build our own implementation are there. Let’s get started.
Embodiments
Elixir includes a module Registry
, which can be used as a scaffold для pubsub. We are only to provide several lines of glue code, and the neat care for all workers. The only problem is Registry
is local and therefore it doesn’t support clustering. That is, in a distributed environment this beauty would not work.
OK, there is a distributed implementation Phoenix.PubSub
, which comes with two implementations: Phoenix.PubSub.PG2
and Phoenix.PubSub.Redis
. Well, Redis
is clearly the alien link in our chain; but PG2
built on top of the erlang process groups pg2
it our saviour. It still requires a significant amount of code as a boilerplate in each project, though.
That said, we have everything to establish a convenient PubSub
subscriptions in our app. Is it a time to open text editor? Not so fast. I personally don’t like to copy-paste code from one project to another. Instead everything that could be separated into kinda library gets extracted for reuse.
Envío
Thus was born the package Envío. The talk is cheap, so I would start with showing the code.
Local broadcat → Registry
defmodule MyApp.Sub do
use Envio.Subscriber, channels: [{MyApp.Pub, :main}]
def handle_envio(message, state) do
# optionally call the default implementation
{:noreply, state} = super(message, state)
# handle it!
IO.inspect({message, state}, label: "Received")
# respond with `{:noreply, state}` as by contract
{:noreply, state}
end
end
This is all the code needed to produce fully functional subscriber. Just add MyApp.Sub
to the supervision tree, and this process will begin receiving all the messages sent using functions in MyApp.Pub
(which is in turn not too overwhelmed with the code as well.)
defmodule MyApp.Pub do
use Envio.Publisher, channel: :main
def publish(channel, what), do: broadcast(channel, what)
def publish(what), do: broadcast(what) # send to :main
end
Distributed newsletter → PG2
For distributed systems consisting of many nodes, the approach above would not work. We need to be able to subscribe to messages from other nodes, and Registry
is not of any help here. Although we have PG2
implementing the same behaviour that would serve all we need.
defmodule MyApp.Pg2Sub do
use Envio.Subscriber, channels: ["main"], manager: :phoenix_pub_sub
def handle_envio(message, state) do
{:noreply, state} = super(message, state)
IO.inspect({message, state}, label: "Received")
{:noreply, state}
end
end
The only difference from the standalone code above would be manager: :phoenix_pub_sub
parameter that we pass to use Envio.Subscriber
call (and use Envio.Publisher
as well.) That would produce a module, backed by :pg2
instead of local Registry
. Now messages sent through this Publisher
will be available on all nodes in the cluster.
Application
Envío supports the so-called backends. The package includes Envio.Slack
that allows to utterly simplify sending messages to Slack
. All that is required from the application to transparently start sending messages to Slack
would be a channel configuration in the file config/prod.exs
. Broadcast the message to the channel specified, Envío will take care about the rest. Here is an example configuration:
config :envio, :backends, %{
Envio.Slack => %{
{MyApp.Pub, :slack} => [
hook_url: {:system, "SLACK_ENVIO_HOOK_URL"}
]
}
}
Now all messages broadcasted with MyApp.Pub.publish(:slack, %{foo: :bar})
will be delivered to the appropriate channel in Slack
, nicely formatted. In order to stop sending messages to Slack
, it’s enough to stop the process Envio.Slack
. More examples (e.g. log in IO
) can be found in the tests.
Well, why would you continue reading this? Simply try it yourself.
def deps do
[
{:envio, "~> 0.8"}
]
end
Happy broadcasting!