はじめに
現在、生成AIを利用したアプリケーションが増加しています。その多くはテキストを中心としたものですが、アプリケーションによっては音声や動画でのやり取りが必要となることもあります。これまで生成AIとの音声・動画のやりとりはストリーミングではなく一括での入出力が中心でしたが、この方式はリアルタイム性があまりないという課題がありました。本記事ではよりリアルタイムに音声をやり取りする方法として、双方向ストリームの実装を2つ紹介します。
- Multimodal Live APIを用いた実装
- Cloud Speech-to-TextとCloud Text-to-Speechを用いた実装
本記事のソースコードは以下のリポジトリで公開しており、本文中では一部を抜粋して紹介します。
Gemini Bidirectional Audio Streaming Sample
実装には以下のような技術スタックを用いています。
- フロントエンド:TypeScript、React(Remix)
- バックエンド:Python, uv
Multimodal Live APIを用いた実装
Multimodal Live APIは、Geminiに対して音声と動画を双方向に低レイテンシーで入出力できるAPIです。音声については入出力どちらもストリーミングすることができ、生成AIとのリアルタイムの音声会話を比較的簡単に実現することができます。Multimodal Live APIには以下のようなメリット・デメリットがあります。
- メリット
- リアルタイム、マルチモーダルなやり取りを比較的簡単に実現できる。
- 関数呼び出し、コード実行、検索、出力中の中断など豊富な機能を持っている。
- デメリット
gemini-2.0-flash-exp
モデルでのみ利用可能。- 処理を完全に制御することはできない。
このパートでは以下のステップでMultimodal Live APIを用いたリアルタイム音声会話の実装を紹介します。
- Step1:Text-to-Textストリーミング
- Step2:Text-to-Speechストリーミング
- Step3:Speech-to-Speechストリーミング
Step1:Text-to-Textストリーミング
Text-to-Textのストリーミングでは入力のテキストは一括で渡し、出力のテキストをストリーミングします。
バックエンド
Step1でバックエンドで必要となるのは以下の2つです。
- 生成AIとのやりとり
- WebSocketをリッスンするサーバ
バックエンドでは生成AIの呼び出しを行いますが、こちらはGoogle Gen AI SDKを用います。GeminiにはGemini APIとVertex AIのAPIの2つのAPIがありますが、Google Gen AI SDKを利用することでこの2つを統一的に利用することができます。今回はVertex AIを利用する設定で初期化します。
client = genai.Client(
http_options=HttpOptions(api_version="v1beta1"),
vertexai=True,
project=PROJECT_ID,
location="us-central1",
)
model_id = "gemini-2.0-flash-exp"
次にWebSocketのハンドラーを定義します。このハンドラーでは以下のことを行います。
- Multimodal Live APIとのセッションを作成
- WebSocketから一度だけテキストを取り出し、セッションに送信
- 生成結果を受け取る度にWebSocketで送信
async def handler(websocket):
async with client.aio.live.connect(
model=model_id,
config=LiveConnectConfig(response_modalities=[Modality.TEXT]),
) as session:
query = await websocket.recv()
print(f"Received: {query}")
await session.send(input=query, end_of_turn=True)
async for answer in session.receive():
if answer.text:
print(f"Received: {answer.text}")
await websocket.send(answer.text)
await websocket.close()
最後に上記のハンドラを用いてサーバを起動します。
async def main():
async with serve(handler, "localhost", 8765) as server:
await server.serve_forever()
if __name__ == "__main__":
asyncio.run(main())
フロントエンド
Step1でフロントエンドでは次が必要です。
- ユーザの入力
- WebSocketを用いた入出力のやり取り
- 生成AIの出力の表示
ロジックとしては以下の処理で実装できます。このうち、WebSocketに関しては
- 先ほど定義したサーバのエンドポイントに接続
- ユーザの入力を送信
- 回答を受け取るたびに
answer
を更新
を行います。
const [query, setQuery] = useState("")
const [answer, setAnswer] = useState("")
const onSend = () => {
setAnswer("")
const websocket = new WebSocket("ws://localhost:8765")
websocket.onopen = () => {
websocket.send(query)
}
websocket.onmessage = (event) => {
setAnswer((prev) => prev + event.data)
}
}
UIは最低限、入力欄、送信ボタン、出力を用意します。
return (
<div className="flex h-screen items-center justify-center">
<div className="flex flex-col items-center gap-16">
<div>
<div>
<label htmlFor="message">メッセージ</label>
</div>
<div>
<textarea
id="message"
rows={4}
cols={50}
onChange={(e) => setQuery(e.target.value)}
style={{
padding: "0.5rem",
border: "1px solid #ccc",
outline: "none",
boxShadow: "none",
}}
>
{query}
</textarea>
</div>
<div>
<button
type="button"
onClick={onSend}
style={{
border: "1px solid #ccc",
padding: "0.5rem 1rem",
}}
>
送信
</button>
</div>
</div>
<div>
<div>
<label htmlFor="answer">回答</label>
</div>
<div>
<textarea
id="answer"
rows={4}
cols={50}
value={answer}
readOnly
style={{
padding: "0.5rem",
border: "1px solid #ccc",
outline: "none",
boxShadow: "none",
}}
/>
</div>
</div>
</div>
</div>
);
デモ
Step2:Text-to-Speechストリーミング
Text-to-Speechのストリーミングでは入力のテキストを一括で渡す部分はそのままで、出力を音声のストリーミングにします。
バックエンド
Step2でのバックエンドでは以下が必要です。
- 音声を出力する設定
- Multimodal Live APIセッションから受け取った音声をWebSocketで送信
音声を出力する設定は以下になります。ここではresponse_modalities
をModality.AUDIO
に変更し、声を"Aoede"
に設定しています。
live_connect_config = LiveConnectConfig(
response_modalities=[Modality.AUDIO],
speech_config=SpeechConfig(
voice_config=VoiceConfig(
prebuilt_voice_config=PrebuiltVoiceConfig(
voice_name="Aoede",
)
)
),
)
この設定をMultimodal Live APIとの接続時に利用します。またレスポンスからは音声データを取り出し、WebSocketに書き込みます。
async def handler(websocket):
async with client.aio.live.connect(
model=model_id,
config=live_connect_config,
) as session:
query = await websocket.recv()
await session.send(input=query, end_of_turn=True)
async for response in session.receive():
if (
response.server_content.model_turn
and response.server_content.model_turn.parts
):
for part in response.server_content.model_turn.parts:
if part.inline_data:
await websocket.send(part.inline_data.data)
await websocket.close()
フロントエンド
Step2でフロントエンドではWebSocketで送られてくる音声データの連続再生が必要となります。ウェブブラウザ上で音声を再生する方法はいくつかありますが、リアルタイムでの再生が必要な場合にはWeb Audio APIを利用します。Web Audio APIでの処理はある程度複雑になりますが、今回は公式のMultimodal Live API Demoを参考にします。詳細は割愛しますが、このデモではLiveAudioOutputManagerというクラスを定義しており、playAudioChunkメソッドにArrayBufferで音声データを渡すことでスピーカーから再生されます。こちらのクラスを使って、Step1で定義したonSend関数を音声データを受け取る度にplayAudioChunkメソッドに渡すよう修正します。
const onSend = () => {
const manager = new LiveAudioOutputManager()
const websocket = new WebSocket("ws://localhost:8765")
websocket.onopen = () => {
websocket.send(query)
}
websocket.onmessage = async (event) => {
if (event.data instanceof Blob) {
manager.playAudioChunk(await event.data.arrayBuffer())
}
}
}
デモ
※音声が出ます。
Step3:Speech-to-Speechストリーミング
Speech-to-Speechのストリーミングでは入出力は両方音声であり、どちらもストリーミングでの送受信を行います
バックエンド
Step3のバックエンドでは以下が必要となります。
- WebSocketから音声データを受け取る度にMultimodal Live APIセッションに送信する
- 1.と並行してStep2の音声出力をWebSocketに送信する
今回は1.を非同期タスクとして実行するよう実装しました。
async def send():
async for data in websocket:
if data == "exit":
break
await session.send(input={
"media_chunks": [{
"data": data,
"mime_type": "audio/pcm",
}],
})
send_task = asyncio.create_task(send())
フロントエンド
Step3のフロントエンドでは以下が必要となります。
- マイクデバイスの選択
- マイクからの音声の取得とWebSocketへの送信
マイクデバイスの扱いについてはMedia Stream APIのMediaDevices
インターフェースを用います。入力音声の処理は、今回もリアルタイムでの処理が必要となるためWeb Audio APIを利用します。今回は公式デモのgetAvailableAudioInputs
関数とLiveAudioInputManager
クラスを利用します。getAvailableAudioInputs
関数はマイクデバイスの一覧を取得する関数です。またLiveAudioInputManager
クラスには以下のメソッドとフィールドがあります。
メソッド | 処理の内容 |
---|---|
updateMicrophoneDevice | 利用するマイクを更新する |
connectMicrophone | マイクからの音声取得を開始する |
disconnectMicrophone | マイクからの音声取得を終了する |
フィールド | 用途 |
---|---|
onNewAudioRecordingChunk | 処理が完了したチャンクが渡されるコールバック |
まずは各マネージャを初期化します。今回はコンポーネントがServer ComponentだったのでuseEffect内で初期化していますが、Client Componentの場合はuseRefの初期値で渡せるはずです。
const audioInputManagerRef = useRef<LiveAudioInputManager>()
const audioOutputManagerRef = useRef<LiveAudioOutputManager>()
useEffect(() => {
// クライアントサイドでの初期化が必要
audioInputManagerRef.current = new LiveAudioInputManager()
audioOutputManagerRef.current = new LiveAudioOutputManager()
}, [])
次にデバイス一覧を取得します。こちらもServer ComponentのためuseEffect内で初期化しています。
const [devices, setDevices] = useState<Device[]>([]);
useEffect(() => {
// クライアントサイドでの初期化が必要
getAvailableAudioInputs().then(setDevices);
}, []);
次にマイクを選択する処理を追加します。
const [selectedMic, setSelectedMic] = useState<string>("")
const newMicSelected = (deviceId: string) => {
setSelectedMic(deviceId)
audioInputManagerRef.current?.updateMicrophoneDevice(deviceId)
}
最後に録音の開始・終了処理を定義します。録音開始処理では以下を行います。
- WebSocket接続を作成
- 処理が完了したチャンクを送信するよう設定
- 音声データを受け取ったら再生する設定
- マイクから音声取得を開始
録音終了処理ではマイクからの音声取得を終了します。
const [isChatting, setIsChatting] = useState(false)
const websocketRef = useRef<WebSocket>()
const startAudioInput = () => {
setIsChatting(true)
websocketRef.current = new WebSocket("ws://localhost:8765")
websocketRef.current.onmessage = async (event: MessageEvent<Blob>) => {
audioOutputManagerRef.current?.playAudioChunk(await event.data.arrayBuffer())
}
if (audioInputManagerRef.current) {
audioInputManagerRef.current.onNewAudioRecordingChunk = (audioData: string) => {
websocketRef.current?.send(audioData)
}
audioInputManagerRef.current?.connectMicrophone();
}
}
const stopAudioInput = () => {
setIsChatting(false)
audioInputManagerRef.current?.disconnectMicrophone();
websocketRef.current?.send("exit")
}
UIとしては最低限、マイク選択と会話開始・終了を切り替えるボタンを用意します。
return (
<div className="flex h-screen items-center justify-center">
<div className="flex flex-col items-center gap-16">
<div>
<div>
<label htmlFor="mic">マイクを選択</label>
</div>
<div>
<select
id="mic"
value={selectedMic}
onChange={(e) => newMicSelected(e.target.value)}
style={{
padding: "0.5rem",
border: "1px solid #ccc",
outline: "none",
boxShadow: "none",
}}
>
<option value="">選択してください</option>
{devices.map((device) => (
<option key={device.id} value={device.id}>{device.name}</option>
))}
</select>
</div>
</div>
{selectedMic && (
<div>
<button
type="button"
onClick={isChatting ? stopAudioInput : startAudioInput}
style={{
padding: "0.5rem 1rem",
border: "1px solid #ccc",
}}
>
{isChatting ? "会話終了" : "会話開始"}
</button>
</div>
)}
</div>
</div>
);
デモ
※音声が出ます。
Cloud Speech-to-TextとCloud Text-to-Speechを用いた実装
Multimodal Live APIのパートでも触れましたが、Multimodal Live APIには処理を完全に制御することはできないというデメリットがありました。そのため、よりエージェンティックなアプリケーションを作りたい場合にはMultimodal Live APIは選択肢から外れることになります。そのような場合でもSpeech-to-Speechストリーミングを実現する方法として、ここではCloud Speech-to-TextとCloud Text-to-Speechを用いた実装を紹介します。Cloud Speech-to-Text、Cloud Text-to-Speechはそれぞれ音声からテキスト、テキストから音声への変換を行うGoogle Cloudのサービスです。どちらも入出力のストリーミングに対応しています。そのため、以下のような流れでSpeech-to-Speechのストリーミングを実現することができます。
- Cloud Speech-to-Textにユーザの音声をストリーミング入力しながらテキストに変換
- テキストをGeminiに渡し、テキスト出力をストリーミング生成
- 2.の結果をそのままCloud Text-to-Speechの入力にストリーミングしつつ、音声をストリーミング出力
このパートでは以下のステップでCloud Speech-to-TextとCloud Text-to-Speechを用いたリアルタイム音声会話の実装を紹介します。
- Step1:Text-to-Textストリーミング
- Step2:Text-to-Speechストリーミング
- Step3:Speech-to-Speechストリーミング
なおフロントエンド側の実装については、Multimodal Live APIを用いた実装と同じであるため省略します。
Step1:Text-to-Textストリーミング
バックエンド
Step1でバックエンドで必要となるのは以下の2つです。
- 生成AIとのやりとり
- WebSocketをリッスンするサーバ
後者についてはMultimodal Live APIの場合と同じのため割愛します。生成AIとのやりとりについては同様にGoogle Gen AI SDKを利用し、以下のようにします。
async def handler(websocket):
query = await websocket.recv()
for chunk in client.models.generate_content_stream(
model=model_id, contents=query
):
await websocket.send(chunk.text)
await websocket.close()
デモ
Step2:Text-to-Speechストリーミング
バックエンド
Step2ではCloud Text-to-Speechを利用して、テキストを音声にリアルタイムで変換する処理が必要になります。メインの実装については以下の通りです。ポイントとしてはこの関数はGeneratorを受け取りGeneratorを返す関数となっています。
def text_to_speech(
request_generator: Generator[texttospeech.StreamingSynthesizeRequest],
) -> Generator[bytes]:
streaming_config = texttospeech.StreamingSynthesizeConfig(
voice=texttospeech.VoiceSelectionParams(
name="ja-JP-Chirp3-HD-Charon", language_code="ja-JP"
)
)
config_request = texttospeech.StreamingSynthesizeRequest(
streaming_config=streaming_config
)
streaming_responses = tts_client.streaming_synthesize(
itertools.chain([config_request], request_generator)
)
for response in streaming_responses:
yield response.audio_content
回答生成部分についても、text_to_speech関数に合わせる形でジェネレータ関数の形に修正します。ここではコードを疎結合にするためGenerator[str]を返す形にしています。
def answer(query: str) -> Generator[str]:
for chunk in genai_client.models.generate_content_stream(
model=model_id, contents=query
):
yield chunk.text
上記2つの関数を繋げるため、Generator[str]を受け取りGenerator[texttospeech.StreamingSynthesizeRequest]に変換する関数を作成しておきます。
def request_generator_from(
text_generator: Generator[str],
) -> Generator[texttospeech.StreamingSynthesizeRequest]:
for text in text_generator:
yield texttospeech.StreamingSynthesizeRequest(
input=texttospeech.StreamingSynthesisInput(text=text)
)
最後にここまでに定義した関数を順に呼び出し、音声のバイナリをフロントエンドに送信します。
text_generator = answer(query)
request_generator = request_generator_from(text_generator)
speech_generator = text_to_speech(request_generator)
for chunk in speech_generator:
await websocket.send(chunk)
デモ
※音声が出ます。
Step3:Speech-to-Speechストリーミング
バックエンド
Step3ではCloud Speech-to-Textを利用して、音声をテキストにリアルタイムで変換する処理が必要になります。メインの実装については以下の通りです。ポイントとして、この関数はGeneratorを受け取りますが、出力はstrになります。これは通常のGeminiのテキスト生成がストリーミング入力に対応していないためです。
def speech_to_text(request_generator: Generator[speech.StreamingRecognizeRequest]) -> str:
config = speech.RecognitionConfig(
encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
sample_rate_hertz=16000,
language_code="ja-JP",
)
streaming_config = speech.StreamingRecognitionConfig(config=config)
responses = speech_client.streaming_recognize(
config=streaming_config,
requests=request_generator,
)
transcript = ""
for response in responses:
for result in response.results:
for alternative in result.alternatives:
print(f"Transcript: {alternative.transcript}")
transcript += alternative.transcript
return transcript
WebSocketから音声を受け取る箇所についてもspeech_to_text関数に合わせる形でジェネレータ関数の形に修正します。ここではコードを疎結合にするためGenerator[str]を返す形にしています。
def text_generator_from(websocket) -> Generator[str]:
for chunk in websocket:
if chunk == "exit":
return
yield chunk
上記2つの関数を繋げるため、Generator[str]を受け取りGenerator[speech.StreamingRecognizeRequest]に変換する関数を作成しておきます。
def stt_request_generator_from(
text_generator: Generator[str],
) -> Generator[speech.StreamingRecognizeRequest]:
for text in text_generator:
yield speech.StreamingRecognizeRequest(audio_content=text)
最後にここまでに定義した関数を順に呼び出し、書き起こされたユーザ入力を取得します。
text_generator = text_generator_from(websocket)
stt_request_generator = stt_request_generator_from(text_generator)
query = speech_to_text(stt_request_generator)
デモ
※音声が出ます。
まとめ
本記事ではGeminiとの双方向音声ストリームを実現する2つの実装を紹介しました。それぞれのアプローチには特徴があり、用途に応じた選択が重要となります。本記事が、より自然なリアルタイム音声会話アプリの開発の助けになれば幸いです。
参考
Multimodal Live APIを用いた実装
- Multimodal Live API | Generative AI | Google Cloud
- Google Gen AI SDK | Generative AI | Google Cloud
- API reference – websockets 15.0.1 documentation
- The WebSocket API (WebSockets) – Web APIs | MDN
- generative-ai/gemini/multimodal-live-api at main · GoogleCloudPlatform/generative-ai
- Web Audio API – Web APIs | MDN
- MediaDevices – Web APIs | MDN
Cloud Speech-to-TextとCloud Text-to-Speechを用いた実装