Module: StartHer::Subscriber

Includes:
RedisClient, Utils
Defined in:
lib/start_her/subscriber.rb

Defined Under Namespace

Modules: ClassMethods

Constant Summary collapse

DEFAULT_OPTS =
{ channels: ['*'], heartbeat: Heartbeat::CHANNELS }
DEFAULT_ERROR_BLOCK =
lambda do |e|
  StartHer.logger.error e
end

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Utils

#fingerprint, #msid, #service_klass

Methods included from RedisClient

#real_client

Methods included from RetryPolicies

#exponential_backoff

Instance Attribute Details

#channelsObject (readonly)

Returns the value of attribute channels.



6
7
8
# File 'lib/start_her/subscriber.rb', line 6

def channels
  @channels
end

#optionsObject (readonly)

Returns the value of attribute options.



6
7
8
# File 'lib/start_her/subscriber.rb', line 6

def options
  @options
end

#subscriber_heartbeat_blockObject (readonly)

Returns the value of attribute subscriber_heartbeat_block.



6
7
8
# File 'lib/start_her/subscriber.rb', line 6

def subscriber_heartbeat_block
  @subscriber_heartbeat_block
end

#subscriber_on_psubscribe_blockObject (readonly)

Returns the value of attribute subscriber_on_psubscribe_block.



6
7
8
# File 'lib/start_her/subscriber.rb', line 6

def subscriber_on_psubscribe_block
  @subscriber_on_psubscribe_block
end

Class Method Details

.included(base) ⇒ Object



13
14
15
# File 'lib/start_her/subscriber.rb', line 13

def self.included(base)
  base.extend(ClassMethods)
end

Instance Method Details

#initialize(channels) ⇒ Object



42
43
44
45
46
47
# File 'lib/start_her/subscriber.rb', line 42

def initialize(channels)
  @channels = channels
  @options = self.class.subscriber_options_hash || DEFAULT_OPTS
  @subscriber_heartbeat_block = self.class.subscriber_heartbeat_block || ->(_response) {}
  @subscriber_on_psubscribe_block = self.class.subscriber_on_psubscribe_block || ->(_chan) {}
end

#psubscribe(channels, &block) ⇒ Object

rubocop:disable Metrics/AbcSize,Metrics/MethodLength



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/start_her/subscriber.rb', line 64

def psubscribe(channels, &block)
  exponential_backoff({}, ::Redis::BaseConnectionError) do
    client.psubscribe(options[:heartbeat][:in], *channels) do |on|
      on.psubscribe do |channel, _|
        StartHer.logger.info "PSuscribe on #{channel}"

        resync_messages(client, channel, &block)
        subscriber_on_psubscribe_block.call(unamespace(channel))
      end

      on.pmessage do |_pattern, channel, message|
        chan = unamespace(channel)
        msg = MessagePack.unpack(message)

        if chan == options[:heartbeat][:in]
          Heartbeat.call(msg, service_klass,
            heartbeat: options[:heartbeat], &subscriber_heartbeat_block)
        else
          block.call(chan, msg)
        end
      end
    end
  end
rescue => e
  StartHer.logger.error e
  raise e
end

#resync_messages(client, channel, &block) ⇒ Object

rubocop:enable Metrics/AbcSize,Metrics/MethodLength



93
94
95
96
97
98
99
100
101
102
103
# File 'lib/start_her/subscriber.rb', line 93

def resync_messages(client, channel, &block)
  StartHer.logger.info "Re-synchronize message from #{channel}"

  # Retrieve all backlogs for a given channel
  client.keys(unamespace(channel) + '*').each do |backlog|
    # Retrieve all messages for a given backlog
    client.lrange(backlog, 0, -1).each do |message|
      block.call(backlog.split(':').first, MessagePack.unpack(message))
    end
  end
end

#run!Object



49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/start_her/subscriber.rb', line 49

def run!
  psubscribe(channels) do |channel, message|
    begin
      process_message(channel, message)
    rescue => e
      if block_given?
        subscriber_error_block.call(e)
      else
        DEFAULT_ERROR_BLOCK.call(e)
      end
    end
  end
end