Class: MessageBus::ReliablePubSub
- Inherits:
-
Object
- Object
- MessageBus::ReliablePubSub
- Defined in:
- lib/message_bus/reliable_pub_sub.rb
Defined Under Namespace
Classes: BackLogOutOfOrder, NoMoreRetries
Constant Summary collapse
- UNSUB_MESSAGE =
"$$UNSUBSCRIBE"
Instance Attribute Summary collapse
-
#subscribed ⇒ Object
readonly
Returns the value of attribute subscribed.
Instance Method Summary collapse
- #after_fork ⇒ Object
- #backlog(channel, last_id = nil) ⇒ Object
- #backlog_id_key(channel) ⇒ Object
- #backlog_key(channel) ⇒ Object
- #get_message(channel, message_id) ⇒ Object
- #global_backlog(last_id = nil) ⇒ Object
- #global_backlog_key ⇒ Object
- #global_id_key ⇒ Object
- #global_subscribe(last_id = nil, &blk) ⇒ Object
- #global_unsubscribe ⇒ Object
-
#initialize(redis_config = {}, max_backlog_size = 1000) ⇒ ReliablePubSub
constructor
max_backlog_size is per multiplexed channel.
- #last_id(channel) ⇒ Object
-
#max_backlog_size=(val) ⇒ Object
per channel backlog size.
-
#max_global_backlog_size=(val) ⇒ Object
amount of global backlog we can spin through.
- #max_publish_retries ⇒ Object
- #max_publish_retries=(val) ⇒ Object
- #max_publish_wait ⇒ Object
- #max_publish_wait=(ms) ⇒ Object
- #new_redis_connection ⇒ Object
- #process_global_backlog(highest_id, raise_error, &blk) ⇒ Object
-
#pub_redis ⇒ Object
redis connection used for publishing messages.
- #publish(channel, data) ⇒ Object
- #redis_channel_name ⇒ Object
-
#reset! ⇒ Object
use with extreme care, will nuke all of the data.
- #subscribe(channel, last_id = nil) ⇒ Object
Constructor Details
#initialize(redis_config = {}, max_backlog_size = 1000) ⇒ ReliablePubSub
max_backlog_size is per multiplexed channel
42 43 44 45 46 47 |
# File 'lib/message_bus/reliable_pub_sub.rb', line 42
#
# Stores the configuration for this instance; no Redis call is made here.
#
# @param redis_config [Hash] kept in @redis_config (passed to ::Redis.new by
#   new_redis_connection)
# @param max_backlog_size [Integer] backlog cap per multiplexed channel
def initialize(redis_config = {}, max_backlog_size = 1000)
  @redis_config = redis_config
  # Use the caller's value; this was previously hard-coded to 1000, which
  # silently discarded the argument.
  @max_backlog_size = max_backlog_size
  # we can store a ton here ...
  @max_global_backlog_size = 100000
end
Instance Attribute Details
#subscribed ⇒ Object (readonly)
Returns the value of attribute subscribed.
12 13 14 |
# File 'lib/message_bus/reliable_pub_sub.rb', line 12
#
# Reader for the @subscribed instance variable.
#
# @return [Object] whatever is currently stored in @subscribed
def subscribed
  @subscribed
end
Instance Method Details
#after_fork ⇒ Object
63 64 65 |
# File 'lib/message_bus/reliable_pub_sub.rb', line 63
#
# Asks the publishing connection's client to reconnect.
#
# @return [Object] whatever the client's reconnect call returns
def after_fork
  publisher = pub_redis
  publisher.client.reconnect
end
#backlog(channel, last_id = nil) ⇒ Object
140 141 142 143 144 145 146 147 148 |
# File 'lib/message_bus/reliable_pub_sub.rb', line 140
#
# Reads the channel's backlog entries whose score is greater than last_id and
# decodes each one.
#
# @param channel [Object] channel whose backlog sorted set is read
# @param last_id [Integer, nil] last id already seen; nil is treated as 0
# @return [Array] result of MessageBus::Message.decode for each stored entry
def backlog(channel, last_id = nil)
  first_unseen = last_id.to_i + 1
  encoded = pub_redis.zrangebyscore(backlog_key(channel), first_unseen, "+inf")
  encoded.map { |raw| MessageBus::Message.decode(raw) }
end
#backlog_id_key(channel) ⇒ Object
81 82 83 |
# File 'lib/message_bus/reliable_pub_sub.rb', line 81
#
# Builds the Redis key name used for a channel's backlog id.
#
# @param channel [Object] channel name, interpolated into the key
# @return [String] "__mb_backlog_id_n_" followed by the channel
def backlog_id_key(channel)
  "__mb_backlog_id_n_#{channel}"
end
#backlog_key(channel) ⇒ Object
77 78 79 |
# File 'lib/message_bus/reliable_pub_sub.rb', line 77
#
# Builds the Redis key name used for a channel's backlog.
#
# @param channel [Object] channel name, interpolated into the key
# @return [String] "__mb_backlog_n_" followed by the channel
def backlog_key(channel)
  "__mb_backlog_n_#{channel}"
end
#get_message(channel, message_id) ⇒ Object
168 169 170 171 172 173 174 175 176 177 178 |
# File 'lib/message_bus/reliable_pub_sub.rb', line 168
#
# Fetches a single message from a channel's backlog by its id.
#
# The extracted listing had lost the method name and every `message_id`
# token; they are restored here from the documented signature
# `get_message(channel, message_id)`.
#
# @param channel [Object] channel whose backlog sorted set is queried
# @param message_id [Integer] score to look up (used as both range bounds)
# @return [Object, nil] the decoded message, or nil when nothing is stored
#   under that id
def get_message(channel, message_id)
  items = pub_redis.zrangebyscore(backlog_key(channel), message_id, message_id)
  encoded = items && items[0]
  MessageBus::Message.decode(encoded) if encoded
end
#global_backlog(last_id = nil) ⇒ Object
150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 |
# File 'lib/message_bus/reliable_pub_sub.rb', line 150
#
# Resolves every global backlog entry with a score above last_id into its
# message.
#
# Each entry has the form "<id>|<channel>"; the id and channel are handed to
# get_message, and entries whose message can no longer be found are dropped.
# The extracted listing had lost the `message_id` and `get_message` tokens;
# they are restored here.
#
# @param last_id [Integer, nil] last global id already seen; nil is treated as 0
# @return [Array] messages returned by get_message, without nils
def global_backlog(last_id = nil)
  entries = pub_redis.zrangebyscore(global_backlog_key, last_id.to_i + 1, "+inf")

  messages = entries.map do |entry|
    # Split on the first "|" only, so channel names may themselves contain "|".
    pipe = entry.index("|")
    message_id = entry[0...pipe].to_i
    channel = entry[(pipe + 1)..-1]
    get_message(channel, message_id)
  end

  messages.compact
end
#global_backlog_key ⇒ Object
89 90 91 |
# File 'lib/message_bus/reliable_pub_sub.rb', line 89
#
# Redis key name used for the global backlog.
#
# @return [String] always "__mb_global_backlog_n"
def global_backlog_key
  "__mb_global_backlog_n"
end
#global_id_key ⇒ Object
85 86 87 |
# File 'lib/message_bus/reliable_pub_sub.rb', line 85
#
# Redis key name used for the global id.
#
# @return [String] always "__mb_global_id_n"
def global_id_key
  "__mb_global_id_n"
end
#global_subscribe(last_id = nil, &blk) ⇒ Object
223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 |
# File 'lib/message_bus/reliable_pub_sub.rb', line 223
#
# Subscribes to the shared Redis channel on a new connection and yields each
# decoded message to the block. When a starting id is known, or a message
# arrives with an unexpected global id, the global backlog is replayed through
# process_global_backlog instead.
#
# The extracted listing had lost the `message` handler name (`on. do |c,m|`);
# it is restored here as `on.message`.
#
# @param last_id [Integer, nil] last global id already processed, if any
# @yield [message] each decoded message, from the live feed or the backlog
# @raise [ArgumentError] when no block is given
# @return [nil] once the UNSUB_MESSAGE payload has been received
def global_subscribe(last_id = nil, &blk)
  raise ArgumentError unless block_given?

  highest_id = last_id

  # Replays the backlog from highest_id. BackLogOutOfOrder is retried up to
  # four times after a short random sleep; once retries reaches zero,
  # process_global_backlog is called with raise_error false.
  clear_backlog = lambda do
    retries = 4
    begin
      highest_id = process_global_backlog(highest_id, retries > 0, &blk)
    rescue BackLogOutOfOrder => e
      highest_id = e.highest_id
      retries -= 1
      sleep(rand(50) / 1000.0)
      retry
    end
  end

  begin
    @redis_global = new_redis_connection

    clear_backlog.call if highest_id

    @redis_global.subscribe(redis_channel_name) do |on|
      on.subscribe do
        clear_backlog.call if highest_id
        @subscribed = true
      end

      on.unsubscribe do
        @subscribed = false
      end

      on.message do |_channel, m|
        if m == UNSUB_MESSAGE
          @redis_global.unsubscribe
          # Non-local return: leaves global_subscribe itself.
          return
        end

        m = MessageBus::Message.decode m

        # we have 2 options
        #
        # 1. message came in the correct order GREAT, just deal with it
        # 2. message came in the incorrect order COMPLICATED, wait a tiny bit and clear backlog
        if highest_id.nil? || m.global_id == highest_id + 1
          highest_id = m.global_id
          yield m
        else
          clear_backlog.call
        end
      end
    end
  rescue => error
    # Any StandardError restarts the subscription after a one second pause.
    MessageBus.logger.warn "#{error} subscribe failed, reconnecting in 1 second. Call stack #{error.backtrace}"
    sleep 1
    retry
  end
end
#global_unsubscribe ⇒ Object
215 216 217 218 219 220 221 |
# File 'lib/message_bus/reliable_pub_sub.rb', line 215
#
# Stops the global subscription, if one exists: publishes UNSUB_MESSAGE on
# the shared channel, disconnects @redis_global and clears it.
#
# @return [nil]
def global_unsubscribe
  return unless @redis_global

  pub_redis.publish(redis_channel_name, UNSUB_MESSAGE)
  @redis_global.disconnect
  @redis_global = nil
end
#last_id(channel) ⇒ Object
135 136 137 138 |
# File 'lib/message_bus/reliable_pub_sub.rb', line 135
#
# Reads the value stored under the channel's backlog id key.
#
# @param channel [Object] channel to look up
# @return [Integer] the stored value converted with to_i (0 when nothing is stored)
def last_id(channel)
  id_key = backlog_id_key(channel)
  stored = pub_redis.get(id_key)
  stored.to_i
end
#max_backlog_size=(val) ⇒ Object
per channel backlog size
55 56 57 |
# File 'lib/message_bus/reliable_pub_sub.rb', line 55
#
# Sets the per channel backlog size.
#
# @param val [Integer] new value for @max_backlog_size
def max_backlog_size=(val)
  @max_backlog_size = val
end
#max_global_backlog_size=(val) ⇒ Object
amount of global backlog we can spin through
50 51 52 |
# File 'lib/message_bus/reliable_pub_sub.rb', line 50
#
# Sets the amount of global backlog we can spin through.
#
# @param val [Integer] new value for @max_global_backlog_size
def max_global_backlog_size=(val)
  @max_global_backlog_size = val
end
#max_publish_retries ⇒ Object
29 30 31 |
# File 'lib/message_bus/reliable_pub_sub.rb', line 29
#
# Returns @max_publish_retries, setting it to 10 first when it is nil or false.
#
# @return [Object] the configured value, or 10 by default
def max_publish_retries
  @max_publish_retries = 10 unless @max_publish_retries
  @max_publish_retries
end
#max_publish_retries=(val) ⇒ Object
25 26 27 |
# File 'lib/message_bus/reliable_pub_sub.rb', line 25
#
# Stores a new value in @max_publish_retries.
#
# @param val [Object] value to store
def max_publish_retries=(val)
  @max_publish_retries = val
end
#max_publish_wait ⇒ Object
37 38 39 |
# File 'lib/message_bus/reliable_pub_sub.rb', line 37
#
# Returns @max_publish_wait, setting it to 500 first when it is nil or false.
#
# @return [Object] the configured value, or 500 by default
def max_publish_wait
  @max_publish_wait = 500 unless @max_publish_wait
  @max_publish_wait
end
#max_publish_wait=(ms) ⇒ Object
33 34 35 |
# File 'lib/message_bus/reliable_pub_sub.rb', line 33
#
# Stores a new value in @max_publish_wait.
#
# @param ms [Object] value to store (presumably milliseconds, judging by the
#   parameter name; confirm against the callers)
def max_publish_wait=(ms)
  @max_publish_wait = ms
end
#new_redis_connection ⇒ Object
59 60 61 |
# File 'lib/message_bus/reliable_pub_sub.rb', line 59
#
# Creates a new ::Redis object from @redis_config.
#
# @return [Redis] a new instance on every call
def new_redis_connection
  ::Redis.new(@redis_config)
end
#process_global_backlog(highest_id, raise_error, &blk) ⇒ Object
199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 |
# File 'lib/message_bus/reliable_pub_sub.rb', line 199
#
# Walks the global backlog after highest_id, yielding messages and advancing
# highest_id as it goes.
#
# A message whose global_id is exactly highest_id + 1 is always yielded. For
# any other message, BackLogOutOfOrder is raised when raise_error is set;
# otherwise the message is yielded only if its global_id is greater than
# highest_id, and skipped if not.
#
# @param highest_id [Integer] last global id already processed
# @param raise_error [Boolean] whether an out-of-sequence entry raises
# @yield [message] each message that is accepted
# @raise [BackLogOutOfOrder] on an out-of-sequence entry when raise_error is set
# @return [Integer] the highest global id processed
def process_global_backlog(highest_id, raise_error, &blk)
  global_backlog(highest_id).each do |old|
    in_sequence = highest_id + 1 == old.global_id
    raise BackLogOutOfOrder.new(highest_id) if !in_sequence && raise_error
    next unless in_sequence || old.global_id > highest_id

    yield old
    highest_id = old.global_id
  end

  highest_id
end
#pub_redis ⇒ Object
redis connection used for publishing messages
73 74 75 |
# File 'lib/message_bus/reliable_pub_sub.rb', line 73
#
# Redis connection used for publishing messages. It is created through
# new_redis_connection on first use and kept in @pub_redis afterwards.
#
# @return [Object] the memoized connection
def pub_redis
  return @pub_redis if @pub_redis

  @pub_redis = new_redis_connection
end
#publish(channel, data) ⇒ Object
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/message_bus/reliable_pub_sub.rb', line 100
#
# Publishes data on a channel: allocates a global id and a per-channel
# backlog id, stores the encoded message in the channel backlog, records
# "<backlog_id>|<channel>" in the global backlog, publishes the payload on the
# shared Redis channel, and trims both backlogs once they pass their caps.
#
# @param channel [String] channel to publish on
# @param data [Object] payload handed to MessageBus::Message.new
# @return [Integer] the backlog id assigned to the message within its channel
def publish(channel, data)
  redis = pub_redis
  channel_id_key = backlog_id_key(channel)
  channel_backlog_key = backlog_key(channel)

  global_id = nil
  backlog_id = nil

  # Both counters are incremented inside one MULTI; the block hands back
  # objects whose #value is read after the transaction.
  redis.multi do |m|
    global_id = m.incr(global_id_key)
    backlog_id = m.incr(channel_id_key)
  end

  global_id = global_id.value
  backlog_id = backlog_id.value

  msg = MessageBus::Message.new global_id, backlog_id, channel, data
  payload = msg.encode

  redis.zadd channel_backlog_key, backlog_id, payload
  redis.zadd global_backlog_key, global_id, "#{backlog_id}|#{channel}"

  redis.publish redis_channel_name, payload

  if backlog_id > @max_backlog_size
    redis.zremrangebyscore channel_backlog_key, 1, backlog_id - @max_backlog_size
  end

  if global_id > @max_global_backlog_size
    # The global backlog is scored by global_id, so it must be trimmed
    # relative to global_id and its own cap. The previous code reused the
    # per-channel expression (backlog_id - @max_backlog_size) here.
    redis.zremrangebyscore global_backlog_key, 1, global_id - @max_global_backlog_size
  end

  backlog_id
end
#redis_channel_name ⇒ Object
67 68 69 70 |
# File 'lib/message_bus/reliable_pub_sub.rb', line 67
#
# Name of the Redis pub/sub channel, derived from the :db entry of
# @redis_config (0 when that entry is nil or false).
#
# @return [String] "discourse_" followed by the db value
def redis_channel_name
  "discourse_#{@redis_config[:db] || 0}"
end
#reset! ⇒ Object
use with extreme care, will nuke all of the data
94 95 96 97 98 |
# File 'lib/message_bus/reliable_pub_sub.rb', line 94
#
# Use with extreme care, will nuke all of the data: deletes every key
# matching "__mb_*", one at a time.
#
# @return [Array] the list of keys that was iterated
def reset!
  doomed = pub_redis.keys("__mb_*")
  doomed.each do |key|
    pub_redis.del key
  end
end
#subscribe(channel, last_id = nil) ⇒ Object
180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 |
# File 'lib/message_bus/reliable_pub_sub.rb', line 180
#
# Subscribes to a single channel by filtering the global subscription.
#
# The extracted listing had lost the `message` local and the `get_message`
# call; they are restored here.
#
# @param channel [Object] channel to deliver messages for
# @param last_id [Integer, nil] last per-channel id already seen, if any
# @yield [message] each message whose channel equals the requested one
# @raise [ArgumentError] when no block is given
def subscribe(channel, last_id = nil)
  # trivial implementation for now,
  #   can cut down on connections if we only have one global subscriber
  raise ArgumentError unless block_given?

  if last_id
    # we need to translate this to a global id, at least give it a shot
    #   we are subscribing on global and global is always going to be bigger than local
    #   so worst case is a replay of a few messages
    message = get_message(channel, last_id)
    # When the message is no longer in the backlog, last_id is passed on as is.
    last_id = message.global_id if message
  end

  global_subscribe(last_id) do |m|
    yield m if m.channel == channel
  end
end