Class: Vx::Lib::Consumer::Session

Inherits:
Object
  • Object
show all
Defined in:
lib/vx/lib/consumer/session.rb

Constant Summary collapse

@@session_lock =
Mutex.new
@@shutdown_lock =
Mutex.new
@@shutdown =
ConditionVariable.new
@@live =
false

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#connObject (readonly)

Returns the value of attribute conn.



15
16
17
# File 'lib/vx/lib/consumer/session.rb', line 15

def conn
  @conn
end

Instance Method Details

#allocate_pub_channelObject



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/vx/lib/consumer/session.rb', line 101

def allocate_pub_channel
  assert_connection_is_open

  key = :vx_consumer_session_pub_channel

  if Thread.current[key]
    yield
  else
    ch = conn.create_channel
    assign_error_handlers_to_channel(ch)
    Thread.current[key] = ch
    begin
      yield
    ensure
      ch = Thread.current[key]
      ch.close if ch.open?
      Thread.current[key] = nil
    end
  end
end

#assign_error_handlers_to_channel(ch) ⇒ Object



138
139
140
141
# File 'lib/vx/lib/consumer/session.rb', line 138

def assign_error_handlers_to_channel(ch)
  ch.on_uncaught_exception {|e, c| ::Vx::Lib::Consumer.exception_handler(e, consumer: c) }
  ch.on_error {|e, c| ::Vx::Lib::Consumer.exception_handler(e, consumer: c) }
end

#closeObject



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/vx/lib/consumer/session.rb', line 39

def close
  if open?
    @@session_lock.synchronize do
      begin
        conn.close
        while conn.status != :closed
          sleep 0.01
        end
      rescue Bunny::ChannelError, Bunny::ClientTimeout => e
        Consumer.exception_handler(e, {})
      end
      @conn = nil
    end
  end
end

#conn_infoObject



82
83
84
85
86
87
88
# File 'lib/vx/lib/consumer/session.rb', line 82

def conn_info
  if conn
    "amqp://#{conn.user}@#{conn.host}:#{conn.port}/#{conn.vhost}"
  else
    "not connected"
  end
end

#declare_exchange(ch, name, options = nil) ⇒ Object



122
123
124
125
126
127
128
129
# File 'lib/vx/lib/consumer/session.rb', line 122

def declare_exchange(ch, name, options = nil)
  assert_connection_is_open

  options ||= {}
  options = {} if name == ''.freeze

  ch.exchange name, options
end

#declare_queue(ch, name, options = nil) ⇒ Object



131
132
133
134
135
136
# File 'lib/vx/lib/consumer/session.rb', line 131

def declare_queue(ch, name, options = nil)
  assert_connection_is_open

  options ||= {}
  ch.queue name, options
end

#live?Boolean



24
25
26
# File 'lib/vx/lib/consumer/session.rb', line 24

def live?
  @@live
end

#open(options = {}) ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/vx/lib/consumer/session.rb', line 55

def open(options = {})
  return self if open?

  @@session_lock.synchronize do
    unless open?
      resume

      @conn ||= Bunny.new(
        nil,       # from ENV['RABBITMQ_URL']
        heartbeat: Consumer.configuration.heartbeat,
        automatically_recover: false
      )

      conn.start
      while conn.connecting?
        sleep 0.01
      end
    end
  end

  self
end

#open?Boolean



78
79
80
# File 'lib/vx/lib/consumer/session.rb', line 78

def open?
  conn && conn.open? && conn.status == :open
end

#resumeObject



28
29
30
# File 'lib/vx/lib/consumer/session.rb', line 28

def resume
  @@live = true
end

#shutdownObject



17
18
19
20
21
22
# File 'lib/vx/lib/consumer/session.rb', line 17

def shutdown
  @@shutdown_lock.synchronize do
    @@live = false
    @@shutdown.broadcast
  end
end

#wait_shutdown(timeout = nil) ⇒ Object



32
33
34
35
36
37
# File 'lib/vx/lib/consumer/session.rb', line 32

def wait_shutdown(timeout = nil)
  @@shutdown_lock.synchronize do
    @@shutdown.wait(@@shutdown_lock, timeout)
    not live?
  end
end

#with_pub_channelObject



90
91
92
93
94
95
96
97
98
99
# File 'lib/vx/lib/consumer/session.rb', line 90

def with_pub_channel
  key = :vx_consumer_session_pub_channel
  if ch = Thread.current[key]
    yield ch
  else
    conn.with_channel do |c|
      yield c
    end
  end
end