Trident整合MongoDB

Posted 动物管理猿

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Trident整合MongoDB相关的知识,希望对你有一定的参考价值。

MongoDB是大数据技术中常用的NoSQL数据库,它提供了大量查询、聚合等操作函数,对于查询密集的日志系统而言,MongoDB是大数据日志存储的理想选择。Storm的高级编程抽象Trident也提供了与MongoDB集成的方法,但官方只实现了新增(插入)操作,并未提供常用的修改(更新)操作接口。本文给出了一种使用Trident对MongoDB执行修改操作的方式,并为持久化后的数据提供了输出扩展操作,具体代码见下方:

import java.util.Objects;

/**
 * Enumeration of the HTTP request method types understood by the log pipeline.
 *
 * <p>Each constant carries a {@code code} (the literal method name, e.g. {@code "GET"})
 * and a human-readable {@code desc}.
 *
 * <p>NOTE(review): the setters mutate a shared enum constant for every user in the JVM;
 * they are kept only for backward compatibility.
 *
 * <p>Date-Time: 2018/09/05 15:14, Company: Baiqu
 *
 * @author fangyuanjie
 * @version 1.0.0
 */
public enum MethodTypeEnum {

    /** A GET request. */
    GET("GET", "GET请求"),

    /** A POST request. */
    POST("POST", "POST请求");

    /** Literal method name. */
    private String code;

    /** Human-readable description. */
    private String desc;

    MethodTypeEnum(String code, String desc) {
        this.code = code;
        this.desc = desc;
    }

    /** @return the literal method name of this constant */
    public String getCode() {
        return this.code;
    }

    /** @param code the new literal method name for this (shared) constant */
    public void setCode(String code) {
        this.code = code;
    }

    /** @return the human-readable description of this constant */
    public String getDesc() {
        return this.desc;
    }

    /** @param desc the new description for this (shared) constant */
    public void setDesc(String desc) {
        this.desc = desc;
    }

    /**
     * Finds the constant whose code equals {@code code} (case-sensitive, null-safe).
     *
     * @param code the method name to look up; may be {@code null}
     * @return the matching constant, or {@code null} when nothing matches
     */
    public static MethodTypeEnum getByCode(String code) {
        MethodTypeEnum[] candidates = values();
        for (int i = 0; i < candidates.length; i++) {
            MethodTypeEnum candidate = candidates[i];
            if (Objects.equals(candidate.getCode(), code)) {
                return candidate;
            }
        }
        return null;
    }

}
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.apache.storm.trident.operation.BaseFilter;
import org.apache.storm.trident.tuple.TridentTuple;

/**
 * Trident filter that drops operate-log messages that are blank, are not a JSON object,
 * or lack or mistype the request fields the downstream parse step reads.
 *
 * <p>A message (tuple field {@code "str"}) is kept only when it parses to a JSON object with a
 * numeric {@code reqTime}, a {@code headers} object and a {@code reqURI}. Its optional
 * {@code uriArgs} and {@code bodyData} entries, when present, must be JSON objects.
 *
 * <p>Copyright (c) 2018/9/10 14:28, Company: Baiqu
 *
 * @author tangzhe
 * @version 1.0.0
 */
public class FormatFilter extends BaseFilter {

    /**
     * Decides whether the tuple's raw message is well-formed.
     *
     * <p>All typed field accessors run inside one guarded section. fastjson's typed getters
     * throw on a wrongly typed value rather than returning {@code null}: {@code JSONException}
     * for an unparsable value, {@code ClassCastException} for e.g. a number where an object is
     * expected. An exception escaping a filter would presumably fail the whole batch, so it
     * must be turned into a "drop" instead.
     *
     * @param tuple tuple carrying the raw message in field {@code "str"}
     * @return {@code true} to keep the tuple, {@code false} to drop it
     */
    @Override
    public boolean isKeep(TridentTuple tuple) {
        String message = tuple.getStringByField("str");
        System.out.println(this.getClass().getSimpleName() + "->message:" + message);
        if (StringUtils.isBlank(message)) {
            System.out.println(this.getClass().getSimpleName() + ": 消息不能为空!");
            return false;
        }
        JSONObject jsonObject;
        try {
            jsonObject = JSONObject.parseObject(message);
        } catch (Exception e) {
            System.out.println(this.getClass().getSimpleName() + ": 消息格式有误!");
            return false;
        }
        // parseObject can yield null (e.g. for a literal "null" payload); treat that as malformed.
        if (jsonObject == null) {
            System.out.println(this.getClass().getSimpleName() + ": 消息格式有误!");
            return false;
        }
        try {
            if (jsonObject.getLong("reqTime") == null
                    || jsonObject.getJSONObject("headers") == null
                    || jsonObject.getString("reqURI") == null) {
                System.out.println(this.getClass().getSimpleName() + ": 请求信息不能为空!");
                return false;
            }
            // Only the type check matters here; the returned values are intentionally unused.
            jsonObject.getJSONObject("uriArgs");
            jsonObject.getJSONObject("bodyData");
        } catch (JSONException | ClassCastException | NumberFormatException e) {
            System.out.println(this.getClass().getSimpleName() + ": 请求信息格式有误!");
            return false;
        }
        return true;
    }

}
import com.alibaba.fastjson.JSONObject;
import net.baiqu.storm.trident.enums.MethodTypeEnum;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

import java.util.Date;

/**
 * Trident function that parses one operate-log JSON message (tuple field {@code "str"}).
 *
 * <p>It emits eleven values, in order: appId, host, requestURI, method, ip, requestTime,
 * userId, username, role, memo, logDate.
 *
 * <p>The user-related values (userId, username, role, memo) come from {@code uriArgs} for GET
 * requests and from {@code bodyData} for POST requests. Any other method, or a missing
 * argument object, yields {@code null} for those values. {@code requestTime} is
 * {@code reqTime} (seconds) converted to a {@link Date}; {@code logDate} is the current time.
 *
 * <p>NOTE(review): no null/type checks happen here — a missing {@code headers} or
 * {@code reqTime} throws, so this relies on an upstream filter validating them.
 *
 * <p>Copyright (c) 2018/9/10 14:34, Company: Baiqu
 *
 * @author tangzhe
 * @version 1.0.0
 */
public class OperateLogParseFunction extends BaseFunction {

    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        String raw = tuple.getStringByField("str");
        JSONObject log = JSONObject.parseObject(raw);
        System.out.println(this.getClass().getSimpleName() + "->message: " + raw);

        JSONObject headers = log.getJSONObject("headers");
        String method = log.getString("method");
        JSONObject args = resolveArgs(log, method);

        // Arguments are evaluated left to right; reqTime is seconds since the epoch.
        collector.emit(new Values(
                log.getString("appId"),
                headers.getString("host"),
                log.getString("reqURI"),
                method,
                log.getString("ip"),
                new Date(log.getLong("reqTime") * 1000),
                args.getString("userId"),
                args.getString("username"),
                args.getString("role"),
                args.getString("memo"),
                new Date()));
    }

    /** Returns uriArgs for GET, bodyData for POST, or an empty object otherwise / when absent. */
    private static JSONObject resolveArgs(JSONObject log, String method) {
        String argsKey;
        if (MethodTypeEnum.GET.getCode().equals(method)) {
            argsKey = "uriArgs";
        } else if (MethodTypeEnum.POST.getCode().equals(method)) {
            argsKey = "bodyData";
        } else {
            argsKey = null;
        }
        JSONObject found = argsKey == null ? null : log.getJSONObject(argsKey);
        return found != null ? found : new JSONObject();
    }

}
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;

/**
 * Trident function that reports on stdout whether the upstream Mongo write succeeded.
 *
 * <p>It reads tuple field {@code "result"}. The value {@code "success"} (case-insensitive)
 * prints the success line; anything else, including {@code null}, prints the failure line.
 * Nothing is emitted.
 *
 * <p>Copyright (c) 2018/9/10 16:33, Company: Baiqu
 *
 * @author tangzhe
 * @version 1.0.0
 */
public class OperatePrintFunction extends BaseFunction {

    @Override
    public void execute(TridentTuple input, TridentCollector collector) {
        boolean succeeded = "success".equalsIgnoreCase(input.getStringByField("result"));
        String suffix = succeeded ? "->: 插入mongo成功" : "->: 插入mongo失败";
        System.out.println(this.getClass().getSimpleName() + suffix);
    }
}
import org.apache.storm.mongodb.trident.state.MongoState;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.state.BaseStateUpdater;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

import java.util.List;

/**
 * State updater that writes a batch through the stock {@link MongoState}.
 *
 * <p>After the write it emits a single {@code "result"} value for the batch: {@code "success"}
 * when the write returned normally, {@code "fail"} when it threw. The exception's stack trace
 * is printed.
 *
 * <p>NOTE(review): because the exception is swallowed, Trident presumably treats the batch as
 * committed and will not replay it — confirm this best-effort behaviour is intended.
 *
 * <p>Copyright (c) 2018/9/10 16:29, Company: Baiqu
 *
 * @author tangzhe
 * @version 1.0.0
 */
public class MyMongoStateUpdater extends BaseStateUpdater<MongoState> {

    private static final String RESULT_SUCCESS = "success";
    private static final String RESULT_FAIL = "fail";

    @Override
    public void updateState(MongoState state, List<TridentTuple> tuples,
                            TridentCollector collector) {
        try {
            state.updateState(tuples, collector);
            emitResult(collector, RESULT_SUCCESS);
        } catch (Exception ex) {
            ex.printStackTrace();
            emitResult(collector, RESULT_FAIL);
        }
    }

    /** Emits the one-field batch outcome. */
    private static void emitResult(TridentCollector collector, String outcome) {
        collector.emit(new Values(outcome));
    }
}
import com.google.common.collect.Lists;
import com.mongodb.client.model.Filters;
import org.apache.commons.lang.Validate;
import org.apache.storm.mongodb.common.MongoDBClient;
import org.apache.storm.mongodb.common.mapper.MongoMapper;
import org.apache.storm.mongodb.trident.state.MongoState;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.state.State;
import org.apache.storm.trident.tuple.TridentTuple;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.List;
import java.util.Map;

/**
 * Trident {@link State} that upserts operate-log documents into a MongoDB collection.
 *
 * <p>Each tuple is mapped to a document with {@link Options#withMapper mapper}. The document is
 * applied with {@code $set} to the record whose {@code logDate} equals the tuple's
 * {@code logDate} value, inserting it when absent. Commit hooks are no-ops.
 *
 * <p>Date-Time: 2018/09/10 13:50, Company: Baiqu
 *
 * @author tangzhe
 * @version 1.0.0
 */
public class OperateMongoState implements State {

    // Named after this class (it was previously MongoState.class, which mislabelled log output).
    private static final Logger LOG = LoggerFactory.getLogger(OperateMongoState.class);

    private OperateMongoState.Options options;
    private MongoDBClient mongoClient;
    private Map map;

    protected OperateMongoState(Map map, OperateMongoState.Options options) {
        this.options = options;
        this.map = map;
    }

    /** Serializable connection/mapping settings, configured fluently. */
    public static class Options implements Serializable {
        private String url;
        private String collectionName;
        private MongoMapper mapper;

        /** @param url MongoDB connection URI */
        public OperateMongoState.Options withUrl(String url) {
            this.url = url;
            return this;
        }

        /** @param collectionName target collection name */
        public OperateMongoState.Options withCollectionName(String collectionName) {
            this.collectionName = collectionName;
            return this;
        }

        /** @param mapper converts a tuple into the document to {@code $set} */
        public OperateMongoState.Options withMapper(MongoMapper mapper) {
            this.mapper = mapper;
            return this;
        }
    }

    /**
     * Validates the options and creates the Mongo client.
     *
     * @throws IllegalArgumentException if url/collectionName is blank or the mapper is null
     */
    protected void prepare() {
        Validate.notEmpty(options.url, "url can not be blank or null");
        Validate.notEmpty(options.collectionName, "collectionName can not be blank or null");
        Validate.notNull(options.mapper, "MongoMapper can not be null");

        this.mongoClient = new MongoDBClient(options.url, options.collectionName);
    }

    @Override
    public void beginCommit(Long txid) {
        LOG.debug("beginCommit is noop.");
    }

    @Override
    public void commit(Long txid) {
        LOG.debug("commit is noop.");
    }

    /**
     * Upserts one document per tuple in the batch, keyed on the tuple's {@code logDate}.
     *
     * <p>The previous implementation wrote only the first tuple and silently dropped the rest.
     * It also threw on empty batches and required {@code logDate} to be a String.
     * {@code getValueByField} accepts any value type, e.g. the {@code java.util.Date} that the
     * parse function emits.
     *
     * @param tuples    the batch; {@code null} or empty is a no-op
     * @param collector unused
     */
    public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
        if (tuples == null || tuples.isEmpty()) {
            LOG.debug("updateState called with an empty batch; nothing to write.");
            return;
        }
        for (TridentTuple tuple : tuples) {
            Document document = options.mapper.toDocument(tuple);
            this.mongoClient.update(
                    Filters.eq("logDate", tuple.getValueByField("logDate")),
                    new Document("$set", document), true);
        }
    }

}
import org.apache.storm.task.IMetricsContext;
import org.apache.storm.trident.state.State;
import org.apache.storm.trident.state.StateFactory;

import java.util.Map;

/**
 * Trident {@link StateFactory} that produces one prepared {@link OperateMongoState} per
 * partition, all sharing the same {@link OperateMongoState.Options}.
 *
 * <p>Date-Time: 2018/09/10 13:50, Company: Baiqu
 *
 * @author tangzhe
 * @version 1.0.0
 */
public class OperateMongoStateFactory implements StateFactory {

    private final OperateMongoState.Options options;

    /** @param options connection and mapping settings handed to every created state */
    public OperateMongoStateFactory(OperateMongoState.Options options) {
        this.options = options;
    }

    /**
     * Creates the state for one partition and prepares it.
     *
     * @return a state whose options are validated and whose Mongo client has been created
     */
    @Override
    public State makeState(Map conf, IMetricsContext metrics,
                           int partitionIndex, int numPartitions) {
        OperateMongoState created = new OperateMongoState(conf, this.options);
        // prepare() validates the options and builds the MongoDBClient before first use.
        created.prepare();
        return created;
    }

}
package net.baiqu.storm.trident.state;

import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.state.BaseStateUpdater;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

import java.util.List;

/**
 * State updater that writes each batch through {@link OperateMongoState}.
 *
 * <p>After the write it emits the {@code userId} of the batch's first tuple.
 *
 * <p>Date-Time: 2018/09/10 13:50, Company: Baiqu
 *
 * @author tangzhe
 * @version 1.0.0
 */
public class OperateMongoStateUpdater extends BaseStateUpdater<OperateMongoState> {

    /**
     * Persists the batch and emits the first tuple's {@code userId}.
     *
     * <p>Empty batches are skipped: there is nothing to persist, and {@code tuples.get(0)}
     * would otherwise throw {@link IndexOutOfBoundsException}.
     *
     * @param state     target state
     * @param tuples    the batch; {@code null} or empty is a no-op
     * @param collector receives one {@code userId} value per non-empty batch
     */
    @Override
    public void updateState(OperateMongoState state, List<TridentTuple> tuples, TridentCollector collector) {
        if (tuples == null || tuples.isEmpty()) {
            return;
        }
        state.updateState(tuples, collector);
        String userId = tuples.get(0).getStringByField("userId");
        collector.emit(new Values(userId));
    }

}
package net.baiqu.storm.trident.topology;

import kafka.api.OffsetRequest;
import net.baiqu.storm.trident.filter.FormatFilter;
import net.baiqu.storm.trident.function.OperateLogParseFunction;
import net.baiqu.storm.trident.function.OperatePrintFunction;
import net.baiqu.storm.trident.state.MyMongoStateUpdater;
import net.baiqu.storm.trident.util.TridentMongoFactory;
import net.baiqu.storm.utils.Constants;
import org.apache.commons.lang.StringUtils;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.kafka.trident.TransactionalTridentKafkaSpout;
import org.apache.storm.kafka.trident.TridentKafkaConfig;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.tuple.Fields;

/**
 * Entry point that wires and launches the operate-log Trident topology.
 *
 * <p>The pipeline is: Kafka (string messages) -> {@link FormatFilter} ->
 * {@link OperateLogParseFunction} -> MongoDB insert via
 * {@link TridentMongoFactory#getMongoInsertState()} -> {@link OperatePrintFunction}.
 *
 * <p>When {@code Constants.MODE} is {@code "demo"} (case-insensitive), Kafka reading starts
 * at the latest offset and the topology runs in an in-process {@link LocalCluster}.
 * Otherwise it is submitted to the cluster under the name given as the first argument.
 *
 * <p>Date-Time: 2018/09/10 13:50, Company: Baiqu
 *
 * @author tangzhe
 * @version 1.0.0
 */
public class OperateLogTridentTopology {

    /** Fields produced by the parse function and persisted to MongoDB, in emit order. */
    private static final String[] LOG_FIELDS = {"appId", "host", "requestURI", "method", "ip",
            "requestTime", "userId", "username", "role", "memo", "logDate"};

    /**
     * Builds the topology and runs it locally (demo mode) or submits it to the cluster.
     *
     * @param args {@code args[0]} is the topology name; required outside demo mode
     * @throws IllegalArgumentException if no topology name is given outside demo mode
     * @throws IllegalStateException    if cluster submission fails (cause preserved)
     */
    public static void main(String[] args) {
        TridentTopology topology = buildTopology();

        Config config = new Config();
        if (isDemoMode()) {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("operateLogTridentTopology", config, topology.build());
        } else {
            String topologyName = requireTopologyName(args);
            config.setNumWorkers(1);
            config.put(Config.NIMBUS_HOST, Constants.NIMBUS_HOST);
            config.put(Config.NIMBUS_THRIFT_PORT, Constants.NIMBUS_THRIFT_PORT);
            config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1);
            try {
                StormSubmitter.submitTopology(topologyName, config, topology.build());
            } catch (Exception e) {
                // Fail loudly (non-zero exit) instead of printing and exiting as if deployed.
                throw new IllegalStateException("Failed to submit topology '" + topologyName + "'", e);
            }
        }
    }

    /** Assembles the filter -> parse -> persist -> print stream on a new topology. */
    private static TridentTopology buildTopology() {
        TridentTopology topology = new TridentTopology();
        Stream stream = topology.newStream("kafkaSpout", buildKafkaSpout()).parallelismHint(1);
        stream.shuffle().each(new Fields("str"), new FormatFilter())
                .parallelismHint(1)
                .shuffle().each(new Fields("str"), new OperateLogParseFunction(),
                        new Fields(LOG_FIELDS))
                .parallelismHint(1)
                .partitionPersist(TridentMongoFactory.getMongoInsertState(),
                        new Fields(LOG_FIELDS),
                        new MyMongoStateUpdater(),
                        new Fields("result"))
                .parallelismHint(1)
                .newValuesStream().shuffle().each(
                        new Fields("result"), new OperatePrintFunction(), new Fields("none"))
                .parallelismHint(1);
        return topology;
    }

    /** Creates the transactional Kafka spout reading string messages from the log topic. */
    private static TransactionalTridentKafkaSpout buildKafkaSpout() {
        BrokerHosts hosts = new ZkHosts(Constants.ZK_HOSTS);
        TridentKafkaConfig kafkaConfig =
                new TridentKafkaConfig(hosts, Constants.KAFKA_LOG_TOPIC, Constants.KAFKA_SPOUT_ID);
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        // In demo mode, skip the backlog and start from the latest offset.
        if (isDemoMode()) {
            kafkaConfig.startOffsetTime = OffsetRequest.LatestTime();
        }
        return new TransactionalTridentKafkaSpout(kafkaConfig);
    }

    /** @return whether {@code Constants.MODE} is "demo" (case-insensitive) */
    private static boolean isDemoMode() {
        return StringUtils.equalsIgnoreCase("demo", Constants.MODE);
    }

    /** Returns {@code args[0]}, failing with a clear message when it is missing or blank. */
    private static String requireTopologyName(String[] args) {
        if (args == null || args.length == 0 || StringUtils.isBlank(args[0])) {
            throw new IllegalArgumentException(
                    "A topology name must be passed as the first argument when not running in demo mode");
        }
        return args[0];
    }

}
package net.baiqu.storm.trident.util;

import net.baiqu.storm.trident.state.OperateMongoState;
import net.baiqu.storm.trident.state.OperateMongoStateFactory;
import net.baiqu.storm.utils.Constants;
import org.apache.commons.lang.StringUtils;
import org.apache.storm.mongodb.common.mapper.MongoMapper;
import org.apache.storm.mongodb.common.mapper.SimpleMongoMapper;
import org.apache.storm.mongodb.trident.state.MongoState;
import org.apache.storm.mongodb.trident.state.MongoStateFactory;
import org.apache.storm.trident.state.StateFactory;

/**
 * Factory for the Trident MongoDB state factories used by the operate-log topology.
 *
 * <p>Both factories map the same operate-log fields into the {@value #OPERATE_LOG_DB}
 * collection. The connection URI includes credentials only when
 * {@code Constants.MONGODB_USERNAME} is not blank.
 *
 * <p>Copyright (c) 2018/9/10 14:56, Company: Baiqu
 *
 * @author tangzhe
 * @version 1.0.0
 */
public class TridentMongoFactory {

    /**
     * Authenticated connection URI.
     *
     * <p>The username and password are fully percent-encoded: reserved characters such as
     * {@code @ : / % +} would otherwise corrupt or change the URI. A {@code null} credential
     * becomes empty, so class initialisation no longer fails when no password is configured.
     */
    public static final String URL = "mongodb://" + encodeUserInfo(Constants.MONGODB_USERNAME) + ":"
            + encodeUserInfo(Constants.MONGODB_PASSWORD)
            + "@" + Constants.MONGODB_HOSTS + ":" + Constants.MONGODB_PORT + "/"
            + Constants.MONGODB_DATABASE + "?connectTimeoutMS=" + Constants.MONGODB_TIMEOUT;

    /** Connection URI without credentials. */
    public static final String URL2 = "mongodb://" + Constants.MONGODB_HOSTS + ":" + Constants.MONGODB_PORT + "/"
            + Constants.MONGODB_DATABASE + "?connectTimeoutMS=" + Constants.MONGODB_TIMEOUT;

    /** Target collection name. */
    public static final String OPERATE_LOG_DB = "operate_log";

    /** Tuple fields copied into each Mongo document (shared by both factories). */
    private static final String[] OPERATE_LOG_FIELDS = {"appId", "host", "requestURI", "method", "ip",
            "requestTime", "userId", "username", "role", "memo", "logDate"};

    /**
     * Insert-only state, built on storm-mongodb's own {@link MongoState}.
     */
    public static StateFactory getMongoInsertState() {
        MongoState.Options options = new MongoState.Options()
                .withUrl(getUrl())
                .withCollectionName(OPERATE_LOG_DB)
                .withMapper(newMapper());

        return new MongoStateFactory(options);
    }

    /**
     * Upsert state, built on the custom {@link OperateMongoState}.
     */
    public static StateFactory getMongoUpdateState() {
        OperateMongoState.Options options = new OperateMongoState.Options()
                .withUrl(getUrl())
                .withCollectionName(OPERATE_LOG_DB)
                .withMapper(newMapper());

        return new OperateMongoStateFactory(options);
    }

    /** Creates a mapper over a private copy of the operate-log field list. */
    private static MongoMapper newMapper() {
        return new SimpleMongoMapper().withFields(OPERATE_LOG_FIELDS.clone());
    }

    /**
     * Chooses the connection URI.
     *
     * @return {@link #URL} when a username is configured, otherwise {@link #URL2}
     */
    private static String getUrl() {
        return StringUtils.isNotBlank(Constants.MONGODB_USERNAME) ? URL : URL2;
    }

    /**
     * Percent-encodes one credential for the userinfo part of a MongoDB URI.
     *
     * <p>The Mongo driver URL-decodes userinfo, so {@code URLEncoder} output round-trips.
     *
     * @param value raw username or password; {@code null} yields {@code ""}
     * @return the encoded value
     */
    private static String encodeUserInfo(String value) {
        if (value == null) {
            return "";
        }
        try {
            return java.net.URLEncoder.encode(value, "UTF-8");
        } catch (java.io.UnsupportedEncodingException e) {
            // Every JVM is required to support UTF-8, so this is unreachable in practice.
            throw new IllegalStateException("UTF-8 encoding is not supported", e);
        }
    }

}

 

以上是关于Trident整合MongoDB的主要内容,如果未能解决你的问题,请参考以下文章

如何将代码片段存储在 mongodb 中?

Springboot 整合mongodb以及mongo数据操作工具类代码实现

SpringBoot整合MongoDB教程

SpringBoot整合MongoDB教程

全栈编程系列SpringBoot整合Shiro(含KickoutSessionControlFilter并发在线人数控制以及不生效问题配置启动异常No SecurityManager...)(代码片段)

MongoDB_spring整合mongodb