Class: RKS::Event::Processor

Inherits:
Object
  • Object
show all
Defined in:
lib/rks/event/processor.rb

Defined Under Namespace

Classes: ProcessorNotInitialized

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(correlation_id:, event:, payload:) ⇒ Processor

Returns a new instance of Processor.



10
11
12
13
14
15
16
# File 'lib/rks/event/processor.rb', line 10

def initialize(correlation_id:, event:, payload:)
  @correlation_id = correlation_id
  @event = event
  @payload = payload

  self.class.set_current_processor(self)
end

Instance Attribute Details

#correlation_idObject

Returns the value of attribute correlation_id.



8
9
10
# File 'lib/rks/event/processor.rb', line 8

def correlation_id
  @correlation_id
end

#eventObject

Returns the value of attribute event.



8
9
10
# File 'lib/rks/event/processor.rb', line 8

def event
  @event
end

#payloadObject

Returns the value of attribute payload.



8
9
10
# File 'lib/rks/event/processor.rb', line 8

def payload
  @payload
end

Class Method Details

.currentObject



29
30
31
32
33
34
35
# File 'lib/rks/event/processor.rb', line 29

def current
  if @current
    @current.value
  else
    raise ProcessorNotInitialized, "#set_current_processor is needed"
  end
end

.process(args) ⇒ Object



37
38
39
# File 'lib/rks/event/processor.rb', line 37

def process(args)
  new(args).process
end

.set_current_processor(processor) ⇒ Object



25
26
27
# File 'lib/rks/event/processor.rb', line 25

def set_current_processor(processor)
  @current = Concurrent::ThreadLocalVar.new(processor)
end

Instance Method Details

#processObject



18
19
20
21
22
# File 'lib/rks/event/processor.rb', line 18

def process
  Application.logger.with_rescue_and_duration_event(@correlation_id, @event, @payload) do
    RKS::Event::Handler.call(correlation_id: @correlation_id, event: @event, payload: @payload)
  end
end