Class: Cloudist::Queue

Inherits:
Object show all
Defined in:
lib/cloudist/queue.rb

Overview

NOTE: Queue is Deprecated, please use BasicQueue

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, options = {}) ⇒ Queue

Returns a new instance of Queue.



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/cloudist/queue.rb', line 11

def initialize(name, options = {})
  self.class.cached_queues ||= {}

  options = {
    :auto_delete => false,
    :durable => true
  }.update(options)

  @name, @options = name, options

  setup
  p self.cached_queues.keys

  log.debug(tag)
  purge
end

Instance Attribute Details

#channelObject (readonly)

Returns the value of attribute channel.



7
8
9
# File 'lib/cloudist/queue.rb', line 7

def channel
  @channel
end

#exObject (readonly)

Returns the value of attribute ex.



7
8
9
# File 'lib/cloudist/queue.rb', line 7

def ex
  @ex
end

#nameObject (readonly)

Returns the value of attribute name.



7
8
9
# File 'lib/cloudist/queue.rb', line 7

def name
  @name
end

#optionsObject (readonly)

Returns the value of attribute options.



7
8
9
# File 'lib/cloudist/queue.rb', line 7

def options
  @options
end

#qObject (readonly)

Returns the value of attribute q.



7
8
9
# File 'lib/cloudist/queue.rb', line 7

def q
  @q
end

Instance Method Details

#destroyObject



84
85
86
# File 'lib/cloudist/queue.rb', line 84

def destroy
  teardown
end

#inspectObject



32
33
34
# File 'lib/cloudist/queue.rb', line 32

def inspect
  "<#{self.class.name} name=#{name} exchange=#{ex ? ex.name : 'nil'}>"
end

#logObject



36
37
38
# File 'lib/cloudist/queue.rb', line 36

def log
  Cloudist.log
end

#publish(msg) ⇒ Object

Raises:

  • (ArgumentError)


47
48
49
50
51
52
53
54
55
56
# File 'lib/cloudist/queue.rb', line 47

def publish(msg)
  raise ArgumentError, "Publish expects a Cloudist::Message object" unless msg.is_a?(Cloudist::Message)

  body, headers = msg.encoded
  # EM.defer {
    publish_to_ex(body, headers)
  # }

  p msg.body.to_hash
end

#publish_to_ex(body, headers = {}) ⇒ Object

def channel

self.class.channel

end

def q

self.class.q

end

def ex

self.class.ex

end



70
71
72
# File 'lib/cloudist/queue.rb', line 70

def publish_to_ex(body, headers = {})
  ex.publish(body, headers)
end

#publish_to_q(body, headers = {}) ⇒ Object



74
75
76
# File 'lib/cloudist/queue.rb', line 74

def publish_to_q(body, headers = {})
  q.publish(body, headers)
end

#purgeObject



28
29
30
# File 'lib/cloudist/queue.rb', line 28

def purge
  q.purge
end

#subscribe(options = {}, &block) ⇒ Object



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/cloudist/queue.rb', line 88

def subscribe(options = {}, &block)
  options[:ack] = true
  q.subscribe(options) do |queue_header, encoded_message|
    request = Cloudist::Request.new(self, encoded_message, queue_header)

    msg = Cloudist::Message.new(*request.for_message)

    EM.defer do
      begin
        raise Cloudist::ExpiredMessage if request.expired?
        yield msg

      rescue Cloudist::ExpiredMessage
        log.error "AMQP Message Timeout: #{tag} ttl=#{request.ttl} age=#{request.age}"

      rescue Exception => e
        Cloudist.handle_error(e)

      ensure
        request.ack unless Cloudist.closing?
      end
    end
  end
end

#tagObject



40
41
42
43
44
45
# File 'lib/cloudist/queue.rb', line 40

def tag
  [].tap { |a|
    a << "queue=#{q.name}" if q
    a << "exchange=#{ex.name}" if ex
  }.join(' ')
end

#teardownObject



78
79
80
81
82
# File 'lib/cloudist/queue.rb', line 78

def teardown
  q.unsubscribe
  channel.close
  log.debug "AMQP Unsubscribed: #{tag}"
end