Hive中的锁的用法和使用场景

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hive中的锁的用法和使用场景相关的知识,希望对你有一定的参考价值。

参考技术A

前面遇到过一次因为 Hive 中表被锁住了,导致定时任务一直失败。这两天又出现了表被锁,原因是连接 hiveserver2 过于频繁, mysql 连接被打满,引发的连锁反应,导致我们的小时任务一直失败,下午重点注意到这个问题,才解决好。

在执行 insert into 或 insert overwrite 任务时,中途手动将程序停掉,会出现卡死情况(无法提交MapReduce),只能执行查询操作,而 drop insert 操作均不可操作,无论执行多久,都会保持卡死状态。

查看 Hive 的中死锁,可以使用 show locks [table] 来查看。

可以看到里面的那个Type下的EXCLUSIVE,这是一种互斥锁,需要解决,否则后续的查询和插入任务都会影响。

hive存在两种锁,共享锁 Shared (S) 和互斥锁 Exclusive (X)

锁的基本机制是:

触发共享锁的操作是可以并发执行的,但是触发互斥锁,那么该表和该分区就不能并发的执行作业了。

对于上面的情况,使用解锁命令:

注意 : 表锁和分区锁是两个不同的锁 ,对表解锁,对分区是无效的,分区需要单独解锁

查看表被锁的情况:

常规解锁方法:

高版本hive默认插入数据时,不能查询,因为有锁

解锁之路通常不是一帆风顺的,可能会遇到各种问题,笔者是在 Hive2.1.1 下面测试,比如:

这个命令无法执行,说 LockManager 没有指定,这时候需要执行命令:

这样重新执行,命令就可以执行了

如果还!是!不!行,终极方法,可以直接去mysql元数据执行:

查到所有的锁,然后根据条件把对应的锁删掉,这个锁住的表即可释放出来了。

注意 : 表名和字段都需要大写 。

通过这种办法,通常可以彻底解决锁的问题。

多线程中的锁的几种用法总结

一、ReentrantLock

 1 package com.ietree.basicskill.mutilthread.lock;
 2 
 3 import java.util.concurrent.locks.Lock;
 4 import java.util.concurrent.locks.ReentrantLock;
 5 
 6 /**
 7  * Created by Administrator on 2017/5/17.
 8  */
 9 public class UseReentrantLock {
10 
11     private Lock lock = new ReentrantLock();
12 
13     public void method1(){
14         try {
15             lock.lock();
16             System.out.println("当前线程:" + Thread.currentThread().getName() + "进入method1..");
17             Thread.sleep(1000);
18             System.out.println("当前线程:" + Thread.currentThread().getName() + "退出method1..");
19             Thread.sleep(1000);
20         } catch (InterruptedException e) {
21             e.printStackTrace();
22         } finally {
23 
24             lock.unlock();
25         }
26     }
27 
28     public void method2(){
29         try {
30             lock.lock();
31             System.out.println("当前线程:" + Thread.currentThread().getName() + "进入method2..");
32             Thread.sleep(2000);
33             System.out.println("当前线程:" + Thread.currentThread().getName() + "退出method2..");
34             Thread.sleep(1000);
35         } catch (InterruptedException e) {
36             e.printStackTrace();
37         } finally {
38 
39             lock.unlock();
40         }
41     }
42 
43     public static void main(String[] args) {
44 
45         final UseReentrantLock ur = new UseReentrantLock();
46         Thread t1 = new Thread(new Runnable() {
47             @Override
48             public void run() {
49                 ur.method1();
50                 ur.method2();
51             }
52         }, "t1");
53 
54         t1.start();
55         try {
56             Thread.sleep(10);
57         } catch (InterruptedException e) {
58             e.printStackTrace();
59         }
60         //System.out.println(ur.lock.getQueueLength());
61     }
62 
63 }

二、ReentrantReadWriteLock

 1 package com.ietree.basicskill.mutilthread.lock;
 2 
 3 import java.util.concurrent.locks.ReentrantReadWriteLock;
 4 
 5 /**
 6  * Created by Administrator on 2017/5/17.
 7  */
 8 public class UseReentrantReadWriteLock {
 9 
10     private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
11     private ReentrantReadWriteLock.ReadLock readLock = rwLock.readLock();
12     private ReentrantReadWriteLock.WriteLock writeLock = rwLock.writeLock();
13 
14     public void read(){
15         try {
16             readLock.lock();
17             System.out.println("当前线程:" + Thread.currentThread().getName() + "进入...");
18             Thread.sleep(3000);
19             System.out.println("当前线程:" + Thread.currentThread().getName() + "退出...");
20         } catch (Exception e) {
21             e.printStackTrace();
22         } finally {
23             readLock.unlock();
24         }
25     }
26 
27     public void write(){
28         try {
29             writeLock.lock();
30             System.out.println("当前线程:" + Thread.currentThread().getName() + "进入...");
31             Thread.sleep(3000);
32             System.out.println("当前线程:" + Thread.currentThread().getName() + "退出...");
33         } catch (Exception e) {
34             e.printStackTrace();
35         } finally {
36             writeLock.unlock();
37         }
38     }
39 
40     public static void main(String[] args) {
41 
42         final UseReentrantReadWriteLock urrw = new UseReentrantReadWriteLock();
43 
44         Thread t1 = new Thread(new Runnable() {
45             @Override
46             public void run() {
47                 urrw.read();
48             }
49         }, "t1");
50         Thread t2 = new Thread(new Runnable() {
51             @Override
52             public void run() {
53                 urrw.read();
54             }
55         }, "t2");
56         Thread t3 = new Thread(new Runnable() {
57             @Override
58             public void run() {
59                 urrw.write();
60             }
61         }, "t3");
62         Thread t4 = new Thread(new Runnable() {
63             @Override
64             public void run() {
65                 urrw.write();
66             }
67         }, "t4");
68 
69 //        t1.start();
70 //        t2.start();
71 
72 //        t1.start(); // R
73 //        t3.start(); // W
74 
75         t3.start();
76         t4.start();
77     }
78 }

三、Condition

 1 package com.ietree.basicskill.mutilthread.lock;
 2 
 3 import java.util.concurrent.locks.Condition;
 4 import java.util.concurrent.locks.Lock;
 5 import java.util.concurrent.locks.ReentrantLock;
 6 
 7 /**
 8  * Created by Administrator on 2017/5/17.
 9  */
10 public class UseCondition {
11     private Lock lock = new ReentrantLock();
12     private Condition condition = lock.newCondition();
13 
14     public void method1(){
15         try {
16             lock.lock();
17             System.out.println("当前线程:" + Thread.currentThread().getName() + "进入等待状态..");
18             Thread.sleep(3000);
19             System.out.println("当前线程:" + Thread.currentThread().getName() + "释放锁..");
20             condition.await();    // Object wait
21             System.out.println("当前线程:" + Thread.currentThread().getName() +"继续执行...");
22         } catch (Exception e) {
23             e.printStackTrace();
24         } finally {
25             lock.unlock();
26         }
27     }
28 
29     public void method2(){
30         try {
31             lock.lock();
32             System.out.println("当前线程:" + Thread.currentThread().getName() + "进入..");
33             Thread.sleep(3000);
34             System.out.println("当前线程:" + Thread.currentThread().getName() + "发出唤醒..");
35             condition.signal();        //Object notify
36         } catch (Exception e) {
37             e.printStackTrace();
38         } finally {
39             lock.unlock();
40         }
41     }
42 
43     public static void main(String[] args) {
44 
45         final UseCondition uc = new UseCondition();
46         Thread t1 = new Thread(new Runnable() {
47             @Override
48             public void run() {
49                 uc.method1();
50             }
51         }, "t1");
52         Thread t2 = new Thread(new Runnable() {
53             @Override
54             public void run() {
55                 uc.method2();
56             }
57         }, "t2");
58         t1.start();
59 
60         t2.start();
61     }
62 }

四、ManyCondition

  1 package com.ietree.basicskill.mutilthread.lock;
  2 
  3 import java.util.concurrent.locks.Condition;
  4 import java.util.concurrent.locks.ReentrantLock;
  5 
  6 /**
  7  * Created by Administrator on 2017/5/17.
  8  */
  9 public class UseManyCondition {
 10     private ReentrantLock lock = new ReentrantLock();
 11     private Condition c1 = lock.newCondition();
 12     private Condition c2 = lock.newCondition();
 13 
 14     public void m1(){
 15         try {
 16             lock.lock();
 17             System.out.println("当前线程:" +Thread.currentThread().getName() + "进入方法m1等待..");
 18             c1.await();
 19             System.out.println("当前线程:" +Thread.currentThread().getName() + "方法m1继续..");
 20         } catch (Exception e) {
 21             e.printStackTrace();
 22         } finally {
 23             lock.unlock();
 24         }
 25     }
 26 
 27     public void m2(){
 28         try {
 29             lock.lock();
 30             System.out.println("当前线程:" +Thread.currentThread().getName() + "进入方法m2等待..");
 31             c1.await();
 32             System.out.println("当前线程:" +Thread.currentThread().getName() + "方法m2继续..");
 33         } catch (Exception e) {
 34             e.printStackTrace();
 35         } finally {
 36             lock.unlock();
 37         }
 38     }
 39 
 40     public void m3(){
 41         try {
 42             lock.lock();
 43             System.out.println("当前线程:" +Thread.currentThread().getName() + "进入方法m3等待..");
 44             c2.await();
 45             System.out.println("当前线程:" +Thread.currentThread().getName() + "方法m3继续..");
 46         } catch (Exception e) {
 47             e.printStackTrace();
 48         } finally {
 49             lock.unlock();
 50         }
 51     }
 52 
 53     public void m4(){
 54         try {
 55             lock.lock();
 56             System.out.println("当前线程:" +Thread.currentThread().getName() + "唤醒..");
 57             c1.signalAll();
 58         } catch (Exception e) {
 59             e.printStackTrace();
 60         } finally {
 61             lock.unlock();
 62         }
 63     }
 64 
 65     public void m5(){
 66         try {
 67             lock.lock();
 68             System.out.println("当前线程:" +Thread.currentThread().getName() + "唤醒..");
 69             c2.signal();
 70         } catch (Exception e) {
 71             e.printStackTrace();
 72         } finally {
 73             lock.unlock();
 74         }
 75     }
 76 
 77     public static void main(String[] args) {
 78 
 79 
 80         final UseManyCondition umc = new UseManyCondition();
 81         Thread t1 = new Thread(new Runnable() {
 82             @Override
 83             public void run() {
 84                 umc.m1();
 85             }
 86         },"t1");
 87         Thread t2 = new Thread(new Runnable() {
 88             @Override
 89             public void run() {
 90                 umc.m2();
 91             }
 92         },"t2");
 93         Thread t3 = new Thread(new Runnable() {
 94             @Override
 95             public void run() {
 96                 umc.m3();
 97             }
 98         },"t3");
 99         Thread t4 = new Thread(new Runnable() {
100             @Override
101             public void run() {
102                 umc.m4();
103             }
104         },"t4");
105         Thread t5 = new Thread(new Runnable() {
106             @Override
107             public void run() {
108                 umc.m5();
109             }
110         },"t5");
111 
112         t1.start();    // c1
113         t2.start();    // c1
114         t3.start();    // c2
115 
116 
117         try {
118             Thread.sleep(2000);
119         } catch (InterruptedException e) {
120             e.printStackTrace();
121         }
122 
123         t4.start();    // c1
124         try {
125             Thread.sleep(2000);
126         } catch (InterruptedException e) {
127             e.printStackTrace();
128         }
129         t5.start();    // c2
130 
131     }
132 }

 

以上是关于Hive中的锁的用法和使用场景的主要内容,如果未能解决你的问题,请参考以下文章

java中的各种锁详细介绍

java中的各种锁详细介绍

Java 锁详解(转)

多线程中的锁的几种用法总结

肝一下ZooKeeper实现分布式锁的方案,附带实例!

Java多线程有哪几种实现方式? Java中的类如何保证线程安全? 请说明ThreadLocal的用法和适用场景