class Dynflow::World
Constants
- TriggerResult
Attributes
action_classes[R]
auto_rescue[R]
auto_validity_check[R]
client_dispatcher[R]
clock[R]
connector[R]
coordinator[R]
dead_letter_handler[R]
delayed_executor[R]
execution_plan_cleaner[R]
executor[R]
executor_dispatcher[R]
id[R]
logger_adapter[R]
meta[R]
middleware[R]
persistence[R]
subscription_index[R]
terminated[R]
termination_timeout[R]
throttle_limiter[R]
transaction_adapter[R]
validity_check_timeout[R]
Public Class Methods
new(config)
click to toggle source
# File lib/dynflow/world.rb, line 12 def initialize(config) @id = SecureRandom.uuid @clock = spawn_and_wait(Clock, 'clock') config_for_world = Config::ForWorld.new(config, self) @logger_adapter = config_for_world.logger_adapter config_for_world.validate @transaction_adapter = config_for_world.transaction_adapter @persistence = Persistence.new(self, config_for_world.persistence_adapter, :backup_deleted_plans => config_for_world.backup_deleted_plans, :backup_dir => config_for_world.backup_dir) @coordinator = Coordinator.new(config_for_world.coordinator_adapter) @executor = config_for_world.executor @action_classes = config_for_world.action_classes @auto_rescue = config_for_world.auto_rescue @exit_on_terminate = Concurrent::AtomicBoolean.new(config_for_world.exit_on_terminate) @connector = config_for_world.connector @middleware = Middleware::World.new @middleware.use Middleware::Common::Transaction if @transaction_adapter @client_dispatcher = spawn_and_wait(Dispatcher::ClientDispatcher, "client-dispatcher", self) @dead_letter_handler = spawn_and_wait(DeadLetterSilencer, 'default_dead_letter_handler', config_for_world.silent_dead_letter_matchers) @meta = config_for_world.meta @auto_validity_check = config_for_world.auto_validity_check @validity_check_timeout = config_for_world.validity_check_timeout @throttle_limiter = config_for_world.throttle_limiter @terminated = Concurrent.event @termination_timeout = config_for_world.termination_timeout calculate_subscription_index if executor @executor_dispatcher = spawn_and_wait(Dispatcher::ExecutorDispatcher, "executor-dispatcher", self, config_for_world.executor_semaphore) executor.initialized.wait end if auto_validity_check self.worlds_validity_check self.locks_validity_check end @delayed_executor = try_spawn(config_for_world, :delayed_executor, Coordinator::DelayedExecutorLock) @execution_plan_cleaner = try_spawn(config_for_world, :execution_plan_cleaner, Coordinator::ExecutionPlanCleanerLock) @meta = config_for_world.meta @meta['delayed_executor'] = true if @delayed_executor @meta['execution_plan_cleaner'] = true if @execution_plan_cleaner coordinator.register_world(registered_world) @termination_barrier = Mutex.new @before_termination_hooks = Queue.new if config_for_world.auto_terminate at_exit do @exit_on_terminate.make_false # make sure we don't terminate twice self.terminate.wait end end self.auto_execute if config_for_world.auto_execute @delayed_executor.start if @delayed_executor end
Public Instance Methods
action_logger()
click to toggle source
# File lib/dynflow/world.rb, line 83 def action_logger logger_adapter.action_logger end
auto_execute()
click to toggle source
executes plans that are planned/paused and haven't reported any error yet (usually when no executor was available by the time of planning or terminating)
# File lib/dynflow/world.rb, line 380 def auto_execute coordinator.acquire(Coordinator::AutoExecuteLock.new(self)) do planned_execution_plans = self.persistence.find_execution_plans filters: { 'state' => %w(planned paused), 'result' => (ExecutionPlan.results - [:error]).map(&:to_s) } planned_execution_plans.map do |ep| if coordinator.find_locks(Dynflow::Coordinator::ExecutionLock.unique_filter(ep.id)).empty? execute(ep.id) end end.compact end rescue Coordinator::LockError => e logger.info "auto-executor lock already aquired: #{e.message}" [] end
before_termination(&block)
click to toggle source
# File lib/dynflow/world.rb, line 67 def before_termination(&block) @before_termination_hooks << block end
delay(*args)
click to toggle source
# File lib/dynflow/world.rb, line 161 def delay(*args) delay_with_caller(nil, *args) end
delay_with_caller(caller_action, action_class, delay_options, *args)
click to toggle source
# File lib/dynflow/world.rb, line 165 def delay_with_caller(caller_action, action_class, delay_options, *args) raise 'No action_class given' if action_class.nil? execution_plan = ExecutionPlan.new(self) execution_plan.delay(caller_action, action_class, delay_options, *args) Scheduled[execution_plan.id] end
event(execution_plan_id, step_id, event, done = Concurrent.future)
click to toggle source
# File lib/dynflow/world.rb, line 192 def event(execution_plan_id, step_id, event, done = Concurrent.future) publish_request(Dispatcher::Event[execution_plan_id, step_id, event], done, false) end
execute(execution_plan_id, done = Concurrent.future)
click to toggle source
@return [Concurrent::Edge::Future] containing execution_plan when finished raises when ExecutionPlan is not accepted for execution
# File lib/dynflow/world.rb, line 188 def execute(execution_plan_id, done = Concurrent.future) publish_request(Dispatcher::Execution[execution_plan_id], done, true) end
invalidate(world)
click to toggle source
Invalidate another world, that left some data in the runtime, but it's not really running
# File lib/dynflow/world.rb, line 272 def invalidate(world) Type! world, Coordinator::ClientWorld, Coordinator::ExecutorWorld coordinator.acquire(Coordinator::WorldInvalidationLock.new(self, world)) do if world.is_a? Coordinator::ExecutorWorld old_execution_locks = coordinator.find_locks(class: Coordinator::ExecutionLock.name, owner_id: "world:#{world.id}") coordinator.deactivate_world(world) old_execution_locks.each do |execution_lock| invalidate_execution_lock(execution_lock) end end coordinator.delete_world(world) end end
invalidate_execution_lock(execution_lock)
click to toggle source
# File lib/dynflow/world.rb, line 290 def invalidate_execution_lock(execution_lock) begin plan = persistence.load_execution_plan(execution_lock.execution_plan_id) rescue => e if e.is_a?(KeyError) logger.error "invalidated execution plan #{execution_lock.execution_plan_id} missing, skipping" else logger.error e logger.error "unexpected error when invalidating execution plan #{execution_lock.execution_plan_id}, skipping" end coordinator.release(execution_lock) return end unless plan.valid? logger.error "invalid plan #{plan.id}, skipping" coordinator.release(execution_lock) return end plan.execution_history.add('terminate execution', execution_lock.world_id) plan.steps.values.each do |step| if step.state == :running step.error = ExecutionPlan::Steps::Error.new("Abnormal termination (previous state: #{step.state})") step.state = :error step.save end end plan.update_state(:paused) if plan.state == :running plan.save coordinator.release(execution_lock) available_executors = coordinator.find_worlds(true) if available_executors.any? && !plan.error? client_dispatcher.tell([:dispatch_request, Dispatcher::Execution[execution_lock.execution_plan_id], execution_lock.client_world_id, execution_lock.request_id]) end rescue Errors::PersistenceError logger.error "failed to write data while invalidating execution lock #{execution_lock}" end
locks_validity_check()
click to toggle source
# File lib/dynflow/world.rb, line 368 def locks_validity_check orphaned_locks = coordinator.clean_orphaned_locks unless orphaned_locks.empty? logger.error "invalid coordinator locks found and invalidated: #{orphaned_locks.inspect}" end return orphaned_locks end
logger()
click to toggle source
# File lib/dynflow/world.rb, line 79 def logger logger_adapter.dynflow_logger end
ping(world_id, timeout, done = Concurrent.future)
click to toggle source
# File lib/dynflow/world.rb, line 196 def ping(world_id, timeout, done = Concurrent.future) publish_request(Dispatcher::Ping[world_id], done, false, timeout) end
plan(action_class, *args)
click to toggle source
# File lib/dynflow/world.rb, line 172 def plan(action_class, *args) ExecutionPlan.new(self).tap do |execution_plan| execution_plan.prepare(action_class) execution_plan.plan(*args) end end
plan_with_caller(caller_action, action_class, *args)
click to toggle source
# File lib/dynflow/world.rb, line 179 def plan_with_caller(caller_action, action_class, *args) ExecutionPlan.new(self).tap do |execution_plan| execution_plan.prepare(action_class, caller_action: caller_action) execution_plan.plan(*args) end end
publish_request(request, done, wait_for_accepted, timeout = nil)
click to toggle source
# File lib/dynflow/world.rb, line 200 def publish_request(request, done, wait_for_accepted, timeout = nil) accepted = Concurrent.future accepted.rescue do |reason| done.fail reason if reason end client_dispatcher.ask([:publish_request, done, request, timeout], accepted) accepted.wait if wait_for_accepted done rescue => e accepted.fail e end
registered_world()
click to toggle source
# File lib/dynflow/world.rb, line 71 def registered_world if executor Coordinator::ExecutorWorld.new(self) else Coordinator::ClientWorld.new(self) end end
reload!()
click to toggle source
reload actions classes, intended only for devel
# File lib/dynflow/world.rb, line 92 def reload! # TODO what happens with newly loaded classes @action_classes = @action_classes.map do |klass| begin Utils.constantize(klass.to_s) rescue NameError nil # ignore missing classes end end.compact middleware.clear_cache! calculate_subscription_index end
subscribed_actions(action_class)
click to toggle source
# File lib/dynflow/world.rb, line 87 def subscribed_actions(action_class) @subscription_index.has_key?(action_class) ? @subscription_index[action_class] : [] end
terminate(future = Concurrent.future)
click to toggle source
# File lib/dynflow/world.rb, line 212 def terminate(future = Concurrent.future) @termination_barrier.synchronize do @terminating ||= Concurrent.future do begin run_before_termination_hooks if delayed_executor logger.info "start terminating delayed_executor..." delayed_executor.terminate.wait(termination_timeout) end logger.info "start terminating throttle_limiter..." throttle_limiter.terminate.wait(termination_timeout) if executor connector.stop_receiving_new_work(self) logger.info "start terminating executor..." executor.terminate.wait(termination_timeout) logger.info "start terminating executor dispatcher..." executor_dispatcher_terminated = Concurrent.future executor_dispatcher.ask([:start_termination, executor_dispatcher_terminated]) executor_dispatcher_terminated.wait(termination_timeout) end logger.info "start terminating client dispatcher..." client_dispatcher_terminated = Concurrent.future client_dispatcher.ask([:start_termination, client_dispatcher_terminated]) client_dispatcher_terminated.wait(termination_timeout) logger.info "stop listening for new events..." connector.stop_listening(self) if @clock logger.info "start terminating clock..." clock.ask(:terminate!).wait(termination_timeout) end coordinator.delete_world(registered_world) @terminated.complete true rescue => e logger.fatal(e) end end.on_completion do Thread.new { Kernel.exit } if @exit_on_terminate.true? end end @terminating.tangle(future) future end
terminating?()
click to toggle source
# File lib/dynflow/world.rb, line 266 def terminating? defined?(@terminating) end
trigger(action_class = nil, *args, &block)
click to toggle source
@return [TriggerResult] blocks until action_class is planned if no arguments given, the plan is expected to be returned by a block
# File lib/dynflow/world.rb, line 144 def trigger(action_class = nil, *args, &block) if action_class.nil? raise 'Neither action_class nor a block given' if block.nil? execution_plan = block.call(self) else execution_plan = plan(action_class, *args) end planned = execution_plan.state == :planned if planned done = execute(execution_plan.id, Concurrent.future) Triggered[execution_plan.id, done] else PlaningFailed[execution_plan.id, execution_plan.errors.first.exception] end end
try_spawn(config_for_world, what, lock_class = nil)
click to toggle source
# File lib/dynflow/world.rb, line 395 def try_spawn(config_for_world, what, lock_class = nil) object = nil return nil if !executor || (object = config_for_world.public_send(what)).nil? coordinator.acquire(lock_class.new(self)) if lock_class object.spawn.wait object rescue Coordinator::LockError => e nil end
worlds_validity_check(auto_invalidate = true, worlds_filter = {})
click to toggle source
# File lib/dynflow/world.rb, line 333 def worlds_validity_check(auto_invalidate = true, worlds_filter = {}) worlds = coordinator.find_worlds(false, worlds_filter) world_checks = worlds.reduce({}) do |hash, world| hash.update(world => ping(world.id, self.validity_check_timeout)) end world_checks.values.each(&:wait) results = {} world_checks.each do |world, check| if check.success? result = :valid else if auto_invalidate begin invalidate(world) result = :invalidated rescue => e logger.error e result = e.message end else result = :invalid end end results[world.id] = result end unless results.values.all? { |result| result == :valid } logger.error "invalid worlds found #{results.inspect}" end return results end
Private Instance Methods
calculate_subscription_index()
click to toggle source
# File lib/dynflow/world.rb, line 407 def calculate_subscription_index @subscription_index = action_classes.each_with_object(Hash.new { |h, k| h[k] = [] }) do |klass, index| next unless klass.subscribe Array(klass.subscribe).each do |subscribed_class| index[Utils.constantize(subscribed_class.to_s)] << klass end end.tap { |o| o.freeze } end
run_before_termination_hooks()
click to toggle source
# File lib/dynflow/world.rb, line 417 def run_before_termination_hooks until @before_termination_hooks.empty? begin @before_termination_hooks.pop.call rescue => e logger.error e end end end
spawn_and_wait(klass, name, *args)
click to toggle source
# File lib/dynflow/world.rb, line 427 def spawn_and_wait(klass, name, *args) initialized = Concurrent.future actor = klass.spawn(name: name, args: args, initialized: initialized) initialized.wait return actor end