Module: Consumer

Included in:
Controls::Consumer::ErrorHandler::Example, Controls::Consumer::Incrementing
Defined in:
lib/consumer/log.rb,
lib/consumer/actor.rb,
lib/consumer/consumer.rb,
lib/consumer/defaults.rb,
lib/consumer/log_text.rb,
lib/consumer/substitute.rb,
lib/consumer/controls/id.rb,
lib/consumer/controls/get.rb,
lib/consumer/subscription.rb,
lib/consumer/controls/poll.rb,
lib/consumer/controls/actor.rb,
lib/consumer/controls/error.rb,
lib/consumer/position_store.rb,
lib/consumer/controls/handle.rb,
lib/consumer/controls/session.rb,
lib/consumer/handler_registry.rb,
lib/consumer/controls/category.rb,
lib/consumer/controls/consumer.rb,
lib/consumer/controls/position.rb,
lib/consumer/controls/identifier.rb,
lib/consumer/controls/stream_name.rb,
lib/consumer/controls/message_data.rb,
lib/consumer/controls/subscription.rb,
lib/consumer/subscription/defaults.rb,
lib/consumer/subscription/get_batch.rb,
lib/consumer/controls/position_store.rb,
lib/consumer/position_store/telemetry.rb,
lib/consumer/controls/get/incrementing.rb,
lib/consumer/position_store/substitute.rb,
lib/consumer/controls/handle/raise_error.rb,
lib/consumer/controls/message_data/batch.rb,
lib/consumer/controls/position_store/file.rb,
lib/consumer/controls/consumer/incrementing.rb,
lib/consumer/controls/consumer/error_handler.rb

Defined Under Namespace

Modules: Build, Configure, Controls, Defaults, HandlerMacro, IdentifierMacro, LogText, PositionStore, Start, Substitute Classes: Actor, HandlerRegistry, Log, Subscription

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.included(cls) ⇒ Object



2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/consumer/consumer.rb', line 2

def self.included(cls)
  cls.class_exec do
    include Dependency
    include Initializer
    include Virtual
    include Log::Dependency

    extend Build
    extend Start

    extend HandlerMacro
    extend IdentifierMacro

    prepend Configure

    initializer :stream_name

    attr_writer :identifier
    def identifier
      @identifier ||= self.class.identifier
    end

    attr_writer :position_update_interval
    def position_update_interval
      @position_update_interval ||= Defaults.position_update_interval
    end

    attr_writer :position_update_counter
    def position_update_counter
      @position_update_counter ||= 0
    end

    attr_accessor :session

    attr_accessor :poll_interval_milliseconds

    dependency :get
    dependency :position_store, PositionStore
    dependency :subscription, Subscription

    virtual :error_raised do |error, message_data|
      raise error
    end

    alias_method :call, :dispatch
  end
end

Instance Method Details

#dispatch(message_data) ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/consumer/consumer.rb', line 50

def dispatch(message_data)
  logger.trace { "Dispatching message (#{LogText.message_data(message_data)})" }

  self.class.handler_registry.each do |handler|
    handler.(message_data, session: session)
  end

  update_position(message_data.global_position)

  logger.info { "Message dispatched (#{LogText.message_data(message_data)})" }

rescue => error
  logger.error { "Error raised (Error Class: #{error.class}, Error Message: #{error.message}, #{LogText.message_data(message_data)})" }
  error_raised(error, message_data)
end

#start(&probe) ⇒ Object



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/consumer/consumer.rb', line 66

def start(&probe)
  logger.info(tag: :*) { "Starting consumer: #{self.class.name} (Stream: #{stream_name}, Identifier: #{identifier || '(none)'}, Position: #{subscription.position})" }

  starting() if respond_to?(:starting)

  self.class.handler_registry.each do |handler|
    logger.info(tag: :*) { "Handler: #{handler.name} (Stream Name: #{stream_name}, Consumer: #{self.class.name})" }
  end

  _, subscription_thread = ::Actor::Start.(subscription)

  actor_address, actor_thread = Actor.start(self, subscription, include: :thread)

  if probe
    subscription_address = subscription.address

    probe.(self, [actor_thread, subscription_thread], [actor_address, subscription_address])
  end

  logger.info(tag: :*) { "Started consumer: #{self.class.name} (Stream: #{stream_name}, Identifier: #{identifier || '(none)'}, Position: #{subscription.position})" }

  AsyncInvocation::Incorrect
end

#update_position(position) ⇒ Object



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/consumer/consumer.rb', line 90

def update_position(position)
  logger.trace { "Updating position (Global Position: #{position}, Counter: #{position_update_counter}/#{position_update_interval})" }

  self.position_update_counter += 1

  if position_update_counter >= position_update_interval
    position_store.put(position)

    logger.debug { "Updated position (Global Position: #{position}, Counter: #{position_update_counter}/#{position_update_interval})" }

    self.position_update_counter = 0
  else
    logger.debug { "Interval not reached; position not updated (Global Position: #{position}, Counter: #{position_update_counter}/#{position_update_interval})" }
  end
end