28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
|
# File 'lib/fluent/plugin/in_sforce.rb', line 28
# Starts the input. Logs in, builds a Restforce client, and then either
#
# * polls with SOQL when @topic is nil: every @polling_interval seconds
#   @query is run restricted to records whose CreatedDate lies in the window
#   (th_low, th_high], and each returned record is emitted with @tag; or
# * subscribes to @topic inside an EventMachine reactor and emits every
#   received message with @tag.
#
# NOTE(review): both branches run in the calling thread (an endless `loop`
# or `EM.run`), so this method does not return by itself -- confirm that the
# caller expects `start` to block.
def start
  super
  client = new_sforce_client

  if @topic.nil?
    th_low = soql_timestamp_now
    sleep(@polling_interval)
    th_high = soql_timestamp_now
    loop do
      soql = build_polling_soql(@query, th_low, th_high)
      begin
        log.info "query: #{soql}"
        client.query(soql).each do |record|
          Fluent::Engine.emit(@tag, Fluent::Engine.now, record)
        end
        sleep(@polling_interval)
        # The window advances only after a successful query, so a query that
        # raised below is retried over the same window on the next iteration.
        th_low = th_high
        th_high = soql_timestamp_now
      rescue Restforce::UnauthorizedError => e
        # Session is no longer accepted: log in again and retry.
        log.error e
        client = new_sforce_client
      end
    end
  else
    EM.run do
      log.info "suscribe: #{@topic}"
      client.subscribe @topic do |message|
        Fluent::Engine.emit(@tag, Fluent::Engine.now, message)
      end
    end
  end
end

# Logs in and returns a Restforce client bound to the fresh session.
# `login` is defined elsewhere; its result is read as a hash with the keys
# "sessionId" and "instanceUrl".
def new_sforce_client
  login_info = login
  Restforce.new :oauth_token => login_info["sessionId"],
                :instance_url => login_info["instanceUrl"]
end

# Returns the current time formatted for use in the CreatedDate filter.
# DateTime is kept on purpose: its %Z renders a numeric offset, whereas
# Time#strftime would emit a zone abbreviation.
def soql_timestamp_now
  DateTime.now.strftime("%Y-%m-%dT%H:%M:%S.000%Z")
end

# Returns +query+ with the CreatedDate window (th_low, th_high] merged into
# its WHERE clause, or appended as a new WHERE clause when it has none.
#
# The WHERE keyword is matched case-insensitively and across line breaks, so
# "Where ..." and multi-line queries are merged instead of receiving a second
# WHERE or losing their leading lines. As before, the last WHERE in the text
# is the one that is extended, and an empty query yields an empty string.
#
# NOTE(review): the existing condition is appended with a plain AND; a
# condition containing a top-level OR, or a query with ORDER BY/LIMIT but no
# WHERE, is not handled here.
def build_polling_soql(query, th_low, th_high)
  window = "CreatedDate <= #{th_high} AND CreatedDate > #{th_low}"
  base = query.to_s.strip
  return "" if base.empty?

  parts = /\A(.+)\swhere\s(.+)\z/im.match(base)
  if parts
    "#{parts[1]} WHERE #{window} AND #{parts[2]}"
  else
    "#{base} WHERE #{window}"
  end
end

private :new_sforce_client, :soql_timestamp_now, :build_polling_soql
|