Module: StompServer

Defined in:
lib/stomp_server.rb

Constant Summary collapse

VERSION =
'0.9.2'
VALID_COMMANDS =
[:connect, :send, :subscribe, :unsubscribe, :begin, :commit, :abort, :ack, :disconnect]

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.setup(j = FrameJournal.new, tm = TopicManager.new, qm = QueueManager.new(j)) ⇒ Object



11
12
13
14
15
# File 'lib/stomp_server.rb', line 11

# Installs the collaborators shared by every connection: the frame journal,
# the topic manager and the queue manager.
#
# @param j  journal object, stored as-is (defaults to a new FrameJournal)
# @param tm topic manager, stored as-is (defaults to a new TopicManager)
# @param qm queue manager, stored as-is (by default built around +j+)
# @return the queue manager +qm+
def self.setup(j = FrameJournal.new, tm = TopicManager.new, qm = QueueManager.new(j))
  @@journal, @@topic_manager, @@queue_manager = j, tm, qm
  # Hand back the queue manager rather than the array a parallel
  # assignment evaluates to.
  qm
end

Instance Method Details

#abort(frame, trans = nil) ⇒ Object



117
118
119
120
121
# File 'lib/stomp_server.rb', line 117

# Discards an open transaction together with every frame queued under it.
#
# @param frame the ABORT frame (not inspected here)
# @param trans id of the transaction to drop
# @return the frames that had been queued for +trans+
# @raise [RuntimeError] when +trans+ is missing or names no open transaction
def abort(frame, trans=nil)
  raise "Missing transaction" unless trans
  # Hash#delete only runs its block when the key is absent, so the lookup
  # and the removal happen in a single step.
  @transactions.delete(trans) { raise "transaction does not exist" }
end

#ack(frame) ⇒ Object



123
124
125
# File 'lib/stomp_server.rb', line 123

# Passes a client's ACK frame on to the shared queue manager, identifying
# this connection as the acknowledging party.
#
# @param frame the ACK frame received from the client
# @return whatever the queue manager's +ack+ returns
def ack(frame)
  queues = @@queue_manager
  queues.ack(self, frame)
end

#begin(frame, trans = nil) ⇒ Object



100
101
102
103
104
# File 'lib/stomp_server.rb', line 100

# Opens a new transaction with an empty list of queued frames.
#
# @param frame the BEGIN frame (not inspected here)
# @param trans id of the transaction to open
# @return [Array] the new, empty frame list registered for +trans+
# @raise [RuntimeError] when +trans+ is missing or is already open
def begin(frame, trans=nil)
  raise "Missing transaction" unless trans
  if @transactions.key?(trans)
    raise "transaction exists"
  end

  @transactions[trans] = []
end

#commit(frame, trans = nil) ⇒ Object



106
107
108
109
110
111
112
113
114
115
# File 'lib/stomp_server.rb', line 106

# Replays every frame queued under a transaction, in arrival order, and then
# closes the transaction.
#
# Each queued frame has its 'transaction' header removed before it is handed
# to +process_frame+, so it is handled as an ordinary frame rather than being
# queued again.
#
# @param frame the COMMIT frame (not inspected here)
# @param trans id of the transaction to commit
# @return the frames that were queued for +trans+
# @raise [RuntimeError] when +trans+ is missing or names no open transaction
def commit(frame, trans=nil)
  raise "Missing transaction" unless trans
  raise "transaction does not exist" unless @transactions.key?(trans)

  # The block variable gets its own name so it cannot shadow (or, on
  # Ruby 1.8, overwrite) the +frame+ parameter.
  @transactions[trans].each do |queued|
    queued.headers.delete('transaction')
    process_frame(queued)
  end
  @transactions.delete(trans)
end

#connect(frame) ⇒ Object



67
68
69
70
71
72
# File 'lib/stomp_server.rb', line 67

# Answers a CONNECT frame with a CONNECTED frame and marks this connection
# as connected.
#
# The session header is the fixed string 'wow'; nothing from +frame+ is read.
#
# @param frame the CONNECT frame (not inspected here)
# @return [true]
def connect(frame)
  puts "Connecting" if $DEBUG
  send_data(StompFrame.new("CONNECTED", {'session' => 'wow'}).to_s)
  @connected = true
end

#disconnect(frame) ⇒ Object



127
128
129
130
# File 'lib/stomp_server.rb', line 127

# Handles a client DISCONNECT by closing the connection once pending output
# has been written.
#
# @param frame the DISCONNECT frame (not inspected here)
# @return whatever +close_connection_after_writing+ returns
def disconnect(frame)
  $DEBUG && puts("Polite disconnect")
  close_connection_after_writing
end

#handle_transaction(frame, trans, cmd) ⇒ Object



58
59
60
61
62
63
64
65
# File 'lib/stomp_server.rb', line 58

# Routes a frame that carries a 'transaction' header.
#
# BEGIN, COMMIT and ABORT are dispatched to the method of the same name.
# Any other command is appended to the transaction's queue.
#
# @param frame the frame being handled
# @param trans transaction id taken from the frame's headers
# @param cmd   [Symbol] the frame's command, already lower-cased
# @return the dispatched method's result, or the transaction's frame list
# @raise [RuntimeError] when a non-control frame names an unknown transaction
def handle_transaction(frame, trans, cmd)
  case cmd
  when :begin, :commit, :abort
    send(cmd, frame, trans)
  else
    raise "transaction does not exist" unless @transactions.has_key?(trans)
    @transactions[trans] << frame
  end
end

#post_init ⇒ Object



17
18
19
20
21
# File 'lib/stomp_server.rb', line 17

# Sets up per-connection state: a frame recognizer for incoming data, an
# empty table of open transactions, and a cleared "connected" flag.
# NOTE(review): presumably the connection-established hook of the
# networking layer — confirm against the including class.
#
# @return [false]
def post_init
  @sfr = StompFrameRecognizer.new
  @transactions = Hash.new
  @connected = false
end

#process_frame(frame) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/stomp_server.rb', line 40

# Validates a single frame and dispatches it to its handler.
#
# A frame with a 'transaction' header goes to +handle_transaction+. Any other
# frame is sent to the method named after its command, except SEND, which is
# handled by +sendmsg+. If the frame has a 'receipt' header, a receipt is
# sent afterwards.
#
# @param frame the frame to handle; must respond to +command+ and +headers+
# @return the result of +send_receipt+, or nil when no receipt was requested
# @raise [RuntimeError] for a command outside VALID_COMMANDS, or for any
#   command other than CONNECT before the connection is established
def process_frame(frame)
  cmd = frame.command.downcase.to_sym
  raise "Unhandled frame: #{cmd}" unless VALID_COMMANDS.include?(cmd)
  raise "Not connected" unless @connected || cmd == :connect

  trans = frame.headers['transaction']
  if trans
    handle_transaction(frame, trans, cmd)
  else
    # SEND cannot map to a method called +send+: that name is already
    # Object#send, which performs the dispatch below.
    handler = (cmd == :send) ? :sendmsg : cmd
    send(handler, frame)
  end

  receipt = frame.headers['receipt']
  send_receipt(receipt) if receipt
end

#process_frames ⇒ Object



35
36
37
38
# File 'lib/stomp_server.rb', line 35

# Drains the recognizer's list of frames, handing each one to
# +process_frame+ in order until a nil/false entry or the end is reached.
#
# @return [nil]
def process_frames
  while (frame = @sfr.frames.shift)
    process_frame(frame)
  end
end

#receive_data(data) ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
# File 'lib/stomp_server.rb', line 23

# Feeds raw incoming data to the frame recognizer and processes every frame
# that becomes available.
#
# Any StandardError raised along the way is reported to the client as an
# ERROR frame, after which the connection is closed once the output has been
# written. Non-StandardError exceptions (Interrupt, SystemExit,
# NoMemoryError, ...) are deliberately left to propagate, so that signals
# and process shutdown are not turned into protocol errors.
#
# @param data the chunk of data just received
# @return the result of the last statement executed
def receive_data(data)
  puts "receive_data: #{data.inspect}" if $DEBUG
  @sfr << data
  process_frames
rescue StandardError => e
  puts "err: #{e} #{e.backtrace.join("\n")}" if $DEBUG
  send_error(e.to_s)
  close_connection_after_writing
end

#send_error(msg) ⇒ Object



146
147
148
# File 'lib/stomp_server.rb', line 146

# Sends an ERROR frame with no headers and +msg+ as its body.
#
# @param msg the error text to report
# @return whatever +send_frame+ returns
def send_error(msg)
  no_headers = {}
  send_frame("ERROR", no_headers, msg)
end

#send_frame(command, headers = {}, body = '') ⇒ Object



150
151
152
153
# File 'lib/stomp_server.rb', line 150

# Builds a StompFrame from the given parts and writes its string form to the
# connection.
#
# @param command the frame command, e.g. "ERROR" or "RECEIPT"
# @param headers [Hash] header name/value pairs
# @param body    the frame body
# @return whatever +send_data+ returns
def send_frame(command, headers={}, body='')
  send_data(StompFrame.new(command, headers, body).to_s)
end

#send_message(msg) ⇒ Object



137
138
139
140
# File 'lib/stomp_server.rb', line 137

# Delivers a frame to this connection as a MESSAGE.
#
# The frame's command is overwritten in place, so the caller's object is
# modified.
#
# @param msg the frame to deliver; must respond to +command=+ and +to_s+
# @return whatever +send_data+ returns
def send_message(msg)
  msg.command = "MESSAGE"
  serialized = msg.to_s
  send_data(serialized)
end

#send_receipt(id) ⇒ Object



142
143
144
# File 'lib/stomp_server.rb', line 142

# Sends a RECEIPT frame whose 'receipt-id' header carries +id+.
#
# @param id the receipt id the client asked for
# @return whatever +send_frame+ returns
def send_receipt(id)
  receipt_headers = { 'receipt-id' => id }
  send_frame("RECEIPT", receipt_headers)
end

#sendmsg(frame) ⇒ Object



74
75
76
77
78
79
80
81
82
# File 'lib/stomp_server.rb', line 74

# Handles a SEND frame: stamps it with a message id and passes it to the
# manager responsible for its destination.
#
# The id has the form "msg-<system id>-<index>", with both parts supplied by
# the shared journal. Destinations beginning with "/queue" go to the queue
# manager; everything else goes to the topic manager.
#
# @param frame the SEND frame; must respond to +headers+ and +dest+
# @return whatever the chosen manager's +sendmsg+ returns
def sendmsg(frame)
  frame.headers['message-id'] = "msg-#{@@journal.system_id}-#{@@journal.next_index}"
  manager = frame.dest.match(%r|^/queue|) ? @@queue_manager : @@topic_manager
  manager.sendmsg(frame)
end

#subscribe(frame) ⇒ Object



84
85
86
87
88
89
90
# File 'lib/stomp_server.rb', line 84

# Registers this connection as a subscriber of the frame's destination.
#
# Destinations beginning with "/queue" are registered with the queue manager;
# everything else is registered with the topic manager.
#
# @param frame the SUBSCRIBE frame; must respond to +dest+
# @return whatever the chosen manager's +subscribe+ returns
def subscribe(frame)
  manager = (frame.dest =~ %r|^/queue|) ? @@queue_manager : @@topic_manager
  manager.subscribe(frame.dest, self)
end

#unbind ⇒ Object



132
133
134
135
# File 'lib/stomp_server.rb', line 132

# Detaches this connection from both shared managers: the queue manager
# first, then the topic manager.
# NOTE(review): presumably the connection-closed hook of the networking
# layer — confirm against the including class.
#
# @return whatever the topic manager's +disconnect+ returns
def unbind
  connection = self
  @@queue_manager.disconnect(connection)
  @@topic_manager.disconnect(connection)
end

#unsubscribe(frame) ⇒ Object



92
93
94
95
96
97
98
# File 'lib/stomp_server.rb', line 92

# Removes this connection's subscription via the manager responsible for the
# frame's destination.
#
# The destination only selects the manager ("/queue" prefix means the queue
# manager, anything else the topic manager); it is not passed along.
# NOTE(review): +subscribe+ passes the destination to the manager but this
# method passes only the connection — confirm the managers' +unsubscribe+
# signature and whether a per-destination unsubscribe was intended.
#
# @param frame the UNSUBSCRIBE frame; must respond to +dest+
# @return whatever the chosen manager's +unsubscribe+ returns
def unsubscribe(frame)
  manager = (frame.dest =~ %r|^/queue|) ? @@queue_manager : @@topic_manager
  manager.unsubscribe(self)
end