Class: Message::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/message/worker.rb

Defined Under Namespace

Modules: Enqueue

Constant Summary collapse

DEFAULT_JOB_NAME =
'message-worker-default'

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(job_name) ⇒ Worker

Returns a new instance of Worker.



51
52
53
# File 'lib/message/worker.rb', line 51

# Creates a worker bound to a named job queue.
#
# @param job_name [String, nil] name of the job this worker serves; any
#   falsy value (nil or false) selects DEFAULT_JOB_NAME instead
def initialize(job_name)
  @job_name =
    if job_name
      job_name
    else
      DEFAULT_JOB_NAME
    end
end

Class Method Details

.enq(name, work) ⇒ Object



39
40
41
# File 'lib/message/worker.rb', line 39

# Serializes a unit of work to YAML and enqueues it on the named job.
#
# @param name [Object] key identifying the job (passed straight to `job`)
# @param work [Object] payload to serialize with YAML.dump
# @return [Object] whatever the job's own `enq` returns
def enq(name, work)
  # Resolve the job first, then serialize: same order as a chained call.
  target_job = job(name)
  serialized = YAML.dump(work)
  target_job.enq(serialized)
end

.job(name) ⇒ Object



35
36
37
# File 'lib/message/worker.rb', line 35

# Returns the job registered under +name+, creating and caching it on first
# use via Message.job with the shared job processor as its block.
#
# @param name [Object] registry key for the job
# @return [Object] the cached or newly created job
def job(name)
  registry = jobs
  existing = registry[name]
  return existing if existing

  # Not cached yet (or cached as a falsy value): build it and remember it.
  registry[name] = Message.job(name, &job_processor)
end

.job_processor ⇒ Object



43
44
45
46
47
48
# File 'lib/message/worker.rb', line 43

# Builds the callable that handles one raw message taken from a job.
#
# The message is parsed as YAML and destructured into a receiver, a method
# name and an argument list; the method is then invoked on the receiver.
#
# NOTE(review): this deserializes arbitrary Ruby objects and dispatches with
# +send+, so messages must only ever come from a trusted queue. Confirm that
# no externally supplied data can reach this processor.
#
# @return [Proc] a lambda taking the YAML message string and returning the
#   result of the dispatched method call
def job_processor
  lambda do |msg|
    # Psych 4+ turned YAML.load into a safe load that rejects arbitrary
    # classes and aliases, which breaks round-tripping objects written by
    # YAML.dump. Prefer unsafe_load where it exists; older Psych versions
    # only have the (already permissive) YAML.load.
    receiver, method_name, args =
      YAML.respond_to?(:unsafe_load) ? YAML.unsafe_load(msg) : YAML.load(msg)
    receiver.send(method_name, *args)
  end
end

.jobs ⇒ Object



31
32
33
# File 'lib/message/worker.rb', line 31

# Lazily creates and memoizes the registry that maps job names to jobs.
#
# When RUBY_PLATFORM mentions "java" the registry is a
# java.util.concurrent.ConcurrentHashMap; otherwise it is a plain Hash.
#
# @return [Hash, Object] the memoized registry
def jobs
  @jobs ||=
    if RUBY_PLATFORM =~ /java/
      java.util.concurrent.ConcurrentHashMap.new
    else
      {}
    end
end

Instance Method Details

#process(size = 1) ⇒ Object



70
71
72
# File 'lib/message/worker.rb', line 70

# Asks this worker's job to process messages.
#
# @param size [Integer] forwarded unchanged to the job's own +process+
# @return [Object] whatever the job's +process+ returns
def process(size = 1)
  assigned_job = Worker.job(@job_name)
  assigned_job.process(size)
end

#start(size = 10, interval = 1) ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/message/worker.rb', line 55

# Spawns a background thread that repeatedly processes messages.
#
# Each iteration calls +process(size)+ and then sleeps for +interval+.
# Any StandardError escaping the loop is logged at :error level with its
# message and backtrace, and the thread then finishes.
#
# @param size [Integer] forwarded to +process+ on every iteration
# @param interval [Numeric] argument passed to +sleep+ between iterations
# @return [Thread] the thread running the loop
def start(size = 10, interval = 1)
  Thread.start do
    begin
      log(:info) { "start" }
      # Kernel#loop ends quietly on StopIteration, which is the only way
      # the "stopped" line below is reached.
      loop do
        process(size)
        sleep(interval)
      end
      log(:info) { "stopped" }
    rescue => failure
      log(:error) do
        "crashed: #{failure.message}\n#{failure.backtrace.join("\n")}"
      end
    end
  end
end