Class: Stapfen::Worker

Inherits:
Object
  • Object
show all
Includes:
Logger
Defined in:
lib/stapfen/worker.rb

Constant Summary collapse

@@signals_handled =

Class variables!

false
@@workers =
[]

Constants included from Logger

Logger::PROXY_METHODS

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Logger

included

Class Attribute Details

.configurationObject

Returns the value of attribute configuration.



15
16
17
# File 'lib/stapfen/worker.rb', line 15

# Reader for the +configuration+ class attribute.
#
# @return [Object] whatever is currently held in @configuration; .configure
#   stores the block it was given there
def configuration
  @configuration
end

.consumersObject

Returns the value of attribute consumers.



15
16
17
# File 'lib/stapfen/worker.rb', line 15

# Reader for the +consumers+ class attribute.
#
# @return [Object] whatever is currently held in @consumers; .consume
#   lazily creates an Array there and appends one
#   [queue_name, headers, block] triple per registration
def consumers
  @consumers
end

.destructorObject

Returns the value of attribute destructor.



15
16
17
# File 'lib/stapfen/worker.rb', line 15

# Reader for the +destructor+ class attribute.
#
# @return [Object] whatever is currently held in @destructor; .shutdown
#   stores its block there and #exit_cleanly calls it when it is set
def destructor
  @destructor
end

.loggerObject

Returns the value of attribute logger.



15
16
17
# File 'lib/stapfen/worker.rb', line 15

# Reader for the +logger+ class attribute.
#
# @return [Object] whatever is currently held in @logger; .log stores the
#   block it was given there
def logger
  @logger
end

Instance Attribute Details

#clientObject

Instance Methods



184
185
186
# File 'lib/stapfen/worker.rb', line 184

# Reader for this worker's messaging client.
#
# @return [Object] whatever is currently held in @client; #run assigns the
#   protocol-specific client object there before connecting
def client
  @client
end

Class Method Details

.configure(&block) ⇒ Object

Expects a block to be passed which will yield the appropriate configuration for the Stomp gem. Whatever the block yields will be passed directly into the Stomp::Client#new method.



32
33
34
35
36
37
# File 'lib/stapfen/worker.rb', line 32

# Registers the block that supplies the client configuration.
#
# The block is only stored here (in @configuration); it is not invoked by
# this method.
#
# @yield the block to store
# @return [Proc] the stored block
# @raise [Stapfen::ConfigurationError] when called without a block
def self.configure(&block)
  raise Stapfen::ConfigurationError unless block_given?

  @configuration = block
end

.consume(queue_name, headers = {}, &block) ⇒ Object

Main message consumption block



117
118
119
120
121
122
123
# File 'lib/stapfen/worker.rb', line 117

# Registers a handler block for a queue.
#
# Each registration is appended to @consumers as a
# [queue_name, headers, block] triple; the list is created on first use.
#
# @param queue_name [Object] the destination to consume from
# @param headers [Hash] headers stored alongside the registration
# @yield the handler to store for this queue
# @return [Array] the full list of registrations so far
# @raise [Stapfen::ConsumeError] when called without a block
def self.consume(queue_name, headers={}, &block)
  if !block_given?
    raise Stapfen::ConsumeError, "Cannot consume #{queue_name} without a block!"
  end

  registrations = (@consumers ||= [])
  registrations << [queue_name, headers, block]
end

.exit_cleanlyBoolean

Invoke exit_cleanly on each of the registered Worker instances that this class is keeping track of

Returns:

  • (Boolean)

    Whether or not we’ve exited/terminated cleanly



141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/stapfen/worker.rb', line 141

# Invokes #exit_cleanly on every worker returned by .workers.
#
# A StandardError raised by one worker is reported on $stderr (exception
# summary followed by one backtrace frame per line) and does not prevent
# the remaining workers from being shut down.
#
# When RUBY_PLATFORM is 'java' this method finishes by calling
# Java::JavaLang::System.exit(0), so it does not return in that case.
#
# @return [Boolean] false when there are no workers or when any worker
#   raised while exiting; true otherwise
def self.exit_cleanly
  return false if workers.empty?

  cleanly = true
  workers.each do |w|
    begin
      w.exit_cleanly
    rescue StandardError => ex
      # Exception#backtrace is an Array (or nil); join it so every frame is
      # printed on its own line instead of as an inspected Array, and
      # terminate the message with a newline.
      frames = Array(ex.backtrace).join("\n")
      $stderr.write("Failure while exiting cleanly #{ex.inspect}\n#{frames}\n")
      cleanly = false
    end
  end

  if RUBY_PLATFORM == 'java'
    info "Telling the JVM to exit cleanly"
    Java::JavaLang::System.exit(0)
  end

  cleanly
end

.handle_signalsObject

Utility method to set up the proper worker signal handlers



163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/stapfen/worker.rb', line 163

# Installs the INT and TERM signal handlers, at most once.
#
# The @@signals_handled flag makes repeated calls a no-op. On INT the
# handler runs .exit_cleanly and then calls exit!; on TERM it only runs
# .exit_cleanly.
#
# @return [true, nil] true when the handlers were installed by this call,
#   nil when they had already been installed
def self.handle_signals
  unless @@signals_handled
    Signal.trap(:INT) { exit_cleanly; exit! }
    Signal.trap(:TERM) { exit_cleanly }

    @@signals_handled = true
  end
end

.jms?Boolean

Returns:

  • (Boolean)


80
81
82
# File 'lib/stapfen/worker.rb', line 80

# Whether this worker class is set to use JMS.
#
# @return [Boolean] true when @protocol equals 'jms' (the value .use_jms!
#   assigns), false otherwise
def self.jms?
  @protocol == 'jms'
end

.kafka?Boolean

Returns:

  • (Boolean)


106
107
108
# File 'lib/stapfen/worker.rb', line 106

# Whether this worker class is set to use Kafka.
#
# @return [Boolean] true when @protocol equals 'kafka' (the value
#   .use_kafka! assigns), false otherwise
def self.kafka?
  @protocol == 'kafka'
end

.log(&block) ⇒ Object

Optional method, should be passed a block which will yield a {Logger} instance for the Stapfen worker to use



112
113
114
# File 'lib/stapfen/worker.rb', line 112

# Stores the given block in @logger. The block is only stored here; it is
# not invoked by this method.
#
# @yield the block to store
# @return [Proc, nil] the stored block, or nil when no block was given
def self.log(&block)
  @logger = block
end

.run!Object

Instantiate a new Worker instance and run it



19
20
21
22
23
24
25
26
27
# File 'lib/stapfen/worker.rb', line 19

# Creates a worker, records it, and starts it.
#
# The new instance is appended to @@workers and the signal handlers are
# installed (via .handle_signals) before the worker's #run is invoked.
#
# @return [Object] whatever the worker's #run returns
def self.run!
  instance = new
  @@workers.push(instance)

  handle_signals

  instance.run
end

.shutdown(&block) ⇒ Object

Optional method, specifies a block to execute when the worker is shutting down.



127
128
129
# File 'lib/stapfen/worker.rb', line 127

# Stores the given block in @destructor. The block is only stored here;
# the instance-level #exit_cleanly calls it when one is set.
#
# @yield the block to run at shutdown
# @return [Proc, nil] the stored block, or nil when no block was given
def self.shutdown(&block)
  @destructor = block
end

.stomp?Boolean

Returns:

  • (Boolean)


54
55
56
# File 'lib/stapfen/worker.rb', line 54

# Whether this worker class is set to use STOMP.
#
# STOMP is also assumed when no protocol has been selected, i.e. when
# @protocol is nil.
#
# @return [Boolean] true when @protocol is nil or equals 'stomp'
def self.stomp?
  return true if @protocol.nil?

  @protocol == 'stomp'
end

.use_jms!Boolean

Force the worker to use JMS as the messaging protocol.

Note: Only works under JRuby

Returns:

  • (Boolean)


63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/stapfen/worker.rb', line 63

# Selects JMS as the messaging protocol for this worker class.
#
# Refuses to proceed unless RUBY_PLATFORM is 'java', then loads the
# 'java' and 'jms' libraries before recording the choice in @protocol.
#
# @return [true]
# @raise [Stapfen::ConfigurationError] when RUBY_PLATFORM is not 'java'
# @raise [LoadError] re-raised (after printing a hint) when a required
#   library cannot be loaded
def self.use_jms!
  if RUBY_PLATFORM != 'java'
    raise Stapfen::ConfigurationError, "You cannot use JMS unless you're running under JRuby!"
  end

  begin
    %w[java jms].each { |library| require library }
  rescue LoadError
    puts "You need the `jms` gem to be installed to use JMS!"
    raise
  end

  @protocol = 'jms'
  true
end

.use_kafka!Boolean

Force the worker to use Kafka as the messaging protocol.

Note: Only works under JRuby

Returns:

  • (Boolean)


89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/stapfen/worker.rb', line 89

# Selects Kafka as the messaging protocol for this worker class.
#
# Refuses to proceed unless RUBY_PLATFORM is 'java', then loads the
# 'java' and 'hermann' libraries before recording the choice in @protocol.
#
# @return [true]
# @raise [Stapfen::ConfigurationError] when RUBY_PLATFORM is not 'java'
# @raise [LoadError] re-raised (after printing a hint) when a required
#   library cannot be loaded
def self.use_kafka!
  if RUBY_PLATFORM != 'java'
    raise Stapfen::ConfigurationError, "You cannot use Kafka unless you're running under JRuby!"
  end

  begin
    %w[java hermann].each { |library| require library }
  rescue LoadError
    puts "You need the `hermann` gem to be installed to use Kafka!"
    raise
  end

  @protocol = 'kafka'
  true
end

.use_stomp!Boolean

Force the worker to use STOMP as the messaging protocol (default)

Returns:

  • (Boolean)


42
43
44
45
46
47
48
49
50
51
52
# File 'lib/stapfen/worker.rb', line 42

# Selects STOMP as the messaging protocol for this worker class.
#
# Loads the 'stomp' library and then records the choice in @protocol.
#
# @return [true]
# @raise [LoadError] re-raised (after printing a hint) when the 'stomp'
#   library cannot be loaded
def self.use_stomp!
  require 'stomp'
  @protocol = 'stomp'
  true
rescue LoadError
  puts "You need the `stomp` gem to be installed to use stomp!"
  raise
end

.workersObject

Return all the currently running Stapfen::Worker instances in this process



133
134
135
# File 'lib/stapfen/worker.rb', line 133

# Returns the @@workers class variable itself (not a copy); .run! appends
# each worker it creates to this collection.
#
# @return [Array] the recorded worker instances
def self.workers
  @@workers
end

Instance Method Details

#exit_cleanlyObject

Invokes the shutdown block if it has been created, and closes the worker's client connection (e.g. Stomp::Client) unless it has already been shut down



248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
# File 'lib/stapfen/worker.rb', line 248

# Runs the class-level shutdown block (if one is set) and closes the client.
#
# The client is closed in an +ensure+ clause, so the connection is released
# even when the shutdown block raises; that exception still propagates to
# the caller once the close attempt has finished. A StandardError raised
# while closing the client is logged and swallowed.
#
# @return [Object] the result of the client close attempt: the value of
#   client.close, nil when there was no open client, or the value of
#   +error+ when closing failed
def exit_cleanly
  info("#{self} exiting ")
  destructor = self.class.destructor
  close_result = nil

  begin
    destructor.call if destructor
  ensure
    info "Killing client"
    begin
      # Only close the client if we have one sitting around and it is
      # still open
      close_result = (client.close if client && !client.closed?)
    rescue StandardError => exc
      close_result = error "Exception received while trying to close client! #{exc.inspect}"
    end
  end

  close_result
end

#runObject



186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
# File 'lib/stapfen/worker.rb', line 186

# Builds the protocol-specific client, connects it, subscribes every
# registered consumer, and then enters the client's run loop.
#
# For each [queue, headers, block] registration the block is turned into an
# instance method on the worker class, so the handler executes with the
# worker instance as +self+. When a handler returns a falsey value the raw
# message is handed to client.unreceive, provided the client reports
# can_unreceive? and the registration carried :max_redeliveries and/or
# :dead_letter_queue headers.
#
# An Interrupt raised from the run loop triggers #exit_cleanly.
def run
  if self.class.stomp?
    require 'stapfen/client/stomp'
    @client = Stapfen::Client::Stomp.new(self.class.configuration.call)
  elsif self.class.jms?
    require 'stapfen/client/jms'
    @client = Stapfen::Client::JMS.new(self.class.configuration.call)
  elsif self.class.kafka?
    require 'stapfen/client/kafka'
    @client = Stapfen::Client::Kafka.new(self.class.configuration.call)
  end

  debug("Running with #{@client} inside of Thread:#{Thread.current.inspect}")

  @client.connect

  self.class.consumers.each do |queue, options, handler|
    # Subset of the subscription headers that is forwarded on unreceive
    requeue_headers = [:max_redeliveries, :dead_letter_queue].each_with_object({}) do |key, picked|
      picked[key] = options[key] if options.has_key? key
    end

    # Each handler block becomes an instance method so that it runs in the
    # instance scope rather than the class-level scope it was originally
    # bound to
    handler_name = queue.gsub(/[.|\-]/, '_').to_sym
    self.class.send(:define_method, handler_name, &handler)

    client.subscribe(queue, options) do |raw|
      # Checked in this order so that kafka takes precedence over jms, and
      # jms over stomp, should more than one predicate ever be true
      message =
        if self.class.kafka?
          Stapfen::Message.from_kafka(raw)
        elsif self.class.jms?
          Stapfen::Message.from_jms(raw)
        elsif self.class.stomp?
          Stapfen::Message.from_stomp(raw)
        end

      # A truthy handler result means the message was processed
      next if send(handler_name, message)

      client.unreceive(raw, requeue_headers) if client.can_unreceive? && !requeue_headers.empty?
    end
  end

  begin
    client.runloop
    warn("Exiting the runloop for #{self}")
  rescue Interrupt
    exit_cleanly
  end
end