电商订单ElasticSearch同步解决方案--使用logstash
Posted yangxiaohui227
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了电商订单ElasticSearch同步解决方案--使用logstash相关的知识,希望对你有一定的参考价值。
一.使用logstash同步订单数据(订单表和订单项表)到ElasticSearch:
1.到官网下载logstash:https://www.elastic.co/cn/downloads/logstash
2.安装logstash前,确保需要先安装java的jdk环境
3.下载后,解压:之后千万别到bin环境点击logstash.bat这个命令启动,这样会报错的
4.接下来,在logstash安装目录找到config文件夹,在那里新增一个文件夹,我新建的为shop文件夹,然后在里面添加如下文件:
5.开始时.last_run_item.txt和last_run_order.txt文件是没数据的
6.logstash_order.conf文件的配置如下:
# Sample Logstash configuration for creating a simple # Beats -> Logstash -> Elasticsearch pipeline. input jdbc type => "order_mast" #下面同步ES可以根据type进行区分,单是单个表同步是,可以不写这个 jdbc_driver_library => "../config/shop/mysql-connector-java-5.1.6-bin.jar" #这个是shop文件夹下的jar包 jdbc_paging_enabled => "true" jdbc_page_size => "2000" jdbc_driver_class => "com.mysql.jdbc.Driver" #jdbc跟账号密码需改成对应环境的 jdbc_connection_string => "jdbc:mysql://192.168.50.117:3306/shop_dm?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true&useSSL=false" jdbc_user => "shop" jdbc_password => "shop" schedule => "* * * * *" #这个代表每分钟同步一次 statement_filepath => "../config/shop/order_mast.sql" #这个是shop文件下的sql文件 record_last_run => true use_column_value => false last_run_metadata_path => "../config/shop/last_run_order.txt" #这个是记录上一次更新的是什么时间,这样就可以实现增量新增了 clean_run => false #是否将 字段(column) 名称转小写 lowercase_column_names => false jdbc type => "order_item" #下面同步ES可以根据type进行区分,单是单个表同步是,可以不写这个 jdbc_driver_library => "../config/shop/mysql-connector-java-5.1.6-bin.jar" #这个是shop文件夹下的jar包 jdbc_paging_enabled => "true" jdbc_page_size => "2000" jdbc_driver_class => "com.mysql.jdbc.Driver" #这个代表每分钟同步一次 #jdbc跟账号密码需改成对应环境的 jdbc_connection_string => "jdbc:mysql://192.168.50.117:3306/shop_dm?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true&useSSL=false" jdbc_user => "shop" jdbc_password => "shop" schedule => "* * * * *" statement_filepath => "../config/shop/order_item.sql" #这个是shop文件下的sql文件 record_last_run => true use_column_value => false last_run_metadata_path => "../config/shop/last_run_item.txt" #这个是记录上一次更新的是什么时间,这样就可以实现增量新增了 clean_run => false #是否将 字段(column) 名称转小写 lowercase_column_names => false filter #jdbc默认json,暂时没找到修改方法 #json # source => "message" # remove_field => ["message"] # mutate #需要移除的字段 remove_field => "@timestamp" remove_field => "@version" output if [type]=="order_mast" elasticsearch hosts => ["http://localhost:9200"] #如果有账号密码,在下面添加,并去除#号 #user => elastic #password => "elastic@test.com" index => "shop_order_mast" document_type => "order_mast" #这个在es7.0版本后就没有type属性了 document_id => "%cod_order_id" if [type]=="order_item" elasticsearch hosts => ["http://localhost:9200"] #如果有账号密码,在下面添加,并去除#号 #user => elastic #password => "elastic@test.com" index => "shop_order_item" document_type => "order_item" document_id => "%cod_order_item_id" stdout codec => json_lines
//如果只有一张表的时候,单表output的配置:
output elasticsearch hosts => ["http://localhost:9200"] #如果有账号密码,在下面添加,并去除#号 #user => elastic #password => "elastic@test.com" index => "shop_order_mast" document_type => "order_mast" #这个在es7.0版本后就没有type属性了 document_id => "%cod_order_id" stdout codec => json_lines
//sql的写法,这里只提供orderItem
SELECT `cod_order_item_id` , -- 注意,这里写了cod_order_item_id和下面同样下了cod_order_item_id的意义不一样,第一个是作为ES文档的Id,会跟上面logstash_order.conf文件的 document_id => "%cod_order_item_id"匹配上 `cod_order_item_id` as "orderItemId", `cod_order_id`as "orderId", `flg_item_type`as "itemType", `cod_market_id`as "marketId", `cod_item_id`as "itemId", `cod_item_id_main`as "mainItemId", `txt_name`as "itemTitle", `cod_item_quantity`as "quantity", `amt_item`as "itemPrice", `cod_score_total`as "scoreTotal", `amt_score`as "scoreAmount", `amt_charge`as "chargeAmount", `amt_standard_price`as "standardPrice", `amt_balance_discount`as "balanceDiscountAmount", `amt_payment_total`as "itemTotalAmount", `amt_coupon_total`as "couponTotalAmount", `amt_act_discount`as "actDiscountAmount", `cod_order_parent_id`as "parentOrderId", `cod_merchant_no`as "shopId", `cod_create_user`as "createUserId", DATE_FORMAT( `dat_modify`, ‘%Y-%m-%d %T‘ ) AS "updateTime", DATE_FORMAT( `dat_create`, ‘%Y-%m-%d %T‘ ) AS "createTime", `cod_modify_user`as "updateUserId" from shop_order_item WHERE dat_modify >= :sql_last_value -- 这个sql_last_value会读取shop文件夹下的last_run_item.txt的值,第一次同步时,没有该值,所以默认就会是1970年7月1日,相当于是全量新增了
7.如果运行过一次后,打开last_run_item.txt可以看到
8.启动logstash:需要保证你的ES已经启动了,并创建了对应的index和type
window环境:在安装目录bin文件下,打开命令窗口,或者打开命令窗口,切换到该路径: logstash -f ../config/shop/logstash_order.conf
如果是在linux环境,切换安装的bin目录执行:
nohup logstash -f ../config/shop/logstash_order.conf > ../logs/logstash.out &
9.之后打开ES查询数据
可以看到数据已经同步过来了
10.之后可以在项目中进行对应的数据操作了,因为该同步是一分钟同步一次,所以对于实时性要求特别高的,可以在代码中使用ES的crud操作也进行同步,这样就可以保证万无一失了
11.ES相关操作可以参考:https://www.cnblogs.com/yangxiaohui227/p/11237268.html
12.附上一个orderItem表的(ES版本为6.4.3)操作
@Configuration public class ElasticsearchConfig implements InitializingBean private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchConfig.class); @Value("$elasticsearch.cluster.name") private String clusterName; @Value("$elasticsearch.port") private Integer port; @Value("$elasticsearch.host") private String host; /** * Springboot整合Elasticsearch 在项目启动前设置一下的属性,防止报错 * 解决netty冲突后初始化client时还会抛出异常 * java.lang.IllegalStateException: availableProcessors is already set to [4], rejecting [4] */ @PostConstruct void init() System.setProperty("es.set.netty.runtime.available.processors", "false"); // @Before @Bean public TransportClient getTransportClient() TransportClient client=null; LOGGER.info("elasticsearch init."); try Settings settings = Settings.builder() .put("cluster.name", clusterName) //集群名字 .put("client.transport.sniff", true)//增加嗅探机制,找到ES集群 .put("thread_pool.search.size", 5).build();//增加线程池个数 client = new PreBuiltTransportClient(settings); TransportAddress transportAddress = new TransportAddress(InetAddress.getByName(host), port); client.addTransportAddresses(transportAddress); LOGGER.info("elasticsearch init success."); return client; catch (Exception e) throw new RuntimeException("elasticsearch init fail."+ e);
//高级查询对象 public class EsQueryObject private String orderId; private String customerId; private String txtOrderTitle; private Integer orderStatus; private Integer paymentStatus; private String phone; private String recieveName; private String addresss; private String orderSubmitTime_S; private String orderSubmitTime_E; private String payTime_S; private String payTime_E; private BigDecimal minPayAmount; private BigDecimal maxPayAmount; private String shopId; private String itemId; private String itemTile; private Page page; public String getOrderId() return orderId; public void setOrderId(String orderId) this.orderId = orderId; public String getCustomerId() return customerId; public void setCustomerId(String customerId) this.customerId = customerId; public String getTxtOrderTitle() return txtOrderTitle; public void setTxtOrderTitle(String txtOrderTitle) this.txtOrderTitle = txtOrderTitle; public Integer getOrderStatus() return orderStatus; public void setOrderStatus(Integer orderStatus) this.orderStatus = orderStatus; public Integer getPaymentStatus() return paymentStatus; public void setPaymentStatus(Integer paymentStatus) this.paymentStatus = paymentStatus; public String getPhone() return phone; public void setPhone(String phone) this.phone = phone; public String getRecieveName() return recieveName; public void setRecieveName(String recieveName) this.recieveName = recieveName; public String getAddresss() return addresss; public void setAddresss(String addresss) this.addresss = addresss; public String getOrderSubmitTime_S() return orderSubmitTime_S; public void setOrderSubmitTime_S(String orderSubmitTime_S) this.orderSubmitTime_S = orderSubmitTime_S; public String getOrderSubmitTime_E() return orderSubmitTime_E; public void setOrderSubmitTime_E(String orderSubmitTime_E) this.orderSubmitTime_E = orderSubmitTime_E; public String getPayTime_S() return payTime_S; public void setPayTime_S(String payTime_S) this.payTime_S = payTime_S; public String getPayTime_E() return payTime_E; public void setPayTime_E(String payTime_E) this.payTime_E = payTime_E; public BigDecimal getMinPayAmount() return minPayAmount; public void setMinPayAmount(BigDecimal minPayAmount) this.minPayAmount = minPayAmount; public BigDecimal getMaxPayAmount() return maxPayAmount; public void setMaxPayAmount(BigDecimal maxPayAmount) this.maxPayAmount = maxPayAmount; public String getShopId() return shopId; public void setShopId(String shopId) this.shopId = shopId; public String getItemId() return itemId; public void setItemId(String itemId) this.itemId = itemId; public String getItemTile() return itemTile; public void setItemTile(String itemTile) this.itemTile = itemTile; public Page getPage() return page; public void setPage(Page page) this.page = page;
package com.tft.shop.service.order; import com.alibaba.fastjson.JSON; import com.bootcrabframework.cloud.core.common.base.GenericBaseService; import com.bootcrabframework.cloud.core.util.CommonUtil; import com.bootcrabframework.cloud.core.util.DateUtil; import com.google.common.collect.Lists; import com.tft.shop.constant.order.OrderConstant; import com.tft.shop.entity.es.EsShopOrderItem; import com.tft.shop.entity.es.EsShopOrderItemRequestDTO; import com.tft.shop.entity.order.ShopOrderItem; import com.tft.shop.util.StringUtil; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.get.GetRequestBuilder; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.FieldSortBuilder; import org.elasticsearch.search.sort.SortOrder; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; import javax.annotation.Resource; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @Service public class EsShopOrderItemService extends GenericBaseService @Resource private TransportClient transportClient; //批量新增 public void batchInsert(List<EsShopOrderItem> list) if(CommonUtil.isNull(list)) return; BulkRequest bulkRequest = new BulkRequest(); list.forEach(a-> IndexRequest indexRequest = new IndexRequest(OrderConstant.ES_ORDER_ITEM_INDEX, OrderConstant.ES_ORDER_ITEM_TYPE, a.getOrderItemId()); indexRequest.source(JSON.toJSONString(a), XContentType.JSON); bulkRequest.add(indexRequest); ); ActionFuture<BulkResponse> bulk = transportClient.bulk(bulkRequest); boolean failures = bulk.actionGet().hasFailures(); if(!failures) return; //没有失败 //如果有失败,输出哪一条是失败的 try BulkResponse bulkItemResponses = bulk.get(); if(bulkItemResponses==null) return; if(CommonUtil.isNull(bulkItemResponses.getItems())) return; for (BulkItemResponse bulkItemResponse : bulkItemResponses.getItems()) boolean failed = bulkItemResponse.isFailed(); if(failed) logger.error("订单项插入ES失败,错误信息,对应订单项编号",bulkItemResponse.getId(),bulkItemResponse.getFailureMessage()); catch (InterruptedException e) e.printStackTrace(); catch (ExecutionException e) e.printStackTrace(); //单条新增 public void insertOne(EsShopOrderItem item) if(null==item) return; List<EsShopOrderItem> list =Lists.newArrayList(); list.add(item); this.batchInsert(list); //单条新增 public void insertOne(ShopOrderItem orderItem) this.insertOne(shopOrderItemChangeToEsOrderItem(orderItem)); private EsShopOrderItem shopOrderItemChangeToEsOrderItem(ShopOrderItem orderItem) if(null==orderItem) return null; EsShopOrderItem shopOrderItem = new EsShopOrderItem(); shopOrderItem.setOrderItemId(orderItem.getCodOrderItemId()); shopOrderItem.setOrderId(orderItem.getCodOrderId()); shopOrderItem.setItemType(orderItem.getFlgItemType()); shopOrderItem.setMarketId(orderItem.getCodMarketId()); shopOrderItem.setItemId(orderItem.getCodItemId()); shopOrderItem.setMainItemId(orderItem.getCodItemIdMain()); shopOrderItem.setItemTitle(orderItem.getTxtName()); shopOrderItem.setQuantity(orderItem.getCodItemQuantity()); shopOrderItem.setItemPrice(orderItem.getAmtItem()); shopOrderItem.setScoreTotal(orderItem.getCodScoreTotal()); shopOrderItem.setScoreAmount(orderItem.getAmtScore()); shopOrderItem.setChargeAmount(orderItem.getAmtCharge()); shopOrderItem.setStandardPrice(orderItem.getAmtStandardPrice()); shopOrderItem.setBalanceDiscountAmount(orderItem.getAmtBalanceDiscount()); shopOrderItem.setItemTotalAmount(orderItem.getAmtPaymentTotal()); shopOrderItem.setActDiscountAmount(orderItem.getAmtActDiscount()); shopOrderItem.setCouponTotalAmount(orderItem.getAmtCouponTotal()); shopOrderItem.setParentOrderId(orderItem.getCodOrderParentId()); shopOrderItem.setShopId(orderItem.getCodMerchantNo()); shopOrderItem.setCreateUserId(orderItem.getCodCreateUser()); if(null!=orderItem.getDatCreate()) shopOrderItem.setCreateTime(DateUtil.dateFormat(orderItem.getDatCreate(),DateUtil.TIME_FORMAT_FULL)); if(null!=orderItem.getDatModify()) shopOrderItem.setUpdateTime(DateUtil.dateFormat(orderItem.getDatModify(),DateUtil.TIME_FORMAT_FULL)); shopOrderItem.setUpdateUserId(orderItem.getCodModifyUser()); return shopOrderItem; //删除 public void deleteOne(String orderItemId) if(CommonUtil.isNull(orderItemId)) return; ActionFuture<DeleteResponse> actionFuture = transportClient.delete(new DeleteRequest(OrderConstant.ES_ORDER_ITEM_INDEX, OrderConstant.ES_ORDER_ITEM_TYPE, orderItemId)); if(actionFuture==null) return; DeleteResponse deleteResponse = actionFuture.actionGet(); if(null==deleteResponse || null==deleteResponse.status()) return; if(deleteResponse.status().getStatus()!=200) logger.error("删除ES订单项,编号为,删除失败",orderItemId); //修改 public void updateOne(EsShopOrderItem esShopOrderItem) if(null==esShopOrderItem) return; UpdateResponse updateResponse = transportClient.prepareUpdate(OrderConstant.ES_ORDER_ITEM_INDEX, OrderConstant.ES_ORDER_ITEM_TYPE, esShopOrderItem.getOrderItemId()) .setDoc(JSON.toJSONString(esShopOrderItem), XContentType.JSON).execute().actionGet(); if(null==updateResponse || null==updateResponse.status()) return; if(updateResponse.status().getStatus()!=200) logger.error("修改ES订单项失败,编号为",esShopOrderItem.getOrderItemId()); //修改 public void updateOne(ShopOrderItem orderItem) this.updateOne(this.shopOrderItemChangeToEsOrderItem(orderItem)); //查询单个 public EsShopOrderItem selectById(String orderItemId) if(StringUtil.isEmpty(orderItemId)) return null; GetRequestBuilder ret = transportClient.prepareGet(OrderConstant.ES_ORDER_ITEM_INDEX, OrderConstant.ES_ORDER_ITEM_TYPE, orderItemId); if(null==ret || null==ret.get()) return null; GetResponse response = ret.get(); if(StringUtil.isEmpty(response.getSourceAsString())) return null; return JSON.parseObject(response.getSourceAsString(),EsShopOrderItem.class); /** * * * @param req 高级查询对象,当用商品标题查询的时候,限制只返回最大2000条 * @return */ public List<EsShopOrderItem> queryAdvanced(EsShopOrderItemRequestDTO req) if(null==req) return null; SearchRequest searchRequest = new SearchRequest(OrderConstant.ES_ORDER_ITEM_INDEX); searchRequest.types(OrderConstant.ES_ORDER_ITEM_TYPE); // 构造查询器 SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); if(!StringUtils.isEmpty(req.getItemTitle())) boolQueryBuilder.must(QueryBuilders.matchQuery("itemTitle",req.getItemTitle())); if(!StringUtils.isEmpty(req.getItemId())) boolQueryBuilder.must(QueryBuilders.termQuery("itemId",req.getItemId())); if(!StringUtils.isEmpty(req.getShopId())) boolQueryBuilder.must(QueryBuilders.termQuery("shopId",req.getShopId())); if(!StringUtils.isEmpty(req.getCustomerId())) boolQueryBuilder.must(QueryBuilders.termQuery("createUserId",req.getCustomerId())); if(!StringUtils.isEmpty(req.getParentOrderId())) boolQueryBuilder.must(QueryBuilders.termQuery("parentOrderId",req.getParentOrderId())); if(!StringUtils.isEmpty(req.getOrderId())) boolQueryBuilder.must(QueryBuilders.termQuery("orderId",req.getOrderId())); if(null!=req.getItemType() && req.getItemType()>=0) boolQueryBuilder.must(QueryBuilders.termQuery("itemType",req.getItemType())); if(!StringUtils.isEmpty(req.getCreateStartTime())) boolQueryBuilder.must(QueryBuilders.rangeQuery("createTime").gte(req.getCreateStartTime())); if(!StringUtils.isEmpty(req.getCreateEndTime())) boolQueryBuilder.must(QueryBuilders.rangeQuery("createTime").lte(req.getCreateEndTime())); sourceBuilder.query(boolQueryBuilder); sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS)); if(!StringUtils.isEmpty(req.getItemTitle())) sourceBuilder.from(0).size(2000); sourceBuilder.sort(new FieldSortBuilder("createTime").order(SortOrder.DESC)); searchRequest.source(sourceBuilder); searchRequest.searchType(SearchType.QUERY_THEN_FETCH); SearchResponse searchResponse = transportClient.search(searchRequest).actionGet(); if(null==searchResponse || null==searchResponse.getHits() || searchResponse.getHits().totalHits<=0) return null; List<EsShopOrderItem> list = new ArrayList<>(); SearchHits hits = searchResponse.getHits(); for (SearchHit hit : hits) String sourceAsString = hit.getSourceAsString(); EsShopOrderItem orderItem = JSON.parseObject(sourceAsString, EsShopOrderItem.class); list.add(orderItem); return list; public List<String> queryOrderIdList(EsShopOrderItemRequestDTO req) if(null==req) return null; List<EsShopOrderItem> shopOrderItems = this.queryAdvanced(req); if(CommonUtil.isNull(shopOrderItems)) return null; return shopOrderItems.stream().map(a->a.getOrderId()).collect(Collectors.toList());
//附上shop_order_item的mapping配置:
put shop_order_item
"settings":
"analysis":
"analyzer":
"thai_analyzer":
"type": "custom",
"tokenizer": "thai",
"filter": [
"lowercase",
"asciifolding"
]
,
"caseSensitive":
"filter": "lowercase",
"type": "custom",
"tokenizer": "keyword"
,
"mappings":
"order_item":
"properties":
"orderId":
"type": "keyword"
,
"parentOrderId":
"type": "keyword"
,
"shopId":
"type": "keyword"
,
"orderItemId":
"type": "keyword"
,
"itemTitle":
"type": "text",
"analyzer": "thai_analyzer",
"search_analyzer": "thai_analyzer"
,
"itemId":
"type": "keyword"
,
"mainItemId":
"type": "keyword"
,
"marketId":
"type": "keyword"
,
"itemType":
"type": "integer"
,
"quantity":
"type": "integer"
,
"scoreTotal":
"type": "integer"
,
"scoreAmount":
"type": "double"
,
"chargeAmount":
"type": "double"
,
"itemPrice":
"type": "double"
,
"standardPrice":
"type": "double"
,
"itemTotalAmount":
"type": "double"
,
"couponTotalAmount":
"type": "double"
,
"balanceDiscountAmount":
"type": "double"
,
"actDiscountAmount":
"type": "double"
,
"createTime":
"format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd",
"type": "date"
,
"updateTime":
"format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd",
"type": "date"
,
"createUserId":
"type": "keyword"
,
"updateUserId":
"type": "keyword"
以上是关于电商订单ElasticSearch同步解决方案--使用logstash的主要内容,如果未能解决你的问题,请参考以下文章
亿级高并发电商项目-- 实战篇 --万达商城项目 十(安装与配置Elasticsearch和kibana编写搜索功能向ES同步数据库商品数据)
亿级高并发电商项目-- 实战篇 --万达商城项目 十(安装与配置Elasticsearch和kibana编写搜索功能向ES同步数据库商品数据)
亿级高并发电商项目-- 实战篇 --万达商城项目 十(安装与配置Elasticsearch和kibana编写搜索功能向ES同步数据库商品数据)