/*
 * Decompiled with CFR 0.152.
 */
package org.apache.mahout.clustering.streaming.mapreduce;

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.clustering.streaming.cluster.BallKMeans;
import org.apache.mahout.clustering.streaming.mapreduce.CentroidWritable;
import org.apache.mahout.clustering.streaming.mapreduce.StreamingKMeansThread;
import org.apache.mahout.clustering.streaming.mapreduce.StreamingKMeansUtilsMR;
import org.apache.mahout.math.Centroid;
import org.apache.mahout.math.Vector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamingKMeansReducer
extends Reducer<IntWritable, CentroidWritable, IntWritable, CentroidWritable> {
    private static final Logger log = LoggerFactory.getLogger(StreamingKMeansReducer.class);
    private Configuration conf;

    public void setup(Reducer.Context context) {
        this.conf = context.getConfiguration();
    }

    public void reduce(IntWritable key, Iterable<CentroidWritable> centroids, Reducer.Context context) throws IOException, InterruptedException {
        ArrayList intermediateCentroids = this.conf.getBoolean("reduceStreamingKMeans", false) ? Lists.newArrayList((Iterable)new StreamingKMeansThread(Iterables.transform(centroids, (Function)new Function<CentroidWritable, Centroid>(){

            public Centroid apply(CentroidWritable input) {
                Preconditions.checkNotNull((Object)input);
                return input.getCentroid().clone();
            }
        }), this.conf).call()) : StreamingKMeansReducer.centroidWritablesToList(centroids);
        int index = 0;
        for (Vector centroid : StreamingKMeansReducer.getBestCentroids(intermediateCentroids, this.conf)) {
            context.write((Object)new IntWritable(index), (Object)new CentroidWritable((Centroid)centroid));
            ++index;
        }
    }

    public static List<Centroid> centroidWritablesToList(Iterable<CentroidWritable> centroids) {
        return Lists.newArrayList((Iterable)Iterables.transform(centroids, (Function)new Function<CentroidWritable, Centroid>(){

            public Centroid apply(CentroidWritable input) {
                Preconditions.checkNotNull((Object)input);
                return input.getCentroid().clone();
            }
        }));
    }

    public static Iterable<Vector> getBestCentroids(List<Centroid> centroids, Configuration conf) {
        if (log.isInfoEnabled()) {
            log.info("Number of Centroids: {}", (Object)centroids.size());
        }
        int numClusters = conf.getInt("numClusters", 1);
        int maxNumIterations = conf.getInt("maxNumIterations", 10);
        float trimFraction = conf.getFloat("trimFraction", 0.9f);
        boolean kMeansPlusPlusInit = !conf.getBoolean("randomInit", false);
        boolean correctWeights = !conf.getBoolean("ignoreWeights", false);
        float testProbability = conf.getFloat("testProbability", 0.1f);
        int numRuns = conf.getInt("numBallKMeansRuns", 3);
        BallKMeans ballKMeansCluster = new BallKMeans(StreamingKMeansUtilsMR.searcherFromConfiguration(conf), numClusters, maxNumIterations, trimFraction, kMeansPlusPlusInit, correctWeights, testProbability, numRuns);
        return ballKMeansCluster.cluster(centroids);
    }
}

