"""
Contains functions to help transform columns data containing complex types,
like lists or dictionaries.
"""
from functools import reduce
from itertools import zip_longest
import numpy as np
import pandas as pd
[docs]def expand_list(dataframe, column, new_column=None):
"""
Expands lists to new rows.
.. code-block:: python
>>> df = DataFrame({
... 'trial_num': [1, 2, 3, 1, 2, 3],
... 'subject': [1, 1, 1, 2, 2, 2],
... 'samples': [
... [1, 2, 3, 4],
... [1, 2, 3],
... [1, 2],
... [1],
... [],
... None,
... ]
... })
>>> df.pipe(expand_list, 'samples', new_column='sample_id').head(7)
trial_num subject sample_id
0 1 1 1
0 1 1 2
0 1 1 3
0 1 1 4
1 2 1 1
1 2 1 2
1 2 1 3
.. warning::
Between calls of ``expand_list`` and/or ``expand_lists``, the dataframe index
duplications must be removed, otherwise plenty of duplications will occur.
.. warning::
Calling ``expand_list`` on multiple columns might cause data duplications,
that shall be handled.
:param dataframe: The DataFrame object to work on.
:type dataframe: :class:`DataFrame <pandas.DataFrame>`
:param column: The name of the column which should be extracted.
:type column: :class: str
:param new_column: Name of the new columns. If not defined, columns will not be renamed.
:type new_column: :class: str
:returns: The expanded DataFrame
:rtype: :class:`DataFrame <pandas.DataFrame>`
"""
new_column = new_column or column
values, indices = [], []
for index, value in dataframe[column].items():
if value and not isinstance(value, float):
values.extend(value)
indices.extend([index, ] * len(value))
if indices and isinstance(indices[0], tuple):
indices = pd.MultiIndex.from_tuples(indices, names=dataframe.index.names)
else:
indices = pd.Series(indices, name=dataframe.index.name)
return pd.DataFrame({new_column: values}, index=indices).\
merge(dataframe.drop(column, axis=1), left_index=True, right_index=True, how='outer')
[docs]def expand_lists(dataframe, columns, new_columns=None):
"""
Expands multiple lists to new rows. Pairs elements of lists respective to their index.
Pads with ``None`` to the longest list.
.. code-block:: python
>>> df = DataFrame({
... 'trial_num': [1, 2, 3, 1, 2, 3],
... 'subject': [1, 1, 1, 2, 2, 2],
... 'samples': [
... [1, 2, 3, 4],
... [1, 2, 3],
... [1, 2],
... [1],
... [],
... None,
... ],
... 'samples2': [
... [1, 2],
... [1, 2, 3],
... [1, 2],
... [1],
... [],
... None,
... ]
... })
>>> df.pipe(
... expand_lists, ['samples', 'samples'], new_column=['sample_id', 'sample_id2']
... ).head(7)
trial_num subject sample_id sample_id2
0 1 1 1 1
0 1 1 2 2
0 1 1 3 Nan
0 1 1 4 Nan
1 2 1 1 1
1 2 1 2 2
1 2 1 3 3
.. warning::
Between calls of ``expand_list`` and/or ``expand_lists``, the dataframe index
duplications must be removed, otherwise plenty of duplications will occur.
.. warning::
Calling ``expand_lists`` on multiple columns might cause data duplications,
that shall be handled.
:param dataframe: The DataFrame object to work on.
:type dataframe: :class:`DataFrame <pandas.DataFrame>`
:param columns: The name of the columns which should be extracted.
:type columns: :class: list or :class: tuple of :class: str
:param new_columns: Name of the new columns. If not defined, columns will not be renamed.
:type new_columns: :class: list or :class: tuple of :class: str
:returns: The expanded DataFrame
:rtype: :class:`DataFrame <pandas.DataFrame>`
"""
new_columns = new_columns or columns
if not len(columns) == len(new_columns):
raise ValueError('new_columns must contain the same amount of items as columns')
if len(columns) == 1:
return expand_list(dataframe, *columns, *new_columns)
if not len(columns) > 1:
raise ValueError('columns argument must contain at least two items.')
values, indices = [], []
for index, row in dataframe[columns].iterrows():
if not row.empty and all(row.notna()):
values.extend(zip_longest(*row))
indices.extend([index, ] * max(map(len, row)))
if indices and isinstance(indices[0], tuple):
indices = pd.MultiIndex.from_tuples(indices, names=dataframe.index.names)
else:
indices = pd.Series(indices, name=dataframe.index.name)
return pd.DataFrame(values, columns=new_columns, index=indices).fillna(np.nan).\
merge(dataframe.drop(columns, axis=1), left_index=True, right_index=True, how='outer')
[docs]def merge_columns(dataframe, col_header_list, new_column_name, keep=None, aggr=None):
"""
Add a new column or modify an existing one in *dataframe* called *new_column_name* by
iterating over the rows and select the proper notnull element from the values of
*col_header_list* columns in the given row if *keep* is filled OR call the *aggr*
function with the values of *col_header_list*. Only one of (*keep*, *aggr*) can be filled.
:param dataframe: the pandas.DataFrame object to modify
:param col_header_list: list of the names of the headers to merge
:param str new_column_name: the name of the new column, if it already exists the operation
will overwrite it
:param str keep: Specify whether the first or the last proper value is needed.
values: *first* and *last* as string.
:param aggr: Callable function which will get the values of *col_header_list* as parameter.
The return value of this function will be the value in *new_column_name*
:returns: The merged DataFrame
:rtype: :class:`DataFrame <pandas.DataFrame>`
"""
if keep and aggr:
raise ValueError(
'Parameter keep and aggr can not be handled at the same time. Use only one.'
)
old_columns = [x for x in col_header_list if x in list(dataframe)]
if not old_columns:
raise ValueError(
f'None of the following columns were found: {", ".join(col_header_list)}'
)
if keep:
if keep not in ('first', 'last'):
raise ValueError('Improper value for parameter keep. Possible values: first, last.')
first_valid = lambda x, y: y if pd.isnull(x) else x
if keep.startswith('f'):
aggr = lambda x: reduce(first_valid, x.tolist())
else:
aggr = lambda x: reduce(first_valid, x.tolist()[::-1])
if not callable(aggr):
raise ValueError('Improper value for parameter aggr. It should be a function.')
dataframe[new_column_name] = dataframe[old_columns].apply(aggr, axis=1)
return dataframe
[docs]def concatenate_columns(dataframe, columns, new_column, descriptor=None, mapper=None):
"""
Concatenates `columns` together along the indeces and adds a `descriptor` column,
if specified, with the column name where the data originates from.
.. code-block:: python
>>> df = pd.DataFrame([
... {'key': 'TICKET-1', 'assignee': 'Bob', 'reporter': 'Alice'},
... {'key': 'TICKET-2', 'assignee': 'Bob', 'reporter': 'Alice'},
... {'key': 'TICKET-3', 'assignee': 'Bob', 'reporter': 'Alice'},
... ])
>>> df.pipe(concatenate_columns, ['assignee', 'reporter'], 'user')
key user descriptor
0 'TICKET-1' 'Alice' 'reporter'
0 'TICKET-1' 'Bob' 'assignee'
1 'TICKET-2' 'Alice' 'reporter'
1 'TICKET-2' 'Bob' 'assignee'
2 'TICKET-3' 'Alice' 'reporter'
2 'TICKET-3' 'Bob' 'assignee'
:param dataframe: The DataFrame object to work on.
:type dataframe: :class:`DataFrame <pandas.DataFrame>`
:param columns: The name of the columns which should be concatenated.
:type columns: :class: list
:param new_column: Name of the new column.
:type new_column: :class: str
:param descriptor: Name of the new descriptor column.
:type descriptor: :class: str
:param mapper: A map to apply to `descriptor` values
:type mapper: :class: dict
:returns: The concatenated DataFrame
:rtype: :class:`DataFrame <pandas.DataFrame>`
"""
if mapper is None:
mapper = {}
descriptor = descriptor or '_desc'
parts = (
pd.DataFrame(
data={
new_column: dataframe[col],
descriptor: [mapper.get(col, col) for _ in range(len(dataframe.index))]
},
index=dataframe.index
) for col in columns if col in dataframe
)
return pd.concat(list(parts)).drop('_desc', axis=1, errors='ignore').sort_index()