Published: 2023-02-20 11:00
Add the following to pom.xml:
<properties>
    <flink.version>1.13.0</flink.version>
    <java.version>1.8</java.version>
    <scala.binary.version>2.12</scala.binary.version>
    <slf4j.version>1.7.30</slf4j.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-cep_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>2.14.0</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
log4j.properties
log4j.rootLogger=ERROR, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
Contents of input/wc.txt:
nihao nihao
jiushi nihao
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class FlinkSoctet {
    public static void main(String[] args) throws Exception {
        // Get the batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> initData = env.readTextFile("input/wc.txt");
        // With anonymous inner classes Flink can read the output types from the
        // generic signature, so Java's type erasure is not a problem here
        MapOperator<String, Tuple2<String, Integer>> mapValue = initData.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String textItem, Collector<String> out) throws Exception {
                String[] resItem = textItem.split(" ");
                for (String s : resItem) {
                    out.collect(s);
                }
            }
        }).map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String item) throws Exception {
                return Tuple2.of(item, 1);
            }
        });
        // mapValue is a set of (key, value) tuples: position 0 is the key, position 1 the value.
        // Group by key and sum the values; in the DataSet API, print() also triggers execution.
        mapValue.groupBy(0).sum(1).print();
    }
}
Output:
(nihao,3)
(jiushi,1)
The same input/wc.txt, now processed with the DataStream API:
nihao nihao
jiushi nihao
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class FlinkSoctet {
    public static void main(String[] args) throws Exception {
        // Get the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> initData = env.readTextFile("input/wc.txt");
        SingleOutputStreamOperator<Tuple2<String, Integer>> map = initData.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String item, Collector<String> out) throws Exception {
                String[] resItem = item.split(" ");
                for (String s : resItem) {
                    out.collect(s);
                }
            }
        }).map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String item) throws Exception {
                return Tuple2.of(item, 1);
            }
        });
        // Key the tuple stream by the word, then aggregate per key
        map.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        }).sum(1).print();
        // A streaming job only starts (and then runs continuously) once execute() is called
        env.execute();
    }
}
The number prefixed to each record is the index of the subtask it was assigned to. Records with the same key always go to the same subtask; here every nihao is processed by subtask 1:
2> (jiushi,1)
1> (nihao,1)
1> (nihao,2)
1> (nihao,3)
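Why does every nihao land on the same subtask? keyBy hashes each key into one of maxParallelism key groups (128 by default), and each key group is owned by exactly one parallel subtask. Below is a minimal sketch of that routing using Flink's own KeyGroupRangeAssignment utility; the parallelism of 12 is an assumption chosen to match the subtask indices seen later, and the N> prefix that print() emits is the 0-based index returned here plus one:

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyRoutingSketch {
    public static void main(String[] args) {
        int maxParallelism = 128; // Flink's default max parallelism
        int parallelism = 12;     // assumed operator parallelism
        for (String key : new String[]{"nihao", "jiushi", "q", "v"}) {
            // key -> murmur-hashed key group -> owning subtask (0-based)
            int subtask = KeyGroupRangeAssignment
                    .assignKeyToParallelOperator(key, maxParallelism, parallelism);
            System.out.println(key + " -> subtask " + subtask);
        }
    }
}

Because the assignment depends only on the key's hash, maxParallelism, and parallelism, the routing is deterministic across runs.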
On the Linux machine that will send the data, start a socket server (-l listens, -k keeps the listener open across connections):
nc -lk 9997
// Imports are the same as in the previous streaming example
public class FlinkSoctet {
    public static void main(String[] args) throws Exception {
        // Get the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> initData = env.socketTextStream("master", 9997);
        SingleOutputStreamOperator<Tuple2<String, Integer>> map = initData.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String item, Collector<String> out) throws Exception {
                String[] resItem = item.split(" ");
                for (String s : resItem) {
                    out.collect(s);
                }
            }
        }).map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String item) throws Exception {
                return Tuple2.of(item, 1);
            }
        });
        // Key the tuple stream by the word, then aggregate per key
        map.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        }).sum(1).print();
        // A streaming job only starts (and then runs continuously) once execute() is called
        env.execute();
    }
}
Input data:
q v d s fd a q
9> (d,1)
8> (fd,1)
4> (q,1)
9> (s,1)
4> (q,2)
11> (a,1)
3> (v,1)
Download the binary release from the Apache Flink: Downloads page.

Cluster plan:

| master | node1 | node2 |
| --- | --- | --- |
| JobManager, TaskManager | TaskManager | TaskManager |
Extract the archive:
tar -zxvf flink-1.13.2-bin-scala_2.12.tgz
Edit the configuration files.

flink-conf.yaml:
jobmanager.rpc.address: master

masters:
master:8081

workers:
master
node1
node2
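While you are in flink-conf.yaml, a few other keys are commonly tuned for a standalone cluster. This is just a sketch; the values shown are the Flink 1.13 defaults:

jobmanager.rpc.port: 6123        # JobManager RPC port
taskmanager.numberOfTaskSlots: 1 # slots offered by each TaskManager
parallelism.default: 1           # parallelism used when a job sets none
rest.port: 8081                  # port of the web UI / REST endpoint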
Distribute the directory to the other machines in the cluster (xsync here is a custom rsync-based sync script):
./xsync /home/bigdata/congxueflink
Start the cluster (from the bin directory):
./start-cluster.sh
Visit http://master:8081/ to confirm the cluster is up.
Shut down:
./stop-cluster.sh
Add the packaging plugins to pom.xml:
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
Write a program that listens on a socket:
// Same socket word-count program as before, now packaged for the cluster
public class FlinkSoctet {
    public static void main(String[] args) throws Exception {
        // Get the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> initData = env.socketTextStream("master", 9997);
        SingleOutputStreamOperator<Tuple2<String, Integer>> map = initData.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String item, Collector<String> out) throws Exception {
                String[] resItem = item.split(" ");
                for (String s : resItem) {
                    out.collect(s);
                }
            }
        }).map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String item) throws Exception {
                return Tuple2.of(item, 1);
            }
        });
        // Key the tuple stream by the word, then aggregate per key
        map.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        }).sum(1).print();
        // A streaming job only starts (and then runs continuously) once execute() is called
        env.execute();
    }
}
Then package the project.
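With the assembly plugin bound to the package phase above, a plain Maven build is enough; it produces both the thin jar and the jar-with-dependencies under target/:

mvn clean package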
Submit the job; once accepted it starts executing.
-m is the address of the JobManager that accepts submissions
-c is the entry class inside the jar
-p is the parallelism
The final argument is the jar you built
./flink run -m master:8081 -c com.chongxue.flink.FlinkSoctet -p 2 flink-1.0-SNAPSHOT.jar
Cancel the job (the argument is the job ID):
./flink cancel d9e4f4dcb0516551d6611675c16113bb
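If you don't have the job ID at hand, the CLI can list the running jobs together with their IDs:

./flink list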