Class: LogStash::Inputs::Azurenlogtable
- Inherits:
-
Base
- Object
- Base
- LogStash::Inputs::Azurenlogtable
- Defined in:
- lib/logstash/inputs/azurenlogtable.rb
Overview
Generate a repeating message.
This plugin is intended only as an example.
Constant Summary collapse
- TICKS_SINCE_EPOCH =
Time.utc(0001, 01, 01).to_i * 10000000
Instance Method Summary collapse
-
#build_latent_query ⇒ Object
process.
-
#initialize(*args) ⇒ Azurenlogtable
constructor
A new instance of Azurenlogtable.
- #on_new_data(entity, output_queue, last_good_timestamp) ⇒ Object
-
#partitionkey_from_datetime(time_string) ⇒ Object
Windows Azure Diagnostic’s algorithm for determining the partition key based on time is as follows: 1.
-
#process(output_queue) ⇒ Object
run.
- #register ⇒ Object
- #run(output_queue) ⇒ Object
-
#stop ⇒ Object
to_ticks.
-
#to_ticks(time_to_convert) ⇒ Object
Convert time to ticks.
Constructor Details
#initialize(*args) ⇒ Azurenlogtable
Returns a new instance of Azurenlogtable.
32 33 34 35 36 37 38 |
# File 'lib/logstash/inputs/azurenlogtable.rb', line 32

# Constructs the input. When the user did not configure a collection
# start time, seed it to (now - data latency window - 60s), ISO-8601.
def initialize(*args)
  super(*args)
  return unless @collection_start_time_utc.nil?
  @collection_start_time_utc = (Time.now - (60 * @data_latency_minutes) - 60).iso8601
  @logger.debug("collection_start_time_utc = #{@collection_start_time_utc}")
end
Instance Method Details
#build_latent_query ⇒ Object
process
100 101 102 103 104 105 106 107 108 109 |
# File 'lib/logstash/inputs/azurenlogtable.rb', line 100

# Builds the AzureQuery covering the PartitionKey window
# (@last_timestamp, @until_timestamp), or nil when the window is empty
# because @last_timestamp lies in the future.
def build_latent_query
  @logger.debug("from #{@last_timestamp} to #{@until_timestamp}")
  if @last_timestamp > @until_timestamp
    @logger.debug("last_timestamp is in the future. Will not run any query!")
    return nil
  end
  # Strip any stray double quotes so the OData filter stays valid.
  query_filter = "(PartitionKey gt '#{@last_timestamp}' and PartitionKey lt '#{@until_timestamp}')".gsub('"', '')
  AzureQuery.new(@logger, @azure_table_service, @table_name, query_filter,
                 "#{@last_timestamp}-#{@until_timestamp}", @entity_count_to_process)
end
#on_new_data(entity, output_queue, last_good_timestamp) ⇒ Object
111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/logstash/inputs/azurenlogtable.rb', line 111

# Converts one Azure Table entity into a Logstash event and pushes it
# onto the pipeline queue.
#
# entity              - the Azure Table row (responds to #properties)
# output_queue        - the Logstash pipeline queue
# last_good_timestamp - the last PartitionKey processed so far
#
# Returns the event's PartitionKey so the caller can advance its
# last-processed marker.
#
# NOTE(review): the rendered source had the local variable name stripped
# (`def on_new_data(entity, output_queue, )`, bare `= event.get(...)` and
# a bare `return`); reconstructed as `last_good_timestamp` from the
# documented signature and the matching call site in #process.
def on_new_data(entity, output_queue, last_good_timestamp)
  #@logger.debug("new event")
  event = LogStash::Event.new(entity.properties)
  event.set("type", @table_name)
  @logger.debug("new event:" + event.to_hash.to_s)
  decorate(event)
  last_good_timestamp = event.get('PartitionKey')
  output_queue << event
  return last_good_timestamp
end
#partitionkey_from_datetime(time_string) ⇒ Object
Windows Azure Diagnostic’s algorithm for determining the partition key based on time is as follows:
-
Take time in UTC without seconds.
-
Convert it into .net ticks
-
add a ‘0’ prefix.
128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
# File 'lib/logstash/inputs/azurenlogtable.rb', line 128

# Windows Azure Diagnostics' algorithm for deriving a partition key from
# a point in time:
#   1. take the UTC time without seconds
#   2. convert it into .NET ticks
#   3. add a '0' prefix
# Falls back to (now - data latency window - 60s) when time_string is
# nil or unparseable.
def partitionkey_from_datetime(time_string)
  collection_time =
    if time_string.nil?
      @logger.warn("partitionkey_from_datetime with invalid time_string. ")
      Time.now - (60 * @data_latency_minutes) - 60
    else
      begin
        Time.parse(time_string)
      rescue => e
        @logger.error("partitionkey_from_datetime fail with time_string =>" + time_string, :exception => e)
        Time.now - (60 * @data_latency_minutes) - 60
      end
    end

  raise(ArgumentError, "Could not parse the time_string => #{time_string}") unless collection_time

  # Drop the seconds component, then convert to ticks.
  collection_time -= collection_time.sec
  "0#{to_ticks(collection_time)}"
end
#process(output_queue) ⇒ Object
run
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/logstash/inputs/azurenlogtable.rb', line 77

# Runs one polling cycle: queries the table for rows whose PartitionKey
# lies between @last_timestamp and now, emits each row as an event on
# output_queue, then advances @last_timestamp to the last PartitionKey
# that was successfully processed. Errors are logged and re-raised so
# #run can decide how to continue.
#
# NOTE(review): the rendered source had the local variable name stripped
# (`= nil`, `= on_new_data(...)`, `if (!.nil?)`); reconstructed as
# `last_good_timestamp` from the documented #on_new_data signature.
def process(output_queue)
  @until_timestamp = partitionkey_from_datetime(Time.now.iso8601)
  last_good_timestamp = nil
  log_count = 0

  query = build_latent_query
  # build_latent_query returns nil when @last_timestamp is in the
  # future; the original dereferenced nil here (query.reset).
  return if query.nil?

  query.reset
  query.run(->(entity) {
    last_good_timestamp = on_new_data(entity, output_queue, last_good_timestamp)
    log_count += 1
  })

  @logger.debug("log total count => #{log_count}")
  @last_timestamp = last_good_timestamp unless last_good_timestamp.nil?
rescue => e
  @logger.error("Oh My, An error occurred. Error:#{e}: Trace: #{e.backtrace}", :exception => e)
  raise
end
#register ⇒ Object
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/logstash/inputs/azurenlogtable.rb', line 41

# Connects to Azure Table storage — via shared access key or SAS token,
# whichever is configured — and seeds the query window from the
# configured collection start time.
def register
  user_agent = "logstash-input-azurenlogtable-0.1.0"

  client_options = {
    :storage_account_name => @account_name,
    :storage_table_host => "https://#{@account_name}.table.#{@endpoint}",
    :user_agent_prefix => user_agent
  }
  if @sas_token.nil?
    client_options[:storage_access_key] = @access_key
  else
    client_options[:storage_sas_token] = @sas_token
  end

  @client = Azure::Storage::Client.create(client_options)
  @azure_table_service = @client.table_client
  @last_timestamp = partitionkey_from_datetime(@collection_start_time_utc)
  @idle_delay = @idle_delay_seconds
end
#run(output_queue) ⇒ Object
64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/logstash/inputs/azurenlogtable.rb', line 64

# Main input loop: poll via #process, log (but survive) failures, and
# sleep @idle_delay seconds between cycles until the plugin is stopped.
def run(output_queue)
  until stop?
    @logger.debug("Starting process method @" + Time.now.to_s)
    begin
      process(output_queue)
    rescue => e
      @logger.error("process fail", :exception => e)
    end
    @logger.debug("Starting delay of: " + @idle_delay.to_s + " seconds @" + Time.now.to_s)
    sleep @idle_delay
  end
end
#stop ⇒ Object
to_ticks
157 158 159 160 161 162 163 |
# File 'lib/logstash/inputs/azurenlogtable.rb', line 157 def stop # No resources to release: #run only polls and sleeps, and exits on stop?. # Typical "stop" duties (none needed here): # * close sockets (unblocking blocking reads/accepts) # * cleanup temporary files # * terminate spawned threads end |
#to_ticks(time_to_convert) ⇒ Object
Convert time to ticks
152 153 154 155 |
# File 'lib/logstash/inputs/azurenlogtable.rb', line 152

# Converts a Time into .NET ticks: 100-nanosecond intervals elapsed
# since 0001-01-01T00:00:00 UTC (TICKS_SINCE_EPOCH rebases from the
# Unix epoch).
def to_ticks(time_to_convert)
  seconds_since_unix_epoch = time_to_convert.to_i
  (seconds_since_unix_epoch * 10_000_000) - TICKS_SINCE_EPOCH
end