Class: Moqueue::MockQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/moqueue/mock_queue.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name) ⇒ MockQueue

Returns a new instance of MockQueue.



20
21
22
23
# File 'lib/moqueue/mock_queue.rb', line 20

# Stores +name+ on the queue and then registers the queue with the
# MockBroker singleton.
#
# @param name the value kept in @name
def initialize(name)
  @name = name
  # @name is assigned before registration, matching the original ordering.
  broker = MockBroker.instance
  broker.register_queue(self)
end

Instance Attribute Details

#name ⇒ Object (readonly)

Returns the value of attribute name.



7
8
9
# File 'lib/moqueue/mock_queue.rb', line 7

# Reader for the queue's name: returns whatever is currently stored in @name.
def name
  @name
end

Class Method Details

.new(name) ⇒ Object



11
12
13
14
15
16
# File 'lib/moqueue/mock_queue.rb', line 11

# Returns the queue the broker already knows under +name+, if there is one;
# otherwise falls through to the inherited +new+ (called with the same
# argument).
#
# @param name the queue name passed to MockBroker#find_queue
def new(name)
  MockBroker.instance.find_queue(name) || super
end

Instance Method Details

#acked_messages ⇒ Object



76
77
78
# File 'lib/moqueue/mock_queue.rb', line 76

# Lazily created list held in @acked_messages; the same array is handed
# back on every call, so callers can append to it.
def acked_messages
  @acked_messages = [] unless @acked_messages
  @acked_messages
end

#bind(exchange, key = nil) ⇒ Object



67
68
69
70
# File 'lib/moqueue/mock_queue.rb', line 67

# Attaches this queue to +exchange+ via exchange.attach_queue(self, key)
# and returns the queue itself so calls can be chained.
#
# @param exchange an object responding to attach_queue(queue, key)
# @param key optional value forwarded unchanged to attach_queue (nil by default)
# @return self
def bind(exchange, key=nil)
  tap { |queue| exchange.attach_queue(queue, key) }
end

#callback_defined? ⇒ Boolean

Returns:

  • (Boolean)


85
86
87
# File 'lib/moqueue/mock_queue.rb', line 85

# Tells whether message_handler_callback currently yields a truthy value.
#
# @return [Boolean] strictly true or false, never the callback itself
def callback_defined?
  message_handler_callback ? true : false
end

#null_subscribe ⇒ Object

Configures a do-nothing subscribe block to force received messages to be processed and stored in #received_messages.



92
93
94
95
# File 'lib/moqueue/mock_queue.rb', line 92

# Subscribes with a block that accepts one argument and returns nil, then
# returns the queue itself.
#
# @return self
def null_subscribe
  # Keep exactly one block parameter: callers of the stored block branch on
  # its arity.
  do_nothing = proc { |_message| nil }
  subscribe(&do_nothing)
  self
end

#publish(message) ⇒ Object



57
58
59
60
61
62
63
64
65
# File 'lib/moqueue/mock_queue.rb', line 57

# Publishes +message+ straight away through real_publish when a message
# handler callback is present. Otherwise the real_publish call is wrapped
# in a Fiber that is appended to deferred_publishing_fibers; the fiber is
# not resumed here.
#
# @param message the payload handed to real_publish
# @return the result of real_publish, or the deferred fibers collection
def publish(message)
  return real_publish(message) if message_handler_callback

  pending = Fiber.new { real_publish(message) }
  deferred_publishing_fibers << pending
end

#receive(message, header_opts = {}) ⇒ Object



34
35
36
37
38
39
40
41
42
43
# File 'lib/moqueue/mock_queue.rb', line 34

# Delivers +message+ to the message handler callback when one is present.
#
# With no callback the message is passed to receive_message_later together
# with +header_opts+. Otherwise a MockHeaders is built from +header_opts+,
# the callback is invoked (message only when its arity is 1, headers and
# message otherwise) and the message is appended to received_messages.
#
# @param message the payload to deliver
# @param header_opts options forwarded to MockHeaders.new
# @return the message when @ack_msgs is set and the headers report an ack,
#   nil after any other delivery, or the result of receive_message_later
#   when there is no callback
def receive(message, header_opts={})
  callback = message_handler_callback
  return receive_message_later(message, header_opts) unless callback

  headers = MockHeaders.new(header_opts)
  if callback.arity == 1
    callback.call(message)
  else
    callback.call(headers, message)
  end
  # The message is recorded only after the callback has returned.
  received_messages << message
  message if @ack_msgs && headers.received_ack?
end

#received_ack_for_message?(message_content) ⇒ Boolean

Returns:

  • (Boolean)


53
54
55
# File 'lib/moqueue/mock_queue.rb', line 53

# Checks whether +message_content+ appears in acked_messages.
#
# @param message_content the value to look for
# @return [Boolean]
def received_ack_for_message?(message_content)
  acknowledged = acked_messages
  acknowledged.include?(message_content)
end

#received_message?(message_content) ⇒ Boolean

Returns:

  • (Boolean)


45
46
47
# File 'lib/moqueue/mock_queue.rb', line 45

# Checks whether +message_content+ appears in received_messages.
#
# @param message_content the value to look for
# @return [Boolean]
def received_message?(message_content)
  delivered = received_messages
  delivered.include?(message_content)
end

#received_messages ⇒ Object



72
73
74
# File 'lib/moqueue/mock_queue.rb', line 72

# Lazily created list held in @received_messages; the same array is
# returned on every call, so callers can append to it.
def received_messages
  return @received_messages if @received_messages

  @received_messages = []
end

#run_callback(*args) ⇒ Object



80
81
82
83
# File 'lib/moqueue/mock_queue.rb', line 80

# Invokes the message handler callback with +args+. A callback whose arity
# is exactly 1 receives only the first argument; any other callback
# receives every argument.
#
# @param args the arguments to forward
# @return whatever the callback returns
def run_callback(*args)
  handler = message_handler_callback
  return handler.call(args.first) if handler.arity == 1

  handler.call(*args)
end

#subscribe(opts = {}, &block) ⇒ Object



25
26
27
28
29
30
31
32
# File 'lib/moqueue/mock_queue.rb', line 25

# Stores +block+ as the queue's subscribe block, records whether
# acknowledgements were requested, and then runs process_unhandled_messages.
#
# @param opts options; only opts[:ack] is read here (a falsy value is stored as false)
# @param block the handler saved in @subscribe_block
# @raise [DoubleSubscribeError] if a subscribe block is already stored
# @return the result of process_unhandled_messages
def subscribe(opts={}, &block)
  raise DoubleSubscribeError, "you can't subscribe to the same queue twice" if @subscribe_block

  @subscribe_block = block
  @ack_msgs = opts[:ack] || false
  process_unhandled_messages
end

#unsubscribe ⇒ Object



49
50
51
# File 'lib/moqueue/mock_queue.rb', line 49

# No-op stand-in: performs no work and always returns true.
# NOTE(review): nothing here clears the block stored by #subscribe, so a
# later #subscribe call would presumably still raise DoubleSubscribeError —
# confirm whether that is intended.
def unsubscribe
  true
end