How does a 3-node Flume cluster collect data?

Posted by brentboys


Editor's note: this post, compiled by the editors of cha138.com, introduces how a three-node Flume cluster collects data; hopefully it offers a useful reference.

master serves as the aggregation (collector) node
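Putting the three configurations below together, the data flow is roughly:

slave1 / slave2: syslogtcp source (port 5140) -> memory channel -> avro sink (to master:5140)
master: avro source (port 5140) -> memory channel -> HDFS sink -> hdfs://master:9000/flume/data/%Y-%m-%d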

$FLUME_HOME/conf/agent1.conf

  

# agent1: the collector agent running on master
agent1.channels = ch1
agent1.sources = avro-source1
agent1.sinks = log-sink1

# ch1 buffers events in memory
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 100000
agent1.channels.ch1.transactionCapacity = 100
agent1.channels.ch1.keep-alive = 30

# avro-source1 receives the events forwarded by the collection agents
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = 0.0.0.0
agent1.sources.avro-source1.port = 5140
agent1.sources.avro-source1.threads = 20

# log-sink1 writes the events to HDFS, one directory per day
agent1.sinks.log-sink1.channel = ch1
agent1.sinks.log-sink1.type = hdfs
agent1.sinks.log-sink1.hdfs.path = hdfs://master:9000/flume/data/%Y-%m-%d
agent1.sinks.log-sink1.hdfs.filePrefix = data
agent1.sinks.log-sink1.hdfs.useLocalTimeStamp = true
agent1.sinks.log-sink1.hdfs.inUseSuffix = .tmp
agent1.sinks.log-sink1.hdfs.writeFormat = Text
agent1.sinks.log-sink1.hdfs.fileType = DataStream
# roll files every 300 seconds; 0 disables rolling by size and by event count
agent1.sinks.log-sink1.hdfs.rollInterval = 300
agent1.sinks.log-sink1.hdfs.rollSize = 0
agent1.sinks.log-sink1.hdfs.rollCount = 0
agent1.sinks.log-sink1.hdfs.batchSize = 10
agent1.sinks.log-sink1.txnEventMax = 1000
agent1.sinks.log-sink1.hdfs.callTimeout = 60000
agent1.sinks.log-sink1.hdfs.appendTimeout = 60000

$FLUME_HOME/agent1.bash

#!/bin/bash
bin/flume-ng agent -n agent1 -c conf -f conf/agent1.conf -Dflume.root.logger=DEBUG,console > ./agent1.log 2>&1 &
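Once agent1 is running, its Avro source can be smoke-tested with Flume's bundled avro-client. A minimal sketch, run from $FLUME_HOME on master; /tmp/test.log is just an example file:

echo "hello flume" > /tmp/test.log
# send each line of the file as one Avro event to the collector
bin/flume-ng avro-client -c conf -H master -p 5140 -F /tmp/test.log

If the collector is wired up correctly, the event should land under the current day's directory on HDFS after the sink rolls the file.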

 

slave1 serves as a collection node

$FLUME_HOME/conf/agent2.conf

# define agent2's source name
agent2.sources = source2
# define agent2's sink name
agent2.sinks = sink2
# define agent2's channel name
agent2.channels = channel2


# source2 receives syslog messages over TCP
agent2.sources.source2.type = syslogtcp
# hostname/interface source2 listens on
agent2.sources.source2.bind = slave1
# port source2 listens on
agent2.sources.source2.port = 5140
# character encoding of the received data
agent2.sources.source2.encoding = utf-8


# sink2 forwards events to the collector over Avro
agent2.sinks.sink2.type = avro
# host sink2 sends data to
agent2.sinks.sink2.hostname = master
# port sink2 sends data to
agent2.sinks.sink2.port = 5140


# channel2 buffers events in memory
agent2.channels.channel2.type = memory
# maximum number of events held in the channel
agent2.channels.channel2.capacity = 100000
# maximum number of events per transaction
agent2.channels.channel2.transactionCapacity = 100
# seconds to wait for channel space/events before timing out
agent2.channels.channel2.keep-alive = 30

# bind source2 and sink2 to channel2
agent2.sources.source2.channels = channel2
agent2.sinks.sink2.channel = channel2

 

$FLUME_HOME/agent2.bash

#!/bin/bash
bin/flume-ng agent -n agent2 -c conf -f conf/agent2.conf -Dflume.root.logger=DEBUG,console > ./agent2.log 2>&1 &
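With agent2 running, a single test event can be pushed into the syslogtcp source using netcat. A minimal sketch, assuming nc is installed on the node; the <13> prefix is a standard syslog priority header:

echo "<13>test event from slave1" | nc slave1 5140

The event should be relayed to master over Avro and show up in the HDFS sink's output once the file rolls.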

slave2 serves as a collection node

$FLUME_HOME/conf/agent3.conf

# define agent3's source name
agent3.sources = source3
# define agent3's sink name
agent3.sinks = sink3
# define agent3's channel name
agent3.channels = channel3


# source3 receives syslog messages over TCP
agent3.sources.source3.type = syslogtcp
# hostname/interface source3 listens on
agent3.sources.source3.bind = slave2
# port source3 listens on
agent3.sources.source3.port = 5140
# character encoding of the received data
agent3.sources.source3.encoding = utf-8


# sink3 forwards events to the collector over Avro
agent3.sinks.sink3.type = avro
# host sink3 sends data to
agent3.sinks.sink3.hostname = master
# port sink3 sends data to
agent3.sinks.sink3.port = 5140

# channel3 buffers events in memory
agent3.channels.channel3.type = memory
# maximum number of events held in the channel
agent3.channels.channel3.capacity = 100000
# maximum number of events per transaction
agent3.channels.channel3.transactionCapacity = 100
# seconds to wait for channel space/events before timing out
agent3.channels.channel3.keep-alive = 30

# bind source3 and sink3 to channel3
agent3.sources.source3.channels = channel3
agent3.sinks.sink3.channel = channel3

 

$FLUME_HOME/agent3.bash

#!/bin/bash
bin/flume-ng agent -n agent3 -c conf -f conf/agent3.conf -Dflume.root.logger=DEBUG,console > ./agent3.log 2>&1 &
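Before generating traffic it is worth confirming that every agent is up and listening on port 5140. A quick check with standard Linux tools, run on each node (netstat may need root to show process names):

jps                        # the Flume agent appears as "Application"
netstat -tlnp | grep 5140  # port 5140 should be in LISTEN state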


A Java program generates test data and sends it to slave1 and slave2.

package cn.brent;

import java.io.BufferedOutputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class ProductData {

    public static long userTime;
    public static String userAccount;
    public static String[] userIP1 = { "223", "175", "202", "110", "221", "103", "118", "125" };
    public static String[] userIP2 = { "220", "184", "100", "166", "207", "22", "213", "72" };
    public static String[] userIP3 = { "0", "128", "255", "191", "135", "100", "63", "103", "144", "159", "136", "143" };
    public static String[] userIP4 = { "0", "255" };
    public static int skypeid;
    public static String innerIP;
    public static String privateValue;
    public static String[] devName1 = { "MacBook Pro mac", "iphone 5s", "Thinkpad" };
    public static String[] osName1 = { "os x 10.9", "ios", "win 8" };
    public static String str;
    public static String data;
    public static int skypeid1;
    public static String natIPTest;

    /**
     * Generate a 32-character MD5 hex digest for a string.
     */
    public String md5s(String plainText) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            md.update(plainText.getBytes());
            byte b[] = md.digest();
            int i;
            StringBuffer buf = new StringBuffer("");
            for (int offset = 0; offset < b.length; offset++) {
                i = b[offset];
                if (i < 0)
                    i += 256;
                if (i < 16)
                    buf.append("0");                // pad single hex digits to two characters
                buf.append(Integer.toHexString(i));
            }
            str = buf.toString();                   // 32-character digest
            // buf.toString().substring(8, 24) would give the 16-character form
        } catch (NoSuchAlgorithmException e) {
            e.printStackTrace();
        }
        return str;
    }

    /**
     * Helpers that pick a random element or random value.
     */
    public String getDev() {
        int index = (int) (Math.random() * devName1.length);
        return devName1[index];
    }

    public String getOs() {
        int index = (int) (Math.random() * osName1.length);
        return osName1[index];
    }

    public String getIP1() {
        int index = (int) (Math.random() * userIP1.length);
        return userIP1[index];
    }

    public String getIP2() {
        int index = (int) (Math.random() * userIP2.length);
        return userIP2[index];
    }

    public String getIP3() {
        int index = (int) (Math.random() * userIP3.length);
        return userIP3[index];
    }

    public String getIP4() {
        int index = (int) (Math.random() * userIP4.length);
        return userIP4[index];
    }

    public int getSkypeID() {
        skypeid1 = new Random().nextInt(999999999);
        return skypeid1;
    }

    public String getInnerIP() {
        java.util.Random rd = new java.util.Random();
        return rd.nextInt(255) + "." + rd.nextInt(255);
    }

    /**
     * create data
     */
    public static List<String> creatData() {
        ProductData pd = new ProductData();
        Random rd = new Random((long) (Math.random() * 75));
        List<String> dd = new ArrayList<String>();
        userTime = System.currentTimeMillis();
        innerIP = "192" + "." + "168" + "." + pd.getInnerIP();
        String userIP = pd.getIP1() + "." + pd.getIP2() + "." + pd.getIP3() + "." + pd.getIP4();
        String devName = pd.getDev();
        String osName = pd.getOs();
        userAccount = "9" + rd.nextInt(9999999);
        if (userAccount.length() < 12) {
            String userModel = "1234567890";
            int a = 11 - userAccount.length();
            String userResult = userModel.substring((int) Math.random() * 10, a);
            userAccount = userAccount + userResult;
        }
        int times = rd.nextInt(25) + 1;
        skypeid = pd.getSkypeID();
        for (int i = 0; i < times; i++) {
            int s = rd.nextInt(3);
            if (s == 2) {
                skypeid = pd.getSkypeID();
                innerIP = "192" + "." + "168" + "." + pd.getInnerIP();
                privateValue = pd.md5s(userAccount);
            }
            String data = userTime + "\t" + userAccount + "\t" + userIP + "\t" + skypeid + "\t" + innerIP + "\t"
                    + privateValue + "\t" + devName + "\t" + osName + "\n";
            dd.add(data);
        }
        return dd;
    }

    /**
     * send socket-UDP
     */
    public void udpSendSocket() {
        DatagramSocket ds;
        try {
            ds = new DatagramSocket();
            List<String> dataflumn = ProductData.creatData();
            System.out.println(dataflumn);
            for (String list : dataflumn) {
                byte[] by = list.getBytes();
                DatagramPacket dp = new DatagramPacket(by, by.length,
                        InetAddress.getByName("192.168.2.53"), 10006);
                ds.send(dp);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * send socket-TCP to the first collection node (192.168.239.129, slave1 in this setup)
     */
    public void tcpSendSocket() {
        Socket s1 = null;
        try {
            List<String> dataflumn = ProductData.creatData();
            for (String list : dataflumn) {
                s1 = new Socket("192.168.239.129", 5140);
                BufferedOutputStream bosout = new BufferedOutputStream(s1.getOutputStream());
                bosout.write(list.getBytes());
                bosout.flush();
                s1.close(); // close each connection so sockets are not leaked across iterations
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (null != s1 && !s1.isClosed()) {
                try {
                    s1.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }


    /**
     * send socket-TCP to the second collection node (192.168.239.130, slave2 in this setup)
     */
    public void tcpSendSocket2() {
        Socket s1 = null;
        try {
            List<String> dataflumn = ProductData.creatData();
            for (String list : dataflumn) {
                s1 = new Socket("192.168.239.130", 5140);
                BufferedOutputStream bosout = new BufferedOutputStream(s1.getOutputStream());
                bosout.write(list.getBytes());
                bosout.flush();
                s1.close(); // close each connection so sockets are not leaked across iterations
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (null != s1 && !s1.isClosed()) {
                try {
                    s1.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

 

    public static void main(String[] args) throws UnknownHostException {
        Thread thread = new Thread(new SleepRunner());
        thread.start();
        Thread thread2 = new Thread(new SleepRunner2());
        thread2.start();
    }
}

/**
 * Controls the send rate towards slave1.
 */
class SleepRunner implements Runnable {

    private ProductData pd = new ProductData();

    @Override
    public void run() {
        while (true) {
            try {
                pd.tcpSendSocket();
                Thread.sleep((long) (Math.random() * 1000)); // pause up to one second between batches
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

/**
 * Controls the send rate towards slave2.
 */
class SleepRunner2 implements Runnable {

    private ProductData pd = new ProductData();

    @Override
    public void run() {
        while (true) {
            try {
                pd.tcpSendSocket2();
                Thread.sleep((long) (Math.random() * 1000)); // pause up to one second between batches
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
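To run the generator, compile it with the plain JDK tools and make sure the hard-coded addresses 192.168.239.129/130 match slave1 and slave2 in your environment. A minimal sketch; the output directory and source file name are just examples:

mkdir -p out
javac -d out ProductData.java        # the class above, saved as ProductData.java
java -cp out cn.brent.ProductData    # starts the two sender threads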

Monitor the agent logs in real time: tail -f agent1.log on master, tail -f agent2.log on slave1, and tail -f agent3.log on slave2.

How do we view the data collected on HDFS?

hdfs dfs -cat /flume/data/2019-05-01/data*

# or, equivalently, with the older hadoop fs form:
hadoop fs -cat /flume/data/2019-05-01/data*
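It can also help to list the date directories first and check that no file still carries the .tmp in-use suffix, i.e. that the sink has already rolled it (the date is just an example):

hdfs dfs -ls /flume/data/
hdfs dfs -ls /flume/data/2019-05-01/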
