Class: Shinq::Client
- Inherits:
-
Object
- Object
- Shinq::Client
- Defined in:
- lib/shinq/client.rb
Class Method Summary collapse
- .abort ⇒ Object
- .builder ⇒ Object
- .column_names(table_name:) ⇒ Object
- .dequeue(table_name:) ⇒ Object
- .done ⇒ Object
- .enqueue(table_name:, job_id:, args:, scheduled_at: nil) ⇒ Object
- .fetch_column_names(table_name:) ⇒ Object
- .queue_stats(table_name:) ⇒ Object
- .schedulable?(table_name:) ⇒ Boolean
Class Method Details
.abort ⇒ Object
87 88 89 |
# File 'lib/shinq/client.rb', line 87 def self.abort Shinq.connection.query('select queue_abort()') end |
.builder ⇒ Object
7 8 9 |
# File 'lib/shinq/client.rb', line 7 def self.builder @builder ||= SQL::Maker.new(driver: 'mysql', auto_bind: true) end |
.column_names(table_name:) ⇒ Object
67 68 69 70 71 72 73 74 75 |
# File 'lib/shinq/client.rb', line 67 def self.column_names(table_name:) @column_names_by_table_name ||= {} @column_names_by_table_name[table_name.to_sym] ||= begin quoted = SQL::Maker::Quoting.quote(table_name) column = Shinq.connection.query(<<-EOS).map { |record| record['column_name'] } select column_name from information_schema.columns where table_schema = database() and table_name = #{quoted} EOS end end |
.dequeue(table_name:) ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/shinq/client.rb', line 30 def self.dequeue(table_name:) condition = schedulable?(table_name: table_name) ? ":scheduled_at<=#{Time.now.to_i}" : '' quoted = SQL::Maker::Quoting.quote("#{table_name}#{condition}") queue_timeout_quoted = SQL::Maker::Quoting.quote(Shinq.configuration.queue_timeout) wait_query = "queue_wait(#{quoted}, #{queue_timeout_quoted})" has_queue = Shinq.connection.query("select #{wait_query}").first unless has_queue[wait_query].to_i == 0 sql = builder.select(table_name, ['*']) results = Shinq.connection.query(sql) # select always returns 1 line in the owner (queue_wait) mode return results.first.symbolize_keys end end |
.done ⇒ Object
83 84 85 |
# File 'lib/shinq/client.rb', line 83 def self.done Shinq.connection.query('select queue_end()') end |
.enqueue(table_name:, job_id:, args:, scheduled_at: nil) ⇒ Object
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/shinq/client.rb', line 11 def self.enqueue(table_name: , job_id: , args:, scheduled_at: nil) if scheduled_at && !schedulable?(table_name: table_name) raise ArgumentError, "table #{table_name} is not schedulable. You need column `scheduled_at`" end case args when Hash attributes = args.merge( job_id: job_id, scheduled_at: scheduled_at ? scheduled_at.to_i : nil, enqueued_at: Time.now, ).compact sql = builder.insert(table_name, attributes) Shinq.connection.query(sql) else raise ArgumentError, "`args` should be a Hash" end end |
.fetch_column_names(table_name:) ⇒ Object
77 78 79 80 81 |
# File 'lib/shinq/client.rb', line 77 def self.fetch_column_names(table_name:) @column_names_by_table_name ||= {} @column_names_by_table_name.delete(table_name.to_sym) column_names(table_name: table_name) end |
.queue_stats(table_name:) ⇒ Object
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/shinq/client.rb', line 47 def self.queue_stats(table_name:) quoted = SQL::Maker::Quoting.quote(table_name) stats_query = "queue_stats(#{quoted})" result = Shinq.connection.query("select #{stats_query}") stats = result.first[stats_query].split(/\n/).each_with_object({}) do |s, h| (k,v) = s.split(/:/) h[k.to_sym] = v.to_i end stats.merge( queue_count: stats[:rows_written] - stats[:rows_removed] ) end |
.schedulable?(table_name:) ⇒ Boolean
63 64 65 |
# File 'lib/shinq/client.rb', line 63 def self.schedulable?(table_name:) self.column_names(table_name: table_name).include?('scheduled_at') end |