Class: QueueWorker

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/queue_worker.rb,
lib/queue_worker/version.rb

Constant Summary collapse

VERSION =
'1.0.1'

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue_name = nil, client = nil) ⇒ QueueWorker



26
27
28
29
30
# File 'lib/queue_worker.rb', line 26

# Builds a worker for an optional queue name and an optional pre-built client.
#
# The default handler is a proc taking (args, message): it resolves a handler
# class from the message's 'destination' header via #load_handler_class,
# instantiates that class with +args+ and invokes its +call+ method.
#
# @param queue_name [String, nil] queue name; other methods prefix it with "/queue/"
# @param client [Object, nil] client to use; when nil, #client builds one on demand
def initialize(queue_name = nil, client = nil)
  @client = client
  @queue = queue_name
  @handler = proc do |args, message|
    handler_class = load_handler_class(message.headers['destination'])
    handler_class.new(args).call
  end
end

Class Attribute Details

.stomp ⇒ Object

Returns the value of attribute stomp.



14
15
16
# File 'lib/queue_worker.rb', line 14

# Reader for the class-level value held in @stomp.
# #client passes this value to Stomp::Client.new when it builds a connection.
#
# @return [Object, nil] the stored value; nil when @stomp has not been assigned
def stomp
  @stomp
end

Instance Attribute Details

#client ⇒ Object



142
143
144
# File 'lib/queue_worker.rb', line 142

# Returns the client for this worker, building and caching one on first use.
#
# A client already stored in @client (for example one handed to the
# constructor) is returned as-is; otherwise a Stomp::Client is created from
# the class-level +stomp+ value and remembered for later calls.
#
# @return [Object] the cached or newly created client
def client
  return @client if @client

  @client = Stomp::Client.new(self.class.stomp)
end

#handler ⇒ Object

Returns the value of attribute handler.



22
23
24
# File 'lib/queue_worker.rb', line 22

# Reader for the message handler held in @handler.
# The constructor stores a proc here, and #call invokes it as
# handler.call(parsed_body, message).
#
# @return [Object, nil] the stored handler; nil when @handler has not been assigned
def handler
  @handler
end

#queue ⇒ Object

Returns the value of attribute queue.



22
23
24
# File 'lib/queue_worker.rb', line 22

# Reader for the queue name held in @queue.
# #publish, #subscribe and #unsubscribe interpolate it into "/queue/<name>".
#
# @return [Object, nil] the stored queue name; nil when none was given
def queue
  @queue
end

Class Method Details

.configure {|_self| ... } ⇒ Object

Yields:

  • (_self)

Yield Parameters:

  • _self (QueueWorker)

    the object that the method was called on



16
17
18
# File 'lib/queue_worker.rb', line 16

# Yields the receiver to the given block so the caller can configure it.
#
# @yieldparam _self [QueueWorker] the object that the method was called on
# @return [Object] the value returned by the block
# @raise [LocalJumpError] when called without a block
def configure
  yield self
end

.peek(queue_name, size = 1) ⇒ Object

Peek at any number of messages in the queue



46
47
48
49
50
51
52
53
54
55
56
# File 'lib/queue_worker.rb', line 46

# Collects up to +size+ messages from a queue and returns their parsed bodies.
#
# A temporary worker subscribes for at most 2 seconds. Every delivered message
# body is parsed as JSON and the message's 'message-id' header is merged into
# the resulting hash. Once +size+ deliveries have been seen the worker quits.
# The callback never acks the messages it receives.
#
# @param queue_name [String] queue name handed to the worker
# @param size [Integer] number of deliveries to wait for (also passed as the
#   subscription size)
# @return [Array<Hash>] parsed message bodies, each with a 'message-id' key
def self.peek(queue_name, size = 1)
  peeked = []
  deliveries = 0
  worker = new(queue_name)

  worker.subscribe_with_timeout(2, size) do |message|
    # Counted before parsing, so a delivery that fails to parse still counts.
    deliveries += 1
    payload = JSON.parse(message.body)
    peeked << payload.merge('message-id' => message.headers['message-id'])
    worker.quit if deliveries == size
  end

  peeked
end

.publish(queue, *messages) ⇒ Object

Publish one or more messages to a queue



36
37
38
39
40
# File 'lib/queue_worker.rb', line 36

# Publishes one or more messages to a queue through a short-lived worker.
#
# The worker is closed even when publishing raises, so a failed publish does
# not leave the connection open. The original exception still propagates.
#
# @param queue [String] queue name handed to the worker
# @param messages [Array<Object>] payloads passed to the worker's #publish in order
# @return [Object] whatever the worker's +close+ returns
def self.publish(queue, *messages)
  worker = new(queue)
  begin
    messages.each { |msg| worker.publish(msg) }
  ensure
    # Always release the connection; captured so the return value stays
    # the result of +close+, as before.
    closed = worker.close
  end
  closed
end

Instance Method Details

#call(message) ⇒ Object

Handles subscribe callback

Tries to delegate processing of message to a class based on the name of the queue. For example:

If the queue is named "scheduled/default", it will look for a class called Scheduled::Default,
initialize it with the parsed message body, and then call its +call+ method.


111
112
113
114
115
116
117
118
# File 'lib/queue_worker.rb', line 111

# Processes one delivered message with the configured handler.
#
# The body is parsed as JSON with symbolized keys and passed to the handler
# together with the raw message. Any StandardError (including a JSON parse
# failure) is logged rather than raised. The message is always acked and a
# 'Processed' entry is logged, whether or not the handler succeeded.
#
# @param message [Object] delivered message; must respond to +body+
# @return [Object] the handler's result, or the logger's return value when an
#   error was rescued
def call(message)
  payload = JSON.parse(message.body, symbolize_names: true)
  handler.call(payload, message)
rescue StandardError => error
  # Logger treats the first argument as the progname when a block is given.
  log.error(error.message) { "\n#{error.backtrace.inspect}" }
ensure
  ack(message)
  log.info('Processed') { message }
end

#default_subscribe_callback(message) ⇒ Object

Private



124
125
126
127
128
129
130
131
132
# File 'lib/queue_worker.rb', line 124

# Default subscription callback, used by #subscribe when no block is given.
#
# Frames whose command is not 'MESSAGE' are ignored. A message whose body is
# exactly 'UNSUBSCRIBE' triggers #unsubscribe; every other message is passed
# to #call.
#
# @param message [Object] delivered frame; must respond to +command+ and +body+
# @return [Object, nil] result of #unsubscribe or #call; nil for ignored frames
def default_subscribe_callback(message)
  return unless message.command == 'MESSAGE'

  message.body == 'UNSUBSCRIBE' ? unsubscribe : call(message)
end

#load_handler_class(destination_queue) ⇒ Object

Converts the queue name to a constant



138
139
140
# File 'lib/queue_worker.rb', line 138

# Converts a destination such as "/queue/scheduled/default" into the constant
# it names (Scheduled::Default in that example).
#
# One leading "/queue/" or "queue/" prefix is removed, then the remainder is
# turned into a constant through String#camelize and String#constantize
# (ActiveSupport-style inflections; presumably loaded elsewhere in the project).
#
# NOTE(security): the default handler feeds this method the 'destination'
# header of an incoming message, so whoever can publish to the broker chooses
# which constant is resolved and instantiated. Consider restricting the lookup
# to a known namespace or an allow-list.
#
# @param destination_queue [String] destination, with or without the prefix
# @return [Object] the resolved constant
# @raise [NameError] when no matching constant exists
def load_handler_class(destination_queue)
  # \A with sub strips the prefix only at the very start of the string; the
  # earlier ^ with gsub also matched after every embedded newline.
  destination_queue.sub(%r{\A/?queue/}, '').camelize.constantize
end

#log ⇒ Object



146
147
148
# File 'lib/queue_worker.rb', line 146

# Returns the worker's logger, creating one that writes to STDOUT on first use.
#
# @return [Object] the logger cached in @log
def log
  @log = Logger.new(STDOUT) unless @log
  @log
end

#publish(message, headers = {}) ⇒ Object Also known as: push

Publish a message to a queue



62
63
64
65
# File 'lib/queue_worker.rb', line 62

# Publishes a single message to this worker's queue.
#
# Strings are sent unchanged; any other object is serialized with +to_json+.
# The frame is sent with priority 4 and persistent: true unless +headers+
# overrides those keys.
#
# @param message [String, Object] payload to send
# @param headers [Hash] extra or overriding headers
# @return [Object] whatever the client's +publish+ returns
def publish(message, headers = {})
  payload = message.is_a?(String) ? message : message.to_json
  defaults = { priority: 4, persistent: true }
  client.publish("/queue/#{queue}", payload, defaults.merge(headers))
end

#quit ⇒ Object

Unsubscribe from the current queue and close the connection



98
99
100
101
# File 'lib/queue_worker.rb', line 98

# Unsubscribes from the current queue and closes the connection.
#
# The connection is closed even when unsubscribing raises, so a failed
# unsubscribe does not leave it open. The original exception still propagates.
#
# @return [Object] whatever +close+ returns
def quit
  begin
    unsubscribe
  ensure
    # Captured so the return value stays the result of +close+, as before.
    closed = close
  end
  closed
end

#subscribe(size = 1, &block) ⇒ Object

Subscribe (listen) to a queue



73
74
75
76
# File 'lib/queue_worker.rb', line 73

# Subscribes (listens) to this worker's queue.
#
# Deliveries go to the given block, or to #default_subscribe_callback when no
# block is supplied. The subscription uses client acknowledgement and sends
# +size+ as the 'activemq.prefetchSize' header.
#
# @param size [Integer] value for the 'activemq.prefetchSize' header
# @yieldparam message [Object] each delivered frame
# @return [Object] whatever the client's +subscribe+ returns
def subscribe(size = 1, &block)
  on_message = block || method(:default_subscribe_callback)
  subscription_headers = { :ack => 'client', 'activemq.prefetchSize' => size }
  client.subscribe("/queue/#{queue}", subscription_headers, &on_message)
end

#subscribe_with_timeout(duration, size = 1, &block) ⇒ Object

Subscribe to a queue for a limited time



83
84
85
86
87
88
89
90
# File 'lib/queue_worker.rb', line 83

# Subscribes to the queue for a limited time.
#
# Subscribes, then blocks on +join+ inside a Timeout of +duration+ seconds.
# When the timeout expires the worker quits (see #quit) and the
# Timeout::Error is not re-raised.
#
# @param duration [Numeric] maximum number of seconds to stay subscribed
# @param size [Integer] subscription size forwarded to #subscribe
# @yieldparam message [Object] each delivered frame (forwarded to #subscribe)
# @return [Object] the result of +join+, or of #quit when the timeout fired
def subscribe_with_timeout(duration, size = 1, &block)
  Timeout.timeout(duration) do
    subscribe(size, &block)
    join
  end
rescue Timeout::Error
  quit
end

#unsubscribe ⇒ Object

Unsubscribe from the current queue



93
94
95
# File 'lib/queue_worker.rb', line 93

# Unsubscribes the client from this worker's queue destination.
#
# @return [Object] whatever the client's +unsubscribe+ returns
def unsubscribe
  destination = "/queue/#{queue}"
  client.unsubscribe(destination)
end