Class: Disc

Inherits:
Object
  • Object
show all
Defined in:
lib/disc.rb,
lib/disc/errors.rb,
lib/disc/testing.rb,
lib/disc/version.rb

Defined Under Namespace

Modules: Job

Classes: Error, NonJobClassError, NonParsableJobError, UnknownJobClassError, Worker

Constant Summary collapse

VERSION =
"0.2.0"

Class Method Summary collapse

Class Method Details

.[](disque_id) ⇒ Object



54
55
56
57
58
59
60
61
62
63
# File 'lib/disc.rb', line 54

# Looks up a job by its Disque ID and returns its details.
#
# disque_id - the ID sent as the argument of the "SHOW" command.
#
# Returns nil when "SHOW" yields nil. Otherwise returns a Hash built from
# the flat key/value reply, extended with two entries, 'arguments' and
# 'class', both read from the deserialized 'body' entry.
def self.[](disque_id)
  job_data = disque.call("SHOW", disque_id)
  return nil if job_data.nil?

  job_data = Hash[*job_data]

  # Deserialize the body once and reuse the result for both derived keys.
  body = Disc.deserialize(job_data['body'])
  job_data['arguments'] = body['arguments']
  job_data['class'] = body['class']

  job_data
end

.default_queue ⇒ Object



26
27
28
# File 'lib/disc.rb', line 26

# Name of the queue used by default.
#
# Returns the stored value when one is set (and truthy); otherwise stores
# and returns 'default'.
def self.default_queue
  return @default_queue if @default_queue

  @default_queue = 'default'
end

.default_queue=(queue) ⇒ Object



30
31
32
# File 'lib/disc.rb', line 30

# Writer for the default queue name: stores the given value in
# @default_queue.
class << self
  attr_writer :default_queue
end

.deserialize(data) ⇒ Object



50
51
52
# File 'lib/disc.rb', line 50

# Parses a JSON document.
#
# data - the String handed to JSON.parse.
#
# Returns whatever JSON.parse produces for `data`.
def self.deserialize(data)
  parsed = JSON.parse(data)
  parsed
end

.disque ⇒ Object



6
7
8
9
10
11
12
# File 'lib/disc.rb', line 6

# Memoized Disque client.
#
# On first use a client is built from the environment:
#
#   DISQUE_NODES - node list (falls back to 'localhost:7711')
#   DISQUE_AUTH  - passed as `auth:` (nil when unset)
#   DISQUE_CYCLE - passed as `cycle:` after Integer() conversion
#                  (falls back to '1000')
#
# Returns the stored client on every later call.
def self.disque
  return @disque if @disque

  nodes = ENV.fetch('DISQUE_NODES', 'localhost:7711')
  auth = ENV['DISQUE_AUTH']
  cycle = Integer(ENV.fetch('DISQUE_CYCLE', '1000'))

  @disque = Disque.new(nodes, auth: auth, cycle: cycle)
end

.disque=(disque) ⇒ Object



14
15
16
# File 'lib/disc.rb', line 14

# Writer for the Disque client: stores the given object in @disque.
class << self
  attr_writer :disque
end

.disque_timeout ⇒ Object



18
19
20
# File 'lib/disc.rb', line 18

# Timeout setting for Disque.
#
# Returns the stored value when one is set (and truthy); otherwise stores
# and returns 100.
def self.disque_timeout
  @disque_timeout = 100 unless @disque_timeout
  @disque_timeout
end

.disque_timeout=(timeout) ⇒ Object



22
23
24
# File 'lib/disc.rb', line 22

# Writer for the Disque timeout: stores the given value in @disque_timeout.
class << self
  attr_writer :disque_timeout
end

.enqueue(klass, arguments, at: nil, queue: nil, **options) ⇒ Object



28
29
30
31
32
33
34
35
36
37
# File 'lib/disc/testing.rb', line 28

# Testing replacement for enqueueing: records the job in the in-memory
# `queues` Hash.
#
# klass     - stored under :class.
# arguments - stored under :arguments.
# at:       - accepted but not used by this implementation.
# queue:    - key of the list in `queues` that receives the job
#             (nil unless given).
# options   - any remaining keyword arguments, stored under :options.
#
# Returns the Hash that was appended to the queue.
def self.enqueue(klass, arguments, at: nil, queue: nil, **options)
  entry = { arguments: arguments, class: klass, options: options }

  # Create the list for this queue the first time it is used.
  queues[queue] = [] if queues[queue].nil?
  queues[queue].push(entry)

  entry
end

.enqueue! ⇒ Object



10
11
12
# File 'lib/disc/testing.rb', line 10

# Sets the testing mode to 'enqueue'.
#
# Returns the String 'enqueue'.
def self.enqueue!
  mode = 'enqueue'
  @testing_mode = mode
end

.flush ⇒ Object



38
39
40
# File 'lib/disc.rb', line 38

# Sends "DEBUG FLUSHALL" through the client returned by Disc.disque.
#
# Returns whatever the client's `call` returns.
def self.flush
  client = Disc.disque
  client.call('DEBUG', 'FLUSHALL')
end

.inline! ⇒ Object



14
15
16
# File 'lib/disc/testing.rb', line 14

# Sets the testing mode to 'inline'.
#
# Returns the String 'inline'.
def self.inline!
  mode = 'inline'
  @testing_mode = mode
end

.load_job(serialized_job, disque_id = nil) ⇒ Object

Receives:

A string containing data serialized by `Disc.serialize`

Returns:

An array containing:

  * An instance of the given job class
  * An array of arguments to pass to the job's `#perform` method.


76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/disc.rb', line 76

# Turns a serialized job into a job instance plus its arguments.
#
# serialized_job - String passed to Disc.deserialize; the result is read
#                  through its 'class' and 'arguments' keys.
# disque_id      - value assigned to the instance's `disque_id` (default nil).
#
# Returns a two-element Array: the new job instance and the value found
# under 'arguments'.
#
# Raises Disc::NonParsableJobError when deserialization fails,
# Disc::UnknownJobClassError when the class name cannot be resolved to a
# constant, and Disc::NonJobClassError when the class cannot be instantiated
# or the instance does not accept a `disque_id`. Each is built from the
# error that was rescued.
def self.load_job(serialized_job, disque_id = nil)
  payload =
    begin
      Disc.deserialize(serialized_job)
    rescue => parse_failure
      raise Disc::NonParsableJobError.new(parse_failure)
    end

  klass =
    begin
      Object.const_get(payload['class'])
    rescue => lookup_failure
      raise Disc::UnknownJobClassError.new(lookup_failure)
    end

  instance =
    begin
      klass.new.tap { |job| job.disque_id = disque_id }
    rescue => build_failure
      raise Disc::NonJobClassError.new(build_failure)
    end

  [instance, payload['arguments']]
end

.on_error(exception, job) ⇒ Object



42
43
44
# File 'lib/disc.rb', line 42

# Error hook: writes the exception to $stderr.
#
# exception - the object printed with `puts`.
# job       - accepted but not used by this implementation.
#
# Returns the result of `$stderr.puts`.
def self.on_error(exception, job)
  $stderr.puts(exception)
end

.qlen(queue) ⇒ Object



34
35
36
# File 'lib/disc.rb', line 34

# Sends the "QLEN" command for a queue through the `disque` client.
#
# queue - the queue name passed along with the command.
#
# Returns whatever the client's `call` returns.
def self.qlen(queue)
  client = disque
  client.call('QLEN', queue)
end

.queues ⇒ Object



2
3
4
# File 'lib/disc/testing.rb', line 2

# In-memory store of queues used by the testing helpers.
#
# Returns the stored value when one is set (and truthy); otherwise stores
# and returns a new empty Hash.
def self.queues
  @queues = {} unless @queues
  @queues
end

.serialize(args) ⇒ Object



46
47
48
# File 'lib/disc.rb', line 46

# Serializes a value as JSON.
#
# args - the object handed to JSON.dump.
#
# Returns the String produced by JSON.dump.
def self.serialize(args)
  json = JSON.dump(args)
  json
end

.testing_mode ⇒ Object



6
7
8
# File 'lib/disc/testing.rb', line 6

# Current testing mode.
#
# Returns the stored value when one is set (and truthy); otherwise stores
# and returns 'enqueue'.
def self.testing_mode
  return @testing_mode if @testing_mode

  @testing_mode = 'enqueue'
end