включить другие файлы с потоком данных

В моем потоке данных используется файл .sql. Этот файл содержит запрос и находится в каталоге с именем queries.

Мне нужно, чтобы этот файл был загружен с моим потоком данных.

То, что я обнаружил, было использованием файла manifest.in, но, насколько я вижу, он ничего не делает, я создал этот файл с именем MANIFEST.in в своем корневом каталоге и содержит одну строку:

recursive-include queries *

Некоторые другие источники сообщают мне, что для этого мне нужно использовать файл setup.py. Итак, теперь это выглядит так:

from __future__ import absolute_import
from __future__ import print_function

import subprocess
from distutils.command.build import build as _build

import setuptools  # pylint: disable-all
setuptools.setup(
    name='MarkPackage',
    version='0.0.1',
    install_requires=[],
    packages=setuptools.find_packages(),
    package_data={
        'queries': ['queries/*'],
    },
    include_package_data=True
)

Это также не работает. Ошибка: RuntimeError: FileNotFoundError: [Errno 2] No such file or directory: 'queries/testquery.sql' [while running 'generatedPtransform-20']

Как лучше всего включить любой файл для использования в какой-либо или во всех частях моего потока данных?


person Thijs    schedule 03.12.2019    source источник


Ответы (3)


Это решение мне принес наш консультант Google Cloud. Это сработало, но не рекомендуется, потому что это усложняет только отделение SQL-запроса от кода Python. В качестве альтернативы можно было бы создать View on Bigquery, содержащий этот код SQL, и поддерживать его там в среде Bigquery.

MANIFEST.in
include query.sql

setup.py

import setuptools
setuptools.setup(
    name="example",
    version="0.0.1",
    install_requires=[],
    packages=setuptools.find_packages(),
    data_files=[(".", ["query.sql"])],
    include_package_data=True,
)

main.py

with open ("query.sql", "r") as myfile:
        query=myfile.read()
    with beam.Pipeline(argv=pipeline_args) as p:
        rows = p | "ReadFromBQ" >> beam.io.Read(
            beam.io.BigQuerySource(query=query, use_standard_sql=True)
        )
        rows | "writeToBQ" >> beam.io.Write(
            "BQ Write"
            >> beam.io.WriteToBigQuery(
                known_args.output_table,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            )
        )
person Thijs    schedule 05.12.2019
comment
data_files это ключ - person Travis Webb; 15.01.2021

Это зависит от того, что вы делаете с файлом, который хотите включить, но, учитывая, что это файл SQL (а не локальный пакет Python или зависимость не от Python), один из способов «включить» его — поместить его в Google Сегмент Cloud Storage и добавление его в качестве аргумента:

def run(argv=None): 
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        default='gs://bucket/queries/query.sql',
        help='Input SQL file.'
        )
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_args.extend([
          '--runner=DataflowRunner',
          '--project=proj',
          '--region=region',
          '--staging_location=gs://bucket/staging/',
          '--temp_location=gs://bucket/temp/',
          '--job_name=name',
          '--setup_file=./setup.py'
          ]) 

Теперь, если вам нужно использовать этот файл в качестве параметра в PTransform, вы можете передать в него known_args.input. Надеюсь это поможет

person manesioz    schedule 03.12.2019
comment
Это отлично работает для текстового файла, который мне нужно анализировать построчно, но я не совсем уверен, как вы используете файл SQL, поэтому, возможно, передача его в качестве параметра PTransform не подходит для вашего варианта использования. - person manesioz; 03.12.2019
comment
Это довольно умное решение (за него проголосовали), но я думаю, что должно быть более простое решение, которое не влечет за собой добавление сервисного компонента (хранилища). - person Thijs; 03.12.2019

Рассмотрите возможность использования filesToStage в соответствии с шаблоном, описанным в этом существующем ответе SO. Это позволит вам предоставить файл . В этом подходе есть некоторые «подводные камни», поэтому внимательно изучите ответ.

К сожалению, самое простое решение, которое я нашел, это специальное решение для Java. Использование папки ресурсов для упаковки файлов конфигурации в банку. Затем с помощью API-интерфейсов, предоставленных java, для обратного чтения файла.

person Alex Amato    schedule 03.12.2019