python 这是我尝试让Alexa返回Plex的On Deck(待看)和Recently Downloaded(最近下载)列表的代码。它不是最漂亮的,但Plex的API本身也并不好用

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了“python 这是我试图让Alexa返回Plex的On Deck和Recent Downloaded列表。它不是最漂亮的,但Plex的API不是最佳选择”相关的知识,希望对你有一定的参考价值。

from __future__ import print_function
import urllib
import urllib2
import xml.etree.ElementTree
import logging

# Enable basic logging to CloudWatch Logs.
# The root logger is fetched and its threshold set to INFO. Note that the
# handlers in this file report through print() rather than this logger.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    """Entry point for the skill: validate the caller and route the request.

    Prints the calling application ID, rejects any caller whose ID differs
    from the configured one, announces new sessions, and then hands the
    request to the handler that matches its type.

    Args:
        event: Request envelope; must contain ``session`` and ``request``
            mappings.
        context: Runtime context object (not used here).

    Returns:
        Whatever the matching handler returns, or ``None`` when the request
        type is not one of the three recognised types.

    Raises:
        ValueError: If the application ID is not the expected one.
    """
    session = event['session']
    app_id = session['application']['applicationId']
    print("event.session.application.applicationId=" + app_id)

    # Only run if requested by a specific app ID (paste your app ID below).
    expected_app_id = "amzn1.echo-sdk-ams.app.[AppIDHere]"
    if app_id != expected_app_id:
        raise ValueError("Invalid Application ID")

    if session['new']:
        on_session_started({'requestId': event['request']['requestId']},
                           session)

    request = event['request']
    request_type = request['type']
    if request_type == "LaunchRequest":
        return on_launch(request, session)
    if request_type == "IntentRequest":
        return on_intent(request, session)
    if request_type == "SessionEndedRequest":
        return on_session_ended(request, session)
    return None


def on_session_started(session_started_request, session):
    """Log that a new session has begun; there is nothing else to set up.

    Args:
        session_started_request: Mapping carrying the ``requestId``.
        session: Mapping carrying the ``sessionId``.
    """
    request_id = session_started_request['requestId']
    session_id = session['sessionId']
    print("on_session_started requestId=" + request_id
          + ", sessionId=" + session_id)


def on_launch(launch_request, session):
    """Handle a launch that names no intent by returning the welcome message.

    Args:
        launch_request: Mapping carrying the ``requestId``.
        session: Mapping carrying the ``sessionId``.

    Returns:
        The response built by ``get_welcome_response``.
    """
    request_id = launch_request['requestId']
    session_id = session['sessionId']
    print("on_launch requestId=" + request_id + ", sessionId=" + session_id)

    # Dispatch to the skill's launch behaviour.
    return get_welcome_response()


def on_intent(intent_request, session):
    """Route an intent request to the matching Plex handler.

    The intent name is validated *before* logging in to Plex, so the help
    intent and unknown intents no longer trigger a network login with the
    account credentials.

    Args:
        intent_request: Mapping carrying ``requestId`` and an ``intent``
            mapping that has a ``name``.
        session: Mapping carrying the ``sessionId``.

    Returns:
        The response built by the selected handler.

    Raises:
        ValueError: If the intent name is not one this skill handles.
    """
    print("on_intent requestId=" + intent_request['requestId'] +
          ", sessionId=" + session['sessionId'])

    intent = intent_request['intent']
    intent_name = intent['name']

    # Help needs no Plex access at all.
    if intent_name == "AMAZON.HelpIntent":
        return get_welcome_response()

    # "Plex" and "OnDeck" are both answered with the on-deck list.
    if intent_name in ("Plex", "OnDeck"):
        handler = plex_list_on_desk
    elif intent_name == "Download":
        handler = plex_list_download
    else:
        raise ValueError("Invalid intent")

    # Plex Credentials - Please Fill In
    username = "[username here]"
    password = "[password]"

    # Log in only now that we know a Plex-backed handler will run.
    authKey = plex_login(username, password)
    return handler(intent, session, authKey)


def on_session_ended(session_ended_request, session):
    """Log that the user ended the session.

    Args:
        session_ended_request: Mapping carrying the ``requestId``.
        session: Mapping carrying the ``sessionId``.
    """
    request_id = session_ended_request['requestId']
    session_id = session['sessionId']
    print("on_session_ended requestId=" + request_id +
          ", sessionId=" + session_id)
    # Add cleanup logic here if the skill ever needs any.

# --------------- Functions that control the skill's behavior ------------------


def get_welcome_response():
    """Build the standard welcome message and keep the session open.

    The same text is used for the reprompt, which is spoken if the user does
    not reply to the welcome message or says something that is not understood.

    Returns:
        The full response envelope produced by ``build_response``.
    """
    session_attributes = {}
    card_title = "Welcome"
    # Spelling fixed ("requet" -> "request") so the spoken text is correct.
    speech_output = ("Welcome to the Plex Skill. "
                     "You can request items on deck "
                     "or list downloads")
    reprompt_text = speech_output
    should_end_session = False
    return build_response(session_attributes, build_speechlet_response(
        card_title, speech_output, reprompt_text, should_end_session))


def plex_list_on_desk(intent, session, authKey):
    """Answer with the TV shows that are on deck, then end the session.

    ``find_plex_server`` and ``ret_on_deck`` can each return ``None`` when
    nothing is found or the request fails. Previously a missing server was
    turned into the string "None" and used as a URL, and a ``None`` listing
    crashed the response builder's string concatenation. Both cases now
    produce a spoken error message instead.

    Args:
        intent: Intent mapping; its ``name`` becomes the card title.
        session: Session mapping (not used here).
        authKey: Plex auth token.

    Returns:
        The full response envelope produced by ``build_response``.
    """
    card_title = intent['name']
    session_attributes = {}
    should_end_session = True

    on_deck = None
    server_url = find_plex_server(authKey)
    if server_url is not None:
        # Call the on deck function only when a server was actually found.
        on_deck = ret_on_deck(str(server_url), authKey)
    if on_deck is None:
        on_deck = "Sorry, I could not reach your Plex server."

    speech_output = on_deck
    reprompt_text = on_deck

    return build_response(session_attributes, build_speechlet_response(
        card_title, speech_output, reprompt_text, should_end_session))
        
def plex_list_download(intent, session, authKey):
    """Answer with the recently added items, then end the session.

    ``find_plex_server`` and ``ret_download`` can each return ``None`` when
    nothing is found or the request fails. Previously a missing server was
    turned into the string "None" and used as a URL, and a ``None`` listing
    crashed the response builder's string concatenation. Both cases now
    produce a spoken error message instead.

    Args:
        intent: Intent mapping; its ``name`` becomes the card title.
        session: Session mapping (not used here).
        authKey: Plex auth token.

    Returns:
        The full response envelope produced by ``build_response``.
    """
    card_title = intent['name']
    session_attributes = {}
    should_end_session = True

    recently_added = None
    server_url = find_plex_server(authKey)
    if server_url is not None:
        # Call the recently downloaded function only when a server was found.
        recently_added = ret_download(str(server_url), authKey)
    if recently_added is None:
        recently_added = "Sorry, I could not reach your Plex server."

    speech_output = recently_added
    reprompt_text = recently_added

    return build_response(session_attributes, build_speechlet_response(
        card_title, speech_output, reprompt_text, should_end_session))

# --------------- Helpers that build all of the responses ----------------------

def build_speechlet_response(title, output, reprompt_text, should_end_session):
    """Create the speech/card/reprompt payload returned to Alexa.

    Args:
        title: Card title; prefixed with ``'SessionSpeechlet - '``.
        output: Text to speak; also used (prefixed) as the card content.
        reprompt_text: Text for the reprompt speech.
        should_end_session: Value stored under ``shouldEndSession``.

    Returns:
        A dict with ``outputSpeech``, ``card``, ``reprompt`` and
        ``shouldEndSession`` keys.
    """
    card_prefix = 'SessionSpeechlet - '

    payload = {}
    payload['outputSpeech'] = {'type': 'PlainText', 'text': output}
    payload['card'] = {
        'type': 'Simple',
        'title': card_prefix + title,
        'content': card_prefix + output,
    }
    payload['reprompt'] = {
        'outputSpeech': {'type': 'PlainText', 'text': reprompt_text},
    }
    payload['shouldEndSession'] = should_end_session
    return payload

def build_response(session_attributes, speechlet_response):
    """Wrap a speechlet payload in the version 1.0 response envelope.

    Args:
        session_attributes: Value stored under ``sessionAttributes``.
        speechlet_response: Value stored under ``response``.

    Returns:
        A dict with ``version``, ``sessionAttributes`` and ``response`` keys.
    """
    envelope = {'version': '1.0'}
    envelope['sessionAttributes'] = session_attributes
    envelope['response'] = speechlet_response
    return envelope

# --------------- Functions specific to Plex ----------------------
    
def find_plex_server(authKey):
    """Return the URI of the first connection of the first Plex server found.

    Fetches the device list from plex.tv and scans the ``Device`` elements
    whose ``provides`` attribute is exactly ``"server"``, returning the
    ``uri`` of the first ``Connection`` child encountered.

    Args:
        authKey: Plex auth token, appended to the request URL.

    Returns:
        The connection URI, or ``None`` if the request fails with a
        ``URLError`` (which is printed) or no server connection is listed.
    """
    url = "https://plex.tv/devices.xml?X-Plex-Token=" + authKey

    try:
        result = urllib2.urlopen(urllib2.Request(url))
        try:
            payload = result.read()
        finally:
            # Always release the HTTP connection, even if the read fails.
            result.close()
    except urllib2.URLError as err:
        print(err)
        return None

    devices = xml.etree.ElementTree.fromstring(payload)

    # [TODO]: There should be a better way to do this. Check the API docs
    for device in devices.findall('Device'):
        if device.get('provides') != "server":
            continue
        # Compare against None: an Element with no children is falsy.
        connection = device.find('Connection')
        if connection is not None:
            return connection.get('uri')
    return None
        
def ret_on_deck(url, authKey):
    """Return a spoken-text list of the TV shows that are "on deck".

    Requests ``/library/onDeck`` from the server and lists the
    ``grandparentTitle`` of each ``Video`` whose ``librarySectionTitle`` is
    ``"TV Shows"``. Anything from the first ``(`` onward is dropped from
    each title.

    Args:
        url: Base URL of the Plex server.
        authKey: Plex auth token, appended to the request URL.

    Returns:
        The listing as one string, or ``None`` if the request fails with a
        ``URLError`` (which is printed).
    """
    on_deck_url = url + "/library/onDeck?X-Plex-Token=" + authKey

    try:
        result = urllib2.urlopen(urllib2.Request(on_deck_url))
        try:
            payload = result.read()
        finally:
            # Always release the HTTP connection, even if the read fails.
            result.close()
    except urllib2.URLError as err:
        print(err)
        return None

    root = xml.etree.ElementTree.fromstring(payload)
    lines = ["TV Shows On Deck: \n"]

    # Look for the TV shows only.
    for video in root.findall('Video'):
        if video.get('librarySectionTitle') != "TV Shows":
            continue
        show = video.get('grandparentTitle')
        if show is None:
            # Skip entries without a show title instead of crashing on None.
            continue
        lines.append(show.split("(")[0].strip() + ". \n")

    return "".join(lines)
        
def _recent_titles(root, tag, kind, title_attr, limit):
    """Collect up to *limit* cleaned titles from *tag* children of type *kind*."""
    titles = []
    for node in root.findall(tag):
        if len(titles) >= limit:
            break
        if node.get('type') != kind:
            continue
        title = node.get(title_attr)
        if title is None:
            # Skip entries without a title instead of crashing on None.
            continue
        # Drop anything from the first "(" onward.
        titles.append(title.split("(")[0].strip())
    return titles


def ret_download(url, authKey, limit=5):
    """Return a spoken-text list of recently added TV seasons and movies.

    Requests ``/library/recentlyAdded`` from the server. TV entries are
    ``Directory`` elements of type ``"season"`` (listed by ``parentTitle``);
    movies are ``Video`` elements of type ``"movie"`` (listed by ``title``).

    Each section heading is emitted exactly once, and only when the section
    has at least one item. Previously the heading was appended again for
    every element seen before the first matching one was counted.

    Args:
        url: Base URL of the Plex server.
        authKey: Plex auth token, appended to the request URL.
        limit: Maximum number of items listed per section (default 5).

    Returns:
        The listing as one string, or ``None`` if the request fails with a
        ``URLError`` (which is printed).
    """
    recent_url = url + "/library/recentlyAdded?X-Plex-Token=" + authKey

    try:
        result = urllib2.urlopen(urllib2.Request(recent_url))
        try:
            payload = result.read()
        finally:
            # Always release the HTTP connection, even if the read fails.
            result.close()
    except urllib2.URLError as err:
        print(err)
        return None

    root = xml.etree.ElementTree.fromstring(payload)

    # Start the string.
    parts = ["Recently Added: \n"]

    sections = (
        ("In TV: \n", 'Directory', "season", 'parentTitle'),
        ("In Movies: \n", 'Video', "movie", 'title'),
    )
    for heading, tag, kind, title_attr in sections:
        titles = _recent_titles(root, tag, kind, title_attr, limit)
        if titles:
            parts.append(heading)
            parts.extend(title + ". \n" for title in titles)

    return "".join(parts)
        
def plex_login(username, password):
    """Sign in to plex.tv and return the account's auth token.

    Posts the credentials as form data to the plex.tv sign-in endpoint and
    reads the ``authenticationToken`` attribute from the root element of the
    XML reply.

    Args:
        username: Plex account login.
        password: Plex account password.

    Returns:
        The auth token, or ``None`` if the request fails with a ``URLError``
        (which is printed) or the reply carries no token attribute.
    """
    url = "https://plex.tv/users/sign_in.xml"

    headers = {
        'x-plex-device-name': "AWS Lambda",
        'x-plex-device': "AWSv01",
        'x-plex-client-identifier': "049ouolknf9u42oihen"
        }

    values = {
        'user[login]' : username,
        'user[password]': password
        }

    # Supplying a data payload makes this a POST request.
    data = urllib.urlencode(values)
    req = urllib2.Request(url, data, headers)

    try:
        response = urllib2.urlopen(req)
        try:
            payload = response.read()
        finally:
            # Always release the HTTP connection, even if the read fails.
            response.close()
    except urllib2.URLError as err:
        print(err)
        return None

    root = xml.etree.ElementTree.fromstring(payload)
    return root.get('authenticationToken')

以上是关于“python 这是我试图让Alexa返回Plex的On Deck和Recent Downloaded列表。它不是最漂亮的,但Plex的API不是最佳选择”的主要内容。如果未能解决你的问题,请参考以下文章:

聆听 Alexa 技能包中的响应

如何在 Alexa Skill lambda 函数中正确指定 SSML?

使用当天选择 Alexa Intent

python Plex随机预卷

python Plex Pass Updatescript

使用虚拟 Alexa 设备进行持续集成?