Websocketapps 在参数值更改时添加另一个查询

Posted

技术标签:

【中文标题】Websocketapps 在参数值更改时添加另一个查询【英文标题】:Websocketapps to add another query when the argument value is changed 【发布时间】:2021-03-18 11:01:12 【问题描述】:

也许我不确定如何正确地问这个问题,因为我无法通过自己的谷歌搜索找到一个好的答案。

我已经很接近了,但我还有最后一步来完成下面的脚本。该脚本使用 YLiveTicker 从 Finance.yahoo.com 的网络流中提取实时股票数据,以获取来自 csv 文件('stock_selection_active.csv')的给定股票代码列表。该列表将如下所示:['NLSPW', 'ENTX', 'TRCH', 'NVFY']。修改后的 YLiveTicker 脚本然后获取列表并收听给定符号的网络流。但问题是如果股票代码列表更新,我必须终止脚本并重新运行它。我想修改自动执行此操作的脚本。

例如,我最初可以有 ['NLSPW', 'ENTX', 'TRCH', 'NVFY'],一旦将 'AAPL' 作为另一个项目添加到列表中,它将添加一个额外的查询 ' AAPL' 到 websocket。

我想我正在努力操作 websocketapp。我有点能够使用 websockets 和 asyncio 库复制脚本的某些部分来设置服务器和客户端以进行交互以获得类似的效果,但它只从 wss://streamer.finance.yahoo.com 提取空数据。

如何修改以下脚本以不断检查 csv 文件并将另一个符号添加到 websocketapp 以进行查询?

import base64
import json, sys, os, csv
import websocket
import asyncio
#import websockets
from datetime import datetime, timedelta
import time
import pandas as pd
from pytz import timezone
from pathlib import Path
from yaticker_pb2 import yaticker

try:
    import thread
except ImportError:
    import _thread as thread

nyc = timezone('America/New_York')
today_str = datetime.today().astimezone(nyc).strftime('%Y-%m-%d')
PATH = Path(__file__).parent
DATA_PATH = PATH.joinpath("data").resolve()
DATE_DATA_PATH = DATA_PATH.joinpath(today_str)

if not os.path.exists(DATE_DATA_PATH):
    os.makedirs(DATE_DATA_PATH)

stock_selection = DATA_PATH.joinpath("stock_select_active.csv")

def PullTickersToQuery():
    if os.path.exists(stock_selection):
        df = pd.read_csv(stock_selection)
        tickers_to_query = df['Symbol'].tolist()
        return (tickers_to_query)

class YLiveTicker:
    def __init__(
        self,
        on_ticker=None,
        ticker_names=['MSFT'],
        on_error=None,
        on_close=None,
        enable_socket_trace=False,
    ):

        self.symbol_list = dict()
        self.symbol_list["subscribe"] = ticker_names

        websocket.enableTrace(enable_socket_trace)

        self.on_ticker = on_ticker
        self.on_custom_close = on_close
        self.on_custom_error = on_error

        self.yaticker = yaticker()

        self.ticker_names = ticker_names
        self.ws = websocket.WebSocketApp(
            "wss://streamer.finance.yahoo.com/",
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close,
        )
        self.ws.on_open = self.on_open
        self.ws.run_forever()

    def on_message(self, message):
        message_bytes = base64.b64decode(message)
        self.yaticker.ParseFromString(message_bytes)
        data = 
            "Symbol": self.yaticker.id,
            "Exchange": self.yaticker.exchange,
            "QuoteType": self.yaticker.quoteType,
            "Price": self.yaticker.price,
            "Timestamp": (datetime.today().astimezone(timezone('America/New_York')).strftime('%Y%m%d %H:%M:%S.%f')), #self.yaticker.time,
            "MarketHours": self.yaticker.marketHours,
            "ChangePercent": self.yaticker.changePercent,
            "DayVolume": self.yaticker.dayVolume,
            "DayHigh": self.yaticker.dayHigh,
            "DayLow": self.yaticker.dayLow,
            "Change": self.yaticker.change,
            "PriceHint": self.yaticker.priceHint,
            'Bid': self.yaticker.bid,
            'BidSize': self.yaticker.bidSize,
            'Ask': self.yaticker.ask,
            'AskSize': self.yaticker.askSize
        

        quoteType = 
            0: 'NONE',
            5: 'ALTSYMBOL',
            7: 'HEARTBEAT',
            8: 'EQUITY',
            9: 'INDEX',
            11: 'MUTUALFUND',
            12: 'MONEYMARKET',
            13: 'OPTION',
            14: 'CURRENCY',
            15: 'WARRANT',
            17: 'BOND',
            18: 'FUTURE',
            20: 'ETF',
            23: 'COMMODITY',
            28: 'ECNQUOTE',
            41: 'CRYPTOCURRENCY',
            42: 'INDICATOR',
            1000: 'INDUSTRY',
        

        marketHoursType = 
            0: 'PRE_MARKET',
            1: 'REGULAR_MARKET',
            2: 'POST_MARKET',
            3: 'EXTENDED_HOURS_MARKET'
        


        for q in quoteType:
            if data['QuoteType'] == q:
                data['QuoteType'] = quoteType[q]

        for m in marketHoursType:
            if data['MarketHours'] == m:
                data['MarketHours'] = marketHoursType[m]

        row = list(data.values())
        headers = list(data.keys())
        print (row)

        STOCK_DATA_PATH = DATE_DATA_PATH.joinpath('.csv'.format(data['Symbol']))
        file_exists = os.path.isfile(STOCK_DATA_PATH)

        with open(STOCK_DATA_PATH, 'a') as f:
            writer = csv.DictWriter(f, delimiter=',', lineterminator='\n', fieldnames=headers)
            
            if not file_exists:
                writer.writeheader()

            writer.writerow(data)
            f.close()

    def on_error(self, error):
        if self.on_custom_error is None:
            print(error)
        else:
            self.on_custom_error(error)

    def on_close(self):

        if self.on_custom_close is None:
            #print("### connection is closed ###")
            pass
        else:
            self.on_custom_close()

    def on_open(self):
        def run(*args):
            #self.symbol_list = PullTickersToQuery()
            #print (self.symbol_list)
            self.ws.send(json.dumps(self.symbol_list))

        thread.start_new_thread(run, ())
        #print("### connection is open ###")
        print('Symbol,Exchange,MarketHours,QuoteType,Date,Price,ChangePercent,DayVolume,DayHigh,DayLow,Bid,BidSize,Ask,AskSize')

if __name__ == '__main__':
    tickers_to_query = PullTickersToQuery()
    YLiveTicker(ticker_names=tickers_to_query)

yaticker_pb2.py

# Generated by the protocol buffer compiler.  DO NOT EDIT!
# source: yaticker.proto

import sys
_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
from google.protobuf import descriptor_pb2
# @@protoc_insertion_point(imports)

_sym_db = _symbol_database.Default()




DESCRIPTOR = _descriptor.FileDescriptor(
  name='yaticker.proto',
  package='',
  syntax='proto3',
  serialized_pb=_b('\n\x0eyaticker.proto\"\xcb\x08\n\x08yaticker\x12\n\n\x02id\x18\x01 \x01(\t\x12\r\n\x05price\x18\x02 \x01(\x02\x12\x0c\n\x04time\x18\x03 \x01(\x12\x12\x10\n\x08\x63urrency\x18\x04 \x01(\t\x12\x10\n\x08\x65xchange\x18\x05 \x01(\t\x12&\n\tquoteType\x18\x06 \x01(\x0e\x32\x13.yaticker.QuoteType\x12.\n\x0bmarketHours\x18\x07 \x01(\x0e\x32\x19.yaticker.MarketHoursType\x12\x15\n\rchangePercent\x18\x08 \x01(\x02\x12\x11\n\tdayVolume\x18\t \x01(\x12\x12\x0f\n\x07\x64\x61yHigh\x18\n \x01(\x02\x12\x0e\n\x06\x64\x61yLow\x18\x0b \x01(\x02\x12\x0e\n\x06\x63hange\x18\x0c \x01(\x02\x12\x11\n\tshortName\x18\r \x01(\t\x12\x12\n\nexpireDate\x18\x0e \x01(\x12\x12\x11\n\topenPrice\x18\x0f \x01(\x02\x12\x15\n\rpreviousClose\x18\x10 \x01(\x02\x12\x13\n\x0bstrikePrice\x18\x11 \x01(\x02\x12\x18\n\x10underlyingSymbol\x18\x12 \x01(\t\x12\x14\n\x0copenInterest\x18\x13 \x01(\x12\x12)\n\x0boptionsType\x18\x14 \x01(\x0e\x32\x14.yaticker.OptionType\x12\x12\n\nminiOption\x18\x15 \x01(\x12\x12\x10\n\x08lastSize\x18\x16 \x01(\x12\x12\x0b\n\x03\x62id\x18\x17 \x01(\x02\x12\x0f\n\x07\x62idSize\x18\x18 \x01(\x12\x12\x0b\n\x03\x61sk\x18\x19 \x01(\x02\x12\x0f\n\x07\x61skSize\x18\x1a \x01(\x12\x12\x11\n\tpriceHint\x18\x1b \x01(\x12\x12\x10\n\x08vol_24hr\x18\x1c \x01(\x12\x12\x18\n\x10volAllCurrencies\x18\x1d \x01(\x12\x12\x14\n\x0c\x66romcurrency\x18\x1e \x01(\t\x12\x12\n\nlastMarket\x18\x1f \x01(\t\x12\x19\n\x11\x63irculatingSupply\x18  \x01(\x01\x12\x11\n\tmarketcap\x18! \x01(\x01\"\x80\x02\n\tQuoteType\x12\x08\n\x04NONE\x10\x00\x12\r\n\tALTSYMBOL\x10\x05\x12\r\n\tHEARTBEAT\x10\x07\x12\n\n\x06\x45QUITY\x10\x08\x12\t\n\x05INDEX\x10\t\x12\x0e\n\nMUTUALFUND\x10\x0b\x12\x0f\n\x0bMONEYMARKET\x10\x0c\x12\n\n\x06OPTION\x10\r\x12\x0c\n\x08\x43URRENCY\x10\x0e\x12\x0b\n\x07WARRANT\x10\x0f\x12\x08\n\x04\x42OND\x10\x11\x12\n\n\x06\x46UTURE\x10\x12\x12\x07\n\x03\x45TF\x10\x14\x12\r\n\tCOMMODITY\x10\x17\x12\x0c\n\x08\x45\x43NQUOTE\x10\x1c\x12\x12\n\x0e\x43RYPTOCURRENCY\x10)\x12\r\n\tINDICATOR\x10*\x12\r\n\x08INDUSTRY\x10\xe8\x07\"\x1f\n\nOptionType\x12\x08\n\x04\x43\x41LL\x10\x00\x12\x07\n\x03PUT\x10\x01\"a\n\x0fMarketHoursType\x12\x0e\n\nPRE_MARKET\x10\x00\x12\x12\n\x0eREGULAR_MARKET\x10\x01\x12\x0f\n\x0bPOST_MARKET\x10\x02\x12\x19\n\x15\x45XTENDED_HOURS_MARKET\x10\x03\x62\x06proto3')
)



_YATICKER_QUOTETYPE = _descriptor.EnumDescriptor(
  name='QuoteType',
  full_name='yaticker.QuoteType',
  filename=None,
  file=DESCRIPTOR,
  values=[
    _descriptor.EnumValueDescriptor(
      name='NONE', index=0, number=0,
      options=None,
      type=None),
    _descriptor.EnumValueDescriptor(
      name='ALTSYMBOL', index=1, number=5,
      options=None,
      type=None),
    _descriptor.EnumValueDescriptor(
      name='HEARTBEAT', index=2, number=7,
      options=None,
      type=None),
    _descriptor.EnumValueDescriptor(
      name='EQUITY', index=3, number=8,
      options=None,
      type=None),
    _descriptor.EnumValueDescriptor(
      name='INDEX', index=4, number=9,
      options=None,
      type=None),
    _descriptor.EnumValueDescriptor(
      name='MUTUALFUND', index=5, number=11,
      options=None,
      type=None),
    _descriptor.EnumValueDescriptor(
      name='MONEYMARKET', index=6, number=12,
      options=None,
      type=None),
    _descriptor.EnumValueDescriptor(
      name='OPTION', index=7, number=13,
      options=None,
      type=None),
    _descriptor.EnumValueDescriptor(
      name='CURRENCY', index=8, number=14,
      options=None,
      type=None),
    _descriptor.EnumValueDescriptor(
      name='WARRANT', index=9, number=15,
      options=None,
      type=None),
    _descriptor.EnumValueDescriptor(
      name='BOND', index=10, number=17,
      options=None,
      type=None),
    _descriptor.EnumValueDescriptor(
      name='FUTURE', index=11, number=18,
      options=None,
      type=None),
    _descriptor.EnumValueDescriptor(
      name='ETF', index=12, number=20,
      options=None,
      type=None),
    _descriptor.EnumValueDescriptor(
      name='COMMODITY', index=13, number=23,
      options=None,
      type=None),
    _descriptor.EnumValueDescriptor(
      name='ECNQUOTE', index=14, number=28,
      options=None,
      type=None),
    _descriptor.EnumValueDescriptor(
      name='CRYPTOCURRENCY', index=15, number=41,
      options=None,
      type=None),
    _descriptor.EnumValueDescriptor(
      name='INDICATOR', index=16, number=42,
      options=None,
      type=None),
    _descriptor.EnumValueDescriptor(
      name='INDUSTRY', index=17, number=1000,
      options=None,
      type=None),
  ],
  containing_type=None,
  options=None,
  serialized_start=730,
  serialized_end=986,
)
_sym_db.RegisterEnumDescriptor(_YATICKER_QUOTETYPE)

_YATICKER_OPTIONTYPE = _descriptor.EnumDescriptor(
  name='OptionType',
  full_name='yaticker.OptionType',
  filename=None,
  file=DESCRIPTOR,
  values=[
    _descriptor.EnumValueDescriptor(
      name='CALL', index=0, number=0,
      options=None,
      type=None),
    _descriptor.EnumValueDescriptor(
      name='PUT', index=1, number=1,
      options=None,
      type=None),
  ],
  containing_type=None,
  options=None,
  serialized_start=988,
  serialized_end=1019,
)
_sym_db.RegisterEnumDescriptor(_YATICKER_OPTIONTYPE)

_YATICKER_MARKETHOURSTYPE = _descriptor.EnumDescriptor(
  name='MarketHoursType',
  full_name='yaticker.MarketHoursType',
  filename=None,
  file=DESCRIPTOR,
  values=[
    _descriptor.EnumValueDescriptor(
      name='PRE_MARKET', index=0, number=0,
      options=None,
      type=None),
    _descriptor.EnumValueDescriptor(
      name='REGULAR_MARKET', index=1, number=1,
      options=None,
      type=None),
    _descriptor.EnumValueDescriptor(
      name='POST_MARKET', index=2, number=2,
      options=None,
      type=None),
    _descriptor.EnumValueDescriptor(
      name='EXTENDED_HOURS_MARKET', index=3, number=3,
      options=None,
      type=None),
  ],
  containing_type=None,
  options=None,
  serialized_start=1021,
  serialized_end=1118,
)
_sym_db.RegisterEnumDescriptor(_YATICKER_MARKETHOURSTYPE)


_YATICKER = _descriptor.Descriptor(
  name='yaticker',
  full_name='yaticker',
  filename=None,
  file=DESCRIPTOR,
  containing_type=None,
  fields=[
    _descriptor.FieldDescriptor(
      name='id', full_name='yaticker.id', index=0,
      number=1, type=9, cpp_type=9, label=1,
      has_default_value=False, default_value=_b("").decode('utf-8'),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None, file=DESCRIPTOR),
    _descriptor.FieldDescriptor(
      name='price', full_name='yaticker.price', index=1,
      number=2, type=2, cpp_type=6, label=1,
      has_default_value=False, default_value=float(0),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None, file=DESCRIPTOR),
    _descriptor.FieldDescriptor(
      name='time', full_name='yaticker.time', index=2,
      number=3, type=18, cpp_type=2, label=1,
      has_default_value=False, default_value=0,
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None, file=DESCRIPTOR),
    _descriptor.FieldDescriptor(
      name='currency', full_name='yaticker.currency', index=3,
      number=4, type=9, cpp_type=9, label=1,
      has_default_value=False, default_value=_b("").decode('utf-8'),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None, file=DESCRIPTOR),
    _descriptor.FieldDescriptor(
      name='exchange', full_name='yaticker.exchange', index=4,
      number=5, type=9, cpp_type=9, label=1,
      has_default_value=False, default_value=_b("").decode('utf-8'),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None, file=DESCRIPTOR),
    _descriptor.FieldDescriptor(
      name='quoteType', full_name='yaticker.quoteType', index=5,
      number=6, type=14, cpp_type=8, label=1,
      has_default_value=False, default_value=0,
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None, file=DESCRIPTOR),
    _descriptor.FieldDescriptor(
      name='marketHours', full_name='yaticker.marketHours', index=6,
      number=7, type=14, cpp_type=8, label=1,
      has_default_value=False, default_value=0,
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None, file=DESCRIPTOR),
    _descriptor.FieldDescriptor(
      name='changePercent', full_name='yaticker.changePercent', index=7,
      number=8, type=2, cpp_type=6, label=1,
      has_default_value=False, default_value=float(0),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None, file=DESCRIPTOR),
    _descriptor.FieldDescriptor(
      name='dayVolume', full_name='yaticker.dayVolume', index=8,
      number=9, type=18, cpp_type=2, label=1,
      has_default_value=False, default_value=0,
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None, file=DESCRIPTOR),
    _descriptor.FieldDescriptor(
      name='dayHigh', full_name='yaticker.dayHigh', index=9,
      number=10, type=2, cpp_type=6, label=1,
      has_default_value=False, default_value=float(0),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None, file=DESCRIPTOR),
    _descriptor.FieldDescriptor(
      name='dayLow', full_name='yaticker.dayLow', index=10,
      number=11, type=2, cpp_type=6, label=1,
      has_default_value=False, default_value=float(0),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None, file=DESCRIPTOR),
    _descriptor.FieldDescriptor(
      name='change', full_name='yaticker.change', index=11,
      number=12, type=2, cpp_type=6, label=1,
      has_default_value=False, default_value=float(0),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None, file=DESCRIPTOR),
    _descriptor.FieldDescriptor(
      name='shortName', full_name='yaticker.shortName', index=12,
      number=13, type=9, cpp_type=9, label=1,
      has_default_value=False, default_value=_b("").decode('utf-8'),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None, file=DESCRIPTOR),
    _descriptor.FieldDescriptor(
      name='expireDate', full_name='yaticker.expireDate', index=13,
      number=14, type=18, cpp_type=2, label=1,
      has_default_value=False, default_value=0,
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None, file=DESCRIPTOR),
    _descriptor.FieldDescriptor(
      name='openPrice', full_name='yaticker.openPrice', index=14,
      number=15, type=2, cpp_type=6, label=1,
      has_default_value=False, default_value=float(0),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None, file=DESCRIPTOR),
    _descriptor.FieldDescriptor(
      name='previousClose', full_name='yaticker.previousClose', index=15,
      number=16, type=2, cpp_type=6, label=1,
      has_default_value=False, default_value=float(0),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None, file=DESCRIPTOR),
    _descriptor.FieldDescriptor(
      name='strikePrice', full_name='yaticker.strikePrice', index=16,
      number=17, type=2, cpp_type=6, label=1,
      has_default_value=False, default_value=float(0),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None, file=DESCRIPTOR),
    _descriptor.FieldDescriptor(
      name='underlyingSymbol', full_name='yaticker.underlyingSymbol', index=17,
      number=18, type=9, cpp_type=9, label=1,
      has_default_value=False, default_value=_b("").decode('utf-8'),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None, file=DESCRIPTOR),
    _descriptor.FieldDescriptor(
      name='openInterest', full_name='yaticker.openInterest', index=18,
      number=19, type=18, cpp_type=2, label=1,
      has_default_value=False, default_value=0,
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None, file=DESCRIPTOR),
    _descriptor.FieldDescriptor(
      name='optionsType', full_name='yaticker.optionsType', index=19,
      number=20, type=14, cpp_type=8, label=1,
      has_default_value=False, default_value=0,
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None, file=DESCRIPTOR),
    _descriptor.FieldDescriptor(
      name='miniOption', full_name='yaticker.miniOption', index=20,
      number=21, type=18, cpp_type=2, label=1,
      has_default_value=False, default_value=0,
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None, file=DESCRIPTOR),
    _descriptor.FieldDescriptor(
      name='lastSize', full_name='yaticker.lastSize', index=21,
      number=22, type=18, cpp_type=2, label=1,
      has_default_value=False, default_value=0,
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None, file=DESCRIPTOR),
    _descriptor.FieldDescriptor(
      name='bid', full_name='yaticker.bid', index=22,
      number=23, type=2, cpp_type=6, label=1,
      has_default_value=False, default_value=float(0),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None, file=DESCRIPTOR),
    _descriptor.FieldDescriptor(
      name='bidSize', full_name='yaticker.bidSize', index=23,
      number=24, type=18, cpp_type=2, label=1,
      has_default_value=False, default_value=0,
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None, file=DESCRIPTOR),
    _descriptor.FieldDescriptor(
      name='ask', full_name='yaticker.ask', index=24,
      number=25, type=2, cpp_type=6, label=1,
      has_default_value=False, default_value=float(0),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None, file=DESCRIPTOR),
    _descriptor.FieldDescriptor(
      name='askSize', full_name='yaticker.askSize', index=25,
      number=26, type=18, cpp_type=2, label=1,
      has_default_value=False, default_value=0,
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None, file=DESCRIPTOR),
    _descriptor.FieldDescriptor(
      name='priceHint', full_name='yaticker.priceHint', index=26,
      number=27, type=18, cpp_type=2, label=1,
      has_default_value=False, default_value=0,
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None, file=DESCRIPTOR),
    _descriptor.FieldDescriptor(
      name='vol_24hr', full_name='yaticker.vol_24hr', index=27,
      number=28, type=18, cpp_type=2, label=1,
      has_default_value=False, default_value=0,
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None, file=DESCRIPTOR),
    _descriptor.FieldDescriptor(
      name='volAllCurrencies', full_name='yaticker.volAllCurrencies', index=28,
      number=29, type=18, cpp_type=2, label=1,
      has_default_value=False, default_value=0,
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None, file=DESCRIPTOR),
    _descriptor.FieldDescriptor(
      name='fromcurrency', full_name='yaticker.fromcurrency', index=29,
      number=30, type=9, cpp_type=9, label=1,
      has_default_value=False, default_value=_b("").decode('utf-8'),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None, file=DESCRIPTOR),
    _descriptor.FieldDescriptor(
      name='lastMarket', full_name='yaticker.lastMarket', index=30,
      number=31, type=9, cpp_type=9, label=1,
      has_default_value=False, default_value=_b("").decode('utf-8'),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None, file=DESCRIPTOR),
    _descriptor.FieldDescriptor(
      name='circulatingSupply', full_name='yaticker.circulatingSupply', index=31,
      number=32, type=1, cpp_type=5, label=1,
      has_default_value=False, default_value=float(0),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None, file=DESCRIPTOR),
    _descriptor.FieldDescriptor(
      name='marketcap', full_name='yaticker.marketcap', index=32,
      number=33, type=1, cpp_type=5, label=1,
      has_default_value=False, default_value=float(0),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None, file=DESCRIPTOR),
  ],
  extensions=[
  ],
  nested_types=[],
  enum_types=[
    _YATICKER_QUOTETYPE,
    _YATICKER_OPTIONTYPE,
    _YATICKER_MARKETHOURSTYPE,
  ],
  options=None,
  is_extendable=False,
  syntax='proto3',
  extension_ranges=[],
  oneofs=[
  ],
  serialized_start=19,
  serialized_end=1118,
)

_YATICKER.fields_by_name['quoteType'].enum_type = _YATICKER_QUOTETYPE
_YATICKER.fields_by_name['marketHours'].enum_type = _YATICKER_MARKETHOURSTYPE
_YATICKER.fields_by_name['optionsType'].enum_type = _YATICKER_OPTIONTYPE
_YATICKER_QUOTETYPE.containing_type = _YATICKER
_YATICKER_OPTIONTYPE.containing_type = _YATICKER
_YATICKER_MARKETHOURSTYPE.containing_type = _YATICKER
DESCRIPTOR.message_types_by_name['yaticker'] = _YATICKER
_sym_db.RegisterFileDescriptor(DESCRIPTOR)

yaticker = _reflection.GeneratedProtocolMessageType('yaticker', (_message.Message,), dict(
  DESCRIPTOR = _YATICKER,
  __module__ = 'yaticker_pb2'
  # @@protoc_insertion_point(class_scope:yaticker)
  ))
_sym_db.RegisterMessage(yaticker)


# @@protoc_insertion_point(module_scope)

【问题讨论】:

当我在寻找同样的东西时,我遇到了同样的问题:如何添加更多或更改列表。我没有使用 python 的经验。我的建议是创建tickerlist.json 文件并使用python 读取文件并使用它的内容(代码列表)。另一个服务将侦听文件更改并重新启动此主条,以使其有机会重新加载新的代码。你怎么看? 我终于解决了这个问题。问题是我认为我必须以某种方式修改脚本的 websocket 部分。解决方案是修改线程部分。我会继续上传解决方案。 @Maka,这可能是我们可以解决此问题的方法之一,但我只是决定从 csv 文件中提取列表。这完全取决于你,但我使用线程函数每 15 秒拉一次更新的列表。 【参考方案1】:

我想通了。如 cmets 部分所述,我必须修改线程函数以间隔运行。

如下所示,我们在 run 函数中发送 jsonified 字典。然后,我创建了一个名为“every”的单独函数,这样我就可以在“thread.start_new_thread”内部调用该函数,以便每15秒运行一次。这样,函数就会每隔设定的时间间隔运行部分脚本。

 def on_open(self):
        def run(*args):
            self.ws.send(json.dumps(CreateTickersDict()))

        thread.start_new_thread(lambda: every(15, run), ())
        print('Symbol,Exchange,MarketHours,QuoteType,Date,Price,ChangePercent,DayVolume,DayHigh,DayLow,Bid,BidSize,Ask,AskSize')

这是完整的脚本:

import base64
import json, sys, os, csv
import websocket
import asyncio
#import websockets
from datetime import datetime, timedelta
import time, traceback
import pandas as pd
from pytz import timezone
from pathlib import Path
from yaticker_pb2 import yaticker
import logging
try:
    import thread
except ImportError:
    import _thread as thread


nyc = timezone('America/New_York')
today_str = datetime.today().astimezone(nyc).strftime('%Y-%m-%d')
PATH = Path(__file__).parent
DATA_PATH = PATH.joinpath("data").resolve()

DATE_DATA_PATH = DATA_PATH.joinpath(today_str)
#DATE_DATA_PATH = DATA_PATH.joinpath('2021-03-15')
if not os.path.exists(DATE_DATA_PATH):
    os.makedirs(DATE_DATA_PATH)

stock_selection = DATA_PATH.joinpath("stock_select_active.csv")

def every(delay, task):
    next_time = time.time() + delay
    while True:
        time.sleep(max(0, next_time - time.time()))
        try:
            task()
        except Exception:
            traceback.print_exc()
            # in production code you might want to have this instead of course:
            # logger.exception("Problem while executing repetitive task.")

        # skip tasks if we are behind schedule:
        next_time += (time.time() - next_time) // delay * delay + delay

def PullTickersList():
    if os.path.exists(stock_selection):
        df = pd.read_csv(stock_selection)
        tickers_to_query = df['Symbol'].tolist()
        return (tickers_to_query)

def CreateTickersDict():    
    symbol_list = dict()
    symbol_list["subscribe"] = PullTickersList()
    #print (symbol_list)
    return (symbol_list)


class YLiveTicker:
    def __init__(
        self,
        on_ticker=None,
        ticker_names=['MSFT'],
        on_error=None,
        on_close=None,
        enable_socket_trace=False,
    ):
        #ticker_names = PullTickersList()
        #print (ticker_names)
        #self.symbol_list = dict()
        self.symbol_list = CreateTickersDict

        websocket.enableTrace(enable_socket_trace)

        self.on_ticker = on_ticker
        self.on_custom_close = on_close
        self.on_custom_error = on_error

        self.yaticker = yaticker()

        self.ticker_names = ticker_names
        self.ws = websocket.WebSocketApp(
            "wss://streamer.finance.yahoo.com/",
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close,
        )
        self.ws.on_open = self.on_open
        self.ws.run_forever()

    def on_message(self, message):
        message_bytes = base64.b64decode(message)
        self.yaticker.ParseFromString(message_bytes)
        data = 
            "Symbol": self.yaticker.id,
            "Exchange": self.yaticker.exchange,
            "QuoteType": self.yaticker.quoteType,
            "Price": self.yaticker.price,
            "Timestamp": (datetime.today().astimezone(timezone('America/New_York')).strftime('%Y%m%d %H:%M:%S.%f')), #self.yaticker.time,
            "MarketHours": self.yaticker.marketHours,
            "ChangePercent": self.yaticker.changePercent,
            "DayVolume": self.yaticker.dayVolume,
            "DayHigh": self.yaticker.dayHigh,
            "DayLow": self.yaticker.dayLow,
            "Change": self.yaticker.change,
            "PriceHint": self.yaticker.priceHint,
            'Bid': self.yaticker.bid,
            'BidSize': self.yaticker.bidSize,
            'Ask': self.yaticker.ask,
            'AskSize': self.yaticker.askSize
        

        quoteType = 
            0: 'NONE',
            5: 'ALTSYMBOL',
            7: 'HEARTBEAT',
            8: 'EQUITY',
            9: 'INDEX',
            11: 'MUTUALFUND',
            12: 'MONEYMARKET',
            13: 'OPTION',
            14: 'CURRENCY',
            15: 'WARRANT',
            17: 'BOND',
            18: 'FUTURE',
            20: 'ETF',
            23: 'COMMODITY',
            28: 'ECNQUOTE',
            41: 'CRYPTOCURRENCY',
            42: 'INDICATOR',
            1000: 'INDUSTRY',
        

        marketHoursType = 
            0: 'PRE_MARKET',
            1: 'REGULAR_MARKET',
            2: 'POST_MARKET',
            3: 'EXTENDED_HOURS_MARKET'
        


        for q in quoteType:
            if data['QuoteType'] == q:
                data['QuoteType'] = quoteType[q]

        for m in marketHoursType:
            if data['MarketHours'] == m:
                data['MarketHours'] = marketHoursType[m]

        row = list(data.values())
        headers = list(data.keys())
        print (row)

        STOCK_DATA_PATH = DATE_DATA_PATH.joinpath('.csv'.format(data['Symbol']))
        file_exists = os.path.isfile(STOCK_DATA_PATH)

        with open(STOCK_DATA_PATH, 'a') as f:
            writer = csv.DictWriter(f, delimiter=',', lineterminator='\n', fieldnames=headers)
            
            if not file_exists:
                writer.writeheader()

            writer.writerow(data)
            f.close()

    def on_error(self, error):
        if self.on_custom_error is None:
            print(error)
        else:
            self.on_custom_error(error)

    def on_close(self):

        if self.on_custom_close is None:
            #print("### connection is closed ###")
            pass
        else:
            self.on_custom_close()

    def on_open(self):
        def run(*args):
            self.ws.send(json.dumps(CreateTickersDict()))

        thread.start_new_thread(lambda: every(15, run), ())
        print('Symbol,Exchange,MarketHours,QuoteType,Date,Price,ChangePercent,DayVolume,DayHigh,DayLow,Bid,BidSize,Ask,AskSize')

if __name__ == '__main__':
    YLiveTicker()

请注意,用户不再需要将列表指定为 YLiveTikcer() 参数的一部分,因为我现在已经创建了单独的函数来每 15 秒提取一次更新的列表。

【讨论】:

以上是关于Websocketapps 在参数值更改时添加另一个查询的主要内容,如果未能解决你的问题,请参考以下文章

Asp.Net Cache,从缓存中修改对象并更改缓存值

Clickhouse - 添加一个新列,默认值基于另一列

我们可以在运行时更改参数值吗

Extjs:加载商店时更改参数值

添加/更改 URL 参数并重定向到新 URL

窗口函数:仅对另一列中的不同值求和