Class: StompServer::Protocols::Stomp

Inherits:
EventMachine::Connection
  • Object
show all
Defined in:
lib/stomp_server/protocols/stomp.rb

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Stomp

Returns a new instance of Stomp.



8
9
10
# File 'lib/stomp_server/protocols/stomp.rb', line 8

# Builds the connection object. The bare `super` forwards every argument,
# unchanged, to the superclass initializer; no extra state is set up here
# (per-connection state is created in #post_init).
def initialize *args
  super
end

Instance Method Details

#abort(frame, trans = nil) ⇒ Object



134
135
136
137
138
# File 'lib/stomp_server/protocols/stomp.rb', line 134

# Discards an open transaction together with every frame queued under it.
#
# @param frame the ABORT frame (not inspected here)
# @param trans the transaction identifier
# @return the list of frames that had been queued for the transaction
# @raise [RuntimeError] "Missing transaction" when no identifier is given,
#   "transaction does not exist" when the identifier is unknown
def abort(frame, trans = nil)
  raise "Missing transaction" unless trans

  # Hash#delete only runs the block when the key is absent.
  @transactions.delete(trans) { raise "transaction does not exist" }
end

#ack(frame) ⇒ Object



140
141
142
# File 'lib/stomp_server/protocols/stomp.rb', line 140

# Handles an ACK frame by delegating to the class-level queue manager,
# passing this connection and the frame. Returns whatever the manager returns.
def ack(frame)
  @@queue_manager.ack(self, frame)
end

#begin(frame, trans = nil) ⇒ Object



117
118
119
120
121
# File 'lib/stomp_server/protocols/stomp.rb', line 117

# Opens a new transaction with an empty queue of pending frames.
#
# @param frame the BEGIN frame (not inspected here)
# @param trans the transaction identifier
# @return [Array] the new, empty frame queue
# @raise [RuntimeError] "Missing transaction" when no identifier is given,
#   "transaction exists" when the identifier is already open
def begin(frame, trans = nil)
  raise "Missing transaction" unless trans
  raise "transaction exists" if @transactions.key?(trans)

  @transactions.store(trans, [])
end

#commit(frame, trans = nil) ⇒ Object



123
124
125
126
127
128
129
130
131
132
# File 'lib/stomp_server/protocols/stomp.rb', line 123

# Commits a transaction: replays every frame queued under +trans+ through
# #process_frame, in the order the frames were queued.
#
# The transaction is removed from the table *before* the replay starts. If a
# queued frame raises part-way through, the transaction is therefore already
# gone and cannot be committed a second time, which would re-process the
# frames that had already gone through.
#
# @param frame the COMMIT frame (not inspected here)
# @param trans the transaction identifier
# @return [Array] the frames that were replayed
# @raise [RuntimeError] "Missing transaction" when no identifier is given,
#   "transaction does not exist" when the identifier is unknown
def commit(frame, trans=nil)
  raise "Missing transaction" unless trans
  raise "transaction does not exist" unless @transactions.has_key?(trans)

  queued_frames = @transactions.delete(trans)
  # Distinct block variable: the original reused the name `frame`, shadowing
  # the method parameter.
  queued_frames.each do |queued_frame|
    # Strip the header so process_frame dispatches the frame directly
    # instead of routing it back into transaction handling.
    queued_frame.headers.delete('transaction')
    process_frame(queued_frame)
  end
end

#connect(frame) ⇒ Object



77
78
79
80
81
82
83
84
85
86
87
# File 'lib/stomp_server/protocols/stomp.rb', line 77

# Handles a CONNECT frame.
#
# When the class-level @@auth_required flag is set, the frame must carry both
# a 'login' and a 'passcode' header, and the passcode must equal the entry
# stored for that login in @@stompauth.authorized. On success a CONNECTED
# frame is sent and the connection is flagged as connected.
#
# @param frame the CONNECT frame
# @return [true]
# @raise [RuntimeError] "Invalid Login" when authentication is required and fails
def connect(frame)
  if @@auth_required
    login = frame.headers['login']
    passcode = frame.headers['passcode']
    authorized = login && passcode && @@stompauth.authorized[login] == passcode
    raise "Invalid Login" unless authorized
  end

  puts "Connecting" if $DEBUG
  stomp_send_data(StompServer::StompFrame.new("CONNECTED", {'session' => 'wow'}))
  @connected = true
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


156
157
158
# File 'lib/stomp_server/protocols/stomp.rb', line 156

# Reports the connection flag: false after #post_init, true once #connect has
# succeeded, false again after #unbind.
def connected?
  @connected
end

#disconnect(frame) ⇒ Object



144
145
146
147
# File 'lib/stomp_server/protocols/stomp.rb', line 144

# Handles a DISCONNECT frame: asks for the connection to be closed via
# close_connection_after_writing. The frame itself is not inspected.
# Prints a trace line when $DEBUG is set.
def disconnect(frame)
  puts "Polite disconnect" if $DEBUG
  close_connection_after_writing
end

#handle_transaction(frame, trans, cmd) ⇒ Object



68
69
70
71
72
73
74
75
# File 'lib/stomp_server/protocols/stomp.rb', line 68

# Routes a frame that carries a 'transaction' header.
#
# The control commands (:begin, :commit, :abort) are dispatched to the method
# of the same name with the frame and transaction id. Any other command is
# appended to the transaction's queue of pending frames.
#
# @param frame the incoming frame
# @param trans the transaction identifier taken from the frame headers
# @param cmd [Symbol] the frame command
# @raise [RuntimeError] "transaction does not exist" when a non-control frame
#   names an unknown transaction
def handle_transaction(frame, trans, cmd)
  case cmd
  when :begin, :commit, :abort
    send(cmd, frame, trans)
  else
    pending = @transactions.fetch(trans) { raise "transaction does not exist" }
    pending << frame
  end
end

#post_init ⇒ Object



12
13
14
15
16
# File 'lib/stomp_server/protocols/stomp.rb', line 12

# Sets up per-connection state: an empty transaction table, a fresh frame
# recognizer, and the connected flag cleared.
#
# @return [false] the value of the final assignment
def post_init
  @transactions = {}
  @sfr = StompServer::StompFrameRecognizer.new
  @connected = false
end

#process_frame(frame) ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/stomp_server/protocols/stomp.rb', line 50

# Validates and dispatches one parsed frame.
#
# The command is lower-cased and symbolised, then checked against
# VALID_COMMANDS. Every command except :connect requires an established
# connection. A frame with a 'transaction' header goes to
# #handle_transaction; any other frame is sent to the method named after its
# command, with :send mapped to #sendmsg. After dispatch, a receipt is sent
# if the frame carries a 'receipt' header.
#
# @param frame the frame to process
# @return the result of #send_receipt, or nil when no receipt was requested
# @raise [RuntimeError] "Unhandled frame: <cmd>" for an unknown command,
#   "Not connected" when a non-CONNECT frame arrives before CONNECT
def process_frame(frame)
  cmd = frame.command.downcase.to_sym
  raise "Unhandled frame: #{cmd}" unless VALID_COMMANDS.include?(cmd)
  raise "Not connected" unless @connected || cmd == :connect

  trans = frame.headers['transaction']
  if trans
    handle_transaction(frame, trans, cmd)
  else
    handler = cmd == :send ? :sendmsg : cmd
    send(handler, frame)
  end

  # Read the header only after dispatch, matching the original ordering.
  receipt = frame.headers['receipt']
  send_receipt(receipt) if receipt
end

#process_frames ⇒ Object



45
46
47
48
# File 'lib/stomp_server/protocols/stomp.rb', line 45

# Drains the recognizer's frame list, handing each frame to #process_frame in
# arrival order. Stops as soon as the list yields nil or false, which is what
# shifting an empty list returns.
#
# @return [nil]
def process_frames
  while (frame = @sfr.frames.shift)
    process_frame(frame)
  end
end

#receive_data(data) ⇒ Object



18
19
20
# File 'lib/stomp_server/protocols/stomp.rb', line 18

# Forwards incoming data straight to #stomp_receive_data.
# NOTE(review): presumably the EventMachine::Connection data callback, which
# would make +data+ a raw chunk from the socket — confirm against EventMachine.
def receive_data(data)
  stomp_receive_data(data)
end

#send_error(msg) ⇒ Object



169
170
171
# File 'lib/stomp_server/protocols/stomp.rb', line 169

# Sends an ERROR frame to the client. The 'message' header is the fixed
# text 'See below'; the actual error text travels in the frame body.
#
# @param msg the error text to put in the body
# @return the result of #send_frame
def send_error(msg)
  error_headers = { 'message' => 'See below' }
  send_frame("ERROR", error_headers, msg)
end

#send_frame(command, headers = {}, body = '') ⇒ Object



178
179
180
181
182
# File 'lib/stomp_server/protocols/stomp.rb', line 178

# Builds a frame and sends it to the client, adding a 'content-length' header.
#
# 'content-length' is the body's size in *bytes* (String#bytesize), not in
# characters, so bodies with multibyte characters are described correctly.
# The header is set on a copy of +headers+; the caller's hash is not modified.
#
# @param command [String] the frame command, e.g. "RECEIPT" or "ERROR"
# @param headers [Hash] headers to send; any 'content-length' entry is overwritten
# @param body [String] the frame body
# @return the result of #stomp_send_data
def send_frame(command, headers={}, body='')
  frame_headers = headers.merge('content-length' => body.bytesize.to_s)
  response = StompServer::StompFrame.new(command, frame_headers, body)
  stomp_send_data(response)
end

#send_message(msg) ⇒ Object



160
161
162
163
# File 'lib/stomp_server/protocols/stomp.rb', line 160

# Sends +msg+ to the client as a MESSAGE frame.
# Note: the command is overwritten on the passed-in object itself, so the
# caller's frame is modified in place.
def send_message(msg)
  msg.command = "MESSAGE"
  stomp_send_data(msg)
end

#send_receipt(id) ⇒ Object



165
166
167
# File 'lib/stomp_server/protocols/stomp.rb', line 165

# Sends a RECEIPT frame whose 'receipt-id' header carries the given id.
#
# @param id the value of the 'receipt' header from the client's frame
# @return the result of #send_frame
def send_receipt(id)
  receipt_headers = { 'receipt-id' => id }
  send_frame("RECEIPT", receipt_headers)
end

#sendmsg(frame) ⇒ Object



89
90
91
92
93
94
95
96
97
# File 'lib/stomp_server/protocols/stomp.rb', line 89

# Handles a SEND frame by routing it on its destination.
#
# Destinations matching /queue go to the queue manager unchanged. Everything
# else is treated as a topic: the frame gets a 'message-id' header built from
# the topic manager's next index and is then passed to the topic manager.
#
# @param frame the SEND frame
# @return the result of the selected manager's sendmsg
def sendmsg(frame)
  return @@queue_manager.sendmsg(frame) if frame.dest.match(%r|^/queue|)

  # Only topic messages are given a message id here.
  topic_index = @@topic_manager.next_index
  frame.headers['message-id'] = "msg-#stompcma-#{topic_index}"
  @@topic_manager.sendmsg(frame)
end

#stomp_receive_data(data) ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
# File 'lib/stomp_server/protocols/stomp.rb', line 22

# Feeds received data into the frame recognizer and processes every frame
# that has become complete.
#
# Any StandardError raised while parsing or processing is logged, reported to
# the client as an ERROR frame, and the connection is closed once pending
# output has been written. Non-StandardError exceptions (Interrupt,
# SystemExit, NoMemoryError, ...) are deliberately NOT rescued, so signals
# and process shutdown are not swallowed by a connection handler.
#
# @param data the received data to append to the recognizer
# @return the result of #process_frames, or of close_connection_after_writing
#   when an error was handled
def stomp_receive_data(data)
  puts "receive_data: #{data.inspect}" if $DEBUG
  @sfr << data
  process_frames
rescue StandardError => e
  # Array() guards against an exception that carries no backtrace.
  puts "err: #{e} #{Array(e.backtrace).join("\n")}"
  send_error(e.to_s)
  close_connection_after_writing
end

#stomp_receive_frame(frame) ⇒ Object



34
35
36
37
38
39
40
41
42
43
# File 'lib/stomp_server/protocols/stomp.rb', line 34

# Processes a single, already-parsed frame with the same error policy as the
# raw-data path.
#
# Any StandardError raised during processing is logged, reported to the
# client as an ERROR frame, and the connection is closed once pending output
# has been written. Non-StandardError exceptions (Interrupt, SystemExit,
# NoMemoryError, ...) are deliberately NOT rescued, so signals and process
# shutdown are not swallowed by a connection handler.
#
# @param frame the frame to process
# @return the result of #process_frame, or of close_connection_after_writing
#   when an error was handled
def stomp_receive_frame(frame)
  puts "receive_frame: #{frame.inspect}" if $DEBUG
  process_frame(frame)
rescue StandardError => e
  # Array() guards against an exception that carries no backtrace.
  puts "err: #{e} #{Array(e.backtrace).join("\n")}"
  send_error(e.to_s)
  close_connection_after_writing
end

#stomp_send_data(frame) ⇒ Object



173
174
175
176
# File 'lib/stomp_server/protocols/stomp.rb', line 173

# Writes a frame to the client: the frame is serialised with #to_s and handed
# to send_data. When $DEBUG is set, the serialised frame is also echoed to
# standard output after sending.
# NOTE(review): send_data is not defined in this file — presumably inherited
# from EventMachine::Connection; confirm.
def stomp_send_data(frame)
  send_data(frame.to_s)
  puts "Sending frame #{frame.to_s}" if $DEBUG
end

#subscribe(frame) ⇒ Object



99
100
101
102
103
104
105
106
107
# File 'lib/stomp_server/protocols/stomp.rb', line 99

# Handles a SUBSCRIBE frame.
#
# Destinations matching /queue are registered with the queue manager, along
# with a flag saying whether the client asked for explicit acknowledgement
# ('ack' header equal to 'client'). All other destinations are registered
# with the topic manager, which is not given the flag.
#
# @param frame the SUBSCRIBE frame
# @return the result of the selected manager's subscribe
def subscribe(frame)
  wants_ack = frame.headers['ack'] == 'client'
  destination = frame.dest

  if destination =~ %r|^/queue|
    @@queue_manager.subscribe(destination, self, wants_ack)
  else
    @@topic_manager.subscribe(destination, self)
  end
end

#unbind ⇒ Object



149
150
151
152
153
154
# File 'lib/stomp_server/protocols/stomp.rb', line 149

# Connection-teardown hook: clears the connected flag and removes this
# connection from both the queue manager and the topic manager.
# NOTE(review): presumably invoked by EventMachine when the socket closes —
# confirm against EventMachine::Connection.
def unbind
  # Uses `p` (inspect-style output, with quotes) where sibling methods use `puts`.
  p "Unbind called" if $DEBUG
  @connected = false
  @@queue_manager.disconnect(self)
  @@topic_manager.disconnect(self)
end

#unsubscribe(frame) ⇒ Object



109
110
111
112
113
114
115
# File 'lib/stomp_server/protocols/stomp.rb', line 109

# Handles an UNSUBSCRIBE frame: removes this connection's subscription from
# the queue manager when the destination matches /queue, otherwise from the
# topic manager.
#
# @param frame the UNSUBSCRIBE frame
# @return the result of the selected manager's unsubscribe
def unsubscribe(frame)
  manager = (frame.dest =~ %r|^/queue|) ? @@queue_manager : @@topic_manager
  manager.unsubscribe(frame.dest, self)
end