Getting Started with Apache Flink
Environment configuration. The main settings include:
- JAVA_HOME
- jobmanager.rpc.address
- jobmanager.heap.mb and taskmanager.heap.mb
- taskmanager.numberOfTaskSlots
- taskmanager.tmp.dirs
- the slaves file
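As a sketch, the settings above might look like this in conf/flink-conf.yaml (the hostnames and sizes are placeholders, not recommendations):

```yaml
# conf/flink-conf.yaml -- illustrative values only
jobmanager.rpc.address: master-host
jobmanager.heap.mb: 1024
taskmanager.heap.mb: 2048
taskmanager.numberOfTaskSlots: 4
taskmanager.tmp.dirs: /tmp/flink
```

JAVA_HOME is taken from the shell environment, and conf/slaves lists one worker hostname per line.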
Starting and stopping the cluster:
bin/start-cluster.sh
bin/stop-cluster.sh
A first program
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SocketTextStreamWordCount {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>");
            return;
        }
        String hostName = args[0];
        Integer port = Integer.parseInt(args[1]);

        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment
                .getExecutionEnvironment();

        // get input data
        DataStream<String> text = env.socketTextStream(hostName, port);

        DataStream<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new LineSplitter())
                        // group by the tuple field "0" and sum up tuple field "1"
                        .keyBy(0)
                        .sum(1);

        counts.print();

        // execute program
        env.execute("WordCount from SocketTextStream Example");
    }

    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
}
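The tokenization inside LineSplitter can be tried out on its own, without a Flink runtime. A minimal standalone sketch (the class name TokenizeDemo is made up for illustration):

```java
import java.util.ArrayList;
import java.util.List;

// Standalone demo of the LineSplitter tokenization logic (no Flink needed).
public class TokenizeDemo {

    // Same normalization as LineSplitter: lowercase, split on runs of
    // non-word characters, drop empty tokens (e.g. from leading punctuation).
    static List<String> tokenize(String line) {
        List<String> tokens = new ArrayList<>();
        for (String token : line.toLowerCase().split("\\W+")) {
            if (token.length() > 0) {
                tokens.add(token);
            }
        }
        return tokens;
    }

    public static void main(String[] args) {
        System.out.println(tokenize("Hello, Flink! hello")); // prints [hello, flink, hello]
    }
}
```

Each token would then be wrapped as a (word, 1) tuple before the keyBy/sum stage.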
The programming steps are very similar to Spark's:
- Obtain an execution environment
- Load/create the initial data
- Specify transformations on this data
- Specify where to put the results of your computations
- Trigger the program execution
The entry point for connecting to Flink is StreamExecutionEnvironment, which can be obtained via:
- getExecutionEnvironment()
- createLocalEnvironment()
- createRemoteEnvironment(String host, int port, String... jarFiles)
Accumulators & Counters are used for summing and counting across a job. The workflow has four steps: define the accumulator, register it with the runtime context, update it inside the operator, and fetch the result after execution:

// 1. define (as a field of the operator)
private IntCounter numLines = new IntCounter();

// 2. register with the runtime context (e.g. in open())
getRuntimeContext().addAccumulator("num-lines", this.numLines);

// 3. update
this.numLines.add(1);

// 4. fetch after execution
JobExecutionResult myJobExecutionResult = env.execute("xxx");
myJobExecutionResult.getAccumulatorResult("num-lines");
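The four-step lifecycle (define, register, update, fetch) can be mimicked without Flink using a plain counter; this sketch only illustrates the flow, and the class name AccumulatorSketch is made up, not part of the Flink API:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Flink's accumulator lifecycle (not the Flink API).
public class AccumulatorSketch {

    // Plays the role of the runtime context's accumulator registry.
    static final Map<String, int[]> registry = new HashMap<>();

    static int run() {
        // 1. define
        int[] numLines = new int[] {0};
        // 2. register under a name
        registry.put("num-lines", numLines);
        // 3. update once per processed record
        for (int i = 0; i < 3; i++) {
            numLines[0] += 1;
        }
        // 4. fetch the result by name after the "job" finishes
        return registry.get("num-lines")[0];
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints 3
    }
}
```

In real Flink code the registry lives in the RuntimeContext, and results are merged from all parallel instances into the JobExecutionResult.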
Setting the parallelism. It can be configured at four levels:

System Level (conf/flink-conf.yaml):
parallelism.default: 10

Client Level:
./bin/flink run -p 10 example.jar
client.run(program, 10, true);

Execution Environment Level:
env.setParallelism(3);

Operator Level:
DataStream<Tuple2<String, Integer>> wordCounts = text
        .flatMap(new LineSplitter())
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .sum(1).setParallelism(5);
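What keyBy means for a parallel operator can be illustrated without Flink: each record is routed to one of the n parallel instances by a hash of its key, so every occurrence of the same word reaches the same instance and can be summed there. A conceptual sketch (a simplification, not Flink's actual key-group assignment):

```java
// Hypothetical illustration of the hash partitioning behind keyBy
// (not Flink's real key-group logic).
public class KeyBySketch {

    // Route a key to one of `parallelism` operator instances.
    static int partition(String key, int parallelism) {
        // floorMod keeps the result non-negative even for negative hash codes
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 5;
        // The same word always lands on the same instance, so that instance
        // can maintain the running count for it.
        System.out.println(partition("flink", parallelism) == partition("flink", parallelism)); // prints true
    }
}
```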
Finally, the architecture diagram and the execution-flow diagram: both also look quite similar to Spark's.
