All Files ( 91.42% covered at 849.76 hits/line )
44 files in total.
1165 relevant lines,
1065 lines covered and
100 lines missed.
(
91.42%
)
# frozen_string_literal: true
#--
# Copyright (c) 2014-2020 David Heinemeier Hansson
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#++
- 11
require "active_support"
- 11
require "active_support/rails"
- 11
require "active_job/version"
- 11
require "global_id"
- 11
# Top-level Active Job namespace. The constants listed here are registered
# with ActiveSupport::Autoload instead of being required up front.
module ActiveJob
  extend ActiveSupport::Autoload

  %i[Base QueueAdapters Serializers ConfiguredJob TestCase TestHelper].each do |const_name|
    autoload const_name
  end
end
# frozen_string_literal: true
- 11
require "active_support/core_ext/hash"
- 11
module ActiveJob
# Raised when an exception is raised during job arguments deserialization.
#
# Wraps the original exception raised as +cause+.
- 11
class DeserializationError < StandardError
  # Must be instantiated while another exception is being handled: the
  # message and backtrace are copied from the exception currently in $!.
  def initialize #:nodoc:
    original = $!
    super("Error while trying to deserialize arguments: #{original.message}")
    set_backtrace(original.backtrace)
  end
end
# Raised when an unsupported argument type is set as a job argument. We
# currently support String, Integer, Float, NilClass, TrueClass, FalseClass,
# BigDecimal, Symbol, Date, Time, DateTime, ActiveSupport::TimeWithZone,
# ActiveSupport::Duration, Hash, ActiveSupport::HashWithIndifferentAccess,
# Array or GlobalID::Identification instances, although this can be extended
# by adding custom serializers.
# Raised if you set the key for a Hash to something other than a string or
# a symbol. Also raised when trying to serialize an object which can't be
# identified with a GlobalID - such as an unpersisted Active Record model.
- 11
class SerializationError < ArgumentError
end
- 11
module Arguments
- 11
extend self
# Serializes a set of arguments. Intrinsic types that can safely be
# serialized without mutation are returned as-is. Arrays/Hashes are
# serialized element by element. All other types are serialized using
# GlobalID.
- 11
def serialize(arguments)
  # Each element is converted independently; order is preserved.
  arguments.map do |arg|
    serialize_argument(arg)
  end
end
# Deserializes a set of arguments. Intrinsic types that can safely be
# deserialized without mutation are returned as-is. Arrays/Hashes are
# deserialized element by element. All other types are deserialized using
# GlobalID.
- 11
def deserialize(arguments)
  arguments.map { |serialized| deserialize_argument(serialized) }
rescue StandardError
  # Any failure while deserializing is re-raised as DeserializationError.
  raise DeserializationError
end
- 11
private
# :nodoc:
- 11
PERMITTED_TYPES = [ NilClass, String, Integer, Float, BigDecimal, TrueClass, FalseClass ]
# :nodoc:
GLOBALID_KEY = "_aj_globalid"
# :nodoc:
SYMBOL_KEYS_KEY = "_aj_symbol_keys"
# :nodoc:
RUBY2_KEYWORDS_KEY = "_aj_ruby2_keywords"
# :nodoc:
WITH_INDIFFERENT_ACCESS_KEY = "_aj_hash_with_indifferent_access"
# :nodoc:
OBJECT_SERIALIZER_KEY = "_aj_serialized"

# :nodoc:
# Every marker key is reserved in both its String and Symbol form.
RESERVED_KEYS = [
  GLOBALID_KEY,
  SYMBOL_KEYS_KEY,
  RUBY2_KEYWORDS_KEY,
  OBJECT_SERIALIZER_KEY,
  WITH_INDIFFERENT_ACCESS_KEY,
].flat_map { |marker| [marker, marker.to_sym] }

private_constant(
  :PERMITTED_TYPES, :RESERVED_KEYS, :GLOBALID_KEY,
  :SYMBOL_KEYS_KEY, :RUBY2_KEYWORDS_KEY, :WITH_INDIFFERENT_ACCESS_KEY
)
- 11
# Compatibility shim: only evaluated when Hash does not already respond to
# both ruby2_keywords_hash? and ruby2_keywords_hash.
unless Hash.respond_to?(:ruby2_keywords_hash?) && Hash.respond_to?(:ruby2_keywords_hash)
- 11
using Module.new {
- 11
refine Hash do
- 11
class << Hash
- 11
if RUBY_VERSION >= "2.7"
# Splats +hash+ into Hash.new. If it arrives as a positional argument it
# becomes the new hash's default object, so "default is not +hash+" is
# read as "+hash+ was treated as keywords".
# NOTE(review): relies on Ruby 2.7 keyword-flag semantics - confirm.
def ruby2_keywords_hash?(hash)
!new(*[hash]).default.equal?(hash)
end
else
- 11
# Older Rubies: always answer false.
def ruby2_keywords_hash?(hash)
- 506
false
end
end
- 11
# Routes +hash+ through the ruby2_keywords-marked helper below and returns
# the last argument that helper collected.
def ruby2_keywords_hash(hash)
_ruby2_keywords_hash(**hash)
end
- 11
private def _ruby2_keywords_hash(*args)
args.last
end
- 11
# Only mark the helper when this Ruby provides ruby2_keywords.
ruby2_keywords(:_ruby2_keywords_hash) if respond_to?(:ruby2_keywords, true)
end
end
}
end
- 11
# Converts a single argument into its serialized form, dispatching on type.
# The branch order matters: earlier branches win.
def serialize_argument(argument)
  case argument
  when *PERMITTED_TYPES
    # Returned as-is.
    argument
  when GlobalID::Identification
    convert_to_global_id_hash(argument)
  when Array
    argument.map { |element| serialize_argument(element) }
  when ActiveSupport::HashWithIndifferentAccess
    serialize_indifferent_hash(argument)
  when Hash
    # Record which keys were symbols, under a marker key that also records
    # whether the hash was flagged as ruby2 keywords.
    stringified_symbol_keys = argument.each_key.grep(Symbol).map!(&:to_s)
    marker_key = Hash.ruby2_keywords_hash?(argument) ? RUBY2_KEYWORDS_KEY : SYMBOL_KEYS_KEY
    serialize_hash(argument).tap do |serialized|
      serialized[marker_key] = stringified_symbol_keys
    end
  when ->(candidate) { candidate.respond_to?(:permitted?) }
    serialize_indifferent_hash(argument.to_h)
  else
    Serializers.serialize(argument)
  end
end
- 11
# Reverses serialize_argument for a single serialized value.
def deserialize_argument(argument)
  case argument
  when String, *PERMITTED_TYPES
    # String is tested first, ahead of the general permitted-type list.
    argument
  when Array
    argument.map { |element| deserialize_argument(element) }
  when Hash
    if serialized_global_id?(argument)
      deserialize_global_id(argument)
    elsif custom_serialized?(argument)
      Serializers.deserialize(argument)
    else
      deserialize_hash(argument)
    end
  else
    raise ArgumentError, "Can only deserialize primitive arguments: #{argument.inspect}"
  end
end
- 11
# True when +hash+ consists of exactly one entry keyed by GLOBALID_KEY.
def serialized_global_id?(hash)
  hash.length == 1 && hash.key?(GLOBALID_KEY)
end
- 11
# Hands the value stored under GLOBALID_KEY to GlobalID::Locator.locate.
def deserialize_global_id(hash)
  gid = hash[GLOBALID_KEY]
  GlobalID::Locator.locate(gid)
end
- 11
# True when +hash+ carries the OBJECT_SERIALIZER_KEY marker.
def custom_serialized?(hash)
  hash.include?(OBJECT_SERIALIZER_KEY)
end
- 11
# Builds a new Hash with every key run through serialize_hash_key and
# every value run through serialize_argument.
def serialize_hash(argument)
  serialized = {}
  argument.each do |key, value|
    serialized[serialize_hash_key(key)] = serialize_argument(value)
  end
  serialized
end
- 11
# Deserializes the values of +serialized_hash+, then uses whichever marker
# key is present (checked in this order) to restore the original hash
# flavour. The marker entry is removed from the result.
def deserialize_hash(serialized_hash)
  values = serialized_hash.transform_values { |value| deserialize_argument(value) }

  return values.with_indifferent_access if values.delete(WITH_INDIFFERENT_ACCESS_KEY)

  if (symbol_keys = values.delete(SYMBOL_KEYS_KEY))
    transform_symbol_keys(values, symbol_keys)
  elsif (symbol_keys = values.delete(RUBY2_KEYWORDS_KEY))
    Hash.ruby2_keywords_hash(transform_symbol_keys(values, symbol_keys))
  else
    values
  end
end
- 11
# Returns the String form of a hash key. Reserved marker keys and keys
# that are neither String nor Symbol raise SerializationError.
def serialize_hash_key(key)
  case key
  when *RESERVED_KEYS
    # Must be checked before the String/Symbol branch below.
    raise SerializationError, "Can't serialize a Hash with reserved key #{key.inspect}"
  when String, Symbol
    key.to_s
  else
    raise SerializationError, "Only string and symbol hash keys may be serialized as job arguments, but #{key.inspect} is a #{key.class}"
  end
end
- 11
# Serializes the hash and tags the result with WITH_INDIFFERENT_ACCESS_KEY.
def serialize_indifferent_hash(indifferent_hash)
  serialize_hash(indifferent_hash).tap do |serialized|
    serialized[WITH_INDIFFERENT_ACCESS_KEY] = serialize_argument(true)
  end
end
- 11
# Returns a copy of +hash+ in which every key listed in +symbol_keys+ has
# been converted to a Symbol; other keys are left untouched.
def transform_symbol_keys(hash, symbol_keys)
  # NOTE: HashWithIndifferentAccess#transform_keys always
  # returns stringified keys with indifferent access
  # so we call #to_h here to ensure keys are symbolized.
  hash.to_h.transform_keys { |key| symbol_keys.include?(key) ? key.to_sym : key }
end
- 11
# Wraps the argument's global id string in a single-entry Hash keyed by
# GLOBALID_KEY. A missing model id is reported as SerializationError.
def convert_to_global_id_hash(argument)
  gid = argument.to_global_id.to_s
  { GLOBALID_KEY => gid }
rescue URI::GID::MissingModelIdError
  message = "Unable to serialize #{argument.class} " \
    "without an id. (Maybe you forgot to call save?)"
  raise SerializationError, message
end
end
end
# frozen_string_literal: true
- 11
require "active_job/core"
- 11
require "active_job/queue_adapter"
- 11
require "active_job/queue_name"
- 11
require "active_job/queue_priority"
- 11
require "active_job/enqueuing"
- 11
require "active_job/execution"
- 11
require "active_job/callbacks"
- 11
require "active_job/exceptions"
- 11
require "active_job/log_subscriber"
- 11
require "active_job/logging"
- 11
require "active_job/instrumentation"
- 11
require "active_job/timezones"
- 11
require "active_job/translation"
- 11
module ActiveJob #:nodoc:
# = Active Job
#
# Active Job objects can be configured to work with different backend
# queuing frameworks. To specify a queue adapter to use:
#
# ActiveJob::Base.queue_adapter = :inline
#
# A list of supported adapters can be found in QueueAdapters.
#
# Active Job objects can be defined by creating a class that inherits
# from the ActiveJob::Base class. The only necessary method to
# implement is the "perform" method.
#
# To define an Active Job object:
#
# class ProcessPhotoJob < ActiveJob::Base
# def perform(photo)
# photo.watermark!('Rails')
# photo.rotate!(90.degrees)
# photo.resize_to_fit!(300, 300)
# photo.upload!
# end
# end
#
# Records that are passed in are serialized/deserialized using Global
# ID. More information can be found in Arguments.
#
# To enqueue a job to be performed as soon as the queuing system is free:
#
# ProcessPhotoJob.perform_later(photo)
#
# To enqueue a job to be processed at some point in the future:
#
# ProcessPhotoJob.set(wait_until: Date.tomorrow.noon).perform_later(photo)
#
# More information can be found in ActiveJob::Core::ClassMethods#set
#
# A job can also be processed immediately without sending to the queue:
#
# ProcessPhotoJob.perform_now(photo)
#
# == Exceptions
#
# * DeserializationError - Error class for deserialization errors.
# * SerializationError - Error class for serialization errors.
- 11
class Base
# Behavior is composed from the modules below, mixed in in this order.
- 11
include Core
- 11
include QueueAdapter
- 11
include QueueName
- 11
include QueuePriority
- 11
include Enqueuing
- 11
include Execution
- 11
include Callbacks
- 11
include Exceptions
- 11
include Logging
- 11
include Instrumentation
- 11
include Timezones
- 11
include Translation
# Run the :active_job load hooks, passing this class as the argument.
- 11
ActiveSupport.run_load_hooks(:active_job, self)
end
end
# frozen_string_literal: true
- 11
require "active_support/callbacks"
- 11
require "active_support/core_ext/object/with_options"
- 11
require "active_support/core_ext/module/attribute_accessors"
- 11
module ActiveJob
# = Active Job Callbacks
#
# Active Job provides hooks during the life cycle of a job. Callbacks allow you
# to trigger logic during this cycle. Available callbacks are:
#
# * <tt>before_enqueue</tt>
# * <tt>around_enqueue</tt>
# * <tt>after_enqueue</tt>
# * <tt>before_perform</tt>
# * <tt>around_perform</tt>
# * <tt>after_perform</tt>
#
# NOTE: Calling the same callback multiple times will overwrite previous callback definitions.
#
- 11
module Callbacks
- 11
extend ActiveSupport::Concern
- 11
include ActiveSupport::Callbacks
- 11
class << self
# Give the Callbacks module itself callback support and declare an
# :execute callback chain on it.
- 11
include ActiveSupport::Callbacks
- 11
define_callbacks :execute
end
- 11
included do
# Class-level switch read by #enqueue when enqueuing is aborted.
- 11
class_attribute :return_false_on_aborted_enqueue, instance_accessor: false, instance_predicate: false, default: false
# Class-level flag that is passed as an option to both callback chains below.
- 11
cattr_accessor :skip_after_callbacks_if_terminated, instance_accessor: false, default: false
- 11
with_options(skip_after_callbacks_if_terminated: skip_after_callbacks_if_terminated) do
- 11
define_callbacks :perform
- 11
define_callbacks :enqueue
end
end
# These methods will be included into any Active Job object, adding
# callbacks for +perform+ and +enqueue+ methods.
- 11
module ClassMethods
- 11
# Copies the current skip_after_callbacks_if_terminated value into the
# :enqueue and :perform callback chain configs of each new subclass.
def inherited(klass)
  %i[enqueue perform].each do |chain|
    klass.get_callbacks(chain).config[:skip_after_callbacks_if_terminated] = skip_after_callbacks_if_terminated
  end
  super
end
# Defines a callback that will get called right before the
# job's perform method is executed.
#
# class VideoProcessJob < ActiveJob::Base
# queue_as :default
#
# before_perform do |job|
# UserMailer.notify_video_started_processing(job.arguments.first)
# end
#
# def perform(video_id)
# Video.find(video_id).process
# end
# end
#
- 11
def before_perform(*filters, &blk)
  # Registered on the :perform chain as a :before callback.
  set_callback :perform, :before, *filters, &blk
end
# Defines a callback that will get called right after the
# job's perform method has finished.
#
# class VideoProcessJob < ActiveJob::Base
# queue_as :default
#
# after_perform do |job|
# UserMailer.notify_video_processed(job.arguments.first)
# end
#
# def perform(video_id)
# Video.find(video_id).process
# end
# end
#
- 11
def after_perform(*filters, &blk)
  # Registered on the :perform chain as an :after callback.
  set_callback :perform, :after, *filters, &blk
end
# Defines a callback that will get called around the job's perform method.
#
# class VideoProcessJob < ActiveJob::Base
# queue_as :default
#
# around_perform do |job, block|
# UserMailer.notify_video_started_processing(job.arguments.first)
# block.call
# UserMailer.notify_video_processed(job.arguments.first)
# end
#
# def perform(video_id)
# Video.find(video_id).process
# end
# end
#
# You can access the return value of the job only if the execution wasn't halted.
#
# class VideoProcessJob < ActiveJob::Base
# around_perform do |job, block|
# value = block.call
# puts value # => "Hello World!"
# end
#
# def perform
# "Hello World!"
# end
# end
#
- 11
def around_perform(*filters, &blk)
  # Registered on the :perform chain as an :around callback.
  set_callback :perform, :around, *filters, &blk
end
# Defines a callback that will get called right before the
# job is enqueued.
#
# class VideoProcessJob < ActiveJob::Base
# queue_as :default
#
# before_enqueue do |job|
# $statsd.increment "enqueue-video-job.try"
# end
#
# def perform(video_id)
# Video.find(video_id).process
# end
# end
#
- 11
def before_enqueue(*filters, &blk)
  # Registered on the :enqueue chain as a :before callback.
  set_callback :enqueue, :before, *filters, &blk
end
# Defines a callback that will get called right after the
# job is enqueued.
#
# class VideoProcessJob < ActiveJob::Base
# queue_as :default
#
# after_enqueue do |job|
# $statsd.increment "enqueue-video-job.success"
# end
#
# def perform(video_id)
# Video.find(video_id).process
# end
# end
#
- 11
def after_enqueue(*filters, &blk)
  # Registered on the :enqueue chain as an :after callback.
  set_callback :enqueue, :after, *filters, &blk
end
# Defines a callback that will get called around the enqueuing
# of the job.
#
# class VideoProcessJob < ActiveJob::Base
# queue_as :default
#
# around_enqueue do |job, block|
# $statsd.time "video-job.process" do
# block.call
# end
# end
#
# def perform(video_id)
# Video.find(video_id).process
# end
# end
#
- 11
def around_enqueue(*filters, &blk)
  # Registered on the :enqueue chain as an :around callback.
  set_callback :enqueue, :around, *filters, &blk
end
end
- 11
private
- 11
# For halted :enqueue / :perform chains, emits a deprecation warning when
# after-callbacks exist and skip_after_callbacks_if_terminated is off, then
# defers to super. Other chains go straight to super.
def halted_callback_hook(_filter, name) # :nodoc:
  return super unless %i(enqueue perform).include?(name.to_sym)

  chain = public_send("_#{name}_callbacks")

  unless self.class.skip_after_callbacks_if_terminated
    if chain.any? { |callback| callback.kind == :after }
      ActiveSupport::Deprecation.warn(<<~EOM)
        In Rails 6.2, `after_enqueue`/`after_perform` callbacks no longer run if `before_enqueue`/`before_perform` respectively halts with `throw :abort`.
        To enable this behavior, uncomment the `config.active_job.skip_after_callbacks_if_terminated` config
        in the new 6.1 framework defaults initializer.
      EOM
    end
  end

  super
end
end
end
# frozen_string_literal: true
- 11
module ActiveJob
- 11
class ConfiguredJob #:nodoc:
  # Remembers the job class together with the options that perform_later
  # will hand to #enqueue.
  def initialize(job_class, options = {})
    @job_class = job_class
    @options = options
  end

  # Instantiates the job with +args+ and runs it immediately; the stored
  # options are not used on this path.
  def perform_now(*args)
    job = @job_class.new(*args)
    job.perform_now
  end
  ruby2_keywords(:perform_now) if respond_to?(:ruby2_keywords, true)

  # Instantiates the job with +args+ and enqueues it with the stored options.
  def perform_later(*args)
    job = @job_class.new(*args)
    job.enqueue(@options)
  end
  ruby2_keywords(:perform_later) if respond_to?(:ruby2_keywords, true)
end
end
# frozen_string_literal: true
- 11
module ActiveJob
# Provides general behavior that will be included into every Active Job
# object that inherits from ActiveJob::Base.
- 11
module Core
- 11
extend ActiveSupport::Concern
# Job arguments
- 11
attr_accessor :arguments
- 11
attr_writer :serialized_arguments
# Timestamp when the job should be performed
- 11
attr_accessor :scheduled_at
# Job Identifier
- 11
attr_accessor :job_id
# Queue in which the job will reside.
- 11
attr_writer :queue_name
# Priority that the job will have (a lower number means higher priority).
- 11
attr_writer :priority
# ID optionally provided by adapter
- 11
attr_accessor :provider_job_id
# Number of times this job has been executed (which increments on every retry, like after an exception).
- 11
attr_accessor :executions
# Hash that contains the number of times this job handled errors for each specific retry_on declaration.
# Keys are the string representation of the exceptions listed in the retry_on declaration,
# while its associated value holds the number of executions where the corresponding retry_on
# declaration handled one of its listed exceptions.
- 11
attr_accessor :exception_executions
# I18n.locale to be used during the job.
- 11
attr_accessor :locale
# Timezone to be used during the job.
- 11
attr_accessor :timezone
# Track when a job was enqueued
- 11
attr_accessor :enqueued_at
# These methods will be included into any Active Job object, adding
# helpers for de/serialization and creation of job instances.
- 11
module ClassMethods
# Creates a new job instance from a hash created with +serialize+
- 11
def deserialize(job_data)
  # Resolve the class named in the payload, build a blank instance and let
  # it load the remaining fields itself.
  job_class = job_data["job_class"].constantize
  job_class.new.tap { |job| job.deserialize(job_data) }
end
# Creates a job preconfigured with the given options. You can call
# perform_later with the job arguments to enqueue the job with the
# preconfigured options
#
# ==== Options
# * <tt>:wait</tt> - Enqueues the job with the specified delay
# * <tt>:wait_until</tt> - Enqueues the job at the time specified
# * <tt>:queue</tt> - Enqueues the job on the specified queue
# * <tt>:priority</tt> - Enqueues the job with the specified priority
#
# ==== Examples
#
# VideoJob.set(queue: :some_queue).perform_later(Video.last)
# VideoJob.set(wait: 5.minutes).perform_later(Video.last)
# VideoJob.set(wait_until: Time.now.tomorrow).perform_later(Video.last)
# VideoJob.set(queue: :some_queue, wait: 5.minutes).perform_later(Video.last)
# VideoJob.set(queue: :some_queue, wait_until: Time.now.tomorrow).perform_later(Video.last)
# VideoJob.set(queue: :some_queue, wait: 5.minutes, priority: 10).perform_later(Video.last)
- 11
def set(options = {})
# Only wraps this job class and +options+ in a ConfiguredJob; nothing is
# enqueued here.
- 1375
ConfiguredJob.new(self, options)
end
end
# Creates a new job instance. Takes the arguments that will be
# passed to the perform method.
- 11
def initialize(*arguments)
  job_class = self.class

  @arguments = arguments
  @job_id = SecureRandom.uuid
  # Queue name and priority start from the class-level values.
  @queue_name = job_class.queue_name
  @priority = job_class.priority
  @executions = 0
  @exception_executions = {}
end
- 11
ruby2_keywords(:initialize) if respond_to?(:ruby2_keywords, true)
# Returns a hash with the job data that can safely be passed to the
# queuing adapter.
- 11
def serialize
  # Built entry by entry, in the same key order as the payload contract.
  data = {}
  data["job_class"] = self.class.name
  data["job_id"] = job_id
  data["provider_job_id"] = provider_job_id
  data["queue_name"] = queue_name
  data["priority"] = priority
  data["arguments"] = serialize_arguments_if_needed(arguments)
  data["executions"] = executions
  data["exception_executions"] = exception_executions
  data["locale"] = I18n.locale.to_s
  data["timezone"] = Time.zone&.name
  data["enqueued_at"] = Time.now.utc.iso8601
  data
end
# Attaches the stored job data to the current instance. Receives a hash
# returned from +serialize+
#
# ==== Examples
#
# class DeliverWebhookJob < ActiveJob::Base
# attr_writer :attempt_number
#
# def attempt_number
# @attempt_number ||= 0
# end
#
# def serialize
# super.merge('attempt_number' => attempt_number + 1)
# end
#
# def deserialize(job_data)
# super
# self.attempt_number = job_data['attempt_number']
# end
#
# rescue_from(Timeout::Error) do |exception|
# raise exception if attempt_number > 5
# retry_job(wait: 10)
# end
# end
- 11
def deserialize(job_data)
  # [writer name, payload key] pairs that are copied over verbatim, in order.
  [
    %w[job_id job_id],
    %w[provider_job_id provider_job_id],
    %w[queue_name queue_name],
    %w[priority priority],
    %w[serialized_arguments arguments],
    %w[executions executions],
    %w[exception_executions exception_executions],
  ].each do |attribute, key|
    __send__("#{attribute}=", job_data[key])
  end

  # Locale and timezone fall back to the current values when absent.
  self.locale = job_data["locale"] || I18n.locale.to_s
  self.timezone = job_data["timezone"] || Time.zone&.name
  self.enqueued_at = job_data["enqueued_at"]
end
- 11
private
- 11
# Reuses the still-serialized arguments when present; otherwise serializes
# the given ones.
def serialize_arguments_if_needed(arguments)
  arguments_serialized? ? @serialized_arguments : serialize_arguments(arguments)
end
- 11
# Replaces @arguments with the deserialized form of any pending serialized
# arguments, then clears the pending copy. No-op otherwise.
def deserialize_arguments_if_needed
  return unless arguments_serialized?

  @arguments = deserialize_arguments(@serialized_arguments)
  @serialized_arguments = nil
end
- 11
# Delegates to Arguments.serialize.
def serialize_arguments(arguments)
- 5807
Arguments.serialize(arguments)
end
- 11
# Delegates to Arguments.deserialize.
def deserialize_arguments(serialized_args)
- 3137
Arguments.deserialize(serialized_args)
end
- 11
# Returns the pending serialized arguments (truthy) when there are any,
# nil when the instance variable was never set.
def arguments_serialized?
  @serialized_arguments if defined?(@serialized_arguments)
end
end
end
# frozen_string_literal: true
- 11
require "active_job/arguments"
- 11
module ActiveJob
# Provides behavior for enqueuing jobs.
- 11
module Enqueuing
- 11
extend ActiveSupport::Concern
# Includes the +perform_later+ method for job initialization.
- 11
module ClassMethods
# Push a job onto the queue. By default the arguments must be either String,
# Integer, Float, NilClass, TrueClass, FalseClass, BigDecimal, Symbol, Date,
# Time, DateTime, ActiveSupport::TimeWithZone, ActiveSupport::Duration,
# Hash, ActiveSupport::HashWithIndifferentAccess, Array or
# GlobalID::Identification instances, although this can be extended by adding
# custom serializers.
#
# Returns an instance of the job class queued with arguments available in
# Job#arguments.
- 11
def perform_later(*args)
  job = job_or_instantiate(*args)
  job.enqueue
end
- 11
ruby2_keywords(:perform_later) if respond_to?(:ruby2_keywords, true)
- 11
private
- 11
# Returns the first argument when it is already an instance of this job
# class; otherwise builds a new job from all arguments.
def job_or_instantiate(*args) # :doc:
  candidate = args.first
  candidate.is_a?(self) ? candidate : new(*args)
end
- 11
ruby2_keywords(:job_or_instantiate) if respond_to?(:ruby2_keywords, true)
end
# Enqueues the job to be performed by the queue adapter.
#
# ==== Options
# * <tt>:wait</tt> - Enqueues the job with the specified delay
# * <tt>:wait_until</tt> - Enqueues the job at the time specified
# * <tt>:queue</tt> - Enqueues the job on the specified queue
# * <tt>:priority</tt> - Enqueues the job with the specified priority
#
# ==== Examples
#
# my_job_instance.enqueue
# my_job_instance.enqueue wait: 5.minutes
# my_job_instance.enqueue queue: :important
# my_job_instance.enqueue wait_until: Date.tomorrow.midnight
# my_job_instance.enqueue priority: 10
- 11
def enqueue(options = {})
  # Apply the per-call overrides first. :wait_until wins over :wait when
  # both are given, because it is assigned last.
  self.scheduled_at = options[:wait].seconds.from_now.to_f if options[:wait]
  self.scheduled_at = options[:wait_until].to_f if options[:wait_until]
  self.queue_name = self.class.queue_name_from_part(options[:queue]) if options[:queue]
  self.priority = options[:priority].to_i if options[:priority]

  # Stays false when the callback chain never yields to the block.
  enqueued = false
  run_callbacks(:enqueue) do
    if scheduled_at
      queue_adapter.enqueue_at(self, scheduled_at)
    else
      queue_adapter.enqueue(self)
    end
    enqueued = true
  end

  return self if enqueued
  return false if self.class.return_false_on_aborted_enqueue

  ActiveSupport::Deprecation.warn(
    "Rails 6.1 will return false when the enqueuing is aborted. Make sure your code doesn't depend on it" \
    " returning the instance of the job and set `config.active_job.return_false_on_aborted_enqueue = true`" \
    " to remove the deprecations."
  )
  self
end
end
end
# frozen_string_literal: true
- 11
require "active_support/core_ext/numeric/time"
- 11
module ActiveJob
# Provides behavior for retrying and discarding jobs on exceptions.
- 11
module Exceptions
- 11
extend ActiveSupport::Concern
- 11
included do
# Class-level jitter setting, read by determine_delay when no explicit
# jitter is passed; defaults to 0.0.
- 11
class_attribute :retry_jitter, instance_accessor: false, instance_predicate: false, default: 0.0
end
- 11
module ClassMethods
# Catch the exception and reschedule job for re-execution after so many seconds, for a specific number of attempts.
# If the exception keeps getting raised beyond the specified number of attempts, the exception is allowed to
# bubble up to the underlying queuing system, which may have its own retry mechanism or place it in a
# holding queue for inspection.
#
# You can also pass a block that'll be invoked if the retry attempts fail for custom logic rather than letting
# the exception bubble up. This block is yielded with the job instance as the first and the error instance as the second parameter.
#
# ==== Options
# * <tt>:wait</tt> - Re-enqueues the job with a delay specified either in seconds (default: 3 seconds),
# as a computing proc that takes the number of executions so far as an argument, or as a symbol reference of
# <tt>:exponentially_longer</tt>, which applies the wait algorithm of <tt>((executions**4) + (Kernel.rand * (executions**4) * jitter)) + 2</tt>
# (first wait ~3s, then ~18s, then ~83s, etc)
# * <tt>:attempts</tt> - Re-enqueues the job the specified number of times (default: 5 attempts)
# * <tt>:queue</tt> - Re-enqueues the job on a different queue
# * <tt>:priority</tt> - Re-enqueues the job with a different priority
# * <tt>:jitter</tt> - A random delay of wait time used when calculating backoff. The default is 15% (0.15) which represents the upper bound of possible wait time (expressed as a percentage)
#
# ==== Examples
#
# class RemoteServiceJob < ActiveJob::Base
# retry_on CustomAppException # defaults to ~3s wait, 5 attempts
# retry_on AnotherCustomAppException, wait: ->(executions) { executions * 2 }
#
# retry_on ActiveRecord::Deadlocked, wait: 5.seconds, attempts: 3
# retry_on Net::OpenTimeout, Timeout::Error, wait: :exponentially_longer, attempts: 10 # retries at most 10 times for Net::OpenTimeout and Timeout::Error combined
# # To retry at most 10 times for each individual exception:
# # retry_on Net::OpenTimeout, wait: :exponentially_longer, attempts: 10
# # retry_on Net::ReadTimeout, wait: 5.seconds, jitter: 0.30, attempts: 10
# # retry_on Timeout::Error, wait: :exponentially_longer, attempts: 10
#
# retry_on(YetAnotherCustomAppException) do |job, error|
# ExceptionNotifier.caught(error)
# end
#
# def perform(*args)
# # Might raise CustomAppException, AnotherCustomAppException, or YetAnotherCustomAppException for something domain specific
# # Might raise ActiveRecord::Deadlocked when a local db deadlock is detected
# # Might raise Net::OpenTimeout or Timeout::Error when the remote service is down
# end
# end
- 11
def retry_on(*exceptions, wait: 3.seconds, attempts: 5, queue: nil, priority: nil, jitter: JITTER_DEFAULT)
  rescue_from(*exceptions) do |error|
    executions = executions_for(exceptions)

    if executions < attempts
      # Still within budget: schedule another run.
      delay = determine_delay(seconds_or_duration_or_algorithm: wait, executions: executions, jitter: jitter)
      retry_job wait: delay, queue: queue, priority: priority, error: error
    elsif block_given?
      # Out of attempts: hand over to the caller-supplied block.
      instrument(:retry_stopped, error: error) { yield self, error }
    else
      # Out of attempts and no block: report, then let the error bubble up.
      instrument :retry_stopped, error: error
      raise error
    end
  end
end
# Discard the job with no attempts to retry, if the exception is raised. This is useful when the subject of the job,
# like an Active Record, is no longer available, and the job is thus no longer relevant.
#
# You can also pass a block that'll be invoked. This block is yielded with the job instance as the first and the error instance as the second parameter.
#
# ==== Example
#
# class SearchIndexingJob < ActiveJob::Base
# discard_on ActiveJob::DeserializationError
# discard_on(CustomAppException) do |job, error|
# ExceptionNotifier.caught(error)
# end
#
# def perform(record)
# # Will raise ActiveJob::DeserializationError if the record can't be deserialized
# # Might raise CustomAppException for something domain specific
# end
# end
- 11
def discard_on(*exceptions)
  rescue_from(*exceptions) do |error|
    # The optional caller block runs inside the :discard instrumentation.
    instrument(:discard, error: error) { yield self, error if block_given? }
  end
end
end
# Reschedules the job to be re-executed. This is useful in combination
# with the +rescue_from+ option. When you rescue an exception from your job
# you can ask Active Job to retry performing your job.
#
# ==== Options
# * <tt>:wait</tt> - Enqueues the job with the specified delay in seconds
# * <tt>:wait_until</tt> - Enqueues the job at the time specified
# * <tt>:queue</tt> - Enqueues the job on the specified queue
# * <tt>:priority</tt> - Enqueues the job with the specified priority
#
# ==== Examples
#
# class SiteScraperJob < ActiveJob::Base
# rescue_from(ErrorLoadingSite) do
# retry_job queue: :low_priority
# end
#
# def perform(*args)
# # raise ErrorLoadingSite if cannot scrape
# end
# end
- 11
def retry_job(options = {})
  # Only :error and :wait are exposed in the instrumentation payload.
  instrument(:enqueue_retry, options.slice(:error, :wait)) { enqueue(options) }
end
- 11
private
- 11
# Sentinel default for the jitter option, so that "not given" can be told
# apart from an explicit nil.
JITTER_DEFAULT = Object.new
- 11
private_constant :JITTER_DEFAULT
- 11
# Works out how long to wait before the next retry.
#
# The wait specification may be :exponentially_longer, a Duration or
# Integer, or a Proc called with +executions+. Jitter is added only for the
# first two; a Proc's result is returned untouched. Anything else raises.
def determine_delay(seconds_or_duration_or_algorithm:, executions:, jitter: JITTER_DEFAULT)
  jitter =
    if jitter == JITTER_DEFAULT
      self.class.retry_jitter
    else
      jitter || 0.0
    end

  case seconds_or_duration_or_algorithm
  when :exponentially_longer
    base = executions**4
    base + determine_jitter_for_delay(base, jitter) + 2
  when ActiveSupport::Duration, Integer
    base = seconds_or_duration_or_algorithm.to_i
    base + determine_jitter_for_delay(base, jitter)
  when Proc
    seconds_or_duration_or_algorithm.call(executions)
  else
    raise "Couldn't determine a delay based on #{seconds_or_duration_or_algorithm.inspect}"
  end
end
- 11
# Random extra delay in the range 0...(delay * jitter); exactly 0.0 when
# jitter is zero.
def determine_jitter_for_delay(delay, jitter)
  jitter.zero? ? 0.0 : Kernel.rand * delay * jitter
end
- 11
# Increments and returns the counter kept for this retry_on declaration,
# keyed by the string form of its exception list.
def executions_for(exceptions)
  counters = exception_executions
  # Guard against jobs that were persisted before we started having individual executions counters per retry_on
  return executions unless counters

  key = exceptions.to_s
  counters[key] = (counters[key] || 0) + 1
end
end
end
# frozen_string_literal: true
- 11
require "active_support/rescuable"
- 11
require "active_job/arguments"
- 11
module ActiveJob
- 11
module Execution
- 11
extend ActiveSupport::Concern
- 11
include ActiveSupport::Rescuable
# Includes methods for executing and performing jobs instantly.
- 11
module ClassMethods
# Performs the job immediately.
#
# MyJob.perform_now("mike")
#
- 11
def perform_now(*args)
  job = job_or_instantiate(*args)
  job.perform_now
end
- 11
ruby2_keywords(:perform_now) if respond_to?(:ruby2_keywords, true)
- 11
# Rebuilds the job from +job_data+ and performs it, all inside the
# :execute callbacks of ActiveJob::Callbacks.
def execute(job_data) #:nodoc:
  ActiveJob::Callbacks.run_callbacks(:execute) { deserialize(job_data).perform_now }
end
end
# Performs the job immediately. The job is not sent to the queuing adapter
# but directly executed by blocking the execution of others until it's finished.
# `perform_now` returns the value of your job's `perform` method.
#
# class MyJob < ActiveJob::Base
# def perform
# "Hello World!"
# end
# end
#
# puts MyJob.new(*args).perform_now # => "Hello World!"
- 11
def perform_now
  # Guard against jobs that were persisted before we started counting executions by zeroing out nil counters
  self.executions = (executions || 0) + 1

  deserialize_arguments_if_needed
  run_callbacks(:perform) { perform(*arguments) }
rescue => error
  # Re-raise unless a registered handler deals with the error.
  rescue_with_handler(error) || raise
end
- 11
def perform(*)
fail NotImplementedError
end
end
end
# frozen_string_literal: true
- 11
module ActiveJob
  # Individual version components; +STRING+ joins the non-nil ones with dots.
  module VERSION
    MAJOR = 6
    MINOR = 1
    TINY  = 0
    PRE   = "alpha"

    STRING = [MAJOR, MINOR, TINY, PRE].compact.join(".")
  end

  class << self
    # Returns the version of the currently loaded Active Job as a <tt>Gem::Version</tt>
    def gem_version
      Gem::Version.new(VERSION::STRING)
    end
  end
end
# frozen_string_literal: true
- 11
module ActiveJob
- 11
module Instrumentation #:nodoc:
  extend ActiveSupport::Concern

  included do
    # A job with a +scheduled_at+ value is reported as +enqueue_at+,
    # any other enqueue as +enqueue+.
    around_enqueue do |_, block|
      instrument(scheduled_at ? :enqueue_at : :enqueue, &block)
    end

    around_perform do |_, block|
      instrument :perform_start
      instrument :perform, &block
    end
  end

  private
    # Publishes an <tt>"#{operation}.active_job"</tt> notification whose payload
    # is +payload+ plus the job's adapter and the job itself. When a block is
    # given it runs inside the notification.
    def instrument(operation, payload = {}, &block)
      wrapped = lambda do |event_payload|
        block&.call

        # halted_callback_hook leaves a flag behind when a callback halted the
        # chain; surface it on the event and reset it for the next run.
        next unless defined?(@_halted_callback_hook_called) && @_halted_callback_hook_called

        event_payload[:aborted] = true
        @_halted_callback_hook_called = nil
      end

      event_name = "#{operation}.active_job"
      ActiveSupport::Notifications.instrument(event_name, payload.merge(adapter: queue_adapter, job: self), &wrapped)
    end

    # Records that a callback halted so +instrument+ can mark the event aborted.
    def halted_callback_hook(*)
      super
      @_halted_callback_hook_called = true
    end
end
end
# frozen_string_literal: true
- 11
require "active_support/log_subscriber"
- 11
module ActiveJob
- 11
class LogSubscriber < ActiveSupport::LogSubscriber #:nodoc:
  # Logs an enqueue event: an error when it raised, an info line when a
  # callback aborted it, and an info line with the job arguments otherwise.
  def enqueue(event)
    payload = event.payload
    job = payload[:job]
    ex = payload[:exception_object]

    if ex
      error { "Failed enqueuing #{job.class.name} to #{queue_name(event)}: #{ex.class} (#{ex.message})" }
    elsif payload[:aborted]
      info { "Failed enqueuing #{job.class.name} to #{queue_name(event)}, a before_enqueue callback halted the enqueuing execution." }
    else
      info { "Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)}#{args_info(job)}" }
    end
  end

  # Same as +enqueue+, but the success line also reports the scheduled time.
  def enqueue_at(event)
    payload = event.payload
    job = payload[:job]
    ex = payload[:exception_object]

    if ex
      error { "Failed enqueuing #{job.class.name} to #{queue_name(event)}: #{ex.class} (#{ex.message})" }
    elsif payload[:aborted]
      info { "Failed enqueuing #{job.class.name} to #{queue_name(event)}, a before_enqueue callback halted the enqueuing execution." }
    else
      info { "Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)} at #{scheduled_at(event)}#{args_info(job)}" }
    end
  end

  # Logs that a job is about to be performed.
  def perform_start(event)
    info do
      job = event.payload[:job]
      "Performing #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)} enqueued at #{job.enqueued_at}#{args_info(job)}"
    end
  end

  # Logs the result of performing a job: error with backtrace, error for a
  # halted before_perform callback, or an info line with the duration.
  def perform(event)
    payload = event.payload
    job = payload[:job]
    ex = payload[:exception_object]

    if ex
      error do
        "Error performing #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)} in #{event.duration.round(2)}ms: #{ex.class} (#{ex.message}):\n" + Array(ex.backtrace).join("\n")
      end
    elsif payload[:aborted]
      error do
        "Error performing #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)} in #{event.duration.round(2)}ms: a before_perform callback halted the job execution"
      end
    else
      info do
        "Performed #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)} in #{event.duration.round(2)}ms"
      end
    end
  end

  # Logs that a job is being retried, naming the error when one is present.
  def enqueue_retry(event)
    payload = event.payload
    job = payload[:job]
    ex = payload[:error]
    wait = payload[:wait]

    info do
      next "Retrying #{job.class} in #{wait.to_i} seconds." unless ex

      "Retrying #{job.class} in #{wait.to_i} seconds, due to a #{ex.class}."
    end
  end

  # Logs that retrying a job has been given up.
  def retry_stopped(event)
    job = event.payload[:job]
    ex = event.payload[:error]

    error { "Stopped retrying #{job.class} due to a #{ex.class}, which reoccurred on #{job.executions} attempts." }
  end

  # Logs that a job was discarded because of an error.
  def discard(event)
    job = event.payload[:job]
    ex = event.payload[:error]

    error { "Discarded #{job.class} due to a #{ex.class}." }
  end

  private
    # Adapter class name without its namespace and "Adapter", followed by the
    # job's queue name in parentheses.
    def queue_name(event)
      adapter_label = event.payload[:adapter].class.name.demodulize.remove("Adapter")
      "#{adapter_label}(#{event.payload[:job].queue_name})"
    end

    # Returns the " with arguments: ..." suffix, or "" when argument logging is
    # disabled for the job class or the job has no arguments.
    def args_info(job)
      return "" unless job.class.log_arguments? && job.arguments.any?

      " with arguments: " + job.arguments.map { |arg| format(arg).inspect }.join(", ")
    end

    # Recursively replaces GlobalID-identifiable values with their global id
    # inside hashes and arrays.
    def format(arg)
      case arg
      when Hash
        arg.transform_values { |value| format(value) }
      when Array
        arg.map { |value| format(value) }
      when GlobalID::Identification
        # Best effort: fall back to the object itself if no global id can be built.
        begin
          arg.to_global_id
        rescue
          arg
        end
      else
        arg
      end
    end

    # The job's +scheduled_at+ timestamp as a UTC Time.
    def scheduled_at(event)
      Time.at(event.payload[:job].scheduled_at).utc
    end

    def logger
      ActiveJob::Base.logger
    end
end
end
- 11
ActiveJob::LogSubscriber.attach_to :active_job
# frozen_string_literal: true
- 11
require "active_support/core_ext/string/filters"
- 11
require "active_support/tagged_logging"
- 11
require "active_support/logger"
- 11
module ActiveJob
- 11
module Logging #:nodoc:
  extend ActiveSupport::Concern

  included do
    cattr_accessor :logger, default: ActiveSupport::TaggedLogging.new(ActiveSupport::Logger.new(STDOUT))
    class_attribute :log_arguments, instance_accessor: false, default: true

    around_enqueue { |_, block| tag_logger(&block) }
    around_perform { |job, block| tag_logger(job.class.name, job.job_id, &block) }
  end

  private
    # Yields with +tags+ applied to the logger when the logger supports tagging;
    # otherwise just yields. "ActiveJob" is prepended unless already a current tag.
    def tag_logger(*tags)
      return yield unless logger.respond_to?(:tagged)

      tags.unshift "ActiveJob" unless logger_tagged_by_active_job?
      logger.tagged(*tags) { yield }
    end

    # Whether "ActiveJob" is already among the logger formatter's current tags.
    def logger_tagged_by_active_job?
      logger.formatter.current_tags.include?("ActiveJob")
    end
end
end
# frozen_string_literal: true
- 11
require "active_support/core_ext/string/inflections"
- 11
module ActiveJob
# The <tt>ActiveJob::QueueAdapter</tt> module is used to load the
# correct adapter. The default queue adapter is the +:async+ queue.
- 11
module QueueAdapter #:nodoc:
- 11
extend ActiveSupport::Concern
- 11
included do
- 11
# Class-level storage for the configured adapter's name (set by assign_adapter).
class_attribute :_queue_adapter_name, instance_accessor: false, instance_predicate: false
- 11
# Class-level storage for the adapter object itself (set by assign_adapter).
class_attribute :_queue_adapter, instance_accessor: false, instance_predicate: false
- 11
# Instances read the adapter from their class.
delegate :queue_adapter, to: :class
- 11
# Default adapter, assigned through the queue_adapter= setter.
self.queue_adapter = :async
end
# Includes the setter method for changing the active queue adapter.
- 11
module ClassMethods
  # Returns the backend queue provider. The default queue adapter
  # is the +:async+ queue. See QueueAdapters for more information.
  def queue_adapter
    _queue_adapter
  end

  # Returns string denoting the name of the configured queue adapter.
  # By default returns +"async"+.
  def queue_adapter_name
    _queue_adapter_name
  end

  # Specify the backend queue provider. The default queue adapter
  # is the +:async+ queue. See QueueAdapters for more
  # information.
  #
  # Accepts either an adapter name (Symbol or String), which is looked up via
  # ActiveJob::QueueAdapters.lookup and instantiated, or an adapter object that
  # responds to every method in QUEUE_ADAPTER_METHODS.
  #
  # Raises ArgumentError for anything else.
  def queue_adapter=(name_or_adapter)
    case name_or_adapter
    when Symbol, String
      queue_adapter = ActiveJob::QueueAdapters.lookup(name_or_adapter).new
      assign_adapter(name_or_adapter.to_s, queue_adapter)
    else
      unless queue_adapter?(name_or_adapter)
        # Say what was rejected and why, instead of a bare ArgumentError.
        raise ArgumentError,
          "queue adapter must be a Symbol, a String, or an object responding to " \
          "#{QUEUE_ADAPTER_METHODS.join(' and ')}; got #{name_or_adapter.inspect}"
      end

      # Derive the name from the adapter's class, e.g. "FooBarAdapter" => "foo_bar".
      adapter_name = name_or_adapter.class.name.demodulize.remove("Adapter").underscore
      assign_adapter(adapter_name, name_or_adapter)
    end
  end

  private
    # Stores the adapter and its name in the class-level attributes.
    def assign_adapter(adapter_name, queue_adapter)
      self._queue_adapter_name = adapter_name
      self._queue_adapter = queue_adapter
    end

    QUEUE_ADAPTER_METHODS = [:enqueue, :enqueue_at].freeze

    # An object counts as an adapter when it responds to all QUEUE_ADAPTER_METHODS.
    def queue_adapter?(object)
      QUEUE_ADAPTER_METHODS.all? { |meth| object.respond_to?(meth) }
    end
end
end
end
# frozen_string_literal: true
- 11
module ActiveJob
# == Active Job adapters
#
# Active Job has adapters for the following queuing backends:
#
# * {Backburner}[https://github.com/nesquena/backburner]
# * {Delayed Job}[https://github.com/collectiveidea/delayed_job]
# * {Que}[https://github.com/chanks/que]
# * {queue_classic}[https://github.com/QueueClassic/queue_classic]
# * {Resque}[https://github.com/resque/resque]
# * {Sidekiq}[https://sidekiq.org]
# * {Sneakers}[https://github.com/jondot/sneakers]
# * {Sucker Punch}[https://github.com/brandonhilkert/sucker_punch]
# * {Active Job Async Job}[https://api.rubyonrails.org/classes/ActiveJob/QueueAdapters/AsyncAdapter.html]
# * {Active Job Inline}[https://api.rubyonrails.org/classes/ActiveJob/QueueAdapters/InlineAdapter.html]
# * Please Note: We are not accepting pull requests for new adapters. See the {README}[link:files/activejob/README_md.html] for more details.
#
# === Backends Features
#
# | | Async | Queues | Delayed | Priorities | Timeout | Retries |
# |-------------------|-------|--------|------------|------------|---------|---------|
# | Backburner | Yes | Yes | Yes | Yes | Job | Global |
# | Delayed Job | Yes | Yes | Yes | Job | Global | Global |
# | Que | Yes | Yes | Yes | Job | No | Job |
# | queue_classic | Yes | Yes | Yes* | No | No | No |
# | Resque | Yes | Yes | Yes (Gem) | Queue | Global | Yes |
# | Sidekiq | Yes | Yes | Yes | Queue | No | Job |
# | Sneakers | Yes | Yes | No | Queue | Queue | No |
# | Sucker Punch | Yes | Yes | Yes | No | No | No |
# | Active Job Async | Yes | Yes | Yes | No | No | No |
# | Active Job Inline | No | Yes | N/A | N/A | N/A | N/A |
#
# ==== Async
#
# Yes: The Queue Adapter has the ability to run the job in a non-blocking manner.
# It either runs on a separate or forked process, or on a different thread.
#
# No: The job is run in the same process.
#
# ==== Queues
#
# Yes: Jobs may set which queue they are run in with queue_as or by using the set
# method.
#
# ==== Delayed
#
# Yes: The adapter will run the job in the future through perform_later.
#
# (Gem): An additional gem is required to use perform_later with this adapter.
#
# No: The adapter will run jobs at the next opportunity and cannot use perform_later.
#
# N/A: The adapter does not support queuing.
#
# NOTE:
# queue_classic supports job scheduling since version 3.1.
# For older versions you can use the queue_classic-later gem.
#
# ==== Priorities
#
# The order in which jobs are processed can be configured differently depending
# on the adapter.
#
# Job: Any class inheriting from the adapter may set the priority on the job
# object relative to other jobs.
#
# Queue: The adapter can set the priority for job queues, when setting a queue
# with Active Job this will be respected.
#
# Yes: Allows the priority to be set on the job object, at the queue level or
# as default configuration option.
#
# No: The adapter does not allow the priority of jobs to be configured.
#
# N/A: The adapter does not support queuing, and therefore sorting them.
#
# ==== Timeout
#
# When a job will stop after the allotted time.
#
# Job: The timeout can be set for each instance of the job class.
#
# Queue: The timeout is set for all jobs on the queue.
#
# Global: The adapter is configured so that all jobs have a maximum run time.
#
# No: The adapter does not allow the timeout of jobs to be configured.
#
# N/A: This adapter does not run in a separate process, and therefore timeout
# is unsupported.
#
# ==== Retries
#
# Job: The number of retries can be set per instance of the job class.
#
# Yes: The number of retries can be configured globally, for each instance or
# on the queue. This adapter may also present failed instances of the job class
# that can be restarted.
#
# Global: The adapter has a global number of retries.
#
# No: The adapter does not allow the number of retries to be configured.
#
# N/A: The adapter does not run in a separate process, and therefore doesn't
# support retries.
#
# === Async and Inline Queue Adapters
#
# Active Job has two built-in queue adapters intended for development and
# testing: +:async+ and +:inline+.
- 11
module QueueAdapters
  extend ActiveSupport::Autoload

  autoload :AsyncAdapter
  autoload :InlineAdapter
  autoload :BackburnerAdapter
  autoload :DelayedJobAdapter
  autoload :QueAdapter
  autoload :QueueClassicAdapter
  autoload :ResqueAdapter
  autoload :SidekiqAdapter
  autoload :SneakersAdapter
  autoload :SuckerPunchAdapter
  autoload :TestAdapter

  # Suffix appended to the camelized adapter name to form the constant name.
  ADAPTER = "Adapter"
  private_constant :ADAPTER

  # Returns adapter for specified name.
  #
  #   ActiveJob::QueueAdapters.lookup(:sidekiq)
  #   # => ActiveJob::QueueAdapters::SidekiqAdapter
  def self.lookup(name)
    const_get("#{name.to_s.camelize}#{ADAPTER}")
  end
end
end
# frozen_string_literal: true
- 11
require "securerandom"
- 11
require "concurrent/scheduled_task"
- 11
require "concurrent/executor/thread_pool_executor"
- 11
require "concurrent/utility/processor_counter"
- 11
module ActiveJob
- 11
module QueueAdapters
# == Active Job Async adapter
#
# The Async adapter runs jobs with an in-process thread pool.
#
# This is the default queue adapter. It's well-suited for dev/test since
# it doesn't need an external infrastructure, but it's a poor fit for
# production since it drops pending jobs on restart.
#
# To use this adapter, set queue adapter to +:async+:
#
# config.active_job.queue_adapter = :async
#
# To configure the adapter's thread pool, instantiate the adapter and
# pass your own config:
#
# config.active_job.queue_adapter = ActiveJob::QueueAdapters::AsyncAdapter.new \
# min_threads: 1,
# max_threads: 2 * Concurrent.processor_count,
# idletime: 600.seconds
#
# The adapter uses a {Concurrent Ruby}[https://github.com/ruby-concurrency/concurrent-ruby] thread pool to schedule and execute
# jobs. Since jobs share a single thread pool, long-running jobs will block
# short-lived jobs. Fine for dev/test; bad for production.
- 11
class AsyncAdapter
  # See {Concurrent::ThreadPoolExecutor}[https://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/ThreadPoolExecutor.html] for executor options.
  def initialize(**executor_options)
    @scheduler = Scheduler.new(**executor_options)
  end

  def enqueue(job) #:nodoc:
    wrapper = JobWrapper.new(job)
    @scheduler.enqueue(wrapper, queue_name: job.queue_name)
  end

  def enqueue_at(job, timestamp) #:nodoc:
    wrapper = JobWrapper.new(job)
    @scheduler.enqueue_at(wrapper, timestamp, queue_name: job.queue_name)
  end

  # Gracefully stop processing jobs. Finishes in-progress work and handles
  # any new jobs following the executor's fallback policy (`caller_runs`).
  # Waits for termination by default. Pass `wait: false` to continue.
  def shutdown(wait: true) #:nodoc:
    @scheduler.shutdown(wait: wait)
  end

  # Used for our test suite.
  def immediate=(immediate) #:nodoc:
    @scheduler.immediate = immediate
  end

  # Note that we don't actually need to serialize the jobs since we're
  # performing them in-process, but we do so anyway for parity with other
  # adapters and deployment environments. Otherwise, serialization bugs
  # may creep in undetected.
  class JobWrapper #:nodoc:
    # Assigns a random UUID as the provider job id, then captures the
    # serialized job.
    def initialize(job)
      job.provider_job_id = SecureRandom.uuid
      @job_data = job.serialize
    end

    def perform
      Base.execute(@job_data)
    end
  end

  class Scheduler #:nodoc:
    DEFAULT_EXECUTOR_OPTIONS = {
      min_threads:     0,
      max_threads:     Concurrent.processor_count,
      auto_terminate:  true,
      idletime:        60, # 1 minute
      max_queue:       0, # unlimited
      fallback_policy: :caller_runs # shouldn't matter -- 0 max queue
    }.freeze

    attr_accessor :immediate

    # +options+ override DEFAULT_EXECUTOR_OPTIONS for the thread pool executor.
    def initialize(**options)
      self.immediate = false
      @immediate_executor = Concurrent::ImmediateExecutor.new
      @async_executor = Concurrent::ThreadPoolExecutor.new(DEFAULT_EXECUTOR_OPTIONS.merge(options))
    end

    # Posts the job to the current executor; +queue_name+ is accepted but unused.
    def enqueue(job, queue_name:)
      executor.post(job, &:perform)
    end

    # Schedules the job to run at +timestamp+ (epoch seconds); a timestamp
    # that is not in the future is enqueued right away.
    def enqueue_at(job, timestamp, queue_name:)
      delay = timestamp - Time.current.to_f
      return enqueue(job, queue_name: queue_name) unless delay > 0

      Concurrent::ScheduledTask.execute(delay, args: [job], executor: executor, &:perform)
    end

    def shutdown(wait: true)
      @async_executor.shutdown
      @async_executor.wait_for_termination if wait
    end

    # The immediate executor when +immediate+ is set, the thread pool otherwise.
    def executor
      immediate ? @immediate_executor : @async_executor
    end
  end
end
end
end
# frozen_string_literal: true
- 1
require "backburner"
- 1
module ActiveJob
- 1
module QueueAdapters
# == Backburner adapter for Active Job
#
# Backburner is a beanstalkd-powered job queue that can handle a very
# high volume of jobs. You create background jobs and place them on
# multiple work queues to be processed later. Read more about
# Backburner {here}[https://github.com/nesquena/backburner].
#
# To use Backburner set the queue_adapter config to +:backburner+.
#
# Rails.application.config.active_job.queue_adapter = :backburner
- 1
class BackburnerAdapter
  # Hands the serialized job to Backburner on the job's queue and priority.
  def enqueue(job) #:nodoc:
    payload = [job.serialize]
    Backburner::Worker.enqueue(JobWrapper, payload, queue: job.queue_name, pri: job.priority)
  end

  # Like +enqueue+, with a delay computed as +timestamp+ minus the current time.
  def enqueue_at(job, timestamp) #:nodoc:
    delay = timestamp - Time.current.to_f
    payload = [job.serialize]
    Backburner::Worker.enqueue(JobWrapper, payload, queue: job.queue_name, pri: job.priority, delay: delay)
  end

  class JobWrapper #:nodoc:
    def self.perform(job_data)
      Base.execute(job_data)
    end
  end
end
end
end
# frozen_string_literal: true
- 1
require "delayed_job"
- 1
module ActiveJob
- 1
module QueueAdapters
# == Delayed Job adapter for Active Job
#
# Delayed::Job (or DJ) encapsulates the common pattern of asynchronously
# executing longer tasks in the background. Although DJ can have many
# storage backends, one of the most used is based on Active Record.
# Read more about Delayed Job {here}[https://github.com/collectiveidea/delayed_job].
#
# To use Delayed Job, set the queue_adapter config to +:delayed_job+.
#
# Rails.application.config.active_job.queue_adapter = :delayed_job
- 1
class DelayedJobAdapter
  # Enqueues the wrapped job and stores the Delayed::Job id as the job's
  # provider_job_id. Returns the Delayed::Job record.
  def enqueue(job) #:nodoc:
    wrapper = JobWrapper.new(job.serialize)
    Delayed::Job.enqueue(wrapper, queue: job.queue_name, priority: job.priority).tap do |delayed_job|
      job.provider_job_id = delayed_job.id
    end
  end

  # Same as +enqueue+, scheduled to run at +timestamp+ (converted with Time.at).
  def enqueue_at(job, timestamp) #:nodoc:
    wrapper = JobWrapper.new(job.serialize)
    Delayed::Job.enqueue(wrapper, queue: job.queue_name, priority: job.priority, run_at: Time.at(timestamp)).tap do |delayed_job|
      job.provider_job_id = delayed_job.id
    end
  end

  class JobWrapper #:nodoc:
    attr_accessor :job_data

    def initialize(job_data)
      @job_data = job_data
    end

    # Human-readable description built from the serialized job data.
    def display_name
      klass, id, queue, args = job_data.values_at("job_class", "job_id", "queue_name", "arguments")
      "#{klass} [#{id}] from DelayedJob(#{queue}) with arguments: #{args}"
    end

    def perform
      Base.execute(job_data)
    end
  end
end
end
end
# frozen_string_literal: true
- 11
module ActiveJob
- 11
module QueueAdapters
# == Active Job Inline adapter
#
# When enqueuing jobs with the Inline adapter the job will be executed
# immediately.
#
# To use the Inline adapter, set the queue_adapter config to +:inline+.
#
# Rails.application.config.active_job.queue_adapter = :inline
- 11
class InlineAdapter
  # Executes the serialized job on a new thread and waits for it to finish.
  def enqueue(job) #:nodoc:
    worker = Thread.new { Base.execute(job.serialize) }
    worker.join
  end

  # Scheduling is not supported by this adapter; always raises NotImplementedError.
  def enqueue_at(*) #:nodoc:
    raise NotImplementedError, "Use a queueing backend to enqueue jobs in the future. Read more at https://guides.rubyonrails.org/active_job_basics.html"
  end
end
end
end
# frozen_string_literal: true
- 1
require "que"
- 1
module ActiveJob
- 1
module QueueAdapters
# == Que adapter for Active Job
#
# Que is a high-performance alternative to DelayedJob or QueueClassic that
# improves the reliability of your application by protecting your jobs with
# the same ACID guarantees as the rest of your data. Que is a queue for
# Ruby and PostgreSQL that manages jobs using advisory locks.
#
# Read more about Que {here}[https://github.com/chanks/que].
#
# To use Que set the queue_adapter config to +:que+.
#
# Rails.application.config.active_job.queue_adapter = :que
- 1
class QueAdapter
  # Enqueues the serialized job and stores the Que job's "job_id" attribute as
  # the job's provider_job_id. Returns the Que job.
  def enqueue(job) #:nodoc:
    JobWrapper.enqueue(job.serialize, priority: job.priority, queue: job.queue_name).tap do |que_job|
      job.provider_job_id = que_job.attrs["job_id"]
    end
  end

  # Same as +enqueue+, scheduled to run at +timestamp+ (converted with Time.at).
  def enqueue_at(job, timestamp) #:nodoc:
    JobWrapper.enqueue(job.serialize, priority: job.priority, queue: job.queue_name, run_at: Time.at(timestamp)).tap do |que_job|
      job.provider_job_id = que_job.attrs["job_id"]
    end
  end

  class JobWrapper < Que::Job #:nodoc:
    def run(job_data)
      Base.execute(job_data)
    end
  end
end
end
end
# frozen_string_literal: true
- 1
require "queue_classic"
- 1
module ActiveJob
- 1
module QueueAdapters
# == queue_classic adapter for Active Job
#
# queue_classic provides a simple interface to a PostgreSQL-backed message
# queue. queue_classic specializes in concurrent locking and minimizing
# database load while providing a simple, intuitive developer experience.
# queue_classic assumes that you are already using PostgreSQL in your
# production environment and that adding another dependency (e.g. redis,
# beanstalkd, 0mq) is undesirable.
#
# Read more about queue_classic {here}[https://github.com/QueueClassic/queue_classic].
#
# To use queue_classic set the queue_adapter config to +:queue_classic+.
#
# Rails.application.config.active_job.queue_adapter = :queue_classic
- 1
class QueueClassicAdapter
  # Enqueues the serialized job on the queue built for the job's queue name.
  # When the queue returns a Hash, its "id" becomes the job's provider_job_id.
  def enqueue(job) #:nodoc:
    queue = build_queue(job.queue_name)
    queue.enqueue("#{JobWrapper.name}.perform", job.serialize).tap do |qc_job|
      job.provider_job_id = qc_job["id"] if qc_job.is_a?(Hash)
    end
  end

  # Schedules the serialized job at +timestamp+. Raises NotImplementedError
  # when the queue object does not respond to +enqueue_at+.
  def enqueue_at(job, timestamp) #:nodoc:
    queue = build_queue(job.queue_name)

    unless queue.respond_to?(:enqueue_at)
      raise NotImplementedError, "To be able to schedule jobs with queue_classic " \
        "the QC::Queue needs to respond to `enqueue_at(timestamp, method, *args)`. " \
        "You can implement this yourself or you can use the queue_classic-later gem."
    end

    queue.enqueue_at(timestamp, "#{JobWrapper.name}.perform", job.serialize).tap do |qc_job|
      job.provider_job_id = qc_job["id"] if qc_job.is_a?(Hash)
    end
  end

  # Builds a <tt>QC::Queue</tt> object to schedule jobs on.
  #
  # If you have a custom <tt>QC::Queue</tt> subclass you'll need to subclass
  # <tt>ActiveJob::QueueAdapters::QueueClassicAdapter</tt> and override the
  # <tt>build_queue</tt> method.
  def build_queue(queue_name)
    QC::Queue.new(queue_name)
  end

  class JobWrapper #:nodoc:
    def self.perform(job_data)
      Base.execute(job_data)
    end
  end
end
end
end
# frozen_string_literal: true
- 1
require "resque"
- 1
require "active_support/core_ext/enumerable"
- 1
require "active_support/core_ext/array/access"
- 1
begin
- 1
require "resque-scheduler"
rescue LoadError
begin
require "resque_scheduler"
rescue LoadError
false
end
end
- 1
module ActiveJob
- 1
module QueueAdapters
# == Resque adapter for Active Job
#
# Resque (pronounced like "rescue") is a Redis-backed library for creating
# background jobs, placing those jobs on multiple queues, and processing
# them later.
#
# Read more about Resque {here}[https://github.com/resque/resque].
#
# To use Resque set the queue_adapter config to +:resque+.
#
# Rails.application.config.active_job.queue_adapter = :resque
- 1
class ResqueAdapter
  # Sets the wrapper's @queue to the job's queue name, then enqueues the
  # serialized job on that queue.
  def enqueue(job) #:nodoc:
    JobWrapper.instance_variable_set(:@queue, job.queue_name)
    Resque.enqueue_to(job.queue_name, JobWrapper, job.serialize)
  end

  # Schedules the serialized job at +timestamp+. Raises NotImplementedError
  # when Resque does not respond to +enqueue_at_with_queue+.
  def enqueue_at(job, timestamp) #:nodoc:
    unless Resque.respond_to?(:enqueue_at_with_queue)
      raise NotImplementedError, "To be able to schedule jobs with Resque you need the " \
        "resque-scheduler gem. Please add it to your Gemfile and run bundle install"
    end

    Resque.enqueue_at_with_queue(job.queue_name, timestamp, JobWrapper, job.serialize)
  end

  class JobWrapper #:nodoc:
    def self.perform(job_data)
      Base.execute(job_data)
    end
  end
end
end
end
# frozen_string_literal: true
- 1
require "sidekiq"
- 1
module ActiveJob
- 1
module QueueAdapters
# == Sidekiq adapter for Active Job
#
# Simple, efficient background processing for Ruby. Sidekiq uses threads to
# handle many jobs at the same time in the same process. It does not
# require Rails but will integrate tightly with it to make background
# processing dead simple.
#
# Read more about Sidekiq {here}[https://sidekiq.org].
#
# To use Sidekiq set the queue_adapter config to +:sidekiq+.
#
# Rails.application.config.active_job.queue_adapter = :sidekiq
- 1
class SidekiqAdapter
  # Pushes the serialized job to Sidekiq and stores the value returned by
  # Sidekiq::Client.push as the job's provider_job_id.
  def enqueue(job) #:nodoc:
    # Sidekiq::Client does not support symbols as keys
    item = {
      "class"   => JobWrapper,
      "wrapped" => job.class,
      "queue"   => job.queue_name,
      "args"    => [ job.serialize ]
    }
    job.provider_job_id = Sidekiq::Client.push(item)
  end

  # Same as +enqueue+, with +timestamp+ passed along under the "at" key.
  def enqueue_at(job, timestamp) #:nodoc:
    item = {
      "class"   => JobWrapper,
      "wrapped" => job.class,
      "queue"   => job.queue_name,
      "args"    => [ job.serialize ],
      "at"      => timestamp
    }
    job.provider_job_id = Sidekiq::Client.push(item)
  end

  class JobWrapper #:nodoc:
    include Sidekiq::Worker

    # Executes the job with the worker's +jid+ merged in as "provider_job_id".
    def perform(job_data)
      Base.execute(job_data.merge("provider_job_id" => jid))
    end
  end
end
end
end
# frozen_string_literal: true
- 11
require "sneakers"
- 11
require "monitor"
- 11
module ActiveJob
- 11
module QueueAdapters
# == Sneakers adapter for Active Job
#
# A high-performance RabbitMQ background processing framework for Ruby.
# Sneakers is being used in production for both I/O and CPU intensive
# workloads, and has achieved the goals of high-performance and
# 0-maintenance, as designed.
#
# Read more about Sneakers {here}[https://github.com/jondot/sneakers].
#
# To use Sneakers set the queue_adapter config to +:sneakers+.
#
# Rails.application.config.active_job.queue_adapter = :sneakers
- 11
class SneakersAdapter
  def initialize
    @monitor = Monitor.new
  end

  # Points the wrapper at the job's queue and publishes the JSON-encoded job.
  # Both steps run under the adapter's monitor.
  def enqueue(job) #:nodoc:
    @monitor.synchronize do
      JobWrapper.from_queue(job.queue_name)
      message = ActiveSupport::JSON.encode(job.serialize)
      JobWrapper.enqueue(message)
    end
  end

  # Scheduling is not supported by this adapter; always raises NotImplementedError.
  def enqueue_at(job, timestamp) #:nodoc:
    raise NotImplementedError, "This queueing backend does not support scheduling jobs. To see what features are supported go to http://api.rubyonrails.org/classes/ActiveJob/QueueAdapters.html"
  end

  class JobWrapper #:nodoc:
    include Sneakers::Worker
    from_queue "default"

    # Decodes the JSON message, executes the job, then acknowledges the message.
    def work(msg)
      Base.execute(ActiveSupport::JSON.decode(msg))
      ack!
    end
  end
end
end
end
# frozen_string_literal: true
- 1
require "sucker_punch"
- 1
module ActiveJob
- 1
module QueueAdapters
# == Sucker Punch adapter for Active Job
#
# Sucker Punch is a single-process Ruby asynchronous processing library.
# This reduces the cost of hosting on a service like Heroku along
# with the memory footprint of having to maintain additional jobs if
# hosting on a dedicated server. All queues can run within a
# single application (e.g. Rails, Sinatra, etc.) process.
#
# Read more about Sucker Punch {here}[https://github.com/brandonhilkert/sucker_punch].
#
# To use Sucker Punch set the queue_adapter config to +:sucker_punch+.
#
# Rails.application.config.active_job.queue_adapter = :sucker_punch
- 1
class SuckerPunchAdapter
  # Enqueues the serialized job, using whichever sucker_punch API the
  # wrapper exposes.
  def enqueue(job) #:nodoc:
    if JobWrapper.respond_to?(:perform_async)
      # sucker_punch 2.0 API
      JobWrapper.perform_async job.serialize
    else
      # sucker_punch 1.0 API
      JobWrapper.new.async.perform job.serialize
    end
  end

  # Schedules the serialized job to run after the delay between now and
  # +timestamp+. Raises NotImplementedError when the wrapper does not respond
  # to +perform_in+.
  def enqueue_at(job, timestamp) #:nodoc:
    unless JobWrapper.respond_to?(:perform_in)
      # The unsupported operation is +enqueue_at+ (this method), not the
      # job's +enqueued_at+ attribute.
      raise NotImplementedError, "sucker_punch 1.0 does not support `enqueue_at`. Please upgrade to version ~> 2.0.0 to enable this behavior."
    end

    delay = timestamp - Time.current.to_f
    JobWrapper.perform_in delay, job.serialize
  end

  class JobWrapper #:nodoc:
    include SuckerPunch::Job

    def perform(job_data)
      Base.execute job_data
    end
  end
end
end
end
# frozen_string_literal: true
- 11
module ActiveJob
- 11
module QueueAdapters
# == Test adapter for Active Job
#
# The test adapter should be used only in testing. Along with
# <tt>ActiveJob::TestCase</tt> and <tt>ActiveJob::TestHelper</tt>
# it makes a great tool to test your Rails application.
#
# To use the test adapter set queue_adapter config to +:test+.
#
# Rails.application.config.active_job.queue_adapter = :test
- 11
class TestAdapter
  attr_accessor(:perform_enqueued_jobs, :perform_enqueued_at_jobs, :filter, :reject, :queue, :at)
  attr_writer(:enqueued_jobs, :performed_jobs)

  # Provides a store of all the enqueued jobs with the TestAdapter so you can check them.
  def enqueued_jobs
    @enqueued_jobs ||= []
  end

  # Provides a store of all the performed jobs with the TestAdapter so you can check them.
  def performed_jobs
    @performed_jobs ||= []
  end

  # Performs the job when +perform_enqueued_jobs+ is set and the job is not
  # filtered out; records it as enqueued otherwise.
  def enqueue(job) #:nodoc:
    job_data = job_to_hash(job)
    should_perform = perform_enqueued_jobs && !filtered?(job)
    perform_or_enqueue(should_perform, job, job_data)
  end

  # Same as +enqueue+, governed by +perform_enqueued_at_jobs+; the recorded
  # hash also carries the timestamp under +:at+.
  def enqueue_at(job, timestamp) #:nodoc:
    job_data = job_to_hash(job, at: timestamp)
    should_perform = perform_enqueued_at_jobs && !filtered?(job)
    perform_or_enqueue(should_perform, job, job_data)
  end

  private
    # Serialized job plus +:job+, +:args+ and +:queue+ convenience keys,
    # merged with +extras+.
    def job_to_hash(job, extras = {})
      job_data = job.serialize
      job_data[:job] = job.class
      job_data[:args] = job_data.fetch("arguments")
      job_data[:queue] = job_data.fetch("queue_name")
      job_data.merge(extras)
    end

    def perform_or_enqueue(perform, job, job_data)
      return enqueued_jobs << job_data unless perform

      performed_jobs << job_data
      Base.execute(job.serialize)
    end

    def filtered?(job)
      filtered_queue?(job) || filtered_job_class?(job) || filtered_time?(job)
    end

    # True when both +at+ and the job's scheduled time are set and the job is
    # scheduled after +at+.
    def filtered_time?(job)
      return unless at && job.scheduled_at

      job.scheduled_at > at.to_f
    end

    # True when a +queue+ is configured and the job targets a different one.
    def filtered_queue?(job)
      job.queue_name != queue.to_s if queue
    end

    # +filter+ keeps only matching jobs; +reject+ drops matching jobs.
    # +filter+ takes precedence when both are set.
    def filtered_job_class?(job)
      if filter
        !filter_as_proc(filter).call(job)
      elsif reject
        filter_as_proc(reject).call(job)
      end
    end

    # Procs are used as given; anything else is treated as a list of job
    # classes to match against the job's class.
    def filter_as_proc(filter)
      filter.is_a?(Proc) ? filter : ->(job) { Array(filter).include?(job.class) }
    end
end
end
end
# frozen_string_literal: true
- 11
module ActiveJob
- 11
module QueueName
  extend ActiveSupport::Concern

  # Includes the ability to override the default queue name and prefix.
  module ClassMethods
    mattr_accessor :default_queue_name, default: "default"

    # Specifies the name of the queue to process the job on.
    #
    #   class PublishToFeedJob < ActiveJob::Base
    #     queue_as :feeds
    #
    #     def perform(post)
    #       post.to_feed!
    #     end
    #   end
    #
    # Can be given a block that will evaluate in the context of the job
    # allowing +self.arguments+ to be accessed so that a dynamic queue name
    # can be applied:
    #
    #   class PublishToFeedJob < ApplicationJob
    #     queue_as do
    #       post = self.arguments.first
    #
    #       if post.paid?
    #         :paid_feeds
    #       else
    #         :feeds
    #       end
    #     end
    #
    #     def perform(post)
    #       post.to_feed!
    #     end
    #   end
    def queue_as(part_name = nil, &block)
      self.queue_name = block_given? ? block : queue_name_from_part(part_name)
    end

    # Builds the full queue name: the prefix (when present) and +part_name+
    # (or the default queue name) joined by the delimiter.
    def queue_name_from_part(part_name) #:nodoc:
      base_name = part_name || default_queue_name
      segments = [queue_name_prefix.presence, base_name].compact
      -segments.join(queue_name_delimiter)
    end
  end

  included do
    class_attribute :queue_name, instance_accessor: false, default: -> { self.class.default_queue_name }
    class_attribute :queue_name_delimiter, instance_accessor: false, default: "_"
    class_attribute :queue_name_prefix
  end

  # Returns the name of the queue the job will be run on.
  def queue_name
    # A Proc is resolved in the context of the job on first access and the
    # resulting name is kept.
    if @queue_name.is_a?(Proc)
      resolved_part = instance_exec(&@queue_name)
      @queue_name = self.class.queue_name_from_part(resolved_part)
    end

    @queue_name
  end
end
end
# frozen_string_literal: true
- 11
module ActiveJob
- 11
module QueuePriority
  extend ActiveSupport::Concern

  # Includes the ability to override the default queue priority.
  module ClassMethods
    mattr_accessor :default_priority

    # Specifies the priority of the queue to create the job with.
    #
    #   class PublishToFeedJob < ActiveJob::Base
    #     queue_with_priority 50
    #
    #     def perform(post)
    #       post.to_feed!
    #     end
    #   end
    #
    # Specify either an argument or a block.
    def queue_with_priority(priority = nil, &block)
      self.priority = block_given? ? block : priority
    end
  end

  included do
    class_attribute :priority, instance_accessor: false, default: default_priority
  end

  # Returns the priority that the job will be created with
  def priority
    # A Proc is resolved in the context of the job on first access and the
    # result is kept.
    @priority = instance_exec(&@priority) if @priority.is_a?(Proc)
    @priority
  end
end
end
# frozen_string_literal: true
require "global_id/railtie"
require "active_job"
module ActiveJob
  # = Active Job Railtie
  #
  # Registers the initializers that connect Active Job to a Rails
  # application: logger, custom serializers, configuration options and the
  # code reloader.
  class Railtie < Rails::Railtie # :nodoc:
    config.active_job = ActiveSupport::OrderedOptions.new
    config.active_job.custom_serializers = []

    initializer "active_job.logger" do
      ActiveSupport.on_load(:active_job) do
        self.logger = ::Rails.logger
      end
    end

    initializer "active_job.custom_serializers" do |app|
      config.after_initialize do
        # The option is removed from the config and handed to the
        # serializer registry instead.
        extra_serializers = app.config.active_job.delete(:custom_serializers)
        ActiveJob::Serializers.add_serializers(extra_serializers)
      end
    end

    initializer "active_job.set_configs" do |app|
      options = app.config.active_job
      options.queue_adapter ||= :async

      ActiveSupport.on_load(:active_job) do
        # Apply every configured option for which a writer exists.
        options.each do |name, value|
          setter = "#{name}="
          send(setter, value) if respond_to?(setter)
        end
      end

      ActiveSupport.on_load(:action_dispatch_integration_test) do
        include ActiveJob::TestHelper
      end
    end

    initializer "active_job.set_reloader_hook" do |app|
      ActiveSupport.on_load(:active_job) do
        # Wrap job execution in the application reloader.
        ActiveJob::Callbacks.singleton_class.set_callback(:execute, :around, prepend: true) do |_, inner|
          app.reloader.wrap { inner.call }
        end
      end
    end
  end
end
# frozen_string_literal: true
- 11
require "set"
- 11
module ActiveJob
  # The <tt>ActiveJob::Serializers</tt> module is used to store a list of known serializers
  # and to add new ones. It also has helpers to serialize/deserialize objects.
  module Serializers # :nodoc:
    extend ActiveSupport::Autoload

    autoload :ObjectSerializer
    autoload :SymbolSerializer
    autoload :DurationSerializer
    autoload :DateTimeSerializer
    autoload :DateSerializer
    autoload :TimeWithZoneSerializer
    autoload :TimeSerializer
    autoload :ModuleSerializer

    mattr_accessor :_additional_serializers
    self._additional_serializers = Set.new

    class << self
      # Returns the serialized representation of the passed object.
      # Looks through all known serializers and uses the first one that accepts it.
      # Raises <tt>ActiveJob::SerializationError</tt> if it can't find a proper serializer.
      def serialize(argument)
        matching = serializers.find { |candidate| candidate.serialize?(argument) }
        unless matching
          raise SerializationError, "Unsupported argument type: #{argument.class.name}"
        end

        matching.serialize(argument)
      end

      # Returns the deserialized object.
      # The serializer is resolved from the name stored in the argument.
      # Raises <tt>ArgumentError</tt> if the name is missing or can't be resolved.
      def deserialize(argument)
        serializer_name = argument[Arguments::OBJECT_SERIALIZER_KEY]
        unless serializer_name
          raise ArgumentError, "Serializer name is not present in the argument: #{argument.inspect}"
        end

        serializer = serializer_name.safe_constantize ||
          raise(ArgumentError, "Serializer #{serializer_name} is not known")
        serializer.deserialize(argument)
      end

      # Returns the list of known serializers.
      def serializers
        _additional_serializers
      end

      # Adds new serializers to the list of known serializers.
      # Accepts individual serializers as well as (nested) arrays of them.
      def add_serializers(*new_serializers)
        self._additional_serializers = _additional_serializers + new_serializers.flatten
      end
    end

    add_serializers SymbolSerializer,
      DurationSerializer,
      DateTimeSerializer,
      DateSerializer,
      TimeWithZoneSerializer,
      TimeSerializer,
      ModuleSerializer
  end
end
# frozen_string_literal: true
- 11
module ActiveJob
  module Serializers
    # Serializes +Date+ objects as ISO 8601 strings and parses them back.
    class DateSerializer < ObjectSerializer # :nodoc:
      # Stores the ISO 8601 form of +date+ under the "value" key.
      def serialize(date)
        iso_string = date.iso8601
        super("value" => iso_string)
      end

      # Rebuilds the Date from the "value" entry of +hash+.
      def deserialize(hash)
        iso_string = hash["value"]
        Date.iso8601(iso_string)
      end

      private
        # Class of the objects handled by this serializer.
        def klass
          Date
        end
    end
  end
end
# frozen_string_literal: true
- 11
module ActiveJob
  module Serializers
    # Serializes +DateTime+ objects as ISO 8601 strings and parses them back.
    class DateTimeSerializer < ObjectSerializer # :nodoc:
      # Stores the ISO 8601 form of +time+ under the "value" key.
      def serialize(time)
        iso_string = time.iso8601
        super("value" => iso_string)
      end

      # Rebuilds the DateTime from the "value" entry of +hash+.
      def deserialize(hash)
        iso_string = hash["value"]
        DateTime.iso8601(iso_string)
      end

      private
        # Class of the objects handled by this serializer.
        def klass
          DateTime
        end
    end
  end
end
# frozen_string_literal: true
- 11
module ActiveJob
  module Serializers
    # Serializes +ActiveSupport::Duration+ objects as their value plus their
    # serialized parts.
    class DurationSerializer < ObjectSerializer # :nodoc:
      # Stores the duration's value and its parts (run through
      # +Arguments.serialize+) under the "value" and "parts" keys.
      def serialize(duration)
        value = duration.value
        parts = Arguments.serialize(duration.parts)
        super("value" => value, "parts" => parts)
      end

      # Rebuilds the duration from the "value" and "parts" entries of +hash+.
      def deserialize(hash)
        klass.new(hash["value"], Arguments.deserialize(hash["parts"]))
      end

      private
        # Class of the objects handled by this serializer.
        def klass
          ActiveSupport::Duration
        end
    end
  end
end
# frozen_string_literal: true
- 11
module ActiveJob
  module Serializers
    # Serializes modules and classes by name and looks them up again on
    # deserialization.
    class ModuleSerializer < ObjectSerializer # :nodoc:
      # Stores the name of +constant+ under the "value" key.
      def serialize(constant)
        constant_name = constant.name
        super("value" => constant_name)
      end

      # Resolves the constant named by the "value" entry of +hash+.
      def deserialize(hash)
        constant_name = hash["value"]
        constant_name.constantize
      end

      private
        # Class of the objects handled by this serializer.
        def klass
          Module
        end
    end
  end
end
# frozen_string_literal: true
- 11
module ActiveJob
  module Serializers
    # Base class for serializing and deserializing custom objects.
    #
    # Example:
    #
    #   class MoneySerializer < ActiveJob::Serializers::ObjectSerializer
    #     def serialize(money)
    #       super("amount" => money.amount, "currency" => money.currency)
    #     end
    #
    #     def deserialize(hash)
    #       Money.new(hash["amount"], hash["currency"])
    #     end
    #
    #     private
    #
    #       def klass
    #         Money
    #       end
    #   end
    class ObjectSerializer
      include Singleton

      class << self
        # The class-level API forwards to the singleton instance.
        delegate :serialize?, :serialize, :deserialize, to: :instance
      end

      # Determines if an argument should be serialized by a serializer.
      def serialize?(argument)
        argument.is_a?(klass)
      end

      # Serializes an argument to a JSON primitive type.
      #
      # The serializer's class name is recorded under
      # +Arguments::OBJECT_SERIALIZER_KEY+ and the entries of +hash+ are
      # merged on top of it.
      def serialize(hash)
        payload = { Arguments::OBJECT_SERIALIZER_KEY => self.class.name }
        payload.merge!(hash)
      end

      # Deserializes an argument from a JSON primitive type.
      # Must be implemented by subclasses.
      def deserialize(json)
        raise NotImplementedError
      end

      private
        # The class of the object that will be serialized.
        # Must be implemented by subclasses.
        def klass # :doc:
          raise NotImplementedError
        end
    end
  end
end
# frozen_string_literal: true
- 11
module ActiveJob
  module Serializers
    # Serializes symbols as strings and converts them back.
    class SymbolSerializer < ObjectSerializer # :nodoc:
      # Stores the string form of +argument+ under the "value" key.
      def serialize(argument)
        symbol_name = argument.to_s
        super("value" => symbol_name)
      end

      # Converts the "value" entry of +argument+ back into a symbol.
      def deserialize(argument)
        symbol_name = argument["value"]
        symbol_name.to_sym
      end

      private
        # Class of the objects handled by this serializer.
        def klass
          Symbol
        end
    end
  end
end
# frozen_string_literal: true
- 11
module ActiveJob
  module Serializers
    # Serializes +Time+ objects as ISO 8601 strings and parses them back.
    class TimeSerializer < ObjectSerializer # :nodoc:
      # Stores the ISO 8601 form of +time+ under the "value" key.
      def serialize(time)
        iso_string = time.iso8601
        super("value" => iso_string)
      end

      # Rebuilds the Time from the "value" entry of +hash+.
      def deserialize(hash)
        iso_string = hash["value"]
        Time.iso8601(iso_string)
      end

      private
        # Class of the objects handled by this serializer.
        def klass
          Time
        end
    end
  end
end
# frozen_string_literal: true
- 11
module ActiveJob
  module Serializers
    # Serializes +ActiveSupport::TimeWithZone+ objects as ISO 8601 strings
    # and parses them back.
    class TimeWithZoneSerializer < ObjectSerializer # :nodoc:
      # Stores the ISO 8601 form of +time+ under the "value" key.
      def serialize(time)
        iso_string = time.iso8601
        super("value" => iso_string)
      end

      # Parses the "value" entry of +hash+ and converts the result with
      # +in_time_zone+.
      def deserialize(hash)
        parsed = Time.iso8601(hash["value"])
        parsed.in_time_zone
      end

      private
        # Class of the objects handled by this serializer.
        def klass
          ActiveSupport::TimeWithZone
        end
    end
  end
end
# frozen_string_literal: true
- 11
require "active_support/test_case"
- 11
module ActiveJob
- 11
# Test case base class for Active Job: an ActiveSupport::TestCase with
# ActiveJob::TestHelper mixed in.
class TestCase < ActiveSupport::TestCase
- 11
include ActiveJob::TestHelper
- 11
# Runs the +:active_job_test_case+ load hooks, passing this class.
ActiveSupport.run_load_hooks(:active_job_test_case, self)
end
end
# frozen_string_literal: true
- 11
require "active_support/core_ext/class/subclasses"
- 11
module ActiveJob
# Provides helper methods for testing Active Job
- 11
module TestHelper
- 11
delegate :enqueued_jobs, :enqueued_jobs=,
:performed_jobs, :performed_jobs=,
to: :queue_adapter
- 11
# Lets a test swap in its own queue adapter for a job class without touching
# the adapter the class was configured with.
module TestQueueAdapter
  extend ActiveSupport::Concern

  included do
    class_attribute :_test_adapter, instance_accessor: false, instance_predicate: false
  end

  module ClassMethods
    # Returns the test adapter when one is enabled, otherwise falls back to
    # the regular +queue_adapter+.
    def queue_adapter
      adapter = _test_adapter
      adapter.nil? ? super : adapter
    end

    # Removes the test adapter so the regular adapter is used again.
    def disable_test_adapter
      self._test_adapter = nil
    end

    # Makes +test_adapter+ the adapter returned by +queue_adapter+.
    def enable_test_adapter(test_adapter)
      self._test_adapter = test_adapter
    end
  end
end
- 11
ActiveJob::Base.include(TestQueueAdapter)
- 11
# Enables one test adapter instance on every job class reported by
# +queue_adapter_changed_jobs+, then empties the enqueued and performed job
# lists before continuing with the regular setup.
def before_setup # :nodoc:
  adapter = queue_adapter_for_test
  queue_adapter_changed_jobs.each { |job_class| job_class.enable_test_adapter(adapter) }

  clear_enqueued_jobs
  clear_performed_jobs
  super
end
- 11
# Runs the regular teardown first, then disables the test adapter on every
# job class reported by +queue_adapter_changed_jobs+.
def after_teardown # :nodoc:
  super
  queue_adapter_changed_jobs.each(&:disable_test_adapter)
end
# Specifies the queue adapter to use with all Active Job test helpers.
#
# Returns an instance of the queue adapter and defaults to
# <tt>ActiveJob::QueueAdapters::TestAdapter</tt>.
#
# Note: The adapter provided by this method must provide some additional
# methods from those expected of a standard <tt>ActiveJob::QueueAdapter</tt>
# in order to be used with the active job test helpers. Refer to
# <tt>ActiveJob::QueueAdapters::TestAdapter</tt>.
- 11
def queue_adapter_for_test
- 2453
# A new TestAdapter instance is built on every call.
ActiveJob::QueueAdapters::TestAdapter.new
end
# Asserts that the number of enqueued jobs matches the given number.
#
# def test_jobs
# assert_enqueued_jobs 0
# HelloJob.perform_later('david')
# assert_enqueued_jobs 1
# HelloJob.perform_later('abdelkader')
# assert_enqueued_jobs 2
# end
#
# If a block is passed, asserts that the block will cause the specified number of
# jobs to be enqueued.
#
# def test_jobs_again
# assert_enqueued_jobs 1 do
# HelloJob.perform_later('cristian')
# end
#
# assert_enqueued_jobs 2 do
# HelloJob.perform_later('aaron')
# HelloJob.perform_later('rafael')
# end
# end
#
# Asserts the number of times a specific job was enqueued by passing +:only+ option.
#
# def test_logging_job
# assert_enqueued_jobs 1, only: LoggingJob do
# LoggingJob.perform_later
# HelloJob.perform_later('jeremy')
# end
# end
#
# Asserts the number of times a job except specific class was enqueued by passing +:except+ option.
#
# def test_logging_job
# assert_enqueued_jobs 1, except: HelloJob do
# LoggingJob.perform_later
# HelloJob.perform_later('jeremy')
# end
# end
#
# The +:only+ and +:except+ options accept a Class, an Array of Classes, or a Proc. When passed a Proc,
# a hash containing the job's class and its arguments is passed as the argument.
#
# Asserts the number of times a job is enqueued to a specific queue by passing +:queue+ option.
#
# def test_logging_job
# assert_enqueued_jobs 2, queue: 'default' do
# LoggingJob.perform_later
# HelloJob.perform_later('elfassy')
# end
# end
- 11
# Compares +number+ with the count of enqueued jobs matching the given
# filters. With a block, only jobs that appear in the filtered list after
# the block ran and were not there before are counted.
def assert_enqueued_jobs(number, only: nil, except: nil, queue: nil, &block)
  filters = { only: only, except: except, queue: queue }

  actual_count =
    if block_given?
      jobs_before = enqueued_jobs_with(**filters)
      assert_nothing_raised(&block)
      jobs_after = enqueued_jobs_with(**filters)
      (jobs_after - jobs_before).count
    else
      enqueued_jobs_with(**filters).count
    end

  assert_equal number, actual_count, "#{number} jobs expected, but #{actual_count} were enqueued"
end
# Asserts that no jobs have been enqueued.
#
# def test_jobs
# assert_no_enqueued_jobs
# HelloJob.perform_later('jeremy')
# assert_enqueued_jobs 1
# end
#
# If a block is passed, asserts that the block will not cause any job to be enqueued.
#
# def test_jobs_again
# assert_no_enqueued_jobs do
# # No job should be enqueued from this block
# end
# end
#
# Asserts that no jobs of a specific kind are enqueued by passing +:only+ option.
#
# def test_no_logging
# assert_no_enqueued_jobs only: LoggingJob do
# HelloJob.perform_later('jeremy')
# end
# end
#
# Asserts that no jobs except specific class are enqueued by passing +:except+ option.
#
# def test_no_logging
# assert_no_enqueued_jobs except: HelloJob do
# HelloJob.perform_later('jeremy')
# end
# end
#
# The +:only+ and +:except+ options accept a Class, an Array of Classes, or a Proc. When passed a Proc,
# a hash containing the job's class and its arguments is passed as the argument.
#
# Asserts that no jobs are enqueued to a specific queue by passing the +:queue+ option.
#
# def test_no_logging
# assert_no_enqueued_jobs queue: 'default' do
# LoggingJob.set(queue: :some_queue).perform_later
# end
# end
#
# Note: This assertion is simply a shortcut for:
#
# assert_enqueued_jobs 0, &block
- 11
# Shortcut for +assert_enqueued_jobs+ with an expected count of zero; the
# filters and the block are forwarded unchanged.
def assert_no_enqueued_jobs(only: nil, except: nil, queue: nil, &block)
  filters = { only: only, except: except, queue: queue }
  assert_enqueued_jobs(0, **filters, &block)
end
# Asserts that the number of performed jobs matches the given number.
# If no block is passed, <tt>perform_enqueued_jobs</tt>
# must be called around or after the job call.
#
# def test_jobs
# assert_performed_jobs 0
#
# perform_enqueued_jobs do
# HelloJob.perform_later('xavier')
# end
# assert_performed_jobs 1
#
# HelloJob.perform_later('yves')
#
# perform_enqueued_jobs
#
# assert_performed_jobs 2
# end
#
# If a block is passed, asserts that the block will cause the specified number of
# jobs to be performed.
#
# def test_jobs_again
# assert_performed_jobs 1 do
# HelloJob.perform_later('robin')
# end
#
# assert_performed_jobs 2 do
# HelloJob.perform_later('carlos')
# HelloJob.perform_later('sean')
# end
# end
#
# This method also supports filtering. If the +:only+ option is specified,
# then only the listed job(s) will be performed.
#
# def test_hello_job
# assert_performed_jobs 1, only: HelloJob do
# HelloJob.perform_later('jeremy')
# LoggingJob.perform_later
# end
# end
#
# Also if the +:except+ option is specified,
# then the job(s) except specific class will be performed.
#
# def test_hello_job
# assert_performed_jobs 1, except: LoggingJob do
# HelloJob.perform_later('jeremy')
# LoggingJob.perform_later
# end
# end
#
# An array may also be specified, to support testing multiple jobs.
#
# def test_hello_and_logging_jobs
# assert_nothing_raised do
# assert_performed_jobs 2, only: [HelloJob, LoggingJob] do
# HelloJob.perform_later('jeremy')
# LoggingJob.perform_later('stewie')
# RescueJob.perform_later('david')
# end
# end
# end
#
# A proc may also be specified. When passed a Proc, the job's instance will be passed as argument.
#
# def test_hello_and_logging_jobs
# assert_nothing_raised do
# assert_performed_jobs(1, only: ->(job) { job.is_a?(HelloJob) }) do
# HelloJob.perform_later('jeremy')
# LoggingJob.perform_later('stewie')
# RescueJob.perform_later('david')
# end
# end
# end
#
# If the +:queue+ option is specified,
# then only the job(s) enqueued to a specific queue will be performed.
#
# def test_assert_performed_jobs_with_queue_option
# assert_performed_jobs 1, queue: :some_queue do
# HelloJob.set(queue: :some_queue).perform_later("jeremy")
# HelloJob.set(queue: :other_queue).perform_later("bogdan")
# end
# end
- 11
# Compares +number+ with the count of performed jobs. With a block, the
# block is run through +perform_enqueued_jobs+ with the given filters and
# the growth of +performed_jobs+ is counted; without one, the performed jobs
# matching the filters are counted.
def assert_performed_jobs(number, only: nil, except: nil, queue: nil, &block)
  filters = { only: only, except: except, queue: queue }

  performed_jobs_size =
    if block_given?
      count_before = performed_jobs.size
      perform_enqueued_jobs(**filters, &block)
      performed_jobs.size - count_before
    else
      performed_jobs_with(**filters).count
    end

  assert_equal number, performed_jobs_size, "#{number} jobs expected, but #{performed_jobs_size} were performed"
end
# Asserts that no jobs have been performed.
#
# def test_jobs
# assert_no_performed_jobs
#
# perform_enqueued_jobs do
# HelloJob.perform_later('matthew')
# assert_performed_jobs 1
# end
# end
#
# If a block is passed, asserts that the block will not cause any job to be performed.
#
# def test_jobs_again
# assert_no_performed_jobs do
# # No job should be performed from this block
# end
# end
#
# The block form supports filtering. If the +:only+ option is specified,
# then only the listed job(s) will not be performed.
#
# def test_no_logging
# assert_no_performed_jobs only: LoggingJob do
# HelloJob.perform_later('jeremy')
# end
# end
#
# Also if the +:except+ option is specified,
# then the job(s) except specific class will not be performed.
#
# def test_no_logging
# assert_no_performed_jobs except: HelloJob do
# HelloJob.perform_later('jeremy')
# end
# end
#
# The +:only+ and +:except+ options accept a Class, an Array of Classes, or a Proc. When passed a Proc,
# an instance of the job will be passed as the argument.
#
# If the +:queue+ option is specified,
# then only the job(s) enqueued to a specific queue will not be performed.
#
# def test_assert_no_performed_jobs_with_queue_option
# assert_no_performed_jobs queue: :some_queue do
# HelloJob.set(queue: :other_queue).perform_later("jeremy")
# end
# end
#
# Note: This assertion is simply a shortcut for:
#
# assert_performed_jobs 0, &block
- 11
# Shortcut for +assert_performed_jobs+ with an expected count of zero; the
# filters and the block are forwarded unchanged.
def assert_no_performed_jobs(only: nil, except: nil, queue: nil, &block)
  filters = { only: only, except: except, queue: queue }
  assert_performed_jobs(0, **filters, &block)
end
# Asserts that the job has been enqueued with the given arguments.
#
# def test_assert_enqueued_with
# MyJob.perform_later(1,2,3)
# assert_enqueued_with(job: MyJob, args: [1,2,3])
#
# MyJob.set(wait_until: Date.tomorrow.noon, queue: "my_queue").perform_later
# assert_enqueued_with(at: Date.tomorrow.noon, queue: "my_queue")
# end
#
# The given arguments may also be specified as matcher procs that return a
# boolean value indicating whether a job's attribute meets certain criteria.
#
# For example, a proc can be used to match a range of times:
#
# def test_assert_enqueued_with
# at_matcher = ->(job_at) { (Date.yesterday..Date.tomorrow).cover?(job_at) }
#
# MyJob.set(wait_until: Date.today.noon).perform_later
#
# assert_enqueued_with(job: MyJob, at: at_matcher)
# end
#
# A proc can also be used to match a subset of a job's args:
#
# def test_assert_enqueued_with
# args_matcher = ->(job_args) { job_args[0].key?(:foo) }
#
# MyJob.perform_later(foo: "bar", other_arg: "No need to check in the test")
#
# assert_enqueued_with(job: MyJob, args: args_matcher)
# end
#
# If a block is passed, asserts that the block will cause the job to be
# enqueued with the given arguments.
#
# def test_assert_enqueued_with
# assert_enqueued_with(job: MyJob, args: [1,2,3]) do
# MyJob.perform_later(1,2,3)
# end
#
# assert_enqueued_with(job: MyJob, at: Date.tomorrow.noon) do
# MyJob.set(wait_until: Date.tomorrow.noon).perform_later
# end
# end
- 11
# Looks for an enqueued job whose attributes match every given expectation
# (nil expectations are ignored) and returns it as an instantiated job.
# With a block, only jobs added to +enqueued_jobs+ by the block are
# considered. Expectations that respond to +call+ are used as matchers, all
# others are compared with <tt>==</tt>.
def assert_enqueued_with(job: nil, args: nil, at: nil, queue: nil, &block)
  expected = { job: job, args: args, at: at, queue: queue }.compact
  expected_args = prepare_args_for_assertion(expected)
  potential_matches = []

  jobs =
    if block_given?
      jobs_before = enqueued_jobs.dup
      assert_nothing_raised(&block)
      enqueued_jobs - jobs_before
    else
      enqueued_jobs
    end

  matching_job = jobs.find do |candidate|
    deserialized = deserialize_args_for_assertion(candidate)
    # Every inspected job is remembered so a failure can list what was seen.
    potential_matches << deserialized

    expected_args.all? do |key, value|
      actual = deserialized[key]
      value.respond_to?(:call) ? value.call(actual) : value == actual
    end
  end

  message = +"No enqueued job found with #{expected}"
  if potential_matches.present?
    message << "\n\nPotential matches: #{potential_matches.join("\n")}"
  end

  assert matching_job, message
  instantiate_job(matching_job)
end
# Asserts that the job has been performed with the given arguments.
#
# def test_assert_performed_with
# MyJob.perform_later(1,2,3)
#
# perform_enqueued_jobs
#
# assert_performed_with(job: MyJob, args: [1,2,3])
#
# MyJob.set(wait_until: Date.tomorrow.noon, queue: "my_queue").perform_later
#
# perform_enqueued_jobs
#
# assert_performed_with(at: Date.tomorrow.noon, queue: "my_queue")
# end
#
# The given arguments may also be specified as matcher procs that return a
# boolean value indicating whether a job's attribute meets certain criteria.
#
# For example, a proc can be used to match a range of times:
#
# def test_assert_performed_with
# at_matcher = ->(job_at) { (Date.yesterday..Date.tomorrow).cover?(job_at) }
#
# MyJob.set(wait_until: Date.today.noon).perform_later
#
# perform_enqueued_jobs
#
# assert_performed_with(job: MyJob, at: at_matcher)
# end
#
# A proc can also be used to match a subset of a job's args:
#
# def test_assert_performed_with
# args_matcher = ->(job_args) { job_args[0].key?(:foo) }
#
# MyJob.perform_later(foo: "bar", other_arg: "No need to check in the test")
#
# perform_enqueued_jobs
#
# assert_performed_with(job: MyJob, args: args_matcher)
# end
#
# If a block is passed, that block performs all of the jobs that were
# enqueued throughout the duration of the block and asserts that
# the job has been performed with the given arguments in the block.
#
# def test_assert_performed_with
# assert_performed_with(job: MyJob, args: [1,2,3]) do
# MyJob.perform_later(1,2,3)
# end
#
# assert_performed_with(job: MyJob, at: Date.tomorrow.noon) do
# MyJob.set(wait_until: Date.tomorrow.noon).perform_later
# end
# end
- 11
# Looks for a performed job whose attributes match every given expectation
# (nil expectations are ignored) and returns it as an instantiated job.
# With a block, the block is run through +perform_enqueued_jobs+ and only
# jobs appended to +performed_jobs+ during it are considered. Expectations
# that respond to +call+ are used as matchers, all others are compared with
# <tt>==</tt>.
def assert_performed_with(job: nil, args: nil, at: nil, queue: nil, &block)
  expected = { job: job, args: args, at: at, queue: queue }.compact
  expected_args = prepare_args_for_assertion(expected)
  potential_matches = []

  jobs =
    if block_given?
      count_before = performed_jobs.count
      perform_enqueued_jobs(&block)
      performed_jobs.drop(count_before)
    else
      performed_jobs
    end

  matching_job = jobs.find do |candidate|
    deserialized = deserialize_args_for_assertion(candidate)
    # Every inspected job is remembered so a failure can list what was seen.
    potential_matches << deserialized

    expected_args.all? do |key, value|
      actual = deserialized[key]
      value.respond_to?(:call) ? value.call(actual) : value == actual
    end
  end

  message = +"No performed job found with #{expected}"
  if potential_matches.present?
    message << "\n\nPotential matches: #{potential_matches.join("\n")}"
  end

  assert matching_job, message
  instantiate_job(matching_job)
end
# Performs all enqueued jobs. If a block is given, performs all of the jobs
# that were enqueued throughout the duration of the block. If a block is
# not given, performs all of the enqueued jobs up to this point in the test.
#
# def test_perform_enqueued_jobs
# perform_enqueued_jobs do
# MyJob.perform_later(1, 2, 3)
# end
# assert_performed_jobs 1
# end
#
# def test_perform_enqueued_jobs_without_block
# MyJob.perform_later(1, 2, 3)
#
# perform_enqueued_jobs
#
# assert_performed_jobs 1
# end
#
# This method also supports filtering. If the +:only+ option is specified,
# then only the listed job(s) will be performed.
#
# def test_perform_enqueued_jobs_with_only
# perform_enqueued_jobs(only: MyJob) do
# MyJob.perform_later(1, 2, 3) # will be performed
# HelloJob.perform_later(1, 2, 3) # will not be performed
# end
# assert_performed_jobs 1
# end
#
# Also if the +:except+ option is specified,
# then the job(s) except specific class will be performed.
#
# def test_perform_enqueued_jobs_with_except
# perform_enqueued_jobs(except: HelloJob) do
# MyJob.perform_later(1, 2, 3) # will be performed
# HelloJob.perform_later(1, 2, 3) # will not be performed
# end
# assert_performed_jobs 1
# end
#
# The +:only+ and +:except+ options accept a Class, an Array of Classes, or a Proc. When passed a Proc,
# an instance of the job will be passed as the argument.
#
# If the +:queue+ option is specified,
# then only the job(s) enqueued to a specific queue will be performed.
#
# def test_perform_enqueued_jobs_with_queue
# perform_enqueued_jobs queue: :some_queue do
# MyJob.set(queue: :some_queue).perform_later(1, 2, 3) # will be performed
# HelloJob.set(queue: :other_queue).perform_later(1, 2, 3) # will not be performed
# end
# assert_performed_jobs 1
# end
#
# If the +:at+ option is specified, then only jobs enqueued to run
# immediately or before the given time are run.
- 11
# Without a block, flushes the already enqueued jobs that match the given
# filters. With a block, switches the queue adapter into "perform" mode with
# the given filters, runs the block, and restores the adapter's previous
# settings afterwards, even if the block raises.
def perform_enqueued_jobs(only: nil, except: nil, queue: nil, at: nil, &block)
  unless block_given?
    return flush_enqueued_jobs(only: only, except: except, queue: queue, at: at)
  end

  validate_option(only: only, except: except)

  # Adapter settings to apply while the block runs, in application order.
  overrides = {
    perform_enqueued_jobs: true,
    perform_enqueued_at_jobs: true,
    filter: only,
    reject: except,
    queue: queue,
    at: at
  }
  previous = overrides.keys.map { |setting| [setting, queue_adapter.public_send(setting)] }

  begin
    overrides.each { |setting, value| queue_adapter.public_send("#{setting}=", value) }
    assert_nothing_raised(&block)
  ensure
    previous.each { |setting, value| queue_adapter.public_send("#{setting}=", value) }
  end
end
# Accesses the queue_adapter set by ActiveJob::Base.
#
# def test_assert_job_has_custom_queue_adapter_set
# assert_instance_of CustomQueueAdapter, HelloJob.queue_adapter
# end
- 11
def queue_adapter
- 27665
# Always read from ActiveJob::Base, not from a specific job class.
ActiveJob::Base.queue_adapter
end
- 11
private
- 11
# Empties the list of enqueued jobs in place.
def clear_enqueued_jobs
- 2464
enqueued_jobs.clear
end
- 11
# Empties the list of performed jobs in place.
def clear_performed_jobs
- 2464
performed_jobs.clear
end
- 11
# Returns the entries of +jobs+ (job payload hashes) that pass every given
# filter, yielding each selected entry to the block when one is given.
#
# +only+ / +except+ :: class, list of classes or Proc; mutually exclusive.
# +queue+           :: compared as a string with the job's +:queue+ entry,
#                      falling back to the job class's +queue_name+.
# +at+              :: rejects jobs whose +:at+ entry is later than it.
#
# Raises whatever +validate_option+ raises when both +only+ and +except+
# are given.
def jobs_with(jobs, only: nil, except: nil, queue: nil, at: nil)
  validate_option(only: only, except: except)

  jobs.dup.select do |job|
    job_class = job.fetch(:job)

    # +except+ is only consulted when +only+ is absent.
    next false if only && !filter_as_proc(only).call(job)
    next false if !only && except && filter_as_proc(except).call(job)
    next false if queue && queue.to_s != job.fetch(:queue, job_class.queue_name)
    next false if at && job[:at] && job[:at] > at.to_f

    yield job if block_given?
    true
  end
end
- 11
# Turns a job filter into a callable predicate over job payload hashes.
#
# A Proc is handed back untouched. Anything else (a single job class or a
# list of job classes) is wrapped in a lambda that reports whether the
# payload's +:job+ entry is among the listed classes.
def filter_as_proc(filter)
  if filter.is_a?(Proc)
    filter
  else
    lambda do |job|
      Array(filter).include?(job.fetch(:job))
    end
  end
end
- 11
# Applies +jobs_with+ to the currently enqueued jobs, forwarding all
# filters and the optional block.
def enqueued_jobs_with(only: nil, except: nil, queue: nil, at: nil, &block)
- 1529
jobs_with(enqueued_jobs, only: only, except: except, queue: queue, at: at, &block)
end
- 11
# Applies +jobs_with+ to the performed jobs, forwarding the filters and the
# optional block. No +at+ filter is accepted here.
def performed_jobs_with(only: nil, except: nil, queue: nil, &block)
- 627
jobs_with(performed_jobs, only: only, except: except, queue: queue, &block)
end
- 11
# Performs every enqueued job that matches the given filters and returns how
# many were performed. Each payload is moved from the adapter's enqueued
# list to its performed list before the job is run with +perform_now+.
def flush_enqueued_jobs(only: nil, except: nil, queue: nil, at: nil)
  flushed = enqueued_jobs_with(only: only, except: except, queue: queue, at: at) do |payload|
    queue_adapter.enqueued_jobs.delete(payload)
    queue_adapter.performed_jobs << payload
    instantiate_job(payload).perform_now
  end

  flushed.count
end
- 11
# Returns a copy of the expectation hash adjusted for matching:
#
# * a time-like +:at+ value becomes a matcher accepting any time within one
#   second either side of it;
# * +:args+, when present, is passed through +round_time_arguments+.
#
# The hash passed in is not modified.
def prepare_args_for_assertion(args)
  prepared = args.dup

  if prepared[:at].acts_like?(:time)
    at_range = (prepared[:at] - 1)..(prepared[:at] + 1)
    prepared[:at] = ->(at) { at_range.cover?(at) }
  end

  prepared[:args] = round_time_arguments(prepared[:args]) if prepared[:args]
  prepared
end
- 11
# Recursively walks +argument+ and drops the microseconds (via
# <tt>change(usec: 0)</tt>) from every Time, ActiveSupport::TimeWithZone or
# DateTime found in it. Hash values and Array elements are processed
# recursively; anything else is returned unchanged.
def round_time_arguments(argument)
  case argument
  when Time, ActiveSupport::TimeWithZone, DateTime then argument.change(usec: 0)
  when Hash then argument.transform_values { |nested| round_time_arguments(nested) }
  when Array then argument.map { |nested| round_time_arguments(nested) }
  else argument
  end
end
- 11
# Returns a copy of the job payload prepared for comparison: a present
# +:at+ entry is converted with +Time.at+ and a present +:args+ entry is run
# through +ActiveJob::Arguments.deserialize+. The payload passed in is not
# modified.
def deserialize_args_for_assertion(job)
  comparable = job.dup
  comparable[:at] = Time.at(comparable[:at]) if comparable[:at]
  comparable[:args] = ActiveJob::Arguments.deserialize(comparable[:args]) if comparable[:args]
  comparable
end
- 11
# Builds a job instance from a payload hash: the payload's +:job+ class
# deserializes the payload, +scheduled_at+ is set from +:at+ when that key
# is present, and the job's arguments are deserialized if needed.
def instantiate_job(payload)
  payload[:job].deserialize(payload).tap do |job|
    job.scheduled_at = Time.at(payload[:at]) if payload.key?(:at)
    job.send(:deserialize_arguments_if_needed)
  end
end
- 11
# Returns ActiveJob::Base and those of its descendants that define
# +_queue_adapter+ directly on their own singleton class.
#
# The candidate list is built with +, not <<, so the array returned by
# +descendants+ is never modified; appending to it would add
# ActiveJob::Base again on every call if that array were ever shared.
def queue_adapter_changed_jobs
  candidates = ActiveJob::Base.descendants + [ActiveJob::Base]

  candidates.select do |klass|
    # only override explicitly set adapters, a quirk of `class_attribute`
    klass.singleton_class.public_instance_methods(false).include?(:_queue_adapter)
  end
end
- 11
# Raises ArgumentError when both +only+ and +except+ are given; returns nil
# otherwise.
def validate_option(only: nil, except: nil)
  return unless only && except

  raise ArgumentError, "Cannot specify both `:only` and `:except` options."
end
end
end
# frozen_string_literal: true
- 11
module ActiveJob
  # Wraps the execution of every job in <tt>Time.use_zone</tt> with the
  # job's +timezone+.
  module Timezones #:nodoc:
    extend ActiveSupport::Concern

    included do
      around_perform { |job, block| Time.use_zone(job.timezone, &block) }
    end
  end
end
# frozen_string_literal: true
- 11
module ActiveJob
  # Wraps the execution of every job in <tt>I18n.with_locale</tt> with the
  # job's +locale+.
  module Translation #:nodoc:
    extend ActiveSupport::Concern

    included do
      around_perform { |job, block| I18n.with_locale(job.locale, &block) }
    end
  end
end
# frozen_string_literal: true
- 11
require_relative "gem_version"
- 11
module ActiveJob
# Returns the version of the currently loaded Active Job as a <tt>Gem::Version</tt>
#
# Simply forwards to +gem_version+, which is not defined in this block
# (presumably provided by the "gem_version" file required above).
- 11
def self.version
gem_version
end
end
# frozen_string_literal: true
require "rails/generators/named_base"
module Rails # :nodoc:
  module Generators # :nodoc:
    # Generator that writes a job file under app/jobs and, when it does not
    # exist yet, the application-wide base job as well.
    class JobGenerator < Rails::Generators::NamedBase # :nodoc:
      desc "This generator creates an active job file at app/jobs"

      class_option :queue, type: :string, default: "default", desc: "The queue name for the generated job"

      check_class_collision suffix: "Job"

      hook_for :test_framework

      # Templates are looked up relative to this file's directory.
      def self.default_generator_root
        __dir__
      end

      # Writes the job file; while invoking (not revoking) it also creates
      # the application job file if it is missing.
      def create_job_file
        job_path = File.join("app/jobs", class_path, "#{file_name}_job.rb")
        template "job.rb", job_path

        in_root do
          base_job_missing = behavior == :invoke && !File.exist?(application_job_file_name)
          template "application_job.rb", application_job_file_name if base_job_missing
        end
      end

      private
        # File name without a trailing "_job", so the suffix is not doubled.
        def file_name
          @_file_name ||= super.sub(/_job\z/i, "")
        end

        # Location of the base job file; namespaced for mountable engines.
        def application_job_file_name
          @application_job_file_name ||=
            mountable_engine? ? "app/jobs/#{namespaced_path}/application_job.rb" : "app/jobs/application_job.rb"
        end
    end
  end
end