在 Twitter-Storm 中使用非 JVM 语言将真实数据传递给 Storms Spout

Posted

技术标签:

【中文标题】在 Twitter-Storm 中使用非 JVM 语言将真实数据传递给 Storms Spout【英文标题】:Pass real data to the Storms Spout using Non-JVM language in Twitter-Storm 【发布时间】:2013-04-16 21:19:57 【问题描述】:

我无法理解如何将真实数据传递给 Spout, 例如:

我有这两个文件(它们工作正常):

#! /usr/bin/env python

import os, random, sys, time

for i in xrange(50):
    print("%s\t%s"%(os.getpid(), i))
    sys.stdout.flush()
    time.sleep(random.randint(0,5))

#! /usr/bin/env python

from __future__ import print_function
from select import select
from subprocess import Popen,PIPE

p = Popen(['./rand_lines.py'], stdout=PIPE, bufsize=1, close_fds=True,  universal_newlines=True) 

timeout = 0.1 # seconds
while p:
    # remove finished processes from the list 
    if p.poll() is not None: # process ended
        print(p.stdout.read(), end='') # read the rest
        p.stdout.close()
        processes.remove(p)

    # wait until there is something to read
    rlist = select([p.stdout], [],[], timeout)[0]

    # read a line from each process that has output ready
    for f in rlist:
        print(f.readline(), end='') #NOTE: it can block

现在假设我想将这些随机行传递给 spout 以供将来处理,我正在尝试这个: 从 uuid 导入 uuid4 从选择导入选择 从子流程导入 Popen,PIPE 进口风暴

class TwitterSpout(storm.Spout):

    def initialize(self, conf, context):
        self.pid = os.getpid()
        try:
            self.p= Popen(['./rand_lines.py'], stdout=PIPE, bufsize=1, close_fds=True,  universal_newlines=True)
        except OSError, e:
            self.log('%s'%e)
            sys.exit(1)

比在 nextTuple() 中:

def nextTuple(self):
    timeout = 0.1 # seconds
    while self.p:
        # remove finished processes from the list 
        if self.p.poll() is not None: # process ended
        self.log ("%s"%self.p.stdout.read()) # read the rest
        self.p.stdout.close()
        processes.remove(self.p)

        # wait until there is something to read
        rlist = select([self.p.stdout], [],[], timeout)[0]

        # read a line from each process that has output ready
        for f in rlist:
        self.log ("%s%s"%f.readline()) #NOTE: it can block
        msgId = random.randint(0,500)
        self.log('MSG IN SPOUT %s\n'%msgId)
        storm.emit([f.readline()], id=msgId)

但是这种结构不起作用,我总是收到错误"Pipi seems to be broken...",或者如果我尝试此代码的不同变体,我会阻塞进程,Storm 永远不会丰富 NextTuple。请帮我解决我的问题,或者如果有人可以给我一些如何做类似事情的例子,或者只是一些建议。 谢谢

【问题讨论】:

@Vor 请问我可以得到你的电子邮件吗? 【参考方案1】:

可能存在多个问题。

while 循环中没有中断 -- 无限循环。

您拨打f.readline() 两次。您可能打算在每个 select 之后只调用一次。

为避免阻塞,请在select 之后使用data = os.read(f.fileno(), 1024)

不知道在子进程退出之前阻塞nextTuple()是否可以接受。

如果您所做的只是从子进程中读取行,那么您不需要select

def iter_lines(*args, DEVNULL=open(os.devnull, 'r+')):
    p = Popen(args, stdin=DEVNULL, stdout=PIPE, stderr=DEVNULL,
              bufsize=1, close_fds=True)
    for line in iter(p.stdout.readline, b''): # expect b'\n' newline
        yield line
    p.stdout.close()
    raise StopIteration(p.wait())

例子:

# ...
self.lines = iter_lines(sys.executable, '-u', 'rand_lines.py')

#...
def nextTuple(self):
    try:
        line = next(self.lines).decode('ascii', 'ignore')
    except StopIteration as e:
        self.exit_status = e.args[0]
    else:
        storm.emit([line.strip()])

【讨论】:

以上是关于在 Twitter-Storm 中使用非 JVM 语言将真实数据传递给 Storms Spout的主要内容,如果未能解决你的问题,请参考以下文章

如何使用非 jvm 语言(例如 erlang、php)注册 Eureka?

快速了解阿里开源中间件动态非侵入AOP解决方案 JVM-Sandbox

阿里开源动态非侵入 AOP 解决方案 JVM-Sandbox | 软件推介

2.JVM的参数配置

JVM升级七(启动参数)

jvm参数设置