Class: MessageBus::Client
- Inherits:
-
Object
- Object
- MessageBus::Client
- Defined in:
- lib/message_bus/client.rb
Instance Attribute Summary collapse
-
#async_response ⇒ Object
Returns the value of attribute async_response.
-
#cleanup_timer ⇒ Object
Returns the value of attribute cleanup_timer.
-
#client_id ⇒ Object
Returns the value of attribute client_id.
-
#connect_time ⇒ Object
Returns the value of attribute connect_time.
-
#group_ids ⇒ Object
Returns the value of attribute group_ids.
-
#headers ⇒ Object
Returns the value of attribute headers.
-
#io ⇒ Object
Returns the value of attribute io.
-
#seq ⇒ Object
Returns the value of attribute seq.
-
#site_id ⇒ Object
Returns the value of attribute site_id.
-
#subscribed_sets ⇒ Object
Returns the value of attribute subscribed_sets.
-
#use_chunked ⇒ Object
Returns the value of attribute use_chunked.
-
#user_id ⇒ Object
Returns the value of attribute user_id.
Instance Method Summary collapse
- #<<(msg) ⇒ Object
- #allowed?(msg) ⇒ Boolean
- #backlog ⇒ Object
- #cancel ⇒ Object
- #close ⇒ Object
- #closed? ⇒ Boolean
- #deliver_backlog(backlog) ⇒ Object
- #ensure_closed! ⇒ Object
- #ensure_first_chunk_sent ⇒ Object
- #in_async? ⇒ Boolean
-
#initialize(opts) ⇒ Client
constructor
A new instance of Client.
- #subscribe(channel, last_seen_id) ⇒ Object
- #subscriptions ⇒ Object
- #synchronize ⇒ Object
Constructor Details
#initialize(opts) ⇒ Client
Returns a new instance of Client.
6 7 8 9 10 11 12 13 14 15 16 17 |
# File 'lib/message_bus/client.rb', line 6 def initialize(opts) self.client_id = opts[:client_id] self.user_id = opts[:user_id] self.group_ids = opts[:group_ids] || [] self.site_id = opts[:site_id] self.seq = opts[:seq].to_i self.connect_time = Time.now @lock = Mutex.new @bus = opts[:message_bus] || MessageBus @subscriptions = {} @chunks_sent = 0 end |
Instance Attribute Details
#async_response ⇒ Object
Returns the value of attribute async_response.
2 3 4 |
# File 'lib/message_bus/client.rb', line 2 def async_response @async_response end |
#cleanup_timer ⇒ Object
Returns the value of attribute cleanup_timer.
2 3 4 |
# File 'lib/message_bus/client.rb', line 2 def cleanup_timer @cleanup_timer end |
#client_id ⇒ Object
Returns the value of attribute client_id.
2 3 4 |
# File 'lib/message_bus/client.rb', line 2 def client_id @client_id end |
#connect_time ⇒ Object
Returns the value of attribute connect_time.
2 3 4 |
# File 'lib/message_bus/client.rb', line 2 def connect_time @connect_time end |
#group_ids ⇒ Object
Returns the value of attribute group_ids.
2 3 4 |
# File 'lib/message_bus/client.rb', line 2 def group_ids @group_ids end |
#headers ⇒ Object
Returns the value of attribute headers.
2 3 4 |
# File 'lib/message_bus/client.rb', line 2 def headers @headers end |
#io ⇒ Object
Returns the value of attribute io.
2 3 4 |
# File 'lib/message_bus/client.rb', line 2 def io @io end |
#seq ⇒ Object
Returns the value of attribute seq.
2 3 4 |
# File 'lib/message_bus/client.rb', line 2 def seq @seq end |
#site_id ⇒ Object
Returns the value of attribute site_id.
2 3 4 |
# File 'lib/message_bus/client.rb', line 2 def site_id @site_id end |
#subscribed_sets ⇒ Object
Returns the value of attribute subscribed_sets.
2 3 4 |
# File 'lib/message_bus/client.rb', line 2 def subscribed_sets @subscribed_sets end |
#use_chunked ⇒ Object
Returns the value of attribute use_chunked.
2 3 4 |
# File 'lib/message_bus/client.rb', line 2 def use_chunked @use_chunked end |
#user_id ⇒ Object
Returns the value of attribute user_id.
2 3 4 |
# File 'lib/message_bus/client.rb', line 2 def user_id @user_id end |
Instance Method Details
#<<(msg) ⇒ Object
93 94 95 96 97 98 99 100 |
# File 'lib/message_bus/client.rb', line 93 def <<(msg) json = ([msg]) if use_chunked write_chunk json else write_and_close json end end |
#allowed?(msg) ⇒ Boolean
106 107 108 109 110 111 112 113 114 115 116 |
# File 'lib/message_bus/client.rb', line 106 def allowed?(msg) allowed = !msg.user_ids || msg.user_ids.include?(self.user_id) allowed &&= !msg.client_ids || msg.client_ids.include?(self.client_id) allowed && ( msg.group_ids.nil? || msg.group_ids.length == 0 || ( msg.group_ids - self.group_ids ).length < msg.group_ids.length ) end |
#backlog ⇒ Object
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
# File 'lib/message_bus/client.rb', line 118 def backlog r = [] @subscriptions.each do |k,v| next if v.to_i < 0 = @bus.backlog(k, v, site_id) .each do |msg| r << msg if allowed?(msg) end end # stats message for all newly subscribed = nil @subscriptions.each do |k,v| if v.to_i == -1 ||= {} @subscriptions[k] = [k] = @bus.last_id(k, site_id) end end r << MessageBus::Message.new(-1, -1, '/__status', ) if r || [] end |
#cancel ⇒ Object
23 24 25 26 27 28 29 30 |
# File 'lib/message_bus/client.rb', line 23 def cancel if cleanup_timer # concurrency may nil cleanup timer cleanup_timer.cancel rescue nil self.cleanup_timer = nil end ensure_closed! end |
#close ⇒ Object
75 76 77 |
# File 'lib/message_bus/client.rb', line 75 def close ensure_closed! end |
#closed? ⇒ Boolean
79 80 81 |
# File 'lib/message_bus/client.rb', line 79 def closed? !@async_response && !@io end |
#deliver_backlog(backlog) ⇒ Object
36 37 38 39 40 41 42 43 44 |
# File 'lib/message_bus/client.rb', line 36 def deliver_backlog(backlog) if backlog.length > 0 if use_chunked write_chunk((backlog)) else write_and_close (backlog) end end end |
#ensure_closed! ⇒ Object
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/message_bus/client.rb', line 52 def ensure_closed! return unless in_async? if use_chunked write_chunk("[]".freeze) if @io @io.write("0\r\n\r\n".freeze) @io.close @io = nil end if @async_response @async_response << ("0\r\n\r\n".freeze) @async_response.done @async_response = nil end else write_and_close "[]" end rescue # we may have a dead socket, just nil the @io @io = nil @async_response = nil end |
#ensure_first_chunk_sent ⇒ Object
46 47 48 49 50 |
# File 'lib/message_bus/client.rb', line 46 def ensure_first_chunk_sent if use_chunked && @chunks_sent == 0 write_chunk("[]".freeze) end end |
#in_async? ⇒ Boolean
32 33 34 |
# File 'lib/message_bus/client.rb', line 32 def in_async? @async_response || @io end |
#subscribe(channel, last_seen_id) ⇒ Object
83 84 85 86 87 |
# File 'lib/message_bus/client.rb', line 83 def subscribe(channel, last_seen_id) last_seen_id = nil if last_seen_id == "" last_seen_id ||= @bus.last_id(channel) @subscriptions[channel] = last_seen_id.to_i end |
#subscriptions ⇒ Object
89 90 91 |
# File 'lib/message_bus/client.rb', line 89 def subscriptions @subscriptions end |
#synchronize ⇒ Object
19 20 21 |
# File 'lib/message_bus/client.rb', line 19 def synchronize @lock.synchronize { yield } end |