class Fog::AWS::Kinesis::Mock

Public Class Methods

data() { |data| ... } click to toggle source
# File lib/fog/aws/kinesis.rb, line 130
# Gives synchronized access to the shared mock data store.
#
# The store is built lazily as a nested Hash: reading a missing region
# creates a per-region Hash, and reading a missing key inside that creates
# an entry with an empty :kinesis_streams list.
#
# NOTE(review): @mutex is not assigned in this method; it is presumably
# initialized at class level elsewhere - confirm.
#
# @yield [data] the whole store, when a block is given
# @return the block's result, or nil when called without a block
def self.data
  @mutex.synchronize do
    @data ||= Hash.new do |hash, region|
      hash[region] = Hash.new do |region_hash, key|
        region_hash[key] = {
          # An Array (not a Hash): the request methods search, append to and
          # delete from this collection as a list of stream hashes.
          :kinesis_streams => []
        }
      end
    end

    yield @data if block_given?
  end
end
new(options={}) click to toggle source
# File lib/fog/aws/kinesis.rb, line 150
# Sets up a mock connection.
#
# Reads :aws_access_key_id and :region from options; the region falls back
# to 'us-east-1' when it is nil or missing. The account id comes from
# Fog::AWS::Mock.owner_id and the region is checked with
# Fog::AWS.validate_region!.
#
# @param options [Hash] connection settings
def initialize(options={})
  @aws_access_key_id = options[:aws_access_key_id]
  @account_id        = Fog::AWS::Mock.owner_id

  @region = options[:region]
  @region ||= 'us-east-1'

  Fog::AWS.validate_region!(@region)
end
next_sequence_number() click to toggle source
# File lib/fog/aws/kinesis.rb, line 170
# Advances the class-wide sequence counter and returns it as a String.
# The first call returns "0"; each later call returns one more than the
# previous call. The whole update runs while holding @mutex.
#
# @return [String] the new counter value
def self.next_sequence_number
  @mutex.synchronize do
    @sequence_number = (@sequence_number || -1).succ
    @sequence_number.to_s
  end
end
next_shard_id() click to toggle source
# File lib/fog/aws/kinesis.rb, line 180
# Advances the class-wide shard counter and returns a shard id built from it:
# "shardId-" followed by the counter left-padded with zeros to 12 digits.
# The first call uses counter value 0. The update runs while holding @mutex.
#
# @return [String] e.g. "shardId-000000000000"
def self.next_shard_id
  @mutex.synchronize do
    @shard_id = (@shard_id || -1).succ
    format("shardId-%012d", @shard_id)
  end
end
reset() click to toggle source
# File lib/fog/aws/kinesis.rb, line 144
# Discards the shared mock data store by setting @data back to nil while
# holding @mutex. Only @data is cleared here.
#
# @return [nil]
def self.reset
  @mutex.synchronize { @data = nil }
end

Public Instance Methods

add_tags_to_stream(options={}) click to toggle source
# File lib/fog/aws/requests/kinesis/add_tags_to_stream.rb, line 30
# Mock implementation of add_tags_to_stream: merges the supplied tags into
# the tag Hash of a stored stream.
#
# @param options [Hash] request parameters; "StreamName" and "Tags" are
#   removed from this Hash as they are read
# @return [Excon::Response] status 200 with an empty String body
# @raise [Fog::AWS::Kinesis::ResourceNotFound] when no stored stream has
#   the given name
def add_tags_to_stream(options={})
  name = options.delete("StreamName")
  new_tags = options.delete("Tags")

  target = data[:kinesis_streams].find { |candidate| candidate["StreamName"] == name }
  if target.nil?
    raise Fog::AWS::Kinesis::ResourceNotFound, "Stream #{name} under account #{@account_id} not found."
  end

  # Hash#merge builds a new Hash; values in new_tags win on duplicate keys.
  target["Tags"] = target["Tags"].merge(new_tags)

  Excon::Response.new.tap do |reply|
    reply.status = 200
    reply.body = ""
  end
end
create_stream(options={}) click to toggle source
# File lib/fog/aws/requests/kinesis/create_stream.rb, line 30
# Mock implementation of create_stream: stores a new stream in "ACTIVE"
# status with the requested number of shards, each covering the full hash
# key range and starting with an empty record list.
#
# The new stream is appended to the stored collection, so streams created
# earlier are kept.
#
# @param options [Hash] request parameters; "StreamName" and "ShardCount"
#   are removed from this Hash as they are read. "ShardCount" defaults to 1.
# @return [Excon::Response] status 200 with an empty String body
# @raise [Fog::AWS::Kinesis::ResourceInUse] when a stream with the same
#   name is already stored
def create_stream(options={})
  stream_name = options.delete("StreamName")
  shard_count = options.delete("ShardCount") || 1
  stream_arn = "arn:aws:kinesis:#{@region}:#{@account_id}:stream/#{stream_name}"

  streams = data[:kinesis_streams]
  # The store may still hold a non-Array placeholder (an empty Hash) before
  # the first stream is created; switch to a list so streams can be appended.
  streams = data[:kinesis_streams] = [] unless streams.is_a?(Array)

  if streams.detect { |s| s["StreamName"] == stream_name }
    raise Fog::AWS::Kinesis::ResourceInUse.new("Stream #{stream_name} under account #{@account_id} already exists.")
  end

  shards = (0...shard_count).map do
    {
      "HashKeyRange" => {
        "EndingHashKey" => "340282366920938463463374607431768211455",
        "StartingHashKey" => "0"
      },
      "SequenceNumberRange" => {
        "StartingSequenceNumber" => next_sequence_number
      },
      "ShardId" => next_shard_id,
      "Records" => []
    }
  end

  # Append rather than reassign: reassigning would drop every other stream.
  streams << {
    "HasMoreShards" => false,
    "StreamARN" => stream_arn,
    "StreamName" => stream_name,
    "StreamStatus" => "ACTIVE",
    "Shards" => shards,
    "Tags" => {}
  }

  response = Excon::Response.new
  response.status = 200
  response.body = ""
  response
end
data() click to toggle source
# File lib/fog/aws/kinesis.rb, line 158
# Returns this connection's portion of the shared mock store: the entry
# found under @region and then @aws_access_key_id in the class-level store.
def data
  self.class.data { |store| store[@region][@aws_access_key_id] }
end
delete_stream(options={}) click to toggle source
# File lib/fog/aws/requests/kinesis/delete_stream.rb, line 28
# Mock implementation of delete_stream: removes a stored stream by name.
#
# @param options [Hash] request parameters; "StreamName" is removed from
#   this Hash as it is read
# @return [Excon::Response] status 200 with an empty String body
# @raise [Fog::AWS::Kinesis::ResourceNotFound] when no stored stream has
#   the given name
def delete_stream(options={})
  name = options.delete("StreamName")

  doomed = data[:kinesis_streams].find { |candidate| candidate["StreamName"] == name }
  raise Fog::AWS::Kinesis::ResourceNotFound, "Stream #{name} under account #{@account_id} not found." unless doomed

  data[:kinesis_streams].delete(doomed)

  Excon::Response.new.tap do |reply|
    reply.status = 200
    reply.body = ""
  end
end
describe_stream(options={}) click to toggle source
# File lib/fog/aws/requests/kinesis/describe_stream.rb, line 36
# Mock implementation of describe_stream: returns the stored description
# of a stream, with the internally stored "Records" list removed from each
# shard in the response.
#
# The stored stream and its shards are not modified; the response is built
# from copies.
#
# @param options [Hash] request parameters; "StreamName" is removed from
#   this Hash as it is read
# @return [Excon::Response] status 200; body is
#   { "StreamDescription" => <stream hash> }
# @raise [Fog::AWS::Kinesis::ResourceNotFound] when no stored stream has
#   the given name
def describe_stream(options={})
  stream_name = options.delete("StreamName")

  unless stream = data[:kinesis_streams].detect { |s| s["StreamName"] == stream_name }
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  # "Shards" is an Array of shard Hashes, so the "Records" key has to be
  # dropped from each shard individually. Hash#reject returns a new Hash.
  shards = stream["Shards"].map do |shard|
    shard.reject { |key, _| key == "Records" }
  end

  response = Excon::Response.new
  response.status = 200
  response.body = { "StreamDescription" => stream.merge("Shards" => shards) }
  response
end
get_records(options={}) click to toggle source
# File lib/fog/aws/requests/kinesis/get_records.rb, line 32
# Mock implementation of get_records: reads records from one shard,
# starting at the position encoded in the shard iterator.
#
# The iterator is a JSON document (as produced by get_shard_iterator) with
# "StreamName", "ShardId" and an optional "StartingSequenceNumber"
# (defaults to 1). Records whose sequence number is below the starting
# number are skipped. The returned "NextShardIterator" is the same document
# with "StartingSequenceNumber" moved just past the last returned record,
# or left at the starting number when nothing was returned.
#
# @param options [Hash] request parameters; "ShardIterator" and "Limit" are
#   removed from this Hash as they are read. Without "Limit" every
#   matching record is returned.
# @return [Excon::Response] status 200; body has "MillisBehindLatest",
#   "NextShardIterator" and "Records"
# @raise [Fog::AWS::Kinesis::ResourceNotFound] when the stream or the shard
#   cannot be found
def get_records(options={})
  shard_iterator = Fog::JSON.decode(options.delete("ShardIterator"))
  limit = options.delete("Limit") || -1
  stream_name = shard_iterator["StreamName"]
  shard_id = shard_iterator["ShardId"]
  starting_sequence_number = (shard_iterator["StartingSequenceNumber"] || 1).to_i

  unless stream = data[:kinesis_streams].detect { |s| s["StreamName"] == stream_name }
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  unless shard = stream["Shards"].detect { |candidate| candidate["ShardId"] == shard_id }
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Could not find shard #{shard_id} in stream #{stream_name} under account #{@account_id}.")
  end

  records = []
  # A shard may have been stored without a "Records" list; treat that as
  # an empty shard instead of failing on nil.
  (shard["Records"] || []).each do |record|
    next if record["SequenceNumber"].to_i < starting_sequence_number
    records << record
    break if records.size == limit
  end

  shard_iterator["StartingSequenceNumber"] = if records.empty?
    starting_sequence_number.to_s
  else
    (records.last["SequenceNumber"].to_i + 1).to_s
  end

  response = Excon::Response.new
  response.status = 200
  response.body = {
    "MillisBehindLatest" => 0,
    "NextShardIterator" => Fog::JSON.encode(shard_iterator),
    "Records" => records
  }
  response
end
get_shard_iterator(options={}) click to toggle source
# File lib/fog/aws/requests/kinesis/get_shard_iterator.rb, line 36
# Mock implementation of get_shard_iterator: checks that the stream exists
# and returns the request options themselves, JSON-encoded, as the
# iterator. get_records decodes this document again.
#
# @param options [Hash] request parameters; "StreamName" is read but the
#   Hash is left unmodified
# @return [Excon::Response] status 200; body is { "ShardIterator" => String }
# @raise [Fog::AWS::Kinesis::ResourceNotFound] when no stored stream has
#   the given name
def get_shard_iterator(options={})
  name = options["StreamName"]

  known = data[:kinesis_streams].any? { |candidate| candidate["StreamName"] == name }
  raise Fog::AWS::Kinesis::ResourceNotFound, "Stream #{name} under account #{@account_id} not found." unless known

  Excon::Response.new.tap do |reply|
    reply.status = 200
    # The iterator is nothing more than the encoded request options.
    reply.body = { "ShardIterator" => Fog::JSON.encode(options) }
  end
end
list_streams(options={}) click to toggle source
# File lib/fog/aws/requests/kinesis/list_streams.rb, line 28
# Mock implementation of list_streams: returns the names of all stored
# streams in a single response ("HasMoreStreams" is always false).
#
# @param options [Hash] accepted but not read by this mock
# @return [Excon::Response] status 200; body has "HasMoreStreams" and
#   "StreamNames"
def list_streams(options={})
  names = data[:kinesis_streams].map { |entry| entry["StreamName"] }

  Excon::Response.new.tap do |reply|
    reply.status = 200
    reply.body = {
      "HasMoreStreams" => false,
      "StreamNames" => names
    }
  end
end
list_tags_for_stream(options={}) click to toggle source
# File lib/fog/aws/requests/kinesis/list_tags_for_stream.rb, line 36
# Mock implementation of list_tags_for_stream: returns a stream's tags as a
# list of { "Key" => ..., "Value" => ... } pairs ("HasMoreTags" is always
# false).
#
# @param options [Hash] request parameters; "StreamName" is removed from
#   this Hash as it is read
# @return [Excon::Response] status 200; body has "HasMoreTags" and "Tags"
# @raise [Fog::AWS::Kinesis::ResourceNotFound] when no stored stream has
#   the given name
def list_tags_for_stream(options={})
  name = options.delete("StreamName")

  found = data[:kinesis_streams].find { |candidate| candidate["StreamName"] == name }
  raise Fog::AWS::Kinesis::ResourceNotFound, "Stream #{name} under account #{@account_id} not found." unless found

  tag_list = found["Tags"].map do |tag_key, tag_value|
    { "Key" => tag_key, "Value" => tag_value }
  end

  Excon::Response.new.tap do |reply|
    reply.status = 200
    reply.body = {
      "HasMoreTags" => false,
      "Tags" => tag_list
    }
  end
end
merge_shards(options={}) click to toggle source
# File lib/fog/aws/requests/kinesis/merge_shards.rb, line 32
# Mock implementation of merge_shards: closes two shards of a stream and
# appends a new shard that spans both of their hash key ranges.
#
# Both parent shards get an "EndingSequenceNumber". The new shard starts at
# the smaller "StartingHashKey" and ends at the larger "EndingHashKey" of
# the two parents, records both parents' ids, and starts with an empty
# "Records" list so it can be written to and read from like any shard
# produced by create_stream.
#
# @param options [Hash] request parameters; "StreamName", "ShardToMerge"
#   and "AdjacentShardToMerge" are removed from this Hash as they are read
# @return [Excon::Response] status 200 with an empty String body
# @raise [Fog::AWS::Kinesis::ResourceNotFound] when the stream or either
#   shard cannot be found
def merge_shards(options={})
  stream_name = options.delete("StreamName")
  shard_to_merge_id = options.delete("ShardToMerge")
  adjacent_shard_to_merge_id = options.delete("AdjacentShardToMerge")

  unless stream = data[:kinesis_streams].detect { |s| s["StreamName"] == stream_name }
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  unless shard_to_merge = stream["Shards"].detect { |candidate| candidate["ShardId"] == shard_to_merge_id }
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Could not find shard #{shard_to_merge_id} in stream #{stream_name} under account #{@account_id}.")
  end

  unless adjacent_shard_to_merge = stream["Shards"].detect { |candidate| candidate["ShardId"] == adjacent_shard_to_merge_id }
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Could not find shard #{adjacent_shard_to_merge_id} in stream #{stream_name} under account #{@account_id}.")
  end

  # Close shards (set an EndingSequenceNumber on them)
  shard_to_merge["SequenceNumberRange"]["EndingSequenceNumber"] = next_sequence_number
  adjacent_shard_to_merge["SequenceNumberRange"]["EndingSequenceNumber"] = next_sequence_number

  # Hash keys are stored as decimal Strings; compare them numerically.
  new_starting_hash_key = [
    shard_to_merge["HashKeyRange"]["StartingHashKey"].to_i,
    adjacent_shard_to_merge["HashKeyRange"]["StartingHashKey"].to_i
  ].min.to_s

  new_ending_hash_key = [
    shard_to_merge["HashKeyRange"]["EndingHashKey"].to_i,
    adjacent_shard_to_merge["HashKeyRange"]["EndingHashKey"].to_i
  ].max.to_s

  # create a new shard with ParentShardId and AdjacentParentShardID
  stream["Shards"] << {
    "HashKeyRange" => {
      "EndingHashKey" => new_ending_hash_key,
      "StartingHashKey" => new_starting_hash_key
    },
    "SequenceNumberRange" => {
      "StartingSequenceNumber" => next_sequence_number
    },
    "ShardId" => next_shard_id,
    "ParentShardId" => shard_to_merge_id,
    "AdjacentParentShardId" => adjacent_shard_to_merge_id,
    # Every shard needs its own record list for put_record(s)/get_records.
    "Records" => []
  }

  response = Excon::Response.new
  response.status = 200
  response.body = ""
  response
end
next_sequence_number() click to toggle source
# File lib/fog/aws/kinesis.rb, line 178
# Instance-level shortcut: returns whatever the class-level
# next_sequence_number returns.
def next_sequence_number
  self.class.next_sequence_number
end
next_shard_id() click to toggle source
# File lib/fog/aws/kinesis.rb, line 188
# Instance-level shortcut: returns whatever the class-level next_shard_id
# returns.
def next_shard_id
  self.class.next_shard_id
end
put_record(options={}) click to toggle source
# File lib/fog/aws/requests/kinesis/put_record.rb, line 38
# Mock implementation of put_record: stores one record on a randomly
# chosen shard of the stream.
#
# The shard is picked with Array#sample; the partition key is stored with
# the record but does not influence the choice.
#
# @param options [Hash] request parameters; "StreamName", "Data" and
#   "PartitionKey" are removed from this Hash as they are read
# @return [Excon::Response] status 200; body has the record's
#   "SequenceNumber" and the "ShardId" it was stored on
# @raise [Fog::AWS::Kinesis::ResourceNotFound] when no stored stream has
#   the given name
def put_record(options={})
  stream_name = options.delete("StreamName")

  unless stream = data[:kinesis_streams].detect { |s| s["StreamName"] == stream_name }
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  sequence_number = next_sequence_number
  # Named `payload` so the local does not shadow the `data` store method.
  payload = options.delete("Data")
  partition_key = options.delete("PartitionKey")

  shard = stream["Shards"].sample
  shard_id = shard["ShardId"]
  # store the record on the shard; a shard stored without a "Records" list
  # gets one on first write
  (shard["Records"] ||= []) << {
    "SequenceNumber" => sequence_number,
    "Data" => payload,
    "PartitionKey" => partition_key
  }

  response = Excon::Response.new
  response.status = 200
  response.body = {
    "SequenceNumber" => sequence_number,
    "ShardId" => shard_id
  }
  response
end
put_records(options={}) click to toggle source
# File lib/fog/aws/requests/kinesis/put_records.rb, line 36
# Mock implementation of put_records: stores each given record on a
# randomly chosen shard of the stream (one Array#sample pick per record).
#
# Each stored record is the given record Hash merged with its assigned
# "SequenceNumber". "FailedRecordCount" is always 0.
#
# @param options [Hash] request parameters; "StreamName" and "Records" are
#   removed from this Hash as they are read
# @return [Excon::Response] status 200; body has "FailedRecordCount" and a
#   "Records" list with one { "SequenceNumber", "ShardId" } entry per input
#   record, in input order
# @raise [Fog::AWS::Kinesis::ResourceNotFound] when no stored stream has
#   the given name
def put_records(options={})
  stream_name = options.delete("StreamName")
  unless stream = data[:kinesis_streams].detect { |s| s["StreamName"] == stream_name }
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  records = options.delete("Records")
  record_results = records.map do |record|
    sequence_number = next_sequence_number

    shard = stream["Shards"].sample
    # store the record on the shard; a shard stored without a "Records"
    # list gets one on first write
    (shard["Records"] ||= []) << record.merge("SequenceNumber" => sequence_number)
    {
      "SequenceNumber" => sequence_number,
      "ShardId" => shard["ShardId"]
    }
  end

  response = Excon::Response.new
  response.status = 200
  response.body = {
    "FailedRecordCount" => 0,
    "Records" => record_results
  }
  response
end
remove_tags_from_stream(options={}) click to toggle source
# File lib/fog/aws/requests/kinesis/remove_tags_from_stream.rb, line 30
# Mock implementation of remove_tags_from_stream: deletes the listed tag
# keys from a stored stream's tag Hash, in place.
#
# @param options [Hash] request parameters; "StreamName" and "TagKeys" are
#   removed from this Hash as they are read
# @return [Excon::Response] status 200 with an empty String body
# @raise [Fog::AWS::Kinesis::ResourceNotFound] when no stored stream has
#   the given name
def remove_tags_from_stream(options={})
  name = options.delete("StreamName")
  doomed_keys = options.delete("TagKeys")

  target = data[:kinesis_streams].find { |candidate| candidate["StreamName"] == name }
  if target.nil?
    raise Fog::AWS::Kinesis::ResourceNotFound, "Stream #{name} under account #{@account_id} not found."
  end

  # delete_if mutates the existing Hash and returns it, so the stream keeps
  # the same tag Hash object.
  target["Tags"].delete_if { |tag_key, _| doomed_keys.include?(tag_key) }

  Excon::Response.new.tap do |reply|
    reply.status = 200
    reply.body = ""
  end
end
reset_data() click to toggle source
# File lib/fog/aws/kinesis.rb, line 164
# Drops this connection's entry (keyed by @aws_access_key_id) from its
# region's part of the shared mock store.
#
# @return whatever the class-level data method returns for the block,
#   i.e. the result of the Hash#delete call
def reset_data
  self.class.data { |store| store[@region].delete(@aws_access_key_id) }
end
split_shard(options={}) click to toggle source
# File lib/fog/aws/requests/kinesis/split_shard.rb, line 32
# Mock implementation of split_shard: closes one shard and appends two
# child shards that together cover the parent's hash key range.
#
# The parent gets an "EndingSequenceNumber". The first child runs from the
# parent's starting hash key up to one below "NewStartingHashKey"; the
# second runs from "NewStartingHashKey" to the parent's ending hash key.
# Each child records the parent's id and starts with its own empty
# "Records" list so it can be written to and read from like any shard
# produced by create_stream.
#
# @param options [Hash] request parameters; "StreamName", "ShardToSplit"
#   and "NewStartingHashKey" are removed from this Hash as they are read
# @return [Excon::Response] status 200 with an empty String body
# @raise [Fog::AWS::Kinesis::ResourceNotFound] when the stream or the shard
#   cannot be found
def split_shard(options={})
  stream_name = options.delete("StreamName")
  shard_id = options.delete("ShardToSplit")

  unless stream = data[:kinesis_streams].detect { |s| s["StreamName"] == stream_name }
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  unless shard = stream["Shards"].detect { |candidate| candidate["ShardId"] == shard_id }
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Could not find shard #{shard_id} in stream #{stream_name} under account #{@account_id}.")
  end

  # Close original shard (set an EndingSequenceNumber on it)
  shard["SequenceNumberRange"]["EndingSequenceNumber"] = next_sequence_number

  # Calculate new shard ranges
  parent_starting_hash_key = shard["HashKeyRange"]["StartingHashKey"]
  parent_ending_hash_key = shard["HashKeyRange"]["EndingHashKey"]
  new_starting_hash_key = options.delete("NewStartingHashKey")

  # Create two new shards using contiguous hash space based on the original shard
  stream["Shards"] << {
    "HashKeyRange" => {
      "EndingHashKey" => (new_starting_hash_key.to_i - 1).to_s,
      "StartingHashKey" => parent_starting_hash_key
    },
    "SequenceNumberRange" => {
      "StartingSequenceNumber" => next_sequence_number
    },
    "ShardId" => next_shard_id,
    "ParentShardId" => shard_id,
    # Every shard needs its own record list for put_record(s)/get_records.
    "Records" => []
  }
  stream["Shards"] << {
    "HashKeyRange" => {
      "EndingHashKey" => parent_ending_hash_key,
      "StartingHashKey" => new_starting_hash_key
    },
    "SequenceNumberRange" => {
      "StartingSequenceNumber" => next_sequence_number
    },
    "ShardId" => next_shard_id,
    "ParentShardId" => shard_id,
    "Records" => []
  }

  response = Excon::Response.new
  response.status = 200
  response.body = ""
  response
end