Module: Fluere::Workflow

Defined in:
lib/fluere/workflow.rb

Constant Summary collapse

EXECUTION_METHOD =
:run

Instance Method Summary collapse

Instance Method Details

#activities_classObject



55
56
57
58
59
60
61
62
63
64
# File 'lib/fluere/workflow.rb', line 55

def activities_class
  @activities_class ||= const_set('Activities', Class.new {
    extend AWS::Flow::Activities

    # Stupid AWS doesn't allow colons in workflow or activity type names...
    def self.aws_name
      name.gsub(/:/, '_')
    end
  }).tap { |a| Fluere.activities << a }
end

#activity(name, &block) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/fluere/workflow.rb', line 32

def activity(name, &block)
  # Define a method to be used as the activity implementation
  activities_class.send :define_method, name, &block

  # Wire this method in as an AWS Flow activity
  activities_class.activity(name) {
    Fluere.default_activity_options.merge(
      prefix_name: activities_class.aws_name,
      version:     Fluere::Config.version,
    )
  }

  # Establish a shortcut method for the decider implementation to use to
  # call this activity as though it were a local method
  decider_class.send :define_method, name do |*args|
    if Fluere.stubbed?
      activities_class.new.send(name, *args)
    else
      activities_client.send(name, *args)
    end
  end
end

#decider(&block) ⇒ Object



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/fluere/workflow.rb', line 5

def decider(&block)
  # Set the provided block as the workflow execution method
  decider_class.send :define_method, EXECUTION_METHOD, &block

  # Wire that method in as an AWS Flow workflow
  decider_class.workflow(EXECUTION_METHOD) {
    Fluere.default_workflow_options.merge(
      prefix_name:       decider_class.aws_name,
      version:           Fluere::Config.version,
    )
  }

  # Give the decider a direct reference to activities for use in stubbed
  # implementations
  activities_class.tap do |klass|
    decider_class.send(:define_method, :activities_class) { klass }
  end

  # Establish the activity client which will be used in defining the helper
  # method for each activity (when we are not stubbed)
  decider_class.activity_client(:activities_client) {{
    prefix_name: activities_class.aws_name,
    version:     Fluere::Config.version,
    task_list:   Fluere.activities_task_list
  }}
end

#decider_classObject



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/fluere/workflow.rb', line 66

def decider_class
  @decider_class ||= const_set('Decider', Class.new {
    extend AWS::Flow::Workflows

    # Stupid AWS doesn't allow colons in workflow or activity type names...
    def self.aws_name
      name.gsub(/:/, '_')
    end

    def send_async(*args)
      if Fluere.stubbed?
        activities_class.new.send(*args)
      else
        activities_client.send_async(*args)
      end
    end

    def wait_for_all(*args)
      unless Fluere.stubbed?
        super
      end
    end
  }).tap { |d| Fluere.workflows << d }
end

#workflow_clientObject



91
92
93
94
95
96
97
98
99
100
101
# File 'lib/fluere/workflow.rb', line 91

def workflow_client
  @workflow_client ||= AWS::Flow.workflow_client(
    Fluere.swf.client,
    Fluere.domain
  ) {{
    workflow_name:    decider_class.aws_name,
    execution_method: EXECUTION_METHOD,
    version:          Fluere::Config.version,
    task_list:        Fluere.decisions_task_list
  }}
end