Module: Consumer
- Included in:
- Controls::Consumer::ErrorHandler::Example, Controls::Consumer::Incrementing
- Defined in:
- lib/consumer/log.rb,
lib/consumer/actor.rb,
lib/consumer/consumer.rb,
lib/consumer/defaults.rb,
lib/consumer/log_text.rb,
lib/consumer/substitute.rb,
lib/consumer/controls/id.rb,
lib/consumer/controls/get.rb,
lib/consumer/subscription.rb,
lib/consumer/controls/poll.rb,
lib/consumer/controls/actor.rb,
lib/consumer/controls/error.rb,
lib/consumer/position_store.rb,
lib/consumer/controls/handle.rb,
lib/consumer/controls/session.rb,
lib/consumer/handler_registry.rb,
lib/consumer/controls/category.rb,
lib/consumer/controls/consumer.rb,
lib/consumer/controls/position.rb,
lib/consumer/controls/identifier.rb,
lib/consumer/controls/stream_name.rb,
lib/consumer/controls/message_data.rb,
lib/consumer/controls/subscription.rb,
lib/consumer/subscription/defaults.rb,
lib/consumer/subscription/get_batch.rb,
lib/consumer/controls/position_store.rb,
lib/consumer/position_store/telemetry.rb,
lib/consumer/controls/get/incrementing.rb,
lib/consumer/position_store/substitute.rb,
lib/consumer/controls/handle/raise_error.rb,
lib/consumer/controls/message_data/batch.rb,
lib/consumer/controls/position_store/file.rb,
lib/consumer/controls/consumer/incrementing.rb,
lib/consumer/controls/consumer/error_handler.rb
Defined Under Namespace
Modules: Build, Configure, Controls, Defaults, HandlerMacro, IdentifierMacro, LogText, PositionStore, Start, Substitute
Classes: Actor, HandlerRegistry, Log, Subscription
Class Method Summary
collapse
Instance Method Summary
collapse
Class Method Details
.included(cls) ⇒ Object
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
# File 'lib/consumer/consumer.rb', line 2
# Hook that Ruby invokes when Consumer is included into another class or
# module. It evaluates the block below in the context of the includer,
# mixing in supporting modules and defining the includer's attributes,
# dependencies, and default error handling.
#
# @param cls [Module] the class or module that included Consumer
def self.included(cls)
cls.class_exec do
# NOTE(review): these mixins presumably supply the `dependency`,
# `initializer`, `virtual`, and `logger` facilities used by this module;
# they are defined outside this file — confirm.
include Dependency
include Initializer
include Virtual
include Log::Dependency
# Class-level behavior added to the includer itself.
extend Build
extend Start
extend HandlerMacro
extend IdentifierMacro
# Prepended rather than included, so Configure's methods take precedence
# over methods of the same name defined on the includer.
prepend Configure
initializer :stream_name
attr_writer :identifier
# Instance identifier; falls back to the class-level identifier when no
# truthy value has been assigned.
def identifier
@identifier ||= self.class.identifier
end
attr_writer :position_update_interval
# Position update interval; falls back to Defaults.position_update_interval
# when no truthy value has been assigned.
def position_update_interval
@position_update_interval ||= Defaults.position_update_interval
end
attr_writer :position_update_counter
# Position update counter; starts at 0 when nothing has been assigned.
def position_update_counter
@position_update_counter ||= 0
end
attr_accessor :session
attr_accessor :poll_interval_milliseconds
dependency :get
dependency :position_store, PositionStore
dependency :subscription, Subscription
# NOTE(review): the block presumably acts as the default implementation of
# `error_raised`, re-raising the error unless the includer overrides it —
# confirm against the Virtual library.
virtual :error_raised do |error, message_data|
raise error
end
# Makes `call` an alias of `dispatch` on instances of the includer.
alias_method :call, :dispatch
end
end
|
Instance Method Details
#dispatch(message_data) ⇒ Object
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
# File 'lib/consumer/consumer.rb', line 50
# Passes a message to every registered handler, then records the message's
# global position.
#
# Any StandardError raised along the way (by logging, a handler, or the
# position update) is logged and handed to `error_raised` together with the
# message being dispatched.
#
# @param message_data [Object] the message to dispatch; must respond to
#   `global_position`
# @return [Object] the result of the final info log call, or the result of
#   `error_raised` when an error was rescued
def dispatch(message_data)
  logger.trace { "Dispatching message (#{LogText.message_data(message_data)})" }

  handlers = self.class.handler_registry
  handlers.each { |handler| handler.call(message_data, session: session) }

  # The position is recorded only after every handler has returned normally.
  update_position(message_data.global_position)

  logger.info { "Message dispatched (#{LogText.message_data(message_data)})" }
rescue StandardError => error
  logger.error { "Error raised (Error Class: #{error.class}, Error Message: #{error.message}, #{LogText.message_data(message_data)})" }
  error_raised(error, message_data)
end
|
#start(&probe) ⇒ Object
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
|
# File 'lib/consumer/consumer.rb', line 66
# Starts the consumer: logs the startup details, runs the optional
# `starting` hook, logs each registered handler, then starts the
# subscription and the consumer actor.
#
# @yield [consumer, threads, addresses] optional probe, called once both
#   have been started
# @yieldparam consumer [Object] this consumer
# @yieldparam threads [Array] the actor thread followed by the subscription
#   thread
# @yieldparam addresses [Array] the actor address followed by the
#   subscription address
# @return [Object] the AsyncInvocation::Incorrect constant
def start(&probe)
  logger.info(tag: :*) { "Starting consumer: #{self.class.name} (Stream: #{stream_name}, Identifier: #{identifier || '(none)'}, Position: #{subscription.position})" }

  # `starting` is an optional hook; it runs only when the consumer responds to it.
  starting if respond_to?(:starting)

  handlers = self.class.handler_registry
  handlers.each do |handler|
    logger.info(tag: :*) { "Handler: #{handler.name} (Stream Name: #{stream_name}, Consumer: #{self.class.name})" }
  end

  _unused, subscription_thread = ::Actor::Start.call(subscription)
  actor_address, actor_thread = Actor.start(self, subscription, include: :thread)

  unless probe.nil?
    threads = [actor_thread, subscription_thread]
    addresses = [actor_address, subscription.address]
    probe.call(self, threads, addresses)
  end

  logger.info(tag: :*) { "Started consumer: #{self.class.name} (Stream: #{stream_name}, Identifier: #{identifier || '(none)'}, Position: #{subscription.position})" }

  AsyncInvocation::Incorrect
end
|
#update_position(position) ⇒ Object
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
# File 'lib/consumer/consumer.rb', line 90
# Counts a processed message and writes its position to the position store
# once the counter reaches the update interval.
#
# The counter is incremented on every call. When it reaches
# `position_update_interval`, the position is written and the counter is
# reset to zero; otherwise the position is left unwritten.
#
# @param position [Object] the global position of the message just processed
# @return [Object] 0 after the position is written, otherwise the result of
#   the debug log call
def update_position(position)
  logger.trace { "Updating position (Global Position: #{position}, Counter: #{position_update_counter}/#{position_update_interval})" }

  self.position_update_counter = position_update_counter + 1

  interval_reached = position_update_counter >= position_update_interval
  unless interval_reached
    return logger.debug { "Interval not reached; position not updated (Global Position: #{position}, Counter: #{position_update_counter}/#{position_update_interval})" }
  end

  position_store.put(position)

  # Logged before the reset so the message reports the counter value that triggered the write.
  logger.debug { "Updated position (Global Position: #{position}, Counter: #{position_update_counter}/#{position_update_interval})" }

  self.position_update_counter = 0
end
|