Python automated Tealium ETL

Editor's note: this article, compiled by the cha138.com editors, introduces an automated Tealium ETL written in Python; we hope you find it a useful reference.

# -*- coding: utf-8 -*-
"""
Created on Tue Apr 16 10:54:51 2019

@author: AMaass
"""

#BLOCK 0: Required modules to execute script


import glob
import json
import pandas as pd
import boto3
import pyodbc
import os
import os.path
from os import path
import gzip
import shutil
import ntpath
from pandas.io.json import json_normalize #deprecated import path in pandas 1.0+; use pandas.json_normalize there
from datetime import datetime, timedelta 


#BLOCK 1: EXTRACT - PT. 1 OF 2
#This block establishes connection to Tealium AWS Buckets for Echo


#This code establishes the connection to the correct Tealium AWS bucket.
#(Access keys are redacted here - never hardcode live AWS credentials in a script. This assumes they are
#supplied via the AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment variables.)
aws_key = os.environ['AWS_ACCESS_KEY_ID']
aws_secret = os.environ['AWS_SECRET_ACCESS_KEY']
s3 = boto3.resource('s3', aws_access_key_id=aws_key, aws_secret_access_key=aws_secret)
my_bucket = s3.Bucket('dataaccess-us-east-1.tealiumiq.com')
client = boto3.client('s3', aws_access_key_id=aws_key, aws_secret_access_key=aws_secret)
files = client.list_objects_v2(Bucket='dataaccess-us-east-1.tealiumiq.com',Prefix="echo/main/events/all_events")

#Printing 'files' is useful for seeing which attributes are present in the object listing from Tealium's AWS bucket, should the need arise to modify the extract logic.
print(files)
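#NOTE: list_objects_v2 returns at most 1,000 keys per call. If the prefix ever holds more than that,
#a paginator (a standard boto3 pattern, shown here as an untested sketch) will walk the full listing:
'''
paginator = client.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket='dataaccess-us-east-1.tealiumiq.com', Prefix='echo/main/events/all_events'):
    for obj in page.get('Contents', []):
        print(obj['Key'], obj['LastModified'])
'''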

#This code establishes the cutoff datetime. Files last modified yesterday (by calendar day) are the ones pulled from Tealium's AWS bucket.
now = datetime.now()
now = now.replace(hour=0, minute=0, second=0, microsecond=0)
yesterday = now - timedelta(days=1)

#This code downloads the files Tealium created the previous day. Note that they land in the directory the script is run from, which currently is 'C:\Users\amaass\Documents\FilesFromTealium'
for obj in my_bucket.objects.filter(Prefix='echo/main/events/all_events/'):
    km = client.head_object(Bucket='dataaccess-us-east-1.tealiumiq.com', Key=obj.key)
    #head_object returns LastModified as a timezone-aware datetime; truncate it to midnight (and drop tzinfo) so it compares cleanly with 'yesterday'
    kmdt = km['LastModified'].replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=None)
    print(kmdt)
    if kmdt == yesterday:
        aws_path, filename = os.path.split(obj.key)
        my_bucket.download_file(obj.key, filename)
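#NOTE: an untested alternative sketch - the ObjectSummary objects yielded by objects.filter() already expose
#a last_modified attribute, so the per-object head_object round trip above could be skipped entirely:
'''
for obj in my_bucket.objects.filter(Prefix='echo/main/events/all_events/'):
    if obj.last_modified.replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=None) == yesterday:
        my_bucket.download_file(obj.key, os.path.basename(obj.key))
'''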


#BLOCK 2: EXTRACT - PT. 2 OF 2        
#This block unzips the .gz files that come from Tealium's S3 bucket


source_dir = r'C:\Users\amaass\Documents\FilesFromTealium'
dest_dir = r'C:\Users\amaass\Documents\Informatica'

for src_name in glob.glob(os.path.join(source_dir, '*.gz')):

    base = os.path.basename(src_name)
    dest_name = os.path.join(dest_dir, base[:-3])

    if not os.path.exists(dest_name):
        with gzip.open(src_name, 'rb') as infile, open(dest_name, 'wb') as outfile:
            try:
                for line in infile:
                    outfile.write(line)

            except EOFError:
                print("End of file error occurred - the .gz file may be truncated.")

            except Exception as exc:
                print("Error while decompressing " + base + ": " + str(exc))
       
         
#BLOCK 3: TRANSFORM
#This block transforms the semi-structured JSON data into a proper dataframe, and then transforms the dataframe into a list of rows for ingestion.


#identifies all files in directory
for i in glob.glob(r'C:\Users\amaass\Documents\Informatica\*'): 
    with open(i, encoding='utf8') as f:
        with open(r'C:\Users\amaass\Documents\Informatica\AAA_JSON_SCHEMA_DO_NOT_DELETE.txt', encoding='utf8') as j: #this prepends the json schema to the file contents so each file parses properly
            data = j.read() + f.read()

    #this derives the file's name and its eventual destinations, and checks whether it was already processed on a past run
    filename = ntpath.basename(i)
    base_filepath = os.path.join(r'C:\Users\amaass\Documents\Informatica', filename)
    success_filepath = os.path.join(r'C:\Users\amaass\Documents\Python ETLs\Successful_Tealium_Files_Archive', filename)
    failure_filepath = os.path.join(r'C:\Users\amaass\Documents\Python ETLs\Unsuccessful_Tealium_Files_Archive', filename)
    success_dir = path.exists(success_filepath)
    failure_dir = path.exists(failure_filepath)
    
    print(filename)
    print("success_dir is")
    print(success_dir)
    print("failure_dir is")
    print(failure_dir)
    
    if not success_dir and not failure_dir:
        try:      
           
            #structures data: the raw file is a stream of concatenated JSON objects, so a comma is inserted
            #after every closing brace except the last, turning the stream into one JSON array
            #(note: this simple replace assumes flat, non-nested objects - a nested '}' would also get a comma)
            data = "[" + data.replace('}', '}, ', data.count('}')-1) + "]"

            #reformats data as JSON
            json_data = json.loads(data)

            #normalizes data to account for varying data structures and dumps it into the master dataframe
            df_ = json_normalize(json_data)
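            #NOTE: an untested, more robust alternative - Tealium event exports are typically newline-delimited
            #JSON, so parsing line by line sidesteps the brace-counting trick's weakness with nested objects:
            '''
            json_data = [json.loads(line) for line in data.splitlines() if line.strip()]
            df_ = json_normalize(json_data)
            '''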
            
            #Selects desired columns from master dataframe
            df_push = pd.concat([df_['visitorid'].astype('object'),
                                 df_['eventid'].astype('object'),
                                 df_['pageurl_domain'].astype('object'),
                                 df_['pageurl_path'].astype('object'),
                                 df_['udo_tealium_timestamp_epoch'].astype('object'),
                                 df_['udo_ut_event'].astype('object'),
                                 df_['udo_page_type'].astype('object'),
                                 df_['udo_page_name'].astype('object'),
                                 df_['udo_event_name'].astype('object'),
                                 df_['firstpartycookies_accountname'].astype('object').str.replace('"',''),
                                 df_['firstpartycookies_accountguid'].astype('object').str.replace('"',''),
                                 df_['firstpartycookies_userguid'].astype('object').str.replace('"',''),
                                 df_['firstpartycookies_name'].astype('object').str.replace('"',''),
                                 df_['firstpartycookies_utag_main_ses_id'].astype('object'),
                                 df_['firstpartycookies_km_ni'].astype('object'),        
                                 df_['firstpartycookies_km_ai'].astype('object')],                         
                                 axis=1)
                            
            #pandas represents missing values as float NaN, which pyodbc cannot insert as-is. This replaces NaN with None, which SQL Server recognizes as NULL
            df_nan = df_push.where((pd.notnull(df_push)), None)
                    
            #converts dataframe to list of lists
            reformat = df_nan.values.tolist()
         
            #This block of code opens the connection to the InformaticaTest database on SQL Server
            cnxn = pyodbc.connect(
                    'Driver={SQL Server Native Client 11.0};'
                    'Server=Lab;'
                    'Database=InformaticaTest;'
                    'Trusted_Connection=yes;')
            cursor = cnxn.cursor()
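            #NOTE: optional and untested here - on pyodbc 4.0.19+ with a modern SQL Server ODBC driver,
            #enabling fast_executemany can make the bulk insert below dramatically faster:
            '''
            cursor.fast_executemany = True
            '''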
        
            #This is the write statement to insert data into the table
            #Notes about 'stmt' can be found below
        
            stmt = '''INSERT INTO InformaticaTest.web.TealiumStaging (
                                visitorid, 
                                eventid,
                                pageurl_domain,
                                pageurl_path,
                                udo_tealium_timestamp_epoch,
                                udo_ut_event,
                                udo_page_type, 
                                udo_page_name, 
                                udo_event_name,
                                firstpartycookies_accountname, 
                                firstpartycookies_accountguid,
                                firstpartycookies_userguid,
                                firstpartycookies_name,
                                firstpartycookies_utag_main_ses_id,
                                firstpartycookies_km_ni, 
                                firstpartycookies_km_ai   
                                )
                                VALUES (
                                ?, 
                                ?, 
                                LEFT(RTRIM(?), 250),
                                LEFT(RTRIM(?), 250),
                                ?,
                                LEFT(RTRIM(?), 250),
                                ?,
                                ?,
                                ?,
                                LEFT(RTRIM(?), 250),
                                LEFT(RTRIM(?), 250),
                                ?,
                                ?,
                                try_cast(? as int),
                                ?,
                                ?
                                )
                ''' 
                
            #STMT NOTES
            #A question mark represents the value being inserted
            #The number of question marks must match the number of columns above
            #LEFT(RTRIM(?), 250) truncates values that might otherwise overflow the varchar(255) columns
            #Use CAST(? AS nvarchar(max)) in place of LEFT(RTRIM(?), 250) for fields that throw an error; referral_url is one such field - its values can exceed 1,000 characters

            
            print('Transformed data')                      
            
            
            #BLOCK 4: LOAD - this block pushes the data into SQL
            
            
            try:
                #pushes data into sql using the statement above. The inner try/except ensures the
                #connection is cleaned up if the insert fails, instead of being left open when the
                #parent exception handler takes over.
                cursor.executemany(stmt, reformat)

            except Exception as exc:
                print('cursor.executemany failed - check the datatypes of the file: ' + str(exc))
                cnxn.rollback() #discard the partial batch rather than committing bad data
                cnxn.close()
                raise #re-raise so the outer handler archives the file as unsuccessful

            print('Performed cursor.executemany')
            cnxn.commit()
            print('Committed')
            cnxn.close()
            print('Closed connection')
            print(filename + ' ingested successfully!')
            
            if filename != 'AAA_JSON_SCHEMA_DO_NOT_DELETE.txt':
                shutil.move(i, success_filepath)
    
        except Exception:
            #This block moves the file that threw an error to a separate directory so it can be handled later
            shutil.move(i, failure_filepath)
            
            print(filename)
            print('had issues')
    
    else:
        #the file was already archived by a previous run, so remove the duplicate copy
        os.remove(base_filepath)


#BLOCK 5: CLEAN
#This code removes the files that have been processed.
            
for i in glob.glob(os.path.join(source_dir, '*.gz')):
    os.remove(i)

print('')
print('')
print('Great job Aaron! Time for some ping-pong, don\'t ya think?')
print('')


#BLOCK 6: REFERENCE FOR SQL CREATE TABLE
#Reference CREATE TABLE script below (note: the INSERT above loads InformaticaTest.web.TealiumStaging; align table names with your environment)
#
'''
create table InformaticaTest.web.TealiumEvents8 (
TealiumEventId int identity(1,1) not null,
VisitorId varchar(255) null,
firstpartycookies_accountname varchar(255) null,
firstpartycookies_accountguid varchar(255) null,
firstpartycookies_km_ni varchar(255) null,
firstpartycookies_km_ai varchar(255) null,
EventId varchar(255) null,
udo_tealium_timestamp_epoch varchar(255) null,
udo_ut_session_id varchar(255) null,
pageurl_domain varchar(255) null,
--dom_referrer varchar(255) null, --the values for this key sometimes exceed 255 characters. I have commented it out for now
udo_page_type varchar(255) null,
udo_page_name varchar(255) null,
udo_ut_event varchar(255) null,
udo_km_api_key varchar(255) null,
constraint PK_TealiumEvents8 primary key (TealiumEventId)
)
#
#use this to format epoch time as datetime 
#DATEADD(ss, CAST (udo_tealium_timestamp_epoch AS int), '19700101') as new_date
#
#'''
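#NOTE: for reference, a rough Python equivalent of the epoch conversion above
#(assumes 'epoch' holds a udo_tealium_timestamp_epoch value, in seconds):
'''
from datetime import datetime
new_date = datetime.utcfromtimestamp(int(epoch))
'''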
