Class: LogStash::Outputs::Application_insights::Channel
- Inherits: Object
- Inheritance hierarchy:
- Object
- LogStash::Outputs::Application_insights::Channel
- Defined in:
- lib/logstash/outputs/application_insights/channel.rb
Instance Attribute Summary collapse
-
#blob_max_delay ⇒ Object
readonly
Returns the value of attribute blob_max_delay.
-
#event_format_ext ⇒ Object
readonly
Returns the value of attribute event_format_ext.
-
#failed_on_notify_retry_Q ⇒ Object
readonly
Returns the value of attribute failed_on_notify_retry_Q.
-
#failed_on_upload_retry_Q ⇒ Object
readonly
Returns the value of attribute failed_on_upload_retry_Q.
-
#instrumentation_key ⇒ Object
readonly
Returns the value of attribute instrumentation_key.
-
#table_id ⇒ Object
readonly
Returns the value of attribute table_id.
Instance Method Summary collapse
- #<<(event) ⇒ Object
- #close ⇒ Object
- #flush ⇒ Object
-
#initialize(instrumentation_key, table_id) ⇒ Channel
constructor
A new instance of Channel.
- #stopped? ⇒ Boolean
Constructor Details
#initialize(instrumentation_key, table_id) ⇒ Channel
Returns a new instance of Channel.
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/logstash/outputs/application_insights/channel.rb', line 34
#
# Builds a channel for one instrumentation_key / table_id pair.
#
# Reads the current configuration from Config.current, takes the logger from
# it, records the two identifiers, and hands the configuration to
# set_table_properties. It then creates the channel's mutex, the two retry
# queues, the per-thread sub-channel map and the initial list of active blobs
# (a single Blob built with this channel and the number 1), and finally calls
# launch_upload_recovery_thread and launch_notify_recovery_thread.
#
# @param instrumentation_key [Object] identifier stored in @instrumentation_key
# @param table_id [Object] identifier stored in @table_id
def initialize(instrumentation_key, table_id)
  @closing = false
  configuration = Config.current

  @logger = configuration[:logger]
  @logger.debug { "Create a new channel, instrumentation_key / table_id : #{instrumentation_key} / #{table_id}" }

  @instrumentation_key = instrumentation_key
  @table_id = table_id
  set_table_properties(configuration)

  @semaphore = Mutex.new
  @failed_on_upload_retry_Q = Queue.new
  @failed_on_notify_retry_Q = Queue.new
  # Keyed by Thread (see #<<), so each calling thread gets its own sub-channel.
  @workers_channel = {}
  @active_blobs = [Blob.new(self, 1)]

  # NOTE(review): presumably these start background threads that drain the
  # two retry queues created above; their definitions are not shown here.
  launch_upload_recovery_thread
  launch_notify_recovery_thread
end
Instance Attribute Details
#blob_max_delay ⇒ Object (readonly)
Returns the value of attribute blob_max_delay.
30 31 32 |
# File 'lib/logstash/outputs/application_insights/channel.rb', line 30
#
# Read-only accessor.
#
# @return [Object] the current value of @blob_max_delay
def blob_max_delay
  @blob_max_delay
end
#event_format_ext ⇒ Object (readonly)
Returns the value of attribute event_format_ext.
29 30 31 |
# File 'lib/logstash/outputs/application_insights/channel.rb', line 29
#
# Read-only accessor.
#
# @return [Object] the current value of @event_format_ext
def event_format_ext
  @event_format_ext
end
#failed_on_notify_retry_Q ⇒ Object (readonly)
Returns the value of attribute failed_on_notify_retry_Q.
28 29 30 |
# File 'lib/logstash/outputs/application_insights/channel.rb', line 28
#
# Read-only accessor for the notify retry queue (a Queue created in
# #initialize).
#
# @return [Queue] the current value of @failed_on_notify_retry_Q
def failed_on_notify_retry_Q
  @failed_on_notify_retry_Q
end
#failed_on_upload_retry_Q ⇒ Object (readonly)
Returns the value of attribute failed_on_upload_retry_Q.
27 28 29 |
# File 'lib/logstash/outputs/application_insights/channel.rb', line 27
#
# Read-only accessor for the upload retry queue (a Queue created in
# #initialize).
#
# @return [Queue] the current value of @failed_on_upload_retry_Q
def failed_on_upload_retry_Q
  @failed_on_upload_retry_Q
end
#instrumentation_key ⇒ Object (readonly)
Returns the value of attribute instrumentation_key.
25 26 27 |
# File 'lib/logstash/outputs/application_insights/channel.rb', line 25
#
# Read-only accessor for the instrumentation key given to #initialize.
#
# @return [Object] the current value of @instrumentation_key
def instrumentation_key
  @instrumentation_key
end
#table_id ⇒ Object (readonly)
Returns the value of attribute table_id.
26 27 28 |
# File 'lib/logstash/outputs/application_insights/channel.rb', line 26
#
# Read-only accessor for the table id given to #initialize.
#
# @return [Object] the current value of @table_id
def table_id
  @table_id
end
Instance Method Details
#<<(event) ⇒ Object
65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/logstash/outputs/application_insights/channel.rb', line 65
#
# Serializes an event and appends it to the calling thread's sub-channel.
#
# Serialization is chosen in this order:
# 1. if @serialized_event_field is set and the event has a truthy value under
#    that key, that value is passed to serialize_serialized_event_field;
# 2. otherwise, if @serialization equals EXT_EVENT_FORMAT_CSV, serialize_to_csv;
# 3. otherwise serialize_to_json.
#
# A falsy serialization result is not appended; a warning is logged instead.
#
# @param event [#[]] the event to serialize; indexed by @serialized_event_field
# @return [Object] the result of appending to the sub-channel, or the result
#   of the logger's warn call when nothing was serialized
def <<(event)
  serialized_event =
    if @serialized_event_field && event[@serialized_event_field]
      serialize_serialized_event_field(event[@serialized_event_field])
    elsif EXT_EVENT_FORMAT_CSV == @serialization
      serialize_to_csv(event)
    else
      serialize_to_json(event)
    end

  if serialized_event
    # Look up this thread's sub-channel first; only take the mutex when a new
    # Sub_channel has to be created and stored for the current thread.
    sub_channel = @workers_channel[Thread.current] ||
                  @semaphore.synchronize { @workers_channel[Thread.current] = Sub_channel.new(@event_separator) }
    sub_channel << serialized_event
  else
    @logger.warn { "event not uploaded, no relevant data in event. table_id: #{table_id}, event: #{event}" }
  end
end
#close ⇒ Object
54 55 56 57 58 59 |
# File 'lib/logstash/outputs/application_insights/channel.rb', line 54
#
# Marks the channel as closing and closes every blob in @active_blobs.
#
# The flag is set before the blobs are closed, so #stopped? already reports
# true while they are closing.
#
# @return [Array] the @active_blobs array
def close
  @closing = true
  @active_blobs.each(&:close)
end
#flush ⇒ Object
80 81 82 83 |
# File 'lib/logstash/outputs/application_insights/channel.rb', line 80
#
# Passes whatever collect_blocks returns straight to enqueue_blocks.
#
# @return [Object] the result of enqueue_blocks
def flush
  enqueue_blocks(collect_blocks)
end
#stopped? ⇒ Boolean
61 62 63 |
# File 'lib/logstash/outputs/application_insights/channel.rb', line 61
#
# Reports whether #close has been called on this channel.
#
# @return [Boolean] the value of @closing (false after #initialize, true
#   after #close)
def stopped?
  @closing
end