Module: Attr::Gather::Workflow

Extended by:
DSL, Graphable::ClassMethods
Includes:
Concerns::Identifiable, Callable, Graphable::InstanceMethods
Defined in:
lib/attr/gather/workflow.rb,
lib/attr/gather/workflow/dsl.rb,
lib/attr/gather/workflow/task.rb,
lib/attr/gather/workflow/callable.rb,
lib/attr/gather/workflow/graphable.rb,
lib/attr/gather/workflow/task_graph.rb,
lib/attr/gather/workflow/dot_serializer.rb

Overview

Main mixin for workflow functionality. Including it in a class turns that class into a workflow.

Defined Under Namespace

Modules: Callable, DSL, Graphable Classes: DotSerializer, Task, TaskGraph

Instance Attribute Details

#uuid ⇒ String (readonly) Originally defined in module Concerns::Identifiable

Returns UUID of the result.

Returns:

  • (String)

    UUID of the result

Class Method Details

.aggregator(agg = nil, opts = EMPTY_HASH) ⇒ Object Originally defined in module DSL

Configures the result aggregator

Aggregators make it possible to build custom logic for how results should be "merged" together. For example, you could build an aggregator that prioritizes the values of some tasks over others.

Examples:

class EnhanceUserProfile
  include Attr::Gather::Workflow

  aggregator :deep_merge
end

class EnhanceUserProfile
  include Attr::Gather::Workflow

  aggregator MyCustomAggregator
end

Parameters:

  • agg (#call) (defaults to: nil)

    the aggregator to use
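
The idea of prioritizing some tasks over others can be sketched in plain Ruby. This is a stand-alone illustration rather than attr-gather's aggregator interface: the `PriorityAggregator` class, its `call(input, task_results)` signature, and the task names are all assumptions made for the example.

```ruby
# Hypothetical stand-alone sketch; not the attr-gather aggregator API.
# Merges per-task result hashes so that tasks listed earlier in
# `priority` win when two tasks return the same key.
class PriorityAggregator
  def initialize(priority:)
    @priority = priority
  end

  # input:        the Hash originally passed to the workflow
  # task_results: Hash of task name => Hash of attributes it returned
  def call(input, task_results)
    # Tasks missing from the priority list sort after the listed ones.
    ranked = task_results.keys.each_with_index.sort_by do |name, idx|
      [@priority.index(name) || @priority.size, idx]
    end
    ordered = ranked.map(&:first)

    # Merge lowest priority first so higher-priority values overwrite.
    ordered.reverse.reduce(input.dup) do |memo, name|
      memo.merge(task_results.fetch(name))
    end
  end
end

aggregator = PriorityAggregator.new(priority: %i[database gravatar])
merged = aggregator.call(
  { user_id: 1 },
  {
    gravatar: { email: 'from-gravatar@example.com', avatar: 'a.png' },
    database: { email: 'from-db@example.com' }
  }
)
# merged[:email] comes from :database, the higher-priority task;
# merged[:avatar] still comes from :gravatar.
```

Merging from lowest to highest priority keeps the logic to a single `reduce`; any object that responds to `call` could be swapped in the same way.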

.container(cont = nil) ⇒ Object Originally defined in module DSL

Note:

For more information, check out https://dry-rb.org/gems/dry-container

Defines a container for task dependencies

Using a container makes it easy to re-use workflows with different data sources. Say one workflow was required to use a legacy DB, and one wanted to use a new DB. Using a container makes it easy to configure that dependency.

Examples:

LegacySystem = Dry::Container.new.tap do |c|
  c.register(:database) { Sequel.connect('sqlite://legacy.db') }
end

class EnhanceUserProfile
  include Attr::Gather::Workflow

  container LegacySystem
end

Parameters:

  • cont (Dry::Container) (defaults to: nil)

    the Dry::Container to use
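
To illustrate why a container makes workflows reusable across data sources, here is a minimal sketch that runs without any gems. `TinyContainer` is a hypothetical stand-in that only mirrors the register-then-resolve pattern; it is not Dry::Container, and `EmailLookup` and the stubbed "databases" are likewise assumptions.

```ruby
# Hypothetical stand-in for a dependency container. It is NOT
# Dry::Container; it only mimics registering and resolving by key.
class TinyContainer
  def initialize
    @items = {}
  end

  # Stores a block that builds the dependency on demand.
  def register(key, &block)
    @items[key] = block
    self
  end

  def resolve(key)
    @items.fetch(key).call
  end
end

# Two "databases", stubbed as hashes keyed by user id.
LEGACY_SYSTEM = TinyContainer.new.register(:database) { { 1 => 'legacy@example.com' } }
NEW_SYSTEM    = TinyContainer.new.register(:database) { { 1 => 'new@example.com' } }

# The lookup logic is written once and receives its data source
# through whichever container it is given.
class EmailLookup
  def initialize(container)
    @db = container.resolve(:database)
  end

  def call(user_id)
    { email: @db[user_id] }
  end
end

EmailLookup.new(LEGACY_SYSTEM).call(1) # reads from the legacy source
EmailLookup.new(NEW_SYSTEM).call(1)    # same code, different source
```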

.fetch(task_name, opts = EMPTY_HASH) {|Attr::Gather::Workflow::Task| ... } ⇒ Object Originally defined in module DSL

Defines a task with name and options

Calling `fetch` will yield a task object which you can configure like a PORO. Tasks will be registered for execution in the workflow.

Examples:

class EnhanceUserProfile
  include Attr::Gather::Workflow

  # ...

  fetch :user_info do |t|
    t.depends_on = [:fetch_gravatar_info]
  end
end

Parameters:

  • task_name (Symbol)

    the name of the task

Yields:

  • (Attr::Gather::Workflow::Task)

    the task to configure

.filter(filt = nil, *args, **opts) ⇒ Object Originally defined in module DSL

Defines a filter for filtering out invalid values

When aggregating data from many sources, it is hard to reason about all the ways invalid data will be returned. For example, if you are pulling data from a spreadsheet, there will often be typos, etc.

Defining a filter allows you to declaratively state what is valid. attr-gather will use this definition to automatically filter out invalid values, so they never make it into your system.

Filtering happens during each step of the workflow, which means that every Task will receive validated input that you can rely on.

Examples:

class UserContract < Dry::Validation::Contract
  params do
    optional(:id).filled(:integer)
    optional(:email).filled(:str?, format?: /@/)
  end
end

class EnhanceUserProfile
  include Attr::Gather::Workflow

  # Any of the key/value pairs that had validation errors will be
  # filtered from the output.
  filter :contract, UserContract.new
end

Parameters:

  • filt (Symbol) (defaults to: nil)

    the name of the filter to use

  • args (Array<Object>)

    arguments for initializing the filter
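
The filtering behavior described above (invalid key/value pairs are dropped, valid ones pass through) can be sketched without dry-validation. `USER_RULES` and `filter_invalid` are hypothetical names for this illustration and are not part of attr-gather.

```ruby
# Hypothetical sketch of value filtering; not attr-gather's filter API.
# Each rule is a predicate for one key. Keys whose values fail their
# rule are dropped; keys without a rule pass through unchanged.
USER_RULES = {
  id: ->(v) { v.is_a?(Integer) },
  email: ->(v) { v.is_a?(String) && v.include?('@') }
}.freeze

def filter_invalid(attrs, rules = USER_RULES)
  attrs.select do |key, value|
    rule = rules[key]
    rule.nil? || rule.call(value)
  end
end

filter_invalid({ id: 1, email: 'not-an-email', name: 'Ada' })
# :email fails its rule and is removed; :id and :name are kept.
```

Applying a function like this to the output of every task is what would let later tasks rely on their input being valid.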

.filter_with_contract(arg = nil, &blk) ⇒ Dry::Validation::Contract, NilClass Originally defined in module DSL

Defines a filter for filtering invalid values with an inline contract

This serves as a convenience method for defining a contract filter.

Examples:


class EnhanceUserProfile
  include Attr::Gather::Workflow

  # Any of the key/value pairs that had validation errors will be
  # filtered from the output.
  filter_with_contract do
    params do
      required(:name).filled(:string)
      required(:age).value(:integer)
    end

    rule(:age) do
      key.failure('must be at least 18') if value < 18
    end
  end
end

Returns:

  • (Dry::Validation::Contract, NilClass)

See Also:

.included(klass) ⇒ Object



# File 'lib/attr/gather/workflow.rb', line 21

def self.included(klass)
  klass.extend(DSL)
  klass.include(Callable)
  klass.include(Graphable)
  klass.include(Concerns::Identifiable)
end

.step(task_name, opts = EMPTY_HASH) {|Attr::Gather::Workflow::Task| ... } ⇒ Object Originally defined in module DSL

Defines a task with name and options

Calling `step` will yield a task object which you can configure like a PORO. Tasks will be registered for execution in the workflow.

Examples:

class EnhanceUserProfile
  include Attr::Gather::Workflow

  # ...

  step :fetch_user_info do |t|
    t.depends_on = [:email_info]
  end
end

Parameters:

  • task_name (Symbol)

    the name of the task

Yields:

  • (Attr::Gather::Workflow::Task)

    the task to configure

.task(task_name, opts = EMPTY_HASH) {|Attr::Gather::Workflow::Task| ... } ⇒ Object Originally defined in module DSL

Defines a task with name and options

Calling `task` will yield a task object which you can configure like a PORO. Tasks will be registered for execution in the workflow.

Examples:

class EnhanceUserProfile
  include Attr::Gather::Workflow

  # ...

  task :fetch_database_info do |t|
    t.depends_on = []
  end

  task :fetch_avatar_info do |t|
    t.depends_on = [:fetch_gravatar_info]
  end
end

Parameters:

  • task_name (Symbol)

    the name of the task

Yields:

  • (Attr::Gather::Workflow::Task)

    the task to configure
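
To make the "yield a task object, then register it" behavior concrete, here is a minimal sketch of how such a class-level DSL could work. It is an assumption-laden illustration: `SketchTask`, `SketchDSL`, and `SketchWorkflow` are hypothetical names, and the real Attr::Gather::Workflow::DSL may be implemented differently.

```ruby
# Hypothetical sketch of a task-registering DSL; not the real
# Attr::Gather::Workflow::DSL.
SketchTask = Struct.new(:name, :depends_on)

module SketchDSL
  # Registered tasks, stored on each class that extends this module.
  def tasks
    @tasks ||= []
  end

  # Builds a task, yields it for configuration, then registers it.
  def task(name)
    new_task = SketchTask.new(name, [])
    yield new_task if block_given?
    tasks << new_task
    new_task
  end
end

class SketchWorkflow
  extend SketchDSL

  task :fetch_database_info

  task :fetch_avatar_info do |t|
    t.depends_on = [:fetch_database_info]
  end
end

SketchWorkflow.tasks.map { |t| [t.name, t.depends_on] }
```

Because the block receives a plain object with setters, configuring a task needs no special syntax beyond attribute assignment.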

.tasks ⇒ TaskGraph Originally defined in module Graphable::ClassMethods

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns the graph of tasks

Returns:

  • (TaskGraph)

.to_dot(preview: true) ⇒ Object Originally defined in module Graphable::ClassMethods

Returns a graphviz visualization of the workflow

Parameters:

  • preview (Boolean) (defaults to: true)

    show a preview image of the Workflow
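
For readers unfamiliar with Graphviz, the sketch below shows roughly what a DOT description of a task graph looks like. `tasks_to_dot` is a hypothetical helper, not the library's DotSerializer, whose actual output may differ; the input hash maps each task name to the tasks it depends on.

```ruby
# Hypothetical sketch of DOT output; the real DotSerializer may emit a
# different layout. `graph` maps a task name to the tasks it depends on.
def tasks_to_dot(graph)
  lines = ['digraph workflow {']
  # Declare every task as a node first.
  graph.each_key { |task| lines << "  #{task};" }
  # Then draw an edge from each dependency to the task that needs it.
  graph.each do |task, deps|
    deps.each { |dep| lines << "  #{dep} -> #{task};" }
  end
  lines << '}'
  lines.join("\n")
end

puts tasks_to_dot(
  {
    fetch_database_info: [],
    fetch_avatar_info: [:fetch_database_info]
  }
)
```

The resulting text can be rendered to an image with the Graphviz `dot` command-line tool.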

Instance Method Details

#call(input) ⇒ Concurrent::Promise<Hash> Originally defined in module Callable

Note:

For more information, check out https://dry-rb.org/gems/dry-monads/1.0/result

Execute a workflow

When executing the workflow, tasks are processed in dependency order, with the outputs of each batch being fed as inputs to the next batch. This means that you can enhance the data as it moves through the workflow, so later tasks can use the enhanced input data.

Examples:

enhancer = MyEnhancingWorkflow.new
enhancer.call(user_id: 1).value! # => {user_id: 1, email: '[email protected]'}

Parameters:

  • input (Hash)

Returns:

  • (Concurrent::Promise<Hash>)
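
The batch-by-batch execution described above can be sketched as follows. This is a simplified, synchronous illustration and not the Callable implementation: it returns a plain Hash instead of a Concurrent::Promise, and `run_in_batches`, `SKETCH_TASKS`, and the task hash layout are all hypothetical.

```ruby
# Hypothetical sketch of batched execution; not the Callable module.
# Runs tasks in dependency order, one batch at a time, synchronously.
# `tasks` maps a task name to { deps: [...], run: ->(input) { Hash } }.
def run_in_batches(tasks, input)
  done = []
  data = input.dup

  until done.size == tasks.size
    # A task is ready once every task it depends on has finished.
    batch = tasks.reject { |name, _| done.include?(name) }
                 .select { |_, task| (task[:deps] - done).empty? }
    raise ArgumentError, 'cyclic or missing dependency' if batch.empty?

    # Every task in a batch sees the same input; the outputs are merged
    # afterwards and become the input of the next batch.
    outputs = batch.map { |_, task| task[:run].call(data) }
    data = outputs.reduce(data) { |memo, out| memo.merge(out) }
    done.concat(batch.keys)
  end

  data
end

SKETCH_TASKS = {
  email: { deps: [], run: ->(i) { { email: "user#{i[:user_id]}@example.com" } } },
  domain: { deps: [:email], run: ->(i) { { domain: i[:email].split('@').last } } }
}.freeze

run_in_batches(SKETCH_TASKS, { user_id: 1 })
```

Here the `:domain` task can only run in the second batch, because it reads the `:email` value produced by the first.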

#initialize ⇒ Object Originally defined in module Concerns::Identifiable

#to_dot(preview: true) ⇒ Object Originally defined in module Graphable::InstanceMethods

Returns a graphviz visualization of the workflow

Parameters:

  • preview (Boolean) (defaults to: true)

    show a preview image of the Workflow