Module: Ntswf::Base

Included in:
Client, Utils, Worker
Defined in:
lib/ntswf/base.rb

Instance Method Summary

Instance Method Details

#activity_name ⇒ Object



71
72
73
# File 'lib/ntswf/base.rb', line 71

# Name used when looking up the SWF activity type (see #activity_type).
#
# @return [String] the fixed activity name
def activity_name
  'master-activity'
end

#activity_task_list(unit: nil) ⇒ Object



83
84
85
86
87
# File 'lib/ntswf/base.rb', line 83

# Looks up the activity task list entry configured for a unit.
#
# @param unit [Object, nil] key into #activity_task_lists; when nil/false the
#   value of #default_unit is used instead
# @return [Object] the entry stored for the unit
#   (presumably a task list name String - confirm against the configuration)
# @raise [Errors::InvalidArgument] if no truthy entry exists for the unit
def activity_task_list(unit: nil)
  unit ||= default_unit
  task_list = activity_task_lists[unit]
  return task_list if task_list

  raise Errors::InvalidArgument.new(
    "Missing activity task list configuration for unit '#{unit}'")
end

#activity_task_lists ⇒ Object



79
80
81
# File 'lib/ntswf/base.rb', line 79

# Activity task lists keyed by unit.
#
# @return [Object] the result of autocompleted_activity_task_lists, or an
#   empty Hash when that helper returns nil/false
def activity_task_lists
  configured = autocompleted_activity_task_lists
  configured ? configured : {}
end

#activity_type ⇒ Object



133
134
135
# File 'lib/ntswf/base.rb', line 133

# Activity type looked up in the domain by #activity_name and
# #workflow_version. The result is cached in @activity_type once truthy.
#
# @return [Object] whatever domain.activity_types[name, version] yields
def activity_type
  return @activity_type if @activity_type

  @activity_type = domain.activity_types[activity_name, workflow_version]
end

#configure(config) ⇒ Object

Parameters:

  • config (Hash)

    A configuration with the following keys:

Options Hash (config):

  • :access_key_id (String)

    Deprecated: AWS credential. Use :swf instead.

  • :activity_task_lists (Hash)

    The task list names for activities per :unit.

  • :decision_task_list (String)

    Deprecated: the task list name for decisions. Use :decision_task_lists instead.

  • :decision_task_lists (Hash)

    The task list names for decisions per :unit.

  • :domain (String)

    The SWF domain name.

  • :execution_id_prefix (String) — default: value of :unit

    Workflow ID prefix (see Client#start_execution’s :execution_id for allowed values).

  • :execution_version (Numeric)

    Value allowing clients to reject future execution versions.

  • :identity_suffix (String)

    When polling for a task, the suffix is appended to the (default) identity (<hostname>:<pid>), delimited by a “:”. This makes it possible to distinguish worker activity on different hosts with identical hostnames.

  • :isolation_file (String)

    Development/test option. A random ID is stored at the given path, and prepended to task list names and execution IDs.

  • :pidfile (String)

    A path receiving the current PID for looping methods. Causes exit, if overwritten by another process. See Worker#in_subprocess.

  • :secret_access_key (String)

    Deprecated: AWS credential. Use :swf instead.

  • :subprocess_retries (Numeric) — default: 0
  • :swf (AWS::SimpleWorkflow)

    AWS simple workflow object (created e.g. with AWS::SimpleWorkflow.new).

  • :unit (String)

    This worker/client’s activity task list key.

Raises:



40
41
42
43
# File 'lib/ntswf/base.rb', line 40

# Stores the given configuration and validates it.
#
# Values that other accessors derive lazily from the configuration
# (@default_unit, @domain, @swf, @activity_type) are cleared, so that calling
# configure again does not keep serving results of the previous configuration.
#
# @param config [Hash] configuration options, wrapped in an OpenStruct
# @return [Object] the result of raise_if_invalid_task_list
# @raise whatever raise_if_invalid_task_list raises
#   (presumably Errors::InvalidArgument for bad task lists - confirm)
def configure(config)
  @config = OpenStruct.new(config)
  # Reset memoized, configuration-dependent state before validating, so the
  # validation and all later lookups see the new settings.
  @default_unit = @domain = @swf = @activity_type = nil
  raise_if_invalid_task_list
end

#decision_task_list(unit: nil) ⇒ Object



93
94
95
96
97
98
# File 'lib/ntswf/base.rb', line 93

# Looks up the decision task list entry for a unit, falling back to the
# entry of the default unit when the requested unit has none.
#
# @param unit [Object, nil] key into #decision_task_lists; when nil/false the
#   value of #default_unit is used instead
# @return [Object] the entry stored for the unit or for the default unit
# @raise [Errors::InvalidArgument] if neither lookup yields a truthy entry
def decision_task_list(unit: nil)
  unit ||= default_unit
  task_list = decision_task_lists[unit] || decision_task_lists[default_unit]
  return task_list if task_list

  raise Errors::InvalidArgument.new(
    "Missing decision task list configuration for unit '#{unit}'")
end

#decision_task_lists ⇒ Object



89
90
91
# File 'lib/ntswf/base.rb', line 89

# Decision task lists keyed by unit.
#
# @return [Object] the result of autocompleted_decision_task_lists, or of
#   fallback_decision_task_lists when the former is nil/false
def decision_task_lists
  configured = autocompleted_decision_task_lists
  return configured if configured

  fallback_decision_task_lists
end

#default_unit ⇒ Object



100
101
102
# File 'lib/ntswf/base.rb', line 100

# The configured :unit converted to a String; cached in @default_unit after
# the first call.
#
# @return [String] @config.unit.to_s ("" when no unit is configured)
def default_unit
  return @default_unit if @default_unit

  @default_unit = @config.unit.to_s
end

#domain ⇒ Object



75
76
77
# File 'lib/ntswf/base.rb', line 75

# The SWF domain selected by the configured :domain name; cached in @domain
# once a truthy value has been obtained.
#
# @return [Object] the entry swf.domains yields for @config.domain
def domain
  return @domain if @domain

  registry = swf.domains
  @domain = registry[@config.domain]
end

#execution_id_prefix ⇒ Object



104
105
106
# File 'lib/ntswf/base.rb', line 104

# Prefix for execution IDs: the isolation_id followed by the configured
# :execution_id_prefix, or by #default_unit when no prefix is configured.
#
# @return [String] the concatenated prefix
def execution_id_prefix
  isolation = isolation_id
  prefix = @config.execution_id_prefix || default_unit
  "#{isolation}#{prefix}"
end

#execution_version ⇒ Object



108
109
110
# File 'lib/ntswf/base.rb', line 108

# The :execution_version entry of the current configuration.
#
# @return [Object, nil] the configured value, nil when not configured
def execution_version
  @config[:execution_version]
end

#notify(message, params) ⇒ Object



123
124
125
126
# File 'lib/ntswf/base.rb', line 123

# Reports a handled error: logs exceptions and forwards the message to the
# callback registered via #on_notify, if any.
#
# @param message [Exception, Object] the error message or the exception
# @param params [Object] error details, passed through to the callback
# @return [Object, nil] the callback's return value, nil without a callback
def notify(message, params)
  if message.kind_of? Exception
    # An exception that was never raised has a nil backtrace; splatting
    # tolerates that instead of failing with NoMethodError on nil.join,
    # which would also prevent the callback from being invoked.
    log([message.message, *message.backtrace].join("\n  "))
  end
  @notify_callback.call(message: message, params: params) if @notify_callback
end

#on_notify(proc = nil) {|error| ... } ⇒ Object

Configure a proc or block to be called on handled errors

Parameters:

  • proc (Proc) (defaults to: nil)

    The callback

Yield Parameters:

  • error (Hash)

    Description of the error:

    :message

    The error message or the exception

    :params

    Error details



51
52
53
# File 'lib/ntswf/base.rb', line 51

# Registers the callback invoked by #notify. A truthy +proc+ argument takes
# precedence over a block.
#
# @param proc [Proc, nil] the callback
# @yieldparam error [Hash] description of the error (:message, :params)
# @return [Proc, nil] the callback now stored in @notify_callback
def on_notify(proc = nil, &block)
  callback = proc ? proc : block
  @notify_callback = callback
end

#parse_input(input) ⇒ Hash

Parse the options stored in a task’s input value

Parameters:

  • input (String)

    A task’s input

Returns:

  • (Hash)

    Input, converted back from JSON

See Also:



116
117
118
119
120
121
# File 'lib/ntswf/base.rb', line 116

# Parse the options stored in a task's input value.
#
# The JSON document is either an options Hash, or an Array whose first
# element is the options Hash (or a bare name) and whose second element
# holds legacy params.
#
# @param input [String] a task's input (JSON)
# @return [Hash] the options; a non-Hash first element is stored under
#   "name", truthy legacy params under "params"
def parse_input(input)
  head, legacy_params = JSON.parse(input)
  options = head.kind_of?(Hash) ? head : {"name" => head}
  options["params"] = legacy_params if legacy_params
  options
end

#separator ⇒ String

Returns separator for composite workflow_id.

Returns:

  • (String)

    separator for composite workflow_id



129
130
131
# File 'lib/ntswf/base.rb', line 129

# @return [String] separator for composite workflow_id
def separator
  ';'
end

#swf ⇒ AWS::SimpleWorkflow

Returns:

  • (AWS::SimpleWorkflow)


56
57
58
59
60
61
# File 'lib/ntswf/base.rb', line 56

# The simple workflow object: the configured :swf if present, otherwise a new
# AWS::SimpleWorkflow built from the (deprecated) :access_key_id and
# :secret_access_key options. Cached in @swf after the first call.
#
# @return [AWS::SimpleWorkflow]
def swf
  return @swf if @swf

  configured = @config.swf
  return @swf = configured if configured

  @swf = AWS::SimpleWorkflow.new({
    access_key_id: @config.access_key_id,
    secret_access_key: @config.secret_access_key,
  })
end

#workflow_name ⇒ Object



63
64
65
# File 'lib/ntswf/base.rb', line 63

# @return [String] the fixed workflow name
def workflow_name
  'master-workflow'
end

#workflow_version ⇒ Object



67
68
69
# File 'lib/ntswf/base.rb', line 67

# Version string; also used as the version in the #activity_type lookup.
#
# @return [String] the fixed version
def workflow_version
  'v1'
end