Realtime Market-Data Updates with Elixir

We then define a new function, handle_msg/2.

    def handle_frame({:text, msg}, state) do
      handle_msg(Poison.decode!(msg), state)
    end

    def handle_msg(%{"type" => "match"} = trade, state) do
      IO.inspect(trade)
      {:ok, state}
    end

    def handle_msg(_, state), do: {:ok, state}

The first handle_msg/2 clause requires "type" => "match" and is called only when the message matches this pattern. The second clause, handle_msg(_, state), works as a catch-all: it ignores every other message and returns {:ok, state}.

This works like a filter, and it’s exactly the same as using case:

    def handle_msg(msg, state) do
      case msg do
        %{"type" => "match"} = trade -> IO.inspect(trade)
        _ -> :ignore
      end
      {:ok, state}
    end

Let’s go back to iex and see what happens when we start our Coinbase client.

    iex> Coinbase.Client.start_link ["BTC-USD"]
    connected!
    {:ok, #PID<0.184.0>}
    %{
      "maker_order_id" => "9050ea18-440f-442e-9129-358d77351685",
      "price" => "3531.42000000",
      "product_id" => "BTC-USD",
      "sequence" => 7584073701,
      "side" => "sell",
      "size" => "0.04000084",
      "taker_order_id" => "e5df8af2-20d0-4592-af98-e41bbb2ed8a9",
      "time" => "2018-12-18T15:20:01.237000Z",
      "trade_id" => 56140459,
      "type" => "match"
    }

Great, it works! You can find the full code at this link: client.ex

Supervision

We can monitor disconnections by implementing the handle_disconnect(conn, state) callback.

    def handle_disconnect(_conn, state) do
      IO.puts "disconnected"
      {:reconnect, state}
    end

By returning {:reconnect, state} we ask WebSockex to reconnect using the same process. Unfortunately this solution isn’t the best in our case, because the subscription is done inside the Coinbase.Client.start_link function, which is not called again when the connection is re-established.
We opt instead to let the process exit, returning :ok instead of :reconnect.

    def handle_disconnect(_conn, state) do
      IO.puts "disconnected"
      {:ok, state}
    end

It’s easy to test a disconnection: just switch off the wifi (or disconnect the ethernet cable), which causes the client to fire a timeout error.

    iex> Coinbase.Client.start_link ["BTC-USD"]
    …
    %{"type" => "match", …}
    %{"type" => "match", …}
    …
    # switch off the connection and wait
    15:33:51.839 [error] [83, 83, 76, 58, 32, 83, 111, 99, 107, 101, 116, 32, 101, 114, 114, 111, 114, 58, 32, 'etimedout', 32, 10]
    disconnected
    ** (EXIT from #PID<0.182.0>) shell process exited with reason: {:remote, :closed}
    iex>

Since we used the WebSockex.start_link function, the websockex process was linked to our iex process, which is taken down after the error.

To cope with client disconnections and crashes, we need to add some basic supervision. When we created the project, we passed the --sup option to the mix command. This created a supervision skeleton we can now use.
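Before wiring in the real client, the restart behaviour we are relying on can be seen in isolation. The following is only a minimal sketch: RestartSketch and FakeClient are hypothetical names that are not part of the project, and FakeClient is a plain Agent standing in for Coinbase.Client so the example runs without a network connection or any dependency.

```elixir
defmodule RestartSketch do
  # Hypothetical stand-in for Coinbase.Client (an assumption for this sketch):
  # an Agent that only holds the list of products.
  defmodule FakeClient do
    use Agent

    def start_link(products) do
      Agent.start_link(fn -> products end)
    end
  end

  # Starts a :one_for_one supervisor with a single child, kills the child,
  # and returns {old_pid, new_pid} once a replacement has been started.
  def run do
    {:ok, sup} =
      Supervisor.start_link([{FakeClient, ["BTC-USD"]}], strategy: :one_for_one)

    [{FakeClient, old_pid, _, _}] = Supervisor.which_children(sup)

    # Monitor the child so we know when it is really gone.
    ref = Process.monitor(old_pid)
    Process.exit(old_pid, :kill)

    receive do
      {:DOWN, ^ref, :process, ^old_pid, :killed} -> :ok
    after
      1_000 -> raise "child did not exit"
    end

    {old_pid, wait_for_restart(sup, old_pid)}
  end

  # Polls the supervisor until the child entry holds a pid different
  # from the one we killed.
  defp wait_for_restart(sup, old_pid) do
    case Supervisor.which_children(sup) do
      [{FakeClient, pid, _, _}] when is_pid(pid) and pid != old_pid ->
        pid

      _ ->
        Process.sleep(10)
        wait_for_restart(sup, old_pid)
    end
  end
end
```

Calling RestartSketch.run() returns two different pids: the one that was killed and the one the supervisor started in its place, using the same ["BTC-USD"] argument. This is the same mechanism we want for our Coinbase client.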
Let’s open the application.ex file and add the processes we want to monitor.

    defmodule Coinbase.Application do
      use Application

      def start(_type, _args) do
        children = [
          {Coinbase.Client, ["BTC-USD"]}
        ]

        opts = [strategy: :one_for_one, name: Coinbase.Supervisor]
        Supervisor.start_link(children, opts)
      end
    end

At line 6, we’ve added the {Coinbase.Client, products} tuple. The supervisor will then start, run and monitor our process, which it starts with Coinbase.Client.start_link ["BTC-USD"]. In the case of a disconnection (or a crash) it will start a new client.

Running the Application

    $ mix run --no-halt
    connected!
    %{ …. "price" => "3521.20000000", …. "type" => "match"}

Whether we run it this way or with iex -S mix, the application starts, running the Coinbase.Supervisor which, in turn, starts and monitors the Coinbase.Client.

What happens if we kill the Coinbase.Client process? We can get the pid from the list of processes monitored by the Coinbase.Supervisor and kill it using the Process.exit(pid, reason) function.

    [{Coinbase.Client, pid, _, _}] = Supervisor.which_children(Coinbase.Supervisor)
    Process.exit(pid, :kill)

Let’s see it in action.

    $ iex -S mix
    …
    connected!
    …
    %{ …, "price" => "3522.85000000", "type" => "match"}
    …
    iex> [{Coinbase.Client, pid, _, _}] = Supervisor.which_children(Coinbase.Supervisor)
    Process.exit(pid, :kill)
    true
    iex(6)> connected!
    %{ …, "price" => "3535.00000000", "type" => "match"}

We see that after we kill the client process, the supervisor starts a new client that connects immediately to the Coinbase server.

Wrap Up

This is an initial and simple implementation of a supervised Coinbase client that we can use to start processing the stream of trades.
We’ve implemented basic supervision, which is great if you don’t mind losing some trades after a disconnection, but it is not enough if you want a properly fault-tolerant client that is able to recover lost data. After a disconnection occurs, and before the client is reconnected and ready to receive trades again, other trades may have taken place. In that case we would have lost them.

A solution, to properly recover the lost trades, is to check the id of the last trade and then download the lost trades using a different API, like Get Trades in Coinbase.

Originally published at www.poeticoding.com.
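The "check the id of the last trade" step from the wrap-up could look roughly like the sketch below. It is only an illustration under stated assumptions: TradeGap and check/2 are hypothetical names, and it assumes that "trade_id" values for a product increase by one with each trade, which should be verified against the Coinbase documentation. Downloading the missing trades is left out on purpose.

```elixir
defmodule TradeGap do
  # Hypothetical helper (not part of the original client).
  # Takes a decoded "match" message and the last trade id seen so far, and
  # returns {missing_ids, new_last_id}.
  # Assumption: trade ids for a product increase by one per trade.

  # First trade ever seen: nothing to compare against.
  def check(%{"trade_id" => id}, nil), do: {[], id}

  # A jump of more than one means trades were missed in between.
  def check(%{"trade_id" => id}, last_id) when id > last_id + 1 do
    {Enum.to_list((last_id + 1)..(id - 1)), id}
  end

  # Consecutive (or older, duplicated) trade: no gap.
  def check(%{"trade_id" => id}, last_id), do: {[], max(id, last_id)}
end
```

For example, if the last trade seen before the disconnection had "trade_id" 56140459 and the first one received after the restart has "trade_id" 56140463, then TradeGap.check(%{"trade_id" => 56140463}, 56140459) returns {[56140460, 56140461, 56140462], 56140463}. Those three ids are the trades to fetch through the other API. Since a restarted client starts with fresh state, the last id would need to be kept somewhere that survives the restart.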
