Class: RedisMQ::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/redismq/client.rb

Instance Method Summary collapse

Instance Method Details

#bind(topic, queue) ⇒ Object



7
8
9
# File 'lib/redismq/client.rb', line 7

def bind(topic, queue)
  redis.sadd(topic_key(topic), queue_key(queue))
end

#bpop(queue, timeout = 0) ⇒ Object



11
12
13
14
# File 'lib/redismq/client.rb', line 11

def bpop(queue, timeout=0)
  (queue, message) = redis.blpop(queue_key(queue), timeout)
  JSON.parse(message, symbolize_names: true)
end

#connect_to_redisObject



16
17
18
19
20
# File 'lib/redismq/client.rb', line 16

def connect_to_redis
  url = ENV['REDISMQ_URL'] || ENV['REDISTOGO_URL'] || 'redis://localhost:6379'
  uri = URI.parse(url)
  Redis.new(host: uri.host, port: uri.port, password: uri.password)
end

#pop(queue) ⇒ Object



22
23
24
25
26
# File 'lib/redismq/client.rb', line 22

def pop(queue)
  result = redis.lpop(queue_key(queue))
  return nil if result.nil?
  JSON.parse(result, symbolize_names: true)
end

#publish(topic, payload) ⇒ Object



28
29
30
31
32
33
# File 'lib/redismq/client.rb', line 28

def publish(topic, payload)
  message = { header: { topic: topic }, payload: payload }
  redis.smembers(topic_key(topic)).each do |queue|
    push(queue, message)        
  end
end

#push(queue, message) ⇒ Object



35
36
37
# File 'lib/redismq/client.rb', line 35

def push(queue, message)
  redis.rpush(queue, message.to_json)
end

#queue_key(queue) ⇒ Object



43
44
45
# File 'lib/redismq/client.rb', line 43

def queue_key(queue)
  "rmq:queue:#{queue}"
end

#redisObject



39
40
41
# File 'lib/redismq/client.rb', line 39

def redis
  @redis ||= connect_to_redis
end

#topic_key(topic) ⇒ Object



47
48
49
# File 'lib/redismq/client.rb', line 47

def topic_key(topic)
  "rmq:topic:#{topic}"
end