Class: LogStash::Inputs::Azurenlogtable

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/azurenlogtable.rb

Overview

Generate a repeating message.

This plugin is intented only as an example.

Constant Summary collapse

TICKS_SINCE_EPOCH =
Time.utc(0001, 01, 01).to_i * 10000000

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Azurenlogtable

Returns a new instance of Azurenlogtable.



32
33
34
35
36
37
38
# File 'lib/logstash/inputs/azurenlogtable.rb', line 32

def initialize(*args)
  super(*args)
  if @collection_start_time_utc.nil?
    @collection_start_time_utc = (Time.now - ( 60 * @data_latency_minutes) - 60).iso8601
    @logger.debug("collection_start_time_utc = #{@collection_start_time_utc}")
  end
end

Instance Method Details

#build_latent_queryObject

process



100
101
102
103
104
105
106
107
108
109
# File 'lib/logstash/inputs/azurenlogtable.rb', line 100

def build_latent_query
  @logger.debug("from #{@last_timestamp} to #{@until_timestamp}")
  if @last_timestamp > @until_timestamp
    @logger.debug("last_timestamp is in the future. Will not run any query!")
    return nil
  end
  query_filter = "(PartitionKey gt '#{@last_timestamp}' and PartitionKey lt '#{@until_timestamp}')"
  query_filter = query_filter.gsub('"','')
  return AzureQuery.new(@logger, @azure_table_service, @table_name, query_filter, @last_timestamp.to_s + "-" + @until_timestamp.to_s, @entity_count_to_process)
end

#on_new_data(entity, output_queue, last_good_timestamp) ⇒ Object



111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/logstash/inputs/azurenlogtable.rb', line 111

def on_new_data(entity, output_queue, last_good_timestamp)
  #@logger.debug("new event")
  
  event = LogStash::Event.new(entity.properties)
  event.set("type", @table_name)
  @logger.debug("new event:" + event.to_hash.to_s)

  decorate(event)
  last_good_timestamp = event.get('PartitionKey')
  output_queue << event
  return last_good_timestamp
end

#partitionkey_from_datetime(time_string) ⇒ Object

Windows Azure Diagnostic’s algorithm for determining the partition key based on time is as follows:

  1. Take time in UTC without seconds.

  2. Convert it into .net ticks

  3. add a ‘0’ prefix.



128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/logstash/inputs/azurenlogtable.rb', line 128

def partitionkey_from_datetime(time_string)
  if time_string.nil?
    @logger.warn("partitionkey_from_datetime with invalid time_string. ")
    collection_time = (Time.now - ( 60 * @data_latency_minutes) - 60)
  else
    begin
      collection_time = Time.parse(time_string)
    rescue => e  
      @logger.error("partitionkey_from_datetime fail with time_string =>" + time_string, :exception => e)
      collection_time = (Time.now - ( 60 * @data_latency_minutes) - 60)
    end
  end
  if collection_time
    #@logger.debug("collection time parsed successfully #{collection_time}")
  else
    raise(ArgumentError, "Could not parse the time_string => #{time_string}")
  end # if else block

  collection_time -= collection_time.sec
  ticks = to_ticks(collection_time)
  "0#{ticks}"
end

#process(output_queue) ⇒ Object

run



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/logstash/inputs/azurenlogtable.rb', line 77

def process(output_queue)
  @until_timestamp = partitionkey_from_datetime(Time.now.iso8601)
  last_good_timestamp = nil

  log_count = 0

  query = build_latent_query
  query.reset
  query.run( ->(entity) {
    last_good_timestamp = on_new_data(entity, output_queue, last_good_timestamp)
    log_count += 1
  })
  
  @logger.debug("log total count => #{log_count}")
  if (!last_good_timestamp.nil?)
    @last_timestamp = last_good_timestamp
  end

rescue => e
  @logger.error("Oh My, An error occurred. Error:#{e}: Trace: #{e.backtrace}", :exception => e)
  raise
end

#registerObject



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/logstash/inputs/azurenlogtable.rb', line 41

def register
  user_agent = "logstash-input-azurenlogtable-0.1.0"
  
  if @sas_token.nil?
      @client = Azure::Storage::Client.create(
      :storage_account_name => @account_name,
      :storage_access_key => @access_key,
      :storage_table_host => "https://#{@account_name}.table.#{@endpoint}",
      :user_agent_prefix => user_agent)
  else
    @client = Azure::Storage::Client.create(
      :storage_account_name => @account_name,
      :storage_sas_token => @sas_token,
      :storage_table_host => "https://#{@account_name}.table.#{@endpoint}",
      :user_agent_prefix => user_agent)
  end

  @azure_table_service = @client.table_client
  @last_timestamp = partitionkey_from_datetime(@collection_start_time_utc)
  @idle_delay = @idle_delay_seconds
end

#run(output_queue) ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/logstash/inputs/azurenlogtable.rb', line 64

def run(output_queue)
  while !stop?
    @logger.debug("Starting process method @" + Time.now.to_s);
    begin
      process(output_queue)
    rescue => e
      @logger.error("process fail", :exception => e)
    end
    @logger.debug("Starting delay of: " + @idle_delay.to_s + " seconds @" + Time.now.to_s);
    sleep @idle_delay
  end # while
end

#stopObject

to_ticks



157
158
159
160
161
162
163
# File 'lib/logstash/inputs/azurenlogtable.rb', line 157

def stop
  # nothing to do in this case so it is not necessary to define stop
  # examples of common "stop" tasks:
  #  * close sockets (unblocking blocking reads/accepts)
  #  * cleanup temporary files
  #  * terminate spawned threads
end

#to_ticks(time_to_convert) ⇒ Object

Convert time to ticks



152
153
154
155
# File 'lib/logstash/inputs/azurenlogtable.rb', line 152

def to_ticks(time_to_convert)
  #@logger.debug("Converting time to ticks")
  time_to_convert.to_i * 10000000 - TICKS_SINCE_EPOCH
end