Class: Message::Worker
- Inherits:
-
Object
- Object
- Message::Worker
- Defined in:
- lib/message/worker.rb
Defined Under Namespace
Modules: Enqueue
Constant Summary collapse
- DEFAULT_JOB_NAME =
'message-worker-default'
- DEFAULT_PROCESS_SIZE =
10
- DEFAULT_PROCESS_INTERVAL =
5
Class Attribute Summary collapse
-
.default_job ⇒ Object
Returns the value of attribute default_job.
-
.sync ⇒ Object
Returns the value of attribute sync.
Instance Attribute Summary collapse
-
#job_name ⇒ Object
readonly
Returns the value of attribute job_name.
Class Method Summary collapse
- .callbacks ⇒ Object
- .default ⇒ Object
- .jobs ⇒ Object
- .process(*args) ⇒ Object
- .reset ⇒ Object
- .start(*args) ⇒ Object
Instance Method Summary collapse
- #enq(work) ⇒ Object (also: #<<)
-
#initialize(job_name) ⇒ Worker
constructor
A new instance of Worker.
- #process(size = 1) ⇒ Object
- #start(options = {}) ⇒ Object
- #work_in_thread(options, &block) ⇒ Object
Constructor Details
#initialize(job_name) ⇒ Worker
Returns a new instance of Worker.
72 73 74 |
# File 'lib/message/worker.rb', line 72 def initialize(job_name) @job_name = job_name end |
Class Attribute Details
.default_job ⇒ Object
Returns the value of attribute default_job.
33 34 35 |
# File 'lib/message/worker.rb', line 33 def default_job @default_job end |
.sync ⇒ Object
Returns the value of attribute sync.
33 34 35 |
# File 'lib/message/worker.rb', line 33 def sync @sync end |
Instance Attribute Details
#job_name ⇒ Object (readonly)
Returns the value of attribute job_name.
70 71 72 |
# File 'lib/message/worker.rb', line 70 def job_name @job_name end |
Class Method Details
.callbacks ⇒ Object
55 56 57 |
# File 'lib/message/worker.rb', line 55 def callbacks @callbacks ||= {:start => [], :crash => [], :stop => []} end |
.default ⇒ Object
39 40 41 |
# File 'lib/message/worker.rb', line 39 def default new(default_job) end |
.jobs ⇒ Object
51 52 53 |
# File 'lib/message/worker.rb', line 51 def jobs @jobs ||= RUBY_PLATFORM =~ /java/ ? java.util.concurrent.ConcurrentHashMap.new : {} end |
.process(*args) ⇒ Object
43 44 45 |
# File 'lib/message/worker.rb', line 43 def process(*args) default.process(*args) end |
.reset ⇒ Object
59 60 61 62 63 |
# File 'lib/message/worker.rb', line 59 def reset @default_job = nil @sync = nil @jobs = nil end |
.start(*args) ⇒ Object
47 48 49 |
# File 'lib/message/worker.rb', line 47 def start(*args) default.start(*args) end |
Instance Method Details
#enq(work) ⇒ Object Also known as: <<
106 107 108 109 110 111 112 |
# File 'lib/message/worker.rb', line 106 def enq(work) if self.class.sync process_work(work) else job.enq(YAML.dump(work)) end end |
#process(size = 1) ⇒ Object
102 103 104 |
# File 'lib/message/worker.rb', line 102 def process(size=1) job.process(size) end |
#start(options = {}) ⇒ Object
76 77 78 79 80 81 82 |
# File 'lib/message/worker.rb', line 76 def start(options={}) Thread.start do self.work_in_thread(options) do |size| process(size) end end end |
#work_in_thread(options, &block) ⇒ Object
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/message/worker.rb', line 84 def work_in_thread(options, &block) size = options[:size] || DEFAULT_PROCESS_SIZE interval = options[:interval] || DEFAULT_PROCESS_INTERVAL delay = options[:delay] || 10 + rand(20) begin callback(:start, options) sleep delay if delay > 0 loop do yield(size) sleep interval end rescue => e callback(:crash, e) ensure callback(:stop) end end |