こんにちは。
Rails8がいよいよ公式リリースされましたね。
色々試してますが、皆さんはいかがでしょうか?
今回は、Rails8で導入された新機能、Solid Queueを試してみたのでその内容をお伝えします。
Solid Queueとは
Solid Queueとは、Rails8でデフォルトとして導入された新機能の1つで、Active Jobのキューイングシステムを置き換えるものです。
Solid Queueは、RedisやSidekiqなどの外部のキューイングシステムに依存せず、Rails内部でキューイングを行うことができます。
詳しくはこちらの公式ドキュメントをご覧ください。
さあ乗り換えるぞ
さあやっていきましょう。
これまで、Sidekiqを使っていたので、Solid Queueに乗り換えてみます。
まずは、Gemfile
にsolid_queue
を追加します。
gem 'solid_queue'
次に、以下のコマンドを実行しましょう。
bin/rails solid_queue:install
そうすると、以下のようなファイルの更新だったり、新規作成が行われます。
module YourApp
class Application < Rails::Application
config.active_job.queue_adapter = :solid_queue
# ...
end
end
Rails.application.configure do
config.active_job.queue_adapter = :solid_queue
config.solid_queue.connects_to = { database: { writing: :queue } }
# ...
end
default: &default
dispatchers:
- polling_interval: 1
batch_size: 500
workers:
- queues: "*"
threads: 3
processes: <%= ENV.fetch("JOB_CONCURRENCY", 1) %>
polling_interval: 0.1
development:
<<: *default
test:
<<: *default
production:
<<: *default
# production:
# periodic_cleanup:
# class: CleanSoftDeletedRecordsJob
# queue: background
# args: [ 1000, { batch_size: 500 } ]
# schedule: every hour
# periodic_command:
# command: "SoftDeletedRecord.due.delete_all"
# priority: 2
# schedule: at 5am every day
あとSchemaファイルが更新されるのですが、私のリポジトリではridgepoleというgemを使っているので、別々のschemaファイルを作成しています。
※ ridgepole使っていてもschemaファイルを使うこともできるのですが、多くのリポジトリではテーブルごとのファイルに分けていることが多いので、そのようにしています。
create_table :solid_queue_blocked_executions, force: :cascade do |t|
t.bigint :job_id, null: false
t.string :queue_name, null: false
t.integer :priority, default: 0, null: false
t.string :concurrency_key, null: false
t.datetime :expires_at, null: false
t.datetime :created_at, null: false
t.index [ :concurrency_key, :priority, :job_id ], name: "index_solid_queue_blocked_executions_for_release"
t.index [ :expires_at, :concurrency_key ], name: "index_solid_queue_blocked_executions_for_maintenance"
t.index [ :job_id ], name: "index_solid_queue_blocked_executions_on_job_id", unique: true
end
add_foreign_key "solid_queue_blocked_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
create_table :solid_queue_claimed_executions, force: :cascade do |t|
t.bigint :job_id, null: false
t.bigint :process_id
t.datetime :created_at, null: false
t.index [ :job_id ], name: "index_solid_queue_claimed_executions_on_job_id", unique: true
t.index [ :process_id, :job_id ], name: "index_solid_queue_claimed_executions_on_process_id_and_job_id"
end
add_foreign_key "solid_queue_claimed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
create_table :solid_queue_failed_executions, force: :cascade do |t|
t.bigint :job_id, null: false
t.text :error
t.datetime :created_at, null: false
t.index [ :job_id ], name: "index_solid_queue_failed_executions_on_job_id", unique: true
end
add_foreign_key "solid_queue_failed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
create_table :solid_queue_jobs, force: :cascade do |t|
t.string :queue_name, null: false
t.string :class_name, null: false
t.text :arguments
t.integer :priority, default: 0, null: false
t.string :active_job_id
t.datetime :scheduled_at
t.datetime :finished_at
t.string :concurrency_key
t.datetime :created_at, null: false
t.datetime :updated_at, null: false
t.index [ :active_job_id ], name: "index_solid_queue_jobs_on_active_job_id"
t.index [ :class_name ], name: "index_solid_queue_jobs_on_class_name"
t.index [ :finished_at ], name: "index_solid_queue_jobs_on_finished_at"
t.index [ :queue_name, :finished_at ], name: "index_solid_queue_jobs_for_filtering"
t.index [ :scheduled_at, :finished_at ], name: "index_solid_queue_jobs_for_alerting"
end
create_table :solid_queue_pauses, force: :cascade do |t|
t.string :queue_name, null: false
t.datetime :created_at, null: false
t.index [ :queue_name ], name: "index_solid_queue_pauses_on_queue_name", unique: true
end
create_table :solid_queue_processes, force: :cascade do |t|
t.string :kind, null: false
t.datetime :last_heartbeat_at, null: false
t.bigint :supervisor_id
t.integer :pid, null: false
t.string :hostname
t.text :metadata
t.datetime :created_at, null: false
t.string :name, null: false
t.index [ :last_heartbeat_at ], name: "index_solid_queue_processes_on_last_heartbeat_at"
t.index [ :name, :supervisor_id ], name: "index_solid_queue_processes_on_name_and_supervisor_id", unique: true
t.index [ :supervisor_id ], name: "index_solid_queue_processes_on_supervisor_id"
end
create_table :solid_queue_ready_executions, force: :cascade do |t|
t.bigint :job_id, null: false
t.string :queue_name, null: false
t.integer :priority, default: 0, null: false
t.datetime :created_at, null: false
t.index [ :job_id ], name: "index_solid_queue_ready_executions_on_job_id", unique: true
t.index [ :priority, :job_id ], name: "index_solid_queue_poll_all"
t.index [ :queue_name, :priority, :job_id ], name: "index_solid_queue_poll_by_queue"
end
add_foreign_key "solid_queue_ready_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
create_table :solid_queue_recurring_executions, force: :cascade do |t|
t.bigint :job_id, null: false
t.string :task_key, null: false
t.datetime :run_at, null: false
t.datetime :created_at, null: false
t.index [ :job_id ], name: "index_solid_queue_recurring_executions_on_job_id", unique: true
t.index [ :task_key, :run_at ], name: "index_solid_queue_recurring_executions_on_task_key_and_run_at", unique: true
end
add_foreign_key "solid_queue_recurring_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
create_table :solid_queue_recurring_tasks, force: :cascade do |t|
t.string :key, null: false
t.string :schedule, null: false
t.string :command, limit: 2048
t.string :class_name
t.text :arguments
t.string :queue_name
t.integer :priority, default: 0
t.boolean :static, default: true, null: false
t.text :description
t.datetime :created_at, null: false
t.datetime :updated_at, null: false
t.index [ :key ], name: "index_solid_queue_recurring_tasks_on_key", unique: true
t.index [ :static ], name: "index_solid_queue_recurring_tasks_on_static"
end
create_table :solid_queue_scheduled_executions, force: :cascade do |t|
t.bigint :job_id, null: false
t.string :queue_name, null: false
t.integer :priority, default: 0, null: false
t.datetime :scheduled_at, null: false
t.datetime :created_at, null: false
t.index [ :job_id ], name: "index_solid_queue_scheduled_executions_on_job_id", unique: true
t.index [ :scheduled_at, :priority, :job_id ], name: "index_solid_queue_dispatch_all"
end
add_foreign_key "solid_queue_scheduled_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
create_table :solid_queue_semaphores, force: :cascade do |t|
t.string :key, null: false
t.integer :value, default: 1, null: false
t.datetime :expires_at, null: false
t.datetime :created_at, null: false
t.datetime :updated_at, null: false
t.index [ :expires_at ], name: "index_solid_queue_semaphores_on_expires_at"
t.index [ :key, :value ], name: "index_solid_queue_semaphores_on_key_and_value"
t.index [ :key ], name: "index_solid_queue_semaphores_on_key", unique: true
end
長かったですが、これでSolid Queueの導入は完了です。
docker-compose.ymlの更新
もし、docker-compose.ymlを使っている場合は、以下のようにsolid_queue
を追加します。
version: '3.8'
x-app: &x-app
build:
context: .
volumes:
- .:/app
services:
# ...
solid_queue:
<<: *x-app
command: bundle exec rake solid_queue:start
tty: true
stdin_open: true
depends_on:
- db
# ...
このように、bundle exec rake solid_queue:start
を実行することで、Solid Queueを起動することができます。
キューイングしてみる
以下のようなSampleJob
を作成して、Solid Queueでキューイングされるか確誫してみましょう。
class SampleJob < ApplicationJob
queue_as :default
def perform(*args)
# Do something later
puts "I am a sample job"
end
end
bin/rails c
SampleJob.perform_later
これで、solid_queue
を実行しているコンテナのログにI am a sample job
と表示されれば成功です。
mission_control-jobs
の導入
おまけ: SidekiqのWeb UIのようなものが欲しいと思ったので、mission_control-jobs
を導入してみました。
Gemfile
にmission_control-jobs
を追加します。
gem 'mission_control-jobs'
で、bundle install
を実行した後、routesに以下のように追加します。
Rails.application.routes.draw do
# ...
mount MissionControl::Jobs::Engine => "/mission_control/jobs"
# ...
end
これで、/mission_control/jobs
にアクセスすることで、Solid Queueのジョブの状況を確認することができます。
個人的に、管理者としてログインしている場合にのみアクセスできるようにしたいので、以下のように設定することで親クラスを指定し、その親クラスでbefore_actionを設定することで実現できます。
module YourApp
class Application < Rails::Application
config.mission_control.jobs.base_controller_class = "Admins::BaseController"
# ...
end
end
class Admins::BaseController < ApplicationController
before_action :require_admin!
private
def require_admin!
redirect_to Rails.application.routes.url_helpers.new_admins_session_path unless admin_signed_in?
end
def current_admin
@current_admin ||= Admin.find_by(id: session[:admin_id])
end
helper_method :current_admin
def admin_signed_in?
current_admin.present?
end
end
1点注意点として、ちゃんと調べていないのですがrequire_admin!
メソッド内でredirect_to
を使うときには、Rails.application.routes.url_helpers
を使わないとエラーが出るので注意してください。
今回はこのあたりで。