Class: MessageBus::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/message_bus/client.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts) ⇒ Client

Returns a new instance of Client.



7
8
9
10
11
12
13
14
15
16
17
18
# File 'lib/message_bus/client.rb', line 7

def initialize(opts)
  self.client_id = opts[:client_id]
  self.user_id = opts[:user_id]
  self.group_ids = opts[:group_ids] || []
  self.site_id = opts[:site_id]
  self.seq = opts[:seq].to_i
  self.connect_time = Time.now
  @lock = Mutex.new
  @bus = opts[:message_bus] || MessageBus
  @subscriptions = {}
  @chunks_sent = 0
end

Instance Attribute Details

#async_responseObject

Returns the value of attribute async_response.



3
4
5
# File 'lib/message_bus/client.rb', line 3

def async_response
  @async_response
end

#cleanup_timerObject

Returns the value of attribute cleanup_timer.



3
4
5
# File 'lib/message_bus/client.rb', line 3

def cleanup_timer
  @cleanup_timer
end

#client_idObject

Returns the value of attribute client_id.



3
4
5
# File 'lib/message_bus/client.rb', line 3

def client_id
  @client_id
end

#connect_timeObject

Returns the value of attribute connect_time.



3
4
5
# File 'lib/message_bus/client.rb', line 3

def connect_time
  @connect_time
end

#group_idsObject

Returns the value of attribute group_ids.



3
4
5
# File 'lib/message_bus/client.rb', line 3

def group_ids
  @group_ids
end

#headersObject

Returns the value of attribute headers.



3
4
5
# File 'lib/message_bus/client.rb', line 3

def headers
  @headers
end

#ioObject

Returns the value of attribute io.



3
4
5
# File 'lib/message_bus/client.rb', line 3

def io
  @io
end

#seqObject

Returns the value of attribute seq.



3
4
5
# File 'lib/message_bus/client.rb', line 3

def seq
  @seq
end

#site_idObject

Returns the value of attribute site_id.



3
4
5
# File 'lib/message_bus/client.rb', line 3

def site_id
  @site_id
end

#subscribed_setsObject

Returns the value of attribute subscribed_sets.



3
4
5
# File 'lib/message_bus/client.rb', line 3

def subscribed_sets
  @subscribed_sets
end

#use_chunkedObject

Returns the value of attribute use_chunked.



3
4
5
# File 'lib/message_bus/client.rb', line 3

def use_chunked
  @use_chunked
end

#user_idObject

Returns the value of attribute user_id.



3
4
5
# File 'lib/message_bus/client.rb', line 3

def user_id
  @user_id
end

Instance Method Details

#<<(msg) ⇒ Object



94
95
96
97
98
99
100
101
# File 'lib/message_bus/client.rb', line 94

def <<(msg)
  json = messages_to_json([msg])
  if use_chunked
    write_chunk json
  else
    write_and_close json
  end
end

#allowed?(msg) ⇒ Boolean

Returns:

  • (Boolean)


107
108
109
110
111
112
113
114
115
116
117
# File 'lib/message_bus/client.rb', line 107

def allowed?(msg)
  allowed = !msg.user_ids || msg.user_ids.include?(self.user_id)
  allowed &&= !msg.client_ids || msg.client_ids.include?(self.client_id)
  allowed && (
    msg.group_ids.nil? ||
    msg.group_ids.length == 0 ||
    (
      msg.group_ids - self.group_ids
    ).length < msg.group_ids.length
  )
end

#backlogObject



119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/message_bus/client.rb', line 119

def backlog
  r = []
  new_message_ids = nil

  @subscriptions.each do |k, v|
    next if v.to_i < 0
    messages = @bus.backlog(k, v, site_id)

    messages.each do |msg|
      if allowed?(msg)
        r << msg
      else
        new_message_ids ||= {}
        new_message_ids[k] = msg.message_id
      end
    end
  end

  # stats message for all newly subscribed
  status_message = nil
  @subscriptions.each do |k, v|
    if v.to_i == -1 || (new_message_ids && new_message_ids[k])
      status_message ||= {}
      @subscriptions[k] = status_message[k] = @bus.last_id(k, site_id)
    end
  end

  r << MessageBus::Message.new(-1, -1, '/__status', status_message) if status_message

  r || []
end

#cancelObject



24
25
26
27
28
29
30
31
# File 'lib/message_bus/client.rb', line 24

def cancel
  if cleanup_timer
    # concurrency may nil cleanup timer
    cleanup_timer.cancel rescue nil
    self.cleanup_timer = nil
  end
  ensure_closed!
end

#closeObject



76
77
78
# File 'lib/message_bus/client.rb', line 76

def close
  ensure_closed!
end

#closed?Boolean

Returns:

  • (Boolean)


80
81
82
# File 'lib/message_bus/client.rb', line 80

def closed?
  !@async_response && !@io
end

#deliver_backlog(backlog) ⇒ Object



37
38
39
40
41
42
43
44
45
# File 'lib/message_bus/client.rb', line 37

def deliver_backlog(backlog)
  if backlog.length > 0
    if use_chunked
      write_chunk(messages_to_json(backlog))
    else
      write_and_close messages_to_json(backlog)
    end
  end
end

#ensure_closed!Object



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/message_bus/client.rb', line 53

def ensure_closed!
  return unless in_async?
  if use_chunked
    write_chunk("[]")
    if @io
      @io.write("0\r\n\r\n")
      @io.close
      @io = nil
    end
    if @async_response
      @async_response << ("0\r\n\r\n")
      @async_response.done
      @async_response = nil
    end
  else
    write_and_close "[]"
  end
rescue
  # we may have a dead socket, just nil the @io
  @io = nil
  @async_response = nil
end

#ensure_first_chunk_sentObject



47
48
49
50
51
# File 'lib/message_bus/client.rb', line 47

def ensure_first_chunk_sent
  if use_chunked && @chunks_sent == 0
    write_chunk("[]")
  end
end

#in_async?Boolean

Returns:

  • (Boolean)


33
34
35
# File 'lib/message_bus/client.rb', line 33

def in_async?
  @async_response || @io
end

#subscribe(channel, last_seen_id) ⇒ Object



84
85
86
87
88
# File 'lib/message_bus/client.rb', line 84

def subscribe(channel, last_seen_id)
  last_seen_id = nil if last_seen_id == ""
  last_seen_id ||= @bus.last_id(channel)
  @subscriptions[channel] = last_seen_id.to_i
end

#subscriptionsObject



90
91
92
# File 'lib/message_bus/client.rb', line 90

def subscriptions
  @subscriptions
end

#synchronizeObject



20
21
22
# File 'lib/message_bus/client.rb', line 20

def synchronize
  @lock.synchronize { yield }
end