Apache Flink — подключение тома к рабочему блоку

Я использую WordCountProg из учебника на https://www.tutorialspoint.com/apache_flink/apache_flink_creating_application.htm . Код выглядит следующим образом:

WordCountProg.java

package main.java.spendreport;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.util.Collector;

public class WordCountProg {

       // *************************************************************************
       // PROGRAM
       // *************************************************************************
       public static void main(String[] args) throws Exception {
          final ParameterTool params = ParameterTool.fromArgs(args);
          // set up the execution environment
          final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
          // make parameters available in the web interface
          env.getConfig().setGlobalJobParameters(params);
          // get input data
          DataSet<String> text = env.readTextFile(params.get("input"));
          DataSet<Tuple2<String, Integer>> counts =
          // split up the lines in pairs (2-tuples) containing: (word,1)
          text.flatMap(new Tokenizer())
          // group by the tuple field "0" and sum up tuple field "1"
          .groupBy(0)
          .sum(1);
          // emit result
          if (params.has("output")) {
             counts.writeAsCsv(params.get("output"), "\n", " ");
             // execute program
             env.execute("WordCount Example");
          } else {
             System.out.println("Printing result to stdout. Use --output to specify output path.");
             counts.print();
          }
       }
       
       // *************************************************************************
       // USER FUNCTIONS
       // *************************************************************************
       public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
          public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
             // normalize and split the line
             String[] tokens = value.toLowerCase().split("\\W+");
             // emit the pairs
             for (String token : tokens) {
                if (token.length() > 0) {
                   out.collect(new Tuple2<>(token, 1));
                }
             }
          }
       }
    }

В этом примере в качестве входных данных используется текстовый файл, подсчитывается, сколько раз слово появляется в документе, и результаты записываются в выходной файл.

Я создаю свой образ работы, используя следующий файл Dockerfile:

Файл Docker

FROM flink:1.13.0-scala_2.11
WORKDIR /opt/flink/usrlib
# Create Directory for Input/Output
RUN mkdir /opt/flink/resources
COPY target/wordcount-0.0.1-SNAPSHOT.jar /opt/flink/usrlib/wordcount.jar

Тогда yaml для моей работы выглядит следующим образом:

apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager
spec:
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      restartPolicy: OnFailure
      containers:
        - name: jobmanager
          image: docker/wordcount:latest
          imagePullPolicy: Never
          env:
          #command: ["ls"]
          args: ["standalone-job", "--job-classname", "main.java.spendreport.WordCountProg", "-input", "/opt/flink/resources/READ.txt", "-output", "/opt/flink/resources/results.txt"] #, <optional arguments>, <job arguments>] # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
          #args: ["standalone-job", "--job-classname", "org.sense.flink.examples.stream.tpch.TPCHQuery03"] #, <optional arguments>, <job arguments>] # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: job-artifacts-volume
              mountPath: /opt/flink/resources
            - name: flink-config-volume
              mountPath: /opt/flink/conf
          securityContext:
            runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: job-artifacts-volume
          hostPath:
          # directory location on host
            path: /Users/my-user/Documents/semafor/apache_flink/PV

Цель состоит в том, чтобы смонтировать /Users/my-user/Documents/semafor/apache_flink/PV, где есть файл READ.txt, в модуль, который служит входными данными для задания. Но когда задание пытается выполнить, я получаю следующую ошибку:

java.io.FileNotFoundException: File /opt/flink/resources/READ.txt does not exist or the user running Flink ('flink') has insufficient permissions to access it.

Я пытался запустить:

sudo chown -R 9999:9999 /Users/my-user/Documents/semafor/apache_flink/PV 

Также запустил chmod 777... но я получаю ту же ошибку.

Я также попытался скопировать банку туда, где находится файл READ.txt: /Users/my-user/Documents/semafor/apache_flink/PV в моем локальном каталоге и смонтировать его в /opt/flink/usrlib вместо этого, но потом я получил:

org.apache.flink.util.FlinkException: Could not find the provided job class (main.java.spendreport.WordCountProg) in the user lib directory (/opt/flink/usrlib).

У меня нет такого опыта работы с Kubernetes или Flink, поэтому я не уверен, неправильно ли я монтирую или делаю что-то не так. Если у вас есть какие-либо предложения, пожалуйста, lmk. Заранее спасибо.


person p192    schedule 02.06.2021    source источник
comment
Попробуйте создать каталог /tmp/flink на хосте и смонтировать его в своем модуле. Если это работает, то это проблема разрешения   -  person Rakesh Gupta    schedule 02.06.2021
comment
@RakeshGupta Я пробовал, но все равно получаю: java.io.FileNotFoundException: файл /tmp/resources/READ.txt не существует или у пользователя, запускающего Flink («flink»), недостаточно прав для доступа к нему. Как будто hostPath вообще не смонтирован.   -  person p192    schedule 03.06.2021
comment
Где вы создаете хост-том? Главный узел?   -  person Rakesh Gupta    schedule 03.06.2021
comment
Я использую minikube на своей локальной машине, так что да. Я также пробовал PV, но та же проблема. Возможно, я не понимаю, как они работают. Если я смонтирую каталог на моем локальном компьютере в качестве PV, я ожидал, что файлы, находящиеся в этом каталоге, будут доступны модулям, имеющим заявку на этот PV, и какие бы файлы модули ни размещали на этом PV, я должен это увидеть. на моем местном. Похоже, это не так.   -  person p192    schedule 03.06.2021
comment
Если вы используете minikube, вам необходимо сначала смонтировать том с помощью minikube mount /Users/my-user/Documents/semafor/apache_flink/PV:/tmp/PV. Затем используйте /tmp/PV в конфигурации hostPath в разделе томов. См. эти темы. : stackoverflow.com/questions/38682114/ stackoverflow.com/questions/60479594/   -  person Rakesh Gupta    schedule 03.06.2021
comment
В этом была проблема. Теперь я понимаю, что Minikube создает кластер на виртуальной машине, и такая виртуальная машина не имеет доступа к моим локальным каталогам, если я их не смонтирую. Спасибо за вашу помощь @RakeshGupta. Не стесняйтесь добавлять свой комментарий выше в качестве ответа на этот вопрос, и я приму его как решение.   -  person p192    schedule 03.06.2021
comment
Рад, что это сработало для вас. Я добавил ответ.   -  person Rakesh Gupta    schedule 03.06.2021


Ответы (1)


Если вы используете minikube, вам нужно сначала смонтировать том, используя

minikube mount /Users/my-user/Documents/semafor/apache_flink/PV:/tmp/PV 

Затем используйте /tmp/PV в конфигурации hostPath в разделе томов.

Обратитесь к этим темам: Разрешения на запись тома Minikube?

HostPath с minikube — Kubernetes

person Rakesh Gupta    schedule 03.06.2021