Class: HotBunnies::Queue

Inherits:
Object
  • Object
show all
Includes:
Hollywood
Defined in:
lib/euston-rabbitmq/hot_bunnies/queue.rb

Instance Attribute Summary

Instance Method Summary

Instance Attribute Details

#timeout=(value) ⇒ Object (writeonly)

Sets the attribute timeout

Parameters:

  • value

    the value to set the attribute timeout to.



5
6
7
# File 'lib/euston-rabbitmq/hot_bunnies/queue.rb', line 5

# Stores a custom timeout for this queue in @timeout.
#
# The same instance variable is read by #delivery_timeout, which falls back
# to 500 when nothing (or a falsy value) has been stored here.
#
# @param value [Object] the timeout to remember; presumably a number of
#   milliseconds -- TODO confirm against the consumer API it is passed to
def timeout=(value)
  @timeout = value
end

Instance Method Details

#consumer(auto_ack = false) ⇒ Object



7
8
9
10
11
# File 'lib/euston-rabbitmq/hot_bunnies/queue.rb', line 7

# Builds a QueueingConsumer bound to this queue's channel and registers it
# with the channel via +basic_consume+ under the queue name held in @name.
#
# @param auto_ack [Boolean] forwarded unchanged as the second argument of
#   +basic_consume+; presumably toggles automatic acknowledgement --
#   confirm against the RabbitMQ client documentation
# @return [QueueingConsumer] the consumer that was just registered
def consumer(auto_ack = false)
  QueueingConsumer.new(@channel).tap do |queueing_consumer|
    @channel.basic_consume(@name, auto_ack, queueing_consumer)
  end
end

#delivery_timeout ⇒ Object



13
14
15
# File 'lib/euston-rabbitmq/hot_bunnies/queue.rb', line 13

# Returns the timeout stored in @timeout, defaulting to 500.
#
# When @timeout is unset (or falsy) the default is written back to @timeout,
# so later calls return the remembered value.
#
# @return [Object] the stored timeout, or 500 when none was set
def delivery_timeout
  @timeout = 500 unless @timeout
  @timeout
end

#safe_get ⇒ Object



17
18
19
20
# File 'lib/euston-rabbitmq/hot_bunnies/queue.rb', line 17

# Fetches a single message with +get+ (passing :ack => true) and, when one
# is returned, wraps it in a ReactiveMessage and hands it to
# #safe_handle_message.
#
# @return [Object, nil] nil when +get+ returned nil, otherwise whatever
#   #safe_handle_message returns
def safe_get
  fetched = get(:ack => true)
  return if fetched.nil?

  # The fetched value is splatted, so it supplies the remaining
  # ReactiveMessage constructor arguments after the channel.
  safe_handle_message(ReactiveMessage.new(@channel, *fetched))
end

#safe_handle_message(reactive_message) ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/euston-rabbitmq/hot_bunnies/queue.rb', line 22

# Parses a delivered message, dispatches it to the registered callbacks and
# settles it according to the outcome:
#
# * handled cleanly -> :message_received fires, then the message is acked
# * Euston::EventStore::ConcurrencyError -> the message is rejected
# * any other StandardError while handling -> :message_failed fires with the
#   parsed payload, the error and the reactive message (no ack here)
# * any StandardError reaching the outer rescue -> :message_decode_failed
#   fires with the raw body, the message is acked and the error is passed
#   to Safely.report!
#
# Afterwards, if the message does not report itself as reacted, its headers
# are acked directly.
#
# NOTE(review): the outer rescue wraps the inner one, so an error raised by
# the :message_failed callback or by #reject is also treated as a decode
# failure -- confirm whether that is intended.
#
# @param reactive_message [ReactiveMessage] message exposing #body, #ack,
#   #reject, #reacted? and #headers
# @return [Object, nil] nil when the message had already reacted, otherwise
#   the result of acking its headers
def safe_handle_message(reactive_message)
  begin
    payload = parse_json(reactive_message.body)

    begin
      callback(:message_received, payload)
      reactive_message.ack
    rescue Euston::EventStore::ConcurrencyError
      # Original author's note here: "requeue".
      reactive_message.reject
    rescue => handler_error
      callback(:message_failed, payload, handler_error, reactive_message)
    end
  rescue => outer_error
    callback(:message_decode_failed, reactive_message.body, outer_error)
    reactive_message.ack
    Safely.report!(outer_error)
  end

  return if reactive_message.reacted?

  reactive_message.headers.ack
end

#safe_subscribe ⇒ Object



43
44
45
46
47
48
49
# File 'lib/euston-rabbitmq/hot_bunnies/queue.rb', line 43

# Creates one consumer and keeps polling it through
# #safe_subscribe_with_timeout until the current thread's :stop
# thread-local becomes truthy.
#
# The flag is checked before every pass and #delivery_timeout is re-read on
# every pass.
#
# @return [nil]
def safe_subscribe
  active_consumer = consumer

  safe_subscribe_with_timeout(active_consumer, delivery_timeout) until Thread.current[:stop]
end

#safe_subscribe_with_timeout(consumer, timeout = 500) ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/euston-rabbitmq/hot_bunnies/queue.rb', line 51

# Pulls deliveries from +consumer+ one at a time and passes each to
# #safe_handle_message as a ReactiveMessage.
#
# Polling stops when +next_delivery+ returns nil (presumably because the
# wait timed out -- confirm against the consumer API) or raises a
# NativeException, in which case the exception is stored in the current
# thread's :exception thread-local.
#
# @param consumer [#next_delivery] the consumer to poll
# @param timeout [Object] forwarded to +next_delivery+; presumably
#   milliseconds -- TODO confirm
# @return [nil] when polling ends through either stop condition
def safe_subscribe_with_timeout(consumer, timeout = 500)
  loop do
    delivery =
      begin
        consumer.next_delivery(timeout)
      rescue NativeException => error
        # Leave the failure where the owning thread can inspect it, then
        # give up on this polling pass.
        Thread.current[:exception] = error
        break
      end

    break if delivery.nil?

    headers = Headers.new(@channel, nil, delivery.envelope, delivery.properties)
    payload = String.from_java_bytes(delivery.get_body)

    safe_handle_message(ReactiveMessage.new(@channel, headers, payload))
  end
end