Class: Message::Worker
- Inherits: Object
- Inheritance hierarchy:
  - Object
    - Message::Worker
- Defined in:
- lib/message/worker.rb
Defined Under Namespace
Modules: Enqueue
Constant Summary collapse
- DEFAULT_JOB_NAME =
'message-worker-default'
Class Method Summary collapse
- .enq(name, work) ⇒ Object
- .job(name) ⇒ Object
- .job_processor ⇒ Object
- .jobs ⇒ Object
Instance Method Summary collapse
-
#initialize(job_name) ⇒ Worker
constructor
A new instance of Worker.
- #process(size = 1) ⇒ Object
- #start(size = 10, interval = 1) ⇒ Object
Constructor Details
#initialize(job_name) ⇒ Worker
Returns a new instance of Worker.
51 52 53 |
# File 'lib/message/worker.rb', line 51
# Builds a worker bound to a job name.
#
# @param job_name [Object, nil] name of the job this worker serves; when it
#   is nil (or false) DEFAULT_JOB_NAME is stored instead
def initialize(job_name)
  @job_name = job_name
  @job_name ||= DEFAULT_JOB_NAME
end
Class Method Details
.enq(name, work) ⇒ Object
39 40 41 |
# File 'lib/message/worker.rb', line 39
# Serializes +work+ with YAML.dump and enqueues the resulting string on the
# job returned by +job(name)+.
#
# @param name [Object] key handed to +job+ to look up the target job
# @param work [Object] payload to serialize
# @return [Object] whatever the job's +enq+ call returns
def enq(name, work)
  # Resolve the job first, then serialize, matching the original evaluation order.
  target = job(name)
  target.enq(YAML.dump(work))
end
.job(name) ⇒ Object
35 36 37 |
# File 'lib/message/worker.rb', line 35
# Returns the job stored under +name+ in the +jobs+ registry, creating it via
# Message.job (with +job_processor+ passed as the block) when the slot is empty.
#
# @param name [Object] registry key, also forwarded to Message.job
# @return [Object] the stored or newly created job
def job(name)
  registry = jobs
  registry[name] ||= Message.job(name, &job_processor)
end
.job_processor ⇒ Object
43 44 45 46 47 48 |
# File 'lib/message/worker.rb', line 43
# Builds the callable that executes one queued message.
#
# The lambda deserializes the YAML message, destructures it into a receiver,
# a method name and an argument list, and sends the method to the receiver
# with the arguments splatted.
#
# SECURITY: the message is deserialized into arbitrary Ruby objects and then
# dispatched with +send+, so messages must only come from trusted producers.
#
# @return [Proc] lambda taking the raw YAML message and returning the result
#   of the dispatched call
def job_processor
  lambda do |msg|
    # Since Psych 4 (Ruby 3.1+) YAML.load behaves like safe_load and raises on
    # non-core receivers; unsafe_load keeps the historical behaviour, and the
    # fallback covers older Psych versions that lack it.
    payload = YAML.respond_to?(:unsafe_load) ? YAML.unsafe_load(msg) : YAML.load(msg)
    obj, m, args = payload
    obj.send(m, *args)
  end
end
.jobs ⇒ Object
31 32 33 |
# File 'lib/message/worker.rb', line 31
# Memoized registry of jobs.
#
# On the first call the container is created and stored in @jobs: a
# java.util.concurrent.ConcurrentHashMap when RUBY_PLATFORM matches /java/,
# otherwise a plain Hash. Later calls return the same object.
#
# @return [Object] the registry container
def jobs
  @jobs ||=
    if RUBY_PLATFORM =~ /java/
      java.util.concurrent.ConcurrentHashMap.new
    else
      {}
    end
end
Instance Method Details
#process(size = 1) ⇒ Object
70 71 72 |
# File 'lib/message/worker.rb', line 70
# Asks the job registered under this worker's @job_name to process messages.
#
# @param size [Integer] value forwarded to the job's +process+ call
# @return [Object] whatever the job's +process+ returns
def process(size = 1)
  worker_job = Worker.job(@job_name)
  worker_job.process(size)
end
#start(size = 10, interval = 1) ⇒ Object
55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/message/worker.rb', line 55
# Starts a background thread that repeatedly calls +process(size)+ and then
# sleeps for +interval+.
#
# Any StandardError raised inside the thread is caught and logged at :error
# level together with its message and backtrace; it is not re-raised.
#
# @param size [Integer] value passed to +process+ on every iteration
# @param interval [Numeric] argument passed to +sleep+ between iterations
# @return [Thread] the thread running the loop
def start(size = 10, interval = 1)
  Thread.start do
    begin
      log(:info) { "start" }
      # Kernel#loop only ends on +break+ or when StopIteration is raised
      # inside it, so "stopped" is logged only in those cases.
      loop do
        process(size)
        sleep interval
      end
      log(:info) { "stopped" }
    rescue => e
      # Array() guards against an exception that carries no backtrace.
      log(:error) { "crashed: #{e.message}\n#{Array(e.backtrace).join("\n")}" }
    end
  end
end