sertakan file lain dengan aliran data

Aliran data saya menggunakan file .sql. File ini berisi kueri dan berada di direktori bernama queries.

Saya perlu mengunggah file ini dengan aliran data saya.

Apa yang saya temukan adalah penggunaan file manifest.in tetapi sejauh yang saya bisa lihat tidak melakukan apa pun, saya telah membuat file ini bernama MANIFEST.in di direktori root saya dan berisi satu baris:

recursive-include queries *

Beberapa sumber lain memberi tahu saya bahwa saya perlu menggunakan file setup.py untuk ini. Jadi sekarang tampilannya seperti ini:

from __future__ import absolute_import
from __future__ import print_function

import subprocess
from distutils.command.build import build as _build

import setuptools  # pylint: disable-all
setuptools.setup(
    name='MarkPackage',
    version='0.0.1',
    install_requires=[],
    packages=setuptools.find_packages(),
    package_data={
        'queries': ['queries/*'],
    },
    include_package_data=True
)

Ini juga tidak berhasil. Kesalahannya adalah: RuntimeError: FileNotFoundError: [Errno 2] No such file or directory: 'queries/testquery.sql' [while running 'generatedPtransform-20']

Apa praktik terbaik untuk menyertakan file apa pun untuk digunakan di salah satu atau semua bagian aliran data saya?


person Thijs    schedule 03.12.2019    source sumber


Jawaban (3)


Solusi ini dipersembahkan oleh konsultan Google Cloud kami. Ini berhasil tetapi disarankan untuk tidak melakukannya karena menambah kerumitan hanya untuk memisahkan kueri SQL dari kode Python. Alternatifnya adalah membuat Tampilan di Bigquery yang berisi kode SQL ini dan mempertahankannya di lingkungan Bigquery.

MANIFEST.in
include query.sql

setup.py

import setuptools
setuptools.setup(
    name="example",
    version="0.0.1",
    install_requires=[],
    packages=setuptools.find_packages(),
    data_files=[(".", ["query.sql"])],
    include_package_data=True,
)

main.py

with open ("query.sql", "r") as myfile:
        query=myfile.read()
    with beam.Pipeline(argv=pipeline_args) as p:
        rows = p | "ReadFromBQ" >> beam.io.Read(
            beam.io.BigQuerySource(query=query, use_standard_sql=True)
        )
        rows | "writeToBQ" >> beam.io.Write(
            "BQ Write"
            >> beam.io.WriteToBigQuery(
                known_args.output_table,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            )
        )
person Thijs    schedule 05.12.2019
comment
data_files adalah kuncinya - person Travis Webb; 15.01.2021

Itu tergantung pada apa yang Anda lakukan dengan file yang ingin Anda sertakan, tetapi mengingat ini adalah file SQL (dan bukan paket Python lokal atau ketergantungan Non-Python) salah satu cara untuk "memasukkannya" adalah dengan memasukkannya ke dalam Google Bucket Cloud Storage dan menambahkannya sebagai argumen:

def run(argv=None): 
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        default='gs://bucket/queries/query.sql',
        help='Input SQL file.'
        )
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_args.extend([
          '--runner=DataflowRunner',
          '--project=proj',
          '--region=region',
          '--staging_location=gs://bucket/staging/',
          '--temp_location=gs://bucket/temp/',
          '--job_name=name',
          '--setup_file=./setup.py'
          ]) 

Sekarang jika Anda perlu menggunakan file ini sebagai parameter dalam PTransform, Anda dapat meneruskan known_args.input ke dalamnya. Semoga ini membantu

person manesioz    schedule 03.12.2019
comment
Ini berfungsi dengan baik untuk file teks yang perlu saya urai baris demi baris, tapi saya tidak begitu yakin bagaimana Anda menggunakan file SQL jadi mungkin meneruskannya sebagai parameter ke PTransform tidak sesuai dengan kasus penggunaan Anda. - person manesioz; 03.12.2019
comment
Ini adalah solusi yang cukup cerdas (terpilih) tetapi menurut saya harus ada solusi yang lebih sederhana yang tidak memerlukan penambahan komponen layanan (penyimpanan). - person Thijs; 03.12.2019

Harap pertimbangkan untuk menggunakan filesToStage, mengikuti pola yang dijelaskan dalam jawaban SO yang ada. Ini akan memungkinkan Anda untuk menyediakan file. Ada beberapa "kesalahan" dalam pendekatan ini, jadi harap tinjau jawabannya dengan cermat.

Sayangnya solusi paling sederhana yang saya temukan adalah solusi khusus Java. Menggunakan folder sumber daya untuk mengemas file konfigurasi ke dalam toples. Kemudian menggunakan API yang disediakan Java untuk membaca kembali file tersebut.

person Alex Amato    schedule 03.12.2019