Class: RubyAMI::Stream
- Inherits:
- Object (full chain: Object → RubyAMI::Stream)
- Includes:
- Celluloid::IO
- Defined in:
- lib/ruby_ami/stream.rb
Defined Under Namespace
Classes: ConnectionStatus
Constant Summary collapse
- Connected =
Class.new ConnectionStatus
- Disconnected =
Class.new ConnectionStatus
Instance Attribute Summary collapse
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
Instance Method Summary collapse
-
#initialize(host, port, username, password, event_callback, logger = Logger, timeout = 0) ⇒ Stream
constructor
A new instance of Stream.
- #message_received(message) ⇒ Object (also: #error_received)
- #post_init ⇒ Object
- #receive_data(data) ⇒ Object
- #run ⇒ Object
- #send_action(name, headers = {}, error_handler = self.method(:abort)) ⇒ Object
- #send_data(data) ⇒ Object
- #syntax_error_encountered(ignored_chunk) ⇒ Object
- #version ⇒ Object
Constructor Details
#initialize(host, port, username, password, event_callback, logger = Logger, timeout = 0) ⇒ Stream
Returns a new instance of Stream.
25 26 27 28 29 30 31 32 33 |
# File 'lib/ruby_ami/stream.rb', line 25
#
# Stores the connection settings, prepares the lexer and the action
# bookkeeping hashes, then invokes +run+ through +async+.
#
# @param host stored in @host
# @param port stored in @port
# @param username stored in @username
# @param password stored in @password
# @param event_callback stored in @event_callback
# @param logger stored in @logger; must respond to +debug+ (defaults to Logger)
# @param timeout stored in @timeout (defaults to 0)
def initialize(host, port, username, password, event_callback, logger = Logger, timeout = 0)
  super()
  @host, @port, @username, @password, @event_callback, @logger, @timeout =
    host, port, username, password, event_callback, logger, timeout
  logger.debug "Starting up..."
  # The lexer is handed this stream so it can call back into it.
  @lexer = Lexer.new self
  @sent_actions = {}
  @causal_actions = {}
  async.run
end
Instance Attribute Details
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
21 22 23 |
# File 'lib/ruby_ami/stream.rb', line 21
#
# Read-only accessor for the logger.
#
# @return the value of @logger
def logger
  @logger
end
Instance Method Details
#message_received(message) ⇒ Object Also known as: error_received
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/ruby_ami/stream.rb', line 87
#
# Routes a parsed message to the action it belongs to, or fires it as an
# event when no action claims it.
#
# @param message the parsed message; dispatch is by class (Event, Response
#   or Error), anything else is only traced
# @raise [StandardError] when a Response/Error cannot be matched to an action
def message_received(message)
  logger.trace "[RECV] #{message.inspect}"
  case message
  when Event
    action = causal_action_for_event message
    if action
      # The event belongs to an action: append it, and finalise the
      # bookkeeping once the action reports itself complete.
      action << message
      complete_causal_action_for_event message if action.complete?
    else
      fire_event message
    end
  when Response, Error
    action = sent_action_for_response message
    raise StandardError, "Received an AMI response with an unrecognized ActionID! #{message.inspect}" unless action
    action << message
  end
end
#post_init ⇒ Object
55 56 57 58 59 |
# File 'lib/ruby_ami/stream.rb', line 55
#
# Marks the stream as started, fires a Connected event, and logs in when
# both credentials are present.
def post_init
  @state = :started
  fire_event Connected.new
  # Login is skipped unless both a username and a password were supplied.
  login @username, @password if @username && @password
end
#receive_data(data) ⇒ Object
82 83 84 85 |
# File 'lib/ruby_ami/stream.rb', line 82
#
# Traces the received data and appends it to the lexer.
#
# @param data the chunk to trace and push onto @lexer
def receive_data(data)
  logger.trace "[RECV] #{data}"
  @lexer << data
end
#run ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/ruby_ami/stream.rb', line 39
#
# Opens the socket to @host:@port, runs +post_init+, then reads from the
# socket in a loop, passing each chunk to +receive_data+.
#
# Connection failures, a closed socket (EOFError) and a connect timeout are
# logged rather than re-raised. In every case +async.terminate+ is called
# on the way out.
def run
  # Only the connect step is wrapped in the timeout, not the read loop.
  Timeout.timeout(@timeout) do
    @socket = TCPSocket.from_ruby_socket ::TCPSocket.new(@host, @port)
  end
  post_init
  loop { receive_data @socket.readpartial(4096) }
rescue Errno::ECONNREFUSED, Errno::EHOSTUNREACH, SocketError => e
  logger.error "Connection failed due to #{e.class}. Check your config and the server."
rescue EOFError
  logger.info "Client socket closed!"
rescue Timeout::Error
  logger.error "Timeout exceeded while trying to connect."
ensure
  async.terminate
end
#send_action(name, headers = {}, error_handler = self.method(:abort)) ⇒ Object
69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/ruby_ami/stream.rb', line 69
#
# Dispatches an action, waits on a condition, and returns the action's
# response.
#
# @param name passed to +dispatch_action+
# @param headers passed to +dispatch_action+ (defaults to an empty hash)
# @param error_handler called with the response when the response is an
#   Exception (defaults to this object's +abort+ method)
# @return the action's response
def send_action(name, headers = {}, error_handler = self.method(:abort))
  condition = Celluloid::Condition.new
  # The block given to dispatch_action signals the condition with the response.
  action = dispatch_action name, headers do |response|
    condition.signal response
  end
  condition.wait
  action.response.tap do |resp|
    error_handler.call(resp) if resp.is_a? Exception
  end
end
#send_data(data) ⇒ Object
65 66 67 |
# File 'lib/ruby_ami/stream.rb', line 65
#
# Writes the given data to the socket.
#
# @param data the payload passed to @socket.write
# @return whatever @socket.write returns
def send_data(data)
  @socket.write data
end
#syntax_error_encountered(ignored_chunk) ⇒ Object
105 106 107 |
# File 'lib/ruby_ami/stream.rb', line 105
#
# Logs an error for a chunk that was ignored because of a syntax error.
#
# @param ignored_chunk the skipped chunk; its +inspect+ form goes in the log line
def syntax_error_encountered(ignored_chunk)
  logger.error "Encountered a syntax error. Ignoring chunk: #{ignored_chunk.inspect}"
end
#version ⇒ Object
61 62 63 |
# File 'lib/ruby_ami/stream.rb', line 61
#
# Returns the AMI version held by the lexer.
#
# @return the value of @lexer.ami_version
def version
  @lexer.ami_version
end