class Concurrent::Promises::AbstractEventFuture

Common ancestor of {Event} and {Future} classes, many shared methods are defined here.

Public Class Methods

new(promise, default_executor) click to toggle source
Calls superclass method
# File lib/concurrent-ruby/concurrent/promises.rb, line 520
def initialize(promise, default_executor)
  super()
  @Lock               = Mutex.new
  @Condition          = ConditionVariable.new
  @Promise            = promise
  @DefaultExecutor    = default_executor
  @Callbacks          = LockFreeStack.new
  @Waiters            = AtomicFixnum.new 0
  self.internal_state = PENDING
end

Public Instance Methods

add_callback_clear_delayed_node(node) click to toggle source

@!visibility private

# File lib/concurrent-ruby/concurrent/promises.rb, line 736
def add_callback_clear_delayed_node(node)
  add_callback(:callback_clear_delayed_node, node)
end
add_callback_notify_blocked(promise, index) click to toggle source

@!visibility private

# File lib/concurrent-ruby/concurrent/promises.rb, line 731
def add_callback_notify_blocked(promise, index)
  add_callback :callback_notify_blocked, promise, index
end
blocks() click to toggle source

For inspection. @!visibility private @return [Array<AbstractPromise>]

# File lib/concurrent-ruby/concurrent/promises.rb, line 700
def blocks
  @Callbacks.each_with_object([]) do |(method, args), promises|
    promises.push(args[0]) if method == :callback_notify_blocked
  end
end
callbacks() click to toggle source

For inspection. @!visibility private

# File lib/concurrent-ruby/concurrent/promises.rb, line 708
def callbacks
  @Callbacks.each.to_a
end
chain(*args, &task) click to toggle source

@!macro promises.shortcut.on @return [Future]

# File lib/concurrent-ruby/concurrent/promises.rb, line 594
def chain(*args, &task)
  chain_on @DefaultExecutor, *args, &task
end
chain_on(executor, *args, &task) click to toggle source

Chains the task to be executed asynchronously on executor after it is resolved.

@!macro promises.param.executor @!macro promises.param.args @return [Future] @!macro promise.param.task-future

@overload an_event.chain_on(executor, *args, &task)

@yield [*args] to the task.

@overload a_future.chain_on(executor, *args, &task)

@yield [fulfilled, value, reason, *args] to the task.
@yieldparam [true, false] fulfilled
@yieldparam [Object] value
@yieldparam [Object] reason
# File lib/concurrent-ruby/concurrent/promises.rb, line 612
def chain_on(executor, *args, &task)
  ChainPromise.new_blocked_by1(self, @DefaultExecutor, executor, args, &task).future
end
chain_resolvable(resolvable) click to toggle source

Resolves the resolvable when receiver is resolved.

@param [Resolvable] resolvable @return [self]

# File lib/concurrent-ruby/concurrent/promises.rb, line 627
def chain_resolvable(resolvable)
  on_resolution! { resolvable.resolve_with internal_state }
end
Also aliased as: tangle
default_executor() click to toggle source

Returns default executor. @return [Executor] default executor @see with_default_executor @see FactoryMethods#future_on @see FactoryMethods#resolvable_future @see FactoryMethods#any_fulfilled_future_on @see similar

# File lib/concurrent-ruby/concurrent/promises.rb, line 588
def default_executor
  @DefaultExecutor
end
inspect()
Alias for: to_s
on_resolution(*args, &callback) click to toggle source

@!macro promises.shortcut.using @return [self]

# File lib/concurrent-ruby/concurrent/promises.rb, line 635
def on_resolution(*args, &callback)
  on_resolution_using @DefaultExecutor, *args, &callback
end
on_resolution!(*args, &callback) click to toggle source

Stores the callback to be executed synchronously on resolving thread after it is resolved.

@!macro promises.param.args @!macro promise.param.callback @return [self]

@overload an_event.on_resolution!(*args, &callback)

@yield [*args] to the callback.

@overload a_future.on_resolution!(*args, &callback)

@yield [fulfilled, value, reason, *args] to the callback.
@yieldparam [true, false] fulfilled
@yieldparam [Object] value
@yieldparam [Object] reason
# File lib/concurrent-ruby/concurrent/promises.rb, line 653
def on_resolution!(*args, &callback)
  add_callback :callback_on_resolution, args, callback
end
on_resolution_using(executor, *args, &callback) click to toggle source

Stores the callback to be executed asynchronously on executor after it is resolved.

@!macro promises.param.executor @!macro promises.param.args @!macro promise.param.callback @return [self]

@overload an_event.on_resolution_using(executor, *args, &callback)

@yield [*args] to the callback.

@overload a_future.on_resolution_using(executor, *args, &callback)

@yield [fulfilled, value, reason, *args] to the callback.
@yieldparam [true, false] fulfilled
@yieldparam [Object] value
@yieldparam [Object] reason
# File lib/concurrent-ruby/concurrent/promises.rb, line 671
def on_resolution_using(executor, *args, &callback)
  add_callback :async_callback_on_resolution, executor, args, callback
end
pending?() click to toggle source

Is it in pending state? @return [Boolean]

# File lib/concurrent-ruby/concurrent/promises.rb, line 547
def pending?
  !internal_state.resolved?
end
promise() click to toggle source

For inspection. @!visibility private

# File lib/concurrent-ruby/concurrent/promises.rb, line 714
def promise
  @Promise
end
resolve_with(state, raise_on_reassign = true, reserved = false) click to toggle source

@!visibility private

# File lib/concurrent-ruby/concurrent/promises.rb, line 686
def resolve_with(state, raise_on_reassign = true, reserved = false)
  if compare_and_set_internal_state(reserved ? RESERVED : PENDING, state)
    # go to synchronized block only if there were waiting threads
    @Lock.synchronize { @Condition.broadcast } unless @Waiters.value == 0
    call_callbacks state
  else
    return rejected_resolution(raise_on_reassign, state)
  end
  self
end
resolved?() click to toggle source

Is it in resolved state? @return [Boolean]

# File lib/concurrent-ruby/concurrent/promises.rb, line 553
def resolved?
  internal_state.resolved?
end
state() click to toggle source

Returns its state. @return [Symbol]

@overload an_event.state

@return [:pending, :resolved]

@overload a_future.state

Both :fulfilled, :rejected implies :resolved.
@return [:pending, :fulfilled, :rejected]
# File lib/concurrent-ruby/concurrent/promises.rb, line 541
def state
  internal_state.to_sym
end
tangle(resolvable)
Alias for: chain_resolvable
to_s() click to toggle source

@return [String] Short string representation.

Calls superclass method
# File lib/concurrent-ruby/concurrent/promises.rb, line 617
def to_s
  format '%s %s>', super[0..-2], state
end
Also aliased as: inspect
touch() click to toggle source

Propagates touch. Requests all the delayed futures, which it depends on, to be executed. This method is called by any other method requiring resolved state, like {#wait}. @return [self]

# File lib/concurrent-ruby/concurrent/promises.rb, line 560
def touch
  @Promise.touch
  self
end
touched?() click to toggle source

For inspection. @!visibility private

# File lib/concurrent-ruby/concurrent/promises.rb, line 720
def touched?
  promise.touched?
end
wait(timeout = nil) click to toggle source

@!macro promises.method.wait

Wait (block the Thread) until receiver is {#resolved?}.
@!macro promises.touches

@!macro promises.warn.blocks
@!macro promises.param.timeout
@return [self, true, false] self implies timeout was not used, true implies timeout was used
  and it was resolved, false implies it was not resolved within timeout.
# File lib/concurrent-ruby/concurrent/promises.rb, line 576
def wait(timeout = nil)
  result = wait_until_resolved(timeout)
  timeout ? result : self
end
waiting_threads() click to toggle source

For inspection. @!visibility private

# File lib/concurrent-ruby/concurrent/promises.rb, line 726
def waiting_threads
  @Waiters.each.to_a
end
with_default_executor(executor) click to toggle source

@!macro promises.method.with_default_executor

Crates new object with same class with the executor set as its new default executor.
Any futures depending on it will use the new default executor.

@!macro promises.shortcut.event-future @abstract @return [AbstractEventFuture]

# File lib/concurrent-ruby/concurrent/promises.rb, line 681
def with_default_executor(executor)
  raise NotImplementedError
end
with_hidden_resolvable() click to toggle source

@!visibility private

# File lib/concurrent-ruby/concurrent/promises.rb, line 741
def with_hidden_resolvable
  # TODO (pitr-ch 10-Dec-2018): documentation, better name if in edge
  self
end

Private Instance Methods

add_callback(method, *args) click to toggle source
# File lib/concurrent-ruby/concurrent/promises.rb, line 748
def add_callback(method, *args)
  state = internal_state
  if state.resolved?
    call_callback method, state, args
  else
    @Callbacks.push [method, args]
    state = internal_state
    # take back if it was resolved in the meanwhile
    call_callbacks state if state.resolved?
  end
  self
end
async_callback_on_resolution(state, executor, args, callback) click to toggle source
# File lib/concurrent-ruby/concurrent/promises.rb, line 801
def async_callback_on_resolution(state, executor, args, callback)
  with_async(executor, state, args, callback) do |st, ar, cb|
    callback_on_resolution st, ar, cb
  end
end
call_callback(method, state, args) click to toggle source
# File lib/concurrent-ruby/concurrent/promises.rb, line 785
def call_callback(method, state, args)
  self.send method, state, *args
end
call_callbacks(state) click to toggle source
# File lib/concurrent-ruby/concurrent/promises.rb, line 789
def call_callbacks(state)
  method, args = @Callbacks.pop
  while method
    call_callback method, state, args
    method, args = @Callbacks.pop
  end
end
callback_clear_delayed_node(state, node) click to toggle source
# File lib/concurrent-ruby/concurrent/promises.rb, line 761
def callback_clear_delayed_node(state, node)
  node.value = nil
end
callback_notify_blocked(state, promise, index) click to toggle source
# File lib/concurrent-ruby/concurrent/promises.rb, line 807
def callback_notify_blocked(state, promise, index)
  promise.on_blocker_resolution self, index
end
wait_until_resolved(timeout) click to toggle source

@return [Boolean]

# File lib/concurrent-ruby/concurrent/promises.rb, line 766
def wait_until_resolved(timeout)
  return true if resolved?

  touch

  @Lock.synchronize do
    @Waiters.increment
    begin
      unless resolved?
        @Condition.wait @Lock, timeout
      end
    ensure
      # JRuby may raise ConcurrencyError
      @Waiters.decrement
    end
  end
  resolved?
end
with_async(executor, *args, &block) click to toggle source
# File lib/concurrent-ruby/concurrent/promises.rb, line 797
def with_async(executor, *args, &block)
  Concurrent.executor(executor).post(*args, &block)
end