Class: Pgq::ConsumerGroup

Inherits:
Consumer show all
Defined in:
lib/pgq/consumer_group.rb

Instance Attribute Summary

Attributes inherited from ConsumerBase

#consumer_name, #logger, #queue_name

Instance Method Summary collapse

Methods inherited from Consumer

add_event, method_missing, #perform

Methods inherited from ConsumerBase

#all_events_failed, #coder, coder, #connection, connection, consumer_name, #database, database, enqueue, #event_failed, #event_retry, extract_queue_name, #finish_batch, #get_batch_events, inherited, #initialize, #log_error, #log_info, next_queue_name, #perform, #perform_batch, #perform_event, queue_name, set_queue_name

Methods included from Utils

#add_queue, #delete_failed_events, #inspect_londiste_queue, #inspect_queue, #inspect_self_queue, #proxy, #queues_list, #remove_queue, #retry_failed_events

Constructor Details

This class inherits a constructor from Pgq::ConsumerBase

Instance Method Details

#perform_events(events) ⇒ Object



13
14
15
16
17
# File 'lib/pgq/consumer_group.rb', line 13

def perform_events(events)
  events = sum_events(events)
  # log_info "consume events (#{self.queue_name}): #{events.map{|k,v| [k, v.size]}.inspect}"
  perform_group(events) if events.present?
end

#perform_group(events_hash) ⇒ Object

=> [events]



9
10
11
# File 'lib/pgq/consumer_group.rb', line 9

def perform_group(events_hash)
  raise "realize me"
end

#sum_events(events) ⇒ Object



19
20
21
22
23
24
25
# File 'lib/pgq/consumer_group.rb', line 19

def sum_events(events)
  events.inject({}) do |result, event|
    result[event.type] ||= []
    result[event.type] << event
    result
  end
end