Class: Bunny::ReaderLoop
- Inherits:
-
Object
- Object
- Bunny::ReaderLoop
- Defined in:
- lib/bunny/reader_loop.rb
Overview
Network activity loop that reads and passes incoming AMQP 0.9.1 methods for processing. They are dispatched further down the line in Bunny::Session and Bunny::Channel. This loop uses a separate thread internally.
This mimics the way RabbitMQ Java is designed quite closely.
Instance Method Summary collapse
-
#initialize(transport, session, session_error_handler) ⇒ ReaderLoop
constructor
A new instance of ReaderLoop.
- #join ⇒ Object
- #kill ⇒ Object
- #raise(e) ⇒ Object
- #resume ⇒ Object
- #run_loop ⇒ Object
- #run_once ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
- #stopped? ⇒ Boolean
- #stopping? ⇒ Boolean
- #terminate_with(e) ⇒ Object
Constructor Details
#initialize(transport, session, session_error_handler) ⇒ ReaderLoop
Returns a new instance of ReaderLoop.
14 15 16 17 18 19 20 21 22 23 24 25 |
# File 'lib/bunny/reader_loop.rb', line 14 def initialize(transport, session, session_error_handler) @transport = transport @session = session @session_error_handler = session_error_handler @logger = @session.logger @mutex = Mutex.new @stopping = false @stopped = false @network_is_down = false end |
Instance Method Details
#join ⇒ Object
120 121 122 123 124 125 126 127 128 129 |
# File 'lib/bunny/reader_loop.rb', line 120 def join # Thread#join can/would trigger a re-raise of an unhandled exception in this thread. # In addition, Thread.handle_interrupt can be used by other libraries or application code # that would make this join operation fail with an obscure exception. # So we try to save everyone some really unpleasant debugging time by introducing # this condition which typically would not evaluate to true anyway. # # See ruby-amqp/bunny#589 and ruby-amqp/bunny#590 for background. @thread.join if @thread && @thread != Thread.current end |
#kill ⇒ Object
131 132 133 134 135 136 |
# File 'lib/bunny/reader_loop.rb', line 131 def kill if @thread @thread.kill @thread.join end end |
#raise(e) ⇒ Object
116 117 118 |
# File 'lib/bunny/reader_loop.rb', line 116 def raise(e) @thread.raise(e) if @thread end |
#resume ⇒ Object
32 33 34 |
# File 'lib/bunny/reader_loop.rb', line 32 def resume start end |
#run_loop ⇒ Object
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/bunny/reader_loop.rb', line 37 def run_loop loop do begin break if @mutex.synchronize { @stopping || @stopped || @network_is_down } run_once rescue AMQ::Protocol::EmptyResponseError, IOError, SystemCallError, Timeout::Error, OpenSSL::OpenSSLError => e break if terminate? || @session.closing? || @session.closed? @network_is_down = true if @session.automatically_recover? log_exception(e, level: :warn) @session.handle_network_failure(e) else log_exception(e) @session_error_handler.raise(Bunny::NetworkFailure.new("detected a network failure: #{e.}", e)) end rescue ShutdownSignal => _ @mutex.synchronize { @stopping = true } break rescue Exception => e break if terminate? if !(@session.closing? || @session.closed?) log_exception(e) @network_is_down = true @session_error_handler.raise(Bunny::NetworkFailure.new("caught an unexpected exception in the network loop: #{e.}", e)) end rescue Errno::EBADF => _ebadf break if terminate? # ignored, happens when we loop after the transport has already been closed @mutex.synchronize { @stopping = true } end end @mutex.synchronize { @stopped = true } end |
#run_once ⇒ Object
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/bunny/reader_loop.rb', line 75 def run_once frame = @transport.read_next_frame return if frame.is_a?(AMQ::Protocol::HeartbeatFrame) if !frame.final? || frame.method_class.has_content? header = @transport.read_next_frame content = +'' if header.body_size > 0 loop do body_frame = @transport.read_next_frame content << body_frame.decode_payload break if content.bytesize >= header.body_size end end @session.handle_frameset(frame.channel, [frame.decode_payload, header.decode_payload, content]) else @session.handle_frame(frame.channel, frame.decode_payload) end end |
#start ⇒ Object
28 29 30 |
# File 'lib/bunny/reader_loop.rb', line 28 def start @thread = Thread.new(&method(:run_loop)) end |
#stop ⇒ Object
98 99 100 |
# File 'lib/bunny/reader_loop.rb', line 98 def stop @mutex.synchronize { @stopping = true } end |
#stopped? ⇒ Boolean
102 103 104 |
# File 'lib/bunny/reader_loop.rb', line 102 def stopped? @mutex.synchronize { @stopped } end |
#stopping? ⇒ Boolean
106 107 108 |
# File 'lib/bunny/reader_loop.rb', line 106 def stopping? @mutex.synchronize { @stopping } end |
#terminate_with(e) ⇒ Object
110 111 112 113 114 |
# File 'lib/bunny/reader_loop.rb', line 110 def terminate_with(e) @mutex.synchronize { @stopping = true } self.raise(e) end |