使用python脚本提取数据

Posted sgqhappy

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用python脚本提取数据相关的知识,希望对你有一定的参考价值。

版权声明:本文为博主原创文章,转载请注明出处:https://www.cnblogs.com/sgqhappy/p/9956956.html

我们经常用到数据提取的Hive Sql的编写,每次数据提取都得进行hive的编写,为了将这种重复性强的运行命令简单化自动化人性化,我特地编写了一个python脚本,可以实现数据清洗,数据处理,计数下发,读写文件,保存日志等功能。

1. 导包

 1 #!/usr/bin/python
 2 #coding:utf-8
 3 
 4 '''
 5 Made by sgqhappy
 6 Date: 20181113
 7 function: data extract
 8 '''
 9 
10 from subprocess import Popen,PIPE
11 import os
12 import sys
13 import io
14 import re
15 import commands
16 import logging
17 from logging import handlers
18 from re import match

2. 定义一个类,用来打印脚本运行的log日志

日志既可以打印在控制台上,也可以输出到log文件。

技术分享图片
 1 class Logger(object):
 2     def __init__(self,log_file_name,log_level,logger_name):
 3         self.__logger = logging.getLogger(logger_name);
 4         self.__logger.setLevel(log_level);
 5         file_handler = logging.FileHandler(log_file_name);
 6         console_handler = logging.StreamHandler();
 7         
 8         #set log format and show log at console and log_file.
 9         LOG_FORMAT = "%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s : %(message)s";
10         formatter = logging.Formatter(LOG_FORMAT);
11         
12         file_handler.setFormatter(formatter);
13         console_handler.setFormatter(formatter);
14         
15         self.__logger.addHandler(file_handler);
16         self.__logger.addHandler(console_handler);
17         
18     def get_log(self):
19         return self.__logger;
View Code

3. 定义文件名及文件路径

技术分享图片
 1 #This is file name.
 2     file_name = "%s_%s_%s" % (sys.argv[2],sys.argv[4],sys.argv[11]);
 3     info_log_path = /python_test/%s.info.log % (file_name);
 4     
 5     #this is record name and path.
 6     record_name = "data_extract_record.txt";
 7     record_path = "/python_test/";
 8     
 9     logger = Logger(log_file_name="%s" % (info_log_path),log_level=logging.DEBUG,logger_name="myLogger").get_log();
10     
11     #this is log path.
12     path = /python_test/%s.desc.log % (file_name);
13     logger.info("
");
14     logger.info("log path: %s" % (path));
15     logger.info("
");
View Code

4. 提取字段信息保存

技术分享图片
 1 #function:write all fields to log file.
 2     hive_cmd_desc = beeline -u ip -n username -e "desc %s.%s" >> %s % (sys.argv[1],sys.argv[2],path);
 3     logger.info(hive_cmd_desc);
 4     logger.info("
");
 5     status,output = commands.getstatusoutput(hive_cmd_desc);
 6     logger.info(output);
 7     logger.info("
");
 8     
 9     #logger.info success or failed information.
10     if status ==0:
11         logger.info("desc %s to %s successful!" % (sys.argv[2],path));
12     else:
13         #set color: ‘33[;31;40m‘+...+‘33[0m‘
14         logger.error(33[;31;40m+"desc %s to %s failed!" % (sys.argv[2],path)+33[0m);
15         #exit program.
16         exit();
17     logger.info("
");
View Code

5. 字符串处理

技术分享图片
 1 #this is fields list
 2     fields_list = [];
 3     with io.open(path,r,encoding="utf-8") as f:
 4         fields = list(f);
 5         for line in fields:
 6             #remove start letter "|".
 7             line_rm_start_letter = line.strip("|");
 8             logger.info(line_rm_start_letter);
 9             #remove start and end space.
10             pos = line_rm_start_letter.find("|");
11             fields_list.append(line_rm_start_letter[0:pos].strip());
12     logger.info("
");
13     
14     #remove desc.log.
15     remove_desc_log = rm %s % (path);
16     logger.info(remove_desc_log);
17     status,output = commands.getstatusoutput(remove_desc_log);
18     
19     #logger.info success or failed information.
20     if status == 0:
21         logger.info("remove %s successful!" % (path));
22     else:
23         logger.error(33[;31;40m+"remove %s failed!" % (path)+33[0m);
24         exit();
25     logger.info("
");
26     
27     #remove the first three lines.
28     del fields_list[0:3];
29     create = "";
30     start_or_etl = "";
31     if etl_load_date in fields_list:
32         start_or_etl = "etl_load_date";
33         end_letter_pos = fields_list.index("etl_load_date");
34         len = len(fields_list);
35         del fields_list[end_letter_pos:len+1];
36     if start_dt in fields_list:
37         start_or_etl = "start_dt";
38         end_letter_pos = fields_list.index("start_dt");
39         len = len(fields_list);
40         del fields_list[end_letter_pos:len+1];    
View Code

6. 添加附加条件

技术分享图片
 1 #add condition_field.
 2     condition_field = "%s" % (sys.argv[3]);
 3     if condition_field == "0":
 4         pass;
 5     else:
 6         start_or_etl = condition_field;
 7         
 8     for i in fields_list:
 9         #logger.info(len(i));
10         logger.info(i);
11     logger.info("
");
View Code

7. 拼接字段

技术分享图片
# Build the argument list for concat(): each column is wrapped in
# nvl(a.`col`,'') and followed by a '|' separator.
# NOTE(review): the result ends with a trailing "'|'," -- confirm that the
# statement embedding it produces a well-formed concat(...) call.
fields_splice = "".join(
    "nvl(a.`" + field + "`,''),'|'," for field in fields_list
)
logger.info(fields_splice)
logger.info("\n")
View Code

8. 建表

技术分享图片
 1 #create table command.
 2     add_conditions = "%s" % (sys.argv[9]);
 3     if add_conditions == "and 1=1":
 4         create = "create table if not exists database.%s stored as textfile as select concat (%s from %s.%s a join %s b on trim(a.`%s`)=trim(b.`%s`) where b.code=‘%s‘ and a.`%s`>=‘%s‘ and a.`%s`<=‘%s‘ %s;" % (file_name,fields_splice,sys_argv[1],sys.argv[2],sys.argv[6],sys.argv[7],sys.argv[8],sys.argv[4],start_or_etl,sys.argv[10],start_or_etl,sys.argv[11],sys.argv[9]);
 5     else:
 6         create = "create table if not exists database.%s stored as textfile as select concat(%s from %s.%s a %s;" % (file_name,fields_splice,sys.argv[1],sys.argv[2],sys.argv[9]);
 7     logger.info(create);
 8     logger.info("
");
 9     
10     #execute the command.
11     hive_cmd_create = beeline -u ip -n username -e "%s" % (create);
12     logger.info(hive_cmd_create);
13     logger.info("
");
14     status,output = commands.getstatusoutput(hive_cmd_create);
15     logger.info(output);
16     logger.info("
");
17     
18     #logger.info success or failed information.
19     if status ==0:
20         logger.info("create database.%s successful!" % (file_name));
21     else:
22         #set color: ‘33[;31;40m‘+...+‘33[0m‘
23         logger.error(33[;31;40m+"create database.%s failed!" % (file_name)+33[0m);
24         #exit program.
25         exit();
26     logger.info("
");
View Code

9. 计数

技术分享图片
 1 #count table_new command.
 2     count = "select count(*) from database.%s;" % (file_name);
 3     logger.info(count);
 4     logger.info("
");
 5     
 6     #execute the command.
 7     hive_cmd_count = beeline -u ip -n username -e "%s" % (count);
 8     logger.info(hive_cmd_count);
 9     logger.info("
");
10     status,output = commands.getstatusoutput(hive_cmd_count);
11     
12     #logger.info success or failed information.
13     if status ==0:
14         logger.info("count database.%s successful!" % (file_name));
15     else:
16         #set color: ‘33[;31;40m‘+...+‘33[0m‘
17         logger.error(33[;31;40m+"count database.%s failed!" % (file_name)+33[0m);
18         #exit program.
19         exit();
20     logger.info("
");
21     logger.info(output);
22     logger.info("
");
View Code

10. 提取数量

技术分享图片
 1 #extract number.
 2     output_split = output.split("
");
 3     number = output_split[7].strip("|").strip();
 4     result = re.match(r"^d+$",number);
 5     if result:
 6         #logger.info count.
 7         logger.info("The number matched success!");
 8         logger.info(33[1;33;40m+"The count is : %s" % (number)+33[0m);
 9         logger.info("
");
10     else:
11         logger.warning("The number matched failed!");
View Code

11. 抽样查看数据的准确性

技术分享图片
 1 #show the first five data.
 2     first_five_data = "select * from database.%s limit 5;" % (file_name);
 3     logger.info(first_five_data);
 4     logger.info("
");
 5     
 6     #execute the command.
 7     hive_first_five_data = beeline -u ip -n username -e "%s" % (first_five_data);
 8     logger.info(hive_first_five_data);
 9     logger.info("
");
10     status,output = commands.getstatusoutput(hive_first_five_data);
11     
12     #logger.info success or failed information.
13     if status == 0:
14         logger.info("show the first five data of database.%s successful!" % (file_name));
15     else:
16         #set color: ‘33[;31;40m‘+...+‘33[0m‘
17         logger.error(33[;31;40m+"show the first five data of database.%s failed!" % (file_name)+33[0m);
18         #exit program.
19         exit();
20     logger.info("
");
21     
22     #logger.info the first five data.
23     logger.info(33[1;33;40m+"the first five data are : 

%s" % (output)+33[0m);
24     logger.info("
");
View Code

12. 记录相关信息到文件

技术分享图片
 1 #append to record.txt.
 2     output = open("%s%s" % (record_path,record_name),a);
 3     if add_conditions == "and 1=1":
 4         output.write("%s	%s	%s	%s	%s	%s	%s	%s	%s	%s
" % (database_name,table_name,code,extract_date,count,rel_tb_name,rel_field_name_pre,rel_field_name_after,date_pre,date_after));
 5         output.write("%s	%s	%s	%s	%s	%s	%s	%s	%s	%s
" % (sys.argv[1],sys.argv[2],sys.argv[4],sys.argv[5],number,sys.argv[6],sys.argv[7],sys.argv[8],sys.argv[10],sys.argv[11]));
 6     else:
 7         output.write("%s	%s	%s	%s	%s	%s
" % (database_name,table_name,code,extract_date,count,add_conditions));
 8         output.write("%s	%s	%s	%s	%s	%s
" % (sys.argv[1],sys.argv[2],sys.argv[4],sys.argv[5],number,sys.argv[9]));
 9     output.close();
10     
11     #logger.info the data extraction success information.
12     logger.info(33[1;35;40m+"*****Data extract success!*****"+33[0m);
13     logger.info(33[1;35;40m+"*****Made by sgqhappy in %s!*****" % (sys.argv[5])+33[0m);
14     logger.info("
");
View Code

作者:sgqhappy
出处:https://www.cnblogs.com/sgqhappy/p/9956956.html
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接。



以上是关于使用python脚本提取数据的主要内容,如果未能解决你的问题,请参考以下文章

在 Python 中使用 BeautifulSoup 从脚本标签中提取文本

如何使用python脚本自动将数据从访问中提取到excel中

从单个按钮从多个片段中提取数据

Android课程---Android Studio使用小技巧:提取方法代码片段

python 使用WordPress Rest API从MemberPress提取数据的脚本

使用从循环内的代码片段中提取的函数避免代码冗余/计算开销