实训任务5:ZooKeeper节点操作
Posted howard2005
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了实训任务5:ZooKeeper节点操作相关的知识,希望对你有一定的参考价值。
文章目录
一、实训目的
- 通过实训,使学生更加熟练掌握通过Java代码实现ZooKeeper节点的操作。
二、实训要求
- 认真完成实训任务,写一篇CSDN博客,记录操作过程。
三、实训任务
- 每支股票都受到很多因素的影响,比如包括政策、盈利、产品等,将这些因素量化成数字以后,可以呈现股价的增减。比如广深铁路上市时股价为
6.0
元,然后随着时间推移,政策发生变化提升股价0.1
元,盈利提升股价0.2
元,产品降低股价0.2
元,则此时股价为6.1
元。 - 创建节点
/guangshen
,数据为6
,并创建子节点:/policy
、/profit
、/product
开始默认值都为0
。模拟两个客户端,第一个客户端定时修改三个子节点/policy
、/profit
、/product
的值,要求值是随机的,范围在[-1, 1]
之间,当三个节点/policy
、/profit
和/product
的值变化时,则修改/guangshen
节点的值。另外一个客户端监控实时股价,并显示当前股价为多少。
四、完成任务
(一)准备工作
- 在master虚拟机上启动ZK服务
- 在slave1虚拟机上启动ZK服务
- 在slave2虚拟机上启动ZK服务
(二)实现步骤
1、创建Maven项目
- Maven项目 -
ZkStock
- 单击【Finish】按钮
2、添加相关依赖
- 在
pom.xml
文件里添加ZooKeeper和junit依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>net.hw.zk</groupId>
<artifactId>ZkStock</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.8.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
</dependency>
</dependencies>
</project>
3、创建日志属性文件
- 在
resources
目录里创建log4j.properties
文件
log4j.rootLogger=ERROR, stdout, logfile
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/zkprice.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
4、创建股票价格类
- 创建
net.hw.zk
包,在包里创建StockPrice
类
(1)创建init()方法
- 连接zk服务器,创建相关节点
package net.hw.zk;
import org.apache.zookeeper.*;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
/**
* 功能:股票价格
* 作者:华卫
* 日期:2022年12月10日
*/
public class StockPrice
@Test
public void init() throws Exception
final CountDownLatch cdl = new CountDownLatch(1);
ZooKeeper zk = new ZooKeeper("master:2181",
3000,
new Watcher()
@Override
public void process(WatchedEvent event)
System.out.println("连接zk服务器集群成功!");
cdl.countDown();
);
cdl.await();
final CountDownLatch cdl1 = new CountDownLatch(1);
zk.create("/guangshen", "6".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
new AsyncCallback.StringCallback()
@Override
public void processResult(int rc, String path, Object ctx, String name)
if (rc == 0)
System.out.println("节点" + path + "创建成功!");
else
System.out.println("节点" + path + "创建失败!");
cdl1.countDown();
, "stock");
cdl1.await();
final CountDownLatch cdl2 = new CountDownLatch(1);
zk.create("/guangshen/policy", "0".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
new AsyncCallback.StringCallback()
@Override
public void processResult(int rc, String path, Object ctx, String name)
if (rc == 0)
System.out.println("节点" + path + "创建成功!");
else
System.out.println("节点" + path + "创建失败!");
cdl2.countDown();
, "stock");
cdl2.await();
final CountDownLatch cdl3 = new CountDownLatch(1);
zk.create("/guangshen/profit", "0".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
new AsyncCallback.StringCallback()
@Override
public void processResult(int rc, String path, Object ctx, String name)
if (rc == 0)
System.out.println("节点" + path + "创建成功!");
else
System.out.println("节点" + path + "创建失败!");
cdl3.countDown();
, "stock");
cdl3.await();
final CountDownLatch cdl4 = new CountDownLatch(1);
zk.create("/guangshen/product", "0".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
new AsyncCallback.StringCallback()
@Override
public void processResult(int rc, String path, Object ctx, String name)
if (rc == 0)
System.out.println("节点" + path + "创建成功!");
else
System.out.println("节点" + path + "创建失败!");
cdl4.countDown();
, "stock");
cdl4.await();
(2)创建client1()方法
- 定时修改节点的值
@Test
public void client1() throws Exception
final CountDownLatch cdl = new CountDownLatch(1);
final ZooKeeper zk = new ZooKeeper("master:2181",
3000,
new Watcher()
@Override
public void process(WatchedEvent event)
System.out.println("连接zk服务器集群成功!");
cdl.countDown();
);
cdl.await();
new Thread(new Runnable()
@Override
public void run()
while (true)
Random random = new Random();
try
zk.setData("/guangshen/policy", String.valueOf(random.nextDouble() * 2 - 1).getBytes(), -1);
zk.setData("/guangshen/profit", String.valueOf(random.nextDouble() * 2 - 1).getBytes(), -1);
zk.setData("/guangshen/product", String.valueOf(random.nextDouble() * 2 - 1).getBytes(), -1);
try
Thread.sleep(3000);
catch (InterruptedException e)
e.printStackTrace();
catch (KeeperException e)
e.printStackTrace();
catch (InterruptedException e)
e.printStackTrace();
).start();
while (true)
final CountDownLatch cdl1 = new CountDownLatch(3);
try
byte[] policy = zk.getData("/guangshen/policy", new Watcher()
@Override
public void process(WatchedEvent event)
if (event.getType() == Event.EventType.NodeDataChanged)
cdl1.countDown();
, null);
byte[] profit = zk.getData("/guangshen/profit", new Watcher()
@Override
public void process(WatchedEvent event)
if (event.getType() == Event.EventType.NodeDataChanged)
cdl1.countDown();
, null);
byte[] product = zk.getData("/guangshen/product", new Watcher()
@Override
public void process(WatchedEvent event)
if (event.getType() == Event.EventType.NodeDataChanged)
cdl1.countDown();
, null);
cdl1.await();
double p1 = Double.parseDouble(new String(policy));
double p2 = Double.parseDouble(new String(profit));
double p3 = Double.parseDouble(new String(product));
double price = 6 + p1 + p2 + p3;
zk.setData("/guangshen", String.valueOf(price).getBytes(), -1);
catch (KeeperException e)
e.printStackTrace();
catch (InterruptedException e)
e.printStackTrace();
Zookeeper3.5.7版本——客户端命令行操作(监听器原理)