Class: Vx::Lib::Consumer::Session
- Inherits:
-
Object
- Object
- Vx::Lib::Consumer::Session
- Includes:
- Instrument
- Defined in:
- lib/vx/lib/consumer/session.rb
Constant Summary collapse
- @@session_lock =
Mutex.new
- @@shutdown_lock =
Mutex.new
- @@shutdown =
ConditionVariable.new
- @@live =
false
Instance Attribute Summary collapse
-
#conn ⇒ Object
readonly
Returns the value of attribute conn.
Instance Method Summary collapse
- #allocate_pub_channel ⇒ Object
- #assign_error_handlers_to_channel(ch) ⇒ Object
- #close ⇒ Object
- #conn_info ⇒ Object
- #declare_exchange(ch, name, options = nil) ⇒ Object
- #declare_queue(ch, name, options = nil) ⇒ Object
- #live? ⇒ Boolean
- #open(options = {}) ⇒ Object
- #open? ⇒ Boolean
- #resume ⇒ Object
- #shutdown ⇒ Object
- #wait_shutdown(timeout = nil) ⇒ Object
- #with_pub_channel ⇒ Object
Methods included from Instrument
Instance Attribute Details
#conn ⇒ Object (readonly)
Returns the value of attribute conn.
17 18 19 |
# File 'lib/vx/lib/consumer/session.rb', line 17 def conn @conn end |
Instance Method Details
#allocate_pub_channel ⇒ Object
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/vx/lib/consumer/session.rb', line 113 def allocate_pub_channel assert_connection_is_open key = :vx_consumer_session_pub_channel if Thread.current[key] yield else ch = conn.create_channel assign_error_handlers_to_channel(ch) Thread.current[key] = ch begin yield ensure ch = Thread.current[key] ch.close if ch.open? Thread.current[key] = nil end end end |
#assign_error_handlers_to_channel(ch) ⇒ Object
150 151 152 153 |
# File 'lib/vx/lib/consumer/session.rb', line 150 def assign_error_handlers_to_channel(ch) ch.on_uncaught_exception {|e, c| ::Vx::Lib::Consumer.exception_handler(e, consumer: c) } ch.on_error {|e, c| ::Vx::Lib::Consumer.exception_handler(e, consumer: c) } end |
#close ⇒ Object
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/vx/lib/consumer/session.rb', line 41 def close if open? @@session_lock.synchronize do instrument("closing_connection", info: conn_info) instrument("close_connection", info: conn_info) do begin conn.close while conn.status != :closed sleep 0.01 end rescue Bunny::ChannelError, Bunny::ClientTimeout => e Consumer.exception_handler(e, {}) end end @conn = nil end end end |
#conn_info ⇒ Object
94 95 96 97 98 99 100 |
# File 'lib/vx/lib/consumer/session.rb', line 94 def conn_info if conn "amqp://#{conn.user}@#{conn.host}:#{conn.port}/#{conn.vhost}" else "not connected" end end |
#declare_exchange(ch, name, options = nil) ⇒ Object
134 135 136 137 138 139 140 141 |
# File 'lib/vx/lib/consumer/session.rb', line 134 def declare_exchange(ch, name, = nil) assert_connection_is_open ||= {} = {} if name == ''.freeze ch.exchange name, end |
#declare_queue(ch, name, options = nil) ⇒ Object
143 144 145 146 147 148 |
# File 'lib/vx/lib/consumer/session.rb', line 143 def declare_queue(ch, name, = nil) assert_connection_is_open ||= {} ch.queue name, end |
#live? ⇒ Boolean
26 27 28 |
# File 'lib/vx/lib/consumer/session.rb', line 26 def live? @@live end |
#open(options = {}) ⇒ Object
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/vx/lib/consumer/session.rb', line 61 def open( = {}) return self if open? @@session_lock.synchronize do unless open? resume @conn ||= Bunny.new( nil, # from ENV['RABBITMQ_URL'] heartbeat: Consumer.configuration.heartbeat, automatically_recover: false ) instrumentation = { info: conn_info }.merge() instrument("start_connecting", instrumentation) instrument("connect", instrumentation) do conn.start while conn.connecting? sleep 0.01 end end end end self end |
#open? ⇒ Boolean
90 91 92 |
# File 'lib/vx/lib/consumer/session.rb', line 90 def open? conn && conn.open? && conn.status == :open end |
#resume ⇒ Object
30 31 32 |
# File 'lib/vx/lib/consumer/session.rb', line 30 def resume @@live = true end |
#shutdown ⇒ Object
19 20 21 22 23 24 |
# File 'lib/vx/lib/consumer/session.rb', line 19 def shutdown @@shutdown_lock.synchronize do @@live = false @@shutdown.broadcast end end |
#wait_shutdown(timeout = nil) ⇒ Object
34 35 36 37 38 39 |
# File 'lib/vx/lib/consumer/session.rb', line 34 def wait_shutdown(timeout = nil) @@shutdown_lock.synchronize do @@shutdown.wait(@@shutdown_lock, timeout) not live? end end |
#with_pub_channel ⇒ Object
102 103 104 105 106 107 108 109 110 111 |
# File 'lib/vx/lib/consumer/session.rb', line 102 def with_pub_channel key = :vx_consumer_session_pub_channel if ch = Thread.current[key] yield ch else conn.with_channel do |c| yield c end end end |