From 24e85deb17dc5e46162560a06b672578c27f42eb Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Geoffrey=20=E2=80=9CFrogeye=E2=80=9D=20Preud=27homme?=
Date: Thu, 24 Jan 2019 20:42:41 +0100
Subject: [PATCH] Documentation

---
 README.md                                  | 39 ++++++++++++++---
 pom.xml                                    |  2 +-
 .../middleware/projects/flink/KMeans.java  | 43 ++++++++-----------
 3 files changed, 54 insertions(+), 30 deletions(-)

diff --git a/README.md b/README.md
index 541f1d5..78f85bf 100644
--- a/README.md
+++ b/README.md
@@ -1,24 +1,53 @@
+# K-Means clustering algorithm using Apache Flink
+
+Project for the Middleware Technologies for Distributed Systems course.
+
+## Note
+
+- Only supports 2-dimensional points as input data.
+- Non-deterministic: only one set of starting points is tried.
+- If a mean cannot be updated it is discarded, so the requested value of K may not match the number of clusters in the results.
 # Usage
 
-Compile job data
+## Compile job package
+
+You need Java ≥ 8 and Maven ≥ 3.1.
 
 ```shell
 mvn package
 ```
 
-Generate vectors to cluster
+## Generate random vectors to cluster (optional)
+
+You need Python 3.
 
 ```shell
 ./genVectors.py $DIMENSION $NUMBER > $FILE
 ```
 
-(example: `./genVectors.py 2 15 > myInput.csv`)
+(example: `./genVectors.py 2 1000 > input.csv`)
 
+## Classify
 
-Run
+You need a running Apache Flink cluster.
+
+Input data is one point per line, in the following format: `xCoords,yCoords`.
+Output data is one point per line, in the following format: `xCoords,yCoords,clusterIndex`.
 
 ```shell
-flink run target/project-*.jar --input $INPUT --output $OUTPUT
+flink run target/project-*.jar --input $INPUT --output $OUTPUT [--k $K] [--maxIterations $ITERATIONS]
 ```
+
+(example: `flink run target/project-1.0.jar --input $PWD/input.csv --output $PWD/output.csv --k 5`)
+
+## Show results
+
+You need Python 3 with NumPy and Matplotlib.
+
+```shell
+./plotClassification.py $FILE
+```
+
+(example: `./plotClassification.py output.csv`)
diff --git a/pom.xml b/pom.xml
index 3388876..0bde46c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,7 +22,7 @@ under the License.
 
 	<groupId>it.polimi.middleware.projects.flink</groupId>
 	<artifactId>project</artifactId>
-	<version>1.0-SNAPSHOT</version>
+	<version>1.0</version>
 	<packaging>jar</packaging>
 
 	<name>Flink Quickstart Job</name>
diff --git a/src/main/java/it/polimi/middleware/projects/flink/KMeans.java b/src/main/java/it/polimi/middleware/projects/flink/KMeans.java
index b8d5121..8b29744 100644
--- a/src/main/java/it/polimi/middleware/projects/flink/KMeans.java
+++ b/src/main/java/it/polimi/middleware/projects/flink/KMeans.java
@@ -34,9 +34,9 @@ import org.apache.flink.api.java.utils.ParameterTool;
 public class KMeans {
 
 	public static void main(String[] args) throws Exception {
-		final ParameterTool params = ParameterTool.fromArgs(args);
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		final ParameterTool params = ParameterTool.fromArgs(args);
 
 		final Integer k = params.getInt("k", 3);
 		final Integer maxIterations = params.getInt("maxIterations", 25);
 
@@ -49,28 +49,18 @@ public class KMeans {
 
 		// Find min and max of the coordinates to determine where the initial centroids should be
 		DataSet<Tuple4<Double, Double, Double, Double>> area = input
-			.map(new MapFunction<Point, Tuple4<Double, Double, Double, Double>>() {
+			.map(new MapFunction<Point, Tuple4<Double, Double, Double, Double>>() { // Format points so
+				// they can be passed as reduce parameters
 				@Override
 				public Tuple4<Double, Double, Double, Double> map(Point point) {
 					return new Tuple4<Double, Double, Double, Double>(point.x, point.y, point.x, point.y);
 				}
-			}).reduce(new FindArea());
-
-		area.print();
-
-		DataSet<Tuple2<Double, Double>> testCentroids = area
-			.flatMap(new RandomCentroids(k))
-			.map(new MapFunction<Point, Tuple2<Double, Double>>() {
-				@Override
-				public Tuple2<Double, Double> map(Point point) {
-					return new Tuple2<Double, Double>(point.x, point.y);
-				}});
-		testCentroids.print();
+			}).reduce(new FindArea()); // Gives the minX, minY, maxX, maxY of all the points
 
 		// Generate random centroids
 		IterativeDataSet<Point> centroids = area
-			.flatMap(new RandomCentroids(k))
-			.iterate(maxIterations);
+			.flatMap(new RandomCentroids(k)) // Create centroids randomly in the area of the points
+			.iterate(maxIterations); // Mark the beginning of the loop
 
 		// Assign points to centroids
 		DataSet<Tuple2<Point, Integer>> assigned = input
@@ -78,18 +68,18 @@ public class KMeans {
 
 		// Calculate means
 		DataSet<Point> newCentroids = assigned
-			.map(new MeanPrepare())
+			.map(new MeanPrepare()) // Add an Integer field to the tuple to count the points
 			.groupBy(1) // GroupBy CentroidID
-			.reduce(new MeanSum())
-			.map(new MeanDivide());
+			.reduce(new MeanSum()) // Sum the points of each centroid
+			.map(new MeanDivide()); // Divide by the number of points to get the average
 
-		DataSet<Point> finalCentroids = centroids.closeWith(newCentroids);
+		DataSet<Point> finalCentroids = centroids.closeWith(newCentroids); // Mark the end of the loop
 
-		// Final assignment of points to centroids
+		// Final assignment of points to centroids (that's the data we want)
 		assigned = input
 			.map(new AssignCentroid()).withBroadcastSet(finalCentroids, "centroids");
 
-		// Convert to external format
+		// Convert to CSV format
 		DataSet<Tuple3<Double, Double, Integer>> output = assigned
 			.map(new MapFunction<Tuple2<Point, Integer>, Tuple3<Double, Double, Integer>>() {
 				@Override
@@ -128,6 +118,7 @@ public class KMeans {
 		}
 		public Point divideBy(Integer factor) {
+			// Since the input is always re-fetched we can overwrite the values
 			x /= factor;
 			y /= factor;
 			return this;
 		}
@@ -148,12 +139,13 @@ public class KMeans {
 	}
 
 	public static class RandomCentroids implements FlatMapFunction<Tuple4<Double, Double, Double, Double>, Point> {
+		// minX, minY, maxX, maxY → Point × k
 		Integer k;
 		Random r;
 
 		public RandomCentroids(Integer k) {
 			this.k = k;
-			this.r = new Random(0);
+			this.r = new Random();
 		}
 
 		private Double randomRange(Double min, Double max) {
@@ -174,16 +166,19 @@ public class KMeans {
 		@Override
 		public void open(Configuration parameters) throws Exception {
+			// Centroids are sorted so their index is an identifier common to all the operators
 			centroids = new ArrayList<Point>(getRuntimeContext().getBroadcastVariable("centroids"));
 			Collections.sort(centroids);
 		}
 
 		@Override
 		public Tuple2<Point, Integer> map(Point point) {
+			// Calculate the Point-Centroid distance for all centroids,
+			// keep the identifier of the closest centroid
 			Integer c;
 			Point centroid;
 			Double distance;
-			Integer minCentroid = 4;
+			Integer minCentroid = 0;
 			Double minDistance = Double.POSITIVE_INFINITY;
 
 			for (c = 0; c < centroids.size(); c++) {
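The hunk above is cut off inside `AssignCentroid.map`. For reference, the following is a minimal, self-contained sketch of the nearest-centroid selection that this kind of loop performs; the `Centroid` class and the `distanceTo` helper are illustrative assumptions, not code taken from this patch.

```java
// Hypothetical sketch only: illustrates picking the index of the closest centroid,
// as the loop in AssignCentroid.map appears to do. Names are assumptions.
import java.util.Arrays;
import java.util.List;

final class NearestCentroidSketch {

	static final class Centroid {
		final double x;
		final double y;

		Centroid(double x, double y) {
			this.x = x;
			this.y = y;
		}

		// Euclidean distance from this centroid to the point (px, py)
		double distanceTo(double px, double py) {
			final double dx = x - px;
			final double dy = y - py;
			return Math.sqrt(dx * dx + dy * dy);
		}
	}

	// Returns the index (in the sorted centroid list) of the centroid closest to (px, py)
	static int assign(List<Centroid> centroids, double px, double py) {
		int minCentroid = 0;
		double minDistance = Double.POSITIVE_INFINITY;
		for (int c = 0; c < centroids.size(); c++) {
			final double distance = centroids.get(c).distanceTo(px, py);
			if (distance < minDistance) {
				minDistance = distance;
				minCentroid = c;
			}
		}
		return minCentroid;
	}

	public static void main(String[] args) {
		final List<Centroid> centroids = Arrays.asList(
				new Centroid(0.0, 0.0),
				new Centroid(5.0, 5.0));
		// The point (4, 4) is closer to the second centroid
		System.out.println(assign(centroids, 4.0, 4.0)); // prints 1
	}
}
```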