Class: Stapfen::Worker

Inherits:
Object
  • Object
show all
Includes:
Logger
Defined in:
lib/stapfen/worker.rb

Constant Summary collapse

@@signals_handled =

Class variables!

false
@@workers =
[]

Constants included from Logger

Logger::PROXY_METHODS

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Logger

included

Class Attribute Details

.configurationObject

Returns the value of attribute configuration.



18
19
20
# File 'lib/stapfen/worker.rb', line 18

# Class-level reader: returns @configuration, the block stored by
# .configure (nil if it was never assigned).
def configuration
  @configuration
end

.consumersObject

Returns the value of attribute consumers.



18
19
20
# File 'lib/stapfen/worker.rb', line 18

# Class-level reader: returns @consumers, the array of
# [queue_name, headers, block] entries appended by .consume
# (nil if it was never assigned).
def consumers
  @consumers
end

.destructorObject

Returns the value of attribute destructor.



18
19
20
# File 'lib/stapfen/worker.rb', line 18

# Class-level reader: returns @destructor, the block stored by .shutdown
# (nil if it was never assigned).
def destructor
  @destructor
end

.loggerObject

Returns the value of attribute logger.



18
19
20
# File 'lib/stapfen/worker.rb', line 18

# Class-level reader: returns @logger, the value returned by the block
# given to .log (nil if it was never assigned).
def logger
  @logger
end

Instance Attribute Details

#clientObject

Instance Methods



157
158
159
# File 'lib/stapfen/worker.rb', line 157

# Instance-level reader: returns @client, the connection object assigned by
# #run_stomp or #run_jms (nil before either has run).
def client
  @client
end

Class Method Details

.configure(&block) ⇒ Object

Expects a block to be passed which will yield the appropriate configuration for the Stomp gem. Whatever the block yields will be passed directly into the Stomp::Client.new method.



36
37
38
39
40
41
# File 'lib/stapfen/worker.rb', line 36

# Registers the connection-configuration block for this worker class.
#
# The block itself is stored (not its result); #run_stomp and #run_jms call
# it later to obtain the connection settings.
#
# @raise [Stapfen::ConfigurationError] when no block is given
# @return [Proc] the stored block
def self.configure(&block)
  raise Stapfen::ConfigurationError unless block_given?

  @configuration = block
end

.consume(queue_name, headers = {}, &block) ⇒ Object

Main message consumption block



95
96
97
98
99
100
101
# File 'lib/stapfen/worker.rb', line 95

# Registers a consumer block for a destination.
#
# Each registration is appended to @consumers as
# [queue_name, headers, block]; the list is created on first use.
#
# @param queue_name destination name to consume from
# @param headers [Hash] subscription headers stored alongside the block
# @raise [Stapfen::ConsumeError] when no block is given
# @return [Array] the full list of registered consumers
def self.consume(queue_name, headers={}, &block)
  raise Stapfen::ConsumeError, "Cannot consume #{queue_name} without a block!" unless block_given?

  (@consumers ||= []) << [queue_name, headers, block]
end

.exit_cleanlyBoolean

Invoke exit_cleanly on each of the registered Worker instances that this class is keeping track of

Returns:

  • (Boolean)

    Whether or not we’ve exited/terminated cleanly



119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/stapfen/worker.rb', line 119

# Invokes #exit_cleanly on every worker returned by .workers.
#
# A failure in one worker is reported on $stderr and does not prevent the
# remaining workers from being asked to exit.
#
# @return [Boolean] false when there are no workers or when any worker
#   raised a StandardError while exiting; true otherwise
def self.exit_cleanly
  return false if workers.empty?

  cleanly = true
  workers.each do |w|
    begin
      w.exit_cleanly
    rescue StandardError => ex
      # Interpolating the backtrace Array directly prints its #inspect form on
      # a single line; join the frames and end the record with a newline so
      # consecutive failures stay readable.
      trace = Array(ex.backtrace).join("\n")
      $stderr.write("Failure while exiting cleanly #{ex.inspect}\n#{trace}\n")
      cleanly = false
    end
  end

  cleanly
end

.handle_signalsObject

Utility method to set up the proper worker signal handlers



136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/stapfen/worker.rb', line 136

# Installs the INT and TERM signal handlers, at most once per process.
#
# Both handlers call the class-level exit_cleanly. The INT handler then
# terminates the process with exit!; the TERM handler does not exit by
# itself. Subsequent calls return immediately because @@signals_handled has
# already been set.
def self.handle_signals
  return if @@signals_handled

  Signal.trap(:INT) do
    exit_cleanly
    exit!
  end
  Signal.trap(:TERM) { exit_cleanly }

  @@signals_handled = true
end

.jms?Boolean

Returns:

  • (Boolean)


84
85
86
# File 'lib/stapfen/worker.rb', line 84

# Whether this worker class will use JMS.
#
# Defined as the negation of .stomp? rather than re-reading @use_stomp, so
# the two protocol predicates are always complementary.
#
# @return [Boolean]
def self.jms?
  !stomp?
end

.logObject

Optional method; should be passed a block which returns a Logger instance for the Stapfen worker to use.



90
91
92
# File 'lib/stapfen/worker.rb', line 90

# Sets the logger for this worker class to whatever the given block returns.
#
# The block is invoked immediately and its result is stored in @logger.
#
# @return the object returned by the block
def self.log
  configured = yield
  @logger = configured
end

.run!Object

Instantiate a new Worker instance and run it



23
24
25
26
27
28
29
30
31
# File 'lib/stapfen/worker.rb', line 23

# Creates a worker, records it in @@workers, makes sure the signal handlers
# are installed, and then runs the worker.
#
# @return whatever the worker's #run returns
def self.run!
  instance = new
  @@workers.push(instance)
  handle_signals
  instance.run
end

.shutdown(&block) ⇒ Object

Optional method, specifies a block to execute when the worker is shutting down.



105
106
107
# File 'lib/stapfen/worker.rb', line 105

# Stores the given block in @destructor. The instance-level #exit_cleanly
# calls it, when set, before closing the client.
#
# @return [Proc, nil] the stored block (nil when called without one)
def self.shutdown(&block)
  @destructor = block
end

.stomp?Boolean

Returns:

  • (Boolean)


58
59
60
# File 'lib/stapfen/worker.rb', line 58

# Whether this worker class will use STOMP.
#
# STOMP is the documented default protocol, so an unset @use_stomp (neither
# .use_stomp! nor .use_jms! has been called) counts as STOMP. Always returns
# a real Boolean rather than nil.
#
# @return [Boolean]
def self.stomp?
  @use_stomp.nil? || @use_stomp
end

.use_jms!Boolean

Force the worker to use JMS as the messaging protocol.

Note: Only works under JRuby

Returns:

  • (Boolean)


67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/stapfen/worker.rb', line 67

# Selects JMS as the messaging protocol for this worker class.
#
# Only permitted when RUBY_PLATFORM is 'java'. The 'java' and 'jms'
# libraries are loaded here, on demand, so they are needed only by workers
# that opt in.
#
# @raise [Stapfen::ConfigurationError] when not running on the java platform
# @raise [LoadError] re-raised (after printing a hint) when a library
#   cannot be loaded
# @return [Boolean] true
def self.use_jms!
  if RUBY_PLATFORM != 'java'
    raise Stapfen::ConfigurationError, "You cannot use JMS unless you're running under JRuby!"
  end

  begin
    %w[java jms].each { |library| require library }
  rescue LoadError
    puts "You need the `jms` gem to be installed to use JMS!"
    raise
  end

  @use_stomp = false
  true
end

.use_stomp!Boolean

Force the worker to use STOMP as the messaging protocol (default)

Returns:

  • (Boolean)


46
47
48
49
50
51
52
53
54
55
56
# File 'lib/stapfen/worker.rb', line 46

# Selects STOMP as the messaging protocol for this worker class.
#
# The 'stomp' library is loaded here, on demand.
#
# @raise [LoadError] re-raised (after printing a hint) when 'stomp' cannot
#   be loaded; @use_stomp is left untouched in that case
# @return [Boolean] true
def self.use_stomp!
  require 'stomp'
  @use_stomp = true
  true
rescue LoadError
  puts "You need the `stomp` gem to be installed to use stomp!"
  raise
end

.workersObject

Return all the currently running Stapfen::Worker instances in this process



111
112
113
# File 'lib/stapfen/worker.rb', line 111

# Returns the class variable @@workers, the array to which .run! appends
# every worker it creates.
def self.workers
  @@workers
end

Instance Method Details

#exit_cleanlyObject

Invokes the shutdown block if it has been created, and closes the Stomp::Client connection unless it has already been shut down.



249
250
251
252
253
254
255
256
257
# File 'lib/stapfen/worker.rb', line 249

# Shuts this worker down: logs the event, calls the class-level destructor
# block when one was registered, and closes the client if there is one and
# it does not report itself as closed.
#
# @return the result of client.close, or nil when nothing was closed
def exit_cleanly
  info("#{self} exiting cleanly")

  teardown = self.class.destructor
  teardown.call if teardown

  # A worker that never connected has no client to close.
  client.close if client && !client.closed?
end

#runObject



159
160
161
162
163
164
165
# File 'lib/stapfen/worker.rb', line 159

# Starts the run loop for whichever protocol the worker class reports:
# STOMP is checked first, then JMS. Does nothing when neither applies.
#
# @return the result of #run_stomp or #run_jms, or nil when neither ran
def run
  worker_class = self.class
  return run_stomp if worker_class.stomp?

  run_jms if worker_class.jms?
end

#run_jmsObject



167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
# File 'lib/stapfen/worker.rb', line 167

# Runs this worker over JMS.
#
# Opens a JMS connection with the settings returned by the class-level
# configuration block, turns every registered consumer block into an
# instance method, and registers a message listener that wraps each incoming
# message with Stapfen::Message.from_jms before dispatching it to that
# method. The calling thread then sleeps in a loop until interrupted, at
# which point #exit_cleanly is called.
def run_jms
  JMS::Connection.start(self.class.configuration.call) do |connection|
    @client = connection
    debug("Running with #{@client} inside of Thread:#{Thread.current.inspect}")

    # The per-consumer headers are not used on the JMS path.
    self.class.consumers.each do |name, _headers, block|
      destination = Stapfen::Destination.from_string(name)
      type = 'queue'
      options = {}

      options[:queue_name] = destination.name if destination.queue?

      if destination.topic?
        type = 'topic'
        options[:topic_name] = destination.name
      end

      # Defining the block as a method makes it run with the worker instance
      # as self rather than in the scope where the block was written.
      handler = :"handle_#{type}_#{name}"
      self.class.send(:define_method, handler, &block)

      connection.on_message(options) do |jms_message|
        send(handler, Stapfen::Message.from_jms(jms_message))
      end
    end

    begin
      loop { sleep 1 }
      # Only reached if the loop above is ever broken out of.
      debug("Exiting the JMS runloop for #{self}")
    rescue Interrupt
      exit_cleanly
    end
  end
end

#run_stompObject



206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
# File 'lib/stapfen/worker.rb', line 206

# Runs this worker over STOMP.
#
# Creates a Stomp::Client from the settings returned by the class-level
# configuration block, subscribes to every registered consumer, and then
# polls the client until it stops running. An Interrupt during the poll
# triggers #exit_cleanly.
#
# The :max_redeliveries and :dead_letter_queue headers are not sent with the
# subscription; they are held back and passed to client.unreceive whenever a
# consumer block returns a falsy value.
def run_stomp
  @client = Stomp::Client.new(self.class.configuration.call)
  debug("Running with #{@client} inside of Thread:#{Thread.current.inspect}")

  self.class.consumers.each do |name, headers, block|
    # Work on a copy: the headers hash belongs to the class-level consumer
    # list, and deleting from it directly would strip the unreceive settings
    # for every later worker (or re-run) of this class.
    subscribe_headers = headers.dup
    unreceive_headers = {}
    [:max_redeliveries, :dead_letter_queue].each do |sym|
      unreceive_headers[sym] = subscribe_headers.delete(sym) if subscribe_headers.has_key? sym
    end

    # We're taking each block and turning it into a method so that we can
    # use the instance scope instead of the blocks originally bound scope
    # which would be at a class level
    method_name = name.gsub(/[.|\-]/, '_').to_sym
    self.class.send(:define_method, method_name, &block)

    client.subscribe(name, subscribe_headers) do |message|
      success = self.send(method_name, message)

      if !success && !unreceive_headers.empty?
        client.unreceive(message, unreceive_headers)
      end
    end
  end

  begin
    # Performing this join/runningloop to make sure that we don't
    # experience potential deadlocks between signal handlers who might
    # close the connection, and an infinite Client#join call
    #
    # Instead of using client#open? we use #running which will still be
    # true even if the client is currently in an exponential reconnect loop
    while client.running do
      client.join(1)
    end
    warn("Exiting the runloop for #{self}")
  rescue Interrupt
    exit_cleanly
  end
end