Class: Shinq::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/shinq/client.rb

Class Method Summary collapse

Class Method Details

.abortObject



59
60
61
# File 'lib/shinq/client.rb', line 59

def self.abort
  Shinq.connection.query('select queue_abort()')
end

.builderObject



7
8
9
# File 'lib/shinq/client.rb', line 7

def self.builder
  @builder ||= SQL::Maker.new(driver: 'mysql', auto_bind: true)
end

.dequeue(table_name:) ⇒ Object



24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/shinq/client.rb', line 24

def self.dequeue(table_name:)
  quoted = SQL::Maker::Quoting.quote(table_name)
  queue_timeout_quoted = SQL::Maker::Quoting.quote(Shinq.configuration.queue_timeout)

  wait_query = "queue_wait(#{quoted}, #{queue_timeout_quoted})"
  has_queue = Shinq.connection.query("select #{wait_query}").first

  unless has_queue[wait_query].to_i == 0
    sql = builder.select(table_name, ['*'])
    results = Shinq.connection.query(sql)
    # select always returns 1 line in the owner (queue_wait) mode
    return results.first.symbolize_keys
  end
end

.doneObject



55
56
57
# File 'lib/shinq/client.rb', line 55

def self.done
  Shinq.connection.query('select queue_end()')
end

.enqueue(table_name:, job_id:, args:) ⇒ Object



11
12
13
14
15
16
17
18
19
20
21
22
# File 'lib/shinq/client.rb', line 11

def self.enqueue(table_name: , job_id: , args:)
  case args
  when Hash
    sql = builder.insert(table_name, args.merge(
      job_id: job_id,
      enqueued_at: Time.now
    ))
    Shinq.connection.query(sql)
  else
    raise ArgumentError, "`args` should be a Hash"
  end
end

.queue_stats(table_name:) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/shinq/client.rb', line 39

def self.queue_stats(table_name:)
  quoted = SQL::Maker::Quoting.quote(table_name)

  stats_query = "queue_stats(#{quoted})"
  result = Shinq.connection.query("select #{stats_query}")

  stats = result.first[stats_query].split(/\n/).each_with_object({}) do |s, h|
    (k,v) = s.split(/:/)
    h[k.to_sym] = v.to_i
  end

  stats.merge(
    queue_count: stats[:rows_written] - stats[:rows_removed]
  )
end