Spark and Kafka
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark and Kafka相关的知识,希望对你有一定的参考价值。
package cn.itcast_02_zkclientapi;
import java.util.List;
import org.I0Itec.zkclient.DataUpdater;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.zookeeper.CreateMode;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestZKclient {
private ZkClient zkClient = null;
/**
* 创建zookeeper连接
*/
@Before
public void connection() {
//zookeeper地址和超时时间
zkClient = new ZkClient("slave1:2181,slave2:2181,slave3:2181",2000);
}
/**
* 关闭zookeeper连接
*/
@After
public void close() {
try {
Thread.sleep(300);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
zkClient.close();
}
/**
* 创建节点
*/
@Test
public void testCreateZnode() {
zkClient.create("/zkClient", "abc", CreateMode.PERSISTENT);
}
/**
* 删除节点
*/
@Test
public void testDeleteZnode() {
zkClient.delete("/zkClient0000000010");
}
/**
* 更新节点
*/
@Test
public void testUpdateZnode() {
zkClient.updateDataSerialized("/zkClient", new DataUpdater<String>() {
@Override
public String update(String currentData) {
//返回之前znode的data
System.out.println(currentData);
//设置新的data
return "bbbbbbb";
}
} );
}
/**
* 查询创建时间
*/
@Test
public void testCreationTime() {
long creationTime = zkClient.getCreationTime("/test");
System.out.println(creationTime);
}
/**
* 查询节点内容
*/
@Test
public void testGetData() {
String readData = zkClient.readData("/zkClient",true);
System.out.println(readData);
}
/**
* 查询子节点
* 统计子节点个数
*/
@Test
public void testChild() {
int countChildren = zkClient.countChildren("/zkClient");
System.out.println("/test共有"+countChildren+"个子节点!");
List<String> children = zkClient.getChildren("/zkClient");
for (String string : children) {
System.out.println(string);
}
}
/**
* 注册节点更改监听
*/
@Test
public void testDataChangesListener() {
zkClient.subscribeDataChanges("/zkClient", new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println("节点被删除!");
}
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
System.out.println("节点被更改!");
}
});
for (int i = 0; i < 5; i++) {
zkClient.updateDataSerialized("/zkClient", new DataUpdater<String>() {
@Override
public String update(String currentData) {
return currentData;
}
} );
}
}
/**
* 注册子节点改变监听
* @throws InterruptedException
*/
@Test
public void testChildChangesListener() throws InterruptedException {
zkClient.subscribeChildChanges("/zkClient", new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds)
throws Exception {
System.out.println(parentPath+"子节点被修改!");
for (String string : currentChilds) {
System.out.println("现在为"+string);
}
}
});
for (int i = 0; i < 5; i++) {
zkClient.create("/zkClient/test", "abc", CreateMode.EPHEMERAL_SEQUENTIAL);
Thread.sleep(100);
}
}
}
以上是关于Spark and Kafka的主要内容,如果未能解决你的问题,请参考以下文章
Flink与Spark Streaming在与kafka结合的区别!
城堡大地幼儿园《周末英语》—— Spark 1 and Spark2
Spark SQL 教程翻译Datasets and DataFrames 概述
[Spark]What's the difference between spark.sql.shuffle.partitions and spark.default.parallelism?