Class: Stapfen::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/stapfen/worker.rb

Constant Summary collapse

KAFKA =
:kafka.freeze
STOMP =
:stomp.freeze
JMS =
:jms.freeze

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeWorker

Instance Methods



110
111
112
113
114
115
116
# File 'lib/stapfen/worker.rb', line 110

def initialize
  instance_configuration = self.class.instance_configuration
  if instance_configuration
    self.configure &instance_configuration
  end
  self.client_options ||= {}
end

Class Attribute Details

.consumersObject

Returns the value of attribute consumers.



13
14
15
# File 'lib/stapfen/worker.rb', line 13

def consumers
  @consumers
end

.destructorObject

Returns the value of attribute destructor.



13
14
15
# File 'lib/stapfen/worker.rb', line 13

def destructor
  @destructor
end

.instance_configurationObject

Returns the value of attribute instance_configuration.



13
14
15
# File 'lib/stapfen/worker.rb', line 13

def instance_configuration
  @instance_configuration
end

Instance Attribute Details

#client_optionsObject

Returns the value of attribute client_options.



10
11
12
# File 'lib/stapfen/worker.rb', line 10

def client_options
  @client_options
end

#loggerObject

Returns the value of attribute logger.



10
11
12
# File 'lib/stapfen/worker.rb', line 10

def logger
  @logger
end

#protocolObject

Returns the value of attribute protocol.



10
11
12
# File 'lib/stapfen/worker.rb', line 10

def protocol
  @protocol
end

#stapfen_clientObject

Returns the value of attribute stapfen_client.



10
11
12
# File 'lib/stapfen/worker.rb', line 10

def stapfen_client
  @stapfen_client
end

Class Method Details

.configure(&configuration_block) ⇒ Object



15
16
17
18
19
20
# File 'lib/stapfen/worker.rb', line 15

def configure(&configuration_block)
  unless block_given?
    raise Stapfen::ConfigurationError, "Method `configure` requires a block"
  end
  self.instance_configuration = configuration_block
end

.consume(config_overrides = {}, &consume_block) ⇒ Object

Main message consumption block



34
35
36
37
38
39
40
# File 'lib/stapfen/worker.rb', line 34

def consume(config_overrides={}, &consume_block)
  unless block_given?
    raise Stapfen::ConsumeError, "Method `consume` requires a block"
  end
  @consumers ||= ThreadSafe::Array.new
  @consumers << [config_overrides, consume_block]
end

.exit_cleanlyBoolean

Invoke exit_cleanly on each of the registered Worker instances that this class is keeping track of

Returns:

  • (Boolean)

    Whether or not we’ve exited/terminated cleanly



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/stapfen/worker.rb', line 58

def exit_cleanly
  return false if workers.empty?

  cleanly = true
  workers.each do |w|
    begin
      w.exit_cleanly
    rescue StandardError => ex
      $stderr.write("Failure while exiting cleanly #{ex.inspect}\n#{ex.backtrace}")
      cleanly = false
    end
  end

  if RUBY_PLATFORM == 'java'
    Stapfen.logger.info 'Telling the JVM to exit cleanly'
    Java::JavaLang::System.exit(0)
  end

  return cleanly
end

.handle_signalsObject

Utility method to set up the proper worker signal handlers



80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/stapfen/worker.rb', line 80

def handle_signals
  return if @@signals_handled

  Signal.trap(:INT) do
    self.exit_cleanly
    exit!
  end

  Signal.trap(:TERM) do
    self.exit_cleanly
  end

  @@signals_handled = true
end

.run!Object

Instantiate a new Worker instance and run it



23
24
25
26
27
28
29
30
31
# File 'lib/stapfen/worker.rb', line 23

def run!
  worker = self.new

  @@workers << worker

  handle_signals

  worker.run
end

.set_class_variable_defaultsObject

Class variables are put in this method to allow for “reset” style functionality if needed. Useful for testing (see worker_spec.rb).



97
98
99
100
# File 'lib/stapfen/worker.rb', line 97

def set_class_variable_defaults
  @@signals_handled = false
  @@workers = ThreadSafe::Array.new
end

.shutdown(&block) ⇒ Object

Optional method, specifes a block to execute when the worker is shutting down.



44
45
46
# File 'lib/stapfen/worker.rb', line 44

def shutdown(&block)
  @destructor = block
end

.workersObject

Return all the currently running Stapfen::Worker instances in this process



50
51
52
# File 'lib/stapfen/worker.rb', line 50

def workers
  @@workers
end

Instance Method Details

#configure(&configuration_block) ⇒ Object



118
119
120
# File 'lib/stapfen/worker.rb', line 118

def configure(&configuration_block)
  self.instance_eval &configuration_block
end

#exit_cleanlyObject

Invokes the shutdown block if it has been created, and closes the Stapfen::Worker.{Stomp{Stomp::Client} connection unless it has already been shut down



265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
# File 'lib/stapfen/worker.rb', line 265

def exit_cleanly
  logger.info("#{self} exiting ")
  self.class.destructor.call if self.class.destructor

  logger.info 'Killing client'
  begin
    # Only close the client if we have one sitting around
    if stapfen_client
      unless stapfen_client.closed?
        stapfen_client.close
      end
    end
  rescue StandardError => exc
    logger.error "Exception received while trying to close client! #{exc.inspect}"
  end
end

#jms?Boolean

Returns:

  • (Boolean)


167
168
169
# File 'lib/stapfen/worker.rb', line 167

def jms?
  @protocol == JMS
end

#kafka?Boolean

Returns:

  • (Boolean)


193
194
195
# File 'lib/stapfen/worker.rb', line 193

def kafka?
  @protocol == KAFKA
end

#runObject



201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
# File 'lib/stapfen/worker.rb', line 201

def run
  case protocol
  when STOMP
    require 'stapfen/client/stomp'
    stapfen_client = Stapfen::Client::Stomp.new(client_options)
  when JMS
    require 'stapfen/client/jms'
    stapfen_client = Stapfen::Client::JMS.new(client_options)
  when KAFKA
    require 'stapfen/client/kafka'
    stapfen_client = Stapfen::Client::Kafka.new(client_options)
  else
    raise 'No client specified'
  end

  logger.info("Running with #{stapfen_client} inside of Thread:#{Thread.current.inspect}")

  stapfen_client.connect

  self.class.consumers.each do |config_overrides, block|
    consumer_config = client_options.merge(config_overrides)
    consumer_topic = consumer_config[:topic]
    consumer_can_unreceive = !(consumer_config.keys & [:max_redeliveries, :dead_letter_queue]).empty?

    # We're taking each block and turning it into a method so that we can
    # use the instance scope instead of the blocks originally bound scope
    # which would be at a class level
    methodized_topic = consumer_topic.gsub(/[.|\-]/, '_').to_sym
    self.class.send(:define_method, methodized_topic, &block)

    stapfen_client.subscribe(consumer_topic, consumer_config) do |message_entity|
      stapfen_message = nil
      if stomp?
        stapfen_message = Stapfen::Message.from_stomp(message_entity)
      end

      if jms?
        stapfen_message = Stapfen::Message.from_jms(message_entity)
      end

      if kafka?
        stapfen_message = Stapfen::Message.from_kafka(message_entity)
      end

      success = self.send(methodized_topic, stapfen_message)

      unless success
        if stapfen_client.can_unreceive? && consumer_can_unreceive
          stapfen_client.unreceive(message_entity, consumer_config)
        end
      end
    end
  end

  begin
    stapfen_client.runloop
    logger.info("Exiting the runloop for #{self}")
  rescue Interrupt
    exit_cleanly
  end
end

#stomp?Boolean

Returns:

  • (Boolean)


141
142
143
# File 'lib/stapfen/worker.rb', line 141

def stomp?
  @protocol == STOMP
end

#use_jms!Boolean

Force the worker to use JMS as the messaging protocol.

Note: Only works under JRuby

Returns:

  • (Boolean)


150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/stapfen/worker.rb', line 150

def use_jms!
  unless RUBY_PLATFORM == 'java'
    raise Stapfen::ConfigurationError, 'You cannot use JMS unless you are running under JRuby!'
  end

  begin
    require 'java'
    require 'jms'
  rescue LoadError
    Stapfen.logger.info 'You need the `jms` gem to be installed to use JMS!'
    raise
  end

  @protocol = JMS
  return true
end

#use_kafka!Boolean

Force the worker to use Kafka as the messaging protocol.

Note: Only works under JRuby

Returns:

  • (Boolean)


176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/stapfen/worker.rb', line 176

def use_kafka!
  unless RUBY_PLATFORM == 'java'
    raise Stapfen::ConfigurationError, 'You cannot use Kafka unless you are running under JRuby!'
  end

  begin
    require 'java'
    require 'hermann'
  rescue LoadError
    Stapfen.logger.info 'You need the `hermann` gem to be installed to use Kafka!'
    raise
  end

  @protocol = KAFKA
  return true
end

#use_stomp!Boolean

Force the worker to use STOMP as the messaging protocol (default)

Returns:

  • (Boolean)


129
130
131
132
133
134
135
136
137
138
139
# File 'lib/stapfen/worker.rb', line 129

def use_stomp!
  begin
    require 'stomp'
  rescue LoadError
    Stapfen.logger.info 'You need the `stomp` gem to be installed to use stomp!'
    raise
  end

  @protocol = STOMP
  return true
end