Class: SimpleRedisMQHelper

Inherits:
Object
  • Object
show all
Defined in:
lib/helpers/simple_redis_mq_helper.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, host_ip, port, input_channel, output_channel) ⇒ SimpleRedisMQHelper

Returns a new instance of SimpleRedisMQHelper.



8
9
10
11
12
13
14
15
16
17
18
# File 'lib/helpers/simple_redis_mq_helper.rb', line 8

# Builds a helper bound to one Redis server and a pair of list keys.
#
# @param name [Object] label interpolated into the progress messages printed by pop_input!
# @param host_ip [Object] passed to Redis.new as +host:+ and exposed via #ip
# @param port [Object] passed to Redis.new as +port:+ and exposed via #port
# @param input_channel [Object] key of the list that pop_input! reads from
# @param output_channel [Object] key of the list that push_output! appends to
def initialize(name, host_ip, port, input_channel, output_channel)
  @name = name
  @ip, @port = host_ip, port
  @input_channel, @output_channel = input_channel, output_channel

  # The lock key is derived from the input channel name.
  @input_lock = "#{@input_channel}_lock"

  @instance = Redis.new(host: @ip, port: @port)

  # SETNX only writes when the key is absent, so an existing lock value is left untouched.
  @instance.setnx(@input_lock, 0)
end

Instance Attribute Details

#input_channel ⇒ Object (readonly)

Returns the value of attribute input_channel.



6
7
8
# File 'lib/helpers/simple_redis_mq_helper.rb', line 6

# Read-only accessor for the input channel: the key of the Redis list
# that pop_input! checks with LLEN and pops from.
def input_channel
  @input_channel
end

#ip ⇒ Object (readonly)

Returns the value of attribute ip.



6
7
8
# File 'lib/helpers/simple_redis_mq_helper.rb', line 6

# Read-only accessor for the host value the constructor received as
# host_ip and passed to Redis.new as host:.
def ip
  @ip
end

#output_channel ⇒ Object (readonly)

Returns the value of attribute output_channel.



6
7
8
# File 'lib/helpers/simple_redis_mq_helper.rb', line 6

# Read-only accessor for the output channel: the key of the Redis list
# that push_output! appends messages to.
def output_channel
  @output_channel
end

#port ⇒ Object (readonly)

Returns the value of attribute port.



6
7
8
# File 'lib/helpers/simple_redis_mq_helper.rb', line 6

# Read-only accessor for the port value the constructor passed to
# Redis.new as port:.
def port
  @port
end

Instance Method Details

#pop_input! ⇒ Object

Waits until the input channel is non-empty, then pops and returns a message.



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/helpers/simple_redis_mq_helper.rb', line 26

# Blocks until a message has been popped from the input channel, then returns it.
#
# Consumers coordinate through a counter stored under @input_lock: a consumer
# that sees the counter at 0 increments it and owns the lock only if the
# increment yields exactly 1. The owner pops one element and resets the
# counter to 0.
#
# Differences from a naive single pass:
# * If the list is drained between the LLEN check and the RPOP (RPOP returns
#   nil), the method keeps waiting instead of returning nil.
# * The lock is reset in an +ensure+ so a failing RPOP cannot leave it held.
# * When the lock is already held, the loop backs off briefly rather than
#   polling Redis in a tight loop.
#
# NOTE(review): the lock has no expiry, so a consumer that dies while holding
# it still blocks all others until the key is reset manually.
# NOTE(review): this pops with RPOP while push_output! appends with RPUSH, so a
# list fed by push_output! is consumed newest-first — confirm this is intended.
#
# @return [Object] the non-nil value returned by RPOP on the input channel
def pop_input!
  loop do
    if @instance.llen(@input_channel) == 0
      puts "#{@name} check empty, sleep"
      sleep(rand(5) + 5)
      next
    end

    puts "#{@name} check exists, go on"

    # Someone else holds the lock; it is only held for the duration of one
    # pop, so a short randomized pause is enough before re-checking.
    unless @instance.get(@input_lock).to_i == 0
      sleep(0.05 + rand * 0.05)
      next
    end

    # Several consumers may have seen 0; only the one whose INCR returns 1 wins.
    if @instance.incr(@input_lock).to_i != 1
      puts "#{@name} check conflict, sleep"
      sleep(rand(10) * 0.1 + 1) # sleep 1~2
      next
    end

    puts "#{@name} get key!"
    input = begin
      @instance.rpop(@input_channel)
    ensure
      # Always release, even when RPOP raises, so other consumers are not stuck.
      @instance.set(@input_lock, 0)
    end

    # nil means the list was emptied after the LLEN check; go back to waiting.
    return input unless input.nil?
  end
end

#push_output!(message) ⇒ Object

Pushes the message onto the output channel directly, without taking a lock. The message is a string.



21
22
23
# File 'lib/helpers/simple_redis_mq_helper.rb', line 21

# Appends +message+ to the list stored under @output_channel using RPUSH.
# No lock is taken around the push.
#
# @param message [String] payload to enqueue
# @return [Object] whatever the Redis client's rpush returns
#   (presumably the new list length — confirm against the client docs)
def push_output!(message)
  @instance.rpush(@output_channel, message) 
end