/*
 * Decompiled with CFR 0.152.
 */
package com.bfd.crawler.kafka7.consumer;

import com.bfd.crawler.kafka7.consumer.ConsumerThread;
import com.bfd.crawler.kafka7.utils.PropertiesParser;
import com.bfd.crawler.kafka7.utils.SimpleThreadPoolExecutor;
import com.bfd.crawler.kafka7.utils.ThreadPoolMonitor;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Owns a fixed set of {@link ConsumerThread} workers for one topic / group id
 * together with the thread pool that runs them and a {@link ThreadPoolMonitor}
 * attached to that pool.
 *
 * <p>Pool sizing is read from the properties file
 * {@code ../etc/<kafkaServerName>_kafka.properties}.
 */
public class ConsumerGroup {
    private final PropertiesParser prop;
    private final int corePoolSize;
    private final int maximumPoolSize;
    private final int numberOfConsumers;
    private final String topic;
    private final String groupId;
    private final BlockingQueue<String> queue;
    private final List<ConsumerThread> consumers;
    private final ThreadPoolExecutor consumerThreadPool;
    private final ThreadPoolMonitor threadPoolMonitor;
    /** Running sum maintained by {@link #getConsumerNum()}; never reset by this class. */
    private int totalMessage;

    /**
     * Builds the thread pool, its monitor and {@code numberOfConsumers} consumer
     * workers. Nothing is submitted to the pool until {@link #execute()} or
     * {@link #start()} is called.
     *
     * @param numberOfConsumers number of {@link ConsumerThread} instances to create
     * @param topic             topic passed to every consumer; also used in the pool name
     * @param groupId           group id passed to every consumer
     * @param queue             queue handed to every consumer
     * @param kafkaServerName   value used as the prefix of the properties file name and
     *                          forwarded to every consumer
     */
    public ConsumerGroup(int numberOfConsumers, String topic, String groupId, BlockingQueue<String> queue, int kafkaServerName) {
        this.prop = new PropertiesParser("../etc/" + kafkaServerName + "_kafka.properties");
        this.corePoolSize = this.prop.getIntProperty("crawl.kfk.consumer.thread.core.pool.size");
        this.maximumPoolSize = this.prop.getIntProperty("crawl.kfk.consumer.thread.maximum.pool.size");
        this.numberOfConsumers = numberOfConsumers;
        this.topic = topic;
        this.groupId = groupId;
        this.queue = queue;
        this.totalMessage = 0;
        this.consumers = new ArrayList<ConsumerThread>();

        // The same name is given to both the pool and its monitor.
        String poolName = "consumer(" + this.topic + ")" + "ThreadPool";
        // Bounded work queue (2000 entries) with DiscardPolicy: per the JDK contract a
        // task rejected by the pool is dropped silently, without an exception.
        this.consumerThreadPool = new SimpleThreadPoolExecutor(this.corePoolSize, this.maximumPoolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(2000), new ThreadPoolExecutor.DiscardPolicy(), poolName);
        this.threadPoolMonitor = new ThreadPoolMonitor(this.consumerThreadPool, poolName);

        for (int i = 0; i < this.numberOfConsumers; ++i) {
            // NOTE(review): the meaning of the literal 'false' argument is defined by
            // ConsumerThread and is not visible from this class.
            ConsumerThread ncThread = new ConsumerThread(this.queue, this.topic, this.groupId, false, kafkaServerName, this.prop);
            this.consumers.add(ncThread);
        }
    }

    /**
     * Submits every consumer to the thread pool and starts the pool monitor on a
     * new thread.
     *
     * <p>NOTE(review): each call starts another monitor thread; presumably this is
     * meant to be called once per group - confirm against callers.
     */
    public void execute() {
        for (ConsumerThread ncThread : this.consumers) {
            // Submit the task itself. Wrapping it in a never-started Thread adds
            // nothing: the pool worker would only call Thread.run(), which
            // delegates straight back to this same task.
            this.consumerThreadPool.execute(ncThread);
        }
        new Thread(this.threadPoolMonitor).start();
    }

    /**
     * Clears the stop flag on every consumer and submits it to the thread pool.
     * Unlike {@link #execute()}, this does not start the pool monitor.
     *
     * <p>NOTE(review): consumers are resubmitted unconditionally; whether a consumer
     * that is still running may be submitted twice depends on ConsumerThread -
     * verify before calling without a preceding {@link #stop()}.
     */
    public void start() {
        for (ConsumerThread ncThread : this.consumers) {
            ncThread.setStop(false);
            this.consumerThreadPool.execute(ncThread);
        }
    }

    /**
     * Sets the stop flag on every consumer. The thread pool itself is not shut down.
     */
    public void stop() {
        for (ConsumerThread ncThread : this.consumers) {
            ncThread.setStop(true);
        }
    }

    /**
     * Adds each consumer's {@code getMessageNum()} to the running total and returns
     * that total.
     *
     * <p>NOTE(review): the total is kept in a field and is never reset, so every
     * call adds the per-consumer values again. If {@code getMessageNum()} reports a
     * cumulative count, repeated calls over-count - confirm the intended semantics.
     *
     * @return the accumulated total after this call's additions
     */
    public int getConsumerNum() {
        for (ConsumerThread ncThread : this.consumers) {
            this.totalMessage += ncThread.getMessageNum();
        }
        return this.totalMessage;
    }

    /**
     * @return the thread pool the consumers are submitted to
     */
    public ThreadPoolExecutor getConsumerThreadPool() {
        return this.consumerThreadPool;
    }

    /**
     * @return the monitor created for the consumer thread pool
     */
    public ThreadPoolMonitor getThreadPoolMonitor() {
        return this.threadPoolMonitor;
    }
}

