Class: Message::Worker
- Inherits:
-
Object
- Object
- Message::Worker
- Defined in:
- lib/message/worker.rb
Defined Under Namespace
Modules: Enqueue
Constant Summary collapse
- DEFAULT_JOB_NAME =
'message-worker-default'- DEFAULT_PROCESS_SIZE =
10- DEFAULT_PROCESS_INTERVAL =
5
Class Attribute Summary collapse
-
.default_job ⇒ Object
Returns the value of attribute default_job.
-
.sync ⇒ Object
Returns the value of attribute sync.
Instance Attribute Summary collapse
-
#job_name ⇒ Object
readonly
Returns the value of attribute job_name.
Class Method Summary collapse
Instance Method Summary collapse
- #enq(work) ⇒ Object (also: #<<)
-
#initialize(job_name) ⇒ Worker
constructor
A new instance of Worker.
- #process(size = 1) ⇒ Object
- #start(options = {}) ⇒ Object
Constructor Details
#initialize(job_name) ⇒ Worker
Returns a new instance of Worker.
64 65 66 |
# File 'lib/message/worker.rb', line 64 def initialize(job_name) @job_name = job_name end |
Class Attribute Details
.default_job ⇒ Object
Returns the value of attribute default_job.
33 34 35 |
# File 'lib/message/worker.rb', line 33 def default_job @default_job end |
.sync ⇒ Object
Returns the value of attribute sync.
33 34 35 |
# File 'lib/message/worker.rb', line 33 def sync @sync end |
Instance Attribute Details
#job_name ⇒ Object (readonly)
Returns the value of attribute job_name.
62 63 64 |
# File 'lib/message/worker.rb', line 62 def job_name @job_name end |
Class Method Details
.default ⇒ Object
39 40 41 |
# File 'lib/message/worker.rb', line 39 def default new(default_job) end |
.jobs ⇒ Object
51 52 53 |
# File 'lib/message/worker.rb', line 51 def jobs @jobs ||= RUBY_PLATFORM =~ /java/ ? java.util.concurrent.ConcurrentHashMap.new : {} end |
.process(*args) ⇒ Object
43 44 45 |
# File 'lib/message/worker.rb', line 43 def process(*args) default.process(*args) end |
.reset ⇒ Object
55 56 57 58 59 |
# File 'lib/message/worker.rb', line 55 def reset @default_job = nil @sync = nil @jobs = nil end |
.start(*args) ⇒ Object
47 48 49 |
# File 'lib/message/worker.rb', line 47 def start(*args) default.start(*args) end |
Instance Method Details
#enq(work) ⇒ Object Also known as: <<
89 90 91 92 93 94 95 |
# File 'lib/message/worker.rb', line 89 def enq(work) if self.class.sync process_work(work) else job.enq(YAML.dump(work)) end end |
#process(size = 1) ⇒ Object
85 86 87 |
# File 'lib/message/worker.rb', line 85 def process(size=1) job.process(size) end |
#start(options = {}) ⇒ Object
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/message/worker.rb', line 68 def start(={}) size = [:size] || DEFAULT_PROCESS_SIZE interval = [:interval] || DEFAULT_PROCESS_INTERVAL Thread.start do begin log(:info) { "start" } loop do process(size) sleep interval end log(:info) { "stopped" } rescue => e log(:error) { "crashed: #{e.}\n#{e.backtrace.join("\n")}"} end end end |