Class: RubyAMI::Stream

Inherits:
Object
  • Object
show all
Includes:
Celluloid::IO
Defined in:
lib/ruby_ami/stream.rb

Defined Under Namespace

Classes: ConnectionStatus

Constant Summary collapse

Connected =
Class.new ConnectionStatus
Disconnected =
Class.new ConnectionStatus

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host, port, username, password, event_callback, logger = Logger, timeout = 0) ⇒ Stream

Returns a new instance of Stream.


25
26
27
28
29
30
31
32
33
# File 'lib/ruby_ami/stream.rb', line 25

# Stores the connection settings, prepares the lexer and the action
# bookkeeping hashes, then schedules #run asynchronously.
#
# @param host [Object] remote host, later handed to ::TCPSocket.new by #run
# @param port [Object] remote port, later handed to ::TCPSocket.new by #run
# @param username [Object, nil] credential read by #post_init
# @param password [Object, nil] credential read by #post_init
# @param event_callback [Object] stored in @event_callback
# @param logger [Object] receives the debug message below and backs #logger
# @param timeout [Numeric] passed to Timeout.timeout around the connect in #run
def initialize(host, port, username, password, event_callback, logger = Logger, timeout = 0)
  super()
  @host           = host
  @port           = port
  @username       = username
  @password       = password
  @event_callback = event_callback
  @logger         = logger
  @timeout        = timeout
  logger.debug "Starting up..."
  # The lexer is given this stream as the object it reports back to.
  @lexer = Lexer.new self
  @sent_actions   = {}
  @causal_actions = {}
  async.run
end

Instance Attribute Details

#logger ⇒ Object (readonly)

Returns the value of attribute logger.


21
22
23
# File 'lib/ruby_ami/stream.rb', line 21

# Reader for the logger held in @logger.
#
# @return [Object] the value currently stored in @logger
def logger
  @logger
end

Instance Method Details

#message_received(message) ⇒ Object Also known as: error_received


87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/ruby_ami/stream.rb', line 87

# Routes a parsed AMI message to the action it belongs to, or fires it as
# a standalone event.
#
# An Event is appended to its causal action when one is found (and that
# action is completed once it reports complete?); otherwise it is passed to
# fire_event. A Response or Error is appended to the sent action it answers.
# Any other message type is only logged.
#
# @param message [Object] the parsed message
# @raise [StandardError] when a Response/Error has no matching sent action
def message_received(message)
  logger.trace "[RECV] #{message.inspect}"
  case message
  when Event
    causal = causal_action_for_event(message)
    if causal
      causal << message
      complete_causal_action_for_event(message) if causal.complete?
    else
      fire_event(message)
    end
  when Response, Error
    pending = sent_action_for_response(message)
    unless pending
      raise StandardError, "Received an AMI response with an unrecognized ActionID! #{message.inspect}"
    end
    pending << message
  end
end

#post_init ⇒ Object


55
56
57
58
59
# File 'lib/ruby_ami/stream.rb', line 55

# Marks the stream as started, announces the connection, and logs in when
# both credentials are present.
#
# NOTE(review): the listing had lost the call target on the last line,
# leaving a bare `@username, @password if ...` that does not parse. It is
# restored here as `login`; confirm against the class's private helpers.
def post_init
  @state = :started
  fire_event Connected.new
  # Skip the login entirely unless both credentials were supplied.
  login @username, @password if @username && @password
end

#receive_data(data) ⇒ Object


82
83
84
85
# File 'lib/ruby_ami/stream.rb', line 82

# Logs a chunk of raw inbound data and appends it to the lexer.
#
# @param data [String] the chunk read from the socket
# @return [Object] the result of appending to @lexer
def receive_data(data)
  logger.trace("[RECV] #{data}")
  @lexer << data
end

#run ⇒ Object


39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/ruby_ami/stream.rb', line 39

# Connects to @host/@port, runs #post_init, then feeds everything read
# from the socket into #receive_data until the read loop is interrupted.
#
# Connection failures, a closed socket and a connect timeout are logged
# rather than raised. Termination is always requested on the way out.
def run
  # Only the connect is guarded by the timeout; reads below are not.
  Timeout.timeout(@timeout) do
    raw_socket = ::TCPSocket.new(@host, @port)
    @socket = TCPSocket.from_ruby_socket(raw_socket)
  end
  post_init
  loop do
    chunk = @socket.readpartial(4096)
    receive_data(chunk)
  end
rescue Errno::ECONNREFUSED, Errno::EHOSTUNREACH, SocketError => error
  logger.error "Connection failed due to #{error.class}. Check your config and the server."
rescue EOFError
  logger.info "Client socket closed!"
rescue Timeout::Error
  logger.error "Timeout exceeded while trying to connect."
ensure
  async.terminate
end

#send_action(name, headers = {}, error_handler = self.method(:abort)) ⇒ Object


69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/ruby_ami/stream.rb', line 69

# Dispatches an action and waits until its response has been signalled.
#
# @param name [Object] action name forwarded to dispatch_action
# @param headers [Hash] headers forwarded to dispatch_action
# @param error_handler [#call] invoked with the response when it is an
#   Exception; defaults to this object's `abort` method
# @return [Object] the action's response (returned even when it is an
#   Exception and error_handler returns normally)
def send_action(name, headers = {}, error_handler = self.method(:abort))
  condition = Celluloid::Condition.new
  action = dispatch_action(name, headers) { |response| condition.signal response }
  condition.wait
  outcome = action.response
  error_handler.call(outcome) if outcome.is_a? Exception
  outcome
end

#send_data(data) ⇒ Object


65
66
67
# File 'lib/ruby_ami/stream.rb', line 65

# Writes raw data to the underlying socket.
#
# @param data [String] the payload to write
# @return [Object] whatever @socket.write returns
def send_data(data)
  @socket.write(data)
end

#syntax_error_encountered(ignored_chunk) ⇒ Object


105
106
107
# File 'lib/ruby_ami/stream.rb', line 105

# Logs an error about a chunk that was skipped because of a syntax error.
#
# @param ignored_chunk [Object] the skipped input; logged via #inspect
def syntax_error_encountered(ignored_chunk)
  logger.error("Encountered a syntax error. Ignoring chunk: #{ignored_chunk.inspect}")
end

#version ⇒ Object


61
62
63
# File 'lib/ruby_ami/stream.rb', line 61

# Reports the AMI version as exposed by the lexer.
#
# @return [Object] whatever @lexer.ami_version returns
def version
  @lexer.ami_version
end