Class: MessageBus::Client
- Inherits:
-
Object
- Object
- MessageBus::Client
- Defined in:
- lib/message_bus/client.rb
Overview
Represents a connected subscriber and delivers published messages over its connected socket.
Instance Attribute Summary collapse
- #async_response ⇒ Thin::AsyncResponse?
-
#cleanup_timer ⇒ MessageBus::TimerThread::Cancelable
A timer job that is used to auto-disconnect the client at the configured long-polling interval.
-
#client_id ⇒ String
The unique ID provided by the client.
-
#connect_time ⇒ Time
The time at which the client connected.
-
#group_ids ⇒ Array<String,Integer>
The group IDs the authenticated client is a member of.
-
#headers ⇒ Hash<String => String>
Custom headers to include in HTTP responses.
-
#io ⇒ IO
The HTTP socket the client is connected on.
-
#seq ⇒ Integer
The connection sequence number the client provided when connecting.
-
#site_id ⇒ String
The site ID the client was authenticated for; used for hosting multiple sites.
-
#use_chunked ⇒ Boolean
Whether or not the client should use chunked encoding.
-
#user_id ⇒ String, Integer
The user ID the client was authenticated for.
Instance Method Summary collapse
-
#<<(msg) ⇒ void
Delivers a message to the client, even if it’s empty.
-
#allowed?(msg) ⇒ Boolean
Whether or not the client has permission to receive the passed message.
-
#backlog ⇒ Array<MessageBus::Message>
The set of messages the client is due to receive, based on its subscriptions and permissions.
-
#close ⇒ Object
Closes the client connection.
-
#closed? ⇒ Boolean
Whether the connection is closed or not.
-
#deliver_backlog(backlog) ⇒ void
Delivers a backlog of messages to the client, if there is anything in it.
-
#ensure_first_chunk_sent ⇒ Object
If no data has yet been sent to the client, sends an empty chunk; prevents clients from entering a timeout state if nothing is delivered initially.
-
#initialize(opts) ⇒ Client
constructor
A new instance of Client.
-
#subscribe(channel, last_seen_id) ⇒ void
Subscribes the client to messages on a channel, optionally from a defined starting point.
-
#subscriptions ⇒ Hash<String => Integer>
The active subscriptions, mapping channel names to last seen message IDs.
- #synchronize { ... } ⇒ void
Constructor Details
#initialize(opts) ⇒ Client
Returns a new instance of Client.
38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/message_bus/client.rb', line 38 def initialize(opts) self.client_id = opts[:client_id] self.user_id = opts[:user_id] self.group_ids = opts[:group_ids] || [] self.site_id = opts[:site_id] self.seq = opts[:seq].to_i self.connect_time = Time.now @lock = Mutex.new @bus = opts[:message_bus] || MessageBus @subscriptions = {} @chunks_sent = 0 end |
Instance Attribute Details
#async_response ⇒ Thin::AsyncResponse?
20 21 22 |
# File 'lib/message_bus/client.rb', line 20 def async_response @async_response end |
#cleanup_timer ⇒ MessageBus::TimerThread::Cancelable
18 19 20 |
# File 'lib/message_bus/client.rb', line 18 def cleanup_timer @cleanup_timer end |
#client_id ⇒ String
7 8 9 |
# File 'lib/message_bus/client.rb', line 7 def client_id @client_id end |
#connect_time ⇒ Time
13 14 15 |
# File 'lib/message_bus/client.rb', line 13 def connect_time @connect_time end |
#group_ids ⇒ Array<String,Integer>
11 12 13 |
# File 'lib/message_bus/client.rb', line 11 def group_ids @group_ids end |
#headers ⇒ Hash<String => String>
24 25 26 |
# File 'lib/message_bus/client.rb', line 24 def headers @headers end |
#io ⇒ IO
22 23 24 |
# File 'lib/message_bus/client.rb', line 22 def io @io end |
#seq ⇒ Integer
26 27 28 |
# File 'lib/message_bus/client.rb', line 26 def seq @seq end |
#site_id ⇒ String
15 16 17 |
# File 'lib/message_bus/client.rb', line 15 def site_id @site_id end |
#use_chunked ⇒ Boolean
28 29 30 |
# File 'lib/message_bus/client.rb', line 28 def use_chunked @use_chunked end |
#user_id ⇒ String, Integer
9 10 11 |
# File 'lib/message_bus/client.rb', line 9 def user_id @user_id end |
Instance Method Details
#<<(msg) ⇒ void
This method returns an undefined value.
Delivers a message to the client, even if it’s empty.
118 119 120 121 122 123 124 125 |
# File 'lib/message_bus/client.rb', line 118 def <<(msg) json = messages_to_json([msg]) if use_chunked write_chunk json else write_and_close json end end |
#allowed?(msg) ⇒ Boolean
130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
# File 'lib/message_bus/client.rb', line 130 def allowed?(msg) client_allowed = !msg.client_ids || msg.client_ids.length == 0 || msg.client_ids.include?(self.client_id) user_allowed = false group_allowed = false has_users = msg.user_ids && msg.user_ids.length > 0 has_groups = msg.group_ids && msg.group_ids.length > 0 if has_users user_allowed = msg.user_ids.include?(self.user_id) end if has_groups group_allowed = ( msg.group_ids - (self.group_ids || []) ).length < msg.group_ids.length end has_permission = client_allowed && (user_allowed || group_allowed || (!has_users && !has_groups)) return has_permission if !has_permission filters_allowed = true len = @bus.client_message_filters.length while len > 0 len -= 1 channel_prefix, blk = @bus.client_message_filters[len] if msg.channel.start_with?(channel_prefix) filters_allowed = blk.call(msg) break if !filters_allowed end end filters_allowed end |
#backlog ⇒ Array<MessageBus::Message>
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 |
# File 'lib/message_bus/client.rb', line 174 def backlog r = [] new_message_ids = nil @subscriptions.each do |k, v| id = v.to_i if id < -1 last_id = @bus.last_id(k, site_id) id = last_id + id + 1 id = 0 if id < 0 end next if id < 0 messages = @bus.backlog(k, id, site_id) if messages.length == 0 if id > @bus.last_id(k, site_id) @subscriptions[k] = -1 end else messages.each do |msg| if allowed?(msg) r << msg else new_message_ids ||= {} new_message_ids[k] = msg.message_id end end end end # stats message for all newly subscribed status_message = nil @subscriptions.each do |k, v| if v.to_i == -1 || (new_message_ids && new_message_ids[k]) status_message ||= {} @subscriptions[k] = status_message[k] = @bus.last_id(k, site_id) end end r << MessageBus::Message.new(-1, -1, '/__status', status_message) if status_message r || [] end |
#close ⇒ Object
Closes the client connection.
58 59 60 61 62 63 64 65 |
# File 'lib/message_bus/client.rb', line 58 def close if cleanup_timer # concurrency may nil cleanup timer cleanup_timer.cancel rescue nil self.cleanup_timer = nil end ensure_closed! end |
#closed? ⇒ Boolean
92 93 94 |
# File 'lib/message_bus/client.rb', line 92 def closed? !@async_response && !@io end |
#deliver_backlog(backlog) ⇒ void
This method returns an undefined value.
Delivers a backlog of messages to the client, if there is anything in it. If chunked encoding/streaming is in use, will keep the connection open; if not, will close it.
73 74 75 76 77 78 79 80 81 |
# File 'lib/message_bus/client.rb', line 73 def deliver_backlog(backlog) if backlog.length > 0 if use_chunked write_chunk(messages_to_json(backlog)) else write_and_close messages_to_json(backlog) end end end |
#ensure_first_chunk_sent ⇒ Object
If no data has yet been sent to the client, sends an empty chunk; prevents clients from entering a timeout state if nothing is delivered initially.
85 86 87 88 89 |
# File 'lib/message_bus/client.rb', line 85 def ensure_first_chunk_sent if use_chunked && @chunks_sent == 0 write_chunk("[]") end end |
#subscribe(channel, last_seen_id) ⇒ void
This method returns an undefined value.
Subscribes the client to messages on a channel, optionally from a defined starting point.
103 104 105 106 107 |
# File 'lib/message_bus/client.rb', line 103 def subscribe(channel, last_seen_id) last_seen_id = nil if last_seen_id == "" last_seen_id ||= @bus.last_id(channel) @subscriptions[channel] = last_seen_id.to_i end |
#subscriptions ⇒ Hash<String => Integer>
111 112 113 |
# File 'lib/message_bus/client.rb', line 111 def subscriptions @subscriptions end |
#synchronize { ... } ⇒ void
This method returns an undefined value.
53 54 55 |
# File 'lib/message_bus/client.rb', line 53 def synchronize @lock.synchronize { yield } end |