class Dynflow::Testing::InThreadExecutor
Public Class Methods
new(world)
click to toggle source
# File lib/dynflow/testing/in_thread_executor.rb, line 6
#
# Sets up an executor bound to the given world.
#
# Stores the world, builds a Director for it and creates an empty queue
# that holds work items until they are processed.
#
# @param world the world this executor works for
def initialize(world)
  @world = world
  @director = Director.new(world)
  @work_items = Queue.new
end
Public Instance Methods
clock_tick()
click to toggle source
# File lib/dynflow/testing/in_thread_executor.rb, line 53
#
# Advances the world's clock by calling +progress_all+ on it.
#
# @return whatever +progress_all+ returns
def clock_tick
  # NOTE(review): presumably the list of subjects the clock should leave
  # alone while progressing -- confirm against the clock implementation.
  subjects = [:periodic_check_inbox]
  @world.clock.progress_all(subjects)
end
delayed_event(director_event)
click to toggle source
# File lib/dynflow/testing/in_thread_executor.rb, line 46
#
# Hands an already built director event to the director and queues every
# work item the director produces for it.
#
# The work items go through +feed_queue+ so they are bound to this
# executor's world in the same way as the items queued by +execute+ and
# +process_work_items+, instead of being pushed onto the queue untouched.
#
# @param director_event the event to pass to the director
# @return the +result+ of +director_event+
def delayed_event(director_event)
  feed_queue(@director.handle_event(director_event))
  director_event.result
end
event(execution_plan_id, step_id, event, future = Concurrent::Promises.resolvable_future, optional: false)
click to toggle source
# File lib/dynflow/testing/in_thread_executor.rb, line 38
#
# Wraps the arguments in a Director::Event (with a freshly generated UUID as
# its first field), hands it to the director and queues every work item the
# director produces for it.
#
# The work items go through +feed_queue+ so they are bound to this
# executor's world in the same way as the items queued by +execute+ and
# +process_work_items+, instead of being pushed onto the queue untouched.
#
# @param execution_plan_id id of the execution plan the event targets
# @param step_id id of the step the event targets
# @param event the event payload
# @param future object stored in the director event and returned to the caller
# @param optional flag stored in the director event
# @return the +future+ that was passed in (or the default one)
def event(execution_plan_id, step_id, event, future = Concurrent::Promises.resolvable_future, optional: false)
  # Keep the payload parameter intact; the wrapper gets its own name.
  director_event = Director::Event[SecureRandom.uuid, execution_plan_id, step_id, event, future, optional]
  feed_queue(@director.handle_event(director_event))
  future
end
execute(execution_plan_id, finished = Concurrent::Promises.resolvable_future, _wait_for_acceptance = true)
click to toggle source
# File lib/dynflow/testing/in_thread_executor.rb, line 12
#
# Starts the execution plan on the director, queues the work items it
# returns and processes the queue until it is empty.
#
# @param execution_plan_id id of the plan to run
# @param finished object passed to the director's +start_execution+ and
#   returned to the caller
# @param _wait_for_acceptance accepted but not used by this method
# @return the +finished+ object
def execute(execution_plan_id, finished = Concurrent::Promises.resolvable_future, _wait_for_acceptance = true)
  initial_items = @director.start_execution(execution_plan_id, finished)
  feed_queue(initial_items)
  process_work_items
  finished
end
feed_queue(work_items)
click to toggle source
# File lib/dynflow/testing/in_thread_executor.rb, line 57
#
# Assigns this executor's world to each given work item and appends the item
# to the internal queue.
#
# @param work_items collection of work items to enqueue
# @return the +work_items+ collection (the result of +each+)
def feed_queue(work_items)
  work_items.each do |item|
    item.world = @world
    @work_items << item
  end
end
handle_work(work_item)
click to toggle source
# File lib/dynflow/testing/in_thread_executor.rb, line 31
#
# Executes a single work item, plans any delayed events of its step and
# reports the item as finished to the director.
#
# Only Director::StepWorkItem instances are asked for their step. The step's
# +delayed_events+ are read once and planned only when present.
#
# @param work_item the work item to run
# @return whatever the director's +work_finished+ returns
def handle_work(work_item)
  work_item.execute
  step = work_item.step if work_item.is_a?(Director::StepWorkItem)
  # Read once: avoids repeating the nil guard and calling the reader twice.
  delayed_events = step&.delayed_events
  plan_events(delayed_events) if delayed_events
  @director.work_finished(work_item)
end
plan_events(delayed_events)
click to toggle source
# File lib/dynflow/testing/in_thread_executor.rb, line 25
#
# Asks the world to plan each of the given delayed events, passing along the
# event's execution plan id, step id, payload and time.
#
# @param delayed_events collection of delayed events to plan
# @return the +delayed_events+ collection (the result of +each+)
def plan_events(delayed_events)
  delayed_events.each do |delayed|
    @world.plan_event(delayed.execution_plan_id, delayed.step_id, delayed.event, delayed.time)
  end
end
process_work_items()
click to toggle source
# File lib/dynflow/testing/in_thread_executor.rb, line 18
#
# Drains the work queue. Each popped item is handled, the items produced by
# handling it are queued, and the clock is ticked once per handled item.
#
# @return [nil]
def process_work_items
  until @work_items.empty?
    current = @work_items.pop
    follow_ups = handle_work(current)
    feed_queue(follow_ups)
    clock_tick
  end
end
terminate(future = Concurrent::Promises.resolvable_future)
click to toggle source
# File lib/dynflow/testing/in_thread_executor.rb, line 64
#
# Terminates the director and resolves +future+ accordingly: fulfilled with
# +true+ when no error is raised, rejected with the error otherwise.
#
# @param future the future to resolve
# @return whatever +fulfill+ or +reject+ returns
def terminate(future = Concurrent::Promises.resolvable_future)
  @director.terminate
  future.fulfill(true)
rescue StandardError => error
  # Any standard error from the steps above is reported through the future.
  future.reject(error)
end