Class: QueueManager

Inherits:
Object
  • Object
show all
Defined in:
lib/queue_manager.rb

Overview

queue - persistent sent to a single subscriber queue_monitor - looks, but does not remove from queue

Instance Method Summary collapse

Constructor Details

#initialize(journal) ⇒ QueueManager

Returns a new instance of QueueManager.



7
8
9
10
11
12
13
14
15
16
17
18
19
# File 'lib/queue_manager.rb', line 7

def initialize(journal)
  # read journal information
  @journal = journal
  @queues = Hash.new { Array.new }
  @pending = Hash.new { Array.new }
  @messages = Hash.new { Array.new }
  
  # recover from previous run
  msgids = @journal.keys.sort
  msgids.each do |msgid|
    sendmsg(@journal[msgid])
  end
end

Instance Method Details

#ack(user, frame) ⇒ Object



43
44
45
46
47
48
49
# File 'lib/queue_manager.rb', line 43

def ack(user, frame)
  pending_size = @pending[user]
  msgid = frame.headers['message-id']
  @pending[user].delete_if { |pf| pf.headers['message-id'] == msgid }
  raise "Message (#{msgid}) not found" if pending_size == @pending[user]
  @journal.delete(msgid)
end

#disconnect(user) ⇒ Object



51
52
53
54
55
56
57
58
# File 'lib/queue_manager.rb', line 51

def disconnect(user)
  @pending[user].each do |frame|
    sendmsg(frame)
  end
  @queues.each do |dest, queue|
    queue.delete_if { |qu| qu.user == user }
  end
end

#send_backlog(queue, user) ⇒ Object



31
32
33
34
35
36
37
# File 'lib/queue_manager.rb', line 31

def send_backlog(queue, user)
  until queue.empty?
    current = queue.first
    send_to_user(current, user)
    queue.shift
  end 
end

#send_to_user(frame, user) ⇒ Object



60
61
62
63
64
65
66
67
# File 'lib/queue_manager.rb', line 60

def send_to_user(frame, user)
  if user.ack
    @pending[user.user] += [frame]
  else
    @journal.delete(frame.headers['message-id'])
  end 
  user.user.send_data(frame.to_s)
end

#sendmsg(frame) ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/queue_manager.rb', line 69

def sendmsg(frame)
  frame.command = "MESSAGE"
  dest = frame.headers['destination']
  @journal[frame.headers['message-id']] = frame

  if user = @queues[dest].shift
    send_to_user(frame, user)
    @queues[dest].push(user)
  else
    @messages[dest] += [frame]
  end
end

#subscribe(dest, user, use_ack = false) ⇒ Object



21
22
23
24
25
26
27
28
29
# File 'lib/queue_manager.rb', line 21

def subscribe(dest, user, use_ack=false)
  user = Struct::QueueUser.new(user, use_ack)
  @queues[dest] += [user]
  
  # TODO handle this is some form of call back
  # it is quite possible that this could be a lot
  # of data and block things up.
  send_backlog(@messages[dest], user)
end

#unsubscribe(topic, user) ⇒ Object



39
40
41
# File 'lib/queue_manager.rb', line 39

def unsubscribe(topic, user)
  @queues[topic].delete_if { |u| u.user == user } 
end