Class: Vx::Lib::Consumer::Session
- Inherits:
-
Object
- Object
- Vx::Lib::Consumer::Session
- Defined in:
- lib/vx/lib/consumer/session.rb
Constant Summary collapse
- @@session_lock =
Mutex.new
- @@shutdown_lock =
Mutex.new
- @@shutdown =
ConditionVariable.new
- @@live =
false
Instance Attribute Summary collapse
-
#conn ⇒ Object
readonly
Returns the value of attribute conn.
Instance Method Summary collapse
- #allocate_pub_channel ⇒ Object
- #assign_error_handlers_to_channel(ch) ⇒ Object
- #close ⇒ Object
- #conn_info ⇒ Object
- #declare_exchange(ch, name, options = nil) ⇒ Object
- #declare_queue(ch, name, options = nil) ⇒ Object
- #live? ⇒ Boolean
- #open(options = {}) ⇒ Object
- #open? ⇒ Boolean
- #resume ⇒ Object
- #shutdown ⇒ Object
- #wait_shutdown(timeout = nil) ⇒ Object
- #with_pub_channel ⇒ Object
Instance Attribute Details
#conn ⇒ Object (readonly)
Returns the value of attribute conn.
15 16 17 |
# File 'lib/vx/lib/consumer/session.rb', line 15 def conn @conn end |
Instance Method Details
#allocate_pub_channel ⇒ Object
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 |
# File 'lib/vx/lib/consumer/session.rb', line 109 def allocate_pub_channel assert_connection_is_open key = :vx_consumer_session_pub_channel if Thread.current[key] yield else ch = conn.create_channel assign_error_handlers_to_channel(ch) Thread.current[key] = ch begin yield ensure ch = Thread.current[key] ch.close if ch.open? Thread.current[key] = nil end end end |
#assign_error_handlers_to_channel(ch) ⇒ Object
146 147 148 149 |
# File 'lib/vx/lib/consumer/session.rb', line 146 def assign_error_handlers_to_channel(ch) ch.on_uncaught_exception {|e, c| ::Vx::Lib::Consumer.exception_handler(e, consumer: c) } ch.on_error {|e, c| ::Vx::Lib::Consumer.exception_handler(e, consumer: c) } end |
#close ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/vx/lib/consumer/session.rb', line 39 def close if open? @@session_lock.synchronize do begin conn.close while conn.status != :closed sleep 0.01 end rescue Bunny::ChannelError, Bunny::ClientTimeout => e Consumer.exception_handler(e, {}) end @conn = nil end end end |
#conn_info ⇒ Object
90 91 92 93 94 95 96 |
# File 'lib/vx/lib/consumer/session.rb', line 90 def conn_info if conn "amqp://#{conn.user}@#{conn.host}:#{conn.port}/#{conn.vhost}" else "not connected" end end |
#declare_exchange(ch, name, options = nil) ⇒ Object
130 131 132 133 134 135 136 137 |
# File 'lib/vx/lib/consumer/session.rb', line 130 def declare_exchange(ch, name, = nil) assert_connection_is_open ||= {} = {} if name == ''.freeze ch.exchange name, end |
#declare_queue(ch, name, options = nil) ⇒ Object
139 140 141 142 143 144 |
# File 'lib/vx/lib/consumer/session.rb', line 139 def declare_queue(ch, name, = nil) assert_connection_is_open ||= {} ch.queue name, end |
#live? ⇒ Boolean
24 25 26 |
# File 'lib/vx/lib/consumer/session.rb', line 24 def live? @@live end |
#open(options = {}) ⇒ Object
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/vx/lib/consumer/session.rb', line 55 def open( = {}) return self if open? @@session_lock.synchronize do unless open? resume = { heartbeat: Consumer.configuration.heartbeat, automatically_recover: false, } if Consumer.configuration.logger .merge!(logger: Consumer.configuration.logger) end @conn ||= Bunny.new( nil, # from ENV['RABBITMQ_URL'] ) conn.start while conn.connecting? sleep 0.01 end end end self end |
#open? ⇒ Boolean
86 87 88 |
# File 'lib/vx/lib/consumer/session.rb', line 86 def open? conn && conn.open? && conn.status == :open end |
#resume ⇒ Object
28 29 30 |
# File 'lib/vx/lib/consumer/session.rb', line 28 def resume @@live = true end |
#shutdown ⇒ Object
17 18 19 20 21 22 |
# File 'lib/vx/lib/consumer/session.rb', line 17 def shutdown @@shutdown_lock.synchronize do @@live = false @@shutdown.broadcast end end |
#wait_shutdown(timeout = nil) ⇒ Object
32 33 34 35 36 37 |
# File 'lib/vx/lib/consumer/session.rb', line 32 def wait_shutdown(timeout = nil) @@shutdown_lock.synchronize do @@shutdown.wait(@@shutdown_lock, timeout) not live? end end |
#with_pub_channel ⇒ Object
98 99 100 101 102 103 104 105 106 107 |
# File 'lib/vx/lib/consumer/session.rb', line 98 def with_pub_channel key = :vx_consumer_session_pub_channel if ch = Thread.current[key] yield ch else conn.with_channel do |c| yield c end end end |