class Concurrent::Channel

{include:file:docs-source/channel.md} @!macro warn.edge

Constants

BUFFER_TYPES
DEFAULT_VALIDATOR
Error
GOROUTINES

NOTE: Move to global IO pool once stable

Public Class Methods

after(seconds)
Alias for: timer
alt(*args)
Alias for: select
go(*args, &block) click to toggle source
# File lib-edge/concurrent/channel.rb, line 224
def go(*args, &block)
  go_via(GOROUTINES, *args, &block)
end
go_loop(*args, &block) click to toggle source
# File lib-edge/concurrent/channel.rb, line 233
def go_loop(*args, &block)
  go_loop_via(GOROUTINES, *args, &block)
end
go_loop_via(executor, *args, &block) click to toggle source
# File lib-edge/concurrent/channel.rb, line 237
def go_loop_via(executor, *args, &block)
  raise ArgumentError.new('no block given') unless block_given?
  executor.post(block, *args) do
    loop do
      break unless block.call(*args)
    end
  end
end
go_via(executor, *args, &block) click to toggle source
# File lib-edge/concurrent/channel.rb, line 228
def go_via(executor, *args, &block)
  raise ArgumentError.new('no block given') unless block_given?
  executor.post(*args, &block)
end
new(opts = {}) click to toggle source
# File lib-edge/concurrent/channel.rb, line 47
def initialize(opts = {})
  # undocumented -- for internal use only
  if opts.is_a? Buffer::Base
    self.buffer = opts
    return
  end

  capacity = opts[:capacity] || opts[:size]
  buffer = opts[:buffer]

  if capacity && buffer == :unbuffered
    raise ArgumentError.new('unbuffered channels cannot have a capacity')
  elsif capacity.nil? && buffer.nil?
    self.buffer = BUFFER_TYPES[:unbuffered].new
  elsif capacity == 0 && buffer == :buffered
    self.buffer = BUFFER_TYPES[:unbuffered].new
  elsif buffer == :unbuffered
    self.buffer = BUFFER_TYPES[:unbuffered].new
  elsif capacity.nil? || capacity < 1
    raise ArgumentError.new('capacity must be at least 1 for this buffer type')
  else
    buffer ||= :buffered
    self.buffer = BUFFER_TYPES[buffer].new(capacity)
  end

  self.validator = opts.fetch(:validator, DEFAULT_VALIDATOR)
end
select(*args) { |selector, *args| ... } click to toggle source
# File lib-edge/concurrent/channel.rb, line 216
def select(*args)
  raise ArgumentError.new('no block given') unless block_given?
  selector = Selector.new
  yield(selector, *args)
  selector.execute
end
Also aliased as: alt
tick(interval)
Alias for: ticker
ticker(interval) click to toggle source
# File lib-edge/concurrent/channel.rb, line 211
def ticker(interval)
  Channel.new(Buffer::Ticker.new(interval))
end
Also aliased as: tick
timer(seconds) click to toggle source
# File lib-edge/concurrent/channel.rb, line 206
def timer(seconds)
  Channel.new(Buffer::Timer.new(seconds))
end
Also aliased as: after

Public Instance Methods

<<(item)
Alias for: put
each() { |item| ... } click to toggle source
# File lib-edge/concurrent/channel.rb, line 193
def each
  raise ArgumentError.new('no block given') unless block_given?
  loop do
    item, more = do_next
    if item != Concurrent::NULL
      yield(item)
    elsif !more
      break
    end
  end
end
next() click to toggle source

@example

jobs = Channel.new

Channel.go do
  loop do
    j, more = jobs.next
    if more
      print "received job #{j}\n"
    else
      print "received all jobs\n"
      break
    end
  end
end
# File lib-edge/concurrent/channel.rb, line 159
def next
  item, more = do_next
  item = nil if item == Concurrent::NULL
  return item, more
end
next?() click to toggle source
# File lib-edge/concurrent/channel.rb, line 165
def next?
  item, more = do_next
  item = if item == Concurrent::NULL
           Concurrent::Maybe.nothing
         else
           Concurrent::Maybe.just(item)
         end
  return item, more
end
offer(item) click to toggle source
# File lib-edge/concurrent/channel.rb, line 99
def offer(item)
  return false unless validate(item, false, false)
  do_offer(item)
end
offer!(item) click to toggle source
# File lib-edge/concurrent/channel.rb, line 104
def offer!(item)
  validate(item, false, true)
  ok = do_offer(item)
  raise Error if !ok
  ok
end
offer?(item) click to toggle source
# File lib-edge/concurrent/channel.rb, line 111
def offer?(item)
  if !validate(item, true, false)
    Concurrent::Maybe.nothing('invalid value')
  elsif do_offer(item)
    Concurrent::Maybe.just(true)
  else
    Concurrent::Maybe.nothing
  end
end
poll() click to toggle source
# File lib-edge/concurrent/channel.rb, line 175
def poll
  (item = do_poll) == Concurrent::NULL ? nil : item
end
poll!() click to toggle source
# File lib-edge/concurrent/channel.rb, line 179
def poll!
  item = do_poll
  raise Error if item == Concurrent::NULL
  item
end
poll?() click to toggle source
# File lib-edge/concurrent/channel.rb, line 185
def poll?
  if (item = do_poll) == Concurrent::NULL
    Concurrent::Maybe.nothing
  else
    Concurrent::Maybe.just(item)
  end
end
put(item) click to toggle source
# File lib-edge/concurrent/channel.rb, line 75
def put(item)
  return false unless validate(item, false, false)
  do_put(item)
end
Also aliased as: send, <<
put!(item) click to toggle source
# File lib-edge/concurrent/channel.rb, line 82
def put!(item)
  validate(item, false, true)
  ok = do_put(item)
  raise Error if !ok
  ok
end
put?(item) click to toggle source
# File lib-edge/concurrent/channel.rb, line 89
def put?(item)
  if !validate(item, true, false)
    Concurrent::Maybe.nothing('invalid value')
  elsif do_put(item)
    Concurrent::Maybe.just(true)
  else
    Concurrent::Maybe.nothing
  end
end
receive()
Alias for: take
send(item)
Alias for: put
take() click to toggle source
# File lib-edge/concurrent/channel.rb, line 121
def take
  item = do_take
  item == Concurrent::NULL ? nil : item
end
Also aliased as: receive, ~
take!() click to toggle source
# File lib-edge/concurrent/channel.rb, line 128
def take!
  item = do_take
  raise Error if item == Concurrent::NULL
  item
end
take?() click to toggle source
# File lib-edge/concurrent/channel.rb, line 134
def take?
  item = do_take
  item = if item == Concurrent::NULL
           Concurrent::Maybe.nothing
         else
           Concurrent::Maybe.just(item)
         end
  item
end
~()
Alias for: take

Private Instance Methods

buffer() click to toggle source
# File lib-edge/concurrent/channel.rb, line 257
def buffer
  @buffer
end
buffer=(value) click to toggle source
# File lib-edge/concurrent/channel.rb, line 261
def buffer=(value)
  @buffer = value
end
do_next() click to toggle source
# File lib-edge/concurrent/channel.rb, line 290
def do_next
  buffer.next
end
do_offer(item) click to toggle source
# File lib-edge/concurrent/channel.rb, line 282
def do_offer(item)
  buffer.offer(item)
end
do_poll() click to toggle source
# File lib-edge/concurrent/channel.rb, line 294
def do_poll
  buffer.poll
end
do_put(item) click to toggle source
# File lib-edge/concurrent/channel.rb, line 278
def do_put(item)
  buffer.put(item)
end
do_take() click to toggle source
# File lib-edge/concurrent/channel.rb, line 286
def do_take
  buffer.take
end
validate(value, allow_nil, raise_error) click to toggle source
# File lib-edge/concurrent/channel.rb, line 265
def validate(value, allow_nil, raise_error)
  if !allow_nil && value.nil?
    raise_error ? raise(ValidationError.new('nil is not a valid value')) : false
  elsif !validator.call(value)
    raise_error ? raise(ValidationError) : false
  else
    true
  end
rescue => ex
  # the validator raised an exception
  return raise_error ? raise(ex) : false
end
validator() click to toggle source
# File lib-edge/concurrent/channel.rb, line 249
def validator
  @validator
end
validator=(value) click to toggle source
# File lib-edge/concurrent/channel.rb, line 253
def validator=(value)
  @validator = value
end