Class: MessageBus::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/message_bus/client.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts) ⇒ Client

Returns a new instance of Client.



6
7
8
9
10
11
12
13
14
15
16
17
# File 'lib/message_bus/client.rb', line 6

def initialize(opts)
  self.client_id = opts[:client_id]
  self.user_id = opts[:user_id]
  self.group_ids = opts[:group_ids] || []
  self.site_id = opts[:site_id]
  self.seq = opts[:seq].to_i
  self.connect_time = Time.now
  @lock = Mutex.new
  @bus = opts[:message_bus] || MessageBus
  @subscriptions = {}
  @chunks_sent = 0
end

Instance Attribute Details

#async_responseObject

Returns the value of attribute async_response.



2
3
4
# File 'lib/message_bus/client.rb', line 2

def async_response
  @async_response
end

#cleanup_timerObject

Returns the value of attribute cleanup_timer.



2
3
4
# File 'lib/message_bus/client.rb', line 2

def cleanup_timer
  @cleanup_timer
end

#client_idObject

Returns the value of attribute client_id.



2
3
4
# File 'lib/message_bus/client.rb', line 2

def client_id
  @client_id
end

#connect_timeObject

Returns the value of attribute connect_time.



2
3
4
# File 'lib/message_bus/client.rb', line 2

def connect_time
  @connect_time
end

#group_idsObject

Returns the value of attribute group_ids.



2
3
4
# File 'lib/message_bus/client.rb', line 2

def group_ids
  @group_ids
end

#headersObject

Returns the value of attribute headers.



2
3
4
# File 'lib/message_bus/client.rb', line 2

def headers
  @headers
end

#ioObject

Returns the value of attribute io.



2
3
4
# File 'lib/message_bus/client.rb', line 2

def io
  @io
end

#seqObject

Returns the value of attribute seq.



2
3
4
# File 'lib/message_bus/client.rb', line 2

def seq
  @seq
end

#site_idObject

Returns the value of attribute site_id.



2
3
4
# File 'lib/message_bus/client.rb', line 2

def site_id
  @site_id
end

#subscribed_setsObject

Returns the value of attribute subscribed_sets.



2
3
4
# File 'lib/message_bus/client.rb', line 2

def subscribed_sets
  @subscribed_sets
end

#use_chunkedObject

Returns the value of attribute use_chunked.



2
3
4
# File 'lib/message_bus/client.rb', line 2

def use_chunked
  @use_chunked
end

#user_idObject

Returns the value of attribute user_id.



2
3
4
# File 'lib/message_bus/client.rb', line 2

def user_id
  @user_id
end

Instance Method Details

#<<(msg) ⇒ Object



93
94
95
96
97
98
99
100
# File 'lib/message_bus/client.rb', line 93

def <<(msg)
  json = messages_to_json([msg])
  if use_chunked
    write_chunk json
  else
    write_and_close json
  end
end

#allowed?(msg) ⇒ Boolean

Returns:

  • (Boolean)


106
107
108
109
110
111
112
113
114
115
116
# File 'lib/message_bus/client.rb', line 106

def allowed?(msg)
  allowed = !msg.user_ids || msg.user_ids.include?(self.user_id)
  allowed &&= !msg.client_ids || msg.client_ids.include?(self.client_id)
  allowed && (
    msg.group_ids.nil? ||
    msg.group_ids.length == 0 ||
    (
      msg.group_ids - self.group_ids
    ).length < msg.group_ids.length
  )
end

#backlogObject



118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/message_bus/client.rb', line 118

def backlog
  r = []
  @subscriptions.each do |k,v|
    next if v.to_i < 0
    messages = @bus.backlog(k, v, site_id)
    messages.each do |msg|
      r << msg if allowed?(msg)
    end
  end
  # stats message for all newly subscribed
  status_message = nil
  @subscriptions.each do |k,v|
    if v.to_i == -1
      status_message ||= {}
      @subscriptions[k] = status_message[k] = @bus.last_id(k, site_id)
    end
  end
  r << MessageBus::Message.new(-1, -1, '/__status', status_message) if status_message

  r || []
end

#cancelObject



23
24
25
26
27
28
29
30
# File 'lib/message_bus/client.rb', line 23

def cancel
  if cleanup_timer
    # concurrency may nil cleanup timer
    cleanup_timer.cancel rescue nil
    self.cleanup_timer = nil
  end
  ensure_closed!
end

#closeObject



75
76
77
# File 'lib/message_bus/client.rb', line 75

def close
  ensure_closed!
end

#closed?Boolean

Returns:

  • (Boolean)


79
80
81
# File 'lib/message_bus/client.rb', line 79

def closed?
  !@async_response && !@io
end

#deliver_backlog(backlog) ⇒ Object



36
37
38
39
40
41
42
43
44
# File 'lib/message_bus/client.rb', line 36

def deliver_backlog(backlog)
  if backlog.length > 0
    if use_chunked
      write_chunk(messages_to_json(backlog))
    else
      write_and_close messages_to_json(backlog)
    end
  end
end

#ensure_closed!Object



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/message_bus/client.rb', line 52

def ensure_closed!
  return unless in_async?
  if use_chunked
    write_chunk("[]".freeze)
    if @io
      @io.write("0\r\n\r\n".freeze)
      @io.close
      @io = nil
    end
    if @async_response
      @async_response << ("0\r\n\r\n".freeze)
      @async_response.done
      @async_response = nil
    end
  else
    write_and_close "[]"
  end
rescue
  # we may have a dead socket, just nil the @io
  @io = nil
  @async_response = nil
end

#ensure_first_chunk_sentObject



46
47
48
49
50
# File 'lib/message_bus/client.rb', line 46

def ensure_first_chunk_sent
  if use_chunked && @chunks_sent == 0
    write_chunk("[]".freeze)
  end
end

#in_async?Boolean

Returns:

  • (Boolean)


32
33
34
# File 'lib/message_bus/client.rb', line 32

def in_async?
  @async_response || @io
end

#subscribe(channel, last_seen_id) ⇒ Object



83
84
85
86
87
# File 'lib/message_bus/client.rb', line 83

def subscribe(channel, last_seen_id)
  last_seen_id = nil if last_seen_id == ""
  last_seen_id ||= @bus.last_id(channel)
  @subscriptions[channel] = last_seen_id.to_i
end

#subscriptionsObject



89
90
91
# File 'lib/message_bus/client.rb', line 89

def subscriptions
  @subscriptions
end

#synchronizeObject



19
20
21
# File 'lib/message_bus/client.rb', line 19

def synchronize
  @lock.synchronize { yield }
end