/*
 * Decompiled with CFR 0.152.
 */
package com.bxm.adx.common.market.exchange;

import com.bxm.adx.common.AdxProperties;
import com.bxm.adx.common.CacheKeys;
import com.bxm.adx.common.adapter.AdxContextFactory;
import com.bxm.adx.common.buy.Buyer;
import com.bxm.adx.common.buy.buyers.BuyerWrapper;
import com.bxm.adx.common.buy.cache.BuyerResponseCache;
import com.bxm.adx.common.buy.dsp.Dsp;
import com.bxm.adx.common.buy.position.AdvertPointService;
import com.bxm.adx.common.ingetration.AdxCounterServiceIntegration;
import com.bxm.adx.common.log.datalog.DataLogDao;
import com.bxm.adx.common.market.Deal;
import com.bxm.adx.common.market.exchange.ExchangeCallable;
import com.bxm.adx.common.market.exchange.ExchangeParam;
import com.bxm.adx.common.market.exchange.Exchanger;
import com.bxm.adx.common.market.filter.PriceLowerFilter;
import com.bxm.adx.common.micrometer.BuyerMeter;
import com.bxm.adx.common.sell.BidRequest;
import com.bxm.adx.facade.constant.enums.AdxErrEnum;
import com.bxm.adx.facade.exception.AdxException;
import com.bxm.warcar.cache.Counter;
import com.bxm.warcar.cache.KeyGenerator;
import com.bxm.warcar.integration.eventbus.EventPark;
import com.bxm.warcar.utils.NamedThreadFactory;
import com.google.common.collect.Lists;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.CollectionUtils;

@Configuration
public class RtbExchanger
implements Exchanger {
    private static final Logger log = LoggerFactory.getLogger(RtbExchanger.class);
    private final AdxProperties properties;
    private final AdxCounterServiceIntegration service;
    private final BuyerMeter buyerMeter;
    private final Counter counter;
    private final EventPark eventPark;
    private final BuyerResponseCache buyerResponseCache;
    private final PriceLowerFilter priceLowerFilter;
    private final AdvertPointService advertPointService;
    private final DataLogDao dataLogDao;

    public RtbExchanger(AdxProperties properties, AdxCounterServiceIntegration service, BuyerMeter buyerMeter, Counter counter, EventPark eventPark, BuyerResponseCache buyerResponseCache, PriceLowerFilter priceLowerFilter, AdvertPointService advertPointService, DataLogDao dataLogDao) {
        this.properties = properties;
        this.service = service;
        this.buyerMeter = buyerMeter;
        this.counter = counter;
        this.eventPark = eventPark;
        this.buyerResponseCache = buyerResponseCache;
        this.priceLowerFilter = priceLowerFilter;
        this.advertPointService = advertPointService;
        this.dataLogDao = dataLogDao;
    }

    @Override
    public List<Deal> bidding(BidRequest bidRequest, Collection<BuyerWrapper> buyers) {
        int size = buyers.size();
        if (0 == size) {
            return Collections.emptyList();
        }
        if (null == bidRequest) {
            return Collections.emptyList();
        }
        ThreadPoolExecutor executor = new ThreadPoolExecutor(size, size, 5L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), (ThreadFactory)new NamedThreadFactory("exchange"));
        executor.allowCoreThreadTimeOut(true);
        ArrayList futures = Lists.newArrayListWithCapacity((int)size);
        CountDownLatch waitCountDown = new CountDownLatch(buyers.size());
        CountDownLatch overtimeCountDown = new CountDownLatch(1);
        HashMap<Buyer, BidRequest> forCache = new HashMap<Buyer, BidRequest>();
        for (BuyerWrapper buyer : buyers) {
            forCache.put(buyer.getBuyer(), bidRequest);
            if (!this.dspQPSLimit(buyer.getBuyer().getDsp())) continue;
            ExchangeParam exchangeParam = new ExchangeParam();
            exchangeParam.setDispatcher(buyer.getDispatcher());
            Future<Deal> future = executor.submit(new ExchangeCallable(buyer.getBuyer(), bidRequest, this.service, this.buyerMeter, exchangeParam, this.eventPark, this.buyerResponseCache, waitCountDown, overtimeCountDown, this.advertPointService, AdxContextFactory.get().getBidConfig(), this.dataLogDao));
            futures.add(future);
        }
        try {
            waitCountDown.await(bidRequest.getWaitTime(), TimeUnit.MILLISECONDS);
            ArrayList waitDeals = Lists.newArrayList();
            Iterator futureIterator = futures.iterator();
            while (futureIterator.hasNext()) {
                Future future = (Future)futureIterator.next();
                try {
                    if (!future.isDone()) continue;
                    Deal response = (Deal)future.get();
                    if (null != response) {
                        waitDeals.add(response);
                    }
                    futureIterator.remove();
                }
                catch (InterruptedException | ExecutionException e) {
                    future.cancel(true);
                    log.error("execute: ", (Throwable)e);
                }
            }
            if (log.isDebugEnabled()) {
                log.debug("wait deals = {}", (Object)waitDeals.stream().map(Deal::getBuyer).map(Buyer::getDsp).map(Dsp::getDspCode).collect(Collectors.toList()).toString());
            }
            ArrayList<Deal> firstDeals = new ArrayList<Deal>();
            List<Deal> waitCacheDeals = this.buyerResponseCache.getCacheDealList(forCache);
            if (log.isDebugEnabled()) {
                log.debug("wait deals size = {}, cache deals size = {}", (Object)waitDeals.size(), (Object)waitCacheDeals.size());
            }
            firstDeals.addAll(waitDeals);
            firstDeals.addAll(waitCacheDeals);
            if (!CollectionUtils.isEmpty(firstDeals)) {
                this.priceLowerFilter.filter(firstDeals);
                if (!CollectionUtils.isEmpty(firstDeals)) {
                    Deal deal;
                    if (log.isDebugEnabled()) {
                        log.debug("first deals = {}", (Object)firstDeals.stream().map(Deal::getBuyer).map(Buyer::getDsp).map(Dsp::getDspCode).collect(Collectors.toList()).toString());
                    }
                    if (waitCacheDeals.contains(deal = (Deal)firstDeals.iterator().next())) {
                        if (log.isDebugEnabled()) {
                            log.debug("use cache, remove cache");
                        }
                        this.buyerResponseCache.removeResponse(deal);
                    }
                    return firstDeals;
                }
            }
            if (futures.size() == 0) {
                throw new AdxException(AdxErrEnum.DSP_EMPTY_RESPONSE);
            }
            overtimeCountDown.await(bidRequest.getOvertime() - bidRequest.getWaitTime(), TimeUnit.MILLISECONDS);
            ArrayList overDeals = Lists.newArrayList();
            for (Future future : futures) {
                try {
                    Deal response;
                    if (!future.isDone() || null == (response = (Deal)future.get())) continue;
                    overDeals.add(response);
                }
                catch (InterruptedException | ExecutionException e) {
                    future.cancel(true);
                    log.error("execute: ", (Throwable)e);
                }
            }
            if (log.isDebugEnabled()) {
                log.debug("overtime deals = {}", (Object)overDeals.stream().map(Deal::getBuyer).map(Buyer::getDsp).map(Dsp::getDspCode).collect(Collectors.toList()).toString());
            }
            ArrayList<Deal> secondDeals = new ArrayList<Deal>();
            secondDeals.addAll(overDeals);
            this.priceLowerFilter.filter(secondDeals);
            if (!CollectionUtils.isEmpty(secondDeals)) {
                if (log.isDebugEnabled()) {
                    log.debug("second deals = {}", (Object)secondDeals.stream().map(Deal::getBuyer).map(Buyer::getDsp).map(Dsp::getDspCode).collect(Collectors.toList()).toString());
                }
                return secondDeals;
            }
        }
        catch (InterruptedException e) {
            log.error("err", (Throwable)e);
        }
        throw new AdxException(AdxErrEnum.DSP_TIMEOUT);
    }

    private boolean dspQPSLimit(Dsp dsp) {
        if (dsp == null) {
            return false;
        }
        Integer qps = dsp.getQps();
        if (qps == null) {
            return true;
        }
        if (qps < 0) {
            return true;
        }
        if (qps == 0) {
            return false;
        }
        long time = LocalDateTime.now().toEpochSecond(ZoneOffset.of("+8"));
        KeyGenerator keyGenerator = CacheKeys.getDspNowQPS(time + "");
        Long now = this.counter.hincrementAndGet(keyGenerator, dsp.getDspCode(), 600);
        return now <= (long)qps.intValue();
    }
}

