package it.polimi.middleware.projects.flink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;

public class KMeans {

    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Read CSV input: one Double coordinate per line, wrapped in a Tuple1
        final DataSet<Tuple1<Double>> csvInput = env.readCsvFile(params.get("input")).types(Double.class);

        // Convert CSV records to the internal format (plain Doubles).
        // The lambda's return type is erased at compile time, so it is
        // declared explicitly with a type hint.
        final DataSet<Double> input = csvInput
                .map(point -> point.f0)
                .returns(TypeInformation.of(new TypeHint<Double>() {}));

        // DEBUG: mean of all the points, computed as (sum, count) pairs
        // that are reduced and then divided into a single average
        final DataSet<Tuple1<Double>> mean = input
                .map(new MapFunction<Double, Tuple2<Double, Integer>>() {
                    @Override
                    public Tuple2<Double, Integer> map(Double value) {
                        return new Tuple2<>(value, 1);
                    }
                })
                .reduce(new ReduceFunction<Tuple2<Double, Integer>>() {
                    @Override
                    public Tuple2<Double, Integer> reduce(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b) {
                        return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
                    }
                })
                .map(new MapFunction<Tuple2<Double, Integer>, Tuple1<Double>>() {
                    @Override
                    public Tuple1<Double> map(Tuple2<Double, Integer> value) {
                        return new Tuple1<>(value.f0 / value.f1);
                    }
                });

        mean.writeAsCsv(params.get("output", "output.csv"));

        env.execute("K-Means clustering");
    }
}