Timeseries

Time series-specific data cleaning functions.

fill_missing_timestamps(df, frequency, first_time_stamp=None, last_time_stamp=None)

Fills in missing timestamps in a DataFrame's index based on a defined frequency.

If timestamps are missing, this function re-indexes the DataFrame against the full range of expected timestamps, so each added row contains missing values. If no timestamps are missing, the function returns the DataFrame with its contents unchanged.

Examples:

Functional usage

>>> import pandas as pd
>>> import janitor.timeseries
>>> df = janitor.timeseries.fill_missing_timestamps(
...     df=pd.DataFrame(...),
...     frequency="1H",
... )

Method chaining example:

>>> import pandas as pd
>>> import janitor.timeseries
>>> df = (
...     pd.DataFrame(...)
...     .fill_missing_timestamps(frequency="1H")
... )
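The re-indexing described above can be sketched with plain pandas, mirroring the `pd.date_range` and `reindex` calls in the source listing further down. The data and column name are made up for illustration, and the `"60min"` alias is used here as an equivalent of an hourly frequency.

```python
import pandas as pd

# Hourly readings with the 02:00 row missing (illustrative data only).
index = pd.to_datetime(
    ["2024-01-01 00:00", "2024-01-01 01:00", "2024-01-01 03:00"]
)
df = pd.DataFrame({"value": [1.0, 2.0, 4.0]}, index=index)

# Build the full range of expected timestamps from the index bounds,
# then reindex so that any absent timestamp becomes a row of NaN.
expected = pd.date_range(
    start=df.index.min(), end=df.index.max(), freq="60min"
)
filled = df.reindex(expected)
```

After this runs, `filled` has one row per hour between the first and last timestamp, and the row for 02:00 holds a missing value.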

Parameters:

Name Type Description Default
df DataFrame

DataFrame which needs to be tested for missing timestamps

required
frequency str

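Sampling frequency of the data. Acceptable frequency strings are listed under "Offset aliases" in the time series section of the pandas user guide (https://pandas.pydata.org/docs/user_guide/timeseries.html#timeseries-offset-aliases).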

required
first_time_stamp Timestamp

Timestamp expected to start from; defaults to None. If no input is provided, assumes the minimum value of the DataFrame's index.

None
last_time_stamp Timestamp

Timestamp expected to end with; defaults to None. If no input is provided, assumes the maximum value of the DataFrame's index.

None
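As a rough illustration of the `first_time_stamp` and `last_time_stamp` parameters, the sketch below uses plain pandas with explicit bounds that are wider than the data. The bounds and values are hypothetical; the point is only that the expected range is built from the supplied bounds rather than from the index.

```python
import pandas as pd

index = pd.to_datetime(["2024-01-01 01:00", "2024-01-01 02:00"])
df = pd.DataFrame({"value": [10, 20]}, index=index)

# Hypothetical bounds, one hour wider than the data on each side.
first_time_stamp = pd.Timestamp("2024-01-01 00:00")
last_time_stamp = pd.Timestamp("2024-01-01 03:00")

# The expected range now starts and ends at the supplied bounds,
# so reindexing adds an empty row before and after the data.
expected = pd.date_range(
    start=first_time_stamp, end=last_time_stamp, freq="60min"
)
widened = df.reindex(expected)
```

Here `widened` has four rows, with missing values in the first and last.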

Returns:

Type Description
DataFrame

DataFrame that has a complete set of contiguous datetimes.

Source code in janitor/timeseries.py
@pf.register_dataframe_method
def fill_missing_timestamps(
    df: pd.DataFrame,
    frequency: str,
    first_time_stamp: pd.Timestamp = None,
    last_time_stamp: pd.Timestamp = None,
) -> pd.DataFrame:
    """Fills a DataFrame with missing timestamps based on a defined frequency.

    If timestamps are missing, this function will re-index the DataFrame.
    If timestamps are not missing, then the function will return the DataFrame
    unmodified.

    Examples:
        Functional usage

        >>> import pandas as pd
        >>> import janitor.timeseries
        >>> df = janitor.timeseries.fill_missing_timestamps(
        ...     df=pd.DataFrame(...),
        ...     frequency="1H",
        ... )  # doctest: +SKIP

        Method chaining example:

        >>> import pandas as pd
        >>> import janitor.timeseries
        >>> df = (
        ...     pd.DataFrame(...)
        ...     .fill_missing_timestamps(frequency="1H")
        ... )  # doctest: +SKIP

    Args:
        df: DataFrame which needs to be tested for missing timestamps
        frequency: Sampling frequency of the data.
            Acceptable frequency strings are available
            [here](https://pandas.pydata.org/docs/user_guide/timeseries.html#timeseries-offset-aliases).
            Check offset aliases under time series in user guide
        first_time_stamp: Timestamp expected to start from;
            defaults to `None`. If no input is provided, assumes the
            minimum value of the DataFrame's index.
        last_time_stamp: Timestamp expected to end with; defaults to `None`.
            If no input is provided, assumes the maximum value of the
            DataFrame's index.

    Returns:
        DataFrame that has a complete set of contiguous datetimes.
    """
    # Check all the inputs are the correct data type
    check("frequency", frequency, [str])
    check("first_time_stamp", first_time_stamp, [pd.Timestamp, type(None)])
    check("last_time_stamp", last_time_stamp, [pd.Timestamp, type(None)])

    if first_time_stamp is None:
        first_time_stamp = df.index.min()
    if last_time_stamp is None:
        last_time_stamp = df.index.max()

    # Generate expected timestamps
    expected_timestamps = pd.date_range(
        start=first_time_stamp, end=last_time_stamp, freq=frequency
    )

    return df.reindex(expected_timestamps)

flag_jumps(df, scale='percentage', direction='any', threshold=0.0, strict=False)

Create boolean column(s) that flag whether or not the change between consecutive rows exceeds a provided threshold.

Examples:

Applies specified criteria across all columns of the DataFrame
and appends a flag column for each column in the DataFrame

>>> df = (
...     pd.DataFrame(...)
...     .flag_jumps(
...         scale="absolute",
...         direction="any",
...         threshold=2
...     )
... )  # doctest: +SKIP

Applies specific criteria to certain DataFrame columns,
applies default criteria to columns *not* specifically listed and
appends a flag column for each column in the DataFrame

>>> df = (
...     pd.DataFrame(...)
...     .flag_jumps(
...         scale=dict(col1="absolute", col2="percentage"),
...         direction=dict(col1="increasing", col2="any"),
...         threshold=dict(col1=1, col2=0.5),
...     )
... )  # doctest: +SKIP

Applies specific criteria to certain DataFrame columns,
applies default criteria to columns *not* specifically listed and
appends a flag column for only those columns found in specified
criteria

>>> df = (
...     pd.DataFrame(...)
...     .flag_jumps(
...         scale=dict(col1="absolute"),
...         threshold=dict(col2=1),
...         strict=True,
...     )
... )  # doctest: +SKIP

Parameters:

Name Type Description Default
df DataFrame

DataFrame which needs to be flagged for changes between consecutive rows above a certain threshold.

required
scale Union[str, Dict[str, str]]

Type of scaling approach to use. Acceptable arguments are

  • 'absolute' (consider the difference between rows)
  • 'percentage' (consider the percentage change between rows).
'percentage'
direction Union[str, Dict[str, str]]

Type of method used to handle the sign change when comparing consecutive rows. Acceptable arguments are

  • 'increasing' (only consider rows that are increasing in value)
  • 'decreasing' (only consider rows that are decreasing in value)
  • 'any' (consider rows that are either increasing or decreasing; sign is ignored).
'any'
threshold Union[int, float, Dict[str, Union[int, float]]]

The value that the change between consecutive rows must exceed for a row to be flagged. The comparison is always strictly greater than. Must be >= 0.0.

0.0
strict bool

Flag to enable/disable appending of a flag column for each column in the provided DataFrame. If set to True, will only append a flag column for those columns found in at least one of the input dictionaries. If set to False, will append a flag column for each column found in the provided DataFrame. If a criterion is not specified for a column, the default for that criterion is used.

False
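How `strict` interacts with dictionary and scalar arguments can be sketched in isolation. The helper below, `resolve_criteria`, is hypothetical and not part of the library; it follows the column selection and per-column defaults visible in the source listing further down, and raises `ValueError` as a stand-in for `JanitorError`.

```python
import itertools


def resolve_criteria(columns, scale="percentage", direction="any",
                     threshold=0.0, strict=False):
    """Hypothetical helper: map each flagged column to its criteria."""
    args = (scale, direction, threshold)
    if strict:
        if not any(isinstance(arg, dict) for arg in args):
            # The library raises JanitorError; ValueError stands in here.
            raise ValueError("strict=True requires at least one dict argument")
        # Only columns named in at least one dictionary are flagged.
        keys = [arg.keys() for arg in args if isinstance(arg, dict)]
        cols = set(itertools.chain.from_iterable(keys))
    else:
        # Every column in the DataFrame is flagged.
        cols = columns

    resolved = {}
    for col in sorted(cols):
        # Dictionaries fall back to the default for unlisted columns;
        # scalars apply to every column.
        s = scale.get(col, "percentage") if isinstance(scale, dict) else scale
        d = direction.get(col, "any") if isinstance(direction, dict) else direction
        t = threshold.get(col, 0.0) if isinstance(threshold, dict) else threshold
        resolved[col] = (s, d, t)
    return resolved


columns = ["col1", "col2", "col3"]
loose = resolve_criteria(
    columns, scale={"col1": "absolute"}, threshold={"col2": 1}
)
tight = resolve_criteria(
    columns, scale={"col1": "absolute"}, threshold={"col2": 1}, strict=True
)
```

With `strict=False`, all three columns receive criteria and `col3` gets every default. With `strict=True`, only `col1` and `col2` are kept, since those are the only keys found in the dictionaries.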

Raises:

Type Description
JanitorError

If strict=True and none of the scale, direction, or threshold inputs is a dictionary.

JanitorError

If scale is not one of ("absolute", "percentage").

JanitorError

If direction is not one of ("increasing", "decreasing", "any").

JanitorError

If threshold is less than 0.0.

Returns:

Type Description
DataFrame

DataFrame with a `<column>_jump_flag` column appended for each flagged column.
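The per-column comparison itself is done by a private helper that is not shown on this page, so the following is only a sketch of what the parameter descriptions suggest. The function `flag_jumps_sketch` is hypothetical. It assumes that percentage change is expressed as a fraction of the previous row (so `0.5` means 50%), and that the first row, which has no predecessor, is never flagged.

```python
import pandas as pd


def flag_jumps_sketch(series, scale="percentage", direction="any",
                      threshold=0.0):
    """Hypothetical single-column flagging; not the library's helper."""
    # Difference between each row and the previous one.
    change = series.diff()
    if scale == "percentage":
        # Assumption: change relative to the previous row, as a fraction.
        change = change / series.shift(1)

    if direction == "increasing":
        return change > threshold
    if direction == "decreasing":
        return change < -threshold
    # "any": ignore the sign of the change.
    return change.abs() > threshold


s = pd.Series([10.0, 11.0, 20.0, 19.0])
absolute_any = flag_jumps_sketch(s, scale="absolute", threshold=2)
pct_up = flag_jumps_sketch(
    s, scale="percentage", direction="increasing", threshold=0.5
)
pct_down = flag_jumps_sketch(
    s, scale="percentage", direction="decreasing", threshold=0.02
)
```

In this sketch only the jump from 11 to 20 exceeds an absolute threshold of 2 or a 50% increase, while the drop from 20 to 19 is the only decrease larger than 2%.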

Source code in janitor/timeseries.py
@pf.register_dataframe_method
def flag_jumps(
    df: pd.DataFrame,
    scale: Union[str, Dict[str, str]] = "percentage",
    direction: Union[str, Dict[str, str]] = "any",
    threshold: Union[int, float, Dict[str, Union[int, float]]] = 0.0,
    strict: bool = False,
) -> pd.DataFrame:
    """Create boolean column(s) that flag whether or not the change
    between consecutive rows exceeds a provided threshold.

    Examples:

        Applies specified criteria across all columns of the DataFrame
        and appends a flag column for each column in the DataFrame

        >>> df = (
        ...     pd.DataFrame(...)
        ...     .flag_jumps(
        ...         scale="absolute",
        ...         direction="any",
        ...         threshold=2
        ...     )
        ... )  # doctest: +SKIP

        Applies specific criteria to certain DataFrame columns,
        applies default criteria to columns *not* specifically listed and
        appends a flag column for each column in the DataFrame

        >>> df = (
        ...     pd.DataFrame(...)
        ...     .flag_jumps(
        ...         scale=dict(col1="absolute", col2="percentage"),
        ...         direction=dict(col1="increasing", col2="any"),
        ...         threshold=dict(col1=1, col2=0.5),
        ...     )
        ... )  # doctest: +SKIP

        Applies specific criteria to certain DataFrame columns,
        applies default criteria to columns *not* specifically listed and
        appends a flag column for only those columns found in specified
        criteria

        >>> df = (
        ...     pd.DataFrame(...)
        ...     .flag_jumps(
        ...         scale=dict(col1="absolute"),
        ...         threshold=dict(col2=1),
        ...         strict=True,
        ...     )
        ... )  # doctest: +SKIP

    Args:
        df: DataFrame which needs to be flagged for changes between
            consecutive rows above a certain threshold.
        scale:
            Type of scaling approach to use.
            Acceptable arguments are

            * `'absolute'` (consider the difference between rows)
            * `'percentage'` (consider the percentage change between rows).

        direction: Type of method used to handle the sign change when
            comparing consecutive rows.
            Acceptable arguments are

            * `'increasing'` (only consider rows that are increasing in value)
            * `'decreasing'` (only consider rows that are decreasing in value)
            * `'any'` (consider rows that are either increasing or decreasing;
                sign is ignored).
        threshold: The value to check if consecutive row comparisons
            exceed. Always uses a greater than comparison. Must be `>= 0.0`.
        strict: Flag to enable/disable appending of a flag column for
            each column in the provided DataFrame. If set to `True`, will
            only append a flag column for those columns found in at least
            one of the input dictionaries. If set to `False`, will append
            a flag column for each column found in the provided DataFrame.
            If criteria is not specified, the defaults for each criteria
            is used.

    Raises:
        JanitorError: If `strict=True` and none of the
            `scale`, `direction`, or `threshold` inputs is a
            dictionary.
        JanitorError: If `scale` is not one of
            `("absolute", "percentage")`.
        JanitorError: If `direction` is not one of
            `("increasing", "decreasing", "any")`.
        JanitorError: If `threshold` is less than `0.0`.

    Returns:
        DataFrame that has `flag jump` columns.

    <!--
    # noqa: DAR101
    -->
    """
    df = df.copy()

    if strict:
        if not any(
            isinstance(arg, dict) for arg in (scale, direction, threshold)
        ):
            raise JanitorError(
                "When enacting 'strict=True', 'scale', 'direction', or "
                + "'threshold' must be a dictionary."
            )

        # Only append a flag col for the cols that appear
        # in at least one of the input dicts
        arg_keys = [
            arg.keys()
            for arg in (scale, direction, threshold)
            if isinstance(arg, dict)
        ]
        cols = set(itertools.chain.from_iterable(arg_keys))

    else:
        # Append a flag col for each col in the DataFrame
        cols = df.columns

    columns_to_add = {}
    for col in sorted(cols):
        # Allow arguments to be a mix of dict and single instances
        s = scale.get(col, "percentage") if isinstance(scale, dict) else scale
        d = (
            direction.get(col, "any")
            if isinstance(direction, dict)
            else direction
        )
        t = (
            threshold.get(col, 0.0)
            if isinstance(threshold, dict)
            else threshold
        )

        columns_to_add[f"{col}_jump_flag"] = _flag_jumps_single_col(
            df, col, scale=s, direction=d, threshold=t
        )

    df = df.assign(**columns_to_add)

    return df

sort_timestamps_monotonically(df, direction='increasing', strict=False)

Sort DataFrame such that index is monotonic.

If timestamps are monotonic, this function will return the DataFrame unmodified. If timestamps are not monotonic, then the function will sort the DataFrame.

Examples:

Functional usage

>>> import pandas as pd
>>> import janitor.timeseries
>>> df = janitor.timeseries.sort_timestamps_monotonically(
...     df=pd.DataFrame(...),
...     direction="increasing",
... )

Method chaining example:

>>> import pandas as pd
>>> import janitor.timeseries
>>> df = (
...     pd.DataFrame(...)
...     .sort_timestamps_monotonically(direction="increasing")
... )

Parameters:

Name Type Description Default
df DataFrame

DataFrame which needs to be tested for monotonicity.

required
direction str

Type of monotonicity desired. Acceptable arguments are 'increasing' or 'decreasing'.

'increasing'
strict bool

Flag to enable/disable strict monotonicity. If set to True, will remove duplicates in the index by retaining first occurrence of value in index. If set to False, will not test for duplicates in the index.

False

Returns:

Type Description
DataFrame

DataFrame that has monotonically increasing (or decreasing) timestamps.
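The effect of `direction` and `strict` can be illustrated with plain pandas, following the `duplicated` and `sort_index` calls in the source listing below. The data is made up, and the variable names are chosen only for this sketch.

```python
import pandas as pd

# Unsorted index with a repeated timestamp (illustrative data only).
index = pd.to_datetime(
    ["2024-01-03", "2024-01-01", "2024-01-03", "2024-01-02"]
)
df = pd.DataFrame({"value": [30, 10, 31, 20]}, index=index)

# strict=False: sort only; duplicate timestamps are kept.
sorted_df = df.sort_index()

# strict=True: keep the first occurrence of each timestamp, then sort.
deduped = df[~df.index.duplicated(keep="first")].sort_index()

# direction="decreasing": same data, sorted in descending order.
descending = deduped.sort_index(ascending=False)
```

Note that without de-duplication the index is monotonic but not strictly so, because the repeated timestamp remains.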

Source code in janitor/timeseries.py
@pf.register_dataframe_method
def sort_timestamps_monotonically(
    df: pd.DataFrame, direction: str = "increasing", strict: bool = False
) -> pd.DataFrame:
    """Sort DataFrame such that index is monotonic.

    If timestamps are monotonic, this function will return
    the DataFrame unmodified. If timestamps are not monotonic,
    then the function will sort the DataFrame.

    Examples:
        Functional usage

        >>> import pandas as pd
        >>> import janitor.timeseries
        >>> df = janitor.timeseries.sort_timestamps_monotonically(
        ...     df=pd.DataFrame(...),
        ...     direction="increasing",
        ... )  # doctest: +SKIP

        Method chaining example:

        >>> import pandas as pd
        >>> import janitor.timeseries
        >>> df = (
        ...     pd.DataFrame(...)
        ...     .sort_timestamps_monotonically(direction="increasing")
        ... )  # doctest: +SKIP

    Args:
        df: DataFrame which needs to be tested for monotonicity.
        direction: Type of monotonicity desired.
            Acceptable arguments are `'increasing'` or `'decreasing'`.
        strict: Flag to enable/disable strict monotonicity.
            If set to `True`, will remove duplicates in the index
            by retaining first occurrence of value in index.
            If set to `False`, will not test for duplicates in the index.

    Returns:
        DataFrame that has monotonically increasing (or decreasing)
            timestamps.
    """
    # Check all the inputs are the correct data type
    check("df", df, [pd.DataFrame])
    check("direction", direction, [str])
    check("strict", strict, [bool])

    # Remove duplicates if requested
    if strict:
        df = df[~df.index.duplicated(keep="first")]

    # Sort timestamps
    if direction == "increasing":
        df = df.sort_index()
    else:
        df = df.sort_index(ascending=False)

    # Return the DataFrame
    return df