Class: MessageChannel::Redis

Inherits:
Object
  • Object
show all
Defined in:
lib/message_channel/redis.rb

Instance Method Summary collapse

Constructor Details

#initialize(host: nil, port: nil, db: nil) ⇒ Redis

Returns a new instance of Redis.



7
8
9
10
11
12
13
# File 'lib/message_channel/redis.rb', line 7

def initialize( host: nil, port: nil, db: nil )
  @host  =  host  || "127.0.0.1"
  @port  =  ( port  ||  6379 ).to_i
  @db  =  ( db  ||  0 ).to_i
  @redis  =  ::Redis.new( host: @host, port: @port, db: @db )
  @threads  =  {}
end

Instance Method Details

#listen(*patterns, &block) ⇒ Object



66
67
68
69
70
71
72
73
74
# File 'lib/message_channel/redis.rb', line 66

def listen( *patterns, &block )
  if block.nil?
    listen_once( *patterns )
  else
    listen_each( *patterns ) do |topic, items|
      block.call( topic, items )
    end
  end
end

#listen_each(*patterns, &block) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/message_channel/redis.rb', line 45

def listen_each( *patterns, &block )
  patterns.each do |pattern|
    @threads[pattern]  =  ::Thread.start(pattern) do |pattern|
      redis  =  ::Redis.new( host: @host, port: @port, db: @db )
      begin
        redis.psubscribe( pattern ) do |on|
          on.pmessage do |pattern, channel, message|
            items  =  JSON.parse( message, symbolize_names: true )
            block.call( channel, items )
          end
        end
      rescue  ::Redis::BaseConnectionError => error
        sleep 1
        retry
      ensure
        redis.punsubscribe( topic )    rescue  nil
      end
    end
  end
end

#listen_once(*patterns) ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/message_channel/redis.rb', line 15

def listen_once( *patterns )
  queue  =  Queue.new
  threads  =  {}
  patterns.each do |pattern|
    threads[pattern]  =  ::Thread.start(pattern) do |pattern|
      redis  =  ::Redis.new( host: @host, port: @port, db: @db )
      begin
        redis.psubscribe( pattern ) do |on|
          on.pmessage do |pattern, channel, message|
            items  =  JSON.parse( message, symbolize_names: true )
            redis.punsubscribe( topic )    rescue  nil
            queue.push  [channel, items]
          end
        end
      rescue  ::Redis::BaseConnectionError => error
        sleep 1
        retry
      ensure
      end
    end
  end

  topic, items  =  queue.pop
  patterns.each do |pattern|
    threads[pattern].kill    rescue  nil
    threads.delete( pattern )    rescue  nil
  end
  [topic, items]
end

#notify(topic, **items) ⇒ Object



83
84
85
# File 'lib/message_channel/redis.rb', line 83

def notify( topic, **items )
  @redis.publish( topic, items.to_json )
end

#unlisten(**patterns) ⇒ Object



76
77
78
79
80
81
# File 'lib/message_channel/redis.rb', line 76

def unlisten( **patterns )
  patterns.each do |pattern|
    @threads[pattern].kill    rescue  nil
    @threads.delete( pattern )    rescue  nil
  end
end