Class: Remq
- Inherits: Object (Object > Remq)
- Includes: MonitorMixin
- Defined in: lib/remq.rb, lib/remq/script.rb, lib/remq/version.rb
Defined Under Namespace
Constant Summary
- LIMIT = 1000
- VERSION = "0.0.4"
Instance Attribute Summary
- #predis ⇒ Object (readonly)
  Returns the value of attribute predis.
- #redis ⇒ Object (readonly)
  Returns the value of attribute redis.
Instance Method Summary
- #consume(pattern, options = {}) ⇒ Array<Remq::Message>
  Consume persisted messages from channels matching the given pattern, starting with the cursor if provided, or the first message.
- #initialize(options = {}) ⇒ Remq (constructor)
  Create a Remq client with the given options, which are passed to redis.
- #inspect ⇒ Object
- #key(*args) ⇒ String
  Build a key from the given name and channel.
- #off(event, listener) ⇒ Remq
  Remove a listener from the given event.
- #on(event, listener = nil) { ... } ⇒ Remq
  Add a listener to the given event.
- #publish(channel, message) ⇒ Integer
  Publish a message to the given channel.
- #quit ⇒ Object
  Forcibly close the connections to the Redis server.
- #subscribe(pattern, options = {}) {|received| ... } ⇒ Thread
  Subscribe to the channels matching the given pattern.
- #unsubscribe ⇒ Object
  Unsubscribe.
Constructor Details
#initialize(options = {}) ⇒ Remq
Create a Remq client with the given options, which are passed to redis.
    # File 'lib/remq.rb', line 24
    def initialize(options = {})
      @redis = Redis.new(options)
      @predis = Redis.new(options) # separate connection for pub/sub
      @listeners = Hash.new { |h, k| h[k] = [] }
      super() # Monitor#initialize
    end
Instance Attribute Details
#predis ⇒ Object (readonly)
Returns the value of attribute predis.
    # File 'lib/remq.rb', line 19
    def predis
      @predis
    end
#redis ⇒ Object (readonly)
Returns the value of attribute redis.
    # File 'lib/remq.rb', line 18
    def redis
      @redis
    end
Instance Method Details
#consume(pattern, options = {}) ⇒ Array<Remq::Message>
Consume persisted messages from channels matching the given pattern,
starting with the cursor if provided, or the first message. The limit option
determines how many messages are returned each time consume is called
(default: LIMIT, 1000).
    # File 'lib/remq.rb', line 98
    def consume(pattern, options = {})
      synchronize do
        cursor, limit = options.fetch(:cursor, 0), options.fetch(:limit, LIMIT)
        msgs = call(:consume, pattern, cursor, limit)
        msgs.map { |msg| (msg) }
      end
    end
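The way consume resolves its cursor and limit options can be tried in isolation. The following is a minimal sketch, not the library's code: consume_options is a hypothetical helper that only mirrors the two Hash#fetch calls in the listing above, with LIMIT redefined locally so the snippet runs without Remq or Redis.

```ruby
# Local copy of the documented default; Remq::LIMIT itself is not loaded here.
LIMIT = 1000

# Hypothetical helper mirroring the option handling shown in #consume.
def consume_options(options = {})
  cursor = options.fetch(:cursor, 0)     # start from the first message by default
  limit  = options.fetch(:limit, LIMIT)  # cap on messages returned per call
  [cursor, limit]
end

defaults = consume_options
custom   = consume_options(cursor: 42, limit: 10)
explicit = consume_options(limit: nil)
```

Here defaults is [0, 1000] and custom is [42, 10]. Because Hash#fetch only falls back when the key is absent, explicit is [0, nil]: passing limit: nil does not restore the default.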
#inspect ⇒ Object
    # File 'lib/remq.rb', line 159
    def inspect
      "#<Remq client v#{Remq::VERSION} for #{redis.client.id}>"
    end
#key(*args) ⇒ String
Build a key from the given name and channel.
    # File 'lib/remq.rb', line 155
    def key(*args)
      (['remq'] + args).join(':')
    end
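To see what key produces, the same expression can be run standalone. This sketch copies the method body into a free function named remq_key (a hypothetical name); the argument values are illustrative only and are not taken from the library.

```ruby
# Standalone copy of the key-building expression from Remq#key.
def remq_key(*args)
  (['remq'] + args).join(':')
end

with_parts = remq_key('channel', 'events.things')
no_parts   = remq_key
```

with_parts is "remq:channel:events.things" and no_parts is "remq": every key is namespaced under the remq prefix, with the remaining parts joined by colons.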
#off(event, listener) ⇒ Remq
Remove a listener from the given event.
    # File 'lib/remq.rb', line 141
    def off(event, listener)
      synchronize do
        @listeners[event.to_sym].delete(listener)
      end
      self
    end
#on(event, listener = nil) { ... } ⇒ Remq
Add a listener to the given event.
    # File 'lib/remq.rb', line 122
    def on(event, listener=nil, &block)
      listener ||= block
      unless listener.respond_to?(:call)
        raise ArgumentError.new('Listener must respond to #call')
      end
      synchronize do
        @listeners[event.to_sym] << listener
      end
      self
    end
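The listener bookkeeping behind on and off can be exercised without a Redis connection. The sketch below is a stand-in, not the Remq class: ListenerRegistry is a hypothetical name, and its emit method is an assumption added only to make the example observable, since the listings here do not show how Remq dispatches events.

```ruby
require 'monitor'

# Hypothetical stand-in that mirrors the on/off logic shown above.
class ListenerRegistry
  include MonitorMixin

  def initialize
    @listeners = Hash.new { |h, k| h[k] = [] } # one listener array per event
    super() # sets up the monitor from MonitorMixin
  end

  def on(event, listener = nil, &block)
    listener ||= block
    raise ArgumentError, 'Listener must respond to #call' unless listener.respond_to?(:call)
    synchronize { @listeners[event.to_sym] << listener }
    self
  end

  def off(event, listener)
    synchronize { @listeners[event.to_sym].delete(listener) }
    self
  end

  # Assumed dispatch helper (not part of the documented API).
  def emit(event, *args)
    synchronize { @listeners[event.to_sym].dup }.each { |l| l.call(*args) }
    self
  end
end

seen = []
logger = ->(body) { seen << body }

registry = ListenerRegistry.new
registry.on(:message, logger).on(:message) { |body| seen << body.upcase }
registry.emit(:message, 'hello')
registry.off(:message, logger)
registry.emit(:message, 'again')
```

Both methods return self, which is what allows the chained on calls. Since off removes by object identity, a listener passed as a block can only be removed later if the caller keeps a reference to that proc, as is done with logger here.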
#publish(channel, message) ⇒ Integer
Publish a message to the given channel. The message must be a string,
but objects can easily be serialized using JSON, etc. The id of the
published message will be returned as an integer.
    # File 'lib/remq.rb', line 40
    def publish(channel, message)
      synchronize do
        id = call(:publish, channel, message)
        id && id.to_i
      end
    end
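Since publish only accepts strings, structured data has to be serialized first. A minimal sketch of the JSON route mentioned above, using only Ruby's standard json library; the event hash and the channel name in the comment are made-up examples, and the publish call itself is left as a comment because it needs a connected client.

```ruby
require 'json'

# Illustrative payload; any JSON-serializable object would do.
event = { 'type' => 'signup', 'user_id' => 42 }

payload = JSON.generate(event) # a String, usable as the message argument
# id = remq.publish('events.signup', payload)  # with a connected Remq client

# A consumer or subscriber would reverse the step on receipt.
decoded = JSON.parse(payload)
```

String keys are used in event so that the decoded hash compares equal to the original; JSON.parse returns string keys by default.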
#quit ⇒ Object
Forcibly close the connections to the Redis server.
    # File 'lib/remq.rb', line 107
    def quit
      synchronize do
        redis.quit
        predis.quit
      end
    end
#subscribe(pattern, options = {}) {|received| ... } ⇒ Thread
Subscribe to the channels matching the given pattern. If no initial from_id
is provided, Remq subscribes using vanilla Redis pub/sub. Any Redis pub/sub
pattern will work. If from_id is provided, Remq replays messages after the
given id until it has caught up and can switch to pub/sub.
Remq-rb subscribes to pub/sub on another thread, which is returned so you
can handle it and call Thread#join when ready to block.
    # File 'lib/remq.rb', line 63
    def subscribe(pattern, options = {}, &block)
      synchronize do
        return if @subscription
        on(:message, &block) if block
        @subscription = true
        if cursor = options[:from_id]
          @subscription = _subscribe_from_cursor(pattern, cursor)
        else
          @subscription = _subscribe_to_pubsub(pattern)
        end
      end
    end
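The returned-thread pattern described above (do other work, then call Thread#join when ready to block) can be illustrated with plain Ruby threads. This is only an analogy under stated assumptions: the Queue stands in for the Redis pub/sub connection, the :stop sentinel stands in for unsubscribing, and none of these names come from Remq.

```ruby
received = []
queue = Queue.new # stand-in for the pub/sub connection

# Stand-in for the Thread that subscribe returns: it delivers messages
# to a handler in the background until told to stop.
subscription = Thread.new do
  while (msg = queue.pop) != :stop
    received << msg
  end
end

# The calling thread stays free to do other work, such as producing messages.
queue << 'first' << 'second' << :stop

# Block until the background thread has finished.
subscription.join
```

After join returns, received holds the two messages in order and the thread is no longer alive.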
#unsubscribe ⇒ Object
Unsubscribe. No more message events will be emitted after this is called.
    # File 'lib/remq.rb', line 80
    def unsubscribe
      synchronize do
        return unless @subscription
        @subscription.exit if @subscription.is_a?(Thread)
        @subscription = nil
      end
    end