Class: Fluent::Qi

Inherits:
BufferedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_qi.rb

Constant Summary collapse

DEFAULT_FORMAT_TYPE =
'json'

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Qi

Returns a new instance of Qi.



55
56
57
58
# File 'lib/fluent/plugin/out_qi.rb', line 55

# Builds the plugin instance: runs the parent initializer, then starts with
# the delayed flag switched off.
def initialize
  super
  # Read back by #delayed and #prefer_delayed_commit.
  @delayed = false
end

Instance Attribute Details

#delayed ⇒ Object

Returns the value of attribute delayed.



53
54
55
# File 'lib/fluent/plugin/out_qi.rb', line 53

# Reader for the delayed flag.
#
# @return [Object] the current value of @delayed (set to false in #initialize)
def delayed
  @delayed
end

Instance Method Details

#configure(conf) ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/fluent/plugin/out_qi.rb', line 60

# Reads the QI connection settings out of the plugin configuration.
#
# Keys read from +conf+: tenantId, namespaceId, appId, appKey, typeID,
# streamID and valuetag. Each one is stored in the instance variable of the
# same name; an absent key yields nil, except streamID which falls back to
# the string 'nil'.
#
# A missing tenantId/namespaceId or appId/appKey is only reported through
# $log; no exception is raised here.
#
# @param conf [#key?, #[], #[]=] configuration for this plugin instance
# @return [nil]
def configure(conf)
  # 'output_type' acts as a fallback for 'format'; it is copied over before
  # the parent class sees the configuration.
  conf['format'] = conf['output_type'] if conf['output_type'] && !conf['format']

  super

  @tenantId = conf.key?('tenantId') ? conf['tenantId'] : nil
  # The fallback used to assign an unrelated @attpath variable, leaving
  # @namespaceId unassigned; it now resets @namespaceId itself.
  @namespaceId = conf.key?('namespaceId') ? conf['namespaceId'] : nil

  @appId = conf.key?('appId') ? conf['appId'] : nil
  @appKey = conf.key?('appKey') ? conf['appKey'] : nil

  @typeID = conf.key?('typeID') ? conf['typeID'] : nil
  # NOTE(review): the default is the *string* 'nil', not nil, so an
  # unconfigured stream is addressed under the id "nil" — confirm this is intended.
  @streamID = conf.key?('streamID') ? conf['streamID'] : 'nil'
  @valuetag = conf.key?('valuetag') ? conf['valuetag'] : nil

  if @tenantId.nil? || @namespaceId.nil?
    # The namespace is not created on demand; it has to exist already.
    $log.write("Please specify a tenantID and namespaceID.  Unable to send to QI.\n")
    return
  end

  if @appId.nil? || @appKey.nil?
    $log.write("Please specify a appId and appKey.  Unable to send to QI.\n")
  end

  nil
end

#createTypeAndStream ⇒ Object



167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/fluent/plugin/out_qi.rb', line 167

# Registers the QI type and then the QI stream used by this plugin.
#
# The type definition is POSTed to the namespace's Types endpoint. When that
# answers 200, 201 or 302, the stream definition is POSTed to the Streams
# endpoint, where only 201 and 302 count as success. Per the original notes,
# 201 means "created" and 302 means "already existed".
#
# Both requests are built with certificate verification set to VERIFY_NONE.
#
# @raise [RuntimeError] "Type creation failed" or "Stream creation failed"
#   when the corresponding request answers with any other status code
def createTypeAndStream
  types_endpoint = "https://qi-data.osisoft.com/Qi/#{@tenantId}/#{@namespaceId}/Types"

  # The type is hard-coded for now: a string "Timestamp" key plus a string
  # "Value". Whether this should become configurable is still undecided.
  type_payload = "{\"Id\":\"#{@typeID}\",\"Name\":\"FluentD\",\"Description\":\"This is a type for fluentd events\",\"QiTypeCode\":0,\"Properties\":[{\"Id\":\"Timestamp\",\"Name\":null,\"Description\":null,\"QiType\":{\"Id\":\"string\",\"Name\":null,\"Description\":null,\"QiTypeCode\":18,\"Properties\":null},\"IsKey\":true},{\"Id\":\"Value\",\"Name\":null,\"Description\":null,\"QiType\":{\"Id\":\"string\",\"Name\":null,\"Description\":null,\"QiTypeCode\":18,\"Properties\":null},\"IsKey\":false}]}"

  type_resource = RestClient::Resource.new(types_endpoint, verify_ssl: OpenSSL::SSL::VERIFY_NONE)

  type_resource.post(type_payload, authorization: @authHeader, content_type: "application/json", accept: "text/plain") do |type_reply, _request, _result|
    unless [200, 201, 302].include?(type_reply.code)
      $log.write("Failed to get the type created or found ")
      raise "Type creation failed"
    end

    streams_endpoint = "https://qi-data.osisoft.com/Qi/#{@tenantId}/#{@namespaceId}/Streams"
    stream_payload = "{\"Id\":\"#{@streamID}\",\"Name\":\"WaveData_SampleStream\",\"Description\":null,\"TypeId\":\"#{@typeID}\",\"BehaviorId\":null}"

    stream_resource = RestClient::Resource.new(streams_endpoint, verify_ssl: OpenSSL::SSL::VERIFY_NONE)
    stream_resource.post(stream_payload, authorization: @authHeader, content_type: "application/json", accept: "text/plain") do |stream_reply, _request, _result|
      # Nothing more to do once the stream exists.
      next if [201, 302].include?(stream_reply.code)

      $log.write("Failed to get the stream created or found ")
      raise "Stream creation failed"
    end
  end
end

#format(tag, time, record) ⇒ Object



202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/fluent/plugin/out_qi.rb', line 202

# Serialises one event into a JSON object fragment followed by a comma.
#
# The value comes from record[@valuetag] when a valuetag is configured, and
# from record["Value"]["log"] otherwise. A record with no "Value" entry
# yields an empty value; it no longer raises NoMethodError.
#
# @param tag [Object] event tag (not used)
# @param time [Numeric] event time in seconds since the epoch, as handed over
#   by Fluentd (not a PI time)
# @param record [Hash] the event record
# @return [String] '{"Timestamp":"<local time>","Value":"<value>"},'
def format(tag, time, record)
  value =
    if !@valuetag.nil?
      record[@valuetag].to_s
    else
      container = record["Value"]
      # Guard against records that carry no "Value" entry at all.
      (container.nil? ? nil : container["log"]).to_s
    end

  # NOTE(review): value is interpolated without JSON escaping, so a value
  # containing a double quote, backslash or newline makes the fragment invalid.
  "{\"Timestamp\":\"#{Time.at(time).localtime}\",\"Value\":\"#{value}\"},"
end

#gettoken(getstream) ⇒ Object



141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/fluent/plugin/out_qi.rb', line 141

# Requests a client-credential token through ADAL and stores it on the
# instance.
#
# On success the full response, the access token, the refresh token and the
# ready-made "bearer ..." authorization header are stored in instance
# variables. A response that is neither a success nor an error response is
# ignored.
#
# @param getstream [Boolean] when truthy, createTypeAndStream is called after
#   the token has been stored
# @return [Object, nil] the result of createTypeAndStream when it was called,
#   otherwise nil
# @raise [RuntimeError] "ADAL token failed" when ADAL answers with an error
def gettoken(getstream)
  context = ADAL::AuthenticationContext.new("login.windows.net","365cc376-d621-41b0-b32f-7a982b22a63c")
  credential = ADAL::ClientCredential.new(@appId, @appKey)
  reply = context.acquire_token_for_client("https://qihomeprod.onmicrosoft.com/historian", credential)

  if reply.is_a?(ADAL::SuccessResponse)
    # Only the access token is used for requests; the whole response is kept
    # so that its expiry can be inspected later.
    @full_token = reply
    @access_token = reply.access_token
    @refresh_token = reply.refresh_token
    @authHeader = "bearer " + @access_token

    createTypeAndStream if getstream
  elsif reply.is_a?(ADAL::ErrorResponse)
    $log.write("Failed to get an access_token")
    raise "ADAL token failed"
  end
end

#prefer_buffered_processing ⇒ Object



45
46
47
# File 'lib/fluent/plugin/out_qi.rb', line 45

# Always answers false.
# NOTE(review): presumably queried by Fluentd's output framework to choose
# between buffered and non-buffered processing — confirm against the plugin API.
#
# @return [Boolean] false
def prefer_buffered_processing
  false
end

#prefer_delayed_commit ⇒ Object



49
50
51
# File 'lib/fluent/plugin/out_qi.rb', line 49

# Answers with the current delayed flag, which #initialize sets to false.
#
# @return [Object] the value of @delayed
def prefer_delayed_commit
  @delayed
end

#start ⇒ Object



89
90
91
92
93
94
# File 'lib/fluent/plugin/out_qi.rb', line 89

# Starts the plugin: runs the parent start hook, fetches a token (which also
# creates the QI type and stream because of the +true+ argument), and then
# clears the two write-failure flags.
#
# @return [false]
def start
  super
  gettoken(true)
  # Both flags start out cleared; #write flips them when requests fail.
  @secondWriteAttempt = @three = false
end

#write(chunk) ⇒ Object



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/fluent/plugin/out_qi.rb', line 96

# Sends one buffered chunk to the QI stream through the UpdateValues endpoint.
#
# Failure handling:
# * The first failed request triggers createTypeAndStream and one retry of
#   the same chunk.
# * If the retry fails as well, @three is latched and an error is raised.
#   Every later call then returns immediately without sending anything.
# * A successful request clears the retry flag again, so that a later
#   failure gets its own retry. The flag used to be written to a local
#   variable and was therefore never cleared.
#
# @param chunk [#read] buffer chunk whose content was produced by #format
# @return [Object, nil] the result of the final $log.write, or nil when the
#   plugin has already given up
# @raise [RuntimeError] "Writing Values failed" when the retry fails too
def write(chunk)
  return if @three

  data = chunk.read

  payload =
    if !@valuetag.nil?
      # Drop the trailing ',' left by #format and wrap the fragments in an array.
      "[" + data.chop + "]"
    else
      # NOTE(review): data is interpolated unescaped although #format emits
      # fragments that contain double quotes — confirm the resulting body is
      # what the service expects.
      time = Time.now.utc
      "[{\"Timestamp\":\"#{time}\",\"Value\":\"#{data}\"}]"
    end
  $log.write(payload)

  # The refresh token is not usable, so a brand-new token is requested once
  # the current one is within 30 seconds of expiring.
  gettoken(false) if Time.at(@full_token.expires_on).utc < (Time.now.utc + 30)

  send_data_url = "https://qi-data.osisoft.com/Qi/#{@tenantId}/#{@namespaceId}/Streams/#{@streamID}/Data/UpdateValues"

  # NOTE(review): certificate verification is switched off for a request that
  # carries the bearer token — confirm this is still required.
  resource = RestClient::Resource.new(send_data_url, :verify_ssl => OpenSSL::SSL::VERIFY_NONE)
  resource.put(payload, :authorization => @authHeader, :content_type => "application/json", :accept => "text/plain") { |response, _request, _result|
    case response.code
    when 200, 201, 302
      # 201 means it is created, 302 means it was created and just retrieved.
      @secondWriteAttempt = false
    else
      $log.write(response.to_s)
      $log.write("\nFailed to write the value.  Regetting stream and type to see if this helps.\n")
      if !@secondWriteAttempt
        @secondWriteAttempt = true
        createTypeAndStream
        write(chunk)
      else
        $log.write("Failed to write the value.  Regetting stream and type did not help.\n")
        @three = true
        raise "Writing Values failed"
      end
    end
  }
  $log.write("QI successful writes\n")
end