Hive中的锁的用法和使用场景
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hive中的锁的用法和使用场景相关的知识,希望对你有一定的参考价值。
参考技术A前面遇到过一次因为 Hive 中表被锁住了,导致定时任务一直失败。这两天又出现了表被锁,原因是连接 hiveserver2 过于频繁, mysql 连接被打满,引发的连锁反应,导致我们的小时任务一直失败,下午重点注意到这个问题,才解决好。
在执行 insert into 或 insert overwrite 任务时,中途手动将程序停掉,会出现卡死情况(无法提交MapReduce),只能执行查询操作,而 drop insert 操作均不可操作,无论执行多久,都会保持卡死状态。
查看 Hive 的中死锁,可以使用 show locks [table] 来查看。
可以看到里面的那个Type下的EXCLUSIVE,这是一种互斥锁,需要解决,否则后续的查询和插入任务都会影响。
hive存在两种锁,共享锁 Shared (S) 和互斥锁 Exclusive (X)
锁的基本机制是:
触发共享锁的操作是可以并发执行的,但是触发互斥锁,那么该表和该分区就不能并发的执行作业了。
对于上面的情况,使用解锁命令:
注意 : 表锁和分区锁是两个不同的锁 ,对表解锁,对分区是无效的,分区需要单独解锁
查看表被锁的情况:
常规解锁方法:
高版本hive默认插入数据时,不能查询,因为有锁
解锁之路通常不是一帆风顺的,可能会遇到各种问题,笔者是在 Hive2.1.1 下面测试,比如:
这个命令无法执行,说 LockManager 没有指定,这时候需要执行命令:
这样重新执行,命令就可以执行了
如果还!是!不!行,终极方法,可以直接去mysql元数据执行:
查到所有的锁,然后根据条件把对应的锁删掉,这个锁住的表即可释放出来了。
注意 : 表名和字段都需要大写 。
通过这种办法,通常可以彻底解决锁的问题。
多线程中的锁的几种用法总结
一、ReentrantLock
1 package com.ietree.basicskill.mutilthread.lock; 2 3 import java.util.concurrent.locks.Lock; 4 import java.util.concurrent.locks.ReentrantLock; 5 6 /** 7 * Created by Administrator on 2017/5/17. 8 */ 9 public class UseReentrantLock { 10 11 private Lock lock = new ReentrantLock(); 12 13 public void method1(){ 14 try { 15 lock.lock(); 16 System.out.println("当前线程:" + Thread.currentThread().getName() + "进入method1.."); 17 Thread.sleep(1000); 18 System.out.println("当前线程:" + Thread.currentThread().getName() + "退出method1.."); 19 Thread.sleep(1000); 20 } catch (InterruptedException e) { 21 e.printStackTrace(); 22 } finally { 23 24 lock.unlock(); 25 } 26 } 27 28 public void method2(){ 29 try { 30 lock.lock(); 31 System.out.println("当前线程:" + Thread.currentThread().getName() + "进入method2.."); 32 Thread.sleep(2000); 33 System.out.println("当前线程:" + Thread.currentThread().getName() + "退出method2.."); 34 Thread.sleep(1000); 35 } catch (InterruptedException e) { 36 e.printStackTrace(); 37 } finally { 38 39 lock.unlock(); 40 } 41 } 42 43 public static void main(String[] args) { 44 45 final UseReentrantLock ur = new UseReentrantLock(); 46 Thread t1 = new Thread(new Runnable() { 47 @Override 48 public void run() { 49 ur.method1(); 50 ur.method2(); 51 } 52 }, "t1"); 53 54 t1.start(); 55 try { 56 Thread.sleep(10); 57 } catch (InterruptedException e) { 58 e.printStackTrace(); 59 } 60 //System.out.println(ur.lock.getQueueLength()); 61 } 62 63 }
二、ReentrantReadWriteLock
1 package com.ietree.basicskill.mutilthread.lock; 2 3 import java.util.concurrent.locks.ReentrantReadWriteLock; 4 5 /** 6 * Created by Administrator on 2017/5/17. 7 */ 8 public class UseReentrantReadWriteLock { 9 10 private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); 11 private ReentrantReadWriteLock.ReadLock readLock = rwLock.readLock(); 12 private ReentrantReadWriteLock.WriteLock writeLock = rwLock.writeLock(); 13 14 public void read(){ 15 try { 16 readLock.lock(); 17 System.out.println("当前线程:" + Thread.currentThread().getName() + "进入..."); 18 Thread.sleep(3000); 19 System.out.println("当前线程:" + Thread.currentThread().getName() + "退出..."); 20 } catch (Exception e) { 21 e.printStackTrace(); 22 } finally { 23 readLock.unlock(); 24 } 25 } 26 27 public void write(){ 28 try { 29 writeLock.lock(); 30 System.out.println("当前线程:" + Thread.currentThread().getName() + "进入..."); 31 Thread.sleep(3000); 32 System.out.println("当前线程:" + Thread.currentThread().getName() + "退出..."); 33 } catch (Exception e) { 34 e.printStackTrace(); 35 } finally { 36 writeLock.unlock(); 37 } 38 } 39 40 public static void main(String[] args) { 41 42 final UseReentrantReadWriteLock urrw = new UseReentrantReadWriteLock(); 43 44 Thread t1 = new Thread(new Runnable() { 45 @Override 46 public void run() { 47 urrw.read(); 48 } 49 }, "t1"); 50 Thread t2 = new Thread(new Runnable() { 51 @Override 52 public void run() { 53 urrw.read(); 54 } 55 }, "t2"); 56 Thread t3 = new Thread(new Runnable() { 57 @Override 58 public void run() { 59 urrw.write(); 60 } 61 }, "t3"); 62 Thread t4 = new Thread(new Runnable() { 63 @Override 64 public void run() { 65 urrw.write(); 66 } 67 }, "t4"); 68 69 // t1.start(); 70 // t2.start(); 71 72 // t1.start(); // R 73 // t3.start(); // W 74 75 t3.start(); 76 t4.start(); 77 } 78 }
三、Condition
1 package com.ietree.basicskill.mutilthread.lock; 2 3 import java.util.concurrent.locks.Condition; 4 import java.util.concurrent.locks.Lock; 5 import java.util.concurrent.locks.ReentrantLock; 6 7 /** 8 * Created by Administrator on 2017/5/17. 9 */ 10 public class UseCondition { 11 private Lock lock = new ReentrantLock(); 12 private Condition condition = lock.newCondition(); 13 14 public void method1(){ 15 try { 16 lock.lock(); 17 System.out.println("当前线程:" + Thread.currentThread().getName() + "进入等待状态.."); 18 Thread.sleep(3000); 19 System.out.println("当前线程:" + Thread.currentThread().getName() + "释放锁.."); 20 condition.await(); // Object wait 21 System.out.println("当前线程:" + Thread.currentThread().getName() +"继续执行..."); 22 } catch (Exception e) { 23 e.printStackTrace(); 24 } finally { 25 lock.unlock(); 26 } 27 } 28 29 public void method2(){ 30 try { 31 lock.lock(); 32 System.out.println("当前线程:" + Thread.currentThread().getName() + "进入.."); 33 Thread.sleep(3000); 34 System.out.println("当前线程:" + Thread.currentThread().getName() + "发出唤醒.."); 35 condition.signal(); //Object notify 36 } catch (Exception e) { 37 e.printStackTrace(); 38 } finally { 39 lock.unlock(); 40 } 41 } 42 43 public static void main(String[] args) { 44 45 final UseCondition uc = new UseCondition(); 46 Thread t1 = new Thread(new Runnable() { 47 @Override 48 public void run() { 49 uc.method1(); 50 } 51 }, "t1"); 52 Thread t2 = new Thread(new Runnable() { 53 @Override 54 public void run() { 55 uc.method2(); 56 } 57 }, "t2"); 58 t1.start(); 59 60 t2.start(); 61 } 62 }
四、ManyCondition
1 package com.ietree.basicskill.mutilthread.lock; 2 3 import java.util.concurrent.locks.Condition; 4 import java.util.concurrent.locks.ReentrantLock; 5 6 /** 7 * Created by Administrator on 2017/5/17. 8 */ 9 public class UseManyCondition { 10 private ReentrantLock lock = new ReentrantLock(); 11 private Condition c1 = lock.newCondition(); 12 private Condition c2 = lock.newCondition(); 13 14 public void m1(){ 15 try { 16 lock.lock(); 17 System.out.println("当前线程:" +Thread.currentThread().getName() + "进入方法m1等待.."); 18 c1.await(); 19 System.out.println("当前线程:" +Thread.currentThread().getName() + "方法m1继续.."); 20 } catch (Exception e) { 21 e.printStackTrace(); 22 } finally { 23 lock.unlock(); 24 } 25 } 26 27 public void m2(){ 28 try { 29 lock.lock(); 30 System.out.println("当前线程:" +Thread.currentThread().getName() + "进入方法m2等待.."); 31 c1.await(); 32 System.out.println("当前线程:" +Thread.currentThread().getName() + "方法m2继续.."); 33 } catch (Exception e) { 34 e.printStackTrace(); 35 } finally { 36 lock.unlock(); 37 } 38 } 39 40 public void m3(){ 41 try { 42 lock.lock(); 43 System.out.println("当前线程:" +Thread.currentThread().getName() + "进入方法m3等待.."); 44 c2.await(); 45 System.out.println("当前线程:" +Thread.currentThread().getName() + "方法m3继续.."); 46 } catch (Exception e) { 47 e.printStackTrace(); 48 } finally { 49 lock.unlock(); 50 } 51 } 52 53 public void m4(){ 54 try { 55 lock.lock(); 56 System.out.println("当前线程:" +Thread.currentThread().getName() + "唤醒.."); 57 c1.signalAll(); 58 } catch (Exception e) { 59 e.printStackTrace(); 60 } finally { 61 lock.unlock(); 62 } 63 } 64 65 public void m5(){ 66 try { 67 lock.lock(); 68 System.out.println("当前线程:" +Thread.currentThread().getName() + "唤醒.."); 69 c2.signal(); 70 } catch (Exception e) { 71 e.printStackTrace(); 72 } finally { 73 lock.unlock(); 74 } 75 } 76 77 public static void main(String[] args) { 78 79 80 final UseManyCondition umc = new UseManyCondition(); 81 Thread t1 = new Thread(new Runnable() { 82 @Override 83 public void run() { 84 umc.m1(); 85 } 86 },"t1"); 87 Thread t2 = new Thread(new Runnable() { 88 @Override 89 public void run() { 90 umc.m2(); 91 } 92 },"t2"); 93 Thread t3 = new Thread(new Runnable() { 94 @Override 95 public void run() { 96 umc.m3(); 97 } 98 },"t3"); 99 Thread t4 = new Thread(new Runnable() { 100 @Override 101 public void run() { 102 umc.m4(); 103 } 104 },"t4"); 105 Thread t5 = new Thread(new Runnable() { 106 @Override 107 public void run() { 108 umc.m5(); 109 } 110 },"t5"); 111 112 t1.start(); // c1 113 t2.start(); // c1 114 t3.start(); // c2 115 116 117 try { 118 Thread.sleep(2000); 119 } catch (InterruptedException e) { 120 e.printStackTrace(); 121 } 122 123 t4.start(); // c1 124 try { 125 Thread.sleep(2000); 126 } catch (InterruptedException e) { 127 e.printStackTrace(); 128 } 129 t5.start(); // c2 130 131 } 132 }
以上是关于Hive中的锁的用法和使用场景的主要内容,如果未能解决你的问题,请参考以下文章