Class: QueueMap::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/queue_map/consumer.rb

Defined Under Namespace

Modules: ForkStrategy Classes: Configurator

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, options = { }) ⇒ Consumer

Returns a new instance of Consumer.



45
46
47
48
49
50
51
52
53
54
55
# File 'lib/queue_map/consumer.rb', line 45

def initialize(name, options = { })
  @name = name
  case options[:strategy]
  when :fork
    extend(ForkStrategy)
  when :test
    nil
  else
    raise "Invalid strategy: #{options[:strategy]}"
  end
end

Instance Attribute Details

#count_workersObject

Returns the value of attribute count_workers.



2
3
4
# File 'lib/queue_map/consumer.rb', line 2

def count_workers
  @count_workers
end

#idle_procObject



93
94
95
# File 'lib/queue_map/consumer.rb', line 93

def idle_proc
  @idle_proc ||= lambda { sleep 0.05 }
end

#job_timeoutObject

Returns the value of attribute job_timeout.



2
3
4
# File 'lib/queue_map/consumer.rb', line 2

def job_timeout
  @job_timeout
end

#log_fileObject



77
78
79
# File 'lib/queue_map/consumer.rb', line 77

def log_file
  @log_file ||= "#{name}_consumer.log"
end

#master_pidObject (readonly)

Returns the value of attribute master_pid.



3
4
5
# File 'lib/queue_map/consumer.rb', line 3

def master_pid
  @master_pid
end

#nameObject (readonly)

Returns the value of attribute name.



3
4
5
# File 'lib/queue_map/consumer.rb', line 3

def name
  @name
end

#on_exception_procObject

Returns the value of attribute on_exception_proc.



2
3
4
# File 'lib/queue_map/consumer.rb', line 2

def on_exception_proc
  @on_exception_proc
end

#pid_fileObject



73
74
75
# File 'lib/queue_map/consumer.rb', line 73

def pid_file
  @pid_file ||= "#{name}_consumer.pid"
end

#worker_procObject

Returns the value of attribute worker_proc.



2
3
4
# File 'lib/queue_map/consumer.rb', line 2

def worker_proc
  @worker_proc
end

Class Method Details

.from_file(consumer_path, options = { }) ⇒ Object



38
39
40
41
42
43
# File 'lib/queue_map/consumer.rb', line 38

def self.from_file(consumer_path, options = { })
  name = File.basename(consumer_path).gsub(/_consumer\.rb$/, '').to_sym
  consumer = new(name, options)
  Configurator.new(consumer).instance_eval(File.read(consumer_path), consumer_path, 1)
  consumer
end

.new_from_block(name, options = { }, &block) ⇒ Object



32
33
34
35
36
# File 'lib/queue_map/consumer.rb', line 32

def self.new_from_block(name, options = { }, &block)
  consumer = new(name, options)
  Configurator.new(consumer).instance_eval(&block)
  consumer
end

Instance Method Details

#after_fork_procsObject



57
58
59
# File 'lib/queue_map/consumer.rb', line 57

def after_fork_procs
  @after_fork_procs ||= []
end

#after_response_procsObject



65
66
67
# File 'lib/queue_map/consumer.rb', line 65

def after_response_procs
  @after_response_procs ||= []
end

#before_fork_procsObject



61
62
63
# File 'lib/queue_map/consumer.rb', line 61

def before_fork_procs
  @before_fork_procs ||= []
end

#before_job_procsObject



69
70
71
# File 'lib/queue_map/consumer.rb', line 69

def before_job_procs
  @before_job_procs ||= []
end

#loggerObject



81
82
83
# File 'lib/queue_map/consumer.rb', line 81

def logger
  @logger ||= Logger.new(log_file)
end

#run_consumerObject



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/queue_map/consumer.rb', line 97

def run_consumer
  begin
    QueueMap.with_bunny do |bunny|
      q = bunny.queue(name.to_s, :durable => false, :auto_delete => false, :ack => true)
      logger.info "Process #{Process.pid} is listening on #{name.to_s}"
      begin
        msg = q.pop
        (idle_proc.call; next) if msg == :queue_empty
        before_job_procs.each { |p| p.call }
        begin
          Timeout.timeout(job_timeout) do
            msg = Marshal.load(msg)
            result = worker_proc.call(msg[:input])
            bunny.queue(msg[:response_queue]).publish(Marshal.dump(:result => result, :index => msg[:index]))
          end
        ensure
          after_response_procs.each { |p| p.call }
        end
      rescue Qrack::ClientTimeout
      rescue Timeout::Error
        logger.info "Job took longer than #{job_timeout} seconds to complete. Aborting"
      end while ! @shutting_down
    end
  rescue Exception => e # Bunny gets into a strange state when exceptions are raised, so reconnect to queue server if it happens
    if on_exception_proc
      on_exception_proc.call(e)
    else
      logger.info e.class
      logger.error e.message
      logger.error e.backtrace
    end
    sleep 0.2
  end while ! @shutting_down
  logger.info "Done."
end

#startObject

Raises:

  • (RuntimeError)


85
86
87
# File 'lib/queue_map/consumer.rb', line 85

def start
  raise RuntimeError, "Called start on Consumer without strategy"
end

#stopObject

Raises:

  • (RuntimeError)


89
90
91
# File 'lib/queue_map/consumer.rb', line 89

def stop
  raise RuntimeError, "Called stop on Consumer without strategy"
end