1 import requests
2 import time
3 import csv
4 from math import ceil
5 import asyncio
6 from aiohttp import ClientSession, TCPConnector, client_exceptions
7 from random import choice
8
9
10 class KEY:
11 def __init__(self):
12 self.keys = list(csv.reader(open(u'高德秘钥.csv', 'r')))
13 self.keys_length = len(self.keys)
14 self.i = 0
15
16 def get_key(self):
17 return self.keys[self.i][1]
18
19 def next_key(self):
20 self.i += 1
21 if self.i == self.keys_length:
22 print("秘钥耗尽")
23 self.i = 0
24 return self.get_key()
25
26 def random_key(self):
27 return choice(self.keys)[1]
28
29 def check_out(self):
30 lon = 104.12159
31 lat = 30.65447
32 while self.i < self.keys_length:
33 print(self.i)
34 base_url = 'https://restapi.amap.com/v3/assistant/coordinate/convert'
35 _params_ = {'locations': '{},{}'.format(lon, lat),
36 'coordsys': 'gps', 'output': 'json', 'key': self.keys[self.i][1]}
37
38 infocode = int(requests.get(base_url, _params_).json()['infocode'])
39 if infocode == 10001:
40 print(self.keys[self.i])
41
42 self.i += 1
43
44 def get_all_key(self):
45 self.keys = list(csv.reader(open(u'高德秘钥.csv', 'r')))
46
47 def drop(self, key):
48 if key in self.keys:
49 self.keys.remove(key)
50
51 def get_len(self):
52 return len(self.keys)
1 class Method:
2 def __init__(self):
3 self.keys = KEY()
4 self.k = self.keys.random_key()
5 self.event_loop = asyncio.get_event_loop()
6
7 # 访问网址
8 async def get_response(self, base_url_, params_):
9 """
10 :param base_url_: 网址
11 :param params_: 访问网址的参数字典
12 :return: 访问网址的响应内容
13 """
14 async with ClientSession(connector=TCPConnector(verify_ssl=False)) as session:
15 while True:
16 try:
17 async with session.get(base_url_, params=params_) as r:
18 response_ = await r.json()
19 infocode = int(response_['infocode'])
20 if infocode == 10000:
21 return response_
22 else:
23 print(infocode)
24 print("切换key,重新查询中")
25 self.k = self.keys.random_key()
26 params_['key'] = self.k
27 except client_exceptions.ClientConnectorError:
28 print('wait a moment')
29 for i in range(60, 0, -1):
30 print("倒计时{}秒".format(i), flush=True, end="\b" * 10)
31 time.sleep(1)
32 except Exception as e:
33 print(e)
34 for i in range(60, 0, -1):
35 print("倒计时{}秒".format(i), flush=True, end="\b" * 10)
36 time.sleep(1)
37
38 # gps坐标转化高德坐标
39 def exchange(self, coordinate):
40 """
41 :param coordinate: 坐标 经度,纬度|经度,纬度……
42 :return: 高德坐标
43 """
44 base_url = 'https://restapi.amap.com/v3/assistant/coordinate/convert'
45 task = []
46 for c in coordinate:
47 _params_ = {'locations': c, 'coordsys': 'gps', 'output': 'json', 'key': self.k}
48 task.append(self.get_response(base_url, _params_))
49
50 response = self.event_loop.run_until_complete(asyncio.gather(*task))
51 result = []
52 for r in response:
53 result.append(r['locations'])
54 return result
55
56 # 获取行政区信息
57 def get_region(self, coordinate_array, batch=False):
58 """
59 :param coordinate_array: 坐标数组 【"经度,纬度",……】
60 :param batch: 批量查询
61 :return: 省,市,区(县),详细地址
62 """
63
64 # 坐标转换
65 coordinate = []
66 for c in coordinate_array:
67 coordinate += [c]
68 coordinate = self.exchange(coordinate)
69
70 task = []
71 for c in coordinate:
72 base_url = 'https://restapi.amap.com/v3/geocode/regeo'
73 _params_ = {'location': c, 'extensions': 'all',
74 'output': 'json', 'key': self.k, 'batch': str(batch).lower()}
75 task.append(self.get_response(base_url, _params_))
76 response_array = self.event_loop.run_until_complete(asyncio.gather(*task))
77
78 for response in response_array:
79 if not batch:
80 province = response['regeocode']['addressComponent']['province']
81 city = response['regeocode']['addressComponent']['city']
82 district = response['regeocode']['addressComponent']['district']
83 address = response['regeocode']['formatted_address']
84 pois = [[r['type'].split(';')[0], float(r['distance'])] for r in response['regeocode']['pois'][:3]]
85
86 yield province, city, district, address, pois
87 else:
88 regeocodes = response['regeocodes']
89 result = []
90 for r in regeocodes:
91 r['pois'] = sorted(r['pois'], key=lambda x: x['distance'])
92 result.append(
93 (
94 r['addressComponent']['province'],
95 r['addressComponent']['city'],
96 r['addressComponent']['district'],
97 r['formatted_address'],
98 [[i['type'].split(';')[0], float(i['distance'])] for i in r['pois'][:3]]
99 )
100 )
101 yield result
102
103 # 获取POI信息(多边形搜索--矩形搜索)
104 def get_poi(self, coordinate_array, lon_gap, lat_gap, poi_type, output_poiname=False):
105 """
106 :param coordinate: 【【经度,纬度】,【经度,纬度】】 矩形的中心经纬度
107 :param lon_gap: 矩形经度间隔
108 :param lat_gap: 矩形纬度间隔
109 :param poi_type: 查询POI类型
110 :param output_poiname: 输出POI name
111 :return: 相应poi数量
112 """
113 # 坐标转换
114 coordinate = []
115 for c in coordinate_array:
116 coordinate += [c[0]-lon_gap/2, c[1]+lat_gap/2, c[0]+lon_gap/2, c[1]-lat_gap/2]
117 coordinate = self.exchange(coordinate)
118
119 base_url = 'https://restapi.amap.com/v3/place/polygon'
120
121 task = []
122 for i in range(0, len(coordinate), 4):
123 params = {'polygon': '{},{}|{},{}'.format(coordinate[i], coordinate[i+1], coordinate[i+2], coordinate[i+3]),
124 'types': poi_type, 'output': 'json', 'key': self.keys.random_key(), 'page': 1, 'offset': 20}
125
126 task.append(self.get_response(base_url, params))
127 response = self.event_loop.run_until_complete(asyncio.gather(*task))
128
129 for r in response:
130 n = int(r['count'])
131 task = []
132 if output_poiname:
133 print('Crawling POI {} name'.format(poi_type))
134 name = ['{}({})'.format(e['name'], e['typecode']) for e in r['pois']]
135 while params['page'] < ceil(n/20):
136 params['page'] += 1
137 task.append(self.get_response(base_url, params))
138 result = self.event_loop.run_until_complete(asyncio.gather(*task))
139 for res in result:
140 name += ['{}({})'.format(e['name'], e['typecode']) for e in res['pois']]
141
142 yield n, name
143 else:
144 yield n
145
146 def get_distance(self, coordinate_array):
147 """
148 :param coordinate_array: [o_lon, o_lat, d_lon, d_lat]
149 :return:
150 """
151 # 坐标转换
152 coordinate = []
153 for c in coordinate_array:
154 coordinate += c
155 coordinate = self.exchange(coordinate)
156
157 base_url = 'https://restapi.amap.com/v3/direction/driving'
158 task = []
159
160 for i in range(0, len(coordinate), 2):
161 parameters = {
162 'key': self.keys.random_key(), 'strategy': 2, 'nosteps': 1, 'extensions': 'base',
163 'origin': coordinate[i],
164 'destination': coordinate[i+1]
165 }
166 task.append(self.get_response(base_url, parameters))
167 response = self.event_loop.run_until_complete(asyncio.gather(*task))
168
169 for r in response:
170 yield float(r['route']['paths'][0]['distance'])