Class: LogStash::Outputs::Application_insights::Channels
- Inherits:
-
Object
- Object
- LogStash::Outputs::Application_insights::Channels
- Defined in:
- lib/logstash/outputs/application_insights/channels.rb
Constant Summary collapse
- @@instance =
Channels.new
Class Method Summary collapse
Instance Method Summary collapse
- #channel(instrumentation_key, table_id) ⇒ Object
- #close ⇒ Object
-
#initialize ⇒ Channels
constructor
A new instance of Channels.
- #mark_invalid_instrumentation_key(instrumentation_key) ⇒ Object
- #mark_invalid_table_id(table_id) ⇒ Object
- #periodic_forward_events ⇒ Object
- #receive(event, encoded_event) ⇒ Object
Constructor Details
#initialize ⇒ Channels
Returns a new instance of Channels.
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/logstash/outputs/application_insights/channels.rb', line 27 def initialize configuration = Config.current @logger = configuration[:logger] @instrumentation_key_table_id_db = {} @channels = [ ] @create_semaphore = Mutex.new @default_instrumentation_key = configuration[:instrumentation_key] @default_table_id = configuration[:table_id] @tables = configuration[:tables] @flow_control = Flow_control.instance # launch tread that forward events from channels to azure storage periodic_forward_events end |
Class Method Details
.instance ⇒ Object
136 137 138 |
# File 'lib/logstash/outputs/application_insights/channels.rb', line 136 def self.instance @@instance end |
Instance Method Details
#channel(instrumentation_key, table_id) ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/logstash/outputs/application_insights/channels.rb', line 63 def channel ( instrumentation_key, table_id ) begin dispatch_channel( instrumentation_key, table_id ) rescue NoChannelError begin create_channel( instrumentation_key, table_id ) rescue ChannelExistError # can happen due to race conditions dispatch_channel( instrumentation_key, table_id ) end end end |
#close ⇒ Object
119 120 121 122 123 |
# File 'lib/logstash/outputs/application_insights/channels.rb', line 119 def close @channels.each do |channel| channel.close end end |
#mark_invalid_instrumentation_key(instrumentation_key) ⇒ Object
125 126 127 |
# File 'lib/logstash/outputs/application_insights/channels.rb', line 125 def mark_invalid_instrumentation_key ( instrumentation_key ) # TODO should go to lost and found container end |
#mark_invalid_table_id(table_id) ⇒ Object
129 130 131 |
# File 'lib/logstash/outputs/application_insights/channels.rb', line 129 def mark_invalid_table_id ( table_id ) # TODO should go to lost and found container end |
#periodic_forward_events ⇒ Object
77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/logstash/outputs/application_insights/channels.rb', line 77 def periodic_forward_events Thread.new do loop do sleep( 0.5 ) channels = @create_semaphore.synchronize { @channels.dup } channels.each do |channel| channel.flush end end end end |
#receive(event, encoded_event) ⇒ Object
47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/logstash/outputs/application_insights/channels.rb', line 47 def receive ( event, encoded_event ) if LogStash::SHUTDOWN == event @logger.info { "received a LogStash::SHUTDOWN event" } elsif LogStash::FLUSH == event @logger.info { "received a LogStash::FLUSH event" } else table_id = event[METADATA_FIELD_TABLE_ID] || event[FIELD_TABLE_ID] || @default_table_id instrumentation_key = event[METADATA_FIELD_INSTRUMENTATION_KEY] || event[FIELD_INSTRUMENTATION_KEY] || ( @tables[table_id][:instrumentation_key] if @tables[table_id] ) || @default_instrumentation_key @flow_control.pass_or_wait channel( instrumentation_key, table_id ) << event end end |