Class: Vx::Consumer::Session

Inherits:
Object
  • Object
show all
Includes:
Instrument
Defined in:
lib/vx/consumer/session.rb

Constant Summary collapse

@@session_lock =
Mutex.new
@@shutdown_lock =
Mutex.new
@@shutdown =
ConditionVariable.new
@@live =
false

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Instrument

#instrument

Instance Attribute Details

#connObject (readonly)

Returns the value of attribute conn.



16
17
18
# File 'lib/vx/consumer/session.rb', line 16

def conn
  @conn
end

Instance Method Details

#allocate_pub_channelObject



112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/vx/consumer/session.rb', line 112

def allocate_pub_channel
  assert_connection_is_open

  key = :vx_consumer_session_pub_channel

  if Thread.current[key]
    yield
  else
    ch = conn.create_channel
    assign_error_handlers_to_channel(ch)
    Thread.current[key] = ch
    begin
      yield
    ensure
      ch = Thread.current[key]
      ch.close if ch.open?
      Thread.current[key] = nil
    end
  end
end

#assign_error_handlers_to_channel(ch) ⇒ Object



147
148
149
150
# File 'lib/vx/consumer/session.rb', line 147

def assign_error_handlers_to_channel(ch)
  ch.on_uncaught_exception {|e, c| ::Vx::Consumer.exception_handler(e, consumer: c) }
  ch.on_error {|e, c| ::Vx::Consumer.exception_handler(e, consumer: c) }
end

#closeObject



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/vx/consumer/session.rb', line 40

def close
  if open?
    @@session_lock.synchronize do
      instrument("closing_connection", info: conn_info)

      instrument("close_connection", info: conn_info) do
        begin
          conn.close
          while conn.status != :closed
            sleep 0.01
          end
        rescue Bunny::ChannelError, Bunny::ClientTimeout => e
          Consumer.exception_handler(e, {})
        end
      end
      @conn = nil
    end
  end
end

#conn_infoObject



93
94
95
96
97
98
99
# File 'lib/vx/consumer/session.rb', line 93

def conn_info
  if conn
    "amqp://#{conn.user}@#{conn.host}:#{conn.port}/#{conn.vhost}"
  else
    "not connected"
  end
end

#declare_exchange(ch, name, options = nil) ⇒ Object



133
134
135
136
137
138
# File 'lib/vx/consumer/session.rb', line 133

def declare_exchange(ch, name, options = nil)
  assert_connection_is_open

  options  ||= {}
  ch.exchange name, options
end

#declare_queue(ch, name, options = nil) ⇒ Object



140
141
142
143
144
145
# File 'lib/vx/consumer/session.rb', line 140

def declare_queue(ch, name, options = nil)
  assert_connection_is_open

  options ||= {}
  ch.queue name, options
end

#live?Boolean

Returns:

  • (Boolean)


25
26
27
# File 'lib/vx/consumer/session.rb', line 25

def live?
  @@live
end

#open(options = {}) ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/vx/consumer/session.rb', line 60

def open(options = {})
  return self if open?

  @@session_lock.synchronize do
    unless open?
      resume

      @conn ||= Bunny.new(
        nil,       # from ENV['RABBITMQ_URL']
        heartbeat: Consumer.configuration.heartbeat,
        automatically_recover: false
      )

      instrumentation = { info: conn_info }.merge(options)

      instrument("start_connecting", instrumentation)

      instrument("connect", instrumentation) do
        conn.start
        while conn.connecting?
          sleep 0.01
        end
      end
    end
  end

  self
end

#open?Boolean

Returns:

  • (Boolean)


89
90
91
# File 'lib/vx/consumer/session.rb', line 89

def open?
  conn && conn.open? && conn.status == :open
end

#resumeObject



29
30
31
# File 'lib/vx/consumer/session.rb', line 29

def resume
  @@live = true
end

#shutdownObject



18
19
20
21
22
23
# File 'lib/vx/consumer/session.rb', line 18

def shutdown
  @@shutdown_lock.synchronize do
    @@live = false
    @@shutdown.broadcast
  end
end

#wait_shutdown(timeout = nil) ⇒ Object



33
34
35
36
37
38
# File 'lib/vx/consumer/session.rb', line 33

def wait_shutdown(timeout = nil)
  @@shutdown_lock.synchronize do
    @@shutdown.wait(@@shutdown_lock, timeout)
    not live?
  end
end

#with_pub_channelObject



101
102
103
104
105
106
107
108
109
110
# File 'lib/vx/consumer/session.rb', line 101

def with_pub_channel
  key = :vx_consumer_session_pub_channel
  if ch = Thread.current[key]
    yield ch
  else
    conn.with_channel do |c|
      yield c
    end
  end
end