Synchronization::Object
Represents an event which will happen in the future (will be completed). It always has to happen.
# File lib/concurrent/edge/future.rb, line 168
# Builds the event's internal state and then publishes it.
#
# @param promise stored as-is in @Promise
# @param default_executor stored as-is in @DefaultExecutor
def initialize(promise, default_executor)
  @Promise         = promise
  @DefaultExecutor = default_executor
  @Touched         = AtomicBoolean.new(false)
  @Callbacks       = LockFreeStack.new
  # TODO replace with AtomicFixnum, avoid aba problem
  @Waiters         = LockFreeStack.new
  # Every event starts out in the PENDING state.
  @State           = AtomicReference.new(PENDING)
  super()
  ensure_ivar_visibility!
end
@!visibility private
# File lib/concurrent/edge/future.rb, line 337
# Registers a callback, or runs it right away when the event is already
# completed.
#
# @param method identifier of the callback, forwarded to `call_callback`
# @param args extra arguments stored/forwarded together with `method`
# @return self
def add_callback(method, *args)
  if completed?
    call_callback(method, *args)
    return self
  end

  @Callbacks.push([method, *args])
  # Re-check after pushing: completion may have happened in between, in which
  # case the stored callbacks are triggered here.
  call_callbacks if completed?
  self
end
@!visibility private just for inspection @return [Array<AbstractPromise>]
# File lib/concurrent/edge/future.rb, line 324
# Collects every AbstractPromise found among the entries stored in
# @Callbacks, in iteration order.
#
# @return [Array<AbstractPromise>]
def blocks
  @Callbacks.flat_map do |callback|
    callback.select { |item| item.is_a?(AbstractPromise) }
  end
end
@!visibility private just for inspection
# File lib/concurrent/edge/future.rb, line 332
# Snapshot of the entries currently held in @Callbacks.
#
# @return [Array]
def callbacks
  entries = @Callbacks.each
  entries.to_a
end
@yield [success, value, reason] of the parent
# File lib/concurrent/edge/future.rb, line 230
# Creates a ChainPromise on top of this event and returns its future.
#
# @param executor executor for the chained callback; falls back to
#   @DefaultExecutor when nil/false
# @yield the callback handed over to ChainPromise
# @return the `future` of the new ChainPromise
def chain(executor = nil, &callback)
  callback_executor = executor || @DefaultExecutor
  chained = ChainPromise.new(self, @DefaultExecutor, callback_executor, &callback)
  chained.future
end
# File lib/concurrent/edge/future.rb, line 236
# Registers a synchronous completion callback that completes
# `completable_event` with COMPLETED.
#
# @param completable_event object responding to `complete_with`
# @return whatever `on_completion!` returns
def chain_completable(completable_event)
  on_completion! do
    completable_event.complete_with(COMPLETED)
  end
end
@!visibility private
# File lib/concurrent/edge/future.rb, line 308
# Moves the event from PENDING to `state`; only the first call can succeed.
#
# @param state the new state to store in @State
# @param raise_on_reassign [Boolean] when true, a second completion attempt
#   raises instead of returning false
# @return self when this call performed the transition, false when the event
#   was no longer PENDING and `raise_on_reassign` is false
# @raise [Concurrent::MultipleAssignmentError] when the event was no longer
#   PENDING and `raise_on_reassign` is true
def complete_with(state, raise_on_reassign = true)
  unless @State.compare_and_set(PENDING, state)
    # The error used to be instantiated but never raised, which made
    # `raise_on_reassign` a no-op.
    if raise_on_reassign
      raise Concurrent::MultipleAssignmentError, 'Event can be completed only once'
    end
    return false
  end

  # go to synchronized block only if there were waiting threads
  synchronize { ns_broadcast } if @Waiters.clear
  call_callbacks
  self
end
Has the Event been completed? @return [Boolean]
# File lib/concurrent/edge/future.rb, line 198
# Has the Event been completed?
#
# @param state state object to query; defaults to the current value of @State
# @return [Boolean] the answer of `state.completed?`
def completed?(state = @State.get)
  state.completed?
end
@return [Executor] current default executor @see with_default_executor
# File lib/concurrent/edge/future.rb, line 225
# Reader for the executor kept in @DefaultExecutor.
#
# @return [Executor] current default executor
# @see with_default_executor
def default_executor
  @DefaultExecutor
end
Inserts a delay into the chain of Futures, making the rest of it lazily evaluated. @return [Event]
# File lib/concurrent/edge/future.rb, line 256
# Zips this event with the event of a fresh Delay built on @DefaultExecutor.
#
# @return [Event] the `event` of the resulting ZipEventEventPromise
def delay
  delayed_event = Delay.new(@DefaultExecutor).event
  zipped = ZipEventEventPromise.new(self, delayed_event, @DefaultExecutor)
  zipped.event
end
# File lib/concurrent/edge/future.rb, line 298
# Like #to_s, but with the list of blocked promises inserted before the
# closing character.
#
# @return [String]
def inspect
  head = to_s[0..-2] # drop the last character of to_s; '>' is re-added below
  blocked = blocks.map(&:to_s).join(', ')
  "#{head} blocks:[#{blocked}]>"
end
@!visibility private
# File lib/concurrent/edge/future.rb, line 366
# Returns the object currently stored in @State, without converting it.
def internal_state
  @State.get
end
@yield [success, value, reason] executed async on `executor` when completed @return self
# File lib/concurrent/edge/future.rb, line 278
# Registers `callback` under :pr_async_callback_on_completion together with
# the executor it is meant to run on.
#
# @param executor executor to use; falls back to @DefaultExecutor when
#   nil/false
# @yield [success, value, reason] executed async on `executor` when completed
# @return the result of `add_callback`
def on_completion(executor = nil, &callback)
  chosen_executor = executor || @DefaultExecutor
  add_callback(:pr_async_callback_on_completion, chosen_executor, callback)
end
@yield [success, value, reason] executed sync when completed @return self
# File lib/concurrent/edge/future.rb, line 284
# Registers `callback` under :pr_callback_on_completion.
#
# @yield [success, value, reason] executed sync when completed
# @return the result of `add_callback`
def on_completion!(&callback)
  add_callback(:pr_callback_on_completion, callback)
end
Is Event/Future pending? @return [Boolean]
# File lib/concurrent/edge/future.rb, line 186
# Is Event/Future pending?
#
# @param state state object to query; defaults to the current value of @State
# @return [Boolean] true exactly when `state.completed?` is falsy
def pending?(state = @State.get)
  return false if state.completed?

  true
end
@!visibility private only for inspection
# File lib/concurrent/edge/future.rb, line 349
# Reader for the object kept in @Promise (only for inspection).
def promise
  @Promise
end
# File lib/concurrent/edge/future.rb, line 302
# Always fails: points the caller at the completable variants instead.
# All arguments and the block are ignored.
#
# @raise [RuntimeError] unconditionally
def set(*args, &block)
  message = 'Use CompletableEvent#complete or CompletableFuture#complete instead, ' \
            'constructed by Concurrent.event or Concurrent.future respectively.'
  raise message
end
@return [:pending, :completed]
# File lib/concurrent/edge/future.rb, line 180
# Symbolic form of the current state, obtained by calling `to_sym` on the
# object held in @State.
#
# @return [:pending, :completed]
def state
  current = @State.get
  current.to_sym
end
Zips with the selected value from the supplied channels @return [Future]
# File lib/concurrent/edge/future.rb, line 272
# Zips this event with the result of `Concurrent.select(*channels)`.
#
# @param channels forwarded unchanged to `Concurrent.select`
# @return [Future] the `future` of the resulting ZipFutureEventPromise
def then_select(*channels)
  selected = Concurrent.select(*channels)
  # `.new` was missing, which turned this into a call to an undefined method
  # named ZipFutureEventPromise and always raised NoMethodError.
  ZipFutureEventPromise.new(selected, self, @DefaultExecutor).future
end
# File lib/concurrent/edge/future.rb, line 294
# Short textual form: class name, hex of `object_id << 1`, and the state.
#
# @return [String]
def to_s
  address = '%x' % (object_id << 1)
  "<##{self.class}:0x#{address} #{state.to_sym}>"
end
@!visibility private
# File lib/concurrent/edge/future.rb, line 217
# Forwards `touch` to @Promise when `@Touched.make_true` reports success.
#
# @return self
def touch
  # distribute touch to promise only once
  first_touch = @Touched.make_true
  @Promise.touch if first_touch
  self
end
@!visibility private only for inspection
# File lib/concurrent/edge/future.rb, line 355
# Current value of the @Touched flag (only for inspection).
def touched
  flag = @Touched
  flag.value
end
# File lib/concurrent/edge/future.rb, line 190
# Not supported here; calling it always fails.
#
# @raise [RuntimeError] unconditionally, with the message 'unsupported'
def unscheduled?
  raise RuntimeError, 'unsupported'
end
Waits until the Event is complete. @param [Numeric] timeout the maximum time in seconds to wait. @return [Event, true, false] self or true/false if timeout is used @!macro [attach] edge.periodical_wait
@note a thread should wait only once! For repeated checking use faster `completed?` check. If thread waits periodically it will dangerously grow the waiters stack.
# File lib/concurrent/edge/future.rb, line 210
# Touches the event and then delegates the waiting to `wait_until_complete`.
#
# @param [Numeric] timeout forwarded to `wait_until_complete`
# @return self when no timeout was given, otherwise whatever
#   `wait_until_complete` returned
def wait(timeout = nil)
  touch
  outcome = wait_until_complete(timeout)
  return self unless timeout

  outcome
end
@!visibility private only for debugging inspection
# File lib/concurrent/edge/future.rb, line 361
# Snapshot of the entries currently held in @Waiters (only for debugging
# inspection).
#
# @return [Array]
def waiting_threads
  entries = @Waiters.each
  entries.to_a
end
Changes default executor for rest of the chain @return [Event]
# File lib/concurrent/edge/future.rb, line 290
# Wraps this event in an EventWrapperPromise built with `executor`, so the
# rest of the chain uses it as default executor.
#
# @param executor the executor handed to EventWrapperPromise
# @return [Event] the `future` of the new EventWrapperPromise
def with_default_executor(executor)
  wrapper = EventWrapperPromise.new(self, executor)
  wrapper.future
end
Zip with future producing new Future @return [Event]
# File lib/concurrent/edge/future.rb, line 244
# Zips this event with `other`.
#
# When `other` is a Future, a ZipFutureEventPromise is built with `other`
# first and self second. Otherwise a ZipEventEventPromise is built with self
# first and `other` second.
#
# @param other the object to zip with
# @return the `future` of the promise that was built
def zip(other)
  # `is?` is not a Ruby method and raised NoMethodError; `is_a?` is the
  # type check that was intended.
  if other.is_a?(Future)
    ZipFutureEventPromise.new(other, self, @DefaultExecutor).future
  else
    ZipEventEventPromise.new(self, other, @DefaultExecutor).future
  end
end
Generated with the Darkfish Rdoc Generator 2.