电商订单ElasticSearch同步解决方案--使用logstash

Posted yangxiaohui227

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了电商订单ElasticSearch同步解决方案--使用logstash相关的知识,希望对你有一定的参考价值。

一.使用logstash同步订单数据(订单表和订单项表)到ElasticSearch:

  1.到官网下载logstash:https://www.elastic.co/cn/downloads/logstash

  2.安装logstash前,确保需要先安装java的jdk环境

  3.下载后解压。解压之后千万不要直接进入bin目录双击logstash.bat来启动,这样会报错;正确的启动方式见下面第8步(在命令行中用 -f 指定配置文件)

  4.接下来,在logstash安装目录找到config文件夹,在那里新增一个文件夹,我新建的为shop文件夹,然后在里面添加如下文件:

  技术图片

 

  5.开始时,last_run_item.txt和last_run_order.txt这两个文件是空的,没有任何数据

   6.logstash_order.conf文件的配置如下:

 

# Logstash pipeline: MySQL (jdbc input) -> Logstash -> Elasticsearch.
# Syncs the order table and the order-item table into two separate indices.
# All relative paths below are resolved from the Logstash bin directory,
# because Logstash is started from there (logstash -f ../config/shop/logstash_order.conf).

input {
  # ---- order table ----
  jdbc {
    # Used in the output section to route events; optional when only one table is synced.
    type => "order_mast"
    # JDBC driver jar placed in the shop folder.
    jdbc_driver_library => "../config/shop/mysql-connector-java-5.1.6-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_paging_enabled => true
    jdbc_page_size => 2000
    # Connection string, user and password must be changed to match the target environment.
    # NOTE(review): consider ${ENV_VAR} substitution instead of a plain-text password.
    jdbc_connection_string => "jdbc:mysql://192.168.50.117:3306/shop_dm?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true&useSSL=false"
    jdbc_user => "shop"
    jdbc_password => "shop"
    # Cron-style schedule: run once every minute.
    schedule => "* * * * *"
    # SQL file in the shop folder.
    statement_filepath => "../config/shop/order_mast.sql"
    # Persist the time of the last run so that each run only picks up rows
    # modified since then (incremental sync). With use_column_value => false,
    # :sql_last_value is the timestamp of the previous run.
    record_last_run => true
    use_column_value => false
    last_run_metadata_path => "../config/shop/last_run_order.txt"
    clean_run => false
    # Do not lowercase column names, so camelCase aliases from the SQL are kept.
    lowercase_column_names => false
  }

  # ---- order-item table ----
  jdbc {
    # Used in the output section to route events; optional when only one table is synced.
    type => "order_item"
    # JDBC driver jar placed in the shop folder.
    jdbc_driver_library => "../config/shop/mysql-connector-java-5.1.6-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_paging_enabled => true
    jdbc_page_size => 2000
    # Connection string, user and password must be changed to match the target environment.
    jdbc_connection_string => "jdbc:mysql://192.168.50.117:3306/shop_dm?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true&useSSL=false"
    jdbc_user => "shop"
    jdbc_password => "shop"
    # Cron-style schedule: run once every minute.
    schedule => "* * * * *"
    # SQL file in the shop folder.
    statement_filepath => "../config/shop/order_item.sql"
    # Persist the time of the last run to enable incremental sync (see above).
    record_last_run => true
    use_column_value => false
    last_run_metadata_path => "../config/shop/last_run_item.txt"
    clean_run => false
    # Do not lowercase column names, so camelCase aliases from the SQL are kept.
    lowercase_column_names => false
  }
}

filter {
  # The jdbc input already produces structured events, so no json filter is needed:
  # json {
  #     source => "message"
  #     remove_field => ["message"]
  # }

  mutate {
    # Drop Logstash bookkeeping fields that should not be indexed.
    remove_field => ["@timestamp", "@version"]
  }
}

output {
  if [type] == "order_mast" {
    elasticsearch {
      hosts => ["http://localhost:9200"]
      # If the cluster requires authentication, uncomment and fill in:
      # user => elastic
      # password => "elastic@test.com"

      index => "shop_order_mast"
      # Mapping types were removed in Elasticsearch 7.0; drop this line on 7.x+.
      document_type => "order_mast"
      # %{...} is a field reference: use the row's cod_order_id as the document id,
      # so re-syncing a modified row overwrites the same document.
      document_id => "%{cod_order_id}"
    }
  }

  if [type] == "order_item" {
    elasticsearch {
      hosts => ["http://localhost:9200"]
      # If the cluster requires authentication, uncomment and fill in:
      # user => elastic
      # password => "elastic@test.com"

      index => "shop_order_item"
      # Mapping types were removed in Elasticsearch 7.0; drop this line on 7.x+.
      document_type => "order_item"
      # Use the row's cod_order_item_id as the document id.
      document_id => "%{cod_order_item_id}"
    }
  }

  # Also print every event to the console for debugging.
  stdout {
    codec => json_lines
  }
}

//如果只有一张表的时候,单表output的配置:

# Output section when only ONE table is synced: no [type] routing is needed.
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    # If the cluster requires authentication, uncomment and fill in:
    # user => elastic
    # password => "elastic@test.com"

    index => "shop_order_mast"
    # Mapping types were removed in Elasticsearch 7.0; drop this line on 7.x+.
    document_type => "order_mast"
    # %{...} is a field reference: use the row's cod_order_id as the document id.
    document_id => "%{cod_order_id}"
  }

  # Also print every event to the console for debugging.
  stdout {
    codec => json_lines
  }
}

//sql的写法,这里只提供orderItem

-- Incremental export of shop_order_item for the Logstash jdbc input.
-- Column aliases are the camelCase field names stored in the Elasticsearch document.
SELECT
    -- Selected un-aliased on purpose: this raw column is the value referenced by the
    -- document_id setting of the order_item output in logstash_order.conf, i.e. it
    -- becomes the Elasticsearch document id. The aliased copy below is the
    -- "orderItemId" field of the document itself.
    `cod_order_item_id`,
    `cod_order_item_id`     AS "orderItemId",
    `cod_order_id`          AS "orderId",
    `flg_item_type`         AS "itemType",
    `cod_market_id`         AS "marketId",
    `cod_item_id`           AS "itemId",
    `cod_item_id_main`      AS "mainItemId",
    `txt_name`              AS "itemTitle",
    `cod_item_quantity`     AS "quantity",
    `amt_item`              AS "itemPrice",
    `cod_score_total`       AS "scoreTotal",
    `amt_score`             AS "scoreAmount",
    `amt_charge`            AS "chargeAmount",
    `amt_standard_price`    AS "standardPrice",
    `amt_balance_discount`  AS "balanceDiscountAmount",
    `amt_payment_total`     AS "itemTotalAmount",
    `amt_coupon_total`      AS "couponTotalAmount",
    `amt_act_discount`      AS "actDiscountAmount",
    `cod_order_parent_id`   AS "parentOrderId",
    `cod_merchant_no`       AS "shopId",
    `cod_create_user`       AS "createUserId",
    -- Dates are formatted as 'yyyy-MM-dd HH:mm:ss' strings (%T = %H:%i:%s).
    DATE_FORMAT(`dat_modify`, '%Y-%m-%d %T') AS "updateTime",
    DATE_FORMAT(`dat_create`, '%Y-%m-%d %T') AS "createTime",
    `cod_modify_user`       AS "updateUserId"
FROM
    shop_order_item
-- :sql_last_value is filled in by Logstash from last_run_item.txt in the shop folder.
-- On the very first run that file is empty, so the value defaults to
-- 1970-01-01 00:00:00 and the query effectively performs a full export.
-- (Kept above the clause rather than trailing it: with jdbc paging enabled the
-- statement is presumably wrapped in a sub-select, and a trailing "--" comment on
-- the last line could swallow the wrapper's closing text -- verify.)
WHERE
    dat_modify >= :sql_last_value

7.如果运行过一次后,打开last_run_item.txt可以看到

技术图片

 

 8.启动logstash:需要保证你的ES已经启动了,并创建了对应的index和type

Windows环境:进入安装目录下的bin文件夹,在该目录打开命令窗口(或者先打开命令窗口再切换到该路径),然后执行: logstash -f ../config/shop/logstash_order.conf

如果是在linux环境,切换安装的bin目录执行:

nohup logstash -f ../config/shop/logstash_order.conf > ../logs/logstash.out &

9.之后打开ES查询数据

技术图片

 

 可以看到数据已经同步过来了

10.之后可以在项目中进行对应的数据操作了,因为该同步是一分钟同步一次,所以对于实时性要求特别高的,可以在代码中使用ES的crud操作也进行同步,这样就可以保证万无一失了

11.ES相关操作可以参考:https://www.cnblogs.com/yangxiaohui227/p/11237268.html

12.附上一个orderItem表的(ES版本为6.4.3)操作

@Configuration
public class ElasticsearchConfig  implements InitializingBean

    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchConfig.class);

    @Value("$elasticsearch.cluster.name")
    private String clusterName;

    @Value("$elasticsearch.port")
    private Integer port;

    @Value("$elasticsearch.host")
    private String host;
    
    /**
     * Springboot整合Elasticsearch 在项目启动前设置一下的属性,防止报错
     * 解决netty冲突后初始化client时还会抛出异常
     * java.lang.IllegalStateException: availableProcessors is already set to [4], rejecting [4]
     */
    @PostConstruct
   void init() 
        System.setProperty("es.set.netty.runtime.available.processors", "false");
    

//    @Before 
    @Bean
    public TransportClient getTransportClient() 
        TransportClient client=null;
        LOGGER.info("elasticsearch init.");
        try 
            Settings settings = Settings.builder()
                    .put("cluster.name", clusterName) //集群名字
                    .put("client.transport.sniff", true)//增加嗅探机制,找到ES集群
                    .put("thread_pool.search.size", 5).build();//增加线程池个数
            client = new PreBuiltTransportClient(settings);
            TransportAddress transportAddress = new TransportAddress(InetAddress.getByName(host), port);
            client.addTransportAddresses(transportAddress);
            LOGGER.info("elasticsearch init success.");
            return client;
         catch (Exception e) 
            throw new RuntimeException("elasticsearch init fail."+ e);
        
    

//高级查询对象
public class EsQueryObject 
    private String orderId;

    private String customerId;

    private String txtOrderTitle;

    private Integer orderStatus;

    private Integer paymentStatus;

    private String phone;

    private String recieveName;

    private String addresss;

    private String orderSubmitTime_S;
    private String orderSubmitTime_E;

    private String payTime_S;
    private String payTime_E;

    private BigDecimal minPayAmount;

    private BigDecimal maxPayAmount;

    private String shopId;

    private String itemId;

    private String itemTile;

    private Page page;

    public String getOrderId() 
        return orderId;
    

    public void setOrderId(String orderId) 
        this.orderId = orderId;
    

    public String getCustomerId() 
        return customerId;
    

    public void setCustomerId(String customerId) 
        this.customerId = customerId;
    

    public String getTxtOrderTitle() 
        return txtOrderTitle;
    

    public void setTxtOrderTitle(String txtOrderTitle) 
        this.txtOrderTitle = txtOrderTitle;
    

    public Integer getOrderStatus() 
        return orderStatus;
    

    public void setOrderStatus(Integer orderStatus) 
        this.orderStatus = orderStatus;
    

    public Integer getPaymentStatus() 
        return paymentStatus;
    

    public void setPaymentStatus(Integer paymentStatus) 
        this.paymentStatus = paymentStatus;
    

    public String getPhone() 
        return phone;
    

    public void setPhone(String phone) 
        this.phone = phone;
    

    public String getRecieveName() 
        return recieveName;
    

    public void setRecieveName(String recieveName) 
        this.recieveName = recieveName;
    

    public String getAddresss() 
        return addresss;
    

    public void setAddresss(String addresss) 
        this.addresss = addresss;
    

    public String getOrderSubmitTime_S() 
        return orderSubmitTime_S;
    

    public void setOrderSubmitTime_S(String orderSubmitTime_S) 
        this.orderSubmitTime_S = orderSubmitTime_S;
    

    public String getOrderSubmitTime_E() 
        return orderSubmitTime_E;
    

    public void setOrderSubmitTime_E(String orderSubmitTime_E) 
        this.orderSubmitTime_E = orderSubmitTime_E;
    

    public String getPayTime_S() 
        return payTime_S;
    

    public void setPayTime_S(String payTime_S) 
        this.payTime_S = payTime_S;
    

    public String getPayTime_E() 
        return payTime_E;
    

    public void setPayTime_E(String payTime_E) 
        this.payTime_E = payTime_E;
    

    public BigDecimal getMinPayAmount() 
        return minPayAmount;
    

    public void setMinPayAmount(BigDecimal minPayAmount) 
        this.minPayAmount = minPayAmount;
    

    public BigDecimal getMaxPayAmount() 
        return maxPayAmount;
    

    public void setMaxPayAmount(BigDecimal maxPayAmount) 
        this.maxPayAmount = maxPayAmount;
    

    public String getShopId() 
        return shopId;
    

    public void setShopId(String shopId) 
        this.shopId = shopId;
    

    public String getItemId() 
        return itemId;
    

    public void setItemId(String itemId) 
        this.itemId = itemId;
    

    public String getItemTile() 
        return itemTile;
    

    public void setItemTile(String itemTile) 
        this.itemTile = itemTile;
    

    public Page getPage() 
        return page;
    

    public void setPage(Page page) 
        this.page = page;
    

package com.tft.shop.service.order;

import com.alibaba.fastjson.JSON;
import com.bootcrabframework.cloud.core.common.base.GenericBaseService;
import com.bootcrabframework.cloud.core.util.CommonUtil;
import com.bootcrabframework.cloud.core.util.DateUtil;
import com.google.common.collect.Lists;
import com.tft.shop.constant.order.OrderConstant;
import com.tft.shop.entity.es.EsShopOrderItem;
import com.tft.shop.entity.es.EsShopOrderItemRequestDTO;
import com.tft.shop.entity.order.ShopOrderItem;
import com.tft.shop.util.StringUtil;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequestBuilder;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

@Service
public class EsShopOrderItemService  extends GenericBaseService 

    @Resource
    private TransportClient transportClient;

    //批量新增
    public void batchInsert(List<EsShopOrderItem> list)
        if(CommonUtil.isNull(list))
            return;
        
        BulkRequest bulkRequest = new BulkRequest();
        list.forEach(a->
            IndexRequest indexRequest = new IndexRequest(OrderConstant.ES_ORDER_ITEM_INDEX, OrderConstant.ES_ORDER_ITEM_TYPE, a.getOrderItemId());
            indexRequest.source(JSON.toJSONString(a), XContentType.JSON);
            bulkRequest.add(indexRequest);

        );

        ActionFuture<BulkResponse> bulk = transportClient.bulk(bulkRequest);
        boolean failures = bulk.actionGet().hasFailures();
        if(!failures)
            return; //没有失败
        
        //如果有失败,输出哪一条是失败的
        try 
            BulkResponse bulkItemResponses = bulk.get();
            if(bulkItemResponses==null)
                return;
            
            if(CommonUtil.isNull(bulkItemResponses.getItems()))
                return;
            
            for (BulkItemResponse bulkItemResponse : bulkItemResponses.getItems()) 
                boolean failed = bulkItemResponse.isFailed();
                if(failed)

                   logger.error("订单项插入ES失败,错误信息,对应订单项编号",bulkItemResponse.getId(),bulkItemResponse.getFailureMessage());
                
            

         catch (InterruptedException e) 
            e.printStackTrace();
         catch (ExecutionException e) 
            e.printStackTrace();
        

    
    //单条新增
    public void insertOne(EsShopOrderItem item)
       if(null==item)
           return;
       
       List<EsShopOrderItem> list =Lists.newArrayList();
       list.add(item);
       this.batchInsert(list);

    
    //单条新增
    public void insertOne(ShopOrderItem orderItem)
        this.insertOne(shopOrderItemChangeToEsOrderItem(orderItem));

    

    private EsShopOrderItem shopOrderItemChangeToEsOrderItem(ShopOrderItem orderItem)
        if(null==orderItem)
            return null;
        
        EsShopOrderItem shopOrderItem = new EsShopOrderItem();
        shopOrderItem.setOrderItemId(orderItem.getCodOrderItemId());
        shopOrderItem.setOrderId(orderItem.getCodOrderId());
        shopOrderItem.setItemType(orderItem.getFlgItemType());
        shopOrderItem.setMarketId(orderItem.getCodMarketId());
        shopOrderItem.setItemId(orderItem.getCodItemId());
        shopOrderItem.setMainItemId(orderItem.getCodItemIdMain());
        shopOrderItem.setItemTitle(orderItem.getTxtName());
        shopOrderItem.setQuantity(orderItem.getCodItemQuantity());
        shopOrderItem.setItemPrice(orderItem.getAmtItem());
        shopOrderItem.setScoreTotal(orderItem.getCodScoreTotal());
        shopOrderItem.setScoreAmount(orderItem.getAmtScore());
        shopOrderItem.setChargeAmount(orderItem.getAmtCharge());
        shopOrderItem.setStandardPrice(orderItem.getAmtStandardPrice());
        shopOrderItem.setBalanceDiscountAmount(orderItem.getAmtBalanceDiscount());
        shopOrderItem.setItemTotalAmount(orderItem.getAmtPaymentTotal());
        shopOrderItem.setActDiscountAmount(orderItem.getAmtActDiscount());
        shopOrderItem.setCouponTotalAmount(orderItem.getAmtCouponTotal());
        shopOrderItem.setParentOrderId(orderItem.getCodOrderParentId());
        shopOrderItem.setShopId(orderItem.getCodMerchantNo());
        shopOrderItem.setCreateUserId(orderItem.getCodCreateUser());
        if(null!=orderItem.getDatCreate())
            shopOrderItem.setCreateTime(DateUtil.dateFormat(orderItem.getDatCreate(),DateUtil.TIME_FORMAT_FULL));
        
        if(null!=orderItem.getDatModify())
            shopOrderItem.setUpdateTime(DateUtil.dateFormat(orderItem.getDatModify(),DateUtil.TIME_FORMAT_FULL));
        
        shopOrderItem.setUpdateUserId(orderItem.getCodModifyUser());
        return shopOrderItem;
    

    //删除
    public void deleteOne(String orderItemId)
        if(CommonUtil.isNull(orderItemId))
            return;
        
        ActionFuture<DeleteResponse> actionFuture = transportClient.delete(new DeleteRequest(OrderConstant.ES_ORDER_ITEM_INDEX, OrderConstant.ES_ORDER_ITEM_TYPE, orderItemId));
        if(actionFuture==null)
            return;
        
        DeleteResponse deleteResponse = actionFuture.actionGet();
        if(null==deleteResponse || null==deleteResponse.status())
            return;
        
        if(deleteResponse.status().getStatus()!=200)

            logger.error("删除ES订单项,编号为,删除失败",orderItemId);
        


    

    //修改
    public void updateOne(EsShopOrderItem esShopOrderItem)

        if(null==esShopOrderItem)
            return;
        
        UpdateResponse updateResponse = transportClient.prepareUpdate(OrderConstant.ES_ORDER_ITEM_INDEX, OrderConstant.ES_ORDER_ITEM_TYPE, esShopOrderItem.getOrderItemId())
                .setDoc(JSON.toJSONString(esShopOrderItem), XContentType.JSON).execute().actionGet();
        if(null==updateResponse || null==updateResponse.status())
            return;
        
        if(updateResponse.status().getStatus()!=200)
            logger.error("修改ES订单项失败,编号为",esShopOrderItem.getOrderItemId());
        

    
    //修改
    public void updateOne(ShopOrderItem orderItem)
        this.updateOne(this.shopOrderItemChangeToEsOrderItem(orderItem));
    


    //查询单个

    public EsShopOrderItem selectById(String orderItemId)
        if(StringUtil.isEmpty(orderItemId))
            return null;
        
        GetRequestBuilder ret = transportClient.prepareGet(OrderConstant.ES_ORDER_ITEM_INDEX, OrderConstant.ES_ORDER_ITEM_TYPE, orderItemId);
        if(null==ret || null==ret.get())
            return null;
        
        GetResponse response = ret.get();
        if(StringUtil.isEmpty(response.getSourceAsString()))
            return null;
        
        return JSON.parseObject(response.getSourceAsString(),EsShopOrderItem.class);

    

    /**
     *
     *
     * @param req 高级查询对象,当用商品标题查询的时候,限制只返回最大2000条
     * @return
     */
    public List<EsShopOrderItem> queryAdvanced(EsShopOrderItemRequestDTO req)
        if(null==req)
            return null;
        

        SearchRequest searchRequest = new SearchRequest(OrderConstant.ES_ORDER_ITEM_INDEX);
        searchRequest.types(OrderConstant.ES_ORDER_ITEM_TYPE);
        // 构造查询器
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        if(!StringUtils.isEmpty(req.getItemTitle()))
            boolQueryBuilder.must(QueryBuilders.matchQuery("itemTitle",req.getItemTitle()));
        
        if(!StringUtils.isEmpty(req.getItemId()))
            boolQueryBuilder.must(QueryBuilders.termQuery("itemId",req.getItemId()));
        
        if(!StringUtils.isEmpty(req.getShopId()))
            boolQueryBuilder.must(QueryBuilders.termQuery("shopId",req.getShopId()));
        
        if(!StringUtils.isEmpty(req.getCustomerId()))
            boolQueryBuilder.must(QueryBuilders.termQuery("createUserId",req.getCustomerId()));
        
        if(!StringUtils.isEmpty(req.getParentOrderId()))
            boolQueryBuilder.must(QueryBuilders.termQuery("parentOrderId",req.getParentOrderId()));
        
        if(!StringUtils.isEmpty(req.getOrderId()))
            boolQueryBuilder.must(QueryBuilders.termQuery("orderId",req.getOrderId()));
        
        if(null!=req.getItemType() && req.getItemType()>=0)
            boolQueryBuilder.must(QueryBuilders.termQuery("itemType",req.getItemType()));
        
        if(!StringUtils.isEmpty(req.getCreateStartTime()))
            boolQueryBuilder.must(QueryBuilders.rangeQuery("createTime").gte(req.getCreateStartTime()));
        
        if(!StringUtils.isEmpty(req.getCreateEndTime()))
            boolQueryBuilder.must(QueryBuilders.rangeQuery("createTime").lte(req.getCreateEndTime()));
        
        sourceBuilder.query(boolQueryBuilder);
        sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
        if(!StringUtils.isEmpty(req.getItemTitle()))
            sourceBuilder.from(0).size(2000);
        
        sourceBuilder.sort(new FieldSortBuilder("createTime").order(SortOrder.DESC));
        searchRequest.source(sourceBuilder);
        searchRequest.searchType(SearchType.QUERY_THEN_FETCH);
        SearchResponse searchResponse = transportClient.search(searchRequest).actionGet();
        if(null==searchResponse || null==searchResponse.getHits() || searchResponse.getHits().totalHits<=0)
            return null;
        
        List<EsShopOrderItem> list = new ArrayList<>();
        SearchHits hits = searchResponse.getHits();
        for (SearchHit hit : hits) 
            String sourceAsString = hit.getSourceAsString();
            EsShopOrderItem orderItem = JSON.parseObject(sourceAsString, EsShopOrderItem.class);
            list.add(orderItem);
        
        return list;

    

    public List<String> queryOrderIdList(EsShopOrderItemRequestDTO req)
        if(null==req)
            return null;
        
        List<EsShopOrderItem> shopOrderItems = this.queryAdvanced(req);
        if(CommonUtil.isNull(shopOrderItems))
            return null;
        
        return shopOrderItems.stream().map(a->a.getOrderId()).collect(Collectors.toList());

    

//附上shop_order_item的mapping配置:

PUT shop_order_item
{
  "settings": {
    "analysis": {
      "analyzer": {
        "thai_analyzer": {
          "type": "custom",
          "tokenizer": "thai",
          "filter": [
            "lowercase",
            "asciifolding"
          ]
        },
        "caseSensitive": {
          "filter": "lowercase",
          "type": "custom",
          "tokenizer": "keyword"
        }
      }
    }
  },
  "mappings": {
    "order_item": {
      "properties": {
        "orderId": {
          "type": "keyword"
        },
        "parentOrderId": {
          "type": "keyword"
        },
        "shopId": {
          "type": "keyword"
        },
        "orderItemId": {
          "type": "keyword"
        },
        "itemTitle": {
          "type": "text",
          "analyzer": "thai_analyzer",
          "search_analyzer": "thai_analyzer"
        },
        "itemId": {
          "type": "keyword"
        },
        "mainItemId": {
          "type": "keyword"
        },
        "marketId": {
          "type": "keyword"
        },
        "itemType": {
          "type": "integer"
        },
        "quantity": {
          "type": "integer"
        },
        "scoreTotal": {
          "type": "integer"
        },
        "scoreAmount": {
          "type": "double"
        },
        "chargeAmount": {
          "type": "double"
        },
        "itemPrice": {
          "type": "double"
        },
        "standardPrice": {
          "type": "double"
        },
        "itemTotalAmount": {
          "type": "double"
        },
        "couponTotalAmount": {
          "type": "double"
        },
        "balanceDiscountAmount": {
          "type": "double"
        },
        "actDiscountAmount": {
          "type": "double"
        },
        "createTime": {
          "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd",
          "type": "date"
        },
        "updateTime": {
          "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd",
          "type": "date"
        },
        "createUserId": {
          "type": "keyword"
        },
        "updateUserId": {
          "type": "keyword"
        }
      }
    }
  }
}

 

 

以上是关于电商订单ElasticSearch同步解决方案--使用logstash的主要内容,如果未能解决你的问题,请参考以下文章

亿级高并发电商项目-- 实战篇 --万达商城项目 十(安装与配置Elasticsearch和kibana编写搜索功能向ES同步数据库商品数据)

亿级高并发电商项目-- 实战篇 --万达商城项目 十(安装与配置Elasticsearch和kibana编写搜索功能向ES同步数据库商品数据)

亿级高并发电商项目-- 实战篇 --万达商城项目 十(安装与配置Elasticsearch和kibana编写搜索功能向ES同步数据库商品数据)

分布式集群架构场景化解决方案:集群时钟同步问题

分布式集群架构场景化解决方案:集群时钟同步问题

基于rabbitMQ 消息延时队列方案 模拟电商超时未支付订单处理场景