Class: ClusteredRpc::Transport::RedisCluster

Inherits:
Base
  • Object
show all
Defined in:
lib/clustered_rpc/transport/redis_cluster.rb

Instance Method Summary collapse

Methods inherited from Base

#run_method_from_message

Constructor Details

#initialize ⇒ RedisCluster

Returns a new instance of RedisCluster.



7
8
9
10
11
12
# File 'lib/clustered_rpc/transport/redis_cluster.rb', line 7

# Starts with no Redis connections, derives the pub/sub channel name from
# the cluster namespace, and then calls #connect.
def initialize
  @redis_subscriber = @redis_publish = nil
  namespace = ClusteredRpc.cluster_namespace
  @redis_message_pubsub_key = "__#{namespace}_messages"
  connect
end

Instance Method Details

#connect ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/clustered_rpc/transport/redis_cluster.rb', line 45

# Creates the subscriber and publisher Redis clients and starts a background
# thread that subscribes to the cluster message channel and dispatches every
# incoming message through #run_method_from_message.
#
# Does nothing when a subscriber client already exists; call #reconnect to
# force fresh connections. After starting the thread this method polls
# #subscribed? once per second until the subscription is confirmed.
#
# @raise [RuntimeError] if the subscription is still unconfirmed after more
#   than 15 one-second waits
def connect
  return unless @redis_subscriber.nil? # already connected - call reconnect...

  @redis_subscriber = ::Redis.new(ClusteredRpc.options)
  @redis_publish = ::Redis.new(ClusteredRpc.options)
  @subscribed = false
  @subscriber_thread = Thread.new do
    begin
      @redis_subscriber.subscribe(@redis_message_pubsub_key) do |on|
        on.subscribe do |channel, subscriptions|
          # A successful subscription resets the reconnect back-off counter.
          @retry_count = 0
          ClusteredRpc.logger.info {"ClusteredRpc: Subscribed to ##{channel} (#{subscriptions} subscriptions)"}
          @subscribed = true
        end

        on.message do |channel, message|
          ClusteredRpc.logger.debug {"ClusteredRpc: Handling message ##{channel}: #{message}"}
          begin
            # Payloads that are not valid JSON are kept as-is and end up in
            # the "Unknown message type" branch below.
            message = begin
              JSON.parse(message)
            rescue JSON::ParserError, TypeError
              message
            end
            if message.is_a? Hash
              result = run_method_from_message(message)
              ClusteredRpc.logger.debug {"ClusteredRpc: Got result: #{result}"}
              request_id = message['request_id']
              if request_id
                # Store the result for 10 minutes in the hash identified by the
                # request_id, under this instance's id.
                ClusteredRpc.logger.debug {"Setting ClusterSend result:  request:#{request_id}[#{Process.pid}]"}
                # Issue the commands on the yielded pipeline object: calling
                # them on the client itself inside the block is deprecated in
                # redis-rb 4.6 and no longer pipelined in 5.x. Older versions
                # yield the client itself, so this form works there as well.
                @redis_publish.pipelined do |pipeline|
                  pipeline.hmset "request:#{request_id}", ClusteredRpc.instance_id, result.to_json
                  pipeline.expire "request:#{request_id}", 600
                end
              end
            else
              ClusteredRpc.logger.warn "Unknown message type: #{message.class}"
            end
          rescue => e
            # Array() guards against a nil backtrace, which would otherwise
            # raise inside this handler and terminate the subscriber thread.
            ClusteredRpc.logger.error Array(e.backtrace).join("\n")
            ClusteredRpc.logger.error "Error[#{e.message}] Handling message ##{channel}: #{message}"
          end
        end

        on.unsubscribe do |channel, subscriptions|
          ClusteredRpc.logger.info {"ClusteredRpc: Unsubscribed from ##{channel} (#{subscriptions} subscriptions)"}
        end
      end
    rescue Redis::BaseConnectionError => e
      ClusteredRpc.logger.error e.message
      @retry_count ||= 0
      @retry_count += 1
      # Wait between 5 and 10 seconds before the next attempt.
      sleep_seconds = [[@retry_count, 10].min, 5].max
      ClusteredRpc.logger.warn "ClusteredRpc: Retrying redis connection in #{sleep_seconds} seconds: #{@retry_count}"
      # NOTE(review): @config is not assigned anywhere in this method;
      # presumably it is set elsewhere (e.g. in Base) - confirm, otherwise
      # this logs nil.
      ClusteredRpc.logger.info @config
      sleep sleep_seconds
      # Re-run the subscribe call; give up once the counter exceeds 300.
      retry if @retry_count <= 300
      ClusteredRpc.logger.warn "ClusteredRpc: Could not reconnect to Redis"
    ensure
      ClusteredRpc.logger.info "ClusteredRpc: Subscription thread terminated..."
      @subscribed = false
    end # begin
  end # @subscriber_thread = Thread.new do
  # Give the subscriber a chance to connect in background thread before returning
  attempts = 0
  until subscribed?
    sleep(1)
    attempts += 1
    ClusteredRpc.logger.info "ClusteredRpc: Waiting for subscription...#{attempts} times"
    raise "ClusteredRpc: Could not subscribe after #{attempts} attempts" if attempts > 15
  end
end

#get_result(request_id) ⇒ Object



21
22
23
# File 'lib/clustered_rpc/transport/redis_cluster.rb', line 21

# Looks up what has been stored for a request by reading the hash kept under
# the "request:<request_id>" key.
#
# @param request_id [Object] identifier interpolated into the key name
# @return [Object] whatever #hgetall returns for that key
def get_result(request_id)
  result_key = "request:#{request_id}"
  hgetall(result_key)
end

#hgetall(key_name) ⇒ Object



37
38
39
# File 'lib/clustered_rpc/transport/redis_cluster.rb', line 37

# Delegates an HGETALL for the given key to the publisher Redis client.
#
# @param key_name [String] key to read
# @return [Object] the client's reply, passed through unchanged
def hgetall(key_name)
  @redis_publish.hgetall(key_name)
end

#publish(payload = {}) ⇒ Object



14
15
16
17
18
19
# File 'lib/clustered_rpc/transport/redis_cluster.rb', line 14

# Serializes the payload to JSON and publishes it on the cluster message
# channel through the publisher Redis client.
#
# @param payload [Hash] data to broadcast; defaults to an empty hash
# @return [Object] the client's reply to the publish call
# @raise [StandardError] any error from serializing or publishing is logged
#   and then re-raised
def publish(payload={})
  begin
    serialized = payload.to_json
    @redis_publish.publish(@redis_message_pubsub_key, serialized)
  rescue => error
    ClusteredRpc.logger.error "ClusteredRpc.publish encountered error: #{error.message}"
    raise
  end
end

#reconnect ⇒ Object



25
26
27
28
29
30
31
32
33
34
35
# File 'lib/clustered_rpc/transport/redis_cluster.rb', line 25

# Discards the current Redis clients and subscriber thread, then calls
# #connect to build fresh ones.
#
# The previous clients are closed explicitly so their sockets are not left
# open until garbage collection. A failure while closing is logged and
# otherwise ignored so it cannot prevent the reconnect.
#
# @return [Object] whatever #connect returns
def reconnect
  stale_connections = [@redis_subscriber, @redis_publish].compact
  @redis_publish = nil
  if @subscriber_thread
    ClusteredRpc.logger.warn "ClusteredRpc: killing subscriber thread"
    @subscriber_thread.kill
    # https://stackoverflow.com/questions/49490278/wait-for-a-thread-to-die-in-ruby
    sleep 0.01 while @subscriber_thread.alive?
  end
  # Clear the subscriber even when there is no thread; #connect returns early
  # while it is set, which would leave @redis_publish nil.
  @redis_subscriber = nil
  stale_connections.each do |connection|
    begin
      connection.close if connection.respond_to?(:close)
    rescue StandardError => e
      ClusteredRpc.logger.warn "ClusteredRpc: error closing stale redis connection: #{e.message}"
    end
  end
  connect
end

#subscribed? ⇒ Boolean

Returns:

  • (Boolean)


41
42
43
# File 'lib/clustered_rpc/transport/redis_cluster.rb', line 41

# Reports the subscription flag maintained by #connect: set to true in the
# subscribe callback and back to false when the subscriber thread's block exits.
#
# @return [Boolean] current value of @subscribed
def subscribed?
  @subscribed
end