如何通过其 API 从网站读取数据?
Posted
技术标签:
【中文标题】如何通过其 API 从网站读取数据?【英文标题】:How to read data from a website through its API? 【发布时间】:2022-01-23 19:24:19 【问题描述】:我对 Spark 很陌生。我需要从 Opensky 网站读取数据,使用他们为它提供的 api (https://openskynetwork.github.io/opensky-api/python.html)。 bbox 参数是正好四个值(min_latitude、max_latitude、min_longitude、max_latitude)的元组。以下代码显示了在某些坐标上注册的航班:
import json
from random import sample
from opensky_api import OpenSkyApi
api = OpenSkyApi()
states = api.get_states(bbox=(45.8389, 47.8229, 5.9962, 10.5226))
for s in sample(states.states,5):
flight =
'callsign':s.callsign,
'country': s.origin_country,
'longitude': s.longitude,
'latitude': s.latitude,
'velocity': s.velocity,
'vertical_rate': s.vertical_rate,
flight_data= json.dumps(flight, indent=2).encode('utf-8')
print("(%r, %r,%r, %r, %r, %r)" % (s.callsign, s.origin_country, s.longitude, s.latitude,s.velocity,s.vertical_rate))
我需要创建一个 python 程序,以便能够每 10 秒发送一次航班信息(通过我分配的端口)。首先,我必须在终端中使用从 Opensky 读取的套接字服务器运行 python 程序,然后我必须在另一个终端中运行带有结构化流的 Spark 程序。我需要发送数据并通过终端以json格式显示(使用json.dumps函数)。
我有以下模板可以做到这一点,但我不知道应该如何修改它们才能读取数据。模板如下:
服务器套接字:
import socket
server = socket.socket()
host = ????
port = ????
server.bind((host, port))
server.listen(2)
client_socket, addr = server.accept()
print("connection established.")
# Sending data
client_socket.sendall("Text".encode())
Spark 结构化流式传输:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
spark = SparkSession \
.builder \
.appName("FlightsInformation") \
.getOrCreate()
flights= spark \
.readStream \
.format("socket") \
.option("host", "????") \
.option("port", ????) \
.load()
flights_information= ????
query = flight_information\
.writeStream \
.outputMode("complete") \
.format("console") \
.start()
query.awaitTermination()
我该怎么做?
【问题讨论】:
【参考方案1】:这就是我创建套接字以通过套接字发送 JSON 数据的方式。
import socket
import sys
import json
from random import sample
from time import sleep
from opensky_api import OpenSkyApi
api = OpenSkyApi()
states = api.get_states(bbox=(45.8389, 47.8229, 5.9962, 10.5226))
# Create a socket (SOCK_STREAM means a TCP socket)
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
except socket.error as err:
print('Socket error because of %s' %(err))
try:
# Connectar al server
sock.bind(('127.0.0.1', PORT))
except socket.error as err:
print('Error, could not bind to server because of %s' %(err))
sys.exit
sock.listen(2)
client_socket, addr = sock.accept()
print("connection established.")
while True:
for s in sample(states.states, 5):
vuelo_dict =
'callsign':s.callsign,
'country': s.origin_country,
'longitude': s.longitude,
'latitude': s.latitude,
'velocity': s.velocity,
'vertical_rate': s.vertical_rate,
flight_data = json.dumps(vuelo_dict, indent=2).encode('utf-8')
print("(%r, %r,%r, %r, %r, %r)" % (s.callsign, s.origin_country, s.longitude, s.latitude,s.velocity,s.vertical_rate))
try:
client_socket.sendall(flight_data)
sleep(10)
#print('Sent: '.format(flight_data))
except socket.gaierror:
print ('There an error resolving the host')
sock.close()
Spark 结构化流式传输:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
spark = SparkSession \
.builder \
.appName("FlightsInformation") \
.getOrCreate()
flights_information= spark \
.readStream \
.format('socket')\
.option('host', 'localhost')\
.option('port', XXXXX)\
.load()
query = flights_information\
.writeStream \
.outputMode("append") \
.format("console") \
.start()
query.awaitTermination()
【讨论】:
我已经尝试过这段代码,但它对我不起作用。首先我在终端中运行套接字代码,然后在另一个终端中运行 Spark Structured Streaming 代码,但我收到以下错误消息:ERROR streaming.MicroBatchExecution: Query [id = 7166f1d2-04e6-48b3-b444-db741d217c4b, runId = 0309162a-fb2a-4124-be76-a15c4bcc3fee] 因错误 java.lang.NullPointerException 而终止。知道为什么我会收到此错误吗?你在代码的套接字部分中写着 PORT 的地方写了什么? 在我提到 PORT 和 XXXXX 的地方,您必须编写分配给您的 localhost 的端口。就像示例 sock.bind(('127.0.0.1', 11111)) 和 .option('port', 11111)。以上是关于如何通过其 API 从网站读取数据?的主要内容,如果未能解决你的问题,请参考以下文章
我应该使用 Screen Scrapers 还是 API 从网站读取数据
使用 JdbcTemplate 从数据库中读取大数据并通过 api 公开?