Class: Disc
- Inherits:
-
Object
show all
- Defined in:
- lib/disc.rb,
lib/disc/errors.rb,
lib/disc/testing.rb,
lib/disc/version.rb
Defined Under Namespace
Modules: Job
Classes: Error, NonJobClassError, NonParsableJobError, UnknownJobClassError, Worker
Constant Summary
collapse
- VERSION =
"0.2.0"
Class Method Summary
collapse
Class Method Details
.[](disque_id) ⇒ Object
54
55
56
57
58
59
60
61
62
63
|
# File 'lib/disc.rb', line 54
# Looks up a job by its Disque ID.
#
# Sends the SHOW command for +disque_id+ through the shared client. When the
# client replies with nil, nil is returned. Otherwise the flat reply array is
# converted into a Hash and two extra keys are added, both taken from the
# deserialized 'body' entry:
#
# * 'arguments' - the 'arguments' value of the job body
# * 'class'     - the 'class' value of the job body
#
# The body is deserialized exactly once and reused for both keys.
#
# Any error raised by Disc.deserialize (e.g. a malformed body) propagates.
def self.[](disque_id)
  job_data = disque.call("SHOW", disque_id)
  return nil if job_data.nil?

  # The reply is a flat [key, value, key, value, ...] array.
  job_data = Hash[*job_data]

  body = Disc.deserialize(job_data['body'])
  job_data['arguments'] = body['arguments']
  job_data['class'] = body['class']
  job_data
end
|
.default_queue ⇒ Object
26
27
28
|
# File 'lib/disc.rb', line 26
# Returns the value stored in @default_queue.
# When nothing (or nil/false) has been stored yet, the string 'default' is
# memoized and returned instead.
def self.default_queue
@default_queue ||= 'default'
end
|
.default_queue=(queue) ⇒ Object
30
31
32
|
# File 'lib/disc.rb', line 30
# Stores +queue+ in @default_queue, the value read back by .default_queue.
# No validation is performed; assigning nil or false makes the reader fall
# back to 'default' again.
def self.default_queue=(queue)
@default_queue = queue
end
|
.deserialize(data) ⇒ Object
50
51
52
|
# File 'lib/disc.rb', line 50
# Parses +data+ with JSON.parse and returns the resulting Ruby object.
# Exceptions raised by JSON.parse are not rescued here and propagate to
# the caller.
def self.deserialize(data)
JSON.parse(data)
end
|
.disque ⇒ Object
6
7
8
9
10
11
12
|
# File 'lib/disc.rb', line 6
# Returns the shared Disque client, building and memoizing it in @disque on
# first use.
#
# Connection settings are read from the environment:
#
# * DISQUE_NODES - first positional argument (default 'localhost:7711')
# * DISQUE_AUTH  - passed as +auth:+ (default nil)
# * DISQUE_CYCLE - converted with Integer() and passed as +cycle:+
#                  (default '1000')
#
# Integer() raises if DISQUE_CYCLE is set to a non-integer string.
def self.disque
  return @disque if @disque

  nodes = ENV.fetch('DISQUE_NODES', 'localhost:7711')
  auth = ENV.fetch('DISQUE_AUTH', nil)
  cycle = Integer(ENV.fetch('DISQUE_CYCLE', '1000'))

  @disque = Disque.new(nodes, auth: auth, cycle: cycle)
end
|
.disque=(disque) ⇒ Object
14
15
16
|
# File 'lib/disc.rb', line 14
# Replaces the client held in @disque with +disque+.
# .disque returns this object from then on, as long as it is truthy;
# assigning nil or false makes .disque build a new client on its next call.
def self.disque=(disque)
@disque = disque
end
|
.disque_timeout ⇒ Object
18
19
20
|
# File 'lib/disc.rb', line 18
# Returns the value stored in @disque_timeout, memoizing 100 when nothing
# (or nil/false) has been stored yet.
# NOTE(review): the unit is not visible in this file (presumably
# milliseconds) - confirm against the code that consumes it.
def self.disque_timeout
@disque_timeout ||= 100
end
|
.disque_timeout=(timeout) ⇒ Object
22
23
24
|
# File 'lib/disc.rb', line 22
# Stores +timeout+ in @disque_timeout, the value read back by
# .disque_timeout. No validation or conversion is performed.
def self.disque_timeout=(timeout)
@disque_timeout = timeout
end
|
.enqueue(klass, arguments, at: nil, queue: nil, **options) ⇒ Object
28
29
30
31
32
33
34
35
36
37
|
# File 'lib/disc/testing.rb', line 28
# Records a job in the in-memory +queues+ Hash instead of talking to a server.
#
# klass     - stored under the :class key of the job record
# arguments - stored under the :arguments key
# at:       - accepted for signature compatibility; not used by this method
# queue:    - key of the bucket the record is appended to (nil by default,
#             so jobs without a queue are collected under the nil key)
# options   - any remaining keyword arguments, stored under :options
#
# Returns the Hash that was appended to the bucket.
def self.enqueue(klass, arguments, at: nil, queue: nil, **options)
  record = { arguments: arguments, class: klass, options: options }

  # Create the bucket on first use, then append.
  queues[queue] = [] if queues[queue].nil?
  queues[queue] << record

  record
end
|
.enqueue! ⇒ Object
10
11
12
|
# File 'lib/disc/testing.rb', line 10
# Sets @testing_mode to the string 'enqueue' (also the default reported by
# .testing_mode). How the mode is consumed is not visible in this file.
def self.enqueue!
@testing_mode = 'enqueue'
end
|
.flush ⇒ Object
38
39
40
|
# File 'lib/disc.rb', line 38
# Sends the command DEBUG FLUSHALL through the shared client (Disc.disque)
# and returns the client's reply.
# NOTE(review): presumably this wipes all data on the connected Disque
# node(s) - confirm before using outside of tests.
def self.flush
Disc.disque.call('DEBUG', 'FLUSHALL')
end
|
.inline! ⇒ Object
14
15
16
|
# File 'lib/disc/testing.rb', line 14
# Sets @testing_mode to the string 'inline', the value then reported by
# .testing_mode. How the mode is consumed is not visible in this file.
def self.inline!
@testing_mode = 'inline'
end
|
.load_job(serialized_job, disque_id = nil) ⇒ Object
Receives:
A string containing data serialized by `Disc.serialize`
Returns:
An array containing:
* An instance of the given job class
* An array of arguments to pass to the job's `#perform` method.
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
# File 'lib/disc.rb', line 76
# Turns a serialized job into a job instance plus its arguments.
#
# serialized_job - String accepted by Disc.deserialize
# disque_id      - value assigned to the instance through #disque_id=
#                  (nil by default)
#
# Returns a two-element Array: [job_instance, arguments], where +arguments+
# is the 'arguments' entry of the deserialized payload.
#
# Each stage wraps any StandardError in a dedicated error class:
#
# * Disc::NonParsableJobError  - the payload could not be deserialized
# * Disc::UnknownJobClassError - the 'class' entry could not be resolved
#                                with Object.const_get
# * Disc::NonJobClassError     - the class could not be instantiated or
#                                the instance rejected #disque_id=
def self.load_job(serialized_job, disque_id = nil)
  payload = begin
    Disc.deserialize(serialized_job)
  rescue => e
    raise Disc::NonParsableJobError.new(e)
  end

  klass = begin
    Object.const_get(payload['class'])
  rescue => e
    raise Disc::UnknownJobClassError.new(e)
  end

  job = begin
    instance = klass.new
    instance.disque_id = disque_id
    instance
  rescue => e
    raise Disc::NonJobClassError.new(e)
  end

  [job, payload['arguments']]
end
|
.on_error(exception, job) ⇒ Object
42
43
44
|
# File 'lib/disc.rb', line 42
# Error hook: writes +exception+ to $stderr with puts.
# +job+ is accepted but not used by this implementation.
def self.on_error(exception, job)
$stderr.puts exception
end
|
.qlen(queue) ⇒ Object
34
35
36
|
# File 'lib/disc.rb', line 34
# Sends the QLEN command for +queue+ through the shared client and returns
# the client's reply unchanged.
# NOTE(review): presumably the reply is the number of jobs currently queued
# - confirm against the Disque command reference.
def self.qlen(queue)
disque.call('QLEN', queue)
end
|
.queues ⇒ Object
2
3
4
|
# File 'lib/disc/testing.rb', line 2
# Returns the Hash held in @queues, creating an empty Hash on first use.
# The testing version of .enqueue appends job records to arrays stored in
# this Hash, keyed by queue.
def self.queues
@queues ||= {}
end
|
.serialize(args) ⇒ Object
46
47
48
|
# File 'lib/disc.rb', line 46
# Serializes +args+ with JSON.dump and returns the resulting String.
# Exceptions raised by JSON.dump are not rescued here.
def self.serialize(args)
JSON.dump(args)
end
|
.testing_mode ⇒ Object
6
7
8
|
# File 'lib/disc/testing.rb', line 6
# Returns the value stored in @testing_mode, memoizing the string 'enqueue'
# when no mode has been set. .enqueue! and .inline! assign this variable.
def self.testing_mode
@testing_mode ||= 'enqueue'
end
|