Class: Unrestful::AsyncJob
- Inherits:
-
Object
- Object
- Unrestful::AsyncJob
- Includes:
- ActiveModel::Serializers::JSON
- Defined in:
- lib/unrestful/async_job.rb
Constant Summary collapse
- ALLOCATED =
0
- RUNNING =
1
- FAILED =
2
- SUCCESS =
3
- KEY_TIMEOUT =
3600
- KEY_LENGTH =
10
- CHANNEL_TIMEOUT =
10
Instance Attribute Summary collapse
-
#job_id ⇒ Object
readonly
Returns the value of attribute job_id.
Instance Method Summary collapse
- #attributes ⇒ Object
- #close ⇒ Object
- #delete ⇒ Object
-
#initialize(job_id: nil) ⇒ AsyncJob
constructor
A new instance of AsyncJob.
- #last_message ⇒ Object
- #publish(message) ⇒ Object
- #redis ⇒ Object
- #state ⇒ Object
- #subscribe(timeout: CHANNEL_TIMEOUT, &block) ⇒ Object
- #ttl ⇒ Object
- #unsubscribe ⇒ Object
- #update(state, message: '') ⇒ Object
- #valid? ⇒ Boolean
Constructor Details
#initialize(job_id: nil) ⇒ AsyncJob
Returns a new instance of AsyncJob.
27 28 29 30 31 32 33 |
# File 'lib/unrestful/async_job.rb', line 27 def initialize(job_id: nil) if job_id.nil? @job_id = SecureRandom.hex(KEY_LENGTH) else @job_id = job_id end end |
Instance Attribute Details
#job_id ⇒ Object (readonly)
Returns the value of attribute job_id.
16 17 18 |
# File 'lib/unrestful/async_job.rb', line 16 def job_id @job_id end |
Instance Method Details
#attributes ⇒ Object
18 19 20 21 22 23 24 25 |
# File 'lib/unrestful/async_job.rb', line 18 def attributes { job_id: job_id, state: state, last_message: last_message, ttl: ttl } end |
#close ⇒ Object
90 91 92 93 94 |
# File 'lib/unrestful/async_job.rb', line 90 def close redis.unsubscribe(job_channel) if redis.subscribed? ensure @redis.quit end |
#delete ⇒ Object
59 60 61 62 |
# File 'lib/unrestful/async_job.rb', line 59 def delete redis.del(job_key) redis.del(job_message) end |
#last_message ⇒ Object
55 56 57 |
# File 'lib/unrestful/async_job.rb', line 55 def last_message redis.get(job_message) end |
#publish(message) ⇒ Object
70 71 72 73 74 |
# File 'lib/unrestful/async_job.rb', line 70 def publish(message) raise AsyncError, "job #{job_key} doesn't exist" unless valid? redis.publish(job_channel, message) end |
#redis ⇒ Object
86 87 88 |
# File 'lib/unrestful/async_job.rb', line 86 def redis @redis ||= Redis.new(url: Unrestful.configuration.redis_address) end |
#state ⇒ Object
51 52 53 |
# File 'lib/unrestful/async_job.rb', line 51 def state redis.get(job_key) end |
#subscribe(timeout: CHANNEL_TIMEOUT, &block) ⇒ Object
64 65 66 67 68 |
# File 'lib/unrestful/async_job.rb', line 64 def subscribe(timeout: CHANNEL_TIMEOUT, &block) raise AsyncError, "job #{job_key} doesn't exist" unless valid? redis.subscribe_with_timeout(timeout, job_channel, &block) end |
#ttl ⇒ Object
47 48 49 |
# File 'lib/unrestful/async_job.rb', line 47 def ttl redis.ttl(job_key) end |
#unsubscribe ⇒ Object
80 81 82 83 84 |
# File 'lib/unrestful/async_job.rb', line 80 def unsubscribe redis.unsubscribe(job_channel) rescue # ignore unsub errors end |
#update(state, message: '') ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/unrestful/async_job.rb', line 35 def update(state, message: '') raise ArgumentError, 'failed states must have a message' if message.blank? && state == FAILED redis.set(job_key, state) redis.set(job_message, message) unless message.blank? if state == ALLOCATED redis.expire(job_key, KEY_TIMEOUT) redis.expire(job_message, KEY_TIMEOUT) end end |
#valid? ⇒ Boolean
76 77 78 |
# File 'lib/unrestful/async_job.rb', line 76 def valid? redis.exists(job_key) end |