Class: MessageBus::Client
- Inherits:
-
Object
- Object
- MessageBus::Client
- Defined in:
- lib/message_bus/client.rb
Overview
Represents a connected subscriber and delivers published messages over its connected socket.
Instance Attribute Summary collapse
- #async_response ⇒ Thin::AsyncResponse?
-
#cleanup_timer ⇒ MessageBus::TimerThread::Cancelable
A timer job that is used to auto-disconnect the client at the configured long-polling interval.
-
#client_id ⇒ String
The unique ID provided by the client.
-
#connect_time ⇒ Time
The time at which the client connected.
-
#group_ids ⇒ Array<String,Integer>
The group IDs the authenticated client is a member of.
-
#headers ⇒ Hash<String => String>
Custom headers to include in HTTP responses.
-
#io ⇒ IO
The HTTP socket the client is connected on.
-
#seq ⇒ Integer
The connection sequence number the client provided when connecting.
-
#site_id ⇒ String
The site ID the client was authenticated for; used for hosting multiple.
-
#use_chunked ⇒ Boolean
Whether or not the client should use chunked encoding.
-
#user_id ⇒ String, Integer
The user ID the client was authenticated for.
Instance Method Summary collapse
-
#<<(msg) ⇒ void
Delivers a message to the client, even if it’s empty.
-
#allowed?(msg) ⇒ Boolean
Whether or not the client has permission to receive the passed message.
-
#backlog ⇒ Array<MessageBus::Message>
The set of messages the client is due to receive, based on its subscriptions and permissions.
-
#close ⇒ Object
Closes the client connection.
-
#closed? ⇒ Boolean
Whether the connection is closed or not.
-
#deliver_backlog(backlog) ⇒ void
Delivers a backlog of messages to the client, if there is anything in it.
-
#ensure_first_chunk_sent ⇒ Object
If no data has yet been sent to the client, sends an empty chunk; prevents clients from entering a timeout state if nothing is delivered initially.
-
#initialize(opts) ⇒ Client
constructor
A new instance of Client.
-
#subscribe(channel, last_seen_id) ⇒ void
Subscribes the client to messages on a channel, optionally from a defined starting point.
-
#subscriptions ⇒ Hash<String => Integer>
The active subscriptions, mapping channel names to last seen message IDs.
- #synchronize { ... } ⇒ void
Constructor Details
#initialize(opts) ⇒ Client
Returns a new instance of Client.
38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/message_bus/client.rb', line 38 def initialize(opts) self.client_id = opts[:client_id] self.user_id = opts[:user_id] self.group_ids = opts[:group_ids] || [] self.site_id = opts[:site_id] self.seq = opts[:seq].to_i self.connect_time = Time.now @lock = Mutex.new @bus = opts[:message_bus] || MessageBus @subscriptions = {} @chunks_sent = 0 end |
Instance Attribute Details
#async_response ⇒ Thin::AsyncResponse?
20 21 22 |
# File 'lib/message_bus/client.rb', line 20 def async_response @async_response end |
#cleanup_timer ⇒ MessageBus::TimerThread::Cancelable
Returns a timer job that is used to auto-disconnect the client at the configured long-polling interval.
18 19 20 |
# File 'lib/message_bus/client.rb', line 18 def cleanup_timer @cleanup_timer end |
#client_id ⇒ String
Returns the unique ID provided by the client.
7 8 9 |
# File 'lib/message_bus/client.rb', line 7 def client_id @client_id end |
#connect_time ⇒ Time
Returns the time at which the client connected.
13 14 15 |
# File 'lib/message_bus/client.rb', line 13 def connect_time @connect_time end |
#group_ids ⇒ Array<String,Integer>
Returns the group IDs the authenticated client is a member of.
11 12 13 |
# File 'lib/message_bus/client.rb', line 11 def group_ids @group_ids end |
#headers ⇒ Hash<String => String>
Returns custom headers to include in HTTP responses.
24 25 26 |
# File 'lib/message_bus/client.rb', line 24 def headers @headers end |
#io ⇒ IO
Returns the HTTP socket the client is connected on.
22 23 24 |
# File 'lib/message_bus/client.rb', line 22 def io @io end |
#seq ⇒ Integer
Returns the connection sequence number the client provided when connecting.
26 27 28 |
# File 'lib/message_bus/client.rb', line 26 def seq @seq end |
#site_id ⇒ String
Returns the site ID the client was authenticated for; used for hosting multiple.
15 16 17 |
# File 'lib/message_bus/client.rb', line 15 def site_id @site_id end |
#use_chunked ⇒ Boolean
Returns whether or not the client should use chunked encoding.
28 29 30 |
# File 'lib/message_bus/client.rb', line 28 def use_chunked @use_chunked end |
#user_id ⇒ String, Integer
Returns the user ID the client was authenticated for.
9 10 11 |
# File 'lib/message_bus/client.rb', line 9 def user_id @user_id end |
Instance Method Details
#<<(msg) ⇒ void
This method returns an undefined value.
Delivers a message to the client, even if it’s empty
118 119 120 121 122 123 124 125 |
# File 'lib/message_bus/client.rb', line 118 def <<(msg) json = ([msg]) if use_chunked write_chunk json else write_and_close json end end |
#allowed?(msg) ⇒ Boolean
Returns whether or not the client has permission to receive the passed message.
130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
# File 'lib/message_bus/client.rb', line 130 def allowed?(msg) client_allowed = !msg.client_ids || msg.client_ids.length == 0 || msg.client_ids.include?(self.client_id) user_allowed = false group_allowed = false has_users = msg.user_ids && msg.user_ids.length > 0 has_groups = msg.group_ids && msg.group_ids.length > 0 if has_users user_allowed = msg.user_ids.include?(self.user_id) end if has_groups group_allowed = ( msg.group_ids - (self.group_ids || []) ).length < msg.group_ids.length end = client_allowed && (user_allowed || group_allowed || (!has_users && !has_groups)) return if ! filters_allowed = true len = @bus..length while len > 0 len -= 1 channel_prefix, blk = @bus.[len] if msg.channel.start_with?(channel_prefix) filters_allowed = blk.call(msg) break if !filters_allowed end end filters_allowed end |
#backlog ⇒ Array<MessageBus::Message>
Returns the set of messages the client is due to receive, based on its subscriptions and permissions. Includes status message if any channels have no messages available and the client requested a message newer than the newest on the channel, or when there are messages available that the client doesn’t have permission for.
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 |
# File 'lib/message_bus/client.rb', line 174 def backlog r = [] = nil @subscriptions.each do |k, v| id = v.to_i if id < -1 last_id = @bus.last_id(k, site_id) id = last_id + id + 1 id = 0 if id < 0 end next if id < 0 = @bus.backlog(k, id, site_id) if .length == 0 if id > @bus.last_id(k, site_id) @subscriptions[k] = -1 end else .each do |msg| if allowed?(msg) r << msg else ||= {} [k] = msg. end end end end # stats message for all newly subscribed = nil @subscriptions.each do |k, v| if v.to_i == -1 || ( && [k]) ||= {} @subscriptions[k] = [k] = @bus.last_id(k, site_id) end end r << MessageBus::Message.new(-1, -1, '/__status', ) if r || [] end |
#close ⇒ Object
Closes the client connection
58 59 60 61 62 63 64 65 |
# File 'lib/message_bus/client.rb', line 58 def close if cleanup_timer # concurrency may nil cleanup timer cleanup_timer.cancel rescue nil self.cleanup_timer = nil end ensure_closed! end |
#closed? ⇒ Boolean
Returns whether the connection is closed or not.
92 93 94 |
# File 'lib/message_bus/client.rb', line 92 def closed? !@async_response && !@io end |
#deliver_backlog(backlog) ⇒ void
This method returns an undefined value.
Delivers a backlog of messages to the client, if there is anything in it. If chunked encoding/streaming is in use, will keep the connection open; if not, will close it.
73 74 75 76 77 78 79 80 81 |
# File 'lib/message_bus/client.rb', line 73 def deliver_backlog(backlog) if backlog.length > 0 if use_chunked write_chunk((backlog)) else write_and_close (backlog) end end end |
#ensure_first_chunk_sent ⇒ Object
If no data has yet been sent to the client, sends an empty chunk; prevents clients from entering a timeout state if nothing is delivered initially.
85 86 87 88 89 |
# File 'lib/message_bus/client.rb', line 85 def ensure_first_chunk_sent if use_chunked && @chunks_sent == 0 write_chunk("[]") end end |
#subscribe(channel, last_seen_id) ⇒ void
This method returns an undefined value.
Subscribes the client to messages on a channel, optionally from a defined starting point.
103 104 105 106 107 |
# File 'lib/message_bus/client.rb', line 103 def subscribe(channel, last_seen_id) last_seen_id = nil if last_seen_id == "" last_seen_id ||= @bus.last_id(channel) @subscriptions[channel] = last_seen_id.to_i end |
#subscriptions ⇒ Hash<String => Integer>
Returns the active subscriptions, mapping channel names to last seen message IDs.
111 112 113 |
# File 'lib/message_bus/client.rb', line 111 def subscriptions @subscriptions end |
#synchronize { ... } ⇒ void
This method returns an undefined value.
53 54 55 |
# File 'lib/message_bus/client.rb', line 53 def synchronize @lock.synchronize { yield } end |