Class: Carrot::AMQP::Server

Inherits:
Object
  • Object
show all
Defined in:
lib/amqp/server.rb

Defined Under Namespace

Classes: ProtocolError, ServerDown

Constant Summary collapse

# NOTE(review): presumably the socket connect timeout, in seconds — its use
# is not visible in this excerpt; confirm against the connection code.
CONNECT_TIMEOUT =
1.0
# NOTE(review): presumably the delay before a reconnect attempt, in seconds —
# its use is not visible in this excerpt; confirm.
RETRY_DELAY =
10.0
# Port that #initialize falls back to when opts[:port] is not supplied.
DEFAULT_PORT =
5672

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ Server

Returns a new instance of Server.



17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/amqp/server.rb', line 17

# Stores the connection settings taken from +opts+ and then calls
# +start_session+.
#
# @param opts [Hash] connection options. Recognised keys are :host, :port,
#   :user, :pass, :vhost, :insist and :multithread. When :host, :port,
#   :user, :pass or :vhost is missing (or nil/false) it falls back to
#   'localhost', DEFAULT_PORT, 'guest', 'guest' and '/' respectively.
def initialize(opts = {})
  @status = 'NOT CONNECTED'

  # Endpoint to talk to.
  @host = opts[:host] || 'localhost'
  @port = opts[:port] || DEFAULT_PORT

  # Credentials and virtual host.
  @user  = opts[:user]  || 'guest'
  @pass  = opts[:pass]  || 'guest'
  @vhost = opts[:vhost] || '/'

  # Flags are stored exactly as given, with no default applied.
  @insist      = opts[:insist]
  @multithread = opts[:multithread]

  start_session
end

Instance Attribute Details

#channel ⇒ Object

Returns the value of attribute channel.



12
13
14
# File 'lib/amqp/server.rb', line 12

# Reader for the +@channel+ instance variable.
#
# @return [Object] whatever is currently stored in @channel
def channel
  @channel
end

#host ⇒ Object (readonly)

Returns the value of attribute host.



11
12
13
# File 'lib/amqp/server.rb', line 11

# Reader for the +@host+ instance variable, which #initialize sets from
# opts[:host] (falling back to 'localhost').
#
# @return [Object] whatever is currently stored in @host
def host
  @host
end

#port ⇒ Object (readonly)

Returns the value of attribute port.



11
12
13
# File 'lib/amqp/server.rb', line 11

# Reader for the +@port+ instance variable, which #initialize sets from
# opts[:port] (falling back to DEFAULT_PORT).
#
# @return [Object] whatever is currently stored in @port
def port
  @port
end

#status ⇒ Object (readonly)

Returns the value of attribute status.



11
12
13
# File 'lib/amqp/server.rb', line 11

# Reader for the +@status+ instance variable; #initialize sets it to
# 'NOT CONNECTED' before starting the session.
#
# @return [Object] whatever is currently stored in @status
def status
  @status
end

#ticket ⇒ Object

Returns the value of attribute ticket.



12
13
14
# File 'lib/amqp/server.rb', line 12

# Reader for the +@ticket+ instance variable. #send_frame copies this value
# onto outgoing objects that respond to +ticket=+ when it is set.
#
# @return [Object] whatever is currently stored in @ticket
def ticket
  @ticket
end

Instance Method Details

#close ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/amqp/server.rb', line 57

# Closes the current channel, then the connection, and finally the socket.
#
# Sends Channel::Close and reads the next method; if the reply is not a
# Channel::CloseOk a message is printed with +puts+. The channel is then set
# to 0 and the same exchange is repeated with Connection::Close /
# Connection::CloseOk.
#
# A ServerDown raised anywhere in that exchange is intentionally ignored.
# Any other exception propagates to the caller. In every case +close_socket+
# runs via +ensure+.
def close
  send_frame(
    Protocol::Channel::Close.new(:reply_code => 200, :reply_text => 'bye', :method_id => 0, :class_id => 0)
  )
  puts "Error closing channel #{channel}" unless next_method.is_a?(Protocol::Channel::CloseOk)

  # The connection-level close is sent on channel 0.
  self.channel = 0
  send_frame(
    Protocol::Connection::Close.new(:reply_code => 200, :reply_text => 'Goodbye', :class_id => 0, :method_id => 0)
  )
  puts "Error closing connection" unless next_method.is_a?(Protocol::Connection::CloseOk)
rescue ServerDown
  # Best effort: when the server is already down there is nobody to
  # acknowledge the close, so skip the rest and just release the socket.
ensure
  close_socket
end

#next_frame ⇒ Object



42
43
44
45
46
# File 'lib/amqp/server.rb', line 42

# Parses the next frame out of +buffer+, logs it as :received and returns it.
#
# @return [Object] whatever Frame.parse returned for the current buffer
def next_frame
  Frame.parse(buffer).tap { |parsed| log :received, parsed }
end

#next_method ⇒ Object



48
49
50
# File 'lib/amqp/server.rb', line 48

# Returns the result of +next_payload+ unchanged. #close uses it to read the
# server's reply to a Close request.
#
# @return [Object] whatever next_payload returns
def next_method
  next_payload
end

#next_payload ⇒ Object



52
53
54
55
# File 'lib/amqp/server.rb', line 52

# Reads the next frame and returns its payload.
#
# @return [Object] the frame's +payload+, or the falsy value returned by
#   +next_frame+ when no frame was produced
def next_payload
  received = next_frame
  received && received.payload
end

#read(*args) ⇒ Object



74
75
76
# File 'lib/amqp/server.rb', line 74

# Forwards to +send_command+ with :read followed by the given arguments.
# NOTE(review): presumably this reads from the underlying socket —
# send_command is not visible in this excerpt; confirm.
#
# @param args [Array] arguments passed through unchanged
# @return [Object] whatever send_command returns
def read(*args)
  send_command(:read, *args)
end

#send_frame(*args) ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
# File 'lib/amqp/server.rb', line 30

# Writes each argument to the server as a frame on the current channel.
#
# For every argument, in order:
# * if a ticket is set and the object responds to +ticket=+, the ticket is
#   copied onto it;
# * anything that is not already a Frame is converted with
#   +to_frame(channel)+;
# * the frame's channel is set to the current channel;
# * the frame is logged as :send and its string form is passed to +write+.
#
# @param args [Array] Frames, or objects that respond to +to_frame+
# @return [nil]
def send_frame(*args)
  args.each do |item|
    item.ticket = ticket if ticket && item.respond_to?(:ticket=)

    frame = item.is_a?(Frame) ? item : item.to_frame(channel)
    frame.channel = channel

    log :send, frame
    write(frame.to_s)
  end
  nil
end

#write(*args) ⇒ Object



78
79
80
# File 'lib/amqp/server.rb', line 78

# Forwards to +send_command+ with :write followed by the given arguments.
# #send_frame calls this with the string form of each outgoing frame.
# NOTE(review): presumably this writes to the underlying socket —
# send_command is not visible in this excerpt; confirm.
#
# @param args [Array] arguments passed through unchanged
# @return [Object] whatever send_command returns
def write(*args)
  send_command(:write, *args)
end