Module: Pgq::Api

Defined in:
lib/pgq/api.rb

Instance Method Summary collapse

Instance Method Details

#pgq_create_queue(queue_name) ⇒ Object

manage queues



6
7
8
# File 'lib/pgq/api.rb', line 6

def pgq_create_queue(queue_name)
  connection.select_value(sanitize_sql_array ["SELECT pgq.create_queue(?)", queue_name]).to_i
end

#pgq_drop_queue(queue_name) ⇒ Object



10
11
12
# File 'lib/pgq/api.rb', line 10

def pgq_drop_queue(queue_name)
  connection.select_value(sanitize_sql_array ["SELECT pgq.drop_queue(?)", queue_name]).to_i
end

#pgq_event_failed(batch_id, event_id, reason) ⇒ Object

failed/retry



47
48
49
# File 'lib/pgq/api.rb', line 47

def pgq_event_failed(batch_id, event_id, reason)
  connection.select_value(sanitize_sql_array ["SELECT pgq.event_failed(?, ?, ?)", batch_id, event_id, reason]).to_i
end

#pgq_event_retry(batch_id, event_id, retry_seconds) ⇒ Object



51
52
53
# File 'lib/pgq/api.rb', line 51

def pgq_event_retry(batch_id, event_id, retry_seconds)
  connection.select_value(sanitize_sql_array ["SELECT pgq.event_retry(?, ?, ?)", batch_id, event_id, retry_seconds]).to_i
end

#pgq_failed_event_count(queue_name, consumer_name) ⇒ Object



65
66
67
68
# File 'lib/pgq/api.rb', line 65

def pgq_failed_event_count(queue_name, consumer_name)
  res = connection.select_value(sanitize_sql_array ["SELECT * FROM pgq.failed_event_count(?, ?)", queue_name, consumer_name])
  res ? res.to_i : nil
end

#pgq_failed_event_delete(queue_name, consumer_name, event_id) ⇒ Object



61
62
63
# File 'lib/pgq/api.rb', line 61

def pgq_failed_event_delete(queue_name, consumer_name, event_id)
  connection.select_value(sanitize_sql_array ["SELECT * FROM pgq.failed_event_delete(?, ?, ?)", queue_name, consumer_name, event_id])
end

#pgq_failed_event_list(queue_name, consumer_name, limit = nil, offset = nil, order = 'desc') ⇒ Object



70
71
72
73
# File 'lib/pgq/api.rb', line 70

def pgq_failed_event_list queue_name, consumer_name, limit = nil, offset = nil, order = 'desc'
  order = (order.to_s == 'desc') ? 'desc' : 'asc'
  connection.select_all(sanitize_sql_array ["SELECT * FROM pgq.failed_event_list(?, ?, ?, ?) ORDER BY ev_id #{order.upcase}", queue_name, consumer_name, limit.to_i, offset.to_i])
end

#pgq_failed_event_retry(queue_name, consumer_name, event_id) ⇒ Object

failed events



57
58
59
# File 'lib/pgq/api.rb', line 57

def pgq_failed_event_retry(queue_name, consumer_name, event_id)
  connection.select_value(sanitize_sql_array ["SELECT * FROM pgq.failed_event_retry(?, ?, ?)", queue_name, consumer_name, event_id])
end

#pgq_finish_batch(batch_id) ⇒ Object



41
42
43
# File 'lib/pgq/api.rb', line 41

def pgq_finish_batch(batch_id)
  connection.select_value(sanitize_sql_array ["SELECT pgq.finish_batch(?)", batch_id])
end

#pgq_force_tick(queue_name) ⇒ Object



132
133
134
# File 'lib/pgq/api.rb', line 132

def pgq_force_tick(queue_name)
  connection.select_value(sanitize_sql_array ["SELECT pgq.force_tick(?)", queue_name]).to_i
end

#pgq_get_batch_events(batch_id) ⇒ Object



37
38
39
# File 'lib/pgq/api.rb', line 37

def pgq_get_batch_events(batch_id)
  connection.select_all(sanitize_sql_array ["SELECT * FROM pgq.get_batch_events(?)", batch_id])
end

#pgq_get_consumer_infoObject



87
88
89
# File 'lib/pgq/api.rb', line 87

def pgq_get_consumer_info
  connection.select_all("SELECT *, EXTRACT(epoch FROM last_seen) AS last_seen_sec, EXTRACT(epoch FROM lag) AS lag_sec FROM pgq.get_consumer_info()")
end

#pgq_get_consumer_queue_info(queue_name) ⇒ Object



91
92
93
# File 'lib/pgq/api.rb', line 91

def pgq_get_consumer_queue_info(queue_name)
  connection.select_one(sanitize_sql_array ["SELECT *, EXTRACT(epoch FROM last_seen) AS last_seen_sec, EXTRACT(epoch FROM lag) AS lag_sec FROM pgq.get_consumer_info(?)", queue_name]) || {}
end

#pgq_get_queue_info(queue_name) ⇒ Object

info methods



77
78
79
# File 'lib/pgq/api.rb', line 77

def pgq_get_queue_info(queue_name)
  connection.select_value(sanitize_sql_array ["SELECT pgq.get_queue_info(?)", queue_name])
end

#pgq_get_queues_infoObject

Get list of queues. Result: (queue_name, queue_ntables, queue_cur_table, queue_rotation_period, queue_switch_time, queue_external_ticker, queue_ticker_max_count, queue_ticker_max_lag, queue_ticker_idle_period, ticker_lag)



83
84
85
# File 'lib/pgq/api.rb', line 83

def pgq_get_queues_info
  connection.select_values("SELECT pgq.get_queue_info()")
end

#pgq_insert_event(queue_name, ev_type, ev_data, ev_extra1 = nil, ev_extra2 = nil, ev_extra3 = nil, ev_extra4 = nil) ⇒ Object

insert events



24
25
26
27
28
# File 'lib/pgq/api.rb', line 24

def pgq_insert_event(queue_name, ev_type, ev_data, ev_extra1 = nil, ev_extra2 = nil, ev_extra3 = nil, ev_extra4 = nil)
  result = connection.select_value(sanitize_sql_array ["SELECT pgq.insert_event(?, ?, ?, ?, ?, ?, ?)", 
                                                       queue_name, ev_type, ev_data, ev_extra1, ev_extra2, ev_extra3, ev_extra4])
  result ? result.to_i : nil
end

#pgq_last_event_id(queue_name) ⇒ Object

utils



97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/pgq/api.rb', line 97

def pgq_last_event_id(queue_name)
  ticks = pgq_get_consumer_queue_info(queue_name)
  table = connection.select_value("SELECT queue_data_pfx AS table FROM pgq.queue WHERE queue_name = #{sanitize(queue_name)}")

  result = nil

  if ticks['current_batch']
    sql = connection.select_value("SELECT * FROM pgq.batch_event_sql(#{sanitize(ticks['current_batch'].to_i)})")
    last_event = connection.select_value("SELECT MAX(ev_id) AS count FROM (#{sql}) AS x")
    result = last_event.to_i
  end

  [table, result]
end

#pgq_mass_delete_failed_events(queue_name, consumer_name, limit = 5_000) ⇒ Object



122
123
124
125
126
127
128
129
130
# File 'lib/pgq/api.rb', line 122

def pgq_mass_delete_failed_events(queue_name, consumer_name, limit = 5_000)
  events = pgq_failed_event_list(queue_name, consumer_name, limit, nil, 'asc') || []

  events.each do |event|
    pgq_failed_event_delete(queue_name, consumer_name, event['ev_id'])
  end

  events.length
end

#pgq_mass_retry_failed_events(queue_name, consumer_name, limit = 5_000) ⇒ Object



112
113
114
115
116
117
118
119
120
# File 'lib/pgq/api.rb', line 112

def pgq_mass_retry_failed_events(queue_name, consumer_name, limit = 5_000)
  events = pgq_failed_event_list(queue_name, consumer_name, limit, nil, 'asc') || []

  events.each do |event|
    pgq_failed_event_retry(queue_name, consumer_name, event['ev_id'])
  end

  events.length
end

#pgq_next_batch(queue_name, consumer_name) ⇒ Object

consuming



32
33
34
35
# File 'lib/pgq/api.rb', line 32

def pgq_next_batch(queue_name, consumer_name)
  result = connection.select_value(sanitize_sql_array ["SELECT pgq.next_batch(?, ?)", queue_name, consumer_name])
  result ? result.to_i : nil
end

#pgq_register_consumer(queue_name, consumer_name) ⇒ Object



14
15
16
# File 'lib/pgq/api.rb', line 14

def pgq_register_consumer(queue_name, consumer_name)
  connection.select_value(sanitize_sql_array ["SELECT pgq.register_consumer(?, ?)", queue_name, consumer_name]).to_i
end

#pgq_unregister_consumer(queue_name, consumer_name) ⇒ Object



18
19
20
# File 'lib/pgq/api.rb', line 18

def pgq_unregister_consumer(queue_name, consumer_name)
  connection.select_value(sanitize_sql_array ["SELECT pgq.unregister_consumer(?, ?)", queue_name, consumer_name]).to_i
end