python (non-automated) Tealium ETL
Posted
tags:
Introduction: this article introduces a non-automated Python ETL script for Tealium event data; it is provided as reference material.
# -*- coding: utf-8 -*-
"""
Created on Wed Apr 10 13:57:50 2019
@author: AMaass
"""
#required modules to execute script
import json
import pandas as pd
import glob
import pyodbc
import os
import os.path
from os import path
import gzip
import shutil
import ntpath
from pandas.io.json import json_normalize
from datetime import datetime #, timedelta #may be needed later...
# Midnight timestamp of the current day, for naming files after processing.
now = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
#This unzips the .gz files that come from Tealium's S3 bucket (currently accessed through cyberduck)
source_dir = r'C:\Users\amaass\Documents\FilesFromTealium'
dest_dir = r'C:\Users\amaass\Documents\Informatica'
for src_name in glob.glob(os.path.join(source_dir, '*.gz')):
base = os.path.basename(src_name)
dest_name = os.path.join(dest_dir, base[:-3])
if not os.path.exists(dest_name):
with gzip.open(src_name, 'rb') as infile, open(dest_name, 'wb') as outfile:
try:
for line in infile:
outfile.write(line)
except EOFError:
print("End of file error occurred.")
except Exception:
print("Some error occurred.")
#identifies all files in directory
for i in glob.glob(r'C:\Users\amaass\Documents\Informatica\*'):
with open(i, encoding='utf8') as f:
with open(r'C:\Users\amaass\Documents\Informatica\AAA_JSON_SCHEMA_DO_NOT_DELETE.txt', encoding='utf8') as j: #this appends the json schema to the beginning of the file so each file is read properly
data = f.read() + j.read()
#this gives the name to the file and its eventual destination, and checks to see if it has successfully been processed in the past
filename = ntpath.basename(i)
base_filepath = r'C:\Users\amaass\Documents\Informatica' + '\\' + filename
success_filepath = r'C:\Users\amaass\Documents\Python ETLs\Successful_Tealium_Files_Archive' +'\\' + filename
failure_filepath = r'C:\Users\amaass\Documents\Python ETLs\Unsuccessful_Tealium_Files_Archive' +'\\' + filename + '.txt'
success_dir = path.exists(success_filepath)
failure_dir = path.exists(failure_filepath)
print(filename + " : " + " success_dir is ")
print(success_dir)
print(filename + " : " + " failure_dir is ")
print(failure_dir)
if not success_dir and not failure_dir:
try:
#Creates empty dataframe that data will be dumped into
df_ = pd.DataFrame(index = [])
#structures data
data = "[" + data.replace('}', '}, ', data.count('}')-1) + "]"
#reformats data as JSON
json_data = json.loads(data)
#normalizes data to account for varying data structures and dumps into dataframe
df1 = json_normalize(json_data)
#dumps each individual dataframe into master dataframe to push to sql
df_ = df_.append(df1)
#Selects desired columns from master dataframe
df_push = pd.concat([df_['visitorid'].astype('object'),
df_['eventid'].astype('object'),
df_['useragent'].astype('object'),
df_['pageurl_full_url'].astype('object'),
df_['pageurl_query_params_device_os_version'].astype('object'),
df_['pageurl_query_params_platform'].astype('object'),
df_['pageurl_domain'].astype('object'),
df_['pageurl_path'].astype('object'),
df_['udo_app_version'].astype('object'),
df_['udo_link_id'].astype('object'),
df_['udo_tealium_timestamp_epoch'].astype('object'),
df_['udo_ut_event'].astype('object'),
df_['udo_tealium_library_name'].astype('object'),
df_['udo_tealium_environment'].astype('object'),
df_['udo_page_type'].astype('object'),
df_['udo_page_name'].astype('object'),
df_['udo_km_api_key'].astype('object'),
df_['udo_event_name'].astype('object'),
df_['udo_device'].astype('object'),
df_['udo_orientation'].astype('object'),
df_['udo_origin'].astype('object'),
df_['udo_call_type'].astype('object'),
df_['udo_app_name'].astype('object'),
df_['udo_customer_first_name'].astype('object'),
df_['udo_customer_last_name'].astype('object'),
df_['firstpartycookies_accountname'].astype('object').str.replace('"',''),
df_['firstpartycookies_accountguid'].astype('object').str.replace('"',''),
df_['firstpartycookies_userguid'].astype('object').str.replace('"',''),
df_['firstpartycookies_name'].astype('object').str.replace('"',''),
df_['firstpartycookies_utag_main_ses_id'].astype('object'),
df_['firstpartycookies_km_ni'].astype('object'),
df_['firstpartycookies_km_ai'].astype('object'),
df_['firstpartycookies_state'].astype('object'),
df_['referrerurl_full_url'].astype('object')],
axis=1)
#python automatically makes any NaN a float type, which cannot be iterated through. This replaces NaN with None, which is recognized as NULL by SQL
df_nan = df_push.where((pd.notnull(df_push)), None)
#converts dataframe to list of lists
reformat = df_nan.values.tolist()
#This block of code confirms connection to write to InformaticaTest database in SQL server
cnxn = pyodbc.connect(
'Driver={SQL Server Native Client 11.0};'
'Server=Lab;'
'Database=InformaticaTest;'
'Trusted_Connection=yes;')
cursor = cnxn.cursor()
#This is the write statement to insert data into the table
# LEFT(RTRIM(?), 250) is used for keys that may have a value exceeding 255 characters
stmt = '''INSERT INTO InformaticaTest.web.TealiumEvents8 (
visitorid,
eventid,
useragent,
pageurl_full_url,
pageurl_query_params_device_os_version,
pageurl_query_params_platform,
pageurl_domain,
pageurl_path,
udo_app_version,
udo_link_id,
udo_tealium_timestamp_epoch,
udo_ut_event,
udo_tealium_library_name,
udo_tealium_environment,
udo_page_type,
udo_page_name,
udo_km_api_key,
udo_event_name,
udo_device,
udo_orientation,
udo_origin,
udo_call_type,
udo_app_name,
udo_customer_first_name,
udo_customer_last_name,
firstpartycookies_accountname,
firstpartycookies_accountguid,
firstpartycookies_userguid,
firstpartycookies_name,
firstpartycookies_utag_main_ses_id,
firstpartycookies_km_ni,
firstpartycookies_km_ai
,firstpartycookies_state
,referrerurl_full_url
)
VALUES (
?,
?,
LEFT(RTRIM(?), 250),
LEFT(RTRIM(?), 250),
LEFT(RTRIM(?), 250),
LEFT(RTRIM(?), 250),
LEFT(RTRIM(?), 250),
LEFT(RTRIM(?), 250),
?,
LEFT(RTRIM(?), 250),
?,
LEFT(RTRIM(?), 250),
LEFT(RTRIM(?), 250),
?,
?,
?,
?,
LEFT(RTRIM(?), 250),
LEFT(RTRIM(?), 250),
?,
?,
?,
?,
?,
?,
LEFT(RTRIM(?), 250),
?,
?,
?,
?,
?,
?,
LEFT(RTRIM(?), 250),
cast(? as nvarchar(max))
)
'''
#Number of question marks must match the number of keys above
#Use this in place of left(rtrim(?), 250) for fields that throw an error
#cast(? as nvarchar(max))
print('step 1: transformed data')
try:
#pushes data into sql based on the above statement. Needs to be in a try statement because if it fails the script goes right to the parent exception, which means the sql connection does not close. This try/except block addresses this.
cursor.executemany(stmt, reformat)
except:
cnxn.commit()
cnxn.close()
print('step 2: performed cursor.executemany')
cnxn.commit()
print('step 3: performed cnxn.commit')
cnxn.close()
print('step 4: performed cnxn.close()')
print(filename + ' ingested successfully!')
if filename not in 'AAA_JSON_SCHEMA_DO_NOT_DELETE.txt':
shutil.move(i, r'C:\Users\amaass\Documents\Python ETLs\Successful_Tealium_Files_Archive\\' + filename)
except:
#This block moves the file that threw an error to a separate directory so it can be handled later
shutil.move(i, r'C:\Users\amaass\Documents\Python ETLs\Unsuccessful_Tealium_Files_Archive\\' + filename + '.txt')
print(filename)
print('had issues')
else:
os.remove(base_filepath)
print('')
print('')
print('Great job Aaron!')
print('')
#SQL Create Staging Table Informatica.web.TealiumEvents script below
# NOTE(review): dead reference text kept as an unassigned string literal —
# it is never executed or read. Its column list does not match the INSERT
# statement above, and the constraint is named PK_TealiumEvents6 while the
# table is TealiumEvents8 — confirm against the live schema before reusing.
#
'''
create table InformaticaTest.web.TealiumEvents8 (
TealiumEventId int identity(1,1) not null,
VisitorId varchar(255) null,
firstpartycookies_accountname varchar(255) null,
firstpartycookies_accountguid varchar(255) null,
firstpartycookies_km_ni varchar(255) null,
firstpartycookies_km_ai varchar(255) null,
EventId varchar(255) null,
udo_tealium_timestamp_epoch varchar(255) null,
udo_ut_session_id varchar(255) null,
pageurl_domain varchar(255) null,
--dom_referrer varchar(255) null, --the values for this key sometimes exceed 255 characters. I have commented it out for now
udo_page_type varchar(255) null,
udo_page_name varchar(255) null,
udo_ut_event varchar(255) null,
udo_km_api_key varchar(255) null,
constraint PK_TealiumEvents6 primary key (TealiumEventId)
)
#
#use this to format epoch time as datetime
#DATEADD(ss, CAST (udo_tealium_timestamp_epoch AS int), '19700101') as new_date
#
#'''
The above covers the main content of "python (non-automated) Tealium ETL". If it did not resolve your problem, the following related articles may help:
[Analytics] Add Tealium debugger in Chrome
强大的ETL工具fme和python结合实现不动产登记确权项目入库扫描件自动分类归档
在构建到 Android 应用程序后,用于 tealium 的 utag 被转换为 ionic 3 中的 file://