How To Build Your First Flink App In 5 Minutes
18 Jan 2019
In this article, you'll learn how to quickly build your first Flink application from scratch.
Prepare the Development Environment
Flink runs on Linux, Mac OS X, and Windows. To develop Flink applications, you need Java 8.x and Maven installed on your local machine.
If you have a working Java 8 environment, running the following command will print version information like this:
$ java -version
java version "1.8.0_65"
Java(TM) SE Runtime Environment (build 1.8.0_65-b17)
Java HotSpot(TM) 64-Bit Server VM (build 25.65-b01, mixed mode)
If you have a working Maven environment, running the following command will print version information like this:
$ mvn -version
Apache Maven 3.5.4 (1edded0938998edf8bf061f1ceb3cfdeccf443fe; 2018-06-18T02:33:14+08:00)
Maven home: /Users/wuchong/dev/maven
Java version: 1.8.0_65, vendor: Oracle Corporation, runtime: /Library/Java/JavaVirtualMachines/jdk1.8.0_65.jdk/Contents/Home/jre
Default locale: zh_CN, platform encoding: UTF-8
OS name: "mac os x", version: "10.13.6", arch: "x86_64", family: "mac"
In addition, we recommend using IntelliJ IDEA (the free Community Edition is sufficient) as the development IDE for Flink applications. Eclipse also works, but it has some known issues with mixed Scala and Java projects, so it is not recommended. In the next section, we will show you how to create a Flink project and import it into IntelliJ IDEA.
Create a Maven Project
We'll use the Flink Maven Archetype to create the project structure and some initial default dependencies. Run the following command in your working directory to create the project:
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.6.1 \
-DgroupId=my-flink-project \
-DartifactId=my-flink-project \
-Dversion=0.1 \
-Dpackage=myflink \
-DinteractiveMode=false
You can change the groupId, artifactId, and package parameters above to your preferred values. Maven will use them to create the project structure automatically, as shown below:
$ tree my-flink-project
my-flink-project
├── pom.xml
└── src
└── main
├── java
│ └── myflink
│ ├── BatchJob.java
│ └── StreamingJob.java
└── resources
└── log4j.properties
Our pom.xml file already contains the required Flink dependencies, and there are several sample program skeletons under src/main/java.
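As an optional aside (not covered in the original walkthrough), the quickstart project is a normal Maven project, so you can already build a runnable jar from it with the standard Maven command:
mvn clean package
Next we'll start writing the first Flink program.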
Write a Flink Program
Start IntelliJ IDEA, select Import Project, and choose the pom.xml under the root directory of my-flink-project. Then complete the project import by following the wizard.
Create a SocketWindowWordCount.java file under src/main/java/myflink:
package myflink;

public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {

    }
}
Though the program is still very basic, we will fill in the code step by step. Note that we won't write out the import statements below, because the IDE adds them automatically. At the end of this section, I will show you the complete code; if you want to skip the steps below, you can paste that final code into the editor directly.
The first step in a Flink program is to create a StreamExecutionEnvironment. This is the entry class, used to set parameters, create data sources, and submit tasks. So let's add it to the main function:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
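The environment is also where job-wide parameters are set. As an optional aside (not required for this tutorial), a minimal sketch of setting the job's default parallelism looks like this:
env.setParallelism(1);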
Then we'll create a data source that reads data from the socket on local port 9000:
DataStream<String> text = env.socketTextStream("localhost", 9000, "\n");
Now we've created a DataStream of strings. DataStream is the core API for stream processing in Flink, and it defines many common operations (such as filtering, transformation, aggregation, windowing, association, etc.). In this example, what we are interested in is the number of times each word appears in a particular time window, such as a 5-second window. So we first need to parse the string data into pairs of word and count (using Tuple2<String, Integer>), where the first field is the word and the second field is the count, initialized to 1. Because a single line of input may contain multiple words, we implement the parsing with a flatMap:
DataStream<Tuple2<String, Integer>> wordCounts = text
        .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
                for (String word : value.split("\\s")) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });
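As an aside, the other DataStream operations mentioned above follow the same pattern. For example, a minimal sketch (not part of this tutorial's pipeline) of filtering out empty lines before parsing could look like this:
DataStream<String> nonEmptyLines = text
        .filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) {
                // Keep only lines that contain at least one non-whitespace character.
                return !value.trim().isEmpty();
            }
        });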
Then we group the data stream by the word field (index 0). Here we can simply use the keyBy(int index) method to get a Tuple2<String, Integer> data stream keyed by the word. Then we can specify the desired window on the stream and compute results based on the data in each window. In our case, we want to aggregate the word counts every 5 seconds, with each window counted from zero:
DataStream<Tuple2<String, Integer>> windowCounts = wordCounts
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .sum(1);
The second call, .timeWindow(), specifies that we want 5-second tumbling windows. The third call specifies the sum aggregate function for each window of each key, which in our case sums over the count field (index 1). The resulting data stream outputs, every 5 seconds, the number of occurrences of each word during those 5 seconds.
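Incidentally, timeWindow also accepts a second slide argument for sliding windows. A hedged sketch (not used in this tutorial) that counts over the last 5 seconds and emits an updated result every second:
DataStream<Tuple2<String, Integer>> slidingCounts = wordCounts
        .keyBy(0)
        .timeWindow(Time.seconds(5), Time.seconds(1))
        .sum(1);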
Now print the data stream to the console and start the execution:
windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");
The final env.execute call is necessary to start the actual Flink job. All the operator calls (such as creating sources, aggregating, printing) only build a graph of internal operators. Only when execute() is called is that graph submitted to the cluster or executed on the local machine.
Below is the complete code, slightly simplified (the code is also available on GitHub):
package myflink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {

        // Create the execution environment.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Get the input data by connecting to the socket. Here it connects to local port 9000.
        // If port 9000 is already occupied, change to another port.
        DataStream<String> text = env.socketTextStream("localhost", 9000, "\n");

        // Parse the data, then group, window, and aggregate it by word.
        DataStream<Tuple2<String, Integer>> windowCounts = text
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(Tuple2.of(word, 1));
                        }
                    }
                })
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);

        // Print the results to the console; note that this uses single-threaded printing rather than multi-threaded.
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }
}
Run The Program
To run the sample program, first start netcat in a terminal to provide the input stream:
nc -lk 9000
On Windows, you can install ncat from https://nmap.org and run:
ncat -lk 9000
Then run the main method of SocketWindowWordCount directly.
Now type words into the netcat console, and you will see the frequency statistics for each word in the output console of SocketWindowWordCount. If you want to see a count greater than 1, type the same word repeatedly within 5 seconds.
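For example, typing hello world hello within a single 5-second window should produce output along these lines (the exact tuple formatting may vary slightly):
(hello,2)
(world,1)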
Cheers!