class Dynflow::Director::RunningStepsManager

Handles the events generated while running actions, makes sure the events are sent to the action only when in suspended state

Public Class Methods

new(world) click to toggle source
# File lib/dynflow/director/running_steps_manager.rb, line 9
def initialize(world)
  @world         = Type! world, World
  @running_steps = {}
  # enqueued work items by step id
  @work_items    = QueueHash.new(Integer, WorkItem)
  # enqueued events by step id - we delay creating work items from events until execution time
  # to handle potential updates of the step object (that is part of the event)
  @events        = QueueHash.new(Integer, Director::Event)
  @events_by_request_id = {}
end

Public Instance Methods

add(step, work) click to toggle source
# File lib/dynflow/director/running_steps_manager.rb, line 29
def add(step, work)
  Type! step, ExecutionPlan::Steps::RunStep
  @running_steps[step.id] = step
  # we make sure not to run any event when the step is still being executed
  @work_items.push(step.id, work)
  self
end
create_next_event_work_item(step) click to toggle source

turns the first event from the queue to the next work item to work on

# File lib/dynflow/director/running_steps_manager.rb, line 98
def create_next_event_work_item(step)
  event = @events.shift(step.id)
  return unless event
  work = EventWorkItem.new(event.request_id, event.execution_plan_id, step, event.event, step.queue, @world.id)
  @work_items.push(step.id, work)
  work
end
done(step) click to toggle source

@returns [TrueClass|FalseClass, Array<WorkItem>]

# File lib/dynflow/director/running_steps_manager.rb, line 38
def done(step)
  Type! step, ExecutionPlan::Steps::RunStep
  # update the step based on the latest finished work
  @running_steps[step.id] = step

  @work_items.shift(step.id).tap do |work|
    finish_event_result(work) { |f| f.fulfill true }
  end

  if step.state == :suspended
    return true, [create_next_event_work_item(step)].compact
  else
    while (work = @work_items.shift(step.id))
      @world.logger.debug "step #{step.execution_plan_id}:#{step.id} dropping event #{work.request_id}/#{work.event}"
      finish_event_result(work) do |f|
        f.reject UnprocessableEvent.new("Message dropped").tap { |e| e.set_backtrace(caller) }
      end
    end
    while (event = @events.shift(step.id))
      @world.logger.debug "step #{step.execution_plan_id}:#{step.id} dropping event #{event.request_id}/#{event}"
      if event.result
        event.result.reject UnprocessableEvent.new("Message dropped").tap { |e| e.set_backtrace(caller) }
      end
    end
    unless @work_items.empty?(step.id) && @events.empty?(step.id)
      raise "Unexpected item in @work_items (#{@work_items.inspect}) or @events (#{@events.inspect})"
    end
    @running_steps.delete(step.id)
    return false, []
  end
end
event(event) click to toggle source

@returns [Array<WorkItem>]

# File lib/dynflow/director/running_steps_manager.rb, line 78
def event(event)
  Type! event, Event

  step = @running_steps[event.step_id]
  unless step
    event.result.reject UnprocessableEvent.new('step is not suspended, it cannot process events')
    return []
  end

  can_run_event = @work_items.empty?(step.id)
  @events_by_request_id[event.request_id] = event
  @events.push(step.id, event)
  if can_run_event
    [create_next_event_work_item(step)]
  else
    []
  end
end
finish_event_result(work_item) { |result| ... } click to toggle source

@yield [Concurrent.resolvable_future] in case the work item has an result future assigned and deletes the tracked event

# File lib/dynflow/director/running_steps_manager.rb, line 108
def finish_event_result(work_item)
  return unless EventWorkItem === work_item
  if event = @events_by_request_id.delete(work_item.request_id)
    yield event.result if event.result
  end
end
terminate() click to toggle source
# File lib/dynflow/director/running_steps_manager.rb, line 20
def terminate
  pending_work = @work_items.clear.values.flatten(1)
  pending_work.each do |w|
    finish_event_result(w) do |result|
      result.reject UnprocessableEvent.new("dropping due to termination")
    end
  end
end
try_to_terminate() click to toggle source
# File lib/dynflow/director/running_steps_manager.rb, line 70
def try_to_terminate
  @running_steps.delete_if do |_, step|
    step.state != :running
  end
  return @running_steps.empty?
end