-
GCP Composer - BigQuery DataTransfer OperatorIT/Google Cloud 2024. 3. 1. 21:23
BigQuery의 Data Transfer 작업을 위한 Airflow Operator들에 대한 문서는 현재 bigquery_dts 문서에 있는데, 설명이 친절하지 못하여 여기에 dag 파일 예시를 소개한다.
from airflow.providers.google.cloud.operators.bigquery_dts import BigQueryDataTransferServiceStartTransferRunsOperator from airflow.providers.google.cloud.sensors.bigquery_dts import BigQueryDataTransferServiceTransferRunSensor from airflow.models.xcom_arg import XComArg . . . # dag 선언을 위한 사항들... . . . # 기존에 만들어져 있는 transfer를 시작하는 task bq_dataset_start_transfer = BigQueryDataTransferServiceStartTransferRunsOperator( task_id="bq_dataset_start_transfer", project_id=[여기에 transfer 작업이 이루어질 project id], location=[region 이름], transfer_config_id=[transfer_config_id는 별도로 설명], # 현재 시간의 10초 후 requested_run_time={"seconds": int(time.time() + 10)}, ) # transfer가 성공했는지 확인하는 task bq_dataset_run_sensor = BigQueryDataTransferServiceTransferRunSensor( task_id="bq_dataset_run_sensor", project_id=[여기에 transfer 작업이 이루어질 project id 문자열], location=[region 이름], transfer_config_id=[transfer_config_id는 별도로 설명], # 관심이 있는 run_id가 바로 위 task에 의한 것일 경우 (XComArg에 대한 자세한 설명은 생략) run_id=str(XComArg(bq_dataset_start_transfer, key="run_id")), expected_statuses={"SUCCEEDED"}, request_timeout=180, ) bq_dataset_start_transfer >> bq_dataset_run_sensor
여기서 transfer_config_id를 찾는 것이 어려울 수 있는데,
BigQuery의 Data Transfers 메뉴에서 실행하려는 전송 작업의 구성을 보면 Resource name 값이 다음과 같은 양식으로 보인다.
projects/NNNNNNNNNN/locations/[region 이름]/transferConfigs/XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
이 문자열의 마지막 XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX에 해당하는 부분이 transfer_config_id이니 이 값을 복사해서 적으면 된다.