Amazon TranscribeをPython SDKで使ってみた

AWSの音声をテキストに変換(文字起こし)してくれるサービス Amazon Transcribeを使ってみたので備忘録に残しておく。TranscribeはS3にある音声ファイルを指定すると文字起こししてくれて、文字起こしの結果をS3にアップロードしてくれる。

処理の流れは以下のようにした。

  • STEP1: ローカルにある音声ファイルをS3にアップロード
  • STEP2: Transcribeに文字起こし処理を依頼
  • STEP3: 定期的に処理結果を確認
  • STEP4: 処理が完了したら処理結果のURLを取得
  • STEP5: 処理結果をS3からダウンロードし結果を表示 (Pyhon SDK)

基本的には前回のAzure Speech Servicesと同じ流れ。AWSはPython SDKが充実していてS3、Transcribeの処理どちらもSDKで実装できたのが良かった。

準備

S3バケットを作成

TranscribeではS3にアップロードされている音声ファイルを利用するためにS3バケットを作成する。まずはS3のページにアクセスし、右上の「バケットを作成」をクリック。

バケット名を入力する。今回は「speech.tetsis.com」にした。

そのまま「バケットを作成」をクリック。

バケット「speech.tetsis.com」が作成された。

IAMユーザーを作成

Python SDKでAWSのリソースを操作するためのIAMユーザーを作成する。まずはIAMユーザーページにアクセスし、「ユーザーを追加」をクリック。

ユーザー名は「speech-recognition」にした。「プログラムによるアクセス」をチェックし、「次のステップ:アクセス権限」をクリック。

「AmazonS3FullAccess」と「AmazonTranscribeFullAccess」をチェックして、「次のステップ:タグ」をクリック。

あとはそのまま進める。IAMユーザーが作成できたら「アクセスキーID」と「シークレットアクセスキー」が表示されるのでメモしておく。

awsコマンドをインストール

公式ページを参考にawsコマンドをインストールする。詳細な方法は割愛する。

awsコマンドで認証情報を設定

awsコマンドがインストールできたら認証情報を設定する。以下のコマンドを実行し、先ほどメモしておいた「アクセスキーID」と「シークレットアクセスキー」を入力する。

aws configure

Pythonコーディング

今回は以下の環境でPythonを実行する。

– OS: Windows 10
– Python 3.8.1
– pip 20.0.2

最初にPython SDKであるboto3をインストールしておく。

pip install boto3

今回は一つのファイル(aws_speech.py)で実装していく。汎用性の高い関数として実装した「関数コード」と、「main関数コード」の2パートに分けて紹介する。

まずはファイルの先頭にライブラリをインポートするimport文を記述する。

# -*- coding: utf-8 -*-
import os
import uuid
import argparse
import logging
import re
import time
import json

import boto3
from botocore.exceptions import ClientError

STEP1: ローカルにある音声ファイルをS3にアップロード

関数コード

この関数は公式ドキュメントを参考にした。

def upload_file(bucket_name, local_path, local_file_name):
    root_ext_pair = os.path.splitext(local_file_name)
    object_name = root_ext_pair[0] + str(uuid.uuid4()) + root_ext_pair[1] # S3バケット内で唯一のオブジェクト名になるようにランダム文字列(UUID)を挿入しておく
    upload_file_path = os.path.join(local_path, local_file_name)

    # Upload the file
    s3_client = boto3.client('s3')
    try:
        response = s3_client.upload_file(upload_file_path, bucket_name, object_name)
    except ClientError as e:
        print(e)
        return None
    
    print("\nUploading " + local_file_name +" to S3 as object:\n\t" + object_name)
    return object_name

main関数コード

S3バケット名は環境変数で指定するようにした。音声ファイルと言語はプログラム実行時に引数で指定するようにした。

if __name__ == "__main__":
    bucket_name = os.getenv("S3_BUCKET_NAME")

    parser = argparse.ArgumentParser()
    parser.add_argument("-f", "--file", type=argparse.FileType("r", encoding="UTF-8"), required=True)
    parser.add_argument("-l", "--locale", help="e.g. \"en-US\" or \"ja-JP\"", required=True)
    args = parser.parse_args()
    file_name = args.file.name
    locale = args.locale

    # STEP1: ローカルにある音声ファイルをS3にアップロード
    base_dir_pair = os.path.split(file_name)
    local_path = base_dir_pair[0]
    local_file_name = base_dir_pair[1]
    object_name = upload_file(bucket_name, local_path, local_file_name)

実行

実行前に環境変数にS3バケット名を設定する。テキスト変換させる音声は攻殻機動隊SAC26話の名セリフを使ってみる。Netflixで視聴できる。

> $env:S3_BUCKET_NAME = "speech.tetsis.com"
> python .\aws_speech.py -f .\草薙名セリフ.mp3 -l ja-JP

Uploading 草薙名セリフ.mp3 to S3 as object:
        草薙名セリフf6a85e99-8f22-4b95-82a0-77cfd3be80db.mp3

音声ファイルをS3にアップロードできた。実際にAWSコンソール画面を見ると音声ファイルがアップロードされているのが確認できる。

STEP2: Transcribeに文字起こし処理を依頼

関数コード

Transcribeで文字起こしを開始するためにはジョブ名を設定する必要がある。ジョブ名には文字数と種類の制約があるので、それに合わせるためS3にアップロードしたオブジェクト名を変換している。

また、OutputBucketNameには文字起こし結果を格納するS3バケットを指定できる。今回はSTEP1でアップロードしたバケットと同じバケットを使うこととにする。この関数は公式ドキュメントが参考になった。

def start_transcription_job(bucket_name, object_name, language_code):
    job_name = re.sub(r'[^a-zA-Z0-9._-]', '', object_name)[:199] # Transcribeのフォーマット制約に合わせる(最大200文字。利用可能文字は英大文字小文字、数字、ピリオド、アンダーバー、ハイフン)
    media_file_url = 's3://' + bucket_name + '/' + object_name

    client = boto3.client('transcribe')
    response = client.start_transcription_job(
        TranscriptionJobName=job_name,
        LanguageCode=language_code,
        Media={
        'MediaFileUri': media_file_url
        },
        OutputBucketName=bucket_name
    )

    print("\nTranscription start")
    return job_name

main関数コード

if __name__ == "__main__":
    # STEP1のコード(ここでは省略)

    # STEP2: Transcribeに文字起こし処理を依頼
    job_name = start_transcription_job(bucket_name, object_name, locale)

実行

> python .\aws_speech.py -f .\草薙名セリフ.mp3 -l ja-JP

Uploading 草薙名セリフ.mp3 to S3 as object:
        草薙名セリフ04bd9138-da2f-400c-9ea6-7650d524e34e.mp3

Transcription start

これで文字起こし処理を開始するところまでできた。AWSのコンソール画面からTranscribeページを見ると、処理が登録されていることが確認できる。Webブラウザから簡単に処理状態や文字起こし結果を確認できるのがAWSのいいところだと思う。

STEP3: 定期的に処理結果を確認

関数コード

get_transcription_jobのレスポンスのjsonフォーマットは公式ドキュメントを参考にした。

def get_transcription_status(job_name):
    client = boto3.client("transcribe")
    response = client.get_transcription_job(
        TranscriptionJobName=job_name
    )
    status = response["TranscriptionJob"]["TranscriptionJobStatus"]

    print("Transcription status:\t" + status)
    return status

main関数コード

if __name__ == "__main__":
    # STEP1のコード(ここでは省略)
    # STEP2のコード(ここでは省略)

    # STEP3: 定期的に処理結果を確認
    status = ""
    while status not in ["COMPLETED", "FAILED"]: # 処理が完了したら状態が"COMPLETED"、失敗したら"FAILED"になるからそれまでループ
        status = get_transcription_status(job_name) # 処理状態を取得
        time.sleep(3)

実行

> python .\aws_speech.py -f .\草薙名セリフ.mp3 -l ja-JP

Uploading 草薙名セリフ.mp3 to S3 as object:
        草薙名セリフ6d53dece-ea7c-4966-923a-e45687da42b0.mp3

Transcription start
Transcription status:   IN_PROGRESS
Transcription status:   IN_PROGRESS
Transcription status:   IN_PROGRESS
Transcription status:   IN_PROGRESS
Transcription status:   IN_PROGRESS
Transcription status:   IN_PROGRESS
Transcription status:   IN_PROGRESS
Transcription status:   IN_PROGRESS
Transcription status:   IN_PROGRESS
Transcription status:   IN_PROGRESS
Transcription status:   COMPLETED

処理が完了するのを待つところまで実行できた。

STEP4: 処理が完了したら処理結果のURLを取得

関数コード

処理が完了すると、文字起こし結果がS3にアップロードされる。get_transcription_job関数では文字起こし結果を格納しているS3オブジェクトのURLを返してくれる。

def get_transcription_file_url(job_name):
    client = boto3.client("transcribe")
    response = client.get_transcription_job(
        TranscriptionJobName=job_name
    )
    file_url = response["TranscriptionJob"]["Transcript"]["TranscriptFileUri"]

    print("\nFile URL:\n\t" + file_url)
    return file_url

main関数コード

if __name__ == "__main__":
    # STEP1のコード(ここでは省略)
    # STEP2のコード(ここでは省略)
    # STEP3のコード(ここでは省略)

    # STEP4: 処理が完了したら処理結果のURLを取得
    file_url = get_transcription_file_url(job_name)

実行

> python .\aws_speech.py -f .\草薙名セリフ.mp3 -l ja-JP

Uploading 草薙名セリフ.mp3 to S3 as object:
        草薙名セリフd3496aa1-11f8-4f90-aeb8-e4f705467a0d.mp3

Transcription start
Transcription status:   IN_PROGRESS
Transcription status:   IN_PROGRESS
...(中略)...
Transcription status:   IN_PROGRESS
Transcription status:   IN_PROGRESS
Transcription status:   COMPLETED

File URL:
        https://s3.ap-northeast-1.amazonaws.com/speech.tetsis.com/d3496aa1-11f8-4f90-aeb8-e4f705467a0d.mp3.json

最後に表示されているURLが文字起こし結果が格納されているS3オブジェクト。

STEP5: 処理結果をS3からダウンロードし結果を表示

関数コード

download_file関数は公式ドキュメントを参考にした。ダウンロードしたjsonファイルのデータフォーマットも公式ドキュメントを参考にした。

def download_file(bucket_name, object_name):
    s3 = boto3.client('s3')
    s3.download_file(bucket_name, object_name, object_name)
    print("\nDownloading S3 object:\n\t" + object_name)

def get_transcript_from_file(file_name):
    with open(file_name, "r", encoding="utf-8") as f:
        df = json.load(f)

    transcript = ""
    for result in df["results"]["transcripts"]:
            transcript += result["transcript"] + "\n"
    
    print("\nTranscription result:\n" + transcript)
    return transcript

main関数コード

if __name__ == "__main__":
    # STEP1のコード(ここでは省略)
    # STEP2のコード(ここでは省略)
    # STEP3のコード(ここでは省略)
    # STEP4のコード(ここでは省略)

    # STEP5: 処理結果をS3からダウンロードし結果を表示 (Pyhon SDK)
    result_object_name = file_url[file_url.find(bucket_name) + len(bucket_name) + 1:]
    download_file(bucket_name, result_object_name) # ここでダウンロード
    get_transcript_from_file(result_object_name)

実行

> python .\aws_speech.py -f .\草薙名セリフ.mp3 -l ja-JP

Uploading 草薙名セリフ.mp3 to S3 as object:
        草薙名セリフ6aa4385a-f760-4c76-9b03-2cf015593a59.mp3

Transcription start
Transcription status:   IN_PROGRESS
Transcription status:   IN_PROGRESS
...(中略)...
Transcription status:   IN_PROGRESS
Transcription status:   IN_PROGRESS
Transcription status:   COMPLETED

File URL:
        https://s3.ap-northeast-1.amazonaws.com/speech.tetsis.com/6aa4385a-f760-4c76-9b03-2cf015593a59.mp3.json  

Downloading S3 object:
        6aa4385a-f760-4c76-9b03-2cf015593a59.mp3.json

Transcription result:
だ けど 私 は 情報 の 並列 化 の 果て に 後 を 取り戻す ため の 一つ の 可能 性 を 見つけ た わ ちなみに その 答え は 好奇 心 多分 ね

どのセリフを文字起こししたのか、まあ分かる、よね。単語ごと(?)にスペース区切りされているから少し見づらいけどほとんど正確に文字に変換してくれてる(「後」は本当は「個」。セリフの重要な部分なだけに残念)。「ちなみに その 答え は」は別の人のセリフなんだけどそれでもちゃんと認識しててすごい。

全コード

最後に今回のコードの全体を載せておく。

# -*- coding: utf-8 -*-
import os
import uuid
import argparse
import logging
import re
import time
import json

import boto3
from botocore.exceptions import ClientError

def upload_file(bucket_name, local_path, local_file_name):
    root_ext_pair = os.path.splitext(local_file_name)
    object_name = root_ext_pair[0] + str(uuid.uuid4()) + root_ext_pair[1] # S3バケット内で唯一のオブジェクト名になるようにランダム文字列(UUID)を挿入しておく
    upload_file_path = os.path.join(local_path, local_file_name)

    # Upload the file
    s3_client = boto3.client('s3')
    try:
        response = s3_client.upload_file(upload_file_path, bucket_name, object_name)
    except ClientError as e:
        print(e)
        return None
    
    print("\nUploading " + local_file_name +" to S3 as object:\n\t" + object_name)
    return object_name

def download_file(bucket_name, object_name):
    s3 = boto3.client('s3')
    s3.download_file(bucket_name, object_name, object_name)
    print("\nDownloading S3 object:\n\t" + object_name)

def start_transcription_job(bucket_name, object_name, language_code):
    job_name = re.sub(r'[^a-zA-Z0-9._-]', '', object_name)[:199] # Transcribeのフォーマット制約に合わせる(最大200文字。利用可能文字は英大文字小文字、数字、ピリオド、アンダーバー、ハイフン)
    media_file_url = 's3://' + bucket_name + '/' + object_name

    client = boto3.client('transcribe')
    response = client.start_transcription_job(
        TranscriptionJobName=job_name,
        LanguageCode=language_code,
        Media={
        'MediaFileUri': media_file_url
        },
        OutputBucketName=bucket_name
    )

    print("\nTranscription start")
    return job_name

def get_transcription_status(job_name):
    client = boto3.client("transcribe")
    response = client.get_transcription_job(
        TranscriptionJobName=job_name
    )
    status = response["TranscriptionJob"]["TranscriptionJobStatus"]

    print("Transcription status:\t" + status)
    return status

def get_transcription_file_url(job_name):
    client = boto3.client("transcribe")
    response = client.get_transcription_job(
        TranscriptionJobName=job_name
    )
    file_url = response["TranscriptionJob"]["Transcript"]["TranscriptFileUri"]

    print("\nFile URL:\n\t" + file_url)
    return file_url

def get_transcript_from_file(file_name):
    with open(file_name, "r", encoding="utf-8") as f:
        df = json.load(f)

    transcript = ""
    for result in df["results"]["transcripts"]:
            transcript += result["transcript"] + "\n"
    
    print("\nTranscription result:\n" + transcript)
    return transcript

if __name__ == "__main__":
    bucket_name = os.getenv("S3_BUCKET_NAME")

    parser = argparse.ArgumentParser()
    parser.add_argument("-f", "--file", type=argparse.FileType("r", encoding="UTF-8"), required=True)
    parser.add_argument("-l", "--locale", help="e.g. \"en-US\" or \"ja-JP\"", required=True)
    args = parser.parse_args()
    file_name = args.file.name
    locale = args.locale

    # STEP1: ローカルにある音声ファイルをS3にアップロード
    base_dir_pair = os.path.split(file_name)
    local_path = base_dir_pair[0]
    local_file_name = base_dir_pair[1]
    object_name = upload_file(bucket_name, local_path, local_file_name)

    # STEP2: Transcribeに文字起こし処理を依頼
    job_name = start_transcription_job(bucket_name, object_name, locale)

    # STEP3: 定期的に処理結果を確認
    status = ""
    while status not in ["COMPLETED", "FAILED"]: # 処理が完了したら状態が"COMPLETED"、失敗したら"FAILED"になるからそれまでループ
        status = get_transcription_status(job_name) # 処理状態を取得
        time.sleep(3)

    # STEP4: 処理が完了したら処理結果のURLを取得
    file_url = get_transcription_file_url(job_name)

    # STEP5: 処理結果をS3からダウンロードし結果を表示 (Pyhon SDK)
    result_object_name = file_url[file_url.find(bucket_name) + len(bucket_name) + 1:]
    download_file(bucket_name, result_object_name) # ここでダウンロード
    get_transcript_from_file(result_object_name)

Azureの音声テキスト変換サービス(Speech Services – Speech to Text)をPythonで使ってみた

Azureの文字起こしサービスを使ってみたので、Azureのポータル画面での操作とPythonコードをまとめておく。

今回使うAzureのサービスは文字起こしの中でも「バッチ文字起こし」というもので、Blob Storageにアップロードした音声ファイルを文字起こししてくれるものである。文字起こしの結果もBlob Storageに保存される。

処理の流れをまとめると以下のようになる。この記事は以下の順番で書いている。

  • 準備
  • STEP1: ローカルにある音声ファイルをBlob Storageにアップロード (Python SDK)
  • STEP2: AzureのSpeech Servicesに文字起こし処理を依頼 (REST API)
  • STEP3: 定期的に処理結果を確認 (REST API)
  • STEP4: 処理が完了したら処理結果のURLを取得 (REST API)
  • STEP5: 処理結果をBlob Storageからダウンロードし結果を表示 (Pyhon SDK)

準備

Azureポータル画面でリソースの作成と認証情報の取得を行う。

Blob Storage(コンテナー)を作成

まずはストレージアカウントというコンテナ―の上位概念を作成。作成ページへの行き方はいろいろあると思うけど、ポータルのホーム画面から左上の三本線アイコンから「ストレージアカウント」をクリックするのが簡単。

ストレージアカウント名はAzure内でユニークでなければならない。今回は「tetsisstorageaccount」にしてみた。リソースグループは事前に作っておいた「SpeechRecognition」、場所は「東日本」にした。その他はデフォルト。

ストレージアカウントが作成できたらコンテナーを作成する。図の赤枠で囲った「コンテナー」をクリックする。

右側から「新しいコンテナー」モーダルが出てくる。今回は「speechcontainer」というコンテナー名にしてみた。

接続文字列を取得

ストレージアカウント画面の左メニューから「アクセスキー」をクリックすると下図のようなページが表示されるので、「接続文字列」をメモしておく。(このページはいつでも見にこれる。key1とkey2はどちらでもよい)

Speech Servicesを作成

音声認識サービスのリソースを作成する。作成ページに行くにはポータル画面の検索欄で「speech」と入力して表示される「Cognitive Services」をクリックするのが簡単。Cognitive ServicesはSpeech Servicesの上位概念。

下図の「追加」をクリックする。

Marketplaceの検索欄で「speech」と入力して表示される「音声」をクリックする。

「作成」ボタンをクリックする。

情報入力ページが表示される。名前は任意でよい。今回は「firstSpeech」にしてみた。場所は「東日本」。価格レベルは「SO」を選択する。今回利用するバッチ文字起こしはSOでないと利用できない。

サブスクリプションキーを取得

Cognitive Services画面の左メニューから「Keys and Endpoint」をクリックすると下図のようなページが表示されるので、サブスクリプションキーである「キー1」をメモしておく。 (このページはいつでも見にこれる。キー1とキー2はどちらでもよい)

Pythonコーディング

ブラウザ操作はこれで終わり。ようやくコーディングのお時間。今回は以下の環境で実行している。
– OS: Windows 10
– Python 3.8.1
– pip 20.0.2

以下のライブラリをインストールしておく。

pip install requests azure-storage-blob

今回は一つのPythonファイルでコマンドラインから実行する簡単な方法でやってみる(ファイル名はazure_speech.py)。

まずファイルの先頭に以下のimport文を記載する。

# -*- coding: utf-8 -*-
import os
import uuid
import argparse
from datetime import datetime, timedelta
import requests
import json
import time

from azure.storage.blob import BlobServiceClient
from azure.storage.blob import ResourceTypes, AccountSasPermissions, ContainerSasPermissions
from azure.storage.blob import generate_account_sas, generate_container_sas

STEP1: ローカルにある音声ファイルをBlob Storageにアップロード (Python SDK)

関数コード

def upload_blob(connect_str, container_name, local_path, local_file_name):
    root_ext_pair = os.path.splitext(local_file_name)
    blob_name = root_ext_pair[0] + str(uuid.uuid4()) + root_ext_pair[1] # Blob Storage(コンテナー)内で唯一のBlob名になるようにランダム文字列(UUID)を挿入しておく
    upload_file_path = os.path.join(local_path, local_file_name)

    blob_service_client = BlobServiceClient.from_connection_string(connect_str)
    blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)

    print("\nUploading to Azure Storage as blob:\n\t" + blob_name)

    with open(upload_file_path, "rb") as data:
        blob_client.upload_blob(data)

    return blob_name

main関数コード

実行時に引数として①音声ファイルと②言語を指定するようにした。これでいろんな言語の音声ファイルで試しやすくなるはず。

if __name__ == "__main__":
    connect_str = os.getenv("AZURE_STORAGE_CONNECTION_STRING")
    container_name = os.getenv("AZURE_STORAGE_CONTAINER_NAME")
    subscription_key = os.getenv("AZURE_SPEECH_SERVICE_SUBSCRIPTION_KEY")

    parser = argparse.ArgumentParser()
    parser.add_argument("-f", "--file", type=argparse.FileType("r", encoding="UTF-8"), required=True)
    parser.add_argument("-l", "--locale", help="e.g. \"en-US\" or \"ja-JP\"", required=True)
    args = parser.parse_args()
    file_name = args.file.name
    locale = args.locale

    # STEP1: ローカルにある音声ファイルをBlob Storageにアップロード (Python SDK)
    base_dir_pair = os.path.split(file_name)
    local_path = base_dir_pair[0]
    local_file_name = base_dir_pair[1]
    blob_name = upload_blob(connect_str, container_name, local_path, local_file_name) # ここでアップロード

実行

最初に環境変数を設定してからPythonファイルを実行する。今回は環境変数の設定方法がWindows仕様。Mac、Linuxの場合は export コマンドを使えば同じことができる。

音声ファイルは攻殻機動隊の名言でテストしてみた。攻殻機動隊SAC5話のセリフでNetflixから視聴可能。Pythonファイル(azure_speech.py)と同じフォルダに音声ファイル(荒巻名セリフ.wav)を置いている。

> $env:AZURE_STORAGE_CONNECTION_STRING = "メモしておいた接続文字列"
> $env:AZURE_STORAGE_CONTAINER_NAME = "作成したコンテナー名"
> $env:AZURE_SPEECH_SERVICE_SUBSCRIPTION_KEY = "メモしておいたサブスクリプションキー"
> python .\azure_speech.py -f .\荒巻名セリフ.wav -l ja-JP

Uploading to Azure Storage as blob:
        荒巻名セリフ43936629-807a-4d1a-b6d5-a5d0c67c50fe.wav

Azureのポータル画面にて、元のファイル名にUUIDが追加されたファイル(上の実行結果の場合は 荒巻名セリフ43936629-807a-4d1a-b6d5-a5d0c67c50fe.wav )がコンテナーにアップロードされているのが確認できる。

STEP2: AzureのSpeech Servicesに文字起こし処理を依頼 (REST API)

API仕様はSwaggerに載っていて実装の時は結構参考になった。「Custom Speech transcriptions:」セクションが今回使うAPI。

関数コード

最初の2つの関数はアカウントSASとサービスSASという、APIを使う際の認証情報を取得する関数である。SAS自体はAzureが提供している関数を利用して生成している。公式のドキュメントページでSAS生成関数が紹介されている。

TranscriptionResultsContainerUrl には処理結果のファイルを格納してもらうコンテナ―を認証情報つきで記載する。今回は最初に作ったコンテナ―を流用した。なので、音声ファイルと同じコンテナ―に処理結果のファイルも格納される。

def get_sas_token(connect_str):
    blob_service_client = BlobServiceClient.from_connection_string(connect_str)
    sas_token = generate_account_sas(
        blob_service_client.account_name,
        account_key=blob_service_client.credential.account_key,
        resource_types=ResourceTypes(object=True),
        permission=AccountSasPermissions(read=True),
        expiry=datetime.utcnow() + timedelta(hours=1)
    )
    return sas_token

def get_service_sas_token(connect_str, container_name):
    blob_service_client = BlobServiceClient.from_connection_string(connect_str)
    service_sas_token = generate_container_sas(
        blob_service_client.account_name,
        container_name,
        account_key=blob_service_client.credential.account_key,
        permission=ContainerSasPermissions(read=True, write=True),
        expiry=datetime.utcnow() + timedelta(hours=1),
    )
    return service_sas_token

def start_transcription(connect_str, container_name, blob_name, subscription_key, locale):
    blob_service_client = BlobServiceClient.from_connection_string(connect_str)

    account_name = blob_service_client.account_name
    container_url = "https://" + account_name + ".blob.core.windows.net/" + container_name

    sas_token = get_sas_token(connect_str)
    service_sas_token = get_service_sas_token(connect_str, container_name)

    url = "https://japaneast.cris.ai/api/speechtotext/v2.0/transcriptions"
    headers = {
        "content-type": "application/json",
        "accept": "application/json",
        "Ocp-Apim-Subscription-Key": subscription_key,
    }
    payload = {
        "recordingsUrl": container_url + "/" + blob_name + "?" + sas_token,
        "locale": locale,
        "name": blob_name,
        "properties": {
            "TranscriptionResultsContainerUrl" : container_url + "?" + service_sas_token
        }
    }
    r = requests.post(url, data=json.dumps(payload), headers=headers)

    print("\nTranscription start")

main関数コード

if __name__ == "__main__":
    # STEP1のコード(ここでは省略)

    # STEP2: AzureのSpeech serviceに文字起こし処理を依頼 (REST API)
    start_transcription(connect_str, container_name, blob_name, subscription_key, locale)

実行

> python .\azure_speech.py -f .\荒巻名セリフ.wav -l ja-JP

Uploading to Azure Storage as blob:
        荒巻名セリフcbbbe4e2-5507-4cae-8b54-a13003104be8.wav

Transcription start

最後に「Transcription start」と表示される。

STEP3: 定期的に処理結果を確認 (REST API)

STEP2で開始した処理を特定できる情報は名前 name なんだけど、実行状態を取得するためのGETのAPIはIDで指定する仕様になっている。そのため最初にget_transcription_id関数で名前からIDを取得してから、IDを使って定期的に状態を取得している。これ、STEP2でPOSTした時のレスポンスにIDを含めてくれたらいんだけどなぁ、、と思う。名前だけだと毎回リスト取得するしかなくて無駄に情報取ってくることになって効率悪いんだよなぁ。

関数コード

def get_transcription_id(subscription_key, transcription_name):
    url = "https://japaneast.cris.ai/api/speechtotext/v2.0/transcriptions"
    headers = {
        "content-type": "application/json",
        "accept": "application/json",
        "Ocp-Apim-Subscription-Key": subscription_key,
    }
    r = requests.get(url, headers=headers)
    response = json.loads(r.text)

    for res in response:
        if res["name"] == transcription_name:
            id = res["id"]
            print("\nTranscription ID:\n\t" + id)
            return id
    return None

def get_transcription_info_from_id(subscription_key, transcription_id):
    url = "https://japaneast.cris.ai/api/speechtotext/v2.0/transcriptions/" + transcription_id
    headers = {
        "content-type": "application/json",
        "accept": "application/json",
        "Ocp-Apim-Subscription-Key": subscription_key,
    }
    r = requests.get(url, headers=headers)
    response = json.loads(r.text)
    return response

def get_transcription_status(subscription_key, transcription_id):
    response = get_transcription_info_from_id(subscription_key, transcription_id)

    status = response["status"]
    print("Transcription status:\t" + status)
    return status

main関数コード

if __name__ == "__main__":
    # STEP1のコード(ここでは省略)
    # STEP2のコード(ここでは省略)

    # STEP3: 定期的に処理結果を確認 (REST API)
    transcription_id = get_transcription_id(subscription_key, blob_name) # Transcription nameからIDを取得
    status = ""
    while status != "Succeeded": # 処理が完了したら状態が"Succeeded"になるからそれまでループ
        status = get_transcription_status(subscription_key, transcription_id) # 処理状態を取得
        time.sleep(3)

実行

> python .\azure_speech.py -f .\荒巻名セリフ.wav -l ja-JP

Uploading to Azure Storage as blob:
        荒巻名セリフb822c506-843c-4f1d-af81-35799e9209dc.wav

Transcription start

Transcription ID:
        e5512338-1e66-44c0-bd5f-68b680d99ba6
Transcription status:   NotStarted
Transcription status:   NotStarted
Transcription status:   Running
Transcription status:   Running
Transcription status:   Running
Transcription status:   Running
Transcription status:   Running
Transcription status:   Running
Transcription status:   Succeeded

Transcription statusNotStartedからRunningになり、最終的にSucceededになっている。

STEP4: 処理が完了したら処理結果のBlob URLを取得 (REST API)

関数コード

def get_transcription_result_url(subscription_key, transcription_id):
    response = get_transcription_info_from_id(subscription_key, transcription_id)

    if "resultsUrls" in response:
        result_url = response["resultsUrls"]["channel_0"]
        print("\nResult URL:\n\t" + result_url)
        return result_url
    return None

main関数コード

if __name__ == "__main__":
    # STEP1のコード(ここでは省略)
    # STEP2のコード(ここでは省略)
    # STEP3のコード(ここでは省略)

    # STEP4: 処理が完了したら処理結果のBlob URLを取得 (REST API)
    result_url = get_transcription_result_url(subscription_key, transcription_id)

実行

> python .\azure_speech.py -f .\荒巻名セリフ.wav -l ja-JP

Uploading to Azure Storage as blob:
        荒巻名セリフ9b3f55c6-4c3f-4656-a48e-a4546907a31e.wav

Transcription start

Transcription ID:
        e83a0767-d4ab-421b-8476-7cba3f5dde30
Transcription status:   NotStarted
Transcription status:   NotStarted
Transcription status:   Running
Transcription status:   Running
Transcription status:   Running
Transcription status:   Running
Transcription status:   Running
Transcription status:   Running
Transcription status:   Succeeded

Result URL:
        https://tetsisstorageaccount.blob.core.windows.net/speechcontainer/e83a0767-d4ab-421b-8476-7cba3f5dde30_transcription_channel_0.json

処理結果のURLが表示される。

STEP5: 処理結果をBlob Storageからダウンロードし結果を表示 (Pyhon SDK)

関数コード

ダウンロードのコードもアップロードと同様に公式ページに載っている方法を参考にした。

結果のjsonファイルのデータフォーマットについては公式ページSwaggerページを参考にした。テキスト化した結果は Lexical, ITN, MaskedITN, Display の4種類があるみたいだけど、今回は句読点とかつけてくれる Display を表示することにした。

def download_blob(connect_str, container_name, blob_name):
    blob_service_client = BlobServiceClient.from_connection_string(connect_str)
    blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)

    print("\nUploading to Azure Storage as blob:\n\t" + blob_name)
    with open(blob_name, "wb") as download_file:
        download_file.write(blob_client.download_blob().readall())

def get_transcript_from_file(file_name):
    with open(file_name, "r", encoding="utf-8") as f:
        df = json.load(f)

    transcript = ""
    for audio_file_result in df["AudioFileResults"]:
        for combined_result in audio_file_result["CombinedResults"]:
            transcript += combined_result["Display"] + "\n"
    
    print("\nTranscription result:\n" + transcript)
    return transcript

main関数コード

if __name__ == "__main__":
    # STEP1のコード(ここでは省略)
    # STEP2のコード(ここでは省略)
    # STEP3のコード(ここでは省略)
    # STEP4のコード(ここでは省略)

    # STEP5: 処理結果をBlob Storageからダウンロードし結果を表示 (Pyhon SDK)
    blob_name = result_url[result_url.rfind("/") + 1:]
    download_blob(connect_str, container_name, blob_name) # ここでダウンロード
    get_transcript_from_file(blob_name) # ダウンロードしたファイルから音声をテキスト化した文字列を取得

実行

> python .\azure_speech.py -f .\荒巻名セリフ.wav -l ja-JP

Uploading to Azure Storage as blob:
        荒巻名セリフc2d034b7-3803-431d-bcf0-368310cd678b.wav

Transcription start

Transcription ID:
        9388030b-cc98-428d-ae0c-3c5cff02ed3c
Transcription status:   NotStarted
Transcription status:   NotStarted
Transcription status:   Running
Transcription status:   Running
Transcription status:   Running
Transcription status:   Running
Transcription status:   Running
Transcription status:   Running
Transcription status:   Succeeded

Result URL:
        https://tetsisstorageaccount.blob.core.windows.net/speechcontainer/9388030b-cc98-428d-ae0c-3c5cff02ed3c_transcription_channel_0.json

Uploading to Azure Storage as blob:
        9388030b-cc98-428d-ae0c-3c5cff02ed3c_transcription_channel_0.json

Transcription result:
よかろう我々の間にはチームプレーなどという都合の良い言い訳は存在せん。あるとすれば、、スタンドプレーから生じるチームワークだけだ。

どのセリフをテキスト変換させてたのか完璧に分かる。

全コード

最後に今回のコードの全体を載せておく。

# -*- coding: utf-8 -*-
import os
import uuid
import argparse
from datetime import datetime, timedelta
import requests
import json
import time

from azure.storage.blob import BlobServiceClient
from azure.storage.blob import generate_account_sas, generate_container_sas, ResourceTypes, AccountSasPermissions, ContainerSasPermissions

def upload_blob(connect_str, container_name, local_path, local_file_name):
    root_ext_pair = os.path.splitext(local_file_name)
    blob_name = root_ext_pair[0] + str(uuid.uuid4()) + root_ext_pair[1] # Blob Storage(コンテナー)内で唯一のBlob名になるようにランダム文字列(UUID)を挿入しておく
    upload_file_path = os.path.join(local_path, local_file_name)

    blob_service_client = BlobServiceClient.from_connection_string(connect_str)
    blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)

    print("\nUploading to Azure Storage as blob:\n\t" + blob_name)

    with open(upload_file_path, "rb") as data:
        blob_client.upload_blob(data)

    return blob_name

def download_blob(connect_str, container_name, blob_name):
    blob_service_client = BlobServiceClient.from_connection_string(connect_str)
    blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)

    print("\nUploading to Azure Storage as blob:\n\t" + blob_name)
    with open(blob_name, "wb") as download_file:
        download_file.write(blob_client.download_blob().readall())

def get_sas_token(connect_str):
    blob_service_client = BlobServiceClient.from_connection_string(connect_str)
    sas_token = generate_account_sas(
        blob_service_client.account_name,
        account_key=blob_service_client.credential.account_key,
        resource_types=ResourceTypes(object=True),
        permission=AccountSasPermissions(read=True),
        expiry=datetime.utcnow() + timedelta(hours=1)
    )
    return sas_token

def get_service_sas_token(connect_str, container_name):
    blob_service_client = BlobServiceClient.from_connection_string(connect_str)
    service_sas_token = generate_container_sas(
        blob_service_client.account_name,
        container_name,
        account_key=blob_service_client.credential.account_key,
        permission=ContainerSasPermissions(read=True, write=True),
        expiry=datetime.utcnow() + timedelta(hours=1),
    )
    return service_sas_token

def start_transcription(connect_str, container_name, blob_name, subscription_key, locale):
    blob_service_client = BlobServiceClient.from_connection_string(connect_str)

    account_name = blob_service_client.account_name
    container_url = "https://" + account_name + ".blob.core.windows.net/" + container_name

    sas_token = get_sas_token(connect_str)
    service_sas_token = get_service_sas_token(connect_str, container_name)

    url = "https://japaneast.cris.ai/api/speechtotext/v2.0/transcriptions"
    headers = {
        "content-type": "application/json",
        "accept": "application/json",
        "Ocp-Apim-Subscription-Key": subscription_key,
    }
    payload = {
        "recordingsUrl": container_url + "/" + blob_name + "?" + sas_token,
        "locale": locale,
        "name": blob_name,
        "properties": {
            "TranscriptionResultsContainerUrl" : container_url + "?" + service_sas_token
        }
    }
    r = requests.post(url, data=json.dumps(payload), headers=headers)

    print("\nTranscription start")

def get_transcription_id(subscription_key, transcription_name):
    url = "https://japaneast.cris.ai/api/speechtotext/v2.0/transcriptions"
    headers = {
        "content-type": "application/json",
        "accept": "application/json",
        "Ocp-Apim-Subscription-Key": subscription_key,
    }
    r = requests.get(url, headers=headers)
    response = json.loads(r.text)

    for res in response:
        if res["name"] == transcription_name:
            id = res["id"]
            print("\nTranscription ID:\n\t" + id)
            return id
    return None

def get_transcription_info_from_id(subscription_key, transcription_id):
    url = "https://japaneast.cris.ai/api/speechtotext/v2.0/transcriptions/" + transcription_id
    headers = {
        "content-type": "application/json",
        "accept": "application/json",
        "Ocp-Apim-Subscription-Key": subscription_key,
    }
    r = requests.get(url, headers=headers)
    response = json.loads(r.text)
    return response

def get_transcription_status(subscription_key, transcription_id):
    response = get_transcription_info_from_id(subscription_key, transcription_id)

    status = response["status"]
    print("Transcription status:\t" + status)
    return status

def get_transcription_result_url(subscription_key, transcription_id):
    response = get_transcription_info_from_id(subscription_key, transcription_id)

    if "resultsUrls" in response:
        result_url = response["resultsUrls"]["channel_0"]
        print("\nResult URL:\n\t" + result_url)
        return result_url
    return None

def get_transcript_from_file(file_name):
    with open(file_name, "r", encoding="utf-8") as f:
        df = json.load(f)

    transcript = ""
    for audio_file_result in df["AudioFileResults"]:
        for combined_result in audio_file_result["CombinedResults"]:
            transcript += combined_result["Display"] + "\n"
    
    print("\nTranscription result:\n" + transcript)
    return transcript

if __name__ == "__main__":
    connect_str = os.getenv("AZURE_STORAGE_CONNECTION_STRING")
    container_name = os.getenv("AZURE_STORAGE_CONTAINER_NAME")
    subscription_key = os.getenv("AZURE_SPEECH_SERVICE_SUBSCRIPTION_KEY")

    parser = argparse.ArgumentParser()
    parser.add_argument("-f", "--file", type=argparse.FileType("r", encoding="UTF-8"), required=True)
    parser.add_argument("-l", "--locale", help="e.g. \"en-US\" or \"ja-JP\"", required=True)
    args = parser.parse_args()
    file_name = args.file.name
    locale = args.locale

    # STEP1: ローカルにある音声ファイルをBlob Storageにアップロード (Python SDK)
    base_dir_pair = os.path.split(file_name)
    local_path = base_dir_pair[0]
    local_file_name = base_dir_pair[1]
    blob_name = upload_blob(connect_str, container_name, local_path, local_file_name) # ここでアップロード

    # STEP2: AzureのSpeech serviceに文字起こし処理を依頼 (REST API)
    start_transcription(connect_str, container_name, blob_name, subscription_key, locale)

    # STEP3: 定期的に処理結果を確認 (REST API)
    transcription_id = get_transcription_id(subscription_key, blob_name) # Transcription nameからIDを取得
    status = ""
    while status != "Succeeded": # 処理が完了したら状態が"Succeeded"になるからそれまでループ
        status = get_transcription_status(subscription_key, transcription_id) # 処理状態を取得
        time.sleep(3)

    # STEP4: 処理が完了したら処理結果のBlob URLを取得 (REST API)
    result_url = get_transcription_result_url(subscription_key, transcription_id)

    # STEP5: 処理結果をBlob Storageからダウンロードし結果を表示 (Pyhon SDK)
    blob_name = result_url[result_url.rfind("/") + 1:]
    download_blob(connect_str, container_name, blob_name) # ここでダウンロード
    get_transcript_from_file(blob_name) # ダウンロードしたファイルから音声をテキスト化した文字列を取得

PythonでURLから末尾のファイル名を抜き出す

最近S3やBlob StorageのオブジェクトURLからファイル名だけ抜き出すことが何度かあって 毎回方法をググってたので備忘録にまとめておく。

object_url = 'https://account_name.blob.core.windows.net/container_name/object.json'
object_name = object_url[object_url.rfind('/') + 1:]
print(object_name)
# object.json と出力される

説明。URLの末尾から「/(スラッシュ)」を検索して、スラッシュより後ろの文字列を取得する。

Accept-Languageを優先順位でソートするpython関数

Accept-Languageに応じてユーザに提供する言語を変えることがあったので、Accept-Languageの重みづけを見て優先順位でソートされた言語リストを返す関数を作った。
汎用性がありそうなので備忘録がてらブログに書いておく。

こちらのサイトを参考にしました。

def sort_accept_language(accept_language):
languages = accept_language.split(",")
locale_q_pairs = []

for language in languages:
if language.split(";")[0] == language:
# no q => q = 1
locale_q_pairs.append({"locale": language.strip(), "q": 1})
else:
locale = language.split(";")[0].strip()
q = float(language.split(";")[1].split("=")[1])
locale_q_pairs.append({"locale": locale, "q": q})

sorted_locale_q_pairs = sorted(locale_q_pairs, key=lambda x: (x['q'],), reverse=True)
print(sorted_locale_q_pairs)

sorted_locale = [l["locale"] for l in sorted_locale_q_pairs]
print(sorted_locale)

return sorted_locale

実行結果

関数の引数を”ja,en-US;q=0.7,en;q=0.3″にした場合

[{'locale': 'ja', 'q': 1}, {'locale': 'en-US', 'q': 0.7}, {'locale': 'en', 'q': 0.3}]
['ja', 'en-US', 'en']

bottleをmod_wsgiで動かす(CentOS7, Python2.7)

環境

  • CentOS 7.3
  • Apache 2.4.6
  • mod_wsgi 3.4
  • Python 2.7.5
  • bottle 0.12.13

上記環境を用意しbottleで作成したWebページを表示させます。
各バージョンはyumやpipを使ってデフォルトでインストールできるものです。
設定ファイル等は編集せずデフォルトのままでOKです。

参考にインストールコマンドを載せておきます。

# yum install epel-release
# yum install httpd mod_wsgi python-pip
# pip install bottle

Apacheの設定

/etc/httpd/conf/httpd.conf の末尾に以下を追加

WSGIScriptAlias / /var/www/html/adapter.wsgi

アダプタの作成・編集

/var/www/html/adapter.wsgi を作成し以下の内容を記述

# coding: utf-8
import sys, os
import bottle

dirpath = os.path.dirname(os.path.abspath(__file__))
print(dirpath)
sys.path.append(dirpath)
os.chdir(dirpath)

import index
application = bottle.default_app()

「import index」の位置が重要だったりします(最初import文を上にまとめて書いててハマりました)。

ページを記述

/var/www/html/index.py を作成し以下の内容を記述

# coding: utf-8
from bottle import route

@route('/')
def index():
return "Hello, world."

Apacheの再起動と確認

# systemctl restart httpd

Webブラウザで「http://127.0.0.1」にアクセスし、「Hello, world.」と表示されれば成功

Python3でpostgresqlを使う

PostgreSQLの準備

インストール

# yum install postgresql-server
# postgresql-setup initdb

設定

/var/lib/pgsql/data/pg_hba.conf

host    all             all             127.0.0.1/32            md5

起動

# systemctl start postgresql
# systemctl enable postgresql

ユーザ作成

# su - postgres
$ psql
CREATE ROLE dbuser WITH LOGIN PASSWORD 'password';

データベース作成

CREATE DATABASE dbname OWNER dbuser ENCODING 'utf8';

Python用モジュールをインストール

pip install psycopg2

CREATE文

conn = psycopg2.connect("host=127.0.0.1 port=5432 dbname=dbname user=dbuser password=password")
dict_cur = conn.cursor()
dict_cur.execute("CREATE TABLE table (id serial PRIMARY KEY, name varchar(23), password varchar(255))")
conn.commit()
dict_cur.close()
conn.close()

SELECT文

conn = psycopg2.connect("host=127.0.0.1 port=5432 dbname=dbname user=user password=password")
dict_cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
dict_cur.execute("select name from table where (name)=(%s)", (name,))
for row in dict_cur:
if row['name'] == name:
flag = True
break
dict_cur.close()
conn.close()

INSERT文

conn = psycopg2.connect("host=127.0.0.1 port=5432 dbname=dbname user=user password=password")
dict_cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
dict_cur.execute("insert into table (name, password) values (%s, %s)", (name, password))
conn.commit()
dict_cur.close()
conn.close()