Module: StompServer

Defined in:
lib/stomp_server.rb

Constant Summary collapse

VERSION =
'0.9.5'
VALID_COMMANDS =
[:connect, :send, :subscribe, :unsubscribe, :begin, :commit, :abort, :ack, :disconnect]

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.setup(j = FrameJournal.new, tm = TopicManager.new, qm = QueueManager.new(j)) ⇒ Object



12
13
14
15
16
# File 'lib/stomp_server.rb', line 12

def self.setup(j = FrameJournal.new, tm = TopicManager.new, qm = QueueManager.new(j))
  @@journal = j
  @@topic_manager = tm
  @@queue_manager = qm
end

Instance Method Details

#abort(frame, trans = nil) ⇒ Object



118
119
120
121
122
# File 'lib/stomp_server.rb', line 118

def abort(frame, trans=nil)
  raise "Missing transaction" unless trans
  raise "transaction does not exist" unless @transactions.has_key?(trans)
  @transactions.delete(trans)
end

#ack(frame) ⇒ Object



124
125
126
# File 'lib/stomp_server.rb', line 124

def ack(frame)
  @@queue_manager.ack(self, frame)
end

#begin(frame, trans = nil) ⇒ Object



101
102
103
104
105
# File 'lib/stomp_server.rb', line 101

def begin(frame, trans=nil)
  raise "Missing transaction" unless trans
  raise "transaction exists" if @transactions.has_key?(trans)
  @transactions[trans] = []
end

#commit(frame, trans = nil) ⇒ Object



107
108
109
110
111
112
113
114
115
116
# File 'lib/stomp_server.rb', line 107

def commit(frame, trans=nil)
  raise "Missing transaction" unless trans
  raise "transaction does not exist" unless @transactions.has_key?(trans)
  
  (@transactions[trans]).each do |frame|
    frame.headers.delete('transaction')
    process_frame(frame)
  end
  @transactions.delete(trans)
end

#connect(frame) ⇒ Object



68
69
70
71
72
73
# File 'lib/stomp_server.rb', line 68

def connect(frame)
  puts "Connecting" if $DEBUG
  response = StompFrame.new("CONNECTED", {'session' => 'wow'})
  send_data(response.to_s)
  @connected = true
end

#disconnect(frame) ⇒ Object



128
129
130
131
# File 'lib/stomp_server.rb', line 128

def disconnect(frame)
  puts "Polite disconnect" if $DEBUG
  close_connection_after_writing
end

#handle_transaction(frame, trans, cmd) ⇒ Object



59
60
61
62
63
64
65
66
# File 'lib/stomp_server.rb', line 59

def handle_transaction(frame, trans, cmd)
  if [:begin, :commit, :abort].include?(cmd)
    send(cmd, frame, trans)
  else
    raise "transaction does not exist" unless @transactions.has_key?(trans)
    @transactions[trans] << frame
  end    
end

#post_initObject



18
19
20
21
22
# File 'lib/stomp_server.rb', line 18

def post_init
  @sfr = StompFrameRecognizer.new
  @transactions = {}
  @connected = false
end

#process_frame(frame) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/stomp_server.rb', line 41

def process_frame(frame)
  cmd = frame.command.downcase.to_sym
  raise "Unhandled frame: #{cmd}" unless VALID_COMMANDS.include?(cmd)
  raise "Not connected" if !@connected && cmd != :connect

  # I really like this code, but my needs are a little trickier
  # 

  if trans = frame.headers['transaction']
    handle_transaction(frame, trans, cmd)
  else
    cmd = :sendmsg if cmd == :send
    send(cmd, frame) 
  end
  
  send_receipt(frame.headers['receipt']) if frame.headers['receipt']
end

#process_framesObject



36
37
38
39
# File 'lib/stomp_server.rb', line 36

def process_frames
  frame = nil
  process_frame(frame) while frame = @sfr.frames.shift
end

#receive_data(data) ⇒ Object



24
25
26
27
28
29
30
31
32
33
34
# File 'lib/stomp_server.rb', line 24

def receive_data(data)
  begin
    puts "receive_data: #{data.inspect}" if $DEBUG
    @sfr << data
    process_frames
  rescue Exception => e
    puts "err: #{e} #{e.backtrace.join("\n")}" if $DEBUG
    send_error(e.to_s)
    close_connection_after_writing
  end
end

#send_error(msg) ⇒ Object



147
148
149
# File 'lib/stomp_server.rb', line 147

def send_error(msg)
  send_frame("ERROR",{},msg)
end

#send_frame(command, headers = {}, body = '') ⇒ Object



151
152
153
154
# File 'lib/stomp_server.rb', line 151

def send_frame(command, headers={}, body='')
  response = StompFrame.new(command, headers, body)
  send_data(response.to_s)
end

#send_message(msg) ⇒ Object



138
139
140
141
# File 'lib/stomp_server.rb', line 138

def send_message(msg)
  msg.command = "MESSAGE"
  send_data(msg.to_s)
end

#send_receipt(id) ⇒ Object



143
144
145
# File 'lib/stomp_server.rb', line 143

def send_receipt(id)
  send_frame("RECEIPT", { 'receipt-id' => id})
end

#sendmsg(frame) ⇒ Object



75
76
77
78
79
80
81
82
83
# File 'lib/stomp_server.rb', line 75

def sendmsg(frame)
  # set message id
  frame.headers['message-id'] = "msg-#{@@journal.system_id}-#{@@journal.next_index}"
  if frame.dest.match(%r|^/queue|)
    @@queue_manager.sendmsg(frame)
  else
    @@topic_manager.sendmsg(frame)
  end
end

#subscribe(frame) ⇒ Object



85
86
87
88
89
90
91
# File 'lib/stomp_server.rb', line 85

def subscribe(frame)
  if frame.dest =~ %r|^/queue|
    @@queue_manager.subscribe(frame.dest, self)
  else
    @@topic_manager.subscribe(frame.dest, self)
  end
end

#unbindObject



133
134
135
136
# File 'lib/stomp_server.rb', line 133

def unbind
  @@queue_manager.disconnect(self)
  @@topic_manager.disconnect(self)
end

#unsubscribe(frame) ⇒ Object



93
94
95
96
97
98
99
# File 'lib/stomp_server.rb', line 93

def unsubscribe(frame)
  if frame.dest =~ %r|^/queue|
    @@queue_manager.unsubscribe(self)
  else
    @@topic_manager.unsubscribe(self)
  end
end