配置环境<br/>
包括<br/>
JAVA_HOME<br/>
jobmanager.rpc.address<br/>
jobmanager.heap.mb 和 taskmanager.heap.mb<br/>
taskmanager.numberOfTaskSlots<br/>
taskmanager.tmp.dirs<br/>
slaves文件

启动关闭
bin/start-cluster.sh
bin/stop-cluster.sh

 
初步使用

    public static void main(String[] args) throws Exception {

        if (args.length != 2){<br/>
            System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>");<br/>
            return;<br/>
        }

        String hostName = args[0];<br/>
        Integer port = Integer.parseInt(args[1]);

        // set up the execution environment<br/>
        final StreamExecutionEnvironment env = StreamExecutionEnvironment<br/>
                .getExecutionEnvironment();

        // get input data<br/>
        DataStream<String> text = env.socketTextStream(hostName, port);

        DataStream<Tuple2<String, Integer>> counts =<br/>
        // split up the lines in pairs (2-tuples) containing: (word,1)<br/>
        text.flatMap(new LineSplitter())<br/>
        // group by the tuple field "0" and sum up tuple field "1"<br/>
                .keyBy(0)<br/>
                .sum(1);

        counts.print();

        // execute program<br/>
        env.execute("WordCount from SocketTextStream Example");<br/>
    }

    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override<br/>
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {<br/>
            // normalize and split the line<br/>
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs<br/>
            for (String token : tokens) {<br/>
                if (token.length() > 0) {<br/>
                    out.collect(new Tuple2<String, Integer>(token, 1));<br/>
                }<br/>
            }<br/>
        }<br/>
    }    
编程步骤,和spark很类似<br/>
Obtain an execution environment,<br/>
Load/create the initial data,<br/>
Specify transformations on this data,<br/>
Specify where to put the results of your computations,<br/>
Trigger the program execution
连接flink的接口 StreamExecutionEnvironment<br/>
getExecutionEnvironment()<br/>
createLocalEnvironment()<br/>
createRemoteEnvironment(String host, int port, String... jarFiles)

Accumulators & Counters 用于求和和计数<br/>
步骤包括定义,添加到上下文,操作,最后获取<br/>
private IntCounter numLines = new IntCounter();<br/>
getRuntimeContext().addAccumulator("num-lines", this.numLines);<br/>
this.numLines.add(1);<br/>
myJobExecutionResult=env.execute("xxx");<br/>
myJobExecutionResult.getAccumulatorResult("num-lines")
并发数设置<br/>
System Level:<br/>
parallelism.default=10<br/>
Client Level:<br/>
./bin/flink run -p 10 example.jar<br/>
client.run(program, 10, true);

Execution Environment Level:<br/>
env.setParallelism(3);

Operator Level:<br/>
DataStream<Tuple2<String, Integer>> wordCounts = text<br/>
    .flatMap(new LineSplitter())<br/>
    .keyBy(0)<br/>
    .timeWindow(Time.seconds(5))<br/>
    .sum(1).setParallelism(5);

最后上架构图和执行流程图,看起来和spark很类似

apache flink 入门