Module: StartHer::Subscriber

Includes:
RedisClient
Defined in:
lib/start_her/subscriber.rb

Defined Under Namespace

Modules: ClassMethods

Constant Summary collapse

# Default options hash: a single :channels entry holding ['*'] (presumably
# the glob pattern that matches every channel -- confirm against the code
# that consumes these options).
#
# The hash is frozen so this shared default cannot be modified in place;
# derive custom options with a non-destructive call such as
# DEFAULT_OPTS.merge(...).
DEFAULT_OPTS = { channels: ['*'] }.freeze
# Default error handler: a one-argument lambda that forwards whatever it is
# given to StartHer.logger at error level.
DEFAULT_ERROR_BLOCK = ->(error) { StartHer.logger.error error }

Constants included from RedisClient

RedisClient::MQ_DEFAULTS

Class Method Summary collapse

Instance Method Summary collapse

Methods included from RetryPolicies

#exponential_backoff

Class Method Details

.included(base) ⇒ Object



10
11
12
# File 'lib/start_her/subscriber.rb', line 10

# Include hook: gives the including class or module (+base+) the methods
# defined in ClassMethods as singleton-level methods.
#
# @param base [Module] the class or module performing the include
# @return [Module] +base+, as returned by #extend
def self.included(base)
  base.extend ClassMethods
end

Instance Method Details

#psubscribe(channels, &block) ⇒ Object

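Pattern-subscribes to the given channels and calls the block with the last ':'-separated segment of each message's channel name and the MessagePack-decoded message. (The source marks this method with `rubocop:disable Metrics/AbcSize,Metrics/MethodLength`.)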



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/start_her/subscriber.rb', line 43

# Pattern-subscribes to +channels+ and hands every received message to
# +block+.
#
# The subscription is started inside +exponential_backoff+, which is passed
# +::Redis::BaseConnectionError+ (presumably so that connection failures are
# retried -- confirm against RetryPolicies).
#
# @param channels [Array<String>, String] pattern(s), splatted into
#   +client.psubscribe+
# @yieldparam chan [String] the text after the last ':' of the channel name,
#   or the whole channel name when it contains no ':'
# @yieldparam payload [Object] the message decoded with +MessagePack.unpack+
# @return [Object] whatever +exponential_backoff+ returns
# @raise [StandardError] any error escaping the subscription is logged via
#   +StartHer.logger+ and then re-raised unchanged
#
# NOTE: a block is required; without one the first incoming message fails
# with NoMethodError when +block.call+ is reached.
def psubscribe(channels, &block)
  exponential_backoff({}, ::Redis::BaseConnectionError) do
    client.psubscribe(*channels) do |on|
      on.psubscribe do |channel, _|
        StartHer.logger.info "PSubscribe on #{channel}"
      end

      on.pmessage do |_pattern, channel, message|
        # Strip any "prefix:" namespace so the block only sees the final segment.
        chan = channel.include?(':') ? channel.split(':').last : channel
        block.call(chan, MessagePack.unpack(message))
      end
    end
  end
rescue => e
  StartHer.logger.error e
  # Bare raise re-raises the exception currently being handled.
  raise
end