python3 异步-协程版:常用高德方法总结

Posted jpapplication

tags:

篇首语：本文由小常识网（cha138.com）小编为大家整理，主要介绍了“python3 异步-协程版：常用高德方法总结”相关的知识，希望对你有一定的参考价值。

 1 import requests
 2 import time
 3 import csv
 4 from math import ceil
 5 import asyncio
 6 from aiohttp import ClientSession, TCPConnector, client_exceptions
 7 from random import choice
 8 
 9 
class KEY:
    """Pool of AMap (高德) web-service API keys loaded from a CSV file.

    Each CSV row is expected to carry the key string in its second column
    (index 1); the other columns are not interpreted here.

    Attributes:
        keys: list of CSV rows (each row is a list of strings).
        keys_length: number of rows in ``keys``; kept in sync by every
            method that changes ``keys``.
        i: index of the row returned by ``get_key``.
    """

    # Path of the key file, relative to the current working directory.
    KEY_FILE = "高德秘钥.csv"

    def __init__(self):
        self.keys = self._read_keys()
        self.keys_length = len(self.keys)
        self.i = 0

    def _read_keys(self):
        """Read the key file and return its usable rows.

        Rows with fewer than two columns (e.g. blank lines) are skipped
        because every accessor reads ``row[1]``.
        """
        # The context manager closes the handle; ``newline=""`` is what the
        # csv module asks for when it is handed a text file.
        with open(self.KEY_FILE, "r", newline="") as handle:
            return [row for row in csv.reader(handle) if len(row) > 1]

    def _sync(self):
        """Refresh ``keys_length`` and keep the cursor inside the pool."""
        self.keys_length = len(self.keys)
        if self.i >= self.keys_length:
            self.i = 0

    def get_key(self):
        """Return the key string of the row the cursor points at."""
        return self.keys[self.i][1]

    def next_key(self):
        """Advance the cursor (wrapping to the first row) and return that key."""
        self.i += 1
        if self.i >= self.keys_length:
            print("秘钥耗尽")
            self.i = 0
        return self.get_key()

    def random_key(self):
        """Return the key string of a randomly chosen row."""
        return choice(self.keys)[1]

    def check_out(self):
        """Probe the keys from the cursor onwards and print the rejected rows.

        One coordinate-convert request is sent per key with a fixed sample
        coordinate; a row is printed when the service answers with infocode
        10001 (presumably AMap's "invalid key" code - confirm against the
        AMap error-code table).

        The cursor ``self.i`` is not modified, so ``get_key`` stays usable
        after the scan.
        """
        lon = 104.12159
        lat = 30.65447
        base_url = "https://restapi.amap.com/v3/assistant/coordinate/convert"
        for index in range(self.i, len(self.keys)):
            print(index)
            row = self.keys[index]
            _params_ = {"locations": "{},{}".format(lon, lat),
                        "coordsys": "gps", "output": "json", "key": row[1]}

            infocode = int(requests.get(base_url, _params_).json()["infocode"])
            if infocode == 10001:
                print(row)

    def get_all_key(self):
        """Reload the whole pool from the key file."""
        self.keys = self._read_keys()
        self._sync()

    def drop(self, key):
        """Remove a key from the pool.

        :param key: either the key string (as returned by ``get_key`` /
            ``random_key``) or a complete CSV row.
        """
        self.keys = [row for row in self.keys if row != key and row[1] != key]
        self._sync()

    def get_len(self):
        """Return the number of rows currently in the pool."""
        return len(self.keys)
  1 class Method:
  2     def __init__(self):
  3         self.keys = KEY()
  4         self.k = self.keys.random_key()
  5         self.event_loop = asyncio.get_event_loop()
  6 
  7     # 访问网址
  8     async def get_response(self, base_url_, params_):
  9         """
 10         :param base_url_:  网址
 11         :param params_: 访问网址的参数字典
 12         :return: 访问网址的响应内容
 13         """
 14         async with ClientSession(connector=TCPConnector(verify_ssl=False)) as session:
 15             while True:
 16                 try:
 17                     async with session.get(base_url_, params=params_) as r:
 18                         response_ = await r.json()
 19                         infocode = int(response_[infocode])
 20                         if infocode == 10000:
 21                             return response_
 22                         else:
 23                             print(infocode)
 24                         print("切换key,重新查询中")
 25                         self.k = self.keys.random_key()
 26                         params_[key] = self.k
 27                 except client_exceptions.ClientConnectorError:
 28                     print(wait a moment)
 29                     for i in range(60, 0, -1):
 30                         print("倒计时{}秒".format(i), flush=True, end="" * 10)
 31                         time.sleep(1)
 32                 except Exception as e:
 33                     print(e)
 34                     for i in range(60, 0, -1):
 35                         print("倒计时{}秒".format(i), flush=True, end="" * 10)
 36                         time.sleep(1)
 37 
 38     # gps坐标转化高德坐标
 39     def exchange(self, coordinate):
 40         """
 41         :param coordinate: 坐标 经度,纬度|经度,纬度……
 42         :return: 高德坐标
 43         """
 44         base_url = https://restapi.amap.com/v3/assistant/coordinate/convert
 45         task = []
 46         for c in coordinate:
 47             _params_ = {locations: c, coordsys: gps, output: json, key: self.k}
 48             task.append(self.get_response(base_url, _params_))
 49 
 50         response = self.event_loop.run_until_complete(asyncio.gather(*task))
 51         result = []
 52         for r in response:
 53             result.append(r[locations])
 54         return result
 55 
 56     # 获取行政区信息
 57     def get_region(self, coordinate_array,  batch=False):
 58         """
 59         :param coordinate_array: 坐标数组 【"经度,纬度",……】
 60         :param batch: 批量查询
 61         :return: 省,市,区(县),详细地址
 62         """
 63 
 64         # 坐标转换
 65         coordinate = []
 66         for c in coordinate_array:
 67             coordinate += [c]
 68         coordinate = self.exchange(coordinate)
 69 
 70         task = []
 71         for c in coordinate:
 72             base_url = https://restapi.amap.com/v3/geocode/regeo
 73             _params_ = {location: c, extensions: all,
 74                         output: json, key: self.k, batch: str(batch).lower()}
 75             task.append(self.get_response(base_url, _params_))
 76         response_array = self.event_loop.run_until_complete(asyncio.gather(*task))
 77 
 78         for response in response_array:
 79             if not batch:
 80                 province = response[regeocode][addressComponent][province]
 81                 city = response[regeocode][addressComponent][city]
 82                 district = response[regeocode][addressComponent][district]
 83                 address = response[regeocode][formatted_address]
 84                 pois = [[r[type].split(;)[0], float(r[distance])] for r in response[regeocode][pois][:3]]
 85 
 86                 yield province, city, district, address, pois
 87             else:
 88                 regeocodes = response[regeocodes]
 89                 result = []
 90                 for r in regeocodes:
 91                     r[pois] = sorted(r[pois], key=lambda x: x[distance])
 92                     result.append(
 93                         (
 94                             r[addressComponent][province],
 95                             r[addressComponent][city],
 96                             r[addressComponent][district],
 97                             r[formatted_address],
 98                             [[i[type].split(;)[0], float(i[distance])] for i in r[pois][:3]]
 99                         )
100                     )
101                 yield result
102 
103     # 获取POI信息(多边形搜索--矩形搜索)
104     def get_poi(self, coordinate_array, lon_gap, lat_gap, poi_type, output_poiname=False):
105         """
106         :param coordinate: 【【经度,纬度】,【经度,纬度】】 矩形的中心经纬度
107         :param lon_gap: 矩形经度间隔
108         :param lat_gap: 矩形纬度间隔
109         :param poi_type: 查询POI类型
110         :param output_poiname: 输出POI name
111         :return: 相应poi数量
112         """
113         # 坐标转换
114         coordinate = []
115         for c in coordinate_array:
116             coordinate += [c[0]-lon_gap/2, c[1]+lat_gap/2, c[0]+lon_gap/2, c[1]-lat_gap/2]
117         coordinate = self.exchange(coordinate)
118 
119         base_url = https://restapi.amap.com/v3/place/polygon
120 
121         task = []
122         for i in range(0, len(coordinate), 4):
123             params = {polygon: {},{}|{},{}.format(coordinate[i], coordinate[i+1], coordinate[i+2], coordinate[i+3]),
124                       types: poi_type, output: json, key: self.keys.random_key(), page: 1, offset: 20}
125 
126             task.append(self.get_response(base_url, params))
127         response = self.event_loop.run_until_complete(asyncio.gather(*task))
128 
129         for r in response:
130             n = int(r[count])
131             task = []
132             if output_poiname:
133                 print(Crawling POI {} name.format(poi_type))
134                 name = [{}({}).format(e[name], e[typecode]) for e in r[pois]]
135                 while params[page] < ceil(n/20):
136                     params[page] += 1
137                     task.append(self.get_response(base_url, params))
138                 result = self.event_loop.run_until_complete(asyncio.gather(*task))
139                 for res in result:
140                     name += [{}({}).format(e[name], e[typecode]) for e in res[pois]]
141 
142                 yield n, name
143             else:
144                 yield n
145 
146     def get_distance(self, coordinate_array):
147         """
148         :param coordinate_array: [o_lon, o_lat, d_lon, d_lat]
149         :return:
150         """
151         # 坐标转换
152         coordinate = []
153         for c in coordinate_array:
154             coordinate += c
155         coordinate = self.exchange(coordinate)
156 
157         base_url = https://restapi.amap.com/v3/direction/driving
158         task = []
159 
160         for i in range(0, len(coordinate), 2):
161             parameters = {
162                 key: self.keys.random_key(), strategy: 2, nosteps: 1, extensions: base,
163                 origin: coordinate[i],
164                 destination: coordinate[i+1]
165             }
166             task.append(self.get_response(base_url, parameters))
167         response = self.event_loop.run_until_complete(asyncio.gather(*task))
168 
169         for r in response:
170             yield float(r[route][paths][0][distance])

 

以上是关于“python3 异步-协程版：常用高德方法总结”的主要内容，如果未能解决你的问题，请参考以下文章：

小爬虫程序协程版

Kotlin 协程版的 AutoDispose

python3 高德常用方法归类

Python3异步编程

python 异步编程

python asyncio 异步 I/O - 协程(Coroutine)与运行