编程技巧篇之线程上下文
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了编程技巧篇之线程上下文相关的知识,希望对你有一定的参考价值。
一、 背景
在实际开发过程中,有时候会采用抽象继承的方式,如模板模式、策略模式实现代码编排和复用。
存在的问题:
- 不同流程之间会出现重复使用同一个参数调用下游接口的情况
- 后面某个步骤依赖前面某个步骤产出的中间数据,但前面步骤的返回值中不关注这个中间数据
- …
为了提高性能,避免重复的接口调用;为了降低耦合,可以使用线程上下文工具。
二、线程上下文
2.1 定义线程上下文
如果需要在多线程环境下使用,添加依赖
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>transmittable-thread-local</artifactId>
<version>2.6.1</version>
</dependency>
线程上下文参考代码:
import com.alibaba.ttl.TransmittableThreadLocal;
import java.util.HashMap;
import java.util.Map;
/**
* 线程上下文
*/
public class ThreadContext
private static final ThreadLocal<Map<String, Object>> CONTEXT = new TransmittableThreadLocal<>();
/**
* 初始化上下文
*/
protected static void initContext()
Map<String, Object> con = CONTEXT.get();
if (con == null)
CONTEXT.set(new HashMap<>(8));
else
CONTEXT.get().clear();
/**
* 清除上下文
*/
protected static void clearContext()
CONTEXT.remove();
/**
* 获取上下文内容
*/
public static <T> T getValue(String key)
Map<String, Object> con = CONTEXT.get();
if (con == null)
return null;
return (T) con.get(key);
/**
* 设置上下文参数
*/
public static void putValue(String key, Object value)
Map<String, Object> con = CONTEXT.get();
if (con == null)
CONTEXT.set(new HashMap<>(8));
con = CONTEXT.get();
con.put(key, value);
2.2 使用案例
2.2.1 复用查询结果
定义模板抽象类
public abstract class AbstractSomeService
abstract void step1(SomeReq req);
abstract void step2(SomeReq req);
abstract void step3(SomeReq req);
//模板
public final void play(SomeReq req)
//初始化上下文
ThreadContext.initContext();
try
// 步骤1
step1(req);
// 步骤2
step2(req);
// 步骤3
step3(req);
finally
// 清空上下文
ThreadContext.clearContext();
定义实现类
@Component
public class ASomeServiceImpl extends AbstractSomeService
@Resource
private UserService userService;
@Override
public void step1(SomeReq req)
// 查询用户信息
UserInfo userInfo = userService.query(req.getUserId());
ThreadContext.put("USER_INFO", userInfo)
// 其他步骤
@Override
public void step2(SomeReq req)
// 直接获取初始化部分已经查询过的用户信息
UserInfo userInfo = ThreadContext.get("USER_INFO")
// 其他步骤
@Override
public void step3(SomeReq req)
// 代码省略
假设 ASomeServiceImpl
的 step1 和 step2 方法中都需要查询用户信息,可以在 step1 中查询后放到线程上下文中,在 step2 中直接获取使用,避免再次执行 RPC 调用。
如果获取用户信息调用耗时 10 ms ,那么相当于降低了 10 ms ,如果还有其他查询结果可以复用,就可以降低更多耗时。
当然,有些接口可以使用这种方式,有些接口必须重新发起调用,需要根据实际情况来决定。
2.2.2 降低耦合
有时候后续环节需要前面步骤产出的一些上下文对象。
可以在该步骤中可以将该部分对象写入到上下文中,在后续执行环节从上下文中取出使用。
@Component
public class BSomeServiceImpl extends AbstractSomeService
@Resource
private UserService userService;
@Override
public void step1(SomeReq req)
// 代码省略
// 后续步骤需要的内容写入上下文
SomeDTO some = xxx;
ThreadContext.put("SOME_INFO", some)
// 代码省略
@Override
public void step2(SomeReq req)
// 代码省略
@Override
public void step3(SomeReq req)
// 代码省略
// 获取前面步骤的一些上下文
SomeDTO some = ThreadContext.get("SOME_INFO")
// 代码省略
2.3 看法
2.3.1 殊途同归
当然线程上下文并不是唯一的解法,你也可以定义上下文对象( SomeContext
)作为返回值。
将可复用的对象写入到自定义的上下文对象中,后续环节使用时直接从 SomeContext
中取出使用即可。
缺点:每个业务都需要自定义专用的上下文对象,甚至为了传递上下文需要修改函数签名(原本是 void 的返回值,会定义为 Context )。
优点:上下文中有哪些对象和对象的类型一目了然。
public abstract class AbstractSomeService
abstract SomeContext step1(SomeReq req);
abstract void step2(SomeReq req, SomeContext context);
abstract void step3(SomeReq req, SomeContext context);
//模板
public final void play(SomeReq req)
// 步骤1
SomeContext context = step1(req);
// 步骤2
step2(req, context);
// 步骤3
step3(req, context);
2.3.2 可读性
使用线程上下文后,获取可复用对象和设置可复用对象的方法大概率不会放在一起。
建议使用 @see
或者 @link
的方式提供设置和获取方法之间的跳转快捷方式。
@Component
public class BSomeServiceImpl extends AbstractSomeService
@Resource
private UserService userService;
/**
* 步骤1
*
* SOME_INFO 在 @link BSomeServiceImpl#step3 中使用
*/
@Override
public void step1(SomeReq req)
// 代码省略
// 后续步骤需要的内容写入上下文
SomeDTO some = xxx;
ThreadContext.put("SOME_INFO", some)
// 代码省略
@Override
public void step2(SomeReq req)
// 代码省略
/**
* 步骤3
*
* SOME_INFO 构造自 @link BSomeServiceImpl#step1
*/
@Override
public void step3(SomeReq req)
// 代码省略
// 获取前面步骤的一些上下文
SomeDTO some = ThreadContext.get("SOME_INFO")
// 代码省略
此外,建议大家把上下文的 key 定义为常量,这样即使不使用上面的方式跳转,也可以很快通过常量 find usage 找到设置和获取的代码。
@Component
public class BSomeServiceImpl extends AbstractSomeService
@Resource
private UserService userService;
/**
* 步骤1
*
* SOME_INFO 在 @link BSomeServiceImpl#step3 中使用
*/
@Override
public void step1(SomeReq req)
// 代码省略
// 后续步骤需要的内容写入上下文
SomeDTO some = xxx;
ThreadContext.put(ContextConstant.SOME_INFO, some)
// 代码省略
@Override
public void step2(SomeReq req)
// 代码省略
/**
* 步骤3
*
* SOME_INFO 构造自 @link BSomeServiceImpl#step1
*/
@Override
public void step3(SomeReq req)
// 代码省略
// 获取前面步骤的一些上下文
SomeDTO some = ThreadContext.get(ContextConstant.SOME_INFO)
// 代码省略
三、总结
很多同学写惯了 CURD ,很少去探索写的编码姿势。
本文提供一种使用线程上下文来提高性能和降低耦合的方式,希望对大家有帮助。
如果文章对你有帮助,欢迎点赞、关注,我会持续创作更多作品。
并发编程之多线程篇之四
主要内容:
一、信号量
二、Event事件
三、定时器
四、线程queue
五、进程池与线程池
1?? 信号量
1、信号量的理解
信号量也是一把锁,可以指定信号量为5,对比互斥锁同一时间只能有一个任务抢到锁去执行,信号量同一时间可以有5个任务拿到锁去执行。
例如:把互斥锁比作是合租房屋的人去抢一个厕所,那么信号量就是一群路人争抢公共厕所,公共厕所有多个坑位,这意味同一时间可以有多个人争抢公共厕所,公共厕所有多个坑位,这意味着同一时间可以有多个人上公共厕所,但公共厕所容纳的人数是一定的,这便是信号量的大小。
2、实例:
#!/usr/bin/env python3
#-*- coding:utf-8 -*-
# write by congcong
from threading import Thread,Semaphore,currentThread
import time,random
# Semaphore 指信号量,本质也是互斥锁,不同在于可以同时生成多把锁,把Lock比作私单人卫生间的话,一次只允许一个人使用;那么Semaphere就是公共厕所,一次允许指定的人数同时使用
sm = Semaphore(4) # 生成了五把锁
def task():
# sm.acquire()
# print(‘%s is in wc‘%currentThread().getName())
# sm.acquire()
with sm:
time.sleep(random.randint(1, 3))
print(‘%s is in‘%currentThread().getName())
if __name__ == ‘__main__‘:
for i in range(10):
t = Thread(target=task)
t.start()
2?? Event事件
1、Event对象的理解:
线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断
某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要
使用threading库中的Event对象。
对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在初始情况下,Event对象中的
信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直
阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。
如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行。
2、参数介绍:
from threading import Event
event.isSet():返回event的状态值;
event.wait():如果 event.isSet()==False将阻塞线程;
event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
event.clear():恢复event的状态值为False。
3、实例1:
#!/usr/bin/env python3
#-*- coding:utf-8 -*-
# write by congcong
from threading import Thread,Event,currentThread
import time
event = Event()
def student(name):
print(‘学生%s在听课‘%name)
event.wait() # 等待event.set()运行才能释放,接着运行
print(‘学生%s下课了‘%name)
def teacher(name):
print(‘老师%s在上课‘%name)
time.sleep(8)
event.set() # 释放event.wait(),继续执行
if __name__ == ‘__main__‘:
s1 = Thread(target=student,args=(‘cc1‘,))
s2 = Thread(target=student,args=(‘cc2‘,))
s3 = Thread(target=student,args=(‘cc3‘,))
t1 = Thread(target=teacher,args=(‘CC‘,))
s1.start()
s2.start()
s3.start()
t1.start()
‘‘‘
学生cc1在听课
学生cc2在听课
学生cc3在听课
老师CC在上课
学生cc2下课了
学生cc1下课了
学生cc3下课了
‘‘‘
实例2:
from threading import Thread,Event,currentThread
import time
event = Event()
def client():
n = 0
while not event.is_set(): # is_set() 是否激活线程
if n == 3:
print(‘time out,%s connected failed‘%currentThread().getName())
return
print(‘%s try %s‘%(currentThread().getName(),n))
event.wait(0.5)
n += 1
print(‘%s is connected‘%currentThread().getName())
def check():
print(‘%s is checking‘%currentThread().getName())
time.sleep(5)
event.set()
if __name__ == ‘__main__‘:
for i in range(3):
t = Thread(target=client)
t.start()
t = Thread(target=check)
t.start()
‘‘‘
Thread-1 try 0
Thread-2 try 0
Thread-3 try 0
Thread-4 is checking
Thread-3 try 1
Thread-1 try 1
Thread-2 try 1
Thread-2 try 2参数
Thread-1 try 2
Thread-3 try 2
time out,Thread-2 connected failed
time out,Thread-1 connected failed
time out,Thread-3 connected failed
‘‘‘
3?? 定时器
1、定时器的理解:
定时器,即指定n秒后执行某操作。
2、实例1:
#!/usr/bin/env python3
#-*- coding:utf-8 -*-
# write by congcong
from threading import Thread,Timer
import random
def task(name):
print(‘hello %s,welcome login!‘%name) # hello hyt,welcome login!
time = Timer(3,task,args=(‘hyt‘,)) # 等待指定时间,执行线程
time.start()
实例2--自动更新验证码
#!/usr/bin/env python3
#-*- coding:utf-8 -*-
# write by congcong
from threading import Thread,Timer
import random
class Check_number:
def __init__(self):
self.cash_code() # 程序一开始便实例化一个验证码
def make_code(self,n=4):
res = ‘‘
for i in range(n):
s1 = str(random.randint(0,9)) # 0到9间的任意自然数
s2 = chr(random.randint(65,90)) # 24个小写字母
res += random.choice([s1,s2]) # 字符和数字的任意组合
return res
def cash_code(self,interval=3):
self.code = self.make_code() # 实例化一个验证码
print(self.code) # 打印验证码
self.t = Timer(interval,self.make_code) # 定时器,等待指定时间再运行
self.t.start()
def check(self):
while True:
mes = input(‘输入验证码>>>:‘).strip()
if self.code == mes.upper():
print(‘输入正确!‘)
self.t.cancel() # 关闭定时器
break
obj = Check_number()
obj.check()
4?? 线程queue
1、queue 队列,先进先出
2、实例:
#!/usr/bin/env python3
#-*- coding:utf-8 -*-
# write by congcong
import queue
# 队列,先进先出
q = queue.Queue(3) # 括号内指定队列长度
q.put(‘one‘)
q.put(2)
q.put(‘third‘)
q.put(4) # 此时队列已满,无法再放数据,造成堵塞,不会提示队列是否已满
q.put(4,block=True,timeout=5) # 与上式完全相同,但会提示队列已满,block表示阻塞,后面的timeout表示等待,延时指定时间报错
q.get_nowait(4) # 表示不等待,直接提示已满
q.put(4,block=False) # 不阻塞,直接提示,与上式相同
print(q.get()) # one
print(q.get()) # 2
print(q.get()) # third
print(q.get()) # 没有任何数据,且不提示
print(q.get_nowait()) # 不阻塞,直接报错提示队列已取空
print(q.get(block=True,timeout=3)) # 阻塞等待3秒,再报错提示
# 堆栈,后进先出,用法和队列相同
q2 = queue.LifoQueue(3) # 后进先出
q2.put(‘one‘)
q2.put(‘two‘)
q2.put(3)
print(q2.get())
print(q2.get())
print(q2.get())
# 优先级队列
q3 = queue.PriorityQueue(3)
q3.put((10,1))
q3.put((6,2))
q3.put((20,3))
print(q3.get())
print(q3.get())
print(q3.get())
‘‘‘数字越小优先级越高
(6, 2)
(10, 1)
(20, 3)
‘‘‘
5?? 进程池与线程池
1、进程池
#!/usr/bin/env python3
#-*- coding:utf-8 -*-
# write by congcong
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import random,os,time
# 进程池
def task(name):
print(‘name:%s pid:%s run ‘%(name,os.getpid()))
time.sleep(random.randint(1,5))
if __name__ == ‘__main__‘:
pool = ProcessPoolExecutor(3) # 不指定数目时,默认指定CPU核心数,此处进程池同时接收3个进程,且从始至终只有三个进程在运行(控制进程数)
for i in range(5):
pool.submit(task,‘cc%s‘%i) # 异步提交,不需要等待,提交了就接着提交下一波
pool.shutdown(wait=True) # 关闭进程池的入口,进程池内进程未执行完之前(进程池内的计数器每执行完一个进程,自减1,直到减为0)不允许其他进程进入进程池
print(‘主进程‘)
‘‘‘未加pool.shutdown()-->wait=True是默认参数
主进程
name:cc0 pid:2656 run
name:cc1 pid:7316 run
name:cc2 pid:4404 run
name:cc3 pid:7316 run
name:cc4 pid:2656 run
‘‘‘
‘‘‘加pool.shutdown(wait=True)之后输出,保证了进程的安全性
name:cc0 pid:1536 run
name:cc1 pid:7316 run
name:cc2 pid:1900 run
name:cc3 pid:7316 run
name:cc4 pid:1536 run
主进程
‘‘‘
2、线程池
from threading import Thread,currentThread
# 线程池
def task():
print(‘name:%s pid:%s is running‘%(currentThread().getName(),os.getpid()))
time.sleep(random.randint(1,3))
if __name__ == ‘__main__‘:
pool = ThreadPoolExecutor(3) # 与进程池同理,此时同时可容纳3个线程,自始至终就3个线程在运行(控制线程数)
for i in range(5):
pool.submit(task) # 异步提交,不管是否成功,继续提交下一个
pool.shutdown() # 关闭线程池入口
print(‘主线程‘)
‘‘‘
name:ThreadPoolExecutor-0_0 pid:772 is running
name:ThreadPoolExecutor-0_1 pid:772 is running
name:ThreadPoolExecutor-0_2 pid:772 is running
name:ThreadPoolExecutor-0_2 pid:772 is running
name:ThreadPoolExecutor-0_0 pid:772 is running
主线程
‘‘‘
以上是关于编程技巧篇之线程上下文的主要内容,如果未能解决你的问题,请参考以下文章