package org.apache.kylin.engine.mr.steps;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.mr.KylinReducer;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.CuboidStatsUtil;
import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reducer for the "fact distinct columns" step.
 *
 * <p>Each reduce task takes one of three roles, chosen in {@link #setup} from its task id:
 * <ul>
 *   <li>statistics enabled, task {@code numReduceTasks - 1}: merges the HyperLogLog counters
 *       received for every cuboid and writes cuboid statistics under
 *       {@code CFG_STATISTICS_OUTPUT};</li>
 *   <li>statistics enabled, task {@code numReduceTasks - 2}: collects the distinct values of the
 *       model's partition date column;</li>
 *   <li>any other task {@code i}: collects the distinct values of the {@code i}-th column returned
 *       by {@code CubeManager.getAllDictColumnsOnFact}.</li>
 * </ul>
 *
 * <p>Distinct values are written one per line to a file named after the column under
 * {@code CFG_OUTPUT_PATH}. To bound memory, they are appended to that file whenever
 * {@value #SPILL_THRESHOLD} values have been buffered, and once more in {@link #cleanup}.
 */
public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWritable, Text> {
    protected static final Logger logger = LoggerFactory.getLogger(FactDistinctColumnsReducer.class);

    /** Number of buffered distinct values after which the buffer is flushed to HDFS. */
    private static final int SPILL_THRESHOLD = 1000000;

    /** Line terminator written after every value and report line. */
    private static final int NEWLINE = '\n';

    private List<TblColRef> columnList;
    /** Count estimate of each HLL counter received for the base cuboid (statistics task only). */
    private List<Long> baseCuboidRowCountInMappers;
    protected long baseCuboidId;
    protected CubeDesc cubeDesc;
    private int samplingPercentage;
    /** Buffered distinct values awaiting spill (column tasks only). */
    private List<ByteArray> colValues;
    private KylinConfig cubeConfig;
    private String statisticsOutput = null;
    /** Merged HLL counter per cuboid id (statistics task only). */
    protected Map<Long, HyperLogLogPlusCounter> cuboidHLLMap = null;
    /** Sum of the count estimates of all counters before they are merged. */
    private long totalRowsBeforeMerge = 0;
    /** Column whose distinct values this task collects (column tasks only). */
    private TblColRef col = null;
    private boolean isStatistics = false;
    // NOTE(review): written in setup() but not read anywhere in this class.
    private boolean isPartitionCol = false;

    /**
     * Loads the cube metadata and decides which role this reduce task plays.
     *
     * @param context the reducer context
     * @throws IOException if Kylin metadata cannot be loaded
     */
    @Override
    protected void setup(Reducer<Text, Text, NullWritable, Text>.Context context) throws IOException {
        super.bindCurrentConfiguration(context.getConfiguration());

        Configuration conf = context.getConfiguration();
        KylinConfig kylinConfig = AbstractHadoopJob.loadKylinPropsAndMetadata();
        CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
        CubeInstance cube = cubeManager.getCube(conf.get(BatchConstants.CFG_CUBE_NAME));
        this.cubeConfig = cube.getConfig();
        this.cubeDesc = cube.getDescriptor();
        this.columnList = cubeManager.getAllDictColumnsOnFact(this.cubeDesc);

        boolean collectStatistics = Boolean.parseBoolean(conf.get(BatchConstants.CFG_STATISTICS_ENABLED));
        int numberOfTasks = context.getNumReduceTasks();
        int taskId = context.getTaskAttemptID().getTaskID().getId();

        if (collectStatistics && taskId == numberOfTasks - 1) {
            // Statistics task: merges HLL counters per cuboid.
            this.isStatistics = true;
            this.statisticsOutput = conf.get(BatchConstants.CFG_STATISTICS_OUTPUT);
            // reduce() compares incoming cuboid ids against this to record per-mapper base cuboid counts.
            this.baseCuboidId = Cuboid.getBaseCuboidId(this.cubeDesc);
            this.baseCuboidRowCountInMappers = Lists.newArrayList();
            this.cuboidHLLMap = Maps.newHashMap();
            this.samplingPercentage = Integer.parseInt(conf.get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
        } else if (collectStatistics && taskId == numberOfTasks - 2) {
            // Partition column task.
            this.isStatistics = false;
            this.isPartitionCol = true;
            this.col = this.cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
            this.colValues = Lists.newArrayList();
        } else {
            // Dictionary column task: task id doubles as the index into columnList.
            this.isStatistics = false;
            this.isPartitionCol = false;
            this.col = this.columnList.get(taskId);
            this.colValues = Lists.newArrayList();
        }
    }

    /**
     * Handles one key group.
     *
     * <p>On a column task, only the key is used. Its bytes after the first byte are buffered as a
     * distinct value, and the grouped values are ignored. On the statistics task, every value is
     * deserialized as an HLL counter and merged into the counter for the cuboid encoded in the key.
     */
    @Override
    public void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, NullWritable, Text>.Context context) throws IOException, InterruptedException {
        if (this.isStatistics) {
            mergeCuboidCounters(key, values);
            return;
        }
        // Byte 0 of the key is skipped (presumably a routing/column prefix set by the mapper — TODO confirm).
        // Copy because Hadoop reuses the Text instance between calls.
        this.colValues.add(new ByteArray(Bytes.copy(key.getBytes(), 1, key.getLength() - 1)));
        if (this.colValues.size() >= SPILL_THRESHOLD) {
            logger.info("spill values to disk...");
            outputDistinctValues(this.col, this.colValues, context);
            this.colValues.clear();
        }
    }

    /** Merges every HLL counter in {@code values} into the running counter for the key's cuboid. */
    private void mergeCuboidCounters(Text key, Iterable<Text> values) throws IOException {
        // The cuboid id is the 8-byte long that follows the first key byte.
        long cuboidId = Bytes.toLong(key.getBytes(), 1, 8);
        for (Text value : values) {
            HyperLogLogPlusCounter counter = new HyperLogLogPlusCounter(this.cubeConfig.getCubeStatsHLLPrecision());
            counter.readRegisters(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()));
            long estimate = counter.getCountEstimate();
            this.totalRowsBeforeMerge += estimate;
            if (cuboidId == this.baseCuboidId) {
                this.baseCuboidRowCountInMappers.add(estimate);
            }
            HyperLogLogPlusCounter merged = this.cuboidHLLMap.get(cuboidId);
            if (merged == null) {
                this.cuboidHLLMap.put(cuboidId, counter);
            } else {
                merged.merge(counter);
            }
        }
    }

    /**
     * Appends {@code values}, one per line, to the file named after {@code tblColRef} under
     * {@code CFG_OUTPUT_PATH}. The file is created if it does not exist yet.
     */
    private void outputDistinctValues(TblColRef tblColRef, Collection<ByteArray> values, Reducer<Text, Text, NullWritable, Text>.Context context) throws IOException {
        Configuration conf = context.getConfiguration();
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path(conf.get(BatchConstants.CFG_OUTPUT_PATH), tblColRef.getName());
        // try-with-resources rather than closeQuietly: close() flushes buffered data, and a
        // failure there must fail the task instead of silently dropping values.
        try (FSDataOutputStream out = openForAppendOrCreate(fs, path)) {
            for (ByteArray value : values) {
                out.write(value.array(), value.offset(), value.length());
                out.write(NEWLINE);
            }
        }
    }

    /** Opens {@code path} for appending if it exists; otherwise creates it. */
    private static FSDataOutputStream openForAppendOrCreate(FileSystem fs, Path path) throws IOException {
        if (fs.exists(path)) {
            FSDataOutputStream out = fs.append(path);
            logger.info("append file " + path);
            return out;
        }
        FSDataOutputStream out = fs.create(path);
        logger.info("create file " + path);
        return out;
    }

    /**
     * Flushes the remaining distinct values on a column task. On the statistics task, writes the
     * human-readable estimation report and the cuboid statistics.
     */
    @Override
    protected void cleanup(Reducer<Text, Text, NullWritable, Text>.Context context) throws IOException, InterruptedException {
        if (!this.isStatistics) {
            if (!this.colValues.isEmpty()) {
                outputDistinctValues(this.col, this.colValues, context);
                this.colValues.clear();
            }
            return;
        }
        long grandTotal = 0;
        for (HyperLogLogPlusCounter counter : this.cuboidHLLMap.values()) {
            grandTotal += counter.getCountEstimate();
        }
        // Floating-point division: the ratio is usually fractional and must not be truncated.
        double mapperOverlapRatio = grandTotal == 0 ? 0.0d : (double) this.totalRowsBeforeMerge / grandTotal;
        writeMapperAndCuboidStatistics(context);
        CuboidStatsUtil.writeCuboidStatistics(context.getConfiguration(), new Path(this.statisticsOutput), this.cuboidHLLMap, this.samplingPercentage, mapperOverlapRatio);
    }

    /** Writes the cube estimation report under the statistics output directory. */
    private void writeMapperAndCuboidStatistics(Reducer<Text, Text, NullWritable, Text>.Context context) throws IOException {
        FileSystem fs = FileSystem.get(context.getConfiguration());
        Path reportPath = new Path(this.statisticsOutput, BatchConstants.CFG_STATISTICS_CUBE_ESTIMATION_FILENAME);
        try (FSDataOutputStream out = fs.create(reportPath)) {
            List<Long> cuboidIds = new ArrayList<Long>(this.cuboidHLLMap.keySet());
            Collections.sort(cuboidIds);
            writeLine(out, "Total cuboid number: \t" + cuboidIds.size());
            writeLine(out, "Samping percentage: \t" + this.samplingPercentage);
            writeLine(out, "The following statistics are collected based on sampling data.");
            for (int i = 0; i < this.baseCuboidRowCountInMappers.size(); i++) {
                long rowCount = this.baseCuboidRowCountInMappers.get(i);
                if (rowCount > 0) {
                    writeLine(out, "Base Cuboid in Mapper " + i + " row count: \t " + rowCount);
                }
            }
            long grandTotal = 0;
            for (Long cuboidId : cuboidIds) {
                long rowCount = this.cuboidHLLMap.get(cuboidId).getCountEstimate();
                grandTotal += rowCount;
                writeLine(out, "Cuboid " + cuboidId + " row count is: \t " + rowCount);
            }
            writeLine(out, "Sum of all the cube segments (before merge) is: \t " + this.totalRowsBeforeMerge);
            writeLine(out, "After merge, the cube has row count: \t " + grandTotal);
            if (grandTotal > 0) {
                writeLine(out, "The mapper overlap ratio is: \t" + ((double) this.totalRowsBeforeMerge / grandTotal));
            }
        }
    }

    /** Writes {@code line} as UTF-8 followed by a newline. */
    private void writeLine(FSDataOutputStream out, String line) throws IOException {
        out.write(line.getBytes(StandardCharsets.UTF_8));
        out.write(NEWLINE);
    }
}
