import math
from time import perf_counter

import pandas as pd
import seaborn as sns
from matplotlib import pyplot as plt
from selenium import webdriver

import Europart
import LKWTeile24
import Winkler
# Initializes a headless Chrome WebDriver.
def initialize_driver():
    options = webdriver.ChromeOptions()
    options.add_argument('--headless')
    options.add_argument('--no-sandbox')
    options.add_argument('--disable-dev-shm-usage')
    # Selenium 4 no longer accepts the driver path as a positional argument;
    # with Selenium >= 4.6 the chromedriver binary is resolved automatically.
    driver = webdriver.Chrome(options=options)
    # Implicit wait of 10 seconds: if an element has not appeared by then,
    # the lookup gives up and no results are retrieved.
    driver.implicitly_wait(10)
    return driver
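# Each shop module (Europart, Winkler, LKWTeile24) is expected to expose a
# scrape_records(driver, pn, keep) function with the call signature used below.
# The sketch that follows is purely illustrative (hypothetical URL, selector and
# record layout, not the real module code); it only shows the assumed interface:
# a list of per-record tuples, empty if nothing matched. LKWTeile24.scrape_records()
# additionally returns the matching vehicle models as a second element.
def _scrape_records_sketch(driver, pn, keep):
    from selenium.webdriver.common.by import By  # local import, sketch only
    driver.get(f'https://shop.example.com/search?q={pn}')        # hypothetical URL
    hits = driver.find_elements(By.CSS_SELECTOR, '.result-row')  # hypothetical selector
    records = [('Shopname', pn, hit.text, None, None, None) for hit in hits]
    return records if keep == 0 else records[:keep]  # 'keep' assumed to cap the number of hits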
def read_from_xlsx(file_directory):
    # For uniformity, the PNs are converted to strings. This is required later
    # when comparing the search term with the PN returned by a website.
    return pd.read_excel(file_directory, header=None)[0].apply(str).to_list()
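# Example (hypothetical PNs): a one-column sheet holding 1234567 and
# 'A 000 421 30 12' is read as ['1234567', 'A 000 421 30 12']; without the
# str conversion the first entry would stay an integer and could never equal
# the PN strings returned by the websites.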
def scrape_all_records(driver, pn, keep):
    tic = perf_counter()
    records = (Europart.scrape_records(driver, pn, keep),
               Winkler.scrape_records(driver, pn, keep),
               LKWTeile24.scrape_records(driver, pn, keep))
    toc = perf_counter()
    print(f"PN: {pn} - {toc - tic:0.2f} seconds")
    print(records)
    return records
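# scrape_all_records() returns the 3-tuple
# (ep_records, wk_records, (lkw_records, models)): LKW-Teile 24 is the only
# site that also reports the matching vehicle models, hence the nested tuple
# that web_scraping() unpacks below.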
# Here, all the scraping functions and input files are put into action.
def web_scraping(file_directory, keep):
    driver = initialize_driver()
    # login() is assumed to be defined in an earlier cell of this notebook (or in
    # one of the scraper modules); it signs the driver in where a shop requires it.
    login(driver)
    # Lists for the results of each website, plus one for the relevant vehicle types.
    ep_results, wk_results, lkw_results, model_types = ([] for _ in range(4))
    pn_list = read_from_xlsx(file_directory)
    # The actual scraping is handled in each site's own Python module.
    for pn in pn_list:
        ep_records, wk_records, (lkw_records, models) = scrape_all_records(driver, pn, keep)
        if ep_records:
            ep_results.extend(ep_records)
        else:
            ep_results.append(('Europart', pn, 'No results found', None, None, None))
        if wk_records:
            wk_results.extend(wk_records)
        else:
            wk_results.append(('Winkler', pn, 'No results found', None, None, None))
        if lkw_records:
            lkw_results.extend(lkw_records)
        else:
            lkw_results.append(('LKW-Teile 24', pn, 'No results found', None, None))
        if models:
            model_types.append(models)
    driver.quit()
    # The results are saved in separate DataFrames, as some websites provide more/less information.
    ep_df = pd.DataFrame(data=ep_results, columns=['Anbieter', 'Teilenummer', 'Artikelname', 'Beschreibung', 'Marke', 'Nettopreis'])
    lkw_df = pd.DataFrame(data=lkw_results, columns=['Anbieter', 'Teilenummer', 'Artikelname', 'Marke', 'Nettopreis'])
    wk_df = pd.DataFrame(data=wk_results, columns=['Anbieter', 'Teilenummer', 'Artikelname', 'Marke', 'Nettopreis', 'Check'])
    model_df = pd.DataFrame(data=model_types, columns=['Teilenummer', 'Passend für'])
    return [ep_df, wk_df, lkw_df], model_df
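# For illustration only (hypothetical values): a single ep_results entry could be
# ('Europart', '1234567', 'Bremsscheibe', 'belüftet', 'Markenname', 89.90),
# lining up with the ep_df columns above.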
# Combining all three ETL steps into one function.
def main(file_directory='Input1.xlsx', keep=0):
    ### EXTRACT ###
    # The per-site results are concatenated; the DataFrame with the vehicle model
    # types is kept separate.
    record_dfs, model_df = web_scraping(file_directory, keep)
    df = pd.concat(record_dfs).reset_index(drop=True)
    ### TRANSFORM ###
    # Deduplicate the entries of PNs without any results and move them to the end (in 2 steps).
    # 1. Create a DataFrame with all PNs for which no website returned a result.
    empties = df.groupby('Teilenummer')['Nettopreis'].count().reset_index()
    empties = empties[empties.Nettopreis == 0]
    empties['Artikelname'] = 'No results found'
    empties.drop(columns=['Nettopreis'], inplace=True)
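    # Why the count works: a PN that produced only 'No results found' placeholders
    # has Nettopreis = None in every row, so its non-null count is 0 and it lands
    # in 'empties'; a PN with a hit on at least one site (assuming real records
    # always carry a price) has a count > 0 and is excluded.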
    df.sort_values(by=['Teilenummer', 'Anbieter'], inplace=True)
    # 2. Drop the per-site 'No results found' rows and append one deduplicated
    #    row per empty PN at the bottom.
    df = df[df.Artikelname != 'No results found']
    # DataFrame.append() was removed in pandas 2.0; pd.concat() replaces it.
    df = pd.concat([df, empties], ignore_index=True)
    # Left join with the vehicle types, plus some formatting.
    df = df.merge(model_df, on='Teilenummer', how='left')
    df['Marke'] = df.Marke.str.title()
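    # PNs without a model match simply keep NaN in 'Passend für' after the left join.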
    ### LOAD ###
    # Save to Excel, but also return the DataFrame for further analysis in this notebook.
    df.to_excel('Output.xlsx', sheet_name='Final', index=False)
    return df
# The file can be changed through the input cell further above.
file_directory = f'InputFiles/{chooseFile}'
print('PNs:\n', dict(enumerate(read_from_xlsx(file_directory))), '\n')
df = main(file_directory)
df.head(3)