Module: StartHer::Subscriber
- Includes:
- RedisClient, Utils
- Defined in:
- lib/start_her/subscriber.rb
Defined Under Namespace
Modules: ClassMethods
Constant Summary collapse
- DEFAULT_OPTS =
{ channels: ['*'], heartbeat: Heartbeat::CHANNELS }
- DEFAULT_ERROR_BLOCK =
lambda do |e| StartHer.logger.error e end
Instance Attribute Summary collapse
-
#channels ⇒ Object
readonly
Returns the value of attribute channels.
-
#options ⇒ Object
readonly
Returns the value of attribute options.
-
#subscriber_heartbeat_block ⇒ Object
readonly
Returns the value of attribute subscriber_heartbeat_block.
-
#subscriber_on_psubscribe_block ⇒ Object
readonly
Returns the value of attribute subscriber_on_psubscribe_block.
Class Method Summary collapse
Instance Method Summary collapse
- #initialize(channels) ⇒ Object
-
#psubscribe(channels, &block) ⇒ Object
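Pattern-subscribes to the heartbeat channel and the given channels; heartbeat messages go to Heartbeat.call, all other messages are unpacked with MessagePack and passed to the block. (Source comment: rubocop:disable Metrics/AbcSize,Metrics/MethodLength.)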
-
#resync_messages(client, channel, &block) ⇒ Object
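Replays the backlogs stored under keys matching the channel, passing each unpacked message to the block. (Source comment: rubocop:enable Metrics/AbcSize,Metrics/MethodLength.)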
- #run! ⇒ Object
Methods included from Utils
#fingerprint, #msid, #service_klass
Methods included from RedisClient
Methods included from RetryPolicies
Instance Attribute Details
#channels ⇒ Object (readonly)
Returns the value of attribute channels.
6 7 8 |
# File 'lib/start_her/subscriber.rb', line 6 def channels @channels end |
#options ⇒ Object (readonly)
Returns the value of attribute options.
6 7 8 |
# File 'lib/start_her/subscriber.rb', line 6 def options @options end |
#subscriber_heartbeat_block ⇒ Object (readonly)
Returns the value of attribute subscriber_heartbeat_block.
6 7 8 |
# File 'lib/start_her/subscriber.rb', line 6 def subscriber_heartbeat_block @subscriber_heartbeat_block end |
#subscriber_on_psubscribe_block ⇒ Object (readonly)
Returns the value of attribute subscriber_on_psubscribe_block.
6 7 8 |
# File 'lib/start_her/subscriber.rb', line 6 def subscriber_on_psubscribe_block @subscriber_on_psubscribe_block end |
Class Method Details
.included(base) ⇒ Object
13 14 15 |
# File 'lib/start_her/subscriber.rb', line 13 def self.included(base) base.extend(ClassMethods) end |
Instance Method Details
#initialize(channels) ⇒ Object
42 43 44 45 46 47 |
# File 'lib/start_her/subscriber.rb', line 42 def initialize(channels) @channels = channels @options = self.class.options || DEFAULT_OPTS @subscriber_heartbeat_block = self.class.subscriber_heartbeat_block || ->(_response) {} @subscriber_on_psubscribe_block = self.class.subscriber_on_psubscribe_block || ->(_chan) {} end |
#psubscribe(channels, &block) ⇒ Object
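Pattern-subscribes to the heartbeat channel and the given channels inside an exponential-backoff retry on Redis connection errors; heartbeat messages go to Heartbeat.call, all other messages are unpacked with MessagePack and passed to the block. Any other error is logged and re-raised. (Source comment: rubocop:disable Metrics/AbcSize,Metrics/MethodLength)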
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/start_her/subscriber.rb', line 64 def psubscribe(channels, &block) exponential_backoff({}, ::Redis::BaseConnectionError) do client.psubscribe(options[:heartbeat][:in], *channels) do |on| on.psubscribe do |channel, _| StartHer.logger.info "PSuscribe on #{channel}" resync_messages(client, channel, &block) subscriber_on_psubscribe_block.call(unamespace(channel)) end on.pmessage do |_pattern, channel, message| chan = unamespace(channel) msg = MessagePack.unpack(message) if chan == options[:heartbeat][:in] Heartbeat.call(msg, service_klass, heartbeat: options[:heartbeat], &subscriber_heartbeat_block) else block.call(chan, msg) end end end end rescue => e StartHer.logger.error e raise e end |
#resync_messages(client, channel, &block) ⇒ Object
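Replays the backlogs stored under keys matching the channel: every message of every backlog list is unpacked with MessagePack and passed to the block together with the first ':'-separated segment of the backlog key. (Source comment: rubocop:enable Metrics/AbcSize,Metrics/MethodLength)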
93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/start_her/subscriber.rb', line 93 def resync_messages(client, channel, &block) StartHer.logger.info "Re-synchronize message from #{channel}" # Retrieve all backlogs for a given channel client.keys(unamespace(channel) + '*').each do |backlog| # Retrieve all messages for a given backlog client.lrange(backlog, 0, -1).each do |message| block.call(backlog.split(':').first, MessagePack.unpack(message)) end end end |
#run! ⇒ Object
49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/start_her/subscriber.rb', line 49 def run! psubscribe(channels) do |channel, message| begin process_message(channel, message) rescue => e if block_given? subscriber_error_block.call(e) else DEFAULT_ERROR_BLOCK.call(e) end end end end |