Class: Stapfen::Worker

Inherits:
Object
  • Object
show all
Includes:
Logger
Defined in:
lib/stapfen/worker.rb

Constant Summary

Constants included from Logger

Logger::PROXY_METHODS

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Logger

included

Class Attribute Details

.configurationObject

Returns the value of attribute configuration.



9
10
11
# File 'lib/stapfen/worker.rb', line 9

# Reader for the +@configuration+ value, listed on this page as a class-level
# attribute of Stapfen::Worker. Worker.configure assigns it from whatever its
# block yields, and #run hands it to Stomp::Client.new.
#
# @return [Object] the stored configuration
def configuration
  @configuration
end

.consumersObject

Returns the value of attribute consumers.



9
10
11
# File 'lib/stapfen/worker.rb', line 9

# Reader for the +@consumers+ value, listed on this page as a class-level
# attribute of Stapfen::Worker. Worker.consume appends one
# [queue_name, headers, block] triple per registration, and #run iterates
# over them to subscribe.
#
# @return [Array<Array>] the registered consumer triples
def consumers
  @consumers
end

.destructorObject

Returns the value of attribute destructor.



9
10
11
# File 'lib/stapfen/worker.rb', line 9

# Reader for the +@destructor+ value, listed on this page as a class-level
# attribute of Stapfen::Worker. Worker.shutdown stores its block here and
# #exit_cleanly calls it when one is present.
#
# @return [Proc, nil] the registered shutdown block, if any
def destructor
  @destructor
end

.loggerObject

Returns the value of attribute logger.



9
10
11
# File 'lib/stapfen/worker.rb', line 9

# Reader for the +@logger+ value, listed on this page as a class-level
# attribute of Stapfen::Worker. Worker.log assigns it from whatever its block
# yields.
#
# @return [Object] the stored logger
def logger
  @logger
end

.workersObject

Returns the value of attribute workers.



10
11
12
# File 'lib/stapfen/worker.rb', line 10

# Reader for the +@workers+ value, listed on this page as a class-level
# attribute of Stapfen::Worker. Worker.run! appends every instance it creates,
# and the signal handlers installed by Worker.handle_signals iterate over it.
#
# @return [Array] the worker instances created through Worker.run!
def workers
  @workers
end

Instance Attribute Details

#clientObject

Instance Methods



81
82
83
# File 'lib/stapfen/worker.rb', line 81

# Reader for this worker's +@client+, which #run assigns to a new
# Stomp::Client built from the class-level configuration.
#
# @return [Object] the client created by #run
def client
  @client
end

Class Method Details

.configureObject

Expects a block to be passed which will yield the appropriate configuration for the Stomp gem. Whatever the block yields will be passed directly into the Stomp::Client#new method.



28
29
30
31
32
33
# File 'lib/stapfen/worker.rb', line 28

# Captures the connection settings for this worker class.
#
# The value produced by the block is stored untouched in +@configuration+.
#
# @yieldreturn [Object] the configuration to keep
# @return [Object] the value the block produced
# @raise [Stapfen::ConfigurationError] when called without a block
def self.configure
  raise Stapfen::ConfigurationError unless block_given?

  settings = yield
  @configuration = settings
end

.consume(queue_name, headers = {}, &block) ⇒ Object

Main message consumption block



42
43
44
45
46
47
48
# File 'lib/stapfen/worker.rb', line 42

# Registers a handler block for a queue.
#
# The registration is kept as a [queue_name, headers, block] triple at the
# end of +@consumers+, which is created on first use.
#
# @param queue_name [String] name of the queue to consume from
# @param headers [Hash] headers stored alongside the registration
# @param block [Proc] handler for messages from +queue_name+
# @return [Array<Array>] the full list of registrations so far
# @raise [Stapfen::ConsumeError] when called without a block
def self.consume(queue_name, headers={}, &block)
  raise Stapfen::ConsumeError, "Cannot consume #{queue_name} without a block!" unless block_given?

  registration = [queue_name, headers, block]
  (@consumers ||= []) << registration
end

.handle_signalsObject

Utility method to set up the proper worker signal handlers



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/stapfen/worker.rb', line 57

# Installs the INT and TERM handlers once per class.
#
# Both handlers call #exit_cleanly on every known worker; the INT handler
# additionally terminates the process with +exit!+. Later calls do nothing
# because +@signals_handled+ is already set.
#
# @return [true, nil] true when the handlers were installed by this call,
#   nil when they had been installed earlier
def self.handle_signals
  unless @signals_handled
    Signal.trap(:INT) do
      workers.each(&:exit_cleanly)
      exit!
    end

    Signal.trap(:TERM) { workers.each(&:exit_cleanly) }

    @signals_handled = true
  end
end

.logObject

Optional method, should be passed a block which will yield a {Logger} instance for the Stapfen worker to use



37
38
39
# File 'lib/stapfen/worker.rb', line 37

# Stores whatever the given block yields in +@logger+.
#
# No block check is made here, so calling this without a block fails at the
# +yield+ itself.
#
# @yieldreturn [Object] the logger to keep
# @return [Object] the value the block produced
def self.log
  @logger = yield
end

.run!Object

Instantiate a new Worker instance and run it



14
15
16
17
18
19
20
21
22
23
# File 'lib/stapfen/worker.rb', line 14

# Builds a worker, records it, and starts it.
#
# The new instance is appended to +@workers+ (created on first use) before
# the signal handlers are set up, so the handlers can reach it. The call
# then hands control to the instance's #run.
#
# @return [Object] whatever the instance's #run returns
def self.run!
  instance = new
  (@workers ||= []) << instance

  handle_signals
  instance.run
end

.shutdown(&block) ⇒ Object

Optional method, specifies a block to execute when the worker is shutting down.



52
53
54
# File 'lib/stapfen/worker.rb', line 52

# Stores +block+ in +@destructor+, replacing any block stored earlier.
# #exit_cleanly calls the stored block when one is present.
#
# @param block [Proc] code to run when a worker shuts down
# @return [Proc, nil] the block that was stored
def self.shutdown(&block)
  @destructor = block
end

Instance Method Details

#exit_cleanlyObject

Invokes the shutdown block if it has been created, and closes the Stomp::Client connection unless it has already been shut down.



122
123
124
125
126
127
128
# File 'lib/stapfen/worker.rb', line 122

# Runs the class-level shutdown block, if one was registered, and then
# closes this worker's client connection unless it is already closed.
#
# The signal handlers can call this before #run has assigned +@client+
# (the worker is registered and the handlers are installed before #run
# starts), so a missing client is treated as "nothing to close" instead of
# raising NoMethodError from inside a signal handler.
#
# @return [Object, nil] the result of closing the client, or nil when there
#   was nothing to close
def exit_cleanly
  destructor = self.class.destructor
  destructor.call if destructor

  connection = client
  return if connection.nil? || connection.closed?

  connection.close
end

#runObject



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/stapfen/worker.rb', line 83

# Connects to the broker, subscribes every registered consumer, and then
# blocks until the client connection is no longer open.
#
# For each [name, headers, block] registration on the class:
# * the :max_redeliveries and :dead_letter_queue entries are split out of
#   the headers and kept for Client#unreceive; the rest go to
#   Client#subscribe
# * the block is turned into an instance method so it runs in the worker's
#   instance scope
# * a falsy result from the handler triggers Client#unreceive, but only
#   when at least one unreceive header was supplied
#
# An Interrupt raised while waiting is handled by calling #exit_cleanly.
#
# @return [Object, nil] nil when the wait loop ends normally, otherwise the
#   result of #exit_cleanly
def run
  @client = Stomp::Client.new(self.class.configuration)
  debug("Running with #{@client} inside of Thread:#{Thread.current.object_id}")

  self.class.consumers.each do |name, headers, block|
    # Work on a copy: the registered headers live on the class and are
    # shared by every worker instance. Deleting from them directly would
    # strip the unreceive settings for any worker started after this one.
    subscribe_headers = headers.dup
    unreceive_headers = {}
    [:max_redeliveries, :dead_letter_queue].each do |sym|
      unreceive_headers[sym] = subscribe_headers.delete(sym) if subscribe_headers.has_key? sym
    end

    # We're taking each block and turning it into a method so that we can
    # use the instance scope instead of the blocks originally bound scope
    # which would be at a class level
    method_name = name.gsub(/[.|\-]/, '_').to_sym
    self.class.send(:define_method, method_name, &block)

    client.subscribe(name, subscribe_headers) do |message|
      success = self.send(method_name, message)

      if !success && !unreceive_headers.empty?
        client.unreceive(message, unreceive_headers)
      end
    end
  end

  begin
    # Performing this join/open loop to make sure that we don't
    # experience potential deadlocks between signal handlers who might
    # close the connection, and an infinite Client#join call
    while client.open? do
      client.join(1)
    end
  rescue Interrupt
    exit_cleanly
  end
end