The Power of Elixir Task Module - Into Task.Supervisor

Intro

The previous post introduced the Task module and how it works in general.

Now it’s time to go a step further with Task.Supervisor and look at a few Elixir concepts along the way.

Before going ahead, let’s review some basic Elixir concepts:

Supervisor

A supervisor is a process that supervises other processes, which we refer to as child processes. Supervisors are used to build a hierarchical process structure called a supervision tree. Supervision trees provide fault tolerance and encapsulate how our applications start and shut down.
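
As a rough illustration of that idea, the sketch below starts a supervisor with a single child and kills the child to see it come back. The name Demo.Counter and the use of an Agent as the child are assumptions made for this example only; they are not part of the sample application built later.

```elixir
# Demo.Counter is a hypothetical name used only for this sketch.
children = [
  %{
    id: Demo.Counter,
    # An Agent holding an integer, registered under a name so we can look it up.
    start: {Agent, :start_link, [fn -> 0 end, [name: Demo.Counter]]}
  }
]

{:ok, _sup} = Supervisor.start_link(children, strategy: :one_for_one)

pid_before = Process.whereis(Demo.Counter)

# Kill the child abruptly; the supervisor notices and starts a fresh one.
Process.exit(pid_before, :kill)

# Give the supervisor a moment to restart the child (good enough for a sketch).
Process.sleep(100)

pid_after = Process.whereis(Demo.Counter)
IO.inspect(pid_before != pid_after, label: "restarted with a new PID")
```

The child comes back under the same name but with a different PID, which is the restart behaviour a supervision tree builds on.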

Processes

In Elixir, all code runs inside processes. Processes are isolated from each other, run concurrent to one another, and communicate via message passing.

Elixir’s processes should not be confused with operating system processes. Processes in Elixir are extremely lightweight in terms of memory and CPU (even compared to threads as used in many other programming languages). Because of this, it is not uncommon to have tens or even hundreds of thousands of processes running simultaneously.
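
A minimal sketch of message passing between two processes, using only spawn, send and receive. The message shapes ({:greet, from, name} and {:greeting, text}) are made up for this example.

```elixir
parent = self()

# Spawn a lightweight process that waits for one message and replies to the sender.
child =
  spawn(fn ->
    receive do
      {:greet, from, name} -> send(from, {:greeting, "Hello #{name}"})
    end
  end)

send(child, {:greet, parent, "beta"})

# Wait for the reply, giving up after one second.
reply =
  receive do
    {:greeting, text} -> text
  after
    1_000 -> :timeout
  end

IO.puts(reply)
```

The two processes share no state; the only way the greeting reaches the parent is through its mailbox.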

We will use Task.Supervisor to start task processes as children of a supervisor, which is then responsible for managing them.

Creating the playground

The first step is to create a new Elixir application with a supervision tree. More information about the available mix options can be found in the official documentation.

mix new newsample --sup

Time to see what was created:

lib
  lib/newsample
  lib/newsample.ex
  lib/newsample/application.ex
test
  test/newsample_test.exs
  test/test_helper.exs
README.md
mix.exs

For now we’ll just work with lib/newsample.ex and lib/newsample/application.ex.

The second step is to create the same function from the previous post inside lib/newsample.ex with minor changes to execute our tests easily.

To force a failure scenario, one clause of say_hello pattern matches on the value “alpha” and raises an error.

  def say_hello("alpha" = to), do: raise("Error to say hello to #{to}")

  def say_hello(to) do
    IO.puts("Hello #{to}")
    :ok
  end

  def process() do
    items = ["alpha", "beta", "gama"]

    Enum.map(items, fn item ->
      Task.async(fn ->
        say_hello(item)
      end)
    end)
    |> Enum.map(&Task.await/1)
  end

And finally, run it in Elixir’s interactive shell.

iex -S mix

Then execute the code:

iex(1)> self
#PID<0.145.0>
iex(2)> Newsample.process
Hello beta
Hello gama

22:01:54.276 [error] Task #PID<0.148.0> started from #PID<0.145.0> terminating
** (RuntimeError) Error to say hello to alpha
    (newsample 0.1.0) lib/newsample.ex:15: anonymous fn/2 in Newsample.process/1
    (elixir 1.11.3) lib/task/supervised.ex:90: Task.Supervised.invoke_mfa/2
    (elixir 1.11.3) lib/task/supervised.ex:35: Task.Supervised.reply/5
    (stdlib 3.14) proc_lib.erl:226: :proc_lib.init_p_do_apply/3
Function: #Function<1.13017213/0 in Newsample.process/1>
    Args: []
** (EXIT from #PID<0.145.0>) shell process exited with reason: an exception was raised:
    ** (RuntimeError) Error to say hello to alpha
        (newsample 0.1.0) lib/newsample.ex:15: anonymous fn/2 in Newsample.process/1
        (elixir 1.11.3) lib/task/supervised.ex:90: Task.Supervised.invoke_mfa/2
        (elixir 1.11.3) lib/task/supervised.ex:35: Task.Supervised.reply/5
        (stdlib 3.14) proc_lib.erl:226: :proc_lib.init_p_do_apply/3

Interactive Elixir (1.11.3) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> self
#PID<0.151.0>

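As we can see, the raised exception propagated all the way to the iex session: Task.async links the task to the process that called it, so the crash took the shell process down and iex started a new one. We can verify this by comparing the shell PID before (#PID<0.145.0>) and after (#PID<0.151.0>) running the code.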
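
The link behind this propagation can be observed directly. The sketch below is an illustration only: it starts a task that sleeps briefly (so it is still alive when we look) and checks whether its PID shows up in the caller's list of links.

```elixir
# The task sleeps so that it is still running when we inspect the links.
task =
  Task.async(fn ->
    Process.sleep(200)
    :done
  end)

# Process.info/2 returns the PIDs currently linked to the calling process.
{:links, links} = Process.info(self(), :links)
linked? = task.pid in links
IO.inspect(linked?, label: "task linked to caller")

result = Task.await(task)
IO.inspect(result, label: "result")
```

Because of that link, an abnormal exit in the task is also delivered to the caller, which is what took the shell down in the session above.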

How do we avoid this explosive error propagation?

Now it is time to use a supervised task. Let’s open lib/newsample/application.ex.

  def start(_type, _args) do
    children = []

    opts = [strategy: :one_for_one, name: Newsample.Supervisor]
    Supervisor.start_link(children, opts)
  end

Let’s add our Task.Supervisor as a child process:

  children = [
    {Task.Supervisor, name: Newsample.TaskSupervisor}
  ]
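
To check that such a child specification does what we expect, it can be tried outside the application. This is a sketch under assumptions: Demo.TaskSupervisor is a hypothetical name standing in for Newsample.TaskSupervisor, and the supervisor is started by hand instead of from application.ex.

```elixir
# Demo.TaskSupervisor is a hypothetical stand-in for Newsample.TaskSupervisor.
children = [
  {Task.Supervisor, name: Demo.TaskSupervisor}
]

{:ok, _sup} = Supervisor.start_link(children, strategy: :one_for_one)

# The task supervisor is now registered under its name...
task_sup = Process.whereis(Demo.TaskSupervisor)
IO.inspect(is_pid(task_sup), label: "task supervisor running")

# ...and has no running tasks yet.
running = Task.Supervisor.children(Demo.TaskSupervisor)
IO.inspect(running, label: "running tasks")
```

Inside the sample application the same check could be done from iex with the real name once the app has started.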

Now we use this task supervisor in the process function and start each task with async_nolink. This means there is no link between the task process and the caller, so a crash in the task does not take the caller down.

  def process() do
    items = ["alpha", "beta", "gama"]

    Enum.map(items, fn item ->
      Task.Supervisor.async_nolink(Newsample.TaskSupervisor, fn ->
        say_hello(item)
      end)
    end)
    |> Enum.map(&Task.await/1)
  end
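
The missing link can be confirmed in the same way as a link can be observed. In this sketch the task supervisor is started by hand under the hypothetical name Demo.TaskSupervisor, so the snippet runs on its own outside the sample application.

```elixir
# Demo.TaskSupervisor is a hypothetical name used only for this sketch.
{:ok, _sup} = Task.Supervisor.start_link(name: Demo.TaskSupervisor)

# The task sleeps so that it is still running when we inspect the links.
task =
  Task.Supervisor.async_nolink(Demo.TaskSupervisor, fn ->
    Process.sleep(200)
    :done
  end)

# The caller is linked to the supervisor it started, but not to the task.
{:links, links} = Process.info(self(), :links)
linked? = task.pid in links
IO.inspect(linked?, label: "task linked to caller")

# The result can still be awaited, because the caller monitors the task.
result = Task.await(task)
IO.inspect(result, label: "result")
```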

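If we execute it again, the failing task no longer takes the iex session down. The error is still logged and Task.await still exits for the failed task, but the shell process survives.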

iex(1)> self
#PID<0.158.0>
iex(2)> Newsample.process
Hello beta
Hello gama

21:04:48.200 [error] Task #PID<0.161.0> started from #PID<0.158.0> terminating
** (RuntimeError) Error to say hello to alpha
    (newsample 0.1.0) lib/newsample.ex:6: Newsample.say_hello/1
    (elixir 1.11.3) lib/task/supervised.ex:90: Task.Supervised.invoke_mfa/2
    (elixir 1.11.3) lib/task/supervised.ex:35: Task.Supervised.reply/5
    (stdlib 3.14) proc_lib.erl:226: :proc_lib.init_p_do_apply/3
Function: #Function<1.39266525/0 in Newsample.process/0>
    Args: []
** (exit) exited in: Task.await(%Task{owner: #PID<0.158.0>, pid: #PID<0.161.0>, ref: #Reference<0.1303494724.713818114.156158>}, 5000)
    ** (EXIT) an exception was raised:
        ** (RuntimeError) Error to say hello to alpha
            (newsample 0.1.0) lib/newsample.ex:6: Newsample.say_hello/1
            (elixir 1.11.3) lib/task/supervised.ex:90: Task.Supervised.invoke_mfa/2
            (elixir 1.11.3) lib/task/supervised.ex:35: Task.Supervised.reply/5
            (stdlib 3.14) proc_lib.erl:226: :proc_lib.init_p_do_apply/3
    (elixir 1.11.3) lib/task.ex:639: Task.await/2
    (elixir 1.11.3) lib/enum.ex:1411: Enum."-map/2-lists^map/1-0-"/2
iex(2)> self
#PID<0.158.0>

As we can see, the PID is the same before and after the execution.
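
The output above still contains an exit raised by Task.await for the failed task. One possible way to collect every result without that exit is Task.yield, which returns {:exit, reason} for a crashed task instead of exiting the caller. The following is a sketch, not the approach used in this post: Demo.TaskSupervisor is a hypothetical name, say_hello is rewritten as an anonymous function that returns the greeting, and the :error and :timeout result atoms are choices made for this example.

```elixir
# Demo.TaskSupervisor is a hypothetical name used only for this sketch.
{:ok, _sup} = Task.Supervisor.start_link(name: Demo.TaskSupervisor)

# Same idea as say_hello/1 in the post, but returning the greeting.
say_hello = fn
  "alpha" = to -> raise("Error to say hello to #{to}")
  to -> "Hello #{to}"
end

results =
  ["alpha", "beta", "gama"]
  |> Enum.map(fn item ->
    Task.Supervisor.async_nolink(Demo.TaskSupervisor, fn -> say_hello.(item) end)
  end)
  |> Enum.map(fn task ->
    # yield returns nil on timeout; in that case shut the task down.
    case Task.yield(task, 5_000) || Task.shutdown(task) do
      {:ok, value} -> {:ok, value}
      {:exit, _reason} -> :error
      nil -> :timeout
    end
  end)

IO.inspect(results)
```

The crash of the “alpha” task is still logged, but the caller receives it as a plain value alongside the successful results and can decide what to do with it.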

Processes and supervision trees are important building blocks in Elixir for resilient, fault-tolerant systems.

Originally published on dev.to