Apache Flink - Pasang Volume ke Job Pod

Saya menggunakan WordCountProg dari tutorial di https://www.tutorialspoint.com/apache_flink/apache_flink_creating_application.htm . Kodenya adalah sebagai berikut:

WordCountProg.java

package main.java.spendreport;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.util.Collector;

public class WordCountProg {

       // *************************************************************************
       // PROGRAM
       // *************************************************************************
       public static void main(String[] args) throws Exception {
          final ParameterTool params = ParameterTool.fromArgs(args);
          // set up the execution environment
          final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
          // make parameters available in the web interface
          env.getConfig().setGlobalJobParameters(params);
          // get input data
          DataSet<String> text = env.readTextFile(params.get("input"));
          DataSet<Tuple2<String, Integer>> counts =
          // split up the lines in pairs (2-tuples) containing: (word,1)
          text.flatMap(new Tokenizer())
          // group by the tuple field "0" and sum up tuple field "1"
          .groupBy(0)
          .sum(1);
          // emit result
          if (params.has("output")) {
             counts.writeAsCsv(params.get("output"), "\n", " ");
             // execute program
             env.execute("WordCount Example");
          } else {
             System.out.println("Printing result to stdout. Use --output to specify output path.");
             counts.print();
          }
       }
       
       // *************************************************************************
       // USER FUNCTIONS
       // *************************************************************************
       public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
          public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
             // normalize and split the line
             String[] tokens = value.toLowerCase().split("\\W+");
             // emit the pairs
             for (String token : tokens) {
                if (token.length() > 0) {
                   out.collect(new Tuple2<>(token, 1));
                }
             }
          }
       }
    }

Contoh ini menggunakan file teks sebagai masukan, menghitung berapa kali sebuah kata muncul di dokumen, dan menulis hasilnya ke file keluaran.

Saya membuat Gambar Pekerjaan saya menggunakan Dockerfile berikut:

Dockerfile

FROM flink:1.13.0-scala_2.11
WORKDIR /opt/flink/usrlib
# Create Directory for Input/Output
RUN mkdir /opt/flink/resources
COPY target/wordcount-0.0.1-SNAPSHOT.jar /opt/flink/usrlib/wordcount.jar

Kemudian yaml untuk pekerjaan saya terlihat seperti berikut:

apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager
spec:
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      restartPolicy: OnFailure
      containers:
        - name: jobmanager
          image: docker/wordcount:latest
          imagePullPolicy: Never
          env:
          #command: ["ls"]
          args: ["standalone-job", "--job-classname", "main.java.spendreport.WordCountProg", "-input", "/opt/flink/resources/READ.txt", "-output", "/opt/flink/resources/results.txt"] #, <optional arguments>, <job arguments>] # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
          #args: ["standalone-job", "--job-classname", "org.sense.flink.examples.stream.tpch.TPCHQuery03"] #, <optional arguments>, <job arguments>] # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: job-artifacts-volume
              mountPath: /opt/flink/resources
            - name: flink-config-volume
              mountPath: /opt/flink/conf
          securityContext:
            runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: job-artifacts-volume
          hostPath:
          # directory location on host
            path: /Users/my-user/Documents/semafor/apache_flink/PV

Tujuannya adalah untuk memasang /Users/my-user/Documents/semafor/apache_flink/PV di mana terdapat file READ.txt ke dalam pod yang berfungsi sebagai input untuk pekerjaan tersebut. Namun ketika pekerjaan mencoba dijalankan, saya mendapatkan kesalahan berikut:

java.io.FileNotFoundException: File /opt/flink/resources/READ.txt does not exist or the user running Flink ('flink') has insufficient permissions to access it.

Saya telah mencoba menjalankan:

sudo chown -R 9999:9999 /Users/my-user/Documents/semafor/apache_flink/PV 

Juga menjalankan chmod 777... tapi saya mendapatkan kesalahan yang sama.

Saya juga mencoba menyalin toples ke tempat file READ.txt berada: /Users/my-user/Documents/semafor/apache_flink/PV di direktori lokal saya dan memasangnya ke /opt/flink/usrlib sebagai gantinya, tapi kemudian saya mendapat:

org.apache.flink.util.FlinkException: Could not find the provided job class (main.java.spendreport.WordCountProg) in the user lib directory (/opt/flink/usrlib).

Saya tidak begitu berpengalaman dengan Kubernetes atau Flink, jadi saya tidak yakin apakah saya melakukan pemasangan yang salah atau melakukan sesuatu yang salah. Kalau ada saran silahkan lmk. Terima kasih sebelumnya.


person p192    schedule 02.06.2021    source sumber
comment
Coba buat dir /tmp/flink pada host dan pasang di pod Anda. Jika berhasil, maka itu adalah masalah izin   -  person Rakesh Gupta    schedule 02.06.2021
comment
@RakeshGupta Saya mencobanya, tetapi saya masih mendapatkan: java.io.FileNotFoundException: File /tmp/resources/READ.txt tidak ada atau pengguna yang menjalankan Flink ('flink') tidak memiliki izin yang cukup untuk mengaksesnya. Sepertinya hostPath tidak dipasang sama sekali.   -  person p192    schedule 03.06.2021
comment
Di mana Anda membuat volume host? simpul utama?   -  person Rakesh Gupta    schedule 03.06.2021
comment
Saya menggunakan minikube di mesin lokal saya, jadi ya. Saya juga sudah mencoba PV tetapi masalahnya sama. Mungkin saya tidak memahami cara kerjanya. Jika saya memasang direktori di lokal saya sebagai PV, saya berharap file yang ada di direktori itu dapat diakses oleh pod yang memiliki klaim atas PV tersebut, dan file apa pun yang ditempatkan pod di PV itu, saya akan melihatnya di lokal saya. Tampaknya hal ini tidak terjadi.   -  person p192    schedule 03.06.2021
comment
Jika menggunakan minikube, Anda harus memasang volume terlebih dahulu menggunakan minikube mount /Users/my-user/Documents/semafor/apache_flink/PV:/tmp/PV Kemudian gunakan /tmp/PV di konfigurasi hostPath Anda di bagian volume Lihat thread ini : stackoverflow.com/questions/38682114/ stackoverflow.com/questions/60479594/   -  person Rakesh Gupta    schedule 03.06.2021
comment
Itulah masalahnya. Saya sekarang menyadari bahwa Minikube membuat cluster pada VM, dan VM tersebut tidak memiliki akses ke direktori lokal saya kecuali saya memasangnya. Terima kasih atas bantuan Anda @RakeshGupta. Jangan ragu untuk menambahkan komentar Anda di atas sebagai jawaban atas pertanyaan ini dan saya akan menerimanya sebagai solusinya.   -  person p192    schedule 03.06.2021
comment
Senang itu berhasil untuk Anda. Saya telah menambahkan jawabannya.   -  person Rakesh Gupta    schedule 03.06.2021


Jawaban (1)


Jika menggunakan minikube Anda harus memasang volume terlebih dahulu menggunakan

minikube mount /Users/my-user/Documents/semafor/apache_flink/PV:/tmp/PV 

Kemudian gunakan /tmp/PV dalam konfigurasi hostPath Anda di bagian volume

Lihat rangkaian pesan berikut: Izin menulis volume minikube?

HostPath dengan minikube - Kubernetes

person Rakesh Gupta    schedule 03.06.2021