python每个线程消费不用数据

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python每个线程消费不用数据相关的知识,希望对你有一定的参考价值。

参考技术A 关于python每个线程消费不用数据相关资料如下
python kafka多线程消费数据

1、打印每个线程id,满足预期,开启了8个线程,每个线程号都不一样;

 

2、查看kafka状态,也能满足预期,每个分区的消费者id都是不一样的,下面第二个图是开启一个消费者时的状态,每个分区的消费者id都是相同的;对比之下能满足需求;

python 线程同步:生产/消费者模式

Python中的Queue对象提供了对线程同步的支持,使用queue对象可以实现多生产者和多消费者形成的先进先出的队列。

每个生产者将数据放入队列,而每个消费者依次从队列中取出数据。

# coding:utf-8
import threading,time,Queue

class Producer(threading.Thread):
    def __init__(self,threadname):
        threading.Thread.__init__(self,name=threadname)
    def run(self):
        global queue
        queue.put(self.getName())
        print  self.getName(),‘put‘,self.getName(),‘to queue‘
class Consumer(threading.Thread):
    def __init__(self,threadname):
        threading.Thread.__init__(self,name=threadname)
    def run(self,threadname):
        global queue
        print self.getName(),‘get‘,queue.get(),‘from queue‘
#生成队列对象
queue = Queue.Queue()
#生产者对象列表
plist = []
#消费者对象列表
clist = []
for i in range(10):
    p = Producer(‘Producer‘ + str(i))
    plist.append(p)
for i in range(10):
    c = Consumer(‘Consumer‘ + str(i))
#运行生产者
for i in plist:
    i.start()
    i.join()
#运行消费者
for i in clist:
    i.start()
    i.join()

运行结果:

Producer0 put Producer0 to queue
Producer1 put Producer1 to queue
Producer2 put Producer2 to queue
Producer3 put Producer3 to queue
Producer4 put Producer4 to queue
Producer5 put Producer5 to queue
Producer6 put Producer6 to queue
Producer7 put Producer7 to queue
Producer8 put Producer8 to queue
Producer9 put Producer9 to queue

Process finished with exit code 0

另外,可以使用Stackless Python实现,它只是一个Python的修改版本,对多线程编程有更好的支持。如果对多线程应用有较高的要求,则可以考虑使用Stackless Python来完成。

Stackless官方网站:https://bitbucket.org/stackless-dev/stackless/wiki/Home

同样实现生产者消费者案例代码如下:

import stackless,Queue

def Producer(i):
    global queue
    queue.put(i)
    print "Producer",i,‘add‘,i

def Consumer():
    global  queue
    i = queue.get()
    print ‘Consumer‘,i, ‘get‘,i

queue = Queue.Queue()
for i in range(10):
    stackless.tasklet(Producer)(i)
for i in range(10):
    stackless.tasklet(Consumer)()
stackless.run()

执行结果:

Producer 0 add 0
Producer 1 add 1
Producer 2 add 2
Producer 3 add 3
Producer 4 add 4
Producer 5 add 5
Producer 6 add 6
Producer 7 add 7
Producer 8 add 8
Producer 9 add 9
Consumer 0 get 0
Consumer 1 get 1
Consumer 2 get 2
Consumer 3 get 3
Consumer 4 get 4
Consumer 5 get 5
Consumer 6 get 6
Consumer 7 get 7
Consumer 8 get 8
Consumer 9 get 9

Process finished with exit code 0

Stackless Python 提供了对微线程的支持,微线程是轻量级的线程,与前面的Thread相比,占用资源更少。

本文出自 “-=湖边竹=-” 博客,请务必保留此出处http://bronte.blog.51cto.com/2418552/1872549

以上是关于python每个线程消费不用数据的主要内容,如果未能解决你的问题,请参考以下文章

python小知识点的总结

Python 生产者消费者模式

Python 生产者与消费者模型

11.python并发入门(part8 基于线程队列实现生产者消费者模型)

Python 队列queue与多线程组合(生产者+消费者模式)

python 生产者消费者模式