Class: RedisMQ::Client
- Inherits: Object
- Object
- RedisMQ::Client
- Defined in:
- lib/redismq/client.rb
Instance Method Summary collapse
- #bind(topic, queue) ⇒ Object
- #bpop(queue, timeout = 0) ⇒ Object
- #connect_to_redis ⇒ Object
- #pop(queue) ⇒ Object
- #publish(topic, payload) ⇒ Object
- #push(queue, message) ⇒ Object
- #queue_key(queue) ⇒ Object
- #redis ⇒ Object
- #topic_key(topic) ⇒ Object
Instance Method Details
#bind(topic, queue) ⇒ Object
7 8 9 |
# File 'lib/redismq/client.rb', line 7 def bind(topic, queue) redis.sadd(topic_key(topic), queue_key(queue)) end |
#bpop(queue, timeout = 0) ⇒ Object
11 12 13 14 |
# File 'lib/redismq/client.rb', line 11 def bpop(queue, timeout=0) (queue, message) = redis.blpop(queue_key(queue), timeout) JSON.parse(message, symbolize_names: true) end |
#connect_to_redis ⇒ Object
16 17 18 19 20 |
# File 'lib/redismq/client.rb', line 16 def connect_to_redis url = ENV['REDISMQ_URL'] || ENV['REDISTOGO_URL'] || 'redis://localhost:6379' uri = URI.parse(url) Redis.new(host: uri.host, port: uri.port, password: uri.password) end |
#pop(queue) ⇒ Object
22 23 24 25 26 |
# File 'lib/redismq/client.rb', line 22 def pop(queue) result = redis.lpop(queue_key(queue)) return nil if result.nil? JSON.parse(result, symbolize_names: true) end |
#publish(topic, payload) ⇒ Object
28 29 30 31 32 33 |
# File 'lib/redismq/client.rb', line 28 def publish(topic, payload) message = { header: { topic: topic }, payload: payload } redis.smembers(topic_key(topic)).each do |queue| push(queue, message) end end |
#push(queue, message) ⇒ Object
35 36 37 |
# File 'lib/redismq/client.rb', line 35 def push(queue, message) redis.rpush(queue, message.to_json) end |
#queue_key(queue) ⇒ Object
43 44 45 |
# File 'lib/redismq/client.rb', line 43 def queue_key(queue) "rmq:queue:#{queue}" end |
#redis ⇒ Object
39 40 41 |
# File 'lib/redismq/client.rb', line 39 def redis @redis ||= connect_to_redis end |
#topic_key(topic) ⇒ Object
47 48 49 |
# File 'lib/redismq/client.rb', line 47 def topic_key(topic) "rmq:topic:#{topic}" end |