Class: Msgr::Connection

Inherits:
Object
  • Object
show all
Includes:
Celluloid, Logging
Defined in:
lib/msgr/connection.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

#log, #log_name

Constructor Details

#initialize(conn, routes, dispatcher, opts = {}) ⇒ Connection

Returns a new instance of Connection.



10
11
12
13
14
15
16
17
18
19
20
# File 'lib/msgr/connection.rb', line 10

# Stores the given collaborators, opens a channel on the connection and
# binds the configured routes.
#
# @param conn [Object] connection object; must respond to #create_channel
# @param routes [Object] routes, iterated with #each by #bind
# @param dispatcher [Object] passed on to every binding created by #bind
# @param opts [Hash] options; :prefix is read by #prefix
def initialize(conn, routes, dispatcher, opts = {})
  @conn, @routes, @dispatcher, @opts = conn, routes, dispatcher, opts

  # Open the channel and call prefetch(10) on it before any binding exists.
  @channel = @conn.create_channel
  @channel.prefetch(10)

  bind
end

Instance Attribute Details

#conn ⇒ Object (readonly)

Returns the value of attribute conn.



7
8
9
# File 'lib/msgr/connection.rb', line 7

# @return [Object] the connection object passed to the constructor
def conn
  @conn
end

#dispatcher ⇒ Object (readonly)

Returns the value of attribute dispatcher.



7
8
9
# File 'lib/msgr/connection.rb', line 7

# @return [Object] the dispatcher passed to the constructor
def dispatcher
  @dispatcher
end

#opts ⇒ Object (readonly)

Returns the value of attribute opts.



7
8
9
# File 'lib/msgr/connection.rb', line 7

# @return [Hash] the options passed to the constructor (an empty hash by default)
def opts
  @opts
end

#routes ⇒ Object (readonly)

Returns the value of attribute routes.



7
8
9
# File 'lib/msgr/connection.rb', line 7

# @return [Object] the routes passed to the constructor
def routes
  @routes
end

Instance Method Details

#ack(delivery_tag) ⇒ Object



110
111
112
113
# File 'lib/msgr/connection.rb', line 110

# Acknowledges a delivery on this connection's channel.
#
# @param delivery_tag [Object] tag identifying the delivery to acknowledge
# @return [Object] whatever the channel's #ack returns
def ack(delivery_tag)
  log(:debug) do
    "Ack message: #{delivery_tag}"
  end

  @channel.ack(delivery_tag)
end

#bind ⇒ Object



27
28
29
30
31
32
# File 'lib/msgr/connection.rb', line 27

# Creates one binding per configured route and stores each in #bindings.
def bind
  routes.each do |route|
    bindings.push(Binding.new(Actor.current, route, dispatcher))
  end

  log(:debug) { 'New routes bound.' }
end

#bindings ⇒ Object

Used to store all bindings. Allows us to release the bindings when the receiver should no longer receive messages but the channel needs to stay open to allow further acknowledgments.



43
44
45
# File 'lib/msgr/connection.rb', line 43

# List of bindings held by this connection, created empty on first access.
# #bind appends to it; #release and #delete iterate over it.
#
# @return [Array] the stored bindings
def bindings
  @bindings ||= []
end

#close ⇒ Object



120
121
122
123
# File 'lib/msgr/connection.rb', line 120

# Closes the channel if it reports itself as open. The debug message is
# logged in either case.
def close
  channel = @channel
  channel.close if channel.open?

  log(:debug) { 'Connection closed.' }
end

#delete ⇒ Object



85
86
87
88
89
90
91
92
93
94
# File 'lib/msgr/connection.rb', line 85

# Deletes every stored binding, empties the binding list and then deletes
# the exchange if one has been created. Does nothing (not even the exchange
# deletion) when no bindings are stored.
def delete
  return if bindings.none?

  log(:debug) { 'Delete all bindings and exchange.' }

  bindings.each(&:delete)
  bindings.clear

  declared = @exchange
  declared.delete if declared
end

#exchange ⇒ Object



53
54
55
56
57
58
59
60
61
# File 'lib/msgr/connection.rb', line 53

# Returns the durable topic exchange named prefix('msgr'), declaring it on
# the channel the first time it is requested and reusing it afterwards.
#
# @return [Object] the exchange object returned by the channel
def exchange
  return @exchange if @exchange

  @exchange = @channel.topic(prefix('msgr'), durable: true)
  log(:debug) do
    "Created exchange #{@exchange.name} (type: #{@exchange.type}, durable: #{@exchange.durable?}, auto_delete: #{@exchange.auto_delete?})"
  end

  @exchange
end

#prefix(name = '') ⇒ Object



34
35
36
# File 'lib/msgr/connection.rb', line 34

# Prepends the configured :prefix option and a dash to the given name.
#
# @param name [String] name to prefix
# @return [String] "<prefix>-<name>", or name unchanged when no :prefix is set
def prefix(name = '')
  namespace = opts[:prefix]
  return name unless namespace

  "#{namespace}-#{name}"
end

#publish(payload, opts = {}) ⇒ Object

Raises:

  • (ArgumentError)


96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/msgr/connection.rb', line 96

# Publishes a persistent message to the exchange.
#
# The payload is serialised with JSON.generate and sent with content type
# 'application/json'. If serialisation raises, payload.to_s is sent with
# content type 'application/text' instead.
#
# @param payload [Object] message body
# @param opts [Hash] publish options; :routing_key (or :to as a fallback for
#   it) is required. The caller's hash is not modified.
# @raise [ArgumentError] if neither :routing_key nor :to is given
# @return [Object] whatever the exchange's #publish returns
def publish(payload, opts = {})
  # Work on a copy so the routing-key default does not leak into the caller's hash.
  opts = opts.dup
  opts[:routing_key] ||= opts[:to]
  raise ArgumentError, 'Missing routing key.' unless opts[:routing_key]

  log(:debug) { "Publish message to #{opts[:routing_key]}" }

  # Only a serialisation failure selects the text fallback. Errors raised by
  # the exchange propagate instead of triggering a second publish attempt.
  begin
    body = JSON.generate(payload)
    content_type = 'application/json'
  rescue StandardError
    body = payload.to_s
    content_type = 'application/text'
  end

  exchange.publish body, opts.merge(persistent: true, content_type: content_type)
end

#queue(name) ⇒ Object



47
48
49
50
51
# File 'lib/msgr/connection.rb', line 47

# Declares a durable queue on the channel under the prefixed name.
#
# @param name [String] queue name, passed through #prefix
# @return [Object] the queue object returned by the channel
def queue(name)
  declared = @channel.queue(prefix(name), durable: true)

  log(:debug) do
    "Create queue #{declared.name} (durable: #{declared.durable?}, auto_delete: #{declared.auto_delete?})"
  end

  declared
end

#rebind ⇒ Object



22
23
24
25
# File 'lib/msgr/connection.rb', line 22

# Releases the current bindings (without waiting, since #release defaults
# to wait = false) and then binds the routes again.
def rebind
  release
  bind
end

#reject(delivery_tag, requeue = true) ⇒ Object



115
116
117
118
# File 'lib/msgr/connection.rb', line 115

# Rejects a delivery on this connection's channel.
#
# @param delivery_tag [Object] tag identifying the delivery to reject
# @param requeue [Boolean] forwarded to the channel's #reject; defaults to true
# @return [Object] whatever the channel's #reject returns
def reject(delivery_tag, requeue = true)
  log(:debug) do
    "Reject message: #{delivery_tag}"
  end

  @channel.reject(delivery_tag, requeue)
end

#release(wait = false) ⇒ Object

Release all bindings but do not close the channel. The connection will no longer receive any messages, but the channel can still be used to acknowledge messages that are currently being processed.



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/msgr/connection.rb', line 67

# Releases all stored bindings without closing the channel. The bindings
# stay in #bindings afterwards. Does nothing when no bindings are stored.
#
# @param wait [Boolean] when true, poll each binding with #release_if_empty
#   once per second until every one of them has reported success; when
#   false, call #release on every binding immediately
def release(wait = false)
  return unless bindings.any?

  log(:debug) { "Release all bindings#{wait ? ' after queues are empty' : ''}..." }

  if wait
    # Poll a copy so the stored binding list itself is left intact.
    pending = bindings.dup
    loop do
      pending.reject!(&:release_if_empty)
      # Stop right after the pass that released the last binding instead of
      # sleeping one more second first.
      break if pending.empty?

      sleep 1
    end
  else
    bindings.each(&:release)
  end

  log(:debug) { 'All bindings released.' }
end