RedisのPub/Subを使用したリアルタイム通知の実現

Hiromasa Kakutani

2025.5.28

はじめに

Sreake事業部のアプリケーションエンジニアの角谷です。

リアルタイム通信を実現する手段は様々ありますが、その一つにPub/Subがあります。 Pub/Subを実装する方法は様々ありますが、今回はRedisを活用したPub/Subの仕組みについて解説します。 また、実際にGolangとTypeScript/Next.jsを用いた簡易的な実装例を紹介します。

RedisのPub/Subとは?

Redisには、Pub/Sub(Publish/Subscribe) というメッセージング機能が組み込まれており、以下のような流れでリアルタイム通信を実現できます。

  • Publisher: 特定のチャネルにメッセージを送信
  • Subscriber: 特定のチャネルをサブスクライブして、メッセージの受信を待つ

Redisが仲介役となって、送信されたメッセージをすべての購読者に即座にブロードキャストするというモデルです。

Redis Pub/Sub の技術的背景

RedisのPub/Subは、基本的には 接続ベースの プッシュ通知方式 で動作しています。 具体的には、以下のような動作原理です。

  • SUBSCRIBE コマンドでRedisにコネクションを貼り、ブロッキングで待機。
  • PUBLISH コマンドが該当チャネルに送られると、Redisがそのチャネルを購読しているすべてのクライアントに即座に通知をプッシュ。

また、特徴としては以下のとおりです。

  • メッセージは揮発的(保存されない)
  • スケーラビリティは限定的(クラスタ構成時の取り扱いに注意)

Redis Pub/Sub のメリット・デメリット

メリット

  • 非常にシンプルなAPI(PUBLISH/SUBSCRIBEのみ)
  • 高速な通知処理(インメモリ処理のため低レイテンシ)
  • 外部のMQなしで実装できる手軽さ

デメリット

  • 永続化されない:過去メッセージは取得できない
  • 配信保証がない:購読していないタイミングのメッセージは失われる
  • スケール時の課題:Redis Cluster環境ではPub/Subが全ノードにまたがらない

Redis Pub/Sub のユースケース

これまで述べた通り、RedisのPub/Subは過去のメッセージを保持せず、subscriberが接続していなければメッセージを受信できません。つまり、「リアルタイムで接続しているクライアントに即時通知を届けたい」というケースに強みを発揮します。

具体的なシナリオを踏まえてユースケースを考えてみたいと思います。

Webアプリのリアルタイム通知

シナリオ

あるECサイトでは、管理者向けのダッシュボードに「新しい注文が入った」「在庫が減少した」などの通知をリアルタイムに表示する機能があります。これにより、運営チームは即座に対応できます。

Redis Pub/Sub が活きる理由:

  • 管理画面を開いているときだけ通知を受け取れれば十分
  • 過去の通知は別途DBに保存されている(履歴は後で閲覧可能)
  • WebSocketと組み合わせて、画面に即座に反映されるUXを実現可能

技術構成(例):

  • publisher: 注文処理完了時にRedisにpublish
  • subscriber: 管理画面のバックエンドが購読し、フロントエンドにWebSocketで中継

チャットの「既読前提の一時中継」

シナリオ: カスタマーサポートのチャット機能では、オペレーターとユーザーがWebアプリ上でやりとりを行います。リアルタイムでの会話体験が重要です。ただし、チャット内容はDBに保存されており、後から閲覧可能です。

Redis Pub/Sub が活きる理由:

  • 双方がチャット画面を開いている間にだけ、即時反映されればよい
  • 未接続時のメッセージはDBに保存されており、履歴から再表示可能
  • Pub/Subの軽量性により、瞬時にやりとりを中継可能

技術構成(例):

  • publisher: ユーザーがメッセージを送信すると、Redisにpublish
  • subscriber: 接続中の相手のサーバーがsubscribeして受信、フロントに転送

ステータス監視・ジョブの一時通知

シナリオ: 画像生成サービスにおいて、バックエンドで重い処理を非同期に行う必要があります。処理中のステータス(開始、進行中、完了)をフロントエンドにリアルタイム表示することで、ユーザーは現在の状況を把握できます。

Redis Pub/Sub が活きる理由:

  • 処理ステータスの通知は“いま見ているユーザー”にだけ届けばよい
  • 履歴は不要(最終ステータスはDBで参照可能)
  • 非同期ジョブ→pub、WebSocket経由で即時にユーザーにフィードバック可能

技術構成(例):

  • publisher: ワーカーがジョブステータスの変更時にRedisへpublish
  • subscriber: クライアントごとに該当のジョブIDチャネルをsubscribe

実装例:「ステータス監視・ジョブの一時通知」

今回は、ステータス監視/ジョブの一時通知という観点で実装の例を紹介してみたいと思います。

想定ユースケース

画像生成ジョブのステータス(waitingrunningsuccess)を、クライアントにリアルタイム通知する。

システム構成

以下の3つの主要コンポーネントで構成します:

  • バックエンド(Golang)
    • フロントエンドからジョブの開始を受け取る
    • ジョブのステータスをRedisにpublishするAPIサーバー
    • RedisのsubscriberとしてWebSocket接続を介してクライアントに通知
  • フロントエンド(TypeScript + Vite + WebSocket)
    • ジョブの開始をバックエンドにPOST
    • ジョブIDを指定してWebSocket接続
    • ステータス変化をリアルタイムで受信して画面表示
  • Redis(Pub/Subを提供)

図で表すと以下のような感じになります。

フロントエンドのコード

まずはフロントエンドのコードを実装してみましょう。以下はコードの全体となります。

'use client'

import { useEffect, useState } from 'react'

export default function Home() {
  const [status, setStatus] = useState('待機中...')
  const [logs, setLogs] = useState<string[]>([])
  const [loading, setLoading] = useState(false)

  const jobId = 'abc123'
  const startJob = async () => {
    console.log('startJob')
    setLoading(true)
    await fetch(`http://localhost:8080/start-job?job_id=${jobId}`, {
      method: 'POST'
    })
    setLoading(false)
  }

  const startWebSocket = async () => {
    console.log('startWebSocket')
    const ws = new WebSocket(`ws://localhost:8080/ws?job_id=${jobId}`)
    ws.onmessage = (event) => {
      setStatus(event.data)
      setLogs(prev => [...prev, event.data])
    }
  }

  useEffect(() => {
    startWebSocket()
  }, [jobId])

  return (
    <main className="p-6">
      <h1 className="text-2xl font-bold mb-4">ジョブステータス</h1>
      <p className="mb-2">ジョブID: <strong>{jobId}</strong></p>
      <p className="mb-2">現在のステータス: <strong>{status}</strong></p>
      <button
        className="bg-blue-600 text-white px-4 py-2 rounded disabled:opacity-50"
        onClick={startJob}
        disabled={loading}
      >
        {loading ? '実行中...' : 'ジョブ開始'}
      </button>

      <h2 className="text-xl font-semibold mt-6 mb-2">ログ:</h2>
      <ul className="list-disc list-inside">
        {logs.map((msg, i) => <li key={i}>{msg}</li>)}
      </ul>
    </main>
  )
}

startJob はジョブの開始をバックエンドにリクエストする関数で、Goサーバーの /start-job エンドポイントに対してPOSTリクエストを送ります。 このリクエストによって、Go側では Redis に対してステータスのパブリッシュを開始する非同期処理がキックされます。

startWebSocket 関数では、WebSocketオブジェクトを作成し、Goサーバーの /ws エンドポイントに接続します。 クエリパラメータとして job_id を指定しており、バックエンドではこれをもとにRedisのサブスクライブ処理を行います。

const ws = new WebSocket(`ws://localhost:8080/ws?job_id=${jobId}`)
ws.onmessage = (event) => {
  setStatus(event.data)
  setLogs(prev => [...prev, event.data])
}

WebSocket経由で届くジョブのステータス(waiting, running, success )は status ステートに反映され、リアルタイムで UI に表示されるようになります。同時に logs 配列に蓄積され、履歴としても表示されます。

startWebSocketuseEffect により、コンポーネントの初回マウント時に実行されるようになっています。

バックエンドのコード

続いて、バックエンドのコードを見てみましょう。以下はコードの全体となります。

package main

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"time"

	"github.com/gin-contrib/cors"
	"github.com/gin-gonic/gin"
	"github.com/go-redis/redis/v8"
	"github.com/gorilla/websocket"
)

var (
	ctx         = context.Background()
	redisClient *redis.Client
	upgrader    = websocket.Upgrader{
		CheckOrigin: func(r *http.Request) bool {
			return true
		},
	}
)

func main() {
	redisClient = redis.NewClient(&redis.Options{
		Addr: "redis:6379",
	})

	r := gin.Default()

	// CORS設定(Next.jsからのリクエストを許可)
	r.Use(cors.New(cors.Config{
		AllowOrigins:     []string{"<http://localhost:3000>"},
		AllowMethods:     []string{"GET", "POST"},
		AllowHeaders:     []string{"Origin", "Content-Type"},
		AllowCredentials: true,
	}))

	r.GET("/ws", handleWebSocket)
	r.POST("/start-job", handleStartJob)

	if err := r.Run(":8080"); err != nil {
		log.Fatal("サーバ起動失敗:", err)
	}
}

// WebSocket接続でRedisのPubSubを中継
func handleWebSocket(c *gin.Context) {
	jobID := c.Query("job_id")
	if jobID == "" {
		c.JSON(http.StatusBadRequest, gin.H{"error": "job_id is required"})
		return
	}

	conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
	if err != nil {
		log.Println("WebSocketアップグレード失敗:", err)
		return
	}
	defer conn.Close()

	channel := fmt.Sprintf("job-status:%s", jobID)
	sub := redisClient.Subscribe(ctx, channel)
	defer sub.Close()

	ch := sub.Channel()
	for msg := range ch {
		if err := conn.WriteMessage(websocket.TextMessage, []byte(msg.Payload)); err != nil {
			log.Println("WebSocket送信エラー:", err)
			break
		}
	}
}

// RESTでジョブ処理を開始(擬似的なステータス変更付き)
func handleStartJob(c *gin.Context) {
	jobID := c.Query("job_id")
	if jobID == "" {
		c.JSON(http.StatusBadRequest, gin.H{"error": "job_id is required"})
		return
	}

	go func() {
		publishJobStatus(jobID, "waiting")
		time.Sleep(10 * time.Second)

		publishJobStatus(jobID, "running")
		time.Sleep(10 * time.Second)

		publishJobStatus(jobID, "success")
	}()

	c.JSON(http.StatusOK, gin.H{"status": "job started"})
}

func publishJobStatus(jobID, status string) {
	channel := fmt.Sprintf("job-status:%s", jobID)
	redisClient.Publish(ctx, channel, status)
}

ここではRedisクライアントを初期化しています。 今回はローカルでDocker / docker-compose を使用しており、そこで定義されたRedisのサービス名が “redis” であるため、ホスト名としてredis:6379を使用しています。

r := gin.Default()

...
r.GET("/ws", handleWebSocket)
r.POST("/start-job", handleStartJob)

HTTP Web フレームワークはginを使用しています。 ルーティングはWebSocket通信用の /ws と、ジョブ開始用の /start-job の2つのルートを定義しています。

続いて handleWebSocket を見ていきましょう。 この関数はPub/SubにおけるSubscriberの役割を担います。

jobID := c.Query("job_id")
if jobID == "" {
    c.JSON(http.StatusBadRequest, gin.H{"error": "job_id is required"})
    return
}

まず、クエリパラメータからジョブIDを受け取り、特定のジョブに対応した Redis チャネルを購読するための準備をします。

conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
	log.Println("WebSocketアップグレード失敗:", err)
	return
}
defer conn.Close()

WebSocketは、最初は通常のHTTPリクエストとして接続され、その後 “WebSocketプロトコル” にアップグレードされる という仕組みになっています。 このアップグレード処理は、クライアント側(この場合はブラウザ)からのリクエストヘッダーに Upgrade: websocket が含まれており、サーバー側がそれを確認・許可し、双方向通信が可能なWebSocket接続を確立するという流れになります。 今回は gorilla/websocket パッケージに含まれる Upgrader を使って、HTTP接続をWebSocketにアップグレードしています。

channel := fmt.Sprintf("job-status:%s", jobID)
sub := redisClient.Subscribe(ctx, channel)
defer sub.Close()

Redis の Subscribe を使って、指定されたジョブIDに紐づくチャネルを購読しています。

ch := sub.Channel()
for msg := range ch {
    if err := conn.WriteMessage(websocket.TextMessage, []byte(msg.Payload)); err != nil {
        log.Println("WebSocket送信エラー:", err)
        break
    }
}

Redisからのメッセージを受け取るたびに、それをWebSocketでフロントエンドへ送信しています。 RedisのメッセージはGoのチャネル( <-chan *redis.Message )として取得されており、ここから、チャネルに発行された各メッセージがリアルタイムで流れてきます。 msg.Payload にはRedis側から送られてきた文字列(例:waiting, running, success)が入ります。

続いて handleStartJob を見ていきましょう。

go func() {
    publishJobStatus(jobID, "waiting")
    time.Sleep(10 * time.Second)

    publishJobStatus(jobID, "running")
    time.Sleep(10 * time.Second)

    publishJobStatus(jobID, "success")
}()

ジョブステータスを waiting → running → success と10秒ごとに変更し、その都度 Redis に Publish しています。 今回はあくまで仮の実装であるためSleepしているだけですが、実際の実装の際は画像生成処理等の時間のかかる処理を呼び出すことになります。

最後に publishJobStatus を見ていきましょう。

func publishJobStatus(jobID, status string) {
	channel := fmt.Sprintf("job-status:%s", jobID)
	redisClient.Publish(ctx, channel, status)
}

この関数はPub/SubにおけるPublisherの役割を担います。 指定された jobID に対応する Redis チャネルに対して、Publish メソッドを使って現在のジョブステータスを文字列として発行(Publish)しています。

実際の動き

では最後に実際の動作を見てみましょう。

このように、ジョブ開始後に waiting -> running -> success とステータスの更新通知を受け取ることが出来ました!

まとめ

RedisのPub/Sub機能を活用することによって、「いまアクティブなクライアント」に対して必要なタイミングだけリアルタイム通知を届ける仕組みが、最小構成かつ低コストで実現できます。 過去メッセージの再取得や配信保証が必要な場合は、Redis StreamsやKafkaなどの他のソリューションも検討する必要があります。しかし、シンプルなPub/Sub機能を低コストで実現したい場合には、Redis Pub/Subは有効な選択肢の一つとなるでしょう。

ブログ一覧へ戻る

お気軽にお問い合わせください

SREの設計・技術支援から、
SRE運用内で使用する
ツールの導入など、
SRE全般についてご支援しています。

資料請求・お問い合わせ