django+celery+rabbitmq encoding error and SIGKILL

Posted: 2017-03-09 23:59:09

[Question]

I'm working on a small project that uses Celery to convert csv and xlsx files into PostgreSQL tables. The code below works fine without Celery (except for large files), but after adding Celery it produces some errors and bugs. I searched *** for similar questions but couldn't figure out what to do or why. I hope you can help me, thanks.

The first error is shown in these screenshots: csv-1 csv-2. I think it is related to the encoding part of my code, but I tried opening the file with both utf-8-sig and big5 and it still fails. (Without Celery it works fine.)

```python
# -*- coding: utf-8 -*-
from django.shortcuts import render_to_response
from django.template import RequestContext
from django.http import HttpResponseRedirect
from django.core.urlresolvers import reverse
from django.contrib import messages
from django.conf import settings
from django.db import connection
from django.views.decorators.csrf import csrf_exempt
from celery import Celery
from celery import task
import json
import csv
import sys
import random
import psycopg2
import xlrd
import openpyxl as pyxl
from .models import Document
from .forms import DocumentForm

app = Celery(
    'tasks',
    broker='amqp://guest:guest@localhost:5672//',
    backend='rpc://'
)
CELERY_RESULT_BACKEND = 'rpc://'
CELERY_RESULT_PERSISTENT = False

@app.task()
def csvwritein(doc):  # transform csv to table
    conn = psycopg2.connect("dbname='apidb' user='api' host='localhost' password='eric40502' port='5432'")
    readcur = conn.cursor()
    # check if the same file is already in the database
    readcur.execute("select exists(select * from information_schema.tables where table_name='%s')" % doc.tablename)
    check = readcur.fetchone()[0]
    try:
        fr = open(doc.path, encoding='utf-8-sig')
        dr.delay(fr, doc, check)
        fr.close()
    except Exception as e:
        fr = open(doc.path, encoding='big5')
        dr.delay(fr, doc, check)
        fr.close()
    conn.commit()
    readcur.close()

@app.task()
def dr(fr, doc, check):  # make datareader a function to keep code 'dry'
    csvt = 0  # counts csv reader loop passes
    row_id = 1  # used for the following id field
    conn = psycopg2.connect("dbname='apidb' user='api' host='localhost' password='eric40502' port='5432'")
    maincur = conn.cursor()
    writecur = conn.cursor()
    datareader = csv.reader(fr, delimiter=',')
    for row in datareader:
        if csvt == 0:  # first pass: create fields, and check that no same file exists
            if check == True:
                app = ''.join([random.SystemRandom().choice('abcdefghijklmnopqrstuvwxyz0123456789') for i in range(6)])
                tname = '%s-%s' % (doc.tablename, app)
                tablename = '"%s-%s"' % (doc.tablename, app)
                doc.tablename = tname
                doc.save()
            else:
                tablename = '"%s"' % doc.tablename
            maincur.execute("CREATE TABLE %s (id SERIAL PRIMARY KEY);" % tablename)
            row_count = sum(1 for line in datareader)
            col_count = len(row)
            frow = row
            for i in range(0, col_count, 1):
                row[i] = '"%s"' % row[i]  # change number to string
                maincur.execute("ALTER TABLE %s ADD %s CITEXT;" % (tablename, row[i]))
            csvt = csvt + 1
            fr.seek(0)
            next(datareader)
        elif csvt > 0:  # later passes: insert data
            for j in range(0, col_count, 1):
                if j == 0:
                    writecur.execute("INSERT INTO %s (%s) VALUES ('%s');" % (tablename, frow[j], row[j]))
                else:
                    writecur.execute("UPDATE %s SET %s = '%s' WHERE id = '%d';" % (tablename, frow[j], row[j], row_id))
            csvt = csvt + 1
            row_id = row_id + 1
        else:
            break
    conn.commit()
    maincur.close()
    writecur.close()
    conn.close()
    csvt = 0
    doc = Document.objects.all()
```
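On the encoding attempts mentioned above: instead of retrying encodings inside the task, the encoding can be guessed once, up front. A minimal sketch, assuming the third-party `chardet` package is installed (`detect_encoding` is a hypothetical helper, not part of the original code):

```python
import chardet  # third-party encoding detector; an assumption, not in the original code

def detect_encoding(path, sample_size=100000):
    """Guess a file's encoding from its first `sample_size` bytes."""
    with open(path, 'rb') as f:
        raw = f.read(sample_size)
    result = chardet.detect(raw)  # e.g. {'encoding': 'Big5', 'confidence': 0.99, ...}
    return result['encoding'] or 'utf-8-sig'

# Usage: open the file once with the detected encoding.
# fr = open(doc.path, encoding=detect_encoding(doc.path))
```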

The second error concerns converting an xlsx file (about 130,000 rows) into a PostgreSQL table; the worker is SIGKILLed after 2-3 minutes. Debug message:

```
[2016-10-27 06:17:05,227: ERROR/MainProcess] Process 'Worker-1' pid:13829 exited with 'signal 9 (SIGKILL)'
[2016-10-27 06:17:05,328: ERROR/MainProcess] Task data.tasks.xlsxwritein[5aec4679-c48b-4d07-a0a9-5e4e37fcd24b] raised unexpected: WorkerLostError('Worker exited prematurely: signal 9 (SIGKILL).',)
Traceback (most recent call last):
  File "/usr/local/lib/python3.4/dist-packages/billiard/pool.py", line 1175, in mark_as_worker_lost
    human_status(exitcode)),
billiard.exceptions.WorkerLostError: Worker exited prematurely: signal 9 (SIGKILL).
```

```python
# The code continues from the above task.py file

@app.task()
def xlsxwritein(doc):  # write into database for file type xlsx
    xlsxt = 0
    conn = psycopg2.connect("dbname='apidb' user='api' host='localhost' password='eric40502' port='5432'")
    maincur = conn.cursor()
    readcur = conn.cursor()
    writecur = conn.cursor()
    # check if the same file is already in the database
    readcur.execute("select exists(select * from information_schema.tables where table_name='%s')" % doc.tablename)
    check = readcur.fetchone()[0]
    row_id = 1  # used for the following id field
    wb = pyxl.load_workbook(doc.path)
    sheetnames = wb.get_sheet_names()
    ws = wb.get_sheet_by_name(sheetnames[0])
    for rown in range(ws.get_highest_row()):
        if xlsxt == 0:
            if check == True:
                app = ''.join([random.SystemRandom().choice('abcdefghijklmnopqrstuvwxyz0123456789') for i in range(6)])
                tname = '%s-%s' % (doc.tablename, app)
                tablename = '"%s-%s"' % (doc.tablename, app)
                doc.tablename = tname
                doc.save()
            else:
                tablename = '"%s"' % doc.tablename
            field = [ws.cell(row=1, column=col_index).value for col_index in range(1, ws.get_highest_column() + 1)]
            maincur.execute("CREATE TABLE %s (id SERIAL PRIMARY KEY);" % tablename)
            for coln in range(ws.get_highest_column()):
                field[coln] = '"%s"' % field[coln]  # change number to string
                if field[coln] == 'ID':
                    field[coln] = 'original_id'
                maincur.execute("ALTER TABLE %s ADD %s CITEXT;" % (tablename, field[coln]))
            xlsxt = xlsxt + 1
        elif xlsxt > 0 and check == False:  # later passes: insert data, and no same file exists
            for coln in range(ws.get_highest_column()):
                if coln == 0:
                    writecur.execute("INSERT INTO %s (%s) VALUES ('%s');" % (tablename, field[coln], str(ws.cell(row=rown, column=coln + 1).value)))
                else:
                    writecur.execute("UPDATE %s SET %s = '%s' WHERE id = '%d';" % (tablename, field[coln], str(ws.cell(row=rown + 1, column=coln + 1).value), row_id))
            xlsxt = xlsxt + 1
            row_id = row_id + 1
        else:
            break
    conn.commit()
    maincur.close()
    readcur.close()
    writecur.close()
    conn.close()
    xlsxt = 0
```
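Independent of the crash, the loop above issues one INSERT and then one UPDATE per cell, which is very slow for 130,000 rows. A minimal sketch of a batched alternative, assuming psycopg2 >= 2.7 (`bulk_insert` is a hypothetical helper; `tablename` and `field` are built as in the task above):

```python
from psycopg2 import extras

def bulk_insert(conn, tablename, field, rows):
    """Insert all rows in batches; `rows` is an iterable of value tuples."""
    with conn.cursor() as cur:
        extras.execute_values(
            cur,
            "INSERT INTO %s (%s) VALUES %%s" % (tablename, ','.join(field)),
            rows,
            page_size=1000,  # rows sent per round trip
        )
    conn.commit()
```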

[Comments]:

[Answer 1]:

Something probably goes wrong during argument deserialization. Instead of passing a doc object, try passing the filename and then reading the file inside the task.
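A minimal sketch of that suggestion, assuming `Document` stores the file path and that the task signature is free to change (the `doc_id` parameter is illustrative, not from the original code):

```python
@app.task()
def csvwritein(doc_id):
    # Pass only a JSON-serializable primary key over the broker, then load
    # the model instance and open the file inside the worker process.
    doc = Document.objects.get(pk=doc_id)
    with open(doc.path, encoding='utf-8-sig') as fr:
        ...  # parse and write to PostgreSQL as before

# Caller side: send the pk, not the Document object or an open file handle.
# csvwritein.delay(doc.pk)
```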

[Discussion]:

Thanks, I'll give it a try. What about the second problem?

See ***.com/a/22844120/1477280 - your worker process is most likely also running out of memory and getting killed.

I checked that case before but couldn't understand the exact process. I'll keep working on it, thanks.
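If the worker really is being OOM-killed, two mitigations are worth trying: stream the workbook instead of loading it whole, and recycle workers before they grow too large. A minimal sketch under those assumptions; openpyxl's read-only mode is a real feature, the lowercase Celery settings assume Celery 4+ (Celery 3 spells the first one CELERYD_MAX_TASKS_PER_CHILD), and the numbers are illustrative:

```python
import openpyxl as pyxl

# Read-only mode streams rows on demand instead of building the whole
# worksheet in memory, which matters for a 130,000-row file.
wb = pyxl.load_workbook(doc.path, read_only=True)
ws = wb[wb.sheetnames[0]]
for row in ws.iter_rows():
    values = [cell.value for cell in row]
    ...  # insert into PostgreSQL batch by batch

# Celery side: recycle workers so a leaky task cannot grow without bound.
# app.conf.worker_max_tasks_per_child = 50       # restart worker after N tasks
# app.conf.worker_max_memory_per_child = 300000  # in KiB; restart when exceeded
```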
