ProductPromotion
Logo

Elixir

made by https://0x3d.site

GitHub - Nebo15/gen_task: Generic Task behavior that helps encapsulate errors and recover from them in classic GenStage workers.
Generic Task behavior that helps encapsulate errors and recover from them in classic GenStage workers. - Nebo15/gen_task
Visit Site

GitHub - Nebo15/gen_task: Generic Task behavior that helps encapsulate errors and recover from them in classic GenStage workers.

GitHub - Nebo15/gen_task: Generic Task behavior that helps encapsulate errors and recover from them in classic GenStage workers.

GenTask

Deps Status Hex.pm Downloads Latest Version License Build Status Coverage Status

Generic Task behavior that helps to encapsulate worker errors and recover from them in classic GenStage's.

Motivation

Whenever you use RabbitMQ or similar tool for background job processing you may want to leverage acknowledgments mechanism.

For example, we spawn a supervisioned GenServer worker each time we receive a job from RabbitMQ. Job payload comes with a tag that should be used to send acknowledgment (ack) or negative acknowledgment (nack) when it is finished. All nack'ed jobs will be re-scheduled and retried (to reach "at-least once" job processing). Also, RabbitMQ remembers which tasks was sent to a connection and will nack all unacknowledged tasks when connection dies. After task is processed we send ack or nack (depending on business logic) and exit with a normal reason. (This our supervisor restart strategy is :transient).

Jobs intensity is limited by prefetch_count option that limits maximum amount of unacknowledged jobs that may be processed on a single node at a single moment in time.

But in real life jobs can have bugs or other errors because of third-party services unavailability, in this case GenServer will die. Of course Supervisor will try to restart it, but in most cases of third-party outages it will reach max restart intensity within seconds and die taking all active jobs with itself.

Supervisor gets restarted, but it won't receive receive any jobs resulting in a zombie background processing node. This happens because connection is not linked to a individual jobs or their supervisors, and will stay alive after supervisor restart, so RabbitMQ will think that node "is working on all jobs at max capacity" (because of prefetch_count) and will not send any additional jobs to it. Additionally we will loose all tags and won't be able to nack died processes within node.

Possible solutions

  1. Leverage GenServer terminate/2 callback.

This option is not safe by-default, because process that doesn't trap exits will not call this callback when supervisor is sending exit signal to it (due to supervisor restart).

  1. Linking RabbitMQ client lib channel/connection processes to a workers.

May be a bad solution because all jobs will be re-scheduled whenever a single job fails, resulting in a many duplicate-processed jobs.

  1. Store tags in a separate process which monitors supervisor and it's workers.

  2. Keep storing tags and job payload within GenStage state, but wrap any unsafe code in a Task. [1]

Internally this looks familiar to pt. 2, but doesn't require us to re-invent supervisor behavior.

Picked solution description

Tasks is started under Task.Supervisor (via async_nolink/2) inside GenTask application. They are not linked to a caller process which allows to persist state without risking that caller pid will be terminated along with task itself, so further processing is possible when error occurs. Task result is received via Task.yield/1 function.

Tasks are started with a :temporary restart strategy (never restart), to protect supervisor from exits.

The package itself provides two ways to handle asynchronous jobs:

  1. Runner functions GenTask.start_task/1 and GenTask.start_task/3 to start function under unlinked supervisioned process and yield for task result.

  2. GenTask behaviour that is based on GenServer that will start task (from run/1 callback) processing after it's start and deliver status into handle_result/3 callbacks.

Installation and usage

It's available in Hex, the package can be installed as:

  1. Add gen_task to your list of dependencies in mix.exs:
def deps do
  [{:gen_task, "~> 0.1.4"}]
end
  1. Ensure gen_task is started before your application:
def application do
  [applications: [:gen_task]]
end
  1. Define your business logic and result handling:
defmodule MyWorker do
  use GenTask
  require Logger

  # Define business logic
  def run(%{payload: _payload, tag: tag}) do
    # Simulated errors
    if :rand.uniform(2) == 1 do
      throw "Error!"
    end

    Logger.info("Processed job ##{tag}")
    :timer.sleep(100)
    :ok
  end

  # Handle task statuses
  def handle_result(:ok, _result, %{tag: tag} = state) do
    # MyQueue.ack(tag)
    {:stop, :normal, state}
  end

  def handle_result(:exit, reason, %{tag: tag} = state) do
    Logger.error("Task with tag #{inspect tag} terminated with reason: #{inspect reason}")
    # MyQueue.nack(tag)
    {:stop, :normal, state}
  end

  def handle_result(:timeout, task, state) do
    Task.shutdown(task) # Shut down task on yield timeout
    handle_result(:exit, :timeout, state)
  end
end
  1. (Optional.) Supervise your workers:

Define MyWorker supervisor:

defmodule MyWorkerSupervisor do
  use Supervisor

  def start_link do
    Supervisor.start_link(__MODULE__, [], name: __MODULE__)
  end

  def start_worker(job) do
    Supervisor.start_child(__MODULE__, [job])
  end

  def init(_) do
    children = [
      worker(MyWorker, [], restart: :transient)
    ]

    supervise(children, strategy: :simple_one_for_one)
  end
end

Add it to a application supervision tree:

# File: lib/my_app.ex
# ...

def start(_type, _args) do
  import Supervisor.Spec, warn: false

  children = [
    supervisor(MyWorkerSupervisor, [])
    # ...
  ]

  opts = [strategy: :one_for_one, name: MyApp.Supervisor]
  Supervisor.start_link(children, opts)
end

# ...

Then you can use MyWorkerSupervisor.start_worker/1 to start your workers.

The docs can be found at https://hexdocs.pm/gen_task

Articles
to learn more about the elixir concepts.

Resources
which are currently available to browse on.

mail [email protected] to add your project or resources here 🔥.

FAQ's
to know more about the topic.

mail [email protected] to add your project or resources here 🔥.

Queries
or most google FAQ's about Elixir.

mail [email protected] to add more queries here 🔍.

More Sites
to check out once you're finished browsing here.

0x3d
https://www.0x3d.site/
0x3d is designed for aggregating information.
NodeJS
https://nodejs.0x3d.site/
NodeJS Online Directory
Cross Platform
https://cross-platform.0x3d.site/
Cross Platform Online Directory
Open Source
https://open-source.0x3d.site/
Open Source Online Directory
Analytics
https://analytics.0x3d.site/
Analytics Online Directory
JavaScript
https://javascript.0x3d.site/
JavaScript Online Directory
GoLang
https://golang.0x3d.site/
GoLang Online Directory
Python
https://python.0x3d.site/
Python Online Directory
Swift
https://swift.0x3d.site/
Swift Online Directory
Rust
https://rust.0x3d.site/
Rust Online Directory
Scala
https://scala.0x3d.site/
Scala Online Directory
Ruby
https://ruby.0x3d.site/
Ruby Online Directory
Clojure
https://clojure.0x3d.site/
Clojure Online Directory
Elixir
https://elixir.0x3d.site/
Elixir Online Directory
Elm
https://elm.0x3d.site/
Elm Online Directory
Lua
https://lua.0x3d.site/
Lua Online Directory
C Programming
https://c-programming.0x3d.site/
C Programming Online Directory
C++ Programming
https://cpp-programming.0x3d.site/
C++ Programming Online Directory
R Programming
https://r-programming.0x3d.site/
R Programming Online Directory
Perl
https://perl.0x3d.site/
Perl Online Directory
Java
https://java.0x3d.site/
Java Online Directory
Kotlin
https://kotlin.0x3d.site/
Kotlin Online Directory
PHP
https://php.0x3d.site/
PHP Online Directory
React JS
https://react.0x3d.site/
React JS Online Directory
Angular
https://angular.0x3d.site/
Angular JS Online Directory