Class: Vx::Lib::Consumer::Session

Inherits:
Object
  • Object
show all
Includes:
Instrument
Defined in:
lib/vx/lib/consumer/session.rb

Constant Summary collapse

@@session_lock =
Mutex.new
@@shutdown_lock =
Mutex.new
@@shutdown =
ConditionVariable.new
@@live =
false

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Instrument

#instrument

Instance Attribute Details

#conn ⇒ Object (readonly)

Returns the value of attribute conn.



17
18
19
# File 'lib/vx/lib/consumer/session.rb', line 17

# Reader for the connection object stored in @conn.
#
# @return [Object, nil] the current value of @conn; #open assigns it a
#   Bunny session and #close resets it to nil
def conn
  @conn
end

Instance Method Details

#allocate_pub_channel ⇒ Object



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/vx/lib/consumer/session.rb', line 113

# Runs the given block with a publishing channel registered for the current
# thread under the :vx_consumer_session_pub_channel thread-local key.
#
# When the current thread already has a channel registered (a nested call),
# the block is simply yielded to and the outer call stays responsible for the
# channel. Otherwise a new channel is created, registered for the duration of
# the block, and closed afterwards.
#
# The thread-local slot is always cleared on the way out, even when closing
# the channel raises, so a failed close cannot leave a stale channel behind
# for later calls on the same thread. The channel is also closed if wiring up
# its error handlers fails.
#
# @yield runs with the channel registered for the current thread
# @return [Object] the value returned by the block
# @raise whatever assert_connection_is_open, the block, or closing the
#   channel raises
def allocate_pub_channel
  assert_connection_is_open

  key = :vx_consumer_session_pub_channel

  # Nested call on this thread: reuse the channel owned by the outer call.
  return yield if Thread.current[key]

  ch = conn.create_channel
  begin
    assign_error_handlers_to_channel(ch)
    Thread.current[key] = ch
    yield
  ensure
    begin
      ch.close if ch.open?
    ensure
      # Must run even if close raised, otherwise the next call on this
      # thread would take the "reuse" branch with a dead channel.
      Thread.current[key] = nil
    end
  end
end

#assign_error_handlers_to_channel(ch) ⇒ Object



150
151
152
153
# File 'lib/vx/lib/consumer/session.rb', line 150

# Routes both kinds of channel failure callbacks to the library-wide
# exception handler.
#
# The same handler is registered for uncaught exceptions and for channel
# errors; it forwards the two values it is called with as the error and as
# the :consumer option of Vx::Lib::Consumer.exception_handler.
#
# @param ch [Object] channel responding to #on_uncaught_exception and #on_error
# @return [Object] whatever ch.on_error returns
def assign_error_handlers_to_channel(ch)
  # A plain proc (not a lambda) keeps block-style lenient arity.
  report = proc do |error, consumer|
    ::Vx::Lib::Consumer.exception_handler(error, consumer: consumer)
  end

  ch.on_uncaught_exception(&report)
  ch.on_error(&report)
end

#close ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/vx/lib/consumer/session.rb', line 41

# Closes the connection if it is currently open and forgets it.
#
# The work happens under @@session_lock. The open? check is repeated inside
# the lock (mirroring #open): without it, a second thread that passed the
# outer check while another thread was closing would enter the critical
# section after @conn had been set to nil and fail calling #close on nil.
#
# Bunny::ChannelError and Bunny::ClientTimeout raised while closing are
# reported through Consumer.exception_handler rather than propagated; in that
# case @conn is still cleared.
#
# @return [nil]
def close
  return unless open?

  @@session_lock.synchronize do
    # Another thread may have finished closing while we waited for the lock.
    next unless open?

    instrument("closing_connection", info: conn_info)

    instrument("close_connection", info: conn_info) do
      begin
        conn.close
        # Poll until the connection reports the :closed status.
        while conn.status != :closed
          sleep 0.01
        end
      rescue Bunny::ChannelError, Bunny::ClientTimeout => e
        Consumer.exception_handler(e, {})
      end
    end
    @conn = nil
  end

  nil
end

#conn_info ⇒ Object



94
95
96
97
98
99
100
# File 'lib/vx/lib/consumer/session.rb', line 94

# Builds a human-readable description of the current connection.
#
# @return [String] "amqp://user@host:port/vhost" built from the connection's
#   attributes, or "not connected" when there is no connection object
def conn_info
  session = conn
  return "not connected" unless session

  endpoint = "#{session.user}@#{session.host}:#{session.port}"
  "amqp://#{endpoint}/#{session.vhost}"
end

#declare_exchange(ch, name, options = nil) ⇒ Object



134
135
136
137
138
139
140
141
# File 'lib/vx/lib/consumer/session.rb', line 134

# Declares an exchange on the given channel.
#
# When name is the empty string, any supplied options are discarded and an
# empty options hash is used instead.
#
# @param ch [Object] channel responding to #exchange(name, options)
# @param name [String] exchange name
# @param options [Hash, nil] exchange options; nil is treated as {}
# @return [Object] whatever ch.exchange returns
# @raise whatever assert_connection_is_open raises
def declare_exchange(ch, name, options = nil)
  assert_connection_is_open

  exchange_options = name == ''.freeze ? {} : (options || {})
  ch.exchange(name, exchange_options)
end

#declare_queue(ch, name, options = nil) ⇒ Object



143
144
145
146
147
148
# File 'lib/vx/lib/consumer/session.rb', line 143

# Declares a queue on the given channel.
#
# @param ch [Object] channel responding to #queue(name, options)
# @param name [String] queue name
# @param options [Hash, nil] queue options; nil is treated as {}
# @return [Object] whatever ch.queue returns
# @raise whatever assert_connection_is_open raises
def declare_queue(ch, name, options = nil)
  assert_connection_is_open

  ch.queue(name, options || {})
end

#live? ⇒ Boolean

Returns:

  • (Boolean)


26
27
28
# File 'lib/vx/lib/consumer/session.rb', line 26

# Reports the class-level liveness flag.
#
# @return [Boolean] the current value of @@live, which #resume sets to true
#   and #shutdown sets to false
def live?
  @@live
end

#open(options = {}) ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/vx/lib/consumer/session.rb', line 61

# Opens the connection unless it is already open.
#
# The open? check is made once without the lock as a fast path and again
# while holding @@session_lock. When a connection has to be established the
# session is marked live via #resume, a Bunny session is created if @conn is
# not already set, and the connect is wrapped in "start_connecting" /
# "connect" instrumentation.
#
# @param options [Hash] extra keys merged into the instrumentation payload
#   (they override the default :info entry on conflict)
# @return [self]
def open(options = {})
  return self if open?

  @@session_lock.synchronize do
    # Someone else may have connected while we waited for the lock.
    next if open?

    resume

    @conn ||= Bunny.new(
      nil,       # from ENV['RABBITMQ_URL']
      heartbeat: Consumer.configuration.heartbeat,
      automatically_recover: false
    )

    payload = { info: conn_info }.merge(options)

    instrument("start_connecting", payload)

    instrument("connect", payload) do
      conn.start
      # Poll until the connection leaves the connecting state.
      sleep 0.01 while conn.connecting?
    end
  end

  self
end

#open? ⇒ Boolean

Returns:

  • (Boolean)


90
91
92
# File 'lib/vx/lib/consumer/session.rb', line 90

# Tells whether there is a connection object that both answers open? and
# reports the :open status.
#
# @return [Boolean, nil] truthy only when all three conditions hold; nil when
#   there is no connection object
def open?
  session = conn
  return session unless session

  opened = session.open?
  return opened unless opened

  session.status == :open
end

#resume ⇒ Object



30
31
32
# File 'lib/vx/lib/consumer/session.rb', line 30

# Marks the session as live by setting the class-level @@live flag to true.
# #open calls this before connecting.
#
# Note that, unlike #shutdown, the flag is written here without holding
# @@shutdown_lock.
#
# @return [true]
def resume
  @@live = true
end

#shutdown ⇒ Object



19
20
21
22
23
24
# File 'lib/vx/lib/consumer/session.rb', line 19

# Flags the session as no longer live and wakes every thread currently
# blocked in #wait_shutdown.
#
# @return [Object] whatever ConditionVariable#broadcast returns
def shutdown
  # Flip the flag and broadcast under the same lock #wait_shutdown waits on,
  # so a waiter that wakes up observes @@live == false.
  @@shutdown_lock.synchronize do
    @@live = false
    @@shutdown.broadcast
  end
end

#wait_shutdown(timeout = nil) ⇒ Object



34
35
36
37
38
39
# File 'lib/vx/lib/consumer/session.rb', line 34

# Blocks on the shutdown condition variable, then reports whether the
# session has been shut down.
#
# @param timeout [Numeric, nil] forwarded to ConditionVariable#wait as the
#   maximum time to block; the default nil passes no time limit
# @return [Boolean] true when the session is not live after waking up,
#   false otherwise (for example when the wait merely timed out)
#
# NOTE(review): the wait is unconditional, so a #shutdown broadcast issued
# before this method is entered is not seen by this call — with no timeout
# it would presumably keep blocking until the next broadcast. Confirm that
# callers always start waiting before #shutdown can run, or loop with a
# timeout.
def wait_shutdown(timeout = nil)
  @@shutdown_lock.synchronize do
    # Releases @@shutdown_lock while sleeping and re-acquires it on wake-up.
    @@shutdown.wait(@@shutdown_lock, timeout)
    not live?
  end
end

#with_pub_channel ⇒ Object



102
103
104
105
106
107
108
109
110
111
# File 'lib/vx/lib/consumer/session.rb', line 102

# Yields a channel to publish on.
#
# Prefers the channel registered for the current thread under
# :vx_consumer_session_pub_channel (as set up by #allocate_pub_channel);
# when none is registered, delegates to conn.with_channel and yields the
# channel it provides.
#
# @yieldparam channel [Object] the channel to publish on
# @return [Object] the block's value when the thread-local channel is used,
#   otherwise whatever conn.with_channel returns
def with_pub_channel
  registered = Thread.current[:vx_consumer_session_pub_channel]
  return yield(registered) if registered

  conn.with_channel { |temporary| yield temporary }
end