!pip install faker
!pip install pandera
!pip install great_expectations==0.15.50
from faker import Faker
import pandas as pd
import random
import numpy as np
from scipy import stats
from scipy.stats import shapiro
import pandera as pa
from pandera.typing import Index, DataFrame, Series
from pandera import Column, DataFrameSchema, Check, Hypothesis
import great_expectations as gx
fake = Faker('pt_BR')
num_linhas = 10
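# Synthetic pt_BR records; Idade, Estado and Salário are each nulled at random ~5% of the time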
dados = {
'Nome': [fake.name() for _ in range(num_linhas)],
'Idade': [fake.random_int(min=18, max=80) if random.random() > 0.05 else np.nan for _ in range(num_linhas)],
'Email': [fake.email() for _ in range(num_linhas)],
'Endereço': [fake.address() for _ in range(num_linhas)],
'Telefone': [fake.phone_number() for _ in range(num_linhas)],
'Profissão': [fake.job() for _ in range(num_linhas)],
'Data de Nascimento': [fake.date_of_birth() for _ in range(num_linhas)],
'Cidade': [fake.city() for _ in range(num_linhas)],
'Estado': [fake.state_abbr() if random.random() > 0.05 else np.nan for _ in range(num_linhas)],
'Salário': [fake.random_int(min=30000, max=120000) if random.random() > 0.05 else np.nan for _ in range(num_linhas)]
}
#df = pd.DataFrame(dados)
#df.to_csv('dados.csv', index=False)
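# Note: reading back from CSV, dates arrive as strings and the nullable numeric
# columns (Idade, Salário) come in as float64 because of the NaNs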
df = pd.read_csv('dados.csv')
df.head()
df_titanic = pd.read_csv("Titanic.csv")
df_titanic.head()
def validate_null_values(column, column_name):
if column.isnull().any():
print(f"The column '{column_name}' contains null values.")
else:
print(f"The column '{column_name}' does not contain null values.")
def validate_binary_values(column, column_name):
if set(column.unique()) == {0, 1}:
print(f"The column '{column_name}' contains only 0 or 1 values.")
else:
print(f"The column '{column_name}' contains values other than 0 or 1.")
def validate_class_values(column, column_name):
if set(column.unique()) == {1, 2, 3}:
print(f"The column '{column_name}' contains only 1, 2, or 3 values.")
else:
print(f"The column '{column_name}' contains values other than 1, 2, or 3.")
validate_null_values(df_titanic["Age"], "Age")
validate_binary_values(df_titanic["Survived"], "Survived")
validate_class_values(df_titanic["Pclass"], "Pclass")
validate_binary_values(df_titanic["Sex"], "Sex")
validate_null_values(df_titanic["Fare"], "Fare")
df_titanic['Age'].isnull().sum()
# Impute missing ages with the mean age of the passenger's class
# (assigning avoids the deprecated chained fillna(..., inplace=True) pattern)
df_titanic['Age'] = df_titanic['Age'].fillna(df_titanic.groupby('Pclass')['Age'].transform('mean'))
df_titanic['Age'].isnull().sum()
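# Sanity check (sketch): the per-class mean ages that drove the imputation
# (mean-imputation leaves each class mean unchanged)
df_titanic.groupby('Pclass')['Age'].mean()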
df
schema = pa.DataFrameSchema({
"Nome": pa.Column(pa.String, nullable=False, checks=pa.Check(lambda s: len(s) >= 3, element_wise=True)),
"Idade": pa.Column(pa.Int, nullable=False, checks=pa.Check.greater_than_or_equal_to(18)),
"Email": pa.Column(pa.String, nullable=False, checks=pa.Check.str_matches(r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$')),
"Endereço": pa.Column(pa.String, nullable=False, checks=pa.Check.str_length(4)),
"Telefone": pa.Column(pa.String, nullable=False, checks=pa.Check.str_matches(r'^\+(55)\s?[1-9]{2}\s?9?[6-9][0-9]{3,4}\-[0-9]{4}$')),
"Profissão": pa.Column(pa.String, nullable=False, checks=pa.Check.str_length(3)),
"Data de Nascimento": pa.Column(pa.String, nullable=False, checks=pa.Check.str_matches(r'^\d{4}-\d{2}-\d{2}$')),
"Cidade": pa.Column(pa.String, nullable=False, checks=pa.Check.str_length(4)),
"Estado": pa.Column(pa.String, nullable=True, checks=pa.Check.str_length(2)),
"Salário": pa.Column(pa.Int, nullable=False, checks=pa.Check.greater_than_or_equal_to(30000))
})
try:
validated_df = schema(df, lazy=True)
print("Todos os testes passaram!")
except pa.errors.SchemaErrors as err:
print(f"Alguns testes não passaram:\n{err}")
df.isnull().sum()
# Estado has a null value, but it can't be inferred from Cidade (the data is fake), so the only option left is to drop the row
df.dropna(subset=['Estado'], inplace=True)
# Fill null ages with the rounded mean
df['Idade'] = df['Idade'].fillna(round(df['Idade'].mean()))
df.isnull().sum()
df
data_tmp = {
'age': [15, 20, 25, 30, 35, 40, 45],
'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve', 'Frank', 'Grace'],
'category': ['A', 'B', 'A', 'A', 'B', 'A', 'B']
}
df_tmp = pd.DataFrame(data_tmp)
def ttest(array1, array2):
    # two-sample t-test comparing the two group samples
    return stats.ttest_ind(array1, array2)
def null_relationship(stat, pvalue, alpha=0.01):
    # the null hypothesis holds when the one-sided p-value is not below alpha
    return pvalue / 2 >= alpha
hip_schema_tmp = DataFrameSchema({
"age": Column(
int, [
Hypothesis(
test=ttest,
samples=['A', 'B'],
groupby="category",
relationship=null_relationship,
relationship_kwargs={"alpha": 0.05}
)
]),
"category": Column(str, checks=Check.isin(["A", "B"]))
})
try:
hip_schema_tmp.validate(df_tmp, lazy=True)
print("Valid dataframe :)")
except pa.errors.SchemaErrors as e:
print(e)
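# pandera ships this test built in; a sketch of the equivalent check using the
# Hypothesis.two_sample_ttest classmethod (relationship="equal" plays the role
# of null_relationship above, but uses the two-sided p-value):
builtin_schema_tmp = DataFrameSchema({
    "age": Column(int, [
        Hypothesis.two_sample_ttest(
            sample1="A",
            sample2="B",
            groupby="category",
            relationship="equal",
            alpha=0.05,
        )
    ]),
})
try:
    builtin_schema_tmp.validate(df_tmp, lazy=True)
    print("Valid dataframe :)")
except pa.errors.SchemaErrors as e:
    print(e)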
# Cast Idade to int now that the nulls are filled, so it matches the int column type below
df['Idade'] = df['Idade'].astype(int)
def shapiro_test(array):
    # Shapiro-Wilk normality test (not a t-test); returns (statistic, p-value)
    return shapiro(array)
hip_schema = DataFrameSchema({
"Idade": Column(
int, [
Hypothesis(
                test=shapiro_test,
                relationship="equal",  # pandera's default: passes when p-value >= alpha
                relationship_kwargs={"alpha": 0.05}
)
]),
})
try:
hip_schema.validate(df, lazy=True)
print("Valid dataframe :)")
except pa.errors.SchemaErrors as e:
print(e)
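# For reference, the raw Shapiro-Wilk output on Idade; p >= 0.05 means
# normality is not rejected at that level:
stat, pvalue = shapiro(df['Idade'])
print(f"W={stat:.4f}, p-value={pvalue:.4f}")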
!great_expectations --yes --v3-api init
df = pd.read_csv('dados.csv')
df.head(10)
df['Data de Nascimento'] = pd.to_datetime(df['Data de Nascimento'])
df.dtypes
# Create the Great Expectations context
context = gx.get_context()
# Add a datasource configured to read an in-memory pandas dataframe
datasource_config = {
"name": "random_datasource",
"class_name": "Datasource",
"module_name": "great_expectations.datasource",
"execution_engine": {
"module_name": "great_expectations.execution_engine",
"class_name": "PandasExecutionEngine",
},
"data_connectors": {
"default_runtime_data_connector_name": {
"class_name": "RuntimeDataConnector",
"batch_identifiers": ["default_identifier_name"],
},
},
}
context.add_or_update_datasource(**datasource_config)
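# The RuntimeDataConnector lets the checkpoint receive an in-memory dataframe
# later, via runtime_parameters / batch_identifiers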
df_gx = gx.from_pandas(df)
for col in df_gx.columns:
    # every column except the two that intentionally carry nulls must be non-null
    if col not in ('Idade', 'Estado'):
        df_gx.expect_column_values_to_not_be_null(col)
df_gx.expect_column_values_to_be_in_type_list("Salário", ["int", "int64"])
df_gx.expect_column_values_to_be_between("Salário", 0, 1000000)
df_gx.expect_column_values_to_match_regex('Email', r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$')
df_gx.expect_column_values_to_be_in_type_list("Idade", ["float", "float64"])
df_gx.expect_column_values_to_be_between("Idade", 18, 80)
df_gx.expect_column_max_to_be_between(column='Data de Nascimento', min_value=None, max_value='2023-12-31');
# Tukey's IQR fences for Idade
q25 = df['Idade'].quantile(0.25)
q75 = df['Idade'].quantile(0.75)
iqr = q75 - q25
lower_limit = q25 - 1.5 * iqr
upper_limit = q75 + 1.5 * iqr
df_gx.expect_column_values_to_be_between('Idade', min_value=lower_limit, max_value=upper_limit);
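# Sketch: surface the computed fence so the expectation bounds are visible
print(f"IQR fence for Idade: [{lower_limit:.1f}, {upper_limit:.1f}]")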
# Save the suite to disk for reuse
df_gx.save_expectation_suite("./expectation_suite.json")
# Review every expectation configured so far (get_expectations_config is deprecated)
df_gx.get_expectation_suite()
# Add the expectation suite built earlier (questions 13, 14, and 15) to the context
context.add_or_update_expectation_suite(expectation_suite=df_gx.get_expectation_suite())
# Everything can be validated without a context or checkpoint (df_gx alone is enough)
df_gx.validate(expectation_suite=df_gx.get_expectation_suite())
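# Sketch: the validation result exposes overall success plus summary counts,
# so the outcome can be inspected programmatically:
validation_result = df_gx.validate(expectation_suite=df_gx.get_expectation_suite())
print(validation_result.success, validation_result.statistics)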
# To use a checkpoint, it must be created against the datasource configured earlier
checkpoint_config = {
"name": "my_checkpoint",
"config_version": 1,
"class_name": "SimpleCheckpoint",
"validations": [
{
"batch_request": {
"datasource_name": "random_datasource",
"data_connector_name": "default_runtime_data_connector_name",
"data_asset_name": "random_data",
},
"expectation_suite_name": "default",
}
],
}
checkpoint = context.add_or_update_checkpoint(**checkpoint_config)
# Run the checkpoint against the in-memory dataframe
checkpoint_result = context.run_checkpoint(
checkpoint_name="my_checkpoint",
batch_request={
"runtime_parameters": {"batch_data": df_gx},
"batch_identifiers": {
"default_identifier_name": "foo"
},
},
)
# Display the results (this has been slow on Deepnote)
checkpoint_result
# context.view_validation_result(checkpoint_result) does not exist in version 0.15.50
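# In 0.15.x the rendered report comes from Data Docs instead (a sketch):
# context.build_data_docs()
# context.open_data_docs()
print(checkpoint_result.success)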