From de6a8814280fda2e349b161e6bfa4a8a95ca7cf7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Geoffrey=20=E2=80=9CFrogeye=E2=80=9D=20Preud=27homme?=
Date: Wed, 23 Jan 2019 23:14:35 +0100
Subject: [PATCH] Basic scaffold

Generate, read, average (mean) and write all the points
---
 .gitignore                                   |  3 +
 README.md                                    | 24 +++++++
 genVectors.py                                | 12 ++++
 pom.xml                                      |  2 +-
 .../middleware/projects/flink/BatchJob.java  | 66 -------------------
 .../middleware/projects/flink/KMeans.java    | 50 ++++++++++++++
 .../projects/flink/StreamingJob.java         | 64 ------------------
 7 files changed, 90 insertions(+), 131 deletions(-)
 create mode 100644 README.md
 create mode 100755 genVectors.py
 delete mode 100644 src/main/java/it/polimi/middleware/projects/flink/BatchJob.java
 create mode 100644 src/main/java/it/polimi/middleware/projects/flink/KMeans.java
 delete mode 100644 src/main/java/it/polimi/middleware/projects/flink/StreamingJob.java

diff --git a/.gitignore b/.gitignore
index 2edcb90..00cb95a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -33,3 +33,6 @@ dependency-reduced-pom.xml
 buildNumber.properties
 .mvn/timing.properties
 .mvn/wrapper/maven-wrapper.jar
+
+# This project
+*.csv
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..541f1d5
--- /dev/null
+++ b/README.md
@@ -0,0 +1,24 @@
+
+# Usage
+
+Compile the job
+
+```shell
+mvn package
+```
+
+Generate vectors to cluster
+
+```shell
+./genVectors.py $DIMENSION $NUMBER > $FILE
+```
+
+(example: `./genVectors.py 2 15 > myInput.csv`)
+
+
+
+Run
+
+```shell
+flink run target/project-*.jar --input $INPUT --output $OUTPUT
+```
diff --git a/genVectors.py b/genVectors.py
new file mode 100755
index 0000000..b31a919
--- /dev/null
+++ b/genVectors.py
@@ -0,0 +1,12 @@
+#!/usr/bin/env python3
+
+import random
+import sys
+
+random.seed(0)
+
+D = int(sys.argv[1])  # Number of dimensions
+N = int(sys.argv[2])  # Number of vectors
+
+for _ in range(N):
+    print(','.join([str(random.random()) for _ in range(D)]))
diff --git a/pom.xml b/pom.xml
index 1088bfe..3388876 100644
--- a/pom.xml
+++ b/pom.xml
@@ -144,7 +144,7 @@ under the License.
-					<mainClass>it.polimi.middleware.projects.flink.StreamingJob</mainClass>
+					<mainClass>it.polimi.middleware.projects.flink.KMeans</mainClass>
diff --git a/src/main/java/it/polimi/middleware/projects/flink/BatchJob.java b/src/main/java/it/polimi/middleware/projects/flink/BatchJob.java
deleted file mode 100644
index 606742f..0000000
--- a/src/main/java/it/polimi/middleware/projects/flink/BatchJob.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package it.polimi.middleware.projects.flink;
-
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-/**
- * Skeleton for a Flink Batch Job.
- *
- * <p>For a tutorial how to write a Flink batch application, check the
- * tutorials and examples on the Flink Website.
- *
- * <p>To package your application into a JAR file for execution,
- * change the main class in the POM.xml file to this class (simply search for 'mainClass')
- * and run 'mvn clean package' on the command line.
- */
-public class BatchJob {
-
-    public static void main(String[] args) throws Exception {
-        // set up the batch execution environment
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        /*
-         * Here, you can start creating your execution plan for Flink.
-         *
-         * Start with getting some data from the environment, like
-         *     env.readTextFile(textPath);
-         *
-         * then, transform the resulting DataSet using operations
-         * like
-         *     .filter()
-         *     .flatMap()
-         *     .join()
-         *     .coGroup()
-         *
-         * and many more.
-         * Have a look at the programming guide for the Java API:
-         *
-         * http://flink.apache.org/docs/latest/apis/batch/index.html
-         *
-         * and the examples
-         *
-         * http://flink.apache.org/docs/latest/apis/batch/examples.html
-         *
-         */
-
-        // execute program
-        env.execute("Flink Batch Java API Skeleton");
-    }
-}
diff --git a/src/main/java/it/polimi/middleware/projects/flink/KMeans.java b/src/main/java/it/polimi/middleware/projects/flink/KMeans.java
new file mode 100644
index 0000000..3e70451
--- /dev/null
+++ b/src/main/java/it/polimi/middleware/projects/flink/KMeans.java
@@ -0,0 +1,50 @@
+package it.polimi.middleware.projects.flink;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.utils.ParameterTool;
+
+
+public class KMeans {
+
+    public static void main(String[] args) throws Exception {
+        final ParameterTool params = ParameterTool.fromArgs(args);
+        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        // Read CSV input
+        DataSet<Tuple1<Double>> csvInput = env.readCsvFile(params.get("input")).types(Double.class);
+
+        // Convert CSV to internal format
+        DataSet<Double> input = csvInput
+            .map(point -> point.f0);
+
+        // DEBUG: compute the mean of all the points
+        DataSet<Tuple1<Double>> mean = input
+            .map(new MapFunction<Double, Tuple2<Double, Integer>>() {
+                public Tuple2<Double, Integer> map(Double value) {
+                    return new Tuple2<Double, Integer>(value, 1);
+                }
+            })
+            .reduce(new ReduceFunction<Tuple2<Double, Integer>>() {
+                public Tuple2<Double, Integer> reduce(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b) {
+                    return new Tuple2<Double, Integer>(a.f0 + b.f0, a.f1 + b.f1);
+                }
+            })
+            .map(new MapFunction<Tuple2<Double, Integer>, Tuple1<Double>>() {
+                public Tuple1<Double> map(Tuple2<Double, Integer> value) {
+                    return new Tuple1<Double>(value.f0 / value.f1);
+                }
+            });
+
+        mean.writeAsCsv(params.get("output", "output.csv"));
+
+        env.execute("K-Means clustering");
+    }
+}
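
The map → reduce → map chain in KMeans.java is the usual way to compute a mean with nothing but an associative reduce: each value is paired with a count of 1, the pairs are summed component-wise, and a final map divides the running sum by the running count. Flink's `ReduceFunction` must take and return the same type, which is why the division cannot happen inside the reduce itself. Below is a minimal plain-Java sketch of the same pattern; the class name `MeanSketch` and the sample values are illustrative only and not part of the patch.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative only: the (sum, count) trick used by the Flink job, without Flink.
public class MeanSketch {
    public static void main(String[] args) {
        List<Double> values = Arrays.asList(0.25, 0.50, 0.75); // stand-in for the CSV column

        double[] acc = values.stream()
                // map: each value contributes a (value, 1) pair
                .map(v -> new double[] {v, 1.0})
                // reduce: component-wise sum, giving (sum of values, number of values)
                .reduce(new double[] {0.0, 0.0},
                        (a, b) -> new double[] {a[0] + b[0], a[1] + b[1]});

        // final map: mean = sum / count
        System.out.println("mean = " + acc[0] / acc[1]); // prints 0.5
    }
}
```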
diff --git a/src/main/java/it/polimi/middleware/projects/flink/StreamingJob.java b/src/main/java/it/polimi/middleware/projects/flink/StreamingJob.java
deleted file mode 100644
index 55b6a84..0000000
--- a/src/main/java/it/polimi/middleware/projects/flink/StreamingJob.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package it.polimi.middleware.projects.flink;
-
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-/**
- * Skeleton for a Flink Streaming Job.
- *
- * <p>For a tutorial how to write a Flink streaming application, check the
- * tutorials and examples on the Flink Website.
- *
- * <p>To package your application into a JAR file for execution, run
- * 'mvn clean package' on the command line.
- *
- * <p>If you change the name of the main class (with the public static void main(String[] args))
- * method, change the respective entry in the POM.xml file (simply search for 'mainClass').
- */
-public class StreamingJob {
-
-    public static void main(String[] args) throws Exception {
-        // set up the streaming execution environment
-        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-        /*
-         * Here, you can start creating your execution plan for Flink.
-         *
-         * Start with getting some data from the environment, like
-         *     env.readTextFile(textPath);
-         *
-         * then, transform the resulting DataStream using operations
-         * like
-         *     .filter()
-         *     .flatMap()
-         *     .join()
-         *     .coGroup()
-         *
-         * and many more.
-         * Have a look at the programming guide for the Java API:
-         *
-         * http://flink.apache.org/docs/latest/apis/streaming/index.html
-         *
-         */
-
-        // execute program
-        env.execute("Flink Streaming Java API Skeleton");
-    }
-}
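
To check the scaffold end to end, the mean of the first CSV column can be recomputed on the side and compared with what the job wrote. The sketch below is hypothetical and not part of the patch: it assumes the input file is `myInput.csv` as in the README example, that the job ran with parallelism 1 so `output.csv` is a single file (with higher parallelism Flink writes a directory of that name), and that only the first component of each vector matters, mirroring the single `Double` column that KMeans.java declares.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

// Hypothetical sanity check, not part of the patch.
public class CheckMean {
    public static void main(String[] args) throws IOException {
        // Assumed file names: myInput.csv from the README example, output.csv from the job's default.
        List<String> input = Files.readAllLines(Paths.get("myInput.csv"));

        double sum = 0.0;
        for (String line : input) {
            sum += Double.parseDouble(line.split(",")[0]); // first component only
        }
        double expected = sum / input.size();

        String written = Files.readAllLines(Paths.get("output.csv")).get(0).trim();
        System.out.println("expected " + expected + ", job wrote " + written);
    }
}
```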