package org.apache.flink.streaming.api.datastream;

import java.util.Objects;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.operators.co.CoProcessOperator;
import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
import org.apache.flink.streaming.api.operators.co.CoStreamMap;
import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator;
import org.apache.flink.streaming.api.operators.co.LegacyKeyedCoProcessOperator;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;

/**
 * Pairs two {@link DataStream}s, whose element types may differ, together with the
 * {@link StreamExecutionEnvironment} they belong to, and offers two-input transformations
 * ({@code map}, {@code flatMap}, {@code process}, {@code transform}) over that pair.
 *
 * <p>When both inputs are {@link KeyedStream}s, {@link #transform} additionally registers the
 * key selectors and key type of the inputs on the resulting transformation.
 *
 * @param <IN1> element type of the first input stream
 * @param <IN2> element type of the second input stream
 */
@Public
public class ConnectedStreams<IN1, IN2> {
    protected final StreamExecutionEnvironment environment;
    protected final DataStream<IN1> inputStream1;
    protected final DataStream<IN2> inputStream2;

    /**
     * Creates a connected pair from the two given streams.
     *
     * <p>NOTE(review): the decompiler reported that this constructor's access modifier was
     * changed from {@code protected}; it is kept {@code public} here so existing callers of
     * this file keep compiling.
     *
     * @param streamExecutionEnvironment environment used for parallelism and operator registration
     * @param dataStream the first input stream
     * @param dataStream2 the second input stream
     * @throws NullPointerException if any argument is {@code null}
     */
    public ConnectedStreams(StreamExecutionEnvironment streamExecutionEnvironment, DataStream<IN1> dataStream, DataStream<IN2> dataStream2) {
        this.environment = Objects.requireNonNull(streamExecutionEnvironment);
        this.inputStream1 = Objects.requireNonNull(dataStream);
        this.inputStream2 = Objects.requireNonNull(dataStream2);
    }

    /** Returns the execution environment this pair was created with. */
    public StreamExecutionEnvironment getExecutionEnvironment() {
        return this.environment;
    }

    /** Returns the first input stream. */
    public DataStream<IN1> getFirstInput() {
        return this.inputStream1;
    }

    /** Returns the second input stream. */
    public DataStream<IN2> getSecondInput() {
        return this.inputStream2;
    }

    /** Returns the type information reported by the first input stream. */
    public TypeInformation<IN1> getType1() {
        return this.inputStream1.getType();
    }

    /** Returns the type information reported by the second input stream. */
    public TypeInformation<IN2> getType2() {
        return this.inputStream2.getType();
    }

    /**
     * Returns a new pair whose inputs are the results of calling {@code keyBy(int)} on each
     * input stream.
     *
     * @param i argument forwarded to {@code keyBy} of the first input
     * @param i2 argument forwarded to {@code keyBy} of the second input
     * @return a new {@code ConnectedStreams} over the two keyed results
     */
    public ConnectedStreams<IN1, IN2> keyBy(int i, int i2) {
        return new ConnectedStreams<>(this.environment, this.inputStream1.keyBy(i), this.inputStream2.keyBy(i2));
    }

    /**
     * Returns a new pair whose inputs are the results of calling {@code keyBy(int[])} on each
     * input stream.
     *
     * @param iArr argument forwarded to {@code keyBy} of the first input
     * @param iArr2 argument forwarded to {@code keyBy} of the second input
     * @return a new {@code ConnectedStreams} over the two keyed results
     */
    public ConnectedStreams<IN1, IN2> keyBy(int[] iArr, int[] iArr2) {
        return new ConnectedStreams<>(this.environment, this.inputStream1.keyBy(iArr), this.inputStream2.keyBy(iArr2));
    }

    /**
     * Returns a new pair whose inputs are the results of calling {@code keyBy(String)} on each
     * input stream.
     *
     * @param str argument forwarded to {@code keyBy} of the first input
     * @param str2 argument forwarded to {@code keyBy} of the second input
     * @return a new {@code ConnectedStreams} over the two keyed results
     */
    public ConnectedStreams<IN1, IN2> keyBy(String str, String str2) {
        return new ConnectedStreams<>(this.environment, this.inputStream1.keyBy(str), this.inputStream2.keyBy(str2));
    }

    /**
     * Returns a new pair whose inputs are the results of calling {@code keyBy(String[])} on each
     * input stream.
     *
     * @param strArr argument forwarded to {@code keyBy} of the first input
     * @param strArr2 argument forwarded to {@code keyBy} of the second input
     * @return a new {@code ConnectedStreams} over the two keyed results
     */
    public ConnectedStreams<IN1, IN2> keyBy(String[] strArr, String[] strArr2) {
        return new ConnectedStreams<>(this.environment, this.inputStream1.keyBy(strArr), this.inputStream2.keyBy(strArr2));
    }

    /**
     * Returns a new pair whose inputs are keyed by the given selectors.
     *
     * <p>The selectors are passed through as-is: the wildcard key type is captured by the
     * generic {@code keyBy} call, so no cast is required here.
     *
     * @param keySelector key selector applied to the first input
     * @param keySelector2 key selector applied to the second input
     * @return a new {@code ConnectedStreams} over the two keyed results
     */
    public ConnectedStreams<IN1, IN2> keyBy(KeySelector<IN1, ?> keySelector, KeySelector<IN2, ?> keySelector2) {
        return new ConnectedStreams<>(this.environment, this.inputStream1.keyBy(keySelector), this.inputStream2.keyBy(keySelector2));
    }

    /**
     * Returns a new pair whose inputs are keyed by the given selectors, using an explicitly
     * supplied key type.
     *
     * @param keySelector key selector applied to the first input
     * @param keySelector2 key selector applied to the second input
     * @param typeInformation key type information forwarded to both {@code keyBy} calls
     * @param <KEY> the common key type of both selectors
     * @return a new {@code ConnectedStreams} over the two keyed results
     */
    public <KEY> ConnectedStreams<IN1, IN2> keyBy(KeySelector<IN1, KEY> keySelector, KeySelector<IN2, KEY> keySelector2, TypeInformation<KEY> typeInformation) {
        return new ConnectedStreams<>(this.environment, this.inputStream1.keyBy(keySelector, typeInformation), this.inputStream2.keyBy(keySelector2, typeInformation));
    }

    /**
     * Applies a {@link CoMapFunction} to the connected inputs via a {@link CoStreamMap} operator
     * named {@code "Co-Map"}. The output type is derived with {@link TypeExtractor}.
     *
     * @param coMapFunction the function to apply
     * @param <R> the output element type
     * @return the resulting stream
     */
    public <R> SingleOutputStreamOperator<R> map(CoMapFunction<IN1, IN2, R> coMapFunction) {
        // Bind to a typed local so R is inferred from the assignment, not from the transform call.
        TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(
                coMapFunction, CoMapFunction.class, 0, 1, 2, TypeExtractor.NO_INDEX,
                getType1(), getType2(), Utils.getCallLocationName(), true);
        return transform("Co-Map", outTypeInfo, new CoStreamMap<>(this.inputStream1.clean(coMapFunction)));
    }

    /**
     * Applies a {@link CoFlatMapFunction} to the connected inputs via a {@link CoStreamFlatMap}
     * operator named {@code "Co-Flat Map"}. The output type is derived with {@link TypeExtractor}.
     *
     * @param coFlatMapFunction the function to apply
     * @param <R> the output element type
     * @return the resulting stream
     */
    public <R> SingleOutputStreamOperator<R> flatMap(CoFlatMapFunction<IN1, IN2, R> coFlatMapFunction) {
        TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(
                coFlatMapFunction, CoFlatMapFunction.class, 0, 1, 2, TypeExtractor.NO_INDEX,
                getType1(), getType2(), Utils.getCallLocationName(), true);
        return transform("Co-Flat Map", outTypeInfo, new CoStreamFlatMap<>(this.inputStream1.clean(coFlatMapFunction)));
    }

    /**
     * Applies a {@link CoProcessFunction} to the connected inputs, deriving the output type with
     * {@link TypeExtractor} and delegating to {@link #process(CoProcessFunction, TypeInformation)}.
     *
     * @param coProcessFunction the function to apply
     * @param <R> the output element type
     * @return the resulting stream
     */
    @PublicEvolving
    public <R> SingleOutputStreamOperator<R> process(CoProcessFunction<IN1, IN2, R> coProcessFunction) {
        // A typed local keeps R's inference independent of overload resolution between the
        // two-argument process(...) variants.
        TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(
                coProcessFunction, CoProcessFunction.class, 0, 1, 2, TypeExtractor.NO_INDEX,
                getType1(), getType2(), Utils.getCallLocationName(), true);
        return process(coProcessFunction, outTypeInfo);
    }

    /**
     * Applies a {@link CoProcessFunction} with an explicit output type, under the operator name
     * {@code "Co-Process"}.
     *
     * <p>A {@link LegacyKeyedCoProcessOperator} is used when both inputs are {@link KeyedStream}s;
     * otherwise a {@link CoProcessOperator} is used.
     *
     * @param coProcessFunction the function to apply
     * @param typeInformation type information of the output elements
     * @param <R> the output element type
     * @return the resulting stream
     */
    @Internal
    public <R> SingleOutputStreamOperator<R> process(CoProcessFunction<IN1, IN2, R> coProcessFunction, TypeInformation<R> typeInformation) {
        TwoInputStreamOperator<IN1, IN2, R> operator;
        if (bothInputsKeyed()) {
            operator = new LegacyKeyedCoProcessOperator<>(this.inputStream1.clean(coProcessFunction));
        } else {
            operator = new CoProcessOperator<>(this.inputStream1.clean(coProcessFunction));
        }
        return transform("Co-Process", typeInformation, operator);
    }

    /**
     * Applies a {@link KeyedCoProcessFunction} to the connected inputs, deriving the output type
     * with {@link TypeExtractor} and delegating to
     * {@link #process(KeyedCoProcessFunction, TypeInformation)}.
     *
     * @param keyedCoProcessFunction the function to apply
     * @param <K> the key type of the function
     * @param <R> the output element type
     * @return the resulting stream
     * @throws UnsupportedOperationException if either input is not a {@link KeyedStream}
     */
    @PublicEvolving
    public <K, R> SingleOutputStreamOperator<R> process(KeyedCoProcessFunction<K, IN1, IN2, R> keyedCoProcessFunction) {
        // Type-argument indices are 1, 2, 3 because index 0 of KeyedCoProcessFunction is the key type K.
        TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(
                keyedCoProcessFunction, KeyedCoProcessFunction.class, 1, 2, 3, TypeExtractor.NO_INDEX,
                getType1(), getType2(), Utils.getCallLocationName(), true);
        return process(keyedCoProcessFunction, outTypeInfo);
    }

    /**
     * Applies a {@link KeyedCoProcessFunction} with an explicit output type via a
     * {@link KeyedCoProcessOperator} named {@code "Co-Keyed-Process"}.
     *
     * @param keyedCoProcessFunction the function to apply
     * @param typeInformation type information of the output elements
     * @param <K> the key type of the function
     * @param <R> the output element type
     * @return the resulting stream
     * @throws UnsupportedOperationException if either input is not a {@link KeyedStream}
     */
    @Internal
    public <K, R> SingleOutputStreamOperator<R> process(KeyedCoProcessFunction<K, IN1, IN2, R> keyedCoProcessFunction, TypeInformation<R> typeInformation) {
        if (!bothInputsKeyed()) {
            throw new UnsupportedOperationException("KeyedCoProcessFunction can only be used when both input streams are of type KeyedStream.");
        }
        TwoInputStreamOperator<IN1, IN2, R> operator =
                new KeyedCoProcessOperator<>(this.inputStream1.clean(keyedCoProcessFunction));
        return transform("Co-Keyed-Process", typeInformation, operator);
    }

    /**
     * Builds a {@link TwoInputTransformation} from both inputs and the given operator, registers
     * it with the execution environment, and returns the stream that wraps it.
     *
     * <p>If both inputs are {@link KeyedStream}s, their key types must be equal; the key
     * selectors and the key type are then set on the transformation.
     *
     * @param str name given to the transformation
     * @param typeInformation type information of the output elements
     * @param twoInputStreamOperator the operator to run
     * @param <R> the output element type
     * @return the resulting stream
     * @throws UnsupportedOperationException if both inputs are keyed but their key types differ
     */
    @PublicEvolving
    public <R> SingleOutputStreamOperator<R> transform(String str, TypeInformation<R> typeInformation, TwoInputStreamOperator<IN1, IN2, R> twoInputStreamOperator) {
        // Results are intentionally discarded. NOTE(review): presumably these calls exist to
        // surface missing-type-information errors of the inputs early -- confirm against
        // DataStream#getType.
        this.inputStream1.getType();
        this.inputStream2.getType();

        TwoInputTransformation<IN1, IN2, R> transformation = new TwoInputTransformation<>(
                this.inputStream1.getTransformation(),
                this.inputStream2.getTransformation(),
                str,
                twoInputStreamOperator,
                typeInformation,
                this.environment.getParallelism());

        if (bothInputsKeyed()) {
            KeyedStream<IN1, ?> keyedInput1 = (KeyedStream<IN1, ?>) this.inputStream1;
            KeyedStream<IN2, ?> keyedInput2 = (KeyedStream<IN2, ?>) this.inputStream2;
            TypeInformation<?> keyType1 = keyedInput1.getKeyType();
            TypeInformation<?> keyType2 = keyedInput2.getKeyType();
            if (!(keyType1.canEqual(keyType2) && keyType1.equals(keyType2))) {
                throw new UnsupportedOperationException("Key types of input KeyedStreams don't match: " + keyType1 + " and " + keyType2 + ".");
            }
            transformation.setStateKeySelectors(keyedInput1.getKeySelector(), keyedInput2.getKeySelector());
            transformation.setStateKeyType(keyType1);
        }

        SingleOutputStreamOperator<R> resultStream = new SingleOutputStreamOperator<>(this.environment, transformation);
        getExecutionEnvironment().addOperator(transformation);
        return resultStream;
    }

    /** Returns {@code true} when both input streams are {@link KeyedStream} instances. */
    private boolean bothInputsKeyed() {
        return (this.inputStream1 instanceof KeyedStream) && (this.inputStream2 instanceof KeyedStream);
    }
}
