Class: MessageBus::Client
- Inherits:
-
Object
- Object
- MessageBus::Client
- Defined in:
- lib/message_bus/client.rb
Instance Attribute Summary collapse
-
#async_response ⇒ Object
Returns the value of attribute async_response.
-
#cleanup_timer ⇒ Object
Returns the value of attribute cleanup_timer.
-
#client_id ⇒ Object
Returns the value of attribute client_id.
-
#connect_time ⇒ Object
Returns the value of attribute connect_time.
-
#group_ids ⇒ Object
Returns the value of attribute group_ids.
-
#headers ⇒ Object
Returns the value of attribute headers.
-
#io ⇒ Object
Returns the value of attribute io.
-
#seq ⇒ Object
Returns the value of attribute seq.
-
#site_id ⇒ Object
Returns the value of attribute site_id.
-
#subscribed_sets ⇒ Object
Returns the value of attribute subscribed_sets.
-
#use_chunked ⇒ Object
Returns the value of attribute use_chunked.
-
#user_id ⇒ Object
Returns the value of attribute user_id.
Instance Method Summary collapse
- #<<(msg) ⇒ Object
- #allowed?(msg) ⇒ Boolean
- #backlog ⇒ Object
- #cancel ⇒ Object
- #close ⇒ Object
- #closed? ⇒ Boolean
- #deliver_backlog(backlog) ⇒ Object
- #ensure_closed! ⇒ Object
- #ensure_first_chunk_sent ⇒ Object
- #in_async? ⇒ Boolean
-
#initialize(opts) ⇒ Client
constructor
A new instance of Client.
- #subscribe(channel, last_seen_id) ⇒ Object
- #subscriptions ⇒ Object
- #synchronize ⇒ Object
Constructor Details
#initialize(opts) ⇒ Client
Returns a new instance of Client.
7 8 9 10 11 12 13 14 15 16 17 18 |
# File 'lib/message_bus/client.rb', line 7 def initialize(opts) self.client_id = opts[:client_id] self.user_id = opts[:user_id] self.group_ids = opts[:group_ids] || [] self.site_id = opts[:site_id] self.seq = opts[:seq].to_i self.connect_time = Time.now @lock = Mutex.new @bus = opts[:message_bus] || MessageBus @subscriptions = {} @chunks_sent = 0 end |
Instance Attribute Details
#async_response ⇒ Object
Returns the value of attribute async_response.
3 4 5 |
# File 'lib/message_bus/client.rb', line 3 def async_response @async_response end |
#cleanup_timer ⇒ Object
Returns the value of attribute cleanup_timer.
3 4 5 |
# File 'lib/message_bus/client.rb', line 3 def cleanup_timer @cleanup_timer end |
#client_id ⇒ Object
Returns the value of attribute client_id.
3 4 5 |
# File 'lib/message_bus/client.rb', line 3 def client_id @client_id end |
#connect_time ⇒ Object
Returns the value of attribute connect_time.
3 4 5 |
# File 'lib/message_bus/client.rb', line 3 def connect_time @connect_time end |
#group_ids ⇒ Object
Returns the value of attribute group_ids.
3 4 5 |
# File 'lib/message_bus/client.rb', line 3 def group_ids @group_ids end |
#headers ⇒ Object
Returns the value of attribute headers.
3 4 5 |
# File 'lib/message_bus/client.rb', line 3 def headers @headers end |
#io ⇒ Object
Returns the value of attribute io.
3 4 5 |
# File 'lib/message_bus/client.rb', line 3 def io @io end |
#seq ⇒ Object
Returns the value of attribute seq.
3 4 5 |
# File 'lib/message_bus/client.rb', line 3 def seq @seq end |
#site_id ⇒ Object
Returns the value of attribute site_id.
3 4 5 |
# File 'lib/message_bus/client.rb', line 3 def site_id @site_id end |
#subscribed_sets ⇒ Object
Returns the value of attribute subscribed_sets.
3 4 5 |
# File 'lib/message_bus/client.rb', line 3 def subscribed_sets @subscribed_sets end |
#use_chunked ⇒ Object
Returns the value of attribute use_chunked.
3 4 5 |
# File 'lib/message_bus/client.rb', line 3 def use_chunked @use_chunked end |
#user_id ⇒ Object
Returns the value of attribute user_id.
3 4 5 |
# File 'lib/message_bus/client.rb', line 3 def user_id @user_id end |
Instance Method Details
#<<(msg) ⇒ Object
94 95 96 97 98 99 100 101 |
# File 'lib/message_bus/client.rb', line 94 def <<(msg) json = messages_to_json([msg]) if use_chunked write_chunk json else write_and_close json end end |
#allowed?(msg) ⇒ Boolean
107 108 109 110 111 112 113 114 115 116 117 |
# File 'lib/message_bus/client.rb', line 107 def allowed?(msg) allowed = !msg.user_ids || msg.user_ids.include?(self.user_id) allowed &&= !msg.client_ids || msg.client_ids.include?(self.client_id) allowed && ( msg.group_ids.nil? || msg.group_ids.length == 0 || ( msg.group_ids - self.group_ids ).length < msg.group_ids.length ) end |
#backlog ⇒ Object
119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
# File 'lib/message_bus/client.rb', line 119 def backlog r = [] new_message_ids = nil @subscriptions.each do |k, v| next if v.to_i < 0 messages = @bus.backlog(k, v, site_id) messages.each do |msg| if allowed?(msg) r << msg else new_message_ids ||= {} new_message_ids[k] = msg.message_id end end end # stats message for all newly subscribed status_message = nil @subscriptions.each do |k, v| if v.to_i == -1 || (new_message_ids && new_message_ids[k]) status_message ||= {} @subscriptions[k] = status_message[k] = @bus.last_id(k, site_id) end end r << MessageBus::Message.new(-1, -1, '/__status', status_message) if status_message r || [] end |
#cancel ⇒ Object
24 25 26 27 28 29 30 31 |
# File 'lib/message_bus/client.rb', line 24 def cancel if cleanup_timer # concurrency may nil cleanup timer cleanup_timer.cancel rescue nil self.cleanup_timer = nil end ensure_closed! end |
#close ⇒ Object
76 77 78 |
# File 'lib/message_bus/client.rb', line 76 def close ensure_closed! end |
#closed? ⇒ Boolean
80 81 82 |
# File 'lib/message_bus/client.rb', line 80 def closed? !@async_response && !@io end |
#deliver_backlog(backlog) ⇒ Object
37 38 39 40 41 42 43 44 45 |
# File 'lib/message_bus/client.rb', line 37 def deliver_backlog(backlog) if backlog.length > 0 if use_chunked write_chunk(messages_to_json(backlog)) else write_and_close messages_to_json(backlog) end end end |
#ensure_closed! ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/message_bus/client.rb', line 53 def ensure_closed! return unless in_async? if use_chunked write_chunk("[]") if @io @io.write("0\r\n\r\n") @io.close @io = nil end if @async_response @async_response << ("0\r\n\r\n") @async_response.done @async_response = nil end else write_and_close "[]" end rescue # we may have a dead socket, just nil the @io @io = nil @async_response = nil end |
#ensure_first_chunk_sent ⇒ Object
47 48 49 50 51 |
# File 'lib/message_bus/client.rb', line 47 def ensure_first_chunk_sent if use_chunked && @chunks_sent == 0 write_chunk("[]") end end |
#in_async? ⇒ Boolean
33 34 35 |
# File 'lib/message_bus/client.rb', line 33 def in_async? @async_response || @io end |
#subscribe(channel, last_seen_id) ⇒ Object
84 85 86 87 88 |
# File 'lib/message_bus/client.rb', line 84 def subscribe(channel, last_seen_id) last_seen_id = nil if last_seen_id == "" last_seen_id ||= @bus.last_id(channel) @subscriptions[channel] = last_seen_id.to_i end |
#subscriptions ⇒ Object
90 91 92 |
# File 'lib/message_bus/client.rb', line 90 def subscriptions @subscriptions end |
#synchronize ⇒ Object
20 21 22 |
# File 'lib/message_bus/client.rb', line 20 def synchronize @lock.synchronize { yield } end |