Class: Rookout::ComWs::AgentComWs

Inherits:
Object
  • Object
show all
Includes:
EventEmitter
Defined in:
lib/rookout/com_ws/agent_com_ws.rb

Defined Under Namespace

Classes: ExitMessage, FlushMessage

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(output, agent_host, agent_port, proxy, token, labels, print_on_connect) ⇒ AgentComWs

Returns a new instance of AgentComWs.



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/rookout/com_ws/agent_com_ws.rb', line 28

# Stores the connection settings and prepares the bookkeeping state
# (message queue, thread slots, readiness event) used by the other methods.
#
# @param output [Object] stored unchanged in @output
# @param agent_host [String, nil] agent host; "ws://" is prepended when it
#   carries no "://". A nil or empty host yields an empty URI.
# @param agent_port [Object] interpolated into the URI after the host
# @param proxy [String, nil] proxy address; "http://" is prepended when it
#   carries no "://". A nil or empty proxy is stored as nil.
# @param token [Object] stored unchanged in @token
# @param labels [Object] passed to Information.new
# @param print_on_connect [Object] stored in @print_on_initial_connection
def initialize output, agent_host, agent_port, proxy, token, labels, print_on_connect
  @uri =
    if agent_host.nil? || agent_host.empty?
      ""
    else
      # Default to the plain WebSocket scheme when the caller gave none.
      host_with_scheme = agent_host.include?("://") ? agent_host : "ws://#{agent_host}"
      "#{host_with_scheme}:#{agent_port}/v1"
    end

  @proxy =
    if proxy.nil? || proxy.empty?
      nil
    elsif proxy.include?("://")
      proxy
    else
      "http://#{proxy}"
    end

  @token = token
  @token_valid = false

  @output = output
  @info = Information.new labels
  reset_id

  # Threads are created later; until then both slots stay nil.
  @main_thread = nil
  @outgoing_thread = nil
  @pending_messages = Queue.new
  @pending_messages_length = 0

  @running = false
  # Set the first time the "InitialAugsCommand" event is emitted.
  @ready_event = Concurrent::Event.new
  once("Com::Rookout::InitialAugsCommand") do
    @ready_event.set
  end

  @print_on_initial_connection = print_on_connect
end

Instance Attribute Details

#pending_messages ⇒ Object (readonly)

Returns the value of attribute pending_messages.



26
27
28
# File 'lib/rookout/com_ws/agent_com_ws.rb', line 26

# Read-only access to the queue of pending outgoing messages, i.e. the
# Queue created in the constructor that #add, #flush_all_messages and #stop
# push onto.
#
# @return [Queue] the pending-messages queue
def pending_messages
  @pending_messages
end

Instance Method Details

#add(envelope_wrapper) ⇒ Object



60
61
62
63
64
65
66
67
68
69
# File 'lib/rookout/com_ws/agent_com_ws.rb', line 60

# Enqueues an outgoing message and adds its size to the running total.
#
# @param envelope_wrapper [#calculate_size] message to enqueue
# @return [Integer] the updated total size of pending messages
# @raise [Exceptions::RookOutputQueueFull] when adding the message would
#   push the total size past Config.agent_com_max_queue_messages_length, or
#   when #queue_full? reports that the queue holds too many messages
def add envelope_wrapper
  size = envelope_wrapper.calculate_size
  over_size_budget = @pending_messages_length + size > Config.agent_com_max_queue_messages_length
  # queue_full? is only consulted when the size budget still has room.
  raise Exceptions::RookOutputQueueFull if over_size_budget || queue_full?

  @pending_messages << envelope_wrapper
  @pending_messages_length += size
end

#connect ⇒ Object



75
76
77
78
79
80
# File 'lib/rookout/com_ws/agent_com_ws.rb', line 75

# Marks this instance as running and starts connection_thread on a new
# thread, which is stored in @main_thread and given a descriptive name.
#
# @return [String] the name assigned to the thread
def connect
  @running = true

  @main_thread = Thread.new do
    connection_thread
  end
  @main_thread.name = "rookout-connection-thread"
end

#flush_all_messages ⇒ Object



100
101
102
103
104
# File 'lib/rookout/com_ws/agent_com_ws.rb', line 100

# Enqueues a flush marker behind the messages already pending, then blocks
# until the marker's event is signalled or the flush timeout elapses.
#
# A fresh FlushMessage instance is created per call, so each caller waits
# on its own event; pushing the FlushMessage class itself would fail on the
# `event` lookup.
#
# @return [Object] whatever `event.wait` returns (for a Concurrent::Event,
#   presumably true when signalled and false on timeout; confirm against
#   FlushMessage)
def flush_all_messages
  flush = FlushMessage.new
  @pending_messages.push flush
  flush.event.wait Config.agent_com_flush_timeout
end

#queue_full? ⇒ Boolean

Returns:

  • (Boolean)


71
72
73
# File 'lib/rookout/com_ws/agent_com_ws.rb', line 71

# Tells whether the number of queued messages has reached the configured
# maximum (Config.agent_com_max_queued_messages).
#
# @return [Boolean] true when no further message should be queued
def queue_full?
  queued = @pending_messages.size
  queued >= Config.agent_com_max_queued_messages
end

#stop ⇒ Object



82
83
84
85
86
87
88
89
# File 'lib/rookout/com_ws/agent_com_ws.rb', line 82

# Stops communication: clears the running flag, queues an ExitMessage for
# the outgoing thread and waits for the main connection thread to finish.
#
# Safe to call before #connect: @main_thread is nil until then, so the join
# is skipped instead of raising NoMethodError.
#
# @return [Thread, nil] the joined main thread, or nil when none was started
def stop
  @running = false

  # Ask outgoing thread to exit (if running)
  @pending_messages << ExitMessage.new(@outgoing_thread)

  @main_thread.join if @main_thread
end

#wait_for_ready ⇒ Object



91
92
93
94
95
96
97
98
# File 'lib/rookout/com_ws/agent_com_ws.rb', line 91

# Blocks until the ready event is signalled or Config.agent_com_timeout
# elapses, then reports the outcome by raising.
#
# @return [nil] when the event was signalled and no connection error is stored
# @raise [Exceptions::RookCommunicationException] when the wait timed out
#   (connection attempts continue in the background)
# @raise [Exception] the stored @connection_error, when one is set
def wait_for_ready
  signalled = @ready_event.wait Config.agent_com_timeout

  # Timed out: the caller is told, but the connection keeps being retried.
  unless signalled
    raise Exceptions::RookCommunicationException
  end

  # Signalled: surface the failure recorded while connecting, if any.
  failure = @connection_error
  raise failure if failure
end