Class: FlowTag::FlowDB

Inherits:
Object
  • Object
show all
Defined in:
lib/flowtag/flowdb.rb

Constant Summary collapse

ST =
0
SIP =
1
DIP =
2
SP =
3
DP =
4
PKTS =
5
BYTES =
6
FIRST_PKT =
7
TAGS =
8

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(pcapfile, basedir = nil) ⇒ FlowDB

Returns a new instance of FlowDB.



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/flowtag/flowdb.rb', line 34

# Opens a capture file together with its flow, packet and tag databases,
# generating the databases first when any of the three is missing.
#
# The databases are stored as <basedir>/<pcap basename>.flows, .pkts and
# .tags.
#
# @param pcapfile [String] path of the capture file; must exist
# @param basedir [String, nil] directory holding the database files;
#   defaults to the directory that contains pcapfile
# @raise [RuntimeError] if pcapfile does not exist
def initialize(pcapfile,basedir=nil)
	@pcapfile = pcapfile
	basedir = File.dirname(pcapfile) unless basedir
	basename = basedir+"/"+File.basename(pcapfile)
	# File.exist? instead of File.exists?: the latter was removed in Ruby 3.2.
	raise "Cannot find pcapfile, #{pcapfile}" unless File.exist?(pcapfile)
	flowpath = basename+".flows"
	pktpath = basename+".pkts"
	tagpath = basename+".tags"
	regenerate = ![flowpath, pktpath, tagpath].all? { |path| File.exist?(path) }
	if regenerate
		# the flow and packet database are missing, let's generate them.
		# The block form closes the writers even when create_flowdb raises.
		File.open(flowpath, 'wb') do |flowdb|
			File.open(pktpath, 'wb') do |pktdb|
				# Parse the capture itself; basename only names the same file
				# when the databases live in the capture's own directory.
				create_flowdb(pcapfile, flowdb, pktdb)
			end
		end
		# As before, regeneration leaves an empty tag database behind. It is
		# created last so that a failed generation is retried on the next run.
		File.open(tagpath, 'w') {}
	end
	@pcapfh = File.new(pcapfile, 'rb')
	@flowdb = File.new(flowpath, 'rb')
	@pktdb = File.new(pktpath, 'rb')
	@tagdb = File.new(tagpath, 'r')
	@flows = readflows
	if regenerate
		@tags_flows = {}
		@flows_tags = {}
	else
		# readtags must be called AFTER readflows
		@tags_flows, @flows_tags = readtags
	end
	@tags_updated = false
end

Instance Attribute Details

#flows ⇒ Object (readonly)

Returns the value of attribute flows.



33
34
35
# File 'lib/flowtag/flowdb.rb', line 33

# Reader for @flows, which the constructor fills from readflows: a Hash
# keyed by "sip|dip|sp|dp" whose values are the per-flow record arrays.
def flows
  @flows
end

#tags_flows ⇒ Object (readonly)

Returns the value of attribute tags_flows.



33
34
35
# File 'lib/flowtag/flowdb.rb', line 33

# Reader for @tags_flows, the reverse index that maps each tag to the
# list of flow keys carrying it.
def tags_flows
  @tags_flows
end

Instance Method Details

#close ⇒ Object



66
67
68
69
70
71
# File 'lib/flowtag/flowdb.rb', line 66

# Closes every file handle this object holds (capture, packet db, flow db,
# tag db, in that order), skipping any that were never opened.
#
# @return [Object, nil] whatever closing the tag database returned, or nil
#   when there is no tag database handle
def close
	outcome = nil
	[@pcapfh, @pktdb, @flowdb, @tagdb].each do |handle|
		outcome = handle ? handle.close : nil
	end
	outcome
end

#create_flowdb(pcapfile, flowdb, pktdb) ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/flowtag/flowdb.rb', line 73

# Scans a capture file and writes the flow database and the packet database.
#
# Every TCP packet is assigned to a flow; both directions of a connection
# share one flow, whose record keeps the endpoints of the first packet seen.
# Packets of a flow are chained through :next_pkt, with 0 marking the end
# of the chain.
#
# NOTE(review): packet ids are packed as 16-bit values ("n"), so ids above
# 65535 cannot be represented in the on-disk format.
#
# @param pcapfile [String] path of the capture file to parse
# @param flowdb [IO] writable handle receiving one 26-byte record per flow,
#   ordered by flow start time
# @param pktdb [IO] writable handle receiving one 6-byte record per TCP
#   packet (capture offset, id of the flow's next packet)
# @return [Hash] the flows, each value converted to the
#   [st, sip, dip, sp, dp, pkts, bytes, first_pkt, tags] array layout
def create_flowdb(pcapfile, flowdb, pktdb)
	flow_table = {}   # flow key => flow attributes
	pkt_table = []    # packet id => { :offset, :next_pkt }
	next_offset = 24  # the first record follows the 24-byte pcap file header
	next_id = 0       # id handed to the next TCP packet
	reader = PcapParser.new(File.new(pcapfile,'rb'))
	reader.each do |pkt|
		is_tcp = pkt.tcp?
		# Every record, TCP or not, occupies a 16-byte header plus its data.
		rec_offset = next_offset
		next_offset += 16 + pkt.length
		next unless is_tcp

		endpoints = [pkt.ip_src, pkt.tcp_sport, pkt.tcp_dport, pkt.ip_dst]
		forward = endpoints.join("|")
		# Reversing the tuple yields the key of the opposite direction.
		backward = endpoints.reverse.join("|")
		key = flow_table[backward] ? backward : forward

		flow = flow_table[key]
		if flow
			# Link the flow's previous packet to this one.
			pkt_table[flow[:last_pkt]][:next_pkt] = next_id
		else
			flow = flow_table[key] = {
				:st => pkt.time, :sip => pkt.ip_src, :dip => pkt.ip_dst,
				:sp => pkt.tcp_sport, :dp => pkt.tcp_dport,
				:pkts => 0, :bytes => 0, :first_pkt => next_id
			}
		end
		flow[:last_pkt] = next_id
		flow[:pkts] += 1
		flow[:bytes] += pkt.length
		pkt_table[next_id] = { :offset => rec_offset, :next_pkt => 0 }
		next_id += 1
	end
	reader.close

	# write out the flow database and the packet database
	flow_table.sort_by { |_key, flow| flow[:st] }.each do |key, flow|
		st, sip, dip, sp, dp, npkts, nbytes, first = flow.values_at(:st, :sip, :dip, :sp, :dp, :pkts, :bytes, :first_pkt)
		flowdb.write([st, sip, dip, sp, dp, npkts, nbytes, first].pack("NNNnnNNn"))
		dotted_sip, dotted_dip = [sip, dip].map { |addr| [addr].pack("N").unpack("C4").join(".") }
		flow_table[key] = [st, dotted_sip, dotted_dip, sp, dp, npkts, nbytes, first, []]
	end
	pkt_table.each { |rec| pktdb.write([rec[:offset], rec[:next_pkt]].pack("Nn")) }
	flow_table
end

#dumpflows ⇒ Object



117
118
119
120
121
# File 'lib/flowtag/flowdb.rb', line 117

# Prints every flow to standard output, one per line, ordered by start time.
# Each line is the flow record's fields joined with single spaces.
#
# @return [Array] the [key, flow] pairs in the order they were printed
def dumpflows
	by_start = @flows.sort_by { |_key, record| record[ST].to_i }
	by_start.each { |_key, record| puts record.join(" ") }
end

#flows_taggedwith(tag) ⇒ Object



194
195
196
197
198
199
200
201
202
203
# File 'lib/flowtag/flowdb.rb', line 194

# Looks up the flow records that carry a given tag.
#
# @param tag [String] tag to look up in the tag index
# @return [Array] the records of all flows with that tag; empty when the
#   tag is not in use
def flows_taggedwith(tag)
	tagged_keys = @tags_flows[tag] || []
	tagged_keys.map { |flow_key| @flows[flow_key] }
end

#getdata(offset) ⇒ Object



225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
# File 'lib/flowtag/flowdb.rb', line 225

# Reads the pcap record at the given capture offset and returns its TCP
# payload.
#
# Only Ethernet frames carrying IPv4 (ethertype 0x0800) with a 20-byte IP
# header (first IP byte 0x45) and protocol 6 (TCP) are decoded; anything
# else, and any record too short to hold those headers, yields nil.
#
# The byte order of the record header is detected on the first call and
# remembered in @endian: the header is read little-endian first, and a
# captured length above 5000 is taken to mean the file is big-endian.
#
# @param offset [Integer] position of the 16-byte record header in the capture
# @return [String, nil] up to 1000 bytes of payload ('' when the segment
#   carries none), or nil when the record is not a decodable TCP packet
def getdata(offset)
	@pcapfh.seek(offset)
	header = @pcapfh.read(16)
	return nil unless header && header.length == 16
	unless @endian
		@endian = "VVVV"
		@endian = "NNNN" if header.unpack("VVVV")[2] > 5000
	end
	caplen = header.unpack(@endian)[2]
	pkt = @pcapfh.read(caplen)
	# Byte 14+20+12 (the TCP data-offset byte) is the last header byte needed.
	return nil unless pkt && pkt.length > 14+20+12
	return nil unless pkt[12,2].unpack("n")[0] == 0x0800
	# getbyte returns an Integer on every Ruby version; String#[] with a
	# single index returns a one-character String since Ruby 1.9, which never
	# equals an Integer and has no >> operator.
	return nil unless pkt.getbyte(14) == 0x45
	return nil unless pkt.getbyte(14+10-1) == 0x06
	tcp_header_len = (pkt.getbyte(14+20+12)>>4)<<2
	pkt[14+20+tcp_header_len,1000] || ''
end

#getfirstpktid(sip, dip, sp, dp) ⇒ Object



209
210
211
212
213
214
215
216
# File 'lib/flowtag/flowdb.rb', line 209

# Finds the id of the first packet of the flow with the given endpoints.
# The flows are searched in order and the first exact match wins.
#
# @param sip [Object] source address, compared against the flow's SIP field
# @param dip [Object] destination address, compared against DIP
# @param sp [Object] source port, compared against SP
# @param dp [Object] destination port, compared against DP
# @return [Object] the flow's FIRST_PKT field, or -1 when no flow matches
def getfirstpktid(sip, dip, sp, dp)
	wanted = [sip, dip, sp, dp]
	_key, match = @flows.find { |_k, record| record.values_at(SIP, DIP, SP, DP) == wanted }
	match ? match[FIRST_PKT] : -1
end

#getflowdata(sip, dip, sp, dp, limit = nil) ⇒ Object



260
261
262
263
264
# File 'lib/flowtag/flowdb.rb', line 260

# Returns the payload of the flow identified by its endpoints.
#
# @param sip [Object] source address of the flow
# @param dip [Object] destination address of the flow
# @param sp [Object] source port of the flow
# @param dp [Object] destination port of the flow
# @param limit [Integer, nil] passed through to getflowdata_frompkt
# @return [String] the flow's payload, or '' when no such flow exists
def getflowdata(sip, dip, sp, dp, limit=nil)
	start_id = getfirstpktid(sip, dip, sp, dp)
	start_id == -1 ? '' : getflowdata_frompkt(start_id, limit)
end

#getflowdata_frompkt(first_pkt, limit = nil) ⇒ Object



248
249
250
251
252
253
254
255
256
257
258
# File 'lib/flowtag/flowdb.rb', line 248

# Concatenates the payloads of a flow's packets, following the packet chain
# that starts at first_pkt until a record names 0 as its successor.
#
# Packets for which getdata returns nil contribute nothing; this includes
# the first packet, which previously made the method fail on nil.
#
# @param first_pkt [Integer] id of the flow's first packet
# @param limit [Integer, nil] soft cap: no further packets are read once the
#   collected payload is longer than this. The result is not truncated.
# @return [String] the collected payload
def getflowdata_frompkt(first_pkt, limit=nil)
	(poff,npkt) = getpktrec(first_pkt)
	# dup so that appending never modifies a string owned by getdata
	payload = (getdata(poff) || '').dup
	while npkt != 0
		# Checked before reading, so an already sufficient payload does not
		# cost another packet lookup.
		break if limit and payload.length > limit
		(poff,npkt) = getpktrec(npkt)
		data = getdata(poff)
		payload << data if data
	end
	payload
end

#getflows ⇒ Object



136
137
138
# File 'lib/flowtag/flowdb.rb', line 136

# Returns the flow table held in @flows (the same object, not a copy).
def getflows
	@flows
end

#getflowtags(flow) ⇒ Object



189
190
191
192
# File 'lib/flowtag/flowdb.rb', line 189

# Returns the tags attached to a flow.
#
# @param flow [Array] the flow's key fields; they are joined with "|" to
#   form the lookup key
# @return [Array] the flow's tags, or a new empty array when it has none
def getflowtags(flow)
	assigned = @flows_tags[flow.join("|")]
	return [] unless assigned
	assigned
end

#getpktrec(pktid) ⇒ Object



218
219
220
221
222
223
# File 'lib/flowtag/flowdb.rb', line 218

# Reads one record from the packet database. Records are 6 bytes each: a
# 32-bit capture offset followed by a 16-bit packet id, both big-endian.
#
# @param pktid [Integer] index of the record to read
# @return [Array] two elements: the packet's offset in the capture and the
#   id stored as its successor
def getpktrec(pktid)
	@pktdb.seek(pktid * 6)
	capture_offset, following = @pktdb.read(6).unpack("Nn")
	[capture_offset, following]
end

#readflows ⇒ Object



123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/flowtag/flowdb.rb', line 123

# Loads the whole flow database into memory. The file is read from the
# start in 26-byte records and each address is converted to its textual form.
#
# @return [Hash] "sip|dip|sp|dp" => [st, sip, dip, sp, dp, pkts, bytes,
#   first_pkt, []] where the trailing array is the (still empty) tag list
def readflows
	table = {}
	@flowdb.seek(0)
	until @flowdb.eof?
		st, src, dst, sp, dp, pkts, bytes, first_pkt = @flowdb.read(26).unpack("NNNnnNNn")
		sip, dip = [src, dst].map { |addr| IPAddr.ntop([addr].pack("N")) }
		table[[sip,dip,sp,dp].join("|")] = [st,sip,dip,sp,dp,pkts,bytes,first_pkt,[]]
	end
	table
end

#readtags ⇒ Object



140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/flowtag/flowdb.rb', line 140

# Loads the tag database and attaches the tags to the flows in @flows.
# Must be called after @flows has been populated.
#
# Each line has the form "sip|dip|sp|dp|tag1|tag2|...". Blank lines and
# lines naming a flow that is not in @flows are skipped; they used to abort
# loading with an error on nil.
#
# @return [Array] two hashes: tag => list of flow keys, and
#   flow key => list of tags
def readtags
	@tagdb.seek 0
	flows_tags = {}
	tags_flows = {}
	@tagdb.each_line do |l|
		line = l.strip
		next if line.empty?
		(sip,dip,sp,dp,*tags) = line.split(/\|/)
		key = [sip,dip,sp,dp].join("|")
		flow = @flows[key]
		# A tag entry for a flow missing from the flow table has nothing to
		# attach to.
		next unless flow
		flows_tags[key] = tags
		flow[TAGS] = tags
		tags.each do |tag|
			(tags_flows[tag] ||= []).push(key)
		end
	end
	[tags_flows, flows_tags]
end

#tag_flow(flow, tags) ⇒ Object



170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/flowtag/flowdb.rb', line 170

# Replaces the tags of a flow and keeps both tag indexes in step.
#
# The flow is validated before anything is changed, so a failed call leaves
# the indexes untouched. Duplicate tags are dropped on a copy; the array
# passed in by the caller is not modified.
#
# @param flow [Array] the flow's key fields; joined with "|" to form its key
# @param tags [Array] the complete new tag list for the flow
# @return [Array] the de-duplicated tag list that was stored
# @raise [ArgumentError] if the flow is not present in @flows
def tag_flow(flow, tags)
	key = flow.join("|")
	raise ArgumentError, "Unknown flow, #{key}" unless @flows[key]
	tags = tags.uniq
	@tags_updated = true
	# Drop the flow from the index entries of its previous tags.
	(@flows_tags[key] || []).each do |tag|
		tagged = @tags_flows[tag]
		next unless tagged
		tagged.delete(key)
		@tags_flows.delete(tag) if tagged.empty?
	end
	@flows_tags[key] = tags
	@flows[key][TAGS] = tags
	tags.each do |tag|
		(@tags_flows[tag] ||= []).push(key)
	end
	tags
end

#tags ⇒ Object



205
206
207
# File 'lib/flowtag/flowdb.rb', line 205

# Lists every tag currently in use, i.e. the keys of the tag-to-flows index.
def tags
	@tags_flows.keys
end

#writetagdb ⇒ Object



157
158
159
160
161
162
163
164
165
166
167
168
# File 'lib/flowtag/flowdb.rb', line 157

# Writes the tag database back to disk if tags changed since the last
# load or save, then reopens it for reading.
#
# The file is rewritten at the path the tag database was opened from, so
# tags are saved where the constructor will look for them even when the
# databases are kept in a directory other than the capture's. Only when no
# tag database handle exists does it fall back to "<pcapfile>.tags".
#
# One line is written per flow: the flow key followed by its tags, all
# separated by "|".
#
# @return [true] always
def writetagdb
	return true unless @tags_updated
	# File#path is still available after the handle has been closed.
	path = @tagdb ? @tagdb.path : @pcapfile+'.tags'
	@tagdb.close if @tagdb
	# Block form closes the writer even if a write fails.
	File.open(path, 'w') do |tagdb|
		@flows_tags.each do |key,tags|
			tagdb.puts key+"|"+tags.join("|")
		end
	end
	@tagdb = File.new(path, 'r')
	@tags_updated = false
	true
end