class Dynflow::World
Constants
- TriggerResult
Attributes
Public Class Methods
# File lib/dynflow/world.rb, line 16 def initialize(config) @config = Config::ForWorld.new(config, self) # Set the telemetry instance as soon as possible Dynflow::Telemetry.set_adapter @config.telemetry_adapter Dynflow::Telemetry.register_metrics! @id = SecureRandom.uuid @logger_adapter = @config.logger_adapter @clock = spawn_and_wait(Clock, 'clock', logger) @config.validate @transaction_adapter = @config.transaction_adapter @persistence = Persistence.new(self, @config.persistence_adapter, :backup_deleted_plans => @config.backup_deleted_plans, :backup_dir => @config.backup_dir) @coordinator = Coordinator.new(@config.coordinator_adapter) if @config.executor @executor = Executors::Parallel.new(self, executor_class: @config.executor, heartbeat_interval: @config.executor_heartbeat_interval, queues_options: @config.queues) end @action_classes = @config.action_classes @auto_rescue = @config.auto_rescue @exit_on_terminate = Concurrent::AtomicBoolean.new(@config.exit_on_terminate) @connector = @config.connector @middleware = Middleware::World.new @middleware.use Middleware::Common::Transaction if @transaction_adapter @client_dispatcher = spawn_and_wait(Dispatcher::ClientDispatcher, "client-dispatcher", self, @config.ping_cache_age) @dead_letter_handler = spawn_and_wait(DeadLetterSilencer, 'default_dead_letter_handler', @config.silent_dead_letter_matchers) @auto_validity_check = @config.auto_validity_check @validity_check_timeout = @config.validity_check_timeout @throttle_limiter = @config.throttle_limiter @terminated = Concurrent::Promises.resolvable_event @termination_timeout = @config.termination_timeout calculate_subscription_index if executor @executor_dispatcher = spawn_and_wait(Dispatcher::ExecutorDispatcher, "executor-dispatcher", self, @config.executor_semaphore) executor.initialized.wait end update_register perform_validity_checks if auto_validity_check @termination_barrier = Mutex.new @before_termination_hooks = Queue.new if @config.auto_terminate at_exit do 
@exit_on_terminate.make_false # make sure we don't terminate twice self.terminate.wait end end post_initialization end
Public Instance Methods
# File lib/dynflow/world.rb, line 113
# Returns the logger the logger adapter exposes as +action_logger+.
def action_logger
  adapter = logger_adapter
  adapter.action_logger
end
24119 - ensure delayed executor is preserved after invalidation. Executes plans that are planned/paused and haven't reported any error yet (usually when no executor was available at the time of planning or terminating).
# File lib/dynflow/world.rb, line 265
# Under the AutoExecuteLock, looks up execution plans in state planned/paused
# whose result is anything but :error and calls #execute for every plan that
# has no ExecutionLock registered in the coordinator.
#
# Inside the lock the block evaluates to the non-nil #execute results.
# When the lock cannot be acquired (Coordinator::LockError) the fact is
# logged and an empty array is returned.
def auto_execute
  coordinator.acquire(Coordinator::AutoExecuteLock.new(self)) do
    runnable_results = (ExecutionPlan.results - [:error]).map(&:to_s)
    filters = { 'state' => %w(planned paused), 'result' => runnable_results }
    candidates = persistence.find_execution_plans(filters: filters)

    started = candidates.map do |candidate|
      lock_filter = Dynflow::Coordinator::ExecutionLock.unique_filter(candidate.id)
      # Skip plans that already hold an execution lock.
      execute(candidate.id) if coordinator.find_locks(lock_filter).empty?
    end
    started.compact
  end
rescue Coordinator::LockError => e
  logger.info "auto-executor lock already aquired: #{e.message}"
  []
end
# File lib/dynflow/world.rb, line 83
# Registers +block+ as a hook in @before_termination_hooks; the hooks are
# popped and called by #run_before_termination_hooks.
#
# @return the hooks queue
def before_termination(&block)
  @before_termination_hooks.push(block)
end
# File lib/dynflow/world.rb, line 191
# Positional-argument convenience wrapper around #delay_with_options.
#
# @param action_class  action to be delayed
# @param delay_options options forwarded as +delay_options:+
# @param args          arguments forwarded as +args:+
def delay(action_class, delay_options, *args)
  delay_with_options(
    action_class: action_class,
    args: args,
    delay_options: delay_options
  )
end
# File lib/dynflow/world.rb, line 195
# Creates an ExecutionPlan (optionally with a given +id+), delays it with the
# supplied action and options, and wraps the plan id in +Scheduled+.
#
# @raise [RuntimeError] when +action_class+ is nil
def delay_with_options(action_class:, args:, delay_options:, id: nil, caller_action: nil)
  if action_class.nil?
    raise 'No action_class given'
  end

  delayed_plan = ExecutionPlan.new(self, id)
  delayed_plan.delay(caller_action, action_class, delay_options, *args)
  Scheduled[delayed_plan.id]
end
# File lib/dynflow/world.rb, line 221
# Publishes a Dispatcher::Event request for the given plan step without
# waiting for the request to be accepted.
#
# @param done future handed to #publish_request and returned by it
def event(execution_plan_id, step_id, event, done = Concurrent::Promises.resolvable_future)
  request = Dispatcher::Event[execution_plan_id, step_id, event]
  publish_request(request, done, false)
end
@return [Concurrent::Promises::ResolvableFuture] containing execution_plan when finished; raises when ExecutionPlan
is not accepted for execution
# File lib/dynflow/world.rb, line 217
# Publishes a Dispatcher::Execution request for the plan and waits until the
# request has been accepted (third argument of #publish_request is +true+).
#
# @param done future handed to #publish_request and returned by it
def execute(execution_plan_id, done = Concurrent::Promises.resolvable_future)
  request = Dispatcher::Execution[execution_plan_id]
  publish_request(request, done, true)
end
# File lib/dynflow/world.rb, line 237
# Publishes a Dispatcher::Status request for +execution_plan_id+ addressed to
# +world_id+, passing +timeout+ through and not waiting for acceptance.
#
# @param done future handed to #publish_request and returned by it
def get_execution_status(world_id, execution_plan_id, timeout, done = Concurrent::Promises.resolvable_future)
  request = Dispatcher::Status[world_id, execution_plan_id]
  publish_request(request, done, false, timeout)
end
# File lib/dynflow/world.rb, line 109
# Returns the logger the logger adapter exposes as +dynflow_logger+.
def logger
  adapter = logger_adapter
  adapter.dynflow_logger
end
# File lib/dynflow/world.rb, line 229
# Publishes a Dispatcher::Ping request for +world_id+ with its second field
# set to +true+ (presumably "use the ping cache", compare
# #ping_without_cache - confirm in Dispatcher::Ping).
#
# @param done future handed to #publish_request and returned by it
def ping(world_id, timeout, done = Concurrent::Promises.resolvable_future)
  request = Dispatcher::Ping[world_id, true]
  publish_request(request, done, false, timeout)
end
# File lib/dynflow/world.rb, line 233
# Same request as #ping, but with the second Dispatcher::Ping field set to
# +false+ (presumably bypassing the ping cache - confirm in Dispatcher::Ping).
#
# @param done future handed to #publish_request and returned by it
def ping_without_cache(world_id, timeout, done = Concurrent::Promises.resolvable_future)
  request = Dispatcher::Ping[world_id, false]
  publish_request(request, done, false, timeout)
end
# File lib/dynflow/world.rb, line 202
# Positional-argument convenience wrapper around #plan_with_options.
#
# @param action_class action to plan
# @param args         arguments forwarded as +args:+
def plan(action_class, *args)
  plan_with_options(
    action_class: action_class,
    args: args
  )
end
# File lib/dynflow/world.rb, line 225
# Publishes a Dispatcher::Event request that additionally carries +time+,
# without waiting for the request to be accepted.
#
# @param accepted future handed to #publish_request and returned by it
def plan_event(execution_plan_id, step_id, event, time, accepted = Concurrent::Promises.resolvable_future)
  request = Dispatcher::Event[execution_plan_id, step_id, event, time]
  publish_request(request, accepted, false)
end
# File lib/dynflow/world.rb, line 206
# Creates an ExecutionPlan (optionally with a given +id+) and, while holding
# a PlanningLock for it, prepares and plans +action_class+ with +args+.
#
# @return [ExecutionPlan] the plan that was created
def plan_with_options(action_class:, args:, id: nil, caller_action: nil)
  execution_plan = ExecutionPlan.new(self, id)
  planning_lock = Coordinator::PlanningLock.new(self, execution_plan.id)

  coordinator.acquire(planning_lock) do
    execution_plan.prepare(action_class, caller_action: caller_action)
    execution_plan.plan(*args)
  end

  execution_plan
end
Performs steps once the executor is ready and invalidation of previous worlds is finished. Needs to be idempotent, as it can be called several times (especially when auto_validity_check
is false, as it should be called after the `perform_validity_checks` method).
# File lib/dynflow/world.rb, line 75
# Spawns the delayed executor and the execution plan cleaner (each at most
# once, thanks to the memoizing assignments), refreshes the world's register
# record, starts the delayed executor if it is not started yet and, when
# configured, runs #auto_execute.
def post_initialization
  @delayed_executor ||= try_spawn(:delayed_executor, Coordinator::DelayedExecutorLock)
  @execution_plan_cleaner ||= try_spawn(:execution_plan_cleaner, Coordinator::ExecutionPlanCleanerLock)
  update_register

  if @delayed_executor && !@delayed_executor.started?
    @delayed_executor.start
  end

  auto_execute if @config.auto_execute
end
# File lib/dynflow/world.rb, line 241 def publish_request(request, done, wait_for_accepted, timeout = nil) accepted = Concurrent::Promises.resolvable_future accepted.rescue do |reason| done.reject reason if reason end client_dispatcher.ask([:publish_request, done, request, timeout], accepted) accepted.wait if wait_for_accepted done rescue => e accepted.reject e end
# File lib/dynflow/world.rb, line 101
# Builds the coordinator record describing this world: an ExecutorWorld when
# the world has an executor, a ClientWorld otherwise.
def registered_world
  record_class = executor ? Coordinator::ExecutorWorld : Coordinator::ClientWorld
  record_class.new(self)
end
Reloads action classes; intended only for development.
# File lib/dynflow/world.rb, line 122
# Re-resolves every known action class by name, drops the ones that can no
# longer be resolved, clears the middleware cache and rebuilds the
# subscription index.
def reload!
  # TODO what happens with newly loaded classes
  reloaded = @action_classes.map do |klass|
    begin
      Utils.constantize(klass.to_s)
    rescue NameError
      nil # ignore missing classes
    end
  end
  @action_classes = reloaded.compact

  middleware.clear_cache!
  calculate_subscription_index
end
# File lib/dynflow/world.rb, line 117
# Looks up the actions stored under +action_class+ in the subscription index.
#
# +fetch+ is used so the index's default block is never triggered for a
# missing key; an empty array is returned instead.
#
# @return the stored list, or +[]+ when +action_class+ is not a key
def subscribed_actions(action_class)
  @subscription_index.fetch(action_class) { [] }
end
# File lib/dynflow/world.rb, line 253
# Starts termination of the world and tangles the termination result with
# +future+.
#
# @param future future to be tangled with the termination; returned as is
def terminate(future = Concurrent::Promises.resolvable_future)
  termination = start_termination
  termination.tangle(future)
  future
end
# File lib/dynflow/world.rb, line 258
# Tells whether @terminating has been assigned, which #start_termination does
# when termination begins.
#
# @return [Boolean] strict +true+/+false+ (a bare +defined?+ would leak the
#   String "instance-variable" or +nil+ to callers)
def terminating?
  instance_variable_defined?(:@terminating)
end
@return [TriggerResult] blocks until action_class is planned; if no arguments are given, the plan is expected to be returned by a block.
# File lib/dynflow/world.rb, line 174
# Plans an action (or takes the plan produced by +block+ when no
# +action_class+ is given) and, if the plan reached the :planned state,
# submits it for execution.
#
# @return +Triggered+ with the plan id and the execution future when the plan
#   was planned, +PlaningFailed+ with the plan id and the first error's
#   exception otherwise
# @raise [RuntimeError] when neither +action_class+ nor a block is given
def trigger(action_class = nil, *args, &block)
  execution_plan =
    if !action_class.nil?
      plan(action_class, *args)
    elsif block
      block.call(self)
    else
      raise 'Neither action_class nor a block given'
    end

  unless execution_plan.state == :planned
    return PlaningFailed[execution_plan.id, execution_plan.errors.first.exception]
  end

  done = execute(execution_plan.id, Concurrent::Promises.resolvable_future)
  Triggered[execution_plan.id, done]
end
# File lib/dynflow/world.rb, line 280
# Spawns the object the config returns for +what+, optionally acquiring a
# coordinator lock of +lock_class+ first.
#
# @param what       name of the config method that returns the object
# @param lock_class lock class instantiated with this world, or nil for none
# @return the spawned object; nil when the world has no executor, when the
#   config returns nil for +what+, or when a Coordinator::LockError is raised
def try_spawn(what, lock_class = nil)
  return nil unless executor

  spawnable = @config.public_send(what)
  return nil if spawnable.nil?

  coordinator.acquire(lock_class.new(self)) if lock_class
  spawnable.spawn.wait
  spawnable
rescue Coordinator::LockError
  nil
end
# File lib/dynflow/world.rb, line 87
# Refreshes the world's meta data (queues, delayed executor and plan cleaner
# flags, last_seen timestamp) and writes the world record to the coordinator:
# registered on the first call, updated on every later one.
def update_register
  @meta ||= @config.meta
  if @executor
    @meta['queues'] = @config.queues
  end
  if @delayed_executor
    @meta['delayed_executor'] = true
  end
  if @execution_plan_cleaner
    @meta['execution_plan_cleaner'] = true
  end
  @meta['last_seen'] = Dynflow::Dispatcher::ClientDispatcher::PingCache.format_time

  # The record is built only after the meta data has been refreshed.
  world_record = registered_world
  if @already_registered
    coordinator.update_record(world_record)
  else
    coordinator.register_world(world_record)
    @already_registered = true
  end
end
Private Instance Methods
# File lib/dynflow/world.rb, line 349
# Rebuilds @subscription_index: a frozen Hash mapping each subscribed-to
# class to the list of action classes that subscribe to it.
def calculate_subscription_index
  index = Hash.new { |hash, key| hash[key] = [] }

  action_classes.each do |klass|
    next unless klass.subscribe

    # +subscribe+ may yield one class or several; Array() handles both.
    Array(klass.subscribe).each do |subscribed_class|
      target = Utils.constantize(subscribed_class.to_s)
      index[target] << klass
    end
  end

  @subscription_index = index.freeze
end
# File lib/dynflow/world.rb, line 359
# Drains @before_termination_hooks, running every hook inside its own future
# and waiting up to +termination_timeout+ for it. Errors raised by a hook and
# hooks that do not finish within the timeout are logged, not re-raised.
def run_before_termination_hooks
  until @before_termination_hooks.empty?
    pending_hook = Concurrent::Promises.future do
      begin
        hook = @before_termination_hooks.pop
        hook.call
      rescue => e
        logger.error e
      end
    end

    finished_in_time = pending_hook.wait(termination_timeout)
    logger.error "timeout running before_termination_hook" unless finished_in_time
  end
end
# File lib/dynflow/world.rb, line 372
# Spawns an actor of +klass+ and blocks until the +initialized+ future passed
# to it has been resolved.
#
# @param klass actor class responding to +spawn+
# @param name  name given to the spawned actor
# @param args  arguments forwarded as +args:+
# @return the value returned by +klass.spawn+
def spawn_and_wait(klass, name, *args)
  ready = Concurrent::Promises.resolvable_future
  klass.spawn(name: name, args: args, initialized: ready).tap { ready.wait }
end
# File lib/dynflow/world.rb, line 293 def start_termination @termination_barrier.synchronize do return @terminating if @terminating termination_future ||= Concurrent::Promises.future do begin run_before_termination_hooks if delayed_executor logger.info "start terminating delayed_executor..." delayed_executor.terminate.wait(termination_timeout) end logger.info "start terminating throttle_limiter..." throttle_limiter.terminate.wait(termination_timeout) if executor connector.stop_receiving_new_work(self, termination_timeout) logger.info "start terminating executor..." executor.terminate.wait(termination_timeout) logger.info "start terminating executor dispatcher..." executor_dispatcher_terminated = Concurrent::Promises.resolvable_future executor_dispatcher.ask([:start_termination, executor_dispatcher_terminated]) executor_dispatcher_terminated.wait(termination_timeout) end logger.info "start terminating client dispatcher..." client_dispatcher_terminated = Concurrent::Promises.resolvable_future client_dispatcher.ask([:start_termination, client_dispatcher_terminated]) client_dispatcher_terminated.wait(termination_timeout) logger.info "stop listening for new events..." connector.stop_listening(self, termination_timeout) if @clock logger.info "start terminating clock..." clock.ask(:terminate!).wait(termination_timeout) end coordinator.delete_world(registered_world) @terminated.resolve true rescue => e logger.fatal(e) end end @terminating = Concurrent::Promises.future do termination_future.wait(termination_timeout) end.on_resolution do @terminated.resolve Thread.new { Kernel.exit } if @exit_on_terminate.true? end end end