Class: LogStash::Outputs::Application_insights::Channels

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/outputs/application_insights/channels.rb

Constant Summary collapse

@@instance =
Channels.new

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Channels

Returns a new instance of Channels.



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/logstash/outputs/application_insights/channels.rb', line 27

# Sets up the channel registry from the current configuration and starts
# the background forwarding thread.
#
# Reads :logger, :instrumentation_key, :table_id and :tables from
# Config.current and picks up the Flow_control instance.
def initialize
  settings = Config.current

  # values taken straight from the configuration
  @default_instrumentation_key = settings[:instrumentation_key]
  @default_table_id = settings[:table_id]
  @tables = settings[:tables]
  @logger = settings[:logger]

  # channel bookkeeping; periodic_forward_events takes the mutex when it
  # snapshots @channels
  @channels = []
  @instrumentation_key_table_id_db = {}
  @create_semaphore = Mutex.new

  @flow_control = Flow_control.instance

  # launch the thread that forwards events from channels to azure storage
  periodic_forward_events
end

Class Method Details

.instance ⇒ Object



136
137
138
# File 'lib/logstash/outputs/application_insights/channels.rb', line 136

# Returns the object stored in the @@instance class variable.
# NOTE(review): the class summary lists @@instance = Channels.new, so this is
# presumably the single shared Channels object — confirm against the class body.
def self.instance
  @@instance
end

Instance Method Details

#channel(instrumentation_key, table_id) ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/logstash/outputs/application_insights/channels.rb', line 63

# Looks up the channel for the given instrumentation key and table id,
# creating it when the lookup reports that none exists.
#
# @param instrumentation_key the key identifying the target
# @param table_id the table the channel belongs to
# @return whatever dispatch_channel or create_channel returns
def channel(instrumentation_key, table_id)
  dispatch_channel(instrumentation_key, table_id)
rescue NoChannelError
  # no channel yet: try to create one
  begin
    create_channel(instrumentation_key, table_id)
  rescue ChannelExistError
    # lost a creation race against another caller; the channel exists now
    dispatch_channel(instrumentation_key, table_id)
  end
end

#close ⇒ Object



119
120
121
122
123
# File 'lib/logstash/outputs/application_insights/channels.rb', line 119

# Calls close on every channel held in @channels.
#
# @return [Array] the @channels array itself
def close
  @channels.each(&:close)
end

#mark_invalid_instrumentation_key(instrumentation_key) ⇒ Object



125
126
127
# File 'lib/logstash/outputs/application_insights/channels.rb', line 125

# Stub: currently does nothing with the given instrumentation key.
#
# @param instrumentation_key the key to mark (ignored for now)
# @return [nil]
def mark_invalid_instrumentation_key(instrumentation_key)
  # TODO: should go to lost and found container
  nil
end

#mark_invalid_table_id(table_id) ⇒ Object



129
130
131
# File 'lib/logstash/outputs/application_insights/channels.rb', line 129

# Stub: currently does nothing with the given table id.
#
# @param table_id the table id to mark (ignored for now)
# @return [nil]
def mark_invalid_table_id(table_id)
  # TODO: should go to lost and found container
  nil
end

#periodic_forward_events ⇒ Object



77
78
79
80
81
82
83
84
85
86
87
# File 'lib/logstash/outputs/application_insights/channels.rb', line 77

# Starts a background thread that, every 0.5 seconds, flushes each channel
# currently held in @channels.
#
# A StandardError raised while flushing one channel is logged and skipped.
# Without that, a single failing flush would terminate the thread, so no
# channel would ever be flushed again and the remaining channels of that
# pass would be skipped.
#
# @return [Thread] the forwarding thread (its loop never ends on its own)
def periodic_forward_events
  Thread.new do
    loop do
      sleep( 0.5 )
      # snapshot under the lock so the flush pass works on a stable list
      channels = @create_semaphore.synchronize { @channels.dup }
      channels.each do |channel|
        begin
          channel.flush
        rescue StandardError => e
          # keep the forwarder alive; the channel is retried on the next pass
          @logger.error { "failed to flush channel: #{e.class}: #{e.message}" }
        end
      end
    end
  end
end

#receive(event, encoded_event) ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/logstash/outputs/application_insights/channels.rb', line 47

# Routes an incoming event to its channel.
#
# SHUTDOWN and FLUSH marker events are only logged. For any other event the
# table id is taken from the event's metadata field, then its regular field,
# then the configured default. The instrumentation key is resolved the same
# way, with the per-table configuration consulted before the default.
#
# @param event the event to route; accessed with [] using the field constants
# @param encoded_event not used by this method
# @return the result of the logger call for marker events, otherwise the
#   result of appending the event to the channel
def receive(event, encoded_event)
  return @logger.info { "received a LogStash::SHUTDOWN event" } if LogStash::SHUTDOWN == event
  return @logger.info { "received a LogStash::FLUSH event" } if LogStash::FLUSH == event

  table_id = event[METADATA_FIELD_TABLE_ID] || event[FIELD_TABLE_ID]
  table_id ||= @default_table_id

  instrumentation_key = event[METADATA_FIELD_INSTRUMENTATION_KEY] || event[FIELD_INSTRUMENTATION_KEY]
  # the per-table setting is only looked up when the event itself carries no key
  instrumentation_key ||= @tables[table_id] && @tables[table_id][:instrumentation_key]
  instrumentation_key ||= @default_instrumentation_key

  # may block until flow control lets the event through
  @flow_control.pass_or_wait
  channel(instrumentation_key, table_id) << event
end