class Dynflow::Testing::InThreadExecutor

Public Class Methods

new(world) click to toggle source
# File lib/dynflow/testing/in_thread_executor.rb, line 4
def initialize(world)
  @world = world
  @director = Director.new(@world)
  @work_items = Queue.new
end

Public Instance Methods

clock_tick() click to toggle source
# File lib/dynflow/testing/in_thread_executor.rb, line 51
def clock_tick
  @world.clock.progress_all([:periodic_check_inbox])
end
delayed_event(director_event) click to toggle source
# File lib/dynflow/testing/in_thread_executor.rb, line 44
def delayed_event(director_event)
  @director.handle_event(director_event).each do |work_item|
    @work_items << work_item
  end
  director_event.result
end
event(execution_plan_id, step_id, event, future = Concurrent::Promises.resolvable_future) click to toggle source
# File lib/dynflow/testing/in_thread_executor.rb, line 36
def event(execution_plan_id, step_id, event, future = Concurrent::Promises.resolvable_future)
  event = (Director::Event[SecureRandom.uuid, execution_plan_id, step_id, event, future])
  @director.handle_event(event).each do |work_item|
    @work_items << work_item
  end
  future
end
execute(execution_plan_id, finished = Concurrent::Promises.resolvable_future, _wait_for_acceptance = true) click to toggle source
# File lib/dynflow/testing/in_thread_executor.rb, line 10
def execute(execution_plan_id, finished = Concurrent::Promises.resolvable_future, _wait_for_acceptance = true)
  feed_queue(@director.start_execution(execution_plan_id, finished))
  process_work_items
  finished
end
feed_queue(work_items) click to toggle source
# File lib/dynflow/testing/in_thread_executor.rb, line 55
def feed_queue(work_items)
  work_items.each do |work_item|
    work_item.world = @world
    @work_items.push(work_item)
  end
end
handle_work(work_item) click to toggle source
# File lib/dynflow/testing/in_thread_executor.rb, line 29
def handle_work(work_item)
  work_item.execute
  step = work_item.step if work_item.is_a?(Director::StepWorkItem)
  plan_events(step && step.delayed_events) if step && step.delayed_events
  @director.work_finished(work_item)
end
plan_events(delayed_events) click to toggle source
# File lib/dynflow/testing/in_thread_executor.rb, line 23
def plan_events(delayed_events)
  delayed_events.each do |event|
    @world.plan_event(event.execution_plan_id, event.step_id, event.event, event.time)
  end
end
process_work_items() click to toggle source
# File lib/dynflow/testing/in_thread_executor.rb, line 16
def process_work_items
  until @work_items.empty?
    feed_queue(handle_work(@work_items.pop))
    clock_tick
  end
end
terminate(future = Concurrent::Promises.resolvable_future) click to toggle source
# File lib/dynflow/testing/in_thread_executor.rb, line 62
def terminate(future = Concurrent::Promises.resolvable_future)
  @director.terminate
  future.fulfill true
rescue => e
  future.reject e
end