How To Build Your First Flink App In 5 Minutes

In this article, you'll learn how to build your first Flink application from scratch in just a few minutes.

Prepare the Development Environment

Flink can run on Linux, Mac OS X, or Windows. To develop Flink applications, you need Java 8.x and Maven installed on your local machine.

If a Java 8 environment is installed, running the following command will print version information like this:

$ java -version
java version "1.8.0_65"
Java(TM) SE Runtime Environment (build 1.8.0_65-b17)
Java HotSpot(TM) 64-Bit Server VM (build 25.65-b01, mixed mode)

If a Maven environment is installed, running the following command will print version information like this:

$ mvn -version
Apache Maven 3.5.4 (1edded0938998edf8bf061f1ceb3cfdeccf443fe; 2018-06-18T02:33:14+08:00)
Maven home: /Users/wuchong/dev/maven
Java version: 1.8.0_65, vendor: Oracle Corporation, runtime: /Library/Java/JavaVirtualMachines/jdk1.8.0_65.jdk/Contents/Home/jre
Default locale: zh_CN, platform encoding: UTF-8
OS name: "mac os x", version: "10.13.6", arch: "x86_64", family: "mac"

In addition, we recommend using IntelliJ IDEA (the free Community Edition is sufficient) as the IDE for developing Flink applications. Eclipse works too, but it has some known issues with mixed Scala and Java projects, so we don't recommend it. In the next section, we will show you how to create a Flink project and import it into IntelliJ IDEA.

Create a Maven Project

We'll use the Flink Maven Archetype to create the project structure along with some initial default dependencies. Run the following command in your working directory to create the project:

mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.6.1 \
    -DgroupId=my-flink-project \
    -DartifactId=my-flink-project \
    -Dversion=0.1 \
    -Dpackage=myflink \
    -DinteractiveMode=false

You can change the groupId, artifactId, and package above to your preferred values. Maven will use these parameters to create the project structure automatically, as shown below:

$ tree my-flink-project
my-flink-project
├── pom.xml
└── src
    └── main
        ├── java
        │   └── myflink
        │       ├── BatchJob.java
        │       └── StreamingJob.java
        └── resources
            └── log4j.properties

The pom.xml file already contains the required Flink dependencies, and there are several sample program skeletons under src/main/java. Next we'll start writing the first Flink program.

Write a Flink Program

Start IntelliJ IDEA, select Import Project, and select the pom.xml under the root directory of my-flink-project. Then follow the guide to complete the project import.

Create a SocketWindowWordCount.java file under src/main/java/myflink:

package myflink;
public class SocketWindowWordCount {
  public static void main(String[] args) throws Exception {
  }
}

Though the program is still very basic, we will fill in the code step by step. Note that we won't show the import statements below, because the IDE adds them automatically. At the end of this section, I will show the complete code; if you want to skip the steps, you can paste the final code directly into your editor.

The first step in a Flink program is to create a StreamExecutionEnvironment. This is the entry class, used to set parameters, create data sources, and submit tasks. Let's add it to the main function:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
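
getExecutionEnvironment() automatically chooses the right environment for the context: a local environment when the program is started from the IDE, and the cluster environment when the job is submitted to a cluster. If you want to force local execution explicitly, here is a minimal sketch (the variable name localEnv is just for illustration):

// Optional: create a local environment explicitly instead of letting
// getExecutionEnvironment() decide based on the execution context.
StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();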

Then we'll create a data source that reads from the socket on local port 9000:

DataStream<String> text = env.socketTextStream("localhost", 9000, "\n");

Now we have a DataStream of strings. DataStream is Flink's core API for stream processing; it defines many common operations (such as filtering, transformation, aggregation, windowing, and joining). In this example, we are interested in the number of times each word appears in a particular time window, say a 5 second window. To that end, we first parse each line into pairs of word and count (using Tuple2<String, Integer>): the first field is the word, the second field is the count, and the initial count is set to 1. Since a single line of input may contain multiple words, we implement the parsing with a flatMap:

DataStream<Tuple2<String, Integer>> wordCounts = text
        .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
          @Override
          public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            for (String word : value.split("\\s")) {
              out.collect(Tuple2.of(word, 1));
            }
          }
        });
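
If you prefer Java 8 lambdas, the same parsing step can be written more compactly. The following sketch is equivalent to the anonymous class above; because the lambda erases the Tuple2 type parameters, Flink needs an explicit type hint via returns() (Types is org.apache.flink.api.common.typeinfo.Types):

DataStream<Tuple2<String, Integer>> wordCounts = text
        .flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {
          for (String word : value.split("\\s")) {
            out.collect(Tuple2.of(word, 1));
          }
        })
        // The lambda loses the generic type information, so declare it explicitly.
        .returns(Types.TUPLE(Types.STRING, Types.INT));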

Then we group the data stream by the word field (index 0). Here we can simply use the keyBy(int index) method to get a keyed stream of Tuple2<String, Integer> with the word as the key. Then we specify the desired window on the stream and compute the result based on the data in the window. In our case, we want to aggregate the word counts every 5 seconds, with each window counting from zero:

DataStream<Tuple2<String, Integer>> windowCounts = wordCounts
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .sum(1);

The second call, .timeWindow(), specifies that we want a 5 second tumbling window. The third call specifies the sum aggregation for each key and each window, summing over the count field (index 1). The resulting data stream outputs, every 5 seconds, the number of occurrences of each word during those 5 seconds.
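
As an aside, timeWindow() also accepts a second argument that turns it into a sliding window. The following sketch (not part of this example) would emit an updated 5 second count every second instead of once per window:

DataStream<Tuple2<String, Integer>> slidingCounts = wordCounts
        .keyBy(0)
        // A 5 second window that slides forward every 1 second.
        .timeWindow(Time.seconds(5), Time.seconds(1))
        .sum(1);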

Now print the data stream to the console and start the execution:

windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");

The final env.execute call is necessary to actually start the Flink job. All the operator calls (such as creating sources, aggregating, and printing) only build a graph of internal operations. Only when execute() is called is the job submitted to a cluster or executed on the local machine.
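
Note that execute() blocks until the job terminates and returns a JobExecutionResult. As a small sketch (assuming additional imports of org.apache.flink.api.common.JobExecutionResult and java.util.concurrent.TimeUnit), you could capture it to inspect the runtime after the job finishes:

// execute() returns a JobExecutionResult once the job terminates.
JobExecutionResult result = env.execute("Socket Window WordCount");
System.out.println("The job ran for " + result.getNetRuntime(TimeUnit.MILLISECONDS) + " ms");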

Below is the complete code, slightly simplified (the code is also available on GitHub):

package myflink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class SocketWindowWordCount {
  public static void main(String[] args) throws Exception {
    // Create the execution environment.
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Get the input data by connecting to the socket. Here it connects to local port 9000. If port 9000 is already taken, change it to another port.
    DataStream<String> text = env.socketTextStream("localhost", 9000, "\n");
    // Parse the data; then group, window, and aggregate it by word.
    DataStream<Tuple2<String, Integer>> windowCounts = text
        .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
          @Override
          public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            for (String word : value.split("\\s")) {
              out.collect(Tuple2.of(word, 1));
            }
          }
        })
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .sum(1);
    // Print the results to the console; note that single-threaded printing is used here, rather than multi-threaded.
    windowCounts.print().setParallelism(1);
    env.execute("Socket Window WordCount");
  }
}

Run The Program

To run the sample program, first start netcat in a terminal to provide the input stream:

nc -lk 9000

For Windows, you can install ncat from https://nmap.org/ncat/ and then run:

ncat -lk 9000

Run the main method of SocketWindowWordCount directly.

Now just type words into the netcat console, and you will see the per-word frequency statistics in the output console of SocketWindowWordCount. If you want to see counts greater than 1, type the same word repeatedly within 5 seconds.
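
For example, typing hello world hello into the netcat console should produce output along these lines once the current 5 second window closes (the exact counts depend on what you type within the window):

(hello,2)
(world,1)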

Cheers!
