Class: StompServer::QueueManager

Inherits:
Object
  • Object
show all
Defined in:
lib/stomp_server/queue_manager.rb

Instance Method Summary collapse

Constructor Details

#initialize(qstore) ⇒ QueueManager

Returns a new instance of QueueManager.



63
64
65
66
67
68
69
70
71
72
# File 'lib/stomp_server/queue_manager.rb', line 63

# Builds a queue manager on top of the given queue store.
#
# qstore - the backing store; kept in @qstore and also handed to the queue
#          monitor when one is started.
#
# When the $STOMP_SERVER global is set, a StompServer::QueueMonitor is
# created over the store and the subscriber table and started immediately.
def initialize(qstore)
  @qstore = qstore
  # Looking up an unknown destination yields a fresh empty Array. The block
  # does not store it, so a plain read never creates an entry.
  @queues = Hash.new { [] }
  # connection => frame sent to that connection and not yet acknowledged
  @pending = {}
  return unless $STOMP_SERVER

  StompServer::QueueMonitor.new(@qstore, @queues).start
  puts "Queue monitor started" if $DEBUG
end

Instance Method Details

#ack(connection, frame) ⇒ Object



128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/stomp_server/queue_manager.rb', line 128

# Handles an ACK frame received from a connection.
#
# connection - the connection that sent the ACK (key into @pending)
# frame      - the ACK frame; its 'message-id' header is compared with the
#              one on the frame pending for this connection
#
# If nothing is pending for the connection a notice is printed and the
# method returns nil. Otherwise the pending entry is cleared and
# send_a_backlog is called for the connection; its result is returned.
def ack(connection, frame)
  puts "Acking #{frame.headers['message-id']}" if $DEBUG
  outstanding = @pending[connection]
  unless outstanding
    puts "No message pending for connection!"
    return
  end

  received_id = frame.headers['message-id']
  expected_id = outstanding.headers['message-id']
  unless expected_id == received_id
    # The ids disagree and we cannot tell why (probably a client that
    # reconnected to a restarted server), so put the pending frame back
    # on its destination.
    @qstore.requeue(outstanding.headers['destination'], outstanding)
    puts "Invalid message-id (received #{received_id} != #{expected_id})"
  end

  @pending.delete connection
  # The connection is idle again: see whether another frame is waiting.
  send_a_backlog(connection)
end

#dequeue(dest) ⇒ Object

For protocol handlers that want direct access to the queue



198
199
200
# File 'lib/stomp_server/queue_manager.rb', line 198

# Direct access to the queue for protocol handlers: forwards to the
# store's dequeue for the given destination and returns whatever it returns.
def dequeue(dest)
  store = @qstore
  store.dequeue(dest)
end

#disconnect(connection) ⇒ Object



148
149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/stomp_server/queue_manager.rb', line 148

# Cleans up after a connection goes away.
#
# Any frame still pending acknowledgement from the connection is requeued
# on its destination. The connection is then removed from every
# destination's subscriber list, and destinations left without subscribers
# are dropped from @queues.
def disconnect(connection)
  puts "Disconnecting"
  unacked = @pending[connection]
  if unacked
    @qstore.requeue(unacked.headers['destination'], unacked)
    @pending.delete connection
  end

  @queues.delete_if do |_dest, subscribers|
    subscribers.delete_if { |subscriber| subscriber.connection == connection }
    subscribers.empty?
  end
end

#enqueue(frame) ⇒ Object



202
203
204
205
206
# File 'lib/stomp_server/queue_manager.rb', line 202

# Stores a frame in the queue store without trying to deliver it.
#
# The frame's command is rewritten to "MESSAGE" and the frame is enqueued
# under its 'destination' header. Returns the store's enqueue result.
def enqueue(frame)
  frame.command = "MESSAGE"
  @qstore.enqueue(frame.headers['destination'], frame)
end

#send_a_backlog(connection) ⇒ Object

Sends at most one frame to a connection; used when use_ack == true.



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/stomp_server/queue_manager.rb', line 87

# Sends at most one stored frame to a connection (used for subscribers
# that acknowledge messages).
#
# connection - the connection that is ready to receive another frame
#
# Looks for destinations that both have a stored message (per the store's
# message_for?) and have this connection among their subscribers, picks one
# at random, dequeues a frame from it and hands it to send_to_user.
# Returns nil when no such destination exists.
def send_a_backlog(connection)
  puts "Sending a backlog" if $DEBUG
  # Destinations with data for this connection.
  # Hash#select returns a Hash on Ruby >= 1.9 (an Array of pairs on 1.8);
  # to_a normalises both to [destination, users] pairs so the random pick
  # below is a positional index and not a key lookup.
  possible_queues = @queues.select { |destination, users|
    @qstore.message_for?(destination) &&
      users.detect { |u| u.connection == connection }
  }.to_a
  if possible_queues.empty?
    puts "Nothing left" if $DEBUG
    return
  end
  # Get a random one (avoid artificial priority between queues
  # without coding a whole scheduler, which might be desirable later)
  dest, users = possible_queues[rand(possible_queues.length)]
  user = users.find { |u| u.connection == connection }
  frame = @qstore.dequeue(dest)
  puts "Chose #{dest}" if $DEBUG
  send_to_user(frame, user)
end

#send_destination_backlog(dest, user) ⇒ Object



107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/stomp_server/queue_manager.rb', line 107

# Delivers frames already stored for a destination to one subscriber.
#
# dest - destination whose stored frames should be delivered
# user - subscriber record; responds to #ack and is passed to send_to_user
#
# A subscriber without ack receives every frame the store hands out until
# dequeue returns nothing. A subscriber with ack receives at most one frame.
def send_destination_backlog(dest, user)
  puts "Sending destination backlog for #{dest}" if $DEBUG
  unless user.ack
    # No acknowledgement expected: drain the destination.
    while (frame = @qstore.dequeue(dest))
      send_to_user(frame, user)
    end
    return
  end

  # Only send one message (the subscriber has to ack it first).
  first = @qstore.dequeue(dest)
  send_to_user(first, user) if first
end

#send_to_user(frame, user) ⇒ Object



162
163
164
165
166
167
168
169
# File 'lib/stomp_server/queue_manager.rb', line 162

# Writes a frame to a subscriber's connection.
#
# frame - the frame to send
# user  - subscriber record; responds to #connection and #ack
#
# For a subscriber with ack the frame is first recorded in @pending under
# the connection. Raises RuntimeError if that connection already has a
# pending frame. Returns the result of the connection's stomp_send_data.
def send_to_user(frame, user)
  target = user.connection
  expects_ack = user.ack
  if expects_ack && @pending[target]
    raise "other connection's end already busy"
  end
  @pending[target] = frame if expects_ack
  target.stomp_send_data(frame)
end

#sendmsg(frame) ⇒ Object



171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
# File 'lib/stomp_server/queue_manager.rb', line 171

# Routes an incoming frame to a subscriber, or stores it if nobody is free.
#
# The frame's command becomes "MESSAGE". Subscribers of the frame's
# destination whose connection has no pending frame are candidates:
#  * no candidate        -> the frame is enqueued in the store, returns nil
#  * a candidate with ack -> the store assigns a message id, then the frame
#                            is sent to the first such subscriber
#  * otherwise           -> the frame is sent to a randomly chosen candidate
def sendmsg(frame)
  frame.command = "MESSAGE"
  dest = frame.headers['destination']
  puts "Sending a message to #{dest}: #{frame}"

  # Subscribers that are not waiting on an acknowledgement.
  idle = @queues[dest].select { |u| !@pending[u.connection] }
  if idle.empty?
    @qstore.enqueue(dest, frame)
    return
  end

  # Acking subscribers are preferred (we favor reliability).
  recipient = idle.find { |u| u.ack }
  if recipient
    # give it a message-id
    @qstore.assign_id(frame, dest)
  else
    # Note message-id header isn't set but we won't need it anyway
    # <TODO> could break some clients: fix this
    recipient = idle[rand(idle.length)]
  end
  send_to_user(frame, recipient)
end

#stop ⇒ Object



74
75
76
# File 'lib/stomp_server/queue_manager.rb', line 74

# Stops the queue store, if the store supports being stopped.
#
# Uses respond_to? rather than searching Object#methods for the String
# 'stop': on Ruby >= 1.9 #methods returns Symbols, so the String lookup
# never matched and the store was never stopped.
#
# Returns the store's stop result, or nil when the store has no stop method.
def stop
  @qstore.stop if @qstore.respond_to?(:stop)
end

#subscribe(dest, connection, use_ack = false) ⇒ Object



78
79
80
81
82
83
# File 'lib/stomp_server/queue_manager.rb', line 78

# Registers a connection as a subscriber of a destination.
#
# dest       - destination name
# connection - the subscribing connection
# use_ack    - whether the subscriber acknowledges messages (default false)
#
# A Struct::QueueUser is appended to the destination's subscriber list.
# Unless the destination is '/queue/monitor', frames already stored for it
# are then delivered through send_destination_backlog.
def subscribe(dest, connection, use_ack = false)
  puts "Subscribing to #{dest}"
  subscriber = Struct::QueueUser.new(connection, use_ack)
  # Explicit assignment: the table's default value is not stored on lookup.
  @queues[dest] = @queues[dest] + [subscriber]
  return if dest == '/queue/monitor'

  send_destination_backlog(dest, subscriber)
end

#unsubscribe(dest, connection) ⇒ Object



120
121
122
123
124
125
126
# File 'lib/stomp_server/queue_manager.rb', line 120

# Removes a connection's subscription to a single destination.
#
# dest       - destination to unsubscribe from
# connection - the connection giving up its subscription
#
# Only the subscriber list of dest is touched; scanning every other
# destination could never remove anything. When the list ends up empty
# the destination is dropped from @queues.
def unsubscribe(dest, connection)
  puts "Unsubscribing from #{dest}"
  subscribers = @queues[dest]
  subscribers.delete_if { |qu| qu.connection == connection }
  @queues.delete(dest) if subscribers.empty?
end