Class: Stapfen::Worker

Inherits:
Object
  • Object
show all
Includes:
Logger
Defined in:
lib/stapfen/worker.rb

Constant Summary collapse

@@signals_handled =

Class variables!

false
@@workers =
[]

Constants included from Logger

Logger::PROXY_METHODS

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Logger

included

Class Attribute Details

.configurationObject

Returns the value of attribute configuration.



15
16
17
# File 'lib/stapfen/worker.rb', line 15

# Class-level reader for @configuration.
#
# Returns whatever is currently stored there; .configure assigns the block
# it was given to this variable. Nothing in this reader supplies a default.
def configuration
  @configuration
end

.consumersObject

Returns the value of attribute consumers.



15
16
17
# File 'lib/stapfen/worker.rb', line 15

# Class-level reader for @consumers.
#
# .consume lazily creates this array and appends one
# [queue_name, headers, block] triple per registration. The reader itself
# does not initialize it, so it returns nil if nothing has assigned it yet.
def consumers
  @consumers
end

.destructorObject

Returns the value of attribute destructor.



15
16
17
# File 'lib/stapfen/worker.rb', line 15

# Class-level reader for @destructor.
#
# .shutdown stores its block here, and the instance-level #exit_cleanly
# calls it when it is set.
def destructor
  @destructor
end

.loggerObject

Returns the value of attribute logger.



15
16
17
# File 'lib/stapfen/worker.rb', line 15

# Class-level reader for @logger.
#
# .log stores its block in this variable; the reader returns it unchanged.
def logger
  @logger
end

Instance Attribute Details

#clientObject

Instance Methods



158
159
160
# File 'lib/stapfen/worker.rb', line 158

# Instance-level reader for @client.
#
# #run assigns @client (a Stapfen::Client::Stomp or Stapfen::Client::JMS
# instance); before that assignment this returns nil.
def client
  @client
end

Class Method Details

.configure(&block) ⇒ Object

Expects a block to be passed which will yield the appropriate configuration for the Stomp gem. Whatever the block yields will be passed directly into the Stomp::Client#new method.



32
33
34
35
36
37
# File 'lib/stapfen/worker.rb', line 32

# Registers the block that produces the client configuration.
#
# The block is not invoked here; it is stored in @configuration so that it
# can be called later.
#
# @raise [Stapfen::ConfigurationError] when called without a block
# @return [Proc] the stored block
def self.configure(&block)
  raise Stapfen::ConfigurationError if block.nil?

  @configuration = block
end

.consume(queue_name, headers = {}, &block) ⇒ Object

Main message consumption block



91
92
93
94
95
96
97
# File 'lib/stapfen/worker.rb', line 91

# Registers a handler block for messages arriving on +queue_name+.
#
# Each registration is appended to @consumers as a
# [queue_name, headers, block] triple; the list is created on first use.
#
# @param queue_name the destination to consume from
# @param headers [Hash] headers stored alongside the subscription
# @raise [Stapfen::ConsumeError] when called without a block
# @return [Array] the full list of registered consumers
def self.consume(queue_name, headers={}, &block)
  if block.nil?
    raise Stapfen::ConsumeError, "Cannot consume #{queue_name} without a block!"
  end

  (@consumers ||= []) << [queue_name, headers, block]
end

.exit_cleanlyBoolean

Invoke exit_cleanly on each of the registered Worker instances that this class is keeping track of

Returns:

  • (Boolean)

    Whether or not we’ve exited/terminated cleanly



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/stapfen/worker.rb', line 115

# Invokes #exit_cleanly on every worker returned by .workers.
#
# A failure in one worker is reported on $stderr and does not prevent the
# remaining workers from being shut down.
#
# When RUBY_PLATFORM is 'java' the JVM is told to exit with status 0 once
# all workers have been processed, so the return statement is not reached
# on that platform.
#
# @return [Boolean] false when there are no workers or when any worker
#   raised a StandardError while exiting; true otherwise
def self.exit_cleanly
  return false if workers.empty?

  cleanly = true
  workers.each do |w|
    begin
      w.exit_cleanly
    rescue StandardError => ex
      # Exception#backtrace is an Array (or nil); print one frame per line
      # instead of interpolating the Array's inspect form, and terminate the
      # report with a newline so following output starts on a fresh line.
      trace = Array(ex.backtrace).join("\n")
      $stderr.write("Failure while exiting cleanly #{ex.inspect}\n#{trace}\n")
      cleanly = false
    end
  end

  if RUBY_PLATFORM == 'java'
    info "Telling the JVM to exit cleanly"
    Java::JavaLang::System.exit(0)
  end

  cleanly
end

.handle_signalsObject

Utility method to set up the proper worker signal handlers



137
138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/stapfen/worker.rb', line 137

# Installs the process-wide INT and TERM handlers, at most once.
#
# The @@signals_handled flag guards against re-installing the traps when
# this is called again.
#
#   * INT  - shuts the workers down via .exit_cleanly, then calls exit!
#   * TERM - shuts the workers down via .exit_cleanly only
#
# @return [true, nil] true when the handlers were installed by this call,
#   nil when they were already in place
def self.handle_signals
  unless @@signals_handled
    Signal.trap(:INT) do
      exit_cleanly
      exit!
    end

    Signal.trap(:TERM) { exit_cleanly }

    @@signals_handled = true
  end
end

.jms?Boolean

Returns:

  • (Boolean)


80
81
82
# File 'lib/stapfen/worker.rb', line 80

# Whether this worker class is set to use JMS, defined as the exact
# opposite of .stomp?.
#
# @return [Boolean]
def self.jms?
  stomp? ? false : true
end

.log(&block) ⇒ Object

Optional method; should be passed a block which will yield a Logger instance for the Stapfen worker to use.



86
87
88
# File 'lib/stapfen/worker.rb', line 86

# Stores the given block in @logger without calling it.
#
# No check is made that a block was actually supplied; calling this
# without one stores nil.
def self.log(&block)
  @logger = block
end

.run!Object

Instantiate a new Worker instance and run it



19
20
21
22
23
24
25
26
27
# File 'lib/stapfen/worker.rb', line 19

# Builds a worker, records it in the @@workers list, makes sure the signal
# handlers are installed, and then runs the worker.
#
# @return whatever the worker's #run returns
def self.run!
  instance = new
  @@workers << instance

  # Traps are installed only after the worker is registered.
  handle_signals

  instance.run
end

.shutdown(&block) ⇒ Object

Optional method; specifies a block to execute when the worker is shutting down.



101
102
103
# File 'lib/stapfen/worker.rb', line 101

# Stores the given block in @destructor without calling it.
#
# The instance-level #exit_cleanly calls the stored block (when present)
# before it closes the client.
def self.shutdown(&block)
  @destructor = block
end

.stomp?Boolean

Returns:

  • (Boolean)


54
55
56
# File 'lib/stapfen/worker.rb', line 54

# Whether this worker class is set to use STOMP.
#
# STOMP is the default: while @use_stomp has never been assigned (nil) this
# answers true; otherwise it answers with the stored flag.
#
# @return [Boolean]
def self.stomp?
  return true if @use_stomp.nil?

  @use_stomp
end

.use_jms!Boolean

Force the worker to use JMS as the messaging protocol.

Note: Only works under JRuby

Returns:

  • (Boolean)


63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/stapfen/worker.rb', line 63

# Switches the worker class over to JMS.
#
# Refuses to proceed outside of JRuby (RUBY_PLATFORM must be 'java'), then
# loads the `java` and `jms` libraries. If loading fails, a hint is printed
# and the LoadError is re-raised.
#
# @raise [Stapfen::ConfigurationError] when not running on the 'java' platform
# @raise [LoadError] when a required library cannot be loaded
# @return [true]
def self.use_jms!
  if RUBY_PLATFORM != 'java'
    raise Stapfen::ConfigurationError, "You cannot use JMS unless you're running under JRuby!"
  end

  begin
    %w(java jms).each { |lib| require lib }
  rescue LoadError
    puts "You need the `jms` gem to be installed to use JMS!"
    raise
  end

  @use_stomp = false
  true
end

.use_stomp!Boolean

Force the worker to use STOMP as the messaging protocol (default)

Returns:

  • (Boolean)


42
43
44
45
46
47
48
49
50
51
52
# File 'lib/stapfen/worker.rb', line 42

# Switches the worker class over to STOMP (also the default protocol).
#
# Loads the `stomp` library first. If that fails, a hint is printed, the
# LoadError is re-raised, and @use_stomp is left untouched.
#
# @raise [LoadError] when the `stomp` library cannot be loaded
# @return [true]
def self.use_stomp!
  require 'stomp'

  @use_stomp = true
  true
rescue LoadError
  puts "You need the `stomp` gem to be installed to use stomp!"
  raise
end

.workersObject

Return all the currently running Stapfen::Worker instances in this process



107
108
109
# File 'lib/stapfen/worker.rb', line 107

# Returns the @@workers class variable: the array that .run! appends each
# newly created worker to. The array itself is returned, not a copy.
def self.workers
  @@workers
end

Instance Method Details

#exit_cleanlyObject

Invokes the shutdown block if it has been created, and closes the Stomp::Client connection unless it has already been shut down.



215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
# File 'lib/stapfen/worker.rb', line 215

# Shuts this worker down.
#
# Runs the class-level destructor block when one is set, then closes the
# client if there is one and it is not already closed. A StandardError
# raised while closing is logged through #error and not propagated; the
# destructor call is outside that protection.
def exit_cleanly
  info("#{self} exiting ")

  shutdown_hook = self.class.destructor
  shutdown_hook.call if shutdown_hook

  info "Killing client"
  begin
    # Nothing to do without a client, or with one that is already closed
    client.close if client && !client.closed?
  rescue StandardError => exc
    error "Exception received while trying to close client! #{exc.inspect}"
  end
end

#runObject



160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
# File 'lib/stapfen/worker.rb', line 160

# Connects the client, subscribes every registered consumer, and enters the
# client's run loop.
#
# Steps, in order:
#   1. Build @client (STOMP or JMS, per the class-level protocol flag) from
#      the result of calling the class-level configuration block.
#   2. Connect.
#   3. For each [name, headers, block] consumer: turn the block into an
#      instance method and subscribe to +name+. Incoming messages are
#      wrapped in a Stapfen::Message and handed to that method; a falsy
#      result triggers an unreceive when the client supports it and
#      unreceive headers were supplied.
#   4. Run the client's loop; an Interrupt raised from it triggers
#      #exit_cleanly.
def run
  if self.class.stomp?
    require 'stapfen/client/stomp'
    @client = Stapfen::Client::Stomp.new(self.class.configuration.call)
  elsif self.class.jms?
    require 'stapfen/client/jms'
    @client = Stapfen::Client::JMS.new(self.class.configuration.call)
  end

  debug("Running with #{@client} inside of Thread:#{Thread.current.inspect}")

  @client.connect

  self.class.consumers.each do |name, headers, block|
    # The headers hash lives in the class-level consumers list and is shared
    # by every worker created from this class. Split it on a copy: deleting
    # from the original would strip the unreceive settings for any worker
    # that runs after this one.
    subscribe_headers = headers.dup
    unreceive_headers = {}
    [:max_redeliveries, :dead_letter_queue].each do |sym|
      if subscribe_headers.has_key?(sym)
        unreceive_headers[sym] = subscribe_headers.delete(sym)
      end
    end

    # We're taking each block and turning it into a method so that we can
    # use the instance scope instead of the blocks originally bound scope
    # which would be at a class level
    method_name = name.gsub(/[.|\-]/, '_').to_sym
    self.class.send(:define_method, method_name, &block)

    client.subscribe(name, subscribe_headers) do |m|
      message = nil
      if self.class.stomp?
        message = Stapfen::Message.from_stomp(m)
      end

      if self.class.jms?
        message = Stapfen::Message.from_jms(m)
      end

      success = self.send(method_name, message)

      unless success
        # Hand the raw message back only when the client is able to and the
        # consumer asked for it via unreceive headers.
        if client.can_unreceive? && !unreceive_headers.empty?
          client.unreceive(m, unreceive_headers)
        end
      end
    end
  end

  begin
    client.runloop
    warn("Exiting the runloop for #{self}")
  rescue Interrupt
    exit_cleanly
  end
end