MaybeEncodingError: Error sending result: '<multiprocessing.pool.ExceptionWithTraceback object at 0x0000018F09F334A8>'


【Title】MaybeEncodingError: Error sending result: '<multiprocessing.pool.ExceptionWithTraceback object at 0x0000018F09F334A8>'
【Posted】2019-08-03 12:57:47
【Question】:

I get the error below when I download files using multiprocessing. I am downloading Wikipedia page views, which are counted per hour, so this can involve a lot of downloads.

Any suggestions on why this error occurs and how to solve it? Thanks.

MaybeEncodingError: Error sending result: '<multiprocessing.pool.ExceptionWithTraceback object at 0x0000018F09F334A8>'. Reason: 'TypeError("cannot serialize '_io.BufferedReader' object",)'

import fnmatch
import requests
import urllib.request
from bs4 import BeautifulSoup
import multiprocessing as mp

def download_it(download_file):
    global path_to_save_document
    filename = download_file[download_file.rfind("/")+1:]
    save_file_w_submission_path = path_to_save_document + filename
    request = urllib.request.Request(download_file)
    response = urllib.request.urlopen(request)
    data_content = response.read()
    with open(save_file_w_submission_path, 'wb') as wf:    
        wf.write(data_content)
    print(save_file_w_submission_path)  

pattern = r'*200801*'
url_to_download = r'https://dumps.wikimedia.org/other/pagecounts-raw/'
path_to_save_document = r'D:\Users\Jonathan\Desktop\Wikipedia\\'    

def main():
    global pattern
    global url_to_download
    r  = requests.get(url_to_download)
    data = r.text
    soup = BeautifulSoup(data,features="lxml")

    list_of_href_year = []
    for i in range(2):
        if i == 0:
            for link in soup.find_all('a'):
                lien = link.get('href')
                if len(lien) == 4:
                    list_of_href_year.append(url_to_download + lien + '/')
        elif i == 1:
            list_of_href_months = [] 
            list_of_href_pageviews = []        
            for loh in list_of_href_year: 
                r  = requests.get(loh)
                data = r.text
                soup = BeautifulSoup(data,features="lxml")   
                for link in soup.find_all('a'):
                    lien = link.get('href')
                    if len(lien) == 7:
                        list_of_href_months.append(loh + lien + '/')
                if not list_of_href_months:
                   continue
                for lohp in list_of_href_months: 
                    r  = requests.get(lohp)
                    data = r.text
                    soup = BeautifulSoup(data,features="lxml")              
                    for link in soup.find_all('a'):
                        lien = link.get('href')
                        if "pagecounts" in lien:
                            list_of_href_pageviews.append(lohp + lien)       

    matching_list_of_href = fnmatch.filter(list_of_href_pageviews, pattern)   
    matching_list_of_href.sort()
    with mp.Pool(mp.cpu_count()) as p:
        print(p.map(download_it, matching_list_of_href))

if __name__ == '__main__':
    main()
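
To make the failure mode easier to see, here is a stripped-down, self-contained sketch added for illustration (it is not part of the original question): when a worker raises an exception that carries an open file handle, the pool cannot pickle the wrapped exception it tries to send back to the parent, and the parent sees the same MaybeEncodingError.

import multiprocessing as mp

class FailedDownload(Exception):
    '''Toy exception that drags an open binary file along with it.'''
    def __init__(self, fileobj):
        super().__init__("download failed")
        self.fileobj = fileobj          # an open _io.BufferedReader

def worker(path):
    # The file object stored on the exception cannot be pickled, so the
    # pool fails while reporting this exception back to the parent.
    raise FailedDownload(open(path, 'rb'))

if __name__ == '__main__':
    with mp.Pool(2) as p:
        # Raises MaybeEncodingError: Error sending result:
        # '<multiprocessing.pool.ExceptionWithTraceback object at 0x...>'.
        # Reason: 'TypeError("cannot pickle '_io.BufferedReader' object")'
        # (older Python 3 versions say "cannot serialize" instead of "cannot pickle")
        p.map(worker, [__file__])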

【Question comments】:

Possible duplicate of multiprocessing.Pool: urllib TypeError if not using dummy module
Pickle didn't work. I am getting the same error about TypeError: cannot serialize '_io.BufferedReader' object
Pickle is not the solution, it is the reason you are getting that error. Serializing means pickling in Python. My code in the linked answer merely demonstrates that. You need to scroll the error message in the linked question to the right to see that it is also about Reason: 'TypeError("cannot serialize '_io.BufferedReader' object")'
I don't see any related question other than mine
You are trying to pass the response objects in matching_list_of_href to the child processes here: p.map(download_it, matching_list_of_href). The pool needs to pickle everything it sends to its child processes. Your response objects contain _io.BufferedReader objects, which cannot be pickled, so you get that error.
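
To illustrate the point made in the last comment above, here is a short self-contained check added for illustration (not from the original thread): any file opened in binary mode is an _io.BufferedReader, and pickling one fails with exactly the TypeError quoted in the error's Reason.

import pickle

with open(__file__, 'rb') as f:          # files opened with 'rb' are _io.BufferedReader objects
    try:
        pickle.dumps(f)
    except TypeError as e:
        # "cannot serialize '_io.BufferedReader' object" on Python 3.6/3.7,
        # "cannot pickle '_io.BufferedReader' object" on newer versions
        print(e)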

【Answer 1】:

As Darkonaut suggested, I switched to multithreading instead.

Example:

# Imports used by the two functions below
import os
import time
import fnmatch
import urllib.request
import urllib.error
import requests
from bs4 import BeautifulSoup
from multiprocessing.dummy import Pool as ThreadPool 

'''This function is used for the download the files using multi threading'''    
def multithread_download_files_func(self,download_file):
    try:
        filename = download_file[download_file.rfind("/")+1:]
        save_file_w_submission_path = self.ptsf + filename
        '''Check if the download doesn't already exists. If not, proceed otherwise skip'''
        if not os.path.exists(save_file_w_submission_path):
            data_content = None
            try:
                '''Lets download the file'''
                request = urllib.request.Request(download_file)
                response = urllib.request.urlopen(request)
                data_content = response.read()     
            except urllib.error.HTTPError:
                '''We will do a retry on the download if the server is temporarily unavailable'''
                retries = 1
                success = False
                while not success:
                    try:
                        '''Make another request if the previous one failed'''
                        response = urllib.request.urlopen(download_file)
                        data_content = response.read()                        
                        success = True
                    except Exception:
                        '''We will make the program wait a bit before sending another request to download the file'''
                        wait = retries * 5
                        time.sleep(wait)
                        retries += 1 
            except Exception as e:
                print(str(e))   
            '''If the response data is not empty, we will write as a new file and stored in the data lake folder'''                     
            if data_content:
                with open(save_file_w_submission_path, 'wb') as wf:    
                    wf.write(data_content)
                print(self.present_extract_RC_from_RS + filename)                   
    except Exception as e:
        print('funct multithread_download_files_func' + str(e))

'''This function is used as a wrapper before using multi threading in order to download the files to be stored in the Data Lake'''            
def download_files(self,filter_files,url_to_download,path_to_save_file):
    try:
        self.ptsf = path_to_save_file = path_to_save_file + 'Step 1 - Data Lake\\Wikipedia Pagecounts\\'
        filter_files_df = filter_files 
        self.filter_pattern = filter_files       
        self.present_extract_RC_from_RS = 'WK Downloaded->           ' 
        
        if filter_files_df == '*':
            '''We will create a string of all the years concatenated together for later use in this program'''
            reddit_years = [2005,2006,2007,2008,2009,2010,2011,2012,2013,2014,2015,2016,2017,2018]
            filter_files_df = ''
            '''Go through the years from 2005 to 2018'''
            for idx, ry in enumerate(reddit_years):
                filter_files_df += '*' + str(ry) + '*'
                if (idx != len(reddit_years)-1):
                    filter_files_df += '&'   
                    
        download_filter = list([x.strip() for x in filter_files_df.split('&')])
        download_filter.sort()
        
        '''If folder doesn't exist, create one'''
        if not os.path.exists(os.path.dirname(self.ptsf)):
            os.makedirs(os.path.dirname(self.ptsf))       
        
        '''We will get the website html elements using beautifulsoup library'''
        r  = requests.get(url_to_download)
        data = r.text
        soup = BeautifulSoup(data,features="lxml")
        
        list_of_href_year = []
        for i in range(2):
            if i == 0:
                '''Lets get all href available on this particular page. The first page is the year page'''
                for link0 in soup.find_all('a'):
                    lien0 = link0.get('href')
                    '''We will check if the length is 4 which corresponds to a year'''
                    if len(lien0) == 4:
                        list_of_href_year.append(url_to_download + lien0 + '/')
                        
            elif i == 1:
                list_of_href_months = [] 
                list_of_href_pageviews = []        
                for loh in list_of_href_year: 
                    r1  = requests.get(loh)
                    data1 = r1.text
                    '''Get the webpage HTML Tags'''
                    soup1 = BeautifulSoup(data1,features="lxml")   
                    for link1 in soup1.find_all('a'):
                        lien1 = link1.get('href')
                        '''We will check if the length is 7 which corresponds to the year and month'''
                        if len(lien1) == 7:
                            list_of_href_months.append(loh + lien1 + '/')                                            
                for lohm in list_of_href_months: 
                    r2  = requests.get(lohm)
                    data2 = r2.text
                    '''Get the webpage HTML Tags'''
                    soup2 = BeautifulSoup(data2,features="lxml")              
                    for link2 in soup2.find_all('a'): 
                        lien2 = link2.get('href')
                        '''We will now get all href that contains pagecounts in their name. We will have the files based on Time per hour. So 24 hrs is 24 files
                        and per year is 24*365=8760 files in minimum'''                            
                        if "pagecounts" in lien2:
                            list_of_href_pageviews.append(lohm + lien2)      
     
        existing_file_list = []
        for file in os.listdir(self.ptsf):
             filename = os.fsdecode(file)     
             existing_file_list.append(filename)  
         
        '''Filter the links'''
        matching_fnmatch_list = []
        if filter_files != '':
            for dfilter in download_filter:
                fnmatch_list = fnmatch.filter(list_of_href_pageviews, dfilter) 
                i = 0
                for fnl in fnmatch_list:
                    '''Break for demo purpose only'''
                    if self.limit_record != 0:
                        if (i == self.limit_record) and (i != 0):
                            break
                    i += 1
                    matching_fnmatch_list.append(fnl) 
        
        '''If the user stated a filter, we will try to remove the files which are outside that filter in the list'''
        to_remove = []
        for efl in existing_file_list:
            for mloh in matching_fnmatch_list:
                if efl in mloh:         
                    to_remove.append(mloh)
        
        '''Lets remove the files which has been found outside the filter'''
        for tr in to_remove:
            matching_fnmatch_list.remove(tr)   
            
        matching_fnmatch_list.sort()    
          
        '''Multi Threading of 200'''
        p = ThreadPool(200)
        p.map(self.multithread_download_files_func, matching_fnmatch_list)
    except Exception as e:
        print('funct download_files' + str(e))
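
Note that the two functions above are written as methods: they take self and rely on attributes such as self.limit_record that are set elsewhere in the answerer's class. As a rough, hypothetical sketch of how they might be wired up and called (the class name WikipediaDownloader, the limit_record default, and the call arguments are assumptions for illustration, not part of the original answer):

class WikipediaDownloader:
    '''Hypothetical container for the two functions above.'''
    limit_record = 0   # 0 disables the demo-only cut-off in the filtering loop

    # The module-level functions already take self as their first argument,
    # so they can simply be attached to the class as methods.
    multithread_download_files_func = multithread_download_files_func
    download_files = download_files

if __name__ == '__main__':
    downloader = WikipediaDownloader()
    downloader.download_files(
        filter_files='*200801*',
        url_to_download='https://dumps.wikimedia.org/other/pagecounts-raw/',
        path_to_save_file='D:\\Users\\Jonathan\\Desktop\\Wikipedia\\',
    )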

【Comments】:

For anyone running into a similar error, could you elaborate on what you mean by "I used multithreading"?

【Answer 2】:

From the accepted answer, I understood that it simply comes down to replacing from multiprocessing import Pool with from multiprocessing.dummy import Pool.

This worked for me.
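
As a concrete illustration of that swap (the fetch function and URL list below are placeholders, not taken from the question): multiprocessing.dummy exposes the same Pool API backed by threads, so results and exceptions never leave the parent process and nothing has to be pickled.

from multiprocessing.dummy import Pool   # thread-backed Pool with the same API
import urllib.request

def fetch(url):
    # Runs in a worker thread; the returned value never crosses a process
    # boundary, so no pickling (and no MaybeEncodingError) is involved.
    with urllib.request.urlopen(url) as response:
        return len(response.read())

if __name__ == '__main__':
    urls = ['https://dumps.wikimedia.org/other/pagecounts-raw/'] * 3
    with Pool(4) as p:
        print(p.map(fetch, urls))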

【Comments】:
