class Fog::AWS::Kinesis::Real

Public Class Methods

new(options={}) click to toggle source
# File lib/fog/aws/kinesis.rb, line 36
def initialize(options={})
  @use_iam_profile = options[:use_iam_profile]

  @connection_options = options[:connection_options] || {}

  @instrumentor           = options[:instrumentor]
  @instrumentor_name      = options[:instrumentor_name] || 'fog.aws.kinesis'

  options[:region] ||= 'us-east-1'
  @region     = options[:region]
  @host       = options[:host] || "kinesis.#{options[:region]}.amazonaws.com"
  @path       = options[:path]        || '/'
  @persistent = options[:persistent]  || true
  @port       = options[:port]        || 443
  @scheme     = options[:scheme]      || 'https'
  @connection = Fog::XML::Connection.new("#{@scheme}://#{@host}:#{@port}#{@path}", @persistent, @connection_options)
  @version    = "20131202"

  setup_credentials(options)
end

Public Instance Methods

add_tags_to_stream(options={}) click to toggle source

Adds or updates tags for the specified Amazon Kinesis stream.

Options

  • StreamName<~String>: The name of the stream.

  • Tags<~Hash>: The set of key-value pairs to use to create the tags.

Returns

  • response<~Excon::Response>:

See Also

docs.aws.amazon.com/kinesis/latest/APIReference/API_AddTagsToStream.html

# File lib/fog/aws/requests/kinesis/add_tags_to_stream.rb, line 16
def add_tags_to_stream(options={})
  body = {
    "StreamName" => options.delete("StreamName"),
    "Tags" => options.delete("Tags")
  }.reject{ |_,v| v.nil? }

  request({
            'X-Amz-Target' => "Kinesis_#{@version}.AddTagsToStream",
            :body          => body,
          }.merge(options))
end
create_stream(options={}) click to toggle source

Creates a Amazon Kinesis stream.

Options

  • ShardCount<~Number>: The number of shards that the stream will use.

  • StreamName<~String>: A name to identify the stream.

Returns

  • response<~Excon::Response>:

See Also

docs.aws.amazon.com/kinesis/latest/APIReference/API_CreateStream.html

# File lib/fog/aws/requests/kinesis/create_stream.rb, line 16
def create_stream(options={})
  body = {
    "ShardCount" => options.delete("ShardCount") || 1,
    "StreamName" => options.delete("StreamName")
  }.reject{ |_,v| v.nil? }

  request({
            'X-Amz-Target' => "Kinesis_#{@version}.CreateStream",
            :body          => body,
          }.merge(options))
end
delete_stream(options={}) click to toggle source

Deletes a stream and all its shards and data.

Options

  • StreamName<~String>: A name to identify the stream.

Returns

  • response<~Excon::Response>:

See Also

docs.aws.amazon.com/kinesis/latest/APIReference/API_DeleteStream.html

# File lib/fog/aws/requests/kinesis/delete_stream.rb, line 15
def delete_stream(options={})
  body = {
    "StreamName" => options.delete("StreamName")
  }.reject{ |_,v| v.nil? }

  request({
            'X-Amz-Target' => "Kinesis_#{@version}.DeleteStream",
            :body          => body,
          }.merge(options))
end
describe_stream(options={}) click to toggle source

Describes the specified stream.

Options

  • ExclusiveStartShardId<~String>: The shard ID of the shard to start with.

  • Limit<~Number>: The maximum number of shards to return.

  • StreamName<~String>: The name of the stream to describe.

Returns

  • response<~Excon::Response>:

See Also

docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStream.html

# File lib/fog/aws/requests/kinesis/describe_stream.rb, line 17
def describe_stream(options={})
  body = {
    "ExclusiveStartShardId" => options.delete("ExclusiveStartShardId"),
    "Limit" => options.delete("Limit"),
    "StreamName" => options.delete("StreamName")
  }.reject{ |_,v| v.nil? }

  response = request({
                       :idempotent    => true,
                       'X-Amz-Target' => "Kinesis_#{@version}.DescribeStream",
                       :body          => body,
                     }.merge(options))
  response.body = Fog::JSON.decode(response.body) unless response.body.nil?
  response.body
  response
end
get_records(options={}) click to toggle source

Gets data records from a shard.

Options

  • Limit<~Number>: The maximum number of records to return.

  • ShardIterator<~String>: The position in the shard from which you want to start sequentially reading data records.

Returns

  • response<~Excon::Response>:

See Also

docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html

# File lib/fog/aws/requests/kinesis/get_records.rb, line 16
def get_records(options={})
  body = {
    "Limit" => options.delete("Limit"),
    "ShardIterator" => options.delete("ShardIterator")
  }.reject{ |_,v| v.nil? }

  response = request({
                       'X-Amz-Target' => "Kinesis_#{@version}.GetRecords",
                       :body          => body,
                     }.merge(options))
  response.body = Fog::JSON.decode(response.body) unless response.body.nil?
  response
end
get_shard_iterator(options={}) click to toggle source

Gets a shard iterator.

Options

  • ShardId<~String>: The shard ID of the shard to get the iterator for.

  • ShardIteratorType<~String>: Determines how the shard iterator is used to start reading data records from the shard.

  • StartingSequenceNumber<~String>: The sequence number of the data record in the shard from which to start reading from.

  • StreamName<~String>: A name to identify the stream.

Returns

  • response<~Excon::Response>:

See Also

docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html

# File lib/fog/aws/requests/kinesis/get_shard_iterator.rb, line 18
def get_shard_iterator(options={})
  body = {
    "ShardId" => options.delete("ShardId"),
    "ShardIteratorType" => options.delete("ShardIteratorType"),
    "StartingSequenceNumber" => options.delete("StartingSequenceNumber"),
    "StreamName" => options.delete("StreamName")
  }.reject{ |_,v| v.nil? }

  response = request({
                       'X-Amz-Target' => "Kinesis_#{@version}.GetShardIterator",
                       :body          => body,
                     }.merge(options))
  response.body = Fog::JSON.decode(response.body) unless response.body.nil?
  response
end
list_streams(options={}) click to toggle source

List availabe streams

Options

  • ExclusiveStartStreamName<~String>: The name of the stream to start the list with.

  • Limit<~Number>: The maximum number of streams to list.

Returns

  • response<~Excon::Response>:

See Also

docs.aws.amazon.com/kinesis/latest/APIReference/API_ListStreams.html

# File lib/fog/aws/requests/kinesis/list_streams.rb, line 16
def list_streams(options={})
  response = request({
                       :idempotent    => true,
                       'X-Amz-Target' => "Kinesis_#{@version}.ListStreams",
                       :body          => {},
                     }.merge(options))
  response.body = Fog::JSON.decode(response.body) unless response.body.nil?
  response
end
list_tags_for_stream(options={}) click to toggle source

Lists the tags for the specified Amazon Kinesis stream.

Options

  • ExclusiveStartTagKey<~String>: The key to use as the starting point for the list of tags.

  • Limit<~Number>: The number of tags to return.

  • StreamName<~String>: The name of the stream.

Returns

  • response<~Excon::Response>:

See Also

docs.aws.amazon.com/kinesis/latest/APIReference/API_ListTagsForStream.html

# File lib/fog/aws/requests/kinesis/list_tags_for_stream.rb, line 17
def list_tags_for_stream(options={})
  body = {
    "ExclusiveStartTagKey" => options.delete("ExclusiveStartTagKey"),
    "Limit" => options.delete("Limit"),
    "StreamName" => options.delete("StreamName")
  }.reject{ |_,v| v.nil? }

  response = request({
                       :idempotent    => true,
                       'X-Amz-Target' => "Kinesis_#{@version}.ListTagsForStream",
                       :body          => body,
                     }.merge(options))
  response.body = Fog::JSON.decode(response.body) unless response.body.nil?
  response.body
  response
end
merge_shards(options={}) click to toggle source

Merges two adjacent shards in a stream and combines them into a single shard to reduce the stream's capacity to ingest and transport data.

Options

  • AdjacentShardToMerge<~String>: The shard ID of the adjacent shard for the merge.

  • ShardToMerge<~String>: The shard ID of the shard to combine with the adjacent shard for the merge.

  • StreamName<~String>: The name of the stream for the merge.

Returns

  • response<~Excon::Response>:

See Also

docs.aws.amazon.com/kinesis/latest/APIReference/API_MergeShards.html

# File lib/fog/aws/requests/kinesis/merge_shards.rb, line 17
def merge_shards(options={})
  body = {
    "AdjacentShardToMerge" => options.delete("AdjacentShardToMerge"),
    "ShardToMerge" => options.delete("ShardToMerge"),
    "StreamName" => options.delete("StreamName")
  }.reject{ |_,v| v.nil? }

  request({
            'X-Amz-Target' => "Kinesis_#{@version}.MergeShards",
            :body          => body,
          }.merge(options))
end
put_record(options={}) click to toggle source

Writes a single data record from a producer into an Amazon Kinesis stream.

Options

  • Data<~Blob>: The data blob to put into the record, which is base64-encoded when the blob is serialized.

  • ExplicitHashKey<~String>: The hash value used to determine explicitly the shard that the data record is assigned to by overriding the partition key hash.

  • PartitionKey<~String>: Determines which shard in the stream the data record is assigned to.

  • SequenceNumberForOrdering<~String>: Guarantees strictly increasing sequence numbers, for puts from the same client and to the same partition key.

  • StreamName<~String>: The stream name associated with the request.

Returns

  • response<~Excon::Response>:

See Also

docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html

# File lib/fog/aws/requests/kinesis/put_record.rb, line 19
def put_record(options={})
  body = {
    "Data" => options.delete("Data"),
    "ExplicitHashKey" => options.delete("ExplicitHashKey"),
    "PartitionKey" => options.delete("PartitionKey"),
    "SequenceNumberForOrdering" => options.delete("SequenceNumberForOrdering"),
    "StreamName" => options.delete("StreamName")
  }.reject{ |_,v| v.nil? }

  response = request({
                       'X-Amz-Target' => "Kinesis_#{@version}.PutRecord",
                       :body          => body,
                     }.merge(options))
  response.body = Fog::JSON.decode(response.body) unless response.body.nil?
  response
end
put_records(options={}) click to toggle source

Writes multiple data records from a producer into an Amazon Kinesis stream in a single call (also referred to as a PutRecords request).

Options

  • Records<~Array>: The records associated with the request.

    • Record<~Hash>: A record.

      • Data<~Blob>: The data blob to put into the record, which is base64-encoded when the blob is serialized.

      • ExplicitHashKey<~String>: The hash value used to determine explicitly the shard that the data record is assigned to by overriding the partition key hash.

      • PartitionKey<~String>: Determines which shard in the stream the data record is assigned to.

  • StreamName<~String>: The stream name associated with the request.

Returns

  • response<~Excon::Response>:

See Also

docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html

# File lib/fog/aws/requests/kinesis/put_records.rb, line 20
def put_records(options={})
  body = {
    "Records" => options.delete("Records"),
    "StreamName" => options.delete("StreamName")
  }.reject{ |_,v| v.nil? }

  response = request({
                       'X-Amz-Target' => "Kinesis_#{@version}.PutRecords",
                       :body          => body,
                     }.merge(options))
  response.body = Fog::JSON.decode(response.body) unless response.body.nil?
  response
end
remove_tags_from_stream(options={}) click to toggle source

Deletes tags from the specified Amazon Kinesis stream.

Options

  • StreamName<~String>: The name of the stream.

  • TagKeys<~Array>: A list of tag keys.

Returns

  • response<~Excon::Response>:

See Also

docs.aws.amazon.com/kinesis/latest/APIReference/API_RemoveTagsFromStream.html

# File lib/fog/aws/requests/kinesis/remove_tags_from_stream.rb, line 16
def remove_tags_from_stream(options={})
  body = {
    "StreamName" => options.delete("StreamName"),
    "TagKeys" => options.delete("TagKeys")
  }.reject{ |_,v| v.nil? }

  request({
            'X-Amz-Target' => "Kinesis_#{@version}.RemoveTagsFromStream",
            :body          => body,
          }.merge(options))
end
split_shard(options={}) click to toggle source

Splits a shard into two new shards in the stream, to increase the stream's capacity to ingest and transport data.

Options

  • NewStartingHashKey<~String>: A hash key value for the starting hash key of one of the child shards created by the split.

  • ShardToSplit<~String>: The shard ID of the shard to split.

  • StreamName<~String>: The name of the stream for the shard split.

Returns

  • response<~Excon::Response>:

See Also

docs.aws.amazon.com/kinesis/latest/APIReference/API_SplitShard.html

# File lib/fog/aws/requests/kinesis/split_shard.rb, line 17
def split_shard(options={})
  body = {
    "NewStartingHashKey" => options.delete("NewStartingHashKey"),
    "ShardToSplit" => options.delete("ShardToSplit"),
    "StreamName" => options.delete("StreamName")
  }.reject{ |_,v| v.nil? }

  request({
            'X-Amz-Target' => "Kinesis_#{@version}.SplitShard",
            :body          => body,
          }.merge(options))
end

Private Instance Methods

_request(body, headers, idempotent, parser) click to toggle source
# File lib/fog/aws/kinesis.rb, line 93
def _request(body, headers, idempotent, parser)
  @connection.request({
                        :body       => body,
                        :expects    => 200,
                        :headers    => headers,
                        :idempotent => idempotent,
                        :method     => 'POST',
                        :parser     => parser
                      })
rescue Excon::Errors::HTTPStatusError => error
  match = Fog::AWS::Errors.match_error(error)
  raise if match.empty?
  raise case match[:code]
        when 'ExpiredIteratorException'
          Fog::AWS::Kinesis::ExpiredIterator.slurp(error, match[:message])
        when 'LimitExceededException'
          Fog::AWS::Kinesis::LimitExceeded.slurp(error, match[:message])
        when 'ResourceInUseException'
          Fog::AWS::Kinesis::ResourceInUse.slurp(error, match[:message])
        when 'ResourceNotFoundException'
          Fog::AWS::Kinesis::ResourceNotFound.slurp(error, match[:message])
        when 'ExpiredIteratorException'
          Fog::AWS::Kinesis::ExpiredIterator.slurp(error, match[:message])
        when 'InvalidArgumentException'
          Fog::AWS::Kinesis::InvalidArgument.slurp(error, match[:message])
        when 'ProvisionedThroughputExceededException'
          Fog::AWS::Kinesis::ProvisionedThroughputExceeded.slurp(error, match[:message])
        else
          Fog::AWS::Kinesis::Error.slurp(error, "#{match[:code]} => #{match[:message]}")
        end
end
request(params) click to toggle source
# File lib/fog/aws/kinesis.rb, line 68
def request(params)
  refresh_credentials_if_expired
  idempotent  = params.delete(:idempotent)
  parser      = params.delete(:parser)

  date = Fog::Time.now
  headers = {
    'X-Amz-Target' => params['X-Amz-Target'],
    'Content-Type' => 'application/x-amz-json-1.1',
    'Host'         => @host,
    'x-amz-date'   => date.to_iso8601_basic
  }
  headers['x-amz-security-token'] = @aws_session_token if @aws_session_token
  body = MultiJson.dump(params[:body])
  headers['Authorization'] = @signer.sign({:method => "POST", :headers => headers, :body => body, :query => {}, :path => @path}, date)

  if @instrumentor
    @instrumentor.instrument("#{@instrumentor_name}.request", params) do
      _request(body, headers, idempotent, parser)
    end
  else
    _request(body, headers, idempotent, parser)
  end
end
setup_credentials(options) click to toggle source
# File lib/fog/aws/kinesis.rb, line 59
def setup_credentials(options)
  @aws_access_key_id      = options[:aws_access_key_id]
  @aws_secret_access_key  = options[:aws_secret_access_key]
  @aws_session_token      = options[:aws_session_token]
  @aws_credentials_expire_at = options[:aws_credentials_expire_at]

  @signer = Fog::AWS::SignatureV4.new( @aws_access_key_id, @aws_secret_access_key, @region, 'kinesis')
end