All Files ( 80.85% covered at 51.99 hits/line )
47 files in total.
1629 relevant lines,
1317 lines covered and
312 lines missed.
(
80.85%
)
# frozen_string_literal: true
#--
# Copyright (c) 2015-2020 Basecamp, LLC
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#++
- 1
require "active_support"
- 1
require "active_support/rails"
- 1
require "action_cable/version"
- 1
module ActionCable
- 1
extend ActiveSupport::Autoload
- 1
# Wire-protocol constants shared by the server and channel code: message type
# names, disconnect reasons, the default mount path and the supported
# WebSocket sub-protocols.
#
# The hash and every nested collection are frozen so that this shared
# constant cannot be mutated by accident at runtime.
INTERNAL = {
  message_types: {
    welcome: "welcome",
    disconnect: "disconnect",
    ping: "ping",
    confirmation: "confirm_subscription",
    rejection: "reject_subscription"
  }.freeze,
  disconnect_reasons: {
    unauthorized: "unauthorized",
    invalid_request: "invalid_request",
    server_restart: "server_restart"
  }.freeze,
  default_mount_path: "/cable",
  protocols: ["actioncable-v1-json", "actioncable-unsupported"].freeze
}.freeze
# Singleton instance of the server.
#
# Returns the ActionCable::Server::Base stored in <tt>@server</tt>, building
# it on the first call and reusing the same object afterwards.
- 1
module_function def server
- 491
@server ||= ActionCable::Server::Base.new
end
- 1
autoload :Server
- 1
autoload :Connection
- 1
autoload :Channel
- 1
autoload :RemoteConnections
- 1
autoload :SubscriptionAdapter
- 1
autoload :TestHelper
- 1
autoload :TestCase
end
# frozen_string_literal: true
- 1
module ActionCable
- 1
module Channel
- 1
extend ActiveSupport::Autoload
- 1
eager_autoload do
- 1
autoload :Base
- 1
autoload :Broadcasting
- 1
autoload :Callbacks
- 1
autoload :Naming
- 1
autoload :PeriodicTimers
- 1
autoload :Streams
- 1
autoload :TestCase
end
end
end
# frozen_string_literal: true
- 1
require "set"
- 1
require "active_support/rescuable"
- 1
module ActionCable
- 1
module Channel
# The channel provides the basic structure of grouping behavior into logical units when communicating over the WebSocket connection.
# You can think of a channel like a form of controller, but one that's capable of pushing content to the subscriber in addition to simply
# responding to the subscriber's direct requests.
#
# Channel instances are long-lived. A channel object will be instantiated when the cable consumer becomes a subscriber, and then
# lives until the consumer disconnects. This may be seconds, minutes, hours, or even days. That means you have to take special care
# not to do anything silly in a channel that would balloon its memory footprint or whatever. The references are forever, so they won't be released
# as is normally the case with a controller instance that gets thrown away after every request.
#
# Long-lived channels (and connections) also mean you're responsible for ensuring that the data is fresh. If you hold a reference to a user
# record, but the name is changed while that reference is held, you may be sending stale data if you don't take precautions to avoid it.
#
# The upside of long-lived channel instances is that you can use instance variables to keep reference to objects that future subscriber requests
# can interact with. Here's a quick example:
#
# class ChatChannel < ApplicationCable::Channel
# def subscribed
# @room = Chat::Room[params[:room_number]]
# end
#
# def speak(data)
# @room.speak data, user: current_user
# end
# end
#
# The #speak action simply uses the Chat::Room object that was created when the channel was first subscribed to by the consumer when that
# subscriber wants to say something in the room.
#
# == Action processing
#
# Unlike subclasses of ActionController::Base, channels do not follow a RESTful
# constraint form for their actions. Instead, Action Cable operates through a
# remote-procedure call model. You can declare any public method on the
# channel (optionally taking a <tt>data</tt> argument), and this method is
# automatically exposed as callable to the client.
#
# Example:
#
# class AppearanceChannel < ApplicationCable::Channel
# def subscribed
# @connection_token = generate_connection_token
# end
#
# def unsubscribed
# current_user.disappear @connection_token
# end
#
# def appear(data)
# current_user.appear @connection_token, on: data['appearing_on']
# end
#
# def away
# current_user.away @connection_token
# end
#
# private
# def generate_connection_token
# SecureRandom.hex(36)
# end
# end
#
# In this example, the subscribed and unsubscribed methods are not callable methods, as they
# were already declared in ActionCable::Channel::Base, but <tt>#appear</tt>
# and <tt>#away</tt> are. <tt>#generate_connection_token</tt> is also not
# callable, since it's a private method. You'll see that appear accepts a data
# parameter, which it then uses as part of its model call. <tt>#away</tt>
# does not, since it's simply a trigger action.
#
# Also note that in this example, <tt>current_user</tt> is available because
# it was marked as an identifying attribute on the connection. All such
# identifiers will automatically create a delegation method of the same name
# on the channel instance.
#
# == Rejecting subscription requests
#
# A channel can reject a subscription request in the #subscribed callback by
# invoking the #reject method:
#
# class ChatChannel < ApplicationCable::Channel
# def subscribed
# @room = Chat::Room[params[:room_number]]
# reject unless current_user.can_access?(@room)
# end
# end
#
# In this example, the subscription will be rejected if the
# <tt>current_user</tt> does not have access to the chat room. On the
# client-side, the <tt>Channel#rejected</tt> callback will get invoked when
# the server rejects the subscription request.
- 1
class Base
- 1
include Callbacks
- 1
include PeriodicTimers
- 1
include Streams
- 1
include Naming
- 1
include Broadcasting
- 1
include ActiveSupport::Rescuable
- 1
attr_reader :params, :connection, :identifier
- 1
delegate :logger, to: :connection
- 1
class << self
# A list of method names that should be considered actions. This
# includes all public instance methods on a channel, less
# any internal methods (defined on Base), adding back in
# any methods that are internal, but still exist on the class
# itself.
#
# ==== Returns
# * <tt>Set</tt> - A set of all methods that should be considered actions.
- 1
def action_methods
  # Memoized per class; clear_action_methods! drops the cache.
  @action_methods ||= begin
    # All public instance methods of this class, including ancestors
    inherited_and_own = public_instance_methods(true)
    # Public instance methods of Base and its ancestors are internal, not actions
    internal = ActionCable::Channel::Base.public_instance_methods(true)
    # Methods defined directly on this class count even when they shadow an internal one
    own = public_instance_methods(false)

    (inherited_and_own - internal + own).uniq.map(&:to_s).to_set
  end
end
- 1
private
# action_methods are cached and there is sometimes need to refresh
# them. ::clear_action_methods! allows you to do that, so next time
# you run action_methods, they will be recalculated.
- 1
def clear_action_methods! # :doc:
- 66
@action_methods = nil
end
# Refresh the cached action_methods when a new action_method is added.
- 1
def method_added(name) # :doc:
- 66
super
- 66
clear_action_methods!
end
end
- 1
def initialize(connection, identifier, params = {})
- 174
@connection = connection
- 174
@identifier = identifier
- 174
@params = params
# When a channel is streaming via pubsub, we want to delay the confirmation
# transmission until pubsub subscription is confirmed.
#
# The counter starts at 1 because it's awaiting a call to #subscribe_to_channel
- 174
@defer_subscription_confirmation_counter = Concurrent::AtomicFixnum.new(1)
- 174
@reject_subscription = nil
- 174
@subscription_confirmation_sent = nil
- 174
delegate_connection_identifiers
end
# Extract the action name from the passed data and process it via the channel. The process will ensure
# that the action requested is a public method on the channel declared by the user (so not one of the callbacks
# like #subscribed).
- 1
def perform_action(data)
  action = extract_action(data)

  # Anything that is not a processable action is only logged, never dispatched.
  unless processable_action?(action)
    return logger.error("Unable to process #{action_signature(action, data)}")
  end

  # Dispatch inside an instrumentation event so subscribers can observe it.
  payload = { channel_class: self.class.name, action: action, data: data }
  ActiveSupport::Notifications.instrument("perform_action.action_cable", payload) do
    dispatch_action(action, data)
  end
end
# This method is called after subscription has been added to the connection
# and confirms or rejects the subscription.
- 1
def subscribe_to_channel
- 156
# Run the user's #subscribed hook wrapped in the :subscribe callbacks.
run_callbacks :subscribe do
- 156
subscribed
end
- 156
# #subscribed (or a callback) may have called #reject; if so, remove the
# subscription and tell the client.
reject_subscription if subscription_rejected?
- 156
# No-op when rejected; otherwise decrements the deferral counter and sends
# the confirmation once nothing is deferring it any more.
ensure_confirmation_sent
end
# Called by the cable connection when it's cut, so the channel has a chance to cleanup with callbacks.
# This method is not intended to be called directly by the user. Instead, overwrite the #unsubscribed callback.
- 1
def unsubscribe_from_channel # :nodoc:
- 119
run_callbacks :unsubscribe do
- 119
unsubscribed
end
end
- 1
private
# Called once a consumer has become a subscriber of the channel. Usually the place to set up any streams
# you want this channel to be sending to the subscriber.
- 1
def subscribed # :doc:
# Override in subclasses
end
# Called once a consumer has cut its cable connection. Can be used for cleaning up connections or marking
# users as offline or the like.
- 1
def unsubscribed # :doc:
# Override in subclasses
end
# Transmit a hash of data to the subscriber. The hash will automatically be wrapped in a JSON envelope with
# the proper channel identifier marked as the recipient.
- 1
def transmit(data, via: nil) # :doc:
  # Debug line: channel class, a truncated dump of the data and, when given,
  # where the message came from.
  status = "#{self.class.name} transmitting #{data.inspect.truncate(300)}"
  status = "#{status} (via #{via})" if via
  logger.debug(status)

  # The actual send happens inside the "transmit.action_cable" event.
  event_payload = { channel_class: self.class.name, data: data, via: via }
  ActiveSupport::Notifications.instrument("transmit.action_cable", event_payload) do
    connection.transmit identifier: @identifier, message: data
  end
end
- 1
# Sends the subscription confirmation once nothing is deferring it any more.
def ensure_confirmation_sent # :doc:
- 275
# A rejected subscription is never confirmed.
return if subscription_rejected?
- 272
# The counter starts at 1 in #initialize and is incremented by
# #defer_subscription_confirmation!; each call here releases one hold.
@defer_subscription_confirmation_counter.decrement
- 272
# Still deferred while the counter is above zero.
transmit_subscription_confirmation unless defer_subscription_confirmation?
end
- 1
def defer_subscription_confirmation! # :doc:
- 134
@defer_subscription_confirmation_counter.increment
end
- 1
def defer_subscription_confirmation? # :doc:
- 272
@defer_subscription_confirmation_counter.value > 0
end
- 1
def subscription_confirmation_sent? # :doc:
- 153
@subscription_confirmation_sent
end
- 1
def reject # :doc:
- 4
@reject_subscription = true
end
- 1
def subscription_rejected? # :doc:
- 581
@reject_subscription
end
- 1
def delegate_connection_identifiers
- 174
connection.identifiers.each do |identifier|
- 45
define_singleton_method(identifier) do
- 6
connection.send(identifier)
end
end
end
- 1
def extract_action(data)
- 143
(data["action"].presence || :receive).to_sym
end
- 1
def processable_action?(action)
- 143
self.class.action_methods.include?(action.to_s) unless subscription_rejected?
end
- 1
# Logs the action signature and invokes the action on this channel.
def dispatch_action(action, data)
- 139
logger.info action_signature(action, data)
- 139
# Only methods whose arity is exactly 1 receive the data hash; every other
# arity is called without arguments.
if method(action).arity == 1
- 132
public_send action, data
else
- 7
public_send action
end
# Any exception raised by the action is offered to the handlers registered
# through ActiveSupport::Rescuable; when rescue_with_handler returns a falsy
# value the original exception is re-raised.
rescue Exception => exception
- 2
rescue_with_handler(exception) || raise
end
- 1
# Builds the "ClassName#action(args)" string used in log lines. The argument
# list is omitted when data holds nothing besides the "action" key.
def action_signature(action, data)
  signature = +"#{self.class.name}##{action}"
  arguments = data.except("action")
  signature << "(#{arguments.inspect})" if arguments.any?
  signature
end
- 1
# Sends the "confirm_subscription" message to the client, at most once per
# channel instance.
def transmit_subscription_confirmation
  return if subscription_confirmation_sent?

  logger.debug "#{self.class.name} is transmitting the subscription confirmation"

  event_payload = { channel_class: self.class.name }
  ActiveSupport::Notifications.instrument("transmit_subscription_confirmation.action_cable", event_payload) do
    connection.transmit identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:confirmation]
    # Remember the send so later calls return early.
    @subscription_confirmation_sent = true
  end
end
- 1
def reject_subscription
- 3
connection.subscriptions.remove_subscription self
- 3
transmit_subscription_rejection
end
- 1
def transmit_subscription_rejection
- 4
logger.debug "#{self.class.name} is transmitting the subscription rejection"
- 4
ActiveSupport::Notifications.instrument("transmit_subscription_rejection.action_cable", channel_class: self.class.name) do
- 4
connection.transmit identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:rejection]
end
end
end
end
end
- 1
ActiveSupport.run_load_hooks(:action_cable_channel, ActionCable::Channel::Base)
# frozen_string_literal: true
- 1
require "active_support/core_ext/object/to_param"
- 1
module ActionCable
- 1
module Channel
- 1
module Broadcasting
- 1
extend ActiveSupport::Concern
- 1
delegate :broadcasting_for, :broadcast_to, to: :class
- 1
module ClassMethods
# Broadcast a hash to a unique broadcasting for this <tt>model</tt> in this channel.
- 1
def broadcast_to(model, message)
- 3
ActionCable.server.broadcast(broadcasting_for(model), message)
end
# Returns a unique broadcasting identifier for this <tt>model</tt> in this channel:
#
# CommentsChannel.broadcasting_for("all") # => "comments:all"
#
# You can pass any object as a target (e.g. Active Record model), and it
# would be serialized into a string under the hood.
- 1
def broadcasting_for(model)
- 20
serialize_broadcasting([ channel_name, model ])
end
- 1
# Turns a broadcasting target into its string form: arrays are serialized
# element by element and joined with ":", objects responding to
# +to_gid_param+ use it, and everything else falls back to +to_param+.
def serialize_broadcasting(object) #:nodoc:
  if object.is_a?(Array)
    object.map { |element| serialize_broadcasting(element) }.join(":")
  elsif object.respond_to?(:to_gid_param)
    object.to_gid_param
  else
    object.to_param
  end
end
end
end
end
end
# frozen_string_literal: true
- 1
require "active_support/callbacks"
- 1
module ActionCable
- 1
module Channel
- 1
module Callbacks
- 1
extend ActiveSupport::Concern
- 1
include ActiveSupport::Callbacks
- 1
included do
- 1
define_callbacks :subscribe
- 1
define_callbacks :unsubscribe
end
- 1
module ClassMethods
- 1
def before_subscribe(*methods, &block)
set_callback(:subscribe, :before, *methods, &block)
end
- 1
def after_subscribe(*methods, &block)
- 2
set_callback(:subscribe, :after, *methods, &block)
end
- 1
alias_method :on_subscribe, :after_subscribe
- 1
def before_unsubscribe(*methods, &block)
set_callback(:unsubscribe, :before, *methods, &block)
end
- 1
def after_unsubscribe(*methods, &block)
- 3
set_callback(:unsubscribe, :after, *methods, &block)
end
- 1
alias_method :on_unsubscribe, :after_unsubscribe
end
end
end
end
# frozen_string_literal: true
- 1
module ActionCable
- 1
module Channel
- 1
module Naming
- 1
extend ActiveSupport::Concern
- 1
module ClassMethods
# Returns the name of the channel, underscored, without the <tt>Channel</tt> ending.
# If the channel is in a namespace, then the namespaces are represented by single
# colon separators in the channel name.
#
# ChatChannel.channel_name # => 'chat'
# Chats::AppearancesChannel.channel_name # => 'chats:appearances'
# FooChats::BarAppearancesChannel.channel_name # => 'foo_chats:bar_appearances'
- 1
def channel_name
- 21
@channel_name ||= name.delete_suffix("Channel").gsub("::", ":").underscore
end
end
# Delegates to the class' <tt>channel_name</tt>
- 1
delegate :channel_name, to: :class
end
end
end
# frozen_string_literal: true
- 1
module ActionCable
- 1
module Channel
- 1
module PeriodicTimers
- 1
extend ActiveSupport::Concern
- 1
included do
- 1
class_attribute :periodic_timers, instance_reader: false, default: []
- 1
after_subscribe :start_periodic_timers
- 1
after_unsubscribe :stop_periodic_timers
end
- 1
module ClassMethods
# Periodically performs a task on the channel, like updating an online
# user counter, polling a backend for new status messages, sending
# regular "heartbeat" messages, or doing some internal work and giving
# progress updates.
#
# Pass a method name or lambda argument or provide a block to call.
# Specify the calling period in seconds using the <tt>every:</tt>
# keyword argument.
#
# periodically :transmit_progress, every: 5.seconds
#
# periodically every: 3.minutes do
# transmit action: :update_count, count: current_count
# end
#
- 1
def periodically(callback_or_method_name = nil, every:, &block)
  # A block and an explicit callback are mutually exclusive.
  if block && callback_or_method_name
    raise ArgumentError, "Pass a block or provide a callback arg, not both"
  end

  callback =
    if block
      block
    elsif callback_or_method_name.is_a?(Proc)
      callback_or_method_name
    elsif callback_or_method_name.is_a?(Symbol)
      # Looked up with __send__ when the timer fires; presumably evaluated
      # against the channel instance (confirm against the worker pool).
      -> { __send__ callback_or_method_name }
    else
      raise ArgumentError, "Expected a Symbol method name or a Proc, got #{callback_or_method_name.inspect}"
    end

  positive_period = every.kind_of?(Numeric) && every > 0
  unless positive_period
    raise ArgumentError, "Expected every: to be a positive number of seconds, got #{every.inspect}"
  end

  # Reassign rather than append in place: a new array is built and stored
  # through the class_attribute writer.
  self.periodic_timers += [[callback, { every: every }]]
end
end
- 1
private
- 1
def active_periodic_timers
- 240
@active_periodic_timers ||= []
end
- 1
def start_periodic_timers
- 144
self.class.periodic_timers.each do |callback, options|
- 3
active_periodic_timers << start_periodic_timer(callback, every: options.fetch(:every))
end
end
- 1
def start_periodic_timer(callback, every:)
- 3
connection.server.event_loop.timer every do
connection.worker_pool.async_exec self, connection: connection, &callback
end
end
- 1
# Shuts down every timer started for this channel instance and forgets them.
def stop_periodic_timers
  active_periodic_timers.each(&:shutdown)
  active_periodic_timers.clear
end
end
end
end
# frozen_string_literal: true
- 1
module ActionCable
- 1
module Channel
# Streams allow channels to route broadcastings to the subscriber. A broadcasting is, as discussed elsewhere, a pubsub queue where any data
# placed into it is automatically sent to the clients that are connected at that time. It's purely an online queue, though. If you're not
# streaming a broadcasting at the very moment it sends out an update, you will not get that update, even if you connect after it has been sent.
#
# Most commonly, the streamed broadcast is sent straight to the subscriber on the client-side. The channel just acts as a connector between
# the two parties (the broadcaster and the channel subscriber). Here's an example of a channel that allows subscribers to get all new
# comments on a given page:
#
# class CommentsChannel < ApplicationCable::Channel
# def follow(data)
# stream_from "comments_for_#{data['recording_id']}"
# end
#
# def unfollow
# stop_all_streams
# end
# end
#
# Based on the above example, the subscribers of this channel will get whatever data is put into the,
# let's say, <tt>comments_for_45</tt> broadcasting as soon as it's put there.
#
# An example broadcasting for this channel looks like so:
#
# ActionCable.server.broadcast "comments_for_45", author: 'DHH', content: 'Rails is just swell'
#
# If you have a stream that is related to a model, then the broadcasting used can be generated from the model and channel.
# The following example would subscribe to a broadcasting like <tt>comments:Z2lkOi8vVGVzdEFwcC9Qb3N0LzE</tt>.
#
# class CommentsChannel < ApplicationCable::Channel
# def subscribed
# post = Post.find(params[:id])
# stream_for post
# end
# end
#
# You can then broadcast to this channel using:
#
# CommentsChannel.broadcast_to(@post, @comment)
#
# If you don't just want to relay the broadcast unfiltered to the subscriber, you can also supply a callback that lets you alter what is sent out.
# The below example shows how you can use this to provide performance introspection in the process:
#
# class ChatChannel < ApplicationCable::Channel
# def subscribed
# @room = Chat::Room[params[:room_number]]
#
# stream_for @room, coder: ActiveSupport::JSON do |message|
# if message['originated_at'].present?
# elapsed_time = (Time.now.to_f - message['originated_at']).round(2)
#
# ActiveSupport::Notifications.instrument :performance, measurement: 'Chat.message_delay', value: elapsed_time, action: :timing
# logger.info "Message took #{elapsed_time}s to arrive"
# end
#
# transmit message
# end
# end
# end
#
# You can stop streaming from all broadcasts by calling #stop_all_streams.
- 1
module Streams
- 1
extend ActiveSupport::Concern
- 1
included do
- 1
on_unsubscribe :stop_all_streams
end
# Start streaming from the named <tt>broadcasting</tt> pubsub queue. Optionally, you can pass a <tt>callback</tt> that'll be used
# instead of the default of just transmitting the updates straight to the subscriber.
# Pass <tt>coder: ActiveSupport::JSON</tt> to decode messages as JSON before passing to the callback.
# Defaults to <tt>coder: nil</tt> which does no decoding, passes raw messages.
- 1
def stream_from(broadcasting, callback = nil, coder: nil, &block)
- 134
# Normalise the name so the streams registry is always keyed by a String.
broadcasting = String(broadcasting)
# Don't send the confirmation until pubsub#subscribe is successful
- 134
defer_subscription_confirmation!
# Build a stream handler by wrapping the user-provided callback with
# a decoder or defaulting to a JSON-decoding retransmitter.
# An explicit callback argument takes precedence over a block.
- 134
handler = worker_pool_stream_handler(broadcasting, callback || block, coder: coder)
# Remember the handler so the stop_* methods can unsubscribe it later.
- 134
streams[broadcasting] = handler
# The pubsub subscription itself is posted to the server's event loop.
- 134
connection.server.event_loop.post do
- 134
# The lambda is the third argument to subscribe; per the comment above it is
# presumably invoked once the subscription succeeds (confirm against the
# adapter). It releases the deferral taken at the top of this method.
pubsub.subscribe(broadcasting, handler, lambda do
- 119
ensure_confirmation_sent
- 119
logger.info "#{self.class.name} is streaming from #{broadcasting}"
end)
end
end
# Start streaming the pubsub queue for the <tt>model</tt> in this channel. Optionally, you can pass a
# <tt>callback</tt> that'll be used instead of the default of just transmitting the updates straight
# to the subscriber.
#
# Pass <tt>coder: ActiveSupport::JSON</tt> to decode messages as JSON before passing to the callback.
# Defaults to <tt>coder: nil</tt> which does no decoding, passes raw messages.
- 1
def stream_for(model, callback = nil, coder: nil, &block)
- 6
stream_from(broadcasting_for(model), callback || block, coder: coder)
end
# Unsubscribes streams from the named <tt>broadcasting</tt>.
- 1
def stop_stream_from(broadcasting)
  # stream_from registers handlers under String(broadcasting). Coerce the
  # same way here so a Symbol (or any other non-String name) passed to both
  # methods refers to the same stream.
  broadcasting = String(broadcasting)

  callback = streams.delete(broadcasting)
  # Nothing to do when this channel was not streaming from the broadcasting.
  if callback
    pubsub.unsubscribe(broadcasting, callback)
    logger.info "#{self.class.name} stopped streaming from #{broadcasting}"
  end
end
# Unsubscribes streams for the <tt>model</tt>.
- 1
def stop_stream_for(model)
- 1
stop_stream_from(broadcasting_for(model))
end
# Unsubscribes all streams associated with this channel from the pubsub queue.
- 1
def stop_all_streams
  # Unsubscribe each registered handler, then forget all of them.
  streams.each_pair do |broadcasting, handler|
    pubsub.unsubscribe broadcasting, handler
    logger.info "#{self.class.name} stopped streaming from #{broadcasting}"
  end

  streams.clear
end
# Calls stream_for if record is present, otherwise calls reject.
# This method is intended to be called when you're looking
# for a record based on a parameter: if it's found, it will start
# streaming. If the record is nil, then it will reject the subscription.
- 1
def stream_or_reject_for(record)
  # A missing (nil or false) record rejects instead of streaming.
  return reject unless record

  stream_for record
end
- 1
private
- 1
delegate :pubsub, to: :connection
- 1
def streams
- 255
@_streams ||= {}
end
# Always wrap the outermost handler to invoke the user handler on the
# worker pool rather than blocking the event loop.
- 1
def worker_pool_stream_handler(broadcasting, user_handler, coder: nil)
- 134
handler = stream_handler(broadcasting, user_handler, coder: coder)
- 134
-> message do
- 102
connection.worker_pool.async_invoke handler, :call, message, connection: connection
end
end
# May be overridden to add instrumentation, logging, specialized error
# handling, or other forms of handler decoration.
#
# TODO: Tests demonstrating this.
- 1
def stream_handler(broadcasting, user_handler, coder: nil)
- 134
if user_handler
- 1
stream_decoder user_handler, coder: coder
else
- 133
default_stream_handler broadcasting, coder: coder
end
end
# May be overridden to change the default stream handling behavior
# which decodes JSON and transmits to the client.
#
# TODO: Tests demonstrating this.
#
# TODO: Room for optimization. Update transmit API to be coder-aware
# so we can no-op when pubsub and connection are both JSON-encoded.
# Then we can skip decode+encode if we're just proxying messages.
- 1
def default_stream_handler(broadcasting, coder:)
- 133
coder ||= ActiveSupport::JSON
- 133
stream_transmitter stream_decoder(coder: coder), broadcasting: broadcasting
end
- 1
# Wraps +handler+ so that it receives messages decoded by +coder+. Without a
# coder the handler is returned untouched.
def stream_decoder(handler = identity_handler, coder:)
  return handler unless coder

  lambda { |message| handler.call(coder.decode(message)) }
end
- 1
# Builds a lambda that passes each message through +handler+ and transmits
# the result, tagged with the broadcasting it was streamed from.
def stream_transmitter(handler = identity_handler, broadcasting:)
  origin = "streamed from #{broadcasting}"

  lambda { |message| transmit(handler.call(message), via: origin) }
end
- 1
# Returns a handler that hands back each message unchanged.
def identity_handler
  lambda { |message| message }
end
end
end
end
# frozen_string_literal: true
- 1
require "active_support"
- 1
require "active_support/test_case"
- 1
require "active_support/core_ext/hash/indifferent_access"
- 1
require "json"
- 1
module ActionCable
- 1
module Channel
- 1
# Raised when the channel under test cannot be worked out for a test case.
class NonInferrableChannelError < ::StandardError
  # name - whatever the channel was being inferred from; it is interpolated
  #        into the error message.
  def initialize(name)
    super(
      "Unable to determine the channel to test from #{name}. " \
      "You'll need to specify it using `tests YourChannel` in your " \
      "test case definition."
    )
  end
end
# Stub `stream_from` to track streams for the channel.
# Add public aliases for `subscription_confirmation_sent?` and
# `subscription_rejected?`.
- 1
module ChannelStub
  # Public wrapper around the channel's subscription_confirmation_sent?.
  def confirmed?
    subscription_confirmation_sent?
  end

  # Public wrapper around the channel's subscription_rejected?.
  def rejected?
    subscription_rejected?
  end

  # Records the broadcasting name only; any callback or coder is ignored.
  def stream_from(broadcasting, *)
    streams.push(broadcasting)
  end

  # Forgets every recorded stream.
  def stop_all_streams
    @_streams = []
  end

  # Names of the broadcastings recorded so far, in call order.
  def streams
    @_streams ||= []
  end

  # Make periodic timers no-op
  def start_periodic_timers; end

  def stop_periodic_timers; end
end
- 1
# Stand-in for a connection in channel tests: it records every transmitted
# message instead of sending it and exposes the given identifiers as methods.
class ConnectionStub
- 1
attr_reader :transmissions, :identifiers, :subscriptions, :logger
- 1
# identifiers - Hash of identifier name => value; each key becomes a
#               singleton method on this stub that returns the value.
def initialize(identifiers = {})
- 13
@transmissions = []
- 13
identifiers.each do |identifier, val|
- 12
define_singleton_method(identifier) { val }
end
- 13
@subscriptions = ActionCable::Connection::Subscriptions.new(self)
# Only the names are kept here; the values live in the singleton methods.
- 13
@identifiers = identifiers.keys
# Log output goes to an in-memory StringIO.
- 13
@logger = ActiveSupport::TaggedLogging.new ActiveSupport::Logger.new(StringIO.new)
end
- 1
# Appends the message to #transmissions, converted with with_indifferent_access.
def transmit(cable_message)
- 14
transmissions << cable_message.with_indifferent_access
end
end
# Superclass for Action Cable channel functional tests.
#
# == Basic example
#
# Functional tests are written as follows:
# 1. First, one uses the +subscribe+ method to simulate subscription creation.
# 2. Then, one asserts whether the current state is as expected. "State" can be anything:
# transmitted messages, subscribed streams, etc.
#
# For example:
#
# class ChatChannelTest < ActionCable::Channel::TestCase
# def test_subscribed_with_room_number
# # Simulate a subscription creation
# subscribe room_number: 1
#
# # Asserts that the subscription was successfully created
# assert subscription.confirmed?
#
# # Asserts that the channel subscribes connection to a stream
# assert_has_stream "chat_1"
#
# # Asserts that the channel subscribes connection to a specific
# # stream created for a model
# assert_has_stream_for Room.find(1)
# end
#
# def test_does_not_stream_with_incorrect_room_number
# subscribe room_number: -1
#
# # Asserts that no streams were started
# assert_no_streams
# end
#
# def test_does_not_subscribe_without_room_number
# subscribe
#
# # Asserts that the subscription was rejected
# assert subscription.rejected?
# end
# end
#
# You can also perform actions:
# def test_perform_speak
# subscribe room_number: 1
#
# perform :speak, message: "Hello, Rails!"
#
# assert_equal "Hello, Rails!", transmissions.last["text"]
# end
#
# == Special methods
#
# ActionCable::Channel::TestCase will also automatically provide the following instance
# methods for use in the tests:
#
# <b>connection</b>::
# An ActionCable::Channel::ConnectionStub, representing the current HTTP connection.
# <b>subscription</b>::
# An instance of the current channel, created when you call `subscribe`.
# <b>transmissions</b>::
# A list of all messages that have been transmitted into the channel.
#
#
# == Channel is automatically inferred
#
# ActionCable::Channel::TestCase will automatically infer the channel under test
# from the test class name. If the channel cannot be inferred from the test
# class name, you can explicitly set it with +tests+.
#
# class SpecialEdgeCaseChannelTest < ActionCable::Channel::TestCase
# tests SpecialChannel
# end
#
# == Specifying connection identifiers
#
# You need to set up your connection manually to provide values for the identifiers.
# To do this just use:
#
# stub_connection(user: users(:john))
#
# == Testing broadcasting
#
# ActionCable::Channel::TestCase enhances ActionCable::TestHelper assertions (e.g.
# +assert_broadcasts+) to handle broadcasting to models:
#
#
# # in your channel
# def speak(data)
# broadcast_to room, text: data["message"]
# end
#
# def test_speak
# subscribe room_id: rooms(:chat).id
#
# assert_broadcast_on(rooms(:chat), text: "Hello, Rails!") do
# perform :speak, message: "Hello, Rails!"
# end
# end
- 1
class TestCase < ActiveSupport::TestCase
- 1
module Behavior
- 1
extend ActiveSupport::Concern
- 1
include ActiveSupport::Testing::ConstantLookup
- 1
include ActionCable::TestHelper
- 1
CHANNEL_IDENTIFIER = "test_stub"
- 1
included do
- 1
class_attribute :_channel_class
- 1
attr_reader :connection, :subscription
- 1
ActiveSupport.run_load_hooks(:action_cable_channel_test_case, self)
end
- 1
module ClassMethods
- 1
# Sets the channel class under test. Accepts a class/module, or a String or
# Symbol that is camelized and constantized. Anything else raises
# NonInferrableChannelError.
def tests(channel)
  self._channel_class =
    case channel
    when String, Symbol then channel.to_s.camelize.constantize
    when Module then channel
    else raise NonInferrableChannelError.new(channel)
    end
end
- 1
# Returns the channel class under test: the one already stored via +tests+,
# or else one determined from the test class name and stored through +tests+.
def channel_class
  self._channel_class || tests(determine_default_channel(name))
end
- 1
def determine_default_channel(name)
- 7
channel = determine_constant_from_test_name(name) do |constant|
- 7
Class === constant && constant < ActionCable::Channel::Base
end
- 7
raise NonInferrableChannelError.new(name) if channel.nil?
- 7
channel
end
end
# Set up test connection with the specified identifiers:
#
# class ApplicationCable < ActionCable::Connection::Base
# identified_by :user, :token
# end
#
# stub_connection(user: users[:john], token: 'my-secret-token')
- 1
def stub_connection(identifiers = {})
- 13
@connection = ConnectionStub.new(identifiers)
end
# Subscribe to the channel under test. Optionally pass subscription parameters as a Hash.
- 1
# Builds the channel under test, subscribes it and returns the subscription.
def subscribe(params = {})
- 12
# Reuse a connection set up earlier via stub_connection, else stub one with
# no identifiers.
@connection ||= stub_connection
- 12
@subscription = self.class.channel_class.new(connection, CHANNEL_IDENTIFIER, params.with_indifferent_access)
# Mix the test stubs into this one instance only, leaving the channel class
# itself untouched.
- 12
@subscription.singleton_class.include(ChannelStub)
- 12
@subscription.subscribe_to_channel
- 12
@subscription
end
# Unsubscribe the subscription under test.
- 1
def unsubscribe
check_subscribed!
subscription.unsubscribe_from_channel
end
# Perform action on a channel.
#
# NOTE: Must be subscribed.
- 1
def perform(action, data = {})
- 6
check_subscribed!
- 5
subscription.perform_action(data.stringify_keys.merge("action" => action.to_s))
end
# Returns messages transmitted into channel
- 1
def transmissions
  # Keep only the "message" payloads; entries with no "message" value
  # are dropped.
  connection.transmissions.each_with_object([]) do |cable_message, messages|
    message = cable_message["message"]
    messages << message unless message.nil?
  end
end
# Enhance TestHelper assertions to handle non-String
# broadcastings
- 1
def assert_broadcasts(stream_or_object, *args)
- 1
super(broadcasting_for(stream_or_object), *args)
end
- 1
# Resolves +stream_or_object+ to a broadcasting name, then defers to the
# inherited assertion with the remaining arguments.
def assert_broadcast_on(stream_or_object, *args)
  broadcasting = broadcasting_for(stream_or_object)
  super(broadcasting, *args)
end
# Asserts that no streams have been started.
#
# def test_assert_no_started_stream
# subscribe
# assert_no_streams
# end
#
- 1
# Fails unless the subscription's stream list is empty.
def assert_no_streams
  started = subscription.streams
  assert started.empty?, "No streams started was expected, but #{started.count} found"
end
# Asserts that the specified stream has been started.
#
# def test_assert_started_stream
# subscribe
# assert_has_stream 'messages'
# end
#
- 1
# Fails unless +stream+ is among the subscription's streams.
def assert_has_stream(stream)
  started = subscription.streams.include?(stream)
  assert started, "Stream #{stream} has not been started"
end
# Asserts that the specified stream for a model has started.
#
# def test_assert_started_stream_for
# subscribe id: 42
# assert_has_stream_for User.find(42)
# end
#
- 1
# Fails unless the broadcasting computed for +object+ is among the
# subscription's streams.
def assert_has_stream_for(object)
  expected_stream = broadcasting_for(object)
  assert_has_stream(expected_stream)
end
- 1
private
- 1
# Raises a RuntimeError when there is no subscription or it was rejected;
# returns nil otherwise.
def check_subscribed!
  return unless subscription.nil? || subscription.rejected?

  raise "Must be subscribed!"
end
- 1
# Strings are returned as-is; anything else is converted by the channel
# class's own +broadcasting_for+.
def broadcasting_for(stream_or_object)
  if stream_or_object.is_a?(String)
    stream_or_object
  else
    self.class.channel_class.broadcasting_for(stream_or_object)
  end
end
end
- 1
include Behavior
end
end
end
# frozen_string_literal: true
- 1
module ActionCable
- 1
# Namespace for the connection-side classes of Action Cable.
# Every constant below is registered with +autoload+ inside an
# +eager_autoload+ block (ActiveSupport::Autoload), rather than being
# required explicitly.
module Connection
- 1
extend ActiveSupport::Autoload
- 1
eager_autoload do
- 1
autoload :Authorization
- 1
autoload :Base
- 1
autoload :ClientSocket
- 1
autoload :Identification
- 1
autoload :InternalChannel
- 1
autoload :MessageBuffer
- 1
autoload :Stream
- 1
autoload :StreamEventLoop
- 1
autoload :Subscriptions
- 1
autoload :TaggedLoggerProxy
- 1
autoload :TestCase
- 1
autoload :WebSocket
end
end
end
# frozen_string_literal: true
- 1
module ActionCable
- 1
module Connection
- 1
module Authorization
  # Raised by #reject_unauthorized_connection to abort a connection attempt.
  class UnauthorizedError < StandardError
  end

  # Logs the rejected attempt and raises UnauthorizedError.
  # This method does not close the socket itself; Base#handle_open rescues
  # the error and closes the connection with the "unauthorized" reason.
  def reject_unauthorized_connection
    logger.error("An unauthorized connection attempt was rejected")
    raise UnauthorizedError
  end
end
end
end
# frozen_string_literal: true
- 1
require "action_dispatch"
- 1
require "active_support/rescuable"
- 1
module ActionCable
- 1
module Connection
# For every WebSocket connection the Action Cable server accepts, a Connection object will be instantiated. This instance becomes the parent
# of all of the channel subscriptions that are created from there on. Incoming messages are then routed to these channel subscriptions
# based on an identifier sent by the Action Cable consumer. The Connection itself does not deal with any specific application logic beyond
# authentication and authorization.
#
# Here's a basic example:
#
# module ApplicationCable
# class Connection < ActionCable::Connection::Base
# identified_by :current_user
#
# def connect
# self.current_user = find_verified_user
# logger.add_tags current_user.name
# end
#
# def disconnect
# # Any cleanup work needed when the cable connection is cut.
# end
#
# private
# def find_verified_user
# User.find_by_identity(cookies.encrypted[:identity_id]) ||
# reject_unauthorized_connection
# end
# end
# end
#
# First, we declare that this connection can be identified by its current_user. This allows us to later be able to find all connections
# established for that current_user (and potentially disconnect them). You can declare as many
# identification indexes as you like. Declaring an identification means that an attr_accessor is automatically set for that key.
#
# Second, we rely on the fact that the WebSocket connection is established with the cookies from the domain being sent along. This makes
# it easy to use signed cookies that were set when logging in via a web interface to authorize the WebSocket connection.
#
# Finally, we add a tag to the connection-specific logger with the name of the current user to easily distinguish their messages in the log.
#
# Pretty simple, eh?
- 1
class Base
  include Identification
  include InternalChannel
  include Authorization
  include ActiveSupport::Rescuable

  attr_reader :server, :env, :subscriptions, :logger, :worker_pool, :protocol
  delegate :event_loop, :pubsub, to: :server

  # Builds the connection for the Rack +env+ on behalf of +server+.
  # +coder+ must respond to +encode+ and +decode+; it is used for every
  # message sent to or received from the WebSocket.
  def initialize(server, env, coder: ActiveSupport::JSON)
    @server, @env, @coder = server, env, coder

    @worker_pool = server.worker_pool
    @logger = new_tagged_logger

    @websocket      = ActionCable::Connection::WebSocket.new(env, self, event_loop)
    @subscriptions  = ActionCable::Connection::Subscriptions.new(self)
    @message_buffer = ActionCable::Connection::MessageBuffer.new(self)

    @_internal_subscriptions = nil
    @started_at = Time.now
  end

  # Called by the server when a new WebSocket connection is established. This configures the callbacks intended for overwriting by the user.
  # This method should not be called directly -- instead rely upon the #connect (and #disconnect) callbacks.
  #
  # Returns the Rack response: the WebSocket response when the upgrade is
  # possible and the origin is allowed, a 404 response otherwise.
  def process #:nodoc:
    logger.info started_request_message

    if websocket.possible? && allow_request_origin?
      respond_to_successful_request
    else
      respond_to_invalid_request
    end
  end

  # Decodes WebSocket messages and dispatches them to subscribed channels.
  # WebSocket message transfer encoding is always JSON.
  def receive(websocket_message) #:nodoc:
    send_async :dispatch_websocket_message, websocket_message
  end

  # Decodes +websocket_message+ and executes it as a subscription command.
  # Messages arriving after the socket stopped being alive are logged and dropped.
  def dispatch_websocket_message(websocket_message) #:nodoc:
    if websocket.alive?
      subscriptions.execute_command decode(websocket_message)
    else
      logger.error "Ignoring message processed after the WebSocket was closed: #{websocket_message.inspect}"
    end
  end

  # Encodes +cable_message+ with the coder and sends it over the WebSocket.
  def transmit(cable_message) # :nodoc:
    websocket.transmit encode(cable_message)
  end

  # Close the WebSocket connection.
  # A disconnect message carrying +reason+ and +reconnect+ is transmitted first.
  def close(reason: nil, reconnect: true)
    transmit(
      type: ActionCable::INTERNAL[:message_types][:disconnect],
      reason: reason,
      reconnect: reconnect
    )
    websocket.close
  end

  # Invoke a method on the connection asynchronously through the pool of thread workers.
  def send_async(method, *arguments)
    worker_pool.async_invoke(self, method, *arguments)
  end

  # Return a basic hash of statistics for the connection keyed with <tt>identifier</tt>, <tt>started_at</tt>, <tt>subscriptions</tt>, and <tt>request_id</tt>.
  # This can be returned by a health check against the connection.
  def statistics
    {
      identifier: connection_identifier,
      started_at: @started_at,
      subscriptions: subscriptions.identifiers,
      request_id: @env["action_dispatch.request_id"]
    }
  end

  # Transmits a ping message stamped with the current Unix time.
  def beat
    transmit type: ActionCable::INTERNAL[:message_types][:ping], message: Time.now.to_i
  end

  def on_open # :nodoc:
    send_async :handle_open
  end

  def on_message(message) # :nodoc:
    message_buffer.append message
  end

  def on_error(message) # :nodoc:
    # log errors to make diagnosing socket errors easier
    logger.error "WebSocket error occurred: #{message}"
  end

  # +reason+ and +code+ are accepted for the socket callback interface but not used here.
  def on_close(reason, code) # :nodoc:
    send_async :handle_close
  end

  private
    attr_reader :websocket
    attr_reader :message_buffer

    # The request that initiated the WebSocket connection is available here. This gives access to the environment, cookies, etc.
    def request # :doc:
      @request ||= begin
        # When a Rails application is present, its env_config is merged underneath the connection env.
        environment = Rails.application.env_config.merge(env) if defined?(Rails.application) && Rails.application
        ActionDispatch::Request.new(environment || env)
      end
    end

    # The cookies of the request that initiated the WebSocket connection. Useful for performing authorization checks.
    def cookies # :doc:
      request.cookie_jar
    end

    def encode(cable_message)
      @coder.encode cable_message
    end

    def decode(websocket_message)
      @coder.decode websocket_message
    end

    # Runs the user's #connect callback (if defined), then registers the
    # connection: internal channel, welcome message, buffered messages,
    # server bookkeeping. An UnauthorizedError raised along the way closes
    # the socket with the "unauthorized" reason and no reconnect.
    def handle_open
      @protocol = websocket.protocol
      connect if respond_to?(:connect)
      subscribe_to_internal_channel
      send_welcome_message

      message_buffer.process!
      server.add_connection(self)
    rescue ActionCable::Connection::Authorization::UnauthorizedError
      close(reason: ActionCable::INTERNAL[:disconnect_reasons][:unauthorized], reconnect: false) if websocket.alive?
    end

    # Deregisters the connection, drops all subscriptions and finally runs
    # the user's #disconnect callback (if defined).
    def handle_close
      logger.info finished_request_message

      server.remove_connection(self)

      subscriptions.unsubscribe_from_all
      unsubscribe_from_internal_channel

      disconnect if respond_to?(:disconnect)
    end

    def send_welcome_message
      # Send welcome message to the internal connection monitor channel.
      # This ensures the connection monitor state is reset after a successful
      # websocket connection.
      transmit type: ActionCable::INTERNAL[:message_types][:welcome]
    end

    # True when forgery protection is disabled, when the Origin header equals
    # this host (and same-origin is allowed), or when the Origin matches one
    # of the configured allowed origins. Otherwise logs the origin and returns false.
    def allow_request_origin?
      return true if server.config.disable_request_forgery_protection

      proto = Rack::Request.new(env).ssl? ? "https" : "http"
      if server.config.allow_same_origin_as_host && env["HTTP_ORIGIN"] == "#{proto}://#{env['HTTP_HOST']}"
        true
      elsif Array(server.config.allowed_request_origins).any? { |allowed_origin| allowed_origin === env["HTTP_ORIGIN"] }
        true
      else
        logger.error("Request origin not allowed: #{env['HTTP_ORIGIN']}")
        false
      end
    end

    def respond_to_successful_request
      logger.info successful_request_message
      websocket.rack_response
    end

    # Closes the socket if it is alive, logs the failure and returns a plain 404 Rack response.
    def respond_to_invalid_request
      close(reason: ActionCable::INTERNAL[:disconnect_reasons][:invalid_request]) if websocket.alive?

      logger.error invalid_request_message
      logger.info finished_request_message
      [ 404, { "Content-Type" => "text/plain" }, [ "Page not found" ] ]
    end

    # Tags are declared in the server but computed in the connection. This allows us per-connection tailored tags.
    def new_tagged_logger
      TaggedLoggerProxy.new server.logger,
        tags: server.config.log_tags.map { |tag| tag.respond_to?(:call) ? tag.call(request) : tag.to_s.camelize }
    end

    def started_request_message
      'Started %s "%s"%s for %s at %s' % [
        request.request_method,
        request.filtered_path,
        # Both tags carry their own leading space; the format string has none.
        websocket.possible? ? " [WebSocket]" : " [non-WebSocket]",
        request.ip,
        Time.now.to_s ]
    end

    def finished_request_message
      'Finished "%s"%s for %s at %s' % [
        request.filtered_path,
        # Both tags carry their own leading space; the format string has none.
        websocket.possible? ? " [WebSocket]" : " [non-WebSocket]",
        request.ip,
        Time.now.to_s ]
    end

    def invalid_request_message
      "Failed to upgrade to WebSocket (REQUEST_METHOD: %s, HTTP_CONNECTION: %s, HTTP_UPGRADE: %s)" % [
        env["REQUEST_METHOD"], env["HTTP_CONNECTION"], env["HTTP_UPGRADE"]
      ]
    end

    def successful_request_message
      "Successfully upgraded to WebSocket (REQUEST_METHOD: %s, HTTP_CONNECTION: %s, HTTP_UPGRADE: %s)" % [
        env["REQUEST_METHOD"], env["HTTP_CONNECTION"], env["HTTP_UPGRADE"]
      ]
    end
end
end
end
- 1
ActiveSupport.run_load_hooks(:action_cable_connection, ActionCable::Connection::Base)
# frozen_string_literal: true
- 1
require "websocket/driver"
- 1
module ActionCable
- 1
module Connection
#--
# This class is heavily based on faye-websocket-ruby
#
# Copyright (c) 2010-2015 James Coglan
- 1
class ClientSocket # :nodoc:
  class << self
    # Builds the ws:// or wss:// URL of the request described by +env+.
    def determine_url(env)
      scheme = secure_request?(env) ? "wss:" : "ws:"
      "#{scheme}//#{env['HTTP_HOST']}#{env['REQUEST_URI']}"
    end

    # True when any of the checked env entries marks the request as HTTPS.
    def secure_request?(env)
      env["HTTPS"] == "on" ||
        env["HTTP_X_FORWARDED_SSL"] == "on" ||
        env["HTTP_X_FORWARDED_SCHEME"] == "https" ||
        env["HTTP_X_FORWARDED_PROTO"] == "https" ||
        env["rack.url_scheme"] == "https"
    end
  end

  # Ready states, in the order a socket moves through them.
  CONNECTING = 0
  OPEN       = 1
  CLOSING    = 2
  CLOSED     = 3

  attr_reader :env, :url

  # +event_target+ receives on_open / on_message / on_close / on_error.
  def initialize(env, event_target, event_loop, protocols)
    @env          = env
    @event_target = event_target
    @event_loop   = event_loop

    @url = ClientSocket.determine_url(@env)

    @driver = @driver_started = nil
    @close_params = ["", 1006]

    @ready_state = CONNECTING

    # The driver calls +env+, +url+, and +write+
    @driver = ::WebSocket::Driver.rack(self, protocols: protocols)

    @driver.on(:open)    { |_event| open }
    @driver.on(:message) { |event| receive_message(event.data) }
    @driver.on(:close)   { |event| begin_close(event.reason, event.code) }
    @driver.on(:error)   { |event| emit_error(event.message) }

    @stream = ActionCable::Connection::Stream.new(@event_loop, self)
  end

  # Hijacks the Rack socket and starts the driver; does nothing when there
  # is no driver or it was already started.
  def start_driver
    return if @driver.nil? || @driver_started

    @stream.hijack_rack_socket

    async_callback = @env["async.callback"]
    async_callback.call([101, {}, @stream]) if async_callback

    @driver_started = true
    @driver.start
  end

  def rack_response
    start_driver
    [ -1, {}, [] ]
  end

  # Writes raw +data+ to the stream; failures are reported through emit_error.
  def write(data)
    @stream.write(data)
  rescue => error
    emit_error error.message
  end

  # Sends +message+ as a text frame (String, Numeric) or binary frame (Array).
  # Returns false once the socket is closing or closed, or for other types.
  def transmit(message)
    return false if @ready_state > OPEN

    case message
    when Numeric
      @driver.text(message.to_s)
    when String
      @driver.text(message)
    when Array
      @driver.binary(message)
    else
      false
    end
  end

  # Starts the closing handshake. +code+ defaults to 1000 and +reason+ to "".
  # Raises ArgumentError unless the code is 1000 or within 3000..4999.
  def close(code = nil, reason = nil)
    code   ||= 1000
    reason ||= ""

    acceptable = code == 1000 || (code >= 3000 && code <= 4999)
    unless acceptable
      raise ArgumentError, "Failed to execute 'close' on WebSocket: " \
                           "The code must be either 1000, or between 3000 and 4999. " \
                           "#{code} is neither."
    end

    @ready_state = CLOSING unless @ready_state == CLOSED
    @driver.close(reason, code)
  end

  def parse(data)
    @driver.parse(data)
  end

  def client_gone
    finalize_close
  end

  def alive?
    @ready_state == OPEN
  end

  def protocol
    @driver.protocol
  end

  private
    def open
      return unless @ready_state == CONNECTING

      @ready_state = OPEN
      @event_target.on_open
    end

    def receive_message(data)
      return unless @ready_state == OPEN

      @event_target.on_message(data)
    end

    def emit_error(message)
      return if @ready_state >= CLOSING

      @event_target.on_error(message)
    end

    def begin_close(reason, code)
      return if @ready_state == CLOSED

      @ready_state  = CLOSING
      @close_params = [reason, code]

      @stream.shutdown if @stream
      finalize_close
    end

    # Marks the socket closed and notifies the event target exactly once.
    def finalize_close
      return if @ready_state == CLOSED

      @ready_state = CLOSED
      @event_target.on_close(*@close_params)
    end
end
end
end
# frozen_string_literal: true
- 1
require "set"
- 1
module ActionCable
- 1
module Connection
- 1
module Identification
  extend ActiveSupport::Concern

  included do
    class_attribute :identifiers, default: Set.new
  end

  module ClassMethods
    # Mark a key as being a connection identifier index that can then be used to find the specific connection again later.
    # Common identifiers are current_user and current_account, but could be anything, really.
    #
    # Note that anything marked as an identifier will automatically create a delegate by the same name on any
    # channel instances created off the connection.
    def identified_by(*identifiers)
      Array(identifiers).each do |identifier|
        attr_accessor identifier
      end

      self.identifiers += identifiers
    end
  end

  # Return a single connection identifier that combines the value of all the registered identifiers into a single gid.
  # The value is computed on first use and cached, even when it is empty.
  def connection_identifier
    return @connection_identifier if defined? @connection_identifier

    assigned = identifiers.map { |id| instance_variable_get("@#{id}") }.compact
    @connection_identifier = connection_gid(assigned)
  end

  private
    # Joins the sorted gid params (or +to_s+ for objects without one) with ":".
    def connection_gid(ids)
      params = ids.map { |o| o.respond_to?(:to_gid_param) ? o.to_gid_param : o.to_s }
      params.sort.join(":")
    end
end
end
end
# frozen_string_literal: true
- 1
module ActionCable
- 1
module Connection
# Makes it possible for the RemoteConnection to disconnect a specific connection.
- 1
module InternalChannel
  extend ActiveSupport::Concern

  private
    # Name of the pubsub channel dedicated to this connection.
    def internal_channel
      "action_cable/#{connection_identifier}"
    end

    # Subscribes to the internal channel when the connection has a
    # non-blank identifier, remembering the subscription for later removal.
    def subscribe_to_internal_channel
      return unless connection_identifier.present?

      callback = -> (message) { process_internal_message decode(message) }
      @_internal_subscriptions ||= []
      @_internal_subscriptions << [ internal_channel, callback ]

      server.event_loop.post { pubsub.subscribe(internal_channel, callback) }
      logger.info "Registered connection (#{connection_identifier})"
    end

    # Removes every remembered internal subscription from pubsub.
    def unsubscribe_from_internal_channel
      return unless @_internal_subscriptions.present?

      @_internal_subscriptions.each do |channel, callback|
        server.event_loop.post { pubsub.unsubscribe(channel, callback) }
      end
    end

    # Handles a decoded internal message; only "disconnect" is acted upon.
    # NOTE(review): this rescues Exception, which is broader than StandardError - confirm that is intended.
    def process_internal_message(message)
      if message["type"] == "disconnect"
        logger.info "Removing connection (#{connection_identifier})"
        websocket.close
      end
    rescue Exception => e
      logger.error "There was an exception - #{e.class}(#{e.message})"
      logger.error e.backtrace.join("\n")

      close
    end
end
end
end
# frozen_string_literal: true
- 1
module ActionCable
- 1
module Connection
# Allows us to buffer messages received from the WebSocket before the Connection has been fully initialized, and is ready to receive them.
- 1
class MessageBuffer # :nodoc:
  def initialize(connection)
    @connection = connection
    @buffered_messages = []
  end

  # Hands a String +message+ to the connection, or buffers it while the
  # buffer is not processing yet. Non-String messages are logged and dropped.
  def append(message)
    unless valid?(message)
      return connection.logger.error "Couldn't handle non-string message: #{message.class}"
    end

    processing? ? receive(message) : buffer(message)
  end

  def processing?
    @processing
  end

  # Switches to pass-through mode and drains everything buffered so far.
  def process!
    @processing = true
    receive_buffered_messages
  end

  private
    attr_reader :connection
    attr_reader :buffered_messages

    def valid?(message)
      message.is_a?(String)
    end

    def receive(message)
      connection.receive message
    end

    def buffer(message)
      buffered_messages << message
    end

    # Delivers buffered messages in arrival order.
    def receive_buffered_messages
      until buffered_messages.empty?
        receive buffered_messages.shift
      end
    end
end
end
end
# frozen_string_literal: true
- 1
require "thread"
- 1
module ActionCable
- 1
module Connection
#--
# This class is heavily based on faye-websocket-ruby
#
# Copyright (c) 2010-2015 James Coglan
- 1
class Stream # :nodoc:
  def initialize(event_loop, socket)
    @event_loop    = event_loop
    @socket_object = socket
    @stream_send   = socket.env["stream.send"]

    @rack_hijack_io = nil
    @write_lock     = Mutex.new

    @write_head   = nil
    @write_buffer = Queue.new
  end

  # Installs +callback+ as the sender unless one is already set.
  def each(&callback)
    @stream_send ||= callback
  end

  def close
    shutdown
    @socket_object.client_gone
  end

  def shutdown
    clean_rack_hijack
  end

  # Writes +data+ through the sender callback when present; otherwise tries
  # a non-blocking write on the hijacked IO and queues whatever could not be
  # written. EOFError / ECONNRESET are reported as the client being gone.
  def write(data)
    return @stream_send.call(data) if @stream_send

    if @write_lock.try_lock
      begin
        if @write_head.nil? && @write_buffer.empty?
          written = @rack_hijack_io.write_nonblock(data, exception: false)

          if written == data.bytesize
            return data.bytesize
          elsif written != :wait_writable
            # Partial write: keep the remainder as the head of the pending output.
            @write_head = data.byteslice(written, data.bytesize)
            @event_loop.writes_pending @rack_hijack_io

            return data.bytesize
          end
          # :wait_writable falls through to the buffering below
        end
      ensure
        @write_lock.unlock
      end
    end

    @write_buffer << data
    @event_loop.writes_pending @rack_hijack_io

    data.bytesize
  rescue EOFError, Errno::ECONNRESET
    @socket_object.client_gone
  end

  # Writes pending output until it is exhausted (returns true) or the IO
  # cannot take a complete chunk right now (returns false).
  def flush_write_buffer
    @write_lock.synchronize do
      loop do
        if @write_head.nil?
          return true if @write_buffer.empty?
          @write_head = @write_buffer.pop
        end

        written = @rack_hijack_io.write_nonblock(@write_head, exception: false)
        return false if written == :wait_writable

        if written == @write_head.bytesize
          @write_head = nil
        else
          @write_head = @write_head.byteslice(written, @write_head.bytesize)
          return false
        end
      end
    end
  end

  def receive(data)
    @socket_object.parse(data)
  end

  # Takes over the Rack socket (when the env supports hijacking) and
  # attaches the resulting IO to the event loop.
  def hijack_rack_socket
    hijack = @socket_object.env["rack.hijack"]
    return unless hijack

    # This should return the underlying io according to the SPEC;
    # fall back to rack.hijack_io to retain existing behaviour if required.
    @rack_hijack_io = hijack.call || @socket_object.env["rack.hijack_io"]

    @event_loop.attach(@rack_hijack_io, self)
  end

  private
    def clean_rack_hijack
      return unless @rack_hijack_io

      @event_loop.detach(@rack_hijack_io, self)
      @rack_hijack_io = nil
    end
end
end
end
# frozen_string_literal: true
- 1
require "nio"
- 1
require "thread"
- 1
module ActionCable
- 1
module Connection
- 1
class StreamEventLoop
  def initialize
    @nio = nil
    @executor = nil
    @thread = nil
    @map = {}
    @stopping = false
    @todo = Queue.new
    @spawn_mutex = Mutex.new
  end

  # Starts a Concurrent::TimerTask running +block+ every +interval+ and returns it.
  def timer(interval, &block)
    task = Concurrent::TimerTask.new(execution_interval: interval, &block)
    task.execute
    task
  end

  # Hands +task+ (or the given block) to the executor, spawning the loop if needed.
  def post(task = nil, &block)
    task ||= block

    spawn
    @executor << task
  end

  # Queues registration of +io+ for reading, with +stream+ as the monitor's value.
  def attach(io, stream)
    registration = lambda do
      @map[io] = @nio.register(io, :r)
      @map[io].value = stream
    end
    @todo << registration
    wakeup
  end

  # Queues deregistration of +io+; the io is closed once it is deregistered.
  def detach(io, stream)
    removal = lambda do
      @nio.deregister io
      @map.delete io
      io.close
    end
    @todo << removal
    wakeup
  end

  # Queues a switch of +io+'s monitor (if registered) to read+write interest.
  def writes_pending(io)
    interest_change = lambda do
      monitor = @map[io]
      monitor.interests = :rw if monitor
    end
    @todo << interest_change
    wakeup
  end

  def stop
    @stopping = true
    wakeup if @nio
  end

  private
    # Starts the selector, executor and loop thread unless the thread is
    # already running. Returns true only when a new thread was started.
    def spawn
      return if @thread&.status

      @spawn_mutex.synchronize do
        # Re-check under the lock: another caller may have spawned meanwhile.
        return if @thread&.status

        @nio ||= NIO::Selector.new

        @executor ||= Concurrent::ThreadPoolExecutor.new(
          min_threads: 1,
          max_threads: 10,
          max_queue: 0,
        )

        @thread = Thread.new { run }

        return true
      end
    end

    def wakeup
      spawn || @nio.wakeup
    end

    def run
      loop do
        if @stopping
          @nio.close
          break
        end

        @todo.pop(true).call until @todo.empty?

        monitors = @nio.select
        next unless monitors

        monitors.each do |monitor|
          io = monitor.io
          stream = monitor.value

          begin
            if monitor.writable?
              monitor.interests = :r if stream.flush_write_buffer
              next unless monitor.readable?
            end

            incoming = io.read_nonblock(4096, exception: false)
            if incoming == :wait_readable
              next
            elsif incoming.nil?
              stream.close
            else
              stream.receive incoming
            end
          rescue
            # We expect one of EOFError or Errno::ECONNRESET in
            # normal operation (when the client goes away). But if
            # anything else goes wrong, this is still the best way
            # to handle it.
            begin
              stream.close
            rescue
              @nio.deregister io
              @map.delete io
            end
          end
        end
      end
    end
end
end
end
# frozen_string_literal: true
- 1
require "active_support/core_ext/hash/indifferent_access"
- 1
module ActionCable
- 1
module Connection
# Collection class for all the channel subscriptions established on a given connection. Responsible for routing incoming commands that arrive on
# the connection to the proper channel.
- 1
class Subscriptions # :nodoc:
  def initialize(connection)
    @connection = connection
    @subscriptions = {}
  end

  # Routes a decoded command hash to #add, #remove or #perform_action.
  # Any exception is passed to the connection's rescue handlers and logged.
  def execute_command(data)
    command = data["command"]

    case command
    when "subscribe"
      add data
    when "unsubscribe"
      remove data
    when "message"
      perform_action data
    else
      logger.error "Received unrecognized command in #{data.inspect}"
    end
  rescue Exception => e
    @connection.rescue_with_handler(e)
    logger.error "Could not execute command from (#{data.inspect}) [#{e.class} - #{e.message}]: #{e.backtrace.first(5).join(" | ")}"
  end

  # Creates and subscribes the channel named in the JSON identifier, unless
  # a subscription with the same identifier already exists.
  def add(data)
    id_key = data["identifier"]
    id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access

    return if subscriptions.key?(id_key)

    subscription_klass = id_options[:channel].safe_constantize

    # Only ActionCable::Channel::Base and its descendants may be instantiated.
    unless subscription_klass && ActionCable::Channel::Base >= subscription_klass
      return logger.error "Subscription class not found: #{id_options[:channel].inspect}"
    end

    subscription = subscription_klass.new(connection, id_key, id_options)
    subscriptions[id_key] = subscription
    subscription.subscribe_to_channel
  end

  def remove(data)
    logger.info "Unsubscribing from channel: #{data['identifier']}"
    remove_subscription find(data)
  end

  def remove_subscription(subscription)
    subscription.unsubscribe_from_channel
    subscriptions.delete(subscription.identifier)
  end

  # Decodes the JSON under "data" and performs it on the addressed subscription.
  def perform_action(data)
    find(data).perform_action ActiveSupport::JSON.decode(data["data"])
  end

  def identifiers
    subscriptions.keys
  end

  def unsubscribe_from_all
    subscriptions.each { |_identifier, subscription| remove_subscription(subscription) }
  end

  private
    attr_reader :connection, :subscriptions
    delegate :logger, to: :connection

    # Returns the subscription for data["identifier"] or raises a RuntimeError.
    def find(data)
      subscriptions[data["identifier"]] ||
        raise("Unable to find subscription with identifier: #{data['identifier']}")
    end
end
end
end
# frozen_string_literal: true
- 1
module ActionCable
- 1
module Connection
# Allows the use of per-connection tags against the server logger. This wouldn't work using the traditional
# <tt>ActiveSupport::TaggedLogging</tt> enhanced Rails.logger, as that logger will reset the tags between requests.
# The connection is long-lived, so it needs its own set of tags for its independent duration.
- 1
class TaggedLoggerProxy
  attr_reader :tags

  # +logger+ is the wrapped logger; +tags+ may be nested and is flattened.
  def initialize(logger, tags:)
    @logger = logger
    @tags = tags.flatten
  end

  # Appends +tags+ (flattened) to the proxy's tags, dropping duplicates.
  def add_tags(*tags)
    @tags += tags.flatten
    @tags = @tags.uniq
  end

  # Yields with this proxy's tags applied when +logger+ supports +tagged+;
  # tags the logger's formatter already carries are not applied twice.
  # Loggers without +tagged+ are simply yielded to.
  def tag(logger)
    if logger.respond_to?(:tagged)
      current_tags = tags - logger.formatter.current_tags
      logger.tagged(*current_tags) { yield }
    else
      yield
    end
  end

  # Each severity method accepts a message, a block, or both, and forwards
  # them to the wrapped logger, so <tt>logger.info { "lazy" }</tt> works.
  %i( debug info warn error fatal unknown ).each do |severity|
    define_method(severity) do |message = nil, &block|
      log severity, message, &block
    end
  end

  private
    # Sends +type+ to the wrapped logger inside the tagging context.
    def log(type, message, &block) # :doc:
      tag(@logger) { @logger.send type, message, &block }
    end
end
end
end
# frozen_string_literal: true
- 1
require "active_support"
- 1
require "active_support/test_case"
- 1
require "active_support/core_ext/hash/indifferent_access"
- 1
require "action_dispatch"
- 1
require "action_dispatch/http/headers"
- 1
require "action_dispatch/testing/test_request"
- 1
module ActionCable
- 1
module Connection
- 1
# Raised when the connection class under test cannot be determined.
class NonInferrableConnectionError < ::StandardError
  # +name+ is interpolated into the error message.
  def initialize(name)
    message = "Unable to determine the connection to test from #{name}. " \
              "You'll need to specify it using `tests YourConnection` in your " \
              "test case definition."
    super message
  end
end
- 1
module Assertions
  # Asserts that the connection is rejected (via +reject_unauthorized_connection+).
  #
  #   # Asserts that connection without user_id fails
  #   assert_reject_connection { connect params: { user_id: '' } }
  def assert_reject_connection(&block)
    failure_message = "Expected to reject connection but no rejection was made"
    assert_raises(Authorization::UnauthorizedError, failure_message, &block)
  end
end
# We don't want to use the whole "encryption stack" for connection
# unit-tests, but we want to make sure that users test against the correct types
# of cookies (i.e. signed or encrypted or plain)
- 1
class TestCookieJar < ActiveSupport::HashWithIndifferentAccess
  # Jar stored under the :signed key, created on first access.
  def signed
    self[:signed] ||= ActiveSupport::HashWithIndifferentAccess.new
  end

  # Jar stored under the :encrypted key, created on first access.
  def encrypted
    self[:encrypted] ||= ActiveSupport::HashWithIndifferentAccess.new
  end
end
- 1
# Test request whose session and cookie jar can be assigned directly.
class TestRequest < ActionDispatch::TestRequest
  attr_accessor :session
  attr_accessor :cookie_jar
end
- 1
# Replacement initializer mixed into the connection instance under test.
module TestConnection
  attr_reader :logger, :request

  # Gives the connection a tagged logger writing to an in-memory StringIO
  # and takes the request and env from +request+.
  def initialize(request)
    tagged_logging = ActiveSupport::TaggedLogging.new(ActiveSupport::Logger.new(StringIO.new))

    @logger = ActionCable::Connection::TaggedLoggerProxy.new(tagged_logging, tags: [])
    @request = request
    @env = request.env
  end
end
# Unit test Action Cable connections.
#
# Useful to check whether a connection's +identified_by+ gets assigned properly
# and that any improper connection requests are rejected.
#
# == Basic example
#
# Unit tests are written as follows:
#
# 1. Simulate a connection attempt by calling +connect+.
# 2. Assert state, e.g. identifiers, has been assigned.
#
#
# class ApplicationCable::ConnectionTest < ActionCable::Connection::TestCase
# def test_connects_with_proper_cookie
# # Simulate the connection request with a cookie.
# cookies["user_id"] = users(:john).id
#
# connect
#
# # Assert the connection identifier matches the fixture.
# assert_equal users(:john).id, connection.user.id
# end
#
# def test_rejects_connection_without_proper_cookie
# assert_reject_connection { connect }
# end
# end
#
# +connect+ accepts additional information about the HTTP request with the
# +params+, +headers+, +session+ and Rack +env+ options.
#
# def test_connect_with_headers_and_query_string
# connect params: { user_id: 1 }, headers: { "X-API-TOKEN" => "secret-my" }
#
# assert_equal "1", connection.user.id
# assert_equal "secret-my", connection.token
# end
#
# def test_connect_with_params
# connect params: { user_id: 1 }
#
# assert_equal "1", connection.user.id
# end
#
# You can also set up the correct cookies before the connection request:
#
# def test_connect_with_cookies
# # Plain cookies:
# cookies["user_id"] = 1
#
# # Or signed/encrypted:
# # cookies.signed["user_id"] = 1
# # cookies.encrypted["user_id"] = 1
#
# connect
#
# assert_equal "1", connection.user_id
# end
#
# == Connection is automatically inferred
#
# ActionCable::Connection::TestCase will automatically infer the connection under test
# from the test class name. If the connection cannot be inferred from the test
# class name, you can explicitly set it with +tests+.
#
# class ConnectionTest < ActionCable::Connection::TestCase
# tests ApplicationCable::Connection
# end
#
- 1
class TestCase < ActiveSupport::TestCase
  module Behavior
    extend ActiveSupport::Concern

    DEFAULT_PATH = "/cable"

    include ActiveSupport::Testing::ConstantLookup
    include Assertions

    included do
      class_attribute :_connection_class

      attr_reader :connection

      ActiveSupport.run_load_hooks(:action_cable_connection_test_case, self)
    end

    module ClassMethods
      # Declares the connection class under test. Accepts a String or Symbol
      # (camelized and constantized) or a Module; anything else raises
      # NonInferrableConnectionError.
      def tests(connection)
        self._connection_class =
          case connection
          when String, Symbol then connection.to_s.camelize.constantize
          when Module then connection
          else raise NonInferrableConnectionError.new(connection)
          end
      end

      # Returns the declared connection class, inferring it from the test
      # class name when none was declared.
      def connection_class
        declared = self._connection_class
        return declared if declared

        tests determine_default_connection(name)
      end

      # Looks up a strict subclass of ActionCable::Connection::Base from the
      # test +name+; raises NonInferrableConnectionError when there is none.
      def determine_default_connection(name)
        inferred = determine_constant_from_test_name(name) do |candidate|
          Class === candidate && candidate < ActionCable::Connection::Base
        end

        if inferred.nil?
          raise NonInferrableConnectionError.new(name)
        else
          inferred
        end
      end
    end

    # Performs connection attempt to exert #connect on the connection under test.
    #
    # Accepts request path as the first argument and the following request options:
    #
    # - params - URL parameters (Hash)
    # - headers - request headers (Hash)
    # - session - session data (Hash)
    # - env - additional Rack env configuration (Hash)
    def connect(path = ActionCable.server.config.mount_path, **request_params)
      path ||= DEFAULT_PATH

      candidate = self.class.connection_class.allocate
      candidate.singleton_class.include(TestConnection)
      candidate.send(:initialize, build_test_request(path, **request_params))
      candidate.connect if candidate.respond_to?(:connect)

      # Only set instance variable if connected successfully
      @connection = candidate
    end

    # Exert #disconnect on the connection under test.
    # Raises when there is no current connection.
    def disconnect
      raise "Must be connected!" if connection.nil?

      current = connection
      current.disconnect if current.respond_to?(:disconnect)
      @connection = nil
    end

    # Cookie jar handed to the test request built by #connect.
    def cookies
      @cookie_jar ||= TestCookieJar.new
    end

    private
      # Builds the TestRequest for +path+. +params+ replaces the path's own
      # query string when given; +env+ entries override the generated ones.
      def build_test_request(path, params: nil, headers: {}, session: {}, env: {})
        wrapped_headers = ActionDispatch::Http::Headers.from_hash(headers)

        uri = URI.parse(path)

        query_string = params.nil? ? uri.query : params.to_query

        base_env = { "QUERY_STRING" => query_string, "PATH_INFO" => uri.path }
        request_env = base_env.merge(env)

        if wrapped_headers.present?
          ActionDispatch::Http::Headers.from_hash(request_env).merge!(wrapped_headers)
        end

        request = TestRequest.create(request_env)
        request.session = session.with_indifferent_access
        request.cookie_jar = cookies
        request
      end
  end

  include Behavior
end
end
end
# frozen_string_literal: true
- 1
require "websocket/driver"
- 1
module ActionCable
- 1
module Connection
# Wrap the real socket to minimize the externally-presented API
- 1
class WebSocket # :nodoc:
  # Wraps a ClientSocket when +env+ describes a WebSocket request; otherwise
  # the wrapper holds nil and #possible? is falsy.
  def initialize(env, event_target, event_loop, protocols: ActionCable::INTERNAL[:protocols])
    @websocket =
      if ::WebSocket::Driver.websocket?(env)
        ClientSocket.new(env, event_target, event_loop, protocols)
      end
  end

  # Truthy (the underlying socket) when a WebSocket could be set up.
  def possible?
    websocket
  end

  # Whether the underlying socket exists and reports itself alive.
  def alive?
    websocket&.alive?
  end

  # The remaining methods forward straight to the underlying socket.
  def transmit(data)
    websocket.transmit(data)
  end

  def close
    websocket.close
  end

  def protocol
    websocket.protocol
  end

  def rack_response
    websocket.rack_response
  end

  private
    attr_reader :websocket
end
end
end
# frozen_string_literal: true
require "rails"
require "action_cable"
require "action_cable/helpers/action_cable_helper"
require "active_support/core_ext/hash/indifferent_access"
module ActionCable
# Rails engine that wires Action Cable into a Rails application: view helper,
# logger default, cable.yml configuration, route mounting and executor hooks.
class Engine < Rails::Engine # :nodoc:
config.action_cable = ActiveSupport::OrderedOptions.new
config.action_cable.mount_path = ActionCable::INTERNAL[:default_mount_path]
config.eager_load_namespaces << ActionCable
# Make action_cable_meta_tag available in views once Action View is loaded.
initializer "action_cable.helpers" do
ActiveSupport.on_load(:action_view) do
include ActionCable::Helpers::ActionCableHelper
end
end
# Fall back to the Rails logger unless a logger was already assigned.
initializer "action_cable.logger" do
ActiveSupport.on_load(:action_cable) { self.logger ||= ::Rails.logger }
end
initializer "action_cable.set_configs" do |app|
options = app.config.action_cable
# In development, accept localhost origins on any port unless the
# application configured its own list.
options.allowed_request_origins ||= /https?:\/\/localhost:\d+/ if ::Rails.env.development?
app.paths.add "config/cable", with: "config/cable.yml"
ActiveSupport.on_load(:action_cable) do
# Load the adapter settings only when the cable config file exists.
if (config_path = Pathname.new(app.config.paths["config/cable"].first)).exist?
self.cable = Rails.application.config_for(config_path).with_indifferent_access
end
# Prefer the application's ApplicationCable::Connection when it resolves;
# otherwise defer to the previously configured connection class.
previous_connection_class = connection_class
self.connection_class = -> { "ApplicationCable::Connection".safe_constantize || previous_connection_class.call }
# Copy every config.action_cable option through the matching writer.
options.each { |k, v| send("#{k}=", v) }
end
end
# Mount the server after initialization; a nil mount_path disables mounting.
initializer "action_cable.routes" do
config.after_initialize do |app|
config = app.config
unless config.action_cable.mount_path.nil?
app.routes.prepend do
mount ActionCable.server => config.action_cable.mount_path, internal: true
end
end
end
end
initializer "action_cable.set_work_hooks" do |app|
ActiveSupport.on_load(:action_cable) do
ActionCable::Server::Worker.set_callback :work, :around, prepend: true do |_, inner|
app.executor.wrap do
# If we took a while to get the lock, we may have been halted
# in the meantime. As we haven't started doing any real work
# yet, we should pretend that we never made it off the queue.
unless stopping?
inner.call
end
end
end
# Channel subscribe/unsubscribe callbacks also run inside the executor.
wrap = lambda do |_, inner|
app.executor.wrap(&inner)
end
ActionCable::Channel::Base.set_callback :subscribe, :around, prepend: true, &wrap
ActionCable::Channel::Base.set_callback :unsubscribe, :around, prepend: true, &wrap
# Restart the server before application classes are unloaded.
app.reloader.before_class_unload do
ActionCable.server.restart
end
end
end
end
end
# frozen_string_literal: true
- 1
module ActionCable
  # Individual components of the Action Cable version number.
  module VERSION
    MAJOR = 6
    MINOR = 1
    TINY  = 0
    PRE   = "alpha"

    # Dotted version string; nil components are left out.
    STRING = [MAJOR, MINOR, TINY, PRE].compact.join(".")
  end

  class << self
    # Returns the version of the currently loaded Action Cable as a <tt>Gem::Version</tt>.
    def gem_version
      Gem::Version.new(VERSION::STRING)
    end
  end
end
# frozen_string_literal: true
module ActionCable
module Helpers
module ActionCableHelper
  # Renders an "action-cable-url" meta tag carrying the URL the JavaScript
  # consumer should connect to. Place it above your JavaScript tag:
  #
  #   <head>
  #     <%= action_cable_meta_tag %>
  #     <%= javascript_include_tag 'application', 'data-turbolinks-track' => 'reload' %>
  #   </head>
  #
  # The consumer can then be created without spelling out the URL:
  #
  #   window.Cable = require("@rails/actioncable")
  #   window.App = {}
  #   App.cable = Cable.createConsumer()
  #
  # The content is <tt>config.action_cable.url</tt> when set, otherwise
  # <tt>config.action_cable.mount_path</tt>:
  #
  #   config.action_cable.mount_path = "/cable123"
  #   <%= action_cable_meta_tag %> would render:
  #   => <meta name="action-cable-url" content="/cable123" />
  #
  #   config.action_cable.url = "ws://actioncable.com"
  #   <%= action_cable_meta_tag %> would render:
  #   => <meta name="action-cable-url" content="ws://actioncable.com" />
  #
  # Raises a RuntimeError when neither setting is present.
  def action_cable_meta_tag
    cable_config = ActionCable.server.config
    endpoint = cable_config.url || cable_config.mount_path

    unless endpoint
      raise("No Action Cable URL configured -- please configure this at config.action_cable.url")
    end

    tag "meta", name: "action-cable-url", content: endpoint
  end
end
end
end
# frozen_string_literal: true
- 1
require "active_support/core_ext/module/redefine_method"
- 1
module ActionCable
# If you need to disconnect a given connection, you can go through the
# RemoteConnections. You can find the connections you're looking for by
# searching for the identifier declared on the connection. For example:
#
# module ApplicationCable
# class Connection < ActionCable::Connection::Base
# identified_by :current_user
# ....
# end
# end
#
# ActionCable.server.remote_connections.where(current_user: User.find(1)).disconnect
#
# This will disconnect all the connections established for
# <tt>User.find(1)</tt>, across all servers running on all machines, because
# it uses the internal channel that all of these servers are subscribed to.
- 1
class RemoteConnections
- 1
attr_reader :server
- 1
# +server+ is the server handed to every RemoteConnection built by #where.
def initialize(server)
- 1
@server = server
end
- 1
# Returns a RemoteConnection for the given identifier hash
# (e.g. <tt>current_user: user</tt>). RemoteConnection raises
# InvalidIdentifiersError when a required identifier key is missing.
def where(identifier)
- 1
RemoteConnection.new(server, identifier)
end
- 1
private
# Represents a single remote connection found via <tt>ActionCable.server.remote_connections.where(*)</tt>.
# Exists solely for the purpose of calling #disconnect on that connection.
- 1
class RemoteConnection
- 1
class InvalidIdentifiersError < StandardError; end
- 1
include Connection::Identification, Connection::InternalChannel
- 1
# Stores the server and copies each entry of +ids+ into an instance variable
# of the same name. Raises InvalidIdentifiersError when +ids+ does not cover
# every connection identifier.
def initialize(server, ids)
- 1
@server = server
- 1
set_identifier_instance_vars(ids)
end
# Uses the internal channel to disconnect the connection.
- 1
# Uses the internal channel to disconnect the connection.
#
# The message is passed as an explicit hash literal: Server#broadcast accepts
# a +coder:+ keyword, so a brace-less <tt>type: "disconnect"</tt> would be
# parsed as keyword arguments and raise ArgumentError on Ruby 3.
def disconnect
  server.broadcast internal_channel, { type: "disconnect" }
end
# Returns all the identifiers that were applied to this connection.
- 1
# Overrides +identifiers+ to return the server's connection identifiers.
redefine_method :identifiers do
- 1
server.connection_identifiers
end
- 1
protected
- 1
attr_reader :server
- 1
private
- 1
# Copies every key/value pair of +ids+ into an instance variable named after
# the key. Raises InvalidIdentifiersError unless +ids+ passes
# valid_identifiers?.
def set_identifier_instance_vars(ids)
  valid_identifiers?(ids) || raise(InvalidIdentifiersError)

  ids.each do |name, value|
    instance_variable_set("@#{name}", value)
  end
end
- 1
# True when every entry of +identifiers+ appears among the keys of +ids+.
# Extra keys in +ids+ are allowed.
def valid_identifiers?(ids)
  supplied = ids.keys
  identifiers.each do |required|
    return false unless supplied.include?(required)
  end
  true
end
end
end
end
# frozen_string_literal: true
- 1
module ActionCable
- 1
# Namespace for the server-side components. The constants below are registered
# inside +eager_autoload+.
# NOTE(review): presumably so they are loaded up front when the framework is
# eager loaded — confirm against ActiveSupport::Autoload.
module Server
- 1
extend ActiveSupport::Autoload
- 1
eager_autoload do
- 1
autoload :Base
- 1
autoload :Broadcasting
- 1
autoload :Connections
- 1
autoload :Configuration
- 1
autoload :Worker
end
end
end
# frozen_string_literal: true
- 1
require "monitor"
- 1
module ActionCable
- 1
module Server
# A singleton ActionCable::Server instance is available via ActionCable.server. It's used by the Rack process that starts the Action Cable server, but
# is also used by the user to reach the RemoteConnections object, which is used for finding and disconnecting connections across all servers.
#
# Also, this is the server instance used for broadcasting. See Broadcasting for more information.
- 1
# Rack-callable Action Cable server. Holds the configuration and lazily builds
# the event loop, worker pool, pub/sub adapter and RemoteConnections gateway.
class Base
- 1
include ActionCable::Server::Broadcasting
- 1
include ActionCable::Server::Connections
- 1
# Class-level configuration; instances use it unless given their own.
cattr_accessor :config, instance_accessor: false, default: ActionCable::Server::Configuration.new
- 1
attr_reader :config
- 1
def self.logger; config.logger; end
- 1
delegate :logger, to: :config
- 1
# Monitor guarding lazy creation and teardown of the collaborators below.
attr_reader :mutex
- 1
# +config+ defaults to the class-level configuration.
def initialize(config: self.class.config)
- 70
@config = config
- 70
@mutex = Monitor.new
- 70
@remote_connections = @event_loop = @worker_pool = @pubsub = nil
end
# Called by Rack to set up the server.
- 1
def call(env)
- 115
setup_heartbeat_timer
- 115
# connection_class is a callable that returns the class to instantiate.
config.connection_class.call.new(self, env).process
end
# Disconnect all the connections identified by +identifiers+ on this server or any others via RemoteConnections.
- 1
def disconnect(identifiers)
remote_connections.where(identifiers).disconnect
end
# Closes every local connection with the server_restart reason, then halts
# the worker pool and shuts down the pub/sub adapter. Both are reset to nil
# so the accessors below rebuild them on next use.
- 1
def restart
- 4
connections.each do |connection|
- 2
connection.close(reason: ActionCable::INTERNAL[:disconnect_reasons][:server_restart])
end
- 4
@mutex.synchronize do
# Shutdown the worker pool
- 4
@worker_pool.halt if @worker_pool
- 4
@worker_pool = nil
# Shutdown the pub/sub adapter
- 4
@pubsub.shutdown if @pubsub
- 4
@pubsub = nil
end
end
# Gateway to RemoteConnections. See that class for details.
- 1
def remote_connections
- 2
# Return the existing instance without locking; otherwise create it while
# holding the monitor.
@remote_connections || @mutex.synchronize { @remote_connections ||= RemoteConnections.new(self) }
end
# Lazily created StreamEventLoop shared by this server.
- 1
def event_loop
- 380
@event_loop || @mutex.synchronize { @event_loop ||= ActionCable::Connection::StreamEventLoop.new }
end
# The worker pool is where we run connection callbacks and channel actions. We do as little as possible on the server's main thread.
# The worker pool is an executor service that's backed by a pool of threads working from a task queue. The thread pool size maxes out
# at 4 worker threads by default. Tune the size yourself with <tt>config.action_cable.worker_pool_size</tt>.
#
# Using Active Record, Redis, etc within your channel actions means you'll get a separate connection from each thread in the worker pool.
# Plan your deployment accordingly: 5 servers each running 5 Puma workers each running an 8-thread worker pool means at least 200 database
# connections.
#
# Also, ensure that your database connection pool size is at least as large as your worker pool size. Otherwise, workers may oversubscribe
# the database connection pool and block while they wait for other workers to release their connections. Use a smaller worker pool or a larger
# database connection pool instead.
- 1
def worker_pool
- 123
@worker_pool || @mutex.synchronize { @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size) }
end
# Adapter used for all streams/broadcasting.
- 1
def pubsub
- 501
@pubsub || @mutex.synchronize { @pubsub ||= config.pubsub_adapter.new(self) }
end
# All of the identifiers applied to the connection class associated with this server.
- 1
def connection_identifiers
- 1
config.connection_class.call.identifiers
end
end
- 1
ActiveSupport.run_load_hooks(:action_cable, Base.config)
end
end
# frozen_string_literal: true
- 1
module ActionCable
- 1
module Server
# Broadcasting is how other parts of your application can send messages to a channel's subscribers. As explained in Channel, most of the time, these
# broadcastings are streamed directly to the clients subscribed to the named broadcasting. Let's explain with a full-stack example:
#
# class WebNotificationsChannel < ApplicationCable::Channel
# def subscribed
# stream_from "web_notifications_#{current_user.id}"
# end
# end
#
# # Somewhere in your app this is called, perhaps from a NewCommentJob:
# ActionCable.server.broadcast \
# "web_notifications_1", { title: "New things!", body: "All that's fit for print" }
#
# # Client-side CoffeeScript, which assumes you've already requested the right to send web notifications:
# App.cable.subscriptions.create "WebNotificationsChannel",
# received: (data) ->
# new Notification data['title'], body: data['body']
- 1
module Broadcasting
  # Broadcast a hash directly to a named <tt>broadcasting</tt>. The message is
  # encoded with +coder+ (JSON by default) before it reaches the pub/sub adapter.
  def broadcast(broadcasting, message, coder: ActiveSupport::JSON)
    broadcaster = broadcaster_for(broadcasting, coder: coder)
    broadcaster.broadcast(message)
  end

  # Returns a reusable broadcaster for a named <tt>broadcasting</tt>. Handy
  # when one object transmits to the same broadcasting repeatedly. The
  # broadcasting name is coerced with String().
  def broadcaster_for(broadcasting, coder: ActiveSupport::JSON)
    Broadcaster.new(self, String(broadcasting), coder: coder)
  end

  private
    # Encodes messages and publishes them to one broadcasting on the server's
    # pub/sub adapter.
    class Broadcaster
      attr_reader :server, :broadcasting, :coder

      def initialize(server, broadcasting, coder:)
        @server = server
        @broadcasting = broadcasting
        @coder = coder
      end

      # Logs the message, then encodes and publishes it inside a
      # "broadcast.action_cable" instrumentation event. A nil/false coder
      # publishes the message untouched.
      def broadcast(message)
        server.logger.debug { "[ActionCable] Broadcasting to #{broadcasting}: #{message.inspect}" }

        event = { broadcasting: broadcasting, message: message, coder: coder }
        ActiveSupport::Notifications.instrument("broadcast.action_cable", event) do
          wire_message = message
          wire_message = coder.encode(message) if coder
          server.pubsub.broadcast broadcasting, wire_message
        end
      end
    end
end
end
end
# frozen_string_literal: true
- 1
module ActionCable
- 1
module Server
# An instance of this configuration object is available via ActionCable.server.config, which allows you to tweak Action Cable configuration
# in a Rails config initializer.
- 1
class Configuration
  attr_accessor :logger, :log_tags
  attr_accessor :connection_class, :worker_pool_size
  attr_accessor :disable_request_forgery_protection, :allowed_request_origins, :allow_same_origin_as_host
  attr_accessor :cable, :url, :mount_path

  # Sets the defaults: no log tags, a 4-thread worker pool, forgery protection
  # enabled, same-origin-as-host allowed, and a callable that resolves the
  # base connection class lazily.
  def initialize
    @worker_pool_size = 4
    @log_tags = []
    @allow_same_origin_as_host = true
    @disable_request_forgery_protection = false
    @connection_class = -> { ActionCable::Connection::Base }
  end

  # Returns the subscription adapter class named by the "adapter" key of the
  # cable configuration, defaulting to "redis" when the key is absent. The
  # adapter file is required first so that load failures produce actionable
  # messages.
  #
  # Raises LoadError (re-raised with guidance) when the adapter file or one
  # of its dependencies cannot be loaded.
  def pubsub_adapter
    adapter_name = cable.fetch("adapter") { "redis" }
    adapter_path = "action_cable/subscription_adapter/#{adapter_name}"

    begin
      require adapter_path
    rescue LoadError => error
      guidance =
        if error.path == adapter_path
          # The adapter file itself is missing: a non-builtin adapter was
          # named, so it is either misspelled or absent from the Gemfile.
          "Could not load the '#{adapter_name}' Action Cable pubsub adapter. Ensure that the adapter is spelled correctly in config/cable.yml and that you've added the necessary adapter gem to your Gemfile."
        else
          # The failure bubbled up from inside the adapter's own requires.
          "Error loading the '#{adapter_name}' Action Cable pubsub adapter. Missing a gem it depends on? #{error.message}"
        end

      raise error.class, guidance, error.backtrace
    end

    class_name = adapter_name.camelize
    # camelize cannot produce the adapter's actual capitalization.
    class_name = "PostgreSQL" if class_name == "Postgresql"
    "ActionCable::SubscriptionAdapter::#{class_name}".constantize
  end
end
end
end
# frozen_string_literal: true
- 1
module ActionCable
- 1
module Server
# Collection class for all the connections that have been established on this specific server. Remember, usually you'll run many Action Cable servers, so
# you can't use this collection as a full list of all of the connections established against your application. Instead, use RemoteConnections for that.
- 1
module Connections # :nodoc:
- 1
BEAT_INTERVAL = 3
- 1
def connections
- 295
@connections ||= []
end
- 1
def add_connection(connection)
- 149
connections << connection
end
- 1
def remove_connection(connection)
- 117
connections.delete connection
end
# WebSocket connection implementations differ on when they'll mark a connection as stale. We basically never want a connection to go stale, as you
# then can't rely on being able to communicate with the connection. To solve this, a 3 second heartbeat runs on all connections. If the beat fails, we automatically
# disconnect.
- 1
def setup_heartbeat_timer
- 115
@heartbeat_timer ||= event_loop.timer(BEAT_INTERVAL) do
- 40
event_loop.post { connections.map(&:beat) }
end
end
- 1
def open_connections_statistics
connections.map(&:statistics)
end
end
end
end
# frozen_string_literal: true
- 1
require "active_support/callbacks"
- 1
require "active_support/core_ext/module/attribute_accessors_per_thread"
- 1
require "action_cable/server/worker/active_record_connection_management"
- 1
require "concurrent"
- 1
module ActionCable
- 1
module Server
# Worker used by Server.send_async to do connection work in threads.
- 1
# Runs connection and channel work on a thread pool, wrapping each unit of
# work in the :work callbacks and exposing the current connection per thread.
class Worker # :nodoc:
- 1
include ActiveSupport::Callbacks
- 1
# Per-thread accessor for the connection whose work is currently running.
thread_mattr_accessor :connection
- 1
define_callbacks :work
- 1
include ActiveRecordConnectionManagement
- 1
attr_reader :executor
- 1
# +max_size+ caps the number of pool threads.
# NOTE(review): max_queue: 0 presumably means an unbounded queue in
# concurrent-ruby — confirm.
def initialize(max_size: 5)
- 44
@executor = Concurrent::ThreadPoolExecutor.new(
min_threads: 1,
max_threads: max_size,
max_queue: 0,
)
end
# Stop processing work: any work that has not already started
# running will be discarded from the queue
- 1
def halt
- 1
@executor.shutdown
end
- 1
# True once the executor has begun shutting down.
def stopping?
@executor.shuttingdown?
end
- 1
# Runs the given block inside the :work callbacks with +connection+ exposed
# through the per-thread accessor; the accessor is always reset afterwards.
def work(connection)
- 571
self.connection = connection
- 571
run_callbacks :work do
- 571
yield
end
ensure
- 571
self.connection = nil
end
- 1
# Schedules +block+ to be instance_exec'd on +receiver+ with +args+.
def async_exec(receiver, *args, connection:, &block)
async_invoke receiver, :instance_exec, *args, connection: connection, &block
end
- 1
# Posts a call of +method+ on +receiver+ to the executor. +connection+
# defaults to the receiver itself.
def async_invoke(receiver, method, *args, connection: receiver, &block)
- 569
@executor.post do
- 569
invoke(receiver, method, *args, connection: connection, &block)
end
end
- 1
# Calls +method+ on +receiver+ inside #work. Any exception is logged and, when
# the receiver responds to handle_exception, passed on to it instead of
# propagating out of the pool thread.
def invoke(receiver, method, *args, connection:, &block)
- 571
work(connection) do
- 571
receiver.send method, *args, &block
rescue Exception => e
logger.error "There was an exception - #{e.class}(#{e.message})"
logger.error e.backtrace.join("\n")
receiver.handle_exception if receiver.respond_to?(:handle_exception)
end
end
- 1
private
- 1
def logger
ActionCable.server.logger
end
end
end
end
# frozen_string_literal: true
- 1
module ActionCable
- 1
module Server
- 1
class Worker
- 1
# Adds an around :work callback that tags the connection's logger with the
# Active Record logger. The callback is installed only when
# ActiveRecord::Base is defined at include time.
module ActiveRecordConnectionManagement
  extend ActiveSupport::Concern

  included do
    set_callback :work, :around, :with_database_connections if defined?(ActiveRecord::Base)
  end

  # Yields to the wrapped work inside the tagged logger block.
  def with_database_connections
    connection.logger.tag(ActiveRecord::Base.logger) do
      yield
    end
  end
end
end
end
end
# frozen_string_literal: true
- 1
module ActionCable
- 1
# Namespace for the pub/sub adapters. Only the shared building blocks are
# autoloaded here; concrete adapter files are required by path from
# Configuration#pubsub_adapter.
module SubscriptionAdapter
- 1
extend ActiveSupport::Autoload
- 1
autoload :Base
- 1
autoload :Test
- 1
autoload :SubscriberMap
- 1
autoload :ChannelPrefix
end
end
# frozen_string_literal: true
- 1
require "action_cable/subscription_adapter/inline"
- 1
module ActionCable
- 1
module SubscriptionAdapter
- 1
# Inline adapter variant whose subscriber map posts subscriber registration
# and callback invocation to the server's event loop.
class Async < Inline # :nodoc:
- 1
private
- 1
def new_subscriber_map
- 42
AsyncSubscriberMap.new(server.event_loop)
end
- 1
class AsyncSubscriberMap < SubscriberMap
- 1
def initialize(event_loop)
- 42
@event_loop = event_loop
- 42
super()
end
- 1
# The implicit-argument super inside the block forwards the original
# arguments when the event loop runs it.
def add_subscriber(*)
- 282
@event_loop.post { super }
end
- 1
def invoke_callback(*)
- 248
@event_loop.post { super }
end
end
end
end
end
# frozen_string_literal: true
- 1
module ActionCable
- 1
module SubscriptionAdapter
- 1
# Abstract pub/sub adapter. Subclasses implement broadcast, subscribe,
# unsubscribe and shutdown; each of them raises NotImplementedError here.
class Base
  attr_reader :logger, :server

  # Keeps the server and borrows its logger.
  def initialize(server)
    @server = server
    @logger = server.logger
  end

  def broadcast(channel, payload)
    raise NotImplementedError
  end

  def subscribe(channel, message_callback, success_callback = nil)
    raise NotImplementedError
  end

  def unsubscribe(channel, message_callback)
    raise NotImplementedError
  end

  def shutdown
    raise NotImplementedError
  end

  # Returns the :id entry of the cable configuration. When it is missing, a
  # PID-based default is stored in the configuration and returned.
  def identifier
    cable_config = @server.config.cable
    cable_config[:id] ||= "ActionCable-PID-#{Process.pid}"
  end
end
end
end
# frozen_string_literal: true
- 1
module ActionCable
- 1
module SubscriptionAdapter
- 1
# Prepended to adapters so every channel name is prefixed with the
# channel_prefix from the cable configuration before reaching the adapter.
module ChannelPrefix # :nodoc:
  def broadcast(channel, payload)
    super(channel_with_prefix(channel), payload)
  end

  def subscribe(channel, callback, success_callback = nil)
    super(channel_with_prefix(channel), callback, success_callback)
  end

  def unsubscribe(channel, callback)
    super(channel_with_prefix(channel), callback)
  end

  private
    # Returns the channel name joined to the configured channel_prefix with
    # ":"; a missing prefix leaves the name as is.
    def channel_with_prefix(channel)
      prefix = @server.config.cable[:channel_prefix]
      [prefix, channel].compact.join(":")
    end
end
end
end
# frozen_string_literal: true
- 1
module ActionCable
- 1
module SubscriptionAdapter
- 1
# Adapter that keeps subscribers in a SubscriberMap held by the adapter
# itself; no external service is involved.
class Inline < Base # :nodoc:
- 1
def initialize(*)
- 262
super
- 262
@subscriber_map = nil
end
- 1
def broadcast(channel, payload)
- 76
subscriber_map.broadcast(channel, payload)
end
- 1
def subscribe(channel, callback, success_callback = nil)
- 152
subscriber_map.add_subscriber(channel, callback, success_callback)
end
- 1
def unsubscribe(channel, callback)
- 147
subscriber_map.remove_subscriber(channel, callback)
end
- 1
def shutdown
# nothing to do
end
- 1
private
- 1
# Returns the existing map without locking; otherwise builds it while
# holding the server's mutex.
def subscriber_map
- 425
@subscriber_map || @server.mutex.synchronize { @subscriber_map ||= new_subscriber_map }
end
- 1
# Factory hook; Async overrides it to supply an event-loop backed map.
def new_subscriber_map
- 8
SubscriberMap.new
end
end
end
end
# frozen_string_literal: true
gem "pg", "~> 1.1"
require "pg"
require "thread"
require "digest/sha1"
module ActionCable
module SubscriptionAdapter
# Pub/sub adapter built on PostgreSQL LISTEN/NOTIFY, using connections taken
# from the Active Record connection pool.
class PostgreSQL < Base # :nodoc:
  prepend ChannelPrefix

  def initialize(*)
    super
    @listener = nil
  end

  # Publishes +payload+ with NOTIFY on the channel's identifier.
  def broadcast(channel, payload)
    with_broadcast_connection do |pg_conn|
      pg_conn.exec("NOTIFY #{pg_conn.escape_identifier(channel_identifier(channel))}, '#{pg_conn.escape_string(payload)}'")
    end
  end

  def subscribe(channel, callback, success_callback = nil)
    listener.add_subscriber(channel_identifier(channel), callback, success_callback)
  end

  def unsubscribe(channel, callback)
    listener.remove_subscriber(channel_identifier(channel), callback)
  end

  # Stops the listener if one was ever started. The listener is deliberately
  # not created here: building it spawns a thread and takes a database
  # connection, which is pointless when the only goal is to shut down.
  def shutdown
    @listener.shutdown if @listener
  end

  # Yields a raw PG connection dedicated to LISTEN; the connection is
  # disconnected when the block exits.
  def with_subscriptions_connection(&block) # :nodoc:
    ar_conn = ActiveRecord::Base.connection_pool.checkout.tap do |conn|
      # Action Cable is taking ownership over this database connection, and
      # will perform the necessary cleanup tasks
      ActiveRecord::Base.connection_pool.remove(conn)
    end
    pg_conn = ar_conn.raw_connection

    verify!(pg_conn)
    pg_conn.exec("SET application_name = #{pg_conn.escape_identifier(identifier)}")
    yield pg_conn
  ensure
    # ar_conn is nil when the checkout itself failed; guard so the original
    # error is not replaced by a NoMethodError raised from this ensure.
    ar_conn&.disconnect!
  end

  # Yields a raw PG connection borrowed from the pool for one broadcast.
  def with_broadcast_connection(&block) # :nodoc:
    ActiveRecord::Base.connection_pool.with_connection do |ar_conn|
      pg_conn = ar_conn.raw_connection
      verify!(pg_conn)
      yield pg_conn
    end
  end

  private
    # Channel names longer than 63 characters are replaced by their SHA1 hex
    # digest. NOTE(review): 63 is presumably PostgreSQL's identifier length
    # limit — confirm.
    def channel_identifier(channel)
      channel.size > 63 ? Digest::SHA1.hexdigest(channel) : channel
    end

    # Returns the existing listener without locking; otherwise creates it
    # while holding the server's mutex.
    def listener
      @listener || @server.mutex.synchronize { @listener ||= Listener.new(self, @server.event_loop) }
    end

    def verify!(pg_conn)
      unless pg_conn.is_a?(PG::Connection)
        raise "The Active Record database must be PostgreSQL in order to use the PostgreSQL Action Cable storage adapter"
      end
    end

    # Owns the listening thread. Channel changes are queued by callers and
    # applied on the thread between waits for notifications.
    class Listener < SubscriberMap
      def initialize(adapter, event_loop)
        super()

        @adapter = adapter
        @event_loop = event_loop
        @queue = Queue.new

        @thread = Thread.new do
          Thread.current.abort_on_exception = true
          listen
        end
      end

      # Thread body: drain queued actions, then wait up to one second for a
      # notification, until a :shutdown action is dequeued.
      def listen
        @adapter.with_subscriptions_connection do |pg_conn|
          catch :shutdown do
            loop do
              until @queue.empty?
                action, channel, callback = @queue.pop(true)

                case action
                when :listen
                  pg_conn.exec("LISTEN #{pg_conn.escape_identifier channel}")
                  @event_loop.post(&callback) if callback
                when :unlisten
                  pg_conn.exec("UNLISTEN #{pg_conn.escape_identifier channel}")
                when :shutdown
                  throw :shutdown
                end
              end

              pg_conn.wait_for_notify(1) do |chan, _pid, message|
                broadcast(chan, message)
              end
            end
          end
        end
      end

      # Queues a shutdown request and waits for the thread to finish.
      def shutdown
        @queue.push([:shutdown])
        Thread.pass while @thread.alive?
      end

      def add_channel(channel, on_success)
        @queue.push([:listen, channel, on_success])
      end

      def remove_channel(channel)
        @queue.push([:unlisten, channel])
      end

      # Subscriber callbacks run on the event loop, not the listener thread.
      def invoke_callback(*)
        @event_loop.post { super }
      end
    end
end
end
end
# frozen_string_literal: true
- 1
require "thread"
- 1
gem "redis", ">= 3", "< 5"
- 1
require "redis"
- 1
require "active_support/core_ext/hash/except"
- 1
module ActionCable
- 1
module SubscriptionAdapter
- 1
# Redis-backed pub/sub adapter: one connection publishes broadcasts, and a
# separate subscription connection is driven by a Listener thread.
class Redis < Base # :nodoc:
- 1
prepend ChannelPrefix
# Overwrite this factory method for Redis connections if you want to use a different Redis library than the redis gem.
# This is needed, for example, when using Makara proxies for distributed Redis.
- 1
cattr_accessor :redis_connector, default: ->(config) do
- 60
::Redis.new(config.except(:adapter, :channel_prefix))
end
- 1
def initialize(*)
- 63
super
- 63
@listener = nil
- 63
@redis_connection_for_broadcasts = nil
end
- 1
def broadcast(channel, payload)
- 42
redis_connection_for_broadcasts.publish(channel, payload)
end
- 1
def subscribe(channel, callback, success_callback = nil)
- 39
listener.add_subscriber(channel, callback, success_callback)
end
- 1
def unsubscribe(channel, callback)
- 39
listener.remove_subscriber(channel, callback)
end
- 1
# Shuts the listener down only if one was ever created.
def shutdown
- 54
@listener.shutdown if @listener
end
- 1
# Builds a new connection for the listener thread on every call.
def redis_connection_for_subscriptions
- 30
redis_connection
end
- 1
private
- 1
# Returns the existing listener without locking; otherwise creates it while
# holding the server's mutex.
def listener
- 108
@listener || @server.mutex.synchronize { @listener ||= Listener.new(self, @server.event_loop) }
end
- 1
def redis_connection_for_broadcasts
- 42
@redis_connection_for_broadcasts || @server.mutex.synchronize do
- 27
@redis_connection_for_broadcasts ||= redis_connection
end
end
- 1
# Calls the connector with the cable configuration plus the adapter's id.
def redis_connection
- 60
self.class.redis_connector.call(@server.config.cable.merge(id: identifier))
end
- 1
# Tracks subscribers and drives the subscription connection. Commands are
# written to the raw client once the initial subscription is confirmed;
# until then they are queued in @when_connected.
class Listener < SubscriberMap
- 1
def initialize(adapter, event_loop)
- 30
super()
- 30
@adapter = adapter
- 30
@event_loop = event_loop
- 96
# channel => queue of success callbacks awaiting subscribe confirmation
@subscribe_callbacks = Hash.new { |h, k| h[k] = [] }
- 30
@subscription_lock = Mutex.new
- 30
@raw_client = nil
- 30
@when_connected = []
- 30
@thread = nil
end
- 1
# Thread body: subscribes to an internal channel and handles subscribe,
# message and unsubscribe events on +conn+.
def listen(conn)
- 30
conn.without_reconnect do
- 30
original_client = conn.respond_to?(:_client) ? conn._client : conn.client
- 30
conn.subscribe("_action_cable_internal") do |on|
- 30
on.subscribe do |chan, count|
- 66
@subscription_lock.synchronize do
- 66
# First confirmed subscription: expose the raw client and flush every
# command queued while disconnected.
if count == 1
- 30
@raw_client = original_client
- 30
until @when_connected.empty?
- 30
@when_connected.shift.call
end
end
- 66
# Run the oldest pending success callback for this channel on the
# event loop.
if callbacks = @subscribe_callbacks[chan]
- 66
next_callback = callbacks.shift
- 66
@event_loop.post(&next_callback) if next_callback
- 66
@subscribe_callbacks.delete(chan) if callbacks.empty?
end
end
end
- 30
on.message do |chan, message|
- 36
broadcast(chan, message)
end
- 30
on.unsubscribe do |chan, count|
- 63
# No subscriptions remain: stop exposing the raw client.
if count == 0
- 27
@subscription_lock.synchronize do
- 27
@raw_client = nil
end
end
end
end
end
end
- 1
# Sends a bare "unsubscribe" (queued until connected) and waits for the
# listener thread to finish. Does nothing if the thread never started.
def shutdown
- 27
@subscription_lock.synchronize do
- 27
return if @thread.nil?
- 27
when_connected do
- 27
send_command("unsubscribe")
- 27
@raw_client = nil
end
end
- 27
Thread.pass while @thread.alive?
end
- 1
# Starts the listener thread if needed, records the success callback and
# issues (or queues) the subscribe command.
def add_channel(channel, on_success)
- 36
@subscription_lock.synchronize do
- 36
ensure_listener_running
- 36
@subscribe_callbacks[channel] << on_success
- 72
when_connected { send_command("subscribe", channel) }
end
end
- 1
def remove_channel(channel)
- 36
@subscription_lock.synchronize do
- 72
when_connected { send_command("unsubscribe", channel) }
end
end
- 1
# Subscriber callbacks run on the event loop, not the listener thread.
def invoke_callback(*)
- 78
@event_loop.post { super }
end
- 1
private
- 1
def ensure_listener_running
- 36
@thread ||= Thread.new do
- 30
Thread.current.abort_on_exception = true
- 30
conn = @adapter.redis_connection_for_subscriptions
- 30
listen conn
end
end
- 1
# Runs +block+ now when the raw client is available, otherwise queues it
# until the first subscription is confirmed.
def when_connected(&block)
- 99
if @raw_client
- 69
block.call
else
- 30
@when_connected << block
end
end
- 1
# Writes +command+ to the raw client and flushes the underlying connection
# when it exposes #flush.
# NOTE(review): reaches into the client's @connection instance variable —
# confirm this matches the supported redis gem versions.
def send_command(*command)
- 99
@raw_client.write(command)
- 99
very_raw_connection =
@raw_client.connection.instance_variable_defined?(:@connection) &&
@raw_client.connection.instance_variable_get(:@connection)
- 99
if very_raw_connection && very_raw_connection.respond_to?(:flush)
- 33
very_raw_connection.flush
end
end
end
end
end
end
# frozen_string_literal: true
- 1
module ActionCable
- 1
module SubscriptionAdapter
- 1
# Thread-safe registry mapping channel names to subscriber callbacks.
# Subclasses override add_channel, remove_channel and invoke_callback to hook
# into a real transport.
class SubscriberMap
  def initialize
    @subscribers = Hash.new { |registry, name| registry[name] = [] }
    @sync = Mutex.new
  end

  # Registers +subscriber+ on +channel+. The first subscriber of a channel
  # triggers add_channel (which receives +on_success+); for an already known
  # channel +on_success+ is called right away when given.
  def add_subscriber(channel, subscriber, on_success)
    @sync.synchronize do
      already_known = @subscribers.key?(channel)
      @subscribers[channel] << subscriber

      if !already_known
        add_channel channel, on_success
      elsif on_success
        on_success.call
      end
    end
  end

  # Removes +subscriber+ from +channel+; when no subscribers remain the
  # channel is dropped and remove_channel is called.
  def remove_subscriber(channel, subscriber)
    @sync.synchronize do
      remaining = @subscribers[channel]
      remaining.delete(subscriber)
      next unless remaining.empty?

      @subscribers.delete channel
      remove_channel channel
    end
  end

  # Delivers +message+ to a snapshot of the channel's subscribers. The
  # snapshot is taken under the lock; callbacks run outside it. Returns nil
  # when the channel has no subscribers.
  def broadcast(channel, message)
    recipients = @sync.synchronize do
      @subscribers.key?(channel) ? @subscribers[channel].dup : nil
    end
    return if recipients.nil?

    recipients.each do |subscriber|
      invoke_callback(subscriber, message)
    end
  end

  # Hook: called for the first subscriber of a channel. The default confirms
  # the subscription immediately.
  def add_channel(channel, on_success)
    on_success.call if on_success
  end

  # Hook: called when the last subscriber of a channel is removed.
  def remove_channel(channel)
  end

  # Hook: delivers +message+ to one subscriber callback.
  def invoke_callback(callback, message)
    callback.call message
  end
end
end
end
# frozen_string_literal: true
- 1
require_relative "async"
- 1
module ActionCable
- 1
module SubscriptionAdapter
# == Test adapter for Action Cable
#
# The test adapter should be used only in testing. Along with
# <tt>ActionCable::TestHelper</tt> it makes a great tool to test your Rails application.
#
# To use the test adapter set +adapter+ value to +test+ in your +config/cable.yml+ file.
#
# NOTE: Test adapter extends the <tt>ActionCable::SubscriptionAdapter::Async</tt> adapter,
# so it could be used in system tests too.
- 1
class Test < Async
- 1
# Records +payload+ for +channel+ and then broadcasts it through the parent
# adapter, so subscribers still receive it.
def broadcast(channel, payload)
- 40
broadcasts(channel) << payload
- 40
super
end
- 1
# Payloads recorded for +channel+, oldest first (an empty array when none).
def broadcasts(channel)
- 72
channels_data[channel] ||= []
end
- 1
# Forgets the payloads recorded for +channel+ only.
def clear_messages(channel)
- 9
channels_data[channel] = []
end
- 1
# Forgets the payloads recorded for every channel.
def clear
- 1
@channels_data = nil
end
- 1
private
- 1
# Lazily created hash of channel => recorded payloads.
def channels_data
- 81
@channels_data ||= {}
end
end
end
end
# frozen_string_literal: true
- 1
require "active_support/test_case"
- 1
module ActionCable
- 1
# Base test case with the Action Cable broadcast assertions mixed in. Runs the
# :action_cable_test_case load hooks with this class.
class TestCase < ActiveSupport::TestCase
- 1
include ActionCable::TestHelper
- 1
ActiveSupport.run_load_hooks(:action_cable_test_case, self)
end
end
# frozen_string_literal: true
- 1
module ActionCable
# Provides helper methods for testing Action Cable broadcasting
- 1
module TestHelper
- 1
def before_setup # :nodoc:
- 197
server = ActionCable.server
- 197
test_adapter = ActionCable::SubscriptionAdapter::Test.new(server)
- 197
@old_pubsub_adapter = server.pubsub
- 197
server.instance_variable_set(:@pubsub, test_adapter)
- 197
super
end
- 1
def after_teardown # :nodoc:
- 197
super
- 197
ActionCable.server.instance_variable_set(:@pubsub, @old_pubsub_adapter)
end
# Asserts that the number of broadcasted messages to the stream matches the given number.
#
# def test_broadcasts
# assert_broadcasts 'messages', 0
# ActionCable.server.broadcast 'messages', { text: 'hello' }
# assert_broadcasts 'messages', 1
# ActionCable.server.broadcast 'messages', { text: 'world' }
# assert_broadcasts 'messages', 2
# end
#
# If a block is passed, that block should cause the specified number of
# messages to be broadcasted.
#
# def test_broadcasts_again
# assert_broadcasts('messages', 1) do
# ActionCable.server.broadcast 'messages', { text: 'hello' }
# end
#
# assert_broadcasts('messages', 2) do
# ActionCable.server.broadcast 'messages', { text: 'hi' }
# ActionCable.server.broadcast 'messages', { text: 'how are you?' }
# end
# end
#
- 1
def assert_broadcasts(stream, number, &block)
- 9
if block_given?
- 6
original_count = broadcasts_size(stream)
- 6
assert_nothing_raised(&block)
- 6
new_count = broadcasts_size(stream)
- 6
actual_count = new_count - original_count
else
- 3
actual_count = broadcasts_size(stream)
end
- 9
assert_equal number, actual_count, "#{number} broadcasts to #{stream} expected, but #{actual_count} were sent"
end
# Asserts that nothing has been broadcast to +stream+.
#
#   def test_no_broadcasts
#     assert_no_broadcasts 'messages'
#     ActionCable.server.broadcast 'messages', { text: 'hi' }
#     assert_broadcasts 'messages', 1
#   end
#
# When a block is given, the block itself must not broadcast anything.
#
#   def test_broadcasts_again
#     assert_no_broadcasts 'messages' do
#       # No messages should be sent from this block
#     end
#   end
#
# This is shorthand for:
#
#   assert_broadcasts 'messages', 0, &block
#
def assert_no_broadcasts(stream, &block)
  assert_broadcasts(stream, 0, &block)
end
# Asserts that the specified message has been sent to the stream.
#
#   def test_assert_transmitted_message
#     ActionCable.server.broadcast 'messages', text: 'hello'
#     assert_broadcast_on('messages', text: 'hello')
#   end
#
# If a block is passed, that block should cause a message with the specified
# data to be sent. Only messages broadcast while the block runs are searched;
# messages sent earlier are put back on the stream afterwards, even when the
# block raises.
#
#   def test_assert_broadcast_on_again
#     assert_broadcast_on('messages', text: 'hello') do
#       ActionCable.server.broadcast 'messages', text: 'hello'
#     end
#   end
#
# +data+ is round-tripped through JSON before it is compared with each
# decoded message. Fails with "No messages sent with ..." when nothing matches.
def assert_broadcast_on(stream, data, &block)
  # Encode to JSON and back; we want to use this value to compare
  # with decoded JSON.
  # Comparing JSON strings doesn't work due to the order of the keys.
  serialized_msg = ActiveSupport::JSON.decode(ActiveSupport::JSON.encode(data))
  new_messages = broadcasts(stream)

  if block_given?
    # Copy the history so later mutation of the adapter's array cannot alter it.
    old_messages = new_messages.dup
    clear_messages(stream)

    begin
      assert_nothing_raised(&block)
    ensure
      new_messages = broadcasts(stream).dup
      clear_messages(stream)
      # Restore all sent messages, including when the block failed, so
      # later assertions still see the complete history.
      (old_messages + new_messages).each { |m| pubsub_adapter.broadcast(stream, m) }
    end
  end

  message = new_messages.find { |msg| ActiveSupport::JSON.decode(msg) == serialized_msg }
  assert message, "No messages sent with #{data} to #{stream}"
end
- 1
# Returns whatever pub/sub adapter the Action Cable server singleton
# currently exposes through +pubsub+.
def pubsub_adapter # :nodoc:
  cable_server = ActionCable.server
  cable_server.pubsub
end
- 1
# Forward +broadcasts+ and +clear_messages+ to the object returned by +pubsub_adapter+.
delegate :broadcasts, :clear_messages, to: :pubsub_adapter
- 1
private
- 1
# Number of messages currently reported by +broadcasts+ for +channel+.
def broadcasts_size(channel)
  recorded = broadcasts(channel)
  recorded.size
end
end
end
# frozen_string_literal: true
- 1
require_relative "gem_version"
- 1
module ActionCable
  class << self
    # Version of the currently loaded Action Cable, as a <tt>Gem::Version</tt>.
    # Simply hands back +gem_version+.
    def version
      gem_version
    end
  end
end
# frozen_string_literal: true
module Rails
  module Generators
    # Generator for an Action Cable channel: writes the Ruby channel class,
    # optionally the JavaScript side (+--assets+), and the shared
    # +application_cable+ base files when they are not present yet.
    class ChannelGenerator < NamedBase
      source_root File.expand_path("templates", __dir__)

      argument :actions, type: :array, default: [], banner: "method method"

      class_option :assets, type: :boolean

      check_class_collision suffix: "Channel"

      hook_for :test_framework

      # Renders the channel class and, when the +assets+ option is set, the
      # JavaScript files; finishes with the shared application_cable files.
      def create_channel_file
        channel_path = File.join("app/channels", class_path, "#{file_name}_channel.rb")
        template "channel.rb", channel_path

        if options[:assets]
          # The shared JS entry points are only written on invoke.
          if behavior == :invoke
            {
              "javascript/index.js" => "app/javascript/channels/index.js",
              "javascript/consumer.js" => "app/javascript/channels/consumer.js"
            }.each { |source, destination| template source, destination }
          end

          js_channel_path = File.join("app/javascript/channels", class_path, "#{file_name}_channel")
          js_template "javascript/channel", js_channel_path
        end

        generate_application_cable_files
      end

      private
        # Base file name with any trailing "_channel" (case-insensitive) removed.
        def file_name
          @_file_name ||= super.sub(/_channel\z/i, "")
        end

        # Writes each application_cable base file whose path does not exist yet.
        # Does nothing unless the generator is being invoked.
        #
        # FIXME: Change these files to symlinks once RubyGems 2.5.0 is required.
        def generate_application_cable_files
          return unless behavior == :invoke

          [
            "application_cable/channel.rb",
            "application_cable/connection.rb"
          ].each do |name|
            path = File.join("app/channels/", name)
            template(name, path) unless File.exist?(path)
          end
        end
    end
  end
end
# frozen_string_literal: true
module TestUnit
  module Generators
    # Test::Unit generator that writes the test file for a channel.
    class ChannelGenerator < ::Rails::Generators::NamedBase
      source_root File.expand_path("templates", __dir__)

      check_class_collision suffix: "ChannelTest"

      # Renders channel_test.rb into test/channels under the class path.
      def create_test_files
        test_path = File.join("test/channels", class_path, "#{file_name}_channel_test.rb")
        template "channel_test.rb", test_path
      end

      private
        # Base file name with any trailing "_channel" (case-insensitive) removed.
        def file_name # :doc:
          @_file_name ||= super.sub(/_channel\z/i, "")
        end
    end
  end
end