* kafka消费
*
* @author crossoverJie
* @date 2017年6月19日 下午3:15:16
*/
public class {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMsgConsumer.class);
private static final int CORE_POOL_SIZE = 4;
private static final int MAXIMUM_POOL_SIZE = 4;
private static final int BLOCKING_QUEUE_CAPACITY = 4000;
private static final String KAFKA_CONFIG = "kafkaConfig";
private static final ExecutorService fixedThreadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(BLOCKING_QUEUE_CAPACITY));
private static AtomicLong LAST_MESSAGE_TIME = new AtomicLong(DateUtil.getLongTime());
private static MsgIterator iter = null;
private static String topic;
static {
Properties properties = new Properties();
String path = System.getProperty(KAFKA_CONFIG);
checkArguments(!StringUtils.isBlank(path), "启动参数中没有配置kafka_easyframe_msg参数来指定kafka启动参数,请使用-DkafkaConfig=/path/fileName/easyframe-msg.properties");
try {
properties.load(new FileInputStream(new File(path)));
} catch (IOException e) {
LOGGER.error("IOException" ,e);
}
EasyMsgConfig.setProperties(properties);
}
private static void iteratorTopic() {
if (iter == null) {
iter = MsgUtil.consume(topic);
}
long i = 0L;
while (iter.hasNext()) {
i++;
if (i % 10000 == 0) {
LOGGER.info("consume i:" + i);
}
try {
String message = iter.next();
if (StringUtils.isEmpty(message)) {
continue;
}
LAST_MESSAGE_TIME = new AtomicLong(DateUtil.getLongTime());
LOGGER.debug("msg = " + JSON.toJSONString(message));
} catch (Exception e) {
LOGGER.error("KafkaMsgConsumer err:", e);
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
LOGGER.error("Thread InterruptedException", e1);
}
break;
}
}
}
public static void main(String[] args) {
topic = System.getProperty("topic");
checkArguments(!StringUtils.isBlank(topic), "system property topic or log_path is must!");
while (true) {
try {
iteratorTopic();
} catch (Exception e) {
MsgUtil.shutdownConsummer();
iter = null;
LOGGER.error("KafkaMsgConsumer err:", e);
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
LOGGER.error("Thread InterruptedException", e1);
}
} finally {
if (DateUtil.getLongTime() - LAST_MESSAGE_TIME.get() > 10 * 60) {
fixedThreadPool.shutdown();
LOGGER.info("线程池是否关闭:" + fixedThreadPool.isShutdown());
try {
while (!fixedThreadPool.awaitTermination(10, TimeUnit.MILLISECONDS)) {
LOGGER.info("检测线程池是否终止:" + fixedThreadPool.isTerminated());
}
} catch (InterruptedException e) {
LOGGER.error("等待线程池关闭错误", e);
}
LOGGER.info("线程池是否终止:" + fixedThreadPool.isTerminated());
LOGGER.info("in 10 min dont have data break");
break;
}
}
}
LOGGER.info("app shutdown");
System.exit(0);
}
}