Class: Hackle::EventProcessor::MessageProcessor

Inherits:
Object
  • Object
show all
Defined in:
lib/hackle/events/event_processor.rb

Constant Summary collapse

DEFAULT_MESSAGE_QUEUE_CAPACITY =
1000
DEFAULT_MAX_EVENT_DISPATCH_SIZE =
500

Instance Method Summary collapse

Constructor Details

#initialize(config:, event_dispatcher:) ⇒ MessageProcessor

Returns a new instance of MessageProcessor.



74
75
76
77
78
79
80
# File 'lib/hackle/events/event_processor.rb', line 74

def initialize(config:, event_dispatcher:)
  @logger = config.logger
  @event_dispatcher = event_dispatcher
  @message_queue = SizedQueue.new(DEFAULT_MESSAGE_QUEUE_CAPACITY)
  @random = Random.new
  @consumed_events = []
end

Instance Method Details

#consuming_loopObject



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/hackle/events/event_processor.rb', line 92

def consuming_loop
  loop do
    message = @message_queue.pop
    case message
    when Message::Event
      consume_event(event: message.event)
    when Message::Flush
      dispatch_events
    when Message::Shutdown
      break
    end
  end
rescue => e
  @logger.warn { "Uncaught exception in events message processor: #{e.inspect}" }
ensure
  dispatch_events
end

#produce(message:, non_block: true) ⇒ Object

Parameters:

  • message (Message)
  • non_block (boolean) (defaults to: true)


84
85
86
87
88
89
90
# File 'lib/hackle/events/event_processor.rb', line 84

def produce(message:, non_block: true)
  @message_queue.push(message, non_block)
rescue ThreadError
  if @random.rand(1..100) == 1 # log only 1% of the time
    @logger.warn { 'Events are produced faster than can be consumed. Some events will be dropped.' }
  end
end