Real-Time Command Line Applications with Action Cable and Thor
If you build a Rails application that has any kind of real-time feature, chances are you use Action Cable.
Action Cable allows you to build nice things such as feeds that automatically refresh as new content is published, or editors that display a list of users currently working on a document. Under the hood, it uses Websockets to stream changes to clients as they happen.
The most commonly used client is, of course, the web browser. But that doesn’t mean you can’t leverage Action Cable when using other kinds of clients - such as command line applications.
Imagine a command line client that triggers some long-running job on the server. Wouldn’t it be nice to give users live updates on how that job is advancing?
In this guide, I’ll show you how to build exactly that. We’ll create a command line app that connects to an Action Cable server, triggers a lengthy background job, and then displays live updates about its progress.
A (moving?) picture tells more than a thousand words.
This is a long post. If you have no patience for words, you can find the source code of the result on GitHub.
The Server
To start things off let’s create a new Rails application. We don’t need most of Rails’ functionality in this guide, so we can skip a lot of things.
rails new actioncable-cli \
--skip-action-mailer \
--skip-action-mailbox \
--skip-action-text \
--skip-active-job \
--skip-active-record \
--skip-active-storage \
--skip-javascript \
--skip-jbuilder \
--skip-spring \
--skip-test \
--skip-system-test \
--skip-webpack-install \
--skip-turbolinks
We will create our command line app later. First, we have to make some changes to the Action cable connection. Usually, clients provide information about the currently logged-in user, for example through session cookies, which then serves as a connection identifier. See the official connection docs.
Our command line app offers no such thing. We could add some sort of authentication mechanism, but to keep things simple we won’t. We will use a simple UUID to identify connections.
Open and modify app/channels/application_cable/connection.rb
:
module ApplicationCable
class Connection < ActionCable::Connection::Base
identified_by :client_id
def connect
self.client_id = request.params[:client_id]
end
end
end
Next, create a worker channel, through which we’ll later publish updates. Create a new file app/channels/worker_channel.rb
:
class WorkerChannel < ApplicationCable::Channel
def subscribed
stream_for "client_#{client_id}"
end
def unsubscribed
stop_all_streams
end
end
Because we’ll be connecting from the command line, we’ll have to disable some security measures Rails enables by default. Uncomment this line in development.rb
:
config.action_cable.disable_request_forgery_protection = true
Now that we have the connection and channel set up, let’s create a background job. We’ll be using Sidekiq, so add this to your Gemfile:
gem 'sidekiq', '~> 6.1'
We must also make sure that Redis is up and running because Sidekiq relies on that for managing background workers. If you use docker-compose
, add the following to docker-compose.yml
:
version: "3"
services:
actioncable-cli-redis:
image: redis
container_name: redis
ports:
- 6379:6379
Next, create a new worker in app/workers
. It won’t be doing any actual work, mostly it will be taking a nap.
class Worker
include Sidekiq::Worker
def perform
steps = 5
(1..steps).each do |progress|
sleep(rand(1..3))
Sidekiq.logger.info("Step #{progress} for client")
end
end
end
To offer a way to start the background job we just created, add a new controller with the following contents:
class WorkersController < ApplicationController
def start
Worker.perform_async
head(:ok)
end
end
Don’t forget to also add a new route to your routes.rb
!
get '/workers/start', to: 'workers#start'
This is a good point to stop and check how badly broken everything is :crossed_fingers:
Start your Rails app, Sidekiq, and start the worker. If all is in order, you should see your worker writing to the Sidekiq logs.
rails start
bundle exec sidekiq
# Send a request to trigger the worker
curl "http://localhost:3000/workers/start"
Success? Then on to the next part.
The Command Line App
Our command line application will offer just a single command - one that starts the worker. Thor is a simple way to create command line apps, and it’s bundled with Rails, so we’ll be using that to implement that command.
Create worker.thor
in your lib/tasks
directory:
require 'thor'
class Worker < Thor
include Thor::Actions
desc 'start', 'Start a worker process'
def start
puts 'Hello there!'
end
end
You can test your command using bundle exec thor worker:start
.
To receive live updates using Websockets we’ll need a Websocket client. I used async-websocket. Add it to your Gemfile:
gem 'async-websocket', '~> 0.17'
Then update your command to connect to the server. Note that we generate a UUID to identify the connection. Remember that we adapted the Action Cable connection to make use of this client_id
.
require 'thor'
require 'securerandom'
require 'async'
require 'async/http/endpoint'
require 'async/websocket/client'
class Worker < Thor
include Thor::Actions
desc 'start', 'Start a worker process'
def start
@client_id = SecureRandom.uuid
url = "ws://localhost:3000/cable?client_id=#{@client_id}"
Async do |_|
endpoint = Async::HTTP::Endpoint.parse(url)
Async::WebSocket::Client.connect(endpoint) do |connection|
while (message = connection.read)
puts message
end
end
end
end
end
Run the command and check the server logs. You should see that a connection has been established, and should start receiving ping messages on the command line.
$ bundle exec thor worker:start
{:type=>"welcome"}
{:type=>"ping", :message=>1617639988}
...
Now we need to subscribe to the worker channel. As soon as the subscription was confirmed, we are ready to receive messages. We can then start the worker.
Adapt the Thor command as follows:
require 'thor'
require 'securerandom'
require 'net/http'
require 'async'
require 'async/http/endpoint'
require 'async/websocket/client'
class Worker < Thor
include Thor::Actions
desc 'start', 'Start a worker process'
def start
@client_id = SecureRandom.uuid
url = "ws://localhost:3000/cable?client_id=#{@client_id}"
Async do |_|
endpoint = Async::HTTP::Endpoint.parse(url)
Async::WebSocket::Client.connect(endpoint) do |connection|
while (message = connection.read)
on_receive(connection, message)
end
end
end
end
private
def on_receive(connection, message)
handle_connection_message(connection, message)
end
def handle_connection_message(connection, message)
type = message[:type]
case type
when 'welcome'
on_connected(connection)
when 'confirm_subscription'
on_subscribed
else
puts message
end
end
def on_connected(connection)
content = { command: 'subscribe', identifier: { channel: 'WorkerChannel' }.to_json }
connection.write(content)
connection.flush
end
def on_subscribed
Net::HTTP.start('localhost', 3000) do |http|
http.get("/workers/start?client_id=#{@client_id}")
end
end
end
All that is missing is to stream updates from the worker to the connected clients. We’ll have to make some small changes to our worker and worker controller:
class Worker
include Sidekiq::Worker
def perform(client_id)
steps = 5
WorkerChannel.broadcast_to("client_#{client_id}", type: :worker_started, total: steps)
(1..steps).each do |progress|
sleep(rand(1..3))
Sidekiq.logger.info("Step #{progress} for client #{client_id}")
WorkerChannel.broadcast_to("client_#{client_id}", type: :worker_progress, progress: progress)
end
WorkerChannel.broadcast_to("client_#{client_id}", type: :worker_done)
end
end
class WorkersController < ApplicationController
def start
Worker.perform_async(params[:client_id])
head(:ok)
end
end
Note that the worker uses the client_id
to publish messages to the correct clients. We publish messages when the worker has started, when there is progress, and when the worker has finished.
We’ll update the command line app to handle these messages. Let’s also add ruby_progressbar
so we can display the progress to the user.
Add this to your Gemfile.
gem 'ruby-progressbar', '~> 1.11'
Then update the Thor command once again. In the end, it should look like this:
require 'thor'
require 'securerandom'
require 'net/http'
require 'async'
require 'async/io/stream'
require 'async/http/endpoint'
require 'async/websocket/client'
require 'ruby-progressbar'
class Worker < Thor
include Thor::Actions
desc 'start', 'Start a worker process'
def start
@client_id = SecureRandom.uuid
url = "ws://localhost:3000/cable?client_id=#{@client_id}"
Async do |_|
endpoint = Async::HTTP::Endpoint.parse(url)
Async::WebSocket::Client.connect(endpoint) do |connection|
while (message = connection.read)
on_receive(connection, message)
end
end
end
end
private
def on_receive(connection, message)
if message[:type]
handle_connection_message(connection, message)
else
handle_channel_message(connection, message)
end
end
def handle_connection_message(connection, message)
type = message[:type]
case type
when 'welcome'
on_connected(connection)
when 'confirm_subscription'
on_subscribed
end
end
def handle_channel_message(connection, message)
message = message[:message]
type = message[:type]
case type
when 'worker_started'
total = message[:total]
@bar = ProgressBar.create(title: 'Worker Progress', total: total, format: '%t %B %c/%C %P%%')
when 'worker_progress'
@bar.increment
when 'worker_done'
connection.close
end
end
def on_connected(connection)
content = { command: 'subscribe', identifier: { channel: 'WorkerChannel' }.to_json }
connection.write(content)
connection.flush
end
def on_subscribed
Net::HTTP.start('localhost', 3000) do |http|
http.get("/workers/start?client_id=#{@client_id}")
end
end
end
The most important change here is the addition of handle_channel_message
where we handle the messages we receive from the worker to create and update the progress bar.
Before wrapping up, we need to make one final change. Update cable.yml
to use Redis in development. We need to do this so that our Sidekiq process knows about subscriptions made using the main process.
development:
adapter: redis
url: <%= ENV.fetch("REDIS_URL") { "redis://localhost:6379/0" } %>
The default mechanism for managing Action Cable connections in development is async
, which uses in-memory structures. These are accessible only by the current process. That is no good when multiple processes need to utilize the same connections.
Restart your Rails server and, for good measure, Sidekiq process if you haven’t already and run the worker command:
bundle exec thor worker:start
Just the beginning…
This guide is done, but the story of ActionCable and command line applications isn’t. Updating a progress bar is nice and all, but it is only scratching the surface.
There is much more to explore. How about streaming process logs live to clients? Or what about streaming user inputs directly to the server?
Anything is possible - you just have to try it! 👩🔬