
Timeseries

Time series-specific data cleaning functions.

fill_missing_timestamps(df, frequency, first_time_stamp=None, last_time_stamp=None)

Fills a DataFrame with missing timestamps based on a defined frequency.

If timestamps are missing, this function will re-index the DataFrame. If timestamps are not missing, then the function will return the DataFrame unmodified.

Functional usage example:

import pandas as pd
import janitor.timeseries

df = pd.DataFrame(...)

df = janitor.timeseries.fill_missing_timestamps(
    df=df,
    frequency="1H",
)

Method chaining example:

import pandas as pd
import janitor.timeseries

df = (
    pd.DataFrame(...)
    .fill_missing_timestamps(frequency="1H")
)

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| df | DataFrame | DataFrame which needs to be tested for missing timestamps. | required |
| frequency | str | Sampling frequency of the data. Acceptable frequency strings are listed under "offset aliases" in the time series section of the pandas user guide (https://pandas.pydata.org/docs/user_guide/timeseries.html#timeseries-offset-aliases). | required |
| first_time_stamp | Timestamp | Timestamp the index is expected to start from. If no input is provided, the minimum value of the DataFrame's index is used. | None |
| last_time_stamp | Timestamp | Timestamp the index is expected to end with. If no input is provided, the maximum value of the DataFrame's index is used. | None |

Returns:

| Type | Description |
| --- | --- |
| DataFrame | DataFrame that has a complete set of contiguous datetimes. |
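
To make the re-indexing behaviour concrete, here is a minimal sketch in plain pandas that mirrors the logic in the source listing (`pd.date_range` followed by `DataFrame.reindex`). The helper name `fill_missing_timestamps_sketch` and the sample data are illustrative assumptions, not part of pyjanitor; the daily frequency `"D"` is chosen only to keep the example small.

```python
import pandas as pd


def fill_missing_timestamps_sketch(
    df, frequency, first_time_stamp=None, last_time_stamp=None
):
    """Hypothetical stand-in that mirrors the reindexing logic shown in the source."""
    # Fall back to the bounds of the existing index when none are given
    start = df.index.min() if first_time_stamp is None else first_time_stamp
    end = df.index.max() if last_time_stamp is None else last_time_stamp

    # Build every timestamp expected at the given frequency, then reindex
    expected_timestamps = pd.date_range(start=start, end=end, freq=frequency)
    return df.reindex(expected_timestamps)


# Sample data (assumed for illustration): 2021-01-03 is missing
df = pd.DataFrame(
    {"value": [1.0, 2.0, 4.0]},
    index=pd.to_datetime(["2021-01-01", "2021-01-02", "2021-01-04"]),
)

# Default bounds: only the gap inside the existing range is filled
filled = fill_missing_timestamps_sketch(df, frequency="D")

# Explicit upper bound: the index is extended past the last observation
extended = fill_missing_timestamps_sketch(
    df, frequency="D", last_time_stamp=pd.Timestamp("2021-01-06")
)
```

Rows created for missing timestamps carry `NaN` in every column, so a follow-up step such as interpolation or forward-filling is typically needed before analysis.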

Source code in janitor/timeseries.py
@pf.register_dataframe_method
def fill_missing_timestamps(
    df: pd.DataFrame,
    frequency: str,
    first_time_stamp: pd.Timestamp = None,
    last_time_stamp: pd.Timestamp = None,
) -> pd.DataFrame:
    """
    Fills a DataFrame with missing timestamps based on a defined frequency.

    If timestamps are missing, this function will re-index the DataFrame.
    If timestamps are not missing, then the function will return the DataFrame
    unmodified.

    Functional usage example:

    ```python
    import pandas as pd
    import janitor.timeseries

    df = pd.DataFrame(...)

    df = janitor.timeseries.fill_missing_timestamps(
        df=df,
        frequency="1H",
    )
    ```

    Method chaining example:

    ```python
    import pandas as pd
    import janitor.timeseries

    df = (
        pd.DataFrame(...)
        .fill_missing_timestamps(frequency="1H")
    )
    ```

    :param df: DataFrame which needs to be tested for missing timestamps
    :param frequency: sampling frequency of the data.
        Acceptable frequency strings are available
        [here](https://pandas.pydata.org/docs/user_guide/timeseries.html#timeseries-offset-aliases).
        Check offset aliases under time series in user guide
    :param first_time_stamp: timestamp expected to start from;
        defaults to `None`. If no input is provided, assumes the
        minimum value in the DataFrame's index.
    :param last_time_stamp: timestamp expected to end with; defaults to `None`.
        If no input is provided, assumes the maximum value in the
        DataFrame's index.
    :returns: DataFrame that has a complete set of contiguous datetimes.
    """
    # Check all the inputs are the correct data type
    check("frequency", frequency, [str])
    check("first_time_stamp", first_time_stamp, [pd.Timestamp, type(None)])
    check("last_time_stamp", last_time_stamp, [pd.Timestamp, type(None)])

    if first_time_stamp is None:
        first_time_stamp = df.index.min()
    if last_time_stamp is None:
        last_time_stamp = df.index.max()

    # Generate expected timestamps
    expected_timestamps = pd.date_range(
        start=first_time_stamp, end=last_time_stamp, freq=frequency
    )

    return df.reindex(expected_timestamps)

flag_jumps(df, scale='percentage', direction='any', threshold=0.0, strict=False)

Create boolean column(s) that flag whether or not the change between consecutive rows exceeds a provided threshold.

Functional usage example:

import pandas as pd
import janitor.timeseries

df = pd.DataFrame(...)

df = janitor.timeseries.flag_jumps(
    df=df,
    scale="absolute",
    direction="any",
    threshold=2,
)

Method chaining example:

import pandas as pd
import janitor.timeseries

df = (
    pd.DataFrame(...)
    .flag_jumps(
        scale="absolute",
        direction="any",
        threshold=2,
    )
)

Detailed chaining examples:

# Applies specified criteria across all columns of the DataFrame
# Appends a flag column for each column in the DataFrame
df = (
    pd.DataFrame(...)
    .flag_jumps(
        scale="absolute",
        direction="any",
        threshold=2
    )
)

# Applies specific criteria to certain DataFrame columns
# Applies default criteria to columns not specifically listed
# Appends a flag column for each column in the DataFrame
df = (
    pd.DataFrame(...)
    .flag_jumps(
        scale=dict(col1="absolute", col2="percentage"),
        direction=dict(col1="increasing", col2="any"),
        threshold=dict(col1=1, col2=0.5),
    )
)

# Applies specific criteria to certain DataFrame columns
# Applies default criteria to columns not specifically listed
# Appends a flag column for each column in the DataFrame
df = (
    pd.DataFrame(...)
    .flag_jumps(
        scale=dict(col1="absolute"),
        direction=dict(col2="increasing"),
    )
)

# Applies specific criteria to certain DataFrame columns
# Applies default criteria to columns not specifically listed
# Appends a flag column for only those columns found in
#   specified criteria
df = (
    pd.DataFrame(...)
    .flag_jumps(
        scale=dict(col1="absolute"),
        threshold=dict(col2=1),
        strict=True,
    )
)
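
The way scalar and dictionary arguments combine in the examples above can be sketched without pandas. The helper below follows the column-selection and default-resolution steps visible in the source listing; the name `resolve_criteria` is hypothetical, and `ValueError` stands in for the library's `JanitorError`.

```python
import itertools


def resolve_criteria(
    columns, scale="percentage", direction="any", threshold=0.0, strict=False
):
    """Hypothetical helper: map each flagged column to (scale, direction, threshold)."""
    args = (scale, direction, threshold)

    if strict:
        # strict=True only makes sense if some argument names specific columns
        if not any(isinstance(arg, dict) for arg in args):
            raise ValueError("strict=True needs at least one dictionary argument")
        # Flag only columns that appear in at least one dictionary
        cols = set(
            itertools.chain.from_iterable(
                arg.keys() for arg in args if isinstance(arg, dict)
            )
        )
    else:
        # Flag every column
        cols = columns

    resolved = {}
    for col in sorted(cols):
        # Dictionaries fall back to the defaults for unlisted columns;
        # scalars apply to every column
        s = scale.get(col, "percentage") if isinstance(scale, dict) else scale
        d = direction.get(col, "any") if isinstance(direction, dict) else direction
        t = threshold.get(col, 0.0) if isinstance(threshold, dict) else threshold
        resolved[col] = (s, d, t)
    return resolved


columns = ["col1", "col2", "col3"]
loose_result = resolve_criteria(
    columns, scale={"col1": "absolute"}, threshold={"col2": 1}
)
strict_result = resolve_criteria(
    columns, scale={"col1": "absolute"}, threshold={"col2": 1}, strict=True
)
```

With `strict=False`, `col3` is still flagged using the defaults; with `strict=True`, only `col1` and `col2` receive a flag column.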

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| df | DataFrame | DataFrame which needs to be flagged for changes between consecutive rows above a certain threshold. | required |
| scale | Union[str, Dict[str, str]] | Type of scaling approach to use. Acceptable arguments are 'absolute' (consider the difference between rows) and 'percentage' (consider the percentage change between rows); defaults to 'percentage'. | 'percentage' |
| direction | Union[str, Dict[str, str]] | How to handle the sign of the change when comparing consecutive rows. Acceptable arguments are 'increasing' (only consider rows that are increasing in value), 'decreasing' (only consider rows that are decreasing in value), and 'any' (consider rows that are either increasing or decreasing; sign is ignored); defaults to 'any'. | 'any' |
| threshold | Union[int, float, Dict[str, Union[int, float]]] | The value that the change between consecutive rows must exceed to be flagged. Always uses a strict greater-than comparison. Must be >= 0.0; defaults to 0.0. | 0.0 |
| strict | bool | Flag to enable/disable appending of a flag column for each column in the provided DataFrame. If set to True, will only append a flag column for those columns found in at least one of the input dictionaries. If set to False, will append a flag column for each column found in the provided DataFrame. If a criterion is not specified for a column, its default is used; defaults to False. | False |

Returns:

| Type | Description |
| --- | --- |
| DataFrame | DataFrame that has flag jump columns. |
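
The per-column helper `_flag_jumps_single_col` is not shown on this page, so the following is only a sketch of what the parameter descriptions imply, not the library's implementation. It assumes that 'percentage' means the fractional change relative to the previous row (so `0.5` means 50%), and the name `flag_jumps_single_col_sketch` is hypothetical.

```python
import pandas as pd


def flag_jumps_single_col_sketch(
    series, scale="percentage", direction="any", threshold=0.0
):
    """Hypothetical sketch: flag rows whose change from the previous row exceeds threshold."""
    if scale not in ("absolute", "percentage"):
        raise ValueError(f"Unrecognized scale: {scale!r}")
    if direction not in ("increasing", "decreasing", "any"):
        raise ValueError(f"Unrecognized direction: {direction!r}")
    if threshold < 0:
        raise ValueError("threshold must be >= 0.0")

    # Difference between each row and the one before it (first row is NaN)
    change = series.diff()
    if scale == "percentage":
        # Assumption: express the change as a fraction of the previous value
        change = change / series.shift(1)

    # Comparisons against NaN evaluate to False, so the first row is never flagged
    if direction == "increasing":
        return change > threshold
    if direction == "decreasing":
        return change < -threshold
    return change.abs() > threshold


prices = pd.Series([1.0, 2.0, 5.0, 4.0])

# Absolute differences are NaN, 1, 3, -1: only the jump of 3 exceeds 2
abs_any = flag_jumps_single_col_sketch(prices, scale="absolute", threshold=2)

# Fractional changes are NaN, 1.0, 1.5, -0.2: only the last row drops by more than 10%
pct_down = flag_jumps_single_col_sketch(
    prices, scale="percentage", direction="decreasing", threshold=0.1
)
```

Because the comparison is strictly greater-than, a change exactly equal to the threshold is not flagged.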

Exceptions:

| Type | Description |
| --- | --- |
| JanitorError | If strict=True and none of the scale, direction, or threshold inputs is a dictionary. |
| JanitorError | If scale is not one of ("absolute", "percentage"). |
| JanitorError | If direction is not one of ("increasing", "decreasing", "any"). |
| JanitorError | If threshold is less than 0.0. |

Source code in janitor/timeseries.py
@pf.register_dataframe_method
def flag_jumps(
    df: pd.DataFrame,
    scale: Union[str, Dict[str, str]] = "percentage",
    direction: Union[str, Dict[str, str]] = "any",
    threshold: Union[int, float, Dict[str, Union[int, float]]] = 0.0,
    strict: bool = False,
) -> pd.DataFrame:
    """
    Create boolean column(s) that flag whether or not the change
    between consecutive rows exceeds a provided threshold.

    Functional usage example:

    ```python
    import pandas as pd
    import janitor.timeseries

    df = pd.DataFrame(...)

    df = janitor.timeseries.flag_jumps(
        df=df,
        scale="absolute",
        direction="any",
        threshold=2,
    )
    ```

    Method chaining example:

    ```python
    import pandas as pd
    import janitor.timeseries

    df = (
        pd.DataFrame(...)
        .flag_jumps(
            scale="absolute",
            direction="any",
            threshold=2,
        )
    )
    ```

    Detailed chaining examples:

    ```python
    # Applies specified criteria across all columns of the DataFrame
    # Appends a flag column for each column in the DataFrame
    df = (
        pd.DataFrame(...)
        .flag_jumps(
            scale="absolute",
            direction="any",
            threshold=2
        )
    )

    # Applies specific criteria to certain DataFrame columns
    # Applies default criteria to columns not specifically listed
    # Appends a flag column for each column in the DataFrame
    df = (
        pd.DataFrame(...)
        .flag_jumps(
            scale=dict(col1="absolute", col2="percentage"),
            direction=dict(col1="increasing", col2="any"),
            threshold=dict(col1=1, col2=0.5),
        )
    )

    # Applies specific criteria to certain DataFrame columns
    # Applies default criteria to columns not specifically listed
    # Appends a flag column for each column in the DataFrame
    df = (
        pd.DataFrame(...)
        .flag_jumps(
            scale=dict(col1="absolute"),
            direction=dict(col2="increasing"),
        )
    )

    # Applies specific criteria to certain DataFrame columns
    # Applies default criteria to columns not specifically listed
    # Appends a flag column for only those columns found in
    #   specified criteria
    df = (
        pd.DataFrame(...)
        .flag_jumps(
            scale=dict(col1="absolute"),
            threshold=dict(col2=1),
            strict=True,
        )
    )
    ```

    :param df: DataFrame which needs to be flagged for changes between
        consecutive rows above a certain threshold.
    :param scale: Type of scaling approach to use.
        Acceptable arguments are `'absolute'` (consider the difference
        between rows) and `'percentage'` (consider the percentage
        change between rows); defaults to `'percentage'`.
    :param direction: Type of method used to handle the sign change when
        comparing consecutive rows.
        Acceptable arguments are `'increasing'` (only consider rows
        that are increasing in value), `'decreasing'` (only consider
        rows that are decreasing in value), and `'any'` (consider rows
        that are either increasing or decreasing; sign is ignored);
        defaults to `'any'`.
    :param threshold: The value to check if consecutive row comparisons
        exceed. Always uses a greater than comparison. Must be `>= 0.0`;
        defaults to `0.0`.
    :param strict: flag to enable/disable appending of a flag column for
        each column in the provided DataFrame. If set to `True`, will
        only append a flag column for those columns found in at least
        one of the input dictionaries. If set to `False`, will append
        a flag column for each column found in the provided DataFrame.
        If a criterion is not specified, its default is used;
        defaults to `False`.
    :returns: DataFrame that has `flag jump` columns.
    :raises JanitorError: if `strict=True` and none of the
        `scale`, `direction`, or `threshold` inputs is a
        dictionary.
    :raises JanitorError: if `scale` is not one of
        `("absolute", "percentage")`.
    :raises JanitorError: if `direction` is not one of
        `("increasing", "decreasing", "any")`.
    :raises JanitorError: if `threshold` is less than `0.0`.
    """
    df = df.copy()

    if strict:
        if (
            any(isinstance(arg, dict) for arg in (scale, direction, threshold))
            is False
        ):
            raise JanitorError(
                "When enacting 'strict=True', 'scale', 'direction', or "
                + "'threshold' must be a dictionary."
            )

        # Only append a flag col for the cols that appear
        # in at least one of the input dicts
        arg_keys = [
            arg.keys()
            for arg in (scale, direction, threshold)
            if isinstance(arg, dict)
        ]
        cols = set(itertools.chain.from_iterable(arg_keys))

    else:
        # Append a flag col for each col in the DataFrame
        cols = df.columns

    columns_to_add = {}
    for col in sorted(cols):

        # Allow arguments to be a mix of dict and single instances
        s = scale.get(col, "percentage") if isinstance(scale, dict) else scale
        d = (
            direction.get(col, "any")
            if isinstance(direction, dict)
            else direction
        )
        t = (
            threshold.get(col, 0.0)
            if isinstance(threshold, dict)
            else threshold
        )

        columns_to_add[f"{col}_jump_flag"] = _flag_jumps_single_col(
            df, col, scale=s, direction=d, threshold=t
        )

    df = df.assign(**columns_to_add)

    return df

sort_timestamps_monotonically(df, direction='increasing', strict=False)

Sort DataFrame such that index is monotonic.

If timestamps are monotonic, this function will return the DataFrame unmodified. If timestamps are not monotonic, then the function will sort the DataFrame.

Functional usage example:

import pandas as pd
import janitor.timeseries

df = pd.DataFrame(...)

df = janitor.timeseries.sort_timestamps_monotonically(
    df=df,
    direction="increasing",
)

Method chaining example:

import pandas as pd
import janitor.timeseries

df = (
    pd.DataFrame(...)
    .sort_timestamps_monotonically(direction="increasing")
)

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| df | DataFrame | DataFrame which needs to be tested for monotonicity. | required |
| direction | str | Type of monotonicity desired. Acceptable arguments are 'increasing' or 'decreasing'. | 'increasing' |
| strict | bool | Flag to enable/disable strict monotonicity. If set to True, will remove duplicates in the index by retaining the first occurrence of each value in the index. If set to False, will not test for duplicates in the index; defaults to False. | False |

Returns:

| Type | Description |
| --- | --- |
| DataFrame | DataFrame that has monotonically increasing (or decreasing) timestamps. |
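
The effect of `strict` and `direction` can be illustrated with plain pandas. The sketch below follows the two steps visible in the source listing (drop duplicate index values, then sort the index); the helper name `sort_timestamps_sketch` and the sample data are assumptions made for illustration.

```python
import pandas as pd


def sort_timestamps_sketch(df, direction="increasing", strict=False):
    """Hypothetical stand-in that mirrors the dedupe-then-sort logic in the source."""
    if strict:
        # Keep only the first row for each repeated index value
        df = df[~df.index.duplicated(keep="first")]
    # As in the source, any direction other than "increasing" sorts descending
    return df.sort_index(ascending=(direction == "increasing"))


# Sample data (assumed): unsorted index with 2021-01-03 appearing twice
df = pd.DataFrame(
    {"value": [30, 10, 31, 20]},
    index=pd.to_datetime(
        ["2021-01-03", "2021-01-01", "2021-01-03", "2021-01-02"]
    ),
)

# Non-strict: all four rows are kept, duplicates included
loose = sort_timestamps_sketch(df)

# Strict and decreasing: the second 2021-01-03 row (value 31) is dropped
strict_desc = sort_timestamps_sketch(df, direction="decreasing", strict=True)
```

Note that without `strict=True` the result is monotonic but not strictly so, because repeated timestamps remain in the index.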

Source code in janitor/timeseries.py
@pf.register_dataframe_method
def sort_timestamps_monotonically(
    df: pd.DataFrame, direction: str = "increasing", strict: bool = False
) -> pd.DataFrame:
    """
    Sort DataFrame such that index is monotonic.

    If timestamps are monotonic, this function will return
    the DataFrame unmodified. If timestamps are not monotonic,
    then the function will sort the DataFrame.

    Functional usage example:

    ```python
    import pandas as pd
    import janitor.timeseries

    df = pd.DataFrame(...)

    df = janitor.timeseries.sort_timestamps_monotonically(
        df=df,
        direction="increasing",
    )
    ```

    Method chaining example:

    ```python
    import pandas as pd
    import janitor.timeseries

    df = (
        pd.DataFrame(...)
        .sort_timestamps_monotonically(direction="increasing")
    )
    ```

    :param df: DataFrame which needs to be tested for monotonicity.
    :param direction: type of monotonicity desired.
        Acceptable arguments are `'increasing'` or `'decreasing'`.
    :param strict: flag to enable/disable strict monotonicity.
        If set to `True`, will remove duplicates in the index
        by retaining first occurrence of value in index.
        If set to `False`, will not test for duplicates in the index;
        defaults to `False`.
    :returns: DataFrame that has monotonically increasing
        (or decreasing) timestamps.
    """
    # Check all the inputs are the correct data type
    check("df", df, [pd.DataFrame])
    check("direction", direction, [str])
    check("strict", strict, [bool])

    # Remove duplicates if requested
    if strict:
        df = df[~df.index.duplicated(keep="first")]

    # Sort timestamps
    if direction == "increasing":
        df = df.sort_index()
    else:
        df = df.sort_index(ascending=False)

    # Return the DataFrame
    return df