Class: RubyAMI::Client
Defined Under Namespace
Classes: ErrorHandler
Instance Attribute Summary
- #action_queue ⇒ Object (readonly): Returns the value of attribute action_queue.
- #actions_stream ⇒ Object (readonly): Returns the value of attribute actions_stream.
- #events_stream ⇒ Object (readonly): Returns the value of attribute events_stream.
- #options ⇒ Object (readonly): Returns the value of attribute options.
Instance Method Summary
- #_send_action(action) ⇒ Object
- #handle_event(event) ⇒ Object
- #handle_message(message) ⇒ Object
- #initialize(options) ⇒ Client (constructor): A new instance of Client.
- #send_action(action, headers = {}, &block) ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(options) ⇒ Client
Returns a new instance of Client.
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/ruby_ami/client.rb', line 6
#
# Builds a client in the :stopped state and creates the three work queues
# (actions, messages, events) it uses. No streams are opened here.
#
# @param options [Hash] client configuration. Keys read in this method:
#   :logger (falls back to a Logger on STDOUT), :log_level (falls back to
#   Logger::DEBUG) and :event_handler. The whole hash is kept in @options.
# @return [Client] a new instance of Client
def initialize(options)
  @options        = options
  @logger         = options[:logger] || Logger.new(STDOUT)
  @logger.level   = options[:log_level] || Logger::DEBUG if @logger
  @event_handler  = @options[:event_handler]
  @state          = :stopped

  if RubyAMI.rbx?
    logger.warn 'The "timeout" parameter is not supported when using Rubinius'
  end

  stop_writing_actions

  @pending_actions  = {}
  @sent_actions     = {}
  @actions_lock     = Mutex.new

  # Each queued action waits on @actions_write_blocker (managed outside this
  # method), is written to the actions stream, and then the worker blocks on
  # the action's response so the next action is not sent before it returns.
  @action_queue = GirlFriday::WorkQueue.new(:actions, :size => 1, :error_handler => ErrorHandler) do |action|
    @actions_write_blocker.wait
    _send_action action
    begin
      action.response action.sync_timeout
    rescue Timeout::Error
      logger.error "Timed out waiting for a response to #{action}"
    rescue RubyAMI::Error
      # Deliberately ignored here; the worker simply moves on.
      nil
    end
  end

  # NOTE(review): the block parameter and body were missing from the flattened
  # listing; reconstructed as a hand-off to #handle_message, which is the only
  # consumer of actions-stream messages visible in this class — confirm.
  @message_processor = GirlFriday::WorkQueue.new(:messages, :size => 1, :error_handler => ErrorHandler) do |message|
    handle_message message
  end

  @event_processor = GirlFriday::WorkQueue.new(:events, :size => 2, :error_handler => ErrorHandler) do |event|
    handle_event event
  end
end
Instance Attribute Details
#action_queue ⇒ Object (readonly)
Returns the value of attribute action_queue.
4 5 6 |
# File 'lib/ruby_ami/client.rb', line 4
#
# Read-only accessor for the work queue that outgoing actions are pushed onto.
#
# @return [Object] the value of attribute action_queue
def action_queue
  @action_queue
end
#actions_stream ⇒ Object (readonly)
Returns the value of attribute actions_stream.
4 5 6 |
# File 'lib/ruby_ami/client.rb', line 4
#
# Read-only accessor for the stream that actions are written to.
#
# @return [Object] the value of attribute actions_stream
def actions_stream
  @actions_stream
end
#events_stream ⇒ Object (readonly)
Returns the value of attribute events_stream.
4 5 6 |
# File 'lib/ruby_ami/client.rb', line 4
#
# Read-only accessor for the stream that events arrive on.
#
# @return [Object] the value of attribute events_stream
def events_stream
  @events_stream
end
#options ⇒ Object (readonly)
Returns the value of attribute options.
4 5 6 |
# File 'lib/ruby_ami/client.rb', line 4
#
# Read-only accessor for the options the client was constructed with.
#
# @return [Object] the value of attribute options
def options
  @options
end
Instance Method Details
#_send_action(action) ⇒ Object
122 123 124 125 126 127 128 |
# File 'lib/ruby_ami/client.rb', line 122
#
# Writes a single action to the actions stream and marks it as sent.
#
# @param action [Object] the action to transmit; must accept `state=`
# @return [Object] the same action that was passed in
def _send_action(action)
  logger.trace "[SEND]: #{action.inspect}" if logger
  # Bookkeeping happens before the write so the action is already tracked as
  # sent by the time anything comes back on the stream.
  transition_action_to_sent action
  actions_stream.send_action action
  action.state = :sent
  action
end
#handle_event(event) ⇒ Object
110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/ruby_ami/client.rb', line 110
#
# Dispatches something received on the events stream.
#
# @param event [Object] a Stream::Connected / Stream::Disconnected marker, or
#   any other object, which is forwarded unchanged via pass_event
# @return [Object] whatever the selected handler returns
def handle_event(event)
  logger.trace "[RECV-EVENTS]: #{event.inspect}" if logger
  case event
  when Stream::Connected
    login_events
  when Stream::Disconnected
    stop
  else
    pass_event event
  end
end
#handle_message(message) ⇒ Object
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 |
# File 'lib/ruby_ami/client.rb', line 74
#
# Dispatches something received on the actions stream.
#
# @param message [Object] a Stream::Connected / Stream::Disconnected marker,
#   an Event, or a Response / Error
# @return [Object] whatever the selected branch evaluates to
# @raise [StandardError] when an Event other than 'FullyBooted' arrives while
#   no action is collecting causal events, or when a Response/Error carries an
#   action ID that sent_action_with_id does not resolve
def handle_message(message)
  logger.trace "[RECV-ACTIONS]: #{message.inspect}" if logger
  case message
  when Stream::Connected
    login_actions
  when Stream::Disconnected
    stop_writing_actions
    stop
  when Event
    action = @current_action_with_causal_events
    if action
      # The event belongs to the action currently collecting causal events.
      message.action = action
      action << message
      @current_action_with_causal_events = nil if action.complete?
    elsif message.name == 'FullyBooted'
      pass_event message
      start_writing_actions
    else
      raise StandardError, "Got an unexpected event on actions socket! This AMI command may have a multi-message response. Try making Adhearsion treat it as causal action #{message.inspect}"
    end
  when Response, Error
    action = sent_action_with_id message.action_id
    raise StandardError, "Received an AMI response with an unrecognized ActionID!! This may be an bug! #{message.inspect}" unless action
    message.action = action

    # By this point the write loop will already have started blocking by
    # calling the response() method on the action. Because more events must be
    # collected before the write loop is woken up again, remember the action
    # in an instance variable for the causal events that follow.
    @current_action_with_causal_events = action if action.has_causal_events?

    action << message
  end
end
#send_action(action, headers = {}, &block) ⇒ Object
66 67 68 69 70 71 72 |
# File 'lib/ruby_ami/client.rb', line 66
#
# Registers an action as pending and pushes it onto the action queue.
#
# @param action [Action, Object] an Action instance, which is queued as-is, or
#   anything else, which is passed to Action.new together with headers and block
# @param headers [Hash] forwarded to Action.new when an Action has to be built
# @param block [Proc] forwarded to Action.new when an Action has to be built
# @return [Action] the action that was queued
def send_action(action, headers = {}, &block)
  # A separate local avoids shadowing the `action` parameter.
  queued = action.is_a?(Action) ? action : Action.new(action, headers, &block)
  logger.trace "[QUEUE]: #{queued.inspect}" if logger
  register_pending_action queued
  action_queue << queued
  queued
end
#start ⇒ Object
48 49 50 51 52 53 54 |
# File 'lib/ruby_ami/client.rb', line 48
#
# Creates the events and actions streams, runs them asynchronously, marks the
# client as :started and then joins each stream via Celluloid::Actor.join.
#
# @return [Object] the result of iterating over streams
def start
  # Each stream gets a callback that hands what it receives to the matching
  # work queue instead of processing it inline.
  @events_stream  = new_stream(lambda { |event| @event_processor << event })
  @actions_stream = new_stream(lambda { |message| @message_processor << message })
  streams.each { |stream| stream.async.run }
  @state = :started
  streams.each { |stream| Celluloid::Actor.join stream }
end
#stop ⇒ Object
56 57 58 59 60 61 62 63 64 |
# File 'lib/ruby_ami/client.rb', line 56
#
# Terminates every stream that is still alive. A failure while terminating one
# stream is logged (when a logger is available) and does not prevent the
# remaining streams from being terminated.
#
# @return [Object] the result of iterating over streams
def stop
  streams.each do |stream|
    begin
      stream.terminate if stream.alive?
    rescue => e
      logger.error e if logger
    end
  end
end