Class: Evrone::Common::AMQP::Session

Inherits:
Object
  • Object
show all
Includes:
Logger
Defined in:
lib/evrone/common/amqp/session.rb

Defined Under Namespace

Classes: ConnectionDoesNotExist

Constant Summary collapse

CHANNEL_KEY =
:evrone_amqp_channel
@@session_lock =
Mutex.new

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#connObject (readonly)

Returns the value of attribute conn.



15
16
17
# File 'lib/evrone/common/amqp/session.rb', line 15

def conn
  @conn
end

Class Method Details

.resumeObject



26
27
28
# File 'lib/evrone/common/amqp/session.rb', line 26

def resume
  @shutdown = false
end

.shutdownObject



18
19
20
# File 'lib/evrone/common/amqp/session.rb', line 18

def shutdown
  @shutdown = true
end

.shutdown?Boolean

Returns:

  • (Boolean)


22
23
24
# File 'lib/evrone/common/amqp/session.rb', line 22

def shutdown?
  @shutdown == true
end

Instance Method Details

#channelObject



95
96
97
98
99
# File 'lib/evrone/common/amqp/session.rb', line 95

def channel
  assert_connection_is_open

  Thread.current[CHANNEL_KEY] || conn.default_channel
end

#closeObject



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/evrone/common/amqp/session.rb', line 31

def close
  if open?
    @@session_lock.synchronize do
      info "closing connection"
      begin
        conn.close
      rescue Bunny::ChannelError => e
        warn e
      end
      info "wait..."
      while conn.status != :closed
        sleep 0.01
      end
      @conn = nil
      info "connection closed"
    end
  end
end

#configObject



128
129
130
# File 'lib/evrone/common/amqp/session.rb', line 128

def config
  Common::AMQP.config
end

#conn_infoObject



115
116
117
118
119
# File 'lib/evrone/common/amqp/session.rb', line 115

def conn_info
  if conn
    "#{conn.user}:#{conn.host}:#{conn.port}/#{conn.vhost}"
  end
end

#declare_exchange(name, options = nil) ⇒ Object



76
77
78
79
80
81
82
83
84
# File 'lib/evrone/common/amqp/session.rb', line 76

def declare_exchange(name, options = nil)
  assert_connection_is_open

  options  ||= {}
  name     ||= config.default_exchange_name
  ch         = options.delete(:channel) || channel
  type, opts = get_exchange_type_and_options options
  ch.exchange name, opts.merge(type: type)
end

#declare_queue(name, options = nil) ⇒ Object



86
87
88
89
90
91
92
93
# File 'lib/evrone/common/amqp/session.rb', line 86

def declare_queue(name, options = nil)
  assert_connection_is_open

  options ||= {}
  ch = options.delete(:channel) || channel
  name, opts = get_queue_name_and_options(name, options)
  ch.queue name, opts
end

#openObject



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/evrone/common/amqp/session.rb', line 50

def open
  return self if open?

  @@session_lock.synchronize do
    self.class.resume

    @conn ||= Bunny.new config.url, heartbeat: :server

    unless conn.open?
      info "connecting to #{conn_info}"
      conn.start
      info "wait connection to #{conn_info}"
      while conn.connecting?
        sleep 0.01
      end
      info "connected successfuly (#{server_name})"
    end
  end

  self
end

#open?Boolean

Returns:

  • (Boolean)


72
73
74
# File 'lib/evrone/common/amqp/session.rb', line 72

def open?
  conn && conn.open? && conn.status == :open
end

#server_nameObject



121
122
123
124
125
126
# File 'lib/evrone/common/amqp/session.rb', line 121

def server_name
  if conn
    p = conn.server_properties || {}
    "#{p["product"]}/#{p["version"]}"
  end
end

#with_channelObject



101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/evrone/common/amqp/session.rb', line 101

def with_channel
  assert_connection_is_open

  old,new = nil
  begin
    old,new = Thread.current[CHANNEL_KEY], conn.create_channel
    Thread.current[CHANNEL_KEY] = new
    yield
  ensure
    Thread.current[CHANNEL_KEY] = old
    new.close if new && new.open?
  end
end