Class: Fluent::OsmocomSpectrumSenseInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_osmocom_spectrum_sense.rb

Constant Summary collapse

REG =
/^(?<time>.+) center_freq (?<center_freq>\d+\.\d+) freq (?<freq>\d+\.\d+) power_db (?<power_db>-?\d+\.\d+) noise_floor_db (?<noise_floor_db>-?\d+\.\d+)$/

Instance Method Summary collapse

Constructor Details

#initializeOsmocomSpectrumSenseInput

Returns a new instance of OsmocomSpectrumSenseInput.



10
11
12
# File 'lib/fluent/plugin/in_osmocom_spectrum_sense.rb', line 10

def initialize
  super
end

Instance Method Details

#build_optionsObject



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/fluent/plugin/in_osmocom_spectrum_sense.rb', line 61

def build_options
  options = ""
  if @sample_rate
    options += " -s #{@sample_rate}"
  end
  if @dwell_delay
    options += " --dwell-delay=#{@dwell_delay}"
  end
  if @tune_delay
    options += " --tune-delay=#{@tune_delay}"
  end
  if @channel_bandwidth
    options += " -b #{@channel_bandwidth}"
  end

  options += " #{@minfreq} #{@maxfreq}"
  return options
end

#collect_osmocom_output(stdout) ⇒ Object



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/fluent/plugin/in_osmocom_spectrum_sense.rb', line 81

def collect_osmocom_output(stdout)
  collected = []
  begin
    readlines_nonblock(stdout).each do |line|
      # XXX: parse here
      log.debug "line => '#{line}'"
      match = line.match(REG)
      next unless match
      obj = {
        :time => Time.parse(match[:time]),
        :center_freq => match[:center_freq].to_f,
        :freq => match[:freq].to_f,
        :power_db => match[:power_db].to_f,
        :noise_floor_db => match[:noise_floor_db].to_f,
      }
      log.debug "new osmocom_spectrum_sense input => #{obj}"
      collected << obj
    end
  rescue => e
    log.error "failed to read or parse line", :error => e.to_s,
      :error_class => e.class.to_s
  end

  collected.each do |obj|
    time = obj[:time].nil? ? Engine.now : Fluent::EventTime.from_time(obj[:time])
    obj.delete(:time) # including time could problem in some output plugin
    router.emit(@tag, time, obj)
  end
rescue => e
  log.error "failed to collect output from tshark",
    :error => e.to_s,
    :error_class => e.class.to_s
end

#configure(conf) ⇒ Object



22
23
24
25
26
27
28
# File 'lib/fluent/plugin/in_osmocom_spectrum_sense.rb', line 22

def configure(conf)
  super

  if !@minfreq.is_a?(Integer) or !@maxfreq.is_a?(Integer)
    raise ConfigError, "minfreq/maxfreq is required to be Integer"
  end
end

#readlines_nonblock(io) ⇒ Object



115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/fluent/plugin/in_osmocom_spectrum_sense.rb', line 115

def readlines_nonblock(io)
  @nbbuffer = "" if @nbbuffer == nil
  @nbbuffer += io.read_nonblock(65535)
  lines = []
  while idx = @nbbuffer.index("\n")
    lines << @nbbuffer[0..idx-1]
    @nbbuffer = @nbbuffer[idx+1..-1]
  end
  return lines
rescue
  return []
end

#runObject



47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/fluent/plugin/in_osmocom_spectrum_sense.rb', line 47

def run
  options = build_options
  cmdline = "osmocom_spectrum_sense #{options}"
  print cmdline + "\n"
  stdin, stdout, stderr, @th_osmocom = *Open3.popen3(cmdline)

  while @th_osmocom.alive?
    collect_osmocom_output(stdout)
  end
rescue => e
  log.error "unexpected error", :error => e.to_s
  log.error_backtrace e.backtrace
end

#shutdownObject



36
37
38
39
40
41
42
43
44
45
# File 'lib/fluent/plugin/in_osmocom_spectrum_sense.rb', line 36

def shutdown
  if @th_osmocom and @th_osmocom.alive?
    Process.kill("INT", @th_osmocom.pid)
  end
  @thread.join
rescue => e
  log.error "osmocom_spectrum_sense failed to shutdown", :error => e.to_s,
    :error_class => e.class.to_s
  log.error_backtrace e.backtrace
end

#startObject



30
31
32
33
34
# File 'lib/fluent/plugin/in_osmocom_spectrum_sense.rb', line 30

def start
  super

  @thread = Thread.new(&method(:run))
end