spark数据监控实战

Posted bigdataer

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark数据监控实战相关的知识,希望对你有一定的参考价值。

转载请注明出处。更多文章请访问 http://bigdataer.net

1.概述

数据准确性,稳定性,时效性是数据开发中需要重点关注的,一般称之为数据质量。保证数据质量往往会占用数据开发工程师的很多精力,所以一个好的数据监控系统或者一个合理的数据监控方案对于数据质量的保证至关重要。本文将展示一种实际生产中使用过的数据监控方案,并给出相关的代码。
数据计算采用spark,报警形式采用邮件报警。涉及到的内容有使用springMVC构建一个支持发送html和文件的邮件接口;在spark计算任意过程中调用邮件接口;在spark中通过邮件接口发送hdfs上的结果数据。

2.架构图

技术分享

说明:通常情况下公司内部的hadoop/spark集群和外网隔离,直接在spark作业里发送邮件显然不现实。所以需要构建一个邮件发送服务,暴露内网接口给spark作业调用,同时也能访问外网,把邮件发送到用户邮箱中。

3.基于springMVC构建的邮件服务

3.1 设计目标

(1)支持自定义邮件发件人昵称,主题,内容等
(2)支持发送html以及文件

3.2技术方案

springMVC,JavaMail

3.3核心代码

邮件发送工具类EmailSendUtil

java    98行

import java.io.File;
import java.util.Date;
import java.util.Properties;

import javax.activation.CommandMap;
import javax.activation.DataHandler;
import javax.activation.FileDataSource;
import javax.activation.MailcapCommandMap;
import javax.mail.Authenticator;
import javax.mail.Message;
import javax.mail.MessagingException;
import javax.mail.PasswordAuthentication;
import javax.mail.Session;
import javax.mail.Transport;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeBodyPart;
import javax.mail.internet.MimeMessage;
import javax.mail.internet.MimeMultipart;

import org.springframework.stereotype.Service;

@Service
public class EmailSendUtil {

public void sendEmail(String nick,String subject,String content,String receivers,File file) throws Exception {
		
		Properties proper = new Properties();
		proper.setProperty("mail.transport.protocol", "smtp");
		proper.setProperty("mail.stmp.auth", "true");
		
		//账号密码认证
		Session session = Session.getInstance(proper);
		MimeMessage msg = new MimeMessage(session);
		
		try {
			
			MailcapCommandMap mc = (MailcapCommandMap) CommandMap.getDefaultCommandMap();
	        mc.addMailcap("text/html;; x-Java-content-handler=com.sun.mail.handlers.text_html");
	        mc.addMailcap("text/xml;; x-java-content-handler=com.sun.mail.handlers.text_xml");
	        mc.addMailcap("text/plain;; x-java-content-handler=com.sun.mail.handlers.text_plain");
	        mc.addMailcap("multipart/*;; x-java-content-handler=com.sun.mail.handlers.multipart_mixed");
	        mc.addMailcap("message/rfc822;; x-java-content-handler=com.sun.mail.handlers.message_rfc822");
	        CommandMap.setDefaultCommandMap(mc);
			//设置发件人
			String nickname=javax.mail.internet.MimeUtility.encodeText(nick); 
			msg.setFrom(new InternetAddress(nickname+"发件人邮箱地址"));
			//设置收件人
			msg.setRecipients(Message.RecipientType.TO, InternetAddress.parse(receivers));
			//设置邮件主题
			msg.setSubject(subject);
			MimeMultipart  msgMimePart = new MimeMultipart ("mixed");
			//正文内容
			MimeBodyPart contents = getBodyPart(content);
			msgMimePart.addBodyPart(contents);
			//附件
			if(file!=null){
				MimeBodyPart attachment = getAttachPart(file);
				msgMimePart.addBodyPart(attachment);
			}
			
			//设置邮件消息体
			msg.setContent(msgMimePart);
			//设置发送时间
			msg.setSentDate(new Date());
			msg.saveChanges();
			
			Transport trans=session.getTransport();
			trans.connect("smtp.exmail.qq.com", "发件人邮箱地址", "密码");
			trans.sendMessage(msg, msg.getRecipients(Message.RecipientType.TO));
			trans.close();
		} catch (Exception e) {
			throw new Exception("email send error:"+e.getMessage());
		}finally{
			if(file!=null&&file.exists()){
				file.delete();
			}
		}
	}

	private static MimeBodyPart getBodyPart(String content) throws MessagingException{
		MimeBodyPart body = new MimeBodyPart();
		MimeMultipart mmp = new MimeMultipart("related");
		MimeBodyPart contents = new MimeBodyPart();
		contents.setContent(content, "text/html;charset=utf-8");
		mmp.addBodyPart(contents);
		body.setContent(mmp);
		return body;
	}
	
	private static MimeBodyPart getAttachPart(File file) throws MessagingException{
		MimeBodyPart attach = new MimeBodyPart();
		FileDataSource fds = new FileDataSource(file);
		attach.setDataHandler(new DataHandler(fds));
		attach.setFileName(file.getName());
		return attach;
	}
}

controller类,写的比较粗糙,提供了两个接口,一个发纯html,一个可以发送混合格式的邮件。

java    47行

import java.io.File;

import net.bigdataer.api.weixin.utils.EmailSendUtil;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.multipart.MultipartFile;

@Controller
@RequestMapping("/email_api")
public class EmailSendController extends DmBaseController{

	@Autowired
	private EmailSendUtil est;
	
	//只发送html
	@RequestMapping("/send")
	public @ResponseBody String sendEmail(@RequestParam("nickname") String nickname,@RequestParam("subject") String subject,@RequestParam("content") String content,@RequestParam("receivers") String receivers){
		String result = "{\"status\":\"0\",\"msg\":\"success\"}";
		try{
			est.sendEmail(nickname,subject, content, receivers,null);
		}catch(Exception e){
			result = "{\"status\":\"-1\",\"msg\":\""+e.getMessage()+"\"}";
		}
		return result;
	}
	
	//发送混合格式的邮件
	@RequestMapping("/sendattachment")
	public @ResponseBody String sendAttachment(@RequestParam("nickname") String nickname,@RequestParam("subject") String subject,@RequestParam("content") String content,@RequestParam("receivers") String receivers,@RequestParam("attachment") MultipartFile attachment){
		String result = "{\"status\":\"0\",\"msg\":\"success\"}";
		File file = new File("/opt/soft/tomcat/temp/"+attachment.getOriginalFilename());
		try {
			attachment.transferTo(file); 
			est.sendEmail(nickname,subject, content, receivers,file);
		} catch (Exception e) { 
			result = "{\"status\":\"-1\",\"msg\":\""+e.getMessage()+"\"}";
		}
		
		return result;
	}
	
}

4.spark作业调用邮件接口

4.1封装一个接口调用工具类

这个类提供了对https及http协议的访问,同时支持get和post请求。在本例中没有使用到get请求。
另外这个类依赖于httpclient的相关jar包。我这里使用的jarmaven依赖如下:

xml    11行

 		<dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpmime</artifactId>
            <version>4.5.2</version>
		</dependency>

注意:因为spark源码中也使用了http-core的包,你的引用可能会和spark集群中本身的包冲突导致抛找不到某个类或者没有某个方法的异常,需要根据实际情况调整。不过代码大体上都一样,下面的代码可以参考

java    174行

import com.alibaba.fastjson.JSONObject;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.NameValuePair;
import org.apache.http.StatusLine;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpResponseException;
import org.apache.http.client.ResponseHandler;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.socket.PlainConnectionSocketFactory;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.util.EntityUtils;

import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.List;

/**
 * 服务请求类
 * 封装了post,get请求
 * 同时支持http和https方式
 * @author liuxuecheng
 *
 */
public class HttpClientUtil { 

	private static final Log logger = LogFactory.getLog(HttpClientUtil.class);
	//设置超时(单位ms)
	private static int TIME_OUT = 3000;
	private static CloseableHttpClient client = null;
	
	private static TrustManager  trustManager = new X509TrustManager() { 
	    @Override 
	    public X509Certificate[] getAcceptedIssuers() { 
	        return null; 
	    } 
		@Override
		public void checkClientTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {
		}
		@Override
		public void checkServerTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {
		}
	};
	
	static{
		
		try{
		//请求配置
		RequestConfig config = RequestConfig.custom()
				.setConnectTimeout(TIME_OUT)
				.setConnectionRequestTimeout(TIME_OUT)
				.setSocketTimeout(TIME_OUT)
				.build();
		
		//访问https站点相关
		SSLContext context = SSLContext.getInstance("TLS");
		context.init(null, new TrustManager[]{trustManager}, null);
		SSLConnectionSocketFactory scsf = new SSLConnectionSocketFactory(context, NoopHostnameVerifier.INSTANCE);
		//注册
		Registry<ConnectionSocketFactory> registry = RegistryBuilder
				.<ConnectionSocketFactory>create()
				.register("http", PlainConnectionSocketFactory.INSTANCE)
				.register("https", scsf)
				.build();
		
		//连接池
		PoolingHttpClientConnectionManager manager = new PoolingHttpClientConnectionManager(registry);
		
		//构造请求client
		client = HttpClients.custom()
				.setConnectionManager(manager)
				.setDefaultRequestConfig(config)
				.build();
		
		}catch(Exception e){
			e.printStackTrace();
		}
	}
	
	/**
	 * post方法
	 * post请求涉及到不同contentType,对应不同的HttpEntity
	 * 这里定义HttpEntity接口,所有实现了这个接口的实体均可传入
	 * @param token
	 * @param url
	 * @param entity
	 * @return
	 * @throws ClientProtocolException
	 * @throws IOException
	 */
	public static JSONObject post(String token,String url,HttpEntity entity) throws ClientProtocolException, IOException{
		//UrlEncodedFormEntity stringEntity = new UrlEncodedFormEntity(,"UTF-8");
		HttpPost post = new HttpPost(url);

		if(token !=null){
			post.setHeader("Authorization", "Bearer "+token);
		}
		post.setHeader("Accept-Charset", "UTF-8");
		post.setEntity(entity);
		
		return client.execute(post, handler);
	}
	
	/**
	 * get请求
	 * @param token
	 * @param url
	 * @param content_type
	 * @param params
	 * @return
	 * @throws ClientProtocolException
	 * @throws IOException
	 * @throws URISyntaxException 
	 */
	public static JSONObject get(String token,String url,String content_type,List<NameValuePair> params) throws ClientProtocolException, IOException, URISyntaxException{
		UrlEncodedFormEntity entity = new UrlEncodedFormEntity(params,"UTF-8");
		entity.setContentType(content_type);
		
		String para = EntityUtils.toString(entity);
		
		HttpGet get = new HttpGet(url);
		if(token !=null){
			get.setHeader("Authorization", "Bearer "+token);
		}
		get.setHeader("Accept-Charset", "UTF-8");

		//get请求将参数拼接在参数中
		get.setURI(new URI(get.getURI().toString()+"?"+para));
		return client.execute(get, handler);
	}
	
	//请求返回格式
	public static ResponseHandler<JSONObject> handler = new ResponseHandler<JSONObject>(){

		@Override
		public JSONObject handleResponse(HttpResponse res) throws ClientProtocolException, IOException {
			StatusLine status = res.getStatusLine();
			HttpEntity entity = res.getEntity();
			
			if(status.getStatusCode()>300){
				throw new HttpResponseException(status.getStatusCode(),
						status.getReasonPhrase());
			} 
			
			if(entity==null){
				throw new ClientProtocolException("respones has no content");
			}
			
			String res_str = EntityUtils.toString(entity);
			
			return JSONObject.parseObject(res_str);
		}
		
	};
}

4.2进一步对发送邮件的接口封装

以下为scala代码

scala    51行

import java.io.File
import java.util

import org.apache.http.NameValuePair
import org.apache.http.client.entity.UrlEncodedFormEntity
import org.apache.http.entity.ContentType
import org.apache.http.entity.mime.MultipartEntityBuilder
import org.apache.http.entity.mime.content.{FileBody, StringBody}
import org.apache.http.message.BasicNameValuePair
import org.apache.http.util.CharsetUtils

/**
  * Created by liuxuecheng on 2017/1/4.
  */
object EmailSendUtil {

  def sendEmail(nickname: String,subject:String,content:String,receivers:String):Unit={
    val url = "邮件发送接口地址"
    val content_type = "application/x-www-form-urlencoded"

    val params = new util.ArrayList[NameValuePair]()
    params.add(new BasicNameValuePair ("nickname",nickname))
    params.add(new BasicNameValuePair ("subject",subject))
    params.add(new BasicNameValuePair ("content",content))
    params.add(new BasicNameValuePair ("receivers",receivers))
    try{
      val entity = new UrlEncodedFormEntity(params,"UTF-8")
      entity.setContentType(content_type)
      HttpClientUtil.post(null,url,entity)
    }catch{
      case e:Throwable=>e.printStackTrace()
    }
  }

  def sendAttachment(nickname: String,subject:String,content:String,receivers:String,file:File):Unit={
    val url = "邮件发送接口地址"
    val body = new FileBody(file)
    val entity = MultipartEntityBuilder.create()
      .setCharset(CharsetUtils.get("UTF-8"))
      .addPart("attachment",body)
      .addPart("nickname",new StringBody(nickname,ContentType.create("text/plain",CharsetUtils.get("UTF-8"))))
      .addPart("subject",new StringBody(subject,ContentType.create("text/plain",CharsetUtils.get("UTF-8"))))
      .addPart("content",new StringBody(content,ContentType.create("text/plain",CharsetUtils.get("UTF-8"))))
      .addTextBody("receivers",receivers)
      .setContentType(ContentType.MULTIPART_FORM_DATA)
      .build()

    HttpClientUtil.post(null,url,entity)
  }
}

4.3spark读取hdfs文件并创建File对象

以下截取代码片段

scala    7行

 def getHdfsFile(sc:SparkContext,path:String):File = {
 //需要hdfs的全路径,开头一般为hdfs://或者viewfs:// 并且具体到文件名
    val filePath = "viewfs://xx-cluster"+path+"part-00000"
    sc.addFile(filePath)
    new File(SparkFiles.get(new File(filePath).getName))
  }

既然都拿到文件了,发送邮件不就很简单了,调用上面封装好的接口就行。

4.4spark发送html

有时候没必要生成hdfs,计算结果适合报表展示的时候可以直接collect到内存中,然后构建一段html发送,上代码。

scala    17行

	val rdd = group_rdd.groupByKey(200)
      .map(x=>{
        val raw_uv = x._2.flatMap(e=>e._1).toSeq.distinct.size
        val raw_pv = x._2.map(e=>e._2).reduce(_+_)
        val cm_uv = x._2.flatMap(e=>e._3).toSeq.distinct.size
        val cm_pv = x._2.map(e=>e._4).reduce(_+_)
        IndexEntity(x._1._1,x._1._2,x._1._3,raw_uv,raw_pv,cm_uv,cm_pv)
      }).collect().sortBy(_.search_flag).sortBy(_.platform).sortBy(_.bd)

    //模板拼接
    val tbody:StringBuffer = new StringBuffer()
    rdd.foreach(entity=>{
      tbody.append(s"<tr><td>${entity.bd}</td><td>${entity.platform}</td><td>${entity.search_flag}</td>" +
        s"<td>${entity.raw_uv}</td><td>${entity.cm_uv}</td><td>${new DecimalFormat(".00").format((entity.cm_uv.toDouble/entity.raw_uv)*100)}%</td>" +
        s"<td>${entity.raw_pv}</td><td>${entity.cm_pv}</td><td>${new DecimalFormat(".00").format((entity.cm_pv.toDouble/entity.raw_pv)*100)}%</td></tr>")
    })



以上是关于spark数据监控实战的主要内容,如果未能解决你的问题,请参考以下文章

Spark编程实战-词频统计

Zabbix中小型企业Zabbix监控实战之告警大全

在这个 spark 代码片段中 ordering.by 是啥意思?

Spark入门实战系列--3.Spark编程模型(下)--IDEA搭建及实战

python+spark程序代码片段

Spark实时监控yarn指标: