在 Twitter-Storm 中使用非 JVM 语言将真实数据传递给 Storms Spout
Posted
技术标签:
【中文标题】在 Twitter-Storm 中使用非 JVM 语言将真实数据传递给 Storms Spout【英文标题】:Pass real data to the Storms Spout using Non-JVM language in Twitter-Storm 【发布时间】:2013-04-16 21:19:57 【问题描述】:我无法理解如何将真实数据传递给 Spout, 例如:
我有这两个文件(它们工作正常):
#! /usr/bin/env python
import os, random, sys, time
for i in xrange(50):
print("%s\t%s"%(os.getpid(), i))
sys.stdout.flush()
time.sleep(random.randint(0,5))
和
#! /usr/bin/env python
from __future__ import print_function
from select import select
from subprocess import Popen,PIPE
p = Popen(['./rand_lines.py'], stdout=PIPE, bufsize=1, close_fds=True, universal_newlines=True)
timeout = 0.1 # seconds
while p:
# remove finished processes from the list
if p.poll() is not None: # process ended
print(p.stdout.read(), end='') # read the rest
p.stdout.close()
processes.remove(p)
# wait until there is something to read
rlist = select([p.stdout], [],[], timeout)[0]
# read a line from each process that has output ready
for f in rlist:
print(f.readline(), end='') #NOTE: it can block
现在假设我想将这些随机行传递给 spout 以供将来处理,我正在尝试这个: 从 uuid 导入 uuid4 从选择导入选择 从子流程导入 Popen,PIPE 进口风暴
class TwitterSpout(storm.Spout):
def initialize(self, conf, context):
self.pid = os.getpid()
try:
self.p= Popen(['./rand_lines.py'], stdout=PIPE, bufsize=1, close_fds=True, universal_newlines=True)
except OSError, e:
self.log('%s'%e)
sys.exit(1)
比在 nextTuple() 中:
def nextTuple(self):
timeout = 0.1 # seconds
while self.p:
# remove finished processes from the list
if self.p.poll() is not None: # process ended
self.log ("%s"%self.p.stdout.read()) # read the rest
self.p.stdout.close()
processes.remove(self.p)
# wait until there is something to read
rlist = select([self.p.stdout], [],[], timeout)[0]
# read a line from each process that has output ready
for f in rlist:
self.log ("%s%s"%f.readline()) #NOTE: it can block
msgId = random.randint(0,500)
self.log('MSG IN SPOUT %s\n'%msgId)
storm.emit([f.readline()], id=msgId)
但是这种结构不起作用,我总是收到错误"Pipi seems to be broken..."
,或者如果我尝试此代码的不同变体,我会阻塞进程,Storm 永远不会丰富 NextTuple。请帮我解决我的问题,或者如果有人可以给我一些如何做类似事情的例子,或者只是一些建议。
谢谢
【问题讨论】:
@Vor 请问我可以得到你的电子邮件吗? 【参考方案1】:可能存在多个问题。
while
循环中没有中断 -- 无限循环。
您拨打f.readline()
两次。您可能打算在每个 select
之后只调用一次。
为避免阻塞,请在select
之后使用data = os.read(f.fileno(), 1024)
。
不知道在子进程退出之前阻塞nextTuple()
是否可以接受。
如果您所做的只是从子进程中读取行,那么您不需要select
:
def iter_lines(*args, DEVNULL=open(os.devnull, 'r+')):
p = Popen(args, stdin=DEVNULL, stdout=PIPE, stderr=DEVNULL,
bufsize=1, close_fds=True)
for line in iter(p.stdout.readline, b''): # expect b'\n' newline
yield line
p.stdout.close()
raise StopIteration(p.wait())
例子:
# ...
self.lines = iter_lines(sys.executable, '-u', 'rand_lines.py')
#...
def nextTuple(self):
try:
line = next(self.lines).decode('ascii', 'ignore')
except StopIteration as e:
self.exit_status = e.args[0]
else:
storm.emit([line.strip()])
【讨论】:
以上是关于在 Twitter-Storm 中使用非 JVM 语言将真实数据传递给 Storms Spout的主要内容,如果未能解决你的问题,请参考以下文章
如何使用非 jvm 语言(例如 erlang、php)注册 Eureka?
快速了解阿里开源中间件动态非侵入AOP解决方案 JVM-Sandbox