diff --git a/.gitignore b/.gitignore
index 2edcb90..00cb95a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -33,3 +33,6 @@ dependency-reduced-pom.xml
buildNumber.properties
.mvn/timing.properties
.mvn/wrapper/maven-wrapper.jar
+
+# This project
+*.csv
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..541f1d5
--- /dev/null
+++ b/README.md
@@ -0,0 +1,24 @@
+
+# Usage
+
+Compile job data
+
+```shell
+mvn package
+```
+
+Generate vectors to cluster
+
+```shell
+./genVectors.py $DIMENSION $NUMBER > $FILE
+```
+
+(example: `./genVectors.py 2 15 > myInput.csv`)
+
+
+
+Run
+
+```shell
+flink run target/project-*.jar --input $INPUT --output $OUTPUT
+```
diff --git a/genVectors.py b/genVectors.py
new file mode 100755
index 0000000..b31a919
--- /dev/null
+++ b/genVectors.py
@@ -0,0 +1,12 @@
+#!/usr/bin/env python3
+
+import random
+import sys
+
+random.seed(0)
+
+D = int(sys.argv[1]) # Number of dimensions
+N = int(sys.argv[2]) # Number of vectors
+
+for _ in range(N):
+ print(','.join([str(random.random()) for _ in range(D)]))
diff --git a/pom.xml b/pom.xml
index 1088bfe..3388876 100644
--- a/pom.xml
+++ b/pom.xml
@@ -144,7 +144,7 @@ under the License.
- it.polimi.middleware.projects.flink.StreamingJob
+ it.polimi.middleware.projects.flink.KMeans
diff --git a/src/main/java/it/polimi/middleware/projects/flink/BatchJob.java b/src/main/java/it/polimi/middleware/projects/flink/BatchJob.java
deleted file mode 100644
index 606742f..0000000
--- a/src/main/java/it/polimi/middleware/projects/flink/BatchJob.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package it.polimi.middleware.projects.flink;
-
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-/**
- * Skeleton for a Flink Batch Job.
- *
- *
For a tutorial how to write a Flink batch application, check the
- * tutorials and examples on the Flink Website.
- *
- *
To package your application into a JAR file for execution,
- * change the main class in the POM.xml file to this class (simply search for 'mainClass')
- * and run 'mvn clean package' on the command line.
- */
-public class BatchJob {
-
- public static void main(String[] args) throws Exception {
- // set up the batch execution environment
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- /*
- * Here, you can start creating your execution plan for Flink.
- *
- * Start with getting some data from the environment, like
- * env.readTextFile(textPath);
- *
- * then, transform the resulting DataSet using operations
- * like
- * .filter()
- * .flatMap()
- * .join()
- * .coGroup()
- *
- * and many more.
- * Have a look at the programming guide for the Java API:
- *
- * http://flink.apache.org/docs/latest/apis/batch/index.html
- *
- * and the examples
- *
- * http://flink.apache.org/docs/latest/apis/batch/examples.html
- *
- */
-
- // execute program
- env.execute("Flink Batch Java API Skeleton");
- }
-}
diff --git a/src/main/java/it/polimi/middleware/projects/flink/KMeans.java b/src/main/java/it/polimi/middleware/projects/flink/KMeans.java
new file mode 100644
index 0000000..3e70451
--- /dev/null
+++ b/src/main/java/it/polimi/middleware/projects/flink/KMeans.java
@@ -0,0 +1,50 @@
+package it.polimi.middleware.projects.flink;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.utils.ParameterTool;
+
+
+public class KMeans {
+
+ public static void main(String[] args) throws Exception {
+ final ParameterTool params = ParameterTool.fromArgs(args);
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ // Read CSV input
+ DataSet> csvInput = env.readCsvFile(params.get("input")).types(Double.class);
+
+ // Convert CSV to internal format
+ DataSet input = csvInput
+ .map(point -> point.f0);
+
+ // DEBUG Means all the points
+ DataSet> mean = input
+ .map(new MapFunction>() {
+ public Tuple2 map(Double value) {
+ return new Tuple2(value, 1);
+ }
+ })
+ .reduce(new ReduceFunction>() {
+ public Tuple2 reduce(Tuple2 a, Tuple2 b) {
+ return new Tuple2(a.f0 + b.f0, a.f1 + b.f1);
+ }
+ })
+ .map(new MapFunction, Tuple1>() {
+ public Tuple1 map(Tuple2 value) {
+ return new Tuple1(value.f0 / value.f1);
+ }
+ });
+
+ mean.writeAsCsv(params.get("output", "output.csv"));
+
+ env.execute("K-Means clustering");
+ }
+}
diff --git a/src/main/java/it/polimi/middleware/projects/flink/StreamingJob.java b/src/main/java/it/polimi/middleware/projects/flink/StreamingJob.java
deleted file mode 100644
index 55b6a84..0000000
--- a/src/main/java/it/polimi/middleware/projects/flink/StreamingJob.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package it.polimi.middleware.projects.flink;
-
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-/**
- * Skeleton for a Flink Streaming Job.
- *
- * For a tutorial how to write a Flink streaming application, check the
- * tutorials and examples on the Flink Website.
- *
- *
To package your application into a JAR file for execution, run
- * 'mvn clean package' on the command line.
- *
- *
If you change the name of the main class (with the public static void main(String[] args))
- * method, change the respective entry in the POM.xml file (simply search for 'mainClass').
- */
-public class StreamingJob {
-
- public static void main(String[] args) throws Exception {
- // set up the streaming execution environment
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- /*
- * Here, you can start creating your execution plan for Flink.
- *
- * Start with getting some data from the environment, like
- * env.readTextFile(textPath);
- *
- * then, transform the resulting DataStream using operations
- * like
- * .filter()
- * .flatMap()
- * .join()
- * .coGroup()
- *
- * and many more.
- * Have a look at the programming guide for the Java API:
- *
- * http://flink.apache.org/docs/latest/apis/streaming/index.html
- *
- */
-
- // execute program
- env.execute("Flink Streaming Java API Skeleton");
- }
-}