diff --git a/README.md b/README.md
index 541f1d5..78f85bf 100644
--- a/README.md
+++ b/README.md
@@ -1,24 +1,53 @@
+# K-Means clustering algorithm using Apache Flink
+
+Project for the Middleware Technologies for Distributed Systems course.
+
+## Notes
+
+- Only supports 2-dimensional points as input data
+- Non-deterministic: only one random set of starting centroids is tried
+- If a mean cannot be updated (no point is assigned to its centroid), it is discarded, so the result may contain fewer clusters than the requested K
# Usage
-Compile job data
+## Compile job package
+
+You need Java ≥ 8 and Maven ≥ 3.1.
```shell
mvn package
```
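+
+The packaged job ends up in `target/` (e.g. `target/project-1.0.jar`, as used in the examples below).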
-Generate vectors to cluster
+## Generate random vectors to cluster (optional)
+
+You need Python 3.
```shell
./genVectors.py $DIMENSION $NUMBER > $FILE
```
-(example: `./genVectors.py 2 15 > myInput.csv`)
+(example: `./genVectors.py 2 1000 > input.csv`)
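+
+`genVectors.py` simply prints `$NUMBER` random points, one comma-separated point per line. As a rough illustration only (the bundled script may differ, e.g. in coordinate range and precision), such a generator boils down to:
+
+```python
+#!/usr/bin/env python3
+# Illustrative sketch of a random point generator for this input format;
+# the real genVectors.py may use a different coordinate range.
+import random
+import sys
+
+dimension = int(sys.argv[1])  # e.g. 2 (the job only supports 2-D points)
+number = int(sys.argv[2])     # how many points to generate
+
+for _ in range(number):
+    print(",".join(str(round(random.uniform(0, 10), 2)) for _ in range(dimension)))
+```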
+## Classify
-Run
+You need a running Apache Flink cluster.
+
+Input data is one point per line, in the following format: `xCoords,yCoords`.
+Output data is one point per line, in the following format: `xCoords,yCoords,clusterIndex`.
```shell
-flink run target/project-*.jar --input $INPUT --output $OUTPUT
+flink run target/project-*.jar --input $INPUT --output $OUTPUT [--k $K] [--maxIterations $ITERATIONS]
```
+
+(example: `flink run target/project-1.0.jar --input $PWD/input.csv --output $PWD/output.csv --k 5`)
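+
+The defaults are `--k 3` and `--maxIterations 25`. For illustration, with `--k 5` the output file could contain lines such as these (made-up values):
+
+```
+0.42,7.31,0
+5.08,2.96,2
+5.11,3.04,2
+```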
+
+## Show results
+
+You need Python 3 with NumPy and Matplotlib.
+
+```shell
+./plotClassification.py $FILE
+```
+
+(example: `./plotClassification.py output.csv`)
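+
+`plotClassification.py` is not reproduced here; as a rough sketch only (the bundled script may differ), plotting the classification amounts to:
+
+```python
+#!/usr/bin/env python3
+# Illustrative sketch: scatter-plot the classified points, colored by cluster index;
+# the real plotClassification.py may differ.
+import sys
+
+import matplotlib.pyplot as plt
+import numpy as np
+
+data = np.loadtxt(sys.argv[1], delimiter=",")  # columns: x, y, clusterIndex
+plt.scatter(data[:, 0], data[:, 1], c=data[:, 2].astype(int), cmap="tab10", s=10)
+plt.title("K-Means classification")
+plt.show()
+```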
diff --git a/pom.xml b/pom.xml
index 3388876..0bde46c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,7 +22,7 @@ under the License.
<groupId>it.polimi.middleware.projects.flink</groupId>
<artifactId>project</artifactId>
-	<version>1.0-SNAPSHOT</version>
+	<version>1.0</version>
<packaging>jar</packaging>
<name>Flink Quickstart Job</name>
diff --git a/src/main/java/it/polimi/middleware/projects/flink/KMeans.java b/src/main/java/it/polimi/middleware/projects/flink/KMeans.java
index b8d5121..8b29744 100644
--- a/src/main/java/it/polimi/middleware/projects/flink/KMeans.java
+++ b/src/main/java/it/polimi/middleware/projects/flink/KMeans.java
@@ -34,9 +34,9 @@ import org.apache.flink.api.java.utils.ParameterTool;
public class KMeans {
public static void main(String[] args) throws Exception {
- final ParameterTool params = ParameterTool.fromArgs(args);
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ final ParameterTool params = ParameterTool.fromArgs(args);
final Integer k = params.getInt("k", 3);
final Integer maxIterations = params.getInt("maxIterations", 25);
@@ -49,28 +49,18 @@ public class KMeans {
// Find min and max of the coordinates to determine where the initial centroids should be
DataSet<Tuple4<Double, Double, Double, Double>> area = input
-            .map(new MapFunction<Point, Tuple4<Double, Double, Double, Double>>() {
+            .map(new MapFunction<Point, Tuple4<Double, Double, Double, Double>>() { // Format points so
+            // they can be passed as reduce parameters
@Override
public Tuple4<Double, Double, Double, Double> map(Point point) {
return new Tuple4(point.x, point.y, point.x, point.y);
}
- }).reduce(new FindArea());
-
- area.print();
-
-        DataSet<Tuple2<Double, Double>> testCentroids = area
-            .flatMap(new RandomCentroids(k))
-            .map(new MapFunction<Point, Tuple2<Double, Double>>() {
-                @Override
-                public Tuple2<Double, Double> map(Point point) {
-                    return new Tuple2(point.x, point.y);
-                }});
-        testCentroids.print();
+            }).reduce(new FindArea()); // Gives the minX, minY, maxX, maxY of all the points
// Generate random centroids
IterativeDataSet<Point> centroids = area
- .flatMap(new RandomCentroids(k))
- .iterate(maxIterations);
+ .flatMap(new RandomCentroids(k)) // Create centroids randomly in the area of the points
+ .iterate(maxIterations); // Mark beginning of the loop
// Assign points to centroids
DataSet<Tuple2<Point, Integer>> assigned = input
@@ -78,18 +68,18 @@ public class KMeans {
// Calculate means
DataSet<Point> newCentroids = assigned
- .map(new MeanPrepare())
+ .map(new MeanPrepare()) // Add Integer field to tuple to count the points
.groupBy(1) // GroupBy CentroidID
- .reduce(new MeanSum())
- .map(new MeanDivide());
+            .reduce(new MeanSum()) // Sum the points of each centroid
+ .map(new MeanDivide()); // Divide by the number of points to get the average
-        DataSet<Point> finalCentroids = centroids.closeWith(newCentroids);
+        DataSet<Point> finalCentroids = centroids.closeWith(newCentroids); // Mark end of the loop
- // Final assignment of points to centroids
+ // Final assignment of points to centroids (that's the data we want)
assigned = input
.map(new AssignCentroid()).withBroadcastSet(finalCentroids, "centroids");
- // Convert to external format
+ // Convert to CSV format
DataSet<Tuple3<Double, Double, Integer>> output = assigned
.map(new MapFunction<Tuple2<Point, Integer>, Tuple3<Double, Double, Integer>>() {
@Override
@@ -128,6 +118,7 @@ public class KMeans {
}
public Point divideBy(Integer factor) {
+                // Since the input is always re-fetched, we can overwrite the values in place
x /= factor;
y /= factor;
return this;
@@ -148,12 +139,13 @@ public class KMeans {
}
public static class RandomCentroids implements FlatMapFunction<Tuple4<Double, Double, Double, Double>, Point> {
+ // minX, minY, maxX, maxY → Point × k
Integer k;
Random r;
public RandomCentroids(Integer k) {
this.k = k;
- this.r = new Random(0);
+ this.r = new Random();
}
private Double randomRange(Double min, Double max) {
@@ -174,16 +166,19 @@ public class KMeans {
@Override
public void open(Configuration parameters) throws Exception {
+            // Centroids are sorted so that every operator assigns them the same identifiers
centroids = new ArrayList(getRuntimeContext().getBroadcastVariable("centroids"));
Collections.sort(centroids);
}
@Override
public Tuple2<Point, Integer> map(Point point) {
+            // Compute the distance from the point to every centroid
+            // and keep the identifier of the closest one
Integer c;
Point centroid;
Double distance;
- Integer minCentroid = 4;
+ Integer minCentroid = 0;
Double minDistance = Double.POSITIVE_INFINITY;
for (c = 0; c < centroids.size(); c++) {