django+celery+rabbitmq 编码错误和 sig-kill
Posted
技术标签:
【中文标题】django+celery+rabbitmq 编码错误和 sig-kill【英文标题】:django+celery+rabbitmq encode error and sig-kill 【发布时间】:2017-03-09 23:59:09 【问题描述】:我现在正在做一个小项目,它使用 celery 将 csv 和 xlsx 文件转换为 postgresql 表。 下面的代码在没有 celery 的情况下可以正常工作(大文件除外),但是在使用 celery 之后会产生一些错误和错误。 我在 *** 中寻找过类似的问题,但不知道该怎么做以及为什么。 希望大家能帮帮我,谢谢。
-
第一个错误如下:
csv-1
csv-2
我认为这与我的编码部分有关,但我尝试使用 utf-8-sig 和 big-5 打开它,但无法正常工作。(没有芹菜也可以正常工作)
`
# -*- coding: utf-8 -*-
from django.shortcuts import render_to_response
from django.template import RequestContext
from django.http import HttpResponseRedirect
from django.core.urlresolvers import reverse
from django.contrib import messages
from django.conf import settings
from django.db import connection
from django.views.decorators.csrf import csrf_exempt
from celery import Celery
from celery import task
import json
import csv
import sys
import random
import psycopg2
import xlrd
import openpyxl as pyxl
from .models import Document
from .forms import DocumentForm
app = Celery(
'tasks',
broker='amqp://guest:guest@localhost:5672//',
backend='rpc://'
)
CELERY_RESULT_BACKEND = 'rpc://'
CELERY_RESULT_PERSISTENT = False
@app.task()
def csvwritein(doc):# Transform csv to table
doc = doc
conn = psycopg2.connect("dbname='apidb' user='api' host='localhost'
password='eric40502' port='5432'")
readcur = conn.cursor()
readcur.execute("select exists(select * from
information_schema.tables where table_name='%s')" % doc.tablename) # check if
same file is already in database
check = readcur.fetchone()[0]
try:
fr = open(doc.path,encoding = 'utf-8-sig')
dr.delay(fr,doc,check)
fr.close()
except Exception as e:
fr = open(doc.path,encoding = 'big5')
dr.delay(fr,doc,check)
fr.close()
conn.commit()
readcur.close()
@app.task()
def dr(fr,doc,check): # make datareader as function to keep code 'dry'
csvt = 0 #count csv reader loop time
row_id = 1 # used for following id field
conn = psycopg2.connect("dbname='apidb' user='api' host='localhost'
password='eric40502' port='5432'")
maincur = conn.cursor()
writecur = conn.cursor()
datareader = csv.reader(fr, delimiter=',')
for row in datareader:
if csvt == 0: # first time in loop(create field) and check no
same file exists
if check == True:
app =
''.join([random.SystemRandom().choice('abcdefghijklmnopqrstuvwxyz0123456789')
for i in range(6)])
tname = '%s-%s' % (doc.tablename,app
tablename = '"%s-%s"' % (doc.tablename,app)
doc.tablename = tname
doc.save()
else:
tablename = '"%s"' % doc.tablename
maincur.execute("CREATE TABLE %s (id SERIAL PRIMARY
KEY);" % tablename)
row_count = sum(1 for line in datareader)
col_count = len(row)
frow = row
for i in range(0,col_count,1):
row[i] = '"%s"' % row[i] # change number to
string
maincur.execute("ALTER TABLE %s ADD %s
CITEXT;" % (tablename,row[i]))
csvt = csvt+1
fr.seek(0)
next(datareader)
elif csvt > 0: # not first time(insert data) and check no
same file exists
for j in range(0,col_count,1):
if j == 0:
writecur.execute("INSERT INTO %s (%s)
VALUES ('%s');" % (tablename,frow[j],row[j]))
else:
writecur.execute("UPDATE %s SET %s =
'%s' WHERE id = '%d';" %(tablename,frow[j],row[j],row_id))
csvt = csvt+1
row_id = row_id+1
else:
break
conn.commit()
maincur.close()
writecur.close()
conn.close()
csvt = 0
doc = Document.objects.all()
`
-
第二个错误是关于将一个 xlsx 文件(大约 130,000 行)转换为 postgresql 表,并且工作人员在 2-3 分钟后获得了 sig-kill。
调试消息:
[2016-10-27 06:17:05,227: ERROR/MainProcess] 进程 'Worker-1' pid:13829 以 'signal 9 (SIGKILL)' 退出 [2016-10-27 06:17:05,328:ERROR /MainProcess] 任务 data.tasks.xlsxwritein[5aec4679-c48b-4d07-a0a9-5e4e37fcd24b] 引发意外:WorkerLostError('Worker 过早退出:信号 9 (SIGKILL).',) Traceback(最近一次调用最后一次):文件“/ usr/local/lib/python3.4/dist-packages/billiard/pool.py",第 1175 行,在 mark_as_worker_lost human_status(exitcode)),billiard.exceptions.WorkerLostError: Worker 过早退出:信号 9 (SIGKILL)。
#The code continues from the above task.py file
@app.task()
def xlsxwritein(doc): # write into database for file type xlsx
xlsxt = 0
conn = psycopg2.connect("dbname='apidb' user='api' host='localhost'
password='eric40502' port='5432'")
maincur = conn.cursor()
readcur = conn.cursor()
writecur = conn.cursor()
readcur.execute("select exists(select * from
information_schema.tables where table_name='%s')" % doc.tablename) # check if
same file is already in database
check = readcur.fetchone()[0]
row_id = 1 # used for following id field
wb = pyxl.load_workbook(doc.path)
sheetnames = wb.get_sheet_names()
ws = wb.get_sheet_by_name(sheetnames[0])
for rown in range(ws.get_highest_row()):
if xlsxt == 0:
if check == True:
app =
''.join([random.SystemRandom().choice('abcdefghijklmnopqrstuvwxyz0123456789')
for i in range(6)])
tname = '%s-%s' % (doc.tablename,app)
tablename = '"%s-%s"' % (doc.tablename,app)
doc.tablename = tname
doc.save()
else:
tablename = '"%s"' % doc.tablename
field = [ws.cell(row=1,column=col_index).value for
col_index in range(1,ws.get_highest_column()+1)]
maincur.execute("CREATE TABLE %s (id SERIAL PRIMARY
KEY);" % tablename)
for coln in range(ws.get_highest_column()):
field[coln] = '"%s"' % field[coln] # change
number to string
if field[coln] == 'ID':
field[coln] = 'original_id'
maincur.execute("ALTER TABLE %s ADD %s
CITEXT;" % (tablename,field[coln]))
xlsxt = xlsxt+1
elif xlsxt > 0 and check == False: # not first time(insert
data) and check no same file exists
for coln in range(ws.get_highest_column()):
if coln == 0:
writecur.execute("INSERT INTO %s (%s)
VALUES ('%s');"
%(tablename,field[coln],str(ws.cell(row=rown,column=coln+1).value)))
else:
writecur.execute("UPDATE %s SET %s =
'%s' WHERE id = '%d';"
%(tablename,field[coln],str(ws.cell(row=rown+1,column=coln+1).value),row_id))
xlsxt = xlsxt+1
row_id = row_id+1
else:
break
conn.commit()
maincur.close()
readcur.close()
writecur.close()
conn.close()
xlsxt = 0
【问题讨论】:
【参考方案1】:在参数反序列化过程中可能出现了问题。不要传递 doc 对象,而是尝试传递文件名,然后在任务中读取文件。
【讨论】:
谢谢,我会试试的。第二个问题呢? 见***.com/a/22844120/1477280 - 你的工作进程很可能也耗尽内存并被杀死 我之前检查过那个案例,但无法理解确切的过程。我会努力的,谢谢。以上是关于django+celery+rabbitmq 编码错误和 sig-kill的主要内容,如果未能解决你的问题,请参考以下文章
Django-celery 和 RabbitMQ 不执行任务
Heroku 上的 Celery、RabbitMQ 和 Django:达到内存限制
未执行的任务(Django + Heroku + Celery + RabbitMQ)
ModuleNotFoundError: 没有使用 celery 和 rabbitmq 的名为 'django.config' 的模块