…
Cloud DataFlow menawarkan fitur unik yang disebut Templat DataFlow untuk memungkinkan membangun pipa yang dapat disesuaikan dan dapat digunakan kembali. Selain itu, ini adalah cara yang jauh lebih baik untuk memisahkan proses pengembangan, pengujian dan produksi membuat dan menjalankan pipa data menggunakan balok Apache. Dokumentasi ini mencakup banyak detail tentang templat (klasik dan fleksibel) serta tutorial tentang cara membangun dan menjalankan templat. Namun, contoh yang terdokumentasi menggunakan GCS sebagai sumber dan wastafel. Di sebagian besar jaringan pipa yang telah saya bantu bangun, BigQuery biasanya wastafel dan saya menemukan bahwa membangun templat dengan BigQuery sebagai wastafel memiliki beberapa nuansa tambahan.
Dalam posting blog ini, saya akan mendemonstrasikan membangun dan menjalankan templat Dataflow klasik dengan BigQuery saat wastafel yang mencakup poin -poin khusus ini.
Pipa cukup sederhana seperti yang ditunjukkan di bawah ini

Kode untuk pipa ini tersedia di halaman GitHub saya.
Pada level tinggi, kode pipa melakukan hal berikut
- Membaca file CSV dari Google Cloud Storage. URI dari file input adalah variabel runtime.
- Membuat objek kamus dari baris file input CSV.
- Tambahkan baris ke tabel BigQuery yang namanya variabel runtime.
Untuk mendukung parameter runtime dan nilainya, Anda perlu menggunakan salah satu dari ValueProvider. Ada tiga jenis valueprovider yaitu runtimevalueprovider (opsi default), staticvalueprovider dan NestedValueprovider. Tidak seperti StaticValueProvider, nilai parameter menggunakan runtimevalueprovider tersedia di runtime saja dan tidak selama konstruksi pipa dan karenanya tidak mempengaruhi grafik eksekusi. Dokumentasi DataFlow mencakup spesifik ini secara rinci.
Untuk pipa ini, runtimevalueprovider akan digunakan.
Dalam cuplikan kode di bawah ini, saya mengatur dua parameter runtime. Ini adalah input_file_path dan bq_table_id. Perhatikan bahwa bq_table_id adalah referensi seluruh tabel dalam format dataset_id.bq_table_id. Dataset_id sebagai parameter runtime mandiri tidak didukung dan akan melempar kesalahan saat digunakan dalam panggilan writetoBigQuery io panggilan.
class RunTimeOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
"--input_file_path",
help="uri of the input file"
)
parser.add_value_provider_argument(
"--bq_table_id",
help="dataset_id.BQ table id"
)
Gunakan runtimeOptions yang ditetapkan di atas.
user_options = pipeline_options.view_as(RunTimeOptions)
Dan dalam referensi pipa user_options.input_file_path dan user_option.bq_table_id untuk melewati nilai -nilai dua parameter runtime ini.
p = (pipeline | "Read From Input Datafile" >> beam.io.ReadFromText(user_options.input_file_path)
| "Convert to Dict" >> beam.Map(lambda r: data_ingestion.parse_method(r))
| "Write to BigQuery Table" >> beam.io.WriteToBigQuery(table=user_options.bq_table_id,
schema=schema, write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
pipeline.run()
Bangun template
Bangun template menggunakan perintah berikut:
python3 -m <template name> \
--runner DataflowRunner \
--project <project-name> \
--staging_location gs://bk_dataflow_template/staging \
--temp_location gs://bk_dataflow_template/temp \
--template_location gs://bk_dataflow_template/templates/df_csv_gcs_to_bq \
--experiment=use_beam_bq_sink
Pilihan –Experiment = use_beam_bq_sink Diperlukan karena saat ini Dataflow mengesampingkan BigQuery dengan versi asli yang tidak mendukung ValueProvider.
Jalankan pipa menggunakan template
Jalankan pipa menggunakan template dengan melewati URI dari file input CSV dan referensi tabel dalam bentuk dataset_id.table_id. Perintah ini akan menjadwalkan pipa untuk segera berjalan dan mengembalikan kontrol. Anda dapat memeriksa status pipa menggunakan DataFlow Console (atau CLI).
gcloud dataflow jobs run template-csv-gcs-to-bq-04 \
--gcs-location gs://bk_dataflow_template/templates/df_csv_gcs_to_bq \
--region europe-west2 \
--parameters input_file_path=gs://da_batch_pipeline/risk_test_data,bq_table_id=da_batch_pipeline.df_test_table
Semoga posting ini bermanfaat dan menghemat waktu saat membangun template Dataflow pertama Anda dengan BigQuery sebagai wastafel.