class Dynflow::Director::RunningStepsManager

Handles the events generated while running actions and makes sure that events are sent to an action only when it is in the suspended state.

Public Class Methods

new(world) click to toggle source
# File lib/dynflow/director/running_steps_manager.rb, line 8
# Builds a manager with no tracked steps and an empty per-step work queue.
#
# @param world [World] checked with Type! before being stored
def initialize(world)
  @world = Type!(world, World)
  # step id => step currently being tracked by this manager
  @running_steps = {}
  # work items queued per step, keyed by Integer and holding WorkItem values
  @events = WorkQueue.new(Integer, WorkItem)
end

Public Instance Methods

add(step, work) click to toggle source
# File lib/dynflow/director/running_steps_manager.rb, line 23
# Starts tracking +step+ and queues +work+ under the step's id.
#
# @param step [ExecutionPlan::Steps::RunStep] checked with Type!
# @param work the work item to queue for the step
# @return [self]
def add(step, work)
  Type!(step, ExecutionPlan::Steps::RunStep)
  step_id = step.id
  @running_steps[step_id] = step
  # Queueing the step's own work keeps the queue non-empty, so no event is
  # run while the step is still being executed.
  @events.push(step_id, work)
  self
end
done(step) click to toggle source

@return [TrueClass|FalseClass, Array<WorkItem>]

# File lib/dynflow/director/running_steps_manager.rb, line 32
# Handles completion of the work item at the head of the step's queue.
#
# The head item is removed; when it is an EventWorkItem, its event's result
# is fulfilled with +true+. What happens next depends on +step.state+:
#
# * +:suspended+ - the step stays tracked and the next queued item (if any)
#   is handed back to the caller.
# * anything else - every remaining queued item is dropped: a warning is
#   logged, the event's result is rejected with an UnprocessableEvent, and
#   the step is no longer tracked.
#
# @param step [ExecutionPlan::Steps::RunStep] checked with Type!
# @return [Array] +[true, items]+ when the step is suspended (+items+ holds
#   at most one work item), otherwise +[false, []]+
# @raise [RuntimeError] if the step's queue is not empty after draining
def done(step)
  Type!(step, ExecutionPlan::Steps::RunStep)

  finished = @events.shift(step.id)
  finished.event.result.fulfill(true) if EventWorkItem === finished

  return [true, [@events.first(step.id)].compact] if step.state == :suspended

  # The step can no longer accept events, so reject everything still queued.
  while (dropped = @events.shift(step.id))
    message = "step #{step.execution_plan_id}:#{step.id} dropping event #{dropped.event}"
    @world.logger.warn(message)
    dropped.event.result.reject(
      UnprocessableEvent.new(message).tap { |error| error.set_backtrace(caller) }
    )
  end
  raise 'assert' unless @events.empty?(step.id)

  @running_steps.delete(step.id)
  [false, []]
end
event(event) click to toggle source

@return [Array<WorkItem>]

# File lib/dynflow/director/running_steps_manager.rb, line 61
# Accepts an event addressed to a tracked step and queues it as work.
#
# When the step named by +event.step_id+ is not tracked, the event's result
# is rejected with an UnprocessableEvent and nothing is queued. Otherwise an
# EventWorkItem is appended to the step's queue; it is returned for immediate
# execution only if the queue was empty before it was appended.
#
# @param event [Event] checked with Type!
# @return [Array<WorkItem>] the work item to run now, or an empty array
def event(event)
  Type!(event, Event)

  target = @running_steps[event.step_id]
  unless target
    event.result.reject(UnprocessableEvent.new('step is not suspended, it cannot process events'))
    return []
  end

  # Must be sampled before pushing: an empty queue means nothing is running
  # for this step, so the new item can start right away.
  queue_was_empty = @events.empty?(target.id)
  item = EventWorkItem.new(event.execution_plan_id, target, event, target.queue)
  @events.push(target.id, item)
  queue_was_empty ? [item] : []
end
terminate() click to toggle source
# File lib/dynflow/director/running_steps_manager.rb, line 14
# Empties the work queue and rejects the result of every queued event.
#
# Items that are not EventWorkItem instances are discarded without further
# action.
#
# @return [Array] every work item that was queued, flattened one level
def terminate
  @events.clear.values.flatten(1).each do |item|
    next unless EventWorkItem === item

    item.event.result.reject(UnprocessableEvent.new("dropping due to termination"))
  end
end
try_to_terminate() click to toggle source
# File lib/dynflow/director/running_steps_manager.rb, line 53
# Stops tracking every step whose state is not +:running+.
#
# @return [Boolean] true when no tracked steps remain afterwards
def try_to_terminate
  @running_steps.keep_if { |_id, step| step.state == :running }
  @running_steps.empty?
end