I am using the WordCountProg from the tutorial at https://www.tutorialspoint.com/apache_flink/apache_flink_creating_application.htm. The code is as follows:
WordCountProg.java
package main.java.spendreport;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.util.Collector;

public class WordCountProg {

    // *************************************************************************
    // PROGRAM
    // *************************************************************************
    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        DataSet<String> text = env.readTextFile(params.get("input"));

        DataSet<Tuple2<String, Integer>> counts =
            // split up the lines in pairs (2-tuples) containing: (word,1)
            text.flatMap(new Tokenizer())
                // group by the tuple field "0" and sum up tuple field "1"
                .groupBy(0)
                .sum(1);

        // emit result
        if (params.has("output")) {
            counts.writeAsCsv(params.get("output"), "\n", " ");
            // execute program
            env.execute("WordCount Example");
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }
    }

    // *************************************************************************
    // USER FUNCTIONS
    // *************************************************************************
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}
This example takes a text file as input, counts how many times each word appears in the document, and writes the result to an output file.
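For reference, this is how I understand the counting logic, written as a plain-Java sketch without Flink. The class name WordCountSketch and the method countWords are my own names for illustration and are not part of the tutorial; only the lowercase-and-split step is taken from the Tokenizer above.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of the tutorial: mirrors the Tokenizer + groupBy/sum
// steps using only the JDK, so the expected result can be checked without a cluster.
class WordCountSketch {

    // Lowercases each line, splits on runs of non-word characters (same regex as
    // the Tokenizer), skips empty tokens, and sums one count per occurrence.
    static Map<String, Integer> countWords(Iterable<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            for (String token : line.toLowerCase().split("\\W+")) {
                if (token.length() > 0) {
                    counts.merge(token, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = countWords(List.of("To be, or not to be"));
        // Prints one "word count" pair per line, separated by a space like the
        // writeAsCsv call above. The order here is alphabetical because of TreeMap;
        // the order of Flink's output may differ.
        counts.forEach((word, count) -> System.out.println(word + " " + count));
    }
}
```

This only illustrates what the job should produce for a given input; it does not exercise the file mount that the rest of the question is about.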
I build my job image using the following Dockerfile:
Dockerfile
FROM flink:1.13.0-scala_2.11
WORKDIR /opt/flink/usrlib
# Create Directory for Input/Output
RUN mkdir /opt/flink/resources
COPY target/wordcount-0.0.1-SNAPSHOT.jar /opt/flink/usrlib/wordcount.jar
Then the YAML for my job looks like this:
apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager
spec:
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      restartPolicy: OnFailure
      containers:
        - name: jobmanager
          image: docker/wordcount:latest
          imagePullPolicy: Never
          env:
          #command: ["ls"]
          args: ["standalone-job", "--job-classname", "main.java.spendreport.WordCountProg", "-input", "/opt/flink/resources/READ.txt", "-output", "/opt/flink/resources/results.txt"] #, <optional arguments>, <job arguments>] # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
          #args: ["standalone-job", "--job-classname", "org.sense.flink.examples.stream.tpch.TPCHQuery03"] #, <optional arguments>, <job arguments>] # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: job-artifacts-volume
              mountPath: /opt/flink/resources
            - name: flink-config-volume
              mountPath: /opt/flink/conf
          securityContext:
            runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: job-artifacts-volume
          hostPath:
            # directory location on host
            path: /Users/my-user/Documents/semafor/apache_flink/PV
The goal is to mount /Users/my-user/Documents/semafor/apache_flink/PV, which contains the file READ.txt, into the pod so that it serves as the input for the job. However, when the job tries to run, I get the following error:
java.io.FileNotFoundException: File /opt/flink/resources/READ.txt does not exist or the user running Flink ('flink') has insufficient permissions to access it.
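Since the message does not say whether the file is missing or merely unreadable, a small check along these lines could be run inside the container to tell the two cases apart. This is only a sketch: the class name MountCheck and the method describe are hypothetical names of mine, and the default path is simply the one from my args above.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical diagnostic, not part of Flink: reports how the current user
// sees a path, so "does not exist" can be told apart from "no permission".
class MountCheck {

    // Note: Files.exists also returns false when existence cannot be determined,
    // for example when a parent directory cannot be traversed by the current user.
    static String describe(Path p) {
        if (!Files.exists(p)) {
            return "missing";
        }
        if (!Files.isReadable(p)) {
            return "unreadable";
        }
        return Files.isDirectory(p) ? "directory" : "readable file";
    }

    public static void main(String[] args) {
        // Default to the input path passed to the job; override with the first argument.
        Path input = Paths.get(args.length > 0 ? args[0] : "/opt/flink/resources/READ.txt");
        System.out.println(input + ": " + describe(input));

        // Checking the mount point itself shows whether the volume is there at all.
        Path parent = input.getParent();
        if (parent != null) {
            System.out.println(parent + ": " + describe(parent));
        }
    }
}
```

If the parent directory is reported as a directory but the file as missing, that would suggest the host directory I expect is not the one that actually gets mounted, rather than a permissions problem.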
I have tried running:
sudo chown -R 9999:9999 /Users/my-user/Documents/semafor/apache_flink/PV
I also ran chmod 777..., but I get the same error.
I also tried copying the jar to the directory where READ.txt lives, /Users/my-user/Documents/semafor/apache_flink/PV on my local machine, and mounting that directory at /opt/flink/usrlib instead, but then I get:
org.apache.flink.util.FlinkException: Could not find the provided job class (main.java.spendreport.WordCountProg) in the user lib directory (/opt/flink/usrlib).
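To rule out a problem with the jar itself, a check like the following could confirm whether the class is actually inside a given jar file. It is a sketch under the assumption that the compiled class sits at the entry path matching its package declaration; JarClassCheck and containsClass are hypothetical names, and the default jar path is the one from my Dockerfile.

```java
import java.io.IOException;
import java.util.jar.JarFile;

// Hypothetical diagnostic, not part of Flink: looks for a class entry in a jar.
class JarClassCheck {

    // Converts a fully qualified class name to its jar entry path
    // (dots become slashes, ".class" is appended) and looks it up.
    static boolean containsClass(String jarPath, String className) throws IOException {
        String entry = className.replace('.', '/') + ".class";
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getEntry(entry) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        // Defaults are the jar location and class name used in this question.
        String jarPath = args.length > 0 ? args[0] : "/opt/flink/usrlib/wordcount.jar";
        String className = args.length > 1 ? args[1] : "main.java.spendreport.WordCountProg";
        System.out.println(className + " in " + jarPath + ": " + containsClass(jarPath, className));
    }
}
```

If opening the jar fails because the file is not there, that would again point at the mounted directory being empty inside the pod, the same symptom as with READ.txt.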
I am not very experienced with Kubernetes or Flink, so I am not sure whether I am mounting the volume incorrectly or doing something else wrong. If you have any suggestions, please let me know. Thanks in advance.