はじめに
本記事ではSnowflakeの機能のみを使って最低限のデータパイプラインを作成する方法をご紹介します。流れとしては、CSVファイルからSnowflakeにデータをインジェストし、生のデータをレポートに必要なターゲットデータモデルに変換し、そのパイプラインを自動化します。
また、本記事はSnowflake初学者を対象としているため、これからご紹介するデータパイプラインはセキュリティ・ガバナンスなどには触れません。まずはSnowflakeでデータパイプラインを構築する一番シンプルな方法を学び、全体感を掴んでいただくことを目的としています。
データパイプラインの全体像

各ステップ
- ローカルマシンからCSVファイルをSnowflakeのステージング領域にアップロード
- ステージングされたファイルからステージングテーブルへデータロード
- MERGEコマンドによるデータの積み上げ
- ターゲットテーブルから集計テーブルを作成
- Snowflakeタスクによる処理の自動化
ローカルマシンからCSVファイルをSnowflakeのステージング領域にアップロード
ここではダミーのセールスデータをローカルからSnowflakeのステージングにアップロードを行います。
ダミーのCSVデータを用意
次のようなセールスデータとします。消費者(Customer)が注文日(Order date)に製品を(Product type)をいくつ(Quantity)注文し、いつ発送(Delivery date)されたかを示す営業データです。
以下は CSV に記載されている内容のサンプルです(読みやすく整形していますが、実際の入力は CSV ファイルです)。
Customer ,Order date ,Delivery date ,Product type ,Quantity
Satou ,2024-11-11 ,2024-11-14 ,Sneakers ,2
Tanaka ,2024-11-11 ,2024-11-14 ,T-shirt ,3
Suzuki ,2024-11-11 ,2024-11-14 ,Backpack ,1
Yamada ,2024-11-11 ,2024-11-14 ,Hoodie ,2
Kobayashi,2024-11-11 ,2024-11-14 ,Jeans ,1
Nakamura ,2024-11-11 ,2024-11-14 ,Jacket ,1
Kato ,2024-11-11 ,2024-11-14 ,Sneakers ,3
Sasaki ,2024-11-11 ,2024-11-14 ,Dress ,1
...
Snowflake側の準備
CSVデータ取り込み先のDBとスキーマを作成します。
use role SYSADMIN;
create database ORDERS_DB;
create schema ORDERS;
create warehouse ORDERS_WH with warehouse_size = 'XSMALL';
内部ステージ
今回は、最初のデータパイプラインをシンプルに保つために、Snowflakeの内部ステージを使用します。データファイルは、さまざまな種類の内部ステージにステージングできます:
- ユーザーステージ:各Snowflakeユーザーに割り当てられたステージ
- テーブルステージ:Snowflakeで作成された各テーブルに割り当てられたステージ
- 名前付き内部ステージ:Snowflakeの名前付き内部ステージは、スキーマ内で作成されるデータベースオブジェクトです。複数のユーザーが管理するファイルをステージングし、複数のテーブルにロードすることができるため、ユーザーステージやテーブルステージよりも柔軟です。
パイプラインを作成するために、Snowflakeの名前付き内部ステージを作成します。
use database ORDERS_DB;
use schema ORDERS;
create stage ORDERS_STAGE;
SnowSQLよりCSVファイルのアップロードを行います。
put file://./Orders_2023-11-11.csv @ORDERS_STAGE;
LISTコマンドを使って、先ほどアップロードしたCSVファイルがステージ格納されていることを確認できます。
list @ORDERS_STAGE;
念のため期待通りにデータがロードされているか確認をしておきます。アップロードされたCSVファイルには5つの列が含まれていることがわかっているので、内部ステージからすべてを選択するには、次のコマンドを実行します。
select $1, $2, $3, $4, $5 from @ORDERS_STAGE;
ステージングされたファイルからステージングテーブルへデータロード
CSVファイルがSnowflakeの内部ステージに保存されたので、次はそのファイルからステージングテーブルにデータをロードします。
ファイルからデータをロードする際は、データがいつ取り込まれたのか、どこから来たのかを追跡するための追加情報を格納する列を追加するのがベストプラクティスです。そこで、データの出所となるソースファイル名を格納する列と、インジェスト時のタイムスタンプを格納する列を追加します。Snowflakeには便利なメタデータ列の1つに$filenameがあり、データがどのファイルからインジェストされたかを示します。現在はステージに1つのファイルしかないため、この情報はそれほど重要ではありませんが、複数のファイルをステージに追加し始めると、各レコードの出所となるファイルを知りたくなる場合に有効です。
格納先のステージングテーブルの作成
use database ORDERS_DB;
use schema ORDERS;
create table ORDERS_STG (
customer varchar,
order_date date,
delivery_date date,
product_type varchar,
quantity number,
source_file_name varchar,
load_ts timestamp
);
ステージ領域からステージングテーブルへデータのコピー
use database ORDERS_DB;
use schema ORDERS;
copy into ORDERS_STG
from (
select $1, $2, $3, $4, $5, metadata$filename, current_timestamp()
from @ORDERS_STAGE
)
file_format = (type = csv, skip_header = 1)
on_error = abort_statement
purge = true;
ORDERS_STGテーブルにデータが格納されたことを確認できます。
select * from ORDERS_STG;
COPY文でpurgeオプションを指定していたので、ステージにあるデータは削除されます。
MERGEコマンドによるデータの積み上げ
続けて、ORDERS_STG からターゲットテーブルのCUSTOMER_ORDERS にデータを積み上げます。CUSTOMER_ORDERSの構造は、ステージングテーブルと同じです。次のコマンドを実行して、ORDERS_DBとORDERSスキーマを使用してテーブルを作成します。
use database ORDERS_DB;
use schema ORDERS;
create table CUSTOMER_ORDERS (
customer varchar,
order_date date,
delivery_date date,
product_type varchar,
quantity number,
source_file_name varchar,
load_ts timestamp
);
ORDERS_STGからCUSTOMER_ORDERSへのデータ移行はSQLのMergeを使います。CUSTOMER_ORDERSは集計用テーブルの元となるため、データの整合性が必要となります。例えば、最初注文をキャンセルし再度注文すると2重注文が生まれビジネスロジックが破綻します。そのためターゲットテーブルではcustomer、delivery_date、product_type ごとに1つのレコードだけが存在するようにするために、ORDERS_STGのデータをCUSTOMER_ORDERSにマージするクエリを構築します。
merge into CUSTOMER_ORDERS tgt
using ORDERS_STG as src
on src.customer = tgt.customer
and src.delivery_date = tgt.delivery_date
and src.product_type = tgt.product_type
when matched then
update set tgt.quantity = src.quantity,
tgt.source_file_name = src.source_file_name,
tgt.load_ts = current_timestamp()
when not matched then
insert (customer, order_date, delivery_date, product_type,
quantity, source_file_name, load_ts)
values( src.customer, src.order_date, src.delivery_date,
src.product_type, src.quantity, src.source_file_name,
current_timestamp());
ORDERS_STGからCUSTOMER_ORDERSにデータをマージすることで、customer、delivery_date、product_type ごとに1行のみがターゲットテーブルに保持され、数量の最新の値が反映されるようになります。
ターゲットテーブルから集計テーブルを作成
CUSTOMER_ORDERSには、過去および顧客によって予約された注文が含まれています。ユーザーはこのテーブルをクエリして過去の注文傾向を分析し、レポートを作成できます。
例えば、日々の各製品(Product)がどれだけ注文されているかを把握するなどです。
SUMMARY_ORDERSという名前の集計テーブルを作成します。このテーブルは次の3つの列で構成されます:
- 配達日(delivery date)
- 製品種類(product type)
- 合計数量(total quantity)
use database ORDERS_DB;
use schema ORDERS;
create table SUMMARY_ORDERS(
delivery_date date,
product_type varchar,
total_quantity number
);
前のステップで取り込んだCUSTOMER_ORDERSから集計データを取り込みます。
シンプルにするために、毎回SUMMARY_ORDERSをtruncateし、完全な集計データを取り込む洗い替えを行います。
毎回洗い替えを行うのは今回のデータが少量だから採用できています。毎回すべてのデータを置き換えることによるコンピューティングコストやデータ取り込みにかかる時間を考慮する必要がないからです。
CUSTOMER_ORDERSデータを集計するクエリは次のとおりです。
select delivery_date, product_type, sum(quantity) as total_quantity
from CUSTOMER_ORDERS
group by all;
このクエリでは、SnowflakeのGROUP BY ALL構文を使用しています。
GROUP BY句に含めるすべての列を個別に指定する代わりに、SnowflakeではキーワードALLを使用できます。これにより、集約の一部ではないすべての列をグループ化できます。
CUSTOMER_ORDERSデータへのデータコピーは次のクエリを使用します。
insert into SUMMARY_ORDERS(delivery_date, product_type, total_quantity)
select delivery_date, product_type, sum(quantity) as total_quantity
from CUSTOMER_ORDERS
group by all;
Snowflakeタスクによる処理の自動化
毎日定期的に顧客から新しい注文や更新された注文を受け取るため、可能な限りデータの取り込みと変換プロセスを自動化します。CSVファイルをSnowflakeの内部ステージに手動でのアップロード以降のプロセスは、データエンジニアがこれまでのステップを組み合わせて自動化し、毎晩など定期的に実行できるようにスケジュール設定することができます。
この目的のために、Snowflakeタスクを作成します。
Snowflakeタスクとは?
Snowflakeタスクは、以下の操作を実行できます:
- 単一のSQL文の実行
- ストアドプロシージャの実行
- Snowflakeスクリプトの手続き型ロジックを使用した複数のSQL文の実行
毎日実行されるタスク「PROCESS_ORDERS」の作成
PROCESS_ORDERSタスクは、毎日指定したスケジュールで、これまでのすべてのステップを順番に実行します:
- ステージングテーブルを初期化(TRUNCATE)
- 内部ステージからデータをステージングテーブルに読み込む(COPYコマンド)
- ステージングテーブルからターゲットテーブルにデータをマージ(MERGE)
- サマリーテーブルを初期化(TRUNCATE)
- サマリーテーブルに集計データを挿入(INSERT)
スケジュール設定
- 毎晩11時にタスクを実行するようにスケジュールします。
タスク作成
Snowflakeでタスクを作成すると、デフォルトでタスクは停止状態になっています。
タスクをスケジュール通りに実行させるには、タスクを再開(RESUME)する必要があります。
use database ORDERS_DB;
use schema ORDERS;
create task PROCESS_ORDERS
warehouse = ORDERS_WH
schedule = 'USING CRON 0 23 * * * UTC';
as
begin
truncate table ORDERS_STG;
copy into ORDERS_STG
from (
select $1, $2, $3, $4, $5, metadata$filename, current_timestamp()
from @ORDERS_STAGE
)
file_format = (type = csv, skip_header = 1)
on_error = abort_statement
purge = true;
merge into CUSTOMER_ORDERS tgt
using ORDERS_STG as src
on src.customer = tgt.customer
and src.delivery_date = tgt.delivery_date
and src.product_type = tgt.product_type
when matched then
update set tgt.quantity = src.quantity,
tgt.source_file_name = src.source_file_name,
tgt.load_ts = current_timestamp()
when not matched then
insert (customer, order_date, delivery_date, product_type,
quantity, source_file_name, load_ts)
values( src.customer, src.order_date, src.delivery_date,
src.product_type, src.quantity, src.source_file_name,
current_timestamp());
truncate table SUMMARY_ORDERS;
insert into SUMMARY_ORDERS(delivery_date, product_type, total_quantity)
select delivery_date, product_type, sum(quantity) as total_quantity
from CUSTOMER_ORDERS
group by all;
end;
タスクの実行権限を付与
タスクを実行するロールにEXECUTE TASK権限を付与する必要があります。
NOTE: Snowflakeでは、すべてのロールが追加の権限なしでタスクを作成できますが、デフォルトではタスクを実行できません。
今回はSYSADMINロールを使用しているため、このロールにEXECUTE TASK権限を付与します。 通常は、データエンジニアが作成したカスタムロールにEXECUTE TASK権限を付与するのが推奨されます。
次のコマンドで、SYSADMINロールにEXECUTE TASK権限を付与できます:
use role accountadmin;
grant execute task on account to role sysadmin;
use role sysadmin;
タスクに対する権限を付与するためには、ACCOUNTADMINロールを使用する必要があります。
ACCOUNTADMINロールを使用して権限の付与やその他の作業を行う場合、作業が終わったら必ず開発に使用しているロールに戻すことを忘れないでください。そのため、ここではSYSADMINロールに戻ります。
EXECUTE TASK権限を付与した後、次のコマンドでタスクを手動実行できます。
execute task PROCESS_ORDERS;
TIPS タスクの実行確認 タスクを実行した後は、正しく完了したか確認するためにINFORMATION_SCHEMAのTASK_HISTORY()テーブル関数を使って、クエリ実行に関する情報を取得できます。
まとめ
今回構築したパイプラインの全体像はこちらのとおりです。

データの取り込み部分をもう少しシステマチックにすれば最低限のデータパイプラインとして機能すると思います。ただし改善点はまだまだありますので1例として参考にしつつ足りない部分や改善点をここから修正していくことでより良いパイプラインに仕上げることができると思います。
本記事を通して、Snowflake初心者の方に最低限のデータパイプラインの構築の全体像を掴んでいただくきっかけになれば幸いです。