Represents an event which will happen in future (will be completed). It has to always happen.
@!visibility private
@!visibility private
# File lib/concurrent/edge/future.rb, line 177 def initialize(promise, default_executor) super() @Promise = promise @DefaultExecutor = default_executor @Touched = AtomicBoolean.new(false) @Callbacks = LockFreeStack.new # TODO (pitr 12-Sep-2015): replace with AtomicFixnum, avoid aba problem # TODO (pitr 12-Sep-2015): look at java.util.concurrent solution @Waiters = LockFreeStack.new self.internal_state = PENDING end
@!visibility private
# File lib/concurrent/edge/future.rb, line 347 def add_callback(method, *args) if completed? call_callback method, *args else @Callbacks.push [method, *args] call_callbacks if completed? end self end
@!visibility private just for inspection @return [Array<AbstractPromise>]
# File lib/concurrent/edge/future.rb, line 334 def blocks @Callbacks.each_with_object([]) do |callback, promises| promises.push(*(callback.select { |v| v.is_a? AbstractPromise })) end end
@!visibility private just for inspection
# File lib/concurrent/edge/future.rb, line 342 def callbacks @Callbacks.each.to_a end
@yield [success, value, reason] of the parent
# File lib/concurrent/edge/future.rb, line 240 def chain(executor = nil, &callback) ChainPromise.new(self, @DefaultExecutor, executor || @DefaultExecutor, &callback).future end
# File lib/concurrent/edge/future.rb, line 246 def chain_completable(completable_event) on_completion! { completable_event.complete_with COMPLETED } end
@!visibility private
# File lib/concurrent/edge/future.rb, line 318 def complete_with(state, raise_on_reassign = true) if compare_and_set_internal_state(PENDING, state) #(state) # go to synchronized block only if there were waiting threads synchronize { ns_broadcast } if @Waiters.clear call_callbacks else Concurrent::MultipleAssignmentError.new('Event can be completed only once') if raise_on_reassign return false end self end
Has the Event been completed? @return [Boolean]
# File lib/concurrent/edge/future.rb, line 208 def completed?(state = internal_state) state.completed? end
@return [Executor] current default executor @see with_default_executor
# File lib/concurrent/edge/future.rb, line 235 def default_executor @DefaultExecutor end
Inserts delay into the chain of Futures making rest of it lazy evaluated. @return [Event]
# File lib/concurrent/edge/future.rb, line 266 def delay ZipEventEventPromise.new(self, Delay.new(@DefaultExecutor).event, @DefaultExecutor).event end
# File lib/concurrent/edge/future.rb, line 308 def inspect "#{to_s[0..-2]} blocks:[#{blocks.map(&:to_s).join(', ')}]>" end
@yield [success, value, reason] executed async on `executor` when completed @return self
# File lib/concurrent/edge/future.rb, line 288 def on_completion(executor = nil, &callback) add_callback :pr_async_callback_on_completion, executor || @DefaultExecutor, callback end
@yield [success, value, reason] executed sync when completed @return self
# File lib/concurrent/edge/future.rb, line 294 def on_completion!(&callback) add_callback :pr_callback_on_completion, callback end
Is Event/Future pending? @return [Boolean]
# File lib/concurrent/edge/future.rb, line 196 def pending?(state = internal_state) !state.completed? end
@!visibility private only for inspection
# File lib/concurrent/edge/future.rb, line 359 def promise @Promise end
# File lib/concurrent/edge/future.rb, line 312 def set(*args, &block) raise 'Use CompletableEvent#complete or CompletableFuture#complete instead, ' + 'constructed by Concurrent.event or Concurrent.future respectively.' end
@return [:pending, :completed]
# File lib/concurrent/edge/future.rb, line 190 def state internal_state.to_sym end
Zips with selected value form the suplied channels @return [Future]
# File lib/concurrent/edge/future.rb, line 282 def then_select(*channels) ZipFutureEventPromise(Concurrent.select(*channels), self, @DefaultExecutor).future end
# File lib/concurrent/edge/future.rb, line 304 def to_s "<##{self.class}:0x#{'%x' % (object_id << 1)} #{state.to_sym}>" end
@!visibility private
# File lib/concurrent/edge/future.rb, line 227 def touch # distribute touch to promise only once @Promise.touch if @Touched.make_true self end
@!visibility private only for inspection
# File lib/concurrent/edge/future.rb, line 365 def touched @Touched.value end
# File lib/concurrent/edge/future.rb, line 200 def unscheduled? raise 'unsupported' end
Wait until Event is complete? @param [Numeric] timeout the maximum time in second to wait. @return [Event, true, false] self or true/false if timeout is used @!macro [attach] edge.periodical_wait
@note a thread should wait only once! For repeated checking use faster `completed?` check. If thread waits periodically it will dangerously grow the waiters stack.
# File lib/concurrent/edge/future.rb, line 220 def wait(timeout = nil) touch result = wait_until_complete(timeout) timeout ? result : self end
@!visibility private only for debugging inspection
# File lib/concurrent/edge/future.rb, line 371 def waiting_threads @Waiters.each.to_a end
Changes default executor for rest of the chain @return [Event]
# File lib/concurrent/edge/future.rb, line 300 def with_default_executor(executor) EventWrapperPromise.new(self, executor).future end
Zip with future producing new Future @return [Event]
# File lib/concurrent/edge/future.rb, line 254 def zip(other) if other.is?(Future) ZipFutureEventPromise.new(other, self, @DefaultExecutor).future else ZipEventEventPromise.new(self, other, @DefaultExecutor).future end end
# File lib/concurrent/edge/future.rb, line 408 def call_callback(method, *args) self.send method, *args end
# File lib/concurrent/edge/future.rb, line 412 def call_callbacks method, *args = @Callbacks.pop while method call_callback method, *args method, *args = @Callbacks.pop end end
# File lib/concurrent/edge/future.rb, line 396 def pr_async_callback_on_completion(executor, callback) pr_with_async(executor) { pr_callback_on_completion callback } end
# File lib/concurrent/edge/future.rb, line 404 def pr_callback_notify_blocked(promise) promise.on_done self end
# File lib/concurrent/edge/future.rb, line 400 def pr_callback_on_completion(callback) callback.call end
# File lib/concurrent/edge/future.rb, line 392 def pr_with_async(executor, *args, &block) Concurrent.post_on(executor, *args, &block) end
@return [true, false]
# File lib/concurrent/edge/future.rb, line 378 def wait_until_complete(timeout) while true last_waiter = @Waiters.peek # waiters' state before completion return true if completed? # synchronize so it cannot be signaled before it waits synchronize do # ok only if completing thread did not start signaling next unless @Waiters.compare_and_push last_waiter, Thread.current return ns_wait_until(timeout) { completed? } end end end