import itertools
import pandas as pd
import upsetplot
# Remove a ``missing`` attribute left on ``pd.DataFrame`` by an earlier
# execution of this file, before the accessor is registered again below.
# Presumably this avoids pandas' "overriding a preexisting attribute" warning
# when the file is re-run in the same interpreter session - confirm.
try:
    del pd.DataFrame.missing
except AttributeError:
    # Nothing to remove: the attribute is not currently set.
    pass
@pd.api.extensions.register_dataframe_accessor("missing")
class MissingMethods:
    """Missing-value helpers exposed on every DataFrame as ``df.missing``."""

    def __init__(self, pandas_obj):
        # The DataFrame this accessor instance is bound to.
        self._obj = pandas_obj

    @staticmethod
    def _counts_to_frame(counts: pd.Series, value_name: str) -> pd.DataFrame:
        """Turn a per-column Series into a frame with a ``variable`` column.

        For single-level column labels the labels land in a column called
        ``variable`` regardless of whether the columns axis carried a name.
        Multi-level labels are left to ``reset_index`` (one column per
        level).  The values are stored under *value_name*.
        """
        if counts.index.nlevels == 1:
            counts = counts.rename_axis("variable")
        return counts.reset_index(name=value_name)

    def number_missing(self) -> int:
        """Return the total number of missing cells in the frame."""
        return int(self._obj.isna().sum().sum())

    def number_complete(self) -> int:
        """Return the total number of non-missing cells in the frame."""
        return int(self._obj.size - self.number_missing())

    def missing_variable_summary(self) -> pd.DataFrame:
        """Summarise missingness per column.

        Returns:
            One row per column with ``variable``, ``n_missing``, ``n_cases``
            (number of rows) and ``pct_missing`` (0-100).
        """
        summary = self._counts_to_frame(self._obj.isna().sum(), "n_missing")
        return summary.assign(
            n_cases=len(self._obj),
            pct_missing=lambda df: df.n_missing / df.n_cases * 100,
        )

    def missing_case_summary(self) -> pd.DataFrame:
        """Summarise missingness per row.

        Returns:
            A frame on the original index with ``case`` (the index value),
            ``n_missing`` and ``pct_missing`` (0-100, relative to the number
            of columns of the original frame).
        """
        frame = self._obj
        n_missing = frame.isna().sum(axis="columns")
        # Divide by the column count of the *original* frame; the helper
        # columns created below must not inflate the denominator.
        pct_missing = n_missing / frame.shape[1] * 100
        return pd.DataFrame(index=frame.index).assign(
            case=frame.index,
            # Positional arrays: no label alignment, so duplicate index
            # labels are harmless.
            n_missing=n_missing.to_numpy(),
            pct_missing=pct_missing.to_numpy(),
        )

    def missing_variable_table(self) -> pd.DataFrame:
        """Tabulate how many columns share each missing-value count.

        Returns:
            Columns ``n_missing_in_variable``, ``n_variables`` and
            ``pct_variables`` (0-100), sorted by ``pct_variables`` descending.
        """
        return (
            self.missing_variable_summary()
            .value_counts("n_missing")
            # Name the count column explicitly: its default name differs
            # between pandas versions.
            .reset_index(name="n_variables")
            .rename(columns={"n_missing": "n_missing_in_variable"})
            .assign(
                pct_variables=lambda df: df.n_variables / df.n_variables.sum() * 100
            )
            .sort_values("pct_variables", ascending=False)
        )

    def missing_case_table(self) -> pd.DataFrame:
        """Tabulate how many rows share each missing-value count.

        Returns:
            Columns ``n_missing_in_case``, ``n_cases`` and ``pct_case``
            (0-100), sorted by ``pct_case`` descending.
        """
        return (
            self.missing_case_summary()
            .value_counts("n_missing")
            # Name the count column explicitly: its default name differs
            # between pandas versions.
            .reset_index(name="n_cases")
            .rename(columns={"n_missing": "n_missing_in_case"})
            .assign(pct_case=lambda df: df.n_cases / df.n_cases.sum() * 100)
            .sort_values("pct_case", ascending=False)
        )

    def missing_variable_span(self, variable: str, span_every: int) -> pd.DataFrame:
        """Summarise missingness of *variable* over consecutive row spans.

        Rows are grouped by position into spans of ``span_every`` rows (the
        last span may be shorter).

        Args:
            variable: Column to inspect.
            span_every: Number of rows per span; must be at least 1.

        Returns:
            One row per span with ``span_counter``, ``n_missing``,
            ``n_complete``, ``pct_missing`` and ``pct_complete``.

        Raises:
            ValueError: If ``span_every`` is smaller than 1.
        """
        if span_every < 1:
            raise ValueError("span_every must be a positive integer")
        # Rows 0..span_every-1 belong to span 0, the next span_every rows to
        # span 1, and so on.
        span_counter = [
            position // span_every for position in range(self._obj.shape[0])
        ]
        return (
            self._obj.assign(span_counter=span_counter)
            .groupby("span_counter")
            .aggregate(
                n_in_span=(variable, "size"),
                n_missing=(variable, lambda s: s.isnull().sum()),
            )
            .assign(
                n_complete=lambda df: df.n_in_span - df.n_missing,
                pct_missing=lambda df: df.n_missing / df.n_in_span * 100,
                pct_complete=lambda df: 100 - df.pct_missing,
            )
            .drop(columns=["n_in_span"])
            .reset_index()
        )

    def missing_variable_run(self, variable) -> pd.DataFrame:
        """Run-length encode the missing / complete status of *variable*.

        Returns:
            One row per run, in order, with ``run_length`` and ``is_na``
            (``"missing"`` or ``"complete"``).
        """
        is_missing = self._obj[variable].isnull()
        runs = [
            [sum(1 for _ in group), bool(flag)]
            for flag, group in itertools.groupby(is_missing)
        ]
        # Relabel only the flag column: a frame-wide replace of True/False
        # would also target the integer ``run_length`` column (1 == True).
        return pd.DataFrame(data=runs, columns=["run_length", "is_na"]).assign(
            is_na=lambda df: df["is_na"].map({False: "complete", True: "missing"})
        )

    def sort_variables_by_missingness(self, ascending: bool = False) -> pd.DataFrame:
        """Return the frame with columns reordered by their missing count.

        Args:
            ascending: Put the columns with the fewest missing values first
                when true; by default the most incomplete columns come first.
        """
        ordered_columns = (
            self._obj.isna()
            .sum()
            # Stable sort so the result is deterministic when counts tie.
            .sort_values(ascending=ascending, kind="stable")
            .index
        )
        return self._obj[ordered_columns]

    def create_shadow_matrix(
        self,
        true_string: str = "Missing",
        false_string: str = "Not Missing",
        only_missing: bool = False,
    ) -> pd.DataFrame:
        """Build a frame of missingness labels with ``_NA``-suffixed columns.

        Args:
            true_string: Label written where the value is missing.
            false_string: Label written where the value is present.
            only_missing: Keep only columns that contain a missing value.
        """
        is_missing = self._obj.isna()
        if only_missing:
            # Positional boolean mask, so duplicated column labels are fine.
            is_missing = is_missing.loc[:, is_missing.any().to_numpy()]
        return (
            is_missing
            .replace({False: false_string, True: true_string})
            .add_suffix("_NA")
        )

    def bind_shadow_matrix(
        self,
        true_string: str = "Missing",
        false_string: str = "Not Missing",
        only_missing: bool = False,
    ) -> pd.DataFrame:
        """Return the frame with its shadow matrix appended as extra columns.

        The arguments are forwarded to ``create_shadow_matrix``.
        """
        shadow = self.create_shadow_matrix(
            true_string=true_string,
            false_string=false_string,
            only_missing=only_missing,
        )
        return pd.concat(objs=[self._obj, shadow], axis="columns")

    def missing_scan_count(self, search) -> pd.DataFrame:
        """Count, per column, the cells whose value is one of *search*.

        Args:
            search: Values passed to ``Series.isin`` for every column.

        Returns:
            One row per column with ``variable``, ``n`` (number of matching
            cells) and ``original_type`` (the column dtype).
        """
        matches = self._obj.apply(lambda column: column.isin(search), axis="index")
        return self._counts_to_frame(matches.sum(), "n").assign(
            # dtypes are in column order, the same order as the rows here.
            original_type=self._obj.dtypes.to_numpy()
        )

    # Plotting functions ---
    # NOTE(review): the methods below use ``plt`` and ``sns``, which are not
    # imported in the visible part of this file. Presumably they are expected
    # to be matplotlib.pyplot / seaborn supplied by the executing environment
    # - confirm, or import them at the top of the file.

    def missing_variable_plot(self):
        """Draw a lollipop chart of the number of missing values per column."""
        df = self.missing_variable_summary().sort_values("n_missing")
        plot_range = range(1, len(df.index) + 1)
        plt.hlines(y=plot_range, xmin=0, xmax=df.n_missing, color="black")
        plt.plot(df.n_missing, plot_range, "o", color="black")
        plt.yticks(plot_range, df.variable)
        plt.grid(axis="y")
        plt.xlabel("Number missing")
        plt.ylabel("Variable")

    def missing_case_plot(self):
        """Draw the distribution of the number of missing values per row."""
        df = self.missing_case_summary()
        sns.displot(data=df, x="n_missing", binwidth=1, color="black")
        plt.grid(axis="x")
        plt.xlabel("Number of missings in case")
        plt.ylabel("Number of cases")

    def missing_variable_span_plot(
        self, variable: str, span_every: int, rot: int = 0, figsize=None
    ):
        """Draw stacked bars of missing / complete percentages per span.

        Args:
            variable: Column to inspect.
            span_every: Number of rows per span.
            rot: Tick label rotation passed to ``DataFrame.plot.bar``.
            figsize: Figure size passed to ``DataFrame.plot.bar``.
        """
        self.missing_variable_span(
            variable=variable, span_every=span_every
        ).plot.bar(
            x="span_counter",
            y=["pct_missing", "pct_complete"],
            stacked=True,
            width=1,
            color=["black", "lightgray"],
            rot=rot,
            figsize=figsize,
        )
        plt.xlabel("Span number")
        plt.ylabel("Percentage missing")
        plt.legend(["Missing", "Present"])
        plt.title(
            f"Percentage of missing values\nOver a repeating span of { span_every } ",
            loc="left",
        )
        plt.grid(False)
        plt.margins(0)
        plt.tight_layout(pad=0)

    def missing_upsetplot(self, variables: list[str] = None, **kwargs):
        """Draw an UpSet plot of co-occurring missingness.

        Args:
            variables: Columns to include; all columns when ``None``.
            **kwargs: Forwarded to ``upsetplot.plot``.

        Returns:
            Whatever ``upsetplot.plot`` returns.
        """
        if variables is None:
            variables = self._obj.columns.tolist()
        combination_counts = self._obj.isna().value_counts(variables)
        return upsetplot.plot(combination_counts, **kwargs)