Class: Fog::AWS::Kinesis::Mock

Inherits:
Object
  • Object
show all
Defined in:
lib/fog/aws/kinesis.rb,
lib/fog/aws/requests/kinesis/put_record.rb,
lib/fog/aws/requests/kinesis/get_records.rb,
lib/fog/aws/requests/kinesis/put_records.rb,
lib/fog/aws/requests/kinesis/split_shard.rb,
lib/fog/aws/requests/kinesis/list_streams.rb,
lib/fog/aws/requests/kinesis/merge_shards.rb,
lib/fog/aws/requests/kinesis/create_stream.rb,
lib/fog/aws/requests/kinesis/delete_stream.rb,
lib/fog/aws/requests/kinesis/describe_stream.rb,
lib/fog/aws/requests/kinesis/add_tags_to_stream.rb,
lib/fog/aws/requests/kinesis/get_shard_iterator.rb,
lib/fog/aws/requests/kinesis/list_tags_for_stream.rb,
lib/fog/aws/requests/kinesis/remove_tags_from_stream.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Mock

Returns a new instance of Mock.



150
151
152
153
154
155
156
# File 'lib/fog/aws/kinesis.rb', line 150

# Builds a mock Kinesis connection.
#
# Records the mock owner id, the caller's access key id and the region
# (falling back to 'us-east-1' when options[:region] is nil/false), then
# hands the region to Fog::AWS.validate_region!.
# NOTE(review): validate_region! presumably raises for an unknown region --
# confirm against Fog::AWS.
#
# @param options [Hash] reads :aws_access_key_id and :region
def initialize(options={})
  @account_id = Fog::AWS::Mock.owner_id
  @aws_access_key_id = options[:aws_access_key_id]

  @region = options[:region]
  @region = 'us-east-1' unless @region

  Fog::AWS.validate_region!(@region)
end

Class Method Details

.data ⇒ Object



130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/fog/aws/kinesis.rb', line 130

# Gives the caller's block access to the shared mock store while holding
# @mutex.
#
# The store is created lazily as a two-level auto-vivifying Hash
# (region -> key -> { :kinesis_streams => {} }).
#
# @yield [Hash] the whole store
# @return the block's value, or nil when no block is given
def self.data
  @mutex.synchronize do
    unless @data
      @data = Hash.new do |regions, region_name|
        regions[region_name] = Hash.new do |accounts, access_key|
          accounts[access_key] = { :kinesis_streams => {} }
        end
      end
    end

    block_given? ? yield(@data) : nil
  end
end

.next_sequence_number ⇒ Object



170
171
172
173
174
175
176
# File 'lib/fog/aws/kinesis.rb', line 170

# Hands out the next value of a class-wide counter, as a String.
# The first call returns "0"; the increment happens while holding @mutex.
#
# @return [String]
def self.next_sequence_number
  @mutex.synchronize do
    @sequence_number = @sequence_number ? @sequence_number.succ : 0
    "#{@sequence_number}"
  end
end

.next_shard_id ⇒ Object



180
181
182
183
184
185
186
# File 'lib/fog/aws/kinesis.rb', line 180

# Hands out the next shard id from a class-wide counter.
# Ids look like "shardId-000000000000": the counter zero-padded to 12 digits.
# The increment happens while holding @mutex.
#
# @return [String]
def self.next_shard_id
  @mutex.synchronize do
    @shard_id = @shard_id ? @shard_id.succ : 0
    format("shardId-%012d", @shard_id)
  end
end

.reset ⇒ Object



144
145
146
147
148
# File 'lib/fog/aws/kinesis.rb', line 144

# Drops the shared mock store (it is rebuilt lazily on next access).
# Only @data is cleared here; the sequence-number and shard-id counters
# are left as they are.
#
# @return [nil]
def self.reset
  @mutex.synchronize { @data = nil }
end

Instance Method Details

#add_tags_to_stream(options = {}) ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/fog/aws/requests/kinesis/add_tags_to_stream.rb', line 30

# Merges tags into a mock stream's tag hash.
#
# Removes "StreamName" and "Tags" from +options+ while reading them.
#
# @param options [Hash] "StreamName" (String) and "Tags" (Hash of key => value)
# @return [Excon::Response] status 200 with an empty-string body
# @raise [Fog::AWS::Kinesis::ResourceNotFound] when no stream has that name
def add_tags_to_stream(options={})
  stream_name = options.delete("StreamName")
  new_tags = options.delete("Tags")

  stream = data[:kinesis_streams].find { |candidate| candidate["StreamName"] == stream_name }
  if stream.nil?
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  # Incoming values win over existing ones for the same key.
  stream["Tags"] = stream["Tags"].merge(new_tags)

  Excon::Response.new.tap do |response|
    response.status = 200
    response.body = ""
  end
end

#create_stream(options = {}) ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/fog/aws/requests/kinesis/create_stream.rb', line 30

# Creates a mock stream and adds it to the account's stream list.
#
# Removes "StreamName" and "ShardCount" from +options+ while reading them.
# Every shard is given the full hash-key range, a fresh starting sequence
# number, a fresh shard id and an empty "Records" list.
#
# @param options [Hash] "StreamName" (String) and optional "ShardCount"
#   (Integer, defaults to 1)
# @return [Excon::Response] status 200 with an empty-string body
# @raise [Fog::AWS::Kinesis::ResourceInUse] when a stream with that name
#   already exists
def create_stream(options={})
  stream_name = options.delete("StreamName")
  shard_count = options.delete("ShardCount") || 1
  stream_arn = "arn:aws:kinesis:#{@region}:#{@account_id}:stream/#{stream_name}"

  # The store is seeded with an empty Hash; normalise it to the Array of
  # stream hashes that the request methods iterate over.
  streams = data[:kinesis_streams]
  streams = data[:kinesis_streams] = [] unless streams.is_a?(Array)

  if streams.any? { |s| s["StreamName"] == stream_name }
    raise Fog::AWS::Kinesis::ResourceInUse.new("Stream #{stream_name} under account #{@account_id} already exists.")
  end

  shards = (0...shard_count).map do |_|
    {
      "HashKeyRange"=>{
        "EndingHashKey"=>"340282366920938463463374607431768211455",
        "StartingHashKey"=>"0"
      },
      "SequenceNumberRange"=>{
        "StartingSequenceNumber"=> next_sequence_number
      },
      "ShardId"=>next_shard_id,
      "Records" => []
    }
  end

  # Append rather than overwrite, so streams created earlier are preserved.
  streams << {
    "HasMoreShards" => false,
    "StreamARN" => stream_arn,
    "StreamName" => stream_name,
    "StreamStatus" => "ACTIVE",
    "Shards" => shards,
    "Tags" => {}
  }

  response = Excon::Response.new
  response.status = 200
  response.body = ""
  response
end

#data ⇒ Object



158
159
160
161
162
# File 'lib/fog/aws/kinesis.rb', line 158

# Returns this connection's slice of the class-level mock store: the entry
# under @region, then under @aws_access_key_id.
def data
  self.class.data { |all_regions| all_regions[@region][@aws_access_key_id] }
end

#delete_stream(options = {}) ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/fog/aws/requests/kinesis/delete_stream.rb', line 28

# Removes a stream from the mock store.
#
# Removes "StreamName" from +options+ while reading it.
#
# @param options [Hash] "StreamName" (String)
# @return [Excon::Response] status 200 with an empty-string body
# @raise [Fog::AWS::Kinesis::ResourceNotFound] when no stream has that name
def delete_stream(options={})
  stream_name = options.delete("StreamName")
  streams = data[:kinesis_streams]

  target = streams.find { |candidate| candidate["StreamName"] == stream_name }
  if target.nil?
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  streams.delete(target)

  Excon::Response.new.tap do |response|
    response.status = 200
    response.body = ""
  end
end

#describe_stream(options = {}) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/fog/aws/requests/kinesis/describe_stream.rb', line 36

# Describes a mock stream.
#
# Removes "StreamName" from +options+ while reading it. The response is a
# copy of the stored stream hash in which each shard has its internal
# "Records" list removed; the stored stream itself is not modified.
#
# @param options [Hash] "StreamName" (String)
# @return [Excon::Response] status 200, body { "StreamDescription" => Hash }
# @raise [Fog::AWS::Kinesis::ResourceNotFound] when no stream has that name
def describe_stream(options={})
  stream_name = options.delete("StreamName")

  unless stream = data[:kinesis_streams].detect{ |s| s["StreamName"] == stream_name }
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  # "Shards" is an Array of shard hashes, so the Records key has to be
  # stripped from each shard individually (Hash#reject returns a copy).
  shards = stream["Shards"].map do |shard|
    shard.reject { |key, _| key == "Records" }
  end

  response = Excon::Response.new
  response.status = 200
  response.body = { "StreamDescription" => stream.merge("Shards" => shards) }
  response
end

#get_records(options = {}) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/fog/aws/requests/kinesis/get_records.rb', line 32

# Reads records from one shard of a mock stream.
#
# The "ShardIterator" option is the JSON string produced by
# get_shard_iterator (or a previous call's "NextShardIterator"); it carries
# "StreamName", "ShardId" and optionally "StartingSequenceNumber"
# (treated as 1 when absent). Records whose sequence number is below the
# starting number are skipped; collection stops once the number collected
# equals "Limit" (no cap when "Limit" is absent).
#
# Removes "ShardIterator" and "Limit" from +options+ while reading them.
#
# @param options [Hash] "ShardIterator" (String, JSON) and optional "Limit"
# @return [Excon::Response] status 200, body with "MillisBehindLatest" (0),
#   "NextShardIterator" (JSON String) and "Records" (Array)
# @raise [Fog::AWS::Kinesis::ResourceNotFound] when the stream or shard is
#   unknown
def get_records(options={})
  iterator = Fog::JSON.decode(options.delete("ShardIterator"))
  limit = options.delete("Limit") || -1
  stream_name = iterator["StreamName"]
  shard_id = iterator["ShardId"]
  first_wanted = (iterator["StartingSequenceNumber"] || 1).to_i

  stream = data[:kinesis_streams].find { |candidate| candidate["StreamName"] == stream_name }
  if stream.nil?
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  shard = stream["Shards"].find { |candidate| candidate["ShardId"] == shard_id }
  if shard.nil?
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Could not find shard #{shard_id} in stream #{stream_name} under account #{@account_id}.")
  end

  records = []
  shard["Records"].each do |record|
    if record["SequenceNumber"].to_i >= first_wanted
      records.push(record)
      break if records.size == limit
    end
  end

  # The follow-up iterator resumes just past the last record handed out,
  # or stays where it was when nothing matched.
  resume_at = records.empty? ? first_wanted : records.last["SequenceNumber"].to_i + 1
  iterator["StartingSequenceNumber"] = resume_at.to_s

  Excon::Response.new.tap do |response|
    response.status = 200
    response.body = {
      "MillisBehindLatest"=> 0,
      "NextShardIterator"=> Fog::JSON.encode(iterator),
      "Records"=> records
    }
  end
end

#get_shard_iterator(options = {}) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/fog/aws/requests/kinesis/get_shard_iterator.rb', line 36

# Produces a shard iterator for a mock stream.
#
# The iterator is simply the given options serialised as JSON; get_records
# decodes it again. Unlike the other request mocks, +options+ is left
# untouched so that every key ends up in the iterator.
#
# @param options [Hash] must include "StreamName"; all keys are encoded
# @return [Excon::Response] status 200, body { "ShardIterator" => String }
# @raise [Fog::AWS::Kinesis::ResourceNotFound] when no stream has that name
def get_shard_iterator(options={})
  stream_name = options["StreamName"]

  stream_known = data[:kinesis_streams].any? { |candidate| candidate["StreamName"] == stream_name }
  unless stream_known
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  Excon::Response.new.tap do |response|
    response.status = 200
    response.body = { "ShardIterator" => Fog::JSON.encode(options) }
  end
end

#list_streams(options = {}) ⇒ Object



28
29
30
31
32
33
34
35
36
# File 'lib/fog/aws/requests/kinesis/list_streams.rb', line 28

# Lists the names of all streams in the mock store.
#
# @param options [Hash] accepted for interface parity; not read here
# @return [Excon::Response] status 200, body with "HasMoreStreams" (false)
#   and "StreamNames" (Array of names)
def list_streams(options={})
  Excon::Response.new.tap do |response|
    response.status = 200
    stream_names = data[:kinesis_streams].map { |entry| entry["StreamName"] }
    response.body = {
      "HasMoreStreams" => false,
      "StreamNames" => stream_names
    }
  end
end

#list_tags_for_stream(options = {}) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/fog/aws/requests/kinesis/list_tags_for_stream.rb', line 36

# Lists the tags stored on a mock stream.
#
# Removes "StreamName" from +options+ while reading it.
#
# @param options [Hash] "StreamName" (String)
# @return [Excon::Response] status 200, body with "HasMoreTags" (false) and
#   "Tags" (Array of { "Key" => ..., "Value" => ... } hashes)
# @raise [Fog::AWS::Kinesis::ResourceNotFound] when no stream has that name
def list_tags_for_stream(options={})
  stream_name = options.delete("StreamName")

  stream = data[:kinesis_streams].find { |candidate| candidate["StreamName"] == stream_name }
  if stream.nil?
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  Excon::Response.new.tap do |response|
    response.status = 200
    tag_list = stream["Tags"].each_with_object([]) do |(tag_key, tag_value), list|
      list << { "Key" => tag_key, "Value" => tag_value }
    end
    response.body = { "HasMoreTags" => false, "Tags" => tag_list }
  end
end

#merge_shards(options = {}) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/fog/aws/requests/kinesis/merge_shards.rb', line 32

# Merges two shards of a mock stream into a new shard.
#
# Both source shards are closed by giving them an "EndingSequenceNumber".
# The new shard spans from the smaller starting hash key to the larger
# ending hash key of the two, records both parents, and starts with an
# empty "Records" list so that it can be written to and read from.
#
# Removes "StreamName", "ShardToMerge" and "AdjacentShardToMerge" from
# +options+ while reading them.
#
# @param options [Hash] "StreamName", "ShardToMerge" and
#   "AdjacentShardToMerge" (shard ids)
# @return [Excon::Response] status 200 with an empty-string body
# @raise [Fog::AWS::Kinesis::ResourceNotFound] when the stream or either
#   shard is unknown
def merge_shards(options={})
  stream_name = options.delete("StreamName")
  shard_to_merge_id = options.delete("ShardToMerge")
  adjacent_shard_to_merge_id = options.delete("AdjacentShardToMerge")

  unless stream = data[:kinesis_streams].detect{ |s| s["StreamName"] == stream_name }
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  unless shard_to_merge = stream["Shards"].detect{ |s| s["ShardId"] == shard_to_merge_id }
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Could not find shard #{shard_to_merge_id} in stream #{stream_name} under account #{@account_id}.")
  end

  unless adjacent_shard_to_merge = stream["Shards"].detect{ |s| s["ShardId"] == adjacent_shard_to_merge_id }
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Could not find shard #{adjacent_shard_to_merge_id} in stream #{stream_name} under account #{@account_id}.")
  end

  # Close shards (set an EndingSequenceNumber on them)
  shard_to_merge["SequenceNumberRange"]["EndingSequenceNumber"] = next_sequence_number
  adjacent_shard_to_merge["SequenceNumberRange"]["EndingSequenceNumber"] = next_sequence_number

  new_starting_hash_key = [
    shard_to_merge["HashKeyRange"]["StartingHashKey"].to_i,
    adjacent_shard_to_merge["HashKeyRange"]["StartingHashKey"].to_i
  ].min.to_s

  new_ending_hash_key = [
    shard_to_merge["HashKeyRange"]["EndingHashKey"].to_i,
    adjacent_shard_to_merge["HashKeyRange"]["EndingHashKey"].to_i
  ].max.to_s

  # Create a new shard with ParentShardId and AdjacentParentShardId. It
  # needs its own Records list: put_record(s) append to it and get_records
  # iterates over it.
  stream["Shards"] << {
    "HashKeyRange"=> {
      "EndingHashKey" => new_ending_hash_key,
      "StartingHashKey" => new_starting_hash_key
    },
    "SequenceNumberRange" => {
      "StartingSequenceNumber" => next_sequence_number
    },
    "ShardId" => next_shard_id,
    "ParentShardId" => shard_to_merge_id,
    "AdjacentParentShardId" => adjacent_shard_to_merge_id,
    "Records" => []
  }

  response = Excon::Response.new
  response.status = 200
  response.body = ""
  response
end

#next_sequence_number ⇒ Object



178
# File 'lib/fog/aws/kinesis.rb', line 178

# Instance-level shortcut: forwards to the class-level
# next_sequence_number and returns whatever it returns.
def next_sequence_number
  self.class.next_sequence_number
end

#next_shard_id ⇒ Object



188
# File 'lib/fog/aws/kinesis.rb', line 188

# Instance-level shortcut: forwards to the class-level next_shard_id and
# returns whatever it returns.
def next_shard_id
  self.class.next_shard_id
end

#put_record(options = {}) ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/fog/aws/requests/kinesis/put_record.rb', line 38

# Stores a single record on one shard of a mock stream.
#
# The target shard is picked at random among the stream's open shards
# (those without an "EndingSequenceNumber"); if every shard is closed, any
# shard may be picked. A shard that has no "Records" list yet gets one.
#
# Removes "StreamName", "Data" and "PartitionKey" from +options+ while
# reading them.
#
# @param options [Hash] "StreamName", "Data" and "PartitionKey"
# @return [Excon::Response] status 200, body with "SequenceNumber" and
#   "ShardId" of the stored record
# @raise [Fog::AWS::Kinesis::ResourceNotFound] when no stream has that name
def put_record(options={})
  stream_name = options.delete("StreamName")

  unless stream = data[:kinesis_streams].detect{ |s| s["StreamName"] == stream_name }
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  sequence_number = next_sequence_number
  # Named `payload` rather than `data` so the `data` accessor is not shadowed.
  payload = options.delete("Data")
  partition_key = options.delete("PartitionKey")

  # Shards closed by split_shard / merge_shards must not take new records.
  open_shards = stream["Shards"].reject do |candidate|
    (candidate["SequenceNumberRange"] || {}).key?("EndingSequenceNumber")
  end
  writable_shards = open_shards.empty? ? stream["Shards"] : open_shards

  shard = writable_shards.sample
  shard_id = shard["ShardId"]
  # store the record on the shard; tolerate shards created without a list
  (shard["Records"] ||= []) << {
    "SequenceNumber" => sequence_number,
    "Data" => payload,
    "PartitionKey" => partition_key
  }

  response = Excon::Response.new
  response.status = 200
  response.body = {
    "SequenceNumber" => sequence_number,
    "ShardId" => shard_id
  }
  response
end

#put_records(options = {}) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/fog/aws/requests/kinesis/put_records.rb', line 36

# Stores several records on a mock stream in one call.
#
# Each record is given its own sequence number and is placed on a shard
# picked at random among the stream's open shards (those without an
# "EndingSequenceNumber"); if every shard is closed, any shard may be
# picked. A shard that has no "Records" list yet gets one.
#
# Removes "StreamName" and "Records" from +options+ while reading them.
#
# @param options [Hash] "StreamName" and "Records" (Array of record hashes)
# @return [Excon::Response] status 200, body with "FailedRecordCount" (0)
#   and "Records" (one { "SequenceNumber", "ShardId" } hash per input)
# @raise [Fog::AWS::Kinesis::ResourceNotFound] when no stream has that name
def put_records(options={})
  stream_name = options.delete("StreamName")
  unless stream = data[:kinesis_streams].detect{ |s| s["StreamName"] == stream_name }
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  # Shards closed by split_shard / merge_shards must not take new records.
  open_shards = stream["Shards"].reject do |candidate|
    (candidate["SequenceNumberRange"] || {}).key?("EndingSequenceNumber")
  end
  writable_shards = open_shards.empty? ? stream["Shards"] : open_shards

  records = options.delete("Records")
  record_results = records.map do |record|
    sequence_number = next_sequence_number

    shard = writable_shards.sample
    # store the record on the shard; tolerate shards created without a list
    (shard["Records"] ||= []) << record.merge("SequenceNumber" => sequence_number)
    {
      "SequenceNumber" => sequence_number,
      "ShardId" => shard["ShardId"]
    }
  end

  response = Excon::Response.new
  response.status = 200
  response.body = {
    "FailedRecordCount" => 0,
    "Records" => record_results
  }
  response
end

#remove_tags_from_stream(options = {}) ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/fog/aws/requests/kinesis/remove_tags_from_stream.rb', line 30

# Removes tags from a mock stream by key.
#
# Removes "StreamName" and "TagKeys" from +options+ while reading them.
# Keys that are not present on the stream are ignored.
#
# @param options [Hash] "StreamName" (String) and "TagKeys" (collection of
#   tag keys, queried with include?)
# @return [Excon::Response] status 200 with an empty-string body
# @raise [Fog::AWS::Kinesis::ResourceNotFound] when no stream has that name
def remove_tags_from_stream(options={})
  stream_name = options.delete("StreamName")
  doomed_keys = options.delete("TagKeys")

  stream = data[:kinesis_streams].find { |candidate| candidate["StreamName"] == stream_name }
  if stream.nil?
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  # delete_if edits the tag hash in place; it is stored back unchanged.
  current_tags = stream["Tags"]
  current_tags.delete_if { |tag_key, _| doomed_keys.include?(tag_key) }
  stream["Tags"] = current_tags

  Excon::Response.new.tap do |response|
    response.status = 200
    response.body = ""
  end
end

#reset_data ⇒ Object



164
165
166
167
168
# File 'lib/fog/aws/kinesis.rb', line 164

# Discards this connection's slice of the class-level mock store by
# deleting the @aws_access_key_id entry under @region.
#
# @return the removed entry (nil when there was none)
def reset_data
  self.class.data { |all_regions| all_regions[@region].delete(@aws_access_key_id) }
end

#split_shard(options = {}) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/fog/aws/requests/kinesis/split_shard.rb', line 32

# Splits one shard of a mock stream into two child shards.
#
# The parent shard is closed by giving it an "EndingSequenceNumber". The
# first child covers the parent's starting hash key up to
# NewStartingHashKey - 1; the second covers NewStartingHashKey up to the
# parent's ending hash key. Both children record the parent's id and start
# with an empty "Records" list so that they can be written to and read from.
#
# Removes "StreamName", "ShardToSplit" and "NewStartingHashKey" from
# +options+ while reading them.
#
# @param options [Hash] "StreamName", "ShardToSplit" (shard id) and
#   "NewStartingHashKey" (decimal String)
# @return [Excon::Response] status 200 with an empty-string body
# @raise [Fog::AWS::Kinesis::ResourceNotFound] when the stream or shard is
#   unknown
def split_shard(options={})
  stream_name = options.delete("StreamName")
  shard_id = options.delete("ShardToSplit")

  unless stream = data[:kinesis_streams].detect{ |s| s["StreamName"] == stream_name }
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  unless shard = stream["Shards"].detect{ |s| s["ShardId"] == shard_id }
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Could not find shard #{shard_id} in stream #{stream_name} under account #{@account_id}.")
  end

  # Close original shard (set an EndingSequenceNumber on it)
  shard["SequenceNumberRange"]["EndingSequenceNumber"] = next_sequence_number

  # Calculate new shard ranges
  parent_starting_hash_key = shard["HashKeyRange"]["StartingHashKey"]
  parent_ending_hash_key = shard["HashKeyRange"]["EndingHashKey"]
  new_starting_hash_key = options.delete("NewStartingHashKey")

  # Create two new shards using contiguous hash space based on the original
  # shard. Each needs its own Records list: put_record(s) append to it and
  # get_records iterates over it.
  stream["Shards"] << {
    "HashKeyRange"=> {
      "EndingHashKey" => (new_starting_hash_key.to_i - 1).to_s,
      "StartingHashKey" => parent_starting_hash_key
    },
    "SequenceNumberRange" => {
      "StartingSequenceNumber" => next_sequence_number
    },
    "ShardId" => next_shard_id,
    "ParentShardId" => shard_id,
    "Records" => []
  }
  stream["Shards"] << {
    "HashKeyRange" => {
      "EndingHashKey" => parent_ending_hash_key,
      "StartingHashKey" => new_starting_hash_key
    },
    "SequenceNumberRange" =>{
      "StartingSequenceNumber" => next_sequence_number
    },
    "ShardId" => next_shard_id,
    "ParentShardId" => shard_id,
    "Records" => []
  }

  response = Excon::Response.new
  response.status = 200
  response.body = ""
  response
end