package com.alipay.mobile.uep.framework.operator;

import com.alibaba.fastjson.JSON;
import com.alipay.mobile.common.logging.api.LoggerFactory;
import com.alipay.mobile.uep.event.UEPEvent;
import com.alipay.mobile.uep.framework.function.SourceFunction;
import com.alipay.mobile.uep.framework.stream.StreamElement;
import com.alipay.mobile.uep.framework.time.TimeCharacteristic;
import com.alipay.mobile.uep.framework.time.TimeProcessCallback;
import com.alipay.mobile.uep.framework.time.TimeService;
import java.util.Queue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

/* loaded from: classes.dex */
public class SourceOperator<T> extends Operator<UEPEvent, T> {

    /* renamed from: a, reason: collision with root package name */
    private final TimeCharacteristic f24259a;
    private final long b;
    private RuntimeContext c;
    private SourceFunction<T> d;
    private Queue<StreamElement<T>> e = new PriorityBlockingQueue();

    public SourceOperator(TimeCharacteristic timeCharacteristic, long j, SourceFunction<T> sourceFunction) {
        this.f24259a = timeCharacteristic;
        this.d = sourceFunction;
        if (j > 0) {
            this.b = j;
        } else {
            this.b = TimeUnit.SECONDS.toMillis(2L);
        }
    }

    private void a(T t) {
        collect((StreamElement) new StreamElement<>(t));
    }

    @Override // com.alipay.mobile.uep.framework.operator.Operator
    public void open(RuntimeContext runtimeContext) {
        super.open(runtimeContext);
        this.c = runtimeContext;
    }

    @Override // com.alipay.mobile.uep.framework.operator.Operator
    public void processElement(StreamElement<UEPEvent> streamElement) {
        UEPEvent element = streamElement.getElement();
        LoggerFactory.getTraceLogger().info("SourceOperator", element.getClass().getSimpleName() + JSON.toJSONString(element));
        T source = this.d.source(element);
        if (source != null) {
            switch (this.f24259a) {
                case EventTime:
                    final TimeService timerService = this.c.timerService();
                    this.e.add(new StreamElement<>(source, timerService.currentWallTime()));
                    timerService.emitWaterMark(this.b, new TimeProcessCallback() { // from class: com.alipay.mobile.uep.framework.operator.SourceOperator.1
                        @Override // com.alipay.mobile.uep.framework.time.TimeProcessCallback
                        public void onProcess() {
                            SourceOperator.this.processWatermark(timerService.currentWaterMark());
                        }
                    });
                    return;
                case ProcessingTime:
                    a(source);
                    return;
                default:
                    return;
            }
        }
    }

    @Override // com.alipay.mobile.uep.framework.operator.Operator
    public synchronized void processWatermark(long j) {
        StreamElement<T> peek = this.e.peek();
        while (peek != null && peek.getIngestionTime() <= j) {
            a(peek.getElement());
            this.e.remove(peek);
            peek = this.e.peek();
        }
        super.processWatermark(j);
    }
}
