Class: Vx::Lib::Consumer::Session

Inherits:
Object
  • Object
show all
Defined in:
lib/vx/lib/consumer/session.rb

Constant Summary collapse

# Class-wide mutex held by #open and #close while the connection is being
# started or torn down.
@@session_lock =
Mutex.new
# Class-wide mutex held by #shutdown and #wait_shutdown around the shutdown
# condition variable.
@@shutdown_lock =
Mutex.new
# Condition variable broadcast by #shutdown; #wait_shutdown blocks on it.
@@shutdown =
ConditionVariable.new
# Liveness flag: set to true by #resume, set to false by #shutdown, read by
# #live?. Starts out false.
@@live =
false

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#conn ⇒ Object (readonly)

Returns the value of attribute conn.



15
16
17
# File 'lib/vx/lib/consumer/session.rb', line 15

# Reader for the underlying connection object.
#
# @return [Object, nil] the current value of +@conn+; #open assigns it from
#   +Bunny.new+ and #close resets it to nil
def conn
  @conn
end

Instance Method Details

#allocate_pub_channel ⇒ Object



109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/vx/lib/consumer/session.rb', line 109

# Runs the given block with a publishing channel pinned to the current thread.
#
# The channel is stored in the thread-local
# +:vx_consumer_session_pub_channel+ (the slot #with_pub_channel reads) for
# the duration of the block. A nested call on the same thread reuses the
# channel owned by the outer call and does not create or close anything.
#
# @yield with no arguments while the thread-local channel is set
# @return [Object] the value returned by the block
# @raise whatever +assert_connection_is_open+, channel creation, the block or
#   closing the channel raises
def allocate_pub_channel
  assert_connection_is_open

  key = :vx_consumer_session_pub_channel

  # Nested call: the outer invocation owns the channel and its cleanup.
  return yield if Thread.current[key]

  ch = conn.create_channel
  begin
    # Inside the protected region so a failure while wiring handlers still
    # closes the freshly created channel instead of leaking it.
    assign_error_handlers_to_channel(ch)
    Thread.current[key] = ch
    yield
  ensure
    begin
      ch.close if ch.open?
    ensure
      # Always clear the slot, even when closing the channel raises; otherwise
      # later calls on this thread would silently reuse a stale channel.
      Thread.current[key] = nil
    end
  end
end

#assign_error_handlers_to_channel(ch) ⇒ Object



146
147
148
149
# File 'lib/vx/lib/consumer/session.rb', line 146

# Routes both kinds of channel failure callbacks to the library-wide
# exception handler.
#
# NOTE(review): Bunny appears to invoke the +on_error+ callback with
# (channel, channel_close) rather than (exception, consumer) — confirm that
# forwarding the first argument as the exception is what is wanted here.
#
# @param ch [Object] channel responding to +on_uncaught_exception+ and
#   +on_error+
# @return [Object] whatever +ch.on_error+ returns
def assign_error_handlers_to_channel(ch)
  forward = proc do |error, source|
    ::Vx::Lib::Consumer.exception_handler(error, consumer: source)
  end

  ch.on_uncaught_exception(&forward)
  ch.on_error(&forward)
end

#close ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/vx/lib/consumer/session.rb', line 39

# Closes the connection, if one is open, and forgets it.
#
# The open check is repeated after taking the session lock (mirroring #open):
# a thread that passed the first check may have waited on the lock while
# another thread finished closing and reset +@conn+ to nil, in which case
# calling +conn.close+ would fail on nil.
#
# +Bunny::ChannelError+ and +Bunny::ClientTimeout+ raised while closing are
# reported through +Consumer.exception_handler+ instead of propagating; the
# connection reference is dropped in that case as well.
#
# @return [nil]
def close
  return unless open?

  @@session_lock.synchronize do
    if open?
      begin
        conn.close
        # Poll until the connection reports that it is fully closed.
        sleep 0.01 while conn.status != :closed
      rescue Bunny::ChannelError, Bunny::ClientTimeout => e
        Consumer.exception_handler(e, {})
      end
      @conn = nil
    end
  end

  nil
end

#conn_info ⇒ Object



90
91
92
93
94
95
96
# File 'lib/vx/lib/consumer/session.rb', line 90

# Describes the current connection for logs and diagnostics.
#
# @return [String] an +amqp://user@host:port/vhost+ style string built from
#   the connection's +user+, +host+, +port+ and +vhost+, or "not connected"
#   when there is no connection object
def conn_info
  session = conn
  return "not connected" unless session

  user  = session.user
  host  = session.host
  port  = session.port
  vhost = session.vhost

  "amqp://#{user}@#{host}:#{port}/#{vhost}"
end

#declare_exchange(ch, name, options = nil) ⇒ Object



130
131
132
133
134
135
136
137
# File 'lib/vx/lib/consumer/session.rb', line 130

# Declares an exchange on the given channel.
#
# When +name+ is the empty string the supplied options are discarded and an
# empty hash is passed instead.
#
# @param ch [Object] channel responding to +exchange(name, options)+
# @param name [String] exchange name
# @param options [Hash, nil] exchange options; nil is treated as an empty hash
# @return [Object] whatever +ch.exchange+ returns
# @raise whatever +assert_connection_is_open+ raises
def declare_exchange(ch, name, options = nil)
  assert_connection_is_open

  effective = name == '' ? {} : (options || {})
  ch.exchange(name, effective)
end

#declare_queue(ch, name, options = nil) ⇒ Object



139
140
141
142
143
144
# File 'lib/vx/lib/consumer/session.rb', line 139

# Declares a queue on the given channel.
#
# @param ch [Object] channel responding to +queue(name, options)+
# @param name [String] queue name
# @param options [Hash, nil] queue options; nil is treated as an empty hash
# @return [Object] whatever +ch.queue+ returns
# @raise whatever +assert_connection_is_open+ raises
def declare_queue(ch, name, options = nil)
  assert_connection_is_open

  ch.queue(name, options || {})
end

#live? ⇒ Boolean

Returns:

  • (Boolean)


24
25
26
# File 'lib/vx/lib/consumer/session.rb', line 24

# Reports the class-wide liveness flag.
#
# @return [Boolean] the current value of +@@live+ — true once #resume has
#   run, false initially and after #shutdown
def live?
  @@live
end

#open(options = {}) ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/vx/lib/consumer/session.rb', line 55

# Opens the connection unless it is already open.
#
# The open check is made once without the lock as a fast path and again
# while holding the session lock, so only one thread performs the start-up.
# Start-up marks the session live via #resume, builds the Bunny options from
# +Consumer.configuration+ (heartbeat, optional logger, automatic recovery
# disabled), creates the connection if none is stored yet, starts it and
# polls until it is no longer connecting.
#
# @param options [Hash] accepted for interface compatibility; not read by
#   this method
# @return [self]
def open(options = {})
  return self if open?

  @@session_lock.synchronize do
    next if open?

    resume

    bunny_options = {
      heartbeat: Consumer.configuration.heartbeat,
      automatically_recover: false,
    }

    logger = Consumer.configuration.logger
    bunny_options[:logger] = logger if logger

    # A nil URL leaves the address to Bunny (from ENV['RABBITMQ_URL']).
    @conn ||= Bunny.new(nil, bunny_options)

    conn.start
    sleep 0.01 while conn.connecting?
  end

  self
end

#open? ⇒ Boolean

Returns:

  • (Boolean)


86
87
88
# File 'lib/vx/lib/consumer/session.rb', line 86

# Tells whether there is a connection that is fully open.
#
# Always answers with a strict boolean: a missing connection, or a falsy
# +conn.open?+, yields false rather than nil.
#
# @return [Boolean] true only when a connection exists, its +open?+ is
#   truthy and its +status+ equals +:open+
def open?
  session = conn
  return false unless session && session.open?

  session.status == :open
end

#resume ⇒ Object



28
29
30
# File 'lib/vx/lib/consumer/session.rb', line 28

# Marks the session as live by setting the class-wide +@@live+ flag.
# Called by #open before the connection is started.
#
# @return [true]
def resume
  @@live = true
end

#shutdown ⇒ Object



17
18
19
20
21
22
# File 'lib/vx/lib/consumer/session.rb', line 17

# Signals shutdown: clears the class-wide liveness flag and wakes every
# thread currently blocked in #wait_shutdown.
#
# Both steps happen while holding +@@shutdown_lock+, the same lock
# #wait_shutdown waits under.
#
# @return [Object] whatever +ConditionVariable#broadcast+ returns
def shutdown
  @@shutdown_lock.synchronize do
    @@live = false
    @@shutdown.broadcast
  end
end

#wait_shutdown(timeout = nil) ⇒ Object



32
33
34
35
36
37
# File 'lib/vx/lib/consumer/session.rb', line 32

# Blocks the calling thread on the shutdown condition variable, then reports
# whether the session is no longer live.
#
# @param timeout [Numeric, nil] passed straight to +ConditionVariable#wait+
# @return [Boolean] true when #live? is false once the wait has returned
def wait_shutdown(timeout = nil)
  @@shutdown_lock.synchronize do
    # NOTE(review): the wait is unconditional, so a #shutdown broadcast issued
    # before this call started is not observed here; the caller then sleeps
    # until the next broadcast or until the timeout elapses — confirm that
    # callers always pass a timeout or start waiting before shutdown.
    @@shutdown.wait(@@shutdown_lock, timeout)
    not live?
  end
end

#with_pub_channel ⇒ Object



98
99
100
101
102
103
104
105
106
107
# File 'lib/vx/lib/consumer/session.rb', line 98

# Yields a channel suitable for publishing.
#
# If the current thread has a channel stored under
# +:vx_consumer_session_pub_channel+ (as set up by #allocate_pub_channel),
# that channel is yielded. Otherwise the work is delegated to
# +conn.with_channel+, which supplies the channel for the block.
#
# @yieldparam channel [Object] the channel to publish on
# @return [Object] the block's value when the thread-local channel is used,
#   otherwise whatever +conn.with_channel+ returns
def with_pub_channel
  pinned = Thread.current[:vx_consumer_session_pub_channel]
  return yield(pinned) if pinned

  conn.with_channel { |temporary| yield temporary }
end