Class: Cadence::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/cadence/worker.rb

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Worker

Returns a new instance of Worker.



10
11
12
13
14
15
16
17
18
# File 'lib/cadence/worker.rb', line 10

def initialize(options = {})
  @options = options
  @workflows = Hash.new { |hash, key| hash[key] = ExecutableLookup.new }
  @activities = Hash.new { |hash, key| hash[key] = ExecutableLookup.new }
  @pollers = []
  @decision_middleware = []
  @activity_middleware = []
  @shutting_down = false
end

Instance Method Details

#add_activity_middleware(middleware_class, *args) ⇒ Object



38
39
40
# File 'lib/cadence/worker.rb', line 38

def add_activity_middleware(middleware_class, *args)
  @activity_middleware << Middleware::Entry.new(middleware_class, args)
end

#add_decision_middleware(middleware_class, *args) ⇒ Object



34
35
36
# File 'lib/cadence/worker.rb', line 34

def add_decision_middleware(middleware_class, *args)
  @decision_middleware << Middleware::Entry.new(middleware_class, args)
end

#register_activity(activity_class, options = {}) ⇒ Object



27
28
29
30
31
32
# File 'lib/cadence/worker.rb', line 27

def register_activity(activity_class, options = {})
  execution_options = ExecutionOptions.new(activity_class, options)
  key = [execution_options.domain, execution_options.task_list]

  @activities[key].add(execution_options.name, activity_class)
end

#register_workflow(workflow_class, options = {}) ⇒ Object



20
21
22
23
24
25
# File 'lib/cadence/worker.rb', line 20

def register_workflow(workflow_class, options = {})
  execution_options = ExecutionOptions.new(workflow_class, options)
  key = [execution_options.domain, execution_options.task_list]

  @workflows[key].add(execution_options.name, workflow_class)
end

#startObject



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/cadence/worker.rb', line 42

def start
  workflows.each_pair do |(domain, task_list), lookup|
    pollers << workflow_poller_for(domain, task_list, lookup)
  end

  activities.each_pair do |(domain, task_list), lookup|
    pollers << activity_poller_for(domain, task_list, lookup)
  end

  trap_signals

  pollers.each(&:start)

  # keep the main worker thread alive
  sleep(1) while !shutting_down?
end

#stopObject



59
60
61
62
63
64
65
66
# File 'lib/cadence/worker.rb', line 59

def stop
  @shutting_down = true

  Thread.new do
    pollers.each(&:stop)
    pollers.each(&:wait)
  end.join
end