Spark and Kafka

Preface: these notes were put together around Spark and Kafka; concretely, they walk through basic znode operations on ZooKeeper (the coordination service Kafka depends on) via the ZkClient API. Hopefully they have some reference value for you.
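The examples use the I0Itec ZkClient wrapper rather than the raw org.apache.zookeeper API. As a sketch of the build setup (assuming the com.101tec coordinates under which this library is commonly published; the version shown is illustrative, not from the original post), the Maven dependency would be:

<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.11</version>
</dependency>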

package cn.itcast_02_zkclientapi;

import java.util.List;

import org.I0Itec.zkclient.DataUpdater;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.zookeeper.CreateMode;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class TestZKclient {
    private ZkClient zkClient = null;
    /**
     * Open the ZooKeeper connection before each test.
     */
    @Before
    public void connection() {
        // ZooKeeper connection string and connection timeout in milliseconds
        zkClient = new ZkClient("slave1:2181,slave2:2181,slave3:2181", 2000);
    }
    /**
     * Close the ZooKeeper connection after each test.
     */
    @After
    public void close() {
        try {
            // Give pending watcher callbacks a moment to fire before closing.
            Thread.sleep(300);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        zkClient.close();
    }
    /**
     * Create a persistent znode with string data.
     */
    @Test
    public void testCreateZnode() {
            zkClient.create("/zkClient", "abc", CreateMode.PERSISTENT);
    }
    /**
     * Delete a znode.
     */
    @Test
    public void testDeleteZnode() {
        zkClient.delete("/zkClient0000000010");
    }
    /**
     * Update a znode's data with a read-modify-write.
     */
    @Test
    public void testUpdateZnode() {
        zkClient.updateDataSerialized("/zkClient", new DataUpdater<String>() {
            @Override
            public String update(String currentData) {
                // print the znode's previous data
                System.out.println(currentData);
                // return the new data to store
                return "bbbbbbb";
            }
        });
    }
    /**
     * Read a znode's creation time (milliseconds since the epoch).
     */
    @Test
    public void testCreationTime() {
        long creationTime = zkClient.getCreationTime("/zkClient");
        System.out.println(creationTime);
    }
    /**
     * Read a znode's data.
     */
    @Test
    public void testGetData() {
        // true: return null instead of throwing if the path does not exist
        String readData = zkClient.readData("/zkClient", true);
        System.out.println(readData);
    }
    /**
     * List a znode's children and count them.
     */
    @Test
    public void testChild() {
        int countChildren = zkClient.countChildren("/zkClient");
        System.out.println("/zkClient has " + countChildren + " children!");
        List<String> children = zkClient.getChildren("/zkClient");
        for (String string : children) {
            System.out.println(string);
        }
    }
    /**
     * Register a data-change listener on a znode.
     */
    @Test
    public void testDataChangesListener() {
        zkClient.subscribeDataChanges("/zkClient", new IZkDataListener() {
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println("Znode deleted!");
            }

            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
                System.out.println("Znode data changed!");
            }
        });
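        // Rewrite the same data five times; every write fires a NodeDataChanged
        // event, even though the content does not actually change.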
        for (int i = 0; i < 5; i++) {
            zkClient.updateDataSerialized("/zkClient", new DataUpdater<String>() {
                @Override
                public String update(String currentData) {
                    return currentData;
                }
            });
        }
    }
    /**
     * Register a child-change listener on a znode.
     * @throws InterruptedException
     */
    @Test
    public void testChildChangesListener() throws InterruptedException {
        zkClient.subscribeChildChanges("/zkClient", new IZkChildListener() {
            @Override
            public void handleChildChange(String parentPath, List<String> currentChilds)
                    throws Exception {
                System.out.println(parentPath + " children changed!");
                for (String string : currentChilds) {
                    System.out.println("now: " + string);
                }
            }
        });
        // Create five ephemeral sequential children; each creation fires the listener.
        for (int i = 0; i < 5; i++) {
            zkClient.create("/zkClient/test", "abc", CreateMode.EPHEMERAL_SEQUENTIAL);
            Thread.sleep(100);
        }
    }
}
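A caveat the tests gloss over: by default ZkClient serializes znode data with Java serialization (its SerializableSerializer), so the "abc" written above is stored with serialization header bytes rather than as plain UTF-8, which makes it awkward to read from other clients. A common remedy is to plug a string serializer into ZkClient's four-argument constructor. The class below is a minimal sketch, not part of the original post; StringZkSerializer is a hypothetical name, and the timeouts in the usage line are illustrative.

import java.nio.charset.StandardCharsets;

import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;

// Hypothetical helper: stores znode data as plain UTF-8 strings.
public class StringZkSerializer implements ZkSerializer {
    @Override
    public byte[] serialize(Object data) throws ZkMarshallingError {
        // Write the string's UTF-8 bytes instead of a Java-serialized object.
        return ((String) data).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public Object deserialize(byte[] bytes) throws ZkMarshallingError {
        // Read the raw bytes back as a UTF-8 string.
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }
}

Wired into the connection() method above, it would look like:

zkClient = new ZkClient("slave1:2181,slave2:2181,slave3:2181", 5000, 2000, new StringZkSerializer());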
