Class: Cloudist::Queues::BasicQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/cloudist/queues/basic_queue.rb

Direct Known Subclasses

JobQueue, ReplyQueue

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue_name, options = {}) ⇒ BasicQueue

Returns a new instance of BasicQueue.



14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/cloudist/queues/basic_queue.rb', line 14

# Stores the queue name and declaration options, then calls #setup.
#
# The :prefetch key is removed from the options and kept separately in
# @prefetch (defaulting to 1). The remaining options are layered on top of
# the defaults :auto_delete => true, :durable => false, :nowait => true.
#
# @param queue_name name later passed to the channel when declaring the queue
# @param options [Hash] overrides for the defaults above, plus an optional
#   :prefetch entry; the hash passed in is left untouched
# @return [BasicQueue]
def initialize(queue_name, options = {})
  # Work on a copy: the delete below must not strip :prefetch from the
  # caller's hash (and must not raise when that hash is frozen).
  options = options.dup

  # ||= leaves an already-assigned @prefetch alone; in that case :prefetch
  # is not removed from the options either.
  @prefetch ||= options.delete(:prefetch) || 1

  defaults = {
    :auto_delete => true,
    :durable => false,
    :nowait => true
  }

  @queue_name = queue_name
  @options = defaults.merge(options)

  setup
end

Instance Attribute Details

#channel ⇒ Object (readonly) Also known as: mq

Returns the value of attribute channel.



8
9
10
# File 'lib/cloudist/queues/basic_queue.rb', line 8

# Read-only accessor for @channel.
#
# @channel is assigned in #setup from AMQP::Channel.new(Cloudist.connection).
#
# @return [Object] the current value of @channel
def channel
  @channel
end

#exchange ⇒ Object (readonly) Also known as: ex

Returns the value of attribute exchange.



8
9
10
# File 'lib/cloudist/queues/basic_queue.rb', line 8

# Read-only accessor for @exchange.
#
# @exchange is assigned in #setup_exchange from channel.direct("").
#
# @return [Object] the current value of @exchange
def exchange
  @exchange
end

#options ⇒ Object (readonly)

Returns the value of attribute options.



7
8
9
# File 'lib/cloudist/queues/basic_queue.rb', line 7

# Read-only accessor for @options.
#
# @options is built in #initialize: the caller's options layered over the
# defaults :auto_delete => true, :durable => false, :nowait => true.
# #setup passes it to @channel.queue.
#
# @return [Object] the current value of @options
def options
  @options
end

#prefetch ⇒ Object (readonly)

Returns the value of attribute prefetch.



8
9
10
# File 'lib/cloudist/queues/basic_queue.rb', line 8

# Read-only accessor for @prefetch.
#
# @prefetch is taken from the :prefetch option in #initialize (1 when the
# option is absent) and is handed to channel.prefetch inside #setup.
#
# @return [Object] the current value of @prefetch
def prefetch
  @prefetch
end

#queue ⇒ Object (readonly) Also known as: q

Returns the value of attribute queue.



8
9
10
# File 'lib/cloudist/queues/basic_queue.rb', line 8

# Read-only accessor for @queue.
#
# @queue is assigned in #setup from @channel.queue(queue_name, options).
#
# @return [Object] the current value of @queue
def queue
  @queue
end

#queue_name ⇒ Object (readonly)

Returns the value of attribute queue_name.



7
8
9
# File 'lib/cloudist/queues/basic_queue.rb', line 7

# Read-only accessor for @queue_name.
#
# @queue_name is the first argument given to #initialize; #setup uses it when
# declaring the queue and #publish uses it as the reply-to value.
#
# @return [Object] the current value of @queue_name
def queue_name
  @queue_name
end

Instance Method Details

#destroy ⇒ Object



126
127
128
# File 'lib/cloudist/queues/basic_queue.rb', line 126

# Shuts this queue down by delegating to #teardown.
#
# @return [Object] whatever #teardown returns
def destroy
  teardown
end

#inspect ⇒ Object



28
29
30
# File 'lib/cloudist/queues/basic_queue.rb', line 28

# Short human-readable description: the receiver's class name followed by
# its queue name.
#
# @return [String] text of the form "<ClassName queue_name=NAME>"
def inspect
  format("<%s queue_name=%s>", self.class.name, queue_name)
end

#log ⇒ Object

# NOTE(review): this snippet appears in the description text of #log and
# looks like commented-out source that was captured as documentation —
# confirm before relying on it. Unlike the documented #setup_exchange, which
# calls channel.direct(""), this variant requests a direct exchange named
# after the queue and then binds the queue to that exchange.
def setup_exchange

@exchange = channel.direct(queue_name)
queue.bind(exchange)

end



55
56
57
# File 'lib/cloudist/queues/basic_queue.rb', line 55

# Returns Cloudist.log. Within this class the result is used as a logger
# (it receives #info, #error and #debug calls).
#
# @return [Object] whatever Cloudist.log returns
def log
  Cloudist.log
end


100
101
102
103
104
# File 'lib/cloudist/queues/basic_queue.rb', line 100

# Currently a no-op: the only statements in the body are commented out, so
# calling this does nothing and returns nil.
def print_status
  # Disabled implementation, kept for reference. It would ask the queue for
  # its message and consumer counts and log them.
  # queue.status{ |num_messages, num_consumers|
  #   log.info("STATUS: #{queue.name}: JOBS: #{num_messages} WORKERS: #{num_consumers+1}")
  # }
end

#publish(payload) ⇒ Object



106
107
108
109
110
111
# File 'lib/cloudist/queues/basic_queue.rb', line 106

# Publishes +payload+ through the exchange, routed to this queue.
#
# Steps, in order: the payload's reply-to is set to this queue's name, the
# payload is split into body and headers with to_a, a :routing_key header
# holding queue.name is added to that headers hash in place, and the pair is
# handed to exchange.publish.
#
# @param payload object responding to set_reply_to and to_a, where to_a
#   yields [body, headers]
# @return [Object] whatever exchange.publish returns
def publish(payload)
  payload.set_reply_to(queue_name)

  message, meta = payload.to_a
  meta.update(:routing_key => queue.name)

  exchange.publish(message, meta)
end

#publish_to_q(payload) ⇒ Object



113
114
115
116
117
118
# File 'lib/cloudist/queues/basic_queue.rb', line 113

# Publishes +payload+ directly via the queue object, bypassing the exchange
# accessor, and hands the headers back to the caller.
#
# @param payload object whose to_a yields [body, headers]
# @return [Object] the headers element obtained from payload.to_a
def publish_to_q(payload)
  message, meta = payload.to_a
  # meta.merge!(:routing_key => queue.name)
  queue.publish(message, meta)
  meta
end

#setup ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/cloudist/queues/basic_queue.rb', line 32

# Opens the channel, declares the queue and sets up the exchange — once.
#
# Does nothing (and returns nil) when @setup is already true. Otherwise it:
#   1. creates an AMQP::Channel on Cloudist.connection unless @channel is
#      already set; the block given to the channel applies the prefetch value
#      when one is configured,
#   2. declares the queue with queue_name and options,
#   3. calls #setup_exchange,
#   4. marks the instance as set up.
#
# @return [true, nil] true after a full run, nil when already set up
def setup
  return if @setup.eql?(true)

  # The block reads the channel through the reader, so it relies on @channel
  # having been assigned by the time the block is invoked.
  @channel ||= AMQP::Channel.new(Cloudist.connection) { channel.prefetch(prefetch, false) if prefetch }
  @queue = @channel.queue(queue_name, options)
  setup_exchange

  @setup = true
end

#setup_exchange ⇒ Object



46
47
48
# File 'lib/cloudist/queues/basic_queue.rb', line 46

# Stores the result of channel.direct("") in @exchange, i.e. a direct
# exchange requested with an empty name. Called from #setup after the queue
# has been declared.
#
# NOTE(review): an empty exchange name presumably selects the broker's
# default exchange — confirm against the AMQP client documentation.
#
# @return [Object] the value assigned to @exchange
def setup_exchange
  @exchange = channel.direct("")
end

#subscribe(&block) ⇒ Object



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/cloudist/queues/basic_queue.rb', line 65

# Subscribes to the queue with explicit acknowledgements and processes each
# delivery through the given block.
#
# For every delivery a Cloudist::Request is built and two procs are handed to
# EM.defer: handle_request (the work) and handle_ack (passed as the second
# argument). handle_request:
#   * logs an error and skips the block when the request has expired,
#   * passes any other StandardError raised by the block to
#     Cloudist.handle_error,
#   * acks the request in every case.
#
# The block is optional: without one, deliveries are simply acked.
#
# @yield [request] the Cloudist::Request built for the delivery
# @return [self]
def subscribe(&block)
  queue.subscribe(:ack => true) do |queue_header, encoded_message|
    # next if Cloudist.closing?

    request = Cloudist::Request.new(self, encoded_message, queue_header)

    handle_request = proc {
      begin
        raise Cloudist::ExpiredMessage if request.expired?

        # Guarded so that a block-less subscribe does not raise NoMethodError
        # on nil (and report it through handle_error) for every message.
        block.call(request) if block

      rescue Cloudist::ExpiredMessage
        log.error "AMQP Message Timeout: #{tag} ttl=#{request.ttl} age=#{request.age}"

      rescue => e
        Cloudist.handle_error(e)
      ensure
        request.ack
        # unless Cloudist.closing?
        # finished = Time.now.utc.to_i
        # log.debug("Finished Job in #{finished - request.start} seconds")
      end
    }

    # NOTE(review): request.ack is also called in the ensure clause above, so
    # each request is acked twice unless Request#ack guards against repeats —
    # confirm against Cloudist::Request before changing either call.
    handle_ack = proc {
      request.ack
    }

    EM.defer(handle_request, handle_ack)
  end
  log.info "AMQP Subscribed: #{tag}"
  self
end

#tag ⇒ Object



59
60
61
62
63
# File 'lib/cloudist/queues/basic_queue.rb', line 59

# Label used in log messages: the queue's name, followed by the exchange's
# name when an exchange is set.
#
# @return [String] "queue=NAME" or "queue=NAME exchange=NAME"
def tag
  parts = ["queue=#{queue.name}"]
  parts << "exchange=#{exchange.name}" if exchange
  parts.join(" ")
end

#teardown ⇒ Object



120
121
122
123
124
# File 'lib/cloudist/queues/basic_queue.rb', line 120

# Releases this queue's AMQP resources: unsubscribes the queue, closes the
# channel, then writes a debug log line containing #tag.
#
# Reads @queue and @channel directly, so both must already have been
# assigned (they are set in #setup). @setup is not reset here.
#
# @return [Object] whatever log.debug returns
def teardown
  @queue.unsubscribe
  @channel.close
  # Logged last, after the channel has been closed.
  log.debug "AMQP Unsubscribed: #{tag}"
end