Class: RubyAMI::Client

Inherits:
Object show all
Defined in:
lib/ruby_ami/client.rb

Defined Under Namespace

Classes: ErrorHandler

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ Client

Returns a new instance of Client.



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/ruby_ami/client.rb', line 6

# Sets up a client in the :stopped state together with its bookkeeping
# tables and the three work queues (actions, messages, events).
#
# @param options [Hash] configuration; :logger, :log_level and
#   :event_handler are read here and the whole hash is kept in @options
def initialize(options)
  @options       = options
  @logger        = options[:logger] || Logger.new(STDOUT)
  if @logger
    @logger.level = options[:log_level] || Logger::DEBUG
  end
  @event_handler = @options[:event_handler]
  @state         = :stopped

  logger.warn 'The "timeout" parameter is not supported when using Rubinius' if RubyAMI.rbx?

  # Start out with action writing blocked; the action worker below waits on
  # @actions_write_blocker before every send.
  stop_writing_actions

  @actions_lock    = Mutex.new
  @pending_actions = {}
  @sent_actions    = {}

  # NOTE(review): :size is presumably the worker count, which would make
  # actions go out one at a time - confirm against GirlFriday.
  @action_queue = GirlFriday::WorkQueue.new(:actions, :size => 1, :error_handler => ErrorHandler) do |action|
    @actions_write_blocker.wait
    _send_action action
    begin
      # Hold this worker until the action has its response (or times out).
      action.response action.sync_timeout
    rescue Timeout::Error
      logger.error "Timed out waiting for a response to #{action}"
    rescue RubyAMI::Error
      # Deliberately ignored here.
      nil
    end
  end

  @message_processor = GirlFriday::WorkQueue.new(:messages, :size => 1, :error_handler => ErrorHandler) { |message| handle_message message }

  @event_processor = GirlFriday::WorkQueue.new(:events, :size => 2, :error_handler => ErrorHandler) { |event| handle_event event }
end

Instance Attribute Details

#action_queue ⇒ Object (readonly)

Returns the value of attribute action_queue.



4
5
6
# File 'lib/ruby_ami/client.rb', line 4

# Reader for @action_queue, the work queue that send_action pushes actions onto.
def action_queue
  @action_queue
end

#actions_stream ⇒ Object (readonly)

Returns the value of attribute actions_stream.



4
5
6
# File 'lib/ruby_ami/client.rb', line 4

# Reader for @actions_stream, the stream that _send_action writes actions to.
# It is assigned in #start, so it is nil before the client has been started.
def actions_stream
  @actions_stream
end

#events_stream ⇒ Object (readonly)

Returns the value of attribute events_stream.



4
5
6
# File 'lib/ruby_ami/client.rb', line 4

# Reader for @events_stream, the stream whose messages are fed to the event
# processor. It is assigned in #start, so it is nil before then.
def events_stream
  @events_stream
end

#options ⇒ Object (readonly)

Returns the value of attribute options.



4
5
6
# File 'lib/ruby_ami/client.rb', line 4

# Reader for @options, the options object handed to the constructor.
def options
  @options
end

Instance Method Details

#_send_action(action) ⇒ Object



122
123
124
125
126
127
128
# File 'lib/ruby_ami/client.rb', line 122

# Writes a single action to the actions stream and marks it as sent.
#
# @param action [Object] the action to transmit; it must accept `state=`
# @return [Object] the same action that was passed in
def _send_action(action)
  if logger
    logger.trace "[SEND]: #{action.inspect}"
  end

  # Record the action as sent before it is written to the stream.
  transition_action_to_sent action
  actions_stream.send_action action

  action.tap { |sent| sent.state = :sent }
end

#handle_event(event) ⇒ Object



110
111
112
113
114
115
116
117
118
119
120
# File 'lib/ruby_ami/client.rb', line 110

# Dispatches one message received on the events stream.
#
# A connection notice is ignored, a disconnection notice stops the client,
# and anything else is forwarded through pass_event.
#
# @param event [Object] the message taken off the events stream
# @return [Object, nil] nil for a connection notice, otherwise whatever
#   stop / pass_event returned
def handle_event(event)
  if logger
    logger.trace "[RECV-EVENTS]: #{event.inspect}"
  end

  case event
  when Stream::Connected    then nil
  when Stream::Disconnected then stop
  else pass_event(event)
  end
end

#handle_message(message) ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/ruby_ami/client.rb', line 74

# Dispatches one message received on the actions stream.
#
# * Connection notices are ignored.
# * A disconnection notice blocks further action writes and stops the client.
# * An Event is attached to the action currently collecting causal events,
#   if there is one. Otherwise only 'FullyBooted' is accepted: it is passed
#   on and action writing is enabled.
# * A Response or Error is matched to its sent action by ActionID and
#   appended to that action.
#
# @param message [Object] the message taken off the actions stream
# @raise [StandardError] on an event with no action awaiting causal events
#   (other than 'FullyBooted'), or on a response whose ActionID is unknown
def handle_message(message)
  if logger
    logger.trace "[RECV-ACTIONS]: #{message.inspect}"
  end

  case message
  when Stream::Connected
    nil
  when Stream::Disconnected
    stop_writing_actions
    stop
  when Event
    collecting = @current_action_with_causal_events
    if collecting
      message.action = collecting
      collecting << message
      # Stop routing events to this action once it reports itself complete.
      @current_action_with_causal_events = nil if collecting.complete?
    elsif message.name == 'FullyBooted'
      pass_event message
      start_writing_actions
    else
      raise StandardError, "Got an unexpected event on actions socket! This AMI command may have a multi-message response. Try making Adhearsion treat it as causal action #{message.inspect}"
    end
  when Response, Error
    owner = sent_action_with_id(message.action_id)
    unless owner
      raise StandardError, "Received an AMI response with an unrecognized ActionID!! This may be an bug! #{message.inspect}"
    end
    message.action = owner

    # By this point the write loop will already be blocked in the action's
    # response() call. More events have to be collected before it is woken
    # up again, so remember the action in an instance variable that the
    # subsequent causal events will need.
    @current_action_with_causal_events = owner if owner.has_causal_events?

    owner << message
  end
end

#send_action(action, headers = {}, &block) ⇒ Object



66
67
68
69
70
71
72
# File 'lib/ruby_ami/client.rb', line 66

# Queues an action for delivery.
#
# @param action [Action, Object] an Action to queue as is, or the first
#   argument for building a new Action
# @param headers [Hash] passed to Action.new; unused when an Action is given
# @param block passed to Action.new; unused when an Action is given
# @return [Action] the action that was registered and queued
def send_action(action, headers = {}, &block)
  action = Action.new(action, headers, &block) unless action.is_a?(Action)

  if logger
    logger.trace "[QUEUE]: #{action.inspect}"
  end

  register_pending_action action
  action_queue << action
  action
end

#start ⇒ Object



48
49
50
51
52
53
54
# File 'lib/ruby_ami/client.rb', line 48

# Creates the events and actions streams, runs them asynchronously, marks
# the client as :started and then joins each stream actor.
#
# NOTE(review): the join presumably blocks the caller until the streams
# terminate - confirm against Celluloid::Actor.join.
#
# @return [Object] whatever the final `streams.each` returns
def start
  forward_event   = lambda { |event| @event_processor << event }
  forward_message = lambda { |message| @message_processor << message }

  @events_stream  = new_stream(forward_event)
  @actions_stream = new_stream(forward_message)

  streams.each do |stream|
    stream.async.run
  end
  @state = :started

  streams.each do |stream|
    Celluloid::Actor.join stream
  end
end

#stop ⇒ Object



56
57
58
59
60
61
62
63
64
# File 'lib/ruby_ami/client.rb', line 56

# Terminates every stream that is still alive.
#
# An error raised while checking or terminating one stream is logged (when
# a logger is available) and does not prevent the remaining streams from
# being processed.
#
# @return [Object] whatever `streams.each` returns
def stop
  streams.each do |stream|
    begin
      next unless stream.alive?
      stream.terminate
    rescue => error
      if logger
        logger.error error
      end
    end
  end
end