Class: Shinq::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/shinq/client.rb

Class Method Summary collapse

Class Method Details

.abortObject



87
88
89
# File 'lib/shinq/client.rb', line 87

def self.abort
  Shinq.connection.query('select queue_abort()')
end

.builderObject



7
8
9
# File 'lib/shinq/client.rb', line 7

def self.builder
  @builder ||= SQL::Maker.new(driver: 'mysql', auto_bind: true)
end

.column_names(table_name:) ⇒ Object



67
68
69
70
71
72
73
74
75
# File 'lib/shinq/client.rb', line 67

def self.column_names(table_name:)
  @column_names_by_table_name ||= {}
  @column_names_by_table_name[table_name.to_sym] ||= begin
    quoted = SQL::Maker::Quoting.quote(table_name)
    column = Shinq.connection.query(<<-EOS).map { |record| record['column_name'] }
select column_name from information_schema.columns where table_schema = database() and table_name = #{quoted}
    EOS
  end
end

.dequeue(table_name:) ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/shinq/client.rb', line 30

def self.dequeue(table_name:)
  condition = schedulable?(table_name: table_name) ? ":scheduled_at<=#{Time.now.to_i}" : ''
  quoted = SQL::Maker::Quoting.quote("#{table_name}#{condition}")

  queue_timeout_quoted = SQL::Maker::Quoting.quote(Shinq.configuration.queue_timeout)

  wait_query = "queue_wait(#{quoted}, #{queue_timeout_quoted})"
  has_queue = Shinq.connection.query("select #{wait_query}").first

  unless has_queue[wait_query].to_i == 0
    sql = builder.select(table_name, ['*'])
    results = Shinq.connection.query(sql)
    # select always returns 1 line in the owner (queue_wait) mode
    return results.first.symbolize_keys
  end
end

.doneObject



83
84
85
# File 'lib/shinq/client.rb', line 83

def self.done
  Shinq.connection.query('select queue_end()')
end

.enqueue(table_name:, job_id:, args:, scheduled_at: nil) ⇒ Object



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/shinq/client.rb', line 11

def self.enqueue(table_name: , job_id: , args:, scheduled_at: nil)
  if scheduled_at && !schedulable?(table_name: table_name)
    raise ArgumentError, "table #{table_name} is not schedulable. You need column `scheduled_at`"
  end

  case args
  when Hash
    attributes = args.merge(
      job_id: job_id,
      scheduled_at: scheduled_at ? scheduled_at.to_i : nil,
      enqueued_at: Time.now,
    ).compact
    sql = builder.insert(table_name, attributes)
    Shinq.connection.query(sql)
  else
    raise ArgumentError, "`args` should be a Hash"
  end
end

.fetch_column_names(table_name:) ⇒ Object



77
78
79
80
81
# File 'lib/shinq/client.rb', line 77

def self.fetch_column_names(table_name:)
  @column_names_by_table_name ||= {}
  @column_names_by_table_name.delete(table_name.to_sym)
  column_names(table_name: table_name)
end

.queue_stats(table_name:) ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/shinq/client.rb', line 47

def self.queue_stats(table_name:)
  quoted = SQL::Maker::Quoting.quote(table_name)

  stats_query = "queue_stats(#{quoted})"
  result = Shinq.connection.query("select #{stats_query}")

  stats = result.first[stats_query].split(/\n/).each_with_object({}) do |s, h|
    (k,v) = s.split(/:/)
    h[k.to_sym] = v.to_i
  end

  stats.merge(
    queue_count: stats[:rows_written] - stats[:rows_removed]
  )
end

.schedulable?(table_name:) ⇒ Boolean

Returns:

  • (Boolean)


63
64
65
# File 'lib/shinq/client.rb', line 63

def self.schedulable?(table_name:)
  self.column_names(table_name: table_name).include?('scheduled_at')
end